相关链接: Redis Key-Value数据库【初级】:https://blog.csdn.net/qq_41822345/article/details/125527045 Redis Key-Value数据库【高级】:https://blog.csdn.net/qq_41822345/article/details/125568007 Redis Key-Value数据库【实战】:https://blog.csdn.net/qq_41822345/article/details/125568012
一、手机验证码【简单】
需求:
1、输入手机号,点击发送后随机生成6位数字码,2分钟有效。 2、输入验证码,点击验证,返回成功或失败。 3、每个手机号每天只能输入3次。
import redis.clients.jedis.Jedis;
import java.util.Random;
public class PhoneCode {
public static void main(String[] args) {
String code = sendCode("18896725688");
System.out.println("获取验证码:" + code);
verifyCode("18896725688", code);
}
public static String sendCode(String phone) {
Jedis jedis = new Jedis("192.168.168.101", 6379);
String countKey = "VerifyCode" + phone + ":count";
String codeKey = "VerifyCode" + phone + ":code";
String count = jedis.get(countKey);
if (count == null) {
jedis.setex(countKey, 24 * 60 * 60, "1");
} else if (Integer.parseInt(count) <= 2) {
jedis.incr(countKey);
} else if (Integer.parseInt(count) > 2) {
System.out.println("今天发送次数已经超过三次");
jedis.close();
}
String vCode = getCode();
jedis.setex(codeKey, 120, vCode);
jedis.close();
return vCode;
}
public static void verifyCode(String phone, String code) {
Jedis jedis = new Jedis("192.168.168.101", 6379);
String codeKey = "VerifyCode" + phone + ":code";
String redisCode = jedis.get(codeKey);
if (redisCode.equals(code)) {
System.out.println("成功");
} else {
System.out.println("失败");
}
jedis.close();
}
public static String getCode() {
Random random = new Random();
String code = "";
for (int i = 0; i < 6; i++) {
int rand = random.nextInt(10);
code += rand;
}
return code;
}
}
二、Redis-事务-锁机制【原理】
1、redis事务
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。组队的过程中可以通过discard来放弃组队。
- Redis事务命令:multi、exec、discard
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
3) OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set m1 n1
QUEUED
127.0.0.1:6379(TX)> set m2
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379(TX)> set m3 n3
QUEUED
127.0.0.1:6379(TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set p1 q1
QUEUED
127.0.0.1:6379(TX)> incr p1
QUEUED
127.0.0.1:6379(TX)> set p2 q2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
127.0.0.1:6379>
在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch用来取消 WATCH 命令对所有 key 的监视。如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH。
2、redis事务锁
如果多个事务同时操作一个资源,可能会出现并发不安全甚至错误的现象。比如:有金额10000,同一时间内容共有三个请求:一个请求想给金额减8000;一个请求想给金额减5000;一个请求想给金额减1000。
所以需要有锁来保证并发安全。
悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。如下:
乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等【CMS】机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set 机制实现事务的。如下:
3、redis事务三特性
- 单独的隔离操作
事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。 - 没有隔离级别的概念
队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。 - 不保证原子性
事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。
三、Redis秒杀案例【实操】
本案例代码位于:https://download.csdn.net/download/qq_41822345/85885803
案例:某商品有库存n个,规定秒杀活动中,每个用户最多能购买1个。
秒杀成功结果:商品库存减少个数=秒杀成功的用户个数
1、ab测试
使用工具ab模拟高并发请求测试,安转命令如下:
yum install httpd-tools
[root@k8s101 myredis]
/root/myredis
[root@k8s101 myredis]
prodid=0101&
[root@k8s101 myredis]
import java.io.IOException;
import redis.clients.jedis.Jedis;
public class SecKill_base {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.168.101", 6379);
System.out.println(jedis.ping());
jedis.close();
}
public static boolean doSecKill(String uid, String prodid) throws IOException {
if (uid == null || prodid == null) {
return false;
}
Jedis jedis = new Jedis("192.168.168.101", 6379);
String kcKey = "sk:" + prodid + ":qt";
String userKey = "sk:" + prodid + ":user";
String kc = jedis.get(kcKey);
if (kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
if (jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
if (Integer.parseInt(kc) <= 0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
jedis.decr(kcKey);
jedis.sadd(userKey, uid);
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
}
2、超卖问题—乐观锁
import java.io.IOException;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
public class SecKill_redis {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.168.101", 6379);
System.out.println(jedis.ping());
jedis.close();
}
public static boolean doSecKill(String uid, String prodid) throws IOException {
if (uid == null || prodid == null) {
return false;
}
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
String kcKey = "sk:" + prodid + ":qt";
String userKey = "sk:" + prodid + ":user";
jedis.watch(kcKey);
String kc = jedis.get(kcKey);
if (kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
if (jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
if (Integer.parseInt(kc) <= 0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
Transaction multi = jedis.multi();
multi.decr(kcKey);
multi.sadd(userKey, uid);
List<Object> results = multi.exec();
if (results == null || results.size() == 0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
}
3、高并发导致连接超时问题—连接池
通过创建数据库连接池,解决连接超时问题。连接池配置如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisPoolUtil {
private static volatile JedisPool jedisPool = null;
private JedisPoolUtil() {
}
public static JedisPool getJedisPoolInstance() {
if (null == jedisPool) {
synchronized (JedisPoolUtil.class) {
if (null == jedisPool) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(100*1000);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(poolConfig, "192.168.168.101", 6379, 60000 );
}
}
}
return jedisPool;
}
}
4、高并发且乐观锁导致库存遗留问题—LUA脚本
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。
很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。
redis使用lua的优势:
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
redis 在2.6版本以后,通过lua脚本解决争抢问题,实际上是 redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
- 代码3——LUA脚本:利用lua脚本淘汰用户,解决超卖问题。
import java.io.IOException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class SecKill_redisByScript {
public static void main(String[] args) {
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedispool.getResource();
System.out.println(jedis.ping());
}
static String secKillScript = "local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1";
public static boolean doSecKill(String uid, String prodid) throws IOException {
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedispool.getResource();
String sha1 = jedis.scriptLoad(secKillScript);
Object result = jedis.evalsha(sha1, 2, uid, prodid);
String reString = String.valueOf(result);
if ("0".equals(reString)) {
System.err.println("已抢空!!");
} else if ("1".equals(reString)) {
System.out.println("抢购成功!!!!");
} else if ("2".equals(reString)) {
System.err.println("该用户已抢过!!");
} else {
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}
四、Redis应用问题解决
1、缓存穿透
访问原来就不存在的数据。 key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
1.1、缓存穿透解决方案
一个一定不存在缓存或查询不到的数据,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
解决方案:
(1) 对空值缓存: 如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
(2) 设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
(4) **进行实时监控:**当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。
2、缓存击穿
访问某个原本存在缓存中的数据缓存过期。key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候高并发的请求可能会瞬间把后端DB压垮。
2.1、缓存击穿解决方案
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
解决方案:
(1)预先设置热门数据: 在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长。
(2)实时调整: 现场监控哪些数据热门,实时调整key的过期时长。
(3)使用锁:
a、就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
b、先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key。
c、当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key。
d、当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。
3、缓存雪崩
访问批量存在缓存中的数据缓存过期【同时过期】。key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
3.1、缓存雪崩解决方案
缓存失效时的雪崩效应对底层系统的冲击非常可怕!
解决方案:
(1) 构建多级缓存架构: nginx缓存 + redis缓存 +其他缓存(ehcache等)
(2) 使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况。
(3) 设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
(4) 将缓存失效时间分散开:
比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
4、分布式锁【核心】
本案例代码位于:https://download.csdn.net/download/qq_41822345/85885803
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
-
基于数据库实现分布式锁 -
基于缓存(Redis等) -
基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
-
性能:redis最高 -
可靠性:zookeeper最高
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 加锁和解锁必须具有原子性。
redis命令:
set key value [EX seconds|PX milliseconds|EXAT timestamp|PXAT milliseconds-timestamp|KEEPTTL] [NX|XX] [GET]
set sku:1:info "OK" NX PX 10000
4.1、代码实现1——基本实现分布式锁功能
- 多个客户端同时获取锁(setnx)
- 获取成功,执行业务逻辑{从db获取数据,放入缓存},执行完成释放锁(del)
- 其他客户端等待重试
@GetMapping("testLock")
public void testLock(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if(lock){
Object value = redisTemplate.opsForValue().get("num");
if(StringUtils.isEmpty(value)){
return;
}
int num = Integer.parseInt(value+"");
redisTemplate.opsForValue().set("num", ++num);
redisTemplate.delete("lock");
}else{
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ab模拟高并发:
127.0.0.1:6379> set num 0
OK
127.0.0.1:6379> get num
"0"
[root@k8s101 ~]
127.0.0.1:6379> get num
"1000"
4.2、代码实现2——优化锁过期时间
@GetMapping("testLock")
public void testLock(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","1111",3, TimeUnit.SECONDS);
if(lock){
Object value = redisTemplate.opsForValue().get("num");
if(StringUtils.isEmpty(value)){
return;
}
int num = Integer.parseInt(value+"");
redisTemplate.opsForValue().set("num", ++num);
String lockUuid = (String)redisTemplate.opsForValue().get("lock");
redisTemplate.delete("lock");
}else{
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.3、代码实现3——优化防UUID误删
@GetMapping("testLock")
public void testLock() {
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
if (lock) {
Object value = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(value)) {
return;
}
int num = Integer.parseInt(value + "");
redisTemplate.opsForValue().set("num", ++num);
String lockUuid = (String) redisTemplate.opsForValue().get("lock");
if (lockUuid.equals(uuid)) {
redisTemplate.delete("lock");
}
} else {
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.4、代码实现4——优化使用LUA脚本保证原子性
@GetMapping("testLockLua")
public void testLockLua() {
String uuid = UUID.randomUUID().toString();
String skuId = "25";
String locKey = "lock:" + skuId;
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
if (lock) {
Object value = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(value)) {
return;
}
int num = Integer.parseInt(value + "");
redisTemplate.opsForValue().set("num", String.valueOf(++num));
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
try {
Thread.sleep(1000);
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|