问题描述
消息队列的消息幂等性,主要是由 MQ 重试机制引起的。
我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
举个例子,一个消息 M 发送到了消息中间件,消息投递到了消费程序 A,A 接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。然而这种可靠的特性导致,消息可能被多次地投递。
再举个例子,程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
解决方案
基于业务表的去重
假如我们的逻辑是这样的:
select?*?from?t_order?where?order_no?=?'order123'
if(order??!=?null)?{
????return?;//消息重复,直接返回
}
那么在并下情况下可能会有问题(当业务还没处理完,此时并发的请求进来了,则会穿透掉检查的挡板),所以要考虑原子性问题。
解决的办法在我们上一篇文章中也有介绍到,即使用:
但无论是 select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。
基于消息表+本地事务
我们以 RocketMQ 作为消息中间件为例, RocketMQ 的 文档 中描述道:
“
RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
”
“
msgId 一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
”
根据文档的描述我们得到了第一种解决方案,即通过去重表 的方法,这点在上一篇文章 《幂等解决方案集合(一)》 中已有所描述,需要注意的是:
“
在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
”
具体来说,我们可以这样做:在数据库中增加一个消息消费记录表,把业务操作和这个消息插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。
-
开启事务 -
插入消息表(处理好主键冲突的问题) -
更新订单表(原消费逻辑) -
提交事务
但是这里有它的局限性
-
消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。 -
数据库的数据必须是在一个库,跨库无法解决
使用 Exactly-Once 投递语义收发消息
Exactly-Once 投递语义
以下引用自阿里云的文档:
“
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
”
“
Exactly-Once 语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。
”
“
业界对于Exactly-Once 投递语义存在很大的争议,很多人会拿出“FLP 不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的 Exactly-Once 语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。
”
“
如果您要实现一条消息的消费结果只能在业务系统中生效一次,您需要解决的只是如何保证同一条消息的消费幂等问题。消息队列RocketMQ 版的Exactly-Once 语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。
”
典型使用场景
“
在电商系统中,上游实时计算模块发布商品价格变更的信息,异步通知到下游商品管理模块进行价格变更。此时,需要保证每一条信息的消费幂等,即重复的价格变更信息只会生效一次,这样便不会发生价格多次重复修改的情况,确保实现了消息消费的幂等。
”
操作步骤
1 添加依赖
消息队列 RocketMQ 版的 ExactlyOnceConsumer 在客户端 SDK ons-client-ext-1.8.4.Final 中发布,若要使用 Exactly-Once 投递语义,需在应用中依赖该 SDK。另外,ExactlyOnceConsumer 基于 Spring 实现了通过注解@MQTransaction 开启 Exactly-Once 消费的方式,因此还需要在应用中增加 Spring 3.0 以上版本的依赖。
完整的依赖内容如下所示。
<dependency>
????<groupId>com.aliyun.openservices</groupId>
????<artifactId>ons-client-ext</artifactId>
????<version>1.8.4.Final</version>
</dependency>
<dependency>
????<groupId>org.springframework</groupId>
????<artifactId>spring-context</artifactId>
????<version>${spring-version}</version>
</dependency>
<dependency>
????<groupId>org.springframework</groupId>
????<artifactId>spring-jdbc</artifactId>
????<version>${spring-version}</version>
</dependency>
2 在用于存储消息消费结果的数据库中创建 transaction_record 表 (注意:存储消息消费结果的数据库系统必须支持本地事务)
若要使用消息队列 RocketMQ 版的 Exactly-Once 投递语义,您需要在业务处理结果持久化的数据库中创建一张 transaction_record 表,保证此表与存储业务处理结果的表在同一个数据库内,且该数据库支持本地事务。目前,消息队列 RocketMQ 版的 Exactly-Once 投递语义支持您的业务访问 MySQL 和 SQLServer 两种类型的数据源。这两种类型的数据源下 transaction_record 表的建表语句如以下所示 (mysql)。
CREATE?TABLE?`transaction_record`?(
??`consumer_group`?varchar(128)?NOT?NULL?DEFAULT?'',
??`message_id`?varchar(255)?NOT?NULL?DEFAULT?'',
??`topic_name`?varchar(255)?NOT?NULL?DEFAULT?'',
??`ctime`?bigint(20)?NOT?NULL,
??`queue_id`?int(11)?NOT?NULL,
??`offset`?bigint(20)?NOT?NULL,
??`broker_name`?varchar(255)?NOT?NULL?DEFAULT?'',
??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,
??PRIMARY?KEY?(`id`),
??UNIQUE?KEY?`message_id_unique_key`?(`message_id`),
??KEY?`ctime_key`?(`ctime`),
??KEY?`load_key`?(`queue_id`,`broker_name`,`topic_name`,`ctime`)
)?ENGINE=InnoDB?DEFAULT?CHARSET=utf8;???????????
3 在消息生产端使用 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性设置打开 Exactly-Once 投递语义
/**
?*?TestExactlyOnceProducer?启动
?*?通过 PropertyKeyConst.EXACTLYONCE_DELIVERY 开启 exactly-once 投递语义。
?*/
public?class?TestExactlyOnceProducer?{
????public?static?void?main(String[]?args)?{
????????Properties?producerProperties?=?new?Properties();
????????producerProperties.setProperty(PropertyKeyConst.GROUP_ID,"{GROUP_ID}");
????????producerProperties.setProperty(PropertyKeyConst.AccessKey,"{accessKey}");
????????producerProperties.setProperty(PropertyKeyConst.SecretKey,"{secretKey}");
????????producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR,"{NAMESRV_ADDR}");
????????producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY,"true");
????????Producer?producer?=?ExactlyOnceONSFactory.createProducer(producerProperties);
????????producer.start();
????????System.out.println("Producer?Started");
????????for?(int?i?=?0;?i?<?10;?i++)?{
????????????Message?message?=?new?Message("{topic}",?"{tag}",?"mq?send?transaction?message?test".getBytes());
????????????try?{
????????????????SendResult?sendResult?=?producer.send(message);
????????????????assert?sendResult?!=?null;
????????????????System.out.println(new?Date()?+?"?Send?mq?message?success!?msgId?is:?"?+?sendResult.getMessageId());
????????????}?catch?(ONSClientException?e)?{
????????????????System.out.println("发送失败");
????????????????//出现异常意味着发送失败,为避免消息丢失,建议缓存该消息然后进行重试。
????????????}
????????}
????????producer.shutdown();
????}
}????????????
4 在消息消费端创建 ExactlyOnceConsumer,并开启 Exactly-Once 的消费模式。
使用消息队列 RocketMQ 版的 Exactly-Once 投递语义进行消费时,消费端需要使用 ExactlyOnceONSFactory 调用 createExactlyOnceConsumer 接口创建 ExactlyOnceConsumer,然后通过使用 ExactlyOnceConsumer 进行 Exactly-Once 模式的消费。
在使用 ExactlyOnceConsumer 时,需要注意以下两点:
-
创建 ExactlyOnceConsumer 时,可以通过设置 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性打开或者关闭 Exactly-Once 投递语义。ExactlyOnceConsumer 默认打开 Exactly-Once 投递语义。 -
使用 ExactlyOnceConsumer 消费时,在消息监听器 MessageListener 的 consume 方法中,您的业务处理逻辑需要使用 MQDataSource 对数据库的进行读写操作。
您可以选择以下任一方式在消费端开启 Exactly-Once 投递语义:
-
以非 Spring 方式开启 Exactly-Once 投递语义 -
MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性 -
MessageListener 中通过 Springboot 注解方式实现开启 Exactly-Once 投递语义 -
MessageListener 中通过 MyBatis 方式实现 Exactly-Once 投递语义
以下示例为:MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性,其他示例代码,请参考阿里云文档:https://help.aliyun.com/document_detail/102777.html
/**
?* TestExactlyOnceListener 实现。
?*?实现了一个事务中对多个业务表进行更新的场景,保证事务内的操作有且仅有一次生效。
?*/
public?class?SimpleTxListener?implements?MessageListener?{
????private?MQDataSource?dataSource;
????public?SimpleTxListener()?{
????????this.dataSource?=?new?MQDataSource("{url}",?"{user}",?"{passwd}",?"{driver}");
????}
????????@Override
????public?Action?consume(Message?message,?ConsumeContext?context)?{
????????Connection?connection?=?null;
????????Statement?statement?=?null;
????????try?{
????????????/**
?????????????*??在此处对消费到的消息 message 做业务计算处理,使用 MQDataSource 将处理结果持久化到数据库系统。
?????????????*??此范例演示了在一个事务内对多个表进行更新的业务场景,Exactly-Once 投递语义保证事务内的操作有且仅有一次。
?????????????*??实际的业务处理按照:接收消息->业务处理->结果持久化的流程来设计。
?????????????*/
????????????connection?=?dataSource.getConnection();
????????????connection.setAutoCommit(false);
????????????String?insertSql?=?String.format("INSERT?INTO?app(msg,?message_id,?ctime)?VALUES(\"%s\",?\"%s\",?%d)",
????????????????new?String(message.getBody()),?message.getMsgID(),?System.currentTimeMillis());
????????????String?updateSql?=?String.format("UPDATE?consume_count?SET?cnt?=?count?+?1?WHERE?consumer_group?=?\"%s\"",?"GID_TEST");
????????????statement?=?connection.createStatement();
????????????statement.execute(insertSql);
????????????statement.execute(updateSql);
????????????connection.commit();
????????????System.out.println("consume?message?:"?+?message.getMsgID());
????????????return?Action.CommitMessage;
????????}?catch?(Throwable?e)?{
????????????try?{
????????????????connection.rollback();
????????????}?catch?(Exception?e1)?{
????????????}
????????????System.out.println("consume?message?fail");
????????????return?Action.ReconsumeLater;
????????}?finally?{
????????????if?(statement?!=?null)?{
????????????????try?{
????????????????????statement.close();
????????????????}?catch?(Exception?e)?{?}
????????????}
????????????if?(connection?!=?null)?{
????????????????try?{
????????????????????connection.close();
????????????????}?catch?(Exception?e)?{?}
????????????}
????????}
????}??????????????????????????????????????????????????????
}????????????
总结
你可以看到,这种方式是阿里云做了封装,但理论上跟上一个方案 基于消息表+本地事务 是一样的。
利用 Redis
最后这种方案其实思路跟 基于消息表+本地事务 类似,只不过判断是否重复消息用 Redis 分布式锁实现,关于具体实现步骤也跟我们上一篇文章类似,请参考上一篇文章。
最后
当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入 死信队列 ,最终进入到数据库中,接着设置定时 job 查询这些数据,进行手动补偿。
参考
-
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md -
https://www.baiyp.ren/%E4%B8%9A%E5%8A%A1%E5%B9%82%E7%AD%89%E6%80%A7%E6%9E%B6%E6%9E%84%E8%AE%BE%E8%AE%A1.html -
https://mp.weixin.qq.com/s/Ojdh0-POjucg_De8OdQfwQ -
https://help.aliyun.com/document_detail/102777.html
小盒子的技术分享
有关编程、技术类知识的分享及生活感悟
87篇原创内容
公众号
|