侧边栏壁纸
  • 累计撰写 416 篇文章
  • 累计创建 65 个标签
  • 累计收到 145 条评论

目 录CONTENT

文章目录

Kotlin 协程 Flow的基础介绍三

Z同学
2022-01-13 / 0 评论 / 4 点赞 / 1,267 阅读 / 4,129 字
温馨提示:
本文最后更新于 2022-01-13,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 介绍

前两篇Flow 基础学习:

Kotlin 协程 Flow的基础介绍一 (zinyan.com)

Kotlin 协程 Flow的基础介绍二 (zinyan.com)

本篇继续学习Flow的知识。了解多个Flow的组合,以及Flow嵌套情况下如何获取最新结果,进行Flow的展平操作等。

2. 组合多个Flow

在Kotlin中,组合多个Flow有很多种方法,这里主要介绍两种zip 和combine。

2.1 zip 操作符

Flow 拥有一个zip操作符,可以用于组合两个流中的相关值。

使用应用于每对值的提供的transform函数,将当前流(this)中的值与other流进行压缩。

一旦其中一个流完成,结果流就完成,并对其余流调用cancel

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow() // asFlow 创建Flow的一种模式,可以参考https://zinyan.com/?p=219 有介绍。
    val strs = flowOf("one", "two", "three") // flowOf 另外的一种Flow的创建模式。
    nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
        .collect {
            println(it) 
        } // 收集并打印
}
//输出
1 -> one
2 -> two
3 -> three

在上面的例子中,使用了两种不同的Flow构造器。然后我们将strs 的数据通过zip方法,合并到一起了。

上面的两个Flow速度一样,我们如果故意将两个Flow的速度改为不一样会是什么效果呢?

示例2:

fun main() = runBlocking<Unit> {
    val flow = flowOf(1, 2, 3).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
    flow.zip(flow2) { i, s -> i.toString() + s }.collect {
        println(it) // 收集并打印
    }
}
//输出
1a
2b
3c

我们可以看到。flow2 并没有执行完。当flow执行完毕后。整个过程就结束了。

2.2 combine操作符

Flow表示一个变量或则操作的最新值时,可能会需要执行计算。每当上游产生一个值的时候,都需要重新计算。这种相应的操作符统称为combine

zip的第二个示例中,我们发现数据不齐全,如果想数据全,可以将zip换为combine

示例:

fun main() = runBlocking<Unit> {
    val flow = flowOf(1, 2, 3).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
    flow.combine(flow2) { i, s -> i.toString() + s }.collect {
        println(it) // 收集并打印
    }
}
//输出
1a
2b
3b
3c
3d

这就是它的每次新值时重新计算的原理。当第一个流的结果还是b的时候,第二个流已经到3了。或第二个流已经结束了。第一个流还有数据输出。

也就是说,只要还有一个流没有结束,并产生新的数据,那么都会重新计算产生新的结果值并发射。

3. 展平Flow

Flow表示异步接受的值序列,所以很容易遭遇到下面这种情况:每一个值都会触发对另外一个值序列(Flow)的请求。

比如我们有一个每隔300毫秒返回两个字符串的Flow函数。然后我们有另外一个Flow去调用它

示例:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: 第一个值")
    delay(300) // 等待 300 毫秒
    emit("$i: 第二个值")
}

fun main() = runBlocking<Unit> {
    (1..4).asFlow().map { requestFlow(it) }
}

然后在上面的实例中,我们得到了一个包含FlowFlow。我们需要针对这种Flow进行展平为单个流方便下一步处理。

集合和序列都拥有flattenflatMap操作符来做展平操作。

流是具有异步特性的,所以需要不同的展平模式。为此Kotlin提供了一系列的流展平操作符。

这里简单介绍部分。

3.1 flatMapConcat 操作符

连接模式由 flatMapConcatflattenConcat 操作符实现。它们是相应序列操作符最相近的类似物。它们在等待内部流完成之前开始收集下一个值。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: 第一个值")
    delay(300) // 等待 300 毫秒
    emit("$i: 第二个值")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // 记录开始时间
    (1..4).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
//输出
1: 第一个值 at 136 ms from start
1: 第二个值 at 450 ms from start
2: 第一个值 at 560 ms from start
2: 第二个值 at 871 ms from start
3: 第一个值 at 979 ms from start
3: 第二个值 at 1289 ms from start
4: 第一个值 at 1396 ms from start
4: 第二个值 at 1709 ms from start

我们可以清楚的看到flatMapConcat的顺序性质。

3.2 flatMapMerge 操作符

相较于上面的flatMapConcat平展符,flatMapMerge平展操作符是将所有Flow的值并发收集并合并到一个单独Flow中,可以更快的处理Flow的结果。

示例:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: 第一个值")
    delay(300) // 等待 300 毫秒
    emit("$i: 第二个值")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // 记录开始时间
    (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
//输出
1: 第一个值 at 173 ms from start
2: 第一个值 at 269 ms from start
3: 第一个值 at 380 ms from start
1: 第二个值 at 486 ms from start
2: 第二个值 at 582 ms from start
3: 第二个值 at 692 ms from start

可以看到相较于第一种,它的速度要快不少。

通过打印结果可以很明显的看到并发的过程。

flatMapMerge 会按照顺序调用代码块,但是并发收集结果值。

3.3 flatMapLatest 操作符

collectLatest 操作符类似Flow 处理最新值 zinyan.com,也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。 flatMapLatest操作符就是用来处理这个事务的

示例:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: 第一个值")
    delay(300) // 等待 300 毫秒
    emit("$i: 第二个值")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // 记录开始时间
    (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
//输出
1: 第一个值 at 156 ms from start
2: 第一个值 at 273 ms from start
3: 第一个值 at 380 ms from start
3: 第二个值 at 690 ms from start

flatMapLatest会在一个新的Flow值到来时取消了块中的所有代码。

具体的Flow的使用,我们只有通过不断的使用才能更深刻的理解它们的意义。

下篇介绍Flow的异常等情况。本篇就到这里吧。

4

评论区