title: “RocketMQ事务消息示例” date: 2022-04-07T17:14:16+08:00 draft: true
引言
分布式事务是一个复杂的问题,本文就基于 RocketMQ 来实现最终一致性方案的分布式事务的示例与测试。
概念
整体的流程如上所示。
RocketMQ 事务消息的原理是基于两阶段提交和事务状态回查。
- 半消息:是指暂时不能被消费的消息,半消息实际上被放在主题名为 RMQ_SYS_TRANS_HALF_TOPIC 下,当 producer 对半消息进行二次确认后,也就是上图的第 4 步后,consumer 才可以消费。
- 事务状态回查:如果上图的第 4 步,半消息提交因为种种原因(网络原因、producer崩溃)失败了,而导致 broker 不能收到 producer 的确认消息,那么 broker 就会定时扫描这些半消息,主动向 producer 回查对应本地事务的状态。当然,这个定时机制也是可以配置的。
最重要的两个概念就介绍到这里啦,其它的就不啰嗦了。
业务流程:每增加一个订单,就增加相应的积分。
数据库
数据库有两个,一个包含订单表和事务日志表,另一个则只有订单积分表。
-- Orders table. The producer-side local transaction inserts one row per order,
-- supplying id, user_id, goods_name and total explicitly.
-- NOTE(review): this table is declared with utf8 while transaction_log uses
-- utf8mb4 -- confirm whether the difference is intentional.
CREATE TABLE `orders` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`user_id` int unsigned NOT NULL,
`goods_name` varchar(255) NOT NULL COMMENT '商品名',
`total` int unsigned NOT NULL COMMENT '数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
-- Local transaction log. The producer writes one row here in the same database
-- transaction as the order insert; the transaction check-back looks a row up by
-- id to find out whether that local transaction was committed.
-- Column usage by the producer code: id = RocketMQ transaction id,
-- business = message keys, foreign_key = id of the inserted order.
CREATE TABLE `transaction_log` (
`id` varchar(32) NOT NULL COMMENT '事务ID',
`business` varchar(32) NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) NOT NULL COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- Credits granted per order. The consumer inserts one row for each consumed
-- order message, with total set to twice the order's total.
-- NOTE(review): order_id carries no UNIQUE constraint; protection against
-- duplicate credits relies solely on the consumer's select-before-insert check.
CREATE TABLE `order_credits` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`user_id` int unsigned NOT NULL COMMENT '用户ID',
`order_id` int unsigned NOT NULL COMMENT '订单ID',
`total` int unsigned NOT NULL COMMENT '积分数量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
核心代码
事务管理工具类
package com.fengxuechao.example.rocketmq;
import java.sql.*;
/**
 * Minimal JDBC helper that binds one {@link Connection} (auto-commit disabled) to the
 * calling thread, so that several statements can share a single database transaction.
 *
 * <p>The connection is opened lazily by the first statement (or by
 * {@link #startTransaction()}) and is closed and unbound by {@link #commit()} or
 * {@link #rollback()}. Every thread therefore has to finish its own transaction.
 */
public class TransactionUtil {

    private static final String JDBC_URL =
            "jdbc:mysql://localhost:3306/rocketmq?serverTimezone=GMT%2B8";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "12345678";

    /** Connection bound to the current thread; empty while no transaction is open. */
    private static final ThreadLocal<Connection> connections = new ThreadLocal<>();

    private TransactionUtil() {
    }

    /**
     * Returns the connection bound to the current thread, opening one if necessary.
     *
     * @return the open connection, or {@code null} when it could not be opened (the
     *         failure is printed); statement methods report the same failure as a
     *         {@link SQLException} instead
     */
    public static Connection startTransaction() {
        try {
            return currentConnection();
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Runs an INSERT/UPDATE/DELETE inside the current thread's transaction.
     *
     * @param sql  statement text with {@code ?} placeholders
     * @param args values bound to the placeholders in order; may be {@code null}
     * @return the number of affected rows
     * @throws SQLException if the connection cannot be opened or the statement fails
     */
    public static int execute(String sql, Object... args) throws SQLException {
        // The statement is no longer needed once the update count is known.
        try (PreparedStatement statement = createPreparedStatement(sql, args)) {
            return statement.executeUpdate();
        }
    }

    /**
     * Runs a query inside the current thread's transaction.
     *
     * @param sql  statement text with {@code ?} placeholders
     * @param args values bound to the placeholders in order; may be {@code null}
     * @return the result set, or {@code null} if the statement produced none; the
     *         caller must close it, which also releases the underlying statement
     * @throws SQLException if the connection cannot be opened or the statement fails
     */
    public static ResultSet select(String sql, Object... args) throws SQLException {
        PreparedStatement statement = createPreparedStatement(sql, args);
        try {
            try {
                // Tie the statement's lifetime to the result set handed to the caller.
                statement.closeOnCompletion();
            } catch (SQLFeatureNotSupportedException ignored) {
                // Driver without this feature: the statement is released together
                // with the connection on commit()/rollback().
            }
            statement.execute();
            ResultSet resultSet = statement.getResultSet();
            if (resultSet == null) {
                // No result set means nothing will ever trigger closeOnCompletion.
                statement.close();
            }
            return resultSet;
        } catch (SQLException | RuntimeException e) {
            closeQuietly(statement, e);
            throw e;
        }
    }

    /** Prepares {@code sql} on the current connection and binds {@code args} to it. */
    private static PreparedStatement createPreparedStatement(String sql, Object[] args) throws SQLException {
        PreparedStatement statement = currentConnection().prepareStatement(sql);
        try {
            if (args != null) {
                for (int i = 0; i < args.length; i++) {
                    statement.setObject(i + 1, args[i]);
                }
            }
            return statement;
        } catch (SQLException | RuntimeException e) {
            closeQuietly(statement, e);
            throw e;
        }
    }

    /**
     * Commits the current thread's transaction, then closes and unbinds its connection.
     * Does nothing when no connection is bound.
     *
     * @throws IllegalStateException if the commit itself fails; the connection is
     *         still closed and unbound, so the thread can start a fresh transaction
     */
    public static void commit() {
        Connection connection = detachConnection();
        if (connection == null) {
            return;
        }
        try {
            connection.commit();
        } catch (SQLException e) {
            closeQuietly(connection, e);
            // A lost commit must not look like success to the caller.
            throw new IllegalStateException("Failed to commit the current transaction", e);
        }
        closeAndReport(connection);
    }

    /**
     * Rolls back the current thread's transaction, then closes and unbinds its
     * connection. Best effort: failures are printed, never thrown. Does nothing when no
     * connection is bound.
     */
    public static void rollback() {
        Connection connection = detachConnection();
        if (connection == null) {
            return;
        }
        try {
            connection.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeAndReport(connection);
        }
    }

    /** Returns the thread's usable connection, opening and binding a new one if needed. */
    private static Connection currentConnection() throws SQLException {
        Connection bound = connections.get();
        if (bound != null && !bound.isClosed()) {
            return bound;
        }
        connections.remove();
        Connection opened = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        try {
            opened.setAutoCommit(false);
        } catch (SQLException | RuntimeException e) {
            // Never leave a half-configured connection behind.
            closeQuietly(opened, e);
            throw e;
        }
        connections.set(opened);
        return opened;
    }

    /** Unbinds and returns the thread's connection, or {@code null} if none is bound. */
    private static Connection detachConnection() {
        Connection connection = connections.get();
        connections.remove();
        return connection;
    }

    /** Closes {@code connection}; a close failure is printed because the work is already done. */
    private static void closeAndReport(Connection connection) {
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Closes {@code resource}, attaching any close failure to {@code primary}. */
    private static void closeQuietly(AutoCloseable resource, Throwable primary) {
        try {
            resource.close();
        } catch (Exception closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }
}
生产者业务代码
发送半消息。对应的是上图中的第 1 步。
注意点:如果发送事务消息,在这里我们创建的实例必须是 TransactionMQProducer。
package com.fengxuechao.example.rocketmq;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demo entry point that sends a single order as a RocketMQ transactional (half)
 * message. The local transaction and the check-back are handled by
 * {@link OrderTransactionListener}.
 */
public class TransactionProducer {

    private static final String NAMESRV_ADDR = "127.0.0.1:9876";
    private static final String PRODUCER_GROUP = "producer_order_trans_group";
    private static final String TOPIC = "transaction-topic";
    private static final String TAGS = "trans-order";
    /** Message key; the listener stores it as the business marker in the transaction log. */
    private static final String KEYS = "order";

    /**
     * Starts a transactional producer, sends one order message and shuts the producer
     * down again.
     *
     * @param args unused
     * @throws MQClientException    if the producer cannot start or the send fails
     * @throws InterruptedException declared for callers; kept from the original signature
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer();
        // Pool handed to the producer for answering the broker's transaction check-backs.
        ExecutorService checkExecutor = new ThreadPoolExecutor(
                2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000),
                r -> new Thread(r, "client-transaction-msg-check-thread"));
        producer.setExecutorService(checkExecutor);
        producer.setTransactionListener(new OrderTransactionListener());
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.setProducerGroup(PRODUCER_GROUP);
        producer.start();

        try {
            Order order = new Order();
            order.setId(1);
            order.setUserId(1);
            order.setGoodsName("小脆面");
            order.setTotal(2);

            // The Charset overload cannot fail, unlike the lookup-by-name variant.
            byte[] orderBytes = JSON.toJSONString(order)
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message msg = new Message(TOPIC, TAGS, KEYS, orderBytes);
            producer.sendMessageInTransaction(msg, null);
        } finally {
            // Always stop the client, otherwise its threads keep the JVM alive after a
            // failed send.
            producer.shutdown();
        }
    }
}
package com.fengxuechao.example.rocketmq;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Transaction listener for order messages: runs the local database transaction when
 * the half message has been sent, and answers the broker's check-back by looking the
 * transaction up in {@code transaction_log}.
 */
public class OrderTransactionListener implements TransactionListener {

    /**
     * Inserts the order and its transaction-log row in one database transaction.
     *
     * @param msg the half message; its body is the UTF-8 JSON of an {@code Order}
     * @param arg unused
     * @return {@code COMMIT_MESSAGE} once both rows are committed;
     *         {@code ROLLBACK_MESSAGE} if the work failed before the commit;
     *         {@code UNKNOW} if the commit itself reported a failure, so that the
     *         broker settles the outcome through a check-back
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        try {
            System.out.println("创建订单");
            // The producer encodes the body as UTF-8; decode it the same way rather
            // than with the platform default charset.
            String orderStr = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            Order order = JSON.parseObject(orderStr, Order.class);

            String sql = "insert into orders(id, user_id, goods_name, total) values(?, ?, ?, ?)";
            int insertedRows = TransactionUtil.execute(sql, order.getId(), order.getUserId(),
                    order.getGoodsName(), order.getTotal());
            if (insertedRows <= 0) {
                // Without an order row there is nothing to commit or to announce.
                throw new SQLException("order insert affected no rows, id = " + order.getId());
            }

            System.out.println("写入本地事务日志");
            String logSql = "insert into transaction_log(id, business, foreign_key) values(?, ?, ?)";
            TransactionUtil.execute(logSql, msg.getTransactionId(), msg.getKeys(), order.getId());
        } catch (SQLException | RuntimeException e) {
            // Runtime failures (bad JSON, missing connection, ...) must roll back too.
            releaseConnectionQuietly();
            System.out.println("本地事务异常,回滚");
            e.printStackTrace();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        try {
            TransactionUtil.commit();
        } catch (RuntimeException e) {
            // The outcome of a failed commit is uncertain; do not guess.
            releaseConnectionQuietly();
            e.printStackTrace();
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * Answers a broker check-back for a half message whose final state was not received.
     *
     * @param msg the half message being checked
     * @return {@code COMMIT_MESSAGE} if a {@code transaction_log} row exists for the
     *         message's transaction id; otherwise {@code UNKNOW} (no row yet, or the
     *         lookup failed) so the broker asks again later
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.printf("回查本地事务, transactionId = %s%n", msg.getTransactionId());
        String sql = "select id, business, foreign_key from transaction_log where id = ?";
        try (ResultSet transactionLog = TransactionUtil.select(sql, msg.getTransactionId())) {
            // A query's ResultSet is never null when the row is missing; only next()
            // tells whether the log row exists.
            boolean logged = transactionLog != null && transactionLog.next();
            return logged ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
        } catch (SQLException | RuntimeException e) {
            e.printStackTrace();
            // A failed lookup proves nothing about the local transaction.
            return LocalTransactionState.UNKNOW;
        } finally {
            // End the read transaction so the check thread neither keeps the
            // connection nor reads from a stale snapshot on its next check.
            releaseConnectionQuietly();
        }
    }

    /** Rolls back and releases the thread's connection without letting cleanup throw. */
    private static void releaseConnectionQuietly() {
        try {
            TransactionUtil.rollback();
        } catch (RuntimeException cleanupFailure) {
            cleanupFailure.printStackTrace();
        }
    }
}
消费者业务代码
这里是增加积分的阶段。
package com.fengxuechao.example.rocketmq;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.util.List;
/**
 * Demo consumer that grants credits for every committed order message. Redelivered
 * messages are handled idempotently: an order that already has a row in
 * {@code order_credits} is acknowledged without inserting again.
 */
public class TransactionConsumer {

    /** Retry limit given to the consumer; also the threshold for the alert hook. */
    private static final int MAX_RECONSUME_TIMES = 3;

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_order_trans_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("transaction-topic", "trans-order");
        consumer.setMaxReconsumeTimes(MAX_RECONSUME_TIMES);
        // No connection is opened here: connections are bound per thread, so one
        // opened on the main thread would never be used by the listener and never closed.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        if (msg.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
                            sendMail();
                        }
                        creditOrder(msg);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    rollbackQuietly();
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            /** Alert hook for messages that reached the retry limit; left empty in this demo. */
            private void sendMail() { }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

    /**
     * Grants credits for the order carried by {@code msg} unless that order already
     * has a credits row. The database transaction is finished in either case.
     *
     * @param msg message whose body is the UTF-8 JSON of an {@code Order}
     * @throws java.sql.SQLException if a statement fails
     */
    private static void creditOrder(MessageExt msg) throws java.sql.SQLException {
        String orderStr = new String(msg.getBody(), StandardCharsets.UTF_8);
        Order order = JSON.parseObject(orderStr, Order.class);

        boolean alreadyCredited;
        String sql1 = "select * from order_credits where order_id = ?";
        try (ResultSet rs = TransactionUtil.select(sql1, order.getId())) {
            alreadyCredited = rs != null && rs.next();
        }

        if (alreadyCredited) {
            System.out.println("积分已添加,订单已处理!");
            // Nothing was written; end the read-only transaction so the connection
            // does not stay open on this consumer thread.
            TransactionUtil.rollback();
            return;
        }

        String sql2 = "insert into order_credits(user_id,order_id,total) values(?,?,?)";
        TransactionUtil.execute(sql2, order.getUserId(), order.getId(), order.getTotal() * 2);
        System.out.printf("订单(id=%s)添加积分%n", order.getId());
        TransactionUtil.commit();
    }

    /** Rolls back without letting a cleanup failure mask the original exception. */
    private static void rollbackQuietly() {
        try {
            TransactionUtil.rollback();
        } catch (RuntimeException cleanupFailure) {
            cleanupFailure.printStackTrace();
        }
    }
}
总体上,我的思路就是这样,希望大家一起讨论学习。
链接
|