springboot整合rocketmq
建议先阅读RocketMQ二:java整合rocketmq开发模型
环境依赖
<dependencies>
    <!-- RocketMQ Spring Boot starter. Its own spring-boot-starter, spring-core and
         spring-webmvc transitive dependencies are excluded; presumably so that the
         Spring versions come from spring-boot-starter-web below (confirm for your project). -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
    <!-- fastjson 1.2.28 is affected by publicly documented autoType deserialization
         vulnerabilities; 1.2.83 is the patched 1.x release. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
注意:rocketmq-spring-boot-starter 所依赖的 rocketmq-client 版本应尽量与服务端 RocketMQ 的版本保持一致。
代码案例
基础案例
生产者 在需要发送的地方引入RocketMQTemplate
// Template through which the sending methods below publish messages; injected via @Resource.
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
 * Synchronously sends {@code jsonString} to {@code topic}, copying every entry of
 * {@code map} into the message headers.
 *
 * @param topic      destination passed to {@code RocketMQTemplate.syncSend}
 * @param jsonString message payload
 * @param map        headers to attach; may be {@code null}, in which case no extra
 *                   headers are set
 * @return {@code true} only when the send result reports status {@code SEND_OK}
 */
public boolean sendMessage(String topic, String jsonString, Map<String, String> map) {
    MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
    // A missing header map is treated as "no headers" instead of failing with an NPE.
    if (map != null) {
        map.forEach(builder::setHeader);
    }
    Message<Object> message = builder.build();
    SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
    return sendResult != null
            && "send_ok".equalsIgnoreCase(sendResult.getSendStatus().name());
}
消费者
/**
 * Minimal listener on topic {@code test2} (consumer group {@code springbootConsumer},
 * concurrent consume mode) that receives the payload as a {@code String} and prints it.
 */
@Component
@RocketMQMessageListener(
        topic = "test2",
        consumerGroup = "springbootConsumer",
        consumeMode = ConsumeMode.CONCURRENTLY)
public class SimpleConsumer implements RocketMQListener<String> {

    /** Prefix written in front of every received payload. */
    private static final String RECEIVED_PREFIX = "收到消息:";

    /**
     * Prints the received payload to standard output.
     *
     * @param msg message payload
     */
    @Override
    public void onMessage(String msg) {
        System.out.println(RECEIVED_PREFIX + msg);
    }
}
简单的消费场景建议用上面的消费者;如果是比较复杂的消费场景,需要用到消息的其他属性(比如头信息、队列信息等),上面的方式就不够用了,建议使用下面这种方式:
/**
 * Listener on topic {@code test2} that receives the raw {@code MessageExt}, so message id,
 * tags and properties are available in addition to the body.
 */
@Component
@RocketMQMessageListener(
        topic = "test2",
        consumerGroup = "springbootConsumer",
        consumeMode = ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<MessageExt> {

    /**
     * Parses the body as a {@code User} with fastjson, then prints the message id, tags,
     * parsed body and every message property, one per line.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        // Parse before printing anything, so a malformed body produces no partial output.
        User parsedBody = JSONObject.parseObject(message.getBody(), User.class);

        System.out.println("msgId:" + message.getMsgId());
        System.out.println("tags:" + message.getTags());
        System.out.println("body:" + parsedBody);

        for (Map.Entry<String, String> property : message.getProperties().entrySet()) {
            System.out.println(property.getKey() + ":" + property.getValue());
        }
    }
}
RocketMQMessageListener 注解解析
/**
 * Type-level annotation that configures a RocketMQ message listener. Only
 * {@link #consumerGroup()} and {@link #topic()} are mandatory; every other attribute
 * has a default.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RocketMQMessageListener {

    // Property placeholders used as attribute defaults; each falls back to an empty
    // string when the property is not set (trailing ':').
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /** Consumer group of the listener (required). */
    String consumerGroup();

    /** Topic the listener subscribes to (required). */
    String topic();

    /** How {@link #selectorExpression()} is interpreted; defaults to tag selection. */
    SelectorType selectorType() default SelectorType.TAG;

    /** Filter expression; the default {@code "*"} is the wildcard expression. */
    String selectorExpression() default "*";

    /** Consume mode; defaults to concurrent consumption. */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /** Message model; defaults to clustering. */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /** Upper bound of consumer threads; defaults to 64. */
    int consumeThreadMax() default 64;

    /** Maximum re-consume attempts; -1 presumably defers to the client default (confirm). */
    int maxReconsumeTimes() default -1;

    /** Consume timeout; unit is not visible here, presumably minutes (confirm against the library docs). */
    long consumeTimeout() default 15L;

    /** Reply timeout; unit is not visible here, presumably milliseconds (confirm). */
    int replyTimeout() default 3000;

    /** Access key; defaults to the {@code rocketmq.consumer.access-key} property. */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /** Secret key; defaults to the {@code rocketmq.consumer.secret-key} property. */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /** Whether message tracing is enabled; off by default. */
    boolean enableMsgTrace() default false;

    /** Trace topic; defaults to the {@code rocketmq.consumer.customized-trace-topic} property. */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /** Name server address; defaults to the {@code rocketmq.name-server} property. */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /** Access channel; defaults to the {@code rocketmq.access-channel} property. */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
顺序发送
/**
 * Demo of ordered sending: for each of 100 simulated orders, sends the four lifecycle
 * steps in sequence with {@code syncSendOrderly}, passing the order id as the hash key.
 *
 * @param topic      destination topic
 * @param jsonString text prepended to every generated payload
 * @param map        not used by this method; kept so the signature stays unchanged
 * @return {@code true} only if every one of the sends reported status {@code SEND_OK}
 */
public boolean sendOrderly(String topic, String jsonString, Map<String, String> map) {
    // Steps of one order, in the order they must be consumed.
    final String[] steps = {"支付", "扣减库存", "发送物流", "用户签收"};
    boolean allSent = true;
    for (int orderId = 0; orderId < 100; orderId++) {
        String hashKey = String.valueOf(orderId);
        for (String step : steps) {
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(
                    topic, jsonString + "OrderId: " + orderId + " step: " + step, hashKey);
            // Keep sending the remaining messages, but remember that one send was not OK
            // instead of unconditionally reporting success.
            if (sendResult == null
                    || !"send_ok".equalsIgnoreCase(sendResult.getSendStatus().name())) {
                allSent = false;
            }
        }
    }
    return allSent;
}
/**
 * Listener on topic {@code test2} configured with {@code ConsumeMode.ORDERLY}; it prints
 * each received payload.
 */
@Component
@RocketMQMessageListener(
        topic = "test2",
        consumerGroup = "springbootConsumer",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {

    /**
     * Writes the payload to standard output.
     *
     * @param message message payload
     */
    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}
广播消息
@RocketMQMessageListener 属性配置 messageModel = MessageModel.BROADCASTING
延迟消息
/**
 * Synchronously sends {@code jsonString} to {@code topic} as a delayed message using
 * delay level 4.
 *
 * @param topic      destination topic
 * @param jsonString message payload
 * @param timeOut    send timeout passed to {@code syncSend}; when {@code null}, the
 *                   producer's configured send timeout is used
 * @return {@code true} only when the send result reports status {@code SEND_OK}
 */
public boolean sendDelay(String topic, String jsonString, Long timeOut) {
    Message<String> message = MessageBuilder.withPayload(jsonString).build();
    // Unboxing a null Long would throw an NPE; fall back to the producer's own timeout.
    long sendTimeout = (timeOut != null)
            ? timeOut.longValue()
            : rocketMQTemplate.getProducer().getSendMsgTimeout();
    // Delay level 4; the actual delay depends on the broker's delay-level table
    // (presumably 30s with the default table; confirm against the broker config).
    SendResult sendResult = rocketMQTemplate.syncSend(topic, message, sendTimeout, 4);
    System.out.println(sendResult);
    return sendResult != null
            && "send_ok".equalsIgnoreCase(sendResult.getSendStatus().name());
}
消息过滤
消息过滤通过 RocketMQMessageListener 的 selectorType 和 selectorExpression 两个属性搭配使用来实现。消息发送:
/**
 * Demo sender for message filtering: sends ten messages to {@code topic}, alternating
 * between tag {@code mytag} (even index) and {@code othertag} (odd index). Only the
 * {@code othertag} messages carry the entries of {@code map} as headers.
 *
 * @param topic      destination topic; the tag is appended as {@code topic:tag}
 * @param jsonString message payload
 * @param tags       not used by this method; kept so the signature stays unchanged
 * @param map        headers for the {@code othertag} messages; may be {@code null}
 * @return {@code true} only if all ten sends reported status {@code SEND_OK}
 */
public boolean sendFilter(String topic, String jsonString, String tags, Map<String, String> map) {
    boolean allSent = true;
    for (int i = 0; i < 10; i++) {
        boolean evenIndex = i % 2 == 0;
        String tag = evenIndex ? "mytag" : "othertag";
        // A fresh builder per message: a shared builder keeps headers from earlier
        // iterations, so "mytag" messages would also carry the map entries.
        MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
        if (!evenIndex && map != null) {
            map.forEach(builder::setHeader);
        }
        Message<Object> message = builder.build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic + ":" + tag, message);
        if (sendResult == null
                || !"send_ok".equalsIgnoreCase(sendResult.getSendStatus().name())) {
            allSent = false;
        }
    }
    return allSent;
}
简单的过滤
/**
 * Listener on topic {@code test2} that uses tag filtering with the expression
 * {@code mytag}, and prints the tags and body of each message it receives.
 */
@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",
        consumeMode = ConsumeMode.CONCURRENTLY, selectorType = SelectorType.TAG,
        selectorExpression = "mytag")
public class FilterConsumer implements RocketMQListener<MessageExt> {

    /**
     * Prints the tags and the body of the received message.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        String tags = message.getTags();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("consumer1---tags:" + tags + " body:" + body);
    }
}
通过sql过滤
/**
 * Listener on topic {@code test2} (consumer group {@code springbootConsumer2}) that uses
 * an SQL92 selector: the message must have tag {@code othertag} and a property
 * {@code aa} equal to {@code dd}. Prints the tags and body of each message it receives.
 */
@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer2", topic = "test2",
        consumeMode = ConsumeMode.CONCURRENTLY, selectorType = SelectorType.SQL92,
        selectorExpression = "TAGS is not null and TAGS = 'othertag' and aa is not null and aa = 'dd' ")
public class FilterConsumer2 implements RocketMQListener<MessageExt> {

    /**
     * Prints the tags and the body of the received message.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        String tags = message.getTags();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("consumer2---tags:" + tags + " body:" + body);
    }
}
事务消息
发送消息
/**
 * Demo sender for transactional messages: sends ten messages to {@code topic}, each with
 * headers {@code orderId}, {@code tags} (cycling through TagA..TagE) and {@code MyProp},
 * pausing 10 ms between sends.
 *
 * @param topic      destination topic
 * @param jsonString message payload
 * @return {@code true} only if every send reported status {@code SEND_OK}
 * @throws InterruptedException if the thread is interrupted during the pause between sends
 */
public boolean sendMessageInTransaction(String topic, String jsonString) throws InterruptedException {
    final String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
    boolean allSent = true;
    for (int i = 0; i < 10; i++) {
        Message<String> message = MessageBuilder.withPayload(jsonString)
                .setHeader("orderId", "TransID_" + i)
                .setHeader("tags", tags[i % tags.length])
                .setHeader("MyProp", "MyProp_" + i)
                .build();
        SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, "");
        // Record a failed send instead of unconditionally reporting success.
        if (sendResult == null
                || !"send_ok".equalsIgnoreCase(sendResult.getSendStatus().name())) {
            allSent = false;
        }
        Thread.sleep(10);
    }
    return allSent;
}
注册事务处理机制:该机制用于提交或者回滚事务消息,相当于原生 API 的 producer.setTransactionListener(new TransactionListener() {...})
package com.rocketmq.demo.config;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
/**
 * Transaction listener bound to the bean named {@code rocketMQTemplate}. The outcome is
 * decided purely from the {@code tags} header of the message: TagA/TagB are resolved when
 * the local transaction executes, TagC/TagD when the transaction is checked, and anything
 * else stays {@code UNKNOWN}.
 */
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    /**
     * Decides the local transaction state at send time.
     *
     * @param msg the message being sent
     * @param arg the extra argument passed to {@code sendMessageInTransaction}; unused
     * @return COMMIT for TagA, ROLLBACK for TagB, UNKNOWN otherwise
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getHeaders().get("tags", String.class);
        return decide(tags, "TagA", "TagB");
    }

    /**
     * Decides the transaction state when it is checked after the initial execution.
     *
     * @param msg the message whose transaction state is being checked
     * @return COMMIT for TagC, ROLLBACK for TagD, UNKNOWN otherwise (including when the
     *         {@code tags} header is absent)
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Read the header null-safely: calling toString() on a missing header would throw
        // an NPE instead of reporting UNKNOWN.
        Object rawTags = msg.getHeaders().get("tags");
        String tags = (rawTags == null) ? null : rawTags.toString();
        return decide(tags, "TagC", "TagD");
    }

    /** Maps the tags value to a state; StringUtils.contains is null-safe. */
    private static RocketMQLocalTransactionState decide(String tags, String commitTag, String rollbackTag) {
        if (StringUtils.contains(tags, commitTag)) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        if (StringUtils.contains(tags, rollbackTag)) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
消息接收 和之前没什么不同
/**
 * Listener on topic {@code test2} for the transactional-message demo; prints the
 * {@code orderId} and {@code tags} user properties and the body of each message.
 */
@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2", consumeMode = ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<MessageExt> {

    /**
     * Prints the order id, tags and body of the received message.
     *
     * @param message the received message
     */
    @Override
    public void onMessage(MessageExt message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String orderId = message.getUserProperty("orderId");
        String tags = message.getUserProperty("tags");
        System.out.println("orderId:" + orderId);
        System.out.println("tags:" + tags);
        System.out.println("body:" + msg);
    }
}
注意事项:之前注册事务处理机制的时候没有指定组,而是指定了 rocketMQTemplate,因此所有通过这个 rocketMQTemplate 发送的事务消息(不管属于哪种业务)都会共用同一套事务处理逻辑。那么我们怎么解决这个问题呢?由于之前指定的是 rocketMQTemplate 这个 beanName,我们可以再定义另一个 rocketMQTemplateOther,然后让另一个 @RocketMQTransactionListener 指定这个 rocketMQTemplateOther,这样通过 rocketMQTemplateOther 发送的事务消息就会经过另一套事务处理机制。
案例:使用原来的 rocketMQTemplate 发送的事务消息,仍会进入原来的事务处理机制。定义另一个 RocketMQTemplate:
/**
 * Additional {@code RocketMQTemplate} subclass declared with
 * {@code @ExtRocketMQTemplateConfiguration}. It adds no members of its own.
 */
@ExtRocketMQTemplateConfiguration
public class RocketMQTemplateOther extends RocketMQTemplate {
}
定义另一个事务处理机制(在其 @RocketMQTransactionListener 上指定 rocketMQTemplateBeanName = "rocketMQTemplateOther"),然后在发送消息的地方引入:
// Second template, injected via @Resource under the field name rocketMQTemplateOther.
@Resource
private RocketMQTemplate rocketMQTemplateOther;
通过 rocketMQTemplateOther 来发送事务消息,这样就实现了不同 RocketMQTemplate(生产者)之间事务处理机制的隔离。
上述例子的全部发送代码
package com.rocketmq.demo.unti;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
/**
 * Collection of demo senders built on {@code RocketMQTemplate}: plain, ordered, delayed,
 * tag-filtered and transactional messages. Every method reports {@code true} only when
 * all of its sends returned status {@code SEND_OK}.
 */
@Component
public class RocketUtil {

    /** Template used for all non-transactional sends. */
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /** Second template, used for the transactional sends. */
    @Resource
    private RocketMQTemplate rocketMQTemplateOther;

    /**
     * Synchronously sends {@code jsonString} to {@code topic}, copying every entry of
     * {@code map} into the message headers.
     *
     * @param topic      destination topic
     * @param jsonString message payload
     * @param map        headers to attach; may be {@code null}, in which case no extra
     *                   headers are set
     * @return {@code true} only when the send reported {@code SEND_OK}
     */
    public boolean sendMessage(String topic, String jsonString, Map<String, String> map) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
        // A missing header map is treated as "no headers" instead of failing with an NPE.
        if (map != null) {
            map.forEach(builder::setHeader);
        }
        Message<Object> message = builder.build();
        return isSendOk(rocketMQTemplate.syncSend(topic, message));
    }

    /**
     * Demo of ordered sending: for each of 100 simulated orders, sends the four lifecycle
     * steps in sequence with {@code syncSendOrderly}, passing the order id as the hash key.
     *
     * @param topic      destination topic
     * @param jsonString text prepended to every generated payload
     * @param map        not used by this method; kept so the signature stays unchanged
     * @return {@code true} only if every send reported {@code SEND_OK}
     */
    public boolean sendOrderly(String topic, String jsonString, Map<String, String> map) {
        // Steps of one order, in the order they must be consumed.
        final String[] steps = {"支付", "扣减库存", "发送物流", "用户签收"};
        boolean allSent = true;
        for (int orderId = 0; orderId < 100; orderId++) {
            String hashKey = String.valueOf(orderId);
            for (String step : steps) {
                SendResult sendResult = rocketMQTemplate.syncSendOrderly(
                        topic, jsonString + "OrderId: " + orderId + " step: " + step, hashKey);
                // Keep sending, but remember the failure instead of always returning true.
                allSent &= isSendOk(sendResult);
            }
        }
        return allSent;
    }

    /**
     * Synchronously sends {@code jsonString} to {@code topic} as a delayed message using
     * delay level 4.
     *
     * @param topic      destination topic
     * @param jsonString message payload
     * @param timeOut    send timeout passed to {@code syncSend}; when {@code null}, the
     *                   producer's configured send timeout is used
     * @return {@code true} only when the send reported {@code SEND_OK}
     */
    public boolean sendDelay(String topic, String jsonString, Long timeOut) {
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        // Unboxing a null Long would throw an NPE; fall back to the producer's own timeout.
        long sendTimeout = (timeOut != null)
                ? timeOut.longValue()
                : rocketMQTemplate.getProducer().getSendMsgTimeout();
        // Delay level 4; the actual delay depends on the broker's delay-level table
        // (presumably 30s with the default table; confirm against the broker config).
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message, sendTimeout, 4);
        System.out.println(sendResult);
        return isSendOk(sendResult);
    }

    /**
     * Demo sender for message filtering: sends ten messages, alternating between tag
     * {@code mytag} (even index) and {@code othertag} (odd index). Only the
     * {@code othertag} messages carry the entries of {@code map} as headers.
     *
     * @param topic      destination topic; the tag is appended as {@code topic:tag}
     * @param jsonString message payload
     * @param tags       not used by this method; kept so the signature stays unchanged
     * @param map        headers for the {@code othertag} messages; may be {@code null}
     * @return {@code true} only if all ten sends reported {@code SEND_OK}
     */
    public boolean sendFilter(String topic, String jsonString, String tags, Map<String, String> map) {
        boolean allSent = true;
        for (int i = 0; i < 10; i++) {
            boolean evenIndex = i % 2 == 0;
            String tag = evenIndex ? "mytag" : "othertag";
            // A fresh builder per message: a shared builder keeps headers from earlier
            // iterations, so "mytag" messages would also carry the map entries.
            MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
            if (!evenIndex && map != null) {
                map.forEach(builder::setHeader);
            }
            Message<Object> message = builder.build();
            allSent &= isSendOk(rocketMQTemplate.syncSend(topic + ":" + tag, message));
        }
        return allSent;
    }

    /**
     * Demo sender for transactional messages: sends ten messages through
     * {@code rocketMQTemplateOther}, each with headers {@code orderId}, {@code tags}
     * (cycling through TagA..TagE) and {@code MyProp}, pausing 10 ms between sends.
     *
     * @param topic      destination topic
     * @param jsonString message payload
     * @return {@code true} only if every send reported {@code SEND_OK}
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    public boolean sendMessageInTransaction(String topic, String jsonString) throws InterruptedException {
        final String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
        boolean allSent = true;
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(jsonString)
                    .setHeader("orderId", "TransID_" + i)
                    .setHeader("tags", tags[i % tags.length])
                    .setHeader("MyProp", "MyProp_" + i)
                    .build();
            SendResult sendResult = rocketMQTemplateOther.sendMessageInTransaction(topic, message, "");
            allSent &= isSendOk(sendResult);
            Thread.sleep(10);
        }
        return allSent;
    }

    /** Returns whether the given result is present and reports {@code SEND_OK}. */
    private static boolean isSendOk(SendResult sendResult) {
        return sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK;
    }
}
|