IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redius -> 正文阅读

[大数据]Redius

Redis

Nosql概述

NoSql是什么

NoSQL = Not Only SQL(不仅仅是SQL)

关系型数据库:表格,行,列

很多的数据类型用户的个人信息,社交网络,地理位置这些数据类型的存储不需要一个固定的格式,不需要多余的操作就可以扩展的,例如:Map<String,Object> 使用键值对来控制!

NoSql的特点:

  1. 方便扩展(数据之间没有关系,很好扩展)

  2. 大数据量高性能(Redis一秒写8万次,读取11万,NoSQL的缓存记录级别,是一种细粒度的缓存,性能比较高)

  3. 数据类型是多样型的!(不需要事先设计数据库,随取随用)

  4. 传统RDBMS和NoSQl

    传统的RDBMS
    - 结构化组织
    - SQL
    - 数据和关系都存在单独的表中
    - 操作操作,数据定义语言
    - 严格的一致性
    - 基础的事务
    -.....
    
    Nosql
    - 不仅仅是数据
    - 没有固定的查询语言
    - 键值对存储,列存储,文档存储,图形数据库(社交关系)
    - 最终一致性
    - CAP定理和BASE (异地多活) 
    - 高性能,高可用,高可扩
    -.....
    

    3高和3V

    3V:主要是描述问题的

    1. 海量Velume
    2. 多样Variety
    3. 实时Velocity

    3高:主要是对程序的要求

    1. 高并发
    2. 高可扩
    3. 高性能

    真正的公司实践:NoSql +RDBMS一起使用

NoSQL的四大分类

KV键值对:

  • 新浪: Redis
  • 美团:Redis +Tair
  • 阿里、百度:Redis + memecache

文档型数据库(bson格式 和 json 一样):

  • MongoDB(一般必须要掌握)
    • MOngoDB 是一个基于分布式文件存储的数据库,C++编写,主要用来处理大量的文档
    • MongoDB是一个介于关系型数据库和非关系型数据库中间的产品
    • MongoDB是非关型数据库中功能最丰富,最像关系型数据库的!
  • ConthDB

列存储数据库

  • HBase
  • 分布式文件系统

图关系数据库

  • Neo4j,InfoGrid

  • 存的是关系,比如:朋友圈社交网路,广告推荐!

img

Redis入门

概述

Redis是什么?

Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI,C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list([链表]、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis能干什么?

  1. 内存存储,持久化,内存中是断电即失,所以说持久化很重要(rdb,aof)

  2. 效率高,可以用于高速缓存

  3. 发布订阅系统

  4. 地图信息分析

  5. 计时器,计数器(浏览量)

特性

1.多样数据类型

2.持久化

3.集群

4.事务

学习中需要用到的东西

redis中文官网:http://www.redis.cn/

Redis推荐是在linux服务器上搭建的,我们是基于linux学习

Windows 下安装

下载地址:https://github.com/tporadowski/redis/releases

1.下载得到压缩包

image-20211115125919519

2.解压后开启服务

image-20211115131836888

3.发现出现闪退现象

4.cmd 进入redis目录下,输入redis-server.exe redis.windows.conf

image-20211115131806200

5.测试连接

image-20211115135849275

Linux安装

1.官网下载压缩包:https://redis.io/download

2.解压Redis的安装包,程序建议放在/opt目录下

[root@iZbp10ephck4n6u4xij7pcZ ~]# cd /home
[root@iZbp10ephck4n6u4xij7pcZ home]# ls
canCopy.txt  linux_amd64  mysql  redis-6.2.6.tar.gz  ssm02_train.war  test  ut

# 把Redis移动到/opt目录下
[root@iZbp10ephck4n6u4xij7pcZ home]# mv redis-6.2.6.tar.gz /opt
[root@iZbp10ephck4n6u4xij7pcZ home]# cd /opt
[root@iZbp10ephck4n6u4xij7pcZ opt]# ls
containerd  redis-6.2.6.tar.gz

# 解压安装包
[root@iZbp10ephck4n6u4xij7pcZ opt]# tar -zxvf redis-6.2.6.tar.gz

[root@iZbp10ephck4n6u4xij7pcZ opt]# ls
containerd  redis-6.2.6  redis-6.2.6.tar.gz

3.进入解压后的redis,

image-20211115141308999

4.基本的环境安装

yum install gcc-c++

make

make install

image-20211115141834676

5.redis的默认安装路径:/usr/local/bin

image-20211115141940156

6.将redis配置文件复制到当前目录下

image-20211115142319278

7.redis默认不是后台启动的,修改配置文件

image-20211115142601956

8.启动redis服务

image-20211115143247491

9.查看redis的进程是否开启 ps -ef|grep redis

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IUrm1axv-1637586350751)(https://i.loli.net/2021/11/22/GFhTjd5MOe3kEBo.png)]

10.关闭redis服务 在客户端 输入 shutdown

image-20211115143600475

测试性能

redis-benchmark:压力测试工具

redis-benchmark 命令参数!

image-20211115143803371

简单测试

# 测试100个并发连接 100000请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000

基础知识

redis默认有16个数据库,默认使用的是第0个

# 进入配置文件查看
[root@iZbp10ephck4n6u4xij7pcZ bin]# vim myconfig/redis.conf

image-20211115144936068

可以使用select进行切换数据库

# 开启客户端
[root@iZbp10ephck4n6u4xij7pcZ bin]# redis-cli

# 选择第3个
127.0.0.1:6379> select 3
OK
# 查看大小
127.0.0.1:6379[3]> dbsize
(integer) 0
127.0.0.1:6379[3]> 

清空当前数据库:flushdb

127.0.0.1:6379[3]> keys *
1) "name"
127.0.0.1:6379[3]> flushdb
OK
127.0.0.1:6379[3]> keys *
(empty array)

清空所有数据库:flushall

Redis是单线程的

明白Redis是很快的,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程,就使用单线程了!

Redis为什么单线程还那么快

1、误区:高性能的服务器一定是多线程的?

2、误区:多线程(CPU上下文会切换!)一定比单线程效率快

先去CPU>内存>硬盘的速度要有所了解!

核心:redis 是将所有的数据全部放在内存中的,所以说单线程去操作效率就是最高的,多线程(CPU上下文切换:耗时操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的。

Redis五大数据类型

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings)散列(hashes)列表(lists)集合(sets)有序集合(sorted sets) 与范围查询, bitmapshyperloglogs地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication)LUA脚本(Lua scripting)LRU驱动事件(LRU eviction)事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

Redis-Key

127.0.0.1:6379[3]> set name li
OK

# 判断 键 name 是否存在
127.0.0.1:6379[3]> exists name
(integer) 1

# 在当前数据库 移除 键 name 
127.0.0.1:6379[3]> move name 1
(integer) 1
127.0.0.1:6379[3]> exists name
(integer) 0
127.0.0.1:6379[3]> set name 456
OK
127.0.0.1:6379[3]> keys *
1) "name"
127.0.0.1:6379[3]> get name
"456"

# 设置 键name 10s 后失效
127.0.0.1:6379[3]> expire name 10
(integer) 1

# 查看 键name 多久失效
127.0.0.1:6379[3]> ttl name
(integer) 1
127.0.0.1:6379[3]> ttl name
(integer) -2
127.0.0.1:6379[3]> get name
(nil)

127.0.0.1:6379[3]> set name 2
OK
# 查看键种类
127.0.0.1:6379[3]> type name
string

更多命令可以去官网查看

String(字符串)

127.0.0.1:6379[3]> set name 2
OK
127.0.0.1:6379[3]> type name
string 

# 字符串增加字段, 如果当前key不存在,就相当于set
127.0.0.1:6379[3]> append name hello
(integer) 6
127.0.0.1:6379[3]> get name
"2hello"

# 获取字符串长度
127.0.0.1:6379[3]> strlen name
(integer) 6
#######################################################################
步长

127.0.0.1:6379[3]> set views 0
OK

# 加1
127.0.0.1:6379[3]> incr views
(integer) 1

# 减1
127.0.0.1:6379[3]> decr views
(integer) 0

# 增加指定数
127.0.0.1:6379[3]> incrby views 10
(integer) 10
127.0.0.1:6379[3]> get views
"10"

# 减少指定数
127.0.0.1:6379[3]> decrby views 10
(integer) 0
127.0.0.1:6379[3]> get views
"0"
127.0.0.1:6379[3]> 
###############################################################################
字符串范围 range

127.0.0.1:6379> set name hello,dijia
OK
127.0.0.1:6379> get name
"hello,dijia"

# 截取字符串0~2
127.0.0.1:6379> getrange name 0 2
"hel"
# 截取全部字符串
127.0.0.1:6379> getrange name 0 -1
"hello,dijia"

# 替换
# 从2的位置出替换 
127.0.0.1:6379> setrange name 2 xxx
(integer) 11
127.0.0.1:6379> get name
"hexxx,dijia"
#######################################################

# setex(set with sexpire) 设置过期时间
# setnx(set if not exist) 不存在再设置(在分布式锁中会常常使用)

# 设置 name2 30秒后过期
127.0.0.1:6379> setex name2 30 "i wil go"
OK
# 查看还有多少秒
127.0.0.1:6379> ttl name2
(integer) 15
127.0.0.1:6379> ttl name2
(integer) 14
127.0.0.1:6379> ttl name2
(integer) 12

# 如果不存在,创建name3 ,返回1
127.0.0.1:6379> setnx name3 "hi"
(integer) 1
# 如果存在,返回0
127.0.0.1:6379> setnx name3 "ok"
(integer) 0
127.0.0.1:6379> get name3
"hi"
#########################################################
# 批量设置
mset
mget
msetnx # 原子性操作,要么一起成功,要么一起失败

127.0.0.1:6379> mset k1 v1 k2 v2
OK
127.0.0.1:6379> keys *
1) "mylist"
2) "name"
3) "counter:__rand_int__"
4) "key:__rand_int__"
5) "myhash"
6) "k2"
7) "name3"
8) "k1"
127.0.0.1:6379> mget k1 k2
1) "v1"
2) "v2"

# 如果不存在,设置 k1 为v8,k3 为v3
127.0.0.1:6379> msetnx k1 v8 k3 v3
(integer) 0
127.0.0.1:6379> keys *
1) "mylist"
2) "name"
3) "counter:__rand_int__"
4) "key:__rand_int__"
5) "myhash"
6) "k2"
7) "name3"
8) "k1"
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379> 
###################################################################
#对象
# 设置一个user对象,id 为1 ,name为zhangsan,age为30,值为json字符来保存对象!
set user:1 {name:zhangsan,age:30}

# 这里的key是一个巧妙的设计: user:{id}:{filed} 如此设计在Redis中完全是可以的
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 20
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "20"
#######################################################
getset # 先get再set

# 如果不存在值,则返回nil
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> get db
"redis"

# 如果存在值,获取原来的值,修改值
127.0.0.1:6379> getset db mogodb
"redis"
127.0.0.1:6379> get db
"mogodb"

String 类似的使用场景,value除了是我们的字符串还可以是我们的数字

  • 计数器
  • 统计多单位的数量

List

在redis里面,我们可以把list完成栈,队列,阻塞队列!

所有的list命令都是

# lpush 往链表头部(左边)加元素
# rpush 往链表尾部(右边)加元素

127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lget list
(error) ERR unknown command `lget`, with args beginning with: `list`, 
127.0.0.1:6379> lrange list 0-1
(error) ERR wrong number of arguments for 'lrange' command
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lrange list 0 1
1) "two"
2) "one"
#####################################################################
# LPOP 从左边弹出
# RPOP 从右边弹出

127.0.0.1:6379> rpop list
"111"
127.0.0.1:6379> get list
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> rpop list
"one"
#######################################
Lindex 通过下标获取list中值

127.0.0.1:6379> lindex list 0
"two"
127.0.0.1:6379> lindex list 1
(nil)
############################################
llen 返回列表长度

127.0.0.1:6379> llen list
(integer) 1
###########################################
#删除指定的值
lrem 

127.0.0.1:6379> lpush list 2
(integer) 2
127.0.0.1:6379> 
127.0.0.1:6379> lpush list 3
(integer) 3
127.0.0.1:6379> lpush list 3
(integer) 4
127.0.0.1:6379> lrange list 0 -1 
1) "3"
2) "3"
3) "2"
4) "two"
127.0.0.1:6379> lrem list 1 2    # 删除list中 1个key为2的
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "two"
127.0.0.1:6379> lrem list 2 3
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "two"
################################################
# trim 修剪

127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "2"
3) "1"
4) "1"
5) "1"
6) "two"
127.0.0.1:6379> ltrim list 1 2  # 截取保留 list 1~2索引处的链表
OK
127.0.0.1:6379> lrange list 0 -1
1) "2"
2) "1"
#####################################################
# rpoplpush 移除列表最后一个元素到新表中

127.0.0.1:6379> lrange list 0 -1
1) "2"
2) "1"
127.0.0.1:6379> rpoplpush list list2
"1"
127.0.0.1:6379> lrange list 0 -1
1) "2"
127.0.0.1:6379> lrange list2 0 -1
1) "1"

##########################################################
# lset 将列表中指定下标的值替换为另外一个值,更新操作
127.0.0.1:6379> exists list
(integer) 1
127.0.0.1:6379> lset list 0 item  #如果存在,可以更新
OK
127.0.0.1:6379> lrange list
(error) ERR wrong number of arguments for 'lrange' command
127.0.0.1:6379> lrange list 0 -1
1) "item"
127.0.0.1:6379> lset list 1 15  #如果不存在,报错
(error) ERR index out of range
#############################################################
# linsert 在某个元素之前或者之后插入元素
127.0.0.1:6379> lrange list 0 -1
1) "item"
127.0.0.1:6379> lpush list 45
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "item"
127.0.0.1:6379> linsert list before item 123 # 在list中 item元素之前插入123
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "123"
3) "item"
127.0.0.1:6379> linsert list after item 124  # 在list中 item元素之后插入124
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "45"
2) "123"
3) "item"
4) "124"

小结

  • 它实际上是一个链表,before Node after ,left ,right 都可以插入值
  • 如果key不存在,创建新的链表
  • 如果key存在,新增内容
  • 如果移除了所有值,空链表,也代表不存在
  • 在两边插入改动值,效率最高!中间元素,相对来说效率会低一点

消息排队!消息队列(Lpush Rpop)

Set

# sadd 增加
# smembers 查看set中所有值
# sismember 判断set是否有某个值
127.0.0.1:6379> sadd myset 1
(integer) 1
127.0.0.1:6379> smembers myset
1) "1"
127.0.0.1:6379> smembers myse
(empty array)
127.0.0.1:6379> key *
(error) ERR unknown command `key`, with args beginning with: `*`, 
127.0.0.1:6379> keys *
 1) "myset"
 2) "list"
 3) "counter:__rand_int__"
 4) "name"
 5) "myhash"
 6) "name3"
 7) "k2"
 8) "user:1:age"
 9) "list2"
10) "lsit"
11) "user:1:name"
12) "mylist"
13) "key:__rand_int__"
14) "db"
15) "k1"
127.0.0.1:6379> sismember myset hello
(integer) 0
127.0.0.1:6379> sismember myset 1
(integer) 1
###################################################################
# srem 移除set中指定元素

127.0.0.1:6379> sadd myset 13465
(integer) 1
127.0.0.1:6379> sadd myset 21354
(integer) 1
127.0.0.1:6379> srem myset 1
(integer) 1
127.0.0.1:6379> smermber myset
(error) ERR unknown command `smermber`, with args beginning with: `myset`, 
127.0.0.1:6379> smermbers myset
(error) ERR unknown command `smermbers`, with args beginning with: `myset`, 
127.0.0.1:6379> smembers myset
1) "13465"
2) "21354"
######################################################
#spop  随机移除元素

127.0.0.1:6379> smembers myset
1) "13465"
2) "21354"
127.0.0.1:6379> spop myset
"21354"
127.0.0.1:6379> smembers myset
1) "13465"
##################################################
# smove 将一个集合中指定元素移动到另外一个集合中
127.0.0.1:6379> smembers myset
1) "2"
2) "13465"
127.0.0.1:6379> smove myset myset2 13465
(integer) 1
127.0.0.1:6379> smember myset2
(error) ERR unknown command `smember`, with args beginning with: `myset2`, 
127.0.0.1:6379> smembers myset2
1) "13465"

#############################################################
数字集合类
- 差集 sdiff
- 交集 sinter
- 并集 sunion

Hash

Map 集合,key-Map集合

# hset 在hash中删除元素
# hget 获取hash中键对应的map
# hgetall 获取hash中全部数据
# hdel 删除指定的hash
# hlen 获取hash长度
# hexists 判断hash中是否存在某字段
# hkeys 获得所有的 field
# hvals 获取所有的value
# hsetnx 如果不存在,就设置,存在就不设置
127.0.0.1:6379> hset myhash li good
(integer) 1
127.0.0.1:6379> hget myhash li
"good"
127.0.0.1:6379> hget hash
(error) ERR wrong number of arguments for 'hget' command
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"
5) "li"
6) "good"
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"
5) "li"
6) "good"
127.0.0.1:6379> hdel myhash li
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "element:__rand_int__"
2) "VXK"
3) "field"
4) "li"

hash适合对象存储,String 适合字符串存储

Zset(有序集合)

在set的基础之上,增加一个值,set k1 v1 ,zset k1 score1 v1

127.0.0.1:6379> zadd myzset 1 one
(integer) 1
127.0.0.1:6379> zadd myzset 2 two
(integer) 1
127.0.0.1:6379> zrange myzset 0 -1
1) "one"
2) "two"

#################################################
排序如何实现

127.0.0.1:6379> zadd salary 2500 xiaohong # 添加三个用户
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 500 xiaoli
(integer) 1
127.0.0.1:6379> zrangebyscore salary -inf +inf # 显示全部用户从小到大
1) "xiaoli"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores # 显示全部用户显示工资
1) "xiaoli"
2) "500"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"
127.0.0.1:6379> zrangebyscore salary -inf 2500 withscores # 显示工资小于2500员工的升序排序
1) "xiaoli"
2) "500"
3) "xiaohong"
4) "2500"
127.0.0.1:6379> zrevrange salary 0 -1 # 从大到小排序
1) "zhangsan"
2) "xiaohong"
3) "xiaoli"
127.0.0.1:6379> zrevrange salary 0 -1 withscores
1) "zhangsan"
2) "5000"
3) "xiaohong"
4) "2500"
5) "xiaoli"
6) "500"
#####################################################
# rem 移除有序集合的元素

127.0.0.1:6379> zrange salary 0 -1
1) "xiaoli"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary xiaoli
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "zhangsan"


127.0.0.1:6379> zcard salary # 获取有序集合的数量
(integer) 3

127.0.0.1:6379> zcount salary 20 5000 # value值在 20 到 5000之间的个数
(integer) 2

案例思路:存储班级成绩表,工资表排序

普通消息: 1. 重要消息 2. 带权重进行判断

排行榜应用

三种特殊数据类型

geospatial(地理位置)

朋友的定位,附近的人,打车距离

Hyperloglog

什么是奇数?

A{1,3,5,7,8,7}

B{1,3,5,7,8}

基数(不重复的元素)= 5,可以接受误差!

简介

redis Hyperloglog 基数统计的算法

优点占用的内存是固定的,2^64不同的元素的技术,只需要12KB内存,从内存角度来看,首选Hyperloglog首选

测试

127.0.0.1:6379> pfadd mykey a b c c  # 增加元素
(integer) 1
127.0.0.1:6379> pfcount mykey        # 获得基数数量
(integer) 3
127.0.0.1:6379> pfadd mykey2 1 2 3   # 增加元素
(integer) 1
127.0.0.1:6379> pfcount mykey2       # 获得基数数量
(integer) 3
127.0.0.1:6379> pfmerge mykey3 mykey mykey2 # 合并两组 mykey mykey2 -> mykey3并集
OK
127.0.0.1:6379> pfcount mykey3 # 查看并集数量
(integer) 6

如果允许容错,可以使用Hyperloglog!

Bitmaps

位存储

统计用户信息,活跃,不活跃!登录,未登录!打卡等两个状态的,都可以使用Bitmaps

Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两种状态

365 天 = 365 bit 1字节 = 8bit

127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0

# 统计打卡记录
127.0.0.1:6379> bitcount sign
(integer) 1

事务

事务: 要么同时成功,要么同时失败,原子性!

Redis单条命令是保存原子性的,但是事务不保证原子性!

Redis事务本质:一组命令的集合一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行!一次性,排它性!执行一些列的命令

Redis事务没有隔离级别的概念!

所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!

Redis的事务:

  • 开启事务(multi)
  • 命令入队()
  • 执行事务(exec)

正常开启事务

# 开启事务
127.0.0.1:6379> multi
OK

# 命令入队
127.0.0.1:6379(TX)> set name li
QUEUED
127.0.0.1:6379(TX)> set name wang
QUEUED
127.0.0.1:6379(TX)> get name
QUEUED


# 执行
127.0.0.1:6379(TX)> exec
1) OK
2) OK
3) "wang"

放弃事务

127.0.0.1:6379> multi  # 开启事务
OK
127.0.0.1:6379(TX)> set k1 v1 # 命令入队
QUEUED
127.0.0.1:6379(TX)> set k2 v2 #
QUEUED
127.0.0.1:6379(TX)> discard # 取消事务,事务队列中所有事务都不会被执行
OK
127.0.0.1:6379> get k1
(nil)

编译型异常(代码有问题),事务中所有事务都不会被执行

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1 
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> getser k3 v3  # 错误命令
(error) ERR unknown command `getser`, with args beginning with: `k3`, `v3`, 
127.0.0.1:6379(TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k1 # 所有命令都没有被执行
(nil)

运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的错误的命令会抛出异常

127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> incr k1  # 会在执行时报错,但不会影响其他命令
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) (error) ERR value is not an integer or out of range
2) OK
127.0.0.1:6379> get k2
"v2"

监控! Watch

悲观锁:

  • 很悲观,什么时候都会出问题,无论做什么都会加锁

乐观锁:

  • 认为什么时候都不会出问题,所以不会上锁,更新数据的时候去判断一下,在此期间是否有人修改过这个数据
  • 获取version
  • 更新的时候比较version

正常执行

127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money # 监视money对象
OK
127.0.0.1:6379> multi  # 事务正常结束,数据期间没有发生变动,这个时候就正常执行成功
OK
127.0.0.1:6379(TX)> decrby money 20
QUEUED
127.0.0.1:6379(TX)> incrby out 20
QUEUED
127.0.0.1:6379(TX)> exec
1) (integer) 80
2) (integer) 20

Jedis

Jedis是Redis官方推荐的java连接开发工具!使用java操作Redis中间件!

测试

1.导入相关依赖

 <dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
     <version>3.7.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

2.编写测试类

public class JedisTest {

    public static void main(String[] args) {
        //new 一个jedis对象连接redis库
        Jedis jedis = new Jedis("127.0.0.1",6379);
        System.out.println(jedis.ping());

    }
}

3.打开redis-server测试连接

image-20211116175844188

image-20211116175853004

测试事务

测试类

public class AffairTest {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1",6379);

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name","li");
        jsonObject.put("password","123");

        jedis.flushAll();

        Transaction multi = jedis.multi();
        String s = jsonObject.toJSONString();

        //开启事务
        try{
            multi.set("user1",s);
            int a =  1/0;           //发生异常
            multi.set("user2",s);
            multi.exec();
        }catch (Exception e){
            System.out.println("发生异常");
            multi.discard();
        }finally {
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            jedis.close();  //关闭连接
        }
    }
}

测试结果

image-20211116194542655

SpringBoot + Redis

说明:在springboot2.x之后,原来使用的jedis被替换成了lettuce

jedis:采用直连,多个线程操作的话,是不安全的,如果想要避免不安全,使用jedis pool连接池,更像BIO模式

lettuce:采用netty,实例可以再多个线程进行共享,不存在线程不安全的情况,可以减少线程数据了,更像NIO模式

原码分析:

@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
    public RedisAutoConfiguration() {
    }

    @Bean
    @ConditionalOnMissingBean(
        name = {"redisTemplate"}
    )
    //我们可以自己定义一个redisTemplate来替换这个默认的!
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { /
        //默认的redisTemplate没有过多的设置,redis对象都是需要序列化的
        //两个泛型都是object类型,我们后使用需要强制转换<String,Object>
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean//String是redis中最常使用的类型,所以说单独提出来一个bean!
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

整合测试

1.导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.在application.yml中配置redis

spring:
  application:
    name: dev

  redis:
    host: 127.0.0.1
    port: 6379

3.测试

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void contextLoads() {
        redisTemplate.opsForValue().set("name","li");
        System.out.println(redisTemplate.opsForValue().get("name"));
    }

}

image-20211116205035934

image-20211116205303099

重写配置类

@Configuration
public class RedisConfig {

    //编写自己的模板
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);

        //Json的序列化配置
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //key采用String的序列化
        template.setStringSerializer(stringRedisSerializer);
        //hash的key也采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }
}

此时再从终端测试一下

image-20211116214842356

企业级开发:编写Redis工具类

@Component
public final class RedisUtil{

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================
    /**
     * 指定缓存失效时间
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(String.valueOf(CollectionUtils.arrayToList(key)));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     * @param key 键
     * @return
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *  @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0)
                expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

Redis.conf详解

Redis启动的时候,就通过配置文件来启动

单位

image-20211118133639772

1.配置文件unit单位对大小写不敏感

包含

image-20211118133826857

如同Spring,Improt,includ

网络

bind 127.0.0.1 -::1 # 绑定ip
protected-mode yes # 保护模式
port 6379 # 端口设置

通用GENERAL

daemonize yes # 以守护进程的方式进行,默认no需要修改

pidfile /var/run/redis_6379.pid # 如果以后台的方式进行,我们就需要指定一个pid文件

# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生产环境
# warning (only very important / critical messages are logged)
loglevel notice
logfile "" # 日志的文件位置名
databases 16 # 数据库的数量,默认是16个
always-show-logo no # 是否总是显示LOGO

快照

持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb .aof

redis是内存数据库,如果没有持久化,那么数据断电即失

# 如果3600s内,如果至少有一个1 key进行了修改,我们及进行持久化操作
save 3600 1
# 如果300s内,如果至少有一个100 key进行了修改,我们及进行持久化操作
save 300 100
# 如果60s内,如果至少有一个10000 key进行了修改,我们及进行持久化操作
save 60 10000
#若想要持久化,会自己定义这个测试!

stop-writes-on-bgsave-error yes # 持久化如果出错,是否还需要继续工作

rdbcompression yes # 是否压缩rdb文件,需要消耗cpu资源

rdbchecksum yes # 保存rdb文件的时候,进行错误的检查校验

dir ./ # rdb 文件保存的目录

REPLICATTION 复制,主从复制

image-20211120201526911

SECURITY 安全

设置密码

127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "redis"
OK
127.0.0.1:6379> config get requirepass # 设置密码
1) "requirepass"
2) "redis"
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI
127.0.0.1:6379> exet
(error) ERR unknown command `exet`, with args beginning with: 
127.0.0.1:6379> exit
[root@iZbp10ephck4n6u4xij7pcZ bin]# redis-cli
127.0.0.1:6379> 
127.0.0.1:6379> config get requirepass
(error) NOAUTH Authentication required.  # 显示需要认证
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth redis   # 输入密码认证
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "redis"

CLIENTS 限制

maxclients 10000 # 设置能连接上redis的最大客户端的数量


maxmemory <bytes> # redis 配置最大的内存容量


maxmemory-policy noeviction #内存达到上限之后的处理策略

1、volatile-lru:只对设置了过期时间的key进行LRU(默认值) 

2、allkeys-lru : 删除lru算法的key   

3、volatile-random:随机删除即将过期key   

4、allkeys-random:随机删除   

5、volatile-ttl : 删除即将过期的   

6、noeviction : 永不过期,返回错误

APPEND ONLY模式 aof配置

appendonly no #默认不开启aof模式,默认是使用rdb方式持久化的,在大部分所有的情况下,rdb完全够用
appendfilename "appendonly.aof" # 持久化的文件名称

# appendfsync always # 每次修改都会sync,消耗性能
appendfsync everysec # 每秒执行一次sync,可能会丢失这ls的数据
# appendfsync no # sync,这个时候操作系统自己同步数据,速度最快!

具体配置,在Redis持久化中

Redis持久化

面试和工作,持久化都是重点!

Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!

RDB(Redis DataBase)

什么是RDB

img

如果需要进行大规模数据恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式比AOF方式更加高效,RDB的缺点是最后一次持久化的数据可能丢失

我们默认的就是RDB,一般情况下不需要修改这个配置

rdb保存的文件是dump.rdb

image-20211118170837808

image-20211118170423632

触发机制

1.save的规则满足的情况下,会自动触发rdb规则

2.执行flushall命令,也会触发rdb规则

3.关闭redis,也会生成一个rdb文件

如何恢复rdb文件

1.只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据

2.查看rdb文件需要存储的位置

127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin #如果这个目录下存在dump.rdb文件,启动就会自动恢复其中的数据

优点:

  • 适合大规模的数据恢复!
  • 数据的完整性不高!

缺点:

  1. 需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改数据就没有了
  2. fork进程的时候,会占用一定的内容空间

AOF(Append Only File)

AOF是什么

将所有命令都记录下来,恢复的时候就把这个文件全部都再执行一遍!

image-20211118172317690

AOF保存的是aof文件

image-20211118172538392

开启AOF

默认不开启的,我们需要手动进行配置

将appendonly no 改成 appendonly yes ,重启redis就可以了

修复出错的aof文件

如果这个aof文件有错位,redis是启动不起来的,我们需要修复这个aof文件

redis给我们提供了一个工具 redis-check-aof

 redis-check-aof --fix appendonly.aof

优点和缺点

优点:

  1. 每一次修改都同步,文件的完整会更加好
  2. 每秒同步一次,可能会丢失一秒的数据
  3. 从不同步,效率最高

缺点:

  1. 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢
  2. aof运行效率比rdb慢,所以redis默认的是rdb

Redis发布订阅

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。微信,微博,关注系统!

Redis客户端可以订阅任意数量的频道。

订阅/发布消息图:

img

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

img

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

img

命令

image-20211120192016985

使用场景

  1. 实时消息系统!

  2. 事实聊天(频道当做聊天室,将信息回显给所有人即可!)

  3. 订阅,关注等

    稍微复杂的场景会使用信息中间件MQ等来实现

Redis主从复制

简介

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写入为主,Slave以读为主(读写分离)。

主从复制的主要作用:

  1. 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  2. 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
  3. 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务,分担服务器负载;尤其是在写少读多的场景下,通过多个节点分担负载,可以大大提高Redis服务器的并发量。
  4. 高可用基石:主从复制还是哨兵和集群能够实现的基础,因此说主从复制时Redis高可用的基础。

环境配置

只配置从库,不用配置主库

127.0.0.1:6379> info replication  # 查询当前库的信息
# Replication
role:master         #角色 master
connected_slaves:0  # 没有从机
master_failover_state:no-failover
master_replid:3af9c262dbd2251f336e749c179e0e6f107c7875
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

复制3个redis配置文件,然后修改对应信息

  1. 端口
  2. pid名字
  3. log文件名字
  4. dum文件名称

修改完毕之后,启动3个redis服务

image-20211120195618352

一主二从

认老大! 一主(6380)二从(6381,6382)

[root@iZbp10ephck4n6u4xij7pcZ myconfig]# redis-cli -h 127.0.0.1 -p 6381

# slaveo host port  认老大
127.0.0.1:6381> slaveof 127.0.0.1 6380 
OK
127.0.0.1:6381> info replication # 查看信息
# Replication
role:slave                       # slave 从机
master_host:127.0.0.1
master_port:6380
master_link_status:up
master_last_io_seconds_ago:1
master_sync_in_progress:0
slave_read_repl_offset:14
slave_repl_offset:14
slave_priority:100
slave_read_only:1
replica_announced:1
connected_slaves:0
master_failover_state:no-failover
master_replid:6e9c6f45e78828ddc39e32d289e553ea0fc25c07
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14

# 在主机中查看

127.0.0.1:6380> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=196,lag=1 # 发现从机信息
master_failover_state:no-failover
master_replid:6e9c6f45e78828ddc39e32d289e553ea0fc25c07
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:196
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:196

真实的从主配置应该在配置文件中配置,这样的话是永久的,我们这里使用的是命令,暂时的

细节

  1. 主机可以写,从机只能读
  2. 主机断开连接,从机依旧能连接到主机,但是没有写操作,主机如果回来了,从机依旧可以直接获取到主机写的
  3. 如果是使用命令行来配置的主从,这个时候如果重启了,从机就会变回主机,只要变为从机,立马就可以从主机中获取值

复制原理

slave启动成功连接daomaster后会发送sync同步命令

master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送这个数据文件到slave,并完成一次完全同步

全量复制:slave服务在收到数据库文件数据之后,将其存盘并加载到内存中

增量复制:master 陆续将新的所有收集到的修改命令依此传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行,我们的数据一定可以在从机中看到!

主机断开连接

如果主机断开连接,我们可以使用slaveof no one让自己变成主机,手动让其他结点连接到这个主节点,如果主机重新连接,手动连接。

哨兵模式(自动选举老大)

单哨兵模式

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行,其原理就是哨兵通过发送命令,等到Redis服务器响应,从而监控运行的多个Redis实例。

单哨兵模式

多哨兵模式

单个哨兵进程对Redis服务器进行监控时可能会出现一些问题(比如说哨兵挂掉),为此我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。由于一个哨兵就需要一个进程,哨兵集群至少要三个哨兵才能保证健壮性,因此要配置多哨兵,起步就是6个进程。

多哨兵模式

多哨兵模式中,如果主机宕机了会怎么样?

假设主服务器宕机,哨兵1先检测到这个结果,但是系统并不会马上进行failover(故障转移)过程,因为仅仅是哨兵1主观认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器连接不上了,并且数量达到一定值时(3个哨兵中两个哨兵检测主机不能使用了,那么就认为这个主机挂掉了),那么哨兵之间就会对从机中进行一次选举主机的投票,投票的结果由其中任意一个哨兵(随机)发起,然后进行failover(故障转移)操作,选举成功后就将该从机切换成主机,之后就会通过发布订阅模式(所有的服务都被哨兵监控着),让各个哨兵把自己监控的从服务器实现切换主机,这个过程成为客观下线

测试

1、配置哨兵模式sentinel.conf(最基本配置)

# sentinel monitor 被监控的名称 主机地址 端口 1
sentinel monitor myredis 127.0.0.1 6379 1

这里的 ’1‘ 代表的是如果有1个sentinel(哨兵)判断某个master(主机)宕机,那么该主机宕机下线(也就是至少多少个sentinel同意,master才下线)

2、开启哨兵

5368:X 20 Jan 2021 20:48:18.240 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
5368:X 20 Jan 2021 20:48:18.240 # Redis version=6.0.9, bits=64, commit=00000000, modified=0, pid=5368, just started
5368:X 20 Jan 2021 20:48:18.240 # Configuration loaded
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 6.0.9 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in sentinel mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 26379
 |    `-._   `._    /     _.-'    |     PID: 5368
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               
5368:X 20 Jan 2021 20:48:18.241 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
5368:X 20 Jan 2021 20:48:18.247 # Sentinel ID is f9bce173dd83362821eb2739edc6ea2167f85edd
5368:X 20 Jan 2021 20:48:18.247 # +monitor master myredis 127.0.0.1 6379 quorum 1
5368:X 20 Jan 2021 20:48:18.248 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379

3、当主机宕机了,哨兵做出什么动作

sentinel日志

127.0.0.1:6381> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=6230,lag=0
master_replid:99ac637afc72fb3e748e844e0b4a9553cce745c2
master_replid2:5f25a8d52692fcda674137883c0f5912e847645c
master_repl_offset:6230
second_repl_offset:3922
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:6230

4、如果此时宕机的机器重新连接回来,那么他只能归并到当前的master当作从机(大人,时代变了!)

+convert-to-slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381

哨兵优缺点

优点

  • 哨兵集群,基于主从复制模式,所有的主从配置的优点,它都有。
  • 主从可以切换,故障可以转移,系统的可用性就会更好。
  • 哨兵模式就是主从模式的升级版,从收到到自动,更加健壮。

缺点

  • Redis不好在线扩容,集群容量一旦达到上限,在线扩容就会十分麻烦。
  • 实现哨兵模式的配置比较麻烦,并且其中有很多选项

哨兵模式配置文件中的全部配置

# Example   sentinel.conf
# 哨兵sentinel实例运行的端口   默认是26379,如果有哨兵集群,我们还需要配置每个哨兵端口
port 26379
#哨兵sentinel的工作目录
dir /tmp
#哨兵 sentine1 监控的redis主节点的 ip port   
# master-name  ,可以自己命名的主节点名字 只能由字母A-Z、数字0-9、这三个字符"  .   -  _ "组成。
# quorum配置多少个sentine1哨兵统- -认为master主节点失联那么这时客观上认为主节点失联了
# sentine1 monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster   127.0.0.1   6379   2
#当在Redis实例中开启了requirepass foobared 授权密码这样所有连接kedis实例的客户端都要提供密码
#设置哨兵sentinel连接主从的密码注意必须为主从设置- - 样的验证密码
# sentine1 auth-pass <master-name> <password>
sentine1 auth-pass mymaster MySUPER--secret-0123passwOrd
#指定多少毫秒之后主节点没有应答哨兵sentine1 此时哨兵主观上认为主节点下线默认30秒
# sentinel down-after-mi 11i seconds <master-name> <mi 11iseconds>
sentine1 down-after-mi 11iseconds mymaster 30000
#这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步,这个数字越小,完成fai lover所需的时间就越长,但是如果这个数字越大,就意味着越多的slave因为replication而 不可用。可以通过将这个值设为1来保证每次只有一个slave处于不能处理命令请求的状态。
# sentine1 paralle1-syncs <master-name> <numslaves>
sentine1 paralle1-syncs mymaster 1
#故障转移的超时时间failover-timeout 可以用在以下这些方面:
#1.同一个sentine1对同一 个master两次fai lover之间的间隔时间。
#2.当一个slave从一 个错误的master那里同步数据开始计算时间。直到s1ave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有s1aves指向新的master所需的最大时间。不过,即使过了这个超时,slaves 依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
#默认三分钟
# sentine1 failover-timeout <master-name> <milliseconds>
sentine1 fai lover-ti meout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被-一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentine1有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等 方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一 个是事件的类型,一个是事件的描述。如果sentine1. conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentine1无法正常启动成功。
#通知脚本
# she11编程
# sentine1 notification-script <master-name> <script-path>
sentine1 notificati on-script mymaster /var/redis/notify. sh
#客户端重新配置主节点参数脚本
#当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
#以下参数将会在调用脚本时传给脚本: 
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
#目前<state>总是“failover",
# <role>是“Teader"或者"observer"中的-一个。
#参数from-ip, from-port, to-ip,to-port是用来和旧的master和新的master(即旧的s lave)通信的
#这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentine1 client-reconfig-script <master-name> <script-path>
sentine1 client-reconfig-script mymaster /var/redis/reconfig.sh #一般都是由运维来配置!

浅谈Redis缓存穿透、击穿、雪崩

Redis缓存使用流程

客户端向服务器发送读请求,此时后台会先去缓存中查数据,如果数据命中,那么返回结果,反之去数据库中查询,如果数据库中查到数据,那么返回数据,并且写入缓存,如果没有查到该数据即返回空结果

img

缓存穿透(数据未命中)

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现Redis缓存中没有,也就是缓存没有命中,于是就向数据库查询,然后发现也没有,于是本次查询失败。当用户很多的时候(秒杀场景),如果在缓存都没有命中,于是都去请求了数据库DB,一瞬间就给数据库造成巨大的压力,这种情况就是常说的缓存穿透。

解决方案(简单介绍)

1、布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。

布隆过滤器

2、缓存空对象

当存储层不命中时,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,从而保护了后端数据源。

缓存空对象

但是这种方式存在两个问题:

  • 如果空值能够被缓存起来,这就意味着需要更多的空间存储更多的键,因为其中可能会有很多空值的键。
  • 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口不一致的问题,这对于需要保持一致的业务会有影响。

缓存击穿(热点key失效,大量数据集中于一点)

概述

这里和缓存穿透不同,缓存穿透是查询不到数据,所以都去查询数据库,给数据库造成很大压力;而缓存击穿是一个key十分热点(比如微博某明星热搜),缓存在不停地抗着高并发,始终对一个点访问,当这个缓存数据失效后,大量的高并发请求穿破缓存,在很短时间去请求数据库(就好比一梭子子弹打容器上),就像在屏幕中凿了个洞。

当这个热点key失效过期后,大量并发请求前往数据库查询最新数据,查询完后并写入缓存,会使数据库压力过大。

解决方案

1、设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。

2、加互斥锁
分布式锁:使用分布式锁,保证对每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

加互斥锁

缓存雪崩(大量的key几乎同时过期)

概述

缓存雪崩是因为大面积的缓存在一个时间点集中失效,打崩了数据库。

举个简单的例子:如果所有首页的Key失效时间都是12小时,中午12点刷新的,在零点的时候有个秒杀活动会有大量用户涌入,假设当时每秒 6000 个请求,本来缓存在可以扛住每秒 5000 个请求,但是这个时候缓存所有的Key都失效了。此时 1 秒 6000 个请求全部落数据库,数据库必然扛不住,它会报一下警,真实情况可能DBA(管理员)都没反应过来就直接挂了。此时,如果没用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。这就是缓存雪崩。
缓存失效,引起雪崩

解决方案

1、Redis高可用
这个思想的含义就是:既然Redis有可能会挂掉,那我多增设几台Redis,这样一台挂掉了之后其它的还可以继续工作,高可用其实就是搭建的Redis集群。将热点数据均匀分布在不同的Redis库中。

2、限流降级
这个解决方案的思想就是:在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其它线程等待。

3、数据预热
数据预热的含义就是在正式部署之前,先把可能高访问的数据预先访问一遍,这样可能大部分的数据就能加载到缓存中。在即将发生大并发访问前手动触发加载缓存中不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀一点。

4、在批量往Redis存数据的时候,把每个key的失效时间都加个随机值,这样保证数据不会在同一时间大面积失效。
要查询一个数据,发现Redis缓存中没有,也就是缓存没有命中,于是就向数据库查询,然后发现也没有,于是本次查询失败。当用户很多的时候(秒杀场景),如果在缓存都没有命中,于是都去请求了数据库DB,一瞬间就给数据库造成巨大的压力,这种情况就是常说的缓存穿透。

解决方案(简单介绍)

1、布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。

布隆过滤器

2、缓存空对象

当存储层不命中时,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,从而保护了后端数据源。

缓存空对象

但是这种方式存在两个问题:

  • 如果空值能够被缓存起来,这就意味着需要更多的空间存储更多的键,因为其中可能会有很多空值的键。
  • 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口不一致的问题,这对于需要保持一致的业务会有影响。

缓存击穿(热点key失效,大量数据集中于一点)

概述

这里和缓存穿透不同,缓存穿透是查询不到数据,所以都去查询数据库,给数据库造成很大压力;而缓存击穿是一个key十分热点(比如微博某明星热搜),缓存在不停地抗着高并发,始终对一个点访问,当这个缓存数据失效后,大量的高并发请求穿破缓存,在很短时间去请求数据库(就好比一梭子子弹打容器上),就像在屏幕中凿了个洞。

当这个热点key失效过期后,大量并发请求前往数据库查询最新数据,查询完后并写入缓存,会使数据库压力过大。

解决方案

1、设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。

2、加互斥锁
分布式锁:使用分布式锁,保证对每个key同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

加互斥锁

缓存雪崩(大量的key几乎同时过期)

概述

缓存雪崩是因为大面积的缓存在一个时间点集中失效,打崩了数据库。

举个简单的例子:如果所有首页的Key失效时间都是12小时,中午12点刷新的,在零点的时候有个秒杀活动会有大量用户涌入,假设当时每秒 6000 个请求,本来缓存在可以扛住每秒 5000 个请求,但是这个时候缓存所有的Key都失效了。此时 1 秒 6000 个请求全部落数据库,数据库必然扛不住,它会报一下警,真实情况可能DBA(管理员)都没反应过来就直接挂了。此时,如果没用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。这就是缓存雪崩。
缓存失效,引起雪崩

解决方案

1、Redis高可用
这个思想的含义就是:既然Redis有可能会挂掉,那我多增设几台Redis,这样一台挂掉了之后其它的还可以继续工作,高可用其实就是搭建的Redis集群。将热点数据均匀分布在不同的Redis库中。

2、限流降级
这个解决方案的思想就是:在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其它线程等待。

3、数据预热
数据预热的含义就是在正式部署之前,先把可能高访问的数据预先访问一遍,这样可能大部分的数据就能加载到缓存中。在即将发生大并发访问前手动触发加载缓存中不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀一点。

4、在批量往Redis存数据的时候,把每个key的失效时间都加个随机值,这样保证数据不会在同一时间大面积失效。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-23 12:26:02  更:2021-11-23 12:28:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:58:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码