阅读完需:约 4 分钟
最简单的使用(总目录):

1.先写配置文件
spring.activemq.broker-url=tcp://127.0.0.1:61616 //地址
spring.activemq.packages.trust-all=true
spring.activemq.user=admin //用户名
spring.security.user.password=admin //密码
2.Message的bean
public class Message implements Serializable {
private String content;
private Date SendDate;
@Override
public String toString() {
return "Message{" +
"content='" + content + '\'' +
", SendDate=" + SendDate +
'}';
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Date getSendDate() {
return SendDate;
}
public void setSendDate(Date sendDate) {
SendDate = sendDate;
}
}
3.编写JmsComponent
@Component
public class JmsComponent {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Queue queue;
public void send(Message message){
jmsMessagingTemplate.convertAndSend(this.queue,message);
}
@JmsListener(destination = "hello.java")
public void receive(Message msg){
System.out.println(msg);
}
}
4.启动
@SpringBootApplication
public class ActiveApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveApplication.class, args);
}
@Bean
Queue queue(){
return new ActiveMQQueue("hello.java");
}
}
5.测试
@SpringBootTest
class ActiveApplicationTests {
@Autowired
JmsComponent jmsComponent;
@Test
void contextLoads() {
Message message=new Message();
message.setContent("hello javaaa");
message.setSendDate(new Date());
jmsComponent.send(message);
}
}
结果:

第二个例子:
说明:这里的生产者生产消息采用浏览器访问方式,方便控制消息生产和消费,当然也可以单独起一个线程自动生产消息。
1、yml配置文件
server:
port: 8081
spring:
#如果是点对点(queue),那么此处默认应该是false,如果发布订阅,那么一定设置为true
jms:
pub-sub-domain: true
activemq:
user: admin
password: admin
#定义ActivMQ的连接地址
broker-url: tcp://127.0.0.1:61616
#mq连接池
pool:
enabled: true
max-connections: 8
#空闲的连接过期时间,默认为30秒
idle-timeout: 30000
#定义队列名称
queueName: myQueue
#定义主题名称
topicName: myTopic
2、pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.example</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<groupId>com.example</groupId>
<artifactId>jmsdemo</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
点对点模式(开启点对点模式,请把yml文件的pub-sub-domain设为false)
访问路径:127.0.0.1:8081/sendToQueue?msg=要发送的消息
1、生产者
@RestController
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("sendToQueue")
public void sendMessage(String msg, HttpServletResponse response) throws IOException {
//队列名称
ActiveMQQueue queue = new ActiveMQQueue("myQueue");
//发送消息
jmsMessagingTemplate.convertAndSend(queue, msg);
System.out.println("客户端发送消息成功");
response.getWriter().write("success");
}
}
2、消费者
@Component
public class QueueConsumer {
// @Autowired
// private JmsMessagingTemplate jmsMessagingTemplate;
// 使用JmsListener配置消费者监听的队列
@JmsListener(destination = "myQueue")
public void handleMessage(String msg) {
System.out.println("服务端成功接收queue消息成功:" + msg);
}
/*
@JmsListener(destination = "myQueue")
// SendTo 会将此方法返回的数据, 写入到 OutQueue 中去.
@SendTo("outQueue")
public String handleMessage2(String msg) {
System.out.println("成功接受msg" + msg);
return "成功接受msg" + msg;
}
*/
}
主题-发布订阅模式
访问路径:127.0.0.1:8081/sendToTopic?msg=要发送的消息
1、生产者
@RestController
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("sendToTopic")
public void sendMessage(String msg, HttpServletResponse response) throws IOException {
//队列名称
ActiveMQTopic topic = new ActiveMQTopic("myTopic");
//发送消息
jmsMessagingTemplate.convertAndSend(topic, msg);
System.out.println("客户端发送消息成功");
response.getWriter().write("success");
}
}
2、消费者
@Component
public class TopicConsumer {
// 使用JmsListener配置消费者监听的队列
@JmsListener(destination = "myTopic")
public void handleMessage1(String msg) {
System.out.println("消费者1成功接收topic消息成功:" + msg);
}
// 使用JmsListener配置消费者监听的队列
@JmsListener(destination = "myTopic")
public void handleMessage2(String msg) {
System.out.println("消费者2成功接收topic消息成功:" + msg);
}
}
}
消息可靠保证机制
- 事务控制模式
生产者:
//生产者必须要将消息提交事务,才可以提交到队列中
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
...此处省略中间代码...
// 发送消息
producer.send(message);
//手动提交事务
session.commit();
消费者:
Session session = createConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
...此处省略中间代码...
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
session.commit();
- 手动签收模式
生产者:开启手动签收,在消费者接受完消息后手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
消费者:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
message.acknowledge();
- 消息持久化
//若需要持久化,只需要在生产者设置
// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 持久化消息,当MQ关闭后,消息会保存成文件(在acitvemq的data目录中)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);