User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Kotlin   ›   Kotlin Notes   ›   正文
Kotlin Notes

Kotlin-协程—协程初步(三十一)

2021-03-14 23:53:53
1393  0 0
参考目录 隐藏
1) 协程
2) 进程和线程的痛点
3) 协程的应用
4) Kotlin 协程
5) 挂起函数
6) 协程的创建
7) 实例
8) 模仿Python的序列生成器Generator
9) 仿 Lua 协程实现非对称协程 API
10) 模仿实现JS中的async/await语法
11) 可挂起的main函数

阅读完需:约 20 分钟

协程

在说协程之前可以先讲讲进程和线程:

有一定基础的小伙伴们肯定都知道进程和线程。

进程是什么呢?

直白地讲,进程就是应用程序的启动实例。比如我们运行一个游戏,打开一个软件,就是开启了一个进程。

进程拥有代码和打开的文件资源、数据资源、独立的内存空间。

线程又是什么呢?

线程从属于进程,是程序的实际执行者。一个进程至少包含一个主线程,也可以有更多的子线程。

线程拥有自己的栈空间。

对操作系统来说,线程是最小的执行单元,进程是最小的资源管理单元。

无论进程还是线程,都是由操作系统所管理的。

Java中线程具有五种状态:

但是,线程不同状态之间的转化是谁来实现的呢?是JVM吗?

并不是。JVM需要通过操作系统内核中的TCB(Thread Control Block)模块来改变线程的状态,这一过程需要耗费一定的CPU资源。


进程和线程的痛点

线程之间是如何进行协作的呢?

最经典的例子就是生产者/消费者模式:

若干个生产者线程向队列中写入数据,若干个消费者线程从队列中消费数据。

如何用java语言实现生产者/消费者模式呢?

让我们来看一看代码:

public class ProducerConsumerTest {
 
	public static void main(String args[]) {
		final Queue<Integer> sharedQueue = new Queue();
		Thread producer = new Producer(sharedQueue);
		Thread consumer = new Consumer(sharedQueue);
		producer.start();
		consumer.start();
	}
}
class Producer extends Thread {
 
	private static final int MAX_QUEUE_SIZE = 5;
 
	private final Queue sharedQueue;
 
	public Producer(Queue sharedQueue) {
		super();
		this.sharedQueue = sharedQueue;
	}
 
	@Override
	public void run() {
		for (int i = 0; i < 100; i++) {
			synchronized (sharedQueue) {
				while (sharedQueue.size() >= MAX_QUEUE_SIZE) {
					System.out.println("队列满了,等待消费");
					try {
						sharedQueue.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				sharedQueue.add(i);
				System.out.println("进行生产 : " + i);
				sharedQueue.notify();
			}
		}
	}
}
class Consumer extends Thread {
	private final Queue sharedQueue;
 
	public Consumer(Queue sharedQueue) {
		super();
		this.sharedQueue = sharedQueue;
	}
 
	@Override
	public void run() {
		while (true) {
			synchronized (sharedQueue) {
				while (sharedQueue.size() == 0) {
					try {
						System.out.println("队列空了,等待生产");
						sharedQueue.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				int number = sharedQueue.poll();
				System.out.println("进行消费 : " + number);
				sharedQueue.notify();
			}
		}
	}
}

这段代码做了下面几件事:

1.定义了一个生产者类,一个消费者类。

2.生产者类循环100次,向同步队列当中插入数据。

3.消费者循环监听同步队列,当队列有数据时拉取数据。

4.如果队列满了(达到5个元素),生产者阻塞。

5.如果队列空了,消费者阻塞。

上面的代码正确地实现了生产者/消费者模式,但是却并不是一个高性能的实现。为什么性能不高呢?原因如下:

1.涉及到同步锁。

2.涉及到线程阻塞状态和可运行状态之间的切换。

3.涉及到线程上下文的切换。

以上涉及到的任何一点,都是非常耗费性能的操作。

协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。

最重要的是,协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。

这样带来的好处就是性能得到了很大的提升,不会像线程切换那样消耗资源。

既然协程这么好,它到底是怎么来使用的呢?

这段代码十分简单,即使没用过python的小伙伴应该也能基本看懂。

代码中创建了一个叫做consumer的协程,并且在主线程中生产数据,协程中消费数据。

其中 yield 是python当中的语法。当协程执行到yield关键字时,会暂停在那一行,等到主线程调用send方法发送了数据,协程才会接到数据继续执行。

但是,yield让协程暂停,和线程的阻塞是有本质区别的。协程的暂停完全由程序控制,线程的阻塞状态是由操作系统内核来进行切换。

因此,协程的开销远远小于线程的开销。

协程的应用

Lua语言

Lua从5.0版本开始使用协程,通过扩展库coroutine来实现。

Python语言

正如刚才所写的代码示例,python可以通过 yield/send 的方式实现协程。在python 3.5以后,async/await 成为了更好的替代方案。

Go语言

Go语言对协程的实现非常强大而简洁,可以轻松创建成百上千个协程并发执行。


Kotlin 协程

其实简单的说,协程就是可以控制线程切换的代码,或能实现类似的功能的,可以让异步代码同步化,如JS里的async和await等。 这种方式可以给我们简化代码,使得业务逻辑清晰。

最常见于网络请求的回调,如一般我们会有一个成功的回调和一个失败的回调,按照异步的写法,两个回调方法中都要处理逻辑,协程可以使异步回调按照同步的方式去返回结果,写法简单,清晰。

kotlin通过suspend关键字来指定函数可以挂起,实现异步转同步的功能。

异步程序最麻烦的就是,当你有多个异步程序同时运行时,比如同时并发请求了好几个接口,要想拿到返回值一并处理,就比较困难。还有就是如果我想按照顺序的异步执行,通常也比较困难

协程本身的概念实际包含了线程调度的概念,只有能控制线程切换,才有可能实现上面的功能。


线程其实是操作系统的概念,而协程则属于编程语言的范畴,它是应用程序层的api层的东西,二者其实没什么太大关系。



值得一提的是协程并不是kotlin里面特有的东西,其他语言如Python、JS等也要类似的语法可以实现。


比较常见的可能就是async和await关键字,JS里面比较常见,但是这玩意居然在C#和Python里面也有

js需要在使用的时候添加这俩关键字,kotlin就比较优秀了,使用的时候感觉不出来,就像完全的同步代码一样,只不过它是定义的时候写好了罢了。


挂起函数

kotlin实现恢复挂起点是通过一个接口类Continuation(英文翻译过来叫”继续”)来实现的

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

它有一个上下文对象,还有一个resumeWith方法,这个方法就是用来恢复挂起函数用的,如果是成功和失败回调都可以通过result参数来返回。

添加了suspend关键字的函数,kotlin最终生成下面的方法:

可以看到,suspend函数实际上需要一个continuation参数

如果挂起函数没有真正被挂起(没有发生线程切换)返回值返回的就是实际参数类型,否则返回的是一个标记。

suspend fun getUserSuspend(name: String) = suspendCoroutine<User> { continuation ->
    githubApi.getUserCallback(name).enqueue(object: Callback<User>{
        override fun onFailure(call: Call<User>, t: Throwable) =
            continuation.resumeWithException(t)
        override fun onResponse(call: Call<User>, response: Response<User>) =
            response.takeIf { it.isSuccessful }?.body()?.let(continuation::resume)
                ?: continuation.resumeWithException(HttpException(response))
    })
}
suspend fun main(){
    val user = getUserSuspend("bennyhuo")
    showUser(user)
}

最简单的复写挂起函数的回调:

suspend fun suspendFunc() = suspendCoroutine<Int> {
    it.resumeWith(Result.success(1))
}

只不过真正的挂起需要真正的切换线程,如果直接调用的话相当于没有挂起。


协程的创建

    suspend {

    }.createCoroutine(object: Continuation<Unit>{ //创建协程
        override val context = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            log("Coroutine End with $result")
        }
    }).resume(Unit) //恢复

    suspend {

    }.startCoroutine(object: Continuation<Unit>{ //启动协程
        override val context = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            log("Coroutine End with $result") //协程执行完后调用
        }
    })


简单的例子,异步转同步,先使用retrofit的api创建一个接口请求实例:

import okhttp3.Interceptor
import okhttp3.OkHttpClient
import retrofit2.Call
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.GET
import retrofit2.http.Path

val githubApi by lazy {
    val retrofit = retrofit2.Retrofit.Builder()
            .client(OkHttpClient.Builder().addInterceptor(Interceptor {
                it.proceed(it.request()).apply {
                    log("request: ${code()}")
                }
            }).build())
            .baseUrl("https://api.github.com")
            .addConverterFactory(GsonConverterFactory.create())
            .build()

    retrofit.create(GitHubApi::class.java)
}

interface GitHubApi {
    @GET("users/{login}")
    fun getUserCallback(@Path("login") login: String): Call<User>

    @GET("users/{login}")
    suspend fun getUserSuspend(@Path("login") login: String): User
}

data class User(val id: String, val name: String, val url: String)

常见的调用场景:

import com.bennyhuo.kotlin.coroutinebasics.api.User
import com.bennyhuo.kotlin.coroutinebasics.api.githubApi
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response

//普通的异步请求,成功和失败需要分开在两个回调函数中处理
fun async() {
    val call = githubApi.getUserCallback("bennyhuo")
    call.enqueue(object : Callback<User> {
        override fun onFailure(call: Call<User>, t: Throwable) {
            showError(t)
        }
        override fun onResponse(call: Call<User>, response: Response<User>) {
            response.body()?.let(::showUser) ?: showError(NullPointerException())
        }
    })
}

//for循环中发起多个异步请求,获取请求结果
fun asyncLoop() {
    val names = arrayOf("abreslav","udalov", "yole")
    names.forEach { name ->
        val call = githubApi.getUserCallback(name)
        call.enqueue(object : Callback<User> {
            override fun onFailure(call: Call<User>, t: Throwable) {
                showError(t)
            }
            override fun onResponse(call: Call<User>, response: Response<User>) {
                response.body()?.let(::showUser) ?: showError(NullPointerException())
            }
        })
    }
}

//使用挂起函数来请求,不需要请求的回调
suspend fun coroutine(){
    val names = arrayOf("abreslav","udalov", "yole")
    names.forEach { name ->
        try {
            val user = githubApi.getUserSuspend(name) //请求后这里会挂起,直到请求成功之后恢复执行
            showUser(user)
        } catch (e: Exception) {
            showError(e)
        }
    }
}

//通过挂起函数的方式获取所有异步请求的结果放到一个数组当中
suspend fun coroutineLoop(){
    val names = arrayOf("abreslav","udalov", "yole")
    val users = names.map { name ->
        githubApi.getUserSuspend(name)
    }
}

实例

模仿Python的序列生成器Generator

import kotlin.coroutines.*


interface Generator<T> {
    operator fun iterator(): Iterator<T>
}

class GeneratorImpl<T>(private val block: suspend GeneratorScope<T>.(T) -> Unit, private val parameter: T): Generator<T> {
    override fun iterator(): Iterator<T> {
        return GeneratorIterator(block, parameter)
    }
}

//密封类的使用 定义状态
sealed class State {
    //continuation作为参数方便下次调用
    class NotReady(val continuation: Continuation<Unit>): State()
    class Ready<T>(val continuation: Continuation<Unit>, val nextValue: T): State()
    object Done: State()
}

//GeneratorScope<T>.(T) 点左边的是receiver
class GeneratorIterator<T>(private val block: suspend GeneratorScope<T>.(T) -> Unit, override val parameter: T)
    : GeneratorScope<T>(), Iterator<T>, Continuation<Any?> {
    override val context: CoroutineContext = EmptyCoroutineContext

    private var state: State

    init {
        val coroutineBlock: suspend GeneratorScope<T>.() -> Unit = { block(parameter) } //挂起函数 调用block lambda表达式传入parameter参数
        val start = coroutineBlock.createCoroutine(this, this) //不需要马上启动的话使用createCoroutine创建协程
        state = State.NotReady(start) //初始状态肯定是NotReady,createCoroutine返回的start参数就是Continuation类型
        println("init====================")
    }

    //yield是一个挂起函数 覆写GeneratorScope类的方法
    override suspend fun yield(value: T) = suspendCoroutine<Unit> {
        continuation ->
        println("yield========${state.javaClass.simpleName} value=${value}")
        state = when(state) {
            is State.NotReady -> State.Ready(continuation, value) //调用yield(xx)方法使状态进入Ready状态
            is State.Ready<*> ->  throw IllegalStateException("Cannot yield a value while ready.")
            State.Done -> throw IllegalStateException("Cannot yield a value while done.")
        }
        //这里continuation没有直接调用resume方法,在后面用户调用hasNext()或next()时调用resume()
    }

    private fun resume() {
        println("resume()====================")
        //val currentState = state之后调用.continuation会自动类型转换
        when(val currentState = state) {
            is State.NotReady -> {
                println("resume()====================when NotReady")
                currentState.continuation.resume(Unit) // NotReady时调用Continuation的resume方法恢复挂起点继续执行
            }
        }
    }

    override fun hasNext(): Boolean {
        println("hasNext()====================")
        resume()
        return state != State.Done
    }

    //next方法返回yield存入的值
    override fun next(): T {
        println("next()========${state.javaClass.simpleName}")
        return when(val currentState = state) {
            is State.NotReady -> {
                resume()
                return next() //NotReady时调用下次的next
            }
            is State.Ready<*> -> {
                state = State.NotReady(currentState.continuation) //state状态流转
                println("next()====return value")
                (currentState as State.Ready<T>).nextValue //Ready时才取值返回
            }
            State.Done -> throw IndexOutOfBoundsException("No value left.") //Done状态调用next()抛异常
        }
    }

    //协程体执行完毕
    override fun resumeWith(result: Result<Any?>) {
        println("resumeWith====================")
        state = State.Done
        result.getOrThrow()
    }

}

//这个是定义一个receiver类,保证yield()方法只能在lambda表达式的大括号内使用
abstract class GeneratorScope<T> internal constructor(){
    protected abstract val parameter: T

    abstract suspend fun yield(value: T)
}

//返回值具有迭代器功能
fun <T> generator(block: suspend GeneratorScope<T>.(T) -> Unit): (T) -> Generator<T> {
    return { parameter: T ->
        println("parameter = $parameter") // parameter = 10 这个是generator接收的start函数,nums(10)
        GeneratorImpl(block, parameter)
    }
}

fun main() {
    val nums = generator { start: Int ->
        for (i in 0..5) {
            yield(start + i) //yield会挂起函数调用处
        }
    }

    val seq = nums(10)

    //println(seq.iterator().next())
    for (j in seq) {
        println(j)
    }

    //kotlin官方提供的sequence序列中有yield方法
    val sequence = sequence {
        yield(1)
        yield(2)
        yield(3)
        yield(4)
        yieldAll(listOf(1,2,3,4))
    }

    for(xx in sequence){
        println(xx)
    }
}

添加了log输出,打印执行的顺序:

从log输出的顺序可以看出是每次for循环会先调用hasNext()方法,hasNext()中会调用resume()方法,第一次调用resume()相当于启动协程,这时协程会执行到yield调用处的代码yield(start + i)这行并挂起(实际上是一个lambda表达式,它是一个Continuation的实现类SuspendLambad的包装),那yield方法中就会向state中存入值,并同时保存当前的Continuation对象,然后流传状态变化,变成ready, 紧接着for循环里取值操作会调用next方法,然后在下一次的for循环中又会调用resume()方法,这时就是恢复前面一次挂起函数调用处的代码继续执行,也就是执行下一次for循环里yield(start + i)会继续放下一个值,又挂起,next()方法又取值,hasNext()方法又resume()恢复, 继续执行yeild…循环往复。

如果不调用for循环打印,直接调用next获取值呢?

fun main() {
    val nums = generator { start: Int ->
        for (i in 0..5) {
            yield(start + i) //yield会挂起函数调用处
        }
    }

    val seq = nums(10)

    println(seq.iterator().next())
}

这时next方法中也会先resume, 相当于启动协程,协程体里包含的lambda表达式开始执行,yield方法存值并挂起,next()方法取值,但这时好像没有调用resumeWith方法。。但还是能正常执行完毕。

这个例子比较绕的一点就是协程体中的代码并不会立即被执行,也就是下面的代码:

fun main() {
    val nums = generator { start: Int ->
        for (i in 0..5) {
            yield(start + i) //yield会挂起函数调用处
        }
    }
    val seq = nums(10) //这样并不会执行上面的for循环里面的代码
//    for (j in seq) {
//        println(j)
//    }

把下面调用的for循环代码注释掉,上面的lambda表达式里面的代码并不会执行

只输出了一个parameter=10,yield方法里面的log一个也没有打印。很神奇,就是说只有调用获取值的时候,才会执行yield方法给你发送值,调用一次发送一次,kotlin官方提供的sequence序列大括号中的yield方法调用也是如此,sequence本身可以当做迭代器使用。


仿 Lua 协程实现非对称协程 API

sealed class Status {
    class Created(val continuation: Continuation<Unit>): Status()
    class Yielded<P>(val continuation: Continuation<P>): Status()
    class Resumed<R>(val continuation: Continuation<R>): Status()
    object Dead: Status()
}

class Coroutine<P, R> (
    override val context: CoroutineContext = EmptyCoroutineContext,
    private val block: suspend Coroutine<P, R>.CoroutineBody.(P) -> R //receiver是Coroutine<P, R>.CoroutineBody 内部类
): Continuation<R> {

    companion object {
        fun <P, R> create(
            context: CoroutineContext = EmptyCoroutineContext,
            block: suspend Coroutine<P, R>.CoroutineBody.(P) -> R
        ): Coroutine<P, R> {
            return Coroutine(context, block)
        }
    }

    //内部类,保证yield()方法不能在外部调用 只能在lambda当中调用
    inner class CoroutineBody {
        var parameter: P? = null

        suspend fun yield(value: R): P = suspendCoroutine { continuation ->
            val previousStatus = status.getAndUpdate {
                when(it) {
                    is Status.Created -> throw IllegalStateException("Never started!")
                    is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
                    is Status.Resumed<*> -> Status.Yielded(continuation)
                    Status.Dead -> throw IllegalStateException("Already dead!")
                }
            }
            (previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
        }
    }

    private val body = CoroutineBody()

    private val status: AtomicReference<Status>

    val isActive: Boolean
        get() = status.get() != Status.Dead

    init {
        val coroutineBlock: suspend CoroutineBody.() -> R = { block(parameter!!) }
        val start = coroutineBlock.createCoroutine(body, this)
        status = AtomicReference(Status.Created(start))
    }

    override fun resumeWith(result: Result<R>) {
        val previousStatus = status.getAndUpdate {
            when(it) {
                is Status.Created -> throw IllegalStateException("Never started!")
                is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
                is Status.Resumed<*> -> {
                    Status.Dead
                }
                Status.Dead -> throw IllegalStateException("Already dead!")
            }
        }
        (previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
    }

    suspend fun resume(value: P): R = suspendCoroutine { continuation ->
        val previousStatus = status.getAndUpdate {
            when(it) {
                is Status.Created -> {
                    body.parameter = value
                    Status.Resumed(continuation)
                }
                is Status.Yielded<*> -> {
                    Status.Resumed(continuation)
                }
                is Status.Resumed<*> -> throw IllegalStateException("Already resumed!")
                Status.Dead -> throw IllegalStateException("Already dead!")
            }
        }
        when(previousStatus){
            is Status.Created -> {
                previousStatus.continuation.resume(Unit)
            }
            is Status.Yielded<*> -> {
                (previousStatus as Status.Yielded<P>).continuation.resume(value)
            }
        }
    }

    suspend fun <SymT> SymCoroutine<SymT>.yield(value: R): P {
        return body.yield(value)
    }
}

class Dispatcher: ContinuationInterceptor {
    override val key = ContinuationInterceptor

    private val executor = Executors.newSingleThreadExecutor()

    //拦截器方法
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        return DispatcherContinuation(continuation, executor)
    }
}

class DispatcherContinuation<T>(val continuation: Continuation<T>, val executor: Executor): Continuation<T> by continuation { //接口代理

    //切换线程 再resumeWith
    override fun resumeWith(result: Result<T>) {
        executor.execute {
            continuation.resumeWith(result)
        }
    }
}

suspend fun main() {
    //生产者 接收Unit,返回Int类型
    val producer = Coroutine.create<Unit, Int>(Dispatcher()) {
        log("producer start")
        for(i in 0..3){
            log("send", i)
            yield(i)
        }
        200
    }

    //消费者 接收Int类型,返回Unit
    val consumer = Coroutine.create<Int, Unit>(Dispatcher()) { param: Int ->
        log("consumer start", param)
        for(i in 0..3){
            val value = yield(Unit)
            log("receive", value)
        }
    }

    while (producer.isActive && consumer.isActive){
        val result = producer.resume(Unit) //生产
        consumer.resume(result) //消费
    }
}

log:

模仿实现JS中的async/await语法

import android.os.Handler
import android.os.Looper
import com.bennyhuo.kotlin.coroutinebasics.api.githubApi
import com.bennyhuo.kotlin.coroutinebasics.common.Dispatcher
import com.bennyhuo.kotlin.coroutinebasics.common.DispatcherContext
import com.bennyhuo.kotlin.coroutinebasics.utils.log
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import kotlin.coroutines.*

interface AsyncScope

suspend fun <T> AsyncScope.await(block: () -> Call<T>) = suspendCoroutine<T> {
    continuation ->
    val call = block()
    //okhttp的回调异步处理 分别调用continuation.resume处理
    call.enqueue(object : Callback<T>{
        override fun onFailure(call: Call<T>, t: Throwable) {
            continuation.resumeWithException(t)
        }

        override fun onResponse(call: Call<T>, response: Response<T>) {
            if(response.isSuccessful){
                response.body()?.let(continuation::resume) ?: continuation.resumeWithException(NullPointerException())
            } else {
                continuation.resumeWithException(HttpException(response))
            }
        }
    })
}

fun async(context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit) {
    val completion = AsyncCoroutine(context)
    block.startCoroutine(completion, completion)
}

class AsyncCoroutine(override val context: CoroutineContext = EmptyCoroutineContext): Continuation<Unit>, AsyncScope {
    override fun resumeWith(result: Result<Unit>) {
        result.getOrThrow()
    }
}

fun main() {
    Looper.prepare()
    
    val handlerDispatcher = DispatcherContext(object : Dispatcher {
        val handler = Handler()
        override fun dispatch(block: () -> Unit) {
            handler.post(block) //线程切换,block会切换到handler所在的线程执行,就是main所在的线程
        }
    })

    async(handlerDispatcher) {
        val user = await { githubApi.getUserCallback("bennyhuo") }
        log(user)
    }

    Looper.loop()
}

其中Handler和Looper是移植的android的代码。

输出:


可挂起的main函数

suspend fun main() {
	....
}

编译器会为suspend的main函数生成下面等价的代码:

fun main(continuation: Continuation<Unit>): Any? {
    return println("Hello")
}

实际上,源码中是在main()函数中调用了runSuspend方法,runSuspend当中创建并启动了一个协程体,只不过我看不到这个代码。

fun main() {
    runSuspend(::main1 as suspend () -> Unit)
}


/**
 * Wrapper for `suspend fun main` and `@Test suspend fun testXXX` functions.
 */
@SinceKotlin("1.3")
internal fun runSuspend(block: suspend () -> Unit) {
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()
}

private class RunSuspend : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    //-Xallow-result-return-type
    var result: Result<Unit>? = null

    override fun resumeWith(result: Result<Unit>) = synchronized(this) {
        this.result = result
        @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).notifyAll()
    }

    fun await() = synchronized(this) {
        while (true) {
            when (val result = this.result) {
                null -> @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).wait()
                else -> {
                    result.getOrThrow() // throw up failure
                    return
                }
            }
        }
    }
}

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

0 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 593691浏览

随机文章
JXLS—Excel模板框架
2年前
SpringMVC笔记—乱码问题(补充)
5年前
SpringSecurity—权限管理模型(RBAC)
4年前
SpringBoot —构建 RESTful 风格应用
5年前
Spring—事务总结
3年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 593691 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付