阅读完需:约 15 分钟
何为队列(queue)?
queue在计算机科学中随处可见,Queue是一个存储、组织数据的数据结构,其最大的特性就是FIFO;
rabbitmq中queue是RabbitMQ的内部对象,用于存储消息
何为消息队列(Message queue)?
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信);
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯 (异步通信)
何为RabbitMq?
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。
前几篇关于RabbitMQ的文章(搜索RabbitMQ):
介绍几个关于RabbitMQ的关键内容:
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue( 队列 )
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
Message acknowledgment( 消息确认 )
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。
如果一个Queue没被任何的Consumer Subscribe(订阅),当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除。
那么什么是正确收到呢?通过ACK。每个Message都要被acknowledged(确认,ACK)。我们可以显示的在程序中去ACK,也可以自动的ACK。如果有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。
Message durability( 持久化 )
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
当然,也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。
Prefetch count
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

Exchange( 交换器 )
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。

Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略。
routing key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。
Binding( 绑定 )
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。

Binding key(应该就是 Queue 的名字)
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述)
下面分别与SpringBoot整合进行介绍:
总目录:

direct( 直接转发路由 )
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
RabbitDirectConfig:
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME="enmalvi";
@Bean //队列
Queue queue(){
return new Queue("hello.java");
}
//这个可以不需要 交换器和绑定
@Bean //交换器
DirectExchange directExchange(){
return new DirectExchange(DIRECTNAME,true,false);
}
@Bean //绑定
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
}
}
DirectReceiver:
@Component
public class DirectReceiver {
//队列监听
@RabbitListener(queues = "hello.java")
public void handlerl(String msg) {
System.out.println("handlerl>>>"+msg);
}
}
DemoApplicationTests:
//测试
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("hello.java","第一次bootRabBitMQ");
}
}

fanout( 复制分发路由)
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 原理是不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列

上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
RabbitFanoutConfig:
@Configuration
public class RabbitFanoutConfig {
public static final String FANOUTNAME="enmalvi-Fanout";
//指定两个队列
@Bean
Queue queueOne(){
return new Queue("queue-one");
}
@Bean
Queue queueTwo(){
return new Queue("queue-two");
}
//交换机绑定队列名称
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUTNAME,true,false);
}
//绑定交换机和Queue
@Bean
Binding bindingOne(){
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo(){
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
FanoutReceiver:
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handlerl(String msg){
System.out.println("Fanout》》》1111"+msg);
}
@RabbitListener(queues = "queue-two")
public void handlerl2(String msg){
System.out.println("Fanout》》》2222"+msg);
}
}
DemoApplicationTests:
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test1(){
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"helloFanout");
}
}

topic( 通配路由)
是direct exchange的通配符模式
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
RabbitTopicConfig:
@Configuration
public class RabbitTopicConfig {
//队列名称
public static final String TOPICNAME="java-topic";
//交换器绑定队列
@Bean
TopicExchange topicExchange(){
return new TopicExchange(TOPICNAME,true,false);
}
//创建几个Queue队列
@Bean
Queue xiaomi(){
return new Queue("xiaomi");
}
@Bean
Queue huawei(){
return new Queue("huawei");
}
@Bean
Queue phone(){
return new Queue("phone");
}
//绑定交换器和Queue
@Bean
Binding xiangmiBinding(){
return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
}
@Bean
Binding huaweioBinding(){
return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
}
@Bean
Binding phoneBinding(){
return BindingBuilder.bind(huawei()).to(topicExchange()).with("#.phone.#");
}
}
TopicReceiver:
@Component
public class TopicReceiver {
//监听队列
@RabbitListener (queues = "xiaomi")
public void handler1(String msg){
System.out.println("TopicReceiver>xiaomi》》》"+msg);
}
@RabbitListener (queues = "huawei")
public void handler12(String msg){
System.out.println("TopicReceiver>huawei》》》"+msg);
}
@RabbitListener (queues = "phone")
public void handler13(String msg){
System.out.println("TopicReceiver>phone》》》"+msg);
}
}
DemoApplicationTests:
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test2(){
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","phone");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.new","huawei");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","huaweiphone");
}
}

headers(首部交换机)
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。
RabbitHeaderConfig:
@Configuration
public class RabbitHeaderConfig {
//队列名称
public static final String HEADERNAME="java-Header";
//交换机绑定队列名称
@Bean
HeadersExchange headersExchange(){
return new HeadersExchange(HEADERNAME,true,false);
}
//创建Queue
@Bean
Queue queueName(){
return new Queue("name-queue");
}
@Bean
Queue queueAge(){
return new Queue("age-queue");
}
//通过map来指定
@Bean
Binding bindingName(){
Map<String,Object>map=new HashMap<>();
map.put("name","java");
return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
}
//绑定 规则是头部是age开头
@Bean
Binding bindingAge(){
return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
}
}
HeaderReceiver:
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg){
System.out.println("HeaderReceiver:handler1"+new String(msg,0,msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg){
System.out.println("HeaderReceiver:handler2"+new String(msg,0,msg.length));
}
}
DemoApplicationTests:
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test3(){
Message Ageage= MessageBuilder.withBody("hello Ageage".getBytes()).setHeader("name","java").build();
Message Nameage= MessageBuilder.withBody("hello Nameage".getBytes()).setHeader("age","99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, Ageage);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, Nameage);
}
}

常用命令
启动rabbit服务:service rabbitmq-server start
停止rabbit服务:service rabbitmq-server stop
后台启动:rabbitmq-server -detached
运行状态:rabbitmqctl status
用户管理
查看所有用户:rabbitmqctl list_users
添加用户:rabbitmqctl add_user username password
删除用户:rabbitmqctl delete_user username
修改密码:rabbitmqctl change_password username newpassword
开启rabbit网页控制台
进入rabbit安装目录:cd /usr/lib/rabbitmq
查看已经安装的插件:rabbitmq-plugins list
开启网页版控制台:rabbitmq-plugins enable rabbitmq_management
重启rabbitmq服务
输入网页访问地址:http://localhost:15672/ 使用默认账号:guest/guest登录
大佬文章:
https://www.cnblogs.com/williamjie/p/9481774.html
https://zhuanlan.zhihu.com/p/63700605