IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis应用场景举例 -> 正文阅读

[大数据]Redis应用场景举例

Redis应用场景举例

缓存

缓存现在几乎是所有中大型网站都在用的必杀技,合理的利用缓存不仅能够提升网站访问速度,还能大大降低数据库的压力。

作为Key-Value形态的内存数据库,Redis 最先会被想到的应用场景便是作为数据缓存。而使用 Redis 缓存数据非常简单,只需要通过string类型将序列化后的对象存起来即可,不过也有一些需要注意的地方:

  • 必须保证不同对象的 key 不会重复,并且使 key 尽量短,一般使用类名(表名)加主键拼接而成。

  • 选择一个优秀的序列化方式也很重要,目的是提高序列化的效率和减少内存占用。

  • 缓存内容与数据库的一致性,这里一般有两种做法:

    • 只在数据库查询后将对象放入缓存,如果对象发生了修改或删除操作,直接清除对应缓存(或设为过期)。
    • 在数据库新增和查询后将对象放入缓存,修改后更新缓存,删除后清除对应缓存(或设为过期)。

Redis还提供了键过期功能,也提供了灵活的键淘汰策略,所以,现在Redis用在缓存的场合非常多。

百万key模糊查找

在百万数量keys的Redis中,如何模糊的查找出某个key?

  • 通过 keys 指令,通过设置匹配模式,精准找出想要的key(不推荐,会把所有的key都扫描一遍,效率极低)

    image-20220528173758560

  • 通过 scan 指令,设置增量(默认为10),分块迭代查找,每次返回当前检索到的索引位置,下次检索的时候,只需要将索引位置设置为上次返回的即可,当索引返回为0时,则说明全部迭代一遍,如果此时还没有,则说明Redis里面没有,因此 最大迭代次数 = 总key数量 / 增量

    image-20220528174114266

模拟一下场景:

package com.soberw.redis_quickstart.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * @author soberw
 * @Classname ScanRedis
 * @Description
 * @Date 2022-05-30 10:39
 */
@Component
public class ScanRedis {

    @Autowired
    StringRedisTemplate redisTemplate;

    public void test() {

        for (int i = 0; i < 100; i++) {
            redisTemplate.opsForValue().set("a" + i, "1");
        }
        for (int i = 0; i < 100; i++) {
            redisTemplate.opsForValue().set("b" + i, "1");
        }

        ScanOptions build = ScanOptions.scanOptions().match("b15").count(10).build();
        Cursor<String> scan = redisTemplate.scan(build);

        while (scan.hasNext()) {
            System.out.println(scan.next());
        }
        scan.close();
    }
}

我们可以在redis中开启监视:

image-20220530113458329

image-20220530113527591

执行LUA脚本

image-20220528172406575

Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。https://www.w3cschool.cn/lua/

  • LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

还是上面的查找key值案例,我们使用lua脚本执行一遍:

package com.soberw.redis_quickstart.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;

/**
 * @author soberw
 * @Classname Lua
 * @Description
 * @Date 2022-05-30 10:57
 */
@Component
public class Lua {

    @Autowired(required = false)
    StringRedisTemplate redisTemplate;

    public void test() {
        //当前游标位置,默认从0开始
        String cursor = "0";
        //每次查找的次数
        String count = "50";

        for (int i = 0; i < 50; i++) {
            redisTemplate.opsForValue().set("c" + i, "1");
        }
        //lua脚本中,参数的索引是从 1 开始的
        String lua = "return redis.call('scan',KEYS[1],'MATCH',ARGV[1],'count',ARGV[2])";
        RedisScript<List> script = RedisScript.of(lua, List.class);

        RedisSerializer redisSerializer = redisTemplate.getStringSerializer();
        do {
            List list = redisTemplate.execute(script, redisSerializer, redisSerializer, Collections.singletonList(cursor), "c15*", count);
            cursor = (String) list.get(0);
            System.out.println("当前游标所在位置:" + cursor);
            System.out.println(">>>>>>" + list.get(1));
        } while (!"0".equals(cursor));
    }
}

image-20220530114927076

image-20220530114943645

分布式锁机制实现

  • 什么是分布式锁?
    分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性。

  • 分布式锁需要具备哪些条件?

    • 互斥性:在任意一个时刻,只有一个客户端持有锁。
    • 无死锁:即便持有锁的客户端崩溃或者其他意外事件,锁仍然可以被获取。
    • 容错:只要大部分Redis节点都活着,客户端就可以获取和释放锁

可以借助Redis的一个指令 setnx 解决:

image-20220528174517823

> EXISTS job                # job 不存在
(integer) 0

> SETNX job "chengxuyuan"    # job 设置成功
(integer) 1

> SETNX job "paotui"   # 尝试覆盖 job ,失败
(integer) 0

> GET job                   # 没有被覆盖
"chengxuyuan"

模拟多线程并发获取分布式锁SetNX

package com.soberw.redis_quickstart.redis;

import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author soberw
 * @Classname SetNX
 * @Description
 * @Date 2022-05-30 15:40
 */
@Component
public class SetNX {
    @Resource(name = "redisTemplate")
    ValueOperations<String, String> valueOperations;

    final String key = "product#001";

    public void test() throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                Boolean ifAbsent = valueOperations.setIfAbsent(key, "2022-05-30-15-43");
                if (ifAbsent) {
                    System.out.printf("线程{%s} 获取到分布式锁成功\n", Thread.currentThread().getName());
                } else {
                    System.out.printf("线程{%s} 获取到分布式锁失败\n", Thread.currentThread().getName());
                }
            });
            Thread.sleep(2 * 1000);
        }
    }

}

image-20220530155144584

BitMap

主要使用到的是Redis的一种数据类型 BitMap

相关指令:

  • setbit key offest value

    其中key即为我们想要设置的值,offest是偏移量,value是要设置的具体的值,包括 01

  • getbit key offest

    key 所储存的字符串值,获取指定偏移量上的位(bit)。

    offset 比字符串值的长度大,或者 key 不存在时,返回 0

  • **bitcount ** key [start] [end]

    计算给定字符串中,被设置为 1 的比特位的数量。

    通过指定额外的 startend 参数,可以让计数只在特定的位上进行。

  • bitop operation destkey key [key…]

    对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。

    operation 可以是 ANDORNOTXOR 这四种操作中的任意一种:

    • BITOP AND destkey key [key ...] ,对一个或多个 key 求逻辑并,并将结果保存到 destkey
    • BITOP OR destkey key [key ...] ,对一个或多个 key 求逻辑或,并将结果保存到 destkey
    • BITOP XOR destkey key [key ...] ,对一个或多个 key 求逻辑异或,并将结果保存到 destkey
    • BITOP NOT destkey key ,对给定 key 求逻辑非,并将结果保存到 destkey

    除了 NOT 操作之外,其他操作都可以接受一个或多个 key 作为输入。


    BitMa对于一些特定情景的计算非常有效。

    假设现在我们希望记录自己网站上的用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBITBITCOUNT 来实现。

    比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1

    举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。

    当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。

    再例如,如果想要统计网站的在线用户中,使用iPhone手机的用户数量,则可以借助于BITOP命令,将统计手机型号的key与统计在线用户的key进行操作,就可以得到数量了。

    那么,使用BitMap的内存占用如何?

    因为在BitMap中,任何keyvalue值都只有两种情况 : 0 1,即一个比特位(bit)。

    例如上面的统计在线人数的例子中,如何使用set类型,同样可以实现,但是因为存储机制的不同,一个用户需要32bit,如果统计的是 10 亿用户,使用set需要占用 4GB的空间。

    而使用BitMap,因为一个值只占用 1 bit,因此优化了 32 倍!即内存占用为原有的4GB/32 = 125MB左右!!!


    下面使用BitMap模拟在线统计:

    @Component
    public class BitMapDemo {
    
        @Resource(name = "redisTemplate")
        private ValueOperations valueOperations;
    
        private static final String key = "product#01";
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        private static final int user01 = 1;
        private static final int user02 = 2;
        private static final int user03 = 3;
    
        private static String key20220601 = "20220601";
        private static String key20220602 = "20220602";
        private static String key20220603 = "20220603";
        private static String saveKey = "20220601#20220602#20220603";
    
        public void test() {
    
            valueOperations.setBit(key20220601, user01, true);
            valueOperations.setBit(key20220601, user02, true);
    
    
            valueOperations.setBit(key20220602, user02, true);
    
    
            valueOperations.setBit(key20220603, user01, true);
            valueOperations.setBit(key20220603, user03, true);
    
    
            //1. 一直在线人数统计
            //2. 时间段活跃用户
            RedisCallback<Long> callback = connection -> {
                return connection.bitOp(RedisStringCommands.BitOperation.AND, saveKey.getBytes(),
    
                        key20220601.getBytes(), key20220602.getBytes(), key20220603.getBytes());
            };
            Long value = redisTemplate.execute(callback);
            RedisCallback<Long> callback2 = connection -> {
                return connection.bitCount(saveKey.getBytes());
            };
            Long value2 = redisTemplate.execute(callback2);
            System.out.println(value2);
        }
    
    }
    

SET和ZSET

set因为其不重复且可以比较计算的特性,在处理好友的交际功能时有奇效

指令说明
SDIFF key1, key2求两个key对应的set的差集(不包括右边的)
SINTER key1, key2求两个key对应的set的交集
SUNION key1, key2求两个key对应的set的并集
SDIFFSTORE dest key [key…]求两个key对应的set的差集(不包括右边的),并将结果存放在dest中
SINTERSTORE dest key [key…]求两个key对应的set的交集,并将结果存放在dest中
SUNIONSTORE dest key [key…]求两个key对应的set的并集,并将结果存放在dest中

通过这些命令,可以实现好友交际的功能。

例如我们在抖音等平台经常看到系统给我们推荐,你可能认识,或者你们有几个共同好友等功能,实际上就是通过这一原理实现。

再者ZSET类型,因为可以在SET的基础上另外再设置一个score值,即权重值,再加之ZSET本身的有序性,可以实现很多场景,例如排行榜、热搜、今日头条、推荐等…

SET举例:

@Component
public class SetRedis {

    private final String key1 = "stu#01";
    private final String key2 = "stu#02";


    @Autowired(required = false)
    StringRedisTemplate redisTemplate;

    public void test(){
        SetOperations<String, String> set = redisTemplate.opsForSet();
        set.add(key1,"a");
        set.add(key1,"b");
        set.add(key1,"c");

        set.add(key2,"b");
        set.add(key2,"c");
        set.add(key2,"d");

        System.out.println("key1 = " + set.members(key1));
        System.out.println("key2 = " + set.members(key2));

        System.out.println("set.difference(key1,key2) = " + set.difference(key1, key2));
        System.out.println("set.intersect(key1,key2) = " + set.intersect(key1, key2));
        System.out.println("set.union(key1,key2) = " + set.union(key1, key2));
    }
}

image-20220529232110967

手机验证码案例

需求

1、输入手机号,点击发送后随机生成6位数字码,2分钟有效

2、输入验证码,点击验证,返回成功或失败

3、每个手机号每天只能发送3次验证码

1637656260450

分析

实现验证码功能,大致步骤如下:

  1. 用户发送手机号
  2. 后台接收手机号,调用redis查询此手机号是否今天已经发送过了,设置 user:手机号:count 值以记录次数,过期时间截止到当天00.00
  3. 如果还有次数,生成验证码并保存在redis里 user:手机号:code ,同时次数加 1
  4. 如果是第一次发送,则同时保存 计数以及验证码 ,并且次数赋值为 1
  5. 验证的时候,只需要取出 redis里保存的 user:手机号:code 进行比对即可

实现

这里就简单模拟一下,前台简单实现一下:

<%@ page language="java" contentType="text/html; charset=UTF-8"
         pageEncoding="UTF-8" %>
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Insert title here</title>
    <script src="static/jquery/jquery-3.1.0.js"></script>
    <link href="static/bs/css/bootstrap.min.css" rel="stylesheet"/>
    <script src="static/bs/js/bootstrap.min.js"></script>

</head>
<body>
<div class="container">
    <div class="row">
        <div id="alertdiv" class="col-md-12">
            <form class="navbar-form navbar-left" role="search" id="codeform">
                <div class="form-group">
                    <input type="text" class="form-control" placeholder="填写手机号" name="phone_no">
                    <button type="button" class="btn btn-default" id="sendCode">发送验证码</button>
                    <br>
                    <font id="countdown" color="red"></font>
                    <br>
                    <input type="text" class="form-control" placeholder="填写验证码" name="verify_code">
                    <button type="button" class="btn btn-default" id="verifyCode">确定</button>
                    <font id="result" color="green"></font><font id="error" color="red"></font>
                </div>
            </form>
        </div>
    </div>
</div>

</body>
<script type="text/javascript">
    var t = 120;//设定倒计时的时间
    var interval;

    function refer() {
        $("#countdown").text("请于" + t + "秒内填写验证码 "); // 显示倒计时
        t--; // 计数器递减
        if (t <= 0) {
            clearInterval(interval);
            $("#countdown").text("验证码已失效,请重新发送! ");
        }
    }

    $(function () {
        $("#sendCode").click(function () {

            $.post("SendCodeServlet", $("#codeform").serialize(), function (data) {
                if (data == "true") {
                    t = 120;
                    clearInterval(interval);
                    interval = setInterval("refer()", 1000);//启动1秒定时
                } else if (data == "limit") {
                    clearInterval(interval);
                    $("#countdown").text("单日发送超过次数! ")
                }
            });
        });

        $("#verifyCode").click(function () {

            $.post("CheckCodeServlet", $("#codeform").serialize(), function (data) {
                if (data == "true") {
                    $("#result").attr("color", "green");
                    $("#result").text("验证成功");
                    clearInterval(interval);
                    $("#countdown").text("")
                } else {
                    $("#result").attr("color", "red");
                    $("#result").text("验证失败");
                }
            });
        });


    });
</script>
</html>

image-20220527132803236

SendCodeServlet:

/**
 * 处理发送验证码的Servlet
 *
 * @author soberw
 */
@WebServlet("/SendCodeServlet")
public class SendCodeServlet extends HttpServlet {

    Jedis jedis = new Jedis("192.168.6.200", 6379);

    CodeService codeService = new CodeServiceImpl();


    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        //获取填入的手机号
        String phoneNo = request.getParameter("phone_no");
        boolean flag = false;
        if (phoneNo != null && !"".equals(phoneNo)) {
            flag = codeService.getAndSetCodeCount(jedis, phoneNo);
        }
        String code = getCode(6);
        String rs = null;
        if (flag) {
            rs = codeService.setCode(jedis, phoneNo, code);
            if ("OK".equals(rs)) {
                System.out.println("向" + phoneNo + "发送了验证码为:" + code);
                response.getWriter().write("true");
            }
        } else {
            response.getWriter().write("limit");
        }
        jedis.close();
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doPost(request, response);
    }

    /**
     * 随机生成验证码的方法
     *
     * @param len
     * @return
     */
    private String getCode(int len) {
        String code = "";
        for (int i = 0; i < len; i++) {
            int rand = new Random().nextInt(10);
            code += rand;
        }
        return code;
    }
}

CheckCodeServlet:

/**
 * 处理验证验证码请求的Servlet
 *
 * @author soberw
 */
@WebServlet("/CheckCodeServlet")
public class CheckCodeServlet extends HttpServlet {

    Jedis jedis = new Jedis("192.168.6.200", 6379);

    CodeService codeService = new CodeServiceImpl();

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String phoneNo = request.getParameter("phone_no");
        String verifyCode = request.getParameter("verify_code");
        if (null == verifyCode || "".equals(verifyCode) || null == phoneNo || "".equals(phoneNo)) {
            return;
        }
        String code = codeService.getCode(jedis, phoneNo);
        if (null != verifyCode && verifyCode.equals(code)) {
            response.getWriter().write("true");
            // 删除验证码
            codeService.delCode(jedis, phoneNo);
        } else {
            response.getWriter().write("false");
        }
        // 释放资源
        jedis.close();
    }

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doPost(request, response);
    }
}

CodeService:

package com.atguigu.redis.service;

import redis.clients.jedis.Jedis;

/**
 * @author soberw
 * @Classname CodeService
 * @Description
 * @Date 2022-05-27 9:49
 */
public interface CodeService {

    /**
     * 判断当前手机号今天是否还有验证次数
     * 获取redis里面指定的手机号当天的验证次数,并根据次数返回验证结果
     *
     * @param jedis 数据源
     * @param phone 手机号
     * @return 如果次数>3 返回false ;如果小于,返回true;如果不存在此手机号的计数,设置初始值为1
     */
    boolean getAndSetCodeCount(Jedis jedis, String phone);

    /**
     * 给指定手机号设置验证码
     *
     * @param jedis      数据源
     * @param phone      手机号
     * @param verifyCode 验证码
     * @return
     */
    String setCode(Jedis jedis, String phone, String verifyCode);

    /**
     * 获取指定手机号在Redis里面的验证码
     *
     * @param jedis
     * @param phone
     * @return
     */
    String getCode(Jedis jedis, String phone);

    /**
     * 删除指定手机号的验证码
     * @param jedis
     * @param phone
     */
    void delCode(Jedis jedis,String phone);

}

CodeServiceImpl:

package com.atguigu.redis.service;

import redis.clients.jedis.Jedis;

import java.time.Duration;
import java.time.LocalTime;

/**
 * @author soberw
 * @Classname CodeServiceImpl
 * @Description
 * @Date 2022-05-27 9:50
 */
public class CodeServiceImpl implements CodeService {

    /**
     * 最大验证次数
     */
    private final Integer MAX_VERIFY_COUNT = 3;


    /**
     * 获取当天剩余秒数的方法
     *
     * @return
     */
    private long getTheLeftSeconds() {
        //获取现在的时间
        LocalTime now = LocalTime.now();
        //获取当日23点59分59秒的时间
        LocalTime end = LocalTime.of(23, 59, 59);
        //获取end与now相差的秒数
        long millis = Duration.between(now, end).toMillis() / 1000;
        return millis;
    }

    @Override
    public boolean getAndSetCodeCount(Jedis jedis, String phone) {
        String phoneNo = "user:" + phone + ":count";
        //获取次数
        String count = jedis.get(phoneNo);
        //如果存在并且不为空
        if (count != null && !"".equals(count)) {
            int i = Integer.parseInt(count);
            //判断这是第几次
            if (i < MAX_VERIFY_COUNT) {
                //说明还有次数
                //原有的次数 + 1
                jedis.incr(phoneNo);
                return true;
            } else {
                return false;
            }
        }
        // 计数器设置为1,时间为 时间至今晚24点
        jedis.setex(phoneNo, (int) getTheLeftSeconds(), "1");
        return true;
    }

    @Override
    public String setCode(Jedis jedis, String phone, String verifyCode) {
        String phoneNo = "user:" + phone + ":code";
        //存储并设置 120 秒有效
        return jedis.setex(phoneNo, 120, verifyCode);
    }

    @Override
    public String getCode(Jedis jedis, String phone) {
        String phoneNo = "user:" + phone + ":code";
        return jedis.get(phoneNo);
    }

    @Override
    public void delCode(Jedis jedis, String phone) {
        String phoneNo = "user:" + phone + ":code";
        jedis.del(phoneNo);
    }
}

测试一下:

image-20220527150132742

image-20220527150143546

image-20220527150217122

image-20220527150233729

当发送超过三次时:

image-20220527150331687

image-20220527150319311

秒杀案例

需求

  1. 商品在库存中的数量确定
  2. 当用户抢到商品后,对应的商品库存数量够跟着变化
  3. 同一时间多名用户抢购秒杀,确保库存和用户数据不出错

分析

使用Redis处理计数器和人员记录的事务操作

image-20220528163704610

其实业务逻辑很简单,就是用户点击抢购后,后台向Redis发送给查询库存,判断库存是否还有余量,如果有则下单成功,库存数量-1,同时成功名单+1。

简单的实现如下:

//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
   // 判空
   if(null ==uid || "".equals(uid)|| null == prodid || "".equals(prodid)){
      return false;
   }
   // 拼接key
   // 商品库存key
   String kc ="sk:"+prodid+":qt";
   // 秒杀成功者key
   String userskey ="sk:"+prodid+":usr";

   // jedis
   Jedis jedis=new Jedis("192.168.6.101",6379);
   String kcCount = jedis.get(kc);
   if(Integer.parseInt(kcCount)<=0){
      System.out.println("秒杀结束");
      return false;
   }
   // 商品库存-1
   jedis.decr(kc);
   // 将用户ID保存到成功者list中
   jedis.sadd(userskey,uid);
   System.out.println("秒杀成功");
   Jedis.close();
   return true;
}

但是当同一时间并发量过大的时候,要保证这一过程不出错,也是不容易的。

其实造成这的根本原因还是因为没有保证操作的原子性

我们知道,Redis的指令操作线程worker是单线程执行的,即同一时间只会有一条指令被执行,这就保证了指令执行的原子性。

但是Redis严格来说并不是单线程的运行模式,其控制访问的IO线程是多线程。

因为秒杀的操作是一系列的,查看库存–>修改库存–>保存用户,而非单一指令。

因此必然会出错。

测试

并发模拟器

因为同一时间很难找到那么多的人帮我们点击模拟,因此这里可以借助于一个并发测试模拟工具ab模拟测试:

需要在Linux环境下安装:

CentOS6 默认安装

CentOS7需要手动安装

  • 联网安装方式
yum install httpd-tools
  • 无网络安装方式

(1) 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)

(2) 顺序安装

1 apr-1.4.8-3.el7.x86_64.rpm

2 apr-util-1.5.2-6.el7.x86_64.rpm

3 httpd-tools-2.4.6-67.el7.centos.x86_64.rpm

并发测试实现

vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。文件内容:prodid=0101&

ab -n 总访问数 -c 并发量 -k -p ~/postfile -T application/x-www-form-urlencoded http://主机Ip地址:端口号/访问路径

实测指令:

ab -n 1000 -c 100 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.8.110:8080/redis_seckill_war_exploded/doseckill

超卖问题

  • 发现买多了,超出了设定的数量

1637659324063

1637659331757

  • 超卖产生的原因: 没有事务控制,一个用户在秒杀商品时,被其他用户打断

1637659366039

解决方案

方案一:乐观锁

  • 可以使用乐观锁处理超卖问题

1637659493882

//增加乐观锁
jedis.watch(qtkey);
 
//3.判断库存
String qtkeystr = jedis.get(qtkey);
if(qtkeystr==null || "".equals(qtkeystr.trim())) {
    System.out.println("未初始化库存");
    jedis.close();
    return false ;
}
 
int qt = Integer.parseInt(qtkeystr);
if(qt<=0) {
    System.err.println("已经秒光");
    jedis.close();
    return false;
}
 
//增加事务
Transaction multi = jedis.multi();
 
//4.减少库存
//jedis.decr(qtkey);
multi.decr(qtkey);
 
//5.加人
//jedis.sadd(usrkey, uid);
multi.sadd(usrkey, uid);
 
//执行事务
List<Object> list = multi.exec();
 
//判断事务提交是否失败
if(list==null || list.size()==0) {
    System.out.println("秒杀失败");
    jedis.close();
    return false;
}
System.err.println("秒杀成功");
jedis.close();

超卖问题解决

1637659575817

1637659581969


但是此种方式存在着问题:

  • 连接超时问题

    Redis同一时刻的连接数量是有一定限制的,当并发量过大的时候,因为Redis的连接数量管控,会导致部分连接超时:

    image-20220528171530311

  • 库存遗留问题

    当库存量过多的时候,就会出现问题:

    image-20220528170442298

    image-20220528170400003

    我们发现,2000个人同时抢500个商品,但是最后却没有卖完,库存里还剩了474个,这显然也是不合理的。

    其实这与乐观锁的处理机制有关。

    乐观锁的处理方式是:

    当不同的用户查询库存的时候,Redis都给他们分别打上标签,当一个用户率先抢占到线程后,成功减掉库存后,对应的标签就会发生变化。

    而另一个同一时间的用户再去减库存的时候,发现标签变化了,就不再执行减库存操作了,通俗说也就是没抢到,但是此时库存里明明是有商品的,只是与查询的时候不一样而已。

    因此导致了库存遗留问题。

方案二:连接池

  • 加连接池可以处理连接超时,但是仍然会出现库存
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
    // 判空
    if(null ==uid || "".equals(uid)|| null == prodid || "".equals(prodid)){
        return false;
    }
    // 拼接key
    // 商品库存key
    String kc ="sk:"+prodid+":qt";
    // 秒杀成功者key
    String userskey ="sk:"+prodid+":usr";


    // 获取连接池
    JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
    // jedis
    Jedis jedis=jedisPool.getResource();
    // 给库存key上乐观锁
    jedis.watch(kc);
    // 开启事务
    String kccount = jedis.get(kc);
    if(Integer.parseInt(kccount)<=0){
        System.out.println("秒杀结束");
        // 归还连接到连接池
        JedisPoolUtil.release(jedisPool,jedis);
        return false;
    }
    Transaction multi = jedis.multi();
    // 组队
    // 商品库存-1
    //jedis.decr(kc);
    multi.decr(kc);
    // 将用户ID保存到成功者list中
    //jedis.sadd(userskey,uid);
    multi.sadd(userskey,uid);

    // 执行
    List<Object> exec = multi.exec();
    if(null == exec || exec.size()==0){
        System.out.println("秒杀失败");
        // 归还连接到连接池
        JedisPoolUtil.release(jedisPool,jedis);
        return false;
    }


    System.out.println("秒杀成功");
    // 归还连接到连接池
    JedisPoolUtil.release(jedisPool,jedis);
    return true;
}

方案三:LUA脚本

因为Redis是原子性的,同一时刻只会执行一条指令,因此,我们可以将这一系列操作变成一条指令,交由Redis去处理,这样就可以解决库存问题,如何将一系列操作变成一条指令呢?

可以借助于LUA脚本:

利用lua脚本淘汰用户,解决超卖问题。

redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题

1637717375049

LUA脚本如下:

local userid=KEYS[1]; 
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid..":usr'; 
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then 
  return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then 
  return 0; 
else 
  redis.call("decr",qtkey);
  redis.call("sadd",usersKey,userid);
end
return 1;

代码实现:

public class SecKill_redisByScript {
	
	static String secKillScript ="local userid=KEYS[1];\r\n" + 
			"local prodid=KEYS[2];\r\n" + 
			"local qtkey='sk:'..prodid..\":qt\";\r\n" + 
			"local usersKey='sk:'..prodid..\":usr\";\r\n" + 
			"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + 
			"if tonumber(userExists)==1 then \r\n" + 
			"   return 2;\r\n" + 
			"end\r\n" + 
			"local num= redis.call(\"get\" ,qtkey);\r\n" + 
			"if tonumber(num)<=0 then \r\n" + 
			"   return 0;\r\n" + 
			"else \r\n" + 
			"   redis.call(\"decr\",qtkey);\r\n" + 
			"   redis.call(\"sadd\",usersKey,userid);\r\n" + 
			"end\r\n" + 
			"return 1" ;

	public static boolean doSecKill(String uid,String prodid) throws IOException {

		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis=jedispool.getResource();

		 //String sha1=  .secKillScript;
		String sha1=  jedis.scriptLoad(secKillScript);
		Object result= jedis.evalsha(sha1, 2, uid,prodid);

		  String reString=String.valueOf(result);
		if ("0".equals( reString )  ) {
			System.err.println("已抢空!!");
		}else if("1".equals( reString )  )  {
			System.out.println("抢购成功!!!!");
		}else if("2".equals( reString )  )  {
			System.err.println("该用户已抢过!!");
		}else{
			System.err.println("抢购异常!!");
		}
		jedis.close();
		return true;
	}
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-06 17:25:11  更:2022-06-06 17:26:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 4:47:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码