RabbitMQ简单复习:
一.架构:
1. Publisher连接->? Broker(Server)-> VHost->Connection-> channel->Exchange->Queu<-Exchange<-channel<- VHost<- Broker<- Consumer
(1) RPC: remote process call远程调用
(2) 依赖: amqp-client
(3) JAVA客户端口: 5672; 图形化界面客户端: 15672
(4) 生产者发布消息-> 交换机路由到队列-> 被消费者监听
(5) Virtual Host: 虚拟主机
(6) Admin: add user-> /test-> administrator-> virtual host /test
2.通讯方式: 五种
(1) Hello-World模式: 1个生产者+1个消费者+1个默认交换机+1个队列
public class Publisher{//生产者
@Test
public void publish() throws IOException{
//连接
Connection connection=RabbitMQClient.getConnection();
//信道
Channel channel=connection.createChannel();
//默认交换机, 路由规则, 消息属性, 消息体byte[]
channel.basicPublish("exchange", "routingKey", BasicProperties, msg.gtBytes());
channel.close(); //底层io流
connection.close();
}
}
(5) public class Consumer{ //消费者
@Test
public void consume() throws IOException{
Connection connection=RabbitMQClient.getConnection();
Channel channel=connection.createChannel();
//队列, 持久化(重启后删除), 排外只能一个消费者, autoDelete, arguments其他信息
channel.queueDeclare("queue", durable, false, false, null);
com.rabbitmq.client.Consumer consumer=new DefaultConsumer(channel){//开启监听
@override //处理消息
public void handleDelivery(String consumerTag, Envelop envelope, AMQP.BasicPuroperties properties, byte[] body) throws IOException{
System.out.println(new String(body, charsetName="utf-8"));
}
};
channel.basicConsume("queue", true, consumer);
channel.close(); //底层io流
connection.close();
}
}
(2) Work模式: 分摊
1个生产者+2个消费者+1个默认交换机+2个队列
Publisher.java + Consumer1.java + Consumer2.java
(3) Publish/Subscribe广播模式: 群发
1个生产者+2个消费者+1个指定交换机+2个指定队列
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue","pubsub-exchange");
(4) Router路由模式: 匹配路由键
1个生产者+2个消费者+1个指定交换机+2个指定队列
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue","routing-exchange", "Error");
channel.basicPublish("routing-exchange", "ERROR", null, "ERROR".getBytes());
(5) Topic话题模式: 通配路由键 *匹配一个单词, #匹配多个
1个生产者+2个消费者+2个指定队列+2个指定交换机
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue","topic-exchange", "*.read.#");
channel.basicPublish("topic-exchange", "fast.red.A", "msg".getBytes());
3. Spring-rabbitTemplate:??
(1) 创建springboot工程, 勾选Spring-Web依赖 spring-boot-starter-web
(2) 依赖: spring-boot-starter-amqp //starter启动器, 起步配置
(3) yml配置:
spring:
rabbitmq:
host: 10.20.159.25
port: 5672
username: test
password: test
virtual-host: /test
(4) 配置类 RabbitMQConfig.java
@Configuration //表明是配置类, 配置文件
public class RabbitMQConfig {
//1. 创建exchange - topic
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 创建queue
@Bean
public Queue getQueue(){
return new Queue("boot-queue",true,false,false,null);
}
//3. 绑定在一起
@Bean
public Binding getBinding(TopicExchange topicExchange, Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
(5) SpringBoot整合MQ: Consumer.java
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue") //监听配置注解
public void getMessage(Object message){
System.out.println("接收到消息:" + message);
}
}
(6) 单元测试 SpringBootRabbitmqApplicationTest.java
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
}
(7) 父类交换机AbstractExchange:
1) CustomExchange: Hello-world自定义模式
?) Work分摊模式: 也属于自定义
2) FanoutExchange: publish/subscribe广播模式
3) DirectExchange: Routing路由模式
3) TopicExchange: Topic话题模式
3. ACK机制 : Message-Acknowledged机制
(1) Msg和Message:
String msg: 字符串
Message message: 对象封装方法
message.getMessageProperties().getDeliveryTag(),false);
(2) 手动ACK:消费者做完后, 如果自动ACK容易造成消息丢失
(3) 配置yml:
listener:
simple:
acknowledge-mode: manual
(4) 监听方法: Consumer.java
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息:" + msg);
//int i = 1 / 0;
//手动ack
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
4. 生产者确认机制: Publisher-Confirm
(1) MQ宕机-> 消息丢失-> 持久化Queue
(2) RabbitMQ提供事务和Confirm机制:
1)普通confirm: 信道发送UID返回publisher
//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判断消息发送是否成功
if(channel.waitForConfirms()){
System.out.println("消息发送成功");
}else{
System.out.println("发送消息失败");
}
2)批量confirm:
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
3)异步confirm
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
5.? Return机制来监听消息是否从exchange送到了指定的queue中
// 开启return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 当消息没有送达到queue时,才会回调执行。
System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
}
});
// 在发送消息时,换另外一个,方法重载,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
6. SpringBoot整合MQ:
7. 避免消息重复消费:?
? ? (1) 消费者message-acknowledge-> 没有走到手动ACK代码-> 消息被重复消费
? ? (2) 幂等性: 多次操作和一次操作效果一致, 如数据库删除, 网页提交按钮
? ? (3) 非幂等: 操作不一致. 如数据库添加
? ? (4) 解决: 发送消息前-> 消息id放入Redis(key=0正在执行, key=1执行成功)-> 发送消息ACK失败-> MQ发送消息给其他消费者-> 先执行setnx-> 如key存在,则获取值-->如key=0则不消费> 如key=1则ack
(1) 生产者,发送消息时,指定messageId
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
(2)消费者,在消费消息时,根据具体业务逻辑去操作redis
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("10.20.159.25",6379);//记得修改为自己的Linux服务器ip
String messageId = properties.getMessageId();
//1. setnx到Redis中,默认指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
//消费消息了,打印消息
//2. 消费成功,set messageId 1
jedis.set(messageId,"1");
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1则手动ack
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
8.? SpringBoot实现代码-> 避免重复消费
(1) 生产者:
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
System.in.read();
}
(2) 消费者:
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
|