1. 介绍
前三篇Flow 基础学习:
Kotlin 协程 Flow的基础介绍一 (zinyan.com)
Kotlin 协程 Flow的基础介绍二 (zinyan.com)
Kotlin 协程 Flow的基础介绍三 (zinyan.com)
本篇继续学习Flow的知识。了解Flow的异常处理与捕获,Flow的完成时的状态检测,
以及主动停止Flow等知识。
2. Flow 异常 try...catch
当运算符中的发射器或则代码抛出异常时,Flow
收集可以带有异常的完成。
和常见的出现异常的解决方法一样。我们可以通过try...catch
来处理可能发生的异常。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("发射 $i")
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "接收 $value" }
}
} catch (e: Throwable) {
println("捕获异常 $e")
}
}
//输出
发射 1
1
发射 2
2
捕获异常 java.lang.IllegalStateException: 接收 2
和常见的异常捕获一样。catch之后就会结束了。不会再有新的发射值产生。
上面的示例实际上捕获了在发射器或则任何过渡或末端操作符中发生的任何异常。
示例2:和上面的异常捕获差不多。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("发射: $i")
emit(i) // 发射下一个值
}
}
.map { value ->
check(value <= 1) { "收集到值: $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("异常: $e")
}
}
//输出
发射: 1
string 1
发射: 2
异常: java.lang.IllegalStateException: 收集到值: 2
仍然会捕获异常并停止Flow
数据收集。
3. 异常透明性
我们上面的示例都是在接收器上try
捕获异常。那么如果过在发射器产生数据时出现了异常,我们该如何进行封装处理?
Kotlin
中定义,Flow
必须对异常透明,也就是说在flow{...}
代码块内部的try/catch
进行数据值发射,是违反异常透明原则的。
但是发射器可以使用catch
操作符来保留异常的透明性并允许封装异常处理。
请注意,这个catch 并不是try成对应关系的catch。
catch
操作符的代码块可以分析异常并根据捕获到的异常采取不同的方式进行响应,响应手段为:
- 可以使用
throw
抛出异常。 - 可以使用
catch
块中的emit
将异常转换为正常值进行发射。 - 可以将异常忽略,或用日志进行打印,或采取其他代码处理异常。
说那么多,我们还是通过示例来进行理解吧。
示例:在捕获异常的时候发射文本:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("发射: $i")
emit(i) // 发射下一个值
}
}
.map { value ->
check(value <= 1) { "收集到值: $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("异常发射 $e") } // 发射一个异常
.collect { value -> println(value) }
}
//输出
发射: 1
string 1
发射: 2
异常发射 java.lang.IllegalStateException: 收集到值: 2
上面的示例中,我们没有使用try...catch
也能进行异常捕获,输出的结果也是一样的。
3.1 透明捕获-catch
catch
过渡操作符遵循异常透明性,仅捕获上游异常。如果collect{...}
位于catch之下并抛出了异常。那么这个异常将会出现逃逸无法被捕获。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("发射 $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("异常 $e") } // 不会捕获下游异常
.collect { value ->
check(value <= 1) { "捕获结果 $value" }
println(value)
}
}
上面的示例运行后,将会出现
发射 1
1
发射 2
Exception in thread "main" java.lang.IllegalStateException: 捕获结果 2
at com.zinyan.general.MapUtilKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
错误日志。
3.2 声明式捕获-catch
我们可以将catch
操作符的声明性与处理所有异常的期望结果相结合,将collect
操作符的代码块移动到onEach
中并放在catch
操作符之前。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("发射 $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.onEach { value ->
check(value <= 1) { "收集值: $value" }
println(value)
}
.catch { e -> println("异常 $e") }
.collect()
}
//输出
发射 1
1
发射 2
异常 java.lang.IllegalStateException: 收集值: 2
4.Flow完成
当Flow收集完成时,如果需要执行一个动作,我们可以使用以下两种模式:
这个动作可以是命令式,也可以是声明式。
所谓的的声明式,就是在调用之前告诉Flow
一声,之后结束了你要执行个以下方法。
命令式,就是并不告诉Flow
执行后需要如何做。而是只要Flow
结束完了我就要执行的方法。
4.1 命令式finally块
上面的异常捕获,其实就是命令式的。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("结束")
}
}
//输出
1
2
3
结束
4.2 声明式-onCompletion
对于声明式,Flow
拥有onCompletion
过渡操作符,它在Flow
完全收集时调用。
我们可以使用该函数重写前面的命令式的示例。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { println("结束") }
.collect { value -> println(value) }
}
//输出
1
2
3
结束
两个方式,输出的结果都是一样的。
相较于命令式,我们使用声明式可以在函数中使用lambda
表达式的可空参数Throwable
用于确认Flow
收集是正常结束的还是有异常发生过。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Floew 收集过程中有catch异常发生") }
.catch { cause -> println("异常搜集结束") }
.collect { value -> println(value) }
}
//输出
1
Floew 收集过程中有catch异常发生
异常搜集结束
onCompletion
操作符与 catch
不同,它不处理异常。从上面的示例可以看到异常仍然流向下游。被提供给后面的 catch
操作符处理。
我们知道了Flow的收集并调用命令式或则声明式处理完成的结果或异常情况。
那么在什么时候我们应该使用命令式,什么时候使用声明式呢?Kotlin没有一个官方主张。主要是根据我们自己的喜好和代码风格进行选择。
也就是说,总的来说这两种方式。对于性能来说是没有区别的。我们根据自己的喜好决定使用即可。(ps:可能后续Kotlin不断的更新迭代,会有区别吧)
5. 启动Flow
我们上面的Flow启动都是普通模式,我们如果希望Flow启动时在一个单独的协程中运行。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执行流
println("结束")
}
//输出
结束
Event: 1
Event: 2
Event: 3
我们使用launchIn
替换了collect
。launchIn
的传参指定了用哪个协程来启动Flow的收集。
在实际的使用场景下,每个协程是有作用域的,当相应作用域被取消被关闭时。
我们使用launchIn
启动的Flow
会自动跟随协程被一起回收。就如同我们在给对象绑定Listener
时,我们不用关系removeListener
。只需要想用的时候addListener
或setListener
。
同时,launchIn
会返回一个Job
对象。我们可以在不取消整个协程的情况下,仅取消对应的Flow
收集
6. Flow 取消检测
在Flow
的使用中。Flow
构造器对每个发射值都执行了ensureActive
检测。
意味着我们可以在Flow
发射过程中,随时进行取消。
示例:
fun foo(): Flow<Int> = flow {
for (i in 1..6) {
println("发射数据: $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel() //如果发射到3了那么我就取消
println(value)
}
}
//输出
发射数据: 1
1
发射数据: 2
2
发射数据: 3
3
发射数据: 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4dcbadb4
然后当出现4的时候,由于接收器被取消了。出现了异常
取消是会有异常的,如果出现了异常都会造成性能的影响。所以大部分Flow
操作不会自动检测取消检测。例如asFlow
构造创建的Flow
示例:
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
//输出
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5b80350b
它就会执行完毕后才会报错。仅在从runBlocking
返回之前检测一轮取消操作
我们如果就是想取消那种不让取消的Flow
该怎么做?
6.1 添加Flow
的取消检测-cancellable
我们在上面的示例中,由于Flow只有结束后才执行一轮取消状态检测。我们可以通过
添加 .onEach { currentCoroutineContext().ensureActive() }
或使用Kotlin现成的 cancellable
操作符。来实现
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
//输出
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@deb6432
添加之后,它就会每次发射的时候主动进行取消检测了。
到这里,Flow的基本介绍就暂时结束了。
之后使用过程中针对Flow有更多的理解时我将会继续介绍。
评论区