理论知识,在
《分布式事务解决方案》已经讲解的十分通透了,本文主要讲解如何通过Seata实现分布式事务控制
我们以订单微服务模块进行演示,然后由订单微服务调用商品微服务扣除库存
实现流程如下:
1.搭建服务
我们创建两个微服务模块,服务模块:
同时将两个服务集成fegin,mybatis,rabbitMq并且保证可以注册到nacos,具体如何创建和集成大家可以参考:
2.建表
2.1 order服务下的表
DROP TABLE IF EXISTS `tb_order`;
CREATE TABLE `tb_order` (
`order_id` int NOT NULL AUTO_INCREMENT,
`user_id` int DEFAULT NULL,
`product_id` int DEFAULT NULL COMMENT '产品ID',
`order_time` datetime DEFAULT NULL COMMENT '下单时间 ',
`transfer_money` double DEFAULT NULL COMMENT '交易金额',
`product_num` int DEFAULT NULL COMMENT '购买商品数量',
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.2 stock服务下的表
DROP TABLE IF EXISTS `tb_product`;
CREATE TABLE `tb_product` (
`product_id` int NOT NULL AUTO_INCREMENT,
`product_name` varchar(255) DEFAULT NULL COMMENT '产品名称',
`price` double(10,2) DEFAULT NULL COMMENT '价格',
`rest_num` int DEFAULT NULL COMMENT '产品剩余数量',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO `tb_product` VALUES ('1', 'Nike Air', '500.00', '15');
3.实现服务
3.1 新建下单服务(OrderServer模块)
@RestController
@Slf4j
public class TbOrderController {
@Autowired
ITbOrderService tbOrderService;
@PostMapping("/order/prod/createOrder")
public TbOrder createOrder(@RequestBody TbOrder tbOrder) {
log.info("接收到{}号商品的下单请求", tbOrder.getProductId());
return tbOrderService.createOrder(tbOrder.getProductId(), tbOrder.getProductNum());
}
}
@Service
@Slf4j
public class TbOrderServiceImpl extends ServiceImpl<TbOrderMapper, TbOrder> implements ITbOrderService {
@Autowired
ProductService productService;
@Autowired
Sender_Direct sender_direct;
@Override
public TbOrder createOrder(Integer productId, Integer productNum) {
Map<String, Object> productInfo = productService.getProductById(productId);
log.info("查询到{}号商品,商品名称为:{},剩余数量:{}", productId, productInfo.get("productName"), productInfo.get("restNum"));
TbOrder tbOrder = new TbOrder();
tbOrder.setUserId(1);
tbOrder.setProductId(productId);
tbOrder.setOrderTime(new Date());
tbOrder.setTransferMoney(Double.valueOf(productInfo.get("transferMoney").toString()));
baseMapper.insert(tbOrder);
log.info("创建订单成功 ,订单信息为:{}", JSON.toJSONString(tbOrder));
productService.reduceInventory(productId, productNum);
return null;
}
对应的fegin如下:
@FeignClient(value = "seata-stock")
public interface ProductService {
@RequestMapping(value = "getProductById", method = RequestMethod.POST)
public Map<String, Object> getProductById(@RequestParam(value = "productId") Integer productId);
@RequestMapping(value = "reduceInventory", method = RequestMethod.POST)
public Integer reduceInventory(@RequestParam(value = "productId") Integer productId,
@RequestParam(value = "num") Integer num);
}
3.2 添加商品信息获取以及库存消减服务(stockServer模块)
@RestController
public class TbProductController {
@Autowired
ITbProductService productService;
@RequestMapping(value = "getProductById", method = RequestMethod.POST)
public TbProduct getProductById(@RequestParam(value = "productId") Integer productId) {
return productService.getById(productId);
}
@RequestMapping(value = "reduceInventory", method = RequestMethod.POST)
public Integer reduceInventory(@RequestParam(value = "productId") Integer productId,
@RequestParam(value = "num") Integer num) {
return productService.reduceInventory(productId, num);
}
}
@Service
public class TbProductServiceImpl extends ServiceImpl<TbProductMapper, TbProduct> implements ITbProductService {
@Override
public Integer reduceInventory(Integer productId, Integer num) {
TbProduct tbProduct = baseMapper.selectById(productId);
if (tbProduct.getRestNum() < num) {
throw new RuntimeException("库存不足");
}
int i = 1 / 0;
tbProduct.setRestNum(tbProduct.getRestNum() - num);
return baseMapper.update(tbProduct, null);
}
}
4.启动Seata
4.1 安装 Seata
5.使用Seata实现事务控制
5.1 初始化数据表
在我们的数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) COLLATE utf8_bin NOT NULL,
`context` varchar(255) COLLATE utf8_bin NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
`log_modified` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
`ext` varchar(100) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`branch_id`,`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
5.2 添加配置
在需要进行分布式控制的微服务中进行下面几项配置:
5.2.1 添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
5.2.2 DataSourceProxyConfig
Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
5.2.3 registry.conf
在resources下添加Seata的配置文件 registry.conf
registry {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
}
注意:serverAddr按照你自己的naocs地址进行修改
5.2.4 修改application.properties文件
nacos.ip=192.168.28.133
server.port=8082
spring.application.name=seata-order
spring.cloud.nacos.discovery.server-addr=${nacos.ip}:8848
spring.cloud.nacos.discovery.namespace=public
spring.cloud.nacos.discovery.group=SEATA_GROUP
spring.cloud.nacos.discovery.username=nacos
spring.cloud.nacos.discovery.password=nacos
spring.main.allow-circular-references=true
seata.enabled=true
seata.enable-auto-data-source-proxy=false
seata.tx-service-group=my_test_tx_group
seata.config.type=nacos
seata.config.nacos.server-addr=${nacos.ip}:8848
seata.config.nacos.namespace=public
seata.config.nacos.group=SEATA_GROUP
seata.config.nacos.username=nacos
seata.config.nacos.password=nacos
seata.registry.type=nacos
seata.registry.nacos.server-addr=${nacos.ip}:8848
seata.registry.nacos.namespace=public
seata.registry.nacos.username=nacos
seata.registry.nacos.password=nacos
seata.registry.nacos.application=seata-server
seata.registry.nacos.group=SEATA_GROUP
seata.service.vgroup-mapping.my_test_tx_group=default
seata.service.disable-global-transaction=false
seata.client.rm.report-success-enable=false
如下所示:
5.3 在服务上开启全局事务
@GlobalTransactional
注意:我们需要在所有需要控制的服务上都需要开启此注解,即:
6.测试
首先我们先不加入注解@GlobalTransactional,进行测试:
打开我们的订单表,可以看到已经生成了一个订单信息:
同时我们再看看库存数据,会发现未发生任何变化
我来解释一下原因: 每一个服务下都会遵循自己独立的ACID原则,一旦发生异常数据库就会回滚,即保证数据的一致性,而我们的分布式事务就是要解决微服务之间数据一致性的问题
现在我们在两个服务上面加入注解@GlobalTransactional再次进行测试: 会发现order表中未添加任何的数据,同时库存也没有变化,完全符合我们所需要的功能。
|