User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Java   ›   正文
Java

Java8—新特性之CompletableFuture(构建异步应用)

2020-10-25 20:54:28
2019  0 3
参考目录 隐藏
1) 异步
2) Futrue异步模式存在的问题
3) JDK8引入中重磅类库:CompletableFuture
4) CompletableFuture实现了CompletionStage接口的如下策略:
5) CompletableFuture实现了Futurre接口的如下策略:
6) 主动完成计算
7) 使用案例
8) 创建CompletableFuture
9) whenComplete计算结果完成时的处理
10) handle、 thenApply相当于回调函数(callback) 当然也有转换的作用
11) thenAccept与thenRun(纯消费(执行Action))
12) thenCombine、thenCompose整合两个计算结果
13) Either:任意一个计算完了就可以执行
14) 辅助方法 allOf 和 anyOf
15) CompletableFuture的同步回调和异步回调
16) 更上一层楼(手写sequence方法)
17) CompletableFuture避坑
18) 需要自定义线程池
19) allOf()超时时间
20) 线程池的DiscardPolicy()导致整个程序卡死
21) ListenableFuture
22) 比较 ListenableFuture 与 Future
23) 如何使用 ThreadPoolTaskExecutor
24) ListenableFuture 的好处以及 Future 带来的阻塞问题

阅读完需:约 37 分钟

异步

传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~

jdk中已经内置future模式的实现。Future是Java5添加的类,用来描述一个异步计算的结果。可以用isDone方法来检查计算是否完成,或者使用get阻塞住调用线程,直至计算完成返回结果,也可以用cancel方法来停止任务的执行。

Futrue异步模式存在的问题

Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。

阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言像Node.js,采用回调的方式实现异步编程。Java的一些框架像Netty,自己扩展Java的Future接口,提供了addListener等多个扩展方法。

Java—Future与FutureTask的区别与联系

guava里面也提供了通用的扩展Future: ListenableFuture\SettableFuture以及辅助类Futures等,方便异步编程

Spring4.0也扩展了Futrue,提供了ListenableFuture来addCallback()采用回调函数的形式来提高整体异步性能~

关于ListenableFuture后面有写哦!

作为正统Java类库,是不是应该加点什么特性,可以加强一下自身库的功能?

JDK8引入中重磅类库:CompletableFuture

Java8里面新增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。

JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。

CompletableFuture实现了CompletionStage接口的如下策略:

  1. 为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。
  2. 没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。
  3. 所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖

CompletableFuture实现了Futurre接口的如下策略:

  1. CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。
  2. 以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。

CompletableFuture中4个异步执行任务静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,
注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止

主动完成计算

CompletableFuture 类实现了CompletionStage和Future接口,所以还是可以像以前一样通过阻塞或轮询的方式获得结果。尽管这种方式不推荐使用。

如下四个方法都可以获取结果:

public T 	get()  //Futrue的方法 阻塞
public T 	get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T 	getNow(T valueIfAbsent) //getNow有点特殊,如果结果已经计算完则返回结果或抛异常,否则返回给定的valueIfAbsent的值(此方法有点反人类有木有)
public T 	join() // 返回计算的结果或抛出一个uncheckd异常。 推荐使用

上面4个方法,推荐使用join,还有带超时时间的get方法

CompletableFuture并非一定要交给线程池执行才能实现异步,你可以像下面这样实现异步运行:

     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();
        
        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok"); //这里把你信任的结果set进去后,所有阻塞的get()方法都能立马苏醒,获得到结果
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
输出:
准备打印结果...
task doing...
计算结果:ok

如果没有意外,上面发的代码工作得很正常。但是,如果任务执行过程中产生了异常会怎样呢?如下:只加一句1/0的代码

 //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();

这种情况下会得到一个相当糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该守护线程,而主线程,将永远永远阻塞了。

怎么解决呢?

  • 使用get(long timeout, TimeUnit unit)代替get()方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,我们应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。

使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException 。不过,也因为如此,你不能确定执行任务的线程内到底发生了什么问题(因此自己要做好权衡)。

  • 更好的解决方案是:为了能获取任务线程内发生的异常,你需要使用
    CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
    这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException异常,该异常接收了一个包含失败原因的Exception 参数。
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture();

        //自己开个线程去执行 执行完把结果告诉completableFuture即可
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                System.out.println(1 / 0);
            //} catch (InterruptedException e) {
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成 并且把结果告诉completableFuture
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        System.out.println("准备打印结果...");
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

这样子,如果内部发生了异常,调用get方法的时候就能得到这个Exception,进而能拿到抛异常的原因了。

说明:若执行了completableFuture.completeExceptionally(e);,那么completableFuture.get()它最终不是把异常获取出来了,而是给throw出来了,阻断主线程。(get()方法是用于获取正常返回值的)

使用案例

在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。

创建CompletableFuture

四个静态方法(如上),一个空构造函数

whenComplete计算结果完成时的处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> 	whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

注意这几个方法都会返回CompletableFuture。

CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null

        //若有异常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

Main方法本地测试小细节:

假若我们直接通过这种方式,sleep个几秒:

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        System.out.println("this my sleep time");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i + 1;
                })
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e));
    }

发现控制台木有任何打印:

很多小伙伴就就开始打断点,发现断点都进不去。
根本原因:因为是异步执行的,所以主线程会线结束,JVM都退出了,自然异步线程也会死掉喽

解决方案:让main线程不这么快退出就行,自己测试的时候我们加上这么一句话就Ok了:

    public static void main(String[] args) throws InterruptedException {
    	...
    	// 让主线程等待 只要这个时间大于你异步线程的时间就成~~~
    	TimeUnit.SECONDS.sleep(20);
    }

Tips:对于web环境是不会存在此问题的,因为它的主线程一般情况下永远不会退出~~~~这里只是小伙伴在本地测试上需要注意的一个小细节~

handle、 thenApply相当于回调函数(callback) 当然也有转换的作用

public <U> CompletableFuture<U> 	handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> 	handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)


    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

使用方式如下:

    public static void main(String[] args) {

        CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null

        //若有异常
        CompletableFuture.supplyAsync(() -> 1 / 0)
                .thenApplyAsync(i -> i * 10)
                .thenApply(i -> i.toString())
                .whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

        //上面效果 或者下面这么写也行(但上面那么写 连同异常都可以处理) 全部匿名方式 效率高 代码也优雅
        //CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> 100)
        //       .thenApplyAsync(i -> i * 10)
        //        .thenApply(i -> i.toString());
        //System.out.println(f.get()); //"1000"
    }

我们会发现,结合Java8的流式处理,简直绝配。代码看起来特别的优雅,关键还效率高,连异常都一下子给我们抓住了,简直完美。

thenApply与handle方法的区别在于handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。而thenApply方法只是用来处理正常值,因此一旦有异常就会抛出。

thenAccept与thenRun(纯消费(执行Action))

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }
  • 可以看到,thenAccept和thenRun都是无返回值的。如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。
  • 同样是执行指定的动作,同样是消耗,二者也有区别:
    thenAccept接收上一阶段的输出作为本阶段的输入
    thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数(thenRun使用的是Runnable,若你只是单纯的消费,不需要启用线程时,就用thenRun更合适)

上面的方法是当计算完成的时候,会生成新的计算结果(thenApply, handle),或者返回同样的计算结果whenComplete。CompletableFuture还提供了一种处理结果的方法,只对结果执行Action,而不返回新的计算值,因此计算值为Void:

public static void main(String[] args) {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> 100)
                .thenAccept(x -> System.out.println(x)); //100
        //如果此句话get不调用  也是能够输出100的 上面也会有输出的
        System.out.println(f.join()); //null 返回null,所以thenAccept是木有返回值的

        //thenRun的案例演示
        CompletableFuture<Void> f2 = CompletableFuture.supplyAsync(() -> 100)
                .thenRun(() -> System.out.println("不需要入参")); //不需要入参
        System.out.println(f2.join()); //null 返回null,所以thenRun是木有返回值的

    }

thenAcceptBoth以及相关方法提供了类似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。
runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。

public <U> CompletableFuture<Void> 	thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> 	thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void> 	runAfterBoth(CompletionStage<?> other,  Runnable action)

例子:

    public static void main(String[] args) {
        CompletableFuture<Void> f =  CompletableFuture.supplyAsync(() -> 100)
                // 第二个消费者:x,y显然是可以把前面几个的结果都拿到,然后再做处理
                .thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y)); //1000
        System.out.println(f.join()); //null
    }

thenCombine、thenCompose整合两个计算结果

public <U,V> CompletableFuture<V> 	thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> 	thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> 	thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletableFuture<U> 	thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> 	thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> 	thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

先说:thenCompose
这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。

而下面的一组方法thenCombine用来复合另外一个CompletionStage的结果。它的功能类似:
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。

其实从功能上来讲,它们的功能更类似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "abc");
        CompletableFuture<String> f = future.thenCombine(future2, (x, y) -> y + "-" + x);
        System.out.println(f.join()); //abc-100
    }

因此,你可以根据方法的参数的类型来加速你的记忆。Runnable类型的参数会忽略计算的结果,Consumer是纯消费计算结果,BiConsumer会组合另外一个CompletionStage纯消费,Function会对计算结果做转换,BiFunction会组合另外一个CompletionStage的计算结果做转换。

Either:任意一个计算完了就可以执行

thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,而我们下面要了解的方法是当任意一个CompletableFuture计算完成的时候就会执行。

public CompletableFuture<Void> 	acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> 	acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> 	applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> 	applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture

applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture的计算结果。

下面这个例子有时会输出100,有时候会输出200,哪个Future先完成就会根据它的结果计算。

    public static void main(String[] args) {
        Random rand = new Random();
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 200;
        });
        CompletableFuture<String> f = future.applyToEither(future2, i -> i.toString());
        System.out.println(f.join()); //有时候输出100  有时候输出200
    }

辅助方法 allOf 和 anyOf

前面我们已经介绍了几个静态方法:completedFuture、runAsync、supplyAsync,下面介绍的这两个方法用来组合多个CompletableFuture。

public static CompletableFuture<Void> 	    allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> 	anyOf(CompletableFuture<?>... cfs)

allOf方法是当所有的CompletableFuture都执行完后执行计算。
anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果返回

但是anyOf和applyToEither不同。anyOf接受任意多的CompletableFuture但是applyToEither只是判断两个CompletableFuture,anyOf返回值的计算结果是参数中其中一个CompletableFuture的计算结果,applyToEither返回值的计算结果却是要经过fn处理的。当然还有静态方法的区别,线程池的选择等

    public static void main(String[] args) {
        Random rand = new Random();
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10000 + rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10000 + rand.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });
        //CompletableFuture<Void> f =  CompletableFuture.allOf(future1,future2);
        CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2);
        System.out.println(f.join());
    }

通过上面的介绍,应该把CompletableFuture的方法和功能介绍完了(cancel、isCompletedExceptionally()、isDone()以及继承于Object的方法无需介绍了, toCompletableFuture()返回CompletableFuture本身)

CompletableFuture的同步回调和异步回调

CompletableFuture 根据任务的主从关系为:

  • 提交任务的方法,如静态方法 supplyAsync(supplier[, executor]), runAsync(runnable[, executor])
  • 回调函数,即对任务执行后所作出回应的方法,多数方法了,如 thenRun(action), thenRunAsync(action[, executor]), whenComplete(action), whenCompleteAsync(action[, executor]) 等
    – 注意:每个方法都有带Async后缀和不带此后缀的方法(下面会说区别)

根据执行方法可分为同步与异步方法,任务都是要被异步执行(请注意:任务的执行都是异步的)。所以提交任务的方法都是异步的。而对任务作出回应的方法很多分为两个版本,如

  • 同步回应,如 thenRun(action), whenComplete(action)
  • 异步回应,如 thenRunAsync(action[, executor]), whenCompleteAsync(action[, executor]), 异步方法可以传入线程池,否则用默认的。可参考:理解 CompletableFuture 的任务与回调函数的线程 和 CompletableFuture的async后缀函数与不带async的函数的区别

更上一层楼(手写sequence方法)

如果用过Guava的Future类,就会知道它的Futures辅助类提供了很多便利方法,用来处理多个Future,而不像Java的CompletableFuture,只提供了allOf、anyOf两个方法。

比如有这样一个需求,将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,它包含前面所有的CompletableFuture的计算结果,guava的Futures.allAsList可以实现这样的功能,但是对于java CompletableFuture,我们需要一些辅助方法:

 /**
     * 可以把多个futures序列化起来  最终返回一个装载有结果的CompletableFuture即可  调用join方法就够了
     * 当然只能是同一类型哦(返回的结果)
     *
     * @param <T>     the type parameter
     * @param futures the futures
     * @return the completable future
     */
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        //通过allOf方法把所有的futures放到一起  返回Void
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        //遍历把每一个futures通过join方法把结果拿到  最终给返回出去 并且是用CompletableFuture包装着的
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
    }

或者Java Future转CompletableFuture:

public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }, executor);
}

使用案例:

    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 200;
        });
        CompletableFuture<List<Integer>> resultList = sequence(Arrays.asList(future1, future2));

        System.out.println(resultList.join()); //[100, 200]
    }

事实上,如果每个操作都很简单的话(比如:我们仅仅是getById()这种查询)没有必要用这种多线程异步的方式,因为创建线程还需要时间,还不如直接同步执行来得快。

事实证明,只有当每个操作很复杂需要花费相对很长的时间(比如,调用多个其它的系统的接口;比如,商品详情页面这种需要从多个系统中查数据显示的)的时候用CompletableFuture才合适,不然区别真的不大,还不如顺序同步执行。


CompletableFuture避坑

需要自定义线程池

CompletableFuture默认使用的线程池是 ForkJoinPool.commonPool(),commonPool是当前 JVM(进程) 上的所有 CompletableFuture、并行 Stream 共享的,commonPool 的目标场景是非阻塞的 CPU 密集型任务,其线程数默认为 CPU 数量减1,所以对于我们用java常做的IO密集型任务,默认线程池是远远不够使用的;在双核及以下机器上,默认线程池又会退化为为每个任务创建一个线程,相当于没有线程池。

以runAsync的代码举例,不指定线程池时,使用的是ASYNC_POOL,而这个ASYNC_POOL的大小,是根据 CPU 核数计算出来的(COMMON_PARALLELISM)如果COMMON_PARALLELISM小于1,USE_COMMON_POOL为false(此时ForkJoinPool.commonPool()不支持并发),直接退化为 ThreadPerTaskExecutor,每个任务新开一个线程。

关于ForkJoinPool.commonPool()查看这篇

Java—并发编程(八)线程池— (7) ForkJoinPool

下面是部分代码及注释,解释一下为何会这样。

这段用来计算ForkJoinPool.commonPool()的线程池大小的

static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

关键的来了

     // ForkJoinPool.commonPool()线程池大小为1或0,就不使用ForkJoinPool.commonPool()了
     private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    // 为每个任务开一个线程的线程工厂
    /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
    static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable){
        return asyncRunStage(asyncPool, runnable);
    }

自定义线程池例子

        final Integer AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

        void com() throws InterruptedException {
        for (int i=0 ;i<10 ;i++) {
            // JDK 自带
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(Thread.currentThread().getName());
            }, new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 3 * AVAILABLE_PROCESSORS,
                    10, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(20),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()));
        }
        Thread.sleep(10000000);
    }

allOf()超时时间

allOf()超时时间不合理的后果,就是引发错误,可能导致OOM

 public static void main(String[] args) {
        List<CompletableFuture> futures = new ArrayList<>(3);
        CompletableFuture first = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("first");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        futures.add(first);
        Long startTime = System.currentTimeMillis();
        log.info("开始等待所有线程结束,时间:{}", startTime);
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("等待所有线程结束发生异常:", e);
        } finally {
            Long endTime = System.currentTimeMillis();
            log.info("等待结束,时间:{}, 耗时:{}秒", startTime, (endTime-startTime)/1000);
            try {
                log.info(JSONObject.toJSONString(first));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

输出结果

18:28:48.100 [main] INFO com.enmalvi.Spring_ListenableFuture.ListenableFuture - 开始等待所有线程结束,时间:1661509728099
18:28:51.107 [main] ERROR com.enmalvi.Spring_ListenableFuture.ListenableFuture - 等待所有线程结束发生异常:
java.util.concurrent.TimeoutException: null
	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at com.enmalvi.Spring_ListenableFuture.ListenableFuture.main(ListenableFuture.java:108)
18:28:51.108 [main] INFO com.enmalvi.Spring_ListenableFuture.ListenableFuture - 等待结束,时间:1661509728099, 耗时:3秒
18:28:51.150 [main] INFO com.enmalvi.Spring_ListenableFuture.ListenableFuture - {"cancelled":false,"completedExceptionally":false,"done":false,"numberOfDependents":1}

可以看到,设置allof的等待时间(2s)比线程实际执行时间(5s)短,等待超时结束时,线程并没有被杀死,也没有被取消,也没有抛异常,这个线程继续运行着,直到正常结束。

这会有什么问题呢?

如果接口中存在这种代码,大量请求超时返回,但是实际子线程还在运行,接口又接收更多的请求,继续创建子线程(没有使用线程池)或者进入线程池排队,前者会导致 OOM,后者则会加剧接口超时(任务都在排队,等不到执行)导致接口乃至整个服务完全不可用。

实际代码中我们有很多io操作,一般都会设置超时时间(不设置的后果很严重),如果子线程中调用了这些方法,那么allof()方法的超时时间一定要大于最大的超时时间(如果有串行操作,还需要累加。

线程池的DiscardPolicy()导致整个程序卡死

CompletableFuture处理多线程任务时一般建议自定义线程池,线程池有个容量满了的处理策略:

  • ThreadPoolExecutor.DiscardPolicy()
  • ThreadPoolExecutor.DiscardOldestPolicy()
  • ThreadPoolExecutor.AbortPolicy()
  • ThreadPoolExecutor.CallerRunsPolicy()

拒绝策略的介绍

Java—并发编程(八)线程池– (5) 线程池的原理

开始使用jdk8的新API CompletableFuture 之后,CompletableFuture.allOf()方法或者get()方法等待所有CompletableFuture执行完时,如果采用丢弃策略(包括自定义的不抛异常),则allOf()方法和get()方法如果没有设置超时就会无限期的等待下去。


ListenableFuture

ListenableFuture 顾名思义可以监听Future,它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率底下。使用ListenableFuture guava帮我们检测Future是否完成了,如果完成了就自动调用回调函数,这样可以减少并发程序的复杂度。

这里的ListenableFuture是Spring提供的,当然Guava包上也有ListenableFuture,其实Spring是抄袭Guava的

我们都知道Future及其唯一实现类FutureTask的作用: 对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

但是有一些弊端:

它有可能已经完成了计算并返回结果,也有可能至今还没完成。 我们只能手动的判断时候处理完成,以及处理完成后,怎么做。比如:有结果对结果怎么处理?如果出现异常怎么处理?等等。

由此就引入了CompletableFuture和ListenableFuture

而CompletableFuture上面已经讲过了,这里就讲讲ListenableFuture

比较 ListenableFuture 与 Future

org.springframework.util.concurrent.ListenableFuture
java.util.concurrent.Future

这两个类最关键的在于ListenableFuture增加了回调函数

void addCallback(ListenableFutureCallback<? super T> callback);
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

其实在Spring上还有一个类ThreadPoolTaskExecutor,这个类是 Spring 在 ThreadPoolExecutor 的基础上进行了一层封装。ListenableFuture一般都是和ThreadPoolTaskExecutor搭配起来一起使用的。

java.util.concurrent.ThreadPoolExecutor
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

相比 ThreadPoolExecutor,ThreadPoolTaskExecutor 增加了 submitListenable 方法,该方法返回 ListenableFuture 接口对象,该接口完全抄袭了 google 的 guava。

ListenableFuture 接口对象,增加了线程执行完毕后成功和失败的回调方法。从而避免了 Future 需要以阻塞的方式调用 get,然后再执行成功和失败的方法。

ThreadPoolTaskExecutor 中具体的初始化线程池方法如下:

@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

如何使用 ThreadPoolTaskExecutor

public static void main(String[] args) throws Exception {
        /**
         * ThreadPoolExecutor 是 JDK 自带,ThreadPoolTaskExecutor 是 Spring 在 ThreadPoolExecutor 的基础上进行了一层封装。
         * 相比 ThreadPoolExecutor,ThreadPoolTaskExecutor 增加了 submitListenable 方法,该方法返回 ListenableFuture 接口对象,
         * 该接口完全抄袭了 google 的 guava。
         */
        ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
        executorService.setCorePoolSize(2);
        executorService.setMaxPoolSize(2);
        executorService.setKeepAliveSeconds(60);
        executorService.setQueueCapacity(Integer.MAX_VALUE);
        executorService.initialize();
        /**
         * 通过 new 获取 ThreadPoolTaskExecutor 对象
         * 通过 setCorePoolSize 等方法可以配置线程池相关参数
         * 最重要的是通过 initialize 方法完成线程池初始化,否则抛出:java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized 异常
         * 调用 submitListenable 方法返回 ListenableFuture 对象
         * 调用 ListenableFuture 对象的 addCallback 方法增加 成功和失败的回调处理
         * 其中成功的回调对象实现了 SuccessCallback 接口,其中的方法为:void onSuccess(T result)
         * 其中失败的回调对象实现了 FailureCallback 接口,其中的方法为:void onFailure(Throwable ex)
         */

        /**
         * ListenableFuture 接口继承 Future 接口,并增加了如下两个方法:
         * void addCallback(ListenableFutureCallback<? super T> callback);
         * void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
         */

        /**
         * ListenableFuture 接口对象,增加了线程执行完毕后成功和失败的回调方法。
         * 从而避免了 Future 需要以阻塞的方式调用 get,然后再执行成功和失败的方法。
         */
        executorService.submitListenable(() -> {

            // 休息 5 秒,模拟工作的情况
            TimeUnit.SECONDS.sleep(5);
            // 通过抛出 RuntimeException 异常来模拟异常
            //throw new RuntimeException("出现异常");
            return true;

        }).addCallback(data -> log.info("success,result = {}", data), ex -> log.info("**异常信息**:{}"));
    }

ThreadPoolTaskExecutor 的 submitListenable 方法,传入一个 Runnable 或者 Callable 对象,实际上 Runnable 或者 Callable 对象被包装到 ListenableFutureTask 对象中,然后提交到 ExecutorService 对象,最后返回的是 ListenableFutureTask 对象,具体如下

	@Override
	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
		ExecutorService executor = getThreadPoolExecutor();
		try {
			ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
			executor.execute(future);
			return future;
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

在 ListenableFutureTask 中可以发现其继承了 FutureTask 对象并实现了 ListenableFuture 对象, 其中 FutureTask 对象中的 run 是最终线程执行的方法,具体如下

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

上面 run 方法中的 set 方法将线程的执行结果通知出去,在 set 方法中可以发现其调用了 finishCompletion 方法,finishCompletion 方法会一直循环判断线程池中的队列的任务是否执行完毕,一旦执行完毕就会调用 done 方法

ListenableFutureTask 重写了 done 方法, 在正常执行完毕的情况下通过 this.callbacks.success(result) 调用成功回调函数,在出现 InterruptedException 异常的情况下既不会调用 成功的回调,也不会调用失败的回调,其他类型的异常出现的时候才会通过 this.callbacks.failure(cause) 调用失败回调函数,ListenableFutureTask 中的 done 方法具体如下:

	@Override
	protected void done() {
		Throwable cause;
		try {
			T result = get();
			this.callbacks.success(result);
			return;
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return;
		}
		catch (ExecutionException ex) {
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		}
		catch (Throwable ex) {
			cause = ex;
		}
		this.callbacks.failure(cause);
	}
  • 从上面的流程分析中,可以发现:因为 FutureTask 中重写了 run 方法,所以才实现了线程执行完毕后可以执行回调方法,其中使用了模板方法设计模式。
  • 模板方法设计模式的主要特点在于:在接口中定义方法,在抽象类中实现方法,并定义抽象方法,实现的方法中又调用抽象方法,最终的子类中重写抽象方法。
  • Runnable 和 ListenableFuture 是接口, FutureTask 是抽象类,ListenableFutureTask 是最终的子类

ListenableFuture 的好处以及 Future 带来的阻塞问题

  • ListenableFuture 相比 Future 是不需要知道 执行结果的情况下就可以将 成功或者失败的业务代码 通过回调的方式 预埋,带来的好处就是异步,不需要阻塞当前线程,从而可以提高系统的吞吐量;
  • Future 需要通过 get() 方法阻塞当前线程,在获取线程的执行结果后再根据执行结果编写相关的业务代码;

例子来演示下:

通过 1 个线程,循环执行 10000 次,使用 submitListenable 方法

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
        executorService.setCorePoolSize(1);
        executorService.setMaxPoolSize(1);
        executorService.initialize();

        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            ListenableFuture<Boolean> asyncResult = executorService.submitListenable(() -> {

                // 休息5毫秒,模拟执行
                TimeUnit.MILLISECONDS.sleep(5);
                //throw new RuntimeException("出现异常");
                return true;

            });
            asyncResult.addCallback(data -> {
                try {
                    // 休息3毫秒模拟获取到执行结果后的操作
                    TimeUnit.MILLISECONDS.sleep(3);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, ex -> log.info("**异常信息**:{}"));
        }
        System.out.println(String.format("总结耗时:%s", System.currentTimeMillis() - start));
    }

通过测试3次,总计耗时在 30 – 60 毫秒之间


通过 1 个线程,循环执行 10000 次,使用 submit 方法和 Future 对象,具体如下:

public static void main(String[] args) {
    ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
    executorService.setCorePoolSize(1);
    executorService.setMaxPoolSize(1);
    executorService.initialize();

    long start = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
        Future<Boolean> future = executorService.submit(() -> {
            try {
                // 休息5毫秒,模拟执行
                TimeUnit.MILLISECONDS.sleep(5);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        });

        try {
            // 以阻塞的方式获取执行结果
            Boolean result = future.get();
            // logger.info(String.format("执行结果:%s", result));
            // 休息3毫秒模拟获取到执行结果后的操作
            TimeUnit.MILLISECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
    System.out.println(String.format("总结耗时:%s", System.currentTimeMillis() - start));
}

Future只是测试了一次,总结耗时:99377

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

3 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 593752浏览

随机文章
SpringSecurity—异常处理机制
5年前
Git基操学习—3
5年前
RabbitMQ—工作队列模式
5年前
SpringBoot—扩展接口
2年前
SpringSecurity—new 出来的对象一样也可以被 Spring 容器管理
4年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 593752 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付