IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> redis学习(借鉴网上的文档) -> 正文阅读

[大数据]redis学习(借鉴网上的文档)

Redis入门

Linux安装

下载地址 http://download.redis.io/releases/redis-5.0.7.tar.gz

安装步骤

1、下载获得 redis-5.0.7.tar.gz 后将它放到我们Linux的目录下 /opt

2、/opt 目录下,解压命令 : tar -zxvf redis-5.0.7.tar.gz

3、解压完成后出现文件夹:redis-5.0.7

4、进入目录: cd redis-5.0.7

5、在 redis-5.0.7 目录下执行 make 命令

运行make命令时故意出现的错误解析: 
    1. 安装gcc (gcc是linux下的一个编译程序,是c程序的编译工具) 
    能上网: yum install gcc-c++ 
        版本测试: gcc-v 
    2. 二次make 
    3. Jemalloc/jemalloc.h: 没有那个文件或目录 
        运行 make distclean 之后再make 
        4. Redis Test(可以不用执行)

6、如果make完成后继续执行 make install

7、查看默认安装目录:usr/local/bin

/usr 这是一个非常重要的目录,类似于windows下的Program Files,存放用户的程序

8、拷贝配置文件(备用)

cd /usr/bin
    ls -l 
    # 在redis的解压目录下备份redis.conf 
    mkdir myredis 
    cp redis.conf myredis 
    # 拷一个备份,养成良好的习惯,我们就修改这个文件 
    # 修改配置保证可以后台应用 
    vim redis.conf

img

daemonize 设置yes或者no区别

  • daemonize:yes

    • redis采用的是单进程多线程的模式。当redis.conf中选项daemonize设置成yes时,代表开启守护进程模式。在该模式下,redis会在后台运行,并将进程pid号写入至redis.conf选项pidfifile设置的文件中,此时redis将一直运行,除非手动kill该进程。
  • daemonize:no

    • 当daemonize选项设置成no时,当前界面将进入redis的命令行界面,exit强制退出或者关闭连接工具(putty,xshell等)都会导致redis进程退出。

9、测试一下!

# 我使用的是宝塔下载,所以及宝塔启动了
[root@VM-0-5-centos ~]# cd /usr/bin
    # redis客户端连接===> 观察地址的变化,如果连接ok,是直接连上的,redis默认端口号 6379
[root@VM-0-5-centos bin]# redis-cli -p 6379

127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set k1 hellword
OK
127.0.0.1:6379> get k1
"hellword"

# 【shell】ps显示系统当前进程信息
[root@VM-0-5-centos /]# ps -ef | grep redis
redis     7955     1  0 00:12 ?        00:00:00 /www/server/redis/src/redis-server 127.0.0.1:6379
root      8322  4825  0 00:13 pts/0    00:00:00 redis-cli -p 6379
root     15654 10426  0 00:16 pts/2    00:00:00 grep --color=auto redis

# 【redis】关闭连接 
127.0.0.1:6379> shutdown 
not connected> exit

# 【shell】ps显示系统当前进程信息 
[root@192 myredis]# ps -ef|grep redis 
root 16140 16076 0 04:53 pts/2 00:00:00 grep --color=auto redis

img

img

基础说明

准备工作:开启redis服务,客户端连接

redis压力测试工具-----Redis-benchmark

Redis-benchmark是官方自带的Redis性能测试工具,可以有效的测试Redis服务的性能。

img

redis 性能测试工具可选参数如下所示:

img

# 测试一:100个并发连接,100000个请求,检测host为localhost 端口为6379的redis服务器性 能
redis-benchmark -h localhost -p 6379 -c 100 -n 100000 
# 测试出来的所有命令只举例一个! 
====== SET ====== 
100000 requests completed in 1.88 seconds # 对集合写入测试 
100 parallel clients # 每次请求有100个并发客户端 
3 bytes payload # 每次写入3个字节的数据,有效载荷 
keep alive: 1 # 保持一个连接,一台服务器来处理这些请求 
17.05% <= 1 milliseconds 
97.35% <= 2 milliseconds 
99.97% <= 3 milliseconds 
100.00% <= 3 milliseconds # 所有请求在 3 毫秒内完成 53248.14 requests per second # 每秒处理 53248.14 次请求

基础数据库常识

默认16个数据库,类似数组下标从零开始,初始默认为零号库

查看 redis.conf ,里面有默认的配置 
databases 16 

# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where 
# dbid is a number between 0 and 'databases'-1 
databases 16

Select命令切换数据库

127.0.0.1:6379> select 7
OK
127.0.0.1:6379[7]> 
# 不同的库可以存不同的数据

Dbsize查看当前数据库的key数量

127.0.0.1:6379> select 7
OK
127.0.0.1:6379[7]> dbsize
(integer) 0
127.0.0.1:6379[7]> SELECT 0
OK
127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> KEYS *
1) "k1"

img

Flushdb:清空当前库

Flushall:清空全部的库

127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> DBSIZE
(integer) 0

为什么默认端口是6379呢? 粉丝效应

redis是单线程的

我们首先要明白,Redis很快!官方表示,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了!

6Redis采用的是基于内存的采用的是单进程单线程模型的 KV 数据库,由C语言编写,官方提供的数据是可以达到100000+的QPS(每秒内查询次数)。这个数据不比采用单进程多线程的同样基于内存的 KV数据库 Memcached 差!

Redis为什么单线程还这么快?

1.误区: 高性能的服务器一定是多线程的?

2.误区: 多线程(CPU上下文切换) 一定比单线程效率高

CPU>内存>硬盘速度(juc)

核心: redis是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!!!) ,对于内存系统来说,如果没有上下文切换效率就是最高的! 多次读写对都是在一个CPU上的,在内存情况下,这个就是最佳的方案

五大数据类型

Redis是一个开放源代码(BSD许可)的内存中数据结构存储,用作数据库,缓存和消息代理。它支持数

据结构,例如字符串,哈希,列表,集合,带范围查询的排序集合,位图,超日志,带有半径查询和流

的地理空间索引。Redis具有内置的复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过

Redis Sentinel和Redis Cluster自动分区提供了高可用性。

String(字符串类型)

String是redis最基本的类型,你可以理解成Memcached一模一样的类型,一个key对应一个value

String类型时二进制安全的,意思是redis的String可以包含任何数据,比如jpg图片或者序列化的对象。

String类型时redis最基本的数据类型,一个redis中字符串value最多可以是512M

Hash(哈希,类似Java里的Map)

Redis hash是一个键值对集合

Redis hash是一个String类型的field和value的映射表,hash特别适合用于存储对象。

类似于Java里面的Map<String, Object>

List(列表)

Redis列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)

它的底层实际是个链表

Set(集合)

Redis的Set是String类型的无序集合,它是通过HashTable实现的!

Zset(sorted set:有序集合)

Redis zset和set一样,也是String类型元素的集合,且不允许重复的成员。

不同的是每个元素都会关联一个double类型的分数

Redis正是通过分数来为集合的成员进行从小到大的排序,zset的成员是唯一的,但是分数(Score)且可以重复。

Redis键(key)

1.exists name # 判断当前key是否存在 存在为1 反之为0

2.move name # 移除当前key

3.expire name 10 # 设置key的过期时间,单位是秒

4.ttl name # 查看当前key的剩余时间

5.type name # 查看key的类型

127.0.0.1:6379> set name bingning
OK
127.0.0.1:6379> KEYS *
1) "name"
# exists key 的名字,判断某个key是否存在
127.0.0.1:6379> EXISTS name
(integer) 1
127.0.0.1:6379> EXISTS name1
(integer) 0

#移除当前库中的key
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> key *
127.0.0.1:6379> keys *
(empty array)

# 为给点key设置生存时间,  时间到了 他会被自动删除
# ttl key 查看还有多少秒过期, -1表示永不过期,-2表示已过期
127.0.0.1:6379> EXPIRE name 20
(integer) 1
127.0.0.1:6379> TTL name
(integer) 15
127.0.0.1:6379> TTL name
(integer) 13
127.0.0.1:6379> TTL name
(integer) -2
127.0.0.1:6379> keys *
(empty array)

# type key 查看key的类型
127.0.0.1:6379> set name bingnign
OK
127.0.0.1:6379> TYPE name
string

String

单值Value

常规命令说明:

  1. append name “shanjiao” # 追加字符串,如果当前key不存在相当于setkey

  2. strlen name # 查看指定key的长度

  3. incr views # views自增1

  4. decr views # views自减1

  5. incrby views 10 # 自定义设置自增步长

  6. decrby views 5 # 自定义设置自减步长

  7. getrange 获取指定区间范围内的值

  8. setrange 设置指定区间范围内的值,格式是setrange key值

  9. setex(set with expire)键秒值 setnx(set if not exist)

  10. mset Mset 命令用于同时设置一个或多个 key-value 对。

  11. mget Mget 命令返回所有(一个或多个)给定 key 的值。

  12. msetnx 当所有 key 都成功设置,返回 1

  13. 这里的key是一个巧妙的设计: user:{id} {filed},如此设计在redis中完全可以

  14. getset # 先get再set

#=================================================== 
# set、get、del、append、strlen 
#===================================================
127.0.0.1:6379> set name bingnign  # 设置值
OK
127.0.0.1:6379> get name  # 获取值
"bingnign"
127.0.0.1:6379> DEL name  # 删除值
(integer) 1
127.0.0.1:6379> keys *   # 查看全部的key
(empty array)
127.0.0.1:6379> EXISTS name  # 查看key是否存在
(integer) 0
127.0.0.1:6379> append name "hello"  # 对不存在的key进行Append,等同于set name "hello"
(integer) 5   #字符串长度
127.0.0.1:6379> get name
"hello"
127.0.0.1:6379> append name "-4243"  # 对已存在的字符串进行Append
(integer) 10  # 长度从5字符增加到10个字符
127.0.0.1:6379> get name
"hello-4243"
127.0.0.1:6379> STRLEN name # 获取字符串长度
(integer) 10

#=================================================== 
# incr、decr 一定要是数字才能进行加减,+1 和 -1。 
# incrby、decrby 命令将 key 中储存的数字加上指定的增量值。 
#===================================================
127.0.0.1:6379> set views 0 #设置游览量为0
OK
127.0.0.1:6379> INCR views  #游览 +1
(integer) 1
127.0.0.1:6379> INCR views # 游览 +1
(integer) 2
127.0.0.1:6379> decr views # 游览减1
(integer) 1
127.0.0.1:6379> incrby views 19 # 指定游览 +19
(integer) 20
127.0.0.1:6379> decrby views 20 # 指定游览减19
(integer) 0

#=================================================== 
# range [范围] # getrange 获取指定区间范围内的值,类似between...and的关系,从零到负一表示全部 
#===================================================
127.0.0.1:6379> set key2 fdshhe23
OK
127.0.0.1:6379> getrange key2 0 -1  # 获取全部范围的值
"fdshhe23"
127.0.0.1:6379> getrange key2 0 5 # 从第一个位置开始,获取0-5的值
"fdshhe"

#=================================================== 
# setrange 设置指定区间范围内的值,格式是setrange key值 具体值 
#===================================================
127.0.0.1:6379> get key2
"fdshhe23"
127.0.0.1:6379> SETRANGE key2 1 fds # 替换值,从第二个位置开始
(integer) 8
127.0.0.1:6379> get key2
"ffdshe23"
127.0.0.1:6379> SETRANGE key2 3 fdsetrtd # 替换值超过了已有值自动添加
(integer) 11
127.0.0.1:6379> get key2
"ffdfdsetrtd"

#=================================================== 
# setex(set with expire)键秒值 
# setnx(set if not exist) 
#===================================================
127.0.0.1:6379> setex key3 60 expire # 设置过期时间
OK
127.0.0.1:6379> ttl key3 # 查看剩余的世界
(integer) 53
127.0.0.1:6379> get key3
"expire"
127.0.0.1:6379> setnx mykey "redis" # 如果不存在就设置,成功返回1
(integer) 1
127.0.0.1:6379> setnx mykey "mongodb" # 如果存在就设置,失败返回0
(integer) 0
127.0.0.1:6379> get mykey
"redis"

#=================================================== 
# mset Mset 命令用于同时设置一个或多个 key-value 对。 
# mget Mget 命令返回所有(一个或多个)给定 key 的值。
# 如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil 。 
# msetnx 当所有 key 都成功设置,返回 1 。 
# 如果所有给定 key 都设置失败(至少有一个 key 已经存在),那么返回 0 。原子操 作#===================================================

127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k3"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 k4
1) "v1"
2) "v2"
3) "v3"
4) (nil)
127.0.0.1:6379> msetnx k1 k4 k5 k6 # 原子性操作
(integer) 0
127.0.0.1:6379> get key5
(nil)

# 传统对象缓存
127.0.0.1:6379> set user:1 value
OK
127.0.0.1:6379> get user:1
"value"

# 对象
set user:1 {name:zhangsan,age:3} # 设置一个user:1 对象 的值为json字符来保存一个对象
# 这里的key是一个巧妙的设计: user:{id} {filed},如此设计在redis中完全可以

127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"

# getset(先get再set)

127.0.0.1:6379> get db
"mongodb"
127.0.0.1:6379> getset db redis
"mongodb"
127.0.0.1:6379> get db
"redis"

String数据结构是简单的key-value类型,value其实不仅可以是String,也可以是数字。

常规key-value缓存应用:

常规计数:微博数,粉丝数等

列表List

  • Lpush:将一个或多个值插入到列表头部。(左)
  • rpush:将一个或多个值插入到列表尾部。(右)
  • lrange:返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。
  • lpop命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil 。
  • rpop移除列表的最后一个元素,返回值为移除的元素。
  • Lindex,按照索引下标获得元素(-1代表最后一个,0代表是第一个)
  • llen用于返回列表的长度。
  • lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。
  • rpoplpush移除列表的最后一个元素,并将该元素添加到另一个列表并返回。
  • lset key index value 将列表 key 下标为 index 的元素的值设置为 value
  • linsert key before/after pivot value 用于在列表的元素前或者后插入元素。 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。
#===================================================
# Lpush:将一个或多个值插入到列表头部。(左) 
# rpush:将一个或多个值插入到列表尾部。(右) 
# lrange:返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。 
# 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。 
# 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此 类推。 
#==================================================
127.0.0.1:6379> lpush list "ont"
(integer) 1
127.0.0.1:6379> LPUSH list "tpw" "fed"
(integer) 3
127.0.0.1:6379> rpush list "right"
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "fed"
2) "tpw"
3) "ont"
4) "right"
127.0.0.1:6379> lrange list 0 1
1) "fed"
2) "tpw"

#=================================================== 
# lpop 命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil 。 
# rpop 移除列表的最后一个元素,返回值为移除的元素。 
#===================================================
127.0.0.1:6379> lrange list 0 1
1) "fed"
2) "tpw"
127.0.0.1:6379> LPOP list
"fed"
127.0.0.1:6379> rpop list
"right"
127.0.0.1:6379> lrange list 0 -1
1) "tpw"
2) "ont"

#=================================================== 
# Lindex,按照索引下标获得元素(-1代表最后一个,0代表是第一个) 
#===================================================
127.0.0.1:6379> lindex list 1
"ont"
127.0.0.1:6379> lindex list 0
"tpw"
127.0.0.1:6379> lindex list -1
"ont"

#=================================================== 
# llen 用于返回列表的长度。 
#===================================================
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> lpush list "one" "two" "three"
(integer) 3
127.0.0.1:6379> LLEN list
(integer) 3

#=================================================== 
# lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。 
#===================================================
127.0.0.1:6379> lrem mylist 1 "hello"
(integer) 1
127.0.0.1:6379> lrange mylist 0 -1
1) "foo"

#=================================================== 
# Ltrim key 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区 间之内的元素都将被删除。 
#===================================================
127.0.0.1:6379> rpush mylist "hello" "hello1" "hello2" "hello3"
(integer) 4
127.0.0.1:6379> LTRIM mylist 1 2
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"

#=================================================== 
# rpoplpush 移除列表的最后一个元素,并将该元素添加到另一个列表并返回。 
#===================================================
127.0.0.1:6379> rpush mylist "hello" "foo" "bar"
(integer) 3
127.0.0.1:6379> RPOPLPUSH mylist myotherlist
"bar"
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "foo"
127.0.0.1:6379> lrange myotherlist 0 -1
1) "bar"

#=================================================== 
# lset key index value 将列表 key 下标为 index 的元素的值设置为 value 。 
#===================================================
127.0.0.1:6379> exists list # 对空列表(key 不存在)进行List
(integer) 0
127.0.0.1:6379> lset list 0 item  # 报错
(error) ERR no such key
127.0.0.1:6379> lpush list "valuel"  # 对非空列表进行lset
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "valuel"
127.0.0.1:6379> lset list 0 "new" # 更新值
OK
127.0.0.1:6379> lrange list 0 0
1) "new"
127.0.0.1:6379> liset list 1 "new" # index 超出范围报错
(error) ERR unknown command `liset`, with args beginning with: `list`, `1`, `new`, 

#=================================================== 
# linsert key before/after pivot value 用于在列表的元素前或者后插入元素。 
# 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。 
#===================================================
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "world"
(integer) 2
127.0.0.1:6379> LINsert mylist before "world" "Three"
(integer) 3
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "Three"
3) "world"

小结

  • 他实际上是一个链表,before Bode after, left, right 都可以插入值
  • 如果key不存在,创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,空链表,也就代表不存在
  • 在两边插入或者改动值,效率最高!中间元素,相对俩手效率会低一点~

消息队列,消息排队!(Lpush Rpop) , 栈 (Lpush Lpop)!

集合Set

  • sadd 将一个或多个成员元素加入到集合中,不能重复

  • smembers 返回集合中的所有的成员。

  • sismember 命令判断成员元素是否是集合的成员。

  • scard,获取集合里面的元素个数

  • srem key value 用于移除集合中的一个或多个成员元素

  • srandmember key 命令用于返回集合中的一个或多个随机元素。

  • spop key 用于移除集合中的指定 key 的一个或多个随机元素

  • smove SOURCE DESTINATION MEMBER

  • 将指定成员 member 元素从 source 集合移动到 destination 集合。

  • 数字集合类

    • 差集: sdiff
    • 交集: sinter
    • 并集: sunion
#=================================================== 
# sadd 将一个或多个成员元素加入到集合中,不能重复 
# smembers 返回集合中的所有的成员。 
# sismember 命令判断成员元素是否是集合的成员。
#===================================================
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "kung"
(integer) 1
127.0.0.1:6379> sadd myset "kung" # 集合中已经存在,添加失败
(integer) 0
127.0.0.1:6379> SMEMBERS myset
1) "kung"
2) "hello"
127.0.0.1:6379> SISMEMBER myset "hello"
(integer) 1
127.0.0.1:6379> SISMEMBER myset "wo"
(integer) 0

# =================================================== 
# scard,获取集合里面的元素个数 
#===================================================
127.0.0.1:6379> scard myset
(integer) 2

#=================================================== 
# srem key value 用于移除集合中的一个或多个成员元素 
#===================================================
127.0.0.1:6379> srem myset "kung"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "hello"

# srandmember key 命令用于返回集合中的一个随机元素。
127.0.0.1:6379> SMEMBERS myset
1) "bingbing"
2) "world"
3) "hello"
127.0.0.1:6379> SRANDMEMBER myset
"hello"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "hello"
2) "world"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "bingbing"
2) "hello"

# spop key 用于移除集合中的指定 key 的一个或多个随机元素
127.0.0.1:6379> SMEMBERS myset
1) "bingbing"
2) "world"
3) "hello"
127.0.0.1:6379> spop myset
"hello"
127.0.0.1:6379> spop myset 2
1) "bingbing"
2) "world"
127.0.0.1:6379> SMEMBERS myset
(empty array)

# smove SOURCE DESTINATION MEMBER 
# 将指定成员 member 元素从 source 集合移动到 destination 集合。
127.0.0.1:6379> sadd myset "hello" "world" "bingning"
(integer) 3
127.0.0.1:6379> sadd myset "set2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "bingning"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "set2"
2) "world"
3) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "bingning"

# =================================================== 
- 数字集合类 
	- 差集: sdiff 
	- 交集: sinter 
	- 并集: sunion 
#===================================================
127.0.0.1:6379> sadd key1 "a" "b" "c" 
(integer) 3
127.0.0.1:6379> sadd key2 "c" "d" "e"
(integer) 3
127.0.0.1:6379> sdiff key1 key2
1) "a"
2) "b"
127.0.0.1:6379> sinter key1 key2
1) "c"
127.0.0.1:6379> SUNION key1 key2
1) "b"
2) "c"
3) "a"
4) "e"
5) "d"

微博,b站 将A用户的所有关注的人放到set集合中,将他的粉丝也放在集合中

共同关注,共同爱好,二度好友,六度分割理论

哈希Hash

kv模式不变,但v是一个键值对

  • hset、hget 命令用于为哈希表中的字段赋值 。
  • hmset、hmget 同时将多个field-value对设置到哈希表中。会覆盖哈希表中已存在的字段。
  • hgetall 用于返回哈希表中,所有的字段和值。
  • hdel 用于删除哈希表 key 中的一个或多个指定字段
  • hlen 获取哈希表中字段的数量。
  • hexists 查看哈希表的指定字段是否存在。
  • hkeys 获取哈希表中的所有域(field)。
  • hvals 返回哈希表所有域(field)的值。
  • hincrby 为哈希表中的字段值加上指定增量值
  • hsetnx 为哈希表中不存在的的字段赋值 。
# hset、hget 命令用于为哈希表中的字段赋值 。 
# hmset、hmget 同时将多个field-value对设置到哈希表中。会覆盖哈希表中已存在的字段。 
# hgetall 用于返回哈希表中,所有的字段和值。 
# hdel 用于删除哈希表 key 中的一个或多个指定字段 
127.0.0.1:6379> hset myhash field1 "bingnign" field2 "we"
(integer) 2
127.0.0.1:6379> hget myhash field1
"bingnign"
127.0.0.1:6379> hmset myhash field1 "hello" field2 "worj"
OK
127.0.0.1:6379> hgetall myhash
1) "field1"
2) "hello"
3) "field2"
4) "worj"
127.0.0.1:6379> hdel myhash field1
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "worj"

# hlen 获取哈希表中字段的数量。
127.0.0.1:6379> hlen myhash
(integer) 1
127.0.0.1:6379> hset myhash field1 "gs"
(integer) 1
127.0.0.1:6379> hlen myhash
(integer) 2

# hexists 查看哈希表的指定字段是否存在。
127.0.0.1:6379> hexists myhash field1
(integer) 1
127.0.0.1:6379> hexists myhash gile
(integer) 0

# hkeys 获取哈希表中的所有域(field)。 
# hvals 返回哈希表所有域(field)的值。
127.0.0.1:6379> HKEYS myhash
1) "field2"
2) "field1"
127.0.0.1:6379> hvals myhash
1) "worj"
2) "gs"

# hincrby 为哈希表中的字段值加上指定增量值。
127.0.0.1:6379> hset myhash filed1 10
(integer) 1
127.0.0.1:6379> HINCRBY myhash filed1 1
(integer) 11
127.0.0.1:6379> hincrby myhash filed1 -1
(integer) 10

# hsetnx 为哈希表中不存在的的字段赋值 。
127.0.0.1:6379> hsetnx myhash filed1 "gsd"
(integer) 1
127.0.0.1:6379> hsetnx myhash filed1 "work"
(integer) 0
127.0.0.1:6379> hget myhash filed1
"gsd"

hash变更的数据user name age ,尤其是用户信息之类的,经常变动的信息! hash更适合于对象的存储,String更加适合字符串存储

有序集合Zset

在set基础上,加一个score值。之前set是k1 v1 v2 v3,现在zset是 k1 score1 v1 score2 v2

  • zadd将一个或多个成员元素及其分数值加入到有序集当中。
  • zrange返回有序集中,指定区间内的成员
  • zrangebyscore返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大) 次序排列。
  • zrevrange返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大) 次序排列。
  • zcard命令用于计算集合中元素的数量。
  • count计算有序集合中指定分数区间的成员数量。
  • zrank返回有序集中指定成员的排名。其中有序集成员按分数值递增(从小到大)顺序排列。
  • zrevrank返回有序集中成员的排名。其中有序集成员按分数值递减(从大到小)排序。
# zadd 将一个或多个成员元素及其分数值加入到有序集当中。 
# zrange 返回有序集中,指定区间内的成员
127.0.0.1:6379> zadd myzset 1 "one" 2 "two"
(integer) 2
127.0.0.1:6379> zrange myzset 0 -1
1) "one"
2) "two"

# zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大) 次序排列。
127.0.0.1:6379> zadd salary 1000 "zan" 1500 "bingni" 500 "xiaoq"
(integer) 3
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf
1) "xiaoq"
2) "zan"
3) "bingni"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"
5) "bingni"
6) "1500"
127.0.0.1:6379> ZREVRANGE salary 0 -1 withscores
1) "bingni"
2) "1500"
3) "zan"
4) "1000"
5) "xiaoq"
6) "500"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 1000  withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"

# zcard 命令用于计算集合中元素的数量。
127.0.0.1:6379> ZCARD salary
(integer) 3

# zcount 计算有序集合中指定分数区间的成员数量。
127.0.0.1:6379> zadd myset1 1 "jl" 2 "j2" 3 "j3"
(integer) 3
127.0.0.1:6379> zcount myset1 1 3
(integer) 3
127.0.0.1:6379> zcount myset1 1 2
(integer) 2

# zrank 返回有序集中指定成员的排名。其中有序集成员按分数值递增(从小到大)顺序排列。
127.0.0.1:6379> zrange salary 0 -1 withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"
5) "bingni"
6) "1500"
127.0.0.1:6379> zrank salary zan # 显示zan的薪水排名 
(integer) 1
127.0.0.1:6379> zrank salary bingni # 显示bingni的薪水排名,第三
(integer) 2

# zrevrank 返回有序集中成员的排名。其中有序集成员按分数值递减(从大到小)排序。
127.0.0.1:6379> ZREVRAnk salary zan
(integer) 1
127.0.0.1:6379> ZREVRAnk salary xiaoq
(integer) 2

和set相比,sorted set增加了一个权重参数score,使得集合中的元素能够按score进行有序排列,比如一个存储全班同学成绩的sorted set,其集合value可以是同学的学号,而score就可以是其考试得分,这样在数据插入集合的时候,就已经进行了天然的排序。可以用sorted set来做带权重的队列,比如普通消息的score为1,重要消息的score为2,然后工作线程可以选择按score的倒序来获取工作任务。让重要的任务优先执行。

排行榜应用,取TOP N操作 !

三种特殊数据类型

GEO地理位置

朋友的定位,附近的人,打车距离计算?

redis的 Geo 在redis3.2版本就推出了,这个功能可以推算地理位置的信息, 两地之间的距离,方圆几里的人

可以查询一些测试数据:http://www.jsons.cn/lngcode/

只有六个命令

geoadd

# 语法 
geoadd key longitude latitude member ... 
# 将给定的空间元素(纬度、经度、名字)添加到指定的键里面。 
# 这些数据会以有序集he的形式被储存在键里面,从而使得georadius和georadiusbymember这样的 命令可以在之后通过位置查询取得这些元素。 
# geoadd命令以标准的x,y格式接受参数,所以用户必须先输入经度,然后再输入纬度。 
# geoadd能够记录的坐标是有限的:非常接近两极的区域无法被索引。 
# 有效的经度介于-180-180度之间,有效的纬度介于-85.05112878 度至 85.05112878 度之间。, 当用户尝试输入一个超出范围的经度或者纬度时,geoadd命令将返回一个错误。
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京 
(integer) 1 
127.0.0.1:6379> geoadd china:city 121.48 31.40 上海 113.88 22.55 深圳 120.21 30.20 杭州 
(integer) 3 
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉 
(integer) 3

测试:百度搜索经纬度查询,模拟真实数据

geopos

解析:

# 语法 
geopos key member [member...] 

#从key里返回所有给定位置元素的位置(经度和纬度) 1234

测试:

127.0.0.1:6379> GEOPOS china:city 北京
1) 1) "116.23000055551528931"
   2) "40.2200010338739844"
127.0.0.1:6379> GEOPOS china:city 上海 重庆
1) 1) "121.48000091314315796"
   2) "31.40000025319353938"
2) 1) "106.54000014066696167"
   2) "29.39999880018641676"

geodist

解析:

# 语法 
geodist key member1 member2 [unit] 
# 返回两个给定位置之间的距离,如果两个位置之间的其中一个不存在,那么命令返回空值。 
# 指定单位的参数unit必须是以下单位的其中一个: 
# m表示单位为米 
# km表示单位为千米 
# mi表示单位为英里 
# ft表示单位为英尺 
# 如果用户没有显式地指定单位参数,那么geodist默认使用米作为单位。 
#geodist命令在计算距离时会假设地球为完美的球形,在极限情况下,这一假设最大会造成0.5%的误 差。

测试:

127.0.0.1:6379> geodist china:city 北京 上海
"1088785.4302"
127.0.0.1:6379> geodist china:city 重庆 北京 km
"1491.6716"

georadius

解析:

# 语法 
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count] 
# 以给定的经纬度为中心, 找出某一半径内的元素

测试:重新连接 redis-cli,增加参数 --raw ,可以强制输出中文,不然会乱码

[root@VM-0-5-centos bin]# cd /usr/bin
[root@VM-0-5-centos bin]# redis-cli --raw -p 6379
# 在 china:city 中寻找坐标 100 30 半径为 1000km 的城市
127.0.0.1:6379> georadius china:city 100 30 1000 km
重庆

# withdist 返回位置名称和中心距离
127.0.0.1:6379> georadius china:city 100 30 1000 km withdist
重庆
635.2850

# withcoord 返回位置名称和经纬度
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord
重庆
106.54000014066696167
29.39999880018641676

# withdist withcoord 返回位置名称 距离 和经纬度 count 限定寻找个数
127.0.0.1:6379> georadius china:city 100 30 1000 km withdist withcoord count 2
重庆
635.2850
106.54000014066696167
29.39999880018641676

georadiusbymember

解析:

# 语法 
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count] 
# 找出位于指定范围内的元素,中心点是由给定的位置元素决定

测试:

127.0.0.1:6379> GEORADIUSBYMEMBER china:city 上海 1000 km
上海
杭州

geohash

解析:

# 语法 
geohash key member [member...] 
# Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似 表示距离越近。

测试:

127.0.0.1:6379> geohash china:city 上海 北京
wtw6sk5n300
wx4sucu47r0
127.0.0.1:6379> geohash china:city 北京 杭州
wx4sucu47r0
wv7gu1xzg00

zrem

GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除.

127.0.0.1:6379> geoadd china:city 116.23 40.22 beijin
1
127.0.0.1:6379> zrange china:city 0 -1
杭州
beijin
127.0.0.1:6379> zrem china:city beijin
1
127.0.0.1:6379> zrange china:city 0 -1
杭州

HyperLogLog(求基数)

什么是基数?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。

基数估计就是在误差可接受的范围内,快速计算基数。

简介

Redis 2.8.9 版本就跟新了Hyperloglog数据结构!

Redis Hyperloglog 基数统计的算法!

优点: 占用的内存是固定的, 2^64 不同的元素的基数,只需要12kb的内存即可,如果从内存角度来比较,且允许有误差(0.81%)Hyperloglog首选

网页的UV(一个人访问一个网站多次,但是还是算作一个人!)

传统的方式,set保存用户的id ,然后就可以统计set中元素数量作为标准!

set集合(无序,不重复)

这个方式如果保存大量的用户id,就会比较麻烦! 我们的目的是为了技术,而不是保存用户id

pfadd 添加

pfcount 计数

pfmerge 合并

测试使用

127.0.0.1:6379> pfadd mykey a b c d e f g h i j
1
127.0.0.1:6379> pfcount mykey 
10
127.0.0.1:6379> pfadd mykey2 i j l b o o h
1
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2
OK
127.0.0.1:6379> pfcount mykey3
12

bitmap (判断状态)

统计用户信息(活跃,不活跃) (登录,未登录)

打卡(打卡,未打卡)

两个状态的都可以使用bitmap

bitmap 位图 , 数据结构! 都是操作二进制位来进行记录的,就只有0和1两个状态

1字节 = 8bit

  • setbit 设置
  • getbit 获取
  • bitcount 统计为1的次数

测试:

127.0.0.1:6379> SETBIT login 0 1
0
127.0.0.1:6379> SETBIT login 1 0
0
127.0.0.1:6379> SETBIT login 2 1
0
127.0.0.1:6379> setbit login 3 1
0
127.0.0.1:6379> setbit login 4 0
0
127.0.0.1:6379> setbit login 8 0
0
127.0.0.1:6379> getbit login 2
1
127.0.0.1:6379> getbit login 8
0
127.0.0.1:6379> bitcount login # 统计为1的次数
3

Redis.conf

熟悉基本配置

位置

Redis 的配置文件位于 Redis 安装目录下,文件名为 redis.conf

config get * # 获取全部的配置 1

配置文件的地址:

img

我们一般情况下,会单独拷贝出来一份进行操作。来保证初始文件的安全。

Units 单位

img

1、配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit

2、对 大小写 不敏感

include 包含

img

和Spring配置文件类似,可以通过includes包含,redis.conf 可以作为总文件,可以包含其他文件!

netword 网络配置

bind 127.0.0.1 # 绑定的
ip protected-mode yes # 保护模式 
port 6379 # 默认端口

general 通用

daemonize yes # 默认情况下,Redis不作为守护进程运行。需要开启的话,改为 yes 
supervised no # 可通过upstart和systemd管理Redis守护进程 
pidfile /var/run/redis_6379.pid # 以后台进程方式运行redis,则需要指定pid 文件 
loglevel notice # 日志级别。可选项有: 
	# debug(记录大量日志信息,适用于开发、测试阶段); 
	# verbose(较多日志信息); 
	# notice(适量日志信息,使用于生产环境); 		# warning(仅有部分重要、关键信息才会被记录)。 
logfile "" # 日志文件的位置,当指定为空字符串时,为标准输出 
databases 16 # 设置数据库的数目。默认的数据库是DB 0 
always-show-logo yes # 是否总是显示logo

snapshopting快照

# 900秒(15分钟)内至少1个key值改变(则进行数据库保存--持久化) save 900 1 
# 300秒(5分钟)内至少10个key值改变(则进行数据库保存--持久化) save 300 10 
# 60秒(1分钟)内至少10000个key值改变(则进行数据库保存--持久化) 
save 60 10000 
stop-writes-on-bgsave-error yes # 持久化出现错误后,是否依然进行继续进行工作 
rdbcompression yes # 使用压缩rdb文件 yes:压缩,但是需要一些cpu的消耗。no:不压 缩,需要更多的磁盘空间 
rdbchecksum yes # 是否校验rdb文件,更有利于文件的容错性,但是在保存rdb文件的时 候,会有大概10%的性能损耗 
dbfilename dump.rdb # dbfilenamerdb文件名称 
dir ./ # dir 数据目录,数据库的写入会在这个目录。rdb、aof文件也会写在这个目录

REPLICATION 复制 我们后面讲主从复制再给大家讲解!这里先跳过!

SECURITY安全

访问密码的查看,设置和取消

# 启动redis 
# 连接客户端 
# 获得和设置密码 
config get requirepass 
config set requirepass "123456" 
#测试ping,发现需要验证 
127.0.0.1:6379> ping 
NOAUTH Authentication required. # 验证 
127.0.0.1:6379> auth 123456 
OK
127.0.0.1:6379> ping 
PONG

限制

maxclients 10000 # 设置能连上redis的最大客户端连接数量 
maxmemory <bytes> # redis配置的最大内存容量 
maxmemory-policy noeviction 
	# maxmemory-policy 内存达到上限的处理策略 
	#volatile-lru:利用LRU算法移除设置过过期时间的key。 
	#volatile-random:随机移除设置过过期时间的key。 
	#volatile-ttl:移除即将过期的key,根据最近过期时间来删除(辅以TTL) 
	#allkeys-lru:利用LRU算法移除任何key。 	 #allkeys-random:随机移除任何key。 		#noeviction:不移除任何key,只是返回一个写错误。

append only模式

appendonly no # 是否以append only模式作为持久化方式,默认使用的是rdb方式持久化,这种 方式在许多应用中已经足够用了 
appendfilename "appendonly.aof" # 

appendfilename AOF 文件名称 appendfsync everysec # appendfsync aof持久化策略的配置 
	# no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快。 
	# always表示每次写入都执行fsync,以保证数据同步到磁盘。 
	# everysec表示每秒执行一次fsync,可能会导致丢失这1s数据。

常见配置介绍

1、Redis默认不是以守护进程的方式运行,可以通过该配置项修改,使用yes启用守护进程

daemonize no

2、当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfifile指

pidfifile /var/run/redis.pid

3、指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解释了为什么选用6379作为默认端口,因为6379在手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字

port 6379

4、绑定的主机地址

bind 127.0.0.1

5、当 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能

timeout 300

6、指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为verbose

loglevel verbose

7、日志记录方式,默认为标准输出,如果配置Redis为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给/dev/null

logfifile stdout

8、设置数据库的数量,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id

databases 16

9、指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合

save

Redis默认配置文件中提供了三个条件:

save 900 1

save 300 10

save 60 10000

分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更改。

10、指定存储至本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时

间,可以关闭该选项,但会导致数据库文件变的巨大

rdbcompression yes

11、指定本地数据库文件名,默认值为dump.rdb

dbfifilename dump.rdb

12、指定本地数据库存放目录

dir ./

13、设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,它会自动从master进行数据同步

slaveof

14、当master服务设置了密码保护时,slav服务连接master的密码

masterauth

15、设置Redis连接密码,如果配置了连接密码,客户端在连接Redis时需要通过AUTH 命令提供密码,默认关闭

requirepass foobared

16、设置同一时间最大客户端连接数,默认无限制,Redis可以同时打开的客户端连接数为Redis进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis会关闭新的连接并向客户端返回max number of clients reached错误信息

maxclients 128

17、指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区

maxmemory

18、指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为no

appendonly no

19、指定更新日志文件名,默认为appendonly.aof

appendfifilename appendonly.aof

20、指定更新日志条件,共有3个可选值:

no:表示等操作系统进行数据缓存同步到磁盘(快)

always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)

everysec:表示每秒同步一次(折衷,默认值)

appendfsync everysec

21、指定是否启用虚拟内存机制,默认值为no,简单的介绍一下,VM机制将数据分页存放,由Redis将访问量较少的页即冷数据swap到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析Redis的VM机制)

vm-enabled no

22、虚拟内存文件路径,默认值为/tmp/redis.swap,不可多个Redis实例共享

vm-swap-fifile /tmp/redis.swap

23、将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据都是内存存储的(Redis的索引数据 就是keys),也就是说,当vm-max-memory设置为0的时候,其实是所有value都存在于磁盘。默认值为0

vm-max-memory 0

24、Redis swap文件分成了很多的page,一个对象可以保存在多个page上面,但一个page上不能被多个对象共享,vm-page-size是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page大小最好设置为32或者64bytes;如果存储很大大对象,则可以使用更大的page,如果不 确定,就使用默认值

vm-page-size 32

25、设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是在放在内存中的,,在磁盘上每8个pages将消耗1byte的内存。

vm-pages 134217728

26、设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为4

vm-max-threads 4

27、设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启

glueoutputbuf yes

28、指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法

hash-max-zipmap-entries 64

hash-max-zipmap-value 512

29、指定是否激活重置哈希,默认为开启(后面在介绍Redis的哈希算法时具体介绍)

activerehashing yes

30、指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各

个实例又拥有自己的特定配置文件

include /path/to/local.conf

Redos持久化

RDB(Redis DataBase)

在主从复制中,rdb就是备用了!从机上面

什么是RDB

img

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里.

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中, 带持久化过程都结束了,再用这个临时文件替换上次持久化好的文件. 整个过程中,主进程是不进行任何IO操作的.这就确保了极高的性能.如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感.那RDB方式要比AOF方式更加的高效.RDB的缺点是最后一次持久化后的数据可能丢失. 我们默认的就是RDB, 一般情况下不需要修改这个配置

RDB保存的文件是,dump.rdb都是在我们的配置文件中快照中进行配置的!

img

触发机制

  1. save 的规则满足的情况下,会自动触发rdb规则
  2. 执行flushall命令 , 也会触发我们的rdb 规则!
  3. 退出redis,也会产生rdb文件!

备份就会自动生成一个dump.rdb

img

如何恢复rdb文件!

  1. 只需要将rdb 文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据!

  2. 查看需要存在的位置

127.0.0.1:6379> config get dir
 1) "dir"
 2) "/usr/local/bin" # 入伙在这个目录下存在dump.rdb 文件,启动就会自动恢复其中的数据
  1. 默认的配置就可以了

优点&缺点

优点:

  1. 适合大规模的数据恢复!
  2. 对数据的完整性要求不高!

缺点:

  1. 需要一定的时间间隔进行操作! 如果redis意外宕机了,这个最后一个修改的数据就没有了!
  2. fork进程的时候,会占用一定的内容空间!

AOF(Append Only File)

将我们的所有命令都记录下来,history , 恢复的时候就把这个文件全部在执行一遍

是什么

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

Aof保存的是 appendonly.aof 文件

img

配置

img

append

img

默认是不开启的,我们需要手动进行配置! 我们只需要将appendonly 改为yes就开启了aof!

重启,redis 就可以生效了!

如果这个aof文件有错位,这时候 redis是启动不起来的,我们需要修复这个aof文件

redis 给我提供了一个工具redis-check-aof --fix

img

如果文件正常,重启就可以直接恢复了!

img

AOF 启动/修复/恢复

正常恢复:

  • 启动:设置Yes,修改默认的appendonly no,改为yes

  • 将有数据的aof文件复制一份保存到对应目录(confifig get dir)

  • 恢复:重启redis然后重新加载

异常恢复:

  • 启动:设置Yes

  • 故意破坏 appendonly.aof 文件!

  • 修复: redis-check-aof --fix appendonly.aof 进行修复

  • 恢复:重启 redis 然后重新加载

Rewrite

是什么:

AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令 bgrewriteaof !

重写原理:

AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,这点和快照有点类似!

触发机制:

Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的已被且文件大

于64M的触发。

优点和缺点

优点:

1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差

但数据完整性比较好

2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失

3、不同步: appendfsync no 从不同步

缺点:

1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。

2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。

总结

1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储

2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。

3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化

4、同时开启两种持久化方式

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。

  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。

  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。

  • 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。

Redis事务

理论

Redis事务的概念:

Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。

Redis事务没有隔离级别的概念:

批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行!

Redis不保证原子性:

Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。

Redis事务的三个阶段:

  • 开始事务

  • 命令入队

  • 执行事务

Redis事务相关命令:

watch key1 key2 ... #监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则 事务被打断 ( 类似乐观锁 ) 
multi # 标记一个事务块的开始( queued ) 
exec # 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 )
discard # 取消事务,放弃事务块中的所有命令 
unwatch # 取消watch对所有key的监控

实践

正常执行

img

放弃事务

img

若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行

img

若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。

img

Watch监控

悲观锁:

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在操作之前先上锁。

**乐观锁:**乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁。但是在更新的时候会判断一下再此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新

测试:

1、初始化信用卡可用余额和欠额

127.0.0.1:6379> set balance 100 
OK
127.0.0.1:6379> set debt 0 
OK

2、使用watch检测balance,事务期间balance数据未变动,事务执行成功

127.0.0.1:6379> watch balance 
OK
127.0.0.1:6379> MULTI 
OK
127.0.0.1:6379> decrby balance 20 
QUEUED 
127.0.0.1:6379> incrby debt 20 
QUEUED 
127.0.0.1:6379> exec 
1) (integer) 80 
2) (integer) 20

3、使用watch检测balance,事务期间balance数据变动,事务执行失败!

# 窗口一 
127.0.0.1:6379> watch balance OK127.0.0.1:6379> MULTI # 执行完毕后,执行窗口二代码测试 
OK
127.0.0.1:6379> decrby balance 20 
QUEUED 
127.0.0.1:6379> incrby debt 20 
QUEUED 
127.0.0.1:6379> exec # 修改失败! 
(nil) 

# 窗口二 
127.0.0.1:6379> get balance 
"80" 
127.0.0.1:6379> set balance 200 
OK

# 窗口一:出现问题后放弃监视,然后重来! 
127.0.0.1:6379> UNWATCH # 放弃监视 
OK
127.0.0.1:6379> watch balance 
OK
127.0.0.1:6379> MULTI 
OK
127.0.0.1:6379> decrby balance 20 
QUEUED 
127.0.0.1:6379> incrby debt 20

QUEUED 127.0.0.1:6379> exec # 成功! 
1) (integer) 180 
2) (integer) 40

说明:

一但执行 EXEC 开启事务的执行后,无论事务使用执行成功, WARCH 对变量的监控都将被取消。

故当事务执行失败后,需重新执行WATCH命令对变量进行监控,并开启新的事务进行操作。

小结

watch指令类似于乐观锁,在事务提交时,如果watch监控的多个KEY中任何KEY的值已经被其他客户端更改,则使用EXEC执行事务时,事务队列将不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。

Redis发布订阅

是什么

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

订阅/发布消息图:

img

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的

关系:

img

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

img

命令

这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。

img

测试

以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为redisChat

redis 127.0.0.1:6379> SUBSCRIBE redisChat 

Reading messages... (press Ctrl-C to quit) 
1) "subscribe" 
2) "redisChat" 
3) (integer) 1

现在,我们先重新开启个redis客户端,然后在同一个频道redisChat发布两次消息,订阅者就能接收到消息。

redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Redis" 
(integer) 1 
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Kuangshen" 
(integer) 1 

# 订阅者的客户端会显示如下消息 
1) "message" 
2) "redisChat" 
3) "Hello,Redis" 
1) "message" 
2) "redisChat" 
3) "Hello,Kuangshen"

原理

Redis是使用C实现的,通过分析 Redis 源码里的pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。

Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 channel,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。

通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。

Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。使用场景

Pub/Sub构建实时消息系统

Redis的Pub/Sub系统可以构建实时的消息系统

比如很多用Pub/Sub构建的实时聊天系统的例子。

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave 以读为主。

默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用主要包括:

1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。

3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。

4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:

1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较

大;

2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有

内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。

电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。

对于这种场景,我们可以使如下这种架构:

img

环境配置

基本配置

配从库不配主库,从库配置

slaveof 主库ip 主库端口 # 配置主从 
Info replication # 查看信息

每次与master断开之后,都需要重新连接,除非你配置进redis.conf文件

修改配置文件

准备工作:我们配置主从复制,至少需要三个,一主二从!配置三个客户端!

img

1、拷贝多个redis.conf文件

img

2、指定端口6379,依次类推

3、开启daemonize yes

4、Pid文件名字pidfile /var/run/redis_6379.pid,依此类推

5、Log文件名字logfile "6379.log",依次类推

6、Dump.rdb名字dbfilename dump6379.rdb,依次类推

img

上面都配置完毕后,3个服务通过3个不同的配置文件开启,我们的准备环境就OK 了!

img

一主二从

一主二从

1、环境初始化

img

默认三个都是Master 主节点

img

2、配置为一个Master两个Slave

img

3、在主机设置值,在从机都可以取到!从机不能写值!

img

测试一:主机挂了,查看从机信息,主机恢复,再次查看信息

测试二:从机挂了,查看主机信息,从机恢复,查看从机信息

层次链路

上一个Slave可以是下一个slave和Master,Slave同样可以接收其它slaves的连接和同步请求,那么该slave作为了链条中下一个的master,可以有效减轻master的写压力!

img

img

测试:6379设置值以后6380和6381都可以获取到!OK!

谋朝篡位

一主二从的情况下,如果主机断了,从机可以使用命令 SLAVEOF NO ONE 将自己改为主机!这个时候其余的从机链接到这个节点。对一个从属服务器执行命令 SLAVEOF NO ONE 将使得这个从属服务器关闭复制功能,并从从属服务器转变回主服务器,原来同步所得的数据集不会被丢弃。

img

主机再回来,也只是一个光杆司令了,从机为了正常使用跑到了新的主机上!

复制原理

Slave 启动成功连接到 master 后会发送一个sync命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行

完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行

哨兵模式

概述

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。

谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

img

这里的哨兵有两个作用

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。

  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

img

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

配置测试

1、调整结构,6379带着80、81

2、自定义的 /myredis 目录下新建 sentinel.conf 文件,名字千万不要错

3、配置哨兵,填写内容

  • sentinel monitor 被监控主机名字 127.0.0.1 6379 1

  • 上面最后一个数字1,表示主机挂掉后slave投票看让谁接替成为主机,得票数多少后成为主机

4、启动哨兵

  • Redis-sentinel /myredis/sentinel.conf

  • 上述目录依照各自的实际情况配置,可能目录不同

5、正常主从演示

6、原有的Master 挂了

7、投票新选

8、重新主从继续开工,info replication 查查看

9、问题:如果之前的master 重启回来,会不会双master 冲突? 之前的回来只能做小弟了

哨兵模式的优缺点

优点

  1. 哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。
  2. 主从可以切换,故障可以转移,系统可用性更好。
  3. 哨兵模式是主从模式的升级,系统更健壮,可用性更高。

缺点

  1. Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
  2. 实现哨兵模式的配置也不简单,甚至可以说有些繁琐

哨兵配置说明

# Example sentinel.conf 

# 哨兵sentinel实例运行的端口 默认26379 
port 26379 

# 哨兵sentinel的工作目录 
dir /tmp 

# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了 
# sentinel monitor <master-name> <ip> <redis-port> <quorum> 
sentinel monitor mymaster 127.0.0.1 6379 2 1234567891011121314

# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都 要提供密码 
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 
# sentinel auth-pass <master-name> <password> 
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd

# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 
# sentinel down-after-milliseconds <master-name> <milliseconds> sentinel down-after-milliseconds mymaster 30000 

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同 步,
这个数字越小,完成failover所需的时间就越长, 
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 
# sentinel parallel-syncs <master-name> <numslaves> 
sentinel parallel-syncs mymaster 1 
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面: 
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。 
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的 master那里同步数据时。 
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超 时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 
# 默认三分钟 
# sentinel failover-timeout <master-name> <milliseconds> 
sentinel failover-timeout mymaster 180000 

# SCRIPTS EXECUTION 

#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮 件通知相关人员。 
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执 行。

#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等 等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常 运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果 sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执 行的,否则sentinel无法正常启动成功。 
#通知脚本 
# sentinel notification-script <master-name> <script-path> 
sentinel notification-script mymaster /var/redis/notify.sh 

# 客户端重新配置主节点参数脚本 
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master 地址已经发生改变的信息。 
# 以下参数将会在调用脚本时传给脚本: 
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> 
# 目前<state>总是“failover”, 
# <role>是“leader”或者“observer”中的一个。 
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的 slave)通信的

# 这个脚本应该是通用的,能被多次调用,不是针对性的。 
# sentinel client-reconfig-script <master-name> <script-path> 
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh 616263

缓存穿透和雪崩

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

缓存穿透

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没用,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库,这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

img

缓存空对象

当存储层不命中,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源

img

但是这种方法会存在两个问题:

1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键

2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿

概述

这里需要注意和缓存击穿的区别,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导致数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。

加互斥锁

分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

缓存雪崩

概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

img

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然

形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就

是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知

的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续

工作,其实就是搭建的集群。

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对

某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数

据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让

缓存失效的时间点尽量均匀。

Jedis

Jedis是Redis官方推荐的Java连接开发工具。要在Java开发中使用好Redis中间件,必须对Jedis熟悉才能

写成漂亮的代码

测试联通

1、新建一个普通的Maven项目

2、导入redis的依赖!

<!-- https://mvnrepository.com/artifact/redis.clients/jedis --> 
<dependency> 
<groupId>redis.clients</groupId> 
<artifactId>jedis</artifactId> 
<version>3.2.0</version> 
</dependency> 
<dependency> 
<groupId>com.alibaba</groupId> 
<artifactId>fastjson</artifactId> 
<version>1.2.58</version> 
</dependency>

3、编写测试代码

package com.kuang.ping; 
import redis.clients.jedis.Jedis; 
public class Ping { 
public static void main(String[] args) { 
Jedis jedis = new Jedis("127.0.0.1",6379); 
System.out.println("连接成功"); 
//查看服务是否运行 
System.out.println("服务正在运行: "+jedis.ping()); 
} 
}

4、启动redis服务

5、启动测试,结果

连接成功 
服务正在运行: PONG

常用API

基本操作

public class TestPassword { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        //验证密码,如果没有设置密码这段代码省略 
        // jedis.auth("password"); 
        
        jedis.connect(); //连接 
        jedis.disconnect(); //断开连接 
        jedis.flushAll(); //清空所有的key 
    } 
}

对key操作的命令

public class TestKey { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        
        System.out.println("清空数据:"+jedis.flushDB()); 
        System.out.println("判断某个键是否存在:"+jedis.exists("username")); 
        System.out.println("新增<'username','kuangshen'>的键值 对:"+jedis.set("username", "kuangshen")); 
        stem.out.println("新增<'password','password'>的键值 对:"+jedis.set("password", "password")); 
        System.out.print("系统中所有的键如下:"); 
        Set<String> keys = jedis.keys("*"); 
        System.out.println(keys); 
        System.out.println("删除键password:"+jedis.del("password")); 
        System.out.println("判断键password是否存 在:"+jedis.exists("password")); 
        System.out.println("查看键username所存储的值的类 型:"+jedis.type("username")); 
        System.out.println("随机返回key空间的一个:"+jedis.randomKey()); 
        System.out.println("重命名key:"+jedis.rename("username","name")); 
        System.out.println("取出改后的name:"+jedis.get("name")); 
        System.out.println("按索引查询:"+jedis.select(0)); 
        System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB()); 
        System.out.println("返回当前数据库中key的数目:"+jedis.dbSize()); 
        System.out.println("删除所有数据库中的所有key:"+jedis.flushAll()); 
    } 
}

对String操作的命令

public class TestString { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        
        jedis.flushDB(); 
        System.out.println("===========增加数据==========="); 
        System.out.println(jedis.set("key1","value1")); 
        System.out.println(jedis.set("key2","value2")); 
        System.out.println(jedis.set("key3", "value3"));
        System.out.println("删除键key2:"+jedis.del("key2")); 
        System.out.println("获取键key2:"+jedis.get("key2")); 
        System.out.println("修改key1:"+jedis.set("key1", "value1Changed")); 
        System.out.println("获取key1的值:"+jedis.get("key1")); 
        System.out.println("在key3后面加入值:"+jedis.append("key3", "End")); 
        System.out.println("key3的值:"+jedis.get("key3")); 
        System.out.println("增加多个键值 对:"+jedis.mset("key01","value01","key02","value02","key03","value03")); 
        System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03")); 
        System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03","key04")); 
        System.out.println("删除多个键值对:"+jedis.del("key01","key02")); 
        System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03")); 
        jedis.flushDB(); 
        System.out.println("===========新增键值对防止覆盖原先值=============="); 
        System.out.println(jedis.setnx("key1", "value1")); 
        System.out.println(jedis.setnx("key2", "value2")); 
        System.out.println(jedis.setnx("key2", "value2-new")); 
        System.out.println(jedis.get("key1")); 
        System.out.println(jedis.get("key2")); 
        System.out.println("===========新增键值对并设置有效时间============="); 
        System.out.println(jedis.setex("key3", 2, "value3")); 
        System.out.println(jedis.get("key3")); 
        try {
            TimeUnit.SECONDS.sleep(3); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        }
        
        System.out.println(jedis.get("key3")); 
        
        System.out.println("===========获取原值,更新为新值=========="); 
        System.out.println(jedis.getSet("key2", "key2GetSet")); 
        System.out.println(jedis.get("key2")); 
        System.out.println("获得key2的值的字串:"+jedis.getrange("key2", 2, 4));} }

对List操作命令

public class TestList { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.flushDB(); 
        System.out.println("===========添加一个list==========="); 
        jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WeakHashMap", "LinkedHashMap"); 
        jedis.lpush("collections", "HashSet"); 
        jedis.lpush("collections", "TreeSet"); 
        jedis.lpush("collections", "TreeMap"); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));//-1代表倒数第一个元素,-2代表倒数第二个元素,end为-1表示查询全部
        System.out.println("collections区间0-3的元 素:"+jedis.lrange("collections",0,3)); 
        System.out.println("==============================="); 
        // 删除列表指定的值 ,第二个参数为删除的个数(有重复时),后add进去的值先被删,类 似于出栈 
        System.out.println("删除指定元素个数:"+jedis.lrem("collections", 2, "HashMap")); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("删除下表0-3区间之外的元 素:"+jedis.ltrim("collections", 0, 3)); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("collections列表出栈(左 端):"+jedis.lpop("collections")); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("collections添加元素,从列表右端,与lpush相对 应:"+jedis.rpush("collections", "EnumMap")); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("collections列表出栈(右 端):"+jedis.rpop("collections")); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("修改collections指定下标1的内 容:"+jedis.lset("collections", 1, "LinkedArrayList")); 
        System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
        System.out.println("==============================="); 
        System.out.println("collections的长度:"+jedis.llen("collections")); 
        System.out.println("获取collections下标为2的元 素:"+jedis.lindex("collections", 2)); 
        System.out.println("==============================="); jedis.lpush("sortedList", "3","6","2","0","7","4"); 
        System.out.println("sortedList排序前:"+jedis.lrange("sortedList", 0, -1)); 
        System.out.println(jedis.sort("sortedList")); 
        System.out.println("sortedList排序后:"+jedis.lrange("sortedList", 0, -1));
    } 
}

对Set的操作命令

public class TestSet { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.flushDB(); 
        System.out.println("============向集合中添加元素(不重复) ============"); 
        System.out.println(jedis.sadd("eleSet", "e1","e2","e4","e3","e0","e8","e7","e5")); 
        System.out.println(jedis.sadd("eleSet", "e6")); 
        System.out.println(jedis.sadd("eleSet", "e6")); 
        System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet")); 
        System.out.println("删除一个元素e0:"+jedis.srem("eleSet", "e0"));
        System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet")); 
        System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet", "e7","e6")); 
        System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet")); 
        System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet")); 
        System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet")); 
        System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet")); 
        System.out.println("eleSet中包含元素的个数:"+jedis.scard("eleSet")); 
        System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet", "e3")); 
        System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e1")); 
        System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e5")); 
        System.out.println("================================="); 
        System.out.println(jedis.sadd("eleSet1", "e1","e2","e4","e3","e0","e8","e7","e5")); 
        System.out.println(jedis.sadd("eleSet2", "e1","e2","e4","e3","e0","e8")); 
        System.out.println("将eleSet1中删除e1并存入eleSet3 中:"+jedis.smove("eleSet1", "eleSet3", "e1"));//移到集合元素 
        System.out.println("将eleSet1中删除e2并存入eleSet3 中:"+jedis.smove("eleSet1", "eleSet3", "e2")); 
        System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1")); 
        System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3")); 
        System.out.println("============集合运算================="); 
        System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1")); 
        System.out.println("eleSet2中的元素:"+jedis.smembers("eleSet2")); 
        System.out.println("eleSet1和eleSet2的交 集:"+jedis.sinter("eleSet1","eleSet2")); 
        System.out.println("eleSet1和eleSet2的并 集:"+jedis.sunion("eleSet1","eleSet2")); 
        System.out.println("eleSet1和eleSet2的差 集:"+jedis.sdiff("eleSet1","eleSet2"));//eleSet1中有,eleSet2中没有 
        jedis.sinterstore("eleSet4","eleSet1","eleSet2");//求交集并将交集保存到 dstkey的集合
        System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4")); 
    } 
}

对Hash的操作命令

public class TestHash { 
    public static void main(String[] args) { 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.flushDB(); 
        Map<String,String> map = new HashMap<>(); 
        map.put("key1","value1"); 
        map.put("key2","value2"); 
        map.put("key3","value3"); 
        map.put("key4","value4"); 
        //添加名称为hash(key)的hash元素 
        jedis.hmset("hash",map); 
        //向名称为hash的hash中添加key为key5,value为value5元素 
        jedis.hset("hash", "key5", "value5"); 
        System.out.println("散列hash的所有键值对 为:"+jedis.hgetAll("hash"));//return Map<String,String>
      System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));//return Set<String> 
        System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));//return List<String> 
        System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加 key6:"+jedis.hincrBy("hash", "key6", 6)); 
        System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash")); 
        System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加 key6:"+jedis.hincrBy("hash", "key6", 3)); 
        System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash")); 
        System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash", "key2"));
        System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash")); 
        System.out.println("散列hash中键值对的个数:"+jedis.hlen("hash")); 
        System.out.println("判断hash中是否存在 key2:"+jedis.hexists("hash","key2")); 
        System.out.println("判断hash中是否存在 key3:"+jedis.hexists("hash","key3")); 
        System.out.println("获取hash中的值:"+jedis.hmget("hash","key3")); 
        System.out.println("获取hash中的 值:"+jedis.hmget("hash","key3","key4")); 
    } 
}  

事务

基本操作

package com.kuang.multi; 
import com.alibaba.fastjson.JSONObject; 
import redis.clients.jedis.Jedis; 
import redis.clients.jedis.Transaction; 
public class TestMulti { 
    public static void main(String[] args) { 
        //创建客户端连接服务端,redis服务端需要被开启 
        Jedis jedis = new Jedis("127.0.0.1", 6379); 
        jedis.flushDB(); JSONObject 
            jsonObject = new JSONObject(); 
        jsonObject.put("hello", "world"); 
        jsonObject.put("name", "java"); 
        //开启事务 
        Transaction multi = jedis.multi(); 
        String result = jsonObject.toJSONString(); 
        try{
            //向redis存入一条数据 
            multi.set("json", result); 
            //再存入一条数据 
            multi.set("json2", result); 
            //这里引发了异常,用0作为被除数 
            int i = 100/0; 
            //如果没有引发异常,执行进入队列的命令
            multi.exec(); 
        }catch(Exception e){ 
            e.printStackTrace(); 
            //如果出现异常,回滚 
            multi.discard(); 
        }finally{ 
            System.out.println(jedis.get("json")); 
            System.out.println(jedis.get("json2")); 
            //最终关闭客户端 
            jedis.close(); 
        } 
    } 
}

SpringBoot整合

基础使用

概述

在SpringBoot中一般使用RedisTemplate提供的方法来操作Redis。那么使用SpringBoot整合Redis需要那些步骤呢。

1、 JedisPoolConfig(这个是配置连接池)

2、 RedisConnectionFactory这个是配置连接信息,这里的RedisConnectionFactory是一个接口,我们需要使用它的实现类,在SpringD Data Redis方案中提供了以下四种工厂模型:

  • JredisConnectionFactory
  • JedisConnectionFactory
  • LettuceConnectionFactory
  • SrpConnectionFactory

3、 RedisTemplate基本操作

导入依赖

<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-data-redis</artifactId> 
</dependency>

yaml配置

spring: 
	redis: 
    host: 127.0.0.1 
    port: 6379 
    password: 123456 
    jedis: 
		pool: 
            max-active: 8 
            max-wait: -1ms 
            max-idle: 500 min-idle: 0 
	lettuce: 
		shutdown-timeout: 0ms

测试

@SpringBootTest 
class SpringbootRedisApplicationTests { 
    @Autowired 
    private RedisTemplate<String,String> redisTemplate; 
    @Test 
    void contextLoads() { 
        redisTemplate.opsForValue().set("myKey","myValue"); 
        System.out.println(redisTemplate.opsForValue().get("myKey")); 
    } 
}

封装工具类

1、新建一个SpringBoot项目

2、导入redis的启动器

<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-data-redis</artifactId> 
</dependency>

3、配置redis,可以查看 RedisProperties 分析

# Redis服务器地址 
spring.redis.host=127.0.0.1 
# Redis服务器连接端口 
spring.redis.port=6379

4、分析 RedisAutoConfifiguration 自动配置类

@Configuration(proxyBeanMethods = false) 
@ConditionalOnClass(RedisOperations.class) 
@EnableConfigurationProperties(RedisProperties.class)

@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class }) 
public class RedisAutoConfiguration { 
    @Bean 
    @ConditionalOnMissingBean(name = "redisTemplate") 
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { 
        RedisTemplate<Object, Object> template = new RedisTemplate<>(); 
        template.setConnectionFactory(redisConnectionFactory); 
        return template; 
    }
    @Bean 
    @ConditionalOnMissingBean 
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { 
        StringRedisTemplate template = new StringRedisTemplate(); 
        template.setConnectionFactory(redisConnectionFactory); 
        return template; 
    } 
}

通过源码可以看出,SpringBoot自动帮我们在容器中生成了一个RedisTemplate和一个StringRedisTemplate。

但是,这个RedisTemplate的泛型是<Object,Object>,写代码不方便,需要写好多类型转换的代码;我们需要一个泛型为<String,Object>形式的RedisTemplate。

并且,这个RedisTemplate没有设置数据存在Redis时,key及value的序列化方式。

看到这个@ConditionalOnMissingBean注解后,就知道如果Spring容器中有了RedisTemplate对象了,这个自动配置的RedisTemplate不会实例化。因此我们可以直接自己写个配置类,配置RedisTemplate。

5、既然自动配置不好用,就重新配置一个RedisTemplate

package com.kuang.config; 
import com.fasterxml.jackson.annotation.JsonAutoDetect; 
import com.fasterxml.jackson.annotation.PropertyAccessor; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.data.redis.connection.RedisConnectionFactory; 
import org.springframework.data.redis.core.RedisTemplate; 
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; 
import org.springframework.data.redis.serializer.StringRedisSerializer; 
@Configuration 
public class RedisConfig { 
    @Bean 
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { 
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); 
        template.setConnectionFactory(factory); 
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); 
        ObjectMapper om = new ObjectMapper(); 
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); 
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); 
        jackson2JsonRedisSerializer.setObjectMapper(om); 
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); 
        // key采用String的序列化方式 
        template.setKeySerializer(stringRedisSerializer); 
        // hash的key也采用String的序列化方式 
        template.setHashKeySerializer(stringRedisSerializer); 
        // value序列化方式采用
        jackson template.setValueSerializer(jackson2JsonRedisSerializer); 
        // hash的value序列化方式采用jackson 
        template.setHashValueSerializer(jackson2JsonRedisSerializer); 
        template.afterPropertiesSet(); 
        return template; 
    } 
}

6、写一个Redis工具类(直接用RedisTemplate操作Redis,需要很多行代码,因此直接封装好一个RedisUtils,这样写代码更方便点。这个RedisUtils交给Spring容器实例化,使用时直接注解注入。)

package com.kuang.utils; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.data.redis.core.RedisTemplate; 
import org.springframework.stereotype.Component; 
import org.springframework.util.CollectionUtils; 

import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.concurrent.TimeUnit; 

@Component 
public final class RedisUtil { 
    @Autowired 
    private RedisTemplate<String, Object> redisTemplate; 
    
    // =============================common============================ /*** 指定缓存失效时间 * @param key 键 * @param time 时间(秒) */ 
    public boolean expire(String key, long time) { 
        try {
            if (time > 0) { 
                redisTemplate.expire(key, time, TimeUnit.SECONDS); 
            }
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    /**
    * 根据key 获取过期时间 
    * @param key 键 不能为null 
    * @return 时间(秒) 返回0代表为永久有效 
    */ 
    public long getExpire(String key) { 
        return redisTemplate.getExpire(key, TimeUnit.SECONDS); 
    }
    
    /**
    * 判断key是否存在 
    * @param key 键 
    * @return true 存在 false不存在 
    */ 
    public boolean hasKey(String key) { 
        try {
            return redisTemplate.hasKey(key); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 删除缓存 
    * @param key 可以传一个值 或多个 
    */ 
    @SuppressWarnings("unchecked") 
    public void del(String... key) { 
        if (key != null && key.length > 0) { 
            if (key.length == 1) { 
                redisTemplate.delete(key[0]); 
            } else { 
                redisTemplate.delete(CollectionUtils.arrayToList(key)); 
            } 
        } 
    }
    
    // ============================String============================= 
    
    /**
    * 普通缓存获取 
    * @param key 键 
    * @return 值 
    */
    
    public Object get(String key) { 
        return key == null ? null : redisTemplate.opsForValue().get(key); 
    }
    
    /**
    * 普通缓存放入
    * @param key 键 
    * @param value 值 
    * @return true成功 false失败 
    */ 
    
    public boolean set(String key, Object value) { 
        try {
            redisTemplate.opsForValue().set(key, value); 
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 普通缓存放入并设置时间 
    * @param key 键 
    * @param value 值 
    * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 
    * @return true成功 false 失败 
    */ 
    public boolean set(String key, Object value, long time) { 
        try {
            if (time > 0) { 
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); 
            } else { 
                set(key, value);
            }
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 递增 
    * @param key 键 
    * @param delta 要增加几(大于0)
    */ 
    public long incr(String key, long delta) { 
        if (delta < 0) { 
            throw new RuntimeException("递增因子必须大于0"); }
        return redisTemplate.opsForValue().increment(key, delta); 
    }
    
    /**
    * 递减 
    * @param key 键 
    * @param delta 要减少几(小于0) 
    */ 
    public long decr(String key, long delta) { 
        if (delta < 0) { 
            throw new RuntimeException("递减因子必须大于0"); 
        }
        return redisTemplate.opsForValue().increment(key, -delta); 
    }
    
    // ================================Map================================= 
    
    /**
    * HashGet
    * @param key 键 不能为null 
    * @param item 项 不能为null 
    */ public Object hget(String key, String item) { 
        return redisTemplate.opsForHash().get(key, item); }
    
    /**
    * 获取hashKey对应的所有键值 
    * @param key 键 
    * @return 对应的多个键值 
    */ 
    public Map<Object, Object> hmget(String key) { 
        return redisTemplate.opsForHash().entries(key); 
    }
    
    /**
    * HashSet 
    * @param key 键 
    * @param map 对应多个键值 
    */ 
    public boolean hmset(String key, Map<String, Object> map) { 
        try {
            redisTemplate.opsForHash().putAll(key, map); 
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * HashSet 并设置时间
    * @param key 键 
    * @param map 对应多个键值 
    * @param time 时间(秒) 
    * @return true成功 false失败 
    */ 
    public boolean hmset(String key, Map<String, Object> map, long time) {
        
        try {
            redisTemplate.opsForHash().putAll(key, map); 
            if (time > 0) { 
                expire(key, time); 
            }
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false;
        } 
    }
    
    /**
    * 向一张hash表中放入数据,如果不存在将创建 
    *
    * @param key 键 
    * @param item 项 
    * @param value 值 
    * @return true 成功 false失败
    */ 
    public boolean hset(String key, String item, Object value) { 
        try {
            redisTemplate.opsForHash().put(key, item, value); 
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 向一张hash表中放入数据,如果不存在将创建 
    *
    * @param key 键 
    * @param item 项 
    * @param value 值 
    * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 
    * @return true 成功 false失败 
    */ 
    public boolean hset(String key, String item, Object value, long time) { 
        try {
            redisTemplate.opsForHash().put(key, item, value); 
            if (time > 0) { 
                expire(key, time); 
            }
            return true; } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 删除hash表中的值 
    *
    * @param key 键 不能为null
    
    * @param item 项 可以使多个 不能为null 
    */ 
    public void hdel(String key, Object... item) { 
        redisTemplate.opsForHash().delete(key, item); 
    }
    
    /**
    * 判断hash表中是否有该项的值 
    *
    * @param key 键 不能为null 
    * @param item 项 不能为null 
    * @return true 存在 false不存在 
    */ 
    public boolean hHasKey(String key, String item) { 
        return redisTemplate.opsForHash().hasKey(key, item); 
    }
    
    /**
    * hash递增 如果不存在,就会创建一个 并把新增后的值返回 
    *
    * @param key 键 
    * @param item 项 
    * @param by 要增加几(大于0) 
    */ 
    public double hincr(String key, String item, double by) { 
        return redisTemplate.opsForHash().increment(key, item, by); 
    }
    
    /**
    * hash递减 
    *
    * @param key 键 
    * @param item 项 
    * @param by 要减少记(小于0) 
    */ 
    public double hdecr(String key, String item, double by) { 
        return redisTemplate.opsForHash().increment(key, item, -by); 
    }
    
    /**
    * 根据value从一个set中查询,是否存在 
    *
    * @param key 键 
    * @param value 值 
    * @return true 存在 false不存在 
    */ 
    public boolean sHasKey(String key, Object value) { 
        try {
            return redisTemplate.opsForSet().isMember(key, value); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 将数据放入set缓存 
    *
    * @param key 键 
    * @param values 值 可以是多个
    * @return 成功个数 
    */ 
    public long sSet(String key, Object... values) { 
        try {
            return redisTemplate.opsForSet().add(key, values); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return 0; 
        } 
    }
    
    /**
    * 将set数据放入缓存 
    *
    * @param key 键 
    * @param time 时间(秒) 
    * @param values 值 可以是多个 
    * @return 成功个数 
    */ 
    public long sSetAndTime(String key, long time, Object... values) { 
        try {
            Long count = redisTemplate.opsForSet().add(key, values); 
            if (time > 0) 
                expire(key, time); 
            return count; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return 0; 
        } 
    }
    
    /**
    * 获取set缓存的长度
    *
    * @param key 键 
    */
    public long sGetSetSize(String key) { 
        try {
            return redisTemplate.opsForSet().size(key); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return 0; 
        } 
    }
    
    /**
    * 移除值为value的 
    *
    * @param key 键 
    * @param values 值 可以是多个 
    * @return 移除的个数 
    */
    
    public long setRemove(String key, Object... values) { 
        try {
            Long count = redisTemplate.opsForSet().remove(key, values); 
            return count; 
        } 
        catch (Exception e) { 
            e.printStackTrace(); 
            return 0; 
        } 
    }
    
    // ===============================list================================= 
    
    /**
    * 获取list缓存的内容 
    *
    * @param key 键 
    * @param start 开始 
    * @param end 结束 0 到 -1代表所有值 
    */ 
    public List<Object> lGet(String key, long start, long end) { 
        try {
            return redisTemplate.opsForList().range(key, start, end); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return null; 
        } 
    }
    
    /**
    * 获取list缓存的长度 
    *
    * @param key 键 
    */ 
    public long lGetListSize(String key) { 
        try {
            return redisTemplate.opsForList().size(key);
          } catch (Exception e) { 
            e.printStackTrace(); 
            return 0; 
        } 
    }
    
    /**
    * 通过索引 获取list中的值 
    *
    * @param key 键 
    * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0 时,-1,表尾,-2倒数第二个元素,依次类推 
    */ 
    public Object lGetIndex(String key, long index) { 
        try {
            return redisTemplate.opsForList().index(key, index); 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return null; 
        }
    }  
    
    /**
    * 将list放入缓存 
    *
    * @param key 键 
    * @param value 值 
    */ 
    public boolean lSet(String key, Object value) { 
        try {
            redisTemplate.opsForList().rightPush(key, value); 
            return true; } 
        catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 将list放入缓存 
    * @param key 键 
    * @param value 值
    * @param time 时间(秒) 
    */
    public boolean lSet(String key, Object value, long time) { 
        try {
            redisTemplate.opsForList().rightPush(key, value); 
            if (time > 0) 
                expire(key, time); 
            return true;
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false;
        }
	}
    
    /**
    * 将list放入缓存
    *
    * @param key 键 
    * @param value 值 
    * @return 
    */ 
    public boolean lSet(String key, List<Object> value) { 
        try {
            redisTemplate.opsForList().rightPushAll(key, value); 
            return true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false; 
        } 
    }
    
    /**
    * 将list放入缓存 
    *
    * @param key 键 
    * @param value 值 
    * @param time 时间(秒) 
    * @return 
    */ 
    public boolean lSet(String key, List<Object> value, long time) { 
        try {
            redisTemplate.opsForList().rightPushAll(key, value); 
            if (time > 0) 
                expire(key, time); return true; 
        } catch (Exception e) { 
            e.printStackTrace(); return false; 
        } 
    }
    
    /**
    * 根据索引修改list中的某条数据 
    *
    * @param key 键 
    * @param index 索引 
    * @param value 值 
    * @return 
    */ 
    public boolean lUpdateIndex(String key, long index, Object value) { 
        try {
            redisTemplate.opsForList().set(key, index, value); 
            ren true; 
        } catch (Exception e) { 
            e.printStackTrace(); 
            return false;
            
        } 
    }
    
    /**
    * 移除N个值为value 
    *
    * @param key 键 
    * @param count 移除多少个 
    * @param value 值 
    * @return 移除的个数 
    */ 
    public long lRemove(String key, long count, Object value) { 
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value); 
            return remove; 
        } catch (Exception e) { 
            e.printStackTrace(); return 0; 
        } 
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-20 15:51:25  更:2021-09-20 15:52:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 11:56:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码