sql 拦截 笔记
背景
之前写过 使用p6spy+springboot 做数据库操作日志审计 但是随着需求迭代越来越多.功能模块也原来越多,对sql的审计工作越来越重要.先执行后审计已经越来越不满足需求了.在此背景下.做了一个轻量级的sql拦截.
实现方式
通过 com.github.gavlyukovskiy 对sql进行拦截,在执行sql前拦截sql,做校验. 需要执行的sql,对应开发人需要提前在系统中做报备.然后专家审核通过后才可以在系统中执行.
审核主要分为3个维度
1.业务维度,执行的sql,是否满足业务的要求.关联表的查询是否合理. 2.数据库维度,sql是否使用了索引,执行的sql是否有条件约束等等. 3.权限维度.sql执行后的影响范围,每次最多可以修改的数据限制等等.
1.sql报备,可以根据自己的情况开发系统来完成.
格式化sql方法,然后保存到系统中,同时每次验证需要执行的sql也是使用此方法.
package com.example.utils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
import com.example.config.Constants;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class SqlUtils {
public static String getSql(String sql){
return parseSql(sql);
}
public static void initSql(String sql,String methodName){
String parseSql = parseSql(sql);
Constants.SQL_MAP.put(methodName,parseSql);
}
private static String parseSql(String sql) {
sql = sql.replaceAll("[\r\n]", " ");
MySqlStatementParser parser = new MySqlStatementParser(sql);
SQLStatement sqlStatement = parser.parseStatement();
MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
sqlStatement.accept(visitor);
String optTable = visitor.getTables().entrySet().stream().map(nameTableStatEntry -> nameTableStatEntry.getValue().toString().toLowerCase() + "_opt" + "@@" + nameTableStatEntry.getKey().getName().toLowerCase() + "_table")
.collect(Collectors.joining("@@"));
String cols = visitor.getColumns().stream().filter(column -> {
if (optTable.toLowerCase().startsWith("select")) {
return column.isSelect();
} else if (optTable.toLowerCase().startsWith("update")) {
return column.isUpdate();
} else {
return true;
}
}).map(column -> column.toString() + "_col").collect(Collectors.joining("@@"));
String isWhere = "noWhere";
if (visitor.getConditions().size() > 0) {
isWhere = "where";
}
List<String> limitList = Arrays.stream(sql.split(" ")).filter(str -> "limit".equals(str.toLowerCase())).collect(Collectors.toList());
String isLimit = "noLimit";
if (limitList.size() > 0) {
isLimit = "limit";
}
return String.join("@@", optTable, cols, isWhere, isLimit);
}
}
sql格式化思路
sql的格式太多.这里我找了一个简单的方法来实现.通过sql解析出sql的操作类型,操作的数据库表,操作的列,是否有where,是否有limit最后生成一个sql格式化字符串.也就是我们需要审计的.
格式化之后的sql:
#sql1
select_opt@@person_table@@person.*_col@@where@@noLimit
#sql2
select_opt@@person_table@@person.id_col@@person.first_name_col@@person.last_name_col@@person.birth_date_col@@person.deleted_col@@where@@noLimit
通过上面可以看出来2个sql的区别在于第一个查询了 person.*,而第二个sql查询的具体的列 同样的方式,我们可以格式化出update sql要修改的表,修改的列,是否有where等. insert sql要插入的表,要插入的列,是否有where等.
2.sql拦截,在引入com.github.gavlyukovskiy包后,通过实现QueryExecutionListener来实现
引入pom文件
<!--sql拦截包-->
<dependency>
<groupId>com.github.gavlyukovskiy</groupId>
<artifactId>datasource-proxy-spring-boot-starter</artifactId>
<version>1.7.1</version>
</dependency>
核心代码
package com.example.config;
import com.example.utils.SqlUtils;
import lombok.extern.slf4j.Slf4j;
import net.ttddyy.dsproxy.ExecutionInfo;
import net.ttddyy.dsproxy.QueryInfo;
import net.ttddyy.dsproxy.listener.QueryExecutionListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class SqlInterceptor implements QueryExecutionListener {
@Override
public void beforeQuery(ExecutionInfo executionInfo, List<QueryInfo> list) {
FastThreadLocalEntity fastThreadLocalEntity = Constants.threadLocal.get();
for (QueryInfo queryInfo : list) {
log.info("beforeQuery 执行sql={}",queryInfo.getQuery().replaceAll("[\\t\\n\\r]", " "));
String sql = Constants.SQL_MAP.get(fastThreadLocalEntity.getRequestMethod()).toString();
String nowSql = SqlUtils.getSql(queryInfo.getQuery().replaceAll("[\\t\\n\\r]", " "));
if (!sql.equals(nowSql)){
throw new CustomException(String.format("sql验证权限不对!,录入sql=%s,当前sql=%s",sql,nowSql));
}
}
}
@Override
public void afterQuery(ExecutionInfo executionInfo, List<QueryInfo> list) {
}
}
流程图
git地址
https://gitee.com/mr-liu-163/sqltest.git
|