Redis应用场景举例
缓存
缓存现在几乎是所有中大型网站都在用的必杀技,合理的利用缓存不仅能够提升网站访问速度,还能大大降低数据库的压力。
作为Key-Value形态的内存数据库,Redis 最先会被想到的应用场景便是作为数据缓存。而使用 Redis 缓存数据非常简单,只需要通过string类型将序列化后的对象存起来即可,不过也有一些需要注意的地方:
Redis还提供了键过期功能,也提供了灵活的键淘汰策略,所以,现在Redis用在缓存的场合非常多。
百万key模糊查找
在百万数量keys的Redis中,如何模糊的查找出某个key?
模拟一下场景:
package com.soberw.redis_quickstart.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class ScanRedis {
@Autowired
StringRedisTemplate redisTemplate;
public void test() {
for (int i = 0; i < 100; i++) {
redisTemplate.opsForValue().set("a" + i, "1");
}
for (int i = 0; i < 100; i++) {
redisTemplate.opsForValue().set("b" + i, "1");
}
ScanOptions build = ScanOptions.scanOptions().match("b15").count(10).build();
Cursor<String> scan = redisTemplate.scan(build);
while (scan.hasNext()) {
System.out.println(scan.next());
}
scan.close();
}
}
我们可以在redis中开启监视:
执行LUA脚本
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。https://www.w3cschool.cn/lua/
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。
还是上面的查找key值案例,我们使用lua脚本执行一遍:
package com.soberw.redis_quickstart.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
@Component
public class Lua {
@Autowired(required = false)
StringRedisTemplate redisTemplate;
public void test() {
String cursor = "0";
String count = "50";
for (int i = 0; i < 50; i++) {
redisTemplate.opsForValue().set("c" + i, "1");
}
String lua = "return redis.call('scan',KEYS[1],'MATCH',ARGV[1],'count',ARGV[2])";
RedisScript<List> script = RedisScript.of(lua, List.class);
RedisSerializer redisSerializer = redisTemplate.getStringSerializer();
do {
List list = redisTemplate.execute(script, redisSerializer, redisSerializer, Collections.singletonList(cursor), "c15*", count);
cursor = (String) list.get(0);
System.out.println("当前游标所在位置:" + cursor);
System.out.println(">>>>>>" + list.get(1));
} while (!"0".equals(cursor));
}
}
分布式锁机制实现
可以借助Redis的一个指令 setnx 解决:
> EXISTS job
(integer) 0
> SETNX job "chengxuyuan"
(integer) 1
> SETNX job "paotui"
(integer) 0
> GET job
"chengxuyuan"
模拟多线程并发获取分布式锁SetNX
package com.soberw.redis_quickstart.redis;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class SetNX {
@Resource(name = "redisTemplate")
ValueOperations<String, String> valueOperations;
final String key = "product#001";
public void test() throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
Boolean ifAbsent = valueOperations.setIfAbsent(key, "2022-05-30-15-43");
if (ifAbsent) {
System.out.printf("线程{%s} 获取到分布式锁成功\n", Thread.currentThread().getName());
} else {
System.out.printf("线程{%s} 获取到分布式锁失败\n", Thread.currentThread().getName());
}
});
Thread.sleep(2 * 1000);
}
}
}
BitMap
主要使用到的是Redis的一种数据类型 BitMap
相关指令:
-
setbit key offest value 其中key 即为我们想要设置的值,offest 是偏移量,value 是要设置的具体的值,包括 0 和 1 -
getbit key offest 对 key 所储存的字符串值,获取指定偏移量上的位(bit)。 当 offset 比字符串值的长度大,或者 key 不存在时,返回 0 。 -
**bitcount ** key [start] [end] 计算给定字符串中,被设置为 1 的比特位的数量。 通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。 -
bitop operation destkey key [key…] 对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。 operation 可以是 AND 、 OR 、 NOT 、 XOR 这四种操作中的任意一种:
BITOP AND destkey key [key ...] ,对一个或多个 key 求逻辑并,并将结果保存到 destkey 。BITOP OR destkey key [key ...] ,对一个或多个 key 求逻辑或,并将结果保存到 destkey 。BITOP XOR destkey key [key ...] ,对一个或多个 key 求逻辑异或,并将结果保存到 destkey 。BITOP NOT destkey key ,对给定 key 求逻辑非,并将结果保存到 destkey 。 除了 NOT 操作之外,其他操作都可以接受一个或多个 key 作为输入。
BitMa对于一些特定情景的计算非常有效。 假设现在我们希望记录自己网站上的用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。 比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。 举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。 当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。 再例如,如果想要统计网站的在线用户中,使用iPhone手机的用户数量,则可以借助于BITOP 命令,将统计手机型号的key与统计在线用户的key进行与 操作,就可以得到数量了。 那么,使用BitMap的内存占用如何? 因为在BitMap中,任何key 的value 值都只有两种情况 : 0 和 1 ,即一个比特位(bit)。 例如上面的统计在线人数的例子中,如何使用set 类型,同样可以实现,但是因为存储机制的不同,一个用户需要32bit,如果统计的是 10 亿用户,使用set需要占用 4GB的空间。 而使用BitMap,因为一个值只占用 1 bit,因此优化了 32 倍!即内存占用为原有的4GB/32 = 125MB左右!!!
下面使用BitMap模拟在线统计: @Component
public class BitMapDemo {
@Resource(name = "redisTemplate")
private ValueOperations valueOperations;
private static final String key = "product#01";
@Autowired
private StringRedisTemplate redisTemplate;
private static final int user01 = 1;
private static final int user02 = 2;
private static final int user03 = 3;
private static String key20220601 = "20220601";
private static String key20220602 = "20220602";
private static String key20220603 = "20220603";
private static String saveKey = "20220601#20220602#20220603";
public void test() {
valueOperations.setBit(key20220601, user01, true);
valueOperations.setBit(key20220601, user02, true);
valueOperations.setBit(key20220602, user02, true);
valueOperations.setBit(key20220603, user01, true);
valueOperations.setBit(key20220603, user03, true);
RedisCallback<Long> callback = connection -> {
return connection.bitOp(RedisStringCommands.BitOperation.AND, saveKey.getBytes(),
key20220601.getBytes(), key20220602.getBytes(), key20220603.getBytes());
};
Long value = redisTemplate.execute(callback);
RedisCallback<Long> callback2 = connection -> {
return connection.bitCount(saveKey.getBytes());
};
Long value2 = redisTemplate.execute(callback2);
System.out.println(value2);
}
}
SET和ZSET
set因为其不重复且可以比较计算的特性,在处理好友的交际功能时有奇效
指令 | 说明 |
---|
SDIFF key1, key2 | 求两个key对应的set的差集(不包括右边的) | SINTER key1, key2 | 求两个key对应的set的交集 | SUNION key1, key2 | 求两个key对应的set的并集 | SDIFFSTORE dest key [key…] | 求两个key对应的set的差集(不包括右边的),并将结果存放在dest中 | SINTERSTORE dest key [key…] | 求两个key对应的set的交集,并将结果存放在dest中 | SUNIONSTORE dest key [key…] | 求两个key对应的set的并集,并将结果存放在dest中 |
通过这些命令,可以实现好友交际的功能。
例如我们在抖音等平台经常看到系统给我们推荐,你可能认识,或者你们有几个共同好友等功能,实际上就是通过这一原理实现。
再者ZSET类型,因为可以在SET的基础上另外再设置一个score值,即权重值,再加之ZSET本身的有序性,可以实现很多场景,例如排行榜、热搜、今日头条、推荐等…
SET举例:
@Component
public class SetRedis {
private final String key1 = "stu#01";
private final String key2 = "stu#02";
@Autowired(required = false)
StringRedisTemplate redisTemplate;
public void test(){
SetOperations<String, String> set = redisTemplate.opsForSet();
set.add(key1,"a");
set.add(key1,"b");
set.add(key1,"c");
set.add(key2,"b");
set.add(key2,"c");
set.add(key2,"d");
System.out.println("key1 = " + set.members(key1));
System.out.println("key2 = " + set.members(key2));
System.out.println("set.difference(key1,key2) = " + set.difference(key1, key2));
System.out.println("set.intersect(key1,key2) = " + set.intersect(key1, key2));
System.out.println("set.union(key1,key2) = " + set.union(key1, key2));
}
}
手机验证码案例
需求
1、输入手机号,点击发送后随机生成6位数字码,2分钟有效
2、输入验证码,点击验证,返回成功或失败
3、每个手机号每天只能发送3次验证码
分析
实现验证码功能,大致步骤如下:
- 用户发送手机号
- 后台接收手机号,调用redis查询此手机号是否今天已经发送过了,设置 user:手机号:count 值以记录次数,过期时间截止到当天00.00
- 如果还有次数,生成验证码并保存在redis里 user:手机号:code ,同时次数加 1
- 如果是第一次发送,则同时保存 计数以及验证码 ,并且次数赋值为 1
- 验证的时候,只需要取出 redis里保存的 user:手机号:code 进行比对即可
实现
这里就简单模拟一下,前台简单实现一下:
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8" %>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
<script src="static/jquery/jquery-3.1.0.js"></script>
<link href="static/bs/css/bootstrap.min.css" rel="stylesheet"/>
<script src="static/bs/js/bootstrap.min.js"></script>
</head>
<body>
<div class="container">
<div class="row">
<div id="alertdiv" class="col-md-12">
<form class="navbar-form navbar-left" role="search" id="codeform">
<div class="form-group">
<input type="text" class="form-control" placeholder="填写手机号" name="phone_no">
<button type="button" class="btn btn-default" id="sendCode">发送验证码</button>
<br>
<font id="countdown" color="red"></font>
<br>
<input type="text" class="form-control" placeholder="填写验证码" name="verify_code">
<button type="button" class="btn btn-default" id="verifyCode">确定</button>
<font id="result" color="green"></font><font id="error" color="red"></font>
</div>
</form>
</div>
</div>
</div>
</body>
<script type="text/javascript">
var t = 120;
var interval;
function refer() {
$("#countdown").text("请于" + t + "秒内填写验证码 ");
t--;
if (t <= 0) {
clearInterval(interval);
$("#countdown").text("验证码已失效,请重新发送! ");
}
}
$(function () {
$("#sendCode").click(function () {
$.post("SendCodeServlet", $("#codeform").serialize(), function (data) {
if (data == "true") {
t = 120;
clearInterval(interval);
interval = setInterval("refer()", 1000);
} else if (data == "limit") {
clearInterval(interval);
$("#countdown").text("单日发送超过次数! ")
}
});
});
$("#verifyCode").click(function () {
$.post("CheckCodeServlet", $("#codeform").serialize(), function (data) {
if (data == "true") {
$("#result").attr("color", "green");
$("#result").text("验证成功");
clearInterval(interval);
$("#countdown").text("")
} else {
$("#result").attr("color", "red");
$("#result").text("验证失败");
}
});
});
});
</script>
</html>
SendCodeServlet:
@WebServlet("/SendCodeServlet")
public class SendCodeServlet extends HttpServlet {
Jedis jedis = new Jedis("192.168.6.200", 6379);
CodeService codeService = new CodeServiceImpl();
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String phoneNo = request.getParameter("phone_no");
boolean flag = false;
if (phoneNo != null && !"".equals(phoneNo)) {
flag = codeService.getAndSetCodeCount(jedis, phoneNo);
}
String code = getCode(6);
String rs = null;
if (flag) {
rs = codeService.setCode(jedis, phoneNo, code);
if ("OK".equals(rs)) {
System.out.println("向" + phoneNo + "发送了验证码为:" + code);
response.getWriter().write("true");
}
} else {
response.getWriter().write("limit");
}
jedis.close();
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doPost(request, response);
}
private String getCode(int len) {
String code = "";
for (int i = 0; i < len; i++) {
int rand = new Random().nextInt(10);
code += rand;
}
return code;
}
}
CheckCodeServlet:
@WebServlet("/CheckCodeServlet")
public class CheckCodeServlet extends HttpServlet {
Jedis jedis = new Jedis("192.168.6.200", 6379);
CodeService codeService = new CodeServiceImpl();
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String phoneNo = request.getParameter("phone_no");
String verifyCode = request.getParameter("verify_code");
if (null == verifyCode || "".equals(verifyCode) || null == phoneNo || "".equals(phoneNo)) {
return;
}
String code = codeService.getCode(jedis, phoneNo);
if (null != verifyCode && verifyCode.equals(code)) {
response.getWriter().write("true");
codeService.delCode(jedis, phoneNo);
} else {
response.getWriter().write("false");
}
jedis.close();
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doPost(request, response);
}
}
CodeService:
package com.atguigu.redis.service;
import redis.clients.jedis.Jedis;
public interface CodeService {
boolean getAndSetCodeCount(Jedis jedis, String phone);
String setCode(Jedis jedis, String phone, String verifyCode);
String getCode(Jedis jedis, String phone);
void delCode(Jedis jedis,String phone);
}
CodeServiceImpl:
package com.atguigu.redis.service;
import redis.clients.jedis.Jedis;
import java.time.Duration;
import java.time.LocalTime;
public class CodeServiceImpl implements CodeService {
private final Integer MAX_VERIFY_COUNT = 3;
private long getTheLeftSeconds() {
LocalTime now = LocalTime.now();
LocalTime end = LocalTime.of(23, 59, 59);
long millis = Duration.between(now, end).toMillis() / 1000;
return millis;
}
@Override
public boolean getAndSetCodeCount(Jedis jedis, String phone) {
String phoneNo = "user:" + phone + ":count";
String count = jedis.get(phoneNo);
if (count != null && !"".equals(count)) {
int i = Integer.parseInt(count);
if (i < MAX_VERIFY_COUNT) {
jedis.incr(phoneNo);
return true;
} else {
return false;
}
}
jedis.setex(phoneNo, (int) getTheLeftSeconds(), "1");
return true;
}
@Override
public String setCode(Jedis jedis, String phone, String verifyCode) {
String phoneNo = "user:" + phone + ":code";
return jedis.setex(phoneNo, 120, verifyCode);
}
@Override
public String getCode(Jedis jedis, String phone) {
String phoneNo = "user:" + phone + ":code";
return jedis.get(phoneNo);
}
@Override
public void delCode(Jedis jedis, String phone) {
String phoneNo = "user:" + phone + ":code";
jedis.del(phoneNo);
}
}
测试一下:
当发送超过三次时:
秒杀案例
需求
- 商品在库存中的数量确定
- 当用户抢到商品后,对应的商品库存数量够跟着变化
- 同一时间多名用户抢购秒杀,确保库存和用户数据不出错
分析
使用Redis处理计数器和人员记录的事务操作
其实业务逻辑很简单,就是用户点击抢购后,后台向Redis发送给查询库存,判断库存是否还有余量,如果有则下单成功,库存数量-1,同时成功名单+1。
简单的实现如下:
public static boolean doSecKill(String uid,String prodid) throws IOException {
if(null ==uid || "".equals(uid)|| null == prodid || "".equals(prodid)){
return false;
}
String kc ="sk:"+prodid+":qt";
String userskey ="sk:"+prodid+":usr";
Jedis jedis=new Jedis("192.168.6.101",6379);
String kcCount = jedis.get(kc);
if(Integer.parseInt(kcCount)<=0){
System.out.println("秒杀结束");
return false;
}
jedis.decr(kc);
jedis.sadd(userskey,uid);
System.out.println("秒杀成功");
Jedis.close();
return true;
}
但是当同一时间并发量过大的时候,要保证这一过程不出错,也是不容易的。
其实造成这的根本原因还是因为没有保证操作的原子性
我们知道,Redis的指令操作线程worker是单线程执行的,即同一时间只会有一条指令被执行,这就保证了指令执行的原子性。
但是Redis严格来说并不是单线程的运行模式,其控制访问的IO线程是多线程。
因为秒杀的操作是一系列的,查看库存–>修改库存–>保存用户,而非单一指令。
因此必然会出错。
测试
并发模拟器
因为同一时间很难找到那么多的人帮我们点击模拟,因此这里可以借助于一个并发测试模拟工具ab模拟测试:
需要在Linux环境下安装:
CentOS6 默认安装
CentOS7需要手动安装
yum install httpd-tools
(1) 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)
(2) 顺序安装
1 apr-1.4.8-3.el7.x86_64.rpm
2 apr-util-1.5.2-6.el7.x86_64.rpm
3 httpd-tools-2.4.6-67.el7.centos.x86_64.rpm
并发测试实现
vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。文件内容:prodid=0101&
ab -n 总访问数 -c 并发量 -k -p ~/postfile -T application/x-www-form-urlencoded http://主机Ip地址:端口号/访问路径
实测指令:
ab -n 1000 -c 100 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.8.110:8080/redis_seckill_war_exploded/doseckill
超卖问题
- 超卖产生的原因: 没有事务控制,一个用户在秒杀商品时,被其他用户打断
解决方案
方案一:乐观锁
jedis.watch(qtkey);
String qtkeystr = jedis.get(qtkey);
if(qtkeystr==null || "".equals(qtkeystr.trim())) {
System.out.println("未初始化库存");
jedis.close();
return false ;
}
int qt = Integer.parseInt(qtkeystr);
if(qt<=0) {
System.err.println("已经秒光");
jedis.close();
return false;
}
Transaction multi = jedis.multi();
multi.decr(qtkey);
multi.sadd(usrkey, uid);
List<Object> list = multi.exec();
if(list==null || list.size()==0) {
System.out.println("秒杀失败");
jedis.close();
return false;
}
System.err.println("秒杀成功");
jedis.close();
超卖问题解决
但是此种方式存在着问题:
方案二:连接池
public static boolean doSecKill(String uid,String prodid) throws IOException {
if(null ==uid || "".equals(uid)|| null == prodid || "".equals(prodid)){
return false;
}
String kc ="sk:"+prodid+":qt";
String userskey ="sk:"+prodid+":usr";
JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis=jedisPool.getResource();
jedis.watch(kc);
String kccount = jedis.get(kc);
if(Integer.parseInt(kccount)<=0){
System.out.println("秒杀结束");
JedisPoolUtil.release(jedisPool,jedis);
return false;
}
Transaction multi = jedis.multi();
multi.decr(kc);
multi.sadd(userskey,uid);
List<Object> exec = multi.exec();
if(null == exec || exec.size()==0){
System.out.println("秒杀失败");
JedisPoolUtil.release(jedisPool,jedis);
return false;
}
System.out.println("秒杀成功");
JedisPoolUtil.release(jedisPool,jedis);
return true;
}
方案三:LUA脚本
因为Redis是原子性的,同一时刻只会执行一条指令,因此,我们可以将这一系列操作变成一条指令,交由Redis去处理,这样就可以解决库存问题,如何将一系列操作变成一条指令呢?
可以借助于LUA脚本:
利用lua脚本淘汰用户,解决超卖问题。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
LUA脚本如下:
local userid=KEYS[1];
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid..":usr';
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then
return 0;
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
return 1;
代码实现:
public class SecKill_redisByScript {
static String secKillScript ="local userid=KEYS[1];\r\n" +
"local prodid=KEYS[2];\r\n" +
"local qtkey='sk:'..prodid..\":qt\";\r\n" +
"local usersKey='sk:'..prodid..\":usr\";\r\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
"if tonumber(userExists)==1 then \r\n" +
" return 2;\r\n" +
"end\r\n" +
"local num= redis.call(\"get\" ,qtkey);\r\n" +
"if tonumber(num)<=0 then \r\n" +
" return 0;\r\n" +
"else \r\n" +
" redis.call(\"decr\",qtkey);\r\n" +
" redis.call(\"sadd\",usersKey,userid);\r\n" +
"end\r\n" +
"return 1" ;
public static boolean doSecKill(String uid,String prodid) throws IOException {
JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis=jedispool.getResource();
String sha1= jedis.scriptLoad(secKillScript);
Object result= jedis.evalsha(sha1, 2, uid,prodid);
String reString=String.valueOf(result);
if ("0".equals( reString ) ) {
System.err.println("已抢空!!");
}else if("1".equals( reString ) ) {
System.out.println("抢购成功!!!!");
}else if("2".equals( reString ) ) {
System.err.println("该用户已抢过!!");
}else{
System.err.println("抢购异常!!");
}
jedis.close();
return true;
}
}
|