大家好,欢迎来到IT知识分享网。
协程,英文名是 Coroutine
, 本质上,协程是轻量级的线程, 它的调度切换是协作式的,可以主动挂起和恢复
retrofit2对协程的支持
先来看看我们最常用的retrofit2
,在使用协程和不实用协程的代码区别在哪里
注意retrofit2
在2.6.0
才开始支持协程,所以一定要将retrofit2
升级到2.6.0
及以上
先分别定义两个api,一个是结合rxjava2
的用法,一个结合协程的用法
interface TestApi {
@GET("api/4/news/latest")
fun getLatestNews(): Flowable<LatestNews>
@GET("api/4/news/latest")
suspend fun getLatestNews2(): LatestNews
}
IT知识分享网
可见retrofit2
支持用suspend
定义 getLatestNews2
api为一个挂起函数,即可在协程中使用这个api
再来看看怎么使用两个不同的api
IT知识分享网class CoroutineActivity : AppCompatActivity() {
...
// 这是一个我们使用retrofit2 请求数据+切换线程最常用的方法
fun requestData1() {
testApi.getLatestNews()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<LatestNews>() {
override fun onComplete() {}
override fun onNext(t: LatestNews) {
tv_text.text = Gson().toJson(t)
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
}
// 使用协程 请求+渲染数据
fun requestData2() {
GlobalScope.launch(Dispatchers.Main) {
try {
tv_text.text = Gson().toJson(testApi.getLatestNews2())
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
rxjava2是使用回调的方式渲染数据,这个大家都知道
而协程需要先使用GlobalScope.launch
启动一个协程(启动协程的方法很多,请自行查看官方文档),并使用Dispatchers.Main
指定协程调度器为主线程(即ui线程), 然后通过 try catch
分别处理正常和异常的情况(暂时使用GlobalScope
上下文启动协程,下面会介绍一种专门再android中启动协程的方法)
这样看来是不是使用协程可以简化很多代码,使代码看起来更加优雅
我们再来看看多个请求并发和串行的情况
先多添加几个api,方便操作
interface TestApi {
@GET("api/3/news/latest")
fun getLatestNews(): Flowable<LatestNews>
@GET("api/3/news/{id}")
fun getNewsDetail(@Path("id") id: Long): Flowable<News>
@GET("api/4/news/latest")
suspend fun getLatestNews2(): LatestNews
@GET("api/3/news/{id}")
suspend fun getNewsDetail2(@Path("id") id: Long): News
}
比如我们先调用getLatestNews()
方法请求一系列的新闻列表,然后在调用getNewsDetail
请求第一个新闻的详情,代码如下
IT知识分享网// 非协程用法
testApi.getLatestNews()
.flatMap {
testApi.getNewsDetail(it.stories!![0].id!!)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<News>() {
override fun onComplete() {}
override fun onNext(t: News) {
tv_text.text = t.title
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
// 协程用法
GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch(e: Exception) {
tv_text.text = "error"
}
}
再比如如果我们想调用getNewsDetail
同时请求多个新闻详情数据
// 非协程用法
testApi.getLatestNews()
.flatMap {
Flowable.zip(
testApi.getNewsDetail(it.stories!![0].id!!),
testApi.getNewsDetail(it.stories!![1].id!!),
BiFunction<News, News, List<News>> { news1, news2->
listOf(news1, news2)
}
)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<List<News>>() {
override fun onComplete() {}
override fun onNext(t: List<News>) {
tv_text.text = t[0].title + t[1].title
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
// 协程的用法
GlobalScope.launch(Dispatchers.Main) {
try {
// 先请求新闻列表
val lastedNews = testApi.getLatestNews2()
// 再使用async 并发请求第一个和第二个新闻的详情
val detail1 = async { testApi.getNewsDetail2(lastedNews.stories!![0].id!!) }
val detail2 = async { testApi.getNewsDetail2(lastedNews.stories!![1].id!!) }
tv_text.text = detail1.await().title + detail2.await().title
} catch(e: Exception) {
tv_text.text = "error"
}
}
可见相对于非协程的写法(代码中使用rxjava2),协程能让你的代码更加简洁、优雅,能更加清晰的描述你第一步想做什么、第二步想做什么等等
room数据库对协程的支持
room
数据库在2.1.0
开始支持协程, 并且需要导入room-ktx
依赖
implementation "androidx.room:room-ktx:2.1.0"
然后在Dao
中使用suspend
定义挂起函数
@Dao
abstract class UserDao {
@Query("select * from tab_user")
abstract suspend fun getAll(): List<User>
}
最后就像上面retrofit2
那样使用协程即可
class RoomActivity : AppCompatActivity() {
private var adapter: RoomAdapter? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_room)
...
}
...
private fun loadUser() {
GlobalScope.launch(Dispatchers.Main) {
adapter!!.data = AppDataBase.getInstance().userDao().getAll()
}
}
}
这里指介绍room数据库的协程用法,对于room数据库的介绍和其他用法请查看Android Jetpack ROOM数据库用法介绍和android Jetpack ROOM数据库结合其它Library的使用介绍
协程在android里的应用
上面的example
都是使用GlobalScope
上下文来启动协程, 其实真正在android中一般不建议直接使用GlobalScope
,因为使用GlobalScope.launch
时,我们会创建一个顶层协程。虽然它很轻量,但它运行时仍会消耗一些内存资源,如果我们忘记保持对新启动的协程的引用,它还会继续运行,所以我们必须保持所有对GlobalScope.launch
启动协程的引用,然后在activity
destory
(或其它需要cancel
)的时候cancel
掉所有的协程,否则就会造成内存泄露等一系列问题
比如:
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
private var job1: Job? = null
private var job2: Job? = null
private var job3: Job? = null
...
override fun onDestroy() {
super.onDestroy()
job1?.cancel()
job2?.cancel()
job3?.cancel()
}
...
// 启动第一个顶级协程
fun requestData1() {
job1 = GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
// 启动第二个顶级协程
fun requestData2() {
job2 = GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
// 在协程内部启动第三个顶级协程
job3 = GlobalScope.launch(Dispatchers.Main) {
try {
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
}
可见如果使用GlobalScope
启动的协程越多,就必须定义越多的变量持有对启动协程的引用,并在onDestroy
的时候cancel
掉所有协程
下面我们就介绍MainScope
代替GlobalScope
的使用
class CoroutineActivity : AppCompatActivity() {
private var mainScope = MainScope()
private lateinit var testApi: TestApi
...
override fun onDestroy() {
super.onDestroy()
// 只需要调用mainScope.cancel,就会cancel掉所有使用mainScope启动的所有协程
mainScope.cancel()
}
fun requestData1() {
mainScope.launch {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
fun requestData2() {
mainScope.launch {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
又或者是使用kotlin委托模式实现如下:
class CoroutineActivity : AppCompatActivity(), CoroutineScope by MainScope() {
private lateinit var testApi: TestApi
...
override fun onDestroy() {
super.onDestroy()
cancel()
}
fun requestData1() {
launch {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
fun requestData2() {
launch {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
同时我们先来看看MainScope
的定义
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
可见使用MainScope
非常简单,只需要在activity onDestroy
中调用MainScope
的cancel
方法即可,而不需要定义其它协程的引用, 并且MainScope
的调度器是Dispatchers.Main
, 所以也不需要手动指定Main调度器
Lifecycle对协程的支持
发现Lifecycle
组件库在2.2.0
的alpha
版中已经有了对于协程的支持
需要添加lifecycle-runtime-ktx
依赖(正式版出来之后,请使用正式版)
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha05"
lifecycle-runtime-ktx
中 给LifecycleOwner
添加了 lifecycleScope
扩展属性(类于上面介绍的MainScope
),用于方便的操作协程;
先看看源码
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
// SupervisorJob 指定协程作用域是单向传递
// Dispatchers.Main.immediate 指定协程体 在主线程中执行
// Dispatchers.Main.immediate 跟 Dispatchers.Main唯一的区别是,如果当前在主线程,这立马执行协程体,而不是走Dispatcher分发流程
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
同时LifecycleCoroutineScope
还提供了绑定LifecycleOwner
生命周期(一般是指activity
和fragment
)的启动协程的方法;如下:
abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
internal abstract val lifecycle: Lifecycle
// 当 activity 处于created的时候执行 协程体
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}
// 当 activity 处于start的时候执行 协程体
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}
// 当 activity 处于resume的时候执行 协程体
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
}
由于上面启动协程的方法绑定了activity
生命周期,所以在activity
destroy
的时候,也实现了自动cancel掉协程
所以我们 CoroutineActivity
Demo的代码可以写的更加简单,如下:
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
...
fun requestData1() {
lifecycleScope.launchWhenResumed {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
}
LiveData对协程的支持
同时Google也对LiveData提供了对协程的支持,不过需要添加lifecycle-livedata-ktx
依赖
// 现在还是`alpha`版,等正式版发布以后,请替换成正式版
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"
lifecycle-livedata-ktx
依赖添加了liveData
顶级函数,返回CoroutineLiveData
源码如下:
...
internal const val DEFAULT_TIMEOUT = 5000L
...
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
CoroutineLiveData
是在什么时候启动协程并执行协程体的呢???
internal class CoroutineLiveData<T>(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
private var emittedSource: EmittedSource? = null
init {
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}
...
// observer(观察者)个数有0到1时执行
// 即第一次调用observe或observeForever时执行
override fun onActive() {
super.onActive()
// 启动协程并执行协程体
blockRunner?.maybeRun()
}
// observer(观察者)个数有1到0时执行
// 即调用removeObserver时触发检查并执行回调
override fun onInactive() {
super.onInactive()
// 取消协程
blockRunner?.cancel()
}
}
可见CoroutineLiveData
是在onActive()
启动协程,在onInactive()
取消协程
所以使用LiveData
对协程的支持, 那么CoroutineActivity
Demo的代码写法如下
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
...
fun requestData1() {
liveData {
try {
val lastedNews = testApi.getLatestNews2()
emit(lastedNews.stories!![0].title!!)
} catch(e: Exception) {
emit("error")
}
}.observe(this, Observer {
tv_text.text = it
})
}
}
上面我们讲了协程在android里最常用的用法,下面将介绍协程的一些基本知识
协程上下文
协程上下文用CoroutineContext
表示,kotlin
中 比较常用的Job
、协程调度器(CoroutineDispatcher)
、协程拦截器(ContinuationInterceptor)
等都是CoroutineContext
的子类,即它们都是协程上下文
先看一下CoroutineContext
比较重要的plus
方法,它是一个用operator
修复的重载(+)
号的操作符方法
@SinceKotlin("1.3")
public interface CoroutineContext {
/** * Returns a context containing elements from this context and elements from other [context]. * The elements from this context with the same key as in the other one are dropped. */
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
}
比如上面说的MainScope
定义就使用了+号操作符
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
如果你看启动协程的源码就会发现,在kotlin
中 大量使用 + 号操作符,所以kotlin中大部分CoroutineContext
对象都是CombinedContext
对象
上面的example
使用的launch
方法启动协程有三个参数, 分别是协程上下文
、协程启动模式
、协程体
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, // 协程上下文 start: CoroutineStart = CoroutineStart.DEFAULT, // 协程启动模式 block: suspend CoroutineScope.() -> Unit // 协程体 ): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
协程启动模式
-
DEFAULT
立即执行协程体
runBlocking { val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) { println("1: " + Thread.currentThread().name) } // 不需要调用join方法 // job.join() }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.DEFAULT
启动模式不需要手动调用join
或start
等方法,而是在调用launch
方法的时候就会自动执行协程体的代码 -
LAZY
只有在需要的情况下才执行协程体
runBlocking { val job = GlobalScope.launch(start = CoroutineStart.LAZY) { println("1: " + Thread.currentThread().name) } // 一定调用join方法 job.join() }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.LAZY
启动模式一定要手动调用join
或start
等方法,否者协程体不会执行 -
ATOMIC
立即执行协程体,但在开始运行之前无法取消, 即开启协程会无视
cancelling
状态runBlocking { val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) { println("1: " + Thread.currentThread().name) delay(1000) println("2: " + Thread.currentThread().name) } job.cancel() delay(2000) }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart. ATOMIC
启动模式的协程体 即使调了cancel
方法 也一定会执行,因为开启协程会无视cancelling
状态;上面的example只打印了一句话,是因为执行delay(1000)
的时候 发现协程处于关闭状态, 所以出现了JobCancellationException
异常,导致下面的代码没有执行,如果delay(1000)
这句代码用try catch
捕获一下异常,就会继续执行下面的代码 -
UNDISPATCHED
立即在当前线程执行协程体,直到第一个 suspend 调用 挂起之后的执行线程取决于上下文当中的调度器了
runBlocking { println("0: " + Thread.currentThread().name) // 注意这里没有用GlobalScope.launch // 因为GlobalScope.launch启动的是一个顶层协程, 无法关联当前协程的上下文(coroutineContext), 导致结果有偏差 launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) { println("1: " + Thread.currentThread().name) delay(1000) println("2: " + Thread.currentThread().name) } delay(2000) }
打印结果
0: main 1: main 2: DefaultDispatcher-worker-1
可见 0 和 1 的执行线程是一样的,当执行完
delay(1000)
, 后面的代码执行线程取决于Dispatchers.Default
调度器指定的线程,所以 2 在另一个线程中执行
协程调度器
协程调度器 其实也是 协程上下文
协程调度器是用来指定协程代码块在哪个线程中执行,kotlin
提供了几个默认的协程调度器,分别是Default
、Main
、Unconfined
, 并针对jvm
, kotlin
提供了一个特有的IO
调度器
-
Dispatchers.Default
指定代码块在线程池中执行
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Default) { delay(1000) // 延迟1秒后,再继续执行下面的代码 println("2: " + Thread.currentThread().name) } println("3: " + Thread.currentThread().name) }
打印结果如下
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1
-
Dispatchers.Main
指定代码块在main线程中执行(针对Android就是ui线程)
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Main) { delay(1000) // 延迟1秒后,再继续执行下面的代码 println("2: " + Thread.currentThread().name) } println("3: " + Thread.currentThread().name) }
打印结果如下:
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: main
可见Dispatchers.Main就是指定协程代码块在main线程中执行
-
Dispatchers.Unconfined
没有指定协程代码快在哪个特定线程中执行,即当前在哪个线程,代码块中接下来的代码就在哪个线程中执行(即一段协程代码块 由于启动了子协程 导致切换了线程, 那么接下来的代码块也是在这个线程中执行)
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Unconfined) { println("2: " + Thread.currentThread().name) requestApi() // delay(1000) 本来想用delay,但是使用requestApi 可能更加清晰 println("3: " + Thread.currentThread().name) } println("4: " + Thread.currentThread().name) } // 定义一个挂起函数,在一个新的子线程中执行 private suspend fun requestApi() = suspendCancellableCoroutine<String> { Thread { println("5: requestApi: " + Thread.currentThread().name) it.resume("success") }.start() }
打印结果如下:
1: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1 5: requestApi: Thread-3 4: DefaultDispatcher-worker-1 3: Thread-3
可见2 和 3的代码 执行线程明显不一样;当执行到requestApi这句代码的时候 会切换到子线程(即
Thread-3
)中执行代码,然后接下来的协程代码块就会在Thread-3
中执行 -
Dispatchers.IO
它是基于
Default
调度器背后的线程池,并实现了独立的队列和限制,因此协程调度器从Default
切换到IO
并不会触发线程切换GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.IO) { println("2: " + Thread.currentThread().name) requestApi() // delay(1000) println("3: " + Thread.currentThread().name) } println("4: " + Thread.currentThread().name) }
打印结果如下:
1: DefaultDispatcher-worker-1 4: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1 5: requestApi: Thread-3 3: DefaultDispatcher-worker-1
-
绑定到任意自定义线程的调度器(这种方式要谨慎使用)
可以使用
kotlin
自带newSingleThreadContext
方法或者使用ExecutorService
的扩展方法asCoroutineDispatcher
创建一个Dispatcher
// 第一种方法 val dispatcher = newSingleThreadContext("custom thread") // 第二种方法 // val dispatcher = Executors.newSingleThreadExecutor{ r -> Thread(r, "custom thread") }.asCoroutineDispatcher() GlobalScope.launch(dispatcher) { println("1: " + Thread.currentThread().name) delay(1000) println("2: " + Thread.currentThread().name) } runBlocking { delay(2000L) // 一定要close,否则线程永远都不会结束,很危险 dispatcher.close() }
打印结果如下:
1: custom thread 2: custom thread
可见我们可以自己创建线程绑定到协程调度器上,但是这种方式不建议使用,因为一旦手动创建了线程 就需要手动close,否则线程就永远也不会终止,这样会很危险
协程作用域GlobalScope、coroutineScope、supervisorScope
协程作用域是一个非常重的东西
-
GlobeScope
GlobeScope
启动的协程会单独启动一个作用域,无法继承外面协程的作用域,其内部的子协程遵从默认的作用域规则 -
coroutineScope
coroutineScope
启动的协程会继承父协程的作用域,其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程 -
supervisorScope
supervisorScope
启动的协程会继承父协程的作用域,他跟coroutineScope
不一样的点是 它是单向传递的,即内部的取消操作和异常传递 只能由父协程向子协程传播,不能从子协程传向父协程MainScope
就是使用的supervisorScope
作用域,所以只需要子协程 出错 或cancel
并不会影响父协程,从而也不会影响兄弟协程
协程异常传递模式
协程的异常传递跟协程作用域有关,要么跟coroutineScope
一样双向传递,要么跟supervisorScope
一样由父协程向子协程单向传递
针对supervisorScope
的单向传递
runBlocking {
println("1")
supervisorScope {
println("2")
// 启动一个子协程
launch {
1/0 // 故意让子协程出现异常
}
delay(100)
println("3")
}
println("4")
}
打印结果如下:
1
2
Exception in thread "main @coroutine#2" java.lang.ArithmeticException: / by zero
3
4
可见在supervisorScope
作用域中启动的子协程如果出现异常,并没有导致父协程异常,并且父协程的代码还能继续往下执行
我们再来验证一下再supervisorScope
作用域中父协程异常是否会传递给子协程
runBlocking {
println("1")
supervisorScope {
println("2")
// 启动一个子协程
launch {
try {
delay(1000)
println("3")
} catch (e: Exception) {
println("error")
}
}
delay(100)
1/0 //父协程报错
println("3")
}
}
1
2
error
java.lang.ArithmeticException: / by zero
可见在supervisorScope
作用域中 父协程确实会将异常传递给子协程
针对coroutineScope
的双向传递
runBlocking {
println("1")
try {
coroutineScope {
println("2")
// 启动一个子协程
launch {
1/0 // 故意让子协程出现异常
}
delay(100)
println("3")
}
} catch (e: Exception) {
println("error")
}
}
打印结果如下:
1
2
error
可见在coroutineScope
作用域中启动的子协程如果出现异常,则会传递给父协程
我们再来验证一下再coroutineScope
作用域中父协程异常是否会传递给子协程
runBlocking {
println("1")
coroutineScope {
println("2")
// 启动一个
launch {
try {
delay(1000)
println("3")
} catch (e: Exception) {
println("error")
}
}
delay(100)
1/0
println("3")
}
}
打印结果如下:
1
2
error
java.lang.ArithmeticException: / by zero
可见在coroutineScope
作用域中 父协程确实会将异常传递给子协程
协程取消
先看一段代码
GlobalScope.launch {
println("1")
// 启动一个子协程
val job = launch {
println("2")
try {// 捕获 协程cancel导致的异常,让代码继续往下执行
delay(1000)
} catch (e: Exception) {
println("error")
}
println("3")
if (isActive) { // 如果协程cancel了,则isActive为false
println("4")
}
delay(1000) // 没有捕获异常,则终止代码继续往下执行
println("5")
}
delay(100)
job.cancel()
}
打印结果如下:
1
2
error
3
当先启动协程,然后cancel,会出现如下几种情况:
- 如果执行到协程体内的代码依赖协程的cancel状态(比如delay方法),则会抛出异常,如果捕获了异常,则会继续往下执行,如果没有捕获异常则终止往下继续执行协程体
- 如果协程体内的代码不依赖协程的cancel状态(即println方法),则会继续往下执行
也就是说 协程的取消(cancel) 导致协程体终止运行的方式是 抛出异常,如果协程体的代码不依赖协程的cancel状态(即没有报错),则协程的取消 对协程体的执行一般没什么影响
比如:
GlobalScope.launch {
val job = launch {
println("==start==")
var i = 0
while (i <= 10) {
Thread.sleep(100)
println(i++)
}
println("==end==")
}
delay(100)
job.cancel()
}
打印结果如下:
==start==
0
1
2
3
4
5
6
7
8
9
10
==end==
可见即使协程取消了,协程体还是在继续运行
如果想结束协程体的运行该怎么办呢??
这个时候可以使用CoroutineScope
的isActive字段判断协程的状态是否被取消了
GlobalScope.launch {
val job = launch {
println("==start==")
var i = 0
while (i <= 10 && isActive) {
Thread.sleep(100)
println(i++)
}
println("==end==")
}
delay(200)
job.cancel()
}
打印结果
==start==
0
1
==end==
可见如果协程取消了,可以使用isActive
字段来判断是否需要执行协程体的某段代码
withContext
在执行协程体的时候,可以使用withContext
方便的切换代码执行所运行线程;比如
GlobalScope.launch(Dispatchers.Default) {
// 在Dispatchers.Default的线程池中执行
println("1: " + Thread.currentThread().name)
withContext(Dispatchers.Main) { // 切换到主线程执行
println("2: " + Thread.currentThread().name)
}
// 在Dispatchers.Default的线程池中执行
println("3: " + Thread.currentThread().name)
val dispatcher = newSingleThreadContext("custom thread")
withContext(dispatcher) { // 切换到自定义线程中执行
println("4: " + Thread.currentThread().name)
}
dispatcher.close()
// 在Dispatchers.Default的线程池中执行
println("5: " + Thread.currentThread().name)
}
打印结果
1: DefaultDispatcher-worker-1
2: main
3: DefaultDispatcher-worker-2
4: custom thread
5: DefaultDispatcher-worker-2
可见我们可以使用withContext
方便的切换代码运行所在的线程
withContext
还可以配合NonCancellable
上下文确保代码块不能被取消
GlobalScope.launch(Dispatchers.Default) {
val job = launch {
println("1: " + Thread.currentThread().name)
try {
delay(1000)
} catch (e: Exception) {
withContext(NonCancellable) { // 配合NonCancellable上下文确保协程体不能被取消
println("error: " + e.message)
delay(100) // 如果没有用withContext(NonCancellable)包裹,则delay(100)会报错, 导致下面的代码不执行
println("2: " + Thread.currentThread().name)
}
}
}
delay(100)
job.cancel()
}
打印结果
1: DefaultDispatcher-worker-1
error: Job was cancelled
2: DefaultDispatcher-worker-1
结构化并发
什么是结构化并发呢?
其实很简单,即保证启动的协程在同一作用域中(个人理解)
当我们使用GlobalScope.launch启动协程的时候会创建一个顶层协程,如果我们每次都使用GlobalScope.launch启动协程, 那么就会创建很多个顶层协程,并且不会相互干扰,即即使一个协程出错或的取消了,另一个协程还是会继续运行,因为它们不是在同一个协程作用域中
GlobalScope.launch(Dispatchers.Default) {
val a1 = GlobalScope.async { 这里使用async启动协程,没有使用launch
delay(1000)
println("1: " + Thread.currentThread().name)
}
val a2 = GlobalScope.async {
delay(100)
1/0 // 故意报错
println("2: " + Thread.currentThread().name)
}
a1.await()
a2.await() // a2.cancel() 也可以使用cancel
}
打印结果如下
1: DefaultDispatcher-worker-1
Exception in thread "DefaultDispatcher-worker-1" java.lang.ArithmeticException: / by zero
可见a2
报错或cancel,并不会影响a1
这到底会引起什么问题呢?
比如我们在一个activity中通常会有多个并发网络请求 请求数据(即会启动多个协程),当其中一个网络请求出错时(即协程出错),我们希望关闭其它并行的网络请求,而不处理(即希望关闭掉其它协程),但是结果并非如此
再比如我们在一个activity中通常会有许多个网络请求(即会启动许多个协程),如果我们总是使用GlobalScope启动协程,那么必须保持每个协程的引用,并在activity destroy时cancel掉所有协程,否则即使activity destroy,那么协程里的异步请求代码还是会继续执行,这样很容易出错或内存泄漏
我们该怎么方便的解决这样的问题呢?
其实我们可以使用结构化并发(即协程作用域)来解决这样的问题,即保证启动的多个协程在同一作用域中,如果cancel掉这个作用域上下文,那么在这个作用域下启动的所有子协程都会取消,同时还可以配合coroutineScope、supervisorScope协程作用域 处理异常传递的问题
所以上面的代码可以这样改
GlobalScope.launch(Dispatchers.Default) {
val a1 = async {
delay(1000)
println("1: " + Thread.currentThread().name)
}
val a2 = async {
delay(100)
1/0 // 故意报错
println("2: " + Thread.currentThread().name)
}
a1.await()
a2.await()
}
即把启动 a1
、a2
协程的GlobalScope去掉,保证a1
、a2
在同一协程作用域中
协程挂起函数原理分析
我们先来看一看retrofit兼容协程的实现源码
suspend fun <T : Any> Call<T>.await(): T {
// 使用suspendCancellableCoroutine定义挂起函数,参数是Continuation对象
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
method.declaringClass.name +
'.' +
method.name +
" was null but response body type was declared as non-null")
// 如果结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(e)
} else {
// 如果结果正常,则调用Continuation 的 resume回调
continuation.resume(body)
}
} else {
// 如果结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// 如果结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(t)
}
})
}
}
Continuation
的源码和扩展函数如下
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
/** * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the * last suspension point. */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
@SinceKotlin("1.3")
public interface Continuation<in T> {
/** * The context of the coroutine that corresponds to this continuation. */
public val context: CoroutineContext
/** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */
public fun resumeWith(result: Result<T>)
}
可见协程挂起函数内部是使用回调将结果返回出去的,当有结果正常返回的时候,Continuation 调用 resume 返回结果,否则调用 resumeWithException 来抛出异常,这与 Callback 的模式一模一样
而我们写协程代码之所以可以看起来是同步的,其实是编译器帮你做了很多事情(即你可以当它是“语法糖”)
注意:使用AndroidStudio反编译kotlin协程代码的时候会导致ide严重卡顿,并且反编译出来的java代码有无数层的嵌套,不知道是无法反编译协程代码,还是AndroidStudio的bug, 导致无法配合kotlin反编译的java代码来讲解
协程的状态转移
上面已经对协程挂起函数原理做了一些解析,如果我们使用了多个挂起函数 那它们是怎么配合运行的呢?
注意: 下面的代码是我copy的别人的代码
suspend fun main() {
log(1)
// returnSuspended()是一个suspend函数
log(returnSuspended())
log(2)
// delay也是一个suspend函数
delay(1000)
log(3)
// returnImmediately也是一个suspend函数
log(returnImmediately())
log(4)
}
对应的java实现代码逻辑如下(注意,下面的代码逻辑上并不能做到十分严谨,仅供学习理解协程使用)
public class ContinuationImpl implements Continuation<Object> {
// label 状态 默认为 0
private int label = 0;
private final Continuation<Unit> completion;
public ContinuationImpl(Continuation<Unit> completion) {
this.completion = completion;
}
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object o) {
try {
Object result = o;
switch (label) {
case 0: {
LogKt.log(1);
// 在SuspendFunctionsKt.returnSuspended内部以回调的方式 调用this的resumeWith方法
result = SuspendFunctionsKt.returnSuspended( this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 1: {
LogKt.log(result);
LogKt.log(2);
// 在DelayKt.delay内部以回调的方式 调用this的resumeWith方法
result = DelayKt.delay(1000, this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 2: {
LogKt.log(3);
// 在SuspendFunctionsKt.returnImmediately内部以回调的方式 调用this的resumeWith方法
result = SuspendFunctionsKt.returnImmediately( this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 3:{
LogKt.log(result);
LogKt.log(4);
}
}
completion.resumeWith(Unit.INSTANCE);
} catch (Exception e) {
completion.resumeWith(e);
}
}
private boolean isSuspended(Object result) {
return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
可见多个挂起函数之间的配合使用是使用label
这个状态字段不断加1 并且 不断调用resumeWith
方法实现的
总结如下:
- 协程的挂起函数本质上就是一个回调,回调类型就是 Continuation
- 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像我们前面例子中的 label 不断的自增来实现状态流转一样
最后 非常感谢激活成功教程 Kotlin 协程的博客,这是学习Coroutine
非常好的文章,建议大家去看看
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/13259.html