RabbitConfig?
/**
* @author fan
* @date 2022年04月27日 11:17
*/
@EnableRabbit
@Configuration
public class RabbitConfig {
/**创建扇形交换机开始*/
//测试队列名称
private String fanoutQueueName = "fanoutQueue";
// 测试交换机名称
private String fanoutExchange = "fanoutExchange";
// RoutingKey 路由键无需配置,配置也不起作用
//private String fanoutRoutingKey = "fanoutRoutingKey";
/** 创建队列 */
@Bean
public Queue fanoutQueue() {
return new Queue(fanoutQueueName);
}
/** 创建交换机 */
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(fanoutExchange);
}
/** 通过routingKey把队列与交换机绑定起来 */
@Bean
public Binding fanoutExchangeBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
/**结束创建扇形交换机(Fanout exchange)
*直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个 routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
*扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快
*主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,`*`代表一个单词,`#`代表多个单词
*首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则
*/
}
producer
/**
* @author fan
* @date 2022年04月27日 15:01
*/
@Component
public class MyProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送JSON类型
* @author fan
* @date 2022/4/27 15:39
* @param queueName 队列名称
* @return java.lang.String
*/
public String sendjsonObject(String queueName) {//构造前端回传参数
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "Icomxchishi@qq.com");
jsonObject.put("phone", "03803880388");
jsonObject.put("age", 15);
jsonObject.put("sex","女");
jsonObject.put("name", "messageData");
jsonObject.put("data", Arrays.asList("张三","12345678",12,"男"));
jsonObject.put("timestamp", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
String messageId = String.valueOf(UUID.randomUUID());
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setDeliveryTag(System.currentTimeMillis())
.setContentEncoding("utf-8")
.setMessageId(messageId)
.build();
rabbitTemplate.convertAndSend(queueName, message);
return "sendjsonObject";
}
/**
* 发送map类型数据
* @author fan
* @date 2022/4/27 18:33
* @param queueName 队列名称
* @return java.lang.String
*/
public String sendMessageMap(String queueName) {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "my message!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
map.put("msg","随便");
map.put("data", Arrays.asList("张三","12345678",12,"男"));
rabbitTemplate.convertAndSend(queueName, map);
return "sendMapMessage";
}
}
consumer,以sendjsonObject();为例
@Slf4j
@Component
public class MyConsumer {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedisClient redisClient;//本人封装的,可以使用redisTemplate
@Autowired
private UserService userService;
@Autowired
private LoggerService loggerService;
/** 消费者消费成功后,再给MQ确认的时候出现了网络波动
* MQ没有接收到确认,为了保证消息被消费,RabbitMQ就会继续给消费者重新投递之前的消息
* 消费者就接收到了两条一样的消息
* 重复消息可能是网络波动原因造成的或其它原因,导致不可避免的重复消息
* 如何保证消息幂等性??
* 解决:消费者先获取到消息后在根据messageId去查询redis是否存在该消息
* 若不存在,则正常消费,消费完毕后写入redis
* 若存在,则消息已经被消费过,就直接丢弃
* @author fan
* @date 2022/4/27 16:28
* @param message
*/
@RabbitListener(queues = "fanoutQueue")
public void sendjsonObject(Message message, Channel channel) throws Exception{//消费者消息重复
log.info("MyConsumer 消费者收到消息:{}" , JSONObject.toJSONString(message));
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");
redisClient.getString("messageId");
if (messageId != null) {
if (messageId.equals(messageIdRedis)) {
return;
}
}
System.out.println("number==" + channel.getChannelNumber());
JSONObject jsonObject = JSONObject.parseObject(msg);
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
//具体业务,如:插入或修改user,这里用数据库可以不用redis做判断
Integer age = (Integer) jsonObject.get("age");
String name = (String) jsonObject.get("name");
String sex = (String) jsonObject.get("sex");
LocalDateTime localDateTime = LocalDateTime.now();
Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
User user = new User();
user.setSex(sex);
user.setName(name);
user.setDataTime(dataTime);
user.setAge(age);
int i = 0;
if (user != null){
i = userService.savetUser(user);//新增或修改
redisClient.setKey(user.getId() + "", user, 5_000L);// 写入缓存,便于操作延时业务
//redisTemplate.opsForList().rightPushAll(user.getId(), Arrays.asList(user));
//loggerService.saveLog(jsonObject);//日志记录
}
log.info("收到消息obj:" + user + ",id=" + user.getId());
log.info("保存了" + i + "记录");
channel.basicAck(deliverTag,true);//手动设置ack,消费成功,确认消息
}catch (Exception e){
try {
//异常返回false,就重新回到队列
channel.basicNack(deliverTag, false, true);
} catch (Exception ioException) {
log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
}
log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);
//channel.basicReject(deliverTag, false);
}
//redisTemplate.opsForValue().set(messageId, messageId, 30, TimeUnit.SECONDS);//时间具体根据业务定
redisClient.setKey(messageId,messageId,3_000L);// 写入缓存
log.info("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);
}
@RabbitListener(queues = "fanoutQueue")
public void sendMessageMap(Map map){
log.info("收到消息map:" + map.toString());
}
}
ProducerController
@RestController
@RequestMapping(value = "producer")
public class ProducerController {
@Autowired
private MyProducer myProducer;
@RequestMapping("/sendjsonObject")
public String sendjsonObject() {//前端传参数回来在producer那里模拟了,这里就略
return myProducer.sendjsonObject("fanoutQueue");
}
@GetMapping("/sendMessageMap")
public String sendMessageMap() {
return myProducer.sendMessageMap("fanoutQueue");
}
}
application.yml
server:
port: 8081
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: root
password:
dbcp2:
initial-size: 5
max-total: 5
max-wait-millis: 200
min-idle: 5
redis:
database: 0
host: 127.0.0.1
port: 6379
password:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-returns: true
publisher-confirms: true
connection-timeout: 5000
#另外一种打印语句的方式
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
level:
com:
acong:
dao: debug
file: d:/logs/redis.log
效果图:
|