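A Redis message queue suits lightweight, high-concurrency scenarios such as flash sales and real-time data analysis.
First, let's look at how to configure Redis in Spring MVC. Compared with the Spring Boot setup, it has noticeably more pitfalls.
Spring MVC version
1. Dependency configuration
<dependency>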
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
I. Configuring with XML
Use XML to configure JedisPoolConfig, JedisConnectionFactory, and Spring's RedisTemplate.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configure JedisPoolConfig -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="50"/>
<property name="maxTotal" value="100"/>
<property name="maxWaitMillis" value="20000"/>
</bean>
<!-- Configure JedisConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="localhost"/>
<property name="port" value="6379"/>
<property name="poolConfig" ref="poolConfig"/>
</bean>
<!-- Serialize using strings -->
<bean id="stringReadisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
<!-- Convert using the JDK serializer -->
<bean id="jdkSerializationRedisSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
<!-- Configure Spring RedisTemplate -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="keySerializer" ref="stringReadisSerializer"/>
<property name="valueSerializer" ref="stringReadisSerializer"/>
</bean>
</beans>
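The biggest pitfall here is configuring Redis serialization. Spring Data Redis (SDR) uses two serialization strategies by default: a String strategy and a JDK strategy. StringRedisTemplate defaults to the String strategy for both keys and values, while RedisTemplate defaults to the JDK strategy for both. Because the strategies differ, the same key serialized through the two templates produces different bytes. As a result, deleting data by key through one template fails when the data was written through the other.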
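To make the mismatch concrete, here is a minimal JDK-only sketch. It assumes that the String strategy stores the UTF-8 bytes of the key and that the JDK strategy stores the output of standard Java serialization. The class `KeyBytesDemo` and its method names are made up for illustration and are not part of Spring Data Redis.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyBytesDemo {

    // UTF-8 bytes of the key: the form a string-based serializer is assumed to store.
    static byte[] stringBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Java-serialized form of the same key: a stream header and type marker precede the text.
    static byte[] jdkBytes(String key) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] plain = stringBytes("key1");
        byte[] jdk = jdkBytes("key1");
        System.out.println("string form: " + plain.length + " bytes");
        System.out.println("JDK form:    " + jdk.length + " bytes");
        // The two arrays differ, so Redis would treat them as two unrelated keys.
        System.out.println("identical: " + Arrays.equals(plain, jdk));
    }
}
```

Since the byte arrays differ, a delete issued with one form never touches an entry stored under the other, which matches the failed deletion described above.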
For example:
<!-- Redis client template -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<!-- Inject the connection factory -->
<property name="connectionFactory" ref="connectionFactory" />
<!-- Key serializer class -->
<property name="keySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<!-- Value serializer class -->
<property name="valueSerializer">
<bean
class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
</property>
</bean>
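In this XML configuration the key and value serializers differ (String for keys, JDK for values). If the values in Redis were written by a client that uses a different value serializer, such as plain strings, reading them through this template produces deserialization errors.
2. Usage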
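The deserialization error can be reproduced without Redis. The sketch below assumes the stored value is a plain UTF-8 string and that the reader expects standard Java serialization, which is what a JDK-based value serializer relies on. `ValueMismatchDemo` and `readAsJdkObject` are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

class ValueMismatchDemo {

    // Tries to read the bytes as a Java-serialized object.
    // Returns null when the bytes are not in that format.
    static Object readAsJdkObject(byte[] stored) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            System.out.println("deserialization failed: " + e.getClass().getSimpleName());
            return null;
        }
    }

    public static void main(String[] args) {
        // Bytes as a string-based writer would have stored them.
        byte[] stored = "value1".getBytes(StandardCharsets.UTF_8);
        System.out.println("result: " + readAsJdkObject(stored));
    }
}
```

The plain bytes lack the Java serialization stream header, so the read fails before any object is produced. Keeping the writer and the reader on the same value serializer avoids this.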
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("application.xml");
RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class);
redisTemplate.opsForValue().set("key1","value1");
redisTemplate.opsForValue().set("key2","value2");
String value1 = (String) redisTemplate.opsForValue().get("key1");
System.out.println(value1);
II. Configuring with Java
package com.wbg.mr.spring;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public JedisConnectionFactory jedisConnectionFactory(){
JedisConnectionFactory jcf = new JedisConnectionFactory();
jcf.setHostName("localhost");
return jcf;
}
@Bean
public RedisTemplate redisTemplate(){
RedisTemplate rt = new RedisTemplate();
rt.setConnectionFactory(jedisConnectionFactory());
rt.setKeySerializer(new StringRedisSerializer());
rt.setValueSerializer(new StringRedisSerializer());
return rt;
}
}
2. Usage
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(RedisConfig.class);
RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class);
redisTemplate.opsForValue().set("key11","value11");
redisTemplate.opsForValue().set("key12","value12");
String value11 = (String) redisTemplate.opsForValue().get("key11");
System.out.println(value11);
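With Spring Boot, you only need to import the dependency and add a small amount of configuration.
First message queue approach:
1. Register StringRedisTemplate
/**
* Registers the redisTemplate that acts as the message queue publisher
*/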
@Configuration
public class PublisherConfig {
@Bean
public StringRedisTemplate getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
return new StringRedisTemplate(redisConnectionFactory);
}
}
2. Message producer: inject redisTemplate and send messages with convertAndSend
@Service
public class PublisherService {
@Autowired
private StringRedisTemplate redisTemplate;
public String sendMessage(String name) {
try {
redisTemplate.convertAndSend("TOPIC_USERNAME", name);
return "Message sent successfully";
} catch (Exception e) {
e.printStackTrace();
return "Failed to send message";
}
}
}
3. Inject the service into the controller and send a message on each request
@RestController
@RequestMapping("publisher")
public class PublisherController {
@Autowired
private PublisherService publisherService;
@RequestMapping("{name}")
public String sendMessage(@PathVariable("name") String name) {
return publisherService.sendMessage(name);
}
}
3. Consumer: create a class that receives messages. It can implement MessageListener, but it does not have to.
Implementing MessageListener:
@Component
public class Receiver implements MessageListener {
private static Logger logger = LoggerFactory.getLogger(Receiver.class);
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
String deserialize = valueSerializer.deserialize(message.getBody());
logger.info("Received MQ message: " + deserialize);
}
}
Without implementing MessageListener:
@Component
public class Receiver {
private static Logger logger = LoggerFactory.getLogger(Receiver.class);
public void receiveMessage(String message) {
logger.info("Received MQ message: " + message);
}
}
4. Subscriber configuration class
@Configuration
@AutoConfigureAfter({Receiver.class})
public class SubscriberConfig {
/**
* Message listener adapter: wraps the receiver and invokes the named method via reflection
*
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter getMessageListenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage"); // the method name is required when Receiver does not implement MessageListener
}
/**
* Creates the message listener container
*
* @param redisConnectionFactory
* @param messageListenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("TOPIC_USERNAME"));
return redisMessageListenerContainer;
}
}
The consumer and the producer are now fully configured. Start the Spring Boot application and call the controller endpoint with Postman.
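The listener adapter in step 4 is given a method name and calls that method on a receiver that implements no listener interface. The following JDK-only sketch shows how such name-based dispatch can work with reflection. `PlainReceiver` and `NameBasedAdapter` are hypothetical stand-ins written for this example; they are not the Spring classes and make no claim about Spring's internal implementation.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

class ReflectiveDispatchDemo {

    // Stand-in for the Receiver above: a plain class with no listener interface.
    static class PlainReceiver {
        String lastMessage;

        public void receiveMessage(String message) {
            lastMessage = message;
        }
    }

    // Hypothetical adapter: looks the handler up by name once, then calls it per message.
    static class NameBasedAdapter {
        private final Object delegate;
        private final Method handler;

        NameBasedAdapter(Object delegate, String methodName) {
            this.delegate = delegate;
            try {
                this.handler = delegate.getClass().getMethod(methodName, String.class);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("no such listener method: " + methodName, e);
            }
        }

        // Decodes the raw payload as UTF-8 and passes it to the named method.
        void onMessage(byte[] body) {
            try {
                handler.invoke(delegate, new String(body, StandardCharsets.UTF_8));
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("listener method failed", e);
            }
        }
    }

    public static void main(String[] args) {
        PlainReceiver receiver = new PlainReceiver();
        NameBasedAdapter adapter = new NameBasedAdapter(receiver, "receiveMessage");
        adapter.onMessage("tom".getBytes(StandardCharsets.UTF_8));
        System.out.println("received: " + receiver.lastMessage);
    }
}
```

A misspelled method name fails at construction time in this sketch, which is a reason to keep the name passed to the adapter in sync with the receiver method.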
Second message queue approach:
Configure the listener in XML:
1. springmvc-redis.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.maxIdle}" />
<property name="maxTotal" value="${redis.maxActive}" />
<property name="maxWaitMillis" value="${redis.maxWait}" />
<property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>
<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}" p:pool-config-ref="poolConfig">
</bean>
<!-- Redis client template -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<!-- Inject the connection factory -->
<property name="connectionFactory" ref="connectionFactory" />
<!-- Key serializer class -->
<property name="keySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<!-- Value serializer class -->
<property name="valueSerializer">
<bean
class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
</property>
</bean>
<!-- Define the listener container -->
<bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
<property name="connectionFactory" ref="connectionFactory"/>
<!-- Task executor -->
<property name="taskExecutor">
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="10"/>
</bean>
</property>
<!-- Message listeners -->
<property name="messageListeners">
<map>
<entry key-ref="redisSubscribeDaoImpl">
<list>
<!-- Channel pattern to listen on -->
<bean class="org.springframework.data.redis.listener.PatternTopic">
<constructor-arg value="channel*" />
</bean>
</list>
</entry>
</map>
</property>
</bean>
</beans>
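The PatternTopic above subscribes with the glob pattern channel*, so a message published to channel1 reaches the listener. As a rough illustration of which channel names such a pattern covers, here is a simplified JDK-only matcher. It handles only the * and ? wildcards and treats every other character literally, which is an assumption made to keep the sketch short; `ChannelPatternDemo` is a hypothetical helper and not part of Redis or Spring.

```java
import java.util.regex.Pattern;

class ChannelPatternDemo {

    // Converts a glob with '*' and '?' into a regex; all other characters match literally.
    static Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");   // any run of characters, including none
            } else if (c == '?') {
                regex.append('.');    // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    static boolean matches(String glob, String channel) {
        return globToRegex(glob).matcher(channel).matches();
    }

    public static void main(String[] args) {
        for (String channel : new String[] {"channel1", "channel5", "channel", "other1"}) {
            System.out.println(channel + " -> " + matches("channel*", channel));
        }
    }
}
```

Under these assumptions, every name that starts with channel matches, while other1 does not.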
2. Producer
package com.yllt.site.redis.impl;
import com.yllt.site.redis.IRedisPublishDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
/**
*
* @author asus
* @date 2019/8/17
*/
@Repository
public class RedisPublishDaoImpl implements IRedisPublishDao {
private static Logger logger = LoggerFactory.getLogger(RedisPublishDaoImpl.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
static String[] channels = null;
static {
if(channels == null){
channels = new String[]{"channel1", "channel2", "channel3", "channel4", "channel5"};
}
}
@Override
public void publishMessage(String message) {
logger.info("Publishing message: " + message);
redisTemplate.convertAndSend("channel1", message);
}
}
3. Consumer
package com.yllt.site.redis.impl;
import com.alibaba.fastjson.JSONObject;
import com.yllt.common.util.CollectionUtil;
import com.yllt.site.domain.Camera;
import com.yllt.site.domain.Trajectory;
import com.yllt.site.service.ICameraService;
import com.yllt.site.service.ITrajectoryService;
import com.yllt.site.util.LatLngUtil;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.awt.geom.Point2D;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Message subscriber
* @author asus
* @date 2019/8/17
*/
@Service("redisSubscribeDaoImpl")
public class RedisSubscribeDaoImpl implements MessageListener {
private static Logger logger = LoggerFactory.getLogger(RedisSubscribeDaoImpl.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
logger.info("Message received, processing...");
byte[] body = message.getBody();
String msgBody = (String) redisTemplate.getValueSerializer().deserialize(body);
logger.info("Received message body: {}", msgBody);
}
}