前言

  • 显然,我们不能在 Activity 中调用 GlobalScope,这样可能会造成内存泄漏,看一下如何自定义作用域,具体的步骤我在注释中已给出:

    class MainActivity : AppCompatActivity() {
        // 1. 创建一个 MainScope
        val scope = MainScope()

        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            
            // 2. 启动协程
            scope.launch(Dispatchers.Unconfined) {
                val one = getResult(20)
                val two = getResult(40)
                mNumTv.text = (one + two).toString()
            }
        }

        // 3. 销毁的时候释放
        override fun onDestroy() {
            super.onDestroy()

            scope.cancel()
        }

        private suspend fun getResult(num: Int): Int {
            delay(5000)
            return num * num
        }
    }

    调度器

    调度器的作用是将协程限制在特定的线程执行。主要的调度器类型有:

    什么是挂起?我们就以九心吃饭为例,如果到公司对面的广场吃饭,九心得经过:

    如果九心点广场的外卖呢?

    从九心吃饭的例子可以看出,如果点了外卖,九心花费的时间较少了,可以空闲出更多的时间做自己的事。再仔细分析一下,其实从公司到广场和等待取餐这个过程并没有省去,只是九心把这个过程交给了外卖员。

    协程的原理跟九心点外卖的原理是一致的,耗时阻塞的操作并没有减少,只是交给了其他线程:即学即用Kotlin - 协程-LMLPHP

    launch

    launch 的作用从它的名称就可以看的出来,启动一个新的协程,它返回的是一个 Job对象,我们可以调用 Job#cancel() 取消这个协程。

    除了 launch,还有一个方法跟它很像,就是 async,它的作用是创建一个协程,之后返回一个 Deferred<T>对象,我们可以调用 Deferred#await()去获取返回的值,有点类似于 Java 中的   Future,稍微改一下上面的代码:

    class MainActivity : AppCompatActivity() {
        // 1. 创建一个 MainScope
        val scope = MainScope()

        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)

            // 2. 启动协程
            scope.launch(Dispatchers.Unconfined) {
                val one = async { getResult(20) }
                val two = async { getResult(40) }
                mNumTv.text = (one.await() + two.await()).toString()
            }
        }

        // 3. 销毁的时候释放
        override fun onDestroy() {
            super.onDestroy()

            scope.cancel()
        }

        private suspend fun getResult(num: Int): Int {
            delay(5000)
            return num * num
        }
    }

    与修改前的代码相比,async 能够并发执行任务,执行任务的时间也因此缩短了一半。

    除了上述的并发执行任务,async 还可以对它的 start 入参设置成懒加载

    val one = async(start = CoroutineStart.LAZY) { getResult(20) }

    这样系统就可以在调用它的时候再为它分配资源了。

    suspend

    suspend 是修饰函数的关键字,意思是当前的函数是可以挂起的,但是它仅仅起着提醒的作用,比如,当我们的函数中没有需要挂起的操作的时候,编译器回给我们提醒 Redudant suspend modifier,意思是当前的 suspend 是没有必要的,可以把它删除。

    那我们什么时候需要使用挂起函数呢?常见的场景有:

    withContext  的代码:

    private suspend fun getResult(num: Int): Int {
        return withContext(Dispatchers.IO) {
            num * num
        }
    }

    delay 的代码:

    private suspend fun getResult(num: Int): Int {
        delay(5000)
        return num * num
    }

    结合 Android Jetpack

    在介绍自定义协程作用域的时候,我们需要主动在 Activity 或者 Fragment 中的 onDestroy 方法中调用 job.cancel(),忘记处理可能是程序员经常会犯的错误,如何避免呢?

    Google 总是能够解决程序员的痛点,在 Android Jetpack 中的 lifecycleLiveDataViewModel 已经集成了快速使用协程的方法,如果我们已经引入了 Android Jetpack,可以引入依赖:

        dependencies {
            def lifecycle_version = "2.2.0"
            
            // ViewModel
            implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"
            // LiveData
            implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"
            // Lifecycles only (without ViewModel or LiveData)
            implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version"
        }

    使用可以结合具体的场景,比如结合 Lifecycle,需要使用 lifecycleScope 协程作用域:

    lifecycleScope.launch {
        // 代表当前生命周期处于 Resumed 的时候才会执行(选择性使用)
        whenResumed { 
            // ... 具体的协程代码
        }
    }

    即使你不使用 Android Jetpack 组件,由于 Lifecycles 在很早之前就内置在 Android 系统的代码中,所以你仍然可以仅仅引入 Lifecycle 的协程扩展库,因为它会帮助你很好的处理 Activity 或者 Fragment 的生命周期。

    引入 Android Jetpack 协程扩展库官方文档:点我打开

    二、流

    长期以来,在 Android 中响应式编程的首选方案是 RxJava,我们今天就来了解一下 Kotlin中的响应式编程 Flow。如果你能熟练使用 RxJava,那你肯定能快速上手 Flow。

    曾经我在《即学即用Android Jetpack - ViewModel & LiveData》一文中说过,LiveData 的使用类似于 RxJava,现在我收回这句话,事实上,LiveData 更加简单和纯粹,它建立单一的生产消费模型,Flow 才是类似于 RxJava 的存在。

    1. 基础

    先上一段代码:

    lifecycleScope.launch {
        // 创建一个协程 Flow<T>
        createFlow()
            .collect {num->
                // 具体的消费处理
                // ...
            }
        }
    }

    我在 createFlow 这个方法中,返回了 Flow<Int> 的对象,所以我们可以这样对比。

    创建 Flow 对象

    我们暂不考虑 RxJava中的背压和非背压,直接先将 Flow 对标 RxJava 中的 Observable

    和 RxJava 一样,在创建 Flow 对象的时候我们也需要调用 emit 方法发射数据:

    fun createFlow(): Flow<Int> = flow {
        for (i in 1..10)
            emit(i)
    }

    一直调用 emit 可能不便捷,因为 RxJava 提供了 Observable.just() 这类的操作符,显然,Flow 也为我们提供了快速创建操作:

    比如可以使用 (1..10).asFlow() 代替上述的 Flow 对象的创建。

    消费数据

    collect 方法和 RxJava 中的 subscribe 方法一样,都是用来消费数据的。

    除了简单的用法外,这里有两个问题得注意一下:

    2. 线程切换

    我们学习 RxJava 的时候,大佬们都会说,RxJava 牛逼,牛逼在哪儿呢?

    切换线程,同样的,Flow 的协程切换也很牛逼。Flow 是这么切换协程的:

    lifecycleScope.launch {
        // 创建一个协程 Flow<T>
        createFlow()
            // 将数据发射的操作放到 IO 线程中的协程
            .flowOn(Dispatchers.IO)
            .collect { num ->
                // 具体的消费处理
                // ...
            }
        }
    }

    和 RxJava 对比:

    改变数据发射的线程

    flowOn 使用的参数是协程对应的调度器,它实质改变的是协程对应的线程。

    改变消费数据的线程

    我在上面的表格中并没有写到在 Flow 中如何改变消费线程,并不意味着 Flow 不可以指定消费线程?

    Flow 的消费线程在我们启动协程指定调度器的时候就确认好了,对应着启动协程的调度器。比如在上面的代码中 lifecycleScope 启动的调度器是 Dispatchers.Main,那么 collect 方法就消费在主线程。

    3. 异常和完成

    异常捕获

    Flow 中的 catch 对应着 RxJava 中的 onErrorcatch 操作:

    lifecycleScope.launch {
        flow {
            //...
        }.catch {e->

        }.collect(

        )
    }

    除此以外,你可以使用声明式捕获 try { } catch (e: Throwable) { } 去捕获异常,不过 catch 本质上是一个扩展方法,它是对声明式捕获的封装。

    完成

    Flow 中的 onCompletion 对应这 RxJava 中的 onComplete 回调,onCompletion操作:

    lifecycleScope.launch {
        createFlow()
            .onCompletion {
                // 处理完成操作
            }
            .collect {

            }
    }

    除此以外,我们还可以通过捕获式 try {} finally {} 去获取完成情况。

    4. Flow的特点

    我们在对 Flow 已经有了一些基础的认知了,再来聊一聊 Flow 的特点,Flow 具有以下特点:

    如果你对 Kotlin 中的 Sequence 有一些认识,那么你应该可以轻松的 Get 到前两个点。

    冷流

    有点类似于懒加载,当我们触发 collect 方法的时候,数据才开始发射。

    lifecycleScope.launch {
        val flow = (1..10).asFlow().flowOn(Dispatchers.Main)

        flow.collect { num ->
                // 具体的消费处理
                // ...
            }
        }
    }

    也就是说,在第2行的时候,虽然流创建好了,但是数据一直到第四行发生 collect 才开始发射。

    有序

    看代码比较容易理解:

    lifecycleScope.launch {
        flow {
            for(i in 1..3) {
                Log.e("Flow","$i emit")
                emit(i)
            }
        }.filter {
            Log.e("Flow","$it filter")
            it % 2 != 0
        }.map {
            Log.e("Flow","$it map")
            "${it * it} money"
        }.collect {
            Log.e("Flow","i get $it")
        }
    }

    得到的日志:

    E/Flow: 1 emit
    E/Flow: 1 filter
    E/Flow: 1 map
    E/Flow: i get 1 money
    E/Flow: 2 emit
    E/Flow: 2 filter
    E/Flow: 3 emit
    E/Flow: 3 filter
    E/Flow: 3 map
    E/Flow: i get 9 money

    从日志中,我们很容易得出这样的结论,每个数据都是经过 emitfiltermapcollect 这一套完整的处理流程后,下个数据才会开始处理,而不是所有的数据都先统一 emit,完了再统一 filter,接着 map,最后再 collect

    协作取消

    Flow 采用和协程一样的协作取消,也就是说,Flow 的 collect 只能在可取消的挂起函数中挂起的时候取消,否则不能取消。

    如果我们想取消 Flow 得借助 withTimeoutOrNull 之类的顶层函数,不妨猜一下,下面的代码最终会打印出什么?

    lifecycleScope.launch {
        val f = flow {
            for (i in 1..3) {
                delay(500)
                Log.e(TAG, "emit $i")
                emit(i)
            }
        }
        withTimeoutOrNull(1600) {
            f.collect {
                delay(500)
                Log.e(TAG, "consume $it")
            }
        }
        Log.e(TAG, "cancel")
    }

    5. 操作符对比

    限于篇幅,我仅介绍一下 Flow 中操作符的作用,就不一一介绍每个操作符具体怎么使用了。

    普通操作符:

    特殊的操作符

    总会有一些特殊的情况,比如我只需要取前几个,我只要最新的数据等,不过在这些情况下,数据的发射就是并发执行的。

    组合操作符

    展平流操作符

    展平流有点类似于 RxJava 中的 flatmap,将你发射出去的数据源转变为另一种数据源。

    末端操作符

    顾名思义,就是帮你做 collect 处理,collect 是最基础的末端操作符。

    其他还有一些操作符,我这里就不一一介绍了,感兴趣可以查看 API。

    三、通道

    Channel是一个面向多协程之间数据传输的 BlockQueue。它的使用方式超级简单:

    lifecycleScope.launch {
        // 1. 生成一个 Channel
        val channel = Channel<Int>()

        // 2. Channel 发送数据
        launch {
            for(i in 1..5){
                delay(200)
                channel.send(i * i)
            }
            channel.close()
        }
                
        // 3. Channel 接收数据
        launch {
            for( y in channel)
                Log.e(TAG, "get $y")
        }
    }

    实现协程之间的数据传输需要三步:

    1.创建 Channel

    创建的 Channel的方式可以分为两种:

    如果使用了扩展函数,代码就变成了:

    lifecycleScope.launch {
        // 1. 生成一个 Channel
        val channel = produce<Int> {
            for(i in 1..5){
                delay(200)
                send(i * i)
            }
            close()
        }

        // 2. 接收数据
        // ... 省略 跟之前代码一致
    }

    直接将第一步和第二步合并了。

    2. 发送数据

    发送数据使用的  Channel#send() 方法,当我们数据发送完毕的时候,可以使用 Channel#close() 来表明通道已经结束数据的发送。

    3. 接收数据

    正常情况下,我们仅需要调用 Channel#receive() 获取数据,但是该方法只能获取一次传递的数据,如果我们仅需获取指定次数的数据,可以这么操作:

    repeat(4){
        Log.e(TAG, "get ${channel.receive()}")
    }

    但如果发送的数据不可以预估呢?这个时候我们就需要迭代 Channel

    for( y in channel)
        Log.e(TAG, "get $y")

    四、多协程数据处理

    多协程处理并发数据的时候,原子性同样也得不到保证,协程中出了一种叫 Mutex 的锁,区别是它的 lock 操作是挂起的,非阻塞的,感兴趣的同学可以自行查看。

    总结

    个人感觉协层的主要作用是简化代码的逻辑,减少了代码的回调地狱,结合 Kotlin,既可以写出优雅的代码,还能降低我们犯错的概率。至于提升多协程开发的性能?即学即用Kotlin - 协程-LMLPHP

    如果觉得本文不错,**「三连」**是对我最大的鼓励。我将会在下一篇文章中和大家讨论协程的原理,欢迎大家关注。

    学习协程和 kotlin 还是很有必要的,我们团队在开发新的功能的时候,也全部选择了 Kotlin。

    关于我

    我是九心,新晋互联网码农,如果想要进阶和了解更多的干货,欢迎关注我的公众号接收到的我的最新文章。

    参考文章:

    本文分享自微信公众号 - Android群英传(android_heroes)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    09-03 06:35