1. 介绍
前两篇Flow 基础学习:
Kotlin 协程 Flow的基础介绍一 (zinyan.com)
Kotlin 协程 Flow的基础介绍二 (zinyan.com)
本篇继续学习Flow的知识。了解多个Flow的组合,以及Flow嵌套情况下如何获取最新结果,进行Flow的展平操作等。
2. 组合多个Flow
在Kotlin中,组合多个Flow有很多种方法,这里主要介绍两种zip 和combine。
2.1 zip 操作符
Flow
拥有一个zip
操作符,可以用于组合两个流中的相关值。
使用应用于每对值的提供的transform
函数,将当前流(this
)中的值与other
流进行压缩。
一旦其中一个流完成,结果流就完成,并对其余流调用cancel
。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // asFlow 创建Flow的一种模式,可以参考https://zinyan.com/?p=219 有介绍。
val strs = flowOf("one", "two", "three") // flowOf 另外的一种Flow的创建模式。
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
.collect {
println(it)
} // 收集并打印
}
//输出
1 -> one
2 -> two
3 -> three
在上面的例子中,使用了两种不同的Flow
构造器。然后我们将strs
的数据通过zip
方法,合并到一起了。
上面的两个Flow
速度一样,我们如果故意将两个Flow
的速度改为不一样会是什么效果呢?
示例2:
fun main() = runBlocking<Unit> {
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.zip(flow2) { i, s -> i.toString() + s }.collect {
println(it) // 收集并打印
}
}
//输出
1a
2b
3c
我们可以看到。flow2 并没有执行完。当flow执行完毕后。整个过程就结束了。
2.2 combine操作符
当Flow
表示一个变量或则操作的最新值时,可能会需要执行计算。每当上游产生一个值的时候,都需要重新计算。这种相应的操作符统称为combine
。
在zip
的第二个示例中,我们发现数据不齐全,如果想数据全,可以将zip
换为combine
示例:
fun main() = runBlocking<Unit> {
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
println(it) // 收集并打印
}
}
//输出
1a
2b
3b
3c
3d
这就是它的每次新值时重新计算的原理。当第一个流的结果还是b的时候,第二个流已经到3了。或第二个流已经结束了。第一个流还有数据输出。
也就是说,只要还有一个流没有结束,并产生新的数据,那么都会重新计算产生新的结果值并发射。
3. 展平Flow
Flow
表示异步接受的值序列,所以很容易遭遇到下面这种情况:每一个值都会触发对另外一个值序列(Flow
)的请求。
比如我们有一个每隔300毫秒返回两个字符串的Flow
函数。然后我们有另外一个Flow
去调用它
示例:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: 第一个值")
delay(300) // 等待 300 毫秒
emit("$i: 第二个值")
}
fun main() = runBlocking<Unit> {
(1..4).asFlow().map { requestFlow(it) }
}
然后在上面的实例中,我们得到了一个包含Flow
的Flow
。我们需要针对这种Flow
进行展平为单个流方便下一步处理。
集合和序列都拥有
flatten
和flatMap
操作符来做展平操作。
流是具有异步特性的,所以需要不同的展平模式。为此Kotlin提供了一系列的流展平操作符。
这里简单介绍部分。
3.1 flatMapConcat 操作符
连接模式由 flatMapConcat
与 flattenConcat
操作符实现。它们是相应序列操作符最相近的类似物。它们在等待内部流完成之前开始收集下一个值。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: 第一个值")
delay(300) // 等待 300 毫秒
emit("$i: 第二个值")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..4).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
//输出
1: 第一个值 at 136 ms from start
1: 第二个值 at 450 ms from start
2: 第一个值 at 560 ms from start
2: 第二个值 at 871 ms from start
3: 第一个值 at 979 ms from start
3: 第二个值 at 1289 ms from start
4: 第一个值 at 1396 ms from start
4: 第二个值 at 1709 ms from start
我们可以清楚的看到flatMapConcat
的顺序性质。
3.2 flatMapMerge 操作符
相较于上面的flatMapConcat
平展符,flatMapMerge
平展操作符是将所有Flow
的值并发收集并合并到一个单独Flow
中,可以更快的处理Flow
的结果。
示例:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: 第一个值")
delay(300) // 等待 300 毫秒
emit("$i: 第二个值")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
//输出
1: 第一个值 at 173 ms from start
2: 第一个值 at 269 ms from start
3: 第一个值 at 380 ms from start
1: 第二个值 at 486 ms from start
2: 第二个值 at 582 ms from start
3: 第二个值 at 692 ms from start
可以看到相较于第一种,它的速度要快不少。
通过打印结果可以很明显的看到并发的过程。
flatMapMerge 会按照顺序调用代码块,但是并发收集结果值。
3.3 flatMapLatest 操作符
与collectLatest
操作符类似Flow 处理最新值 zinyan.com,也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。 flatMapLatest
操作符就是用来处理这个事务的
示例:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: 第一个值")
delay(300) // 等待 300 毫秒
emit("$i: 第二个值")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
//输出
1: 第一个值 at 156 ms from start
2: 第一个值 at 273 ms from start
3: 第一个值 at 380 ms from start
3: 第二个值 at 690 ms from start
flatMapLatest
会在一个新的Flow
值到来时取消了块中的所有代码。
具体的Flow的使用,我们只有通过不断的使用才能更深刻的理解它们的意义。
下篇介绍Flow的异常等情况。本篇就到这里吧。
评论区