Redis分片模式接入
1. 参考
flea-frame-cache使用之Redis分片模式接入 源代码v1.1.0
2. 依赖
jedis-3.0.1.jar
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
spring-context-4.3.18.RELEASE.jar
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.18.RELEASE</version>
</dependency>
spring-context-support-4.3.18.RELEASE.jar
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>4.3.18.RELEASE</version>
</dependency>
3. 基础接入
可参考笔者的这篇博文 Memcached接入,不再赘述。
可参考笔者的这篇博文 Memcached接入,不再赘述。
注意该版,相比《flea-frame-cache使用之Redis接入》博文中,废弃如下与 ShardedJedis 有关的方法:
ShardedJedisPool getJedisPool();
void setShardedJedis(ShardedJedis shardedJedis);
ShardedJedis getShardedJedis();
《flea-frame-cache使用之Redis接入》博文中 提到了使用 Redis客户端代理方式 访问 RedisClient, 在这版为了实现Redis访问异常后的重试机制,废弃了代理模式,采用了命令行模式,可参考下面的 RedisClientCommand。
public abstract class RedisClientCommand<T> {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisClientCommand.class);
private final ShardedJedisPool shardedJedisPool;
private final int maxAttempts;
public RedisClientCommand(ShardedJedisPool shardedJedisPool, int maxAttempts) {
this.shardedJedisPool = shardedJedisPool;
this.maxAttempts = maxAttempts;
}
public abstract T execute(ShardedJedis connection);
public T run() {
return runWithRetries(this.maxAttempts);
}
private T runWithRetries(int attempts) {
if (attempts <= 0) {
throw new FleaCacheMaxAttemptsException("No more attempts left.");
}
ShardedJedis connection = null;
try {
connection = shardedJedisPool.getResource();
Object obj = null;
if (LOGGER.isDebugEnabled()) {
obj = new Object() {};
LOGGER.debug1(obj, "Get ShardedJedis = {}", connection);
}
T result = execute(connection);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug1(obj, "Result = {}", result);
}
return result;
} catch (JedisConnectionException e) {
releaseConnection(connection);
connection = null;
if (LOGGER.isErrorEnabled()) {
Object obj = new Object() {};
LOGGER.error1(obj, "Redis连接异常:", e);
int currAttempts = this.maxAttempts - attempts + 1;
LOGGER.error1(obj, "第 {} 次尝试失败,开始第 {} 次尝试...", currAttempts, currAttempts + 1);
}
return runWithRetries(attempts - 1);
} finally {
releaseConnection(connection);
}
}
private void releaseConnection(ShardedJedis connection) {
if (ObjectUtils.isNotEmpty(connection)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug1(new Object() {}, "Close ShardedJedis");
}
connection.close();
}
}
}
分片模式 Redis 客户端 主要使用 ShardedJedis 来操作 Redis 数据。
public class FleaRedisShardedClient extends FleaRedisClient {
private ShardedJedisPool shardedJedisPool;
private int maxAttempts;
private FleaRedisShardedClient() {
this(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
private FleaRedisShardedClient(String poolName) {
super(poolName);
init();
}
private void init() {
if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(getPoolName())) {
shardedJedisPool = RedisShardedPool.getInstance().getJedisPool();
maxAttempts = RedisShardedConfig.getConfig().getMaxAttempts();
} else {
shardedJedisPool = RedisShardedPool.getInstance(getPoolName()).getJedisPool();
maxAttempts = CacheConfigUtils.getMaxAttempts();
}
}
@Override
public String set(final String key, final Object value) {
return new RedisClientCommand<String>(this.shardedJedisPool, this.maxAttempts) {
@Override
public String execute(ShardedJedis connection) {
if (value instanceof String)
return connection.set(key, (String) value);
else
return connection.set(SafeEncoder.encode(key), ObjectUtils.serialize(value));
}
}.run();
}
@Override
public String set(final byte[] key, final byte[] value) {
}
@Override
public String set(final String key, final Object value, final int expiry) {
}
@Override
public String set(final byte[] key, final byte[] value, final int expiry) {
}
@Override
public String set(final String key, final Object value, final long expiry) {
}
@Override
public String set(final byte[] key, final byte[] value, final long expiry) {
}
@Override
public String set(final String key, final Object value, final SetParams params) {
}
@Override
public String set(final byte[] key, final byte[] value, final SetParams params) {
}
@Override
public byte[] get(final byte[] key) {
}
@Override
public Long del(final String key) {
}
@Override
protected Client getClientByKey(final Object key) {
}
public static class Builder {
}
}
该类的构造函数初始化逻辑,可以看出我们使用了 RedisShardedPool, 下面来介绍一下。
上个版本我们使用 RedisPool 初始化Redis相关配置信息,为了体现Redis分片模式,这个版本里面,我们使用 RedisShardedPool 用于Redis相关配置信息的初始化,其中重点是获取分布式Jedis连接池 ShardedJedisPool ,该类其中一个构造方法如下:
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards,
Hashing algo)
public class RedisShardedPool {
private static final ConcurrentMap<String, RedisShardedPool> redisPools = new ConcurrentHashMap<>();
private String poolName;
private ShardedJedisPool shardedJedisPool;
private RedisShardedPool(String poolName) {
this.poolName = poolName;
}
public static RedisShardedPool getInstance() {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
public static RedisShardedPool getInstance(String poolName) {
if (!redisPools.containsKey(poolName)) {
synchronized (redisPools) {
if (!redisPools.containsKey(poolName)) {
RedisShardedPool redisShardedPool = new RedisShardedPool(poolName);
redisPools.putIfAbsent(poolName, redisShardedPool);
}
}
}
return redisPools.get(poolName);
}
public void initialize() {
}
public void initialize(List<CacheServer> cacheServerList) {
}
public String getPoolName() {
return poolName;
}
public ShardedJedisPool getJedisPool() {
if (ObjectUtils.isEmpty(shardedJedisPool)) {
throw new FleaCacheConfigException("获取分布式Jedis连接池失败:请先调用initialize初始化");
}
return shardedJedisPool;
}
}
3.7 Redis配置文件
flea-frame-cache读取 redis.properties(Redis配置文件),用作初始化 RedisShardedPool
redis.systemName=FleaFrame
redis.server=127.0.0.1:10001,127.0.0.1:10002,127.0.0.1:10003
redis.password=huazie123,huazie123,huazie123
redis.weight=1,1,1
redis.connectionTimeout=2000
redis.soTimeout=2000
redis.hashingAlg=1
redis.pool.maxTotal=100
redis.pool.maxIdle=10
redis.pool.minIdle=0
redis.pool.maxWaitMillis=2000
redis.maxAttempts=5
redis.nullCacheExpiry=10
该类继承抽象Flea缓存类 AbstractFleaCache ,其构造方法可见如需要传入Redis客户端 RedisClient ,相关使用下面介绍:
public class RedisFleaCache extends AbstractFleaCache {
private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisFleaCache.class);
private RedisClient redisClient;
private CacheModeEnum cacheMode;
public RedisFleaCache(String name, int expiry, int nullCacheExpiry, CacheModeEnum cacheMode, RedisClient redisClient) {
super(name, expiry, nullCacheExpiry);
this.cacheMode = cacheMode;
this.redisClient = redisClient;
if (CacheUtils.isClusterMode(cacheMode))
cache = CacheEnum.RedisCluster;
else
cache = CacheEnum.RedisSharded;
}
@Override
public Object getNativeValue(String key) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug1(new Object() {}, "KEY = {}", key);
}
return redisClient.get(key);
}
@Override
public Object putNativeValue(String key, Object value, int expiry) {
if (LOGGER.isDebugEnabled()) {
Object obj = new Object() {};
LOGGER.debug1(obj, "REDIS FLEA CACHE, KEY = {}", key);
LOGGER.debug1(obj, "REDIS FLEA CACHE, VALUE = {}", value);
LOGGER.debug1(obj, "REDIS FLEA CACHE, EXPIRY = {}s", expiry);
LOGGER.debug1(obj, "REDIS FLEA CACHE, NULL CACHE EXPIRY = {}s", getNullCacheExpiry());
}
if (ObjectUtils.isEmpty(value)) {
return redisClient.set(key, new NullCache(key), getNullCacheExpiry());
} else {
if (expiry == CommonConstants.NumeralConstants.INT_ZERO) {
return redisClient.set(key, value);
} else {
return redisClient.set(key, value, expiry);
}
}
}
@Override
public Object deleteNativeValue(String key) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug1(new Object() {}, "KEY = {}", key);
}
return redisClient.del(key);
}
@Override
public String getSystemName() {
if (CacheUtils.isClusterMode(cacheMode))
return RedisClusterConfig.getConfig().getSystemName();
else
return RedisShardedConfig.getConfig().getSystemName();
}
}
可参考笔者的这篇博文 Memcached接入,不再赘述。
该类继承抽象Flea缓存管理类 AbstractFleaCacheManager,构造方法使用了 RedisClientFactory 获取分片模式下默认连接池的Redis客户端 RedisClient,可在 3.11 查看。newCache 方法返回的是 RedisFleaCache 的实例对象,每一类 Redis 缓存数据都对应了一个 RedisFleaCache 的实例对象。
public class RedisShardedFleaCacheManager extends AbstractFleaCacheManager {
private RedisClient redisClient;
public RedisShardedFleaCacheManager() {
RedisShardedPool.getInstance().initialize();
redisClient = RedisClientFactory.getInstance();
}
@Override
protected AbstractFleaCache newCache(String name, int expiry) {
int nullCacheExpiry = RedisShardedConfig.getConfig().getNullCacheExpiry();
return new RedisFleaCache(name, expiry, nullCacheExpiry, CacheModeEnum.SHARDED, redisClient);
}
}
Redis 客户端工厂类,有四种方式获取 Redis 客户端:
- 一是获取分片模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景【3.10 采用】;
- 二是获取指定模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景;
- 三是获取分片模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景;
- 四是获取指定模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景。
public class RedisClientFactory {
private static final ConcurrentMap<String, RedisClient> redisClients = new ConcurrentHashMap<>();
private RedisClientFactory() {
}
public static RedisClient getInstance() {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
public static RedisClient getInstance(CacheModeEnum mode) {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME, mode);
}
public static RedisClient getInstance(String poolName) {
return getInstance(poolName, CacheModeEnum.SHARDED);
}
public static RedisClient getInstance(String poolName, CacheModeEnum mode) {
String key = StringUtils.strCat(poolName, CommonConstants.SymbolConstants.UNDERLINE, StringUtils.valueOf(mode.getMode()));
if (!redisClients.containsKey(key)) {
synchronized (redisClients) {
if (!redisClients.containsKey(key)) {
RedisClientStrategyContext context = new RedisClientStrategyContext(poolName);
redisClients.putIfAbsent(key, FleaStrategyFacade.invoke(mode.name(), context));
}
}
}
return redisClients.get(key);
}
}
在上面 的 getInstance(String poolName, CacheModeEnum mode) 方法中,使用了 RedisClientStrategyContext ,用于定义 Redis 客户端策略上下文。根据不同的缓存模式,就可以找到对应的 Redis 客户端策略。
该类中包含了 Redis 分片 和 Redis 集群 相关的客户端策略。
public class RedisClientStrategyContext extends FleaStrategyContext<RedisClient, String> {
public RedisClientStrategyContext() {
super();
}
public RedisClientStrategyContext(String contextParam) {
super(contextParam);
}
@Override
protected Map<String, IFleaStrategy<RedisClient, String>> init() {
Map<String, IFleaStrategy<RedisClient, String>> fleaStrategyMap = new HashMap<>();
fleaStrategyMap.put(CacheModeEnum.SHARDED.name(), new RedisShardedClientStrategy());
fleaStrategyMap.put(CacheModeEnum.CLUSTER.name(), new RedisClusterClientStrategy());
return Collections.unmodifiableMap(fleaStrategyMap);
}
}
public class RedisShardedClientStrategy implements IFleaStrategy<RedisClient, String> {
@Override
public RedisClient execute(String poolName) throws FleaStrategyException {
RedisClient originRedisClient;
if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(poolName)) {
originRedisClient = new FleaRedisShardedClient.Builder().build();
} else {
originRedisClient = new FleaRedisShardedClient.Builder(poolName).build();
}
return originRedisClient;
}
}
好了,到这里我们可以来测试 Redis 分片模式。
首先,这里需要按照 Redis 配置文件中的地址部署相应的 Redis 服务,可参考笔者的 这篇博文。
@Test
public void testRedisShardedFleaCache() {
try {
AbstractFleaCacheManager manager = FleaCacheManagerFactory.getFleaCacheManager(CacheEnum.RedisSharded.getName());
AbstractFleaCache cache = manager.getCache("fleaparadetail");
LOGGER.debug("Cache={}", cache);
cache.put("menu1", "huazie");
cache.put("menu2", null);
cache.getCacheKey();
LOGGER.debug(cache.getCacheName() + ">>>" + cache.getCacheDesc());
} catch (Exception e) {
LOGGER.error("Exception:", e);
}
}
4. 进阶接入
可参考笔者的这篇博文 Memcached接入,不再赘述。
该类继承抽象 Spring 缓存 AbstractSpringCache,用于对接 Spring; 从构造方法可见,该类初始化还是使用 RedisFleaCache。
public class RedisSpringCache extends AbstractSpringCache {
public RedisSpringCache(String name, IFleaCache fleaCache) {
super(name, fleaCache);
}
public RedisSpringCache(String name, int expiry, int nullCacheExpiry, CacheModeEnum cacheMode, RedisClient redisClient) {
this(name, new RedisFleaCache(name, expiry, nullCacheExpiry, cacheMode, redisClient));
}
}
可参考笔者的这篇博文 Memcached接入,不再赘述。
该类继承抽象 Spring 缓存管理类 AbstractSpringCacheManager,用于对接Spring; 基本实现同 RedisShardedFleaCacheManager,唯一不同在于 newCache 的实现。
public class RedisShardedSpringCacheManager extends AbstractSpringCacheManager {
private RedisClient redisClient;
public RedisShardedSpringCacheManager() {
RedisShardedPool.getInstance().initialize();
redisClient = RedisClientFactory.getInstance();
}
@Override
protected AbstractSpringCache newCache(String name, int expiry) {
int nullCacheExpiry = RedisShardedConfig.getConfig().getNullCacheExpiry();
return new RedisSpringCache(name, expiry, nullCacheExpiry, CacheModeEnum.SHARDED, redisClient);
}
}
4.5 spring 配置
<bean id="redisShardedSpringCacheManager" class="com.huazie.frame.cache.redis.manager.RedisShardedSpringCacheManager">
<property name="configMap">
<map>
<entry key="fleaconfigdata" value="86400"/>
</map>
</property>
</bean>
<cache:annotation-driven cache-manager="redisShardedSpringCacheManager" proxy-target-class="true"/>
4.6 缓存自测
private ApplicationContext applicationContext;
@Before
public void init() {
applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
LOGGER.debug("ApplicationContext={}", applicationContext);
}
@Test
public void testRedisShardedSpringCache() {
try {
AbstractSpringCacheManager manager = (RedisShardedSpringCacheManager) applicationContext.getBean("redisShardedSpringCacheManager");
LOGGER.debug("RedisCacheManager={}", manager);
AbstractSpringCache cache = manager.getCache("fleaconfigdata");
LOGGER.debug("Cache={}", cache);
Set<String> cacheKey = cache.getCacheKey();
LOGGER.debug("CacheKey = {}", cacheKey);
} catch (Exception e) {
LOGGER.error("Exception:", e);
}
}
结语
Redis 接入重构工作已经全部结束,当前版本为 Redis 分片模式。下一篇博文,我将要介绍 Redis 集群模式的接入工作,敬请期待!!!
|