第七阶段模块二
Redis
1. 概述
1.1 互联网架构的演变历程
第1阶段:数据访问量不大,简单的架构即可搞定!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ntkevuE-1633005745103)(E:\MarkDown\拉勾笔记\redis 架构第一阶段)]
第2阶段:数据访问量大,使用缓存技术来缓解数据库的压力。
? 不同的业务访问不同的数据库
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OL4DRruk-1633005745106)(E:\MarkDown\拉勾笔记\redis 架构第二阶段)]
第3阶段:
? 主从读写分离。
? 之前的缓存确实能够缓解数据库的压力,但是写和读都集中在一个数据库上,压力又来了。
? 一个数据库负责写,一个数据库负责读。分工合作。愉快!
? 让master(主数据库)来响应事务性(增删改)操作,让slave(从数据库)来响应非事务性 (查询)操作,然后再采用主从复制来把master上的事务性操作同步到slave数据库中
? mysql的master/slave就是网站的标配!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LUUi1SLa-1633005745109)(E:\MarkDown\拉勾笔记\redis 架构第三阶段)]
第4阶段:
? mysql的主从复制,读写分离的基础上,mysql的主库开始出现瓶颈
? 由于MyISAM使用表锁,所以并发性能特别差
? 分库分表开始流行,mysql也提出了表分区,虽然不稳定,但我们看到了希望
? 开始吧,mysql集群
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EeV79TLc-1633005745114)(E:\MarkDown\拉勾笔记\redis 架构第四阶段)]
1.2 Redis入门介绍
1.互联网需求的3高
? 高并发,高可扩,高性能
2.Redis 是一种运行速度很快,并发性能很强,并且运行在内存上的NoSql(not only sql)数据库
3.NoSQL数据库 和 传统数据库 相比的优势
? NoSQL数据库无需事先为要存储的数据建立字段,随时可以存储自定义的数据格式。
? 而在关系数据库里,增删字段是一件非常麻烦的事情。若是非常大数据量的表,增加字段简直就是一个噩梦
4.Redis的常用使用场景
? 缓存,毫无疑问这是Redis当今最为人熟知的使用场景。在提升服务器性能方面非常有效;一些频繁被访问的数据,经常被访问的数据如果放在关系型数据库,每次查询的开销都会很 大,而放在redis中,因为redis 是放在内存中的可以很高效的访问
? 排行榜,在使用传统的关系型数据库(mysql oracle 等)来做这个事儿,非常的麻烦,而利 用Redis的SortSet(有序集合)数据结构能够简单的搞定;
? 计算器/限速器,利用Redis中原子性的自增操作,我们可以统计类似用户点赞数、用户访问数等,这类操作如果用MySQL,频繁的读写会带来相当大的压力;限速器比较典型的使用场景是限制某个用户访问某个API的频率,常用的有抢购时,防止用户疯狂点击带来不必要的压力;
? 好友关系,利用集合的一些命令,比如求交集、并集、差集等。可以方便搞定一些共同好友、共同爱好之类的功能;
? 简单消息队列,除了Redis自身的发布/订阅模式,我们也可以利用List来实现一个队列机制, 比如:到货通知、邮件发送之类的需求,不需要高可靠,但是会带来非常大的DB压力,完全可以用List来完成异步解耦;
? Session共享,以jsp为例,默认Session是保存在服务器的文件中,如果是集群服务,同一个用户过来可能落在不同机器上,这就会导致用户频繁登陆;采用Redis保存Session后,无论用户落在那台机器上都能够获取到对应的Session信息。
1.3 Redis/Memcache/MongoDB对比
1.3.1 Redis和Memcache
? Redis和Memcache都是内存数据库。不过memcache还可用于缓存其他东西,例如图片、视频等等。
? memcache 数据结构单一kv,redis 更丰富一些,还提供 list,set, hash 等数据结构的存储,有效的减少网络 IO 的次数
? 虚拟内存–Redis当物理内存用完时,可以将一些很久没用到的value交换到磁盘
? 存储数据安全–memcache挂掉后,数据没了(没有持久化机制);redis可以定期保存到磁盘(持 久化)
? 灾难恢复–memcache挂掉后,数据不可恢复; redis数据丢失后可以通过RBD或AOF恢复
1.3.2 Redis和MongoDB
? redis和mongodb并不是竞争关系,更多的是一种协作共存的关系。
? mongodb本质上还是硬盘数据库,在复杂查询时仍然会有大量的资源消耗,而且在处理复杂逻辑时仍然要不可避免地进行多次查询。
? 这时就需要redis或Memcache这样的内存数据库来作为中间层进行缓存和加速。
? 比如在某些复杂页面的场景中,整个页面的内容如果都从mongodb中查询,可能要几十个查询语句,耗时很长。如果需求允许,则可以把整个页面的对象缓存至redis中,定期更新。这样 mongodb和redis就能很好地协作起来
1.4 分布式数据库CAP原理
1.4.1 CAP简介
1.传统的关系型数据库事务具备ACID:
? A:原子性
? C:一致性
? I:独立性
? D:持久性
2.分布式数据库的CAP:
? C(Consistency):强一致性
? “all nodes see the same data at the same time”,即更新操作成功并返回客户端后,所有节点在同一时间的数据完全一致,这就是分布式的一致性。一致性的问题在并发系统中不可避免,对于客户端来说,一致性指的是并发访问时更新过的数据如何获取的问题。从服务端来看,则是更新如何复制分布到整个系统,以保证数据最终一致。
? A(Availability):高可用性
? 可用性指“Reads and writes always succeed”,即服务一直可用,而且要是正常的响应时间。好的可用性主要是指系统能够很好的为用户服务,不出现用户操作失败或者访问超时等用户体验不好的情况。
? P(Partition tolerance):分区容错性
? 即分布式系统在遇到某节点或网络分区故障时,仍然能够对外提供满足一致性或可用性的服务。
? 分区容错性要求能够使应用虽然是一个分布式系统,而看上去却好像是在一个可以运转正常的整体。比如现在的分布式系统中有某一个或者几个机器宕掉了,其他剩下的机器还能够正常运转满足系统需求,对于用户而言并没有什么体验上的影响。
1.4.2 CAP理论
CAP理论提出就是针对分布式数据库环境的,所以,P这个属性必须容忍它的存在,而且是必须具备的。
因为P是必须的,那么我们需要选择的就是A和C。
大家知道,在分布式环境下,为了保证系统可用性,通常都采取了复制的方式,避免一个节点损坏,导致系统不可用。那么就出现了每个节点上的数据出现了很多个副本的情况,而数据从一个节点复制到另外的节点时需要时间和要求网络畅通的,所以,当P发生时,也就是无法向某个节点复制数据时,这时候你有两个选择:
? 选择可用性 A,此时,那个失去联系的节点依然可以向系统提供服务,不过它的数据就不能保证是同步的了(失去了C属性)。
? 选择一致性C,为了保证数据库的一致性,我们必须等待失去联系的节点恢复过来,在这个过程中,那个节点是不允许对外提供服务的,这时候系统处于不可用状态(失去了A属性)。
最常见的例子是读写分离,某个节点负责写入数据,然后将数据同步到其它节点,其它节点提供读取的服务,当两个节点出现通信问题时,你就面临着选择A(继续提供服务,但是数据不保证准确),C(用户处于等待状态,一直等到数据同步完成)。
1.4.3 CAP总结
分区是常态,不可避免,三者不可共存
可用性和一致性是一对冤家
? 一致性高,可用性低
? 一致性低,可用性高
因此,根据 CAP 原理将 NoSQL 数据库分成了满足 CA 原则、满足 CP 原则和满足 AP 原则三大类:
? CA - 单点集群,满足一致性,可用性的系统,通常在可扩展性上不太强大。
? CP - 满足一致性,分区容忍性的系统,通常性能不是特别高。
? AP - 满足可用性,分区容忍性的系统,通常可能对一致性要求低一些。
2. 下载与安装
2.1 下载
redis:http://www.redis.net.cn/
图形工具:https://redisdesktop.com/download
2.2 安装
虽然可以在安装在windows操作系统,但是官方不推荐,所以我们一如既往的安装在linux上
1.上传tar.gz包,并解压
tar -zxvf redis-5.0.4.tar.gz
2.安装gcc(必须有网络)
yum -y install gcc
忘记是否安装过,可以使用 gcc -v 命令查看gcc版本,如果没有安装过,会提示命令不存在
3.进入redis目录,进行编译
make
4.编译之后,开始安装
make install
2.3 安装后的操作
2.3.1 后台运行方式
1.redis默认不会使用后台运行,如果你需要,修改配置文件daemonize=yes,当你后台服务启动的时候,会写成一个进程文件运行。
vim /opt/redis-5.0.4/redis.conf
daemonize yes
2.以配置文件的方式启动
cd /usr/local/bin
redis-server /opt/redis-5.0.4/redis.conf
2.3.2 关闭数据库
1.单实例关闭
redis-cli shutdown
2.多实例关闭
redis-cli -p 6379 shutdown
2.3.3 常用操作
1.检测6379端口是否在监听
netstat -lntp | grep 6379
端口为什么是6379?
? 6379在是手机按键上MERZ对应的号码, 而MERZ取自意大利歌女Alessia Merz的名字。 MERZ长期以来被antirez(redis作者)及其朋友当作愚蠢的代名词。
2.检测后台进程是否存在
ps -ef|grep redis
2.3.4 连接redis并测试
redis-cli
ping
2.3.5 HelloWorld
set k1 china
get kl
2.3.6 测试性能
1.先 ctrl+c,退出redis客户端
redis-benchmark
2.执行命令后,命令不会自动停止,需要我们手动ctrl+c停止测试
[root@localhost bin]
====== PING_INLINE ======
100000 requests completed in 1.80 seconds
50 parallel clients
3 bytes payload
keep alive: 1
87.69% <= 1 milliseconds
99.15% <= 2 milliseconds
99.65% <= 3 milliseconds
99.86% <= 4 milliseconds
99.92% <= 5 milliseconds
99.94% <= 6 milliseconds
99.97% <= 7 milliseconds
100.00% <= 7 milliseconds
55524.71 requests per second
2.3.7 默认16个数据库
vim /opt/redis-5.0.4/redis.conf
127.0.0.1:6379> get k1
"china"
127.0.0.1:6379> select 16
(error) ERR DB index is out of range
127.0.0.1:6379> select 15
OK
127.0.0.1:6379[15]> get k1
(nil)
127.0.0.1:6379[15]> select 0
OK
127.0.0.1:6379> get k1
"china"
2.3.8 数据库键的数量
dbsize
redis在linux支持命令补全(tab)
2.3.9 清空数据库
1.清空当前库
flushdb
2.清空所有(16个)库,慎用!!
flushall
2.3.10 模糊查询(key)
模糊查询keys命令,有三个通配符:
1.*:通配任意多个字符
? 查询所有的键
keys *
? 模糊查询k开头,后面随便多少个字符
keys k*
? 模糊查询e为最后一位,前面随便多少个字符
keys *e
? 双 * 模式,匹配任意多个字符:查询包含k的键
keys *k*
2.?:通配单个字符
? 模糊查询k字头,并且匹配一个字符
keys k?
? 你只记得第一个字母是k,他的长度是3
keys k??
3.[]:通配括号内的某一个字符
? 记得其他字母,就第二个字母可能是a或e
keys r[ae]dis
2.3.11 键(key)
1.exists key:判断某个key是否存在
127.0.0.1:6379> exists k1
(integer) 1
127.0.0.1:6379> exists y1
(integer) 0
2.move key db:移动(剪切,粘贴)键到几号库
127.0.0.1:6379> move x1 8
(integer) 1
127.0.0.1:6379> exists x1
(integer) 0
127.0.0.1:6379> select 8
OK
127.0.0.1:6379[8]> keys *
1) "x1"
3.ttl key:查看键还有多久过期(-1永不过期,-2已过期)
? time to live 还能活多久
127.0.0.1:6379[8]> ttl x1
(integer) -1
4.expire key 秒:为键设置过期时间(生命倒计时)
127.0.0.1:6379[8]> set k1 v1
OK
127.0.0.1:6379[8]> ttl k1
(integer) -1
127.0.0.1:6379[8]> expire k1 10
(integer) 1
127.0.0.1:6379[8]> get k1
"v1"
127.0.0.1:6379[8]> ttl k1
(integer) 2
127.0.0.1:6379[8]> get k1
(nil)
127.0.0.1:6379[8]> keys *
(empty list or set)
5.type key:查看键的数据类型
127.0.0.1:6379[8]> type k1
string
3. 使用Redis
3.1 五大数据类型
操作文档:http://redisdoc.com/
3.1.1 字符串String
1.set/get/del/append/strlen
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> set k2 v2
OK
127.0.0.1:6379> keys *
1) "k1"
2) "k2"
127.0.0.1:6379> del k2
(integer) 1
127.0.0.1:6379> keys *
1) "k1"
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379> append k1 abc
(integer) 5
127.0.0.1:6379> get k1
"v1abc"
127.0.0.1:6379> strlen k1
(integer) 5
2.incr/decr/incrby/decrby:加减操作,操作的必须是数字类型
? incr:意思是increment,增加
? decr:意思是decrement,减少
127.0.0.1:6379> set k1 1
OK
127.0.0.1:6379> incr k1
(integer) 2
127.0.0.1:6379> incr k1
(integer) 3
127.0.0.1:6379> get k1
"3"
127.0.0.1:6379> decr k1
(integer) 2
127.0.0.1:6379> decr k1
(integer) 1
127.0.0.1:6379> get k1
"1"
127.0.0.1:6379> incrby k1 3
(integer) 4
127.0.0.1:6379> get k1
"4"
127.0.0.1:6379> decrby k1 2
(integer) 2
127.0.0.1:6379> get k1
"2"
3.getrange/setrange:类似between…and…
? range:范围
127.0.0.1:6379> set k1 abcdef
OK
127.0.0.1:6379> get k1
"abcdef"
127.0.0.1:6379> getrange k1 0 -1
"abcdef"
127.0.0.1:6379> getrange k1 0 3
"abcd"
127.0.0.1:6379> setrange k1 1 xxx
(integer) 6
127.0.0.1:6379> get k1
"axxxef"
4.setex/setnx
? set with expir:添加数据的同时设置生命周期
127.0.0.1:6379> setex k1 5 v1
OK
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379> get k1
(nil)
? set if not exist:添加数据的时候判断是否已经存在,防止已存在的数据被覆盖掉
127.0.0.1:6379> setnx k1 wei
(integer) 0
127.0.0.1:6379> get k1
"weiwei"
127.0.0.1:6379> setnx k2 wei
(integer) 1
5.mset/mget/msetnx
? m:more更多
127.0.0.1:6379> set k1 v1 k2 v2
(error) ERR syntax error
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
OK
127.0.0.1:6379> keys *
1) "k1"
2) "k2"
3) "k3"
127.0.0.1:6379> mget k2 k3
1) "v2"
2) "v3"
127.0.0.1:6379> msetnx k3 v3 k4 v4
(integer) 0
127.0.0.1:6379> msetnx k4 v4 k5 v5
(integer) 1
6.getset:先get后set
127.0.0.1:6379> getset k6 v6
(nil)
127.0.0.1:6379> keys *
1) "k4"
2) "k1"
3) "k2"
4) "k3"
5) "k5"
6) "k6"
127.0.0.1:6379> get k6
"v6"
127.0.0.1:6379> getset k6 vv6
"v6"
127.0.0.1:6379> get k6
"vv6"
3.1.2 列表List
push和pop,类似机枪AK47:push,压子弹,pop,射击出子弹
1.lpush/rpush/lrange
? l:left 自左向右→添加 (从上往下添加)
? r:right 自右向左←添加(从下往上添加)
127.0.0.1:6379> lpush list01 1 2 3 4 5
(integer) 5
127.0.0.1:6379> keys *
1) "list01"
127.0.0.1:6379> lrange list01 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
127.0.0.1:6379> rpush list02 1 2 3 4 5
(integer) 5
127.0.0.1:6379> lrange list02 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
2.lpop/rpop:移除第一个元素(上左下右)
127.0.0.1:6379> lpop list02
"1"
127.0.0.1:6379> rpop list02
"5"
3.lindex:根据下标查询元素(从左向右,自上而下)
127.0.0.1:6379> lrange list01 0 -1
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
127.0.0.1:6379> lindex list01 2
"3"
127.0.0.1:6379> lindex list01 1
"4"
4.llen:返回集合长度
127.0.0.1:6379> llen list01
(integer) 5
5.lrem:删除n个value
127.0.0.1:6379> lpush list01 1 2 2 3 3 3 4 4 4 4
(integer) 10
127.0.0.1:6379> lrem list01 2 3
(integer) 2
127.0.0.1:6379> lrange list01 0 -1
1) "4"
2) "4"
3) "4"
4) "4"
5) "3"
6) "2"
7) "2"
8) "1"
6.ltrim:截取指定范围的值,别的全扔掉
? ltrim key begindex endindex
127.0.0.1:6379> lpush list01 1 2 3 4 5 6 7 8 9
(integer) 9
127.0.0.1:6379> lrange list01 0 -1
1) "9"
2) "8"
3) "7"
4) "6"
5) "5"
6) "4"
7) "3"
8) "2"
9) "1"
127.0.0.1:6379> ltrim list01 3 6
OK
127.0.0.1:6379> lrange list01 0 -1
1) "6"
2) "5"
3) "4"
4) "3"
7.rpoplpush:从一个集合搞一个元素到另一个集合中(右出一个,左进一个)
127.0.0.1:6379> rpush list01 1 2 3 4 5
(integer) 5
127.0.0.1:6379> lrange list01 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
127.0.0.1:6379> rpush list02 1 2 3 4 5
(integer) 5
127.0.0.1:6379> lrange list02 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
127.0.0.1:6379> rpoplpush list01 list02
"5"
127.0.0.1:6379> lrange list01 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
127.0.0.1:6379> lrange list02 0 -1
1) "5"
2) "1"
3) "2"
4) "3"
5) "4"
6) "5"
8.lset:改变某个下标的某个值
? lset key index value
127.0.0.1:6379> lrange list02 0 -1
1) "5"
2) "1"
3) "2"
4) "3"
5) "4"
6) "5"
127.0.0.1:6379> lset list02 0 x
OK
127.0.0.1:6379> lrange list02 0 -1
1) "x"
2) "1"
3) "2"
4) "3"
5) "4"
6) "5"
9.linsert:插入元素(指定某个元素之前/之后)
? linsert key before/after oldvalue newvalue
127.0.0.1:6379> lrange list02 0 -1
1) "x"
2) "1"
3) "2"
4) "3"
5) "4"
6) "5"
127.0.0.1:6379> linsert list02 before 2 java
(integer) 7
127.0.0.1:6379> lrange list02 0 -1
1) "x"
2) "1"
3) "java"
4) "2"
5) "3"
6) "4"
7) "5"
127.0.0.1:6379> linsert list02 after 2 redis
(integer) 8
127.0.0.1:6379> lrange list02 0 -1
1) "x"
2) "1"
3) "java"
4) "2"
5) "redis"
6) "3"
7) "4"
8) "5"
10.性能总结:类似添加火车皮一样,头尾操作效率高,中间操作效率惨;
3.1.3 集合Set
和java中的set特点类似,不允许重复
1.sadd/smembers/sismember:添加/查看/判断是否存在
127.0.0.1:6379> sadd set01 1 2 2 3 3 3
(integer) 3
127.0.0.1:6379> smembers set01
1) "1"
2) "2"
3) "3"
127.0.0.1:6379> sismember set01 2
(integer) 1
127.0.0.1:6379> sismember set01 5
(integer) 0
? 注意:1和0可不是下标,而是布尔。1:true存在,0:false不存在
2.scard:获得集合中的元素个数
127.0.0.1:6379> scard set01
(integer) 3
3.srem:删除集合中的元素
? srem key value
127.0.0.1:6379> srem set01 2
(integer) 1
4.srandmember:从集合中随机获取几个元素
? srandmember 整数(个数)
127.0.0.1:6379> sadd set01 1 2 3 4 5 6 7 8 9
(integer) 9
127.0.0.1:6379> smembers set01
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "7"
8) "8"
9) "9"
127.0.0.1:6379> srandmember set01 3
1) "8"
2) "2"
3) "3"
127.0.0.1:6379> srandmember set01 5
1) "5"
2) "8"
3) "7"
4) "4"
5) "6"
5.spop:随机出栈(移除)
127.0.0.1:6379> smembers set01
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "7"
8) "8"
9) "9"
127.0.0.1:6379> spop set01
"8"
127.0.0.1:6379> spop set01
"7"
6.smove:移动元素:将key1某个值赋值给key2
127.0.0.1:6379> sadd set01 1 2 3 4 5
(integer) 5
127.0.0.1:6379> sadd set02 x y z
(integer) 3
127.0.0.1:6379> smove set01 set02 3
(integer) 1
7.数学集合类
? 交集:sinter
? 并集:sunion
? 差集:sdiff
127.0.0.1:6379> sadd set01 1 2 3 4 5
(integer) 5
127.0.0.1:6379> sadd set02 2 a 1 b 3
(integer) 5
127.0.0.1:6379> sinter set01 set02
1) "1"
2) "2"
3) "3"
127.0.0.1:6379> sunion set01 set02
1) "5"
2) "4"
3) "3"
4) "2"
5) "b"
6) "a"
7) "1"
127.0.0.1:6379> sdiff set01 set02
1) "4"
2) "5"
127.0.0.1:6379> sdiff set02 set01
1) "b"
2) "a"
3.1.4 哈希Hash
类似java里面的Map
KV模式不变,但V是一个键值对
1.hset/hget/hmset/hmget/hgetall/hdel:添加/得到/多添加/多得到/得到全部/删除属性
127.0.0.1:6379> hset user id 1001
(integer) 1
127.0.0.1:6379> hget user
(error) ERR wrong number of arguments for 'hget' command
127.0.0.1:6379> hget user id
"1001"
127.0.0.1:6379> hmset student id 101 name tom age 22
OK
127.0.0.1:6379> hget student name
"tom"
127.0.0.1:6379> hmget student name age
1) "tom"
2) "22"
127.0.0.1:6379> hgetall student
1) "id"
2) "101"
3) "name"
4) "tom"
5) "age"
6) "22"
127.0.0.1:6379> hdel student age
(integer) 1
127.0.0.1:6379> hgetall student
1) "id"
2) "101"
3) "name"
4) "tom"
2.hlen:返回元素的属性个数
127.0.0.1:6379> hgetall student
1) "id"
2) "101"
3) "name"
4) "tom"
127.0.0.1:6379> hlen student
(integer) 2
3.hexists:判断元素是否存在某个属性
127.0.0.1:6379> hexists student name
(integer) 1
127.0.0.1:6379> hexists student age
(integer) 0
4.hkeys/hvals:获得属性的所有key/获得属性的所有value
127.0.0.1:6379> hkeys student
1) "id"
2) "name"
127.0.0.1:6379> hvals student
1) "101"
2) "tom"
5.hincrby/hincrbyfloat:自增(整数)/自增(小数)
127.0.0.1:6379> hmset student id 101 name tom age 22
OK
127.0.0.1:6379> hincrby student age 2
(integer) 24
127.0.0.1:6379> hget student age
"24"
127.0.0.1:6379> hmset user id 1001 money 1000
OK
127.0.0.1:6379> hincrbyfloat user money 5.5
"1005.5"
127.0.0.1:6379> hget user money
"1005.5"
6.hsetnx:添加的时候,先判断是否存在
127.0.0.1:6379> hsetnx student age 18
(integer) 0
127.0.0.1:6379> hsetnx student sex 男
(integer) 1
127.0.0.1:6379> hgetall student
1) "id"
2) "101"
3) "name"
4) "tom"
5) "age"
6) "24"
7) "sex"
8) "\xe7\x94\xb7"
3.1.5 有序集合Zset
真实需求:
充10元可享vip1;
充20元可享vip2;
充30元可享vip3;
以此类推…
1.zadd/zrange (withscores):添加/查询
127.0.0.1:6379> zadd zset01 10 vip1 20 vip2 30 vip3 40 vip4 50 vip5
(integer) 5
127.0.0.1:6379> zrange zset01 0 -1
1) "vip1"
2) "vip2"
3) "vip3"
4) "vip4"
5) "vip5"
127.0.0.1:6379> zrange zset01 0 -1 withscores
1) "vip1"
2) "10"
3) "vip2"
4) "20"
5) "vip3"
6) "30"
7) "vip4"
8) "40"
9) "vip5"
10) "50"
2.zrangebyscore:模糊查询
? ( : 不包含
? limit:跳过几个截取几个
127.0.0.1:6379> zrangebyscore zset01 20 40
1) "vip2"
2) "vip3"
3) "vip4"
127.0.0.1:6379> zrangebyscore zset01 20 (40
1) "vip2"
2) "vip3"
127.0.0.1:6379> zrangebyscore zset01 (20 (40
1) "vip3"
127.0.0.1:6379> zrangebyscore zset01 10 40 limit 2 2
1) "vip3"
2) "vip4"
127.0.0.1:6379> zrangebyscore zset01 10 40 limit 2 1
1) "vip3"
3.zrem:删除元素
127.0.0.1:6379> zrem zset01 vip5
(integer) 1
4.zcard/zcount/zrank/zscore:集合长度/范围内元素个数/得元素下标/通过值得分数
127.0.0.1:6379> zcard zset01
(integer) 4
127.0.0.1:6379> zcount zset01 20 30
(integer) 2
127.0.0.1:6379> zrank zset01 vip3
(integer) 2
127.0.0.1:6379> zscore zset01 vip2
"20"
5.zrevrank:逆序找下标(从下向上)
127.0.0.1:6379> zrevrank zset01 vip3
(integer) 1
6.zrevrange:逆序查询
127.0.0.1:6379> zrange zset01 0 -1
1) "vip1"
2) "vip2"
3) "vip3"
4) "vip4"
127.0.0.1:6379> zrevrange zset01 0 -1
1) "vip4"
2) "vip3"
3) "vip2"
4) "vip1"
7.zrevrangebyscore:逆序范围查找
127.0.0.1:6379> zrevrangebyscore zset01 30 20
1) "vip3"
2) "vip2"
127.0.0.1:6379> zrevrangebyscore zset01 20 30
(empty list or set)
3.2 持久化
3.2.1 RDB
Redis DataBase
? 在指定的时间间隔内,将内存中的数据集的快照写入磁盘;
? 默认保存在/usr/local/bin中,文件名dump.rdb;
3.2.1.1 自动备份
redis是内存数据库,当我们每次用完redis,关闭linux时,按道理来说,内存释放,redis中的数据也会随之消失
为什么我们再次启动redis的时候,昨天的数据还在,并没有消失呢?
正是因为,每次关机时,redis会自动将数据备份到一个文件中 :/usr/local/bin/dump.rdb
接下来我们就来全方位的认识 自动备份机制
1.默认的自动备份策略不利于我们测试,所以修改redis.conf文件中的自动备份策略
vim redis.conf
/SNAP
save 900 1
save 120 10
save 60 10000
? 当然如果你只是用Redis的缓存功能,不需要持久化,那么你可以注释掉所有的 save 行来停用保存功能。可以直接一个空字符串来实现停用:save “”
2.使用shutdown模拟关机 ,关机之前和关机之后,对比dump.rdb文件的更新时间
? 注意:当我们使用shutdown命令,redis会自动将数据库备份,所以,dump.rdb文件创建时间更新了
3.开机启动redis,我们要在120秒内保存10条数据,再查看dump.rdb文件的更新时间(开两个终端窗口,方便查看)
4.120秒内保存10条数据这一动作触发了备份指令,目前,dump.rdb文件中保存了10条数据,将 dump.rdb拷贝一份dump10.rdb,此时两个文件中都保存10条数据
5.既然有数据已经备份了,那我们就肆无忌惮的将数据全部删除flushall,再次shutdown关机
6.再次启动redis,发现数据真的消失了,并没有按照我们所想的将dump.rdb文件中的内容恢复到 redis中。为什么?
? 因为,当我们保存10条以上的数据时,数据备份起来了;
? 然后删除数据库,备份文件中的数据,也没问题;
? 但是,问题出在shutdown上,这个命令一旦执行,就会立刻备份,将删除之后的空数据库生成备份文件,将之前装10条数据的备份文件覆盖掉了。所以,就出现了上图的结果。自动恢复失败。
? 怎么解决这个问题呢?要将备份文件再备份
7.将dump.rdb文件删除,将dump10.rdb重命名为dump.rdb
8.启动redis服务,登录redis,数据10条,全部恢复!
3.2.1.2 手动备份
之前自动备份,必须更改好多数据,例如上边,我们改变了十多条数据,才会自动备份;
现在,我只保存一条数据,就想立刻备份,应该怎么做?
每次操作完成,执行命令 save 就会立刻备份
3.2.1.3 与RDB相关的配置
1.stop-writes-on-bgsave-error:进水口和出水口,出水口发生故障与否
? yes:当后台备份时候反生错误,前台停止写入
? no:不管死活,就是往里怼
2.rdbcompression:对于存储到磁盘中的快照,是否启动LZF压缩算法,一般都会启动,因为这点性能,多买一台电脑,完全搞定N个来回了。
? yes:启动
? no:不启动(不想消耗CPU资源,可关闭)
3.rdbchecksum:在存储快照后,是否启动CRC64算法进行数据校验;
? 开启后,大约增加10%左右的CPU消耗;
? 如果希望获得最大的性能提升,可以选择关闭;
4.dbfilename:快照备份文件名字
5.dir:快照备份文件保存的目录,默认为当前目录
优势and劣势
优:适合大规模数据恢复,对数据完整性和一致行要求不高;
劣:一定间隔备份一次,意外down掉,就失去最后一次快照的所有修改
3.2.2 AOF
Append Only File 以日志的形式记录每个写操作;
? 将redis执行过的写指令全部记录下来(读操作不记录);
? 只许追加文件,不可以改写文件;
? redis在启动之初会读取该文件从头到尾执行一遍,这样来重新构建数据;
3.2.2.1 开启AOF
1.为了避免失误,最好将redis.conf总配置文件备份一下,然后再修改内容如下:
appendonly yes
appendfilename appendonly.aof
2.重新启动redis,以新配置文件启动
redis-server /opt/redis5.0.4/redis.conf
3.连接redis,加数据,删库,退出
4.查看当前文件夹多一个aof文件,看看文件中的内容,保存的都是写操作
? 文件中最后一句要删除,否则数据恢复不了
? 编辑这个文件,最后要 :wq! 强制执行
5.只需要重新连接,数据恢复成功
3.2.2.2 共存?谁优先?
我们查看redis.conf文件,AOF和RDB两种备份策略可以同时开启,那系统会怎样选择?
1.动手试试,编辑appendonly.aof,胡搞乱码,保存退出
2.启动redis 失败,所以是AOF优先载入来恢复原始数据!因为AOF比RDB数据保存的完整性更高!
3.修复AOF文件,杀光不符合redis语法规范的代码
reids-check-aof --fix appendonly.aof
3.2.2.3 与AOF相关的配置
1.appendonly:开启aof模式
2.appendfilename:aof的文件名字,最好别改!
3.appendfsync:追写策略
? always:每次数据变更,就会立即记录到磁盘,性能较差,但数据完整性好
? everysec:默认设置,异步操作,每秒记录,如果一秒内宕机,会有数据丢失
? no:不追写
4.no-appendfsync-on-rewrite:重写时是否运用Appendfsync追写策略;用默认no即可,保证数据安全性。
? AOF采用文件追加的方式,文件会越来越大,为了解决这个问题,增加了重写机制,redis会自动记录上一次AOF文件的大小,当AOF文件大小达到预先设定的大小时,redis就会启动 AOF文件进行内容压缩,只保留可以恢复数据的最小指令集合
5.auto-aof-rewrite-percentage:如果AOF文件大小已经超过原来的100%,也就是一倍,才重写压缩
6.auto-aof-rewrite-min-size:如果AOF文件已经超过了64mb,才重写压缩
3.2.3 总结(如何选择?)
RDB:只用作后备用途,建议15分钟备份一次就好
AOF:
? 在最恶劣的情况下,也只丢失不超过2秒的数据,数据完整性比较高,但代价太大,会带来持续的IO
? 对硬盘的大小要求也高,默认64mb太小了,企业级最少都是5G以上;
? 后面要学习的master/slave才是新浪微博的选择!!
3.3 事务
可以一次执行多个命令,是一个命令组,一个事务中,所有命令都会序列化(排队),不会被插队;
一个队列中,一次性,顺序性,排他性的执行一系列命令
三特性
? 隔离性:所有命令都会按照顺序执行,事务在执行的过程中,不会被其他客户端送来的命令打断
? 没有隔离级别:队列中的命令没有提交之前都不会被实际的执行,不存在“事务中查询要看到事务里的更新,事务外查询不能看到”这个头疼的问题
? 不保证原子性:冤有头债有主,如果一个命令失败,但是别的命令可能会执行成功,没有回滚
三步走
? 开启multi
? 入队queued
? 执行exec
关系型数据库事务相比
? multi:可以理解成关系型事务中的 begin
? exec :可以理解成关系型事务中的 commit
? discard :可以理解成关系型事务中的 rollback
3.3.1 一起生
开启事务,加入队列,一起执行,并成功
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "v2"
4) OK
3.3.2 一起死
放弃之前的操作,恢复到原来的值
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1111
QUEUED
127.0.0.1:6379> set k2 v2222
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> get k1
"v1"
3.3.3 一粒老鼠屎坏一锅汤
一句报错,全部取消,恢复到原来的值
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> setlalala
(error) ERR unknown command `setlalala`, with args beginning with:
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> keys *
1) "k2"
2) "k3"
3) "k1"
3.3.4 冤有头债有主
追究责任,谁的错,找谁去
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range
2) OK
3) OK
127.0.0.1:6379> keys *
1) "k5"
2) "k1"
3) "k3"
4) "k2"
5) "k4"
3.3.5 watch监控
测试:模拟收入与支出
? 正常情况下:
127.0.0.1:6379> set in 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby in 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
? 特殊情况下:
127.0.0.1:6379> watch in
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby in 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
(nil)
? unwatch:取消watch命令对所有key的操作
? 一旦执行了exec命令,那么之前加的所有监控自动失效!
3.4 Redis的发布订阅
进程间的一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。例如:微信订阅号
订阅一个或多个频道
127.0.0.1:6379> subscribe cctv1 cctv5 cctv6
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "cctv1"
3) (integer) 1
1) "subscribe"
2) "cctv5"
3) (integer) 2
1) "subscribe"
2) "cctv6"
3) (integer) 3
1) "message"
2) "cctv5"
3) "NBA"
127.0.0.1:6379> publish cctv5 NBA
(integer) 1
3.5 主从复制
就是 redis集群的策略
配从(库)不配主(库):小弟可以选择谁是大哥,但大哥没有权利去选择小弟
读写分离:主机写,从机读
3.5.1 一主二仆
1.准备三台服务器,并修改redis.conf
bind 0.0.0.0
2.启动三台redis,并查看每台机器的角色,都是master
info replication
3.测试开始
3.1) 首先,将三个机器全都清空,第一台添加值
mset k1 v1 k2 v2
3.2) 其余两台机器,复制(找大哥)
slaveof 192.168.44.129 6379
3.3) 第一台再添加值
set k3 v3
思考1:slave之前的k1和k2是否能拿到?
? 可以获得,只要跟了大哥,之前的数据也会立刻同步
思考2:slave之后的k3是否能拿到?
? 可以获得,只要跟了大哥,数据会立刻同步
思考3:同时添加k4,结果如何?
? 主机(129master)可以添加成功,从机(130和131是slave)失败,从机只负责读取数据,无权写入数据,这就是“读写分离”
思考4:主机shutdown,从机如何?
? 130和131仍然是slave,并显示他们的master已离线
思考5:主机重启,从机又如何?
? 130和131仍然是slave,并显示他们的master已上线
思考6:从机死了,主机如何?
? 从机归来身份是否变化? 主机没有变化,只是显示少了一个slave
? 主机和从机没有变化,而重启归来的从机自立门户成为了master,不和原来的集群在一起了
3.5.2 血脉相传
一个主机理论上可以多个从机,但是这样的话,这个主机会很累
我们可以使用java面向对象继承中的传递性来解决这个问题,减轻主机的负担
形成祖孙三代:
127.0.0.1:6379> slaveof 192.168.44.129 6379
OK
127.0.0.1:6379> slaveof 192.168444.130 6379
OK
3.5.3 谋权篡位
1个主机,2个从机,当1个主机挂掉了,只能从2个从机中再次选1个主机
国不可一日无君,军不可一日无帅
手动选老大
模拟测试:1为master,2和3为slave,当1挂掉后,2篡权为master,3跟2
slaveof no one
slaveof 192.168.44.130 6379
思考:当1再次回归,会怎样?
? 2和3已经形成新的集群,和1没有任何的关系了。所以1成为了光杆司令
3.5.4 复制原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x48CmhGl-1633005745120)(E:\MarkDown\拉勾笔记\redis 主从复制原理)]
完成上面几个步骤后就完成了从服务器数据初始化的所有操作,从服务器此时可以接收来自用户的读请求
全量复制:Slave初始化阶段,这时Slave需要将Master上的所有数据都复制一份slave接收到数据文件后,存盘,并加载到内存中;(步骤1234)
增量复制:Slave初始化后,开始正常工作时主服务器发生的写操作同步到从服务器的过程;(步骤56)
? 但,只要是重新连接master,一次性(全量复制)同步将自动执行;
Redis主从同步策略:
? 主从刚刚连接的时候,进行全量同步;
? 全同步结束后,进行增量同步。当然,如果有需要,slave 在任何时候都可以发起全量同步。
? redis 策略是,无论如何,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
3.5.5 哨兵模式
自动版的谋权篡位!
有个哨兵一直在巡逻,突然发现!!!!!老大挂了,小弟们会自动投票,从众小弟中选出新的老大
Sentinel是Redis的高可用性解决方案:
? 由一个或多个Sentinel实例组成的Sentinel系统可以监视任意多个主服务器,以及所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线主服务器属下的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求
模拟测试
- 1主,2和3从
- 每一台服务器中创建一个配置文件sentinel.conf,名字绝不能错,并编辑sentinel.conf
sentinel monitor redis129 192.168.44.129 6379 1
- 启动服务的顺序:主Redis --> 从Redis --> Sentinel1/2/3
redis-sentinel sentinel.conf
- 将1号老大挂掉,后台自动发起激烈的投票,选出新的老大
127.0.0.1:6379> shutdown
not connected> exit
-
查看最后权利的分配 ? 3成为了新的老大,2还是小弟 -
如果之前的老大再次归来呢? ? 1号再次归来,自己成为了master,和3平起平坐 ? 过了几秒之后,被哨兵检测到了1号机的归来,1号你别自己玩了,进入集体吧,但是新的老大已经产生了,你只能作为小弟再次进入集体!
3.5.6 缺点
由于所有的写操作都是在master上完成的;
然后再同步到slave上,所以两台机器之间通信会有延迟;
当系统很繁忙的时候,延迟问题会加重;
slave机器数量增加,问题也会加重
3.6 总配置redis.conf 详解
件,
daemonize no
pidfile /var/run/redis.pid
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 0
loglevel notice
logfile ""
databases 16
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./
slave-serve-stale-data yes
slave-read-only yes
repl-disable-tcp-nodelay no
slave-priority 100
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
notify-keyspace-events ""
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-entries 512
list-max-ziplist-value 64
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
aof-rewrite-incremental-fsync yes
? 通常情况下,默认的配置足够你解决问题!
? 没有极特殊的要求,不要乱改配置!
3.7 Jedis
java和redis打交道的API客户端
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
3.7.1 连接redis
public class Test1 {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.44.129",6379);
String pong = jedis.ping();
System.out.println("pong = " + pong);
}
}
3.7.2 常用API
public class Test2_API {
private void testString(){
Jedis jedis = new Jedis("192.168.44.129",6379);
jedis.set("k1","v1");
jedis.set("k2","v2");
jedis.set("k3","v3");
Set<String> set = jedis.keys("*");
Iterator<String> iterator = set.iterator();
for (set.iterator();iterator.hasNext();){
String k = iterator.next();
System.out.println(k+"->"+jedis.get(k));
}
Boolean k2Exists = jedis.exists("k2");
System.out.println("k2Exists = " + k2Exists);
System.out.println(jedis.ttl("k1"));
System.out.println(jedis.mget("k1","k2","k3","k4","k5"));
System.out.println("-------------------------------------------------------");
}
private void testList(){
Jedis jedis = new Jedis("192.168.44.129",6379);
List<String> list01 = jedis.lrange("list01", 0, -1);
for (String s : list01) {
System.out.println(s);
}
System.out.println("-------------------------------------------------------");
}
private void testSet(){
Jedis jedis = new Jedis("192.168.44.129",6379);
jedis.sadd("order","jd001");
jedis.sadd("order","jd002");
jedis.sadd("order","jd003");
Set<String> order = jedis.smembers("order");
Iterator<String> order_iterator = order.iterator();
while(order_iterator.hasNext()){
String s = order_iterator.next();
System.out.println(s);
}
jedis.srem("order","jd002");
System.out.println(jedis.smembers("order").size());
}
private void testHash(){
Jedis jedis = new Jedis("192.168.44.129",6379);
jedis.hset("user1","username","james");
System.out.println(jedis.hget("user1","username"));
HashMap<String, String> map = new HashMap<>();
map.put("username","tom");
map.put("gender","boy");
map.put("address","beijing");
map.put("phone","15152037019");
jedis.hmset("user2",map);
List<String> list = jedis.hmget("user2", "username", "phone");
for (String s : list) {
System.out.println(s);
}
}
private void testZset(){
Jedis jedis = new Jedis("192.168.44.129",6379);
jedis.zadd("zset01",60d,"zs1");
jedis.zadd("zset01",70d,"zs2");
jedis.zadd("zset01",80d,"zs3");
jedis.zadd("zset01",90d,"zs4");
Set<String> zset01 = jedis.zrange("zset01", 0, -1);
Iterator<String> iterator = zset01.iterator();
while(iterator.hasNext()){
String s = iterator.next();
System.out.println(s);
}
}
public static void main(String[] args) {
Test2_API api = new Test2_API();
api.testZset();
}
}
3.7.3 事务
初始化余额和支出
set yue 100
set zhichu 0
public class TestTransaction {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("192.168.44.129", 6379);
int yue = Integer.parseInt(jedis.get("yue"));
int zhichu = 10;
jedis.watch("yue");
Thread.sleep(5000);
if (yue < zhichu){
jedis.unwatch();
System.out.println("余额不足!");
}else {
Transaction transaction = jedis.multi();
transaction.decrBy("yue",zhichu);
transaction.incrBy("zhichu",zhichu);
transaction.exec();
System.out.println("余额:" + jedis.get("yue"));
System.out.println("累计支出:" + jedis.get("zhichu"));
}
}
}
模拟网络延迟:,10秒内,进入linux修改余额为5,这样,余额<支出,就会进入if
3.7.4 JedisPool
redis的连接池技术
详情:https://help.aliyun.com/document_detail/98726.html
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
使用单例模式进行优化
public class JdeisPoolUtil {
private JdeisPoolUtil(){}
private volatile static JedisPool jedisPool = null;
private volatile static Jedis jedis = null;
private static JedisPool getInstance(){
if (jedisPool == null){
synchronized (JdeisPoolUtil.class){
if (jedisPool == null){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(1000);
config.setMaxIdle(30);
config.setMaxWaitMillis(60*1000);
config.setTestOnBorrow(true);
jedisPool = new JedisPool(config, "192.168.44.129", 6379);
}
}
}
return jedisPool;
}
public static Jedis getJedis(){
if (jedis == null){
jedis = getInstance().getResource();
}
return jedis;
}
}
测试类
public class Test_JedisPool {
public static void main(String[] args) {
Jedis jedis1 = JdeisPoolUtil.getJedis();
Jedis jedis2 = JdeisPoolUtil.getJedis();
System.out.println(jedis1 == jedis2);
}
}
3.8 高并发下的分布式锁
经典案例:秒杀,抢购优惠券等
3.8.1 搭建工程并测试单线程
pom.xml
<packaging>war</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<port>8001</port>
<path>/</path>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
id="WebApp_ID" version="3.1">
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring/spring.xml</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
spring-dao.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="controller"/>
<bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="192.168.44.129"></property>
<property name="port" value="6379"/>
</bean>
</beans>
测试类
@Controller
public class TestKill {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/kill")
public @ResponseBody synchronized String kill(){
int phoneCount = Integer.parseInt(stringRedisTemplate.opsForValue().get("phone"));
if (phoneCount > 0){
phoneCount--;
stringRedisTemplate.opsForValue().set("phone",phoneCount+"");
System.out.println("库存-1,剩余:" + phoneCount);
}else {
System.out.println("库存不足!");
}
return "over";
}
}
3.8.2 高并发测试
1.启动两次工程,端口号分别8001和8002
2.使用nginx做负载均衡
upstream wei{
server 192.168.44.1:8001;
server 192.168.44.1:8002;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
proxy_pass http://wei;
root html;
index index.html index.htm;
}
/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf
3.使用 JMeter 模拟1秒内发出100个http请求,会发现同一个商品会被两台服务器同时抢购!
3.8.3 实现分布式锁的思路
1.因为redis是单线程的,所以命令也就具备原子性,使用setnx命令实现锁,保存k-v
? 如果k不存在,保存(当前线程加锁),执行完成后,删除k表示释放锁
? 如果k已存在,阻塞线程执行,表示有锁
2.如果加锁成功,在执行业务代码的过程中出现异常,导致没有删除k(释放锁失败),那么就会造成死锁(后面的所有线程都无法执行)!
? 设置过期时间,例如10秒后,redis自动删除
3.高并发下,由于时间段等因素导致服务器压力过大或过小,每个线程执行的时间不同
? 第一个线程,执行需要13秒,执行到第10秒时,redis自动过期了k(释放锁)
? 第二个线程,执行需要7秒,加锁,执行第3秒(锁 被释放了,为什么,是被第一个线程的 finally主动deleteKey释放掉了)
? 。。。连锁反应,当前线程刚加的锁,就被其他线程释放掉了,周而复始,导致锁会永久失效
4.给每个线程加上唯一的标识UUID随机生成,释放的时候判断是否是当前的标识即可
5.问题又来了,过期时间如果设定?
? 如果10秒太短不够用怎么办?
? 设置60秒,太长又浪费时间
? 可以开启一个定时器线程,当过期时间小于总过期时间的1/3时,增长总过期时间(吃仙丹续命!)
自己实现分布式锁,太难了!
3.8.4 Redisson
Redis 是最流行的 NoSQL 数据库解决方案之一,而 Java 是世界上最流行的编程语言之一。
虽然两者看起来很自然地在一起“工作”,但是要知道,Redis 其实并没有对 Java 提供原生支持。
相反,作为 Java 开发人员,我们若想在程序中集成 Redis,必须使用 Redis 的第三方库。
而 Redisson 就是用于在 Java 程序中操作 Redis 的库,它使得我们可以在程序中轻松地使用 Redis。
Redisson 在 java.util 中常用接口的基础上,为我们提供了一系列具有分布式特性的工具类。
@Controller
public class TestKill {
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/kill")
public @ResponseBody synchronized String kill(){
String productKey = "HUAWEI-P40";
RLock rLock = redisson.getLock(productKey);
rLock.lock(30, TimeUnit.SECONDS);
try {
int phoneCount = Integer.parseInt(stringRedisTemplate.opsForValue().get("phone"));
if (phoneCount > 0){
phoneCount--;
stringRedisTemplate.opsForValue().set("phone",phoneCount+"");
System.out.println("库存-1,剩余:" + phoneCount);
}else {
System.out.println("库存不足!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "over";
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.44.129:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
? 实现分布式锁的方案其实有很多,我们之前用过的zookeeper的特点就是高可靠性,现在我们用的 redis特点就是高性能。
? 目前分布式锁,应用最多的仍然是“Redis”
分布式文件系统—FastDFS
1. 场景概述
天猫,淘宝等购物网站,大量的图片和视频,文件太多,如何存储?
用户访问量大又如何保证下载速度?分布式文件系统就是解决这些问题的!
1.1 什么是文件系统
文件数据是如何存储的??
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XPy8Ledh-1633005745123)(E:\MarkDown\拉勾笔记\FastDFS 什么是文件系统)]
1.2 分布式文件系统
一台电脑存储量有限,并且并发吞吐量也有限,如何提高性能?
一吨货物,我要运送到吐鲁番:
? 1个人运,不敢想象
? 50个人运,太难了;
? 500个人运,每个人都很轻松;
这就是分布式吗?
答:这里面有集群的概念,也有分布式的概念,二者不要混淆,面试常问的经典题目
分布式:不同的业务模块部署在不同的服务器上或者同一个业务模块分拆多个子业务,部署不同的服务器上。解决高并发的问题;
集群:同一个业务部署在多台服务器上,提高系统的高可用
例如:
? 小饭馆原来只有一个厨师,切菜洗菜备料一手抓。客人越来越多,一个厨师忙不过来,只能再请一个厨师,两个厨师都能炒菜,也就是两个厨师的作用是一样的,这样,两个厨师的关系就是“集群”;
? 为了让厨师专心炒菜,把菜炒到极致,又请了配菜师负责切菜,备料等工作。厨师和备菜师的关系是“分布式”;
? 一个备菜师忙不过来,要提供两份食材给两个厨师,又请了一个备菜师,两个备菜师的关系又是“集群”。
1.3 主流的分布式文件系统
1.3.1 HDFS
(Hadoop Distributed File System)Hadoop 分布式文件系统;
高容错的系统,适合部署到廉价的机器上;
能提供高吞吐量的数据访问,非常适合大规模数据应用;
HDFS采用主从结构,一个HDFS是由一个name节点和N个data节点组成;
name节点储存元数据,一个文件分割成N份存储在不同的data节点上。
1.3.2 GFS
Google File System
可扩展的分布式文件系统,用于大型的,分布式的,对大量数据进行访问的应用;
运行于廉价的普通硬件上,可以提供容错功能;
它可以给大量的用户提供总体性能较高的服务;
GFS采用主从结构,一个GFS集群由一个master和大量的chunkserver(分块服务器)组成;
一个文件被分割若干块,分散储存到多个分块server中
1.3.3.FastDFS
由淘宝资深架构师余庆编写并开源;
专为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务;
HDFS,GFS等都是通用的文件系统,他们的优点是开发体验好,但是系统的复杂度较高,性能也一般;
相比之下,专用的分布式文件系统体验差,但是复杂度低,性能也高,尤其fastDFS特别适合图片,小视频等小文件,因为fastDFS对文件是不分割的,所以没有文件合并的开销;
网络通信用socket,速度快。
1.4 工作原理
fastDFS包含Tracker Server和Storage Server;
客户端请求Tracker Server进行文件的上传与下载;
Tracker Server调度Storage Server最终完成上传与下载
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QCz2Vp8K-1633005745125)(E:\MarkDown\拉勾笔记\fastDFS 工作原理)]
Tracker (译:追踪者)
? 作用是负载均衡和调度,它管理着存储服务(Storage Server),可以理解为:“大管家,追踪者,调度员”;
? Tracker Server可以集群,实现高可用,策略为“轮询”。
Storage (译:仓库; 贮存器)
? 作用是文件存储,客户端上传的文件最终存储到storage服务器上;
? storage集群采用分组的方式,同组内的每台服务器是平等关系,数据同步,目的是实现数据备份,从而高可用,而不同组的服务器之间是不通信的;
? 同组内的每台服务器的存储量不一致的情况下,会选取容量最小的那个,所以同组内的服务器之间软硬件最好保持一致。
? Storage Server会连接集群中的所有Tracker Server,定时向他们汇报自己的状态,例如:剩余空间,文件同步情况,文件上传下载次数等信息。
1.5 上传/下载 原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e52xnRG9-1633005745126)(E:\MarkDown\拉勾笔记\FastDFS 上传原理)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lB6Jn0PH-1633005745129)(E:\MarkDown\拉勾笔记\FastDFS 下载原理)]
客户端上传文件后,storage会将文件id返回给客户端
group1/M00/02/11/aJxAeF21O5wAAAAAAAAGaEIOA12345.sh
? 组名:文件上传后,在storage组的名称,文件上传成功后,由storage返回,需要客户端自行保存。
? 虚拟磁盘路径:
? storage配置的虚拟路径,在磁盘选项storage_path对应。
? storage_path0对应M00,
? storage_path1对应M01,
? 数据两级目录: storage在虚拟磁盘下自行创建的目录。
? 文件名: 与上传时不同,是用storage根据特定信息生成的,里面包含:storage服务器的ip,创建时间戳,大小,后缀名等信息
2. FastDFS的上传与下载
2.1 安装
2.1.1 安装gcc(编译时需要)
yum install -y gcc gcc-c++
2.1.2 安装libevent(运行时需求)
yum -y install libevent
2.1.3 安装 libfastcommon
libfastcommon是FastDFS官方提供的,libfastcommon包含了FastDFS运行所需要的一些基础库。
1.上传 libfastcommon-master.zip 到 /opt
安装解压zip包的命令: yum install -y unzip
解压包: unzip libfastcommon.zip
进入目录: cd libfastcommon-master
2.编译
./make.sh
如果:make.sh的权限不够,则需要授权(可执行的权利)
chmod 777 make.sh
3.安装
./make.sh install
libfastcommon安装好后会在/usr/lib64 目录下生成 libfastcommon.so 库文件
4.拷贝库文件
cd /usr/lib64
cp libfastcommon.so /usr/lib
2.1.4 安装Tracker
1.下载 FastDFS_v5.05.tar.gz,并上传到 /opt
tar -zxvf FastDFS_v5.05.tar.gz
cd FastDFS
./make.sh
./make.sh install
2.安装成功将安装目录下的conf下的文件拷贝到/etc/fdfs/下
cp /opt/FastDFS/conf/* /etc/fdfs/
2.2 配置
1.Tracker配置
vim /etc/fdfs/tracker.conf
port=22122
mkdir /home/fastdfs)
base_path=/home/fastdfs
2.Storage配置
vim /etc/fdfs/storage.conf
group_name=group1
port=23000
heart_beat_interval=30
base_path=/home/fastdfs
store_path0=/home/fastdfs/fdfs_storage
tracker_server=192.168.44.129:22122
2.3 启动服务
1.启动tracker
/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf restart
2.启动storage
/usr/bin/fdfs_storaged /etc/fdfs/storage.conf restart
3.查看所有运行的端口:
netstat -ntlp
2.4 搭建 Java工程
使用IDEA创建maven工程
2.4.1 pom.xml
<dependency>
<groupId>net.oschina.zcx7878</groupId>
<artifactId>fastdfs-client-java</artifactId>
<version>1.27.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
2.4.2 创建配置文件
在resources下创建config目录,在config目录下创建 fastdfs-client.properties,内容如下:
##fastdfs-client.properties
fastdfs.connect_timeout_in_seconds = 5
fastdfs.network_timeout_in_seconds = 30
fastdfs.charset = UTF-8
fastdfs.http_anti_steal_token = false
fastdfs.http_secret_key = FastDFS1234567890
fastdfs.http_tracker_http_port = 80
fastdfs.tracker_servers = 192.168.44.129:22122
2.4.3 文件上传
public class TestUpload {
public static void main(String[] args) {
try {
ClientGlobal.initByProperties("config/fastdfs-client.properties");
TrackerClient trackerClient = new TrackerClient();
TrackerServer trackerServer = trackerClient.getConnection();
StorageServer storageServer = null;
StorageClient1 client = new StorageClient1(trackerServer, storageServer);
NameValuePair[] list = new NameValuePair[1];
list[0] = new NameValuePair("fileName","1.jpg");
String fileID = client.upload_file1("G:\\1.jpg", "jpg", list);
System.out.println("fileID = " + fileID);
trackerServer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4.4 文件查询
public class TestQuery {
public static void main(String[] args) {
try {
ClientGlobal.initByProperties("config/fastdfs-client.properties");
TrackerClient trackerClient = new TrackerClient();
TrackerServer trackerServer = trackerClient.getConnection();
StorageServer storageServer = null;
StorageClient1 client = new StorageClient1(trackerServer, storageServer);
FileInfo fileInfo = client.query_file_info1("group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg");
if (fileInfo != null){
System.out.println("fileInfo = " + fileInfo);
}else {
System.out.println("查无此文件!");
}
trackerServer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4.5 文件下载
public class TestDownload {
public static void main(String[] args) {
try {
ClientGlobal.initByProperties("config/fastdfs-client.properties");
TrackerClient trackerClient = new TrackerClient();
TrackerServer trackerServer = trackerClient.getConnection();
StorageServer storageServer = null;
StorageClient1 client = new StorageClient1(trackerServer, storageServer);
byte[] bytes = client.download_file1("group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg");
FileOutputStream fileOutputStream = new FileOutputStream(new File("G:/xxxxx.jpg"));
fileOutputStream.write(bytes);
fileOutputStream.close();
trackerServer.close();
System.out.println("下载完毕!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 项目实战
掌握fastDFS在真实项目中的使用方法;
掌握fastDFS实现图片服务器;
3.1 搭建图片服务器
3.1.1 Nginx模块安装 (Storage)
- 上传 fastdfs-nginx-module_v1.16.tar.gz 到 /opt
- 解压nginx模块
tar -zxvf fastdfs-nginx-module_v1.16.tar.gz
- 修改 config 文件,将文件中的 /usr/local/ 路径改为 /usr/
cd /opt/fastdfs-nginx-module/src
vim config
- 将 fastdfs-nginx-module/src下的 mod_fastdfs.conf 拷贝至 /etc/fdfs 下
cp mod_fastdfs.conf /etc/fdfs/
- 修改 /etc/fdfs/mod_fastdfs.conf
vim /etc/fdfs/mod_fastdfs.conf
base_path=/home/fastdfs
tracker_server=192.168.44.129:22122
url_have_group_name=true
store_path0=/home/fastdfs/fdfs_storage
- 将 libfdfsclient.so 拷贝至 /usr/lib 下
cp /usr/lib64/libfdfsclient.so /usr/lib/
- 创建nginx/client目录
mkdir -p /var/temp/nginx/client
3.1.2 Nginx安装 (Tracker)
- 将 nginx-1.14.0.tar.gz上传到/opt(安装过nginx,此步省略)
- 解压:tar -zxvf nginx-1.14.0.tar.gz(安装过nginx,此步省略)
- 安装依赖库(安装过nginx,此步省略)
yum install pcre
yum install pcre-devel
yum install zlib
yum install zlib-devel
yum install openssl
yum install openssl-devel
- 进入nginx解压的目录下 cd /opt/nginx-1.14.0
- 安装
./configure \
--prefix=/usr/local/nginx \
--pid-path=/var/run/nginx/nginx.pid \
--lock-path=/var/lock/nginx.lock \
--error-log-path=/var/log/nginx/error.log \
--http-log-path=/var/log/nginx/access.log \
--with-http_gzip_static_module \
--http-client-body-temp-path=/var/temp/nginx/client \
--http-proxy-temp-path=/var/temp/nginx/proxy \
--http-fastcgi-temp-path=/var/temp/nginx/fastcgi \
--http-uwsgi-temp-path=/var/temp/nginx/uwsgi \
--http-scgi-temp-path=/var/temp/nginx/scgi \
--add-module=/opt/fastdfs-nginx-module/src
**注意:**上边将临时文件目录指定为 /var/temp/nginx,需要在 /var 下创建 temp 及 nginx 目录:mkdir /var/temp/nginx
- 编译:make
- 安装:make install
- 拷贝配置文件
cd /opt/FastDFS/conf
cp http.conf mime.types /etc/fdfs/
是否覆盖:yes
- 修改nginx配置文件
cd /usr/local/nginx/conf/
vim nginx.conf
server {
listen 80;
server_name 192.168.44.129;
#charset koi8-r;
#access_log logs/host.access.log main;
location /group1/M00 {
root /home/fastdfs/fdfs_storage/data;
ngx_fastdfs_module;
}
}
- 关闭nginx,并启动nginx
pkill -9 nginx
/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf
- 访问nginx并查看图片
http://192.168.44.129
http://192.168.44.129/group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg
3.2 创建前端页面
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<title>上传图片</title>
</head>
<body>
<%--上传文件,文件与文字比较起来,属于内容较大,必须使用post的提交方式--%>
<%--上传文件,和普通文本有区别,action接收参数也会区别对待,所以声明带文件提交的表单为“多部件表单”--%>
<form action="upload" method="post" enctype="multipart/form-data">
<input type="file" name="fname"><br>
<button>提交</button>
</form>
</body>
</html>
3.3 搭建web服务
3.3.1 pom.xml
<packaging>war</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>net.oschina.zcx7878</groupId>
<artifactId>fastdfs-client-java</artifactId>
<version>1.27.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<port>8001</port>
<path>/</path>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3.3.2 web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xsi:schemaLocation="
http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" id="WebApp_ID" version="3.1">
<servlet>
<servlet-name>springMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring/spring-mvc.xml</param-value>
</init-param>
</servlet>
<servlet-mapping>
<servlet-name>springMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
3.3.3 spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<context:component-scan base-package="controller"/>
<mvc:annotation-driven/>
<bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
<property name="maxUploadSize" value="2048000000"/>
</bean>
</beans>
3.3.4 文件实体类
public class FileSystem implements Serializable {
private String fileId;
private String filePath;
private String fileName;
@Override
public String toString(){}
setter...
getter...
}
3.3.5 控制层
@Controller
public class FileAction {
@RequestMapping("/upload")
public @ResponseBody FileSystem upload(MultipartHttpServletRequest request) throws Exception{
FileSystem fileSystem = new FileSystem();
MultipartFile file = request.getFile("fname");
String oldFileName = file.getOriginalFilename();
String hou = oldFileName.substring(oldFileName.lastIndexOf(".") + 1);
String newFileName = UUID.randomUUID().toString() + "." + hou;
File toSaveFile = new File("G:/upload" + newFileName);
file.transferTo(toSaveFile);
String newFilePath = toSaveFile.getAbsolutePath();
ClientGlobal.initByProperties("config/fastdfs-client.properties");
TrackerClient trackerClient = new TrackerClient();
TrackerServer trackerServer = trackerClient.getConnection();
StorageServer storageServer = null;
StorageClient1 client = new StorageClient1(trackerServer,storageServer);
NameValuePair[] list = new NameValuePair[1];
list[0] = new NameValuePair("fileName",oldFileName);
String fileId = client.upload_file1(newFilePath, hou, list);
trackerServer.close();
fileSystem.setFileId(fileId);
fileSystem.setFileName(oldFileName);
fileSystem.setFilePath(fileId);
return fileSystem;
}
}
3.3.6 添加fastDFS的配置文件
在resources下创建config目录,在config目录下创建 fastdfs-client.properties
参考:2.4.2
3.3.7 启动fastDFS服务,测试开始
[root@localhost /]
[root@localhost /]
[root@localhost /]
[root@localhost /]
[root@localhost /]
[root@localhost /]
[root@localhost /]
3.4 典型错误
重启linux服务器,可能会到nginx启动失败:
[root@localhost logs]
[root@localhost /]
导致本次错误的原因,是没有修改pid文件的路径,编辑nginx的配置文件:
vim /usr/local/nginx/conf/nginx.conf
pid /usr/local/nginx/logs/nginx.pid;
再次启动nginx,搞定!
RabbitMQ
1.什么是RabbitMQ
1.1 MQ(Message Queue)消息队列
消息队列中间件,是分布式系统中的重要组件
主要解决,异步处理,应用解耦,流量削峰等问题
从而实现高性能,高可用,可伸缩和最终一致性的架构
使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等
1.1.1 异步处理
用户注册后,需要发送验证邮箱和手机验证码;
将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-l8GdbnuN-1633005745131)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-异步处理)]
1.1.2 应用解耦
场景:订单系统需要通知库存系统
如果库存系统异常,则订单调用库存失败,导致下单失败
? 原因:订单系统和库存系统耦合度太高
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IhRcUNK5-1633005745134)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-应用解耦)]
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,再进行库存操作;
假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦;
所以说,消息队列是典型的:生产者消费者模型
生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息
因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就实现了生产者和消费者的解耦
1.1.3 流量削峰
抢购,秒杀等业务,针对高并发的场景
因为流量过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RQtEZmzW-1633005745136)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-流量削峰)]
用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面!
说白了,秒杀成功的就是进入队列的用户;
1.2 背景知识介绍
1.2.1 AMQP高级消息队列协议
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息
并不受产品、开发语言等条件的限制
1.2.2 JMS
Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
是一个Java平台中关于面向消息中间件的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信
1.2.3 二者的联系
JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式
JMS必须是java语言;AMQP只是协议,与语言无关
1.2.4 Erlang语言
Erlang(['?:l??])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CSLab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合构建分布式,实时软并行计算系统
Erlang运行时环境是一个虚拟机,有点像Java的虚拟机,这样代码一经编译,同样可以随处运行
1.3 为什么选择RabbitMQ
我们开篇说消息队列产品那么多,为什么偏偏选择RabbitMQ呢?
先看命名:兔子行动非常迅速而且繁殖起来也非常疯狂,所以就把Rabbit用作这个分布式软件的命名
Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低
企业级消息队列,经过大量实践考验的高可靠,大量成功的应用案例,例如阿里、网易等一线大厂都有使用
有强大的WEB管理页面
强大的社区支持,为技术进步提供动力
支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择 RabbitMQ,如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或者zeroMQ
kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!
1.4 RabbitMQ各组件功能
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GWYFcHAm-1633005745137)(E:\MarkDown\拉勾笔记\RabbitMQ各组件功能)]
Broker:消息队列服务器实体
Virtual Host:虚拟主机
? 标识一批交换机、消息队列和相关对象,形成的整体
? 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
? 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制
? vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
Exchange:交换器(路由)
? 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列
? 用来保存消息直到发送给消费者。
? 它是消息的容器,也是消息的终点。
? 一个消息可投入一个或多个队列。
? 消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。
Channel:通道(信道)
? 多路复用连接中的一条独立的双向数据流通道。
? 信道是建立在真实的TCP连接内的虚拟链接
? AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
? 因为对操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入信道的概念,用来复用TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Message:消息
? 消息是不具名的,它是由消息头和消息体组成。
? 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
2.怎么用RabbitMQ
想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK
查看匹配的版本:https://www.rabbitmq.com/which-erlang.html
2.1 RabbitMQ安装启动
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
2.1.1 安装
[root@localhost opt]
[root@localhost opt]
[root@localhost opt]
2.1.2 启动后台管理插件
[root@localhost opt]
2.1.3 启动RabbitMQ
[root@localhost opt]
[root@localhost opt]
[root@localhost opt]
[root@localhost opt]
2.1.4 查看进程
[root@localhost opt]
2.1.5 测试
1.关闭防火墙: systemctl stop firewalld
2.浏览器输入:http://ip:15672
3.默认帐号密码:guest,guest用户默认不允许远程连接
- 创建账号
[root@localhost opt]
- 设置用户角色
[root@localhost opt]
- 设置用户权限
[root@localhost opt]
- 查看当前用户和角色
[root@localhost opt]
- 修改当前用户密码
[root@localhost opt]
4.管理界面介绍
overview:概览
connections:查看链接情况
channels:信道(通道)情况
Exchanges:交换机(路由)情况,默认4类7个
Queues:消息队列情况
Admin:管理员列表
端口:
? 5672:RabbitMQ提供给编程语言客户端链接的端口
? 15672:RabbitMQ管理界面的端口
? 25672:RabbitMQ集群的端口
2.2 RabbitMQ快速入门
2.2.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wei</groupId>
<artifactId>lagou-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
</project>
2.2.2 日志依赖log4j(可选项)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
2.2.2 创建连接
先创建好虚拟主机
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.44.129");
factory.setPort(5672);
factory.setVirtualHost("/lagou");
factory.setUsername("wei");
factory.setPassword("123123");
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
System.out.println("connection = " + connection);
connection.close();
}
}
2.3 RabbitMQ模式
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
在线手册:https://www.rabbitmq.com/getstarted.html
5种消息模型,大体分为两类:
? 1和2属于点对点
? 3、4、5属于发布订阅模式(一对多)
点对点模式:P2P(point to point)模式包含三个角色:
? 消息队列(queue),发送者(sender),接收者(receiver)
? 每个消息发送到一个特定的队列中,接收者从中获得消息
? 队列中保留这些消息,直到他们被消费或超时
? 特点:
? 1.每个消息只有一个消费者,一旦消费,消息就不在队列中了
? 2.发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
? 3.接收者成功接收消息之后需向对象应答成功(确认)
? 如果希望发送的每个消息都会被成功处理,那需要P2P
发布订阅模式:publish(Pub)/subscribe(Sub)
? pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者 (subcriber)
? 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
? 特点:
? 1.每个消息可以有多个订阅者
? 2.发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
? 3.为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。
? 如果希望发送的消息被多个消费者处理,可采用本模式
2.3.1 简单模式
下面引用官网的一段介绍:
? RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.
译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要 寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在 这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
类似邮局,处理信件的应该是收件人而不是邮局!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ao0yesDG-1633005745139)(E:\MarkDown\拉勾笔记\RabbitMQ模式-简单模式)]
2.3.1.1 生产者P
public class Sender {
public static void main(String[] args) throws Exception {
String msg = "wei:Hello,RabbitMQ!";
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);
channel.basicPublish("","queue1",null,msg.getBytes());
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认
2.3.1.2 消费者C
public class Recer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("接收 = " + s);
}
};
channel.basicConsume("queue1",true,consumer);
}
}
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
2.3.1.3 消息确认机制ACK
1.通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
2.RabbitMQ如何得知消息被消费者接收?
? 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
? 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
? ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功
? 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
? 不过这种回执ACK分为两种情况:
? 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
? 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
? 两种情况如何选择,需要看消息的重要性:
? 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
? 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
3.修改手动消息确认
channel.basicConsume("queue1", false, consumer);
结果如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1wiTI6T3-1633005745140)(E:\MarkDown\拉勾笔记\RabbitMQ 消息确认机制ACK)]
解决问题
public class RecerByACK {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("接收 = " + s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("queue1",false,consumer);
}
}
2.3.2 工作队列模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VJQV6UWh-1633005745142)(E:\MarkDown\拉勾笔记\RabbitMQ模式-工作队列模式)]
之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)
一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)
2.3.2.1 生产者P
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue",false,false,false,null);
for (int i = 1; i <= 100; i++) {
String msg = "羊肉串 --> " + i;
channel.basicPublish("","test_work_queue",null,msg.getBytes());
System.out.println("新鲜出炉:" + msg);
}
channel.close();
connection.close();
}
}
2.3.2.2 消费者1
public class Recer1 {
static int i = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue",false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客1】吃掉 " + s + "!总共吃【" + i++ + "串!】");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",false,consumer);
}
}
2.3.2.3 消费者2
public class Recer2 {
static int i = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue",false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【顾客2】吃掉 " + s + "!总共吃【" + i++ + "串!】");
try {
Thread.sleep(900);
} catch (Exception e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",false,consumer);
}
}
先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
? 例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完 成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可 以的,应该遵循“能者多劳”
? 效率高的多干点,效率低的少干点
? 看下面官网是如何给出解决思路的:
公平的分配
? 您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
? 这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将每条第n个消息分派给第n个消费者。
? 为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的worker。
channel.queueDeclare("test_work_queue",false,false,false,null);
channel.basicQos(1);
能者多劳必须要配合手动的ACK机制才生效
2.3.2.4 面试题:避免消息堆积?
- workqueue,多个消费者监听同一个队列
- 接收到消息后,通过线程池,异步消费
2.3.3 发布订阅模式
看官网:
Publish/Subscribe
? In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we’ll do something completely different – we’ll deliver a message to multiple consumers. This pattern is known as “publish/subscribe”.
? To illustrate the pattern, we’re going to build a simple logging system. It will consist of two programs – the first will emit log messages and the second will receive and print them.
? In our logging system every running copy of the receiver program will get the messages. That way we’ll be able to run one receiver and direct the logs to disk; and at the same time we’ll be able to run another receiver and see the logs on the screen.
? Essentially, published log messages are going to be broadcast to all the receivers.
发布-订阅
? 在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。 此模式称为“发布/订阅”。
? 为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将发送日志消息,第二个将接收和打印它们。
? 在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。
? 基本上,发布的日志消息将广播到所有接收方。
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZUSK9wSZ-1633005745143)(E:\MarkDown\拉勾笔记\RabbitMQ模式-发布订阅模式1)]
上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Y5yYG9e-1633005745145)(E:\MarkDown\拉勾笔记\RabbitMQ模式-发布订阅模式2)]
X队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
? 路由在生产者程序中创建
? 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
? 运行程序的顺序:
? 1.MessageSender
? 2.MessageReceiver1和MessageReceiver2
? 3.MessageSender
2.3.3.1 生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_exchange_fanout","fanout");
String msg = "hello,大家好!";
channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());
System.out.println("生产者:" + msg);
channel.close();
connection.close();
}
}
2.3.3.2 消费者1
public class Recer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout","");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】 = " + s);
}
};
channel.basicConsume("test_exchange_fanout_queue_1",true,consumer);
}
}
2.3.3.3 消费者2
public class Recer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_fanout_queue_2",false,false,false,null);
channel.queueBind("test_exchange_fanout_queue_2","test_exchange_fanout","");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】 = " + s);
}
};
channel.basicConsume("test_exchange_fanout_queue_2",true,consumer);
}
}
2.3.4 路由模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ps5Dtmwk-1633005745147)(E:\MarkDown\拉勾笔记\RabbitMQ模式-路由模式)]
路由会根据类型进行定向分发消息给不同的队列,如图所示
可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
2.3.4.1 生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_exchange_direct","direct");
String msg = "用户注册,【userid=S101】";
channel.basicPublish("test_exchange_direct","insert",null,msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
2.3.4.2 消费者1
public class Recer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】 = " + s);
}
};
channel.basicConsume("test_exchange_direct_queue_1",true,consumer);
}
}
2.3.4.3 消费者2
public class Recer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);
channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】 = " + s);
}
};
channel.basicConsume("test_exchange_direct_queue_2",true,consumer);
}
}
- 记住运行程序的顺序,先运行一次sender(创建路由器)
- 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
- 再次运行sender,发出消息
2.3.5 通配符模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BXEGGqDq-1633005745148)(E:\MarkDown\拉勾笔记\RabbitMQ模式-通配符模式)]
和路由模式90%是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
? *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
? #:匹配0个或更多个词
看一下官网案例:
? Q1绑定了路由键 * .orange.* Q2绑定了路由键 * .*.rabbit 和 lazy.#
? 下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit
lazy.orange.elephant
quick.orange.fox
lazy.brown.fox
lazy.pink.rabbit
quick.brown.fox
orange
quick.orange.male.rabbit
2.3.5.1 生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_exchange_topic","topic");
String msg = "用户注册,【userid=S101】";
channel.basicPublish("test_exchange_topic","user.register",null,msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
2.3.5.2 消费者1
public class Recer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null);
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】 = " + s);
}
};
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}
2.3.5.3 消费者2
public class Recer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null);
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#");
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者2】 = " + s);
}
};
channel.basicConsume("test_exchange_topic_queue_2",true,consumer);
}
}
2.4 持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?
? 消费者的ACK确认机制,可以防止消费者丢失消息
? 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
想要将消息持久化,那么路由和队列都要持久化才可以
2.4.1 生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_exchange_topic","topic",true);
String msg = "商品降价";
channel.basicPublish("test_exchange_topic","product.price", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
2.4.2 消费者
public class Recer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】 = " + s);
}
};
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}
2.5 Spring整合RabbitMQ
五种消息模型,在企业中应用最广泛的就是最后一种:定向匹配topic
Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发。
2.5.1 生产端工程
依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory"
host="192.168.44.129"
port="5672"
username="wei"
password="123123"
virtual-host="/lagou"/>
<rabbit:queue name="test_spring_queue_1"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:topic-exchange name="spring_topic_exchange">
<rabbit:bindings>
<rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"/>
</beans>
发消息
public class Sender {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
Map<String, String> map = new HashMap<>();
map.put("name","小微");
map.put("email","15952037019@163.com");
rabbitTemplate.convertAndSend("msg.user",map);
context.close();
}
}
2.5.2 消费端工程
依赖与生产者一致
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<rabbit:connection-factory
id="connectionFactory"
host="192.168.204.141"
port="5672"
username="laosun"
password="123123"
virtual-host="/lagou"
/>
<rabbit:queue name="test_spring_queue_1"/>
<rabbit:admin connection-factory="connectionFactory"/>
<context:component-scan base-package="listener"/>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumerListener" queuenames="test_spring_queue_1" />
</rabbit:listener-container>
</beans>
消费者
? MessageListener接口用于spring容器接收到消息后处理消息
? 如果需要使用自己定义的类型来实现 处理消息时,必须实现该接口,并重写onMessage()方法
? 当spring容器接收消息后,会自动交由onMessage进行处理
@Component
public class ConsumerListener implements MessageListener {
private static final ObjectMapper MEPPER = new ObjectMapper();
@Override
public void onMessage(Message message) {
try {
JsonNode jsonNode = MEPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
JsonNode email = jsonNode.get("email");
System.out.println("从队列中获取:【" + name +"的邮箱是:" + email + "】");
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动项目
public class TestRunner {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
System.in.read();
}
}
2.6 消息成功确认机制
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
? 事务机制
? 发布确认机制
2.6.1 事务机制
AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
? channel.txSelect(): 开启事务
? channel.txCommit() :提交事务
? channel.txRollback() :回滚事务
Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示
2.6.1.1 生产者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_transaction","topic");
channel.txSelect();
try {
channel.basicPublish("test_transaction", "product.price", null, "商品1-降价".getBytes());
System.out.println(1 / 0);
channel.basicPublish("test_transaction", "product.price", null, "商品2-降价".getBytes());
channel.txCommit();
System.out.println("[ 生产者 ]: 消息已全部发送!");
}catch (Exception e){
System.out.println("消息全部撤销!");
channel.txRollback();
e.printStackTrace();
}finally {
channel.close();
connection.close();
}
}
}
2.6.1.2 消费者
public class Recer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_transaction_queue",false,false,false,null);
channel.queueBind("test_transaction_queue","test_transaction","product.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者】 = " + s);
}
};
channel.basicConsume("test_transaction_queue",true,consumer);
}
}
2.6.2 Confirm发布确认机制
RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量
老孙我本机SSD硬盘测试结果10w条消息未开启事务,大约8s发送完毕;而开启了事务后,需要将近310s,差了30多倍。
接着老孙翻阅官网,发现官网中已标注
? Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
? 关键性译文:开启事务性能最大损失超过250倍
那么有没有更加高效的解决方式呢?答案就是采用Confirm模式。
事务效率为什么会这么低呢?试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚。太太太浪费
而confirm模式则采用补发第10条的措施来完成10条消息的送达
2.6.2.1 在spring中应用
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory"
host="192.168.44.129"
port="5672"
username="wei"
password="123123"
virtual-host="/lagou"
publisher-confirms="true"/>
<rabbit:queue name="test_spring_queue_1"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:topic-exchange name="spring_topic_exchange">
<rabbit:bindings>
<rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"
confirm-callback="messageConfirm"/>
<bean id="messageConfirm" class="confirm.MessageConfirm"/>
</beans>
消息确认处理类
public class MessageConfirm implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b == true){
System.out.println("消息确认成功!");
}else {
System.out.println("xxxxx 消息确认失败 xxxxx");
}
}
}
log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l%m%n
log4j.rootLogger=debug, stdout,file
发送消息
public class Sender {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
Map<String, String> map = new HashMap<>();
map.put("name","吕布");
map.put("email","6666@163.com");
rabbitTemplate.convertAndSend("lalala","msg.user",map);
System.out.println("消息已发送...");
context.close();
}
}
2.7 消费端限流
在沙漠中行走,3天不喝水,突然喝水,如果使劲喝,容易猝死,要一口一口慢慢喝
我们 Rabbitmq 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据, 就会被压垮崩溃
所以,当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,这是用户的行为,我们是无法约束的
所以我们应该对消费端限流,用于保持消费端的稳定
例如:汽车企业不停的生产汽车,4S店有好多库存车卖不出去,但是也不会降价处理,就是要保证市值的稳定,如果生产多少台,就卖多少台,不管价格的话,市场就乱了,所以我们要用不变的价格来稳住消费者购车,才能平稳发展
RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能
? 即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息
生产者使用循环发出多条消息
public class Sender {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
Map<String, String> map = new HashMap<>();
map.put("name","吕布");
map.put("email","6666@163.com");
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("msg.user",map);
System.out.println("消息已发送...");
}
context.close();
}
}
生产10条堆积未处理的消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DezMAGnH-1633005745150)(E:\MarkDown\拉勾笔记\RabbitMQ 消费端限流)]
消费者进行限流处理
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<rabbit:connection-factory id="connectionFactory"
host="192.168.44.129"
port="5672"
username="wei"
password="123123"
virtual-host="/lagou"/>
<rabbit:queue name="test_spring_queue_1"/>
<rabbit:admin connection-factory="connectionFactory"/>
<context:component-scan base-package="listener"/>
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
</beans>
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
private static final ObjectMapper MEPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
JsonNode jsonNode = MEPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
JsonNode email = jsonNode.get("email");
System.out.println("从队列中获取:【" + name +"的邮箱是:" + email + "】");
long msgId = message.getMessageProperties().getDeliveryTag();
channel.basicAck(msgId,true);
Thread.sleep(3000);
System.out.println("休息3秒后,在继续接收消息!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
每次确认接收3条消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zddw71OK-1633005745151)(E:\MarkDown\拉勾笔记\RabbitMQ 消费端限流02)]
2.8 过期时间TTL
Time To Live:生存时间、还能活多久,单位毫秒
在这个周期内,消息可以被消费者正常消费,超过这个时间,则自动删除(其实是被称为dead message并投入到死信队列,无法消费该消息)
RabbitMQ可以对消息和队列设置TTL
? 通过队列设置,队列中所有消息都有相同的过期时间
? 对消息单独设置,每条消息的TTL可以不同(更颗粒化)
2.8.1 设置队列TTL
spring-rabbitmq-producer.xml
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"/>
</rabbit:queue-arguments>
</rabbit:queue>
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CO9Lb5jT-1633005745153)(E:\MarkDown\拉勾笔记\RabbitMQ 过期时间TTL-设置队列TTL)]
5秒之后,消息自动删除
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Eu0GA1DD-1633005745154)(E:\MarkDown\拉勾笔记\RabbitMQ 过期时间TTL-设置队列TTL02)]
2.8.2 设置消息TTL
设置某条消息的ttl,只需要在创建发送消息时指定即可
<rabbit:queue name="test_spring_queue_ttl_2">
public class Sender2 {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
MessageProperties properties = new MessageProperties();
properties.setExpiration("3000");
Message message = new Message("测试过期时间".getBytes(),properties);
rabbitTemplate.convertAndSend("msg.user",message);
System.out.println("消息已发送...");
context.close();
}
}
如果同时设置了queue和message的TTL值,则二者中较小的才会起作用
2.9 死信队列
DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机的队列,称之为:“死信队列”
消息没有被及时消费的原因:
? 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
? 消息超时未消费
? 达到最大队列长度
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PZdJ56aK-1633005745156)(E:\MarkDown\拉勾笔记\RabbitMQ 死信队列)]
spring-rabbitmq-producer-dlx.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory"
host="192.168.44.129"
port="5672"
username="wei"
password="123123"
virtual-host="/lagou"
publisher-confirms="true"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="my_exchange"/>
<rabbit:queue name="dlx_queue"/>
<rabbit:direct-exchange name="dlx_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue"/>
<rabbit:binding key="dlx_max" queue="dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:direct-exchange name="my_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/>
<rabbit:binding key="dlx_max" queue="test_max_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="test_ttl_queue">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="6000"/>
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="test_max_queue">
<rabbit:queue-arguments>
<entry key="x-max-length" value-type="long" value="2"/>
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
</beans>
发消息进行测试
public class SenderDLX {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("dlx_max","测试长度1".getBytes());
rabbitTemplate.convertAndSend("dlx_max","测试长度2".getBytes());
rabbitTemplate.convertAndSend("dlx_max","测试长度3".getBytes());
System.out.println("消息已发送...");
context.close();
}
}
2.10 延迟队列
延迟队列:TTL + 死信队列的合体
死信队列只是一种特殊的队列,里面的消息仍然可以消费
在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题
2.10.1 生产者
沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可
public class SenderDLX {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("dlx_ttl","超时,关闭订单".getBytes());
System.out.println("消息已发送...");
context.close();
}
}
2.10.2 消费者
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou"/>
<rabbit:admin connection-factory="connectionFactory"/>
<context:component-scan base-package="listener"/>
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue" />
</rabbit:listener-container>
</beans>
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
private static final ObjectMapper MEPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String str = new String(message.getBody());
System.out.println("str = " + str);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.RabbitMQ集群
rabbitmq有3种模式,但集群模式是2种。详细如下:
单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。之前我们一直在用
普通模式:默认模式,以两个节点(A、B)为例来进行说明
? 当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临时通道进行消息传输,把A中的消息实体取出并经过通道交给B发送给consumer
? 当A故障后,B就无法取到A节点中未消费的消息实体
? 如果做了消息持久化,那么得等A节点恢复,然后才可被消费
? 如果没有持久化的话,就会产生消息丢失的现象
镜像模式:非常经典的 mirror 镜像模式,保证 100% 数据不丢失。
? 高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步
? 对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
? 在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式
还有主备模式,远程模式,多活模式等,本次课程不作为重点,可自行查阅资料了解
3.1 集群搭建
前置条件:准备两台linux,并安装好rabbitmq
集群步骤如下:
1.修改 /etc/hosts 映射文件
? 1号服务器:
127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.44.129 A
192.168.44.130 B
? 2号服务器:
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.44.129 A
192.168.44.130 B
修改hosts文件,要重启服务器,reboot
2.相互通信,cookie必须保持一致,同步 rabbitmq的cookie 文件:跨服务器拷贝 .erlang.cookie (隐藏文件,使用 ls -all 显示)
[root@A opt]
修改cookie文件,要重启服务器,reboot
3.停止防火墙,启动rabbitmq服务
[root@A ~]
[root@A ~]
4.加入集群节点
[root@B ~]
[root@B ~]
[root@B ~]
5.查看节点状态
[root@B ~]
6.查看管理端
? 搭建集群结构之后,之前创建的交换机、队列、用户都属于单一结构,在新的集群环境中是不能用的
? 所以在新的集群中重新手动添加用户即可(任意节点添加,所有节点共享)
[root@A ~]
[root@A ~]
[root@A ~]
? 注意:当节点脱离集群还原成单一结构后,交换机,队列和用户等数据都会重新回来
此时,集群搭建完毕,但是默认采用的模式“普通模式”,可靠性不高
3.2 镜像模式
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致
语法:set_policy {name} {pattern} {definition}
? name:策略名,可自定义
? pattern:队列的匹配模式(正则表达式)
? “^” 可以使用正则表达式,比如"^queue_" 表示对队列名称以“queue_”开头的所有队列进行镜像,而"^"表示匹配所有的队列
? definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
? ha-mode:(High Available,高可用)模式,指明镜像队列的模式,有效值为 all/exactly/nodes,当前策略模式为 all,即复制到所有节点,包含新增节点
? all:表示在集群中所有的节点上进行镜像
? exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
? nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
? ha-params:ha-mode模式需要用到的参数
? ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
[root@A ~]
通过管理端设置镜像策略
3.3 HAProxy实现镜像队列的负载均衡
虽然我们在程序中访问A服务器,可以实现消息的同步,虽然在同步,但都是A服务器在接收消息,A太累
是否可以像Nginx一样,做负载均衡,A和B轮流接收消息,再镜像同步
3.3.1 HAProxy简介
HA(High Available,高可用),Proxy(代理)
HAProxy是一款提供高可用性,负载均衡,并且基于TCP和HTTP应用的代理软件
HAProxy完全免费
HAProxy可以支持数以万计的并发连接
HAProxy可以简单又安全的整合进架构中,同时还保护web服务器不被暴露到网络上
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Yw0rbCxi-1633005745158)(E:\MarkDown\拉勾笔记\RabbitMQ HAProxy简介)]
3.3.2 HAProxy与Nginx
OSI:(Open System Interconnection:开放式系统互联 是把网络通信的工作分为7层,分别是物理层,数据链路层,网络层,传输层,会话层,表示层和应用层)
Nginx的优点:
? 工作在OSI第7层,可以针对http应用做一些分流的策略
? Nginx对网络的依赖非常小,理论上能ping通就就能进行负载功能,屹立至今的绝对优势
? Nginx安装和配置比较简单,测试起来比较方便;
? Nginx不仅仅是一款优秀的负载均衡器/反向代理软件,它同时也是功能强大的Web应用服务器
HAProxy的优点:
? 工作在网络4层和7层,支持TCP与Http协议
? 它仅仅就只是一款负载均衡软件;单纯从效率上来讲HAProxy更会比Nginx有更出色的负载均衡速度,在并发处理上也是优于Nginx的
? 支持8种负载均衡策略 ,支持心跳检测
性能上HA胜,功能性和便利性上Nginx胜
对于Http协议,Haproxy处理效率比Nginx高。所以,没有特殊要求的时候或者一般场景,建议使用Haproxy来做Http协议负载
但如果是Web应用,那么建议使用Nginx!
总之,大家可以结合各自使用场景的特点来进行合理地选择
3.3.3 安装和配置
HAProxy下载:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz
解压
[root@localhost opt]
make时需要使用 TARGET 指定内核及版本
[root@localhost opt]
3.10.0-229.el7.x86_64
根据内核版本选择编译参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b2kCZhAX-1633005745159)(E:\MarkDown\拉勾笔记\RabbitMQ HAProxy安装)]
进入目录,编译和安装
[root@localhost opt]
[root@localhost haproxy-1.8.12]
[root@localhost haproxy-1.8.12]
安装成功后,查看版本
[root@localhost haproxy-1.8.12]
配置启动文件,复制haproxy文件到/usr/sbin下 ,复制haproxy脚本,到/etc/init.d下
[root@localhost haproxy-1.8.12]
[root@localhost haproxy-1.8.12]
[root@localhost haproxy-1.8.12]
创建系统账号
[root@localhost haproxy-1.8.12]
haproxy.cfg 配置文件需要自行创建
[root@localhost haproxy-1.8.12]
[root@localhost haproxy-1.8.12]
添加配置信息到haproxy.cfg
global
log 127.0.0.1 local0 info
chroot /usr/local/haproxy
user haproxy
group haproxy
uid 99
gid 99
daemon
maxconn 4096
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
maxconn 2000
timeout connect 5s
timeout client 30s
timeout server 15s
listen rabbitmq_cluster
bind 192.168.44.131:5672
mode tcp
balance roundrobin
server A 192.168.44.129:5672 check inter 5000 rise 2 fall 3
server B 192.168.44.130:5672 check inter 5000 rise 2 fall 3
listen monitor
bind 192.168.44.131:8100
mode http
option httplog
stats enable
stats uri /monitor
stats refresh 5s
启动HAProxy
[root@localhost haproxy]
访问监控中心:http://192.168.44.131:8100/monitor
记得关闭防火墙: systemctl stop firewalld
项目发消息,只需要将服务器地址修改为131即可,其余不变
所有的请求都会交给HAProxy,其负载均衡给每个rabbitmq服务器
3.4 KeepAlived搭建高可用的HAProxy集群
现在的最后一个问题暴露出来了,如果HAProxy服务器宕机,rabbitmq服务器就不可用了。所以我们需要对HAProxy也要做高可用的集群
3.4.1 概述
Keepalived是Linux下一个轻量级别的高可用热备解决方案
Keepalived的作用是检测服务器的状态,它根据TCP/IP参考模型的第三、第四层、第五层交换机制检测每个服务节点的状态,如果有一台web服务器宕机,或工作出现故障,Keepalived将检测到,并将有故障的服务器从系统中剔除,同时使用其他服务器代替该服务器的工作,当服务器工作正常后Keepalived自动将服务器加入到服务器群中,这些工作全部自动完成,不需要人工干涉, 需要人工做的只是修复故障的服务器。
keepalived基于vrrp(Virtual Router Redundancy Protocol,虚拟路由冗余协议)协议,vrrp它是一种主备(主机和备用机)模式的协议,通过VRRP可以在网络发生故障时透明的进行设备切换而不影响主机之间的数据通信
两台主机之间生成一个虚拟的ip,我们称漂移ip,漂移ip由主服务器承担,一但主服务器宕机,备份服务器就会抢夺漂移ip,继续工作,有效的解决了群集中的单点故障
说白了,将多台路由器设备虚拟成一个设备,对外提供统一ip(VIP)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E3TbasNI-1633005745161)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群)]
3.4.2 安装KeepAlived
修改hosts文件的地址映射
ip | 用途 | 主机名 |
---|
192.168.44.131 | KeepAlived HAProxy | C | 192.168.44.132 | KeepAlived HAProxy | D |
安装 keepalived
[root@C ~]
修改配置文件(内容大改,不如删掉,重新创建)
[root@C ~]
[root@C ~]
! Configuration File for keepalived
global_defs {
router_id C
}
vrrp_script chk_haproxy{
script "/etc/keepalived/haproxy_check.sh"
interval 2
weight -20
}
vrrp_instance VI_1 {
state MASTER
interface eno16777736
virtual_router_id 66
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
track_script {
chk_haproxy
}
virtual_ipaddress {
192.168.44.66/24
}
}
virtual_server 192.168.44.66 5672 {
delay_loop 6
lb_algo rr
lb_kind NAT
protocol TCP
real_server 192.168.44.131 5672 {
weight 1
}
}
创建执行脚本 /etc/keepalived/haproxy_check.sh
#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
sleep 2
if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
killall keepalived
fi
fi
Keepalived 组之间的心跳检查并不能察觉到 HAproxy 负载是否正常,所以需要使用此脚本。
在 Keepalived 主机上,开启此脚本检测 HAproxy 是否正常工作,如正常工作,记录日志。
如进程不存在,则尝试重启 HAproxy ,2秒后检测,如果还没有,则关掉主 Keepalived ,此时备 Keepalived 检测到主 Keepalived 挂掉,接管VIP,继续服务
授权,否则不能执行
[root@C etc]
启动keepalived(两台都启动)
[root@C etc]
[root@C etc]
查看状态
[root@C etc]
[root@C etc]
查看ip情况 ip addr 或 ip a
[root@C etc]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wVwK8Vhl-1633005745162)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群02)]
此时,安装完毕,按照上面的步骤就可以安装第二台了(服务器hostname和ip注意要修改)
常见的网络错误:子网掩码、网关等信息要一致
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sUxVez4G-1633005745163)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群03)]
3.4.3 测试ip漂移的规则
查看虚拟ip ip addr 或 ip a
目前,C节点是主机,所以虚拟ip在C节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0vAv81QQ-1633005745166)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则)]
停止C的keepalived,虚拟ip漂移到D节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2bZDwZxk-1633005745167)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则02)]
重新启动C节点keepalived,虚拟ip依旧在D节点,并不会由于C的回归而回归
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fzgcE2wd-1633005745168)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则03)]
停止D的keepalived,虚拟ip再漂移回C节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8NOq7L4O-1633005745170)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则04)]
|