侧边栏壁纸
博主头像
Z同学博主等级

工作磨平激情前,坚持技术的热忱。 欢迎光临Z同学的技术小站。 分享最新的互联网知识。

  • 累计撰写 290 篇文章
  • 累计创建 57 个标签
  • 累计收到 98 条评论

Kotlin 协程的上下文和调度器介绍-Dispatchers

Z同学
2021-12-14 / 0 评论 / 0 点赞 / 553 阅读 / 7,448 字
温馨提示:
本文最后更新于 2021-12-14,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

介绍

协程的上下文通常是CoroutineContext类型为代表。这个类型是被定义在Kotlin的标准库中。

在协程中,上下文是各种不同元素的集合。而其中主导作用的元素就是Job。

我们在了解协程的并发与调度的时候涉及到了Job。Kotlin 协程 组合挂起函数和async关键字,实现协程的并发操作 (zinyan.com)

这篇继续深入了解Job。

调度器(Dispatchers )与线程

什么是调度器? 调度器就是一个决定了协程在哪个线程或者哪些线程上执行的控制对象。

它可以将协程限制在一个特定的线程执行,也可以把协程分配到一个线程池,或者让协程不受限制约束的进行运行。

协程上下文对象:CoroutineContext

协程调度器对象:CoroutineDispatcher

而我们通常在使用launch 或者async时可以通过可选参数定义CoroutineContext 对象。然后它会帮我们指定一个调度器对象。也可以使用Dispatchers 对象,定义调度器

示例:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // 运行在父协程的上下文中,即 runBlocking 主协程
    launch {
        println("main runBlocking      : 我工作的线程 ${Thread.currentThread().name}")
    }
    //调度到主线层中,并且不受限制
    launch(Dispatchers.Unconfined) { // 
        println("Unconfined            : 我工作的线程 ${Thread.currentThread().name}")
        
            println("这个节点是什么事实结束呢")
    }
    //调度到默认线程
    launch(Dispatchers.Default) { 
        println("Default               : 我工作的线程 ${Thread.currentThread().name}")
    }
    // 调度到一个新的线程之中
    launch(newSingleThreadContext("ZinyanThread")) { 
        println("ZinyanThreadContext   : 我工作的线程 ${Thread.currentThread().name}")
    }
}
//输出
Unconfined            : 我工作的线程 main
Default               : 我工作的线程 DefaultDispatcher-worker-1
ZinyanThreadContext   : 我工作的线程 ZinyanThread
main runBlocking      : 我工作的线程 main

下面介绍上面的四种调度逻辑。

launch{...}:默认情况下,它将会从启动它的协程对象中继承上下文以及调度器。

我们上面的例子就是,从main线程中的runBlocking协程对象中继承了上下文,结果显示运行在了main线程之中。

Dispatchers.Unconfined:是特殊的调度器,上面的例子中是运行在了main线层。但是有一个注释,叫做非受限的调度器。然后可以看到,它的输出是最快最早的。但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。非受限的调度器非常适用于执行不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。

它会默认继承外部协程对象。当它被限制在了调用者线程时,继承自它将会有效地限制协程在该线程运行并且具有可预测的 FIFO 调度。

例子:

fun main() = runBlocking<Unit> {
    // 非受限的——将和主线程一起工作
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : 工作线程 ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : 线程延迟 ${Thread.currentThread().name}")
    }
    launch { // 父协程的上下文,主 runBlocking 协程
        println("main runBlocking: 工作线程 ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: 线程延迟 ${Thread.currentThread().name}")
    }
}
//输出
Unconfined      : 工作线程 main
main runBlocking: 工作线程 main
Unconfined      : 线程延迟 kotlinx.coroutines.DefaultExecutor
main runBlocking: 线程延迟 main

所以,该协程的上下文继承自 runBlocking {...} 协程并在 main 线程中运行,当 delay 函数调用的时候,非受限的那个协程在默认的执行者线程中恢复执行。

非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该在通常的代码中使用。

Dispatchers.Default:默认调度器。 默认调度器使用共享的后台线程池。 所以 launch(Dispatchers.Default) { …… }GlobalScope.launch { …… } 使用相同的调度器。

newSingleThreadContext("MyOwnThread") :自定义协程线层。为协程的运行启动了一个线程。 一个专用的线程是一种非常昂贵的资源。 在实际开发中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶层变量中使它在整个应用程序中被重用。否则就会出现线程泛滥的情况。

不同线程中的跳转

实现两个协程线程的跳转。示例:

fun main() = runBlocking<Unit> {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                println("开启 Ctx1")
                withContext(ctx2) {
                    println("Ctx2 开始工作")
                }
                println("返回到 Ctx1 ")
            }
        }
    }
}
//输出
开启 Ctx1
Ctx2 开始工作
返回到 Ctx1 

在这个示例中, 使用runBlocking 显式指定了一个上下文。并且之后在协程中使用withContext来改变协程的上下文,而仍然驻留在相同的协程中。

得到上面的输出结果。在这个例子中,使用的都是newSingleThreadContext()创建的线程,而我们使用了标准库中的use函数来释放该线程。避免线程的滥用。

上下文中的Job

协程中的Job是上下文的一部分,并可以使用coroutineContext [Job] 表达式在上下文中检索它。

示例:

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}
//输出
My job is BlockingCoroutine{Active}@1de0aca6

那么这个有什么作用呢?例如我们可以查询协程的活动状态

示例:

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
    var s = coroutineContext[Job]?.isActive
    println(s)
}
//输出
true

说明我当前的协程对象是活动的。

而为什么要添加“?” 那是因为对象可能为null。

子协程

当一个协程被其他协程在CoroutineScope中被启动的话,它将会通过CoroutineScope.coroutineContext来继承主协程的上下文。并且这个新协程的Job对象将会成为父协程的子Job对象。

当一个父协程被取消的时候,所有它的子协程也会被递归的取消。

但是,当使用 GlobalScope 来启动一个协程时,则新协程的作业没有父作业。 因此它与这个启动的作用域无关且独立运作。

示例:

fun main() = runBlocking<Unit> {
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        // 孵化了两个子作业, 其中一个通过 GlobalScope 启动
        GlobalScope.launch {
            println("job1: 我运行在GlobalScope启动的协程中")
            delay(1000)
            println("job1: 等待了1秒,你会发现我不受取消方法的影响")
        }
        // 另一个则承袭了父协程的上下文
        launch {
            delay(100)
            println("job2: 我是一个父协程启动的子协程对象")
            delay(1000)
            println("job2: 等待1秒,如果父协程被取消后,我也将会被取消。这行就不应该打印")
        }
    }
    delay(500)
    request.cancel() // 取消请求(request)的执行
    delay(1000) // 延迟一秒钟来看看发生了什么
    println("main: 整个协程全部取消后")
}
//输出
job1: 我运行在GlobalScope启动的协程中
job2: 我是一个父协程启动的子协程对象
job1: 等待了1秒,你会发现我不受取消方法的影响
main: 整个协程全部取消后

我们通过输出结果就可以看看到。只有job1 的两个方法被执行了。 而job2 在取消过程中也被跟着进行了取消。

父协程

我们了解了子协程的概念后,才能比较清晰的明白父协程。

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动,并且不必使用 Job.join在最后的时候等待它们:

示例:

fun main() = runBlocking<Unit> {
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        repeat(3) { i -> // 启动少量的子协程
            launch  {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                println("协程: $i 结束")
            }
        }
        println("返回值: 父协程本身已经执行完毕了,但我并没有调用方法明确的关闭所有子协程, 子协程的事务还没有结束")
    }
    request.join() // 等待请求的完成,包括其所有子协程
    println("所有的协程结束")
}
//输出
返回值: 父协程本身已经执行完毕了,但我并没有调用方法明确的关闭所有子协程, 子协程的事务还没有结束
协程: 0 结束
协程: 1 结束
协程: 2 结束
所有的协程结束

我们可以看到,父协程的代码已经执行完毕并输出了。但是子协程仍然处于活动状态,那么整个协程就仍然属于活动状态。

当然,我们如果主动调用.cancel() 那么子协程还没有运行完也会被强制结束了。

这就是协程中的父子协程之间的关系了。

给协程命名-方便进行调试

协程如果打印日志的时候,是会有默认Id的。但是如果是在处理一些特定的请求或者逻辑的话

我们给协程进行命名,那我们在调试的时候就能更方便的进行调试了。

给协程命名通常是通过CoroutineName进行处理。

示例:

val v1 = async(CoroutineName("v1coroutine")) {
    delay(500)
    log("Computing v1")
    252
}
val v2 = async(CoroutineName("v2coroutine")) {
    delay(1000)
    log("Computing v2")
    6
}

例如上面就是我将两个协程对象 进行了命名。这种命名结果只有在log日志中才能看到结果。

初始协程时,多元素添加

我们学过载协程中初始化调度器,在上一步也学习了添加协程名称。

那么我们如果在启动的时候这两个配置属性都要进行添加,那么该如何处理?

可以通过+进行拼接。示例:

fun main() = runBlocking<Unit> {
    // 启动一个协程来处理某种传入请求(request)
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("我工作的线程: ${Thread.currentThread().name}")
    }
}

作用域-CoroutineScope

作用域我们都理解,就是在指定空间和区域内生效而已。而我们如果在Android开发中,使用Activity启动一个协程来处理网络或者异步IO读取等操作。所有的这个协程应该在Activity被销毁后自动取消,来避免内存泄露。

我们除了可以手动处理,并关闭外,我们还可以在协程构建的时候进行声明它的范围。

示例:

class DemoActivity : AppCompatActivity() {
    //MainScope 是使用 Kotlinx协程库自带的工厂函数。
    // 它是使用Dispatchers.Main作为调度器的适配UI线程
    private val mainScope = MainScope()

    //关闭的时候 取消作用域
    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
}

然后我们如果关闭activity,协程也会自动进行关闭。

Android 现在在所有具有生命周期的实体中(activity,Fragment等),都对协程作用域提供了一级支持。

局部数据传递

我们如果使用协程,特别是子协程,父协程混杂等等情况。那么如果能够将一些数据在协程与协程之间传递。那么将会大大提高效率。

Kotlin 提供了:ThreadLocalasContextElement 扩展函数来帮助我们,它们创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。

示例:

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main() = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
//输出
Pre-main, current thread: Thread[main,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main,5,main], thread local value: 'main'

在这个示例中,使用Dispatchers.Default 在线程池中启动了一个新的协程。所以它工作在线程池中的不同线程中,但它仍然具有线程局部变量的值,例如上面就是使用asContextElement 修改get的值从main 改为launch。

(ps:关于这个数据调度我也比较迷,之后有空再进行深入学习了解吧。到时候结合具体的情况进行分析分析协程中的数据调度)

0

评论区