RabbitMQ java API
队列持久化
# 生成一个队列 1、队列名称 2、队列里面的消息是否进行持久化 3、是否共享消息 4、是否自动删除 5、其他高级参
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
消息持久化
# 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
# 参数3:设置消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes("UTF-8"));
不公平分发(能力越大责任越大,多劳多得)
# 消费者代码处增加
channel.basicQos(1);
预期值
# 消费者代码处增加
channel.basicQos(3);
消息不丢失条件
1、队列持久化; 2、消息持久化; 3、发布确认; 3.1、单个确认发布;(1000条,耗时722ms)慢 3.2、批量发布确认;(1000条,耗时147ms)快,出问题后不知道是哪个, 3.3、异步发布确认;(1000条,耗时62ms发完)性价比最高,复杂
发布确认
开启发布确认
channel.confirmSelect();// 开启发布确认
单个确认发布
# 发一条,及时确认一条
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
批量确认发布
# 发多条后,再确认。比如10个一组或100个一组,然后确认一次
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
......
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
异步确认发布
发送消息前,增加监听
// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("成功的消息标记:" + deliveryTag);
};
// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("失败的消息标记:" + deliveryTag);
};
channel.addConfirmListener(ackCallback, ackCallback);
/**
* 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认
失败消息处理
/**
* 线程安全有序的哈希表 使用于高并发 1、轻松的将序号和消息关联 2、轻松批量删除条码,根据序号 3、支持高并发(多线程)
*/
ConcurrentSkipListMap<Long, String> outStandingConfirms = new ConcurrentSkipListMap<>();
// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if(multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outStandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else {
outStandingConfirms.remove(deliveryTag);
}
System.out.println("成功的消息标记:" + deliveryTag);
};
// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("失败的消息标记:" + deliveryTag);
};
channel.addConfirmListener(ackCallback, ackCallback);
/**
* 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
outStandingConfirms.put(channel.getNextPublishSeqNo(), msg);
channel.waitForConfirms();// 确认
System.out.println("消息发送完毕");
广播/订阅(fanout-扇出交换机)
订阅者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLog1 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 生成随机队列名称,消费完成后自动删除
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换机上,routingkey为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("ReceiveLog1接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("ReceiveLog1准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
public class EmitLog {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ReceiveLog1.EXCHANGE_NAME, "fanout");
String msg = "世界你好!";
channel.basicPublish(ReceiveLog1.EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("消息发送完成");
}
}
路由模式(direct-直接交换机)
消费者1
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogDirect1 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("console接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("console准备就绪:");
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogDirect2 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("disk接收到的消息:" + msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("disk准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
public class EmitLog {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ReceiveLogDirect1.EXCHANGE_NAME, "direct");
String msg = "世界你好aaaaaaaaaaaaaaaaaaaa-warning!";
channel.basicPublish(ReceiveLogDirect1.EXCHANGE_NAME, "warning", null, msg.getBytes());
System.out.println("消息发送完成");
}
}
(Topics-主题交换机)(可包括:fanout、direct)
routing_key格式要求: 1、必须是一个单词列表,以点号分割开; 2、比如:hao.ok.very; 3、* 可以代替一个单词; 4、# 可以代替零个或多个单词; 5、若只有 # ,则表示所有记录,相当于:fanout; 6、若没有 * 和 # ,则表示绑定具体队列,相当于:direct; 消费者1
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C1 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C1接收到的消息:" + msg+" 绑定键:"+message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("C1准备就绪:");
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C2 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C2接收到的消息:" + msg + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
System.out.println("C2准备就绪:");
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
public class P {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String msg = "topic模式测试走起";
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg.getBytes("UTF-8"));
System.out.println("消息发送成功");
}
}
死信队列
死信队列的3大来源: 1、超时; 2、队列满; 3、消息被拒绝;
TTL死信
消费者
import java.util.HashMap;
import java.util.Map;
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C1 {
public static final String EXCHANGE_NORMAL_NAME = "normal_exchange";
public static final String EXCHANGE_DEAD_NAME = "dead_exchange";
public static final String QUEUE_NORMAL_NAME = "normal_queue";
public static final String QUEUE_DEAD_NAME = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 过期时间 10s=10000ms(一般过期时间在生产方控制,此处不写)
// arguments.put("x-message-ttl",10000);
// 过期后进入的死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME);
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(QUEUE_NORMAL_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NORMAL_NAME, EXCHANGE_NORMAL_NAME, "zhangsan");
// 死信队列
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C1接收到消息:" + msg);
};
channel.basicConsume(QUEUE_NORMAL_NAME, deliverCallback, consumerTag -> {
});
}
}
消费者2
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class C2 {
public static final String EXCHANGE_DEAD_NAME = "dead_exchange";
public static final String QUEUE_DEAD_NAME = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 死信队列
//channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
//channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("C2接收到消息:" + msg);
};
channel.basicConsume(QUEUE_DEAD_NAME, deliverCallback, consumerTag -> {
});
}
}
生产者
import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
public class P {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(C1.EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
String msg = "死信消息要来了";
// 过期时间,单位毫秒,设置10s
BasicProperties properties = new BasicProperties().builder().expiration("10000").build();
channel.basicPublish(C1.EXCHANGE_NORMAL_NAME, "zhangsan", properties, msg.getBytes("UTF-8"));
System.out.println("死信消息发送成功!");
}
}
MAX死信
# C1增加代码
// 设置队列长度限制(队列里面超过6条时,则进入到死信队列)
arguments.put("x-max-length", 6);
# P 删除超时设置即可
消息被拒绝-死信
消费者增加以下代码进行拒绝
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if("拒绝条件".equals(msg)) {
System.out.println("C2拒绝接收消息接收到消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("C2接收消息接收到消息:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 需要开启手动应答
channel.basicConsume(QUEUE_NORMAL_NAME,false, deliverCallback, consumerTag -> {
});
延迟队列(死信队列的一种)
使用场景: 1、订单在十分钟内未支付自动取消; 2、新创建的店铺,如果10天内都没上传过商品,则自动发送消息提醒; 3、新注册用户,如果3天内都没进行登录,则短信提醒; 4、用户发起退款,如果3天都没处理,则通知相关运营人员介入; 5、预定会议,在预定会议开始前10分钟发消息通知参会人员; 总结:虽然和定时任务非常相似。但特点是:数据量大,时效性强;
|