User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Java   ›   正文
Java

Java—并发编程(六)JUC锁 – (12) Phaser

2021-12-12 01:57:14
716  0 1
参考目录 隐藏
1) 使用示例
2) Phaser 介绍
3) 重要接口介绍
4) Phaser 的监控方法
5) Phaser 的分层结构

阅读完需:约 16 分钟

java多线程技术提供了Phaser工具类,Phaser表示“阶段器”,用来解决控制多个线程分阶段共同完成任务的情景问题。其作用相比CountDownLatch和CyclicBarrier更加灵活。

Phaser是按照不同的阶段对线程进行执行,就是它本身是维护者一个阶段这样的成员变量,当前我是执行到哪个阶段,是第0个,还是第1个阶段,每个阶段不同的时候这个线程都可以往前走,有的线程走到某个阶段停了,有的线程一直会走到结束。

你的程序中如果说用到好几个阶段执行,而且有的阶段必须得几个线程共同参与的一种情况下就会使用到Phaser。

下面来看一个小例子。模拟了一个结婚的场景,结婚是好多人都要参加的,因此,我们写了一个类Person是一个runnable可以new出来的,扔给Thread去执行,模拟我们每个人要做的事情。有这么几个方法 arrive() ,eat(). leave(), hug() 。作为一个婚礼来讲,它会分成几个阶段,第一阶段大家都到齐,第二阶段开始吃饭,第三个阶段大家离开,最后新郎新娘进入洞房。

public class Phaser_Demo {
 
    static Random r = new Random();
 
    static MarriagePhaser phaser = new MarriagePhaser();
 
    static void milliSleep(int milli){
        try {
            TimeUnit.MICROSECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
 
    public static void main(String[] args) {
        phaser.bulkRegister(7);
 
        for (int i = 0; i < 5; i++) {
            new Thread(new Person("p" + i)).start();
        }
 
        new Thread(new Person("新郎")).start();
        new Thread(new Person("新娘")).start();
 
 
    }
 
    static class MarriagePhaser extends Phaser{
 
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
 
            switch (phase){
                case 0 :
                    System.out.println("所有人都到齐了!" + registeredParties);
                    System.out.println();
                    return false;
                case 1:
                    System.out.println("所有人都吃完了!" + registeredParties);
                    System.out.println();
                    return false;
                case 2:
                    System.out.println("所有人都离开了!" + registeredParties);
                    System.out.println();
                    return false;
                case 3:
                    System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
                    System.out.println();
                    return true;
                default:
                    return true;
            }
        }
    }
 
    static class Person implements Runnable{
 
        String name;
 
        public Person(String name) {
            this.name = name;
        }
 
        public void arrive(){
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达现场!\n", name);
            phaser.arriveAndAwaitAdvance();
        }
 
        public void eat(){
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
            phaser.arriveAndAwaitAdvance();
        }
 
        public void leave(){
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开!\n", name);
            phaser.arriveAndAwaitAdvance();
        }
 
        public void hug(){
            if(name.equals("新郎") || name.equals("新娘")){
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 洞房!\n", name);
                phaser.arriveAndAwaitAdvance();
            }else {
                phaser.arriveAndDeregister();
            }
        }
 
 
        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }
}

看下主程序,一个有5个嘉宾来参加婚礼,加上新郎,新娘一共七个人。

线程start 会调用people的run方法,依次执行 arrive, eat, leave, hug 方法。

那好,我们在每个阶段是不是都应该控制好人数,第一个阶段人都到齐了,婚礼才能开始。第二阶段所有人吃饭,第三阶段所有人离开,但是到了第四阶段并不是所有人进入洞房。所以,要把婚礼分成几个阶段,而且每个阶段必须等相应的人干完自己的事情,才能进入下一阶段。

看下程序的输出:

p0 到达现场!
p2 到达现场!
p1 到达现场!
p4 到达现场!
p3 到达现场!
新郎 到达现场!
新娘 到达现场!
所有人都到齐了!7

新娘 吃完!
p0 吃完!
p2 吃完!
p3 吃完!
新郎 吃完!
p1 吃完!
p4 吃完!
所有人都吃完了!7

p3 离开!
新娘 离开!
p4 离开!
p0 离开!
新郎 离开!
p2 离开!
p1 离开!
所有人都离开了!7

新娘 洞房!
新郎 洞房!
婚礼结束!新郎新娘抱抱!2

程序定义了一个 marriagePhaser 继承了phaser,并重写了onAdvance方法,所有的程序第一次调用arriveAndAwaitAdvance这个方法,phaser的onAdvance方法会自动调用,phase参数代表第几阶段,初始为0; registeredParties 代表这个阶段有几个线程参加。返回false进入下一阶段,返回true,所有线程结束,phaser整个栅栏组结束。 phaser.arriveAndDeregister() 取消参与后面的阶段, phaser.register 往上增加,不仅可以控制栅栏上的个数还可以控制栅栏上的等待数量。


使用示例

我们来实现一个小需求,启动 10 个线程执行任务,由于启动时间有先后,我们希望等到所有的线程都启动成功以后再开始执行,让每个线程在同一个起跑线上开始执行业务操作。

下面,分别介绍 CountDownLatch、CyclicBarrier 和 Phaser 怎么实现该需求。

1、这种 case 最容易使用的就是 CountDownLatch,代码很简单:

// 1. 设置 count 为 1
CountDownLatch latch = new CountDownLatch(1);

for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        try {
            // 2. 每个线程都等在栅栏这里,等待放开栅栏,不会因为有些线程先启动就先跑路了
            latch.await();

            // doWork();

        } catch (InterruptedException ignore) {
        }
    }).start();
}

doSomethingELse(); // 确保在下面的代码执行之前,上面每个线程都到了 await() 上。

// 3. 放开栅栏
latch.countDown();

简单回顾一下 CountDownLatch 的原理:AQS 共享模式的典型使用,构造函数中的 1 是设置给 AQS 的 state 的。latch.await() 方法会阻塞,而 latch.countDown() 方法就是用来将 state– 的,减到 0 以后,唤醒所有的阻塞在 await() 方法上的线程。

2、这种 case 用 CyclicBarrier 来实现更简单:

// 1. 构造函数中指定了 10 个 parties
CyclicBarrier barrier = new CyclicBarrier(10);

for (int i = 0; i < 10; i++) {
    executorService.submit(() -> {
        try {
            // 2. 每个线程"报告"自己到了,
            //    当第10个线程到的时候,也就是所有的线程都到齐了,一起通过
            barrier.await();

            // doWork()

        } catch (InterruptedException | BrokenBarrierException ex) {
            ex.printStackTrace();
        }
    });
}

CyclicBarrier 的原理不是 AQS 的共享模式,是 AQS Condition 和 ReentrantLock 的结合使用

CyclicBarrier 可以被重复使用,我们这里只使用了一个周期,当第十个线程到了以后,所有的线程一起通过,此时开启了新的一个周期,在 CyclicBarrier 中,周期用 generation 表示。

3、我们来介绍今天的主角 Phaser,用 Phaser 实现这个需求也很简单:

Phaser phaser = new Phaser();
// 1. 注册一个 party
phaser.register();

for (int i = 0; i < 10; i++) {

    phaser.register();

    executorService.submit(() -> {
        // 2. 每个线程到这里进行阻塞,等待所有线程到达栅栏
        phaser.arriveAndAwaitAdvance();

        // doWork()
    });
}
phaser.arriveAndAwaitAdvance();

Phaser 比较灵活,它不需要在构造的时候指定固定数目的 parties,而 CountDownLatch 和 CyclicBarrier 需要在构造函数中明确指定一个数字。

我们可以看到,上面的代码总共执行了 11 次 phaser.register() ,可以把 11 理解为 CountDownLatch 中的 count 和 CyclicBarrier 中的 parties。

这样读者应该很容易理解 phaser.arriveAndAwaitAdvance() 了,这是一个阻塞方法,直到该方法被调用 11 次,所有的线程才能同时通过。

这里和 CyclicBarrier 是一个意思,凑齐了所有的线程,一起通过栅栏。

Phaser 也有周期的概念,一个周期定义为一个 phase,从 0 开始。

Phaser 介绍

上面我们介绍了 Phaser 中的两个很重要的接口,register() 和 arriveAndAwaitAdvance(),这节我们来看它的其他的一些重要的接口使用。

画一张图压着:

重要接口介绍

Phaser 还是有 parties 概念的,但是它不需要在构造函数中指定,而是可以很灵活地动态增减。

我们来看 3 个代码片段,看看 parties 是怎么来的。

1、首先是 Phaser 有一个带 parties 参数的构造方法:

public Phaser(int parties) {
    this(null, parties);
}

2、register() 方法:

public int register() {
    return doRegister(1);
}

这个方法会使得 parties 加 1

3、bulkRegister(int parties) 方法:

public int bulkRegister(int parties) {
    if (parties < 0)
        throw new IllegalArgumentException();
    if (parties == 0)
        return getPhase();
    return doRegister(parties);
}

一次注册多个,这个方法会使得 parties 增加相应数值

parties 也可以减少,因为有些线程可能在执行过程中,不和大家玩了,会进行退出,调用 arriveAndDeregister() 即可,这个方法的名字已经说明了它的用途了。

再看一下这个图,phase-1 结束的时候,黑色的线程离开了大家,此时就只有 3 个 parties 了。

这里说一下 Phaser 的另一个概念 phase,它代表 Phaser 中的周期或者叫阶段,phase 从 0 开始,一直往上递增。

通过调用 arrive() 或 arriveAndDeregister() 来标记有一个成员到达了一个 phase 的栅栏,当所有的成员都到达栅栏以后,开启一个新的 phase。

这里我们来看看和 phase 相关的几个方法:

1、arrive()

这个方法标记当前线程已经到达栅栏,但是该方法不会阻塞,注意,它不会阻塞。

大家要理解一点,party 本和线程是没有关系的,不能说一个线程代表一个 party,因为我们完全可以在一个线程中重复调用 arrive() 方法。这么表达纯粹是方便理解用。

2、arriveAndDeregister()

和上面的方法一样,当前线程通过栅栏,非阻塞,但是它执行了 deregister 操作,意味着总的 parties 减 1。

3、arriveAndAwaitAdvance()

这个方法应该一目了然,就是等其他线程都到了栅栏上再一起通过,进入下一个 phase。

4、awaitAdvance(int phase)

这个方法需要指定 phase 参数,也就是说,当前线程会进行阻塞,直到指定的 phase 打开。

5、protected boolean onAdvance(int phase, registeredParties)

这个方法是 protected 的,所以它不是 phaser 提供的 API,从方法名字上也可以看出,它会在一个 phase 结束的时候被调用。

它的返回值代表是否应该终结(terminate)一个 phaser,之所以拿出来说,是因为我们经常会见到有人通过覆写该方法来自定义 phaser 的终结逻辑,如:

protected boolean onAdvance(int phase, int registeredParties) {
    return phase >= N || registeredParties == 0;
}

1、我们可以通过 phaser.isTerminated() 来检测一个 phaser 实例是否已经终结了

2、当一个 phaser 实例被终结以后,register()、arrive() 等这些方法都没有什么意义了,大家可以玩一玩,观察它们的返回值,原本应该返回 phase 值的,但是这个时候会返回一个负数。

Phaser 的监控方法

介绍下几个用于返回当前 phaser 状态的方法:

getPhase():返回当前的 phase,前面说了,phase 从 0 开始计算,最大值是 Integer.MAX_VALUE,超过又从 0 开始

getRegisteredParties():当前有多少 parties,随着不断地有 register 和 deregister,这个值会发生变化

getArrivedParties():有多少个 party 已经到达当前 phase 的栅栏

getUnarrivedParties():还没有到达当前栅栏的 party 数

Phaser 的分层结构

Tiering 这个词本身就不好翻译,大家将就一下,要表达的意思就是,将多个 Phaser 实例构造成一棵树。

1、第一个问题来了,为什么要把多个 Phaser 实例构造成一棵树,解决什么问题?有什么优点?

Phaser 内部用一个 state 来管理状态变化,随着 parties 的增加,并发问题带来的性能影响会越来越严重。

/**
 * 0-15: unarrived
 * 16-31: parties,   所以一个 phaser 实例最大支持 2^16-1=65535 个 parties
 * 32-62: phase,     31 位,那么最大值是 Integer.MAX_VALUE,达到最大值后又从 0 开始
 * 63: terminated
 */
private volatile long state;

通常我们在说 0-15 位这种,说的都是从低位开始的

state 的各种操作依赖于 CAS,典型的无锁操作,但是,在大量竞争的情况下,可能会造成很多的自旋。

而构造一棵树就是为了降低每个节点(每个 Phaser 实例)的 parties 的数量,从而有效降低单个 state 值的竞争。

2、第二个问题,它的结构是怎样的?

这里我们不讲源码,用通俗一点的语言表述一下。我们先写段代码构造一棵树:

Phaser root = new Phaser(5);

Phaser n1 = new Phaser(root, 5);
Phaser n2 = new Phaser(root, 5);

Phaser m1 = new Phaser(n1, 5);
Phaser m2 = new Phaser(n1, 5);
Phaser m3 = new Phaser(n1, 5);

Phaser m4 = new Phaser(n2, 5);

根据上面的代码,我们可以画出下面这个很简单的图:

这棵树上有 7 个 phaser 实例,每个 phaser 实例在构造的时候,都指定了 parties 为 5,但是,对于每个拥有子节点的节点来说,每个子节点都是它的一个 party,我们可以通过 phaser.getRegisteredParties() 得到每个节点的 parties 数量:

  • m1、m2、m3、m4 的 parties 为 5
  • n1 的 parties 为 5 + 3,n2 的 parties 为 5 + 1
  • root 的 parties 为 5 + 2

结论应该非常容易理解,我们来阐述一下过程。

在子节点注册第一个 party 的时候,这个时候会在父节点注册一个 party,注意这里说的是子节点添加第一个 party 的时候,而不是说实例构造的时候。

在上面代码的基础上,大家可以试一下下面的这个代码:

Phaser m5 = new Phaser(n2);
System.out.println("n2 parties: " + n2.getRegisteredParties());
m5.register();
System.out.println("n2 parties: " + n2.getRegisteredParties());

第一行代码中构造了 m5 实例,但是此时它的 parties == 0,所以对于父节点 n2 来说,它的 parties 依然是 6,所以第二行代码输出 6。第三行代码注册了 m5 的第一个 party,显然,第四行代码会输出 7。

当子节点的 parties 降为 0 的时候,会从父节点中”剥离”,我们在上面的基础上,再加两行代码:

m5.arriveAndDeregister();
System.out.println("n2 parties: " + n2.getRegisteredParties());

由于 m5 之前只有一个 parties,所以一次 arriveAndDeregister() 就会使得它的 parties 变为 0,此时第二行代码输出父节点 n2 的 parties 为 6。

还有一点有趣的是(其实也不一定有趣吧),在非树的结构中,此时 m5 应该处于 terminated 状态,因为它的 parties 降为 0 了,不过在树的结构中,这个状态由 root 控制,所以我们依然可以执行 m5.register()…

3、每个 phaser 实例的 phase 周期有快有慢,怎么协调的?

在组织成树的这种结构中,每个 phaser 实例的 phase 已经不受自己控制了,由 root 来统一协调,也就是说,root 当前的 phase 是多少,每个 phaser 的 phase 就是多少。

那又有个问题,如果子节点的一个周期很快就结束了,要进入下一个周期怎么办?需要等!这个时候其实要等所有的节点都结束当前 phase,因为只有这样,root 节点才有可能结束当前 phase。

我觉得 Phaser 中的树结构我们要这么理解,我们要把整棵树当做一个 phaser 实例,每个节点只是辅助用于降低并发而存在,整棵树还是需要满足 Phaser 语义的。

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

1 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 295点赞 599645浏览

随机文章
HttpMessageConverter—关于一个字符串到java对象的转化
5年前
Java—Future与FutureTask的区别与联系
5年前
Spring Boot 项目中的 parent
5年前
FastDFS—— 构建分布式文件管理系统
5年前
PostgreSQL—自定义函数
11个月前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1936 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 599645 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付