一、引入依赖
<!-- Spring Boot starter for Spring Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Apache Commons Pool 2; presumably required for the spring.redis.lettuce.pool.* settings used in the configuration section (confirm against the Spring Boot docs) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
二、配置
# Redis server address, credentials and database index
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=123456
spring.redis.database=0
# Connection timeout (milliseconds)
spring.redis.timeout=10000
# Maximum time the pool blocks waiting for a connection (a negative value means no limit); default -1
# NOTE(review): no unit is given here; presumably milliseconds like the timeout above - confirm
spring.redis.lettuce.pool.max-wait=300
# Maximum number of idle connections in the pool
spring.redis.lettuce.pool.max-idle=10
# Minimum number of idle connections in the pool
spring.redis.lettuce.pool.min-idle=5
# Maximum number of connections in the pool (a negative value means no limit); default 8
spring.redis.lettuce.pool.max-active=100
三、监听代码
package com.china.system.service;
import org.springframework.data.redis.connection.MessageListener;
/**
* Redis subscription listener.
*
* <p>Adds no methods of its own; it only extends {@link MessageListener}.
*
* @author songjy
*/
public interface RedisSubscribeService extends MessageListener {
}
实现类
package com.china.system.service.impl;
import com.china.system.service.RedisSubscribeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
 * Redis subscriber that logs the channel and body of every message it receives.
 *
 * <p>Reference: https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/#pubsub
 *
 * @author songjy
 */
@Component
@Slf4j
public class RedisSubscribeServiceImpl extends MessageListenerAdapter implements RedisSubscribeService {

    /**
     * Logs the channel name and payload of a received message.
     *
     * @param message the received message; a {@code null} message is logged as an error and ignored
     * @param bytes   second argument of the listener callback (the subscription pattern in the
     *                {@code MessageListener} API); not used by this implementation
     */
    @Override
    public void onMessage(@Nullable Message message, byte[] bytes) {
        if (Objects.isNull(message)) {
            log.error("消息为空");
            return;
        }
        // Decode explicitly as UTF-8 instead of relying on the JVM's platform default charset:
        // StringRedisTemplate serializes channel names and payloads as UTF-8 by default, so a
        // different platform charset would garble non-ASCII text (such as Chinese messages).
        String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("订阅频道:{}{}消息:{}", channel, System.lineSeparator(), body);
    }
}
四、监听配置
package com.china.system.config;
import com.china.system.service.RedisSubscribeService;
import com.google.common.collect.Lists;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
 * Spring configuration that registers the Redis pub/sub listener container.
 *
 * @author songjy
 */
@Configuration
public class RedisConfig {

    /**
     * Creates the listener container and subscribes the given listener to the
     * {@code song}, {@code jian} and {@code yong} channels.
     *
     * @param connectionFactory     connection factory the container uses
     * @param redisSubscribeService listener registered for the three channels
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisSubscribeService redisSubscribeService) {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // One listener instance handles all three topics.
        container.addMessageListener(
                redisSubscribeService,
                Lists.newArrayList(ChannelTopic.of("song"), ChannelTopic.of("jian"), ChannelTopic.of("yong")));
        return container;
    }
}
五、消息发布
package com.china.system.service;
/**
* Redis publisher.
*
* @author songjy
*/
public interface RedisPublishService {
/**
* Publishes a message.
*
* @param channel the channel to publish to
* @param message the message to publish
*/
void sendMessage(String channel, String message);
}
实现类
package com.china.system.service.impl;
import com.china.system.service.RedisPublishService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * {@link RedisPublishService} implementation that publishes through a {@link StringRedisTemplate}.
 *
 * @author songjy
 */
@Component
@Slf4j
public class RedisPublishServiceImpl implements RedisPublishService {

    private StringRedisTemplate stringRedisTemplate;

    /**
     * Setter used for injecting the template.
     *
     * @param stringRedisTemplate template used to publish messages
     */
    @Autowired
    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * Publishes {@code message} on {@code channel} and logs the publication.
     * A blank message is skipped without publishing or logging.
     *
     * @param channel the channel to publish to
     * @param message the message to publish
     */
    @Override
    public void sendMessage(String channel, String message) {
        final boolean hasContent = !StringUtils.isBlank(message);
        if (hasContent) {
            stringRedisTemplate.convertAndSend(channel, message);
            log.info("频道【{},{}】消息已发布", channel, message);
        }
    }
}
六、消息发布订阅单元测试
package com.china.system.service.impl;
import com.china.system.RedisDemoApplication;
import com.china.system.service.RedisPublishService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
/**
 * Spring Boot test that publishes one message to each of the
 * {@code song}, {@code jian} and {@code yong} channels.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RedisDemoApplication.class})
@ActiveProfiles("beta")
@Transactional
@Slf4j
@Data
public class RedisPublishServiceImplTests {

    private RedisPublishService redisPublishService;

    /**
     * Setter used for injecting the publisher under test.
     *
     * @param redisPublishService publisher under test
     */
    @Autowired
    public void setRedisPublishService(RedisPublishService redisPublishService) {
        this.redisPublishService = redisPublishService;
    }

    /**
     * Publishes a timestamped message per channel, then pauses for five seconds.
     *
     * @throws InterruptedException if the final sleep is interrupted
     */
    @Test
    public void sendMessageTest() throws InterruptedException {
        // Same order as before: song, jian, yong; the payload is the channel name plus a timestamp.
        for (String channel : new String[] {"song", "jian", "yong"}) {
            redisPublishService.sendMessage(channel, channel + System.currentTimeMillis());
        }
        log.info("消息发布完毕");
        // Presumably gives the subscriber time to receive and log the messages before the test ends.
        TimeUnit.SECONDS.sleep(5L);
    }
}
|