IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Druid核心源码解析--DruidDataSource -> 正文阅读

[大数据]Druid核心源码解析--DruidDataSource

配置读取

druid连接池支持的所有连接参数可在类com.alibaba.druid.pool.DruidDataSourceFactory中查看。

配置读取代码:

 public void configFromPropety(Properties properties) {
        //这方法太长,自己看源码去吧,就是读读属性。。。。
    }

整体代码比较简单,就是把配置内容,读取到dataSource。

连接池初始化

首先是简单的判断,加锁:

if (inited) {
            //已经被初始化好了,直接return
            return;
        }

        // bug fixed for dead lock, for issue #2980
        DruidDriver.getInstance();
        /**控制创建移除连接的锁,并且通过条件去控制一个连接的生成消费**/
        // public DruidAbstractDataSource(boolean lockFair){
        //        lock = new ReentrantLock(lockFair);
        //
        //        notEmpty = lock.newCondition();
        //        empty = lock.newCondition();
        //    }
        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

之后会更新一些JMX的监控指标:

//一些jmx监控指标
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);

druid的监控指标都是通过jmx实现的。

解析连接串:

 if (this.jdbcUrl != null) {
                //解析连接串
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

initFromWrapDriverUrl方法,除了从jdbc url中解析出连接和驱动信息,后面还把filters的名字,解析成了对应的filter类。

  private void initFromWrapDriverUrl() throws SQLException {
        if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
            return;
        }

        DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
        this.driverClass = config.getRawDriverClassName();

        LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");

        this.jdbcUrl = config.getRawUrl();
        if (this.name == null) {
            this.name = config.getName();
        }

        for (Filter filter : config.getFilters()) {
            addFilter(filter);
        }
    }

之后在init方法里面,会进行filters的初始化:

 //初始化filter 属性
            for (Filter filter : filters) {
                filter.init(this);
            }

之后解析数据库类型:

 if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
                this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
            }

注意枚举值: com.alibaba.druid.DbType,这个里面包含了目前durid连接池支持的所有数据源 类型,另外,druid还额外提供了一些驱动类,例如:

 elastic_search  (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver

clickhouse还提供了负载均衡的驱动类:

com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver

在回到init方法,之后是一堆参数解析,不再说,跳过了。
之后是通过SPI加载自定义的filter:

  private void initFromSPIServiceLoader() {
        if (loadSpifilterSkip) {
            return;
        }

        if (autoFilters == null) {
            List<Filter> filters = new ArrayList<Filter>();
            ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

            for (Filter filter : autoFilterLoader) {
                AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
                if (autoLoad != null && autoLoad.value()) {
                    filters.add(filter);
                }
            }
            autoFilters = filters;
        }

        for (Filter filter : autoFilters) {
            if (LOG.isInfoEnabled()) {
                LOG.info("load filter from spi :" + filter.getClass().getName());
            }
            addFilter(filter);
        }
    }

注意自定义的filter,要使用com.alibaba.druid.filter.AutoLoad

解析驱动:

  protected void resolveDriver() throws SQLException {
        if (this.driver == null) {
            if (this.driverClass == null || this.driverClass.isEmpty()) {
                this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
            }

            if (MockDriver.class.getName().equals(driverClass)) {
                driver = MockDriver.instance;
            } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
                Properties info = new Properties();
                info.put("user", username);
                info.put("password", password);
                info.putAll(connectProperties);
                driver = new BalancedClickhouseDriver(jdbcUrl, info);
            } else {
                if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                    throw new SQLException("url not set");
                }
                driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
            }
        } else {
            if (this.driverClass == null) {
                this.driverClass = driver.getClass().getName();
            }
        }
    }

其中durid自己的mock驱动和clickhouse的负载均衡的驱动,特殊判断了下,其他走的都是class forname.

之后是exception sorter和checker的一些东西,跟主线剧情关系不大,skip.

之后是一些初始化JdbcDataSourceStat,没啥东西。

之后是核心:

  connections = new DruidConnectionHolder[maxActive];  //连接数组
            evictConnections = new DruidConnectionHolder[maxActive]; //销毁的连接数组
            keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活跃可用的数组

dataSource的连接,都被包装在类DruidConnectionHolder中,之后是一个同步去初始化连接还是异步去初始化的连接,总之,是去初始化 连接的过程:

if (createScheduler != null && asyncInit) {
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
                // init connections
                while (poolingCount < initialSize) {
                    try {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

初始化的连接个数为连接串里面配置的initialSize.

核心初始化方法com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection(),在这方法里面,会拿用户名密码,之后执行真正的获取connection:

 public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;
        if (getProxyFilters().size() == 0) {
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }

        createCountUpdater.incrementAndGet(this);

        return conn;
    }

注意,如果配置了filters,则所有操作,都会在操作前执行filter处理链。

 public ConnectionProxy connection_connect(Properties info) throws SQLException {
        if (this.pos < filterSize) {
            return nextFilter()
                    .connection_connect(this, info);
        }

        Driver driver = dataSource.getRawDriver();
        String url = dataSource.getRawJdbcUrl();

        Connection nativeConnection = driver.connect(url, info);

        if (nativeConnection == null) {
            return null;
        }

        return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
    }

再回到主流程init方法,connections数组初始化完成之后,
开启额外线程:

     createAndLogThread();  //打印连接信息
            createAndStartCreatorThread(); //创建连接线程
            createAndStartDestroyThread(); //销毁连接线程 

先看注释,具体里面的内容后面单独拉出来讲。

之后:

 initedLatch.await(); //初始化 latch -1
            init = true;  //标记已经初始化完成

            initedTime = new Date(); //时间
            registerMbean(); //为datasource 注册jmx监控指标

最后的最后,如果配置了keepAlive:


            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

这时候,会根据配置的活跃连接数minIdle,去给datasource的连接,做个保持活跃连接个数,具体后面再说。

连接池使用的核心逻辑

首先,使用数组作为连接的容器,对于真实连接的加入和移除,使用lock就行同步,另外,在加入和移除连接时候,对比生产消费模型,通过lock上的条件,来通知是否可以获取或者加入连接。

 public DruidAbstractDataSource(boolean lockFair){
        lock = new ReentrantLock(lockFair);

        notEmpty = lock.newCondition();  //非空,有连接
        empty = lock.newCondition(); //空的
    } 

另外,默认的fairlock为false

  public DruidDataSource(){
        this(false);
    }

    public DruidDataSource(boolean fairLock){
        super(fairLock);

        configFromPropety(System.getProperties());
    }

创建连接

在线程com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread中:

 if (emptyWait) {
                        // 必须存在线程等待,才创建连接
                        if (poolingCount >= notEmptyWaitThreadCount //
                                && (!(keepAlive && activeCount + poolingCount < minIdle))
                                && !isFailContinuous()
                        ) {
                            empty.await();
                        }

                        // 防止创建超过maxActive数量的连接
                        if (activeCount + poolingCount >= maxActive) {
                            empty.await();
                            continue;
                        }
                    }

必须存在线程等待获取连接时候,才能创建连接,并且要保持总的连接数,不能超过配置的最大连接。

创建完连接之后,执行notEmpty.signalAll();通知消费者。

获取连接

外层代码:

 @Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

忽略掉filter chain,其实最后执行的还是com.alibaba.druid.pool.DruidDataSource#getConnectionDirect

方法内部:

   poolableConnection = getConnectionInternal(maxWaitMillis);
  • 1 , 连接不足,需要直接去创建新的,跟我们初始化一样
  • 2,从connections里面拿
 if (maxWait > 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                }

其中,maxWait默认为-1,配置在init里面:

 String property = properties.getProperty("druid.maxWait");
            if (property != null && property.length() > 0) {
                try {
                    int value = Integer.parseInt(property);
                    this.setMaxWait(value);
                } catch (NumberFormatException e) {
                    LOG.error("illegal property 'druid.maxWait'", e);
                }
            }

这个用于配置拿连接时候,是否在这个时间上进行等待,默认是否,即一直等到拿到连接为止。

直接看下阻塞拿的过程:

 DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            //没连接了
            while (poolingCount == 0) {
                //暗示下创建线程没连接了
                emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    //傻等着创建或者回收,能给整出来点儿连接
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        //拿数组的最后一个连接
        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

连接回收

 protected void createAndStartDestroyThread() {
        destroyTask = new DestroyTask();
	//自定义配置销毁 ,适用于连接数非常多的 情况
        if (destroyScheduler != null) {
            long period = timeBetweenEvictionRunsMillis;
            if (period <= 0) {
                period = 1000;
            }
            destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                          TimeUnit.MILLISECONDS);
            initedLatch.countDown();
            return;
        }

        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        //单线程销毁 
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
    }

实际的销毁:

 public class DestroyTask implements Runnable {
        public DestroyTask() {

        }

        @Override
        public void run() {
            shrink(true, keepAlive);

            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }

最终 执行的还是 shrink方法。

   public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        boolean needFill = false;
        int evictCount = 0;
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;
        
        try {
            if (!inited) {
                return;
            }

            final int checkCount = poolingCount - minIdle; //需要检测连接的数量
            final long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i < poolingCount; ++i) { //检测目前connections数组中的连接
                DruidConnectionHolder connection = connections[i];

                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))  {
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }

                if (checkTime) {
                    //是否设置了物理连接的超时时间phyTimoutMills。假如设置了该时间,
                    // 判断连接时间存活时间是否已经超过phyTimeoutMills,是则放入evictConnections中
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;//获取连接空闲时间
                    //如果某条连接空闲时间小于minEvictableIdleTimeMillis,则不用继续检查剩下的连接了
                    if (idleMillis < minEvictableIdleTimeMillis
                            && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }

                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        // check checkTime is silly code
                        //检测检查了几个连接了
                        if (checkTime && i < checkCount) {
                            //超时了
                            evictConnections[evictCount++] = connection;
                            continue;
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            //超时了
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        //配置了keepAlive,并且在存活时间内,放到keepAlive数组
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    //不需要检查时间的,直接移除
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }

            int removeCount = evictCount + keepAliveCount; //移除了几个
            //由于使用connections连接时候,都是取后面的,后面 的是最新的连接,只考虑前面过期就行,所以只需要挪动前面的连接
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;

            if (keepAlive && poolingCount + activeCount < minIdle) {
                //不够核心的活跃连接时候,需要去创建啦
                needFill = true;
            }
        } finally {
            lock.unlock();
        }

        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                //销毁连接
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }

        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                boolean discard = !validate; //没通过validate
                if (validate) {
                    //通过keep alive检查,更新时间
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    //这里还会尝试放回connections数组
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        //没放入,标记要丢弃了
                        discard = true;
                    }
                }

                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }

                    lock.lock();
                    try {
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) {
                            //发信号让创建线程去创建
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }

        if (needFill) {
            //又要去创建了
            lock.lock();
            try {
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }

工具数组evictConnections,keepAliveConnections 用完即被置空,老工具人了。

一波操作下来,完成了对connections数组的大清洗。

小结

  • 只写了核心逻辑,很多validate,checker,filter省略了。
  • druid连接池源码里面还有很多好用的工具,比如数据库驱动工具,jdbc工具,解析SQL的语法树,ibatis的支持,wall过滤,多数据源…
  • 最新的代码我看还有支持配套ZK的高可用方案,用到的话后期我会继续更新源码解析。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-16 22:27:35  更:2022-03-16 22:30:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:46:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码