IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 定义Mybatis拦截器动态切换postgre数据库schema -> 正文阅读

[大数据]定义Mybatis拦截器动态切换postgre数据库schema

背景

随着业务的发展和合规要求,产品数据库将切换到Postgres。之前不同技术域,不同交付工程的数据分库管理的方式切换到PG数据库后将通过分schema管理。
ORM继续使用Mybatis,为使用迁移工作量尽可能小,现有的SQL代码不做大的修改。动态数据源实现考虑在Mybatis执行过程中做拦截,替换sql中的schema标识。

提取请求参数中的schema

约定rest接口请求Header参数中增加schema信息。通过切面技术从请求头中提取schema后保存到线程变量。

1. 提取schema

package com.postgres.manager;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * Schema切面, 提取header头中的schema保存到SchemaHolder中
 *
 * @author elon
 * @since 2022-03-20
 */
@Aspect
@Component
@Order(9999)
public class SchemaAspect {
    @Pointcut("@annotation(org.springframework.web.bind.annotation.GetMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.PostMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.DeleteMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.RequestMapping)")
    void schema() {

    }

    /**
     * 从请求头提取
     *
     * @param joinPoint
     */
    @Before("schema()")
    public void setSchema(JoinPoint joinPoint) {
        String schema = getSchemaFromHeader();
        SchemaHolder.set(schema);
    }

    @After("schema()")
    public void clearSchema(JoinPoint joinPoint) {
        SchemaHolder.clear();
    }

    /**
     * 从请求头中后去schema信息
     *
     * @return schema
     */
    private String getSchemaFromHeader() {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String schema = request.getHeader("schema");
        return schema;
    }
}

2. 保存schema的线程变量类

package com.postgres.manager;

/**
 * Schema持有类. 用于在异步线程或者跨多个方法传递schema信息
 *
 * @author elon
 * @since 2022-03-19
 */
public class SchemaHolder {
    private static ThreadLocal<String> schema = new ThreadLocal<>();

    public static void set(String sch) {
        schema.set(sch);
    }

    public static String get() {
        return schema.get();
    }

    public static void clear() {
        schema.remove();
    }
}

定义Mybatis拦截器

1. 定义拦截器注解,用于修饰DAO层级接口

package com.postgres.manager;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * schema拦截器注解。修饰mapper接口类,用以区分访问的pg数据库schema
 *
 * @author elon
 * @since 2022-03-20
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchemaInterceptAnnotation {
    /**
     * schema类型。取值范围:business, common
     *
     * @return
     */
    String schemaType() default "";
}

在DAO层接口类加上该注解,拦截器会动态切换schema.

package com.postgres.mapper;

import com.postgres.manager.SchemaInterceptAnnotation;
import com.postgres.model.ExamResult;
import com.postgres.model.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
@SchemaInterceptAnnotation(schemaType = "business")
public interface UserMapper {
    /**
     * 从schema获取user数据
     *
     * @return user列表
     */
    List<User> getUserFromSchema(@Param("name") String name);

    /**
     * 插入用户数据到schema
     *
     * @param userList 用户列表
     */
    void insertUser2Schema(@Param("list") List<User> userList);

    /**
     * 获取测试成绩.
     *
     * @return 测试成绩列表
     */
    List<ExamResult> getExamResult();
}

2. 拦截器替换sql中的表名为schema.表名

package com.postgres.manager;

import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Properties;

/**
 * StatementHandler拦截器. 在prepare方法执行前拦截,修改sql语句,增加schema.
 *
 * @author elon
 * @since 2022-03-20
 */
@Component
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class StatementHandlerInterceptor implements Interceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatementHandlerInterceptor.class);

    /**
     * 业务数据分schema存储
     */
    private static final String BUSINESS_SCHEMA = "business";

    /**
     * 公共的配置数据(不分schema), 固定库
     */
    private static final String COMMON_SCHEMA = "common";

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
                SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());

        MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
        String mapperMethod = mappedStatement.getId();

        BoundSql boundSql = statementHandler.getBoundSql();
        String sql = boundSql.getSql();

        String mapperClass = mapperMethod.substring(0, mappedStatement.getId().lastIndexOf("."));
        Class<?> classType = Class.forName(mapperClass);

        SchemaInterceptAnnotation interceptAnnotation = classType.getAnnotation(SchemaInterceptAnnotation.class);
        String schemaType = interceptAnnotation.schemaType();
        String newSql = replaceSqlWithSchema(schemaType, sql, mapperMethod);

        //通过反射修改sql语句
        Field field = boundSql.getClass().getDeclaredField("sql");
        field.setAccessible(true);
        field.set(boundSql, newSql);

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object object) {
        if (object instanceof StatementHandler) {
            return Plugin.wrap(object, this);
        } else {
            return object;
        }
    }

    @Override
    public void setProperties(Properties properties) {

    }

    private String replaceSqlWithSchema(String schemaType, String originalSql, String mapperMethod){
        // 替换sql中的表名,加上schema
        if (BUSINESS_SCHEMA.equals(schemaType)) {
            String schema = SchemaHolder.get();
            return originalSql.replaceAll(" t_", " " + schema + ".t_");
        } else if (COMMON_SCHEMA.equals(schemaType)) {
            return originalSql.replaceAll(" t_", " " + COMMON_SCHEMA + ".t_");
        } else {
            LOGGER.error("Invalid SchemaInterceptAnnotation. mapperMethod:{}", mapperMethod);
            throw new IllegalArgumentException("Invalid SchemaInterceptAnnotation.");
        }
    }
}

2. 添加拦截器

加上如下处理, 拦截器才会生效

package com.postgres.config;

import com.postgres.manager.StatementHandlerInterceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.List;

@Configuration
public class InterceptorConfig {

    @Autowired
    private List<SqlSessionFactory> sqlSessionFactoryList;

    @PostConstruct
    public void addSqlInterceptor() {
        StatementHandlerInterceptor interceptor = new StatementHandlerInterceptor();
        for (SqlSessionFactory sqlSessionFactory : sqlSessionFactoryList) {
            sqlSessionFactory.getConfiguration().addInterceptor(interceptor);
        }
    }
}

完整的Demo代码还包括DataSource配置和XML中SQL,这些和普通的Spring Boot项目无异。参考github上的完整实现代码:https://github.com/ylforever/elon-postgres

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-07 22:46:53  更:2022-04-07 22:50:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:22:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码