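Issuing sequence numbers is a fairly common requirement in development; for example, a bank hands out queue numbers to customers waiting for service. How can we implement number issuing that is both efficient and reliable? The first requirement is that a number is never issued twice, and this must also hold in a distributed system. The first idea that comes to mind may be the Redis incr command, which is fast and simple to use, but its drawback is that if Redis goes down, data may be lost and numbers may then be issued twice. Another option is to build on MySQL, which is today's topic: MySQLMaxValueIncrementer.
Source code analysis
When creating a MySQLMaxValueIncrementer object, you specify the table name incrementerName and the name of the column to increment, columnName. Each call to nextIntValue() returns the next value; as the code below shows, the column is advanced by getCacheSize() whenever the locally cached range is used up, which means by 1 on every call when the cache size is 1. The feature is built on MySQL's last_insert_id() function. Why does last_insert_id() make this safe under concurrency? Because its value is tied to the Connection object: each connection effectively has its own last_insert_id() value, and one connection cannot see the last_insert_id() value set by another. The value a connection reads back is therefore exactly the one its own update just set, which neatly takes care of the concurrency problem. The core code is shown below: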
protected synchronized long getNextKey() throws DataAccessException {
if (this.maxId == this.nextId) {
Connection con = null;
Statement stmt = null;
boolean mustRestoreAutoCommit = false;
try {
if (this.useNewConnection) {
con = this.getDataSource().getConnection();
if (con.getAutoCommit()) {
mustRestoreAutoCommit = true;
con.setAutoCommit(false);
}
} else {
con = DataSourceUtils.getConnection(this.getDataSource());
}
stmt = con.createStatement();
if (!this.useNewConnection) {
DataSourceUtils.applyTransactionTimeout(stmt, this.getDataSource());
}
String columnName = this.getColumnName();
try {
stmt.executeUpdate("update " + this.getIncrementerName() + " set " + columnName + " = last_insert_id(" + columnName + " + " + this.getCacheSize() + ") limit 1");
} catch (SQLException var20) {
throw new DataAccessResourceFailureException("Could not increment " + columnName + " for " + this.getIncrementerName() + " sequence table", var20);
}
ResultSet rs = stmt.executeQuery("select last_insert_id()");
try {
if (!rs.next()) {
throw new DataAccessResourceFailureException("last_insert_id() failed after executing an update");
}
this.maxId = rs.getLong(1);
} finally {
JdbcUtils.closeResultSet(rs);
}
this.nextId = this.maxId - (long)this.getCacheSize() + 1L;
} catch (SQLException var23) {
throw new DataAccessResourceFailureException("Could not obtain last_insert_id()", var23);
} finally {
JdbcUtils.closeStatement(stmt);
if (con != null) {
if (this.useNewConnection) {
try {
con.commit();
if (mustRestoreAutoCommit) {
con.setAutoCommit(true);
}
} catch (SQLException var21) {
throw new DataAccessResourceFailureException("Unable to commit new sequence value changes for " + this.getIncrementerName());
}
JdbcUtils.closeConnection(con);
} else {
DataSourceUtils.releaseConnection(con, this.getDataSource());
}
}
}
} else {
++this.nextId;
}
return this.nextId;
}
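To make the maxId / nextId arithmetic above easier to follow, here is a minimal in-memory sketch. It is not Spring code and does not touch a database: the class name BlockSequenceSketch is made up for illustration, and an AtomicLong stands in for the sequence column, with addAndGet modelling the "update ... last_insert_id(col + cacheSize)" plus "select last_insert_id()" round trip.

```java
import java.util.concurrent.atomic.AtomicLong;

/** In-memory model of the block arithmetic in getNextKey(); no database involved. */
class BlockSequenceSketch {
    // Hypothetical stand-in for the sequence column stored in MySQL.
    private final AtomicLong column;
    private final int cacheSize;
    private long nextId = 0L;
    private long maxId = 0L;

    BlockSequenceSketch(AtomicLong column, int cacheSize) {
        this.column = column;
        this.cacheSize = cacheSize;
    }

    synchronized long next() {
        if (maxId == nextId) {
            // The cached range is exhausted: reserve the next block of cacheSize values.
            maxId = column.addAndGet(cacheSize);
            nextId = maxId - cacheSize + 1;
        } else {
            // Still inside the reserved block: hand out the next value locally.
            nextId++;
        }
        return nextId;
    }

    public static void main(String[] args) {
        AtomicLong column = new AtomicLong(0);
        // Two incrementers sharing one column, like two application nodes sharing one table.
        BlockSequenceSketch a = new BlockSequenceSketch(column, 3);
        BlockSequenceSketch b = new BlockSequenceSketch(column, 3);
        // prints "1 2 4 3 7"
        System.out.println(a.next() + " " + a.next() + " " + b.next() + " " + a.next() + " " + a.next());
    }
}
```

With a cache size greater than 1, each instance reserves a whole block, so values stay unique across instances but are not handed out in strictly increasing order overall, and unused values in a block are lost when an instance is discarded.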
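Extension
From the source above we can see that to use MySQLMaxValueIncrementer we must have a table, and a row must be inserted into it in advance. Sometimes numbers need to be issued in many places; with the original approach, every table involved needs its own column to record the current sequence value, which can become hard to maintain. The extension here moves number issuing into a single dedicated table in which a business key is associated with its current sequence value, which is easier to maintain than the previous approach.
Class diagram design
Number issuing does not have to be implemented with MySQL; it can also be based on Redis. Since there are several possible implementations, the design uses the Strategy pattern to keep it extensible.
Interface design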
public interface SequenceGenerator {
int nextIntValue(Sequence sequence);
long nextLongValue(Sequence sequence);
}
public interface Sequence {
String getKey();
}
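As a rough illustration of how these two interfaces support the Strategy pattern, the sketch below repeats them so that it compiles on its own and adds two made-up types: BizSequence, an enum that supplies business keys, and InMemorySequenceGenerator, a throwaway strategy that keeps counters in a map. Neither is part of the original design; they only show that callers depend on SequenceGenerator and that the backing store can be swapped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

interface Sequence {
    String getKey();
}

interface SequenceGenerator {
    int nextIntValue(Sequence sequence);
    long nextLongValue(Sequence sequence);
}

/** Hypothetical business keys; an enum is one convenient way to implement Sequence. */
enum BizSequence implements Sequence {
    ORDER("order"),
    TICKET("ticket");

    private final String key;

    BizSequence(String key) {
        this.key = key;
    }

    @Override
    public String getKey() {
        return key;
    }
}

/** Hypothetical in-memory strategy, suitable for tests only: nothing is persisted. */
class InMemorySequenceGenerator implements SequenceGenerator {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    @Override
    public long nextLongValue(Sequence sequence) {
        // One independent counter per business key, starting at 1.
        return counters.computeIfAbsent(sequence.getKey(), k -> new AtomicLong()).incrementAndGet();
    }

    @Override
    public int nextIntValue(Sequence sequence) {
        // Fails loudly instead of silently overflowing the int range.
        return Math.toIntExact(nextLongValue(sequence));
    }
}

class SequenceStrategyDemo {
    public static void main(String[] args) {
        SequenceGenerator generator = new InMemorySequenceGenerator();
        System.out.println(generator.nextLongValue(BizSequence.ORDER));  // prints 1
        System.out.println(generator.nextLongValue(BizSequence.ORDER));  // prints 2
        System.out.println(generator.nextIntValue(BizSequence.TICKET));  // prints 1
    }
}
```

A MySQL-backed or Redis-backed generator would implement the same interface, so calling code would not need to change when the strategy does.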
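Implementation class
StableSequenceGenerator implements SequenceGenerator and InitializingBean. On initialization it creates a sequence table that holds the current sequence value for each business type. Because the row is written with insert into ... on duplicate key update, no initial data has to be inserted by hand.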
public class StableSequenceGenerator implements SequenceGenerator, InitializingBean {
private final DataSource dataSource;
private static final String TABLE_NAME = "sequence";
private static final String COLUMN_NAME = "current_seq";
private static final String TABLE_SCHEME = "create table sequence(" +
"current_seq int not null default '1' comment 'current sequence value', " +
"`type` varchar(255) not null comment 'business type'," +
"`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time'," +
"`updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time'," +
"unique key `uniq_type` (`type`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;";
public StableSequenceGenerator(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void afterPropertiesSet() throws Exception {
this.initTable();
}
private void initTable() {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
DatabaseMetaData metaData = conn.getMetaData();
rs = metaData.getTables(conn.getCatalog(), null, TABLE_NAME, null);
stmt = conn.createStatement();
if (!rs.next()) {
stmt.execute(TABLE_SCHEME);
}
} catch (SQLException e) {
// Pass the SQLException along as the cause so the original error is not lost.
throw new DataAccessResourceFailureException("Unable to create table " + TABLE_NAME, e);
} finally {
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(stmt);
JdbcUtils.closeConnection(conn);
}
}
@Override
public int nextIntValue(Sequence sequence) {
ExtendMySQLMaxValueIncrementer incrementer = new ExtendMySQLMaxValueIncrementer(dataSource, sequence);
incrementer.setIncrementerName(TABLE_NAME);
incrementer.setColumnName(COLUMN_NAME);
return incrementer.nextIntValue();
}
@Override
public long nextLongValue(Sequence sequence) {
ExtendMySQLMaxValueIncrementer incrementer = new ExtendMySQLMaxValueIncrementer(dataSource, sequence);
incrementer.setIncrementerName(TABLE_NAME);
incrementer.setColumnName(COLUMN_NAME);
return incrementer.nextLongValue();
}
}
Extended implementation of MySQLMaxValueIncrementer
public class ExtendMySQLMaxValueIncrementer extends AbstractColumnMaxValueIncrementer {
private static final String VALUE_SQL = "select last_insert_id()";
private long nextId = 0L;
private long maxId = 0L;
private boolean useNewConnection = true;
private Sequence sequence;
public ExtendMySQLMaxValueIncrementer() {
}
public ExtendMySQLMaxValueIncrementer(DataSource dataSource, Sequence sequence) {
super(dataSource, "sequence", "current_seq");
this.sequence = sequence;
}
public ExtendMySQLMaxValueIncrementer(DataSource dataSource, String incrementerName, String columnName) {
super(dataSource, incrementerName, columnName);
}
public void setUseNewConnection(boolean useNewConnection) {
this.useNewConnection = useNewConnection;
}
@Override
protected synchronized long getNextKey() throws DataAccessException {
if (this.maxId == this.nextId) {
Connection con = null;
Statement stmt = null;
boolean mustRestoreAutoCommit = false;
try {
if (this.useNewConnection) {
con = this.getDataSource().getConnection();
if (con.getAutoCommit()) {
mustRestoreAutoCommit = true;
con.setAutoCommit(false);
}
} else {
con = DataSourceUtils.getConnection(this.getDataSource());
}
stmt = con.createStatement();
if (!this.useNewConnection) {
DataSourceUtils.applyTransactionTimeout(stmt, this.getDataSource());
}
String columnName = this.getColumnName();
try {
stmt.executeUpdate(this.getSql());
} catch (SQLException var20) {
throw new DataAccessResourceFailureException("Could not increment " + columnName + " for " + this.getIncrementerName() + " sequence table", var20);
}
ResultSet rs = stmt.executeQuery("select last_insert_id()");
try {
if (!rs.next()) {
throw new DataAccessResourceFailureException("last_insert_id() failed after executing an update");
}
this.maxId = rs.getLong(1);
this.maxId = this.maxId == 0L ? 1 : this.maxId;
} finally {
JdbcUtils.closeResultSet(rs);
}
this.nextId = this.maxId - (long)this.getCacheSize() + 1L;
} catch (SQLException var23) {
throw new DataAccessResourceFailureException("Could not obtain last_insert_id()", var23);
} finally {
JdbcUtils.closeStatement(stmt);
if (con != null) {
if (this.useNewConnection) {
try {
con.commit();
if (mustRestoreAutoCommit) {
con.setAutoCommit(true);
}
} catch (SQLException var21) {
throw new DataAccessResourceFailureException("Unable to commit new sequence value changes for " + this.getIncrementerName());
}
JdbcUtils.closeConnection(con);
} else {
DataSourceUtils.releaseConnection(con, this.getDataSource());
}
}
}
} else {
++this.nextId;
}
return this.nextId;
}
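// Upsert: inserts the row for this business key on first use; otherwise advances the
// column by the cache size and records the new value through last_insert_id(expr).
// The key is concatenated into the SQL text, so it must come from trusted code.
public String getSql() {
return "insert into " + this.getIncrementerName() + " (type)" + " values('" + this.sequence.getKey() +"')" +
" ON DUPLICATE KEY UPDATE " + this.getColumnName() + " = last_insert_id(" + this.getColumnName() + " + " + this.getCacheSize() + ")";
}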
}
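Since getSql() builds the statement by concatenating the business key, one possible variation is to bind the key as a JDBC parameter instead. The sketch below only builds the SQL text, so it runs without a database; the class name UpsertSqlSketch and the method upsertSql are made up for illustration, and the table and column names are assumed to come from trusted constants, as they do in StableSequenceGenerator.

```java
/** Builds a parameterized variant of the upsert used by getSql(); illustration only. */
class UpsertSqlSketch {

    // table and column are assumed to be trusted constants; only the key is bound as "?".
    static String upsertSql(String table, String column, int cacheSize) {
        return "insert into " + table + " (type) values (?)"
                + " on duplicate key update " + column
                + " = last_insert_id(" + column + " + " + cacheSize + ")";
    }

    public static void main(String[] args) {
        System.out.println(upsertSql("sequence", "current_seq", 1));
    }
}
```

With this variant, getNextKey() would obtain a PreparedStatement through con.prepareStatement(sql), bind the key with setString(1, sequence.getKey()), and call executeUpdate() in place of stmt.executeUpdate(this.getSql()); the rest of the method could stay as it is.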