IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis -> 正文阅读

[大数据]Redis

Redis

简介

Redis:REmote DIctionary Server(远程字典服务器)

是完全开源免费的,用C语言编写的,遵守BSD协议,是一个高性能的(Key/Value)分布式内存数据库,基于内存运行,并支持持久化的NoSQL数据库,是当前最热门的NoSQL数据库之一,也被人们称为数据结构服务器

Redis与其他key-value缓存产品有以下三个特点

  • Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。
  • Redis不仅仅支持简单的 key-value 类型的数据,同时还提供list、set、zset、hash等数据结构的存储。
  • Redis支持数据的备份,即master-slave模式的数据备份。

Redis能干嘛

内存存储和持久化:redis支持异步将内存中的数据写到硬盘上,同时不影响继续服务取最新N个数据的操作,如:可以将最新的10条评论的ID放在Redis的List集合里面
发布、订阅消息系统
地图信息分析
定时器、计数器

特性

数据类型、基本操作和配置
持久化和复制,RDB、AOF
事务的控制

基础命令

# 【shell】启动redis服务 
[root@192 bin] cd /usr/local/bin 
root@192 bin] redis-server rconfig/redis.conf

# redis客户端连接===> 观察地址的变化,如果连接ok,是直接连上的,redis默认端口号 6379 
[root@192 bin] redis-cli -p 6379 
127.0.0.1:6379> ping 
PONG

# 【shell】ps显示系统当前进程信息 
[root@192 myredis] ps -ef|grep redis

# 【redis】关闭连接 
127.0.0.1:6379> shutdown 
not connected> exit


Select命令切换数据库
Dbsize查看当前数据库的key的数量
Flushdb:清空当前库    ==================
Flushall:清空全部的库    ==================

五大基本数据类型

Redis键(key)

# keys * 查看所有的key 
127.0.0.1:6379> set name qinjiang 
OK
127.0.0.1:6379> keys * 
1) "name" 
# exists key 的名字,判断某个key是否存在 
127.0.0.1:6379> EXISTS name 
(integer) 1 
# move key db ---> 当前库就没有了,被移除了 
127.0.0.1:6379> move name 1 
(integer) 1 
127.0.0.1:6379> keys * 
(empty list or set) 
# expire key 秒钟:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删 除。
# ttl key 查看还有多少秒过期,-1 表示永不过期,-2 表示已过期 
127.0.0.1:6379> set name qinjiang 
OK
127.0.0.1:6379> EXPIRE name 10   #设置name10秒过期

127.0.0.1:6379> ttl name #查看name还有几秒过期

# type key 查看你的key是什么类型
127.0.0.1:6379> type name
string

String

127.0.0.1:6379> del key1 # 删除key

127.0.0.1:6379> exists key1 # 查看 key1 存不存在

127.0.0.1:6379> append key1 "hello" #往key1后边追加值,对不存在的 key 进行 APPEND ,等同于 SET key1 "hello"
(integer) 10 # 返回增加完后的字符串长度

127.0.0.1:6379> STRLEN key1 # # 获取字符串的长度
(integer) 10 # 返回增加完后的字符串长度 

# =================================================== 
# incr、decr 一定要是数字才能进行加减,+1 和 -1。 
# incrby、decrby 命令将 key 中储存的数字加上指定的增量值。 
# ===================================================
127.0.0.1:6379> set views 0 # 设置浏览量为0
127.0.0.1:6379> incr views # 浏览 + 1
(integer) 1
127.0.0.1:6379> decr views # 浏览 - 1
(integer) 1
# +10 
127.0.0.1:6379> incrby views 10 1
(integer) 11 
# -10 
127.0.0.1:6379> decrby views 10 
(integer) 1

# =================================================== 
# range [范围] 
# getrange 获取指定区间范围内的值,类似between...and的关系,从零到负一表示全部 
# ===================================================
127.0.0.1:6379> getrange key2 0 -1 # 截取全部字符串
127.0.0.1:6379> getrange key2 0 2 # 截取部分字符串

# =================================================== 
# setrange 设置指定区间范围内的值,格式是setrange key值 具体值
# ===================================================
127.0.0.1:6379> SETRANGE key2 1 xx # 替换指定位置开始的字符串

# =================================================== 
# setex(set with expire)设置过期时间
# setnx(set if not exist) 不存在设置
# ===================================================
127.0.0.1:6379> setex key3 60 expire # 设置过期时间

127.0.0.1:6379> setnx mykey "redis" # 如果不存在就创建,成功返回1

127.0.0.1:6379> setnx mykey "mongodb" # 如果存在就不创建,失败返回0

# =================================================== 
# mset Mset 命令用于同时设置一个或多个 key-value 对。 
# mget Mget 命令返回所有(一个或多个)给定 key 的值。 
# 如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil 。 
# msetnx 当所有 key 都成功设置,返回 1 。 
# 如果所有给定 key 都设置失败(至少有一个 key 已经存在),那么返回 0 。原子操 作# ===================================================
127.0.0.1:6379> mset k10 v10 k11 v11ms  k12 v12 #创建多个
127.0.0.1:6379> mget k10 k11 k12 k13 #获取多个

127.0.0.1:6379> msetnx k10 v10 k15 v15 # 原子性操作! 如果不存就创建,只要有一个存在就失败

# 传统对象缓存 
set user:1 value(json数据)
# 可以用来缓存对象 
mset user:1:name zhangsan user:1:age 2 
mget user:1:name user:1:age

# =================================================== 
# getset(先get再set)
# ===================================================
127.0.0.1:6379> getset db mongodb # 没有旧值,返回 nil 
(nil) 
127.0.0.1:6379> get db 
"mongodb" 
127.0.0.1:6379> getset db redis # 返回旧值 mongodb 
"mongodb" 
127.0.0.1:6379> get db 
"redis"

List

所有的 List 命令都是 L 开头的

# ===================================================
# Lpush:将一个或多个值插入到列表头部。(左)
# rpush:将一个或多个值插入到列表尾部。(右)
# lrange:返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。
# 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。 
# 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此 类推。
# ===================================================
127.0.0.1:6379> LPUSH list "one" #默认存值是先进后出的
(integer) 1 
127.0.0.1:6379> LPUSH list "two" 
(integer) 2
127.0.0.1:6379> RPUSH list "right"  #在最后添加值
(integer) 3
127.0.0.1:6379> Lrange list 0 -1 #获取List全部的值
1) "two" 
2) "one" 
3) "right"

# ===================================================
# lpop 命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil 。 
# rpop 移除列表的最后一个元素,返回值为移除的元素。 
# ===================================================
127.0.0.1:6379> Lpop list #移除list第一个元素
"two" 
127.0.0.1:6379> Rpop list #移除list最后一个元素
"right" 
127.0.0.1:6379> Lrange list 0 -1 1) #移除list全部元素
"one"

# ===================================================
# Lindex,按照索引下标获得元素(-1代表最后一个,0代表是第一个) 
# ===================================================
127.0.0.1:6379> Lindex list 1 
(nil) 
127.0.0.1:6379> Lindex list 0 #表示根据下标获取值
"one" 
127.0.0.1:6379> Lindex list -1 
"one"

# =================================================== 
# llen 用于返回列表的长度。 
# ===================================================
127.0.0.1:6379> Llen list # 返回列表的长度 
(integer) 3

# =================================================== 
# lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。
# ===================================================
127.0.0.1:6379> lrem list 1 "two"   #移除一个 "two" 从上往下
(integer) 1 

# ===================================================
# Ltrim key 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区 间之内的元素都将被删除。 
# ===================================================
127.0.0.1:6379> ltrim mylist 1 2 #根据下标截取中间的值,然后值就替换成截取后的值

# =================================================== 
# rpoplpush 移除列表的最后一个元素,并将该元素添加到另一个列表并返回。 
# ===================================================
127.0.0.1:6379> rpoplpush mylist myotherlist #移除mylist中最后一个元素,转移到myotherlist列表中

# ===================================================
# lset key index value 将列表 key 下标为 index 的元素的值设置为 value 。 
# ===================================================
127.0.0.1:6379> exists list # 对空列表(key 不存在)进行 LSET 
127.0.0.1:6379> lpush list "value1" # 对非空列表进行 LSET 
(integer) 1 
127.0.0.1:6379> lrange list 0 0 
1) "value1" 
127.0.0.1:6379> lset list 0 "new" # 更新值 
OK
127.0.0.1:6379> lrange list 0 0 
1) "new"

# ===================================================
# linsert key before/after pivot value 用于在列表的元素前或者后插入元素。
# 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。
# ===================================================
redis> LINSERT mylist BEFORE "World" "There"  #往"World"前面插入"There"
(integer) 3 

Set(集合)

127.0.0.1:6379> sadd myset "hello" #set中存值
(integer) 1 
127.0.0.1:6379> sadd myset "kuangshen" 
(integer) 1 
127.0.0.1:6379> sadd myset "kuangshen" #值不能重复
(integer) 0 
127.0.0.1:6379> SMEMBERS myset  #smembers查看set所有的值
1) "kuangshen" 
2) "hello" 
127.0.0.1:6379> SISMEMBER myset "hello"  #sismember查看set中有没有这个值
(integer) 1 
127.0.0.1:6379> SISMEMBER myset "world" 
(integer) 0

127.0.0.1:6379> scard myset #获取set中值的个数

127.0.0.1:6379> srem myset "hello" #移除set中的某一个元素

127.0.0.1:6379> SRANDMEMBER myset #随机得到一个值
127.0.0.1:6379> SRANDMEMBER myset 2 #随机得到多个值

127.0.0.1:6379> spop myset #随机删除一个元素

127.0.0.1:6379> smove myset myset2 "kuangshen" #移动集合中指定元素到另外一个集合中

127.0.0.1:6379> sadd key1 "a" 
(integer) 1 
127.0.0.1:6379> sadd key1 "b" 
(integer) 1 
127.0.0.1:6379> sadd key1 "c" 
(integer) 1 
127.0.0.1:6379> sadd key2 "c" 
(integer) 1 
127.0.0.1:6379> sadd key2 "d" 
(integer) 1 
127.0.0.1:6379> sadd key2 "e" 
(integer) 1 
127.0.0.1:6379> SDIFF key1 key2 # 差集 
1) "a" 
2) "b" 
127.0.0.1:6379> SINTER key1 key2 # 交集 
1) "c" 
127.0.0.1:6379> SUNION key1 key2 # 并集
#输出两个集合中所有不重复的元素

Hash(哈希)

key-! 这时的value中的值为map集合!

127.0.0.1:6379> hset myhash field1 "kuangshen" #在myhash中存一个map集合
127.0.0.1:6379> hget myhash field1 #得到值

127.0.0.1:6379> HMSET myhash field1 "Hello" field2 "World" #存多个值
127.0.0.1:6379> HDEL myhash field1 #删除指定的key

127.0.0.1:6379> hgetall myhash #获取所有的值

127.0.0.1:6379> hlen myhash #获取长度

127.0.0.1:6379> hexists myhash field1 # hexists 查看哈希表的指定字段是否存在。

127.0.0.1:6379> HKEYS myhash  #获取myhash中的所有key
1) "field2" 
2) "field1"
127.0.0.1:6379> HVALS myhash  #获取myhash中的所有value
1) "World" 
2) "Hello"

# =================================================== 
# hincrby 为哈希表中的字段值加上指定增量值。
# =================================================== 
127.0.0.1:6379> hset myhash field 5 
(integer) 1 
127.0.0.1:6379> HINCRBY myhash field 1 
(integer) 6 
127.0.0.1:6379> HINCRBY myhash field -1 
(integer) 5

Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。存储部分变更的数据,如用户信息等。

Zset(有序集合)

127.0.0.1:6379> zadd myset 1 "one"  #添加一个值
(integer) 1 
127.0.0.1:6379> zadd myset 2 "two" 3 "three"  #添加多个值
(integer) 2

127.0.0.1:6379> zadd salary 2500 xiaoming (integer) 1 
127.0.0.1:6379> zadd salary 5000 xiaohong (integer) 1 
127.0.0.1:6379> zadd salary 500 kuangshen (integer) 1 
# Inf无穷大量+∞,同样地,-∞可以表示为-Inf。 
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf # 显示整个有序集,从小到大

127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores # 递增排列

127.0.0.1:6379> ZREVRANGE salary 0 -1 WITHSCORES # 从大到小

127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 WITHSCORES # 显示工资 <=2500 的所有成员

127.0.0.1:6379> zrem salary kuangshen #移除指定元素
127.0.0.1:6379> ZRANGE salary 0 -1 #移除全部元素

127.0.0.1:6379> zcard salary #获取集合中的个数

127.0.0.1:6379> ZCOUNT myset 1 3  #获取区间值

三种特殊数据类型

Geospatial 地理位置

Redis 的 GEO 特性在 Redis 3.2 版本中推出, 这个功能可以将用户给定的地理位置信息储存起来, 并对这些信息进行操作。来实现诸如附近位置、摇一摇这类依赖于地理位置信息的功能。geo的数据类型为zset。

GEO 的数据结构总共有六个常用命令:geoadd、geopos、geodist、georadius、georadiusbymember、gethash

1、geoadd # 语法 
geoadd key longitude latitude member ...
# 将给定的空间元素(纬度、经度、名字)添加到指定的键里面。 
# 这些数据会以有序集he的形式被储存在键里面,从而使得georadius和georadiusbymember这样的 命令可以在之后通过位置查询取得这些元素。 
# geoadd命令以标准的x,y格式接受参数,所以用户必须先输入经度,然后再输入纬度。 
# geoadd能够记录的坐标是有限的:非常接近两极的区域无法被索引。 
# 有效的经度介于-180-180度之间,有效的纬度介于-85.05112878 度至 85.05112878 度之间。, 当用户尝试输入一个超出范围的经度或者纬度时,geoadd命令将返回一个错误。

127.0.0.1:6379> geoadd china:city 116.23 40.22 北京 
(integer) 1 
127.0.0.1:6379> geoadd china:city 121.48 31.40 上海 113.88 22.55 深圳 120.21 30.20 杭州 
(integer) 3 
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉 
(integer) 3


2、geopos # 语法 
geopos key member [member...] 
#从key里返回所有给定位置元素的位置(经度和纬度)
127.0.0.1:6379> geopos china:city 北京 
1)  1) "116.23000055551528931" 
	2) "40.2200010338739844" 
127.0.0.1:6379> geopos china:city 上海 重庆 
1)  1) "121.48000091314315796" 
	2) "31.40000025319353938" 
2)  1) "106.54000014066696167"
	2) "29.39999880018641676" 
127.0.0.1:6379> geopos china:city 新疆 
1) (nil)


3、geodist # 语法 
geodist key member1 member2 [unit]

#返回两地之间的距离
127.0.0.1:6379> geodist china:city 北京 上海 
"1088785.4302" 
127.0.0.1:6379> geodist china:city 北京 上海 km  #精简成km
"1088.7854" 
127.0.0.1:6379> geodist china:city 重庆 北京 km 
"1491.6716"


4、georadius # 语法 
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
# 以给定的经纬度为中心, 找出某一半径内的元素
# 在 china:city 中寻找坐标 100 30 半径为 1000km 的城市 
127.0.0.1:6379> georadius china:city 100 30 1000 km
重庆
西安

# withdist withcoord 返回位置名称 距离 和经纬度 count 限定寻找个数 
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 1
重庆
635.2850 
106.54000014066696167
29.39999880018641676 
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 2
重庆
635.2850 
106.54000014066696167 
29.39999880018641676 
西安
963.3171 
108.92999857664108276 
34.23000121926852302


5、georadiusbymember # 语法
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
# 找出位于指定范围内的元素,中心点是由给定的位置元素决定
127.0.0.1:6379> GEORADIUSBYMEMBER china:city 上海 400 km 
杭州
上海


6、geohash # 语法 
geohash key member [member...]
# 该命令将返回11个字符串的geohash字符串!
# Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似 表示距离越近。
127.0.0.1:6379> geohash china:city 北京 重庆 
wx4sucu47r0 
wm5z22h53v0
# 如果两个字符串越像,那么他们的距离越近

GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除.

HyperLogLog(基数统计的算法)

127.0.0.1:6379> PFADD mykey a b c d e f g h i j  #创建第一组元素
1
127.0.0.1:6379> PFCOUNT mykey   #统计mykey元素的基数
10
127.0.0.1:6379> PFADD mykey2 i j z x c v b n m  #创建第二组元素
1
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2  #合并两组元素,不重复
OK
127.0.0.1:6379> PFCOUNT mykey3  #统计mykey3元素的基数
15

Bitmaps(计数器)

可以用来设置上班打卡与否,账号登录与否,等等

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N7zCRSjG-1652586387293)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195015667.png)]

GETBIT key offffset 获取offffset设置的值,未设置过默认返回0 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ftWVcBWU-1652586387294)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195047165.png)]

bitcount key [start, end] 统计 key 上位为1的个数

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-isHc6OkN-1652586387296)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195054021.png)]

事务

redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。

批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行!

Redis不保证原子性:
Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。

Redis事务的三个阶段:

  • 开始事务 multi

  • 命令入队

  • 执行事务 exec

Redis事务相关命令:

watch key1 key2 ... 
#监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则 事务被打断 ( 类似乐观锁 ) 
multi # 标记一个事务块的开始( queued ) 
exec # 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 ) 
discard # 取消事务,放弃事务块中的所有命令 
unwatch # 取消watch对所有key的监控

正常执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BIHySeEw-1652586387297)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200435553.png)]

放弃事务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RS92eAcL-1652586387298)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200449801.png)]

若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lykcSkYR-1652586387299)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200508708.png)]

若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DNNe2Xdq-1652586387299)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200524942.png)]

Watch 监控(乐观锁)

127.0.0.1:6379> watch balance #只需要给这个key加上watch就好

开启事务,当开启两个线程同时操作一个数据,当线程一的数据还没提交,线程二已经操作修改了,那么事务提交时,就会报错,因为我们提前加上了watch监控器。

unwatch 解锁,关闭监视

jedis

1、导入依赖

    <dependencies>
        <!-- jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

2、远程连接redis测试

  1. vim redis.conf

    1. 将绑定的ip: bind 127.0.0.1注释掉
    2. 修改protected-mode属性为no
  2. 重启redis-server

  3. 编写代码测试

        public static void main(String[] args) {
            System.out.println("测试");
            Jedis jedis = new Jedis("39.102.33.216",6379);
            System.out.println(jedis.ping());
        }
    

Springboot整合redis

1 导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

2 配置

# Redis服务器地址 
spring.redis.host=39.102.33.216
# Redis服务器连接端口 
spring.redis.port=6379

3 测试

@SpringBootTest
class Redis02SpringbootApplicationTests {

    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void contextLoads() {
        //opsForValue() 这个代表操作什么类型(String)
        redisTemplate.opsForValue().set("name","redis学习");
        System.out.println(redisTemplate.opsForValue().get("name"));
        // 获取redis连接对象
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        // 清空数据
        connection.flushAll();
        connection.close();
    }

}

配置一个RedisTemplate

package com.kuang.config; 
import com.fasterxml.jackson.annotation.JsonAutoDetect; 
import com.fasterxml.jackson.annotation.PropertyAccessor; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.data.redis.connection.RedisConnectionFactory; 
import org.springframework.data.redis.core.RedisTemplate; 
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; 
import org.springframework.data.redis.serializer.StringRedisSerializer; 

@Configuration 
public class RedisConfig { 
    @Bean @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory); 
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); 
        ObjectMapper om = new ObjectMapper(); 
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); 
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); 
        jackson2JsonRedisSerializer.setObjectMapper(om); 
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        
        // key采用String的序列化方式 
        template.setKeySerializer(stringRedisSerializer); 
        // hash的key也采用String的序列化方式 
        template.setHashKeySerializer(stringRedisSerializer); 
        // value序列化方式采用jackson 
        template.setValueSerializer(jackson2JsonRedisSerializer); 
        // hash的value序列化方式采用jackson 
        template.setHashValueSerializer(jackson2JsonRedisSerializer); 
        template.afterPropertiesSet();
        
        return template; 
    } 
}

RedisUtils工具类

(直接用RedisTemplate操作Redis,需要很多行代码,因此直接封装好一RedisUtils,这样写代码更方便点。这个RedisUtils交给Spring容器实例化,使用时直接注解注入。)

package com.tycoon.redis02springboot.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author RenBo
 * @version 1.0
 * @date 2020/12/26
 */
@Component
public class RedisUtils {
    private static RedisTemplate<String, Object> redisTemplate;


    /**
     * 静态注入
     *
     * @param redisTemplate
     */
    @Autowired
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        RedisUtils.redisTemplate = redisTemplate;
    }

    /**
     * 指定缓存失效时间
     *
     * @param key  键
     * @param time 时间(秒)
     * @return
     */
    public static boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public static long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false不存在
     */
    public static boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public static void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    public static Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */
    public static boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 普通缓存放入并设置时间
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public static boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     *
     * @param key   键
     * @param delta 要增加几(大于0)
     * @return
     */
    public static long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key   键
     * @param delta 要减少几(小于0)
     * @return
     */
    public static long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public static Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
     */
    public static Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public static boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     *
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public static boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public static boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public static boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public static void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public static boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     * @return
     */
    public static double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     * @return
     */
    public static double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    /**
     * 根据key获取Set中的所有值
     *
     * @param key 键
     * @return
     */
    public static Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public static boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public static long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public static long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) {
                expire(key, time);
            }
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     *
     * @param key 键
     * @return
     */
    public static long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public static long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     * @return
     */
    public static List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     * @return
     */
    public static long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return
     */
    public static Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public static boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public static boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public static boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public static boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */
    public static boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public static long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

Redis.conf

config get * # 获取全部的配置

单位

1、配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit
2、对 大小写 不敏感

INCLUDES 包含

和Spring配置文件类似,可以通过includes包含,redis.conf 可以作为总文件,可以包含其他文件!

NETWORK 网络配置

bind 127.0.0.1 # 绑定的ip 
protected-mode yes # 保护模式 
port 6379 # 默认端口

GENERAL 通用

daemonize yes # 默认情况下,Redis不作为守护进程运行。需要开启的话,改为 yes

supervised no # 可通过upstart和systemd管理Redis守护进程

supervised no # 可通过upstart和systemd管理Redis守护进程

loglevel notice # 日志级别。可选项有:
# debug(记录大量日志信息,适用于开发、测试阶段); 
# verbose(较多日志信息);
# notice(适量日志信息,使用于生产环境); 
# warning(仅有部分重要、关键信息才会被记录)。

logfile "" # 日志文件的位置,当指定为空字符串时,为标准输出
databases 16 # 设置数据库的数目。默认的数据库是DB 0 
always-show-logo yes # 是否总是显示logo

SNAPSHOPTING 快照

# 900秒(15分钟)内至少1个key值改变(则进行数据库保存--持久化) 
save 900 1 
# 300秒(5分钟)内至少10个key值改变(则进行数据库保存--持久化) 
save 300 10 
# 60秒(1分钟)内至少10000个key值改变(则进行数据库保存--持久化) 
save 60 10000

stop-writes-on-bgsave-error yes # 持久化出现错误后,是否依然进行继续进行工作

rdbcompression yes # 使用压缩rdb文件 yes:压缩,但是需要一些cpu的消耗。no:不压 缩,需要更多的磁盘空间

rdbchecksum yes # 是否校验rdb文件,更有利于文件的容错性,但是在保存rdb文件的时 候,会有大概10%的性能损耗

dbfilename dump.rdb # dbfilenamerdb文件名称

dir ./ # dir 数据目录,数据库的写入会在这个目录。rdb、aof文件也会写在这个目录

SECURITY安全

# 启动redis 
# 连接客户端

# 获得和设置密码 
config get requirepass 
config set requirepass "123456"

# 下次需要登录
127.0.0.1:6379> auth 123456

限制 CLIENTS

maxclients 10000 # 设置能连上redis的最大客户端连接数量

maxmemory <bytes> # redis配置的最大内存容量

maxmemory-policy noeviction # maxmemory-policy 内存达到上限的处理策略
						#volatile-lru:利用LRU算法移除设置过过期时间的key。
						#volatile-random:随机移除设置过过期时间的key。
						#volatile-random:随机移除设置过过期时间的key。
						#allkeys-lru:利用LRU算法移除任何key。
						#allkeys-random:随机移除任何key。
						#noeviction:不移除任何key,只是返回一个写错误。

append only模式

appendonly no # 是否以append only模式作为持久化方式,默认使用的是rdb方式持久化,这种 方式在许多应用中已经足够用了

appendfilename "appendonly.aof" # appendfilename AOF 文件名称

appendfsync everysec # appendfsync aof持久化策略的配置
		# no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快。 
		# always表示每次写入都执行fsync,以保证数据同步到磁盘。 
		# everysec表示每秒执行一次fsync,可能会导致丢失这1s数据。
	

持久化

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!

RDB(Redis DataBase)

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

Fork

Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量,环境变量,程序计数器等)数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程。

RDB 保存的是 dump.rdb 文件

如何触发RDB快照

1、配置文件中默认的快照配置,建议多用一台机子作为备份,复制一份 dump.rdb

2、命令save或者是bgsave

  • save 时只管保存,其他不管,全部阻塞
  • bgsave,Redis 会在后台异步进行快照操作,快照同时还可以响应客户端请求。可以通过lastsave命令获取最后一次成功执行快照的时间。

3、执行flushall命令,也会产生 dump.rdb 文件,但里面是空的,无意义 !

4、退出的时候也会产生 dump.rdb 文件!

如何恢复

1、将备份文件(dump.rdb)移动到redis安装目录并启动服务即可

2、CONFIG GET dir 获取目录

127.0.0.1:6379> config get dir 
dir 
/usr/local/bin

优点和缺点

优点:

1、适合大规模的数据恢复

2、对数据完整性和一致性要求不高

缺点:

1、在一定间隔时间做一次备份,所以如果redis意外down掉的话,就会丢失最后一次快照后的所有修改

2、Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑。

小结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cYyBvqqC-1652586387300)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226152824283.png)]

AOF(Append Only File)

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

Aof保存的是 appendonly.aof 文件

配置

appendonly no # 是否以append only模式作为持久化方式,默认使用的是rdb方式持久化,这 种方式在许多应用中已经足够用了 
appendfilename "appendonly.aof" # appendfilename AOF 文件名称 
appendfsync everysec # appendfsync aof持久化策略的配置 
# no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快。 
# always表示每次写入都执行fsync,以保证数据同步到磁盘。 
# everysec表示每秒执行一次fsync,可能会导致丢失这1s数据。 

No-appendfsync-on-rewrite #重写时是否可以运用Appendfsync,用默认no即可,保证数据安 全性
Auto-aof-rewrite-min-size # 设置重写的基准值 
Auto-aof-rewrite-percentage #设置重写的基准值

AOF 启动/修复/恢复

正常恢复:

  • 启动:设置Yes,修改默认的appendonly no,改为yes
  • 将有数据的aof文件复制一份保存到对应目录(config get dir)
  • 恢复:重启redis然后重新加载

异常恢复:

  • 启动:设置Yes
  • 故意破坏 appendonly.aof 文件!
  • 修复: redis-check-aof --fix appendonly.aof 进行修复
  • 恢复:重启 redis 然后重新加载

Rewrite

AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令 bgrewriteaof !

重写原理:
AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,这点和快照有点类似!

触发机制:
Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的已被且文件大于64M的触发。

优点和缺点

优点:

1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差但数据完整性比较好

2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失

3、不同步: appendfsync no 从不同步

缺点:

1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。

2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。

小总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Ta2mbwB-1652586387301)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226160033643.png)]

总结

1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储

2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。

3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化

4、同时开启两种持久化方式

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。
  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价:一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
  • 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

命令

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwM9GzfV-1652586387302)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226171147331.png)]

测试

#订阅端,订阅了这个公众号
redis 127.0.0.1:6379> SUBSCRIBE redisChat 
Reading messages... 
(press Ctrl-C to quit) 
1) "subscribe" 
2) "redisChat" 
3) (integer) 1


#作者发布一个消息
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Redis" 
(integer) 1 
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Kuangshen" 
(integer) 1 


# 订阅者的客户端会显示如下消息 
1) "message" 
2) "redisChat" 
3) "Hello,Redis" 
1) "message" 
2) "redisChat" 
3) "Hello,Kuangshen"

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave 以读为主。

默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用主要包括:

1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。

3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。

4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

环境配置

slaveof 主库ip 主库端口 # 配置主从 
Info replication # 查看信息

修改配置文件!

准备工作:我们配置主从复制,至少需要三个,一主二从!配置三个客户端!

1、拷贝多个redis.conf 文件

2、指定端口 6379,依次类推

3、开启daemonize yes

4、Pid文件名字 pidfile /var/run/redis_6379.pid , 依次类推

5、Log文件名字 logfile “6379.log” , 依次类推

6、Dump.rdb 名字 dbfilename dump6379.rdb , 依次类推

上面都配置完毕后,3个服务通过3个不同的配置文件开启,我们的准备环境就OK 了!

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9iPv9nUn-1652586387303)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226174108984.png)]

一主二从

1、环境初始化

1、默认三个都是Master 主节点

2、配置为一个Master 两个Slave

slaveof 127.0.0.1 6379
slaveof 127.0.0.1 6379

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DWmBAU8d-1652586387304)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226174829191.png)]

3、在主机设置值,在从机都可以取到!从机不能写值!

小结

主机宕机:如果主机宕机了,那么从机依然连接到主机,就是没有写的操作了,当主机重新连接,那么从机还可以继续取到主机里写的值。

从机宕机:但是当从机断开了,那么主机中断开了这个从机,重新连接后,也没有主从效应了。但是从新设置它的主机,那么还是可以取到主机的值。

层层链路

上一个Slave 可以是下一个slave 和 Master,Slave 同样可以接收其他 slaves 的连接和同步请求,那么该 slave 作为了链条中下一个的master,可以有效减轻 master 的写压力!

谋朝篡位

一主二从的情况下,如果主机断了,从机可以使用命令 SLAVEOF NO ONE 将自己改为主机!这个时候其余的从机链接到这个节点。对一个从属服务器执行命令 SLAVEOF NO ONE 将使得这个从属服务器关闭复制功能,并从从属服务器转变回主服务器,原来同步所得的数据集不会被丢弃。

主机再回来,也只是一个光杆司令了,从机为了正常使用跑到了新的主机上!

主从复制原理

Slave 启动成功连接到 master 后会发送一个sync命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行

哨兵模式

概述

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。

谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

这里的哨兵有两个作用

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

配置测试

1、调整结构,6379带着80、81

2、自定义的 /myredis 目录下新建 sentinel.conf 文件,名字千万不要错

3、配置哨兵,填写内容sentinel monitor 被监控主机名字 127.0.0.1 6379 1上面最后一个数字1,表示主机挂掉后slave投票看让谁接替成为主机,得票数多少后成为主机

4、启动哨兵Redis-sentinel /myredis/sentinel.conf上述目录依照各自的实际情况配置,可能目录不同

5、正常主从演示

6、原有的Master 挂了

7、投票新选

8、重新主从继续开工,info replication 查查看

9、问题:如果之前的master 重启回来,会不会双master 冲突? 之前的回来只能做小弟了

哨兵模式的优缺点

优点

  1. 哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。

  2. 主从可以切换,故障可以转移,系统可用性更好。

  3. 哨兵模式是主从模式的升级,系统更健壮,可用性更高。

缺点

  1. Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。

  2. 实现哨兵模式的配置也不简单,甚至可以说有些繁琐

哨兵配置说明

# Example sentinel.conf 

# 哨兵sentinel实例运行的端口 默认26379 
port 26379 
# 哨兵sentinel的工作目录 
dir /tmp 
# 哨兵sentinel监控的redis主节点的 ip port 
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了 
# sentinel monitor <master-name> <ip> <redis-port> <quorum> 
sentinel monitor mymaster 127.0.0.1 6379 2

# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都 要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd

# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 
# sentinel down-after-milliseconds <master-name> <milliseconds> 
sentinel down-after-milliseconds mymaster 30000

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同 步,这个数字越小,完成failover所需的时间就越长, 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1

# 故障转移的超时时间 failover-timeout 可以用在以下这些方面: 
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的 master那里同步数据时。 
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超 时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 
# 默认三分钟 
# sentinel failover-timeout <master-name> <milliseconds> 
sentinel failover-timeout mymaster 180000

# SCRIPTS EXECUTION

#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮 件通知相关人员。 
#对于脚本的运行结果有以下规则: 
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。

#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等 等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常 运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果 sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执 行的,否则sentinel无法正常启动成功。
#通知脚本 
# sentinel notification-script <master-name> <script-path> 
sentinel notification-script mymaster /var/redis/notify.sh

# 客户端重新配置主节点参数脚本 
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master 地址已经发生改变的信息。 
# 以下参数将会在调用脚本时传给脚本: 
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> 
# 目前<state>总是“failover”, 
# <role>是“leader”或者“observer”中的一个。 
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的 slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。 
# sentinel client-reconfig-script <master-name> <script-path> 
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

Redis缓存穿透和雪崩

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

缓存穿透

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

但是这种方法会存在两个问题:

1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;

2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿

概述

这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。

加互斥锁

分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

缓存雪崩

概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。

解决方案

redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-18 17:42:19  更:2022-05-18 17:44:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:48:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码