IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Mybatis学习之路(四)Mybatis源码分析三 -> 正文阅读

[大数据]Mybatis学习之路(四)Mybatis源码分析三

本系列文章:
??Mybatis学习之路(一)理论基础和使用介绍
??Mybatis学习之路(二)Mybatis源码分析一
??Mybatis学习之路(三)Mybatis源码分析二
??Mybatis学习之路(四)Mybatis源码分析三

一、缓存机制

??通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,以减轻数据库压力。MyBatis自然也在内部提供了相应的支持。通过在框架层面增加缓存功能,可减轻数据库的压力,同时又可以提升查询速度,可谓一举两得。MyBatis 缓存结构由一级缓存和二级缓存构成,这两级缓存均是使用 Cache 接口的实现类。

1.1 缓存类

??在 MyBatis 中,Cache 是缓存接口,定义了一些基本的缓存操作。MyBatis 内部提供了丰富的缓存实现类,比如具有基本缓存功能的PerpetualCache ,具有 LRU 策略的缓存 LruCache ,以及可保证线程安全的缓存SynchronizedCache 和具备阻塞功能的缓存 BlockingCache 等。
??MyBatis 在实现缓存模块的过程中,使用了装饰模式。

1.1.1 PerpetualCache

??PerpetualCache(位于org.apache.ibatis.cache.impl) 是一个具有基本功能的缓存类,内部使用了 HashMap 实现缓存功能。

public class PerpetualCache implements Cache {

  	private final String id;

  	private final Map<Object, Object> cache = new HashMap<>();

  	public PerpetualCache(String id) {
    	this.id = id;
  	}

  	@Override
  	public String getId() {
    	return id;
  	}

  	@Override
  	public int getSize() {
    	return cache.size();
  	}

  	@Override
  	public void putObject(Object key, Object value) {
    	// 存储键值对到 HashMap
    	cache.put(key, value);
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 查找缓存项
    	return cache.get(key);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 移除缓存项
    	return cache.remove(key);
  	}

  	@Override
  	public void clear() {
    	cache.clear();
  	}

  	@Override
  	public boolean equals(Object o) {
    	if (getId() == null) {
      		throw new CacheException("Cache instances require an ID.");
    	}
    	if (this == o) {
      		return true;
    	}
    	if (!(o instanceof Cache)) {
      		return false;
    	}

    	Cache otherCache = (Cache) o;
    	return getId().equals(otherCache.getId());
  	}

  	@Override
  	public int hashCode() {
    	if (getId() == null) {
      		throw new CacheException("Cache instances require an ID.");
    	}
    	return getId().hashCode();
  	}
}

1.1.2 LruCache

??LruCache(位于org.apache.ibatis.cache.decorators),顾名思义,是一种具有 LRU 策略的缓存实现类。

public class LruCache implements Cache {

  	private final Cache delegate;
  	private Map<Object, Object> keyMap;
  	private Object eldestKey;

  	public LruCache(Cache delegate) {
   	 	this.delegate = delegate;
    	setSize(1024);
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}

  	public void setSize(final int size) {
    	// 初始化 keyMap,注意,keyMap 的类型继承自 LinkedHashMap,
		// 并覆盖了 removeEldestEntry 方法
    	keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {
      		private static final long serialVersionUID = 4267176411845948333L;
      		// 覆盖 LinkedHashMap 的 removeEldestEntry 方法
      		@Override
      		protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
        		boolean tooBig = size() > size;
        		if (tooBig) {
          			// 获取将要被移除缓存项的键值
          			eldestKey = eldest.getKey();
        		}
        		return tooBig;
      		}
    	};
  	}

  	@Override
  	public void putObject(Object key, Object value) {
    	// 存储缓存项
    	delegate.putObject(key, value);
    	cycleKeyList(key);
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 刷新 key 在 keyMap 中的位置
    	keyMap.get(key); // touch
    	// 从被装饰类中获取相应缓存项
    	return delegate.getObject(key);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 从被装饰类中移除相应的缓存项
    	return delegate.removeObject(key);
  	}

  	@Override
  	public void clear() {
    	delegate.clear();
    	keyMap.clear();
  	}

  	private void cycleKeyList(Object key) {
    	// 存储 key 到 keyMap 中
    	keyMap.put(key, key);
    	if (eldestKey != null) {
      		// 从被装饰类中移除相应的缓存项
      		delegate.removeObject(eldestKey);
      		eldestKey = null;
    	}
  	}
}

??LruCache 的 keyMap 属性是实现 LRU 策略的关键,该属性类型继承自LinkedHashMap,并覆盖了 removeEldestEntry 方法。LinkedHashMap 可保持键值对的插入顺序,当插入一个新的键值对时,LinkedHashMap 内部的 tail 节点会指向最新插入的节点。head 节点则指向第一个被插入的键值对,也就是最久未被访问的那个键值对。默认情况下,LinkedHashMap 仅维护键值对的插入顺序。若要基于 LinkedHashMap 实现 LRU 缓存,还需通过构造方法将 LinkedHashMap 的 accessOrder 属性设为 true,此时 LinkedHashMap会维护键值对的访问顺序。比如,上面代码中 getObject 方法中执行了这样一句代码keyMap.get(key) ,目的是刷新 key 对应的键值对在 LinkedHashMap 的位置。
??LinkedHashMap 会将 key 对应的键值对移动到链表的尾部,尾部节点表示最久刚被访问过或者插入的节点。除了需将 accessOrder 设为 true,还需覆盖 removeEldestEntry 方法。LinkedHashMap 在插入新的键值对时会调用该方法,以决定是否在插入新的键值对后,移除老的键值对。在上面的代码中,当被装饰类的容量超出了 keyMap 的所规定的容量(由构造方法传入)后,keyMap 会移除最长时间未被访问的键,并将该键保存到 eldestKey 中,然后由 cycleKeyList 方法将 eldestKey 传给被装饰类的 removeObject 方法,移除相应的缓存项目。

1.1.3 BlockingCache

??BlockingCache(位于org.apache.ibatis.cache.decorators) 实现了阻塞特性,该特性是基于 Java 重入锁实现的。同一时刻下,BlockingCache 仅允许一个线程访问指定 key 的缓存项,其他线程将会被阻塞住。

public class BlockingCache implements Cache {

  	private long timeout;
  	private final Cache delegate;
  	private final ConcurrentHashMap<Object, CountDownLatch> locks;

  	public BlockingCache(Cache delegate) {
    	this.delegate = delegate;
    	this.locks = new ConcurrentHashMap<>();
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}	

  	@Override
  	public void putObject(Object key, Object value) {
    	try {
      		// 存储缓存项
      		delegate.putObject(key, value);
    	} finally {
      		// 释放锁
      		releaseLock(key);
    	}
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 请求锁
    	acquireLock(key);
    	Object value = delegate.getObject(key);
    	// 若缓存命中,则释放锁。需要注意的是,未命中则不释放锁
    	if (value != null) {
      		// 释放锁
      		releaseLock(key);
    	}
   	 	return value;
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 释放锁
    	releaseLock(key);
    	return null;
  	}

  	@Override
  	public void clear() {
    	delegate.clear();
  	}

  	private void acquireLock(Object key) {
    	CountDownLatch newLatch = new CountDownLatch(1);
    	while (true) {
      		CountDownLatch latch = locks.putIfAbsent(key, newLatch);
      		if (latch == null) {
        		break;
      		}
      		try {
        		if (timeout > 0) {
          			boolean acquired = latch.await(timeout, TimeUnit.MILLISECONDS);
          			if (!acquired) {
            			throw new CacheException(
                			"Couldn't get a lock in " + timeout + " for the key " + key + " at the cache " + delegate.getId());
          				}
        			} else {
          				latch.await();
        			}
      		} catch (InterruptedException e) {
        		throw new CacheException("Got interrupted while trying to acquire lock for key " + key, e);
      		}
    	}
  	}

  	private void releaseLock(Object key) {
    	CountDownLatch latch = locks.remove(key);
    	if (latch == null) {
      		throw new IllegalStateException("Detected an attempt at releasing unacquired lock. This should never happen.");
    	}
    	latch.countDown();
  	}

  	public long getTimeout() {
    	return timeout;
  	}

  	public void setTimeout(long timeout) {
    	this.timeout = timeout;
  	}
}

??在查询缓存时,getObject 方法会先获取与 key 对应的锁,并加锁。若缓存命中,getObject 方法会释放锁,否则将一直锁定。getObject 方法若返回 null,表示缓存未命中。此时 MyBatis 会向数据库发起查询请求,并调用 putObject 方法存储查询结果。此时,putObject 方法会将指定 key 对应的锁进行解锁,这样被阻塞的线程即可恢复运行。

1.2 CacheKey

?? MyBatis 中,引入缓存的目的是为提高查询效率,降低数据库压力。value 的内容是 SQL 的查询结果,key是一种复合对象,能涵盖可影响查询结果的因子。在 MyBatis 中,这种复合对象就是 CacheKey(位于org.apache.ibatis.cache)。

public class CacheKey implements Cloneable, Serializable {
  	private static final int DEFAULT_MULTIPLIER = 37;
  	private static final int DEFAULT_HASHCODE = 17;
  	// 乘子,默认为 37
  	private final int multiplier;
  	// CacheKey 的 hashCode,综合了各种影响因子
  	private int hashcode;
  	// 校验和
  	private long checksum;
  	// 影响因子个数
  	private int count;
  	// 影响因子集合
  	private List<Object> updateList;

  	public CacheKey() {
   	 	this.hashcode = DEFAULT_HASHCODE;
    	this.multiplier = DEFAULT_MULTIPLIER;
    	this.count = 0;
    	this.updateList = new ArrayList<>();
  	}
}

??除了 multiplier 是恒定不变的 ,其他变量将在更新操作中被修改。

  	/** 每当执行更新操作时,表示有新的影响因子参与计算 */
  	public void update(Object object) {
    	int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
    	// 自增 count
    	count++;
    	// 计算校验和
    	checksum += baseHashCode;
    	// 更新 baseHashCode
    	baseHashCode *= count;
    	// 计算 hashCode
    	hashcode = multiplier * hashcode + baseHashCode;
    	// 保存影响因子
    	updateList.add(object);
  	}

??当不断有新的影响因子参与计算时,hashcode 和 checksum 将会变得愈发复杂和随机。这样可降低冲突率,使 CacheKey 可在缓存中更均匀的分布。CacheKey 最终要作为键存入HashMap,因此它需要覆盖 equals 和 hashCode 方法。下面我们来看一下这两个方法的实现。

  	public boolean equals(Object object) {
    	// 检测是否为同一个对象
    	if (this == object) {
      		return true;
    	}
    	// 检测 object 是否为 CacheKey
    	if (!(object instanceof CacheKey)) {
      		return false;
    	}

    	final CacheKey cacheKey = (CacheKey) object;
    	// 检测 hashCode 是否相等
    	if (hashcode != cacheKey.hashcode) {
      		return false;
    	}
    	// 检测校验和是否相同
    	if (checksum != cacheKey.checksum) {
      		return false;
    	}
    	// 检测 coutn 是否相同
    	if (count != cacheKey.count) {
      		return false;
    	}
    	// 如果上面的检测都通过了,下面分别对每个影响因子进行比较
    	for (int i = 0; i < updateList.size(); i++) {
      		Object thisObject = updateList.get(i);
      		Object thatObject = cacheKey.updateList.get(i);
      		if (!ArrayUtil.equals(thisObject, thatObject)) {
        		return false;
      		}
    	}
    	return true;
  	}	

  	public int hashCode() {
    	// 返回 hashcode 变量
    	return hashcode;
  	}

??equals 方法的检测逻辑比较严格,对 CacheKey 中多个成员变量进行了检测,已保证两者相等。hashCode 方法比较简单,返回 hashcode 变量即可。

1.3 一级缓存

??在进行数据库查询之前,MyBatis 首先会检查以及缓存中是否有相应的记录,若有的话直接返回即可。一级缓存是数据库的最后一道防护,若一级缓存未命中,查询请求将落到数据库上。一级缓存是在 BaseExecutor 被初始化的:

public abstract class BaseExecutor implements Executor {
	protected PerpetualCache localCache;
	// 省略其他字段

	protected BaseExecutor(Configuration configuration,Transaction transaction) {
		this.localCache = new PerpetualCache("LocalCache");
		// 省略其他字段初始化方法
 	}
}

??一级缓存的类型为 PerpetualCache,没有被其他缓存类装饰过。一级缓存所存储从查询结果会在 MyBatis 执行更新操作(INSERT/UPDATE/DELETE),以及提交和回滚事务时被清空。下面我们来看一下访问一级缓存的逻辑。

  	public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    	BoundSql boundSql = ms.getBoundSql(parameter);
    	// 创建 CacheKey
    	CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    	return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  	}

  	public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    	ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    	if (closed) {
      		throw new ExecutorException("Executor was closed.");
    	}
    	if (queryStack == 0 && ms.isFlushCacheRequired()) {
      		clearLocalCache();
    	}
    	List<E> list;
    	try {
      		queryStack++;
      		// 查询一级缓存
      		list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      		if (list != null) {
        		// 存储过程相关逻辑
        		handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      		} else {
        		// 缓存未命中,则从数据库中查询
        		list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      		}
    	} finally {
      		queryStack--;
    	}
    	if (queryStack == 0) {
      		for (DeferredLoad deferredLoad : deferredLoads) {
        		deferredLoad.load();
      		}
      		// issue #601
      		deferredLoads.clear();
      		if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        		// issue #482
        		clearLocalCache();
      		}
    	}
    	return list;
  	}

??如上,在访问一级缓存之前,MyBatis 首先会调用 createCacheKey 方法创建 CacheKey:

  	public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    	if (closed) {
      		throw new ExecutorException("Executor was closed.");
    	}
    	// 创建 CacheKey 对象
    	CacheKey cacheKey = new CacheKey();
    	// 将 MappedStatement 的 id 作为影响因子进行计算
    	cacheKey.update(ms.getId());
    	// RowBounds 用于分页查询,下面将它的两个字段作为影响因子进行计算
    	cacheKey.update(rowBounds.getOffset());
    	cacheKey.update(rowBounds.getLimit());
    	// 获取 sql 语句,并进行计算
    	cacheKey.update(boundSql.getSql());
    	List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
   	 	TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    	// mimic DefaultParameterHandler logic
    	for (ParameterMapping parameterMapping : parameterMappings) {
      		if (parameterMapping.getMode() != ParameterMode.OUT) {
        		Object value;
        		// 当前大段代码用于获取 SQL 中的占位符 #{xxx} 对应的运行时参数
        		String propertyName = parameterMapping.getProperty();
        		if (boundSql.hasAdditionalParameter(propertyName)) {
          			value = boundSql.getAdditionalParameter(propertyName);
        		} else if (parameterObject == null) {
          			value = null;
        		} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
          			value = parameterObject;
        		} else {
          			MetaObject metaObject = configuration.newMetaObject(parameterObject);
          			value = metaObject.getValue(propertyName);
        		}
        		// 让运行时参数参与计算
        		cacheKey.update(value);
      		}
    	}
    	if (configuration.getEnvironment() != null) {
      		// 获取 Environment id 遍历,并让其参与计算
      		cacheKey.update(configuration.getEnvironment().getId());
    	}
    	return cacheKey;
  	}

??在计算 CacheKey 的过程中,有很多影响因子参与了计算。比如 MappedStatement 的id 字段,SQL 语句,分页参数,运行时变量,Environment 的 id 字段等。通过让这些影响因子参与计算,可以很好的区分不同查询请求。所以,我们可以简单的把 CacheKey 看做是一个查询请求的 id。有了 CacheKey,我们就可以使用它读写缓存了。在上面代码中,若一级缓存为命中,BaseExecutor 会调用 queryFromDatabase 查询数据库,并将查询结果写入缓存中。

  	private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    	List<E> list;
    	// 向缓存中存储一个占位符
    	localCache.putObject(key, EXECUTION_PLACEHOLDER);
    	try {
      		// 查询数据库
      		list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    	} finally {
      		// 移除占位符
      		localCache.removeObject(key);
    	}
    	// 存储查询结果
    	localCache.putObject(key, list);
    	// 存储过程相关逻辑
    	if (ms.getStatementType() == StatementType.CALLABLE) {
      		localOutputParameterCache.putObject(key, parameter);
    	}
    	return list;
  	}

1.4 二级缓存

??二级缓存构建在一级缓存之上,在收到查询请求时,MyBatis 首先会查询二级缓存。若二级缓存未命中,再去查询一级缓存。与一级缓存不同,二级缓存和具体的命名空间绑定,一级缓存则是和 SqlSession 绑定
??在按照 MyBatis 规范使用 SqlSession 的情况下,一级缓存不存在并发问题。二级缓存则不然,二级缓存可在多个命名空间间共享。这种情况下,会存在并发问题,因此需要针对性的去处理。除了并发问题,二级缓存还存在事务问题,相关问题将在接下来进行分析。下面先来看一下CachingExecutor(位于org.apache.ibatis.executor)中的访问二级缓存的逻辑。

  	public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    	BoundSql boundSql = ms.getBoundSql(parameterObject);
    	// 创建 CacheKey
    	CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    	return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  	}

  	public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    	// 从 MappedStatement 中获取 Cache,注意这里的 Cache
		// 并非是在 CachingExecutor 中创建的
    	Cache cache = ms.getCache();
    	// 如果配置文件中没有配置 <cache>,则 cache 为空
    	if (cache != null) {
      		flushCacheIfRequired(ms);
      		if (ms.isUseCache() && resultHandler == null) {
        		ensureNoOutParams(ms, boundSql);
        		// 访问二级缓存
        		@SuppressWarnings("unchecked")
        		List<E> list = (List<E>) tcm.getObject(cache, key);
        		// 缓存未命中
        		if (list == null) {
          			// 向一级缓存或者数据库进行查询
          			list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          			// 缓存查询结果
          			tcm.putObject(cache, key, list); // issue #578 and #116
        		}
        		return list;
      		}
    	}
    	return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  	}

??注意二级缓存是从 MappedStatement 中获取的,而非由 CachingExecutor 创建。由于 MappedStatement 存在于全局配置中,可以被多个 CachingExecutor 获取到,这样就会出现线程安全问题。除此之外,若不加以控制,多个事务共用一个缓存实例,会导致脏读问题。线程安全问题可以通过 SynchronizedCache 装饰类解决,该装饰类会在 Cache 实例构造期间被添加上。至于脏读问题,需要借助其他类来处理,也就是上面代码中 tcm 变量对应的类型,即TransactionalCacheManager(位于org.apache.ibatis.cache)。

/** 事务缓存管理器 */
public class TransactionalCacheManager {
  	// Cache 与 TransactionalCache 的映射关系表
  	private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();

  	public void clear(Cache cache) {
    	// 获取 TransactionalCache 对象,并调用该对象的 clear 方法
    	getTransactionalCache(cache).clear();
  	}

  	public Object getObject(Cache cache, CacheKey key) {
    	return getTransactionalCache(cache).getObject(key);
  	}

 	public void putObject(Cache cache, CacheKey key, Object value) {
    	getTransactionalCache(cache).putObject(key, value);
  	}

  	public void commit() {
    	for (TransactionalCache txCache : transactionalCaches.values()) {
      		txCache.commit();
    	}
  	}

  	public void rollback() {
    	for (TransactionalCache txCache : transactionalCaches.values()) {
      		txCache.rollback();
    	}
  	}

  	private TransactionalCache getTransactionalCache(Cache cache) {
    	// 从映射表中获取 TransactionalCache
    	// TransactionalCache 也是一种装饰类,为 Cache 增加事务功能
    	return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
  	}
}

??TransactionalCacheManager 内部维护了 Cache 实例与 TransactionalCache 实例间的映射关系,该类也仅负责维护两者的映射关系,真正做事的还是 TransactionalCache(位于org.apache.ibatis.cache.decorators)。TransactionalCache 是一种缓存装饰器,可以为 Cache 实例增加事务功能。之前提到的脏读问题正是由该类进行处理的。

public class TransactionalCache implements Cache {

  	private static final Log log = LogFactory.getLog(TransactionalCache.class);

  	private final Cache delegate;
  	private boolean clearOnCommit;
  	// 在事务被?交前,所有从数据库中查询的结果将缓存在此集合中
  	private final Map<Object, Object> entriesToAddOnCommit;
  	// 在事务被?交前,当缓存未命中时,CacheKey 将会被存储在此集合中
  	private final Set<Object> entriesMissedInCache;

  	public TransactionalCache(Cache delegate) {
    	this.delegate = delegate;
    	this.clearOnCommit = false;
    	this.entriesToAddOnCommit = new HashMap<>();
    	this.entriesMissedInCache = new HashSet<>();
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 查询 delegate 所代表的缓存
    	Object object = delegate.getObject(key);
    	if (object == null) {
      		// 缓存未命中,则将 key 存入到 entriesMissedInCache 中
      		entriesMissedInCache.add(key);
    	}
    	// issue #146
    	if (clearOnCommit) {
      		return null;
    	} else {
      		return object;
    	}
  	}

  	@Override
  	public void putObject(Object key, Object object) {
    	// 将键值对存入到 entriesToAddOnCommit 中,而非 delegate 缓存中
    	entriesToAddOnCommit.put(key, object);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	return null;
  	}

  	@Override
  	public void clear() {
    	clearOnCommit = true;
    	// 清空 entriesToAddOnCommit,但不清空 delegate 缓存
    	entriesToAddOnCommit.clear();
  	}

  	public void commit() {
    	// 根据 clearOnCommit 的值决定是否清空 delegate
    	if (clearOnCommit) {
      		delegate.clear();
    	}
    	// 刷新未缓存的结果到 delegate 缓存中
    	flushPendingEntries();
    	// 重置 entriesToAddOnCommit 和 entriesMissedInCache
    	reset();
  	}

  	public void rollback() {
    	unlockMissedEntries();
    	reset();
  	}

  	private void reset() {
    	clearOnCommit = false;
    	// 清空集合
    	entriesToAddOnCommit.clear();
    	entriesMissedInCache.clear();
  	}

  	private void flushPendingEntries() {
    	for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
      	// 将 entriesToAddOnCommit 中的内容转存到 delegate 中
      	delegate.putObject(entry.getKey(), entry.getValue());
    	}
    	for (Object entry : entriesMissedInCache) {
      		if (!entriesToAddOnCommit.containsKey(entry)) {
        		// 存入空值
        		delegate.putObject(entry, null);
      		}
    	}
  	}

  	private void unlockMissedEntries() {
    	for (Object entry : entriesMissedInCache) {
      		try {
        		// 调用 removeObject 进行解锁
        		delegate.removeObject(entry);
      		} catch (Exception e) {
        		log.warn("Unexpected exception while notifiying a rollback to the cache adapter. "
            		+ "Consider upgrading your cache adapter to the latest version. Cause: " + e);
      		}
    	}
  	}
}

??在 TransactionalCache 的代码中,我们要重点关注 entriesToAddOnCommit 集合,TransactionalCache 中的很多方法都会与这个集合打交道。该集合用于存储从查询的结果,那为什么要将结果保存在该集合中,而非 delegate 所表示的缓存中呢?主要是因为直接存到 delegate 会导致脏数据问题。
??我们再来看一下 entriesMissedInCache 集合,这个集合是用于存储未命中缓存的查询请求所对应的 CacheKey。单独分析与 entriesMissedInCache 相关的逻辑没什么意义,要搞清 entriesMissedInCache 的实际用途,需要把它和 BlockingCache 的逻辑结合起来进行分析。在 BlockingCache,同一时刻仅允许一个线程通过 getObject 方法查询指定 key 对应的缓存项。如果缓存未命中,getObject 方法不会释放锁,导致其他线程被阻塞住。其他线程要想恢复运行,必须进行解锁,解锁逻辑由 BlockingCache 的 putObject 和 removeObject 方法执行。其中 putObject 会在TransactionalCache 的flushPendingEntries方法中被调用,removeObject方法则由 TransactionalCache 的 unlockMissedEntries 方法调用。flushPendingEntries 和unlockMissedEntries 最终都会遍历 entriesMissedInCache 集合,并将集合元素传给BlockingCache 的相关方法。这样可以解开指定 key 对应的锁,让阻塞线程恢复运行。

二、插件机制

??一般情况下,开源框架都会提供插件或其他形式的拓展点,供开发者自行拓展。这样的好处是显而易见的,一是增加了框架的灵活性。二是开发者可以结合实际需求,对框架进行拓展,使其能够更好的工作。以 MyBatis 为例,我们可基于 MyBatis 插件机制实现分页、分表,监控等功能。

2.1 插件机制原理

??在编写插件时,除了需要让插件类实现 Interceptor 接口外,还需要通过注解标注该插件的拦截点。所谓拦截点指的是插件所能拦截的方法,MyBatis 所允许拦截的方法如下:

  1. Executor::update, query, flushStatements, commit, rollback,
    getTransaction, close, isClosed
  2. ParameterHandler: getParameterObject, setParameters
  3. ResultSetHandler::handleResultSets, handleOutputParameters
  4. StatementHandler::prepare, parameterize, batch, update, query

??如果想要拦截 Executor 的 query 方法,那么可以这样定义插件:

@Intercepts({
	@Signature(
		type = Executor.class,
		method = "query",
		args ={MappedStatement.class, Object.class, RowBounds.class,
			ResultHandler.class}
	 )
})
public class ExamplePlugin implements Interceptor {
	// 省略逻辑
}

??除此之外,我们还需将插件配置到相关文件中。这样 MyBatis 在启动时可以加载插件,并保存插件实例到相关对象(InterceptorChain,拦截器链)中。待准备工作做完后,MyBatis处于就绪状态。我们在执行 SQL 时,需要先通过 DefaultSqlSessionFactory 创 建SqlSession 。Executor 实例会在创建 SqlSession 的过程中被创建,Executor 实例创建完毕后,MyBatis 会通过 JDK 动态代理为实例生成代理类。这样,插件逻辑即可在 Executor 相关方法被调用前执行。以上就是 MyBatis 插件机制的基本原理。

2.1.1 植入插件逻辑

??此处以 Executor 为例,分析 MyBatis 是如何为 Executor 实例植入插件逻辑的。Executor 实例是在开启 SqlSession 时被创建的,因此,下面从源头进行分析。先来看一下 SqlSession 开启的过程。先看DefaultSqlSessionFactory:

  	public SqlSession openSession() {
    	return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
  	}

	private SqlSession openSessionFromDataSource(ExecutorType execType,
  		TransactionIsolationLevel level, boolean autoCommit) {
		Transaction tx = null;
		try {
			// 省略部分逻辑
			// 创建 Executor
			final Executor executor = configuration.newExecutor(tx, execType);
			return new DefaultSqlSession(configuration, executor, autoCommit);
	 	}	
		catch (Exception e) {...}
		finally {...}
	}

??Executor 的创建过程封装在 Configuration 中:

  	public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    	executorType = executorType == null ? defaultExecutorType : executorType;
    	executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    	Executor executor;
    	// 根据 executorType 创建相应的 Executor 实例
    	if (ExecutorType.BATCH == executorType) {
      		executor = new BatchExecutor(this, transaction);
    	} else if (ExecutorType.REUSE == executorType) {
      		executor = new ReuseExecutor(this, transaction);
    	} else {
      		executor = new SimpleExecutor(this, transaction);
    	}
    	if (cacheEnabled) {
      		executor = new CachingExecutor(executor);
    	}
    	// 植入插件
    	executor = (Executor) interceptorChain.pluginAll(executor);
    	return executor;
  	}

??newExecutor 方法在创建好 Executor 实例后,紧接着通过拦截器链 interceptorChain为 Executor 实例植入代理逻辑。接下来看下InterceptorChain(位于org.apache.ibatis.plugin):

public class InterceptorChain {

  	private final List<Interceptor> interceptors = new ArrayList<>();

  	public Object pluginAll(Object target) {
    	// 遍历拦截器集合
    	for (Interceptor interceptor : interceptors) {
      		// 调用拦截器的 plugin 方法植入相应的插件逻辑
      		target = interceptor.plugin(target);
    	}
    	return target;
  	}
  	/** 添加插件实例到 interceptors 集合中 */
  	public void addInterceptor(Interceptor interceptor) {
    	interceptors.add(interceptor);
  	}
  	/** 获取插件列表 */
  	public List<Interceptor> getInterceptors() {
    	return Collections.unmodifiableList(interceptors);
  	}
}

??pluginAll 方法会调用具体插件的plugin 方法植入相应的插件逻辑。如果有多个插件,则会多次调用 plugin 方法,最终生成一个层层嵌套的代理类。形如下面:

??当 Executor 的某个方法被调用的时候,插件逻辑会先行执行。执行顺序由外而内,比如上图的执行顺序为 plugin3 → plugin2 → Plugin1 → Executor。
??plugin 方法是由具体的插件类实现,不过该方法代码一般比较固定,所以下面找个示例分析一下,例如ExamplePlugin:

	public Object plugin(Object target) {
		return Plugin.wrap(target, this);
	}

??继续看Plugin(位于org.apache.ibatis.plugin):

  	public static Object wrap(Object target, Interceptor interceptor) {
    	// 获取插件类 @Signature 注解内容,并生成相应的映射结构。形如下面:
    	// {
		// Executor.class : [query, update, commit],
		// ParameterHandler.class : [getParameterObject, setParameters]
		// }
    	Map<Class<?>, Set<Method>> signatureMap = getSignatureMap(interceptor);
    	Class<?> type = target.getClass();
    	// 获取目标类实现的接口
    	Class<?>[] interfaces = getAllInterfaces(type, signatureMap);
    	if (interfaces.length > 0) {
      		// 通过 JDK 动态代理为目标类生成代理类
      		return Proxy.newProxyInstance(
          		type.getClassLoader(),
          		interfaces,
          		new Plugin(target, interceptor, signatureMap));
    	}
    	return target;
  	}

??plugin 方法在内部调用了 Plugin 类的 wrap 方法,用于为目标对象生成代理。Plugin类实现了InvocationHandler接口,因此它可以作为参数传给Proxy的newProxyInstance方法。
??关于插件植入的逻辑就分析完了。接下来,我们来看看插件逻辑是怎样执行的。

2.1.2 执行插件逻辑

??Plugin 实现了 InvocationHandler 接口,因此它的 invoke 方法会拦截所有的方法调用。invoke 方法会对所拦截的方法进行检测,以决定是否执行插件逻辑。

  	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    	try {
      		// 获取被拦截方法列表,比如:signatureMap.get(Executor.class),
	  		// 可能返回 [query, update, commit]
      		Set<Method> methods = signatureMap.get(method.getDeclaringClass());
      		// 检测方法列表是否包含被拦截的方法
      		if (methods != null && methods.contains(method)) {
        		// 执行插件逻辑
        		return interceptor.intercept(new Invocation(target, method, args));
      		}
      		// 执行被拦截的方法
      		return method.invoke(target, args);
    	} catch (Exception e) {
      		throw ExceptionUtil.unwrapThrowable(e);
    	}
  	}

??invoke 方法会检测被拦截方法是否配置在插件的 @Signature 注解中,若是,则执行插件逻辑,否则执行被拦截方法。插件逻辑封装在 intercept 中,该方法的参数类型为 Invocation。Invocation 主要用于存储目标类,方法以及方法参数列表。下面简单看一下Invocation(位于org.apache.ibatis.plugin):

public class Invocation {

  	private final Object target;
  	private final Method method;
  	private final Object[] args;

  	public Invocation(Object target, Method method, Object[] args) {
    	this.target = target;
    	this.method = method;
    	this.args = args;
  	}

  	public Object getTarget() {
    	return target;
  	}

  	public Method getMethod() {
    	return method;
  	}

  	public Object[] getArgs() {
    	return args;
  	}

  	public Object proceed() throws InvocationTargetException, IllegalAccessException {
    	// 调用被拦截的方法
	   return method.invoke(target, args);
  	}
}

2.2 实现一个分页插件

??本节将实现一个 MySQL 数据库分页插件。相关代码如下:

@Intercepts({
	@Signature(
		type = Executor.class, // 目标类
		method = "query", // 目标方法
		args ={MappedStatement.class,
			Object.class, RowBounds.class, ResultHandler.class}
	 )
})

public class MySqlPagingPlugin implements Interceptor {
	private static final Integer MAPPED_STATEMENT_INDEX = 0;
	private static final Integer PARAMETER_INDEX = 1;
	private static final Integer ROW_BOUNDS_INDEX = 2;
	
	@Override
	public Object intercept(Invocation invocation) throws Throwable {
	  	Object[] args = invocation.getArgs();
		RowBounds rb = (RowBounds) args[ROW_BOUNDS_INDEX];
		// 无需分页
		if (rb == RowBounds.DEFAULT) {
			return invocation.proceed();
		}
		// 将原 RowBounds 参数设为 RowBounds.DEFAULT,关闭 MyBatis 内置的分页机制
		args[ROW_BOUNDS_INDEX] = RowBounds.DEFAULT;
		MappedStatement ms = (MappedStatement) args[MAPPED_STATEMENT_INDEX];
		BoundSql boundSql = ms.getBoundSql(args[PARAMETER_INDEX]);
		// 获取 SQL 语句,拼接 limit 语句
		String sql = boundSql.getSql();
		String limit = String.format("LIMIT %d,%d", rb.getOffset(), rb.getLimit());
		sql = sql + " " + limit;
		// 创建一个 StaticSqlSource,并将拼接好的 sql 传入
		SqlSource sqlSource = new StaticSqlSource(
			ms.getConfiguration(), sql, boundSql.getParameterMappings());
		// 通过反射获取并设置 MappedStatement 的 sqlSource 字段
		Field field = MappedStatement.class.getDeclaredField("sqlSource");
		field.setAccessible(true);
		field.set(ms, sqlSource);
		// 执行被拦截方法
		return invocation.proceed();
	}

	@Override
	public Object plugin(Object target) {
		return Plugin.wrap(target, this);
	}
	
	@Override
	public void setProperties(Properties properties) {
 	}
}

??上面的分页插件通过 RowBounds 参数获取分页信息,并生成相应的 limit 语句。之后拼接 sql,并使用该 sql 作为参数创建 StaticSqlSource。最后通过反射替换 MappedStatement 对象中的 sqlSource 字段。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-24 18:32:57  更:2021-12-24 18:35:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/27 2:51:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计