阅读完需:约 20 分钟
协程
在说协程之前可以先讲讲进程和线程:
有一定基础的小伙伴们肯定都知道进程和线程。
进程是什么呢?
直白地讲,进程就是应用程序的启动实例。比如我们运行一个游戏,打开一个软件,就是开启了一个进程。
进程拥有代码和打开的文件资源、数据资源、独立的内存空间。
线程又是什么呢?
线程从属于进程,是程序的实际执行者。一个进程至少包含一个主线程,也可以有更多的子线程。
线程拥有自己的栈空间。
对操作系统来说,线程是最小的执行单元,进程是最小的资源管理单元。
无论进程还是线程,都是由操作系统所管理的。
Java中线程具有五种状态:
但是,线程不同状态之间的转化是谁来实现的呢?是JVM吗?
并不是。JVM需要通过操作系统内核中的TCB(Thread Control Block)模块来改变线程的状态,这一过程需要耗费一定的CPU资源。
进程和线程的痛点
线程之间是如何进行协作的呢?
最经典的例子就是生产者/消费者模式:
若干个生产者线程向队列中写入数据,若干个消费者线程从队列中消费数据。
如何用java语言实现生产者/消费者模式呢?
让我们来看一看代码:
public class ProducerConsumerTest {
public static void main(String args[]) {
final Queue<Integer> sharedQueue = new Queue();
Thread producer = new Producer(sharedQueue);
Thread consumer = new Consumer(sharedQueue);
producer.start();
consumer.start();
}
}
class Producer extends Thread {
private static final int MAX_QUEUE_SIZE = 5;
private final Queue sharedQueue;
public Producer(Queue sharedQueue) {
super();
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (sharedQueue) {
while (sharedQueue.size() >= MAX_QUEUE_SIZE) {
System.out.println("队列满了,等待消费");
try {
sharedQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sharedQueue.add(i);
System.out.println("进行生产 : " + i);
sharedQueue.notify();
}
}
}
}
class Consumer extends Thread {
private final Queue sharedQueue;
public Consumer(Queue sharedQueue) {
super();
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
while (true) {
synchronized (sharedQueue) {
while (sharedQueue.size() == 0) {
try {
System.out.println("队列空了,等待生产");
sharedQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int number = sharedQueue.poll();
System.out.println("进行消费 : " + number);
sharedQueue.notify();
}
}
}
}
这段代码做了下面几件事:
1.定义了一个生产者类,一个消费者类。
2.生产者类循环100次,向同步队列当中插入数据。
3.消费者循环监听同步队列,当队列有数据时拉取数据。
4.如果队列满了(达到5个元素),生产者阻塞。
5.如果队列空了,消费者阻塞。
上面的代码正确地实现了生产者/消费者模式,但是却并不是一个高性能的实现。为什么性能不高呢?原因如下:
1.涉及到同步锁。
2.涉及到线程阻塞状态和可运行状态之间的切换。
3.涉及到线程上下文的切换。
以上涉及到的任何一点,都是非常耗费性能的操作。
协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。
最重要的是,协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。
这样带来的好处就是性能得到了很大的提升,不会像线程切换那样消耗资源。
既然协程这么好,它到底是怎么来使用的呢?
这段代码十分简单,即使没用过python的小伙伴应该也能基本看懂。
代码中创建了一个叫做consumer的协程,并且在主线程中生产数据,协程中消费数据。
其中 yield 是python当中的语法。当协程执行到yield关键字时,会暂停在那一行,等到主线程调用send方法发送了数据,协程才会接到数据继续执行。
但是,yield让协程暂停,和线程的阻塞是有本质区别的。协程的暂停完全由程序控制,线程的阻塞状态是由操作系统内核来进行切换。
因此,协程的开销远远小于线程的开销。
协程的应用
Lua语言
Lua从5.0版本开始使用协程,通过扩展库coroutine来实现。
Python语言
正如刚才所写的代码示例,python可以通过 yield/send 的方式实现协程。在python 3.5以后,async/await 成为了更好的替代方案。
Go语言
Go语言对协程的实现非常强大而简洁,可以轻松创建成百上千个协程并发执行。
Kotlin 协程
其实简单的说,协程就是可以控制线程切换的代码,或能实现类似的功能的,可以让异步代码同步化,如JS里的async
和await
等。 这种方式可以给我们简化代码,使得业务逻辑清晰。
最常见于网络请求的回调,如一般我们会有一个成功的回调和一个失败的回调,按照异步的写法,两个回调方法中都要处理逻辑,协程可以使异步回调按照同步的方式去返回结果,写法简单,清晰。
kotlin通过suspend关键字来指定函数可以挂起,实现异步转同步的功能。
异步程序最麻烦的就是,当你有多个异步程序同时运行时,比如同时并发请求了好几个接口,要想拿到返回值一并处理,就比较困难。还有就是如果我想按照顺序的异步执行,通常也比较困难
协程本身的概念实际包含了线程调度的概念,只有能控制线程切换,才有可能实现上面的功能。
线程其实是操作系统的概念,而协程则属于编程语言的范畴,它是应用程序层的api层的东西,二者其实没什么太大关系。
值得一提的是协程并不是kotlin里面特有的东西,其他语言如Python、JS等也要类似的语法可以实现。
比较常见的可能就是async
和await
关键字,JS里面比较常见,但是这玩意居然在C#和Python里面也有
js需要在使用的时候添加这俩关键字,kotlin就比较优秀了,使用的时候感觉不出来,就像完全的同步代码一样,只不过它是定义的时候写好了罢了。
挂起函数
kotlin实现恢复挂起点是通过一个接口类Continuation
(英文翻译过来叫”继续”)来实现的
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
它有一个上下文对象,还有一个resumeWith
方法,这个方法就是用来恢复挂起函数用的,如果是成功和失败回调都可以通过result参数来返回。
添加了suspend
关键字的函数,kotlin最终生成下面的方法:
可以看到,suspend
函数实际上需要一个continuation参数
如果挂起函数没有真正被挂起(没有发生线程切换)返回值返回的就是实际参数类型,否则返回的是一个标记。
suspend fun getUserSuspend(name: String) = suspendCoroutine<User> { continuation ->
githubApi.getUserCallback(name).enqueue(object: Callback<User>{
override fun onFailure(call: Call<User>, t: Throwable) =
continuation.resumeWithException(t)
override fun onResponse(call: Call<User>, response: Response<User>) =
response.takeIf { it.isSuccessful }?.body()?.let(continuation::resume)
?: continuation.resumeWithException(HttpException(response))
})
}
suspend fun main(){
val user = getUserSuspend("bennyhuo")
showUser(user)
}
最简单的复写挂起函数的回调:
suspend fun suspendFunc() = suspendCoroutine<Int> {
it.resumeWith(Result.success(1))
}
只不过真正的挂起需要真正的切换线程,如果直接调用的话相当于没有挂起。
协程的创建
suspend {
}.createCoroutine(object: Continuation<Unit>{ //创建协程
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
log("Coroutine End with $result")
}
}).resume(Unit) //恢复
suspend {
}.startCoroutine(object: Continuation<Unit>{ //启动协程
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
log("Coroutine End with $result") //协程执行完后调用
}
})
简单的例子,异步转同步,先使用retrofit的api创建一个接口请求实例:
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
import retrofit2.http.Path
val githubApi by lazy {
val retrofit = retrofit2.Retrofit.Builder()
.client(OkHttpClient.Builder().addInterceptor(Interceptor {
it.proceed(it.request()).apply {
log("request: ${code()}")
}
}).build())
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.build()
retrofit.create(GitHubApi::class.java)
}
interface GitHubApi {
@GET("users/{login}")
fun getUserCallback(@Path("login") login: String): Call<User>
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User
}
data class User(val id: String, val name: String, val url: String)
常见的调用场景:
import com.bennyhuo.kotlin.coroutinebasics.api.User
import com.bennyhuo.kotlin.coroutinebasics.api.githubApi
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response
//普通的异步请求,成功和失败需要分开在两个回调函数中处理
fun async() {
val call = githubApi.getUserCallback("bennyhuo")
call.enqueue(object : Callback<User> {
override fun onFailure(call: Call<User>, t: Throwable) {
showError(t)
}
override fun onResponse(call: Call<User>, response: Response<User>) {
response.body()?.let(::showUser) ?: showError(NullPointerException())
}
})
}
//for循环中发起多个异步请求,获取请求结果
fun asyncLoop() {
val names = arrayOf("abreslav","udalov", "yole")
names.forEach { name ->
val call = githubApi.getUserCallback(name)
call.enqueue(object : Callback<User> {
override fun onFailure(call: Call<User>, t: Throwable) {
showError(t)
}
override fun onResponse(call: Call<User>, response: Response<User>) {
response.body()?.let(::showUser) ?: showError(NullPointerException())
}
})
}
}
//使用挂起函数来请求,不需要请求的回调
suspend fun coroutine(){
val names = arrayOf("abreslav","udalov", "yole")
names.forEach { name ->
try {
val user = githubApi.getUserSuspend(name) //请求后这里会挂起,直到请求成功之后恢复执行
showUser(user)
} catch (e: Exception) {
showError(e)
}
}
}
//通过挂起函数的方式获取所有异步请求的结果放到一个数组当中
suspend fun coroutineLoop(){
val names = arrayOf("abreslav","udalov", "yole")
val users = names.map { name ->
githubApi.getUserSuspend(name)
}
}
实例
模仿Python的序列生成器Generator
import kotlin.coroutines.*
interface Generator<T> {
operator fun iterator(): Iterator<T>
}
class GeneratorImpl<T>(private val block: suspend GeneratorScope<T>.(T) -> Unit, private val parameter: T): Generator<T> {
override fun iterator(): Iterator<T> {
return GeneratorIterator(block, parameter)
}
}
//密封类的使用 定义状态
sealed class State {
//continuation作为参数方便下次调用
class NotReady(val continuation: Continuation<Unit>): State()
class Ready<T>(val continuation: Continuation<Unit>, val nextValue: T): State()
object Done: State()
}
//GeneratorScope<T>.(T) 点左边的是receiver
class GeneratorIterator<T>(private val block: suspend GeneratorScope<T>.(T) -> Unit, override val parameter: T)
: GeneratorScope<T>(), Iterator<T>, Continuation<Any?> {
override val context: CoroutineContext = EmptyCoroutineContext
private var state: State
init {
val coroutineBlock: suspend GeneratorScope<T>.() -> Unit = { block(parameter) } //挂起函数 调用block lambda表达式传入parameter参数
val start = coroutineBlock.createCoroutine(this, this) //不需要马上启动的话使用createCoroutine创建协程
state = State.NotReady(start) //初始状态肯定是NotReady,createCoroutine返回的start参数就是Continuation类型
println("init====================")
}
//yield是一个挂起函数 覆写GeneratorScope类的方法
override suspend fun yield(value: T) = suspendCoroutine<Unit> {
continuation ->
println("yield========${state.javaClass.simpleName} value=${value}")
state = when(state) {
is State.NotReady -> State.Ready(continuation, value) //调用yield(xx)方法使状态进入Ready状态
is State.Ready<*> -> throw IllegalStateException("Cannot yield a value while ready.")
State.Done -> throw IllegalStateException("Cannot yield a value while done.")
}
//这里continuation没有直接调用resume方法,在后面用户调用hasNext()或next()时调用resume()
}
private fun resume() {
println("resume()====================")
//val currentState = state之后调用.continuation会自动类型转换
when(val currentState = state) {
is State.NotReady -> {
println("resume()====================when NotReady")
currentState.continuation.resume(Unit) // NotReady时调用Continuation的resume方法恢复挂起点继续执行
}
}
}
override fun hasNext(): Boolean {
println("hasNext()====================")
resume()
return state != State.Done
}
//next方法返回yield存入的值
override fun next(): T {
println("next()========${state.javaClass.simpleName}")
return when(val currentState = state) {
is State.NotReady -> {
resume()
return next() //NotReady时调用下次的next
}
is State.Ready<*> -> {
state = State.NotReady(currentState.continuation) //state状态流转
println("next()====return value")
(currentState as State.Ready<T>).nextValue //Ready时才取值返回
}
State.Done -> throw IndexOutOfBoundsException("No value left.") //Done状态调用next()抛异常
}
}
//协程体执行完毕
override fun resumeWith(result: Result<Any?>) {
println("resumeWith====================")
state = State.Done
result.getOrThrow()
}
}
//这个是定义一个receiver类,保证yield()方法只能在lambda表达式的大括号内使用
abstract class GeneratorScope<T> internal constructor(){
protected abstract val parameter: T
abstract suspend fun yield(value: T)
}
//返回值具有迭代器功能
fun <T> generator(block: suspend GeneratorScope<T>.(T) -> Unit): (T) -> Generator<T> {
return { parameter: T ->
println("parameter = $parameter") // parameter = 10 这个是generator接收的start函数,nums(10)
GeneratorImpl(block, parameter)
}
}
fun main() {
val nums = generator { start: Int ->
for (i in 0..5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10)
//println(seq.iterator().next())
for (j in seq) {
println(j)
}
//kotlin官方提供的sequence序列中有yield方法
val sequence = sequence {
yield(1)
yield(2)
yield(3)
yield(4)
yieldAll(listOf(1,2,3,4))
}
for(xx in sequence){
println(xx)
}
}
添加了log输出,打印执行的顺序:
从log输出的顺序可以看出是每次for循环会先调用hasNext()方法,hasNext()中会调用resume()方法,第一次调用resume()相当于启动协程,这时协程会执行到yield调用处的代码yield(start + i)这行并挂起(实际上是一个lambda表达式,它是一个Continuation的实现类SuspendLambad的包装),那yield方法中就会向state中存入值,并同时保存当前的Continuation对象,然后流传状态变化,变成ready, 紧接着for循环里取值操作会调用next方法,然后在下一次的for循环中又会调用resume()方法,这时就是恢复前面一次挂起函数调用处的代码继续执行,也就是执行下一次for循环里yield(start + i)会继续放下一个值,又挂起,next()方法又取值,hasNext()方法又resume()恢复, 继续执行yeild…循环往复。
如果不调用for循环打印,直接调用next获取值呢?
fun main() {
val nums = generator { start: Int ->
for (i in 0..5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10)
println(seq.iterator().next())
}
这时next方法中也会先resume, 相当于启动协程,协程体里包含的lambda表达式开始执行,yield方法存值并挂起,next()方法取值,但这时好像没有调用resumeWith方法。。但还是能正常执行完毕。
这个例子比较绕的一点就是协程体中的代码并不会立即被执行,也就是下面的代码:
fun main() {
val nums = generator { start: Int ->
for (i in 0..5) {
yield(start + i) //yield会挂起函数调用处
}
}
val seq = nums(10) //这样并不会执行上面的for循环里面的代码
// for (j in seq) {
// println(j)
// }
把下面调用的for循环代码注释掉,上面的lambda表达式里面的代码并不会执行
只输出了一个parameter=10,yield方法里面的log一个也没有打印。很神奇,就是说只有调用获取值的时候,才会执行yield方法给你发送值,调用一次发送一次,kotlin官方提供的sequence序列大括号中的yield方法调用也是如此,sequence本身可以当做迭代器使用。
仿 Lua 协程实现非对称协程 API
sealed class Status {
class Created(val continuation: Continuation<Unit>): Status()
class Yielded<P>(val continuation: Continuation<P>): Status()
class Resumed<R>(val continuation: Continuation<R>): Status()
object Dead: Status()
}
class Coroutine<P, R> (
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend Coroutine<P, R>.CoroutineBody.(P) -> R //receiver是Coroutine<P, R>.CoroutineBody 内部类
): Continuation<R> {
companion object {
fun <P, R> create(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Coroutine<P, R>.CoroutineBody.(P) -> R
): Coroutine<P, R> {
return Coroutine(context, block)
}
}
//内部类,保证yield()方法不能在外部调用 只能在lambda当中调用
inner class CoroutineBody {
var parameter: P? = null
suspend fun yield(value: R): P = suspendCoroutine { continuation ->
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
is Status.Resumed<*> -> Status.Yielded(continuation)
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
(previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
}
}
private val body = CoroutineBody()
private val status: AtomicReference<Status>
val isActive: Boolean
get() = status.get() != Status.Dead
init {
val coroutineBlock: suspend CoroutineBody.() -> R = { block(parameter!!) }
val start = coroutineBlock.createCoroutine(body, this)
status = AtomicReference(Status.Created(start))
}
override fun resumeWith(result: Result<R>) {
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
is Status.Resumed<*> -> {
Status.Dead
}
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
(previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
}
suspend fun resume(value: P): R = suspendCoroutine { continuation ->
val previousStatus = status.getAndUpdate {
when(it) {
is Status.Created -> {
body.parameter = value
Status.Resumed(continuation)
}
is Status.Yielded<*> -> {
Status.Resumed(continuation)
}
is Status.Resumed<*> -> throw IllegalStateException("Already resumed!")
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
when(previousStatus){
is Status.Created -> {
previousStatus.continuation.resume(Unit)
}
is Status.Yielded<*> -> {
(previousStatus as Status.Yielded<P>).continuation.resume(value)
}
}
}
suspend fun <SymT> SymCoroutine<SymT>.yield(value: R): P {
return body.yield(value)
}
}
class Dispatcher: ContinuationInterceptor {
override val key = ContinuationInterceptor
private val executor = Executors.newSingleThreadExecutor()
//拦截器方法
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return DispatcherContinuation(continuation, executor)
}
}
class DispatcherContinuation<T>(val continuation: Continuation<T>, val executor: Executor): Continuation<T> by continuation { //接口代理
//切换线程 再resumeWith
override fun resumeWith(result: Result<T>) {
executor.execute {
continuation.resumeWith(result)
}
}
}
suspend fun main() {
//生产者 接收Unit,返回Int类型
val producer = Coroutine.create<Unit, Int>(Dispatcher()) {
log("producer start")
for(i in 0..3){
log("send", i)
yield(i)
}
200
}
//消费者 接收Int类型,返回Unit
val consumer = Coroutine.create<Int, Unit>(Dispatcher()) { param: Int ->
log("consumer start", param)
for(i in 0..3){
val value = yield(Unit)
log("receive", value)
}
}
while (producer.isActive && consumer.isActive){
val result = producer.resume(Unit) //生产
consumer.resume(result) //消费
}
}
log:
模仿实现JS中的async/await
语法
import android.os.Handler
import android.os.Looper
import com.bennyhuo.kotlin.coroutinebasics.api.githubApi
import com.bennyhuo.kotlin.coroutinebasics.common.Dispatcher
import com.bennyhuo.kotlin.coroutinebasics.common.DispatcherContext
import com.bennyhuo.kotlin.coroutinebasics.utils.log
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import kotlin.coroutines.*
interface AsyncScope
suspend fun <T> AsyncScope.await(block: () -> Call<T>) = suspendCoroutine<T> {
continuation ->
val call = block()
//okhttp的回调异步处理 分别调用continuation.resume处理
call.enqueue(object : Callback<T>{
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
override fun onResponse(call: Call<T>, response: Response<T>) {
if(response.isSuccessful){
response.body()?.let(continuation::resume) ?: continuation.resumeWithException(NullPointerException())
} else {
continuation.resumeWithException(HttpException(response))
}
}
})
}
fun async(context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit) {
val completion = AsyncCoroutine(context)
block.startCoroutine(completion, completion)
}
class AsyncCoroutine(override val context: CoroutineContext = EmptyCoroutineContext): Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow()
}
}
fun main() {
Looper.prepare()
val handlerDispatcher = DispatcherContext(object : Dispatcher {
val handler = Handler()
override fun dispatch(block: () -> Unit) {
handler.post(block) //线程切换,block会切换到handler所在的线程执行,就是main所在的线程
}
})
async(handlerDispatcher) {
val user = await { githubApi.getUserCallback("bennyhuo") }
log(user)
}
Looper.loop()
}
其中Handler和Looper是移植的android的代码。
输出:
可挂起的main函数
suspend fun main() {
....
}
编译器会为suspend的main函数生成下面等价的代码:
fun main(continuation: Continuation<Unit>): Any? {
return println("Hello")
}
实际上,源码中是在main()函数中调用了runSuspend
方法,runSuspend
当中创建并启动了一个协程体,只不过我看不到这个代码。
fun main() {
runSuspend(::main1 as suspend () -> Unit)
}
/**
* Wrapper for `suspend fun main` and `@Test suspend fun testXXX` functions.
*/
@SinceKotlin("1.3")
internal fun runSuspend(block: suspend () -> Unit) {
val run = RunSuspend()
block.startCoroutine(run)
run.await()
}
private class RunSuspend : Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
//-Xallow-result-return-type
var result: Result<Unit>? = null
override fun resumeWith(result: Result<Unit>) = synchronized(this) {
this.result = result
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).notifyAll()
}
fun await() = synchronized(this) {
while (true) {
when (val result = this.result) {
null -> @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).wait()
else -> {
result.getOrThrow() // throw up failure
return
}
}
}
}
}