下面的架子是按照之前的一个框架继续开发的,有必要的话,可以参考一下:
RocketMQ系列之消息发送/消费初体验_阿小冰的博客-CSDN博客RocketMQ系列之消息发送/消费初体验https://blog.csdn.net/qq_38377525/article/details/123253718?1、发送创建订单的事务消息,预下单操作
这里定义一个订单的实体类Order,包含订单ID、订单标题两个属性即可
编写事务的模拟接口,代码如下:
@RestController
public class TransactionalController {
@Autowired
private Source source;
@GetMapping("/transactional")
public String tramsactional(){
Order order=new Order("1","上海浦东");
String transactionId=UUID.randomUUID().toString();
MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId);
Message message=builder.build();
source.output().send(message);
return "order is ok";
}
}
2、配置文件
server.port=8070
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=producer-demo-group
3、创建事务监听器
@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
//获取前面订单生成的事务id
String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
//以事务ID为主键,执行本地事务
Order order=(Order) message.getPayload();
boolean result = this.saveOrder(order, transactionId);
return result?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;
}catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//获取事务ID
String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
//以事务ID为主键,查询本地事务执行情况
if(isSuccess(transactionId)){
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
/**
* @Description: 将事务ID设置为唯一主键,调用数据库insert
*/
private boolean saveOrder(Order order,String transactionId){
return true;
}
/**
* @Description: 查询数据库
*/
private boolean isSuccess(String transactionId){
return true;
}
}
使用@RocketMQTransactionListener注解用于接收本地事务的监听,txProducerGroup是事务组名称,和前面定义的OrderTransactionGroup保持一致,其中这个接口还有两个实现方法,也在上面的代码有所体现
- executeLocalTransaction:执行本地事务,在消息发送成功会回调执行,一单事务提交成功,下游应用的消费者就能收到该消息
- checkLocalTransaction:检查本地事务执行状态,如果executeLocalTransaction方法中返回的状态是位置UNKNOWN或者为返回的状态,就会默认在预处理发送的一分钟后由Broker通知Producer检查本地事务,在Producer中回调本地事务监听器中的checkLocalTransaction方法,检查本地事务时,可以根据事务ID查询本地事务的状态,再返回具体事务状态给Broker
4、消费者不变,和上面提供链接中的消费者保持一致即可,然后启动验证即可