? ? ? ? 前面有一章?Redis集群环境搭建?如果没有搭建环境的可以参考下,也是一些爬坑记录,如果操作有问题的欢迎留言讨论,在准备好环境后我们就来使用redis 实现一下分布式锁的一个业务情况。
前期准备
1. 先创建一个商品管理项目来作为本次功能实现的demo
?2. 关于商品基本的增删改查代码就不做展示了,自行去写或者参考我前面给出来的代码
?3. 把本次需要用的redis 及redisson的依赖引入
<!--redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.0</version>
</dependency>
4. redisson.yml 配置文件内容 自己调整一下集群地址,或者其他相关配置如密码等
clusterServersConfig:
# 连接空闲超时,单位:毫秒 默认10000
idleConnectionTimeout: 10000
pingTimeout: 1000
# 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
connectTimeout: 10000
# 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
timeout: 3000
# 命令失败重试次数
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 重新连接时间间隔,单位:毫秒
reconnectionTimeout: 10000
# 执行失败最大次数
failedAttempts: 3
# 密码
# password: test1234
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
clientName: null
# loadBalancer 负载均衡算法类的选择
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
#从节点发布和订阅连接的最小空闲连接数
slaveSubscriptionConnectionMinimumIdleSize: 1
#从节点发布和订阅连接池大小 默认值50
slaveSubscriptionConnectionPoolSize: 50
# 从节点最小空闲连接数 默认值32
slaveConnectionMinimumIdleSize: 32
# 从节点连接池大小 默认64
slaveConnectionPoolSize: 64
# 主节点最小空闲连接数 默认32
masterConnectionMinimumIdleSize: 32
# 主节点连接池大小 默认64
masterConnectionPoolSize: 64
# 订阅操作的负载均衡模式
subscriptionMode: SLAVE
# 只在从服务器读取
readMode: SLAVE
# 集群地址
nodeAddresses:
- "redis://114.116.51.164:6379"
- "redis://114.116.51.164:6380"
- "redis://114.116.51.164:6381"
- "redis://114.116.51.164:6382"
- "redis://114.116.51.164:6383"
- "redis://114.116.51.164:6384"
# 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
scanInterval: 1000
#这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,默认30000
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
#
#该参数仅限于Redisson PRO版本。
#performanceMode: HIGHER_THROUGHPUT
5. RedisConfig配置类
package com.andy.protect.util;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.IOException;
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redisson() throws IOException {
Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream());
RedissonClient redisson = Redisson.create(config);
return redisson;
}
@Bean
public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
return new RedissonConnectionFactory(redisson);
}
@Bean("redisTemplate")
public RedisTemplate getRedisTemplate(RedisConnectionFactory redissonConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redissonConnectionFactory);
redisTemplate.setValueSerializer(valueSerializer());
redisTemplate.setKeySerializer(keySerializer());
redisTemplate.setHashKeySerializer(keySerializer());
redisTemplate.setHashValueSerializer(valueSerializer());
return redisTemplate;
}
@Bean
public RedisSerializer keySerializer() {
return new StringRedisSerializer();
}
@Bean
public RedisSerializer valueSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return jackson2JsonRedisSerializer;
}
}
6. RedisUtil 工具类,redis集群里面也有这个,再贴一次吧
package com.andy.protect.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* redis使用简单工具类
*/
@Component
public class RedisUtil {
@Autowired
private RedisTemplate redisTemplate;
public void set(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
}
public void set(String key, Object value, long timeout, TimeUnit unit) {
redisTemplate.opsForValue().set(key, value, timeout, unit);
}
public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
public <T> T get(String key, Class<?> T) {
return (T) redisTemplate
.opsForValue().get(key);
}
public void delete(String key){
redisTemplate.delete(key);
}
public String get(String key) {
return (String) redisTemplate
.opsForValue().get(key);
}
public Long decr(String key) {
return redisTemplate
.opsForValue().decrement(key);
}
public Long decr(String key, long delta) {
return redisTemplate
.opsForValue().decrement(key, delta);
}
public Long incr(String key) {
return redisTemplate
.opsForValue().increment(key);
}
public Long incr(String key, long delta) {
return redisTemplate
.opsForValue().increment(key, delta);
}
public void expire(String key, long time, TimeUnit unit) {
redisTemplate.expire(key, time, unit);
}
}
到此,准备工作完成
接下来说明一下
解决思路及代码实现
????????
思路我都直接写到代码中了,注释相对也算齐全,偷个懒就不再去分析了
package com.andy.protect.service.impl;
import com.alibaba.fastjson.JSON;
import com.andy.protect.dao.ProtectInfoDao;
import com.andy.protect.entity.ProtectInfo;
import com.andy.protect.service.ProtectService;
import com.andy.protect.util.RedisUtil;
import jodd.util.StringUtil;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁实现热点数据查询问题
*
* 解决思路:
* 前置准备,将redis依赖及redisSon(加锁解决redis相关问题)依赖加上
* 1.为了防止高并发请求,所以第一步应该是在查询时先查询redis缓存信息,没有时再到数据库查询
* a.当查询到了缓存数据,则更新缓存时间,并返回
* b.没有查询到缓存,查询数据库,并且在查询成功后,将数据添加到缓存中
* 2.当数据有更新情况发生,缓存应该也对应需要更新,可以选择直接删除缓存,下次查询的时候再缓存到redis,也可以再设置一次
* 3.为了防止缓存击穿(缓存失效)大量的数据同时失效问题,所以我们要给过期时间上加一个随机数
* 4.当缓存没有预热,大量请求打入,此时就会有问题,所以需要锁来保证当没有缓存时,只有一个来进行缓存重建
* 5.用读写锁来保证缓存与数据库数据不一致问题,get使用读锁,update用写锁
*/
@Service
public class ProtectServiceImpl implements ProtectService {
//默认缓存时间为一天
static final long CACHE_TIME = 60*60*24;
//定义商品缓存的key
static final String PROTECT_KEY = "protect:cache:";
//锁的名称 商品热点缓存创建保证缓存预热重建
public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX = "lock:product:hot_cache_create:";
//读写锁,来保证数据库与缓存数据一致问题
public static final String LOCK_PRODUCT_UPDATE = "lock:product:update:";
//null对象内
static final String EMPTY_CACHE="{}";
@Autowired
private ProtectInfoDao protectInfoDao;
@Autowired
private RedisUtil redisUtil;
/**
* redisson注入
*/
@Autowired
private Redisson redisson;
@Override
public ProtectInfo add(ProtectInfo protectInfo) {
ProtectInfo add = protectInfoDao.add(protectInfo);
try{
//新增一个将数据保存redis
redisUtil.set(PROTECT_KEY+add.getId(),JSON.toJSONString(add),getCacheTime(),TimeUnit.SECONDS);
}catch (Exception e){}
return add;
}
@Override
public boolean delete(ProtectInfo protectInfo) {
boolean delete = protectInfoDao.delete(protectInfo);
try {
//删除成功,缓存也一并删除
if (delete) {
redisUtil.delete(PROTECT_KEY + 1);
}
}catch (Exception e){}
return delete;
}
@Override
public boolean update(ProtectInfo protectInfo) {
boolean update = false;
//更新时加写锁,保证缓存一致
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE + protectInfo.getId());
RLock rLock = readWriteLock.writeLock();
rLock.lock();
try{
update = protectInfoDao.update(protectInfo);
//更新redis
redisUtil.set(PROTECT_KEY+protectInfo.getId(),JSON.toJSONString(protectInfo),getCacheTime(),TimeUnit.SECONDS);
}
finally {
rLock.unlock();
}
return update;
}
@Override
public ProtectInfo get(int id) {
ProtectInfo protectInfo = null;
//先查询redis有没有存在,如果有则返回,没有才查询数据库
String protectKey = PROTECT_KEY + id;
//查询redis缓存是否有数据,有则返回
protectInfo = getProtectInfoByCache(protectKey);
if(protectInfo != null){
return protectInfo;
}
/**-------上面是缓存查询操作,下面是数据库查询阶段---------*/
//加锁来保证热点缓存并发重建问题
RLock lock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX+id);
lock.lock();
try {
//当热点缓存重建时,两次查询防止高并发情况,当第一个线程重建了缓存,后续的线程就不需要再来一遍加锁然后再去查询数据库的情况,可以直接返回
protectInfo = getProtectInfoByCache(protectKey);
if(protectInfo != null){
return protectInfo;
}
/*********使用读锁来保证数据库与缓存不一致问题***********/
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE + id);
RLock rLock = readWriteLock.readLock();
rLock.lock();
try {
//缓存中没有数据,查询数据库,并且添加到缓存中
protectInfo = protectInfoDao.get(id);
if (protectInfo == null) {
//将{}放入缓存,防止相同的null攻击 缓存击穿问题处理
redisUtil.set(PROTECT_KEY + protectInfo.getId(), EMPTY_CACHE, nullCacheTime(), TimeUnit.SECONDS);
} else {
redisUtil.set(PROTECT_KEY + protectInfo.getId(), JSON.toJSONString(protectInfo), getCacheTime(), TimeUnit.SECONDS);
}
}catch (Exception e){}
finally {
//释放读锁
rLock.unlock();
}
}catch (Exception e){}
finally {
//释放热点缓存并发重建问题锁
lock.unlock();
}
return protectInfo;
}
/**
* 返回缓存+随机五小时内时间
* @return
*/
public long getCacheTime(){
//加上随机时间解决缓存击穿问题
return CACHE_TIME + new Random().nextInt(5) * 60 * 60;
}
public long nullCacheTime(){
//空值的随机时间(其实可以不必要,主要是防止不停的访问,如果使用布隆过滤器,就没有太大必要了)
return 60 + new Random().nextInt(30);
}
/**
* 获取缓存数据
* 有就返回,可能返回new一个空的对象
* 没有数据就返回 null
*/
public ProtectInfo getProtectInfoByCache(String key){
ProtectInfo product = null;
String productStr = redisUtil.get(key);
if (!StringUtil.isEmpty(productStr)) {
//查询到延期空对象,防止重复访问
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(key, nullCacheTime(), TimeUnit.SECONDS);
return new ProtectInfo();
}
product = JSON.parseObject(productStr, ProtectInfo.class);
//缓存读延期
redisUtil.expire(key, getCacheTime(), TimeUnit.SECONDS);
}
return product;
}
}
这个代码基本也对redis缓存的常见问题做了处理,这个有必要说明一下
1.缓存数据库不一致,指更新了数据库,缓存数据可能还是老数据
????????通过更新时删除和redisson读写锁来保证,但是没办法100% 还可以使用zookeeper来做,不过redis效率相对更高,CP AP总要牺牲一个
2.缓存击穿(缓存失效) 大量数据失效,击穿缓存,打到数据库
? ? ? ? 使用过期时间加随机数避免大量数据同时失效
3.缓存重建 缓存还没有数据,海量请求同时进入,每个人都在查询并保存缓存
? ? ? ? 加锁只允许一个请求重建缓存 还可以提前往缓存插入数据(比如秒杀)
4.缓存穿透?击穿和穿透的区别就是穿透是连数据库也没有数据(黑客攻击)不停请求数据库
? ? ? ? 可以将查不到的数据也放到redis,也可以使用布隆过滤器
5.缓存雪崩?和缓存击穿类似的,区别是击穿一般指一条数据高并发请求,雪崩是不同的数据大批量过期
? ? ? ? 也可通过随机时间,热点数据不过期,或者分布到不同服务器等方案解决
总结
关于redis实现分布式锁的实例就到这儿了,工作中使用可能没有必要那么全,程序都是要去适应业务的,业务量达不到,像加锁或者其他操作这些都没有必要,反而影响性能,根据实际业务去完成适合的程序才是一个优秀的程序员(毕竟要考虑成本)。你写的很多,用牛刀去杀鸡,后面基础弱的又维护不了,完全没有必要的结果
|