User-Profile-Image
hankin
  • 5
  • Java
  • Kotlin
  • Spring
  • Web
  • SQL
  • MegaData
  • More
  • Experience
  • Enamiĝu al vi
  • 分类
    • Zuul
    • Zookeeper
    • XML
    • WebSocket
    • Web Notes
    • Web
    • Vue
    • Thymeleaf
    • SQL Server
    • SQL Notes
    • SQL
    • SpringSecurity
    • SpringMVC
    • SpringJPA
    • SpringCloud
    • SpringBoot
    • Spring Notes
    • Spring
    • Servlet
    • Ribbon
    • Redis
    • RabbitMQ
    • Python
    • PostgreSQL
    • OAuth2
    • NOSQL
    • Netty
    • MySQL
    • MyBatis
    • More
    • MinIO
    • MegaData
    • Maven
    • LoadBalancer
    • Kotlin Notes
    • Kotlin
    • Kafka
    • jQuery
    • JavaScript
    • Java Notes
    • Java
    • Hystrix
    • Git
    • Gateway
    • Freemarker
    • Feign
    • Eureka
    • ElasticSearch
    • Docker
    • Consul
    • Ajax
    • ActiveMQ
  • 页面
    • 归档
    • 摘要
    • 杂图
    • 问题随笔
  • 友链
    • Spring Cloud Alibaba
    • Spring Cloud Alibaba - 指南
    • Spring Cloud
    • Nacos
    • Docker
    • ElasticSearch
    • Kotlin中文版
    • Kotlin易百
    • KotlinWeb3
    • KotlinNhooo
    • 前端开源搜索
    • Ktorm ORM
    • Ktorm-KSP
    • Ebean ORM
    • Maven
    • 江南一点雨
    • 江南国际站
    • 设计模式
    • 熊猫大佬
    • java学习
    • kotlin函数查询
    • Istio 服务网格
    • istio
    • Ktor 异步 Web 框架
    • PostGis
    • kuangstudy
    • 源码地图
    • it教程吧
    • Arthas-JVM调优
    • Electron
    • bugstack虫洞栈
    • github大佬宝典
    • Sa-Token
    • 前端技术胖
    • bennyhuo-Kt大佬
    • Rickiyang博客
    • 李大辉大佬博客
    • KOIN
    • SQLDelight
    • Exposed-Kt-ORM
    • Javalin—Web 框架
    • http4k—HTTP包
    • 爱威尔大佬
    • 小土豆
    • 小胖哥安全框架
    • 负雪明烛刷题
    • Kotlin-FP-Arrow
    • Lua参考手册
    • 美团文章
    • Java 全栈知识体系
    • 尼恩架构师学习
    • 现代 JavaScript 教程
    • GO相关文档
    • Go学习导航
    • GoCN社区
    • GO极客兔兔-案例
    • 讯飞星火GPT
    • Hollis博客
    • PostgreSQL德哥
    • 优质博客推荐
    • 半兽人大佬
    • 系列教程
    • PostgreSQL文章
    • 云原生资料库
    • 并发博客大佬
Help?

Please contact us on our email for need any support

Support
    首页   ›   Web   ›   ActiveMQ   ›   正文
ActiveMQ

ActiveMQ的使用方法

2020-04-19 01:30:55
1150  0 0
参考目录 隐藏
1) 1.Queue
2) Producer
3) Consumer
4) 2.Topic
5) Producer
6) Consumer
7) 注意:

阅读完需:约 7 分钟

关于ActiveMQ 的详细内容:

ActiveMQ—入门(消息中间件)

1.Queue

Producer

生产者:生产消息,发送端。

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.2</version>
</dependency>
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
public void testQueueProducer() throws Exception {
		//1、创建一个连接工厂对象,需要指定服务的ip及端口。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
		//2、使用工厂对象创建一个Connection对象。
		Connection connection = connectionFactory.createConnection();
		//3、开启连接,调用Connection对象的start方法。
		connection.start();
		//4、创建一个Session对象。
		//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
		//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
		Queue queue = session.createQueue("test-queue");
		//6、使用Session对象创建一个Producer对象。
		MessageProducer producer = session.createProducer(queue);
		//7、创建一个Message对象,可以使用TextMessage。
		/*TextMessage textMessage = new ActiveMQTextMessage();
		textMessage.setText("hello Activemq");*/
		TextMessage textMessage = session.createTextMessage("hello activemq");
		//8、发送消息
		producer.send(textMessage);
		//9、关闭资源
		producer.close();
		session.close();
		connection.close();
	}

Consumer

消费者:接收消息。

第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

public void testQueueConsumer() throws Exception {
		//创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
		//创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//创建一个Destination对象。queue对象
		Queue queue = session.createQueue("test-queue");
		//使用Session对象创建一个消费者对象。
		MessageConsumer consumer = session.createConsumer(queue);
		//接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//打印结果
				TextMessage textMessage = (TextMessage) message;
				String text;
				try {
					text = textMessage.getText();
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
				
			}
		});
		//等待接收消息
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}

2.Topic

Producer

使用步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。

@Test
	public void testTopicProducer() throws Exception {
		//1、创建一个连接工厂对象,需要指定服务的ip及端口。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
		//2、使用工厂对象创建一个Connection对象。
		Connection connection = connectionFactory.createConnection();
		//3、开启连接,调用Connection对象的start方法。
		connection.start();
		//4、创建一个Session对象。
		//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
		//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
		Topic topic = session.createTopic("test-topic");
		//6、使用Session对象创建一个Producer对象。
		MessageProducer producer = session.createProducer(topic);
		//7、创建一个Message对象,可以使用TextMessage。
		/*TextMessage textMessage = new ActiveMQTextMessage();
		textMessage.setText("hello Activemq");*/
		TextMessage textMessage = session.createTextMessage("topic message");
		//8、发送消息
		producer.send(textMessage);
		//9、关闭资源
		producer.close();
		session.close();
		connection.close();
	}

Consumer

消费者:接收消息。

第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
@Test
	public void testTopicConsumer() throws Exception {
		//创建一个ConnectionFactory对象连接MQ服务器
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
		//创建一个连接对象
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用Connection对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//创建一个Destination对象。topic对象
		Topic topic = session.createTopic("test-topic");
		//使用Session对象创建一个消费者对象。
		MessageConsumer consumer = session.createConsumer(topic);
		//接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//打印结果
				TextMessage textMessage = (TextMessage) message;
				String text;
				try {
					text = textMessage.getText();
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
				
			}
		});
		System.out.println("topic消费者3启动。。。。");
		//等待接收消息
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}

注意:

1.  session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
   // 其中:Boolean.FALSE表示本次会话不开启事务管理,假如需要开启事务管理,将其改为Boolean.TRUE即可
   //同时需要在发送消息后添加session.commit(),否则,消息是不会被提交的.
   //Session.AUTO_ACKNOWLEDGE表示消息确认机制
	AUTO_ACKNOWLEDGE:自动确认
	CLIENT_ACKNOWLEDGE:客户端确认
	SESSION_TRANSACTED:事务确认,如果使用事务推荐使用该确认机制
	AUTO_ACKNOWLEDGE:懒散式确认,消息偶尔不会被确认,也就是消息可能会被重复发送.但发生的概率很小
2.  connection.start();
   //在消息接收端,接受消息前需要加入这段代码,开启连接,否则一样无法获取消息.
3.  Destination destination = session.createQueue("myQueue");
	//创建目的地时,如果做测试收不到信息,可以将目的地名称修改一下,我用的是IDEA,不清楚为何,
	//有时候收不到信息,修改一下就好了,猜测可能是缓存的原因吧

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

0 打赏
Enamiĝu al vi
不要为明天忧虑.因为明天自有明天的忧虑.一天的难处一天当就够了。
543文章 68评论 294点赞 594169浏览

随机文章
MyBatis—一对多和多对一处理(另一个参考)
5年前
Java—并发编程—自旋锁、排队自旋锁、MCS锁、CLH锁
4年前
Java—类加载内存
5年前
Spring—JdbcTemplate模板类的使用(补充)
5年前
java—stream()方法的使用(java8)
5年前
博客统计
  • 日志总数:543 篇
  • 评论数目:68 条
  • 建站日期:2020-03-06
  • 运行天数:1927 天
  • 标签总数:23 个
  • 最后更新:2024-12-20
Copyright © 2025 网站备案号: 浙ICP备20017730号 身体没有灵魂是死的,信心没有行为也是死的。
主页
页面
  • 归档
  • 摘要
  • 杂图
  • 问题随笔
博主
Enamiĝu al vi
Enamiĝu al vi 管理员
To be, or not to be
543 文章 68 评论 594169 浏览
测试
测试
看板娘
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付