Redis入门
Linux安装
下载地址 http://download.redis.io/releases/redis-5.0.7.tar.gz
安装步骤
1、下载获得 redis-5.0.7.tar.gz 后将它放到我们Linux的目录下 /opt
2、/opt 目录下,解压命令 : tar -zxvf redis-5.0.7.tar.gz
3、解压完成后出现文件夹:redis-5.0.7
4、进入目录: cd redis-5.0.7
5、在 redis-5.0.7 目录下执行 make 命令
运行make命令时故意出现的错误解析:
1. 安装gcc (gcc是linux下的一个编译程序,是c程序的编译工具)
能上网: yum install gcc-c++
版本测试: gcc-v
2. 二次make
3. Jemalloc/jemalloc.h: 没有那个文件或目录
运行 make distclean 之后再make
4. Redis Test(可以不用执行)
6、如果make完成后继续执行 make install
7、查看默认安装目录:usr/local/bin
/usr 这是一个非常重要的目录,类似于windows下的Program Files,存放用户的程序
8、拷贝配置文件(备用)
cd /usr/bin
ls -l
# 在redis的解压目录下备份redis.conf
mkdir myredis
cp redis.conf myredis
# 拷一个备份,养成良好的习惯,我们就修改这个文件
# 修改配置保证可以后台应用
vim redis.conf
daemonize 设置yes或者no区别
-
daemonize:yes
- redis采用的是单进程多线程的模式。当redis.conf中选项daemonize设置成yes时,代表开启守护进程模式。在该模式下,redis会在后台运行,并将进程pid号写入至redis.conf选项pidfifile设置的文件中,此时redis将一直运行,除非手动kill该进程。
-
daemonize:no
- 当daemonize选项设置成no时,当前界面将进入redis的命令行界面,exit强制退出或者关闭连接工具(putty,xshell等)都会导致redis进程退出。
9、测试一下!
[root@VM-0-5-centos ~]
[root@VM-0-5-centos bin]
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set k1 hellword
OK
127.0.0.1:6379> get k1
"hellword"
[root@VM-0-5-centos /]
redis 7955 1 0 00:12 ? 00:00:00 /www/server/redis/src/redis-server 127.0.0.1:6379
root 8322 4825 0 00:13 pts/0 00:00:00 redis-cli -p 6379
root 15654 10426 0 00:16 pts/2 00:00:00 grep
127.0.0.1:6379> shutdown
not connected> exit
[root@192 myredis]
root 16140 16076 0 04:53 pts/2 00:00:00 grep
基础说明
准备工作:开启redis服务,客户端连接
redis压力测试工具-----Redis-benchmark
Redis-benchmark是官方自带的Redis性能测试工具,可以有效的测试Redis服务的性能。
redis 性能测试工具可选参数如下所示:
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
====== SET ======
100000 requests completed in 1.88 seconds
100 parallel clients
3 bytes payload
keep alive: 1
17.05% <= 1 milliseconds
97.35% <= 2 milliseconds
99.97% <= 3 milliseconds
100.00% <= 3 milliseconds
基础数据库常识
默认16个数据库,类似数组下标从零开始,初始默认为零号库
查看 redis.conf ,里面有默认的配置
databases 16
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
Select命令切换数据库
127.0.0.1:6379> select 7
OK
127.0.0.1:6379[7]>
# 不同的库可以存不同的数据
Dbsize查看当前数据库的key数量
127.0.0.1:6379> select 7
OK
127.0.0.1:6379[7]> dbsize
(integer) 0
127.0.0.1:6379[7]> SELECT 0
OK
127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> KEYS *
1) "k1"
Flushdb:清空当前库
Flushall:清空全部的库
127.0.0.1:6379> DBSIZE
(integer) 1
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> DBSIZE
(integer) 0
为什么默认端口是6379呢? 粉丝效应
redis是单线程的
我们首先要明白,Redis很快!官方表示,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了!
6Redis采用的是基于内存的采用的是单进程单线程模型的 KV 数据库,由C语言编写,官方提供的数据是可以达到100000+的QPS(每秒内查询次数)。这个数据不比采用单进程多线程的同样基于内存的 KV数据库 Memcached 差!
Redis为什么单线程还这么快?
1.误区: 高性能的服务器一定是多线程的?
2.误区: 多线程(CPU上下文切换) 一定比单线程效率高
CPU>内存>硬盘速度(juc)
核心: redis是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!!!) ,对于内存系统来说,如果没有上下文切换效率就是最高的! 多次读写对都是在一个CPU上的,在内存情况下,这个就是最佳的方案
五大数据类型
Redis是一个开放源代码(BSD许可)的内存中数据结构存储,用作数据库,缓存和消息代理。它支持数
据结构,例如字符串,哈希,列表,集合,带范围查询的排序集合,位图,超日志,带有半径查询和流
的地理空间索引。Redis具有内置的复制,Lua脚本,LRU驱逐,事务和不同级别的磁盘持久性,并通过
Redis Sentinel和Redis Cluster自动分区提供了高可用性。
String(字符串类型)
String是redis最基本的类型,你可以理解成Memcached一模一样的类型,一个key对应一个value
String类型时二进制安全的,意思是redis的String可以包含任何数据,比如jpg图片或者序列化的对象。
String类型时redis最基本的数据类型,一个redis中字符串value最多可以是512M
Hash(哈希,类似Java里的Map)
Redis hash是一个键值对集合
Redis hash是一个String类型的field和value的映射表,hash特别适合用于存储对象。
类似于Java里面的Map<String, Object>
List(列表)
Redis列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)
它的底层实际是个链表
Set(集合)
Redis的Set是String类型的无序集合,它是通过HashTable实现的!
Zset(sorted set:有序集合)
Redis zset和set一样,也是String类型元素的集合,且不允许重复的成员。
不同的是每个元素都会关联一个double类型的分数
Redis正是通过分数来为集合的成员进行从小到大的排序,zset的成员是唯一的,但是分数(Score)且可以重复。
Redis键(key)
1.exists name # 判断当前key是否存在 存在为1 反之为0
2.move name # 移除当前key
3.expire name 10 # 设置key的过期时间,单位是秒
4.ttl name # 查看当前key的剩余时间
5.type name # 查看key的类型
127.0.0.1:6379> set name bingning
OK
127.0.0.1:6379> KEYS *
1) "name"
127.0.0.1:6379> EXISTS name
(integer) 1
127.0.0.1:6379> EXISTS name1
(integer) 0
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> key *
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> EXPIRE name 20
(integer) 1
127.0.0.1:6379> TTL name
(integer) 15
127.0.0.1:6379> TTL name
(integer) 13
127.0.0.1:6379> TTL name
(integer) -2
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> set name bingnign
OK
127.0.0.1:6379> TYPE name
string
String
单值Value
常规命令说明:
-
append name “shanjiao” # 追加字符串,如果当前key不存在相当于setkey -
strlen name # 查看指定key的长度 -
incr views # views自增1 -
decr views # views自减1 -
incrby views 10 # 自定义设置自增步长 -
decrby views 5 # 自定义设置自减步长 -
getrange 获取指定区间范围内的值 -
setrange 设置指定区间范围内的值,格式是setrange key值 -
setex(set with expire)键秒值 setnx(set if not exist) -
mset Mset 命令用于同时设置一个或多个 key-value 对。 -
mget Mget 命令返回所有(一个或多个)给定 key 的值。 -
msetnx 当所有 key 都成功设置,返回 1 -
这里的key是一个巧妙的设计: user:{id} {filed},如此设计在redis中完全可以 -
getset # 先get再set
127.0.0.1:6379> set name bingnign
OK
127.0.0.1:6379> get name
"bingnign"
127.0.0.1:6379> DEL name
(integer) 1
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> EXISTS name
(integer) 0
127.0.0.1:6379> append name "hello"
(integer) 5
127.0.0.1:6379> get name
"hello"
127.0.0.1:6379> append name "-4243"
(integer) 10
127.0.0.1:6379> get name
"hello-4243"
127.0.0.1:6379> STRLEN name
(integer) 10
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> INCR views
(integer) 1
127.0.0.1:6379> INCR views
(integer) 2
127.0.0.1:6379> decr views
(integer) 1
127.0.0.1:6379> incrby views 19
(integer) 20
127.0.0.1:6379> decrby views 20
(integer) 0
127.0.0.1:6379> set key2 fdshhe23
OK
127.0.0.1:6379> getrange key2 0 -1
"fdshhe23"
127.0.0.1:6379> getrange key2 0 5
"fdshhe"
127.0.0.1:6379> get key2
"fdshhe23"
127.0.0.1:6379> SETRANGE key2 1 fds
(integer) 8
127.0.0.1:6379> get key2
"ffdshe23"
127.0.0.1:6379> SETRANGE key2 3 fdsetrtd
(integer) 11
127.0.0.1:6379> get key2
"ffdfdsetrtd"
127.0.0.1:6379> setex key3 60 expire
OK
127.0.0.1:6379> ttl key3
(integer) 53
127.0.0.1:6379> get key3
"expire"
127.0.0.1:6379> setnx mykey "redis"
(integer) 1
127.0.0.1:6379> setnx mykey "mongodb"
(integer) 0
127.0.0.1:6379> get mykey
"redis"
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k3"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 k4
1) "v1"
2) "v2"
3) "v3"
4) (nil)
127.0.0.1:6379> msetnx k1 k4 k5 k6
(integer) 0
127.0.0.1:6379> get key5
(nil)
127.0.0.1:6379> set user:1 value
OK
127.0.0.1:6379> get user:1
"value"
set user:1 {name:zhangsan,age:3}
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"
127.0.0.1:6379> get db
"mongodb"
127.0.0.1:6379> getset db redis
"mongodb"
127.0.0.1:6379> get db
"redis"
String数据结构是简单的key-value类型,value其实不仅可以是String,也可以是数字。
常规key-value缓存应用:
常规计数:微博数,粉丝数等
列表List
Lpush :将一个或多个值插入到列表头部。(左)rpush :将一个或多个值插入到列表尾部。(右)lrange :返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。lpop 命令用于移除并返回列表的第一个元素。当列表 key 不存在时,返回 nil 。rpop 移除列表的最后一个元素,返回值为移除的元素。Lindex ,按照索引下标获得元素(-1代表最后一个,0代表是第一个)llen 用于返回列表的长度。lrem key 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。rpoplpush 移除列表的最后一个元素,并将该元素添加到另一个列表并返回。lset key index value 将列表 key 下标为 index 的元素的值设置为 valuelinsert key before/after pivot value 用于在列表的元素前或者后插入元素。 将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。
127.0.0.1:6379> lpush list "ont"
(integer) 1
127.0.0.1:6379> LPUSH list "tpw" "fed"
(integer) 3
127.0.0.1:6379> rpush list "right"
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "fed"
2) "tpw"
3) "ont"
4) "right"
127.0.0.1:6379> lrange list 0 1
1) "fed"
2) "tpw"
127.0.0.1:6379> lrange list 0 1
1) "fed"
2) "tpw"
127.0.0.1:6379> LPOP list
"fed"
127.0.0.1:6379> rpop list
"right"
127.0.0.1:6379> lrange list 0 -1
1) "tpw"
2) "ont"
127.0.0.1:6379> lindex list 1
"ont"
127.0.0.1:6379> lindex list 0
"tpw"
127.0.0.1:6379> lindex list -1
"ont"
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> lpush list "one" "two" "three"
(integer) 3
127.0.0.1:6379> LLEN list
(integer) 3
127.0.0.1:6379> lrem mylist 1 "hello"
(integer) 1
127.0.0.1:6379> lrange mylist 0 -1
1) "foo"
127.0.0.1:6379> rpush mylist "hello" "hello1" "hello2" "hello3"
(integer) 4
127.0.0.1:6379> LTRIM mylist 1 2
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"
127.0.0.1:6379> rpush mylist "hello" "foo" "bar"
(integer) 3
127.0.0.1:6379> RPOPLPUSH mylist myotherlist
"bar"
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "foo"
127.0.0.1:6379> lrange myotherlist 0 -1
1) "bar"
127.0.0.1:6379> exists list
(integer) 0
127.0.0.1:6379> lset list 0 item
(error) ERR no such key
127.0.0.1:6379> lpush list "valuel"
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "valuel"
127.0.0.1:6379> lset list 0 "new"
OK
127.0.0.1:6379> lrange list 0 0
1) "new"
127.0.0.1:6379> liset list 1 "new"
(error) ERR unknown command `liset`, with args beginning with: `list`, `1`, `new`,
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "world"
(integer) 2
127.0.0.1:6379> LINsert mylist before "world" "Three"
(integer) 3
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "Three"
3) "world"
小结
- 他实际上是一个链表,before Bode after, left, right 都可以插入值
- 如果key不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也就代表不存在
- 在两边插入或者改动值,效率最高!中间元素,相对俩手效率会低一点~
消息队列,消息排队!(Lpush Rpop) , 栈 (Lpush Lpop)!
集合Set
-
sadd 将一个或多个成员元素加入到集合中,不能重复 -
smembers 返回集合中的所有的成员。 -
sismember 命令判断成员元素是否是集合的成员。 -
scard,获取集合里面的元素个数 -
srem key value 用于移除集合中的一个或多个成员元素 -
srandmember key 命令用于返回集合中的一个或多个随机元素。 -
spop key 用于移除集合中的指定 key 的一个或多个随机元素 -
smove SOURCE DESTINATION MEMBER -
将指定成员 member 元素从 source 集合移动到 destination 集合。 -
数字集合类
- 差集: sdiff
- 交集: sinter
- 并集: sunion
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "kung"
(integer) 1
127.0.0.1:6379> sadd myset "kung"
(integer) 0
127.0.0.1:6379> SMEMBERS myset
1) "kung"
2) "hello"
127.0.0.1:6379> SISMEMBER myset "hello"
(integer) 1
127.0.0.1:6379> SISMEMBER myset "wo"
(integer) 0
127.0.0.1:6379> scard myset
(integer) 2
127.0.0.1:6379> srem myset "kung"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "hello"
127.0.0.1:6379> SMEMBERS myset
1) "bingbing"
2) "world"
3) "hello"
127.0.0.1:6379> SRANDMEMBER myset
"hello"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "hello"
2) "world"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "bingbing"
2) "hello"
127.0.0.1:6379> SMEMBERS myset
1) "bingbing"
2) "world"
3) "hello"
127.0.0.1:6379> spop myset
"hello"
127.0.0.1:6379> spop myset 2
1) "bingbing"
2) "world"
127.0.0.1:6379> SMEMBERS myset
(empty array)
127.0.0.1:6379> sadd myset "hello" "world" "bingning"
(integer) 3
127.0.0.1:6379> sadd myset "set2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "bingning"
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "set2"
2) "world"
3) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "bingning"
- 数字集合类
- 差集: sdiff
- 交集: sinter
- 并集: sunion
127.0.0.1:6379> sadd key1 "a" "b" "c"
(integer) 3
127.0.0.1:6379> sadd key2 "c" "d" "e"
(integer) 3
127.0.0.1:6379> sdiff key1 key2
1) "a"
2) "b"
127.0.0.1:6379> sinter key1 key2
1) "c"
127.0.0.1:6379> SUNION key1 key2
1) "b"
2) "c"
3) "a"
4) "e"
5) "d"
微博,b站 将A用户的所有关注的人放到set集合中,将他的粉丝也放在集合中
共同关注,共同爱好,二度好友,六度分割理论
哈希Hash
kv模式不变,但v是一个键值对
- hset、hget 命令用于为哈希表中的字段赋值 。
- hmset、hmget 同时将多个field-value对设置到哈希表中。会覆盖哈希表中已存在的字段。
- hgetall 用于返回哈希表中,所有的字段和值。
- hdel 用于删除哈希表 key 中的一个或多个指定字段
- hlen 获取哈希表中字段的数量。
- hexists 查看哈希表的指定字段是否存在。
- hkeys 获取哈希表中的所有域(field)。
- hvals 返回哈希表所有域(field)的值。
- hincrby 为哈希表中的字段值加上指定增量值
- hsetnx 为哈希表中不存在的的字段赋值 。
127.0.0.1:6379> hset myhash field1 "bingnign" field2 "we"
(integer) 2
127.0.0.1:6379> hget myhash field1
"bingnign"
127.0.0.1:6379> hmset myhash field1 "hello" field2 "worj"
OK
127.0.0.1:6379> hgetall myhash
1) "field1"
2) "hello"
3) "field2"
4) "worj"
127.0.0.1:6379> hdel myhash field1
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "worj"
127.0.0.1:6379> hlen myhash
(integer) 1
127.0.0.1:6379> hset myhash field1 "gs"
(integer) 1
127.0.0.1:6379> hlen myhash
(integer) 2
127.0.0.1:6379> hexists myhash field1
(integer) 1
127.0.0.1:6379> hexists myhash gile
(integer) 0
127.0.0.1:6379> HKEYS myhash
1) "field2"
2) "field1"
127.0.0.1:6379> hvals myhash
1) "worj"
2) "gs"
127.0.0.1:6379> hset myhash filed1 10
(integer) 1
127.0.0.1:6379> HINCRBY myhash filed1 1
(integer) 11
127.0.0.1:6379> hincrby myhash filed1 -1
(integer) 10
127.0.0.1:6379> hsetnx myhash filed1 "gsd"
(integer) 1
127.0.0.1:6379> hsetnx myhash filed1 "work"
(integer) 0
127.0.0.1:6379> hget myhash filed1
"gsd"
hash变更的数据user name age ,尤其是用户信息之类的,经常变动的信息! hash更适合于对象的存储,String更加适合字符串存储
有序集合Zset
在set基础上,加一个score值。之前set是k1 v1 v2 v3,现在zset是 k1 score1 v1 score2 v2
zadd 将一个或多个成员元素及其分数值加入到有序集当中。zrange 返回有序集中,指定区间内的成员zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大) 次序排列。zrevrange 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大) 次序排列。zcard 命令用于计算集合中元素的数量。count 计算有序集合中指定分数区间的成员数量。zrank 返回有序集中指定成员的排名。其中有序集成员按分数值递增(从小到大)顺序排列。zrevrank 返回有序集中成员的排名。其中有序集成员按分数值递减(从大到小)排序。
127.0.0.1:6379> zadd myzset 1 "one" 2 "two"
(integer) 2
127.0.0.1:6379> zrange myzset 0 -1
1) "one"
2) "two"
127.0.0.1:6379> zadd salary 1000 "zan" 1500 "bingni" 500 "xiaoq"
(integer) 3
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf
1) "xiaoq"
2) "zan"
3) "bingni"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"
5) "bingni"
6) "1500"
127.0.0.1:6379> ZREVRANGE salary 0 -1 withscores
1) "bingni"
2) "1500"
3) "zan"
4) "1000"
5) "xiaoq"
6) "500"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 1000 withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"
127.0.0.1:6379> ZCARD salary
(integer) 3
127.0.0.1:6379> zadd myset1 1 "jl" 2 "j2" 3 "j3"
(integer) 3
127.0.0.1:6379> zcount myset1 1 3
(integer) 3
127.0.0.1:6379> zcount myset1 1 2
(integer) 2
127.0.0.1:6379> zrange salary 0 -1 withscores
1) "xiaoq"
2) "500"
3) "zan"
4) "1000"
5) "bingni"
6) "1500"
127.0.0.1:6379> zrank salary zan
(integer) 1
127.0.0.1:6379> zrank salary bingni
(integer) 2
127.0.0.1:6379> ZREVRAnk salary zan
(integer) 1
127.0.0.1:6379> ZREVRAnk salary xiaoq
(integer) 2
和set相比,sorted set增加了一个权重参数score,使得集合中的元素能够按score进行有序排列,比如一个存储全班同学成绩的sorted set,其集合value可以是同学的学号,而score就可以是其考试得分,这样在数据插入集合的时候,就已经进行了天然的排序。可以用sorted set来做带权重的队列,比如普通消息的score为1,重要消息的score为2,然后工作线程可以选择按score的倒序来获取工作任务。让重要的任务优先执行。
排行榜应用,取TOP N操作 !
三种特殊数据类型
GEO地理位置
朋友的定位,附近的人,打车距离计算?
redis的 Geo 在redis3.2版本就推出了,这个功能可以推算地理位置的信息, 两地之间的距离,方圆几里的人
可以查询一些测试数据:http://www.jsons.cn/lngcode/
只有六个命令
geoadd
geoadd key longitude latitude member ...
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京
(integer) 1
127.0.0.1:6379> geoadd china:city 121.48 31.40 上海 113.88 22.55 深圳 120.21 30.20 杭州
(integer) 3
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉
(integer) 3
测试:百度搜索经纬度查询,模拟真实数据
geopos
解析:
geopos key member [member...]
测试:
127.0.0.1:6379> GEOPOS china:city 北京
1) 1) "116.23000055551528931"
2) "40.2200010338739844"
127.0.0.1:6379> GEOPOS china:city 上海 重庆
1) 1) "121.48000091314315796"
2) "31.40000025319353938"
2) 1) "106.54000014066696167"
2) "29.39999880018641676"
geodist
解析:
geodist key member1 member2 [unit]
测试:
127.0.0.1:6379> geodist china:city 北京 上海
"1088785.4302"
127.0.0.1:6379> geodist china:city 重庆 北京 km
"1491.6716"
georadius
解析:
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
测试:重新连接 redis-cli,增加参数 --raw ,可以强制输出中文,不然会乱码
[root@VM-0-5-centos bin]
[root@VM-0-5-centos bin]
127.0.0.1:6379> georadius china:city 100 30 1000 km
重庆
127.0.0.1:6379> georadius china:city 100 30 1000 km withdist
重庆
635.2850
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord
重庆
106.54000014066696167
29.39999880018641676
127.0.0.1:6379> georadius china:city 100 30 1000 km withdist withcoord count 2
重庆
635.2850
106.54000014066696167
29.39999880018641676
georadiusbymember
解析:
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
测试:
127.0.0.1:6379> GEORADIUSBYMEMBER china:city 上海 1000 km
上海
杭州
geohash
解析:
geohash key member [member...]
测试:
127.0.0.1:6379> geohash china:city 上海 北京
wtw6sk5n300
wx4sucu47r0
127.0.0.1:6379> geohash china:city 北京 杭州
wx4sucu47r0
wv7gu1xzg00
zrem
GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除.
127.0.0.1:6379> geoadd china:city 116.23 40.22 beijin
1
127.0.0.1:6379> zrange china:city 0 -1
杭州
beijin
127.0.0.1:6379> zrem china:city beijin
1
127.0.0.1:6379> zrange china:city 0 -1
杭州
HyperLogLog(求基数)
什么是基数?
比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。
基数估计就是在误差可接受的范围内,快速计算基数。
简介
Redis 2.8.9 版本就跟新了Hyperloglog数据结构!
Redis Hyperloglog 基数统计的算法!
优点: 占用的内存是固定的, 2^64 不同的元素的基数,只需要12kb的内存即可,如果从内存角度来比较,且允许有误差(0.81%)Hyperloglog首选
网页的UV(一个人访问一个网站多次,但是还是算作一个人!)
传统的方式,set保存用户的id ,然后就可以统计set中元素数量作为标准!
set集合(无序,不重复)
这个方式如果保存大量的用户id,就会比较麻烦! 我们的目的是为了技术,而不是保存用户id
pfadd 添加
pfcount 计数
pfmerge 合并
测试使用
127.0.0.1:6379> pfadd mykey a b c d e f g h i j
1
127.0.0.1:6379> pfcount mykey
10
127.0.0.1:6379> pfadd mykey2 i j l b o o h
1
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2
OK
127.0.0.1:6379> pfcount mykey3
12
bitmap (判断状态)
统计用户信息(活跃,不活跃) (登录,未登录)
打卡(打卡,未打卡)
两个状态的都可以使用bitmap
bitmap 位图 , 数据结构! 都是操作二进制位来进行记录的,就只有0和1两个状态
1字节 = 8bit
- setbit 设置
- getbit 获取
- bitcount 统计为1的次数
测试:
127.0.0.1:6379> SETBIT login 0 1
0
127.0.0.1:6379> SETBIT login 1 0
0
127.0.0.1:6379> SETBIT login 2 1
0
127.0.0.1:6379> setbit login 3 1
0
127.0.0.1:6379> setbit login 4 0
0
127.0.0.1:6379> setbit login 8 0
0
127.0.0.1:6379> getbit login 2
1
127.0.0.1:6379> getbit login 8
0
127.0.0.1:6379> bitcount login
3
Redis.conf
熟悉基本配置
位置
Redis 的配置文件位于 Redis 安装目录下,文件名为 redis.conf
config get *
配置文件的地址:
我们一般情况下,会单独拷贝出来一份进行操作。来保证初始文件的安全。
Units 单位
1、配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit
2、对 大小写 不敏感
include 包含
和Spring配置文件类似,可以通过includes包含,redis.conf 可以作为总文件,可以包含其他文件!
netword 网络配置
bind 127.0.0.1
ip protected-mode yes
port 6379
general 通用
daemonize yes
supervised no
pidfile /var/run/redis_6379.pid
loglevel notice
logfile ""
databases 16
always-show-logo yes
snapshopting快照
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./
REPLICATION 复制 我们后面讲主从复制再给大家讲解!这里先跳过!
SECURITY安全
访问密码的查看,设置和取消
config get requirepass
config set requirepass "123456"
127.0.0.1:6379> ping
NOAUTH Authentication required.
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> ping
PONG
限制
maxclients 10000
maxmemory <bytes>
maxmemory-policy noeviction
append only模式
appendonly no
appendfilename "appendonly.aof"
appendfilename AOF 文件名称 appendfsync everysec
常见配置介绍
1、Redis默认不是以守护进程的方式运行,可以通过该配置项修改,使用yes启用守护进程
daemonize no
2、当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfifile指
定
pidfifile /var/run/redis.pid
3、指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解释了为什么选用6379作为默认端口,因为6379在手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字
port 6379
4、绑定的主机地址
bind 127.0.0.1
5、当 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能
timeout 300
6、指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为verbose
loglevel verbose
7、日志记录方式,默认为标准输出,如果配置Redis为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给/dev/null
logfifile stdout
8、设置数据库的数量,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id
databases 16
9、指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合
save
Redis默认配置文件中提供了三个条件:
save 900 1
save 300 10
save 60 10000
分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更改。
10、指定存储至本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时
间,可以关闭该选项,但会导致数据库文件变的巨大
rdbcompression yes
11、指定本地数据库文件名,默认值为dump.rdb
dbfifilename dump.rdb
12、指定本地数据库存放目录
dir ./
13、设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,它会自动从master进行数据同步
slaveof
14、当master服务设置了密码保护时,slav服务连接master的密码
masterauth
15、设置Redis连接密码,如果配置了连接密码,客户端在连接Redis时需要通过AUTH 命令提供密码,默认关闭
requirepass foobared
16、设置同一时间最大客户端连接数,默认无限制,Redis可以同时打开的客户端连接数为Redis进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis会关闭新的连接并向客户端返回max number of clients reached错误信息
maxclients 128
17、指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区
maxmemory
18、指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为no
appendonly no
19、指定更新日志文件名,默认为appendonly.aof
appendfifilename appendonly.aof
20、指定更新日志条件,共有3个可选值:
no:表示等操作系统进行数据缓存同步到磁盘(快)
always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)
everysec:表示每秒同步一次(折衷,默认值)
appendfsync everysec
21、指定是否启用虚拟内存机制,默认值为no,简单的介绍一下,VM机制将数据分页存放,由Redis将访问量较少的页即冷数据swap到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析Redis的VM机制)
vm-enabled no
22、虚拟内存文件路径,默认值为/tmp/redis.swap,不可多个Redis实例共享
vm-swap-fifile /tmp/redis.swap
23、将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据都是内存存储的(Redis的索引数据 就是keys),也就是说,当vm-max-memory设置为0的时候,其实是所有value都存在于磁盘。默认值为0
vm-max-memory 0
24、Redis swap文件分成了很多的page,一个对象可以保存在多个page上面,但一个page上不能被多个对象共享,vm-page-size是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page大小最好设置为32或者64bytes;如果存储很大大对象,则可以使用更大的page,如果不 确定,就使用默认值
vm-page-size 32
25、设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是在放在内存中的,,在磁盘上每8个pages将消耗1byte的内存。
vm-pages 134217728
26、设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为4
vm-max-threads 4
27、设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启
glueoutputbuf yes
28、指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法
hash-max-zipmap-entries 64
hash-max-zipmap-value 512
29、指定是否激活重置哈希,默认为开启(后面在介绍Redis的哈希算法时具体介绍)
activerehashing yes
30、指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各
个实例又拥有自己的特定配置文件
include /path/to/local.conf
Redos持久化
RDB(Redis DataBase)
在主从复制中,rdb就是备用了!从机上面
什么是RDB
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里.
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中, 带持久化过程都结束了,再用这个临时文件替换上次持久化好的文件. 整个过程中,主进程是不进行任何IO操作的.这就确保了极高的性能.如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感.那RDB方式要比AOF方式更加的高效.RDB的缺点是最后一次持久化后的数据可能丢失. 我们默认的就是RDB, 一般情况下不需要修改这个配置
RDB保存的文件是,dump.rdb都是在我们的配置文件中快照中进行配置的!
触发机制
- save 的规则满足的情况下,会自动触发rdb规则
- 执行flushall命令 , 也会触发我们的rdb 规则!
- 退出redis,也会产生rdb文件!
备份就会自动生成一个dump.rdb
如何恢复rdb文件!
-
只需要将rdb 文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据! -
查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"
- 默认的配置就可以了
优点&缺点
优点:
- 适合大规模的数据恢复!
- 对数据的完整性要求不高!
缺点:
- 需要一定的时间间隔进行操作! 如果redis意外宕机了,这个最后一个修改的数据就没有了!
- fork进程的时候,会占用一定的内容空间!
AOF(Append Only File)
将我们的所有命令都记录下来,history , 恢复的时候就把这个文件全部在执行一遍
是什么
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是 appendonly.aof 文件
配置
append
默认是不开启的,我们需要手动进行配置! 我们只需要将appendonly 改为yes就开启了aof!
重启,redis 就可以生效了!
如果这个aof文件有错位,这时候 redis是启动不起来的,我们需要修复这个aof文件
redis 给我提供了一个工具redis-check-aof --fix
如果文件正常,重启就可以直接恢复了!
AOF 启动/修复/恢复
正常恢复:
异常恢复:
Rewrite
是什么:
AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令 bgrewriteaof !
重写原理:
AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,这点和快照有点类似!
触发机制:
Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的已被且文件大
于64M的触发。
优点和缺点
优点:
1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差
但数据完整性比较好
2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失
3、不同步: appendfsync no 从不同步
缺点:
1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。
2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。
总结
1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
4、同时开启两种持久化方式
5、性能建议
-
因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。 -
如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。 -
如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。
Redis事务
理论
Redis事务的概念:
Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务没有隔离级别的概念:
批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行!
Redis不保证原子性:
Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis事务的三个阶段:
Redis事务相关命令:
watch key1 key2 ...
multi
exec
discard
unwatch
实践
正常执行
放弃事务
若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行
若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。
Watch监控
悲观锁:
悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在操作之前先上锁。
**乐观锁:**乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁。但是在更新的时候会判断一下再此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新
测试:
1、初始化信用卡可用余额和欠额
127.0.0.1:6379> set balance 100
OK
127.0.0.1:6379> set debt 0
OK
2、使用watch检测balance,事务期间balance数据未变动,事务执行成功
127.0.0.1:6379> watch balance
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> decrby balance 20
QUEUED
127.0.0.1:6379> incrby debt 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
3、使用watch检测balance,事务期间balance数据变动,事务执行失败!
127.0.0.1:6379> watch balance OK127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> decrby balance 20
QUEUED
127.0.0.1:6379> incrby debt 20
QUEUED
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379> get balance
"80"
127.0.0.1:6379> set balance 200
OK
127.0.0.1:6379> UNWATCH
OK
127.0.0.1:6379> watch balance
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> decrby balance 20
QUEUED
127.0.0.1:6379> incrby debt 20
QUEUED 127.0.0.1:6379> exec
1) (integer) 180
2) (integer) 40
说明:
一但执行 EXEC 开启事务的执行后,无论事务使用执行成功, WARCH 对变量的监控都将被取消。
故当事务执行失败后,需重新执行WATCH命令对变量进行监控,并开启新的事务进行操作。
小结
watch指令类似于乐观锁,在事务提交时,如果watch监控的多个KEY中任何KEY的值已经被其他客户端更改,则使用EXEC执行事务时,事务队列将不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。
Redis发布订阅
是什么
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
订阅/发布消息图:
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的
关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
命令
这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。
测试
以下实例演示了发布订阅是如何工作的。在我们实例中我们创建了订阅频道名为redisChat :
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1
现在,我们先重新开启个redis客户端,然后在同一个频道redisChat发布两次消息,订阅者就能接收到消息。
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Redis"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Kuangshen"
(integer) 1
1) "message"
2) "redisChat"
3) "Hello,Redis"
1) "message"
2) "redisChat"
3) "Hello,Kuangshen"
原理
Redis是使用C实现的,通过分析 Redis 源码里的pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。
Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。
通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 channel,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。
通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。使用场景
Pub/Sub构建实时消息系统
Redis的Pub/Sub系统可以构建实时的消息系统
比如很多用Pub/Sub构建的实时聊天系统的例子。
Redis主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave 以读为主。
默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:
1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较
大;
2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有
内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。
对于这种场景,我们可以使如下这种架构:
环境配置
基本配置
配从库不配主库,从库配置
slaveof 主库ip 主库端口
Info replication
每次与master断开之后,都需要重新连接,除非你配置进redis.conf文件
修改配置文件
准备工作:我们配置主从复制,至少需要三个,一主二从!配置三个客户端!
1、拷贝多个redis.conf文件
2、指定端口6379,依次类推
3、开启daemonize yes
4、Pid文件名字pidfile /var/run/redis_6379.pid ,依此类推
5、Log文件名字logfile "6379.log" ,依次类推
6、Dump.rdb名字dbfilename dump6379.rdb ,依次类推
上面都配置完毕后,3个服务通过3个不同的配置文件开启,我们的准备环境就OK 了!
一主二从
一主二从
1、环境初始化
默认三个都是Master 主节点
2、配置为一个Master两个Slave
3、在主机设置值,在从机都可以取到!从机不能写值!
测试一:主机挂了,查看从机信息,主机恢复,再次查看信息
测试二:从机挂了,查看主机信息,从机恢复,查看从机信息
层次链路
上一个Slave可以是下一个slave和Master,Slave同样可以接收其它slaves的连接和同步请求,那么该slave作为了链条中下一个的master,可以有效减轻master的写压力!
测试:6379设置值以后6380和6381都可以获取到!OK!
谋朝篡位
一主二从的情况下,如果主机断了,从机可以使用命令 SLAVEOF NO ONE 将自己改为主机!这个时候其余的从机链接到这个节点。对一个从属服务器执行命令 SLAVEOF NO ONE 将使得这个从属服务器关闭复制功能,并从从属服务器转变回主服务器,原来同步所得的数据集不会被丢弃。
主机再回来,也只是一个光杆司令了,从机为了正常使用跑到了新的主机上!
复制原理
Slave 启动成功连接到 master 后会发送一个sync命令
Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行
完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
哨兵模式
概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
配置测试
1、调整结构,6379带着80、81
2、自定义的 /myredis 目录下新建 sentinel.conf 文件,名字千万不要错
3、配置哨兵,填写内容
4、启动哨兵
5、正常主从演示
6、原有的Master 挂了
7、投票新选
8、重新主从继续开工,info replication 查查看
9、问题:如果之前的master 重启回来,会不会双master 冲突? 之前的回来只能做小弟了
哨兵模式的优缺点
优点
- 哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。
- 主从可以切换,故障可以转移,系统可用性更好。
- 哨兵模式是主从模式的升级,系统更健壮,可用性更高。
缺点
- Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
- 实现哨兵模式的配置也不简单,甚至可以说有些繁琐
哨兵配置说明
port 26379
dir /tmp
sentinel monitor mymaster 127.0.0.1 6379 2 1234567891011121314
sentinel auth-pass mymaster MySUPER
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel notification-script mymaster /var/redis/notify.sh
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh 616263
缓存穿透和雪崩
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
缓存穿透
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没用,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库,这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿
概述
这里需要注意和缓存击穿的区别,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导致数据库瞬间压力过大。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概念
缓存雪崩,是指在某一个时间段,缓存集中过期失效。
产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然
形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就
是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知
的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续
工作,其实就是搭建的集群。
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对
某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数
据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让
缓存失效的时间点尽量均匀。
Jedis
Jedis是Redis官方推荐的Java连接开发工具。要在Java开发中使用好Redis中间件,必须对Jedis熟悉才能
写成漂亮的代码
测试联通
1、新建一个普通的Maven项目
2、导入redis的依赖!
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
3、编写测试代码
package com.kuang.ping;
import redis.clients.jedis.Jedis;
public class Ping {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
System.out.println("连接成功");
//查看服务是否运行
System.out.println("服务正在运行: "+jedis.ping());
}
}
4、启动redis服务
5、启动测试,结果
连接成功
服务正在运行: PONG
常用API
基本操作
public class TestPassword {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.connect();
jedis.disconnect();
jedis.flushAll();
}
}
对key操作的命令
public class TestKey {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
System.out.println("清空数据:"+jedis.flushDB());
System.out.println("判断某个键是否存在:"+jedis.exists("username"));
System.out.println("新增<'username','kuangshen'>的键值 对:"+jedis.set("username", "kuangshen"));
stem.out.println("新增<'password','password'>的键值 对:"+jedis.set("password", "password"));
System.out.print("系统中所有的键如下:");
Set<String> keys = jedis.keys("*");
System.out.println(keys);
System.out.println("删除键password:"+jedis.del("password"));
System.out.println("判断键password是否存 在:"+jedis.exists("password"));
System.out.println("查看键username所存储的值的类 型:"+jedis.type("username"));
System.out.println("随机返回key空间的一个:"+jedis.randomKey());
System.out.println("重命名key:"+jedis.rename("username","name"));
System.out.println("取出改后的name:"+jedis.get("name"));
System.out.println("按索引查询:"+jedis.select(0));
System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());
System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
}
}
对String操作的命令
public class TestString {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("===========增加数据===========");
System.out.println(jedis.set("key1","value1"));
System.out.println(jedis.set("key2","value2"));
System.out.println(jedis.set("key3", "value3"));
System.out.println("删除键key2:"+jedis.del("key2"));
System.out.println("获取键key2:"+jedis.get("key2"));
System.out.println("修改key1:"+jedis.set("key1", "value1Changed"));
System.out.println("获取key1的值:"+jedis.get("key1"));
System.out.println("在key3后面加入值:"+jedis.append("key3", "End"));
System.out.println("key3的值:"+jedis.get("key3"));
System.out.println("增加多个键值 对:"+jedis.mset("key01","value01","key02","value02","key03","value03"));
System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03"));
System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03","key04"));
System.out.println("删除多个键值对:"+jedis.del("key01","key02"));
System.out.println("获取多个键值 对:"+jedis.mget("key01","key02","key03"));
jedis.flushDB();
System.out.println("===========新增键值对防止覆盖原先值==============");
System.out.println(jedis.setnx("key1", "value1"));
System.out.println(jedis.setnx("key2", "value2"));
System.out.println(jedis.setnx("key2", "value2-new"));
System.out.println(jedis.get("key1"));
System.out.println(jedis.get("key2"));
System.out.println("===========新增键值对并设置有效时间=============");
System.out.println(jedis.setex("key3", 2, "value3"));
System.out.println(jedis.get("key3"));
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(jedis.get("key3"));
System.out.println("===========获取原值,更新为新值==========");
System.out.println(jedis.getSet("key2", "key2GetSet"));
System.out.println(jedis.get("key2"));
System.out.println("获得key2的值的字串:"+jedis.getrange("key2", 2, 4));} }
对List操作命令
public class TestList {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("===========添加一个list===========");
jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WeakHashMap", "LinkedHashMap");
jedis.lpush("collections", "HashSet");
jedis.lpush("collections", "TreeSet");
jedis.lpush("collections", "TreeMap");
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections区间0-3的元 素:"+jedis.lrange("collections",0,3));
System.out.println("===============================");
System.out.println("删除指定元素个数:"+jedis.lrem("collections", 2, "HashMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("删除下表0-3区间之外的元 素:"+jedis.ltrim("collections", 0, 3));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(左 端):"+jedis.lpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections添加元素,从列表右端,与lpush相对 应:"+jedis.rpush("collections", "EnumMap"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("collections列表出栈(右 端):"+jedis.rpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("修改collections指定下标1的内 容:"+jedis.lset("collections", 1, "LinkedArrayList"));
System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));
System.out.println("===============================");
System.out.println("collections的长度:"+jedis.llen("collections"));
System.out.println("获取collections下标为2的元 素:"+jedis.lindex("collections", 2));
System.out.println("==============================="); jedis.lpush("sortedList", "3","6","2","0","7","4");
System.out.println("sortedList排序前:"+jedis.lrange("sortedList", 0, -1));
System.out.println(jedis.sort("sortedList"));
System.out.println("sortedList排序后:"+jedis.lrange("sortedList", 0, -1));
}
}
对Set的操作命令
public class TestSet {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("============向集合中添加元素(不重复) ============");
System.out.println(jedis.sadd("eleSet", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println(jedis.sadd("eleSet", "e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除一个元素e0:"+jedis.srem("eleSet", "e0"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet", "e7","e6"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));
System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));
System.out.println("eleSet中包含元素的个数:"+jedis.scard("eleSet"));
System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet", "e3"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e1"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e5"));
System.out.println("=================================");
System.out.println(jedis.sadd("eleSet1", "e1","e2","e4","e3","e0","e8","e7","e5"));
System.out.println(jedis.sadd("eleSet2", "e1","e2","e4","e3","e0","e8"));
System.out.println("将eleSet1中删除e1并存入eleSet3 中:"+jedis.smove("eleSet1", "eleSet3", "e1"));
System.out.println("将eleSet1中删除e2并存入eleSet3 中:"+jedis.smove("eleSet1", "eleSet3", "e2"));
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3"));
System.out.println("============集合运算=================");
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet2中的元素:"+jedis.smembers("eleSet2"));
System.out.println("eleSet1和eleSet2的交 集:"+jedis.sinter("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的并 集:"+jedis.sunion("eleSet1","eleSet2"));
System.out.println("eleSet1和eleSet2的差 集:"+jedis.sdiff("eleSet1","eleSet2"));
jedis.sinterstore("eleSet4","eleSet1","eleSet2");
System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4"));
}
}
对Hash的操作命令
public class TestHash {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
Map<String,String> map = new HashMap<>();
map.put("key1","value1");
map.put("key2","value2");
map.put("key3","value3");
map.put("key4","value4");
jedis.hmset("hash",map);
jedis.hset("hash", "key5", "value5");
System.out.println("散列hash的所有键值对 为:"+jedis.hgetAll("hash"));
System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));
System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加 key6:"+jedis.hincrBy("hash", "key6", 6));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加 key6:"+jedis.hincrBy("hash", "key6", 3));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash", "key2"));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("散列hash中键值对的个数:"+jedis.hlen("hash"));
System.out.println("判断hash中是否存在 key2:"+jedis.hexists("hash","key2"));
System.out.println("判断hash中是否存在 key3:"+jedis.hexists("hash","key3"));
System.out.println("获取hash中的值:"+jedis.hmget("hash","key3"));
System.out.println("获取hash中的 值:"+jedis.hmget("hash","key3","key4"));
}
}
事务
基本操作
package com.kuang.multi;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
public class TestMulti {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB(); JSONObject
jsonObject = new JSONObject();
jsonObject.put("hello", "world");
jsonObject.put("name", "java");
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
try{
multi.set("json", result);
multi.set("json2", result);
int i = 100/0;
multi.exec();
}catch(Exception e){
e.printStackTrace();
multi.discard();
}finally{
System.out.println(jedis.get("json"));
System.out.println(jedis.get("json2"));
jedis.close();
}
}
}
SpringBoot整合
基础使用
概述
在SpringBoot中一般使用RedisTemplate提供的方法来操作Redis。那么使用SpringBoot整合Redis需要那些步骤呢。
1、 JedisPoolConfig (这个是配置连接池)
2、 RedisConnectionFactory 这个是配置连接信息,这里的RedisConnectionFactory是一个接口,我们需要使用它的实现类,在SpringD Data Redis方案中提供了以下四种工厂模型:
- JredisConnectionFactory
- JedisConnectionFactory
- LettuceConnectionFactory
- SrpConnectionFactory
3、 RedisTemplate 基本操作
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
yaml配置
spring:
redis:
host: 127.0.0.1
port: 6379
password: 123456
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 500 min-idle: 0
lettuce:
shutdown-timeout: 0ms
测试
@SpringBootTest
class SpringbootRedisApplicationTests {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("myKey","myValue");
System.out.println(redisTemplate.opsForValue().get("myKey"));
}
}
封装工具类
1、新建一个SpringBoot项目
2、导入redis的启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3、配置redis,可以查看 RedisProperties 分析
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
4、分析 RedisAutoConfifiguration 自动配置类
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
通过源码可以看出,SpringBoot自动帮我们在容器中生成了一个RedisTemplate和一个StringRedisTemplate。
但是,这个RedisTemplate的泛型是<Object,Object>,写代码不方便,需要写好多类型转换的代码;我们需要一个泛型为<String,Object>形式的RedisTemplate。
并且,这个RedisTemplate没有设置数据存在Redis时,key及value的序列化方式。
看到这个@ConditionalOnMissingBean注解后,就知道如果Spring容器中有了RedisTemplate对象了,这个自动配置的RedisTemplate不会实例化。因此我们可以直接自己写个配置类,配置RedisTemplate。
5、既然自动配置不好用,就重新配置一个RedisTemplate
package com.kuang.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
jackson template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
6、写一个Redis工具类(直接用RedisTemplate操作Redis,需要很多行代码,因此直接封装好一个RedisUtils,这样写代码更方便点。这个RedisUtils交给Spring容器实例化,使用时直接注解注入。)
package com.kuang.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0"); }
return redisTemplate.opsForValue().increment(key, delta);
}
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item); }
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true; } catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
}
catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true; }
catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time); return true;
} catch (Exception e) {
e.printStackTrace(); return false;
}
}
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
ren true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace(); return 0;
}
}
}
|