Druid: Born for Monitoring, and How That Monitoring Is Actually Done
Building on yesterday's look at how FilterChainImpl drives the FilterEventAdapter interceptor logic, today we analyze the StatFilter source code, focusing on how monitoring metrics are collected while SQL execution is intercepted.
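Before diving into the individual interceptor methods, here is a minimal setup sketch (my own example, not taken from the Druid source) of how StatFilter is usually enabled on a DruidDataSource; the URL, credentials and property values are placeholders:

import java.sql.Connection;
import com.alibaba.druid.pool.DruidDataSource;

public class StatFilterSetup {
    public static void main(String[] args) throws Exception {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // placeholder URL
        dataSource.setUsername("root");                        // placeholder credentials
        dataSource.setPassword("secret");

        // "stat" registers StatFilter, so every JDBC call goes through the FilterChain shown below
        dataSource.setFilters("stat");
        // optional StatFilter tuning (property names from the Druid documentation)
        dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=3000");

        try (Connection conn = dataSource.getConnection()) {
            // use the connection as usual; StatFilter records the metrics analyzed below
        }
        dataSource.close();
    }
}

With this in place, every call such as getConnection(), commit() or execute() passes through the interceptor methods analyzed below.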
connection_connect
The interceptor method invoked when a connection is established.
public ConnectionProxy connection_connect(FilterChain chain, Properties info) throws SQLException {
    ConnectionProxy connection = null;

    long startNano = System.nanoTime();
    long startTime = System.currentTimeMillis();

    long nanoSpan;
    long nowTime = System.currentTimeMillis();

    JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();
    dataSourceStat.getConnectionStat().beforeConnect();
    try {
        // delegate to the rest of the chain to do the real connect, timing the call
        connection = chain.connection_connect(info);
        nanoSpan = System.nanoTime() - startNano;
    } catch (SQLException ex) {
        // failed connects are counted separately before rethrowing
        dataSourceStat.getConnectionStat().connectError(ex);
        throw ex;
    }
    dataSourceStat.getConnectionStat().afterConnected(nanoSpan);

    if (connection != null) {
        JdbcConnectionStat.Entry statEntry = getConnectionInfo(connection);

        // register the per-connection stat entry, keyed by connection id
        dataSourceStat.getConnections().put(connection.getId(), statEntry);

        statEntry.setConnectTime(new Date(startTime));
        statEntry.setConnectTimespanNano(nanoSpan);
        statEntry.setEstablishNano(System.nanoTime());
        statEntry.setEstablishTime(nowTime);
        statEntry.setConnectStackTrace(new Exception()); // stack trace of the connect call, kept for diagnostics

        // active count = number of currently registered connections
        dataSourceStat.getConnectionStat().setActiveCount(dataSourceStat.getConnections().size());
    }

    return connection;
}
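The entry registered here can be read back from the data source afterwards. A small sketch of doing so; the getter names (getConnectCount(), getActiveCount(), getConnectTime()) are my assumption, mirroring the setters used above:

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.stat.JdbcDataSourceStat;

public class ConnectionStatDump {
    // sketch only: getter names assumed to mirror the setters used in connection_connect
    static void dump(DruidDataSource dataSource) {
        JdbcDataSourceStat stat = dataSource.getDataSourceStat();
        System.out.println("connect count: " + stat.getConnectionStat().getConnectCount());
        System.out.println("active count : " + stat.getConnectionStat().getActiveCount());
        stat.getConnections().forEach((id, entry) ->
                System.out.println("connection " + id + " opened at " + entry.getConnectTime())); // assumed getter
    }
}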
connection_close
The interceptor method invoked when a connection is closed.
@Override
public void connection_close(FilterChain chain, ConnectionProxy connection) throws SQLException {
    // statistics are recorded only on the first close of a connection
    if (connection.getCloseCount() == 0) {
        long nowNano = System.nanoTime();

        JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();
        dataSourceStat.getConnectionStat().incrementConnectionCloseCount();

        JdbcConnectionStat.Entry connectionInfo = getConnectionInfo(connection);
        long aliveNanoSpan = nowNano - connectionInfo.getEstablishNano();

        // remove the entry registered in connection_connect and record how long the connection lived
        JdbcConnectionStat.Entry existsConnection = dataSourceStat.getConnections().remove(connection.getId());
        if (existsConnection != null) {
            dataSourceStat.getConnectionStat().afterClose(aliveNanoSpan);
        }
    }

    chain.connection_close(connection);
}
connection_commit / connection_rollback
Commit and rollback are simply delegated to the chain, after which the corresponding counters are incremented.
@Override
public void connection_commit(FilterChain chain, ConnectionProxy connection) throws SQLException {
    chain.connection_commit(connection);

    JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();
    dataSourceStat.getConnectionStat().incrementConnectionCommitCount();
}

@Override
public void connection_rollback(FilterChain chain, ConnectionProxy connection) throws SQLException {
    chain.connection_rollback(connection);

    JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();
    dataSourceStat.getConnectionStat().incrementConnectionRollbackCount();
}
internalBeforeStatementExecute
Invoked from the statement-execute hooks before a statement runs; it records the last SQL on the connection entry and prepares the per-SQL JdbcSqlStat.
private final void internalBeforeStatementExecute(StatementProxy statement, String sql) {
    JdbcDataSourceStat dataSourceStat = statement.getConnectionProxy().getDirectDataSource().getDataSourceStat();
    dataSourceStat.getStatementStat().beforeExecute();

    final ConnectionProxy connection = statement.getConnectionProxy();
    final JdbcConnectionStat.Entry connectionCounter = getConnectionInfo(connection);

    statement.setLastExecuteStartNano();

    // remember the last SQL executed on this connection (and, optionally, where it was issued from)
    connectionCounter.setLastSql(sql);
    if (connectionStackTraceEnable) {
        connectionCounter.setLastStatementStatckTrace(new Exception());
    }

    // per-SQL statistics entry, created lazily and keyed by the (possibly merged) SQL
    JdbcSqlStat sqlStat = statement.getSqlStat();
    if (sqlStat == null || sqlStat.isRemoved()) {
        sqlStat = createSqlStat(statement, sql);
        statement.setSqlStat(sqlStat);
    }

    JdbcStatContext statContext = JdbcStatManager.getInstance().getStatContext();
    if (statContext != null) {
        sqlStat.setName(statContext.getName());
        sqlStat.setFile(statContext.getFile());
    }

    boolean inTransaction = false;
    try {
        inTransaction = !statement.getConnectionProxy().getAutoCommit();
    } catch (SQLException e) {
        LOG.error("getAutoCommit error", e);
    }

    if (sqlStat != null) {
        sqlStat.setExecuteLastStartTime(System.currentTimeMillis());
        sqlStat.incrementRunningCount();

        if (inTransaction) {
            sqlStat.incrementInTransactionCount();
        }
    }

    StatFilterContext.getInstance().executeBefore(sql, inTransaction);

    // the profiler is keyed by the merged SQL so that identical statements aggregate together
    String mergedSql;
    if (sqlStat != null) {
        mergedSql = sqlStat.getSql();
    } else {
        mergedSql = sql;
    }
    Profiler.enter(mergedSql, Profiler.PROFILE_TYPE_SQL);
}
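internalAfterStatementExecute
Invoked from the statement-execute hooks after a statement finishes; it updates the execute counters and update/fetch counts, and handles slow-SQL recording and logging.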
private final void internalAfterStatementExecute(StatementProxy statement, boolean firstResult,
                                                 int... updateCountArray) {
    final long nowNano = System.nanoTime();
    final long nanos = nowNano - statement.getLastExecuteStartNano();

    JdbcDataSourceStat dataSourceStat = statement.getConnectionProxy().getDirectDataSource().getDataSourceStat();
    dataSourceStat.getStatementStat().afterExecute(nanos);

    final JdbcSqlStat sqlStat = statement.getSqlStat();
    if (sqlStat != null) {
        sqlStat.incrementExecuteSuccessCount();
        sqlStat.decrementRunningCount();
        sqlStat.addExecuteTime(statement.getLastExecuteType(), firstResult, nanos);
        statement.setLastExecuteTimeNano(nanos);

        // for execute() calls that did not return a ResultSet, read the update count from the statement;
        // otherwise use the counts passed in by the caller
        if ((!firstResult) && statement.getLastExecuteType() == StatementExecuteType.Execute) {
            try {
                int updateCount = statement.getUpdateCount();
                sqlStat.addUpdateCount(updateCount);
            } catch (SQLException e) {
                LOG.error("getUpdateCount error", e);
            }
        } else {
            for (int updateCount : updateCountArray) {
                sqlStat.addUpdateCount(updateCount);
                sqlStat.addFetchRowCount(0);
                StatFilterContext.getInstance().addUpdateCount(updateCount);
            }
        }

        // slow-SQL handling: record the parameters and optionally log at the configured level
        long millis = nanos / (1000 * 1000);
        if (millis >= slowSqlMillis) {
            String slowParameters = buildSlowParameters(statement);
            sqlStat.setLastSlowParameters(slowParameters);

            String lastExecSql = statement.getLastExecuteSql();
            if (logSlowSql) {
                String msg = "slow sql " + millis + " millis. " + lastExecSql + "" + slowParameters;
                switch (slowSqlLogLevel) {
                    case "WARN":
                        LOG.warn(msg);
                        break;
                    case "INFO":
                        LOG.info(msg);
                        break;
                    case "DEBUG":
                        LOG.debug(msg);
                        break;
                    default:
                        LOG.error(msg);
                }
            }
            handleSlowSql(statement);
        }
    }

    String sql = statement.getLastExecuteSql();
    StatFilterContext.getInstance().executeAfter(sql, nanos, null);

    Profiler.release(nanos);
}
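The slow-SQL branch above is controlled by slowSqlMillis and logSlowSql. Here is a minimal sketch of configuring them programmatically (the 3000 ms threshold is an arbitrary example value):

import java.util.ArrayList;
import java.util.List;

import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.pool.DruidDataSource;

public class SlowSqlConfig {
    static DruidDataSource build() {
        StatFilter statFilter = new StatFilter();
        statFilter.setSlowSqlMillis(3000); // millis >= slowSqlMillis triggers the slow-sql branch above
        statFilter.setLogSlowSql(true);    // enables the LOG.xxx(msg) output

        DruidDataSource dataSource = new DruidDataSource();
        List<Filter> filters = new ArrayList<>();
        filters.add(statFilter);
        dataSource.setProxyFilters(filters);
        return dataSource;
    }
}

The same settings can also be passed via connection properties (druid.stat.slowSqlMillis, druid.stat.logSlowSql) as in the earlier setup sketch.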
Extra: mergedSql
The per-SQL key used above comes from StatFilter.mergeSql: when SQL merging is enabled, it calls ParameterizedOutputVisitorUtils.parameterize so that statements differing only in literal values share a single JdbcSqlStat entry. The core of parameterize:
public static String parameterize(String sql,
                                  DbType dbType,
                                  SQLSelectListCache selectListCache,
                                  List<Object> outParameters,
                                  SQLParserFeature[] features,
                                  VisitorFeature... visitorFeatures) {
    SQLStatementParser parser = SQLParserUtils.createSQLStatementParser(sql, dbType, features);
    if (selectListCache != null) {
        parser.setSelectListCache(selectListCache);
    }

    List<SQLStatement> statementList = parser.parseStatementList();
    if (statementList.size() == 0) {
        return sql;
    }

    StringBuilder out = new StringBuilder(sql.length());
    ParameterizedVisitor visitor = createParameterizedOutputVisitor(out, dbType);
    if (outParameters != null) {
        visitor.setOutputParameters(outParameters);
    }
    configVisitorFeatures(visitor, visitorFeatures);

    for (int i = 0; i < statementList.size(); i++) {
        SQLStatement stmt = statementList.get(i);

        if (i > 0) {
            SQLStatement preStmt = statementList.get(i - 1);

            // skip a statement whose parameterized form equals the previous one
            if (preStmt.getClass() == stmt.getClass()) {
                StringBuilder buf = new StringBuilder();
                ParameterizedVisitor v1 = createParameterizedOutputVisitor(buf, dbType);
                preStmt.accept(v1);
                if (out.toString().equals(buf.toString())) {
                    continue;
                }
            }

            if (!preStmt.isAfterSemi()) {
                out.append(";\n");
            } else {
                out.append('\n');
            }
        }

        // comments would make otherwise identical statements differ, so drop them
        if (stmt.hasBeforeComment()) {
            stmt.getBeforeCommentsDirect().clear();
        }

        // SELECT statements are visited directly; everything else goes through accept
        Class<?> stmtClass = stmt.getClass();
        if (stmtClass == SQLSelectStatement.class) {
            SQLSelectStatement selectStatement = (SQLSelectStatement) stmt;
            visitor.visit(selectStatement);
            visitor.postVisit(selectStatement);
        } else {
            stmt.accept(visitor);
        }
    }

    // if nothing was replaced and there are no comments, the original SQL can usually be returned as-is
    if (visitor.getReplaceCount() == 0
            && parser.getLexer().getCommentCount() == 0
            && sql.charAt(0) != '/') {
        boolean notUseOriginalSql = false;
        if (visitorFeatures != null) {
            for (VisitorFeature visitorFeature : visitorFeatures) {
                if (visitorFeature == VisitorFeature.OutputParameterizedZeroReplaceNotUseOriginalSql) {
                    notUseOriginalSql = true;
                }
            }
        }

        if (!notUseOriginalSql) {
            int ddlStmtCount = 0;
            for (SQLStatement stmt : statementList) {
                if (stmt instanceof SQLDDLStatement) {
                    ddlStmtCount++;
                }
            }
            // if every statement is DDL, always use the visitor output
            if (ddlStmtCount == statementList.size()) {
                notUseOriginalSql = true;
            }
        }

        if (!notUseOriginalSql) {
            return sql;
        }
    }

    return out.toString();
}
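Here is a small usage example of the simpler public overload parameterize(String, DbType) (the signature matches Druid 1.2.x; older versions take a String dbType such as JdbcConstants.MYSQL, and the exact output formatting may vary):

import com.alibaba.druid.DbType;
import com.alibaba.druid.sql.visitor.ParameterizedOutputVisitorUtils;

public class MergeSqlDemo {
    public static void main(String[] args) {
        String sql = "select * from t_user where id = 42 and name = 'tom'";
        // literals are replaced with ?, so statements differing only in values
        // collapse into a single JdbcSqlStat entry
        String merged = ParameterizedOutputVisitorUtils.parameterize(sql, DbType.mysql);
        System.out.println(merged);
        // expected output along the lines of: SELECT * FROM t_user WHERE id = ? AND name = ?
    }
}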
Summary
Today we walked through the StatFilter source code. The entire filter revolves around collecting the various monitoring metrics, which really does bear out the claim that Druid was born for monitoring. What I found most interesting while reading this code is its design thinking, such as the template-method pattern built on abstract classes; we use the same approach in my current project, and it is worth trying to apply these architectural ideas when designing your own business code. I have always believed that you do not need to work on an impressive project to improve. The best way to grow is to raise your own standards: even when building a simple business feature, think about how to make it highly reusable, more flexible, and easy to extend later. That is exactly where design patterns started, and experience accumulated through this kind of practice is just as valuable. Next, I will take a closer look at the Visitor to clarify exactly how SQL parsing works.