介绍
今天继续学习关于协程的状态共享和并发逻辑。
协程可用多线程调度器实现并发执行。但是Kotlin协程针对并发处理是如何实现的呢?
例如常见的并发数据的访问啊,状态同步等等。今天就是学习这方面的知识。
1.并发
我们启动一百个协程,让它们同时执行同一个重复操作。然后计算它们的完成时间。
示例:
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//输出
Completed 100000 actions in 14 ms
Counter = 69680
在上面的示例中,它们同时针对一个数据变量做递增计算。但是结果并不是100000。
因为在一百个协程在计算的时候,并没有做并发处理。
我们如何让它们的输出结果为100000呢?
不能直接采用volatile
注解 @Volatile var counter = 0
。
因为vlolatile变量保证可线性化读取和写入变量。但是在大量动作发生时并不提供原子性。
1.1 线程安全的数据结构
我们需要采用一种对线程和协程都有效的解决方法。就是线程安全的数据结构。
它为需要在共享状态上执行的相应操作提供所有必须的同步处理。
示例:
var counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("结果集 = $counter")
}
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("全部动作 ${n * k} 耗时 $time 毫秒")
}
//输出
全部动作 100000 耗时 15 毫秒
结果集 = 100000
它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。
然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作。
2. 颗粒度限制线程
2.1 细粒度限制
限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中。
示例:
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("全部动作 ${n * k} 耗时 $time 秒")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// confine each increment to a single-threaded context
withContext(counterContext) {
counter++
}
}
}
println("结果集 = $counter")
}
//输出
全部动作 100000 耗时 404 秒
结果集 = 100000
结果是正确的,但是耗时久比较多了。这也是细粒度的线程限制的缺陷。
在上面的示例中,它的每个数据变化操作都得使用withContext()块从多线程Dispatchers.Default 切换到单线程上。所以数据准确,但是耗时长。
2.2 粗粒度限制
相较于细粒度的缺陷。我们根据实践的取舍。提供类粗粒度的限制
例如:状态更新类业务逻辑中大部分都是限于单线程中。
示例:在单线程上下文中运行每个协程。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("全部动作 ${n * k} 耗时 $time 秒")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 将一切都限制在单线程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("结果集 = $counter")
}
//输出
全部动作 100000 耗时 18 秒
结果集 = 100000
相较于细粒度,它的速度要快了太多。
3. 互斥 Mutex
使用永远不会同时执行的 关键代码块 来保护共享状态的所有修改。
我们以前在java中经常使用:synchronized
或者 ReentrantLock
。在协程也有类似的替代品:Mutex
。
它拥有lock
和unlock
方法,可以隔离关键部分。但是区别在于Mutex.lock()
是一个挂起操作,不会阻塞线程。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("全部动作 ${n * k} 耗时 $time 毫秒")
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
mutex.withLock {
counter++
}
}
}
println("结果集 = $counter")
}
//输出
全部动作 100000 耗时 223 毫秒
结果集 = 100000
此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。
这里只是一个简单的学习,之后了解更多的话。再单独出一篇介绍吧。
3.1 Actors
actors
是由协程、被限制并封装到该协程中的状态以及一个与其它协程通信的通道组合而成的实体。
有一个 actor
协程构建器,它可以方便地将 actor
的邮箱通道组合到其作用域中(用来接收消息)、组合发送 channel
与结果集对象,这样对 actor
的单个引用就可以作为其句柄持有。
我们通过示例代码来理解吧。
示例:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 //创建一百个协程对象
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { //即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // 创建该 actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// 发送一条消息以用来从一个 actor 中获取计数值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
// 关闭该actor
counter.close()
}
//输出
全部动作 100000 耗时 146 毫秒
结果集 = 100000
actor
本身执行时所处上下文(就正确性而言)无关紧要。一个 actor
是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor
可以修改自己的私有状态, 但只能通过消息互相影响(避免任何锁定)。
actor
在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。
注意,actor 协程构建器是一个双重的 produce 协程构建器。一个 actor 与它接收消息的通道相关联,而一个 producer 与它发送元素的通道相关联。
参考
共享的可变状态与并发 - Kotlin 语言中文站 (kotlincn.net)
kotlinx.coroutines-cn/example-sync-03.kt at master · hltj/kotlinx.coroutines-cn (github.com)
kotlinx.coroutines-cn/example-sync-04.kt at master · hltj/kotlinx.coroutines-cn (github.com)
kotlinx.coroutines-cn/example-sync-06.kt at master · hltj/kotlinx.coroutines-cn (github.com)
kotlinx.coroutines-cn/example-sync-07.kt at master · hltj/kotlinx.coroutines-cn (github.com)
评论区