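I. Introduction
II. Overview
III. Usage
1. Installation
Pull the image:
Docker Hub:
docker pull rabbitmq:management
Create the container (the image is referenced by tag here; a local image ID works as well):
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
Open the web console:
http://localhost:15672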
2. Getting-started example
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
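// Producer: publishes one message to "queue1" through the default exchange.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("queue1", true, false, false, null);
    String body = "hello rabbitmq!";
    // An empty exchange name selects the default exchange, which routes by queue name.
    channel.basicPublish("", "queue1", null, body.getBytes(StandardCharsets.UTF_8));
    // Release the channel and connection so the JVM can exit.
    channel.close();
    connection.close();
}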
// Consumer: receives messages from "queue1" and prints their metadata and body.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    // Declaring the queue again is harmless as long as the arguments match.
    channel.queueDeclare("queue1", true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("consumerTag:" + consumerTag);
            System.out.println("exchange:" + envelope.getExchange());
            System.out.println("RoutingKey:" + envelope.getRoutingKey());
            System.out.println("properties:" + properties);
            // Decode with the same charset the producer used.
            System.out.println("body:" + new String(body, StandardCharsets.UTF_8));
        }
    };
    // basicConsume(queue, autoAck, callback); the connection stays open so the consumer keeps listening.
    channel.basicConsume("queue1", true, consumer);
}
3. Working modes
3.1 Work queue mode
3.2 Pub/sub (publish/subscribe) mode
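The publish/subscribe model adds an exchange to the picture, and the message flow changes accordingly:
- P: the producer. It no longer sends messages to a queue; it sends them to an exchange.
- C: the consumer, which receives messages and waits continuously for them to arrive.
- Queue: receives and buffers messages.
- Exchange: receives the messages sent by producers and knows how to handle them, for example by handing them to one specific queue, delivering them to every queue, or discarding them. Which of these happens depends on the exchange type. Three types are common:
  - Fanout: broadcast; the message goes to every queue bound to the exchange.
  - Direct: targeted; the message goes to the queues whose binding key equals the message's routing key.
  - Topic: wildcard; the message goes to the queues whose routing pattern matches the routing key.
- An exchange only forwards messages and cannot store them. If no queue is bound to the exchange, or no bound queue satisfies the routing rule, the message is lost.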
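To make the difference between fanout and direct routing concrete, here is a minimal in-memory sketch. `ToyExchange` is a hypothetical class written for illustration only; it is not part of the RabbitMQ client and ignores durability, channels, and the network. It only models the rule "which bound queues receive a message with this routing key".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of an exchange; NOT the RabbitMQ client API.
class ToyExchange {
    enum Type { FANOUT, DIRECT }

    private final Type type;
    // queue name -> binding keys registered for that queue
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    ToyExchange(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, q -> new ArrayList<>()).add(bindingKey);
    }

    // Returns the queues that would receive a message published with routingKey.
    // An empty list means the message is dropped: the exchange stores nothing.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : bindings.entrySet()) {
            boolean match = type == Type.FANOUT || e.getValue().contains(routingKey);
            if (match) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ToyExchange fanout = new ToyExchange(Type.FANOUT);
        fanout.bind("queue1", "");
        fanout.bind("queue2", "");
        System.out.println(fanout.route("anything")); // both queues, key is ignored

        ToyExchange direct = new ToyExchange(Type.DIRECT);
        direct.bind("queue1", "error");
        direct.bind("queue2", "info");
        System.out.println(direct.route("info"));  // only queue2
        System.out.println(direct.route("trace")); // no binding matches: dropped
    }
}
```

The empty result in the last call corresponds to the final bullet above: with no matching binding, the message has nowhere to go.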
// Fanout producer: every queue bound to the exchange receives a copy of the message.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_fanout";
    // exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments)
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
    String queue1Name = "test_fanout_queue1";
    String queue2Name = "test_fanout_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);
    // A fanout exchange ignores the routing key, so an empty string is used.
    channel.queueBind(queue1Name, exchangeName, "");
    channel.queueBind(queue2Name, exchangeName, "");
    String body = "Log entry: Zhang San called findAll()... log level: info";
    channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
    channel.close();
    connection.close();
}
// Fanout consumer 1: reads test_fanout_queue1 and prints the log entry to the console.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String queue1Name = "test_fanout_queue1";
    channel.queueDeclare(queue1Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, StandardCharsets.UTF_8));
            System.out.println("Printing the log entry to the console");
        }
    };
    channel.basicConsume(queue1Name, true, consumer);
}
// Fanout consumer 2: reads test_fanout_queue2 and stands in for a database writer.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String queue2Name = "test_fanout_queue2";
    channel.queueDeclare(queue2Name, true, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(new String(body, StandardCharsets.UTF_8));
            System.out.println("Storing the log entry in the database");
        }
    };
    channel.basicConsume(queue2Name, true, consumer);
}
// Direct (routing) producer: a queue receives the message only if one of its binding keys equals the routing key.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_direct";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
    // The queues from the fanout example are reused, so the two consumers above keep working.
    String queue1Name = "test_fanout_queue1";
    String queue2Name = "test_fanout_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);
    // queue1 receives only "error"; queue2 receives "info", "warning", and "debug".
    channel.queueBind(queue1Name, exchangeName, "error");
    channel.queueBind(queue2Name, exchangeName, "info");
    channel.queueBind(queue2Name, exchangeName, "warning");
    channel.queueBind(queue2Name, exchangeName, "debug");
    String body = "Log entry: Zhang San called findAll()... log level: debug";
    // Routing key "debug" matches a binding of queue2 only.
    channel.basicPublish(exchangeName, "debug", null, body.getBytes(StandardCharsets.UTF_8));
    channel.close();
    connection.close();
}
// Topic producer: binding keys are patterns; "*" matches exactly one word, "#" matches zero or more words.
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/xingyu");
    connectionFactory.setUsername("root");
    connectionFactory.setPassword("root");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_topic";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
    String queue1Name = "test_fanout_queue1";
    String queue2Name = "test_fanout_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);
    channel.queueBind(queue1Name, exchangeName, "#.error");
    channel.queueBind(queue2Name, exchangeName, "#.info");
    channel.queueBind(queue2Name, exchangeName, "*.warning");
    channel.queueBind(queue2Name, exchangeName, "*.debug.*");
    String body1 = "Log entry: Zhang San called findAll()... log level: system.error";
    String body2 = "Log entry: Zhang San called findAll()... log level: system.info";
    String body3 = "Log entry: Zhang San called findAll()... log level: system.warning";
    String body4 = "Log entry: Zhang San called findAll()... log level: system.debug";
    String body5 = "Log entry: Zhang San called findAll()... log level: system.debug.debug";
    // system.error -> queue1 (matches "#.error")
    channel.basicPublish(exchangeName, "system.error", null, body1.getBytes(StandardCharsets.UTF_8));
    // system.info -> queue2 (matches "#.info")
    channel.basicPublish(exchangeName, "system.info", null, body2.getBytes(StandardCharsets.UTF_8));
    // system.warning -> queue2 (matches "*.warning")
    channel.basicPublish(exchangeName, "system.warning", null, body3.getBytes(StandardCharsets.UTF_8));
    // system.debug -> no queue: "*.debug.*" needs exactly three words, so this message is dropped
    channel.basicPublish(exchangeName, "system.debug", null, body4.getBytes(StandardCharsets.UTF_8));
    // system.debug.debug -> queue2 (matches "*.debug.*")
    channel.basicPublish(exchangeName, "system.debug.debug", null, body5.getBytes(StandardCharsets.UTF_8));
    channel.close();
    connection.close();
}
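The topic bindings above are easier to reason about with a small matcher. The sketch below is a hypothetical helper, `TopicMatcher`, not something the RabbitMQ client exposes; the broker performs this matching internally. It assumes the standard topic rules: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The error level is spelled out as `error` in the keys.

```java
// Hypothetical helper that mimics topic-exchange pattern matching.
class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words p[i..] against routing-key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"#.error", "#.info", "*.warning", "*.debug.*"};
        String[] keys = {"system.error", "system.info", "system.warning",
                "system.debug", "system.debug.debug"};
        for (String key : keys) {
            for (String pattern : patterns) {
                if (matches(pattern, key)) {
                    System.out.println(key + " matches " + pattern);
                }
            }
        }
    }
}
```

Running `main` prints one line for every key except `system.debug`, which has only two words and therefore cannot satisfy the three-word pattern `*.debug.*`.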
3.3 Integrating RabbitMQ with Spring Boot
// Declares a topic exchange, a durable queue, and the binding between them.
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    // Routing keys that start with the word "boot" reach the queue.
    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();
    }
}
// Producer side: a JUnit 4 test that sends one message through RabbitTemplate.
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @org.junit.Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq hello....");
    }
}
// Consumer side: a listener bound to the queue declared above.
@Component
public class ConsumerListener {
    @RabbitListener(queues = "boot_queue")
    public void listenerQueue(Message message) {
        byte[] body = message.getBody();
        System.out.println(message);
        System.out.println(new String(body, StandardCharsets.UTF_8));
    }
}
IV. Advanced features
1. Reliable message delivery
1.1 confirm
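spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    virtual-host: /xingyu
    host: localhost
    port: 5672
    username: root
    password: root
    # Enables publisher confirms. Spring Boot 2.2+ deprecates this key in favor of
    # publisher-confirm-type: correlated
    publisher-confirms: true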
The RabbitMQConfig class is the same as in section 3.3.
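@org.junit.Test
public void test() {
    // The confirm callback reports whether the broker (exchange) accepted the message.
    // A RabbitTemplate supports only one ConfirmCallback.
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm callback invoked");
            if (ack) {
                System.out.println("message accepted");
            } else {
                // cause carries the reason for the failure
                System.out.println("message rejected: " + cause);
            }
        }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
            "boot.haha", "boot mq hello....");
}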
1.2 return
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    virtual-host: /xingyu
    host: localhost
    port: 5672
    username: root
    password: root
    publisher-returns: true
The RabbitMQConfig class is again the same as in section 3.3.
@org.junit.Test
public void test() {
    // mandatory=true asks the broker to return unroutable messages instead of dropping them.
    rabbitTemplate.setMandatory(true);
    // Spring AMQP 2.3+ deprecates setReturnCallback in favor of setReturnsCallback.
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return callback invoked");
            System.out.println(message);
            System.out.println(replyCode);
            System.out.println(replyText);
            System.out.println(exchange);
            System.out.println(routingKey);
        }
    });
    // "boot1" does not match the binding "boot.#", so the message is returned.
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
            "boot1", "boot mq hello....");
}
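// Consumer-side acknowledgement: the listener acks or nacks each message manually.
@Component
@Configuration
public class ConsumerListener {
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // MANUAL: the container never acks on its own; the listener must do it.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue1(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(message);
            System.out.println("running business logic");
            // Deliberate failure to demonstrate the nack path; remove it to see the ack path.
            int i = 3 / 0;
            // basicAck(deliveryTag, multiple): multiple=true also acks all earlier unacked tags.
            channel.basicAck(deliveryTag, true);
            System.out.println("ack succeeded");
        } catch (Exception e) {
            System.out.println("ack failed");
            // basicNack(deliveryTag, multiple, requeue): requeue=true puts the message back,
            // so a message that always fails is redelivered indefinitely.
            channel.basicNack(deliveryTag, true, true);
        }
    }
}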
2. Consumer-side rate limiting
The acknowledgement mode must be set to manual.
@Component
@Configuration
public class ConsumerListener {
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // At most one unacknowledged message is delivered to the consumer at a time.
        factory.setPrefetchCount(1);
        return factory;
    }

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue1(Message message, Channel channel) throws Exception {
        // Simulate slow processing so the effect of the prefetch limit is visible.
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
        // The next message is delivered only after this ack.
        channel.basicAck(deliveryTag, true);
    }
}
3. TTL
TTL stands for Time To Live (expiration time). A message that reaches its TTL without being consumed is removed. RabbitMQ can set a TTL on individual messages or on an entire queue.
// Queue-level TTL: every message in boot_queue expires after 10 seconds.
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue() {
        Map<String, Object> map = new HashMap<>();
        // x-message-ttl is given in milliseconds.
        // If boot_queue already exists with different arguments, delete it first;
        // redeclaring a queue with changed arguments fails with PRECONDITION_FAILED.
        map.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(map).build();
    }

    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();
    }
}
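// Message-level TTL: set the expiration on a single message before it is sent.
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
        "boot.haha", "boot mq hello....", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // The expiration is a string holding a number of milliseconds.
                // An expired message is only removed once it reaches the head of the queue.
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });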
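When a queue-level TTL and a message-level TTL are both present, RabbitMQ applies the lower of the two. The following sketch states that rule as a plain function; `EffectiveTtl` and its method are hypothetical names used only for illustration, and the broker does this calculation itself.

```java
import java.util.OptionalLong;

// Hypothetical helper: computes which TTL applies to a message.
class EffectiveTtl {

    // Both arguments are in milliseconds; an empty value means "not set".
    static OptionalLong effectiveTtlMillis(OptionalLong queueTtl, OptionalLong messageTtl) {
        if (queueTtl.isPresent() && messageTtl.isPresent()) {
            // Both set: the shorter one wins.
            return OptionalLong.of(Math.min(queueTtl.getAsLong(), messageTtl.getAsLong()));
        }
        // Only one (or neither) set: use whichever is present.
        return queueTtl.isPresent() ? queueTtl : messageTtl;
    }

    public static void main(String[] args) {
        // Queue TTL 10 s, message TTL 3 s: the message expires after 3 s.
        System.out.println(effectiveTtlMillis(OptionalLong.of(10_000), OptionalLong.of(3_000)));
        // Queue TTL only: the message expires after 10 s.
        System.out.println(effectiveTtlMillis(OptionalLong.of(10_000), OptionalLong.empty()));
    }
}
```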
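4. Dead-letter queue
DLX stands for dead letter exchange. When a message becomes a dead message, it can be re-published to another exchange, and that exchange is the DLX.
A message becomes a dead letter in three cases:
- The queue has reached its length limit.
- A consumer rejects the message with basicNack/basicReject and requeue=false, so the message is not put back on the original queue.
- The original queue has a message expiration setting, and the message times out before being consumed.
Set these arguments on the queue: x-dead-letter-exchange and x-dead-letter-routing-key.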
@Configuration
public class RabbitMQConfig {
    public static final String TTL_EXCHANGE_NAME = "ttl_exchange";
    public static final String TTL_QUEUE_NAME = "ttl_queue";
    public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    public static final String DEAD_QUEUE_NAME = "dead_queue";

    // Normal exchange and queue.
    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.topicExchange(TTL_EXCHANGE_NAME).build();
    }

    @Bean("ttlQueue")
    public Queue ttlQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 10000);   // messages expire after 10 seconds
        map.put("x-max-length", 10);       // the queue holds at most 10 messages
        // Dead messages are re-published to dead_exchange with routing key "dead.hehe".
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key", "dead.hehe");
        return QueueBuilder.durable(TTL_QUEUE_NAME).withArguments(map).build();
    }

    @Bean
    public Binding ttlBinding(@Qualifier("ttlExchange") Exchange exchange,
                              @Qualifier("ttlQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }

    // Dead-letter exchange and queue; "dead.#" matches the routing key "dead.hehe".
    @Bean("deadExchange")
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(@Qualifier("deadExchange") Exchange exchange,
                               @Qualifier("deadQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("dead.#").noargs();
    }
}
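@org.junit.Test
public void test() {
    // Send 20 messages to a queue capped at 10, so both the length limit and the TTL come into play.
    for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,
                "ttl.haha", "I am a message. Will I die?");
    }
}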
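5. Delay queue
A delay queue is built from TTL plus a dead-letter queue.
Implementation: the consumer simply listens on the dead-letter queue. A message reaches that queue only after its TTL in the original queue has elapsed, which produces the delay.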
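The interplay of `x-max-length`, `x-message-ttl`, and the dead-letter exchange can be traced with a small simulation. `ToyDeadLetterQueue` below is a hypothetical in-memory model, not RabbitMQ code. It assumes the default overflow behaviour (the oldest message is dropped from the head of the queue and dead-lettered), uses a logical clock passed in by the caller, and assumes nothing consumes from the original queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a bounded queue with a TTL and a dead-letter target.
class ToyDeadLetterQueue {
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final int maxLength;
    private final long ttlMillis;
    private final Deque<Entry> queue = new ArrayDeque<>();
    // Stands in for the dead-letter queue that the delayed consumer listens on.
    final List<String> deadLetters = new ArrayList<>();

    ToyDeadLetterQueue(int maxLength, long ttlMillis) {
        this.maxLength = maxLength;
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long nowMillis) {
        queue.addLast(new Entry(body, nowMillis + ttlMillis));
        // Overflow: the oldest message leaves from the head and is dead-lettered.
        while (queue.size() > maxLength) {
            deadLetters.add(queue.removeFirst().body);
        }
    }

    // Moves every message whose TTL has elapsed at nowMillis to the dead letters.
    void expire(long nowMillis) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAt <= nowMillis) {
            deadLetters.add(queue.removeFirst().body);
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyDeadLetterQueue q = new ToyDeadLetterQueue(10, 10_000);
        for (int i = 0; i < 20; i++) {
            q.publish("msg-" + i, 0);
        }
        System.out.println(q.deadLetters.size()); // 10: overflow dead-letters msg-0..msg-9 at once
        q.expire(9_999);
        System.out.println(q.size());             // 10: nothing has expired yet
        q.expire(10_000);
        System.out.println(q.deadLetters.size()); // 20: the remaining ten arrive after the TTL
    }
}
```

Only the second group of ten behaves like delayed messages; the first ten are dead-lettered immediately because of the length limit. For a pure delay queue, leave the length limit off the original queue.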
6. Logging and monitoring
Web console
7. Message tracing
Tracing is generally used in development and debugging environments. With any message middleware, a message will occasionally go missing. In RabbitMQ this can happen because a producer or consumer lost its connection to the broker while the two sides use different acknowledgement mechanisms; because of the forwarding rules between exchanges and queues; because an exchange is not bound to any queue and the producer either does not notice or takes no action; or because of RabbitMQ's own clustering policy. In these cases you need a mechanism that records the delivery path of each message, so that developers and operators can locate the problem.
To enable tracing, bind a queue to the amq.rabbitmq.trace exchange with the routing key "#", then run rabbitmqctl trace_on on the RabbitMQ command line.
Plugins can be listed and enabled with:
rabbitmq-plugins list
rabbitmq-plugins enable <plugin name>
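8. Application issues
8.1 Message compensation mechanism
The goal is that 100% of messages are sent successfully. The step numbers and the names Q3, DB, and MDB refer to the accompanying flow diagram.
- If step 2 fails, compare the message IDs in Q3 and MDB; if they differ, notify the producer to resend.
- If steps 2 and 3 both fail, compare the data in DB and MDB; the producer resends whatever is inconsistent.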
8.2 Idempotency
Use a version number.
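One common reading of the version-number approach is an optimistic check: the message carries the version it expects, and the update is applied only if the stored version still matches, in the spirit of `UPDATE ... SET version = version + 1 WHERE version = ?`. The sketch below is an assumption about what is meant, with a hypothetical `VersionedAccount` class standing in for a database row.

```java
// Hypothetical in-memory stand-in for a database row with a version column.
class VersionedAccount {
    private long balance;
    private int version;

    VersionedAccount(long balance) {
        this.balance = balance;
        this.version = 0;
    }

    // Applies the debit only if the caller's expected version matches the stored one.
    // Returns true when the update was applied, false for a stale or duplicate message.
    synchronized boolean debit(long amount, int expectedVersion) {
        if (version != expectedVersion) {
            return false;
        }
        balance -= amount;
        version++; // a redelivery of the same message no longer matches
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    synchronized int version() {
        return version;
    }

    public static void main(String[] args) {
        VersionedAccount account = new VersionedAccount(100);
        System.out.println(account.debit(30, 0)); // true: first delivery is applied
        System.out.println(account.debit(30, 0)); // false: duplicate delivery is ignored
        System.out.println(account.balance());    // 70
    }
}
```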
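- If the consumer only writes the data to a database, it can first query by primary key to see whether the row already exists and skip the insert if it does. Inserting directly is also fine: the uniqueness of the primary key prevents duplicate rows, so a repeated insert merely raises an error and leaves no dirty data.
- If the consumer only caches the data in Redis, it simply sets the value in Redis, which is idempotent by nature.
- For more complex business logic, the producer can attach a globally unique ID to each message. The consumer looks the ID up in Redis to see whether the message has been consumed before. If not, it consumes the message and writes the ID to Redis; if so, it skips the message.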
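The last bullet can be sketched as follows. This is a minimal illustration, not a production design: `IdempotentConsumer` is a hypothetical class, and an in-memory concurrent set stands in for Redis. It also records the ID and checks for a duplicate in one atomic step (similar in spirit to a Redis set-if-absent), rather than the separate look-up and write described above, to avoid a race between two concurrent deliveries.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs the business handler at most once per message ID.
class IdempotentConsumer {
    // Stand-in for Redis: the set of message IDs already processed.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the message was processed, false if it was a duplicate.
    boolean onMessage(String messageId, String body) {
        // add() is atomic and returns false when the ID is already present.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(body);
        } catch (RuntimeException e) {
            // Processing failed: forget the ID so a redelivery can retry.
            processedIds.remove(messageId);
            throw e;
        }
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer =
                new IdempotentConsumer(body -> System.out.println("handling " + body));
        System.out.println(consumer.onMessage("id-1", "order created")); // true
        System.out.println(consumer.onMessage("id-1", "order created")); // false, duplicate
    }
}
```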
8.3 Reliability
8.4 Ordering
Link: https://blog.csdn.net/zw791029369/article/details/109561457
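9. Clustering
RabbitMQ supports clustering natively and does not need a registry. A typical setup is a cluster behind HAProxy.
Check the status: rabbitmqctl status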