阅读完需:约 39 分钟
Mutiny、Project-Reactor、RxJava(在web后端)好像都挺冷门的,人气不高,相关资料也不多。不过其实整个Java Reactive都不是很热门,国内用的人不是很多。但是Reactive库都非常优秀,值得学习。
相比其他的框架Mutiny的API还是比较好用的
https://smallrye.io/smallrye-mutiny/1.7.0/
这是它的官网
Mutiny宣传上说的天生就支持的这个功能
最为Quarkus和Vert.x的指定反应式库确实有其过人之处,入门简单
在学习前还是要了解一下什么是反应式编程,反应式编程的白皮书等
其实这几个框架都是基于Reactive Streams
规范来写的,而它就是反应式的标准
反应式流 :
http://www.reactive-streams.org/
反应式流自述文件 :
https://www.reactivemanifesto.org/
反应式宣言 :
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#specification
Mutiny 是一个反应式编程库。如果您在 Wikipedia 上查找响应式编程,您会发现以下定义:
反应式编程结合了函数式编程、观察者模式和可迭代模式。
虽然正确,但我们从未发现这个定义很有帮助。它没有清楚地传达反应式编程的全部内容。所以,让我们再做一个更直接的定义:
反应式编程是关于使用数据流进行编程。
反应式编程是关于流的,尤其是观察它们。它将这个想法推向了极限:使用响应式编程,一切都是数据流。
它本质上是异步的,因为您不知道何时会看到数据。(这个概念很重要需要好好理解一下)然而,反应式编程超越了这一点。它提供了一个工具箱来组合流和处理事件。
响应式编程接口
响应式流JVM规范由下列Reactive Streams
接口组成
Publisher
Subscriber
Subscription
Processor
Publisher
发布者会提供(可能)是无限多个元素,并按照Subscriber
订阅者的处理速度进行发布(需求驱动)。 当调用Publisher.subscribe(Subscriber)
后,有可能的Subscriber
方法调用顺序为 onSubscribe onNext* (onError | onComplete)
这代表onSubscribe
始终会被调用,接下来是0-N个onNext
的事件; 当onNext
调用结束后如果有错误会发送onError
信号,若一切正常会发送onComplete
通知下游没有更多元素了; 如果中途Subscription
订阅被取消,那么可能onError
和onComplete
都不会被调用。
Publisher
发布者
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
-
Publisher
发送给Subscriber
的onNext
信号总次数必须小于或等于Subscriber
在当前Subscription
中所请求的元素的个数。这条规则确保Publisher
不会发送多于Subscriber
请求的元素个数,这样隐性地表达了一个happens-before
的关系, 即请求元素发生在接收到元素之前。 -
Publisher
可以发送少于Subscriber
请求的元素,并且通过发送onComplete
或onError
信号终止当前的Subscription
。 这条规则声明Publisher
并不能保证可以提供Subscriber
所请求的元素个数,且原因可以是多样的(没有那么多元素,获 取元素失败等等)。 -
onSubscribe,onNext,onError和onComplete
信号必须按照顺序进行发送。 这样确保在多线程的情况下能够确保信号之间的happens-before
关系。 - 如果
Publisher
报错,那么它必须发送onError
信号给Subscriber
。 - 如果
Publisher
正常结束,那么它必须发送onComplete
信号给Subscriber
。 - 当
Publisher
向Subscriber
发送了onError
或onComplete
信号后,当前Subscriber
上的Subscription
必须被视为已取消。 -
当订阅到达终结状态时(已发送onError或onComplete信号),
Publisher
不得再发布其他更多信号。 - 如果当前订阅
Subscription
已经被取消,那么相关的Subscriber
最终必须停止被发送信号。 换句话说,当Subscription.cancel()
被调用后,Subscriber
仍有可能接收到一些元素,因为取消调用可能是异步发生 的。 -
Publisher.subscribe
方法中必须先于其他方法调用Subscriber
上的onSubscribe
方法,并且必须正常返回(除了Subscriber为null值的情况)。 其他所有的异常情况都需要在onSubscribe
调用结束后通过onError
方法调用来通知订阅者。 -
Publisher.subscribe
方法可以被多次调用但每次都必须是一个不同的Subscriber
。也就是相关的Publisher
和Subscriber
不能被关联多于一次。 -
Publisher
可以支持多个Subscriber
并且自由决定当前的Subscription
是单播unicast
还是组播multicast
。
Subscriber
订阅者
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
-
Subscriber
必须通过调用Subscription.request(long n)
来传达自己的需求,这样确保由Subscriber
来决定何时接收处理多少个onNext
发送过来的元素。 - 如果
Subscriber
怀疑它对信号的处理会影响到Publisher
的响应时,它应该确保发送信号的过程是异步的。 也就是说从线程执行的角度来讲,Subscriber
的信号处理不应该影响Publisher
的程序进度。 -
Subscriber.onComplete()和Subscriber.onError(Throwable t)
方法中不应该调用Subscription
或Publisher
上的任何方法,
这样确保在处理终态信号时不会有竞态条件产生。 - 当
Subscriber
接收到onComplete
或者onError(Throwable t)
的调用后,它必须视当前的Subscription
为已取消。 - 当
Subscriber
已经有了一个活跃的Subscription
之后,跟其他Publisher
之间的Subscription
(在接收到onSubscribe
信 号后)必须通过调用Subscription.cancel()
来取消。 因为在任何时候,一个Subscriber
只能跟至多一个Publisher
进行交互,多余的订阅必须被取消。 - 当
Subscriber
不再需要某个订阅时,必须调用该订阅上的cancel
方法。 -
Subscriber
必须确保对Subscription上request(long n)和cancel()
方法的调用是依次分开进行的,不能有时间上的重合 (常见于多线程并发,需要用锁来确保资源独占)。 -
Subscriber
在调用了Subscription.cancel()
之后仍要做好接收多个onNext
信号的准备,因为仍有可能存在待处理的元素。取消订阅操作不会保证立即执行清理操作。 - 即使前序没有
Subscription.request(long n)
的调用,Subscriber
仍然要做好接收onComplete
信号的准备。 - 即使前序没有
Subscription.request(long n)
的调用,Subscriber
仍然要做好接收onError
信号的准备, 因为Publisher
的出错可能跟需求发送毫不相关。 -
Subscriber
必须确保所有的信号发送方法的调用发生在该信号对应的处理之前, 也就是Subscriber
的实现需要保证异步处理时的线程安全。 - 对给定的
Subscriber
来讲,Subscriber.onSubscribe
方法最多被调用一次。 - 与
Publisher
的第9条规则类似,调用Subscriber的onSubscribe,onNext, onError或onComplete
方法必须正常返回(除 非元素为null值); 如果Subscriber
需要向Publisher
表明自身的失败状态,必须通过取消订阅的操作来进行。
Subscription
订阅
public interface Subscription {
void request(long var1);
void cancel();
}
-
Subscription.request
和Subscription.cancel
必须在对应Subscriber
的上下文中被调用。 这样是为了确保Subscription
描述的是Subscriber
和Publisher
之间的唯一关联关系 -
Subscription
必须允许Subscriber
在onNext或者onSubscribe
中同步调用Subscription.request
。 因为onNext
可能是在request
的栈帧上被调用,实际上也就是表明request
必须是可重入的。 -
Subscription.request
必须为可能出现的Publisher
和Subscriber
间的同步递归调用设置上限, 以免出现栈溢出。 -
Subscription.request
应该即时返回,以免影响其调用者的响应。也就是说request
线程上不应该执行复杂计算。 -
Subscription.cancel
应该即时返回,以免影响其调用者的响应,并且它也必须是线程安全和幂等的。 - 当
Subscription
被取消之后,任何额外的Subscription.request(long n)
方法都应该是空操作。 - 当
Subscription
被取消之后,任何额外的Subscription.cancel()
方法都应该是空操作。 - 当
Subscription
没有被取消时,Subscription.request(long n)
必须指明subscriber
在下一批请求中会注册多少个元素。 - 当
Subscription
没有被取消时,如果Subscription.request(long n)
的入参n<0,那么必须调用onError
来指定java.lang.IllegalArgumentException
异常。 - 当
Subscription
没有被取消时,Subscription.request(long n)
可以同步调用当前或其他Subscriber(s)的onNext
方法。 - 当
Subscription
没有被取消时,Subscription.request(long n)
可以同步调用当前或其他Subscriber(s)的onComplete或onError
方法。 - 当
Subscription
没有被取消时,Subscription.cancel()
必须要求Publisher
最终停止发送元素给Subscriber
。 - 当
Subscription
没有被取消时,Subscription.cancel()
必须要求Publisher
最终删除任何对当前Subscriber
的对象引用。 - 当
Subscription
没有被取消时,调用Subscription.cancel()
后可能会导致Publisher
进入shut-down状态(如果它没有参与任何活跃的Subscription的话)。 - 调用
Subscription.cancel()
必须正常返回。 - 调用
Subscription.request(long n)
必须正常返回。 - 一个
Subscription
必须支持无限次调用request
并且必须支持声明2^63-1(java.lang.Long.MAX_VALUE)
个元素的需求。 如果请求的元素大于2^63-1
,那么Publisher
可以将它当作无穷多来处理(不再关注当前订阅的需求状态)。
Processor
处理器
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
-
Processor
代表一个处理中的状态,同时是一个Subscriber
和Publisher
,并且必须同时遵循二者的规范。 -
Processor
可以尝试从onError
信号中恢复状态。如果它这么做了,那么它必须将当前Subscription
视为已取消, 否则它必须将onError
信号传达给它的Subscribers
。
在 Java 应用程序中使用 Mutiny
使用您喜欢的构建工具将依赖项添加到您的项目中:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
<version>1.7.0</version>
</dependency>
// 与reactor进行转换
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-reactor</artifactId>
<version>1.7.0</version>
</dependency>
一旦你将 Mutiny 提供给你的类路径,你就可以开始编写代码了。让我们从这个简单的程序开始:
import io.smallrye.mutiny.Uni;
public class FirstProgram {
public static void main(String[] args) {
Uni.createFrom().item("hello")
.onItem().transform(item -> item + " mutiny")
.onItem().transform(String::toUpperCase)
.subscribe().with(item -> System.out.println(">> " + item));
}
}
结果
>> HELLO MUTINY
Uni 和 Multi是什么
Uni 和 Multi都可以产生元素,Unit产生1个或0个元素,Multi产生多个元素。
Mutiny是惰性的,如果没有被subscribe就不会被执行。初学者经常忘记这一点,然后”为什么我写的代码没有执行?”
而且,reactive stream的代码debug是有一定难度的。
Uni和Mutiny产生event,我们写的代码就是对这些事件进行处理。我们可以用 onItem(), onFailure(), onCompletion() 来处理这些event。event的传播是具有方向性的,可以向上游传播也可以向下游传播,
和其他响应式流框架类似,流上面的元素(事件)会流经处理流水线(processing pipeline),每个operation会处理事件或者转换为别的类型的事件。
Mutiny
常用接口
Uni
接口
UniCreate
类
completionStage(CompletionStage<? extends T> stage)
把 java 异步任务转换成 uni. 与 subscribeAsCompletionStage
意义相反
future(Supplier<Future<? extends T>> supplier)
future(Future<? extends T> future, Duration timeout)
把 java 的 Future 对象转换为 uni
emitter(Consumer<UniEmitter<? super T>> consumer)
emitter(Supplier<S> stateSupplier, BiConsumer<S, UniEmitter<? super T>> consumer)
可产生 uni.T 对象. Uni 没啥用,等价于 item (). 对 Mutli 比较有用
Multi<Integer> multi = Multi.createFrom().emitter(em -> {
em.emit(1);
em.emit(2);
em.emit(3);
em.complete();
});
optional(Optional<T> optional)
optional(Supplier<Optional<T>> supplier)
等到 subscribe 时才解开,并且解开后不能为 null (会报 NPE)
createFrom()
开始创建一个 Uni
item(Supplier<? extends T> supplier)
创建一个新的Uni ,它在使用指定的(可能为null )值订阅后立即完成。使用传递的Supplier在订阅时懒惰地检索该项目。与deferred(Supplier)不同,供应商生产的是商品而不是Uni 。
Uni.createFrom().item(()->organizationService.list());
item(Supplier<S> stateSupplier, Function<S, ? extends T> mapper)
跟上面差不多
item(T item)
创建一个新的Uni ,它在使用指定的(可能为null )项目订阅后立即完成。非惰性,IO会阻塞
Uni.createFrom().item(organizationService.list());
voidItem()
创建一个以null项目完成的新Uni
Uni.createFrom().voidItem();
deferred(Supplier<Uni<? extends T>> supplier)
Uni.createFrom().deferred(()->Uni.createFrom().item(organizationService.list()));
Uni
接口方法
stage(Function<Uni<T>, O> stage)
分阶段执行。上一阶段出参是下一阶段入参。也可以不写,只是包了一层而已。
upstream.stage (u->u.invoke) 等价于 upstream.invoke
onItem()
开始针对 uni 中元素做操作,等价于直接用 invoke, call, chain
等。上游抛异常时不会执行
chain(Supplier<Uni<? extends O>> supplier)
chain(Function<? super T, Uni<? extends O>> mapper)
增加执行链,重要,链接多个 uni, 并且传递结果
等价于 onItem().transformToUni()
Uni<Session> uni = getSomeSession();
return uni.chain(session -> session.persist(fruit))
.chain(session -> session.flush())
.chain(() -> server.close());
flatMap(Function<? super T, Uni<? extends O>> mapper)
完全等价于 chain
map(Function<? super T, ? extends O> mapper)
等价于 onItem ().transform()
uni 中 T 的转换。不要返回 uni. 传递结果
invoke(Consumer<? super T> callback)
invoke(Runnable callback)
类似 call, 执行非 uni 的逻辑。如果逻辑中包含 uni, uni 并不会被执行。如果抛异常会传递给下游。不用返回结果。顺序执行并非异步执行
Uni.createFrom().item(getResponsePayload()).invoke(it->it.addAll(getOrganization()))
call(Function<? super T, Uni<?>> function)
call(Supplier<Uni<?>> supplier)
类似 invoke, 执行 uni 的逻辑。如果逻辑中包含 uni, uni 会被执行。顺序执行并非异步执行,只是结果不传递给下游而已
Uni.createFrom().item(getResponsePayload())
.invoke(it->it.addAll(getOrganization()))
.call(it->{
longRunningMethodAsync(it);
return Uni.createFrom().item(new Organization());
})
subscribe()
通知 upstream 开始执行,结果通过回调使用,不阻塞。可以嵌套
Subscription sub = uni.subscribe().with(
item -> {},
failure -> {}
);
subscribeAsCompletionStage()
将 uni 转换为 CompletableFuture
, 即 java 的原生异步任务,不阻塞。与 completionStage
意义相反,等价subscribe().asCompletionStage();
await()
通知 upstream 开始执行,并阻塞等待。可以嵌套
等待(阻塞调用者线程),直到观察到的Uni发出项目或失败。如果观察到的 uni 失败,则抛出失败。
Uni<T> uni = ...;
T res = uni.await().indefinitely();
T res = uni.await().atMost(Duration.ofMillis(1000));
onSubscription()
提交时执行的动作,后面可以跟 UniOnSubscribe.invoke 或 UniOnSubscribe.call,
注意执行顺序是先执行 onSubscription
, 再去执行 uni
Uni uni =
Uni.createFrom()
.item(1)
.invoke(x -> System.out.println("uni1"))
.onSubscription()
.invoke(uniSubscription -> System.out.println("uni2"))
.onSubscription()
.invoke(uniSubscription -> System.out.println("uni3"))
.onItem().failWith(()->new Exception("fail"))
.invoke(() -> System.out.println("uni4"))
.onItemOrFailure()
.invoke(() -> System.out.println("uni5"));
System.out.println("caller 1");
uni.await().indefinitely();
System.out.println("caller 2");
caller 1
uni2
uni3
uni1
uni5
2022-07-28 10:30:36,527 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): java.lang.Exception: fail
onItemOrFailure()
类似 onItem, 上游抛异常时也能执行到
onFailure()
类似 onItem, 上游抛异常时会执行
ifNoItem()
如果 uni 中的异步逻辑迟迟没有结果,用 ifNoItem 可以设置等待时间以及超时后逻辑 (但不会抛异常,也不会终止上游的异步执行)
Uni uni =
Uni.createFrom()
.voidItem()
.invoke(
() -> {
try {
for(int i = 0; i < 6; i++){
Thread.sleep(500);
System.out.println("sleep 500 end");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.ifNoItem()
.after(Duration.ofMillis(5000))
.recoverWithItem(
() -> {
System.out.println("recoverWithItem1");
return null;
})
.ifNoItem()
.after(Duration.ofMillis(1000))
.recoverWithItem(
() -> {
System.out.println("recoverWithItem2");
return null;
});
System.out.println("caller 1");
uni.await().indefinitely();
System.out.println("caller 2");
caller 1
sleep 500 end
sleep 500 end
recoverWithItem2
sleep 500 end
sleep 500 end
sleep 500 end
sleep 500 end
caller 2
emitOn(Executor executor)
后续 uni 在指定线程里执行
Uni uni =
Uni.createFrom()
.voidItem()
.invoke(() -> System.out.println("uni1" + Thread.currentThread().getName()))
// 线程池
.emitOn(Infrastructure.getDefaultWorkerPool())
.invoke(() -> System.out.println("uni2" + Thread.currentThread().getName()))
.invoke(() -> System.out.println("uni3" + Thread.currentThread().getName()))
// 新线程
.emitOn(Infrastructure.getDefaultExecutor())
.invoke(() -> System.out.println("uni4" + Thread.currentThread().getName()))
.invoke(() -> System.out.println("uni5" + Thread.currentThread().getName()));
System.out.println("caller 1" + Thread.currentThread().getName());
uni.await().indefinitely();
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
uni1Quarkus Main Thread
uni2executor-thread-0
uni3executor-thread-0
uni4executor-thread-1
uni5executor-thread-1
caller 2Quarkus Main Thread
runSubscriptionOn(Executor executor)
修改所有 uni 执行的线程,与 emitOn 不同的是,它影响所有
Uni uni =
Uni.createFrom()
.voidItem()
.invoke(() -> System.out.println("uni1" + Thread.currentThread().getName()))
.invoke(() -> System.out.println("uni2" + Thread.currentThread().getName()))
.onSubscription()
.invoke(() -> System.out.println("uni7" + Thread.currentThread().getName()))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.invoke(() -> System.out.println("uni3" + Thread.currentThread().getName()))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.invoke(() -> System.out.println("uni4" + Thread.currentThread().getName()))
.invoke(() -> System.out.println("uni5" + Thread.currentThread().getName()))
.onSubscription()
.invoke(() -> System.out.println("uni6" + Thread.currentThread().getName()));
System.out.println("caller 1" + Thread.currentThread().getName());
uni.await().indefinitely();
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
uni7executor-thread-1
uni6executor-thread-1
uni1executor-thread-1
uni2executor-thread-1
uni3executor-thread-1
uni4executor-thread-1
uni5executor-thread-1
caller 2Quarkus Main Thread
eventually(Runnable action)
eventually(Supplier<Uni<? extends O>> supplier)
等价于 onItemOrFailure ().invoke
成功或失败都执 uni 或非 uni 都支持
String id = ...;
Session session = getSomeSession();
session.find(Fruit.class, id)
.chain(fruit -> session.remove(fruit)
.eventually(() -> session.close());
convert()
将Uni转换为其他类型,例如CompletionStage
uni.convert().with(BuiltinConverters.toCompletionStage());
uni.convert().toCompletionStage();
toMulti()
从此Uni创建Multi的实例,转成 Multi 不会拆分 uni.T
repeat()
重复上一个 uni 操作,后面可接 atMost (times) 或者 until
允许配置重复行为。重复允许将Uni转换为Multi特定次数或无限期。每次,都会在Uni上尝试新的订阅。取消订阅会停止重复行为。
onTermination()
提交、失败、主动终止时触发
onCancellation()
主动终止时触发
plug(Function<Uni<T>, Uni<R>> operatorProvider)
跟 stage 类似,但限制返回 uni
replaceWith/replaceWithNull/replaceWithVoid/replaceIfNullWith
跟 map 类似,当不需要前面 uni 结果时用
log()
事件触发时打印自身信息,包括 onSubscribe, onItem. 排查或打印信息用
join()
类似 combine
Uni<Number> a = Uni.createFrom().item(1);
Uni<Number> b = Uni.createFrom().item(2L);
Uni<Number> c = Uni.createFrom().item(3);
Uni<List<Number>> uni = Uni.join().all(a, b, c).andCollectFailures();
withContext()
注意 context 要定义好后在 subscribe.with
时作为参数传入,而后可以在上游 uni 中使用或修改
Context context = Context.of("test", "123");
Uni uni =
Uni.createFrom()
.voidItem()
.attachContext()
.invoke(
voidItemWithContext ->
System.out.println(voidItemWithContext.context().<String>get("test")))
.invoke(voidItemWithContext -> voidItemWithContext.context().put("test", "1234"))
.invoke(
voidItemWithContext ->
System.out.println(voidItemWithContext.context().<String>get("test")));
System.out.println("caller 1" + Thread.currentThread().getName());
System.out.println(context.<String>get("test"));
uni.subscribe().with(context, item->System.out.println(item));
System.out.println(context.<String>get("test"));
System.out.println("caller 2" + Thread.currentThread().getName());
caller 1Quarkus Main Thread
123
123
1234
ItemWithContext{context=Context{entries={test=1234}}, item=null}
1234
caller 2Quarkus Main Thread
attachContext()
withContext 的简便写法,把 Uni.T 和 context 封装成一个对象
Multi接口方法
Uni<List<T>> 转 Multi<T>
Uni.createFrom().item(Lists.newArrayList(1, 2, 3))
.onItem()
.transformToMulti(list-> Multi.createFrom()
.items(list.stream()));
如何处理空值
该类型可以作为项目Uni
发出
ifNull()
判断是空
ifNotNull()
判断不为空
Multi
不支持null
项目,因为它会破坏与Reactive Streams
协议的兼容性。
因此,Uni
提供了处理null
项目的特定方法。 uni.onItem().ifNull()
让您决定当收到的项目是null
:
uni.onItem().ifNull().continueWith("hello");
uni.onItem().ifNull().switchTo(() -> Uni.createFrom().item("hello"));
uni.onItem().ifNull().failWith(() -> new Exception("Boom!"));
还可以使用一组对称的方法ifNotNull
,让您可以处理项目不为 null的情况:
uni
.onItem().ifNotNull().transform(String::toUpperCase)
.onItem().ifNull().continueWith("yolo!");
案例
return MutinyWeapons.convertMono(
() -> Uni.createFrom().item(()->this.superService.getById(id))
.emitOn(Infrastructure.getDefaultExecutor())
.onItem().ifNotNull()
.transform(it -> new MultipartData().parties(it))
.onItem().ifNull()
.fail()
);
如何处理超时
Unis 通常用于表示异步操作,例如进行 HTTP 调用。因此,需要在此类操作上添加超时或截止日期的情况并不少见。如果我们在截止日期之前没有收到响应(收到 Mutiny 术语中的项目),我们认为操作失败。
然后,我们可以通过使用回退值、重试或任何其他故障处理策略从故障中恢复。
要配置超时使用Uni.ifNoItem().after(Duration)
:
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100))
.recoverWithItem("some fallback item");
到达最后期限时,您可以执行各种操作。首先你可以简单地失败:
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)).fail();
TimeoutException
在这种情况下传播A。所以你可以在下游专门处理它:
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)).fail()
.onFailure(TimeoutException.class).recoverWithItem("we got a timeout");
您还可以传递自定义异常:
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)).failWith(() -> new ServiceUnavailableException());
失败和恢复可能不方便。因此,您可以传递后备项或Uni
直接传递:
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)).recoverWithItem(() -> "fallback");
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)).recoverWithUni(() -> someFallbackUni());
如何延迟事件
延迟 Uni 的项目
当您拥有 时Uni
,您可以使用以下方式延迟项目发射onItem().delayIt().by(...)
:
Uni<String> delayed = Uni.createFrom().item("hello")
.onItem().delayIt().by(Duration.ofMillis(10));
你通过了一个持续时间。收到项目后,它会等待 该持续时间,然后再将其传播给下游消费者。
您还可以根据另一个同伴 Uni
延迟项目的发射:
Uni<String> delayed = Uni.createFrom().item("hello")
// The write method returns a Uni completed
// when the operation is done.
.onItem().delayIt().until(this::write);
Uni
当函数返回的项目发出项目(可能null
)时,项目会向下游传播。如果函数发出失败(或抛出异常),则此失败会向下游传播。
延迟 Multi 的项目
Multi 没有delayIt运算符,因为对所有项目应用相同的延迟很少是您想要做的。但是,有几种方法可以在Multi
.
首先,您可以使用onItem().call()
,它会延迟发射,直到Uni
产生的call
发射项目。例如,以下代码段将所有项目延迟 10 毫秒:
Multi<Integer> delayed = multi
.onItem().call(i ->
// Delay the emission until the returned uni emits its item
Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10))
);
通常,您不希望对所有项目应用相同的延迟。您可以call
按如下方式结合随机延迟:
Random random = new Random();
Multi<Integer> delayed = Multi.createFrom().items(1, 2, 3, 4, 5)
.onItem().call(i -> {
Duration delay = Duration.ofMillis(random.nextInt(100) + 1);
return Uni.createFrom().nullItem().onItem().delayIt().by(delay);
});
最后,您可能想要限制项目。例如,您可以在每个项目之间引入(最小)一秒延迟。为了实现这一点,结合Multi.createFrom().ticks()
和多限制:
// Introduce a one second delay between each item
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop();
Multi<Integer> delayed = Multi.createBy().combining().streams(ticks, multi)
.using((x, item) -> item);
如何使用轮询
我们周围有很多基于投票的 API。有时您需要使用这些 API 从轮询值生成流。
为此,请使用以下repeat()
功能:
PollableDataSource source = new PollableDataSource();
// First creates a uni that emit the polled item.
// Because `poll` blocks, let's use a specific executor
Uni<String> pollItemFromSource = Uni.createFrom().item(source::poll)
.runSubscriptionOn(executor);
// To get the stream of items, just repeat the uni indefinitely
Multi<String> stream = pollItemFromSource.repeat().indefinitely();
Cancellable cancellable = stream.subscribe().with(item -> System.out.println("Polled item: " + item));
// ... later ..
// when you don't want the items anymore, cancel the subscription and close the source if needed.
cancellable.cancel();
source.close();
您还可以使用repeat().until()
将继续重复直到给定谓词返回的方法停止重复true
,和/或直接创建Multi
using Multi.createBy().repeating()
:
PollableDataSource source = new PollableDataSource();
Multi<String> stream = Multi.createBy().repeating()
.supplier(source::poll)
.until(s -> s == null)
.runSubscriptionOn(executor);
stream.subscribe().with(item -> System.out.println("Polled item: " + item));
反应式相关问题思考
Mutiny测试是否是异步,框架与webflux是否效果一样,MVC能不能用
目前的结论:
框架是响应式框架,异步,响应式与异步是不同的。
响应式编程只是分离了 事件的发生(cause) 与 事件的效果(effect)。 正常的顺序编程里,事件的发生与效果都是耦合的,比如等待某某输入事件(cause),然后按顺序发生A、B、C动作事件(effect)。
异步编程里,你提供回调(effect) 给异步事件,待异步事件完成(cause),底层的处理机制便自动调用你的回调,产生效果(effect)。
异步编程只是底层自动帮你处理回调,你完全可以自己写一个观察者模式,把回调保存在数组里,然后某事件发生时(cause),手动去遍历调用这些回调(effect)。
因此反应不反应,跟事件本身是不是异步没有关系,异步只是说明事件发生在另外一个时间轴上,并且自动帮你处理好注册的回调。
MVC也可以用mutiny框架,但是需要转换为Mono,而Mono与webflux中的Mono一致的
MVC可以处理Mono与Flux,不需要引入webflux
webflux引入了reactor-core依赖包,而mutiny也是reactor-core依赖包,所以可以互相转换
MVC中有很多异步的优化,但是需要Servlet3.0以上 MVC中原本的Callable与DeferredResult也是异步的还有Future,他们是最接近webflux的API
MVC可以在多个异步Jar包中切换实现,如果没有引入reactor-core依赖包,便用其他的,如果有就用reactor-core依赖包,reactor-core依赖包是由响应式规范写的
webflux与MVC很多包都是相同的,如boot与web依赖包
如果MVC加上reactor-core依赖包感觉与webflux也没太大区别,尤其是替换tomcat后
tomcat的NIO与Netty的NIO还是有区别的,但是都是满足反应式开发框架的要求
MVC可以用异步的方式来实现反应式开发,但是这仅仅是接口上的异步,不是全链路的异步,比如就算请求线程异步了但是返回前端的数据写入body时还是阻塞的,并不是异步,还是需要占用请求线程来发送
WebFlux是全链路的异步
其他的框架也有异步实现但是应该不是必须的,只是写法上的不同,比如SpringSecurity就提供了MVC与WebFlux的API
反应式应该是全链路异步,而不是简单的开多线程运行
Tomcat 与 Netty 的区别
Tomcat特点
Tomcat与Netty都是IO多路复用的,那为什么Netty比Tomcat好?
虽然Tomcat与Netty都是NIO但是实现的方式还是有区别的
- Tomcat用的是select,而Netty用epoll
- Netty采用堆外内存
- Netty有零拷贝
- Netty有高性能对象池
因为servlet规范,tomcat要实现servlet规范所以不能最大发挥NIO的特性,servlet3.0之前完全是同步阻塞模型,在read http body 以及 response的情况下,即使tomcat选择 NIO的 connector也是模拟阻塞的行为。
为什么NIO在读取body时要模拟阻塞?
tomcat的NIO完全可以以非阻塞方式处理IO,为什么在读取body部分时要模拟阻塞呢?这是因为servlet规范里定义了ServletInputStream
在读数据时是阻塞模式。
Tomcat中NIO的配置
在Connector
节点配置protocol="org.apache.coyote.http11.Http11NioProtocol",Http11NioProtocol
协议下默认最大连接数是10000,也可以重新修改maxConnections
的值,同时我们可以设置最大线程数maxThreads
,这里设置的最大线程数就是Excutor
的线程池的大小。
在BIO模式下实际上是没有maxConnections
,即使配置也不会生效,BIO模式下的maxConnections
是保持跟maxThreads
大小一致,因为它是一请求一线程模式。
NioEndpoint结构
要理解tomcat的nio最主要就是对NioEndpoint
的理解。它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor
5个部分。
-
LimitLatch
是连接控制器,它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。 -
Acceptor
负责接收连接,默认是1个线程来执行,将请求的事件注册到事件列表。 -
Poller
来负责轮询,Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())
。由Poller
将就绪的事件生成SocketProcessor
同时交给Excutor
去执行。Excutor
线程池的大小就是我们在Connector
节点配置的maxThreads
的值。 -
Excutor
的线程中,会完成从socket中读取http request,解析成HttpServletRequest
对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。 -
socket
中读数据和往socket中写数据的过程,并没有像典型的非阻塞的NIO的那样,注册OP_READ
或OP_WRITE
事件到主Selector
,而是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。
NioEndpoint执行序列图
Tomcat请求处理
在这里结果处理完后是直接返回给NIOChannel
的。
Netty请求处理
基础版就是单Reactor模型的方式
进阶版就是主从Reactor模型的方式,EventLoop对接请求,将业务的请求处理交给Word线程处理,处理完后结果缓存在队列中,然后EventLoop去轮训队列去处理返回。
IO读写责任范畴是指,Tomcat的IO读写不是由Selector来完成的,Selector只负责分发,IO读写由Word线程来完成的,而Netty是由Selector来完成的,每个连接都由Selector负责也就是EventLoop,并且保存绑定关系直到连接结束
Socket独立性指,如果某个请求线程阻塞了,在Tomcat里只会阻塞某个客户端的连接Socket,并不会像Netty一样将绑定到同一个EventLoop的其他Socket都阻塞住
关于Netty的核心在下面的文章
MVC异步执行的模型
Tomcat对异步Servlet的支持
- Tomcat线程池中的线程会调用Servlet#service,叫Tomcat线程,请求处理线程
- Web程序在service方法的实现里启动的新线程,叫Web应用线程
无论是否在异步状态下,Tomcat都是采用NIO模式的,和Netty一样都是NIO模式
NIO的思想模式(IO多路复用)都是一样的都是Reactor模型
非异步模式下
当一个新请求到达,Tomcat会从线程池取一个线程处理,该线程会调用你的Web应用,Web应用在处理请求过程中,Tomcat线程会一直阻塞,直到Web应用处理完,才输出响应,最后Tomcat回收该线程。
假如Web应用需很长时间处理一个请求(比如DB查询或等待下游的服务调用返回),则Tomcat线程一直不回收,就会占用系统资源,极端情况下会导致“线程饥饿”,即Tomcat没有更多线程处理新请求了,所以有了异步模式,将Tomcat线程与Web线程解绑,但是Controller层返回值也需要支持异步才行,比如返回Flux或者Callable,DeferredResult等
MVC的异步就像是在阻塞的模式下进行魔改的一样。
在接口处理完数据后返回给前端还是需要一个请求线程来返回,需要进行IO处理,而这个IO处理就是阻塞的。
在Servlet 3.0
中,我们可以从HttpServletRequest
对象中获得一个AsyncContext
对象,该对象构成了异步处理的上下文,Request
和Response
对象都可从中获取。AsyncContext
可以从当前线程传给另外的线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。如此,通过将请求从一个线程传给另一个线程处理的过程便构成了Servlet 3.0
中的异步处理。
上图也是Servlet 3.0
,为了做Servlet 3.1
的对比而重画的,Servlet 3.0
对请求的处理虽然是异步的,但是对InputStream
和OutputStream
的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。
因此在Servlet 3.1
中引入了非阻塞IO
我们应该知道Servlet底层的IO是通过如下两个IO流支持的
-
ServletInputStream
: 用来读取数据的输入流 -
ServletOutputStream
: 用来输出数据的输出流
从Servlet3.1
开始,ServletInputStream
新增了一个setReadListener(ReadListener listener)
方法实现非阻塞式读取数据。
从Servlet3.1
开始,ServletOutputStream
新增了一个setWriterListener(WriteListener listener)
方法实现非阻塞式输出数据。
非阻塞IO代码例子
案例一:
用来模拟接口请求时的阻塞耗时
public class LongRunningProcess {
public void run() {
try {
int millis = ThreadLocalRandom.current().nextInt(2000);
String currentThread = Thread.currentThread().getName();
System.out.println(currentThread + " sleep for " + millis + " milliseconds.");
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@WebServlet(value = "/nonBlockingThreadPoolAsync", asyncSupported = true)
public class NonBlockingAsyncHelloServlet extends HttpServlet {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 50000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
AsyncContext asyncContext = request.startAsync();
ServletInputStream inputStream = request.getInputStream();
inputStream.setReadListener(new ReadListener() {
@Override
public void onDataAvailable() throws IOException {
}
@Override
public void onAllDataRead() throws IOException {
executor.execute(() -> {
new LongRunningProcess().run();
try {
asyncContext.getResponse().getWriter().write("Hello World!");
} catch (IOException e) {
e.printStackTrace();
}
asyncContext.complete();
});
}
@Override
public void onError(Throwable t) {
asyncContext.complete();
}
});
}
}
为ServletInputStream
添加了一个ReadListener
,并在ReadListener
的onAllDataRead()
方法中完成了长时处理过程。
案例二:
package com.test.servlet3Noblock;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
@WebServlet(urlPatterns = "/AsyncLongRunningServlet2", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
request.setCharacterEncoding("UTF-8");
response.setContentType("text/html;charset=UTF-8");
AsyncContext actx = request.startAsync();//通过request获得AsyncContent对象
actx.setTimeout(30*3000);//设置异步调用超时时长
ServletInputStream in = request.getInputStream();
//异步读取(实现了非阻塞式读取)
in.setReadListener(new MyReadListener(in,actx));
//直接输出到页面的内容(不等异步完成就直接给页面)
PrintWriter out = response.getWriter();
out.println("<h1>直接返回页面,不等异步处理结果了</h1>");
out.flush();
}
}
package com.test.servlet3Noblock;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import java.io.IOException;
import java.io.PrintWriter;
public class MyReadListener implements ReadListener {
private ServletInputStream inputStream;
private AsyncContext asyncContext;
public MyReadListener(ServletInputStream input,AsyncContext context){
this.inputStream = input;
this.asyncContext = context;
}
//数据可用时触发执行
@Override
public void onDataAvailable() throws IOException {
System.out.println("数据可用时触发执行");
}
//数据读完时触发调用
@Override
public void onAllDataRead() throws IOException {
try {
Thread.sleep(3000);//暂停5秒,模拟耗时处理数据
PrintWriter out = asyncContext.getResponse().getWriter();
out.write("数据读完了");
out.flush();
System.out.println("数据读完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//数据出错触发调用
@Override
public void onError(Throwable t){
System.out.println("数据 出错");
t.printStackTrace();
}
}
可以说Servlet3
的非阻塞IO是对Servlet3
异步的增强。Servlet3的非阻塞是利用java.util.EventListener
的事件驱动机制来实现的。
这里的非阻塞IO跟我们常说的JDK NIO不是一个概念,Servlet3.1的非阻塞是同jdk的事件驱动机制来实现。
Servlet 3.1 新特性
- 提供了
@WebServlet、@WebFilter、@WebListener、@WebInitParam
等注解的支持 - web模块化:可以将一个项目分成N个模块,然后通过扫描模块下的
META-INF/web-fragment.xml
进行装配 - 容器启动时可插拔:使用
ServletContainerInitializer
实现,可以在容器启动时自动回调其onStartup
方法,插入一些功能 - 零XML化SpringMVC:使用
ServletContainerInitializer
即SpringMVC
注解配置实现无XML化的SpringMVC配置 - servlet的异步支持
- servlet 3.1的非阻塞I/O支持
Tomcat架构图
从tomcat处理整个request请求流程中,异步处于哪一步,这个图是tomcat的总体结构,里面用箭头标明了请求线路。
在tomcat的组件中Connector
和Engine
是最核心的两个组件,Servlet3的异步处理就是发生在Connector中。
概念总结
Tomcat NIO Connector ,Servlet 3.0 Async,Spring MVC Async
总结梳理:
nio是一种IO的模型,对比与传统的BIO,它可以利用较少的线程处理更多的连接从而增加机器的吞吐量,Tomcat NIO Connector
是Tomcat
的一种NIO连接模式。异步,前面提到他是一种通讯的方式,它跟NIO没有任务关系,及时没有NIO也可以实现异步,Servlet 3.0 Async
是指Servlet 3
规范以后支持了异步处理Servlet
请求,我们可以把请求线程和业务线程分开。Spring MVC Async
是在Servlet3
异步的基础上做了一层封装。
Tomcat NIO Connector
Tomcat的Connector 有三种模式,BIO,NIO,APR,Tomcat NIO Connector
是其中的NIO模式,使得tomcat容器可以用较少的线程处理大量的连接请求,不再是传统的一请求一线程模式。
Servlet 3.0 Async
Servlet 3.0
支持了业务请求的异步处理,Servlet3
之前一个请求的处理流程,请求解析、READ BODY,RESPONSE BODY
,以及其中的业务逻辑处理都由Tomcat
线程池中的一个线程进行处理的。那么3.0以后我们可以让请求线程(IO线程)和业务处理线程分开,进而对业务进行线程池隔离。我们还可以根据业务重要性进行业务分级,然后再把线程池分级。还可以根据这些分级做其它操作比如监控和降级处理。
Spring MVC Async
Spring MVC 3.2
以上版本基于Servlet 3
的基础做的封装,使用方式如下
WebFlux的异步模型
WebFlux是全链路的异步请求,在WebFlux里就没有使用Tomcat的SpringBoot的困扰,因为里面写业务和接口都是采用Flux或Mono,都是异步响应式框架。
事件循环是异步的关键
Tomcat与Netty做异步服务器只能说各有优劣