|
目录
1、引入依赖
2、application.yml配置文件
3、AtomikosJtaPlatform
4、事务管理器JPAAtomikosTransactionConfig
5、主数据源配置
6、其他数据源配置
7、controller层
8、service层
9、实现类
10、通用返回工具类
11、testdb包实体
12、testdb2包实体
13、testdb包持久层dao
14、testdb2包持久层dao
15、测试多数据源事务回滚
实现方式:不同的package下面的接口函数自动注入 不同的数据源
实体、dao分包存放

?数据创建两个库:test库、test2库
test库中创建表如下:
CREATE TABLE `article` ?( ? `id` bigint(32) NOT NULL AUTO_INCREMENT, ? `author` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, ? `title` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, ? `content` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, ? `create_time` datetime(0) NULL DEFAULT NULL, ? `createTime` datetime(6) NULL DEFAULT NULL, ? PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 21 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; ?
test2库中创建表如下:
CREATE TABLE `message` ?( ? `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键', ? `name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'name', ? `content` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'content', ? PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 7 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
1、引入依赖
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--jpa-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--JTA分布式事务-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
2、application.yml配置文件
注:这里是url,不是jdbc-url
server:
port: 8888
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
datasource:
primary:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: '0407'
driver-class-name: com.mysql.cj.jdbc.Driver
secondary:
url: jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: '0407'
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
hibernate:
ddl-auto: validate
database: mysql
show-sql: true
#多数据源分布式事务
jta:
atomikos:
datasource:
max-pool-size: 20
borrow-connection-timeout: 60
connectionfactory:
max-pool-size: 20
borrow-connection-timeout: 60
此类固定配置
package com.lyf.springboot.config;
import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
public class AtomikosJtaPlatform extends AbstractJtaPlatform {
private static final long serialVersionUID = 1L;
static TransactionManager transactionManager;
static UserTransaction transaction;
@Override
protected TransactionManager locateTransactionManager() {
return transactionManager;
}
@Override
protected UserTransaction locateUserTransaction() {
return transaction;
}
}
4、事务管理器JPAAtomikosTransactionConfig
固定配置
package com.lyf.springboot.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
@Configuration
@ComponentScan
@EnableTransactionManagement
public class JPAAtomikosTransactionConfig {
@Bean
public PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
//设置JPA特性
@Bean
public JpaVendorAdapter jpaVendorAdapter() {
HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
//显示sql
hibernateJpaVendorAdapter.setShowSql(true);
//自动生成/更新表
hibernateJpaVendorAdapter.setGenerateDdl(true);
//设置数据库类型
hibernateJpaVendorAdapter.setDatabase(Database.MYSQL);
return hibernateJpaVendorAdapter;
}
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
AtomikosJtaPlatform.transactionManager = userTransactionManager;
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
AtomikosJtaPlatform.transaction = userTransaction;
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}
5、主数据源配置
注:修改标红的部分
package com.lyf.config;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.lyf.dao.testdb", //注意这里
entityManagerFactoryRef = "primaryEntityManager",
transactionManagerRef = "transactionManager")
public class JPAPrimaryConfig {
@Resource
private JpaVendorAdapter jpaVendorAdapter;
@Primary
@Bean(name = "primaryDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.primary") //注意这里
public DataSourceProperties primaryDataSourceProperties() {
return new DataSourceProperties();
}
@Primary
@Bean(name = "primaryDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.primary")
public DataSource primaryDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(primaryDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(primaryDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(primaryDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("primary");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Primary
@Bean(name = "primaryEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean primaryEntityManager() throws Throwable {
HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
properties.put("javax.persistence.transactionType", "JTA");
LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
entityManager.setJtaDataSource(primaryDataSource());
entityManager.setJpaVendorAdapter(jpaVendorAdapter);
//这里要修改成主数据源的扫描包
entityManager.setPackagesToScan("com.lyf.entity.testdb");
entityManager.setPersistenceUnitName("primaryPersistenceUnit");
entityManager.setJpaPropertyMap(properties);
return entityManager;
}
}
6、其他数据源配置
package com.lyf.config;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.lyf.dao.testdb2", //注意这里
entityManagerFactoryRef = "secondaryEntityManager",
transactionManagerRef = "transactionManager")
public class JPASecondaryConfig {
@Resource
private JpaVendorAdapter jpaVendorAdapter;
@Bean(name = "secondaryDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.secondary") //注意这里
public DataSourceProperties masterDataSourceProperties() {
return new DataSourceProperties();
}
@Bean(name = "secondaryDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.secondary")
public DataSource masterDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(masterDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(masterDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(masterDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("secondary");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Bean(name = "secondaryEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean masterEntityManager() throws Throwable {
HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
properties.put("javax.persistence.transactionType", "JTA");
LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
entityManager.setJtaDataSource(masterDataSource());
entityManager.setJpaVendorAdapter(jpaVendorAdapter);
//这里要修改成主数据源的扫描包
entityManager.setPackagesToScan("com.lyf.entity.testdb2");
entityManager.setPersistenceUnitName("secondaryPersistenceUnit");
entityManager.setJpaPropertyMap(properties);
return entityManager;
}
}
7、controller层
package com.lyf.controller;
import com.lyf.service.ArticleService;
import com.lyf.utils.AjaxResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@Slf4j
@Controller
@RequestMapping("/rest")
public class ArticleController {
@Resource
private ArticleService articleService;
//增加一篇Article ,使用POST方法
@PostMapping("/articles")
public @ResponseBody AjaxResponse saveArticle(){
articleService.saveArticle();
return AjaxResponse.success();
}
}
8、service层
package com.lyf.service;
public interface ArticleService {
void saveArticle();
}
9、实现类
package com.lyf.service.impl;
import com.lyf.dao.testdb.ArticleRepository;
import com.lyf.dao.testdb2.MessageRepository;
import com.lyf.entity.testdb.Article;
import com.lyf.entity.testdb2.Message;
import com.lyf.service.ArticleService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
public class ArtivleServiceImpl implements ArticleService {
@Resource
private ArticleRepository articleRepository;
@Resource
private MessageRepository messageRepository;
@Override
public void saveArticle() {
Article article = new Article();
article.setAuthor("刘耀福");
article.setTitle("学习JPA多数据源分布式事务");
article.setContent("学习中");
article.setCreateTime(new Date());
articleRepository.save(article);
Message Message = new Message();
Message.setName("科比");
Message.setContent("打篮球");
messageRepository.save(Message);
}
}
?
10、通用返回工具类
package com.lyf.utils;
import lombok.Data;
@Data
public class AjaxResponse {
private boolean isok; //请求是否处理成功
private int code; //请求响应状态码(200、400、500)
private String message; //请求结果描述信息
private Object data; //请求结果数据(通常用于查询操作)
private AjaxResponse(){}
//请求成功的响应,不带查询数据(用于删除、修改、新增接口)
public static AjaxResponse success(){
AjaxResponse ajaxResponse = new AjaxResponse();
ajaxResponse.setIsok(true);
ajaxResponse.setCode(200);
ajaxResponse.setMessage("请求响应成功!");
return ajaxResponse;
}
//请求成功的响应,带有查询数据(用于数据查询接口)
public static AjaxResponse success(Object obj){
AjaxResponse ajaxResponse = new AjaxResponse();
ajaxResponse.setIsok(true);
ajaxResponse.setCode(200);
ajaxResponse.setMessage("请求响应成功!");
ajaxResponse.setData(obj);
return ajaxResponse;
}
//请求成功的响应,带有查询数据(用于数据查询接口)
public static AjaxResponse success(Object obj, String message){
AjaxResponse ajaxResponse = new AjaxResponse();
ajaxResponse.setIsok(true);
ajaxResponse.setCode(200);
ajaxResponse.setMessage(message);
ajaxResponse.setData(obj);
return ajaxResponse;
}
}
11、testdb包实体
package com.lyf.entity.testdb;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name="article")
public class Article {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
private Long id;
@Column(nullable = false,length = 32)
private String author;
@Column(nullable = false, unique = true,length = 32)
private String title;
@Column(length = 512)
private String content;
private Date createTime;
}
12、testdb2包实体
package com.lyf.entity.testdb2;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
@Data
@Entity
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Table(name="message")
public class Message {
@Id
@GeneratedValue
private Long id;
@Column(nullable = false)
private String name;
@Column(nullable = false)
private String content;
}
13、testdb包持久层dao
package com.lyf.dao.testdb;
import com.lyf.entity.testdb.Article;
import org.springframework.data.jpa.repository.JpaRepository;
public interface ArticleRepository extends JpaRepository<Article, Long> {
}
14、testdb2包持久层dao
package com.lyf.dao.testdb2;
import com.lyf.entity.testdb2.Message;
import org.springframework.data.jpa.repository.JpaRepository;
public interface MessageRepository extends JpaRepository<Message, Long> {
}
15、测试多数据源事务回滚
????????
?
postman测试报错,我们看看控制台
?控制台使除0异常

?查看数据库事务回滚了没,回滚了

?
?springboot+spring data jpa 分布式事务回滚完结
?
|