User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Web   ›   RabbitMQ   ›   正文
RabbitMQ

SpringBoot——RabbitMQ初步整合

2020-05-30 00:58:22
869  0 1
参考目录 隐藏
1) 何为队列(queue)?
2) 何为消息队列(Message queue)?
3) 何为RabbitMq?
4) ConnectionFactory、Connection、Channel
5) Queue( 队列 )
6) Message acknowledgment( 消息确认 )
7) Message durability( 持久化 )
8) Prefetch count
9) Exchange( 交换器 )
10) routing key
11) Binding( 绑定 )
12) Binding key(应该就是 Queue 的名字)
13) Exchange Types
14) 常用命令

阅读完需:约 15 分钟

何为队列(queue)?

queue在计算机科学中随处可见,Queue是一个存储、组织数据的数据结构,其最大的特性就是FIFO;

rabbitmq中queue是RabbitMQ的内部对象,用于存储消息

何为消息队列(Message queue)?

服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信);
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯 (异步通信)

何为RabbitMq?

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。

前几篇关于RabbitMQ的文章(搜索RabbitMQ):

RabbitMQ—简单队列模式

介绍几个关于RabbitMQ的关键内容:

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

Queue( 队列 )

Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Message acknowledgment( 消息确认 )

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。

如果一个Queue没被任何的Consumer Subscribe(订阅),当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除。

那么什么是正确收到呢?通过ACK。每个Message都要被acknowledged(确认,ACK)。我们可以显示的在程序中去ACK,也可以自动的ACK。如果有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。

Message durability( 持久化 )

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。

交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。

设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。

当然,也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。

Prefetch count

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

Exchange( 交换器 )

在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略。

routing key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。

Binding( 绑定 )

RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

Binding key(应该就是 Queue 的名字)

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

Exchange Types

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述)

下面分别与SpringBoot整合进行介绍:

总目录:

direct( 直接转发路由 )

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

RabbitDirectConfig:

@Configuration
public class RabbitDirectConfig {
    public final static String DIRECTNAME="enmalvi";

    @Bean //队列
    Queue queue(){
        return new Queue("hello.java");
    }

   //这个可以不需要 交换器和绑定
    @Bean //交换器
    DirectExchange directExchange(){
        return new DirectExchange(DIRECTNAME,true,false);
    }

    @Bean //绑定
    Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
    }
}

DirectReceiver:

@Component
public class DirectReceiver {
    //队列监听
    @RabbitListener(queues = "hello.java")
    public  void handlerl(String msg) {
        System.out.println("handlerl>>>"+msg);
    }
}

DemoApplicationTests:

//测试
@SpringBootTest
class DemoApplicationTests {

@Autowired
    RabbitTemplate rabbitTemplate;

@Test
    void contextLoads() {
            rabbitTemplate.convertAndSend("hello.java","第一次bootRabBitMQ");
            }
}

fanout( 复制分发路由)

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 原理是不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列

上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

RabbitFanoutConfig:

@Configuration
public class RabbitFanoutConfig {
    public static  final String FANOUTNAME="enmalvi-Fanout";

    //指定两个队列
    @Bean
    Queue queueOne(){
        return new Queue("queue-one");
    }
    @Bean
    Queue queueTwo(){
        return new Queue("queue-two");
    }
    //交换机绑定队列名称
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUTNAME,true,false);
    }
    //绑定交换机和Queue
    @Bean
    Binding bindingOne(){
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }
    @Bean
    Binding bindingTwo(){
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }
}

FanoutReceiver:

@Component
public class FanoutReceiver {
    @RabbitListener(queues = "queue-one")
    public void handlerl(String msg){
        System.out.println("Fanout》》》1111"+msg);
    }

    @RabbitListener(queues = "queue-two")
    public void handlerl2(String msg){
        System.out.println("Fanout》》》2222"+msg);
    }
}

DemoApplicationTests:

@SpringBootTest
class DemoApplicationTests {

@Autowired
    RabbitTemplate rabbitTemplate;

@Test
public void test1(){
     rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"helloFanout");
        }
}

topic( 通配路由)

是direct exchange的通配符模式

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

RabbitTopicConfig:

@Configuration
public class RabbitTopicConfig {
    //队列名称
    public static final String TOPICNAME="java-topic";
    
    //交换器绑定队列
    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(TOPICNAME,true,false);
    }
    
    //创建几个Queue队列
    @Bean
    Queue xiaomi(){
        return new Queue("xiaomi");
    }

    @Bean
    Queue huawei(){
        return new Queue("huawei");
    }

    @Bean
    Queue phone(){
        return new Queue("phone");
    }

    //绑定交换器和Queue
    @Bean
    Binding xiangmiBinding(){
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }

    @Bean
    Binding huaweioBinding(){
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }

    @Bean
    Binding phoneBinding(){
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("#.phone.#");
    }

}

TopicReceiver:

@Component
public class TopicReceiver {
    //监听队列
    @RabbitListener (queues = "xiaomi")
    public void handler1(String msg){
        System.out.println("TopicReceiver>xiaomi》》》"+msg);
    }

    @RabbitListener (queues = "huawei")
    public void handler12(String msg){
        System.out.println("TopicReceiver>huawei》》》"+msg);
    }

    @RabbitListener (queues = "phone")
    public void handler13(String msg){
        System.out.println("TopicReceiver>phone》》》"+msg);
    }
}

DemoApplicationTests:

@SpringBootTest
class DemoApplicationTests {

@Autowired
    RabbitTemplate rabbitTemplate;

@Test
public void test2(){
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","phone");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.new","huawei");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","huaweiphone");
    }
}

headers(首部交换机)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。

RabbitHeaderConfig:

@Configuration
public class RabbitHeaderConfig {
    //队列名称
    public static final String HEADERNAME="java-Header";
    //交换机绑定队列名称
    @Bean
    HeadersExchange headersExchange(){
        return new HeadersExchange(HEADERNAME,true,false);
    }
    //创建Queue
    @Bean
    Queue queueName(){
        return new Queue("name-queue");
    }

    @Bean
    Queue queueAge(){
        return new Queue("age-queue");
    }

    //通过map来指定
    @Bean
    Binding bindingName(){
        Map<String,Object>map=new HashMap<>();
        map.put("name","java");
        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();

    }
    //绑定 规则是头部是age开头
    @Bean
    Binding bindingAge(){
        return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
    }
}

HeaderReceiver:

@Component
public class HeaderReceiver {
    @RabbitListener(queues = "name-queue")
    public void handler1(byte[] msg){
        System.out.println("HeaderReceiver:handler1"+new String(msg,0,msg.length));
    }

    @RabbitListener(queues = "age-queue")
    public void handler2(byte[] msg){
        System.out.println("HeaderReceiver:handler2"+new String(msg,0,msg.length));
    }
}

DemoApplicationTests:

@SpringBootTest
class DemoApplicationTests {

@Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test3(){
        Message Ageage= MessageBuilder.withBody("hello Ageage".getBytes()).setHeader("name","java").build();
        Message Nameage= MessageBuilder.withBody("hello Nameage".getBytes()).setHeader("age","99").build();
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, Ageage);
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, Nameage);
    }
}

常用命令

启动rabbit服务:service rabbitmq-server start

停止rabbit服务:service rabbitmq-server stop

后台启动:rabbitmq-server -detached

运行状态:rabbitmqctl status

用户管理

查看所有用户:rabbitmqctl list_users

添加用户:rabbitmqctl add_user username password

删除用户:rabbitmqctl delete_user username

修改密码:rabbitmqctl change_password username newpassword

开启rabbit网页控制台

进入rabbit安装目录:cd /usr/lib/rabbitmq

查看已经安装的插件:rabbitmq-plugins list

开启网页版控制台:rabbitmq-plugins enable rabbitmq_management

重启rabbitmq服务

输入网页访问地址:http://localhost:15672/ 使用默认账号:guest/guest登录

大佬文章:

https://www.cnblogs.com/williamjie/p/9481774.html

https://zhuanlan.zhihu.com/p/63700605

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

1 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 593865浏览

随机文章
SpringMVC笔记9—文件上传与下载
5年前
Kotlin-类型初步—智能类型转换(十)
4年前
SpringBoot—整合Druid(阿里巴巴数据库连接池)
5年前
Java—字符串操作(String)
5年前
SpringSecurity—OAuth 2(九)第三方应用优化
5年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 593865 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付