小结
Druid 连接池是阿里巴巴开源的数据库连接池项目。Druid 连接池为监控而生,内置强大的监控功能。
1. 初始化数据库连接池
1.1 使用 Druid
Spring Boot 项目中配置数据库连接池为 Druid 并启动项目,Spring 上下文初始化完成后会初始化 DruidDataSource, 如下图:
核心类:
DruidDataSourceAutoConfigure 负责初始化 DruidDataSource com.alibaba.druid.pool.DruidDataSource 负责创建高效可管理的数据库连接池
1.2 druid-spring-boot-starter 源码
DruidDataSourceAutoConfigure 是 druid-spring-boot-starter 下的一个配置类。此 starter 中 spring.factories 文件中定义了自动加载 DruidDataSourceAutoConfigure 配置类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
DruidDataSourceAutoConfigure 配置类,源码如下,这里初始化了 Bean : DruidDataSourceWrapper(继承了 DruidDataSource类) ,初始化了DruidDataSourc#init() 方法。
@Configuration
@ConditionalOnClass(DruidDataSource.class)
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class})
@Import({DruidSpringAopConfiguration.class,
DruidStatViewServletConfiguration.class,
DruidWebStatFilterConfiguration.class,
DruidFilterConfiguration.class})
public class DruidDataSourceAutoConfigure {
private static final Logger LOGGER = LoggerFactory.getLogger(DruidDataSourceAutoConfigure.class);
@Bean(initMethod = "init")
@ConditionalOnMissingBean
public DataSource dataSource() {
LOGGER.info("Init DruidDataSource");
return new DruidDataSourceWrapper();
}
}
所以,下面重点看DruidDataSource#init() 方法的源码:
1.3 DruidDataSource#init() 方法
DruidDataSource#init() 方法里主要做的事情:
-
初始化 jdbcUrl -
初始化过滤器 -
根据配置的 jdbcUrl 判断数据库的类型 -
通过 SPI ServiceLoader ,让类加载器加载 过滤器 -
加载数据库驱动 resolveDriver(); -
初始化检查,检查数据库类型和驱动是否支持、初始化: initExceptionSorter() -
初始化存放连接的数组 connections = new DruidConnectionHolder[maxActive]; 并初始化 initialSize 个连接 CreateConnectionThread,动态在connections中添加连接 -
创建这几个线程:日志分析线程LogStatsThread、连接创建线程CreateConnectionThread、连接销毁线程DestroyConnectionThread -
向线程池提交创建连接的任务:this.createSchedulerFuture = createScheduler.submit(task);
1.4 其中对于并发以及安全的考虑有:
-
整个初始化过程使用可重入锁来保证互斥性:final ReentrantLock lock = this.lock; -
使用 volatile 变量来保证只会初始化一次 -
DruidDriver 使用静态常量来保证单例 -
使用 AtomicLongFieldUpdater 类来原子更新 DruidAbstractDataSource 中 volatile long 修饰的变量 -
使用 CountDownLatch 来确保 「连接池创建线程」、「连接池销毁线程」 一定会创建 -
使用线程池来优化数据库连接的创建
2. 连接的相关处理
2.1 Druid 如何初始化创建数据库连接
在执行 DruidDataSource#init() 方法时,会初始化存放连接的数组conections,然后启动两个线程
(1)CreateConnectionThread,动态在 connections 数组中添加连接
(2)DestroyConnectionThread,动态移除activeConnections很久不用的连接
根据initialSize 参数创建初始数量的连接:
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
2.2 CreateConnectionThread 线程动态创建连接
CreateConnectionThread 是一个守护线程,当有线程等待时,便会通过 JDBC 创建不超过 maxActive 数量的数据库连接,然后将创建的连接放入 connections 数组。
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
for (;;) {
...
...
connection = createPhysicalConnection();
...
...
boolean result = put(connection);
}
}
}
2.3 DestroyConnectionThread 线程动态缩容(释放连接)
DestroyConnectionThread 也是一个守护线程
public class DestroyConnectionThread extends Thread {
public DestroyConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
for (;;) {
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
DestroyTask :
public class DestroyTask implements Runnable {
public DestroyTask() {}
@Override
public void run() {
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
2.4 从 Druid 连接池获取数据库连接
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis);
}
}
getConnectionDirect():
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
...
}
...
}
...
}
2.5 回收连接
连接池中每一个连接,用完之后必须调用close()。在close中通过 recycle() 方法把连接从 activeConections 集合中重新移动到connections[],达到重复利用连接的目的。
private volatile DruidConnectionHolder[] connections;
protected final Map<DruidPooledConnection, Object> activeConnections
2.6 异常处理
分析一下 Druid 中操作连接时发生异常情况的一些处理。
2.6.1 Druid 中自定义了哪些异常?
pool 包下比较关键的一些异常有:
- DataSourceClosedException
- DataSourceDisableException
- DataSourceNotAvailableException
- GetConnectionTimeoutException
当获取连接时,如果数据源是处于关闭状态,那么就会抛DataSourceClosedException 异常,当数据源不可用时,就会抛 DataSourceDisableException 异常
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
...
}
从 connections[]数组中获取连接 时,连续失败,便会抛出 DataSourceNotAvailableException 异常
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
...
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
当获取连接超过指定等待时间,便会抛出 GetConnectionTimeoutException 异常:
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
2.6.2 ExceptionSorter
ExceptionSorter 的作用是:在数据库服务器重启、网络抖动、连接被服务器关闭等异常情况下,连接发生了不可恢复异常,将连接从连接池中移除,保证连接池在异常发生时情况下正常工作。ExceptionSorter是连接池稳定的关键特性,没有ExceptionSorter 的连接池,不能认为是有稳定性保障的连接池。
ExceptionSorter 接口
public interface ExceptionSorter {
boolean isExceptionFatal(SQLException e);
void configFromProperties(Properties properties);
}
很多异常分拣类都是实现了 ExceptionSorter 这个接口,如图:
ExceptionSorter#isExceptionFatal() 方法用来判断异常是否致命,如果致命那么需要进行处理,拿 DB2ExceptionSorter 举例,如果是 isExceptionFatal() 方法判断异常是致命的,那么便会进行处理:
public void handleConnectionException(DruidPooledConnection pooledConnection, Throwable t, String sql) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.getConnectionHolder();
if (holder == null) {
return;
}
errorCountUpdater.incrementAndGet(this);
lastError = t;
lastErrorTimeMillis = System.currentTimeMillis();
if (t instanceof SQLException) {
SQLException sqlEx = (SQLException) t;
ConnectionEvent event = new ConnectionEvent(pooledConnection, sqlEx);
for (ConnectionEventListener eventListener : holder.getConnectionEventListeners()) {
eventListener.connectionErrorOccurred(event);
}
if (exceptionSorter != null && exceptionSorter.isExceptionFatal(sqlEx)) {
handleFatalError(pooledConnection, sqlEx, sql);
}
throw sqlEx;
} else {
throw new SQLException("Error", t);
}
}
2.7 总结
连接池设计的基本思路: (1)初始化建立连接池对象(服务启动) (2)按照事先指定的参数创建初始数量的连接(即:空闲连接数)。 (3)对于一个访问请求,直接从连接池中得到一个连接。如果连接池对象中没有空闲的连接,且连接数没有达到最大(即:最大活跃连接数),创建一个新的连接;如果达到最大,则设定一定的超时时间,来获取连接。 (4)拿到连接访问服务。 (5)访问服务完成后释放连接(此时的释放连接,并非真正关闭,而是将其放入空闲队列中。如实际空闲连接数大于初始空闲连接数则释放连接)。
(6)释放连接池对象(服务停止、维护期间,释放连接池对象,并释放所有连接)。
3. Druid 中过滤器的装载和使用
Druid提供了基于Filter-Chain模式的扩展,使得在JDBC层做扩展非常容易。Druid内置提供了一些常用的Filter,StatFilter用于提供监控数据,Log系列Filter用于提供输出Connection、Statement、ResultSet相关的日志,WallFilter用于防御SQL注入攻击。
StatFilter——采集并提供监控数据
Druid连接池的监控信息主要是通过StatFilter 采集的,采集的信息非常全面,包括SQL执行、并发、慢查、执行时间区间分布等。具体配置可以看这个 https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_StatFilter
WallFilter —— 基于SQL语义分析来实现防御SQL注入攻击
SQL注入攻击是黑客对数据库进行攻击的常用手段,Druid连接池内置了WallFilter 提供防SQL注入功能,在不影响性能的同时防御SQL注入攻击。
LogFilter ——输出数据库操作的日志
LogFilter 可以输出连接申请/释放,事务提交回滚,Statement的Create/Prepare/Execute/Close,ResultSet的Open/Next/Close,通过LogFilter可以详细诊断一个系统的Jdbc行为。
LogFilter有Log4j、Log4j2、Slf4j、CommsLog等实现,具体配置看这里 https://github.com/alibaba/druid/wiki/%E9%85%8D%E7%BD%AE_LogFilter
3.1 过滤器的装载
Druid 中过滤器的管理工作,主要是在 FilterManager 类中实现。其中 FilterManager#loadFilter() 负责加载 filter 并放入 filter集合中。
public static void loadFilter(List<Filter> filters, String filterName) throws SQLException {
if (filterName.length() == 0) {
return;
}
String filterClassNames = getFilter(filterName);
if (filterClassNames != null) {
for (String filterClassName : filterClassNames.split(",")) {
if (existsFilter(filters, filterClassName)) {
continue;
}
Class<?> filterClass = Utils.loadClass(filterClassName);
if (filterClass == null) {
LOG.error("load filter error, filter not found : " + filterClassName);
continue;
}
Filter filter;
try {
filter = (Filter) filterClass.newInstance();
} catch (ClassCastException e) {
LOG.error("load filter error.", e);
continue;
} catch (InstantiationException e) {
throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
} catch (IllegalAccessException e) {
throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
} catch (RuntimeException e) {
throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
}
filters.add(filter);
}
return;
}
if (existsFilter(filters, filterName)) {
return;
}
Class<?> filterClass = Utils.loadClass(filterName);
if (filterClass == null) {
LOG.error("load filter error, filter not found : " + filterName);
return;
}
try {
Filter filter = (Filter) filterClass.newInstance();
filters.add(filter);
} catch (Exception e) {
throw new SQLException("load managed jdbc driver event listener error. " + filterName, e);
}
}
其中 getFilter() 方法 会从初始化好的 ConcurrentHashMap——aliasMap 中获取指定的 filter,而 aliasMap 则根据 druid-filter.properties 文件中配的 filter 来初始化。
3.2 过滤器装载的时机
1. 初始化时——initFromSPIServiceLoader();
private void initFromSPIServiceLoader() {
if (loadSpifilterSkip) {
return;
}
if (autoFilters == null) {
List<Filter> filters = new ArrayList<Filter>();
ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
for (Filter filter : autoFilterLoader) {
AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if (autoLoad != null && autoLoad.value()) {
filters.add(filter);
}
}
autoFilters = filters;
}
for (Filter filter : autoFilters) {
if (LOG.isInfoEnabled()) {
LOG.info("load filter from spi :" + filter.getClass().getName());
}
addFilter(filter);
}
}
2. 获取数据源时
@Override
public Connection connect(String url, Properties info) throws SQLException {
if (!acceptsURL(url)) {
return null;
}
connectCount.incrementAndGet();
DataSourceProxyImpl dataSource = getDataSource(url, info);
return dataSource.connect(info);
}
3.3 filterChain 的使用
从 Druid 连接池获取连接的时候便会使用 nextFilter() 方法从装载 filter 的集合中拿到一个 filter, 并执行这个 filter 的 dataSource_getConnection 方法,将获取到的 DruidPooledConnection 对象依次返回到前面的 filter。这里是一个递归调用,层层下沉,使得可以依次执行 filterchain 的下一个filter,当拿到 DruidPooledConnection 对象,并层层返回时,有一个好处是:返回时每经过一个 filter ,都可以执行 该 filter 的 if (conn != null) {} 里的特殊逻辑。
@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
if (this.pos < filterSize) {
DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
return conn;
}
return dataSource.getConnectionDirect(maxWaitMillis);
}
比如StatFilter#dataSource_connect():
@Override
public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis);
if (conn != null) {
conn.setConnectedTimeNano();
StatFilterContext.getInstance().pool_connection_open();
}
return conn;
}
|