- mq的结构图,如下:
同理,consumer也是先向nameserver获取从哪一个broker获取消息,再得到nameserver的响应之后,才开始获取消息,consumer会一直监听消息的发送. 异步与同步的区别: 4 发送同步,异步消息: 同步: 指的是消息的发送者在发送消息后,处于阻塞状态,等待消息的消费方发送确认后返回. /**
- 发送同步消息
/ public class SyncProducer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /* * 参数一:消息主题Topic(消息的类别) * 参数二:消息Tag() * 参数三:消息内容 */ Message msg = new Message(“springboot-mq”, “Tag1”, (“Hello World” + i).getBytes()); //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); //消息id(每条消息都有一个id) String msgId = result.getMsgId(); //消息队列id int queueId = result.getMessageQueue().getQueueId(); System.out.println(“发送结果:” + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } }
异步: 消息发送到之后,不会去等待mq回传发送结果,异步消 息的可靠性没有同步消息高,异步消息的发送可以通过回调函数的方式接受消息的处理. 应用场景为对消息发送性能比较高的场景.不能让消息延迟 /**
-
发送异步消息 */ public class AsyncProducer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("base", "Tag2", ("Hello World" + i).getBytes());
//5.发送异步消息
producer.send(msg, new SendCallback() {
/**
* 发送成功回调函数
*/
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult);
}
/**
* 发送失败回调函数
*/
public void onException(Throwable e) {
System.out.println("发送异常:" + e);
}
});
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
producer.shutdown();
} }
5 单向消息发送: 只管发,不管结果(例如日志发送) /**
- 发送单向消息
/ public class OneWayProducer { public static void main(String[] args) throws Exception, MQBrokerException { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /* * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message(“base”, “Tag3”, (“Hello World,单向消息” + i).getBytes()); //5.发送单向消息 producer.sendOneway(msg); //线程睡1秒 TimeUnit.SECONDS.sleep(5); } //6.关闭生产者producer producer.shutdown(); } }
6 消费消息的流程: /**
-
消息的接受者 / public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”); //2.指定Nameserver地址 consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.订阅主题Topic和Tag consumer.subscribe(“base”, ""); //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
//消费成功返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
} } 在回调函数中监听消息,有消息发送到达时候,负责处理.
7 消息消费模式: (1) 广播模式: 消息队列中的消息被所有的消费者都消费一次.如下消息队列中的abc三条消息被消费者1,2都进行了消费.消费者之间都消费了相同的消息. (2) 负载均衡模式:消费者1,消费者2合作起来一起消费abc,例如消费者1消费了ab,消费者2消费了c. (3) 消费者在消费消息的时候指定消费模式:
注意: 默认的消费模式是负载均衡
8 顺序消息: 指的是按照生产者发送消息的顺序去进行消费(FIFO).RocketMQ可以严格保证消息有序(mq本身的数据结构就是先进先出). 当消息生产者的业务操作产生的消息具有顺序性的时候,那么消息在broker中是如何存储? Broker中包含了不止一个队列,broker采用轮询的方式,将生产者的顺序消息分别存储在不同的队列中,如图: 消费者采用多线程的方式同时去消费消息.消费者如何保证顺序性消费呢?多线程很难保证顺序性.
9 消息顺序: (1) 全局消息顺序(broker中所有队列的消息都是有序的,全局顺序是没有必要的) (2) 局部消息顺序(将有顺序的消息放入一个队列中) 此时在一个队列中的消息,就得用单线程而不是多线程的方式去消费消息了.
9.1 首先保证同一个业务的顺序消息发送至同一个队列中.例如:创建一个订单的业务: 创建,付款,推送,完成; 订单号相同的消息需要推送至同一个队列,消费时,同一个orderId获取到的肯定是同一个队列. 如下: 确定业务消息存在一个队列中. public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
//3.启动producer
producer.start();
//构建消息集合(订单集合)
List<OrderStep> orderSteps = OrderStep.buildOrders();
//发送消息
for (int i = 0; i < orderSteps.size(); i++) {
String body = orderSteps.get(i) + "";
Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
/**
* 参数一:消息对象
* 参数二:消息队列的选择器(确保业务相关消息发送至同一个队列)
* 参数三:选择队列的业务标识(订单ID)
*/
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
*
* @param mqs:队列集合
* @param msg:消息对象
* @param arg:业务标识的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long index = orderId % mqs.size();//根据订单id选择队列(保证同一个订单在同一个队列中)
return mqs.get((int) index);//根据取模的结果确定发送的队列
}
}, orderSteps.get(i).getOrderId());
System.out.println("发送结果:" + sendResult);
}
producer.shutdown();
}
}
9.2 消息的顺序消费(采用单线程确保消费的顺序性) 10 延迟消息: 在发送消息的时候,设置消息被延迟消费的时间 延迟可选如下: 11 批量消息发送: 批量发送消息能显著提高消息传递的性能,这一批消息的总大小不应超过4M.超过范围则需要进行分割.需要手写一个分割算法,计算消息的大小. public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); List msgs = new ArrayList(); //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg1 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 1).getBytes()); Message msg2 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 2).getBytes()); Message msg3 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 3).getBytes()); msgs.add(msg1); msgs.add(msg2); msgs.add(msg3); //5.发送消息 SendResult result = producer.send(msgs); //发送状态 SendStatus status = result.getSendStatus(); System.out.println(“发送结果:” + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); //6.关闭生产者producer producer.shutdown(); } }
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”); //2.指定Nameserver地址 consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.订阅主题Topic和Tag consumer.subscribe(“BatchTopic”, “*”); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
12 过滤消息的方式: 当消息生产者将消息发送至broker后,consumer可以根据一定的规则对消息进行过滤,选择需要消费的消息 (1) tag过滤,consumer将接受包含TAG的消息.但是限制是一个消息只能有一个标签,不适用与一些复杂的场景. public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message(“FilterTagTopic”, “Tag2”, (“Hello World” + i).getBytes()); //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); System.out.println(“发送结果:” + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } } public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”); //2.指定Nameserver地址 consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.订阅主题Topic和Tag consumer.subscribe(“FilterTagTopic”, "Tag1 || Tag2 "); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
(2) Sql:复杂场景推荐使sql过滤. public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(“group1”); //2.指定Nameserver地址 producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message(“FilterSQLTopic”, “Tag1”, (“Hello World” + i).getBytes()); msg.putUserProperty(“i”, String.valueOf(i));//添加自定义的属性用于过滤 //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); System.out.println(“发送结果:” + result); //线程睡1秒 TimeUnit.SECONDS.sleep(2); } //6.关闭生产者producer producer.shutdown(); } } public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”); //2.指定Nameserver地址 consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”); //3.订阅主题Topic和Tag consumer.subscribe(“FilterSQLTopic”, MessageSelector.bySql(“i>5”));//过滤条件 //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
|