Redis
Nosql概述
NoSql是什么
NoSQL = Not Only SQL(不仅仅是SQL)
关系型数据库:表格,行,列
很多的数据类型用户的个人信息,社交网络,地理位置这些数据类型的存储不需要一个固定的格式,不需要多余的操作就可以扩展的,例如:Map<String,Object> 使用键值对来控制!
NoSql的特点:
-
方便扩展(数据之间没有关系,很好扩展) -
大数据量高性能(Redis一秒写8万次,读取11万,NoSQL的缓存记录级别,是一种细粒度的缓存,性能比较高) -
数据类型是多样型的!(不需要事先设计数据库,随取随用) -
传统RDBMS和NoSQl 传统的RDBMS
- 结构化组织
- SQL
- 数据和关系都存在单独的表中
- 操作操作,数据定义语言
- 严格的一致性
- 基础的事务
-.....
Nosql
- 不仅仅是数据
- 没有固定的查询语言
- 键值对存储,列存储,文档存储,图形数据库(社交关系)
- 最终一致性
- CAP定理和BASE (异地多活)
- 高性能,高可用,高可扩
-.....
3高和3V 3V:主要是描述问题的
- 海量Velume
- 多样Variety
- 实时Velocity
3高:主要是对程序的要求
- 高并发
- 高可扩
- 高性能
真正的公司实践:NoSql +RDBMS一起使用
NoSQL的四大分类
KV键值对:
- 新浪: Redis
- 美团:Redis +Tair
- 阿里、百度:Redis + memecache
文档型数据库(bson格式 和 json 一样):
- MongoDB(一般必须要掌握)
- MOngoDB 是一个基于分布式文件存储的数据库,C++编写,主要用来处理大量的文档
- MongoDB是一个介于关系型数据库和非关系型数据库中间的产品
- MongoDB是非关型数据库中功能最丰富,最像关系型数据库的!
- ConthDB
列存储数据库
图关系数据库
-
Neo4j,InfoGrid -
存的是关系,比如:朋友圈社交网路,广告推荐!
Redis入门
概述
Redis是什么?
Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI,C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list([链表]、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis能干什么?
-
内存存储,持久化,内存中是断电即失,所以说持久化很重要(rdb,aof) -
效率高,可以用于高速缓存 -
发布订阅系统 -
地图信息分析 -
计时器,计数器(浏览量) …
特性
1.多样数据类型
2.持久化
3.集群
4.事务
…
学习中需要用到的东西
redis中文官网:http://www.redis.cn/
Redis推荐是在linux服务器上搭建的,我们是基于linux学习
Windows 下安装
下载地址:https://github.com/tporadowski/redis/releases
1.下载得到压缩包
2.解压后开启服务
3.发现出现闪退现象
4.cmd 进入redis目录下,输入redis-server.exe redis.windows.conf
5.测试连接
Linux安装
1.官网下载压缩包:https://redis.io/download
2.解压Redis的安装包,程序建议放在/opt目录下
[root@iZbp10ephck4n6u4xij7pcZ ~]
[root@iZbp10ephck4n6u4xij7pcZ home]
canCopy.txt linux_amd64 mysql redis-6.2.6.tar.gz ssm02_train.war test ut
[root@iZbp10ephck4n6u4xij7pcZ home]
[root@iZbp10ephck4n6u4xij7pcZ home]
[root@iZbp10ephck4n6u4xij7pcZ opt]
containerd redis-6.2.6.tar.gz
[root@iZbp10ephck4n6u4xij7pcZ opt]
[root@iZbp10ephck4n6u4xij7pcZ opt]
containerd redis-6.2.6 redis-6.2.6.tar.gz
3.进入解压后的redis,
4.基本的环境安装
yum install gcc-c++
make
make install
5.redis的默认安装路径:/usr/local/bin
6.将redis配置文件复制到当前目录下
7.redis默认不是后台启动的,修改配置文件
8.启动redis服务
9.查看redis的进程是否开启 ps -ef|grep redis
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IUrm1axv-1637586350751)(https://i.loli.net/2021/11/22/GFhTjd5MOe3kEBo.png)]
10.关闭redis服务 在客户端 输入 shutdown
测试性能
redis-benchmark:压力测试工具
redis-benchmark 命令参数!
简单测试
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
基础知识
redis默认有16个数据库,默认使用的是第0个
[root@iZbp10ephck4n6u4xij7pcZ bin]
可以使用select进行切换数据库
[root@iZbp10ephck4n6u4xij7pcZ bin]
127.0.0.1:6379> select 3
OK
127.0.0.1:6379[3]> dbsize
(integer) 0
127.0.0.1:6379[3]>
清空当前数据库:flushdb
127.0.0.1:6379[3]> keys *
1) "name"
127.0.0.1:6379[3]> flushdb
OK
127.0.0.1:6379[3]> keys *
(empty array)
清空所有数据库:flushall
Redis是单线程的
明白Redis是很快的,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程,就使用单线程了!
Redis为什么单线程还那么快
1、误区:高性能的服务器一定是多线程的?
2、误区:多线程(CPU上下文会切换!)一定比单线程效率快
先去CPU>内存>硬盘的速度要有所了解!
核心:redis 是将所有的数据全部放在内存中的,所以说单线程去操作效率就是最高的,多线程(CPU上下文切换:耗时操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的。
Redis五大数据类型
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。
Redis-Key
127.0.0.1:6379[3]> set name li
OK
127.0.0.1:6379[3]> exists name
(integer) 1
127.0.0.1:6379[3]> move name 1
(integer) 1
127.0.0.1:6379[3]> exists name
(integer) 0
127.0.0.1:6379[3]> set name 456
OK
127.0.0.1:6379[3]> keys *
1) "name"
127.0.0.1:6379[3]> get name
"456"
127.0.0.1:6379[3]> expire name 10
(integer) 1
127.0.0.1:6379[3]> ttl name
(integer) 1
127.0.0.1:6379[3]> ttl name
(integer) -2
127.0.0.1:6379[3]> get name
(nil)
127.0.0.1:6379[3]> set name 2
OK
127.0.0.1:6379[3]> type name
string
更多命令可以去官网查看
String(字符串)
127.0.0.1:6379[3]> set name 2
OK
127.0.0.1:6379[3]> type name
string
127.0.0.1:6379[3]> append name hello
(integer) 6
127.0.0.1:6379[3]> get name
"2hello"
127.0.0.1:6379[3]> strlen name
(integer) 6
步长
127.0.0.1:6379[3]> set views 0
OK
127.0.0.1:6379[3]> incr views
(integer) 1
127.0.0.1:6379[3]> decr views
(integer) 0
127.0.0.1:6379[3]> incrby views 10
(integer) 10
127.0.0.1:6379[3]> get views
"10"
127.0.0.1:6379[3]> decrby views 10
(integer) 0
127.0.0.1:6379[3]> get views
"0"
127.0.0.1:6379[3]>
字符串范围 range
127.0.0.1:6379> set name hello,dijia
OK
127.0.0.1:6379> get name
"hello,dijia"
127.0.0.1:6379> getrange name 0 2
"hel"
127.0.0.1:6379> getrange name 0 -1
"hello,dijia"
127.0.0.1:6379> setrange name 2 xxx
(integer) 11
127.0.0.1:6379> get name
"hexxx,dijia"
127.0.0.1:6379> setex name2 30 "i wil go"
OK
127.0.0.1:6379> ttl name2
(integer) 15
127.0.0.1:6379> ttl name2
(integer) 14
127.0.0.1:6379> ttl name2
(integer) 12
127.0.0.1:6379> setnx name3 "hi"
(integer) 1
127.0.0.1:6379> setnx name3 "ok"
(integer) 0
127.0.0.1:6379> get name3
"hi"
mset
mget
msetnx
127.0.0.1:6379> mset k1 v1 k2 v2
OK
127.0.0.1:6379> keys *
1) "mylist"
2) "name"
3) "counter:__rand_int__"
4) "key:__rand_int__"
5) "myhash"
6) "k2"
7) "name3"
8) "k1"
127.0.0.1:6379> mget k1 k2
1) "v1"
2) "v2"
127.0.0.1:6379> msetnx k1 v8 k3 v3
(integer) 0
127.0.0.1:6379> keys *
1) "mylist"
2) "name"
3) "counter:__rand_int__"
4) "key:__rand_int__"
5) "myhash"
6) "k2"
7) "name3"
8) "k1"
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379>
set user:1 {name:zhangsan,age:30}
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 20
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "20"
getset
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mogodb
"redis"
127.0.0.1:6379> get db
"mogodb"
String 类似的使用场景,value除了是我们的字符串还可以是我们的数字
List
在redis里面,我们可以把list完成栈,队列,阻塞队列!
所有的list命令都是
127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lget list
(error) ERR unknown command `lget`, with args beginning with: `list`,
127.0.0.1:6379> lrange list 0-1
(error) ERR wrong number of arguments for 'lrange' command
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lrange list 0 1
1) "two"
2) "one"
127.0.0.1:6379> rpop list
"111"
127.0.0.1:6379> get list
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> rpop list
"one"
Lindex 通过下标获取list中值
127.0.0.1:6379> lindex list 0
"two"
127.0.0.1:6379> lindex list 1
(nil)
llen 返回列表长度
127.0.0.1:6379> llen list
(integer) 1
lrem
127.0.0.1:6379> lpush list 2
(integer) 2
127.0.0.1:6379>
127.0.0.1:6379> lpush list 3
(integer) 3
127.0.0.1:6379> lpush list 3
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
4) "two"
127.0.0.1:6379> lrem list 1 2
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "two"
127.0.0.1:6379> lrem list 2 3
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "two"
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "2"
3) "1"
4) "1"
5) "1"
6) "two"
127.0.0.1:6379> ltrim list 1 2
OK
127.0.0.1:6379> lrange list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> lrange list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> rpoplpush list list2
"1"
127.0.0.1:6379> lrange list 0 -1
1) "2"
127.0.0.1:6379> lrange list2 0 -1
1) "1"
127.0.0.1:6379> exists list
(integer) 1
127.0.0.1:6379> lset list 0 item
OK
127.0.0.1:6379> lrange list
(error) ERR wrong number of arguments for 'lrange' command
127.0.0.1:6379> lrange list 0 -1
1) "item"
127.0.0.1:6379> lset list 1 15
(error) ERR index out of range
127.0.0.1:6379> lrange list 0 -1
1) "item"
127.0.0.1:6379> lpush list 45
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "item"
127.0.0.1:6379> linsert list before item 123
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "123"
3) "item"
127.0.0.1:6379> linsert list after item 124
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "123"
3) "item"
4) "124"
小结
- 它实际上是一个链表,before Node after ,left ,right 都可以插入值
- 如果key不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在
- 在两边插入改动值,效率最高!中间元素,相对来说效率会低一点
消息排队!消息队列(Lpush Rpop)
Set
127.0.0.1:6379> sadd myset 1
(integer) 1
127.0.0.1:6379> smembers myset
1) "1"
127.0.0.1:6379> smembers myse
(empty array)
127.0.0.1:6379> key *
(error) ERR unknown command `key`, with args beginning with: `*`,
127.0.0.1:6379> keys *
1) "myset"
2) "list"
3) "counter:__rand_int__"
4) "name"
5) "myhash"
6) "name3"
7) "k2"
8) "user:1:age"
9) "list2"
10) "lsit"
11) "user:1:name"
12) "mylist"
13) "key:__rand_int__"
14) "db"
15) "k1"
127.0.0.1:6379> sismember myset hello
(integer) 0
127.0.0.1:6379> sismember myset 1
(integer) 1
127.0.0.1:6379> sadd myset 13465
(integer) 1
127.0.0.1:6379> sadd myset 21354
(integer) 1
127.0.0.1:6379> srem myset 1
(integer) 1
127.0.0.1:6379> smermber myset
(error) ERR unknown command `smermber`, with args beginning with: `myset`,
127.0.0.1:6379> smermbers myset
(error) ERR unknown command `smermbers`, with args beginning with: `myset`,
127.0.0.1:6379> smembers myset
1) "13465"
2) "21354"
127.0.0.1:6379> smembers myset
1) "13465"
2) "21354"
127.0.0.1:6379> spop myset
"21354"
127.0.0.1:6379> smembers myset
1) "13465"
127.0.0.1:6379> smembers myset
1) "2"
2) "13465"
127.0.0.1:6379> smove myset myset2 13465
(integer) 1
127.0.0.1:6379> smember myset2
(error) ERR unknown command `smember`, with args beginning with: `myset2`,
127.0.0.1:6379> smembers myset2
1) "13465"
数字集合类
- 差集 sdiff
- 交集 sinter
- 并集 sunion
Hash
Map 集合,key-Map集合
127.0.0.1:6379> hset myhash li good
(integer) 1
127.0.0.1:6379> hget myhash li
"good"
127.0.0.1:6379> hget hash
(error) ERR wrong number of arguments for 'hget' command
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"
5) "li"
6) "good"
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"
5) "li"
6) "good"
127.0.0.1:6379> hdel myhash li
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"
hash适合对象存储,String 适合字符串存储
Zset(有序集合)
在set的基础之上,增加一个值,set k1 v1 ,zset k1 score1 v1
127.0.0.1:6379> zadd myzset 1 one
(integer) 1
127.0.0.1:6379> zadd myzset 2 two
(integer) 1
127.0.0.1:6379> zrange myzset 0 -1
1) "one"
2) "two"
排序如何实现
127.0.0.1:6379> zadd salary 2500 xiaohong
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 500 xiaoli
(integer) 1
127.0.0.1:6379> zrangebyscore salary -inf +inf
1) "xiaoli"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores
1) "xiaoli"
2) "500"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"
127.0.0.1:6379> zrangebyscore salary -inf 2500 withscores
1) "xiaoli"
2) "500"
3) "xiaohong"
4) "2500"
127.0.0.1:6379> zrevrange salary 0 -1
1) "zhangsan"
2) "xiaohong"
3) "xiaoli"
127.0.0.1:6379> zrevrange salary 0 -1 withscores
1) "zhangsan"
2) "5000"
3) "xiaohong"
4) "2500"
5) "xiaoli"
6) "500"
127.0.0.1:6379> zrange salary 0 -1
1) "xiaoli"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary xiaoli
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "zhangsan"
127.0.0.1:6379> zcard salary
(integer) 3
127.0.0.1:6379> zcount salary 20 5000
(integer) 2
案例思路:存储班级成绩表,工资表排序
普通消息: 1. 重要消息 2. 带权重进行判断
排行榜应用
三种特殊数据类型
geospatial(地理位置)
朋友的定位,附近的人,打车距离
Hyperloglog
什么是奇数?
A{1,3,5,7,8,7}
B{1,3,5,7,8}
基数(不重复的元素)= 5,可以接受误差!
简介
redis Hyperloglog 基数统计的算法
优点占用的内存是固定的,2^64不同的元素的技术,只需要12KB内存,从内存角度来看,首选Hyperloglog首选
测试
127.0.0.1:6379> pfadd mykey a b c c
(integer) 1
127.0.0.1:6379> pfcount mykey
(integer) 3
127.0.0.1:6379> pfadd mykey2 1 2 3
(integer) 1
127.0.0.1:6379> pfcount mykey2
(integer) 3
127.0.0.1:6379> pfmerge mykey3 mykey mykey2
OK
127.0.0.1:6379> pfcount mykey3
(integer) 6
如果允许容错,可以使用Hyperloglog!
Bitmaps
位存储
统计用户信息,活跃,不活跃!登录,未登录!打卡等两个状态的,都可以使用Bitmaps
Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两种状态
365 天 = 365 bit 1字节 = 8bit
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> bitcount sign
(integer) 1
事务
事务: 要么同时成功,要么同时失败,原子性!
Redis单条命令是保存原子性的,但是事务不保证原子性!
Redis事务本质:一组命令的集合一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行!一次性,排它性!执行一些列的命令
Redis事务没有隔离级别的概念!
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!
Redis的事务:
- 开启事务(multi)
- 命令入队()
- 执行事务(exec)
正常开启事务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set name li
QUEUED
127.0.0.1:6379(TX)> set name wang
QUEUED
127.0.0.1:6379(TX)> get name
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
3) "wang"
放弃事务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> discard
OK
127.0.0.1:6379> get k1
(nil)
编译型异常(代码有问题),事务中所有事务都不会被执行
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> getser k3 v3
(error) ERR unknown command `getser`, with args beginning with: `k3`, `v3`,
127.0.0.1:6379(TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k1
(nil)
运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的错误的命令会抛出异常
127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> incr k1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) (error) ERR value is not an integer or out of range
2) OK
127.0.0.1:6379> get k2
"v2"
监控! Watch
悲观锁:
乐观锁:
- 认为什么时候都不会出问题,所以不会上锁,更新数据的时候去判断一下,在此期间是否有人修改过这个数据
- 获取version
- 更新的时候比较version
正常执行
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> decrby money 20
QUEUED
127.0.0.1:6379(TX)> incrby out 20
QUEUED
127.0.0.1:6379(TX)> exec
1) (integer) 80
2) (integer) 20
Jedis
Jedis是Redis官方推荐的java连接开发工具!使用java操作Redis中间件!
测试
1.导入相关依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
2.编写测试类
public class JedisTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
System.out.println(jedis.ping());
}
}
3.打开redis-server测试连接
测试事务
测试类
public class AffairTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
JSONObject jsonObject = new JSONObject();
jsonObject.put("name","li");
jsonObject.put("password","123");
jedis.flushAll();
Transaction multi = jedis.multi();
String s = jsonObject.toJSONString();
try{
multi.set("user1",s);
int a = 1/0;
multi.set("user2",s);
multi.exec();
}catch (Exception e){
System.out.println("发生异常");
multi.discard();
}finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close();
}
}
}
测试结果
SpringBoot + Redis
说明:在springboot2.x之后,原来使用的jedis被替换成了lettuce
jedis:采用直连,多个线程操作的话,是不安全的,如果想要避免不安全,使用jedis pool连接池,更像BIO模式
lettuce:采用netty,实例可以再多个线程进行共享,不存在线程不安全的情况,可以减少线程数据了,更像NIO模式
原码分析:
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { /
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
整合测试
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.在application.yml中配置redis
spring:
application:
name: dev
redis:
host: 127.0.0.1
port: 6379
3.测试
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("name","li");
System.out.println(redisTemplate.opsForValue().get("name"));
}
}
重写配置类
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
RedisTemplate<String, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setStringSerializer(stringRedisSerializer);
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
此时再从终端测试一下
企业级开发:编写Redis工具类
@Component
public final class RedisUtil{
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(String.valueOf(CollectionUtils.arrayToList(key)));
}
}
}
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
Redis.conf详解
Redis启动的时候,就通过配置文件来启动
单位
1.配置文件unit单位对大小写不敏感
包含
如同Spring,Improt,includ
网络
bind 127.0.0.1 -::1
protected-mode yes
port 6379
通用GENERAL
daemonize yes
pidfile /var/run/redis_6379.pid
loglevel notice
logfile ""
databases 16
always-show-logo no
快照
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb .aof
redis是内存数据库,如果没有持久化,那么数据断电即失
save 3600 1
save 300 100
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dir ./
REPLICATTION 复制,主从复制
SECURITY 安全
设置密码
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "redis"
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "redis"
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI
127.0.0.1:6379> exet
(error) ERR unknown command `exet`, with args beginning with:
127.0.0.1:6379> exit
[root@iZbp10ephck4n6u4xij7pcZ bin]
127.0.0.1:6379>
127.0.0.1:6379> config get requirepass
(error) NOAUTH Authentication required.
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth redis
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "redis"
CLIENTS 限制
maxclients 10000
maxmemory <bytes>
maxmemory-policy noeviction
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key
3、volatile-random:随机删除即将过期key
4、allkeys-random:随机删除
5、volatile-ttl : 删除即将过期的
6、noeviction : 永不过期,返回错误
APPEND ONLY模式 aof配置
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
具体配置,在Redis持久化中
Redis持久化
面试和工作,持久化都是重点!
Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!
RDB(Redis DataBase)
什么是RDB
如果需要进行大规模数据恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式比AOF方式更加高效,RDB的缺点是最后一次持久化的数据可能丢失
我们默认的就是RDB,一般情况下不需要修改这个配置
rdb保存的文件是dump.rdb
触发机制
1.save的规则满足的情况下,会自动触发rdb规则
2.执行flushall命令,也会触发rdb规则
3.关闭redis,也会生成一个rdb文件
如何恢复rdb文件
1.只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据
2.查看rdb文件需要存储的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin
优点:
缺点:
- 需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改数据就没有了
- fork进程的时候,会占用一定的内容空间
AOF(Append Only File)
AOF是什么
将所有命令都记录下来,恢复的时候就把这个文件全部都再执行一遍!
AOF保存的是aof文件
开启AOF
默认不开启的,我们需要手动进行配置
将appendonly no 改成 appendonly yes ,重启redis就可以了
修复出错的aof文件
如果这个aof文件有错位,redis是启动不起来的,我们需要修复这个aof文件
redis给我们提供了一个工具 redis-check-aof
redis-check-aof --fix appendonly.aof
优点和缺点
优点:
- 每一次修改都同步,文件的完整会更加好
- 每秒同步一次,可能会丢失一秒的数据
- 从不同步,效率最高
缺点:
- 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢
- aof运行效率比rdb慢,所以redis默认的是rdb
Redis发布订阅
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。微信,微博,关注系统!
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
命令
使用场景
-
实时消息系统! -
事实聊天(频道当做聊天室,将信息回显给所有人即可!) -
订阅,关注等 稍微复杂的场景会使用信息中间件MQ等来实现
Redis主从复制
简介
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写入为主,Slave以读为主(读写分离)。
主从复制的主要作用:
- 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
- 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
- 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务,分担服务器负载;尤其是在写少读多的场景下,通过多个节点分担负载,可以大大提高Redis服务器的并发量。
- 高可用基石:主从复制还是哨兵和集群能够实现的基础,因此说主从复制时Redis高可用的基础。
环境配置
只配置从库,不用配置主库
127.0.0.1:6379> info replication
role:master
connected_slaves:0
master_failover_state:no-failover
master_replid:3af9c262dbd2251f336e749c179e0e6f107c7875
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
复制3个redis配置文件,然后修改对应信息
- 端口
- pid名字
- log文件名字
- dum文件名称
修改完毕之后,启动3个redis服务
一主二从
认老大! 一主(6380)二从(6381,6382)
[root@iZbp10ephck4n6u4xij7pcZ myconfig]
127.0.0.1:6381> slaveof 127.0.0.1 6380
OK
127.0.0.1:6381> info replication
role:slave
master_host:127.0.0.1
master_port:6380
master_link_status:up
master_last_io_seconds_ago:1
master_sync_in_progress:0
slave_read_repl_offset:14
slave_repl_offset:14
slave_priority:100
slave_read_only:1
replica_announced:1
connected_slaves:0
master_failover_state:no-failover
master_replid:6e9c6f45e78828ddc39e32d289e553ea0fc25c07
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14
127.0.0.1:6380> info replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=196,lag=1
master_failover_state:no-failover
master_replid:6e9c6f45e78828ddc39e32d289e553ea0fc25c07
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:196
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:196
真实的从主配置应该在配置文件中配置,这样的话是永久的,我们这里使用的是命令,暂时的
细节
- 主机可以写,从机只能读
- 主机断开连接,从机依旧能连接到主机,但是没有写操作,主机如果回来了,从机依旧可以直接获取到主机写的
- 如果是使用命令行来配置的主从,这个时候如果重启了,从机就会变回主机,只要变为从机,立马就可以从主机中获取值
复制原理
slave启动成功连接daomaster后会发送sync同步命令
master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送这个数据文件到slave,并完成一次完全同步
全量复制:slave服务在收到数据库文件数据之后,将其存盘并加载到内存中
增量复制:master 陆续将新的所有收集到的修改命令依此传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行,我们的数据一定可以在从机中看到!
主机断开连接
如果主机断开连接,我们可以使用slaveof no one让自己变成主机,手动让其他结点连接到这个主节点,如果主机重新连接,手动连接。
哨兵模式(自动选举老大)
单哨兵模式
哨兵模式是一种特殊的模式,首先Redis 提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行,其原理就是哨兵通过发送命令,等到Redis 服务器响应,从而监控运行的多个Redis 实例。
多哨兵模式
单个哨兵进程对Redis 服务器进行监控时可能会出现一些问题(比如说哨兵挂掉),为此我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。由于一个哨兵就需要一个进程,哨兵集群至少要三个哨兵才能保证健壮性,因此要配置多哨兵,起步就是6个进程。
多哨兵模式中,如果主机宕机了会怎么样?
假设主服务器宕机,哨兵1先检测到这个结果,但是系统并不会马上进行failover (故障转移)过程,因为仅仅是哨兵1主观认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器连接不上了,并且数量达到一定值时(3个哨兵中两个哨兵检测主机不能使用了,那么就认为这个主机挂掉了),那么哨兵之间就会对从机中进行一次选举主机的投票,投票的结果由其中任意一个哨兵(随机)发起,然后进行failover(故障转移)操作,选举成功后就将该从机切换成主机,之后就会通过发布订阅模式(所有的服务都被哨兵监控着),让各个哨兵把自己监控的从服务器实现切换主机,这个过程成为客观下线。
测试
1、配置哨兵模式sentinel.conf (最基本配置)
sentinel monitor myredis 127.0.0.1 6379 1
这里的 ’1‘ 代表的是如果有1个sentinel (哨兵)判断某个master (主机)宕机,那么该主机宕机下线(也就是至少多少个sentinel 同意,master 才下线)
2、开启哨兵
5368:X 20 Jan 2021 20:48:18.240
5368:X 20 Jan 2021 20:48:18.240
5368:X 20 Jan 2021 20:48:18.240
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.0.9 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 5368
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
5368:X 20 Jan 2021 20:48:18.241
5368:X 20 Jan 2021 20:48:18.247
5368:X 20 Jan 2021 20:48:18.247
5368:X 20 Jan 2021 20:48:18.248 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
3、当主机宕机了,哨兵做出什么动作
127.0.0.1:6381> info replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=6230,lag=0
master_replid:99ac637afc72fb3e748e844e0b4a9553cce745c2
master_replid2:5f25a8d52692fcda674137883c0f5912e847645c
master_repl_offset:6230
second_repl_offset:3922
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:6230
4、如果此时宕机的机器重新连接回来,那么他只能归并到当前的master 当作从机(大人,时代变了!)
+convert-to-slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381
哨兵优缺点
优点
- 哨兵集群,基于主从复制模式,所有的主从配置的优点,它都有。
- 主从可以切换,故障可以转移,系统的可用性就会更好。
- 哨兵模式就是主从模式的升级版,从收到到自动,更加健壮。
缺点
- Redis不好在线扩容,集群容量一旦达到上限,在线扩容就会十分麻烦。
- 实现哨兵模式的配置比较麻烦,并且其中有很多选项
哨兵模式配置文件中的全部配置
port 26379
dir /tmp
sentinel monitor mymaster 127.0.0.1 6379 2
sentine1 auth-pass mymaster MySUPER--secret-0123passwOrd
sentine1 down-after-mi 11iseconds mymaster 30000
sentine1 paralle1-syncs mymaster 1
sentine1 fai lover-ti meout mymaster 180000
sentine1 notificati on-script mymaster /var/redis/notify. sh
sentine1 client-reconfig-script mymaster /var/redis/reconfig.sh
浅谈Redis缓存穿透、击穿、雪崩
Redis缓存使用流程
客户端向服务器发送读请求,此时后台会先去缓存中查数据,如果数据命中,那么返回结果,反之去数据库中查询,如果数据库中查到数据,那么返回数据,并且写入缓存,如果没有查到该数据即返回空结果
缓存穿透(数据未命中)
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现Redis缓存中没有,也就是缓存没有命中,于是就向数据库查询,然后发现也没有,于是本次查询失败。当用户很多的时候(秒杀场景),如果在缓存都没有命中,于是都去请求了数据库DB,一瞬间就给数据库造成巨大的压力,这种情况就是常说的缓存穿透。
解决方案(简单介绍)
1、布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。
2、缓存空对象
当存储层不命中时,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,从而保护了后端数据源。
但是这种方式存在两个问题:
- 如果空值能够被缓存起来,这就意味着需要更多的空间存储更多的键,因为其中可能会有很多空值的键。
- 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口不一致的问题,这对于需要保持一致的业务会有影响。
缓存击穿(热点key失效,大量数据集中于一点)
概述
这里和缓存穿透不同,缓存穿透是查询不到数据,所以都去查询数据库,给数据库造成很大压力;而缓存击穿是一个key十分热点(比如微博某明星热搜),缓存在不停地抗着高并发,始终对一个点访问,当这个缓存数据失效后,大量的高并发请求穿破缓存,在很短时间去请求数据库(就好比一梭子子弹打容器上),就像在屏幕中凿了个洞。
当这个热点key失效过期后,大量并发请求前往数据库查询最新数据,查询完后并写入缓存,会使数据库压力过大。
解决方案
1、设置热点数据永不过期 从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
2、加互斥锁 分布式锁:使用分布式锁,保证对每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩(大量的key几乎同时过期)
概述
缓存雪崩是因为大面积的缓存在一个时间点集中失效,打崩了数据库。
举个简单的例子:如果所有首页的Key失效时间都是12小时,中午12点刷新的,在零点的时候有个秒杀活动会有大量用户涌入,假设当时每秒 6000 个请求,本来缓存在可以扛住每秒 5000 个请求,但是这个时候缓存所有的Key都失效了。此时 1 秒 6000 个请求全部落数据库,数据库必然扛不住,它会报一下警,真实情况可能DBA(管理员)都没反应过来就直接挂了。此时,如果没用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。这就是缓存雪崩。
解决方案
1、Redis高可用 这个思想的含义就是:既然Redis有可能会挂掉,那我多增设几台Redis,这样一台挂掉了之后其它的还可以继续工作,高可用其实就是搭建的Redis集群。将热点数据均匀分布在不同的Redis库中。
2、限流降级 这个解决方案的思想就是:在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其它线程等待。
3、数据预热 数据预热的含义就是在正式部署之前,先把可能高访问的数据预先访问一遍,这样可能大部分的数据就能加载到缓存中。在即将发生大并发访问前手动触发加载缓存中不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀一点。
4、在批量往Redis存数据的时候,把每个key的失效时间都加个随机值,这样保证数据不会在同一时间大面积失效。 要查询一个数据,发现Redis缓存中没有,也就是缓存没有命中,于是就向数据库查询,然后发现也没有,于是本次查询失败。当用户很多的时候(秒杀场景),如果在缓存都没有命中,于是都去请求了数据库DB,一瞬间就给数据库造成巨大的压力,这种情况就是常说的缓存穿透。
解决方案(简单介绍)
1、布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。
2、缓存空对象
当存储层不命中时,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,从而保护了后端数据源。
但是这种方式存在两个问题:
- 如果空值能够被缓存起来,这就意味着需要更多的空间存储更多的键,因为其中可能会有很多空值的键。
- 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口不一致的问题,这对于需要保持一致的业务会有影响。
缓存击穿(热点key失效,大量数据集中于一点)
概述
这里和缓存穿透不同,缓存穿透是查询不到数据,所以都去查询数据库,给数据库造成很大压力;而缓存击穿是一个key十分热点(比如微博某明星热搜),缓存在不停地抗着高并发,始终对一个点访问,当这个缓存数据失效后,大量的高并发请求穿破缓存,在很短时间去请求数据库(就好比一梭子子弹打容器上),就像在屏幕中凿了个洞。
当这个热点key失效过期后,大量并发请求前往数据库查询最新数据,查询完后并写入缓存,会使数据库压力过大。
解决方案
1、设置热点数据永不过期 从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
2、加互斥锁 分布式锁:使用分布式锁,保证对每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩(大量的key几乎同时过期)
概述
缓存雪崩是因为大面积的缓存在一个时间点集中失效,打崩了数据库。
举个简单的例子:如果所有首页的Key失效时间都是12小时,中午12点刷新的,在零点的时候有个秒杀活动会有大量用户涌入,假设当时每秒 6000 个请求,本来缓存在可以扛住每秒 5000 个请求,但是这个时候缓存所有的Key都失效了。此时 1 秒 6000 个请求全部落数据库,数据库必然扛不住,它会报一下警,真实情况可能DBA(管理员)都没反应过来就直接挂了。此时,如果没用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。这就是缓存雪崩。
解决方案
1、Redis高可用 这个思想的含义就是:既然Redis有可能会挂掉,那我多增设几台Redis,这样一台挂掉了之后其它的还可以继续工作,高可用其实就是搭建的Redis集群。将热点数据均匀分布在不同的Redis库中。
2、限流降级 这个解决方案的思想就是:在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其它线程等待。
3、数据预热 数据预热的含义就是在正式部署之前,先把可能高访问的数据预先访问一遍,这样可能大部分的数据就能加载到缓存中。在即将发生大并发访问前手动触发加载缓存中不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀一点。
4、在批量往Redis存数据的时候,把每个key的失效时间都加个随机值,这样保证数据不会在同一时间大面积失效。
|