引言
听说公司上批员工入职培训有这么一个小demo
之前分布式这块接触比较少,于是尝试做了一下
这边技术选型springcloud,kafka,mysql,docker,quartz
思路分析
首先描述下我这边的场景,也是很常见的一个异步调用场景:
即将服务A假设为某电商用户模块,服务B假设为电商活动模块。
我这边呢,假设用户支付多少钱,就返多少钱的一个代金券

一致性解决
梳理一下流程,上面这一版有一个致命的问题!如下所示 事务开始 (1)给alili账户扣10元 (2)给alili账户发10元代金券封装为消息,发送给消息队列 事务结束 那么来了,如何保证第一步和第二步是在同一个事务里完成的。换句话说,第一步操作的是数据库,第二步操作的是一个消息队列,你如何保证这两步之间的一致性? 记住了,任何涉及到数据库和中间件之间的业务逻辑操作,都需要考虑二者之间的一致性。比如,你先操作了数据库,再操作缓存,数据库和缓存之间一致性如何解决? 改变思路,加一张事务表,如下图所示
?
?注意了,此时事务的内容为 事务开始 (1)给账户alili,扣10元 (2)给事件表插入一条记录 事务结束 此时是对同一数据库的两张表操作,因此可以用数据库的事务进行保证。 另外,起一个定时程序,定时扫描事务表,发现一个状态为'UNFINISHED'的事件,就进行封装为消息,发送到消息中间件,然后将状态改为'FINISHED'.
幂等性解决
注意了,这一版还存在一个幂等性问题! 仔细看,定时程序做了如下三个操作 (1)定时扫描事务表,发现一个状态为'UNFINISHED'的事件 (2)将事件信息,封装为消息,发送到消息中间件 (3)将事件状态改为'FINISHED'
假设在步骤(2)的时候,发送完消息体,还未执行步骤(3),定时程序阵亡了!然后重启定时程序,发现刚那个事务的状态依然为'UNFINISHED',因此重新发送。这样,就会出现重复消费问题。因此,幂等性也是需要保证的!

给代金券表添加事务id字段,如果一旦出现重复消费,则在事务里直接报出唯一约束冲突错误,从而保证了幂等性!

内容
搭建nacos
docker环境安装略。。
安装dokcer 镜像
docker pull nacos/nacos-server
启动nacos
docker run ?-d --env MODE=standalone -p 8848:8848 --name nacos nacos/nacos-server

访问页面
http://192.168.10.11:8848/nacos?
 账号密码:nacos,nacos
搭建mysql
?略
我这边采用mysql5.7
讲一下nacos持久化
进入nacos容器,进入conf,拷贝sql文件,建库建表

同目录
vim?application.properties

修改数据库配置,退出容器exit,docker restart
搭建kafka
由于kafka需依赖zookeeper,我们需要容器内部通信
?我这边使用docker-compose,省去手动配置,
安装docker-compose,略
新建docker-conpose.yml文件
写入配置
version: '2'
services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-network
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
links:
- zookeeper
networks:
- kafka-network
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 5
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.11:9092 #宿主机监听端口
volumes:
- /var/run/docker.sock:/var/run/docker.sock
networks:
kafka-network:
driver: bridge
?在docker-compose.yml所在目录下执行:
? ? ? ? 启动
????????docker-compose up -d
????????关闭? ? ? ?
?????????docker-compose down

这边都是单机配置,集群自行研究,重点不在这里
用户服务搭建
? ? ? ? 用户表
CREATE TABLE `t_user` (
`uid` bigint(20) NOT NULL AUTO_INCREMENT,
`uname` varchar(100) NOT NULL COMMENT '用户名',
`money` bigint(100) NOT NULL DEFAULT '0' COMMENT '余额',
PRIMARY KEY (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
? ? ? ? 代金券事务表?
CREATE TABLE `t_user_coupon_tran` (
`tid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '事务id',
`uid` bigint(20) NOT NULL COMMENT '用户id',
`coupon_money` bigint(20) NOT NULL COMMENT '优惠金额',
`status` int(1) NOT NULL COMMENT '状态',
PRIMARY KEY (`tid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
? ? ? ? 模块结构

????????用户支付?
@Override
@Transient
public void post(Long uid, Long money) {
User user = userDao.selectById(uid);
if(Objects.isNull(user))
throw new RuntimeException("用户不存在");
if(user.getMoney()<money)
throw new RuntimeException("余额不足");
//用户扣钱
user.setMoney(user.getMoney()-money);
userDao.updateById(user);
userCouponTranService.save(new UserCouponTran(user.getUid(),money, ToCouponStatus.USER_COUPON_NO_SEND.getCode()));
}
? ? ? ? 代金券事务状态枚举类
????????
public enum ToCouponStatus {
USER_COUPON_NO_SEND(0,"代金券消息未发送"),USER_COUPON_SEND(1,"代金券消息已发送队列");
private Integer code;
private String status;
ToCouponStatus(Integer code, String status) {
this.code = code;
this.status = status;
}
public Integer getCode() {
return code;
}
public String getStatus() {
return status;
}
}
? ? ? ? 代金券状态监控定时任务job
public class CouponJob extends QuartzJobBean {
@Resource
private UserCouponTranService userCouponTranService;
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
List<UserCouponTran> tranMessages = userCouponTranService.list(new QueryWrapper<UserCouponTran>()
.eq("status", ToCouponStatus.USER_COUPON_NO_SEND.getCode()));
tranMessages.forEach(tranMsg->{
//通知发放代金券
kafkaTemplate.send("coupon",tranMsg.getUid()+","+tranMsg.getCouponMoney()+","+tranMsg.getTid())
.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
throw new RuntimeException("消息发送失败");
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//更改事务记录状态
tranMsg.setStatus(ToCouponStatus.USER_COUPON_SEND.getCode());
userCouponTranService.updateById(tranMsg);
}
});
});
}
}
活动服务搭建
? ? ? ? 代金券表
CREATE TABLE `t_coupon` (
`cid` bigint(20) NOT NULL AUTO_INCREMENT,
`cmoney` bigint(20) NOT NULL COMMENT '优惠金额',
`uid` bigint(20) NOT NULL COMMENT '用户id',
`tid` bigint(20) NOT NULL COMMENT '事务id',
PRIMARY KEY (`cid`),
UNIQUE KEY `t_coupon_tid_unique_key` (`tid`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
? ? ? ? 消费端监听
@Component
@Log
public class CouponListener {
@Resource
private CouponService couponService;
@KafkaListener(topics = "coupon")
public void grantCoupon(ConsumerRecord<?, ?> record){
String[] split = record.value().toString().split(",");
Long uid=Long.valueOf(split[0]);
Long money=Long.valueOf(split[1]);
Long tid=Long.valueOf(split[2]);
try {
couponService.save(new Coupon(money,uid,tid));
}catch (Exception e){
if(e instanceof DuplicateKeyException){
log.warning("重复消费,tid="+tid);
}
}
}
}
代码地址
https://gitee.com/alili0619/tran? ? ? ?
?欢迎交流!
参考文献:
https://www.cnblogs.com/rjzheng/p/10115798.html
https://www.cnblogs.com/rjzheng/p/8994962.html
|