侧边栏壁纸
博主头像
Z同学博主等级

工作磨平激情前,坚持技术的热忱。 欢迎光临Z同学的技术小站。 分享最新的互联网知识。

  • 累计撰写 290 篇文章
  • 累计创建 57 个标签
  • 累计收到 98 条评论

Kotlin 协程 Flow的基础介绍四

Z同学
2022-01-13 / 0 评论 / 0 点赞 / 366 阅读 / 6,347 字
温馨提示:
本文最后更新于 2022-01-13,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 介绍

前三篇Flow 基础学习:

Kotlin 协程 Flow的基础介绍一 (zinyan.com)

Kotlin 协程 Flow的基础介绍二 (zinyan.com)

Kotlin 协程 Flow的基础介绍三 (zinyan.com)

本篇继续学习Flow的知识。了解Flow的异常处理与捕获,Flow的完成时的状态检测,

以及主动停止Flow等知识。

2. Flow 异常 try...catch

当运算符中的发射器或则代码抛出异常时,Flow收集可以带有异常的完成。

和常见的出现异常的解决方法一样。我们可以通过try...catch来处理可能发生的异常。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("发射 $i")
        emit(i) // 发射下一个值
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "接收 $value" }
        }
    } catch (e: Throwable) {
        println("捕获异常 $e")
    }
}
//输出
发射 1
1
发射 2
2
捕获异常 java.lang.IllegalStateException: 接收 2

和常见的异常捕获一样。catch之后就会结束了。不会再有新的发射值产生。

上面的示例实际上捕获了在发射器或则任何过渡或末端操作符中发生的任何异常。

示例2:和上面的异常捕获差不多。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("发射: $i")
            emit(i) // 发射下一个值
        }
    }
        .map { value ->
            check(value <= 1) { "收集到值: $value" }
            "string $value"
        }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("异常: $e")
    }
}
//输出
发射: 1
string 1
发射: 2
异常: java.lang.IllegalStateException: 收集到值: 2

仍然会捕获异常并停止Flow数据收集。

3. 异常透明性

我们上面的示例都是在接收器上try捕获异常。那么如果过在发射器产生数据时出现了异常,我们该如何进行封装处理?

Kotlin中定义,Flow 必须对异常透明,也就是说在flow{...}代码块内部的try/catch进行数据值发射,是违反异常透明原则的。

但是发射器可以使用catch操作符来保留异常的透明性并允许封装异常处理。

请注意,这个catch 并不是try成对应关系的catch。

catch操作符的代码块可以分析异常并根据捕获到的异常采取不同的方式进行响应,响应手段为:

  • 可以使用throw抛出异常。
  • 可以使用catch块中的emit将异常转换为正常值进行发射。
  • 可以将异常忽略,或用日志进行打印,或采取其他代码处理异常。

说那么多,我们还是通过示例来进行理解吧。

示例:在捕获异常的时候发射文本:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("发射: $i")
            emit(i) // 发射下一个值
        }
    }
        .map { value ->
            check(value <= 1) { "收集到值: $value" }
            "string $value"
        }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("异常发射 $e") } // 发射一个异常
        .collect { value -> println(value) }
}
//输出
发射: 1
string 1
发射: 2
异常发射 java.lang.IllegalStateException: 收集到值: 2

上面的示例中,我们没有使用try...catch也能进行异常捕获,输出的结果也是一样的。

3.1 透明捕获-catch

catch过渡操作符遵循异常透明性,仅捕获上游异常。如果collect{...}位于catch之下并抛出了异常。那么这个异常将会出现逃逸无法被捕获。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("发射 $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("异常 $e") } // 不会捕获下游异常
        .collect { value ->
            check(value <= 1) { "捕获结果 $value" }
            println(value)
        }
}

上面的示例运行后,将会出现

发射 1
1
发射 2
Exception in thread "main" java.lang.IllegalStateException: 捕获结果 2
	at com.zinyan.general.MapUtilKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)

错误日志。

3.2 声明式捕获-catch

我们可以将catch操作符的声明性与处理所有异常的期望结果相结合,将collect操作符的代码块移动到onEach中并放在catch操作符之前。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("发射 $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "收集值: $value" }
            println(value)
        }
        .catch { e -> println("异常 $e") }
        .collect()
}
//输出
发射 1
1
发射 2
异常 java.lang.IllegalStateException: 收集值: 2

4.Flow完成

当Flow收集完成时,如果需要执行一个动作,我们可以使用以下两种模式:

这个动作可以是命令式,也可以是声明式。

所谓的的声明式,就是在调用之前告诉Flow一声,之后结束了你要执行个以下方法。

命令式,就是并不告诉Flow执行后需要如何做。而是只要Flow结束完了我就要执行的方法。

4.1 命令式finally块

上面的异常捕获,其实就是命令式的。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("结束")
    }
}
//输出
1
2
3
结束

4.2 声明式-onCompletion

对于声明式,Flow 拥有onCompletion过渡操作符,它在Flow完全收集时调用。

我们可以使用该函数重写前面的命令式的示例。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("结束") }
        .collect { value -> println(value) }
}
//输出
1
2
3
结束

两个方式,输出的结果都是一样的。

相较于命令式,我们使用声明式可以在函数中使用lambda表达式的可空参数Throwable

用于确认Flow收集是正常结束的还是有异常发生过。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Floew 收集过程中有catch异常发生") }
        .catch { cause -> println("异常搜集结束") }
        .collect { value -> println(value) }
}
//输出
1
Floew 收集过程中有catch异常发生
异常搜集结束

onCompletion操作符与 catch 不同,它不处理异常。从上面的示例可以看到异常仍然流向下游。被提供给后面的 catch 操作符处理。

我们知道了Flow的收集并调用命令式或则声明式处理完成的结果或异常情况。

那么在什么时候我们应该使用命令式,什么时候使用声明式呢?Kotlin没有一个官方主张。主要是根据我们自己的喜好和代码风格进行选择。

也就是说,总的来说这两种方式。对于性能来说是没有区别的。我们根据自己的喜好决定使用即可。(ps:可能后续Kotlin不断的更新迭代,会有区别吧)

5. 启动Flow

我们上面的Flow启动都是普通模式,我们如果希望Flow启动时在一个单独的协程中运行。

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*


fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- 在单独的协程中执行流
    println("结束")
}
//输出
结束
Event: 1
Event: 2
Event: 3

我们使用launchIn替换了collectlaunchIn的传参指定了用哪个协程来启动Flow的收集。

在实际的使用场景下,每个协程是有作用域的,当相应作用域被取消被关闭时。

我们使用launchIn启动的Flow 会自动跟随协程被一起回收。就如同我们在给对象绑定Listener时,我们不用关系removeListener。只需要想用的时候addListenersetListener

同时,launchIn会返回一个Job对象。我们可以在不取消整个协程的情况下,仅取消对应的Flow收集

6. Flow 取消检测

Flow的使用中。Flow构造器对每个发射值都执行了ensureActive检测。

意味着我们可以在Flow发射过程中,随时进行取消。

示例:

fun foo(): Flow<Int> = flow {
    for (i in 1..6) {
        println("发射数据: $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()  //如果发射到3了那么我就取消
        println(value)
    }
}
//输出
发射数据: 1
1
发射数据: 2
2
发射数据: 3
3
发射数据: 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4dcbadb4

然后当出现4的时候,由于接收器被取消了。出现了异常

取消是会有异常的,如果出现了异常都会造成性能的影响。所以大部分Flow操作不会自动检测取消检测。例如asFlow构造创建的Flow

示例:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//输出
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5b80350b

它就会执行完毕后才会报错。仅在从runBlocking返回之前检测一轮取消操作

我们如果就是想取消那种不让取消的Flow该怎么做?

6.1 添加Flow的取消检测-cancellable

我们在上面的示例中,由于Flow只有结束后才执行一轮取消状态检测。我们可以通过

添加 .onEach { currentCoroutineContext().ensureActive() }或使用Kotlin现成的 cancellable操作符。来实现

示例:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//输出
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@deb6432

添加之后,它就会每次发射的时候主动进行取消检测了。

到这里,Flow的基本介绍就暂时结束了。

之后使用过程中针对Flow有更多的理解时我将会继续介绍。

0

评论区