IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 利用消息队列实现分布式事务 -> 正文阅读

[大数据]利用消息队列实现分布式事务

引言

听说公司上批员工入职培训有这么一个小demo

之前分布式这块接触比较少,于是尝试做了一下

这边技术选型springcloud,kafka,mysql,docker,quartz

思路分析

首先描述下我这边的场景,也是很常见的一个异步调用场景:

即将服务A假设为某电商用户模块,服务B假设为电商活动模块。

我这边呢,假设用户支付多少钱,就返多少钱的一个代金券

一致性解决

梳理一下流程,上面这一版有一个致命的问题!如下所示
事务开始

(1)给alili账户扣10元
(2)给alili账户发10元代金券封装为消息,发送给消息队列

事务结束
那么来了,如何保证第一步和第二步是在同一个事务里完成的。换句话说,第一步操作的是数据库,第二步操作的是一个消息队列,你如何保证这两步之间的一致性?
记住了,任何涉及到数据库和中间件之间的业务逻辑操作,都需要考虑二者之间的一致性。比如,你先操作了数据库,再操作缓存,数据库和缓存之间一致性如何解决?
改变思路,加一张事务表,如下图所示

?

?注意了,此时事务的内容为
事务开始
(1)给账户alili,扣10元
(2)给事件表插入一条记录

事务结束

此时是对同一数据库的两张表操作,因此可以用数据库的事务进行保证。
另外,起一个定时程序,定时扫描事务表,发现一个状态为'UNFINISHED'的事件,就进行封装为消息,发送到消息中间件,然后将状态改为'FINISHED'.

幂等性解决

注意了,这一版还存在一个幂等性问题!
仔细看,定时程序做了如下三个操作
(1)定时扫描事务表,发现一个状态为'UNFINISHED'的事件
(2)将事件信息,封装为消息,发送到消息中间件
(3)将事件状态改为'FINISHED'

假设在步骤(2)的时候,发送完消息体,还未执行步骤(3),定时程序阵亡了!然后重启定时程序,发现刚那个事务的状态依然为'UNFINISHED',因此重新发送。这样,就会出现重复消费问题。因此,幂等性也是需要保证的!

给代金券表添加事务id字段,如果一旦出现重复消费,则在事务里直接报出唯一约束冲突错误,从而保证了幂等性!

内容

搭建nacos

docker环境安装略。。

安装dokcer 镜像

docker pull nacos/nacos-server


启动nacos

docker run ?-d --env MODE=standalone -p 8848:8848 --name nacos nacos/nacos-server

访问页面

http://192.168.10.11:8848/nacos?


账号密码:nacos,nacos

搭建mysql

?略

我这边采用mysql5.7

讲一下nacos持久化

进入nacos容器,进入conf,拷贝sql文件,建库建表

同目录

vim?application.properties

修改数据库配置,退出容器exit,docker restart

搭建kafka

由于kafka需依赖zookeeper,我们需要容器内部通信

?我这边使用docker-compose,省去手动配置,

安装docker-compose,略

新建docker-conpose.yml文件

写入配置

version: '2'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
    networks:
      - kafka-network
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.11:9092  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
networks:
  kafka-network:
    driver: bridge

?在docker-compose.yml所在目录下执行:

? ? ? ? 启动

????????docker-compose up -d

????????关闭? ? ? ?

?????????docker-compose down

这边都是单机配置,集群自行研究,重点不在这里

用户服务搭建

? ? ? ? 用户表

CREATE TABLE `t_user` (
  `uid` bigint(20) NOT NULL AUTO_INCREMENT,
  `uname` varchar(100) NOT NULL COMMENT '用户名',
  `money` bigint(100) NOT NULL DEFAULT '0' COMMENT '余额',
  PRIMARY KEY (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

? ? ? ? 代金券事务表?

CREATE TABLE `t_user_coupon_tran` (
  `tid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '事务id',
  `uid` bigint(20) NOT NULL COMMENT '用户id',
  `coupon_money` bigint(20) NOT NULL COMMENT '优惠金额',
  `status` int(1) NOT NULL COMMENT '状态',
  PRIMARY KEY (`tid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

? ? ? ? 模块结构

????????用户支付?

@Override
    @Transient
    public void post(Long uid, Long money) {
        User user = userDao.selectById(uid);
        if(Objects.isNull(user))
            throw new RuntimeException("用户不存在");
        if(user.getMoney()<money)
            throw new RuntimeException("余额不足");
        //用户扣钱
        user.setMoney(user.getMoney()-money);
        userDao.updateById(user);
        userCouponTranService.save(new UserCouponTran(user.getUid(),money, ToCouponStatus.USER_COUPON_NO_SEND.getCode()));
    }

? ? ? ? 代金券事务状态枚举类

????????

public enum ToCouponStatus {
    USER_COUPON_NO_SEND(0,"代金券消息未发送"),USER_COUPON_SEND(1,"代金券消息已发送队列");
    private Integer code;
    private String status;

    ToCouponStatus(Integer code, String status) {
        this.code = code;
        this.status = status;
    }

    public Integer getCode() {
        return code;
    }

    public String getStatus() {
        return status;
    }

}

? ? ? ? 代金券状态监控定时任务job

public class CouponJob extends QuartzJobBean {

    @Resource
    private UserCouponTranService userCouponTranService;

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {

        List<UserCouponTran> tranMessages = userCouponTranService.list(new QueryWrapper<UserCouponTran>()
                .eq("status", ToCouponStatus.USER_COUPON_NO_SEND.getCode()));

        tranMessages.forEach(tranMsg->{
            //通知发放代金券
            kafkaTemplate.send("coupon",tranMsg.getUid()+","+tranMsg.getCouponMoney()+","+tranMsg.getTid())
                    .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                        @Override
                        public void onFailure(Throwable throwable) {
                            throw new RuntimeException("消息发送失败");
                        }

                        @Override
                        public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                            //更改事务记录状态
                            tranMsg.setStatus(ToCouponStatus.USER_COUPON_SEND.getCode());
                            userCouponTranService.updateById(tranMsg);
                        }
                    });
        });
    }
}

活动服务搭建

? ? ? ? 代金券表

CREATE TABLE `t_coupon` (
  `cid` bigint(20) NOT NULL AUTO_INCREMENT,
  `cmoney` bigint(20) NOT NULL COMMENT '优惠金额',
  `uid` bigint(20) NOT NULL COMMENT '用户id',
  `tid` bigint(20) NOT NULL COMMENT '事务id',
  PRIMARY KEY (`cid`),
  UNIQUE KEY `t_coupon_tid_unique_key` (`tid`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;

? ? ? ? 消费端监听

@Component
@Log
public class CouponListener {

    @Resource
    private CouponService couponService;

    @KafkaListener(topics = "coupon")
    public void grantCoupon(ConsumerRecord<?, ?> record){
        String[] split = record.value().toString().split(",");
        Long uid=Long.valueOf(split[0]);
        Long money=Long.valueOf(split[1]);
        Long tid=Long.valueOf(split[2]);
        try {
            couponService.save(new Coupon(money,uid,tid));
        }catch (Exception e){
            if(e instanceof DuplicateKeyException){
                log.warning("重复消费,tid="+tid);
            }
        }
    }
}

代码地址

https://gitee.com/alili0619/tran? ? ? ?

?欢迎交流!

参考文献:

https://www.cnblogs.com/rjzheng/p/10115798.html

https://www.cnblogs.com/rjzheng/p/8994962.html

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 23:47:47  更:2021-07-15 23:48:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 15:11:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码