1.介绍
在上一篇:Kotlin 协程 Flow的基础介绍一 (zinyan.com)之后,我们继续学习Flow的相关知识。
了解Flow的连续性,上下文切换,缓冲与合并。
2. Flow 知识
2.1 Flow 是连续的
Flow的每次单独收集,都是按照顺序执行的。除非进行特殊操作的操作符使用多个流。
该收集过程直接在协程中运行,该协程调用末端操作符。 默认情况下不启动新协程。
从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter值 $it")
it % 2 == 0
}
.map {
println("Map 值 $it")
"string $it"
}.collect {
println("Flow 收集结果 $it")
}
}
//输出
Filter值 1
Filter值 2
Map 值 2
Flow 收集结果 string 2
Filter值 3
Filter值 4
Map 值 4
Flow 收集结果 string 4
Filter值 5
我们结合示例进行了解。可能比单纯的文字介绍更清晰。
2.2 Flow 上下文
Flow
的结果收集中通常是在调用协程的上下文中发生。
例如有一个Flow
命名为zinyan
,然后我们使用默认线程执行。
fun zinyan(): Flow<Int> = flow {
println("${Thread.currentThread().name}开始一个Flow ")
for (i in 1..3) {
emit(i) //返回流的结果
}
}
fun main() = runBlocking<Unit> {
zinyan().collect { value -> println("${Thread.currentThread().name}收集到的结果为:$value") }
}
//输出
main开始一个Flow
main收集到的结果为:1
main收集到的结果为:2
main收集到的结果为:3
我们可以看到 在输出打印Thread
的name
后,都显示的main
。因为该main
为我们的默认主线程的上下文
我们在zinyan()
打印显示的main
线程,那是因为我们上面的例子是在主线程中调用的collect
。
同时注意,我们调用Flow的时候,它并不会阻塞我们的线程。
那么,我们如果碰见在其他线程中更新数据,然后再将数据结果传给Ui主线程进行UI界面的刷新。那么我们不能直接使用withContext
修改协程上下文。
因为flow {...}
构建的Flow
对象,必须遵循上下文保存统一。不允许重其他上下文对象中发射emit
示例如下:
fun simple(): Flow<Int> = flow {
// 在流构建器中更改消耗 CPU 代码的上下文的错误方式
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
emit(i) // 发射下一个值
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
在上面的示例中会出现
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
异常。因为我们在Flow中非法改变了上下文对象
那么我们如果要进行更改,该如何呢?可以使用flowOn
操作符。
该函数用于更改流发射的上下文。
示例:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
println("${Thread.currentThread().name} 线程:$i")
emit(i) // 发射下一个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式
fun main() = runBlocking<Unit> {
simple().collect { value ->
println("${Thread.currentThread().name} 线程:$value")
}
}
//输出
DefaultDispatcher-worker-1 线程:1
main 线程:1
DefaultDispatcher-worker-1 线程:2
main 线程:2
DefaultDispatcher-worker-1 线程:3
main 线程:3
2.3 Flow 缓冲-buffer操作符
我们从collect
收集Flow
的结果所花费的时间来看,Flow的速度和它所产生的速度有关,如果一个Flow 每100毫秒发射一个结果值,而collect 也收集比较慢。那么整个下来的话Flow就会运行的特别慢
示例:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假如我们在这里执行了100毫秒的长时间任务
emit(i) // 发射结果值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假设我们每次得到结果后,需要一个计算过程
println(value)
}
}
println("最终结果耗时: $time 毫秒")
}
//输出
1
2
3
最终结果耗时: 1272 毫秒
不同的电脑性能可能运行的快慢,打印的最终耗时会有波动,但是不会早于1200毫秒。
因为每个数值打印需要花费400毫秒,3x400 = 1200毫秒是Flow的花费时间。
如果是在实际业务过程中,上面的处理就并不理想了。速度太慢,效率太低。
针对这种情况,我们可以使用buffer
缓存操作符,来实现并发运行减少整个流程的耗时。
示例:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假如我们在这里执行了100毫秒的长时间任务
emit(i) // 发射结果值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().buffer().collect { value ->
delay(300) // 假设我们每次得到结果后,需要一个计算过程
println(value)
}
}
println("最终结果耗时: $time 毫秒")
}
//输出
1
2
3
最终结果耗时: 1093 毫秒
整个Flow的结果没有变化,但是产生数据的速度得到了提高。
2.4 Flow 合并-conflate 操作符
上面的buffer 只是是节省了产生数据的时间,而我们collect的时间还是没有太大变化。
我们可以通过合并,操作减少收集流结果时的长时间。但是conflate操作符会造成数据的丢失。因为如果处理速度太慢,而发射速度比较快的情况下。它可能会跳过中间的数据,直接处理最后发射的结果。用于提高处理速度。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假如我们在这里执行了100毫秒的长时间任务
emit(i) // 发射结果值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().buffer().conflate().collect { value ->
delay(300) // 假设我们每次得到结果后,需要一个计算过程
println(value)
}
}
println("最终结果耗时: $time 毫秒")
}
//输出
1
3
最终结果耗时: 785 毫秒
我们可以看到,输出结果没有2,因为速度太快它默认丢弃了。直接处理了最后一个结果值3。
2.5 Flow 处理最新值-collectLatest函数
当我们发射器和收集器都比较慢的时候,合并是一个可以加快处理速度的一种方式。因为它是通过丢弃某个中间发射值来实现速度的飞跃的。
除此之外,我们还有一种方式就是取消缓慢的收集器,并在每次发射新值的时候重启,用于提供速度。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假如我们在这里执行了100毫秒的长时间任务
emit(i) // 发射结果值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新发射最后一个值
println("收集到结果 $value")
delay(300) // 假装我们花费 300 毫秒来处理它
println("本次收集结束 $value")
}
}
println("最终结果耗时: $time 毫秒")
}
//输出
收集到结果 1
收集到结果 2
收集到结果 3
本次收集结束 3
最终结果耗时: 678 毫秒
我们可以看到,它的速度要比上面的几种都快,并且没有中间抛弃数据的情况发生。
评论区