User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Java   ›   正文
Java

Java—SmallreyMutiny语法学习-响应式思考

2022-09-20 15:13:35
2018  0 3
参考目录 隐藏
1) 响应式编程接口
2) Publisher发布者
3) Subscriber订阅者
4) Subscription订阅
5) Processor处理器
6) 在 Java 应用程序中使用 Mutiny
7) Uni 和 Multi是什么
8) Mutiny常用接口
9) Uni接口
10) UniCreate类
11) completionStage(CompletionStage<? extends T> stage)
12) future(Supplier<Future<? extends T>> supplier)
13) future(Future<? extends T> future, Duration timeout)
14) emitter(Consumer<UniEmitter<? super T>> consumer)
15) emitter(Supplier<S> stateSupplier, BiConsumer<S, UniEmitter<? super T>> consumer)
16) optional(Optional<T> optional)
17) optional(Supplier<Optional<T>> supplier)
18) createFrom()
19) item(Supplier<? extends T> supplier)
20) item(Supplier<S> stateSupplier, Function<S, ? extends T> mapper)
21) item(T item)
22) voidItem()
23) deferred(Supplier<Uni<? extends T>> supplier)
24) Uni接口方法
25) stage(Function<Uni<T>, O> stage)
26) onItem()
27) chain(Supplier<Uni<? extends O>> supplier)
28) chain(Function<? super T, Uni<? extends O>> mapper)
29) flatMap(Function<? super T, Uni<? extends O>> mapper)
30) map(Function<? super T, ? extends O> mapper)
31) invoke(Consumer<? super T> callback)
32) invoke(Runnable callback)
33) call(Function<? super T, Uni<?>> function)
34) call(Supplier<Uni<?>> supplier)
35) subscribe()
36) subscribeAsCompletionStage()
37) await()
38) onSubscription()
39) onItemOrFailure()
40) onFailure()
41) ifNoItem()
42) emitOn(Executor executor)
43) runSubscriptionOn(Executor executor)
44) eventually(Runnable action)
45) eventually(Supplier<Uni<? extends O>> supplier)
46) convert()
47) toMulti()
48) repeat()
49) onTermination()
50) onCancellation()
51) plug(Function<Uni<T>, Uni<R>> operatorProvider)
52) replaceWith/replaceWithNull/replaceWithVoid/replaceIfNullWith
53) log()
54) join()
55) withContext()
56) attachContext()
57) Multi接口方法
58) Uni<List<T>> 转 Multi<T>
59) 如何处理空值
60) ifNull()
61) ifNotNull()
62) 如何处理超时
63) 如何延迟事件
64) 延迟 Uni 的项目
65) 延迟 Multi 的项目
66) 如何使用轮询
67) 反应式相关问题思考
68) Tomcat 与 Netty 的区别
69) Tomcat特点
70) 为什么NIO在读取body时要模拟阻塞?
71) NioEndpoint结构
72) NioEndpoint执行序列图
73) Tomcat请求处理
74) Netty请求处理
75) MVC异步执行的模型
76) Tomcat架构图
77) 概念总结
78) Tomcat NIO Connector
79) WebFlux的异步模型

阅读完需:约 39 分钟

Mutiny、Project-Reactor、RxJava(在web后端)好像都挺冷门的,人气不高,相关资料也不多。不过其实整个Java Reactive都不是很热门,国内用的人不是很多。但是Reactive库都非常优秀,值得学习。

相比其他的框架Mutiny的API还是比较好用的

https://smallrye.io/smallrye-mutiny/1.7.0/

这是它的官网

Mutiny宣传上说的天生就支持的这个功能

最为Quarkus和Vert.x的指定反应式库确实有其过人之处,入门简单

在学习前还是要了解一下什么是反应式编程,反应式编程的白皮书等

其实这几个框架都是基于Reactive Streams规范来写的,而它就是反应式的标准

反应式流 :

http://www.reactive-streams.org/

反应式流自述文​​件 :

https://www.reactivemanifesto.org/

反应式宣言 :

https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#specification


Mutiny 是一个反应式编程库。如果您在 Wikipedia 上查找响应式编程,您会发现以下定义:

反应式编程结合了函数式编程、观察者模式和可迭代模式。

虽然正确,但我们从未发现这个定义很有帮助。它没有清楚地传达反应式编程的全部内容。所以,让我们再做一个更直接的定义:

反应式编程是关于使用数据流进行编程。

反应式编程是关于流的,尤其是观察它们。它将这个想法推向了极限:使用响应式编程,一切都是数据流。

它本质上是异步的,因为您不知道何时会看到数据。(这个概念很重要需要好好理解一下)然而,反应式编程超越了这一点。它提供了一个工具箱来组合流和处理事件。


响应式编程接口

响应式流JVM规范由下列Reactive Streams接口组成

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Publisher发布者会提供(可能)是无限多个元素,并按照Subscriber订阅者的处理速度进行发布(需求驱动)。 当调用Publisher.subscribe(Subscriber)后,有可能的Subscriber方法调用顺序为 onSubscribe onNext* (onError | onComplete)

这代表onSubscribe始终会被调用,接下来是0-N个onNext的事件; 当onNext调用结束后如果有错误会发送onError信号,若一切正常会发送onComplete通知下游没有更多元素了; 如果中途Subscription订阅被取消,那么可能onError和onComplete都不会被调用。

Publisher发布者

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> var1);
}
  1. Publisher发送给Subscriber的onNext信号总次数必须小于或等于Subscriber在当前Subscription中所请求的元素的个数。这条规则确保Publisher不会发送多于Subscriber请求的元素个数,这样隐性地表达了一个happens-before的关系, 即请求元素发生在接收到元素之前。
  2. Publisher可以发送少于Subscriber请求的元素,并且通过发送onComplete或onError信号终止当前的Subscription。 这条规则声明Publisher并不能保证可以提供Subscriber所请求的元素个数,且原因可以是多样的(没有那么多元素,获 取元素失败等等)。
  3. onSubscribe,onNext,onError和onComplete信号必须按照顺序进行发送。 这样确保在多线程的情况下能够确保信号之间的happens-before关系。
  4. 如果Publisher报错,那么它必须发送onError信号给Subscriber。
  5. 如果Publisher正常结束,那么它必须发送onComplete信号给Subscriber。
  6. 当Publisher向Subscriber发送了onError或onComplete信号后,当前Subscriber上的Subscription必须被视为已取消。
  7. 当订阅到达终结状态时(已发送onError或onComplete信号),Publisher不得再发布其他更多信号。
  8. 如果当前订阅Subscription已经被取消,那么相关的Subscriber最终必须停止被发送信号。 换句话说,当Subscription.cancel()被调用后,Subscriber仍有可能接收到一些元素,因为取消调用可能是异步发生 的。
  9. Publisher.subscribe方法中必须先于其他方法调用Subscriber上的onSubscribe方法,并且必须正常返回(除了Subscriber为null值的情况)。 其他所有的异常情况都需要在onSubscribe调用结束后通过onError方法调用来通知订阅者。
  10. Publisher.subscribe方法可以被多次调用但每次都必须是一个不同的Subscriber。也就是相关的Publisher和Subscriber不能被关联多于一次。
  11. Publisher可以支持多个Subscriber并且自由决定当前的Subscription是单播unicast还是组播multicast。

Subscriber订阅者

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    void onNext(T var1);

    void onError(Throwable var1);

    void onComplete();
}
  1. Subscriber必须通过调用Subscription.request(long n)来传达自己的需求,这样确保由Subscriber来决定何时接收处理多少个onNext发送过来的元素。
  2. 如果Subscriber怀疑它对信号的处理会影响到Publisher的响应时,它应该确保发送信号的过程是异步的。 也就是说从线程执行的角度来讲,Subscriber的信号处理不应该影响Publisher的程序进度。
  3. Subscriber.onComplete()和Subscriber.onError(Throwable t)方法中不应该调用Subscription或Publisher上的任何方法,
    这样确保在处理终态信号时不会有竞态条件产生。
  4. 当Subscriber接收到onComplete或者onError(Throwable t)的调用后,它必须视当前的Subscription为已取消。
  5. 当Subscriber已经有了一个活跃的Subscription之后,跟其他Publisher之间的Subscription(在接收到onSubscribe信 号后)必须通过调用Subscription.cancel()来取消。 因为在任何时候,一个Subscriber只能跟至多一个Publisher进行交互,多余的订阅必须被取消。
  6. 当Subscriber不再需要某个订阅时,必须调用该订阅上的cancel方法。
  7. Subscriber必须确保对Subscription上request(long n)和cancel()方法的调用是依次分开进行的,不能有时间上的重合 (常见于多线程并发,需要用锁来确保资源独占)。
  8. Subscriber在调用了Subscription.cancel()之后仍要做好接收多个onNext信号的准备,因为仍有可能存在待处理的元素。取消订阅操作不会保证立即执行清理操作。
  9. 即使前序没有Subscription.request(long n)的调用,Subscriber仍然要做好接收onComplete信号的准备。
  10. 即使前序没有Subscription.request(long n)的调用,Subscriber仍然要做好接收onError信号的准备, 因为Publisher的出错可能跟需求发送毫不相关。
  11. Subscriber必须确保所有的信号发送方法的调用发生在该信号对应的处理之前, 也就是Subscriber的实现需要保证异步处理时的线程安全。
  12. 对给定的Subscriber来讲,Subscriber.onSubscribe方法最多被调用一次。
  13. 与Publisher的第9条规则类似,调用Subscriber的onSubscribe,onNext, onError或onComplete方法必须正常返回(除 非元素为null值); 如果Subscriber需要向Publisher表明自身的失败状态,必须通过取消订阅的操作来进行。

Subscription订阅

public interface Subscription {
    void request(long var1);

    void cancel();
}
  1. Subscription.request和Subscription.cancel必须在对应Subscriber的上下文中被调用。 这样是为了确保Subscription描述的是Subscriber和Publisher之间的唯一关联关系
  2. Subscription必须允许Subscriber在onNext或者onSubscribe中同步调用Subscription.request。 因为onNext可能是在request的栈帧上被调用,实际上也就是表明request必须是可重入的。
  3. Subscription.request必须为可能出现的Publisher和Subscriber间的同步递归调用设置上限, 以免出现栈溢出。
  4. Subscription.request应该即时返回,以免影响其调用者的响应。也就是说request线程上不应该执行复杂计算。
  5. Subscription.cancel应该即时返回,以免影响其调用者的响应,并且它也必须是线程安全和幂等的。
  6. 当Subscription被取消之后,任何额外的Subscription.request(long n)方法都应该是空操作。
  7. 当Subscription被取消之后,任何额外的Subscription.cancel()方法都应该是空操作。
  8. 当Subscription没有被取消时,Subscription.request(long n)必须指明subscriber在下一批请求中会注册多少个元素。
  9. 当Subscription没有被取消时,如果Subscription.request(long n)的入参n<0,那么必须调用onError来指定java.lang.IllegalArgumentException异常。
  10. 当Subscription没有被取消时,Subscription.request(long n)可以同步调用当前或其他Subscriber(s)的onNext方法。
  11. 当Subscription没有被取消时,Subscription.request(long n)可以同步调用当前或其他Subscriber(s)的onComplete或onError方法。
  12. 当Subscription没有被取消时,Subscription.cancel()必须要求Publisher最终停止发送元素给Subscriber。
  13. 当Subscription没有被取消时,Subscription.cancel()必须要求Publisher最终删除任何对当前Subscriber的对象引用。
  14. 当Subscription没有被取消时,调用Subscription.cancel()后可能会导致Publisher进入shut-down状态(如果它没有参与任何活跃的Subscription的话)。
  15. 调用Subscription.cancel()必须正常返回。
  16. 调用Subscription.request(long n)必须正常返回。
  17. 一个Subscription必须支持无限次调用request并且必须支持声明2^63-1(java.lang.Long.MAX_VALUE)个元素的需求。 如果请求的元素大于2^63-1,那么Publisher可以将它当作无穷多来处理(不再关注当前订阅的需求状态)。

Processor处理器

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
  1. Processor代表一个处理中的状态,同时是一个Subscriber和Publisher,并且必须同时遵循二者的规范。
  2. Processor可以尝试从onError信号中恢复状态。如果它这么做了,那么它必须将当前Subscription视为已取消, 否则它必须将onError信号传达给它的Subscribers。

在 Java 应用程序中使用 Mutiny

使用您喜欢的构建工具将依赖项添加到您的项目中:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny</artifactId>
    <version>1.7.0</version>
</dependency>

// 与reactor进行转换
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>mutiny-reactor</artifactId>
    <version>1.7.0</version>
</dependency>

一旦你将 Mutiny 提供给你的类路径,你就可以开始编写代码了。让我们从这个简单的程序开始:

import io.smallrye.mutiny.Uni;

public class FirstProgram {

    public static void main(String[] args) {
        Uni.createFrom().item("hello")
           .onItem().transform(item -> item + " mutiny")
           .onItem().transform(String::toUpperCase)
           .subscribe().with(item -> System.out.println(">> " + item));
    }
}

结果

>> HELLO MUTINY

Uni 和 Multi是什么

Uni 和 Multi都可以产生元素,Unit产生1个或0个元素,Multi产生多个元素。

Mutiny是惰性的,如果没有被subscribe就不会被执行。初学者经常忘记这一点,然后”为什么我写的代码没有执行?”

而且,reactive stream的代码debug是有一定难度的。

Uni和Mutiny产生event,我们写的代码就是对这些事件进行处理。我们可以用 onItem(), onFailure(), onCompletion() 来处理这些event。event的传播是具有方向性的,可以向上游传播也可以向下游传播,

和其他响应式流框架类似,流上面的元素(事件)会流经处理流水线(processing pipeline),每个operation会处理事件或者转换为别的类型的事件。


Mutiny常用接口

Uni接口

UniCreate类


completionStage(CompletionStage<? extends T> stage)

把 java 异步任务转换成 uni. 与 subscribeAsCompletionStage 意义相反


future(Supplier<Future<? extends T>> supplier)

future(Future<? extends T> future, Duration timeout)

把 java 的 Future 对象转换为 uni


emitter(Consumer<UniEmitter<? super T>> consumer)

emitter(Supplier<S> stateSupplier, BiConsumer<S, UniEmitter<? super T>> consumer)

可产生 uni.T 对象. Uni 没啥用,等价于 item (). 对 Mutli 比较有用

Multi<Integer> multi = Multi.createFrom().emitter(em -> {
    em.emit(1);
    em.emit(2);
    em.emit(3);
    em.complete();
});

optional(Optional<T> optional)

optional(Supplier<Optional<T>> supplier)

等到 subscribe 时才解开,并且解开后不能为 null (会报 NPE)


createFrom()

开始创建一个 Uni


item(Supplier<? extends T> supplier)

创建一个新的Uni ,它在使用指定的(可能为null )值订阅后立即完成。使用传递的Supplier在订阅时懒惰地检索该项目。与deferred(Supplier)不同,供应商生产的是商品而不是Uni 。

Uni.createFrom().item(()->organizationService.list());

item(Supplier<S> stateSupplier, Function<S, ? extends T> mapper)

跟上面差不多

item(T item)

创建一个新的Uni ,它在使用指定的(可能为null )项目订阅后立即完成。非惰性,IO会阻塞

Uni.createFrom().item(organizationService.list());

voidItem()

创建一个以null项目完成的新Uni

Uni.createFrom().voidItem();

deferred(Supplier<Uni<? extends T>> supplier)

Uni.createFrom().deferred(()->Uni.createFrom().item(organizationService.list()));

Uni接口方法

stage(Function<Uni<T>, O> stage)

分阶段执行。上一阶段出参是下一阶段入参。也可以不写,只是包了一层而已。

 upstream.stage (u->u.invoke) 等价于 upstream.invoke

onItem()

开始针对 uni 中元素做操作,等价于直接用 invoke, call, chain 等。上游抛异常时不会执行


chain(Supplier<Uni<? extends O>> supplier)

chain(Function<? super T, Uni<? extends O>> mapper)

增加执行链,重要,链接多个 uni, 并且传递结果

等价于 onItem().transformToUni()

Uni<Session> uni = getSomeSession();
  return uni.chain(session -> session.persist(fruit))
          .chain(session -> session.flush())
          .chain(() -> server.close());

flatMap(Function<? super T, Uni<? extends O>> mapper)

完全等价于 chain


map(Function<? super T, ? extends O> mapper)

等价于 onItem ().transform() uni 中 T 的转换。不要返回 uni. 传递结果


invoke(Consumer<? super T> callback)

invoke(Runnable callback)

类似 call, 执行非 uni 的逻辑。如果逻辑中包含 uni, uni 并不会被执行。如果抛异常会传递给下游。不用返回结果。顺序执行并非异步执行

Uni.createFrom().item(getResponsePayload()).invoke(it->it.addAll(getOrganization()))

call(Function<? super T, Uni<?>> function)

call(Supplier<Uni<?>> supplier)

类似 invoke, 执行 uni 的逻辑。如果逻辑中包含 uni, uni 会被执行。顺序执行并非异步执行,只是结果不传递给下游而已

Uni.createFrom().item(getResponsePayload())
    .invoke(it->it.addAll(getOrganization()))
    .call(it->{
          longRunningMethodAsync(it);
          return Uni.createFrom().item(new Organization());
    })

subscribe()

通知 upstream 开始执行,结果通过回调使用,不阻塞。可以嵌套

Subscription sub = uni.subscribe().with(
        item -> {},
        failure -> {}     
);

subscribeAsCompletionStage()

将 uni 转换为 CompletableFuture, 即 java 的原生异步任务,不阻塞。与 completionStage 意义相反,等价subscribe().asCompletionStage();


await()

通知 upstream 开始执行,并阻塞等待。可以嵌套

等待(阻塞调用者线程),直到观察到的Uni发出项目或失败。如果观察到的 uni 失败,则抛出失败。

Uni<T> uni = ...;   
T res = uni.await().indefinitely(); 
T res = uni.await().atMost(Duration.ofMillis(1000)); 

onSubscription()

提交时执行的动作,后面可以跟 UniOnSubscribe.invoke 或 UniOnSubscribe.call, 注意执行顺序是先执行 onSubscription, 再去执行 uni

Uni uni =
    Uni.createFrom()
        .item(1)
        .invoke(x -> System.out.println("uni1"))
        .onSubscription()
        .invoke(uniSubscription -> System.out.println("uni2"))
        .onSubscription()
        .invoke(uniSubscription -> System.out.println("uni3"))
            .onItem().failWith(()->new Exception("fail"))
            .invoke(() -> System.out.println("uni4"))
        .onItemOrFailure()
        .invoke(() -> System.out.println("uni5"));
        
System.out.println("caller 1");
uni.await().indefinitely();
System.out.println("caller 2");
caller 1
uni2
uni3
uni1
uni5
2022-07-28 10:30:36,527 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): java.lang.Exception: fail

onItemOrFailure()

类似 onItem, 上游抛异常时也能执行到


onFailure()

类似 onItem, 上游抛异常时会执行


ifNoItem()

如果 uni 中的异步逻辑迟迟没有结果,用 ifNoItem 可以设置等待时间以及超时后逻辑 (但不会抛异常,也不会终止上游的异步执行)

Uni uni =
    Uni.createFrom()
        .voidItem()
        .invoke(
            () -> {
              try {
                for(int i = 0; i < 6; i++){
                  Thread.sleep(500);
                  System.out.println("sleep 500 end");
                }
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            })
        .ifNoItem()
        .after(Duration.ofMillis(5000))
        .recoverWithItem(
            () -> {
              System.out.println("recoverWithItem1");
              return null;
            })
        .ifNoItem()
        .after(Duration.ofMillis(1000))
        .recoverWithItem(
            () -> {
              System.out.println("recoverWithItem2");
              return null;
            });

System.out.println("caller 1");
uni.await().indefinitely();
System.out.println("caller 2");
caller 1
sleep 500 end
sleep 500 end
recoverWithItem2
sleep 500 end
sleep 500 end
sleep 500 end
sleep 500 end
caller 2

emitOn(Executor executor)

后续 uni 在指定线程里执行

Uni uni =
    Uni.createFrom()
        .voidItem()
        .invoke(() -> System.out.println("uni1" + Thread.currentThread().getName()))
        // 线程池
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(() -> System.out.println("uni2" + Thread.currentThread().getName()))
        .invoke(() -> System.out.println("uni3" + Thread.currentThread().getName()))
        // 新线程
        .emitOn(Infrastructure.getDefaultExecutor())
        .invoke(() -> System.out.println("uni4" + Thread.currentThread().getName()))
        .invoke(() -> System.out.println("uni5" + Thread.currentThread().getName()));
System.out.println("caller 1" + Thread.currentThread().getName());
uni.await().indefinitely();
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
uni1Quarkus Main Thread
uni2executor-thread-0
uni3executor-thread-0
uni4executor-thread-1
uni5executor-thread-1
caller 2Quarkus Main Thread

runSubscriptionOn(Executor executor)

修改所有 uni 执行的线程,与 emitOn 不同的是,它影响所有

Uni uni =
    Uni.createFrom()
        .voidItem()
        .invoke(() -> System.out.println("uni1" + Thread.currentThread().getName()))
        .invoke(() -> System.out.println("uni2" + Thread.currentThread().getName()))
        .onSubscription()
        .invoke(() -> System.out.println("uni7" + Thread.currentThread().getName()))
        .runSubscriptionOn(Infrastructure.getDefaultExecutor())
        .invoke(() -> System.out.println("uni3" + Thread.currentThread().getName()))
        .runSubscriptionOn(Infrastructure.getDefaultExecutor())
        .invoke(() -> System.out.println("uni4" + Thread.currentThread().getName()))
        .invoke(() -> System.out.println("uni5" + Thread.currentThread().getName()))
        .onSubscription()
        .invoke(() -> System.out.println("uni6" + Thread.currentThread().getName()));

System.out.println("caller 1" + Thread.currentThread().getName());
uni.await().indefinitely();
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
uni7executor-thread-1
uni6executor-thread-1
uni1executor-thread-1
uni2executor-thread-1
uni3executor-thread-1
uni4executor-thread-1
uni5executor-thread-1
caller 2Quarkus Main Thread

eventually(Runnable action)

eventually(Supplier<Uni<? extends O>> supplier)

等价于 onItemOrFailure ().invoke 成功或失败都执 uni 或非 uni 都支持

String id = ...;   
Session session = getSomeSession();  
session.find(Fruit.class, id)          
       .chain(fruit -> session.remove(fruit)
       .eventually(() -> session.close());

convert()

将Uni转换为其他类型,例如CompletionStage

uni.convert().with(BuiltinConverters.toCompletionStage());
uni.convert().toCompletionStage();

toMulti()

从此Uni创建Multi的实例,转成 Multi 不会拆分 uni.T


repeat()

重复上一个 uni 操作,后面可接 atMost (times) 或者 until

允许配置重复行为。重复允许将Uni转换为Multi特定次数或无限期。每次,都会在Uni上尝试新的订阅。取消订阅会停止重复行为。


onTermination()

提交、失败、主动终止时触发


onCancellation()

主动终止时触发


plug(Function<Uni<T>, Uni<R>> operatorProvider)

跟 stage 类似,但限制返回 uni


replaceWith/replaceWithNull/replaceWithVoid/replaceIfNullWith

跟 map 类似,当不需要前面 uni 结果时用


log()

事件触发时打印自身信息,包括 onSubscribe, onItem. 排查或打印信息用


join()

类似 combine

Uni<Number> a = Uni.createFrom().item(1);
Uni<Number> b = Uni.createFrom().item(2L);
Uni<Number> c = Uni.createFrom().item(3);

Uni<List<Number>> uni = Uni.join().all(a, b, c).andCollectFailures();

withContext()

注意 context 要定义好后在 subscribe.with 时作为参数传入,而后可以在上游 uni 中使用或修改

Context context = Context.of("test", "123");

Uni uni =
    Uni.createFrom()
        .voidItem()
        .attachContext()
        .invoke(
            voidItemWithContext ->
                System.out.println(voidItemWithContext.context().<String>get("test")))
        .invoke(voidItemWithContext -> voidItemWithContext.context().put("test", "1234"))
        .invoke(
            voidItemWithContext ->
                System.out.println(voidItemWithContext.context().<String>get("test")));

System.out.println("caller 1" + Thread.currentThread().getName());
System.out.println(context.<String>get("test"));
uni.subscribe().with(context, item->System.out.println(item));
System.out.println(context.<String>get("test"));
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
123
123
1234
ItemWithContext{context=Context{entries={test=1234}}, item=null}
1234
caller 2Quarkus Main Thread

attachContext()

withContext 的简便写法,把 Uni.T 和 context 封装成一个对象


Multi接口方法

Uni<List<T>> 转 Multi<T>

Uni.createFrom().item(Lists.newArrayList(1, 2, 3))
        .onItem()
        .transformToMulti(list-> Multi.createFrom()
        .items(list.stream()));

如何处理空值

该类型可以作为项目Uni发出

ifNull()

判断是空

ifNotNull()

判断不为空

Multi不支持null项目,因为它会破坏与Reactive Streams协议的兼容性。

因此,Uni提供了处理null项目的特定方法。 uni.onItem().ifNull()让您决定当收到的项目是null:

uni.onItem().ifNull().continueWith("hello");
uni.onItem().ifNull().switchTo(() -> Uni.createFrom().item("hello"));
uni.onItem().ifNull().failWith(() -> new Exception("Boom!"));

还可以使用一组对称的方法ifNotNull,让您可以处理项目不为 null的情况:

uni
    .onItem().ifNotNull().transform(String::toUpperCase)
    .onItem().ifNull().continueWith("yolo!");

案例

return MutinyWeapons.convertMono(
        () -> Uni.createFrom().item(()->this.superService.getById(id))
                .emitOn(Infrastructure.getDefaultExecutor())
                .onItem().ifNotNull()
                .transform(it -> new MultipartData().parties(it))
                .onItem().ifNull()
                .fail()
);

如何处理超时

Unis 通常用于表示异步操作,例如进行 HTTP 调用。因此,需要在此类操作上添加超时或截止日期的情况并不少见。如果我们在截止日期之前没有收到响应(收到 Mutiny 术语中的项目),我们认为操作失败。

然后,我们可以通过使用回退值、重试或任何其他故障处理策略从故障中恢复。

要配置超时使用Uni.ifNoItem().after(Duration):

Uni<String> uniWithTimeout = uni
        .ifNoItem().after(Duration.ofMillis(100))
        .recoverWithItem("some fallback item");

到达最后期限时,您可以执行各种操作。首先你可以简单地失败:

Uni<String> uniWithTimeout = uni
        .ifNoItem().after(Duration.ofMillis(100)).fail();

TimeoutException在这种情况下传播A。所以你可以在下游专门处理它:

Uni<String> uniWithTimeout = uni
    .ifNoItem().after(Duration.ofMillis(100)).fail()
    .onFailure(TimeoutException.class).recoverWithItem("we got a timeout");

您还可以传递自定义异常:

Uni<String> uniWithTimeout = uni
    .ifNoItem().after(Duration.ofMillis(100)).failWith(() -> new ServiceUnavailableException());

失败和恢复可能不方便。因此,您可以传递后备项或Uni直接传递:

Uni<String> uniWithTimeout = uni
        .ifNoItem().after(Duration.ofMillis(100)).recoverWithItem(() -> "fallback");
Uni<String> uniWithTimeout = uni
        .ifNoItem().after(Duration.ofMillis(100)).recoverWithUni(() -> someFallbackUni());

如何延迟事件

延迟 Uni 的项目

当您拥有 时Uni,您可以使用以下方式延迟项目发射onItem().delayIt().by(...):

Uni<String> delayed = Uni.createFrom().item("hello")
        .onItem().delayIt().by(Duration.ofMillis(10));

你通过了一个持续时间。收到项目后,它会等待 该持续时间,然后再将其传播给下游消费者。

您还可以根据另一个同伴 Uni延迟项目的发射:

Uni<String> delayed = Uni.createFrom().item("hello")
        // The write method returns a Uni completed
        // when the operation is done.
        .onItem().delayIt().until(this::write);

Uni当函数返回的项目发出项目(可能null)时,项目会向下游传播。如果函数发出失败(或抛出异常),则此失败会向下游传播。

延迟 Multi 的项目

Multi 没有delayIt运算符,因为对所有项目应用相同的延迟很少是您想要做的。但是,有几种方法可以在Multi.

首先,您可以使用onItem().call(),它会延迟发射,直到Uni产生的call发射项目。例如,以下代码段将所有项目延迟 10 毫秒:

Multi<Integer> delayed = multi
    .onItem().call(i ->
        // Delay the emission until the returned uni emits its item
        Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10))
    );

通常,您不希望对所有项目应用相同的延迟。您可以call按如下方式结合随机延迟:

Random random = new Random();
Multi<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
        .onItem().call(i -> {
            Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
            return Uni.createFrom().nullItem().onItem().delayIt().by(delay);
        });

最后,您可能想要限制项目。例如,您可以在每个项目之间引入(最小)一秒延迟。为了实现这一点,结合Multi.createFrom().ticks()和多限制:

// Introduce a one second delay between each item
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .onOverflow().drop();
Multi<Integer> delayed = Multi.createBy().combining().streams(ticks, multi)
        .using((x, item) -> item);

如何使用轮询

我们周围有很多基于投票的 API。有时您需要使用这些 API 从轮询值生成流。

为此,请使用以下repeat()功能:

PollableDataSource source = new PollableDataSource();
// First creates a uni that emit the polled item.
// Because `poll` blocks, let's use a specific executor
Uni<String> pollItemFromSource = Uni.createFrom().item(source::poll)
        .runSubscriptionOn(executor);
// To get the stream of items, just repeat the uni indefinitely
Multi<String> stream = pollItemFromSource.repeat().indefinitely();

Cancellable cancellable = stream.subscribe().with(item -> System.out.println("Polled item: " + item));
// ... later ..
// when you don't want the items anymore, cancel the subscription and close the source if needed.
cancellable.cancel();
source.close();

您还可以使用repeat().until()将继续重复直到给定谓词返回的方法停止重复true,和/或直接创建Multiusing Multi.createBy().repeating():

PollableDataSource source = new PollableDataSource();
Multi<String> stream = Multi.createBy().repeating()
            .supplier(source::poll)
            .until(s -> s == null)
        .runSubscriptionOn(executor);

stream.subscribe().with(item -> System.out.println("Polled item: " + item));

反应式相关问题思考

Mutiny测试是否是异步,框架与webflux是否效果一样,MVC能不能用

目前的结论:

框架是响应式框架,异步,响应式与异步是不同的。

响应式编程只是分离了 事件的发生(cause) 与 事件的效果(effect)。 正常的顺序编程里,事件的发生与效果都是耦合的,比如等待某某输入事件(cause),然后按顺序发生A、B、C动作事件(effect)。

异步编程里,你提供回调(effect) 给异步事件,待异步事件完成(cause),底层的处理机制便自动调用你的回调,产生效果(effect)。

异步编程只是底层自动帮你处理回调,你完全可以自己写一个观察者模式,把回调保存在数组里,然后某事件发生时(cause),手动去遍历调用这些回调(effect)。

因此反应不反应,跟事件本身是不是异步没有关系,异步只是说明事件发生在另外一个时间轴上,并且自动帮你处理好注册的回调。

MVC也可以用mutiny框架,但是需要转换为Mono,而Mono与webflux中的Mono一致的

MVC可以处理Mono与Flux,不需要引入webflux

webflux引入了reactor-core依赖包,而mutiny也是reactor-core依赖包,所以可以互相转换

MVC中有很多异步的优化,但是需要Servlet3.0以上 MVC中原本的Callable与DeferredResult也是异步的还有Future,他们是最接近webflux的API

MVC可以在多个异步Jar包中切换实现,如果没有引入reactor-core依赖包,便用其他的,如果有就用reactor-core依赖包,reactor-core依赖包是由响应式规范写的

webflux与MVC很多包都是相同的,如boot与web依赖包

如果MVC加上reactor-core依赖包感觉与webflux也没太大区别,尤其是替换tomcat后

tomcat的NIO与Netty的NIO还是有区别的,但是都是满足反应式开发框架的要求

MVC可以用异步的方式来实现反应式开发,但是这仅仅是接口上的异步,不是全链路的异步,比如就算请求线程异步了但是返回前端的数据写入body时还是阻塞的,并不是异步,还是需要占用请求线程来发送

WebFlux是全链路的异步

其他的框架也有异步实现但是应该不是必须的,只是写法上的不同,比如SpringSecurity就提供了MVC与WebFlux的API

反应式应该是全链路异步,而不是简单的开多线程运行


Tomcat 与 Netty 的区别

Tomcat特点

Tomcat与Netty都是IO多路复用的,那为什么Netty比Tomcat好?

虽然Tomcat与Netty都是NIO但是实现的方式还是有区别的

  • Tomcat用的是select,而Netty用epoll
  • Netty采用堆外内存
  • Netty有零拷贝
  • Netty有高性能对象池

因为servlet规范,tomcat要实现servlet规范所以不能最大发挥NIO的特性,servlet3.0之前完全是同步阻塞模型,在read http body 以及 response的情况下,即使tomcat选择 NIO的 connector也是模拟阻塞的行为。

为什么NIO在读取body时要模拟阻塞?

tomcat的NIO完全可以以非阻塞方式处理IO,为什么在读取body部分时要模拟阻塞呢?这是因为servlet规范里定义了ServletInputStream在读数据时是阻塞模式。

Tomcat中NIO的配置

在Connector节点配置protocol="org.apache.coyote.http11.Http11NioProtocol",Http11NioProtocol协议下默认最大连接数是10000,也可以重新修改maxConnections的值,同时我们可以设置最大线程数maxThreads,这里设置的最大线程数就是Excutor的线程池的大小。

在BIO模式下实际上是没有maxConnections,即使配置也不会生效,BIO模式下的maxConnections是保持跟maxThreads大小一致,因为它是一请求一线程模式。

NioEndpoint结构

要理解tomcat的nio最主要就是对NioEndpoint的理解。它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5个部分。

  • LimitLatch是连接控制器,它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。
  • Acceptor负责接收连接,默认是1个线程来执行,将请求的事件注册到事件列表。
  • Poller来负责轮询,Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller将就绪的事件生成SocketProcessor同时交给Excutor去执行。Excutor线程池的大小就是我们在Connector节点配置的maxThreads的值。
  • Excutor的线程中,会完成从socket中读取http request,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。
  • socket中读数据和往socket中写数据的过程,并没有像典型的非阻塞的NIO的那样,注册OP_READ或OP_WRITE事件到主Selector,而是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。

NioEndpoint执行序列图

Tomcat请求处理

在这里结果处理完后是直接返回给NIOChannel的。

Netty请求处理

基础版就是单Reactor模型的方式

此图像的alt属性为空;文件名为23CC17A41400E953774B93CC0493A47F.jpg

进阶版就是主从Reactor模型的方式,EventLoop对接请求,将业务的请求处理交给Word线程处理,处理完后结果缓存在队列中,然后EventLoop去轮训队列去处理返回。

IO读写责任范畴是指,Tomcat的IO读写不是由Selector来完成的,Selector只负责分发,IO读写由Word线程来完成的,而Netty是由Selector来完成的,每个连接都由Selector负责也就是EventLoop,并且保存绑定关系直到连接结束

Socket独立性指,如果某个请求线程阻塞了,在Tomcat里只会阻塞某个客户端的连接Socket,并不会像Netty一样将绑定到同一个EventLoop的其他Socket都阻塞住

关于Netty的核心在下面的文章

Netty—初探与核心


MVC异步执行的模型

Tomcat对异步Servlet的支持

  • Tomcat线程池中的线程会调用Servlet#service,叫Tomcat线程,请求处理线程
  • Web程序在service方法的实现里启动的新线程,叫Web应用线程

无论是否在异步状态下,Tomcat都是采用NIO模式的,和Netty一样都是NIO模式

NIO的思想模式(IO多路复用)都是一样的都是Reactor模型

非异步模式下

当一个新请求到达,Tomcat会从线程池取一个线程处理,该线程会调用你的Web应用,Web应用在处理请求过程中,Tomcat线程会一直阻塞,直到Web应用处理完,才输出响应,最后Tomcat回收该线程。

假如Web应用需很长时间处理一个请求(比如DB查询或等待下游的服务调用返回),则Tomcat线程一直不回收,就会占用系统资源,极端情况下会导致“线程饥饿”,即Tomcat没有更多线程处理新请求了,所以有了异步模式,将Tomcat线程与Web线程解绑,但是Controller层返回值也需要支持异步才行,比如返回Flux或者Callable,DeferredResult等

Spring—异步Http

MVC的异步就像是在阻塞的模式下进行魔改的一样。

在接口处理完数据后返回给前端还是需要一个请求线程来返回,需要进行IO处理,而这个IO处理就是阻塞的。

在Servlet 3.0中,我们可以从HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给另外的线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。如此,通过将请求从一个线程传给另一个线程处理的过程便构成了Servlet 3.0中的异步处理。

上图也是Servlet 3.0,为了做Servlet 3.1的对比而重画的,Servlet 3.0对请求的处理虽然是异步的,但是对InputStream和OutputStream的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。

因此在Servlet 3.1中引入了非阻塞IO

我们应该知道Servlet底层的IO是通过如下两个IO流支持的

  • ServletInputStream : 用来读取数据的输入流
  • ServletOutputStream : 用来输出数据的输出流

从Servlet3.1开始,ServletInputStream新增了一个setReadListener(ReadListener listener)方法实现非阻塞式读取数据。

从Servlet3.1开始,ServletOutputStream新增了一个setWriterListener(WriteListener listener)方法实现非阻塞式输出数据。

非阻塞IO代码例子

案例一:

用来模拟接口请求时的阻塞耗时

public class LongRunningProcess {

    public void run() {
        try {

            int millis = ThreadLocalRandom.current().nextInt(2000);
            String currentThread = Thread.currentThread().getName();
            System.out.println(currentThread + " sleep for " + millis + " milliseconds.");
            Thread.sleep(millis);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@WebServlet(value = "/nonBlockingThreadPoolAsync", asyncSupported = true)
public class NonBlockingAsyncHelloServlet extends HttpServlet {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 50000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

        AsyncContext asyncContext = request.startAsync();

        ServletInputStream inputStream = request.getInputStream();

        inputStream.setReadListener(new ReadListener() {
            @Override
            public void onDataAvailable() throws IOException {

            }

            @Override
            public void onAllDataRead() throws IOException {
                executor.execute(() -> {
                    new LongRunningProcess().run();

                    try {
                        asyncContext.getResponse().getWriter().write("Hello World!");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    asyncContext.complete();

                });
            }

            @Override
            public void onError(Throwable t) {
                asyncContext.complete();
            }
        });


    }

}

为ServletInputStream添加了一个ReadListener,并在ReadListener的onAllDataRead()方法中完成了长时处理过程。

案例二:

package com.test.servlet3Noblock;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

@WebServlet(urlPatterns = "/AsyncLongRunningServlet2", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {
    protected void doGet(HttpServletRequest request,
                         HttpServletResponse response) throws ServletException, IOException {
        request.setCharacterEncoding("UTF-8");
        response.setContentType("text/html;charset=UTF-8");

        AsyncContext actx = request.startAsync();//通过request获得AsyncContent对象

        actx.setTimeout(30*3000);//设置异步调用超时时长

        ServletInputStream in = request.getInputStream();
        //异步读取(实现了非阻塞式读取)
        in.setReadListener(new MyReadListener(in,actx));
        //直接输出到页面的内容(不等异步完成就直接给页面)
        PrintWriter out = response.getWriter();
        out.println("<h1>直接返回页面,不等异步处理结果了</h1>");
        out.flush();
    }

}
package com.test.servlet3Noblock;

import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import java.io.IOException;
import java.io.PrintWriter;

public class MyReadListener implements ReadListener {
    private ServletInputStream inputStream;
    private AsyncContext asyncContext;
    public MyReadListener(ServletInputStream input,AsyncContext context){
        this.inputStream = input;
        this.asyncContext = context;
    }
    //数据可用时触发执行
    @Override
    public void onDataAvailable() throws IOException {
        System.out.println("数据可用时触发执行");
    }

    //数据读完时触发调用
    @Override
    public void onAllDataRead() throws IOException {
        try {
            Thread.sleep(3000);//暂停5秒,模拟耗时处理数据
            PrintWriter out = asyncContext.getResponse().getWriter();
            out.write("数据读完了");
            out.flush();
            System.out.println("数据读完了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //数据出错触发调用
    @Override
    public void onError(Throwable t){
        System.out.println("数据 出错");
        t.printStackTrace();
    }
}

可以说Servlet3的非阻塞IO是对Servlet3异步的增强。Servlet3的非阻塞是利用java.util.EventListener的事件驱动机制来实现的。

这里的非阻塞IO跟我们常说的JDK NIO不是一个概念,Servlet3.1的非阻塞是同jdk的事件驱动机制来实现。

Servlet 3.1 新特性

  • 提供了@WebServlet、@WebFilter、@WebListener、@WebInitParam等注解的支持
  • web模块化:可以将一个项目分成N个模块,然后通过扫描模块下的META-INF/web-fragment.xml进行装配
  • 容器启动时可插拔:使用ServletContainerInitializer实现,可以在容器启动时自动回调其onStartup方法,插入一些功能
  • 零XML化SpringMVC:使用ServletContainerInitializer即SpringMVC注解配置实现无XML化的SpringMVC配置
  • servlet的异步支持
  • servlet 3.1的非阻塞I/O支持

Tomcat架构图

从tomcat处理整个request请求流程中,异步处于哪一步,这个图是tomcat的总体结构,里面用箭头标明了请求线路。

在tomcat的组件中Connector和Engine是最核心的两个组件,Servlet3的异步处理就是发生在Connector中。

概念总结

Tomcat NIO Connector ,Servlet 3.0 Async,Spring MVC Async 总结梳理:

nio是一种IO的模型,对比与传统的BIO,它可以利用较少的线程处理更多的连接从而增加机器的吞吐量,Tomcat NIO Connector是Tomcat的一种NIO连接模式。异步,前面提到他是一种通讯的方式,它跟NIO没有任务关系,及时没有NIO也可以实现异步,Servlet 3.0 Async是指Servlet 3规范以后支持了异步处理Servlet请求,我们可以把请求线程和业务线程分开。Spring MVC Async是在Servlet3异步的基础上做了一层封装。

Tomcat NIO Connector

Tomcat的Connector 有三种模式,BIO,NIO,APR,Tomcat NIO Connector是其中的NIO模式,使得tomcat容器可以用较少的线程处理大量的连接请求,不再是传统的一请求一线程模式。

Servlet 3.0 Async

Servlet 3.0支持了业务请求的异步处理,Servlet3之前一个请求的处理流程,请求解析、READ BODY,RESPONSE BODY,以及其中的业务逻辑处理都由Tomcat线程池中的一个线程进行处理的。那么3.0以后我们可以让请求线程(IO线程)和业务处理线程分开,进而对业务进行线程池隔离。我们还可以根据业务重要性进行业务分级,然后再把线程池分级。还可以根据这些分级做其它操作比如监控和降级处理。

Spring MVC Async

Spring MVC 3.2 以上版本基于Servlet 3的基础做的封装,使用方式如下

Spring—异步Http


WebFlux的异步模型

WebFlux是全链路的异步请求,在WebFlux里就没有使用Tomcat的SpringBoot的困扰,因为里面写业务和接口都是采用Flux或Mono,都是异步响应式框架。

事件循环是异步的关键

Tomcat与Netty做异步服务器只能说各有优劣

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

3 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 593972浏览

随机文章
PostgreSQL—监控
1年前
Kotlin-类型进阶—密封类(二十六)
4年前
java—stream()方法的使用(java8)
5年前
Java—并发编程(八)线程池– (2) 线程池的原理
3年前
SpringBoot—AOP的用法
5年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 593972 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付