Redis
简介
Redis:REmote DIctionary Server(远程字典服务器)
是完全开源免费的,用C语言编写的,遵守BSD协议,是一个高性能的(Key/Value)分布式内存数据库,基于内存运行,并支持持久化的NoSQL数据库,是当前最热门的NoSQL数据库之一,也被人们称为数据结构服务器
Redis与其他key-value缓存产品有以下三个特点
- Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。
- Redis不仅仅支持简单的 key-value 类型的数据,同时还提供list、set、zset、hash等数据结构的存储。
- Redis支持数据的备份,即master-slave模式的数据备份。
Redis能干嘛
内存存储和持久化:redis支持异步将内存中的数据写到硬盘上,同时不影响继续服务取最新N个数据的操作,如:可以将最新的10条评论的ID放在Redis的List集合里面 发布、订阅消息系统 地图信息分析 定时器、计数器
特性
数据类型、基本操作和配置
持久化和复制,RDB、AOF
事务的控制
基础命令
[root@192 bin] cd /usr/local/bin
root@192 bin] redis-server rconfig/redis.conf
[root@192 bin] redis-cli -p 6379
127.0.0.1:6379> ping
PONG
[root@192 myredis] ps -ef|grep redis
127.0.0.1:6379> shutdown
not connected> exit
Select命令切换数据库
Dbsize查看当前数据库的key的数量
Flushdb:清空当前库 ==================
Flushall:清空全部的库 ==================
五大基本数据类型
Redis键(key)
127.0.0.1:6379> set name qinjiang
OK
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> EXISTS name
(integer) 1
127.0.0.1:6379> move name 1
(integer) 1
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> set name qinjiang
OK
127.0.0.1:6379> EXPIRE name 10
127.0.0.1:6379> ttl name
127.0.0.1:6379> type name
string
String
127.0.0.1:6379> del key1
127.0.0.1:6379> exists key1
127.0.0.1:6379> append key1 "hello"
(integer) 10
127.0.0.1:6379> STRLEN key1
(integer) 10
127.0.0.1:6379> set views 0
127.0.0.1:6379> incr views
(integer) 1
127.0.0.1:6379> decr views
(integer) 1
127.0.0.1:6379> incrby views 10 1
(integer) 11
127.0.0.1:6379> decrby views 10
(integer) 1
127.0.0.1:6379> getrange key2 0 -1
127.0.0.1:6379> getrange key2 0 2
127.0.0.1:6379> SETRANGE key2 1 xx
127.0.0.1:6379> setex key3 60 expire
127.0.0.1:6379> setnx mykey "redis"
127.0.0.1:6379> setnx mykey "mongodb"
127.0.0.1:6379> mset k10 v10 k11 v11ms k12 v12
127.0.0.1:6379> mget k10 k11 k12 k13
127.0.0.1:6379> msetnx k10 v10 k15 v15
set user:1 value(json数据)
mset user:1:name zhangsan user:1:age 2
mget user:1:name user:1:age
127.0.0.1:6379> getset db mongodb
(nil)
127.0.0.1:6379> get db
"mongodb"
127.0.0.1:6379> getset db redis
"mongodb"
127.0.0.1:6379> get db
"redis"
List
所有的 List 命令都是 L 开头的
127.0.0.1:6379> LPUSH list "one"
(integer) 1
127.0.0.1:6379> LPUSH list "two"
(integer) 2
127.0.0.1:6379> RPUSH list "right"
(integer) 3
127.0.0.1:6379> Lrange list 0 -1
1) "two"
2) "one"
3) "right"
127.0.0.1:6379> Lpop list
"two"
127.0.0.1:6379> Rpop list
"right"
127.0.0.1:6379> Lrange list 0 -1 1)
"one"
127.0.0.1:6379> Lindex list 1
(nil)
127.0.0.1:6379> Lindex list 0
"one"
127.0.0.1:6379> Lindex list -1
"one"
127.0.0.1:6379> Llen list
(integer) 3
127.0.0.1:6379> lrem list 1 "two"
(integer) 1
127.0.0.1:6379> ltrim mylist 1 2
127.0.0.1:6379> rpoplpush mylist myotherlist
127.0.0.1:6379> exists list
127.0.0.1:6379> lpush list "value1"
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 "new"
OK
127.0.0.1:6379> lrange list 0 0
1) "new"
redis> LINSERT mylist BEFORE "World" "There"
(integer) 3
Set(集合)
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 0
127.0.0.1:6379> SMEMBERS myset
1) "kuangshen"
2) "hello"
127.0.0.1:6379> SISMEMBER myset "hello"
(integer) 1
127.0.0.1:6379> SISMEMBER myset "world"
(integer) 0
127.0.0.1:6379> scard myset
127.0.0.1:6379> srem myset "hello"
127.0.0.1:6379> SRANDMEMBER myset
127.0.0.1:6379> SRANDMEMBER myset 2
127.0.0.1:6379> spop myset
127.0.0.1:6379> smove myset myset2 "kuangshen"
127.0.0.1:6379> sadd key1 "a"
(integer) 1
127.0.0.1:6379> sadd key1 "b"
(integer) 1
127.0.0.1:6379> sadd key1 "c"
(integer) 1
127.0.0.1:6379> sadd key2 "c"
(integer) 1
127.0.0.1:6379> sadd key2 "d"
(integer) 1
127.0.0.1:6379> sadd key2 "e"
(integer) 1
127.0.0.1:6379> SDIFF key1 key2
1) "a"
2) "b"
127.0.0.1:6379> SINTER key1 key2
1) "c"
127.0.0.1:6379> SUNION key1 key2
Hash(哈希)
key-! 这时的value中的值为map集合!
127.0.0.1:6379> hset myhash field1 "kuangshen"
127.0.0.1:6379> hget myhash field1
127.0.0.1:6379> HMSET myhash field1 "Hello" field2 "World"
127.0.0.1:6379> HDEL myhash field1
127.0.0.1:6379> hgetall myhash
127.0.0.1:6379> hlen myhash
127.0.0.1:6379> hexists myhash field1
127.0.0.1:6379> HKEYS myhash
1) "field2"
2) "field1"
127.0.0.1:6379> HVALS myhash
1) "World"
2) "Hello"
127.0.0.1:6379> hset myhash field 5
(integer) 1
127.0.0.1:6379> HINCRBY myhash field 1
(integer) 6
127.0.0.1:6379> HINCRBY myhash field -1
(integer) 5
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。存储部分变更的数据,如用户信息等。
Zset(有序集合)
127.0.0.1:6379> zadd myset 1 "one"
(integer) 1
127.0.0.1:6379> zadd myset 2 "two" 3 "three"
(integer) 2
127.0.0.1:6379> zadd salary 2500 xiaoming (integer) 1
127.0.0.1:6379> zadd salary 5000 xiaohong (integer) 1
127.0.0.1:6379> zadd salary 500 kuangshen (integer) 1
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores
127.0.0.1:6379> ZREVRANGE salary 0 -1 WITHSCORES
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 WITHSCORES
127.0.0.1:6379> zrem salary kuangshen
127.0.0.1:6379> ZRANGE salary 0 -1
127.0.0.1:6379> zcard salary
127.0.0.1:6379> ZCOUNT myset 1 3
三种特殊数据类型
Geospatial 地理位置
Redis 的 GEO 特性在 Redis 3.2 版本中推出, 这个功能可以将用户给定的地理位置信息储存起来, 并对这些信息进行操作。来实现诸如附近位置、摇一摇这类依赖于地理位置信息的功能。geo的数据类型为zset。
GEO 的数据结构总共有六个常用命令:geoadd、geopos、geodist、georadius、georadiusbymember、gethash
1、geoadd
geoadd key longitude latitude member ...
127.0.0.1:6379> geoadd china:city 116.23 40.22 北京
(integer) 1
127.0.0.1:6379> geoadd china:city 121.48 31.40 上海 113.88 22.55 深圳 120.21 30.20 杭州
(integer) 3
127.0.0.1:6379> geoadd china:city 106.54 29.40 重庆 108.93 34.23 西安 114.02 30.58 武汉
(integer) 3
2、geopos
geopos key member [member...]
127.0.0.1:6379> geopos china:city 北京
1) 1) "116.23000055551528931"
2) "40.2200010338739844"
127.0.0.1:6379> geopos china:city 上海 重庆
1) 1) "121.48000091314315796"
2) "31.40000025319353938"
2) 1) "106.54000014066696167"
2) "29.39999880018641676"
127.0.0.1:6379> geopos china:city 新疆
1) (nil)
3、geodist
geodist key member1 member2 [unit]
127.0.0.1:6379> geodist china:city 北京 上海
"1088785.4302"
127.0.0.1:6379> geodist china:city 北京 上海 km
"1088.7854"
127.0.0.1:6379> geodist china:city 重庆 北京 km
"1491.6716"
4、georadius
georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
127.0.0.1:6379> georadius china:city 100 30 1000 km
重庆
西安
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 1
重庆
635.2850
106.54000014066696167
29.39999880018641676
127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord withdist count 2
重庆
635.2850
106.54000014066696167
29.39999880018641676
西安
963.3171
108.92999857664108276
34.23000121926852302
5、georadiusbymember
georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count]
127.0.0.1:6379> GEORADIUSBYMEMBER china:city 上海 400 km
杭州
上海
6、geohash
geohash key member [member...]
127.0.0.1:6379> geohash china:city 北京 重庆
wx4sucu47r0
wm5z22h53v0
GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位置信息的删除.
HyperLogLog(基数统计的算法)
127.0.0.1:6379> PFADD mykey a b c d e f g h i j
1
127.0.0.1:6379> PFCOUNT mykey
10
127.0.0.1:6379> PFADD mykey2 i j z x c v b n m
1
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2
OK
127.0.0.1:6379> PFCOUNT mykey3
15
Bitmaps(计数器)
可以用来设置上班打卡与否,账号登录与否,等等
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N7zCRSjG-1652586387293)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195015667.png)]
GETBIT key offffset 获取offffset设置的值,未设置过默认返回0 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ftWVcBWU-1652586387294)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195047165.png)]
bitcount key [start, end] 统计 key 上位为1的个数
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-isHc6OkN-1652586387296)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225195054021.png)]
事务
redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行!
Redis不保证原子性: Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis事务的三个阶段:
-
开始事务 multi -
命令入队 -
执行事务 exec
Redis事务相关命令:
watch key1 key2 ...
multi
exec
discard
unwatch
正常执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BIHySeEw-1652586387297)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200435553.png)]
放弃事务
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RS92eAcL-1652586387298)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200449801.png)]
若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lykcSkYR-1652586387299)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200508708.png)]
若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DNNe2Xdq-1652586387299)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201225200524942.png)]
Watch 监控(乐观锁)
127.0.0.1:6379> watch balance
开启事务,当开启两个线程同时操作一个数据,当线程一的数据还没提交,线程二已经操作修改了,那么事务提交时,就会报错,因为我们提前加上了watch监控器。
unwatch 解锁,关闭监视
jedis
1、导入依赖
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
2、远程连接redis测试
-
vim redis.conf
- 将绑定的ip:
bind 127.0.0.1 注释掉 - 修改
protected-mode 属性为no -
重启redis-server -
编写代码测试 public static void main(String[] args) {
System.out.println("测试");
Jedis jedis = new Jedis("39.102.33.216",6379);
System.out.println(jedis.ping());
}
Springboot整合redis
1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2 配置
# Redis服务器地址
spring.redis.host=39.102.33.216
# Redis服务器连接端口
spring.redis.port=6379
3 测试
@SpringBootTest
class Redis02SpringbootApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("name","redis学习");
System.out.println(redisTemplate.opsForValue().get("name"));
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
connection.flushAll();
connection.close();
}
}
配置一个RedisTemplate
package com.kuang.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean @SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
RedisUtils工具类
(直接用RedisTemplate操作Redis,需要很多行代码,因此直接封装好一RedisUtils,这样写代码更方便点。这个RedisUtils交给Spring容器实例化,使用时直接注解注入。)
package com.tycoon.redis02springboot.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public class RedisUtils {
private static RedisTemplate<String, Object> redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
RedisUtils.redisTemplate = redisTemplate;
}
public static boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
public static boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
@SuppressWarnings("unchecked")
public static void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
public static Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
public static boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
public static long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
public static Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
public static Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public static boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
public static boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
public static double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
public static double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
public static Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
public static Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
Redis.conf
config get * # 获取全部的配置
单位
1、配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit
2、对 大小写 不敏感
INCLUDES 包含
和Spring配置文件类似,可以通过includes包含,redis.conf 可以作为总文件,可以包含其他文件!
NETWORK 网络配置
bind 127.0.0.1
protected-mode yes
port 6379
GENERAL 通用
daemonize yes
supervised no
supervised no
loglevel notice
logfile ""
databases 16
always-show-logo yes
SNAPSHOPTING 快照
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./
SECURITY安全
config get requirepass
config set requirepass "123456"
127.0.0.1:6379> auth 123456
限制 CLIENTS
maxclients 10000
maxmemory <bytes>
maxmemory-policy noeviction
append only模式
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
持久化
Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!
RDB(Redis DataBase)
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
Fork
Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据(变量,环境变量,程序计数器等)数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程。
RDB 保存的是 dump.rdb 文件
如何触发RDB快照
1、配置文件中默认的快照配置,建议多用一台机子作为备份,复制一份 dump.rdb
2、命令save或者是bgsave
- save 时只管保存,其他不管,全部阻塞
- bgsave,Redis 会在后台异步进行快照操作,快照同时还可以响应客户端请求。可以通过lastsave命令获取最后一次成功执行快照的时间。
3、执行flushall命令,也会产生 dump.rdb 文件,但里面是空的,无意义 !
4、退出的时候也会产生 dump.rdb 文件!
如何恢复
1、将备份文件(dump.rdb)移动到redis安装目录并启动服务即可
2、CONFIG GET dir 获取目录
127.0.0.1:6379> config get dir
dir
/usr/local/bin
优点和缺点
优点:
1、适合大规模的数据恢复
2、对数据完整性和一致性要求不高
缺点:
1、在一定间隔时间做一次备份,所以如果redis意外down掉的话,就会丢失最后一次快照后的所有修改
2、Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑。
小结
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cYyBvqqC-1652586387300)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226152824283.png)]
AOF(Append Only File)
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是 appendonly.aof 文件
配置
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
No-appendfsync-on-rewrite
Auto-aof-rewrite-min-size
Auto-aof-rewrite-percentage
AOF 启动/修复/恢复
正常恢复:
- 启动:设置Yes,修改默认的appendonly no,改为yes
- 将有数据的aof文件复制一份保存到对应目录(config get dir)
- 恢复:重启redis然后重新加载
异常恢复:
- 启动:设置Yes
- 故意破坏 appendonly.aof 文件!
- 修复: redis-check-aof --fix appendonly.aof 进行修复
- 恢复:重启 redis 然后重新加载
Rewrite
AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令 bgrewriteaof !
重写原理: AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,这点和快照有点类似!
触发机制: Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的已被且文件大于64M的触发。
优点和缺点
优点:
1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差但数据完整性比较好
2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失
3、不同步: appendfsync no 从不同步
缺点:
1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。
2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。
小总结
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Ta2mbwB-1652586387301)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226160033643.png)]
总结
1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
4、同时开启两种持久化方式
- 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
- RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。
5、性能建议
- 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。
- 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价:一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
- 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。
Redis发布订阅
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
命令
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwM9GzfV-1652586387302)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226171147331.png)]
测试
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages...
(press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Redis"
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat "Hello,Kuangshen"
(integer) 1
1) "message"
2) "redisChat"
3) "Hello,Redis"
1) "message"
2) "redisChat"
3) "Hello,Kuangshen"
Redis主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave 以读为主。
默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
环境配置
slaveof 主库ip 主库端口
Info replication
修改配置文件!
准备工作:我们配置主从复制,至少需要三个,一主二从!配置三个客户端!
1、拷贝多个redis.conf 文件
2、指定端口 6379,依次类推
3、开启daemonize yes
4、Pid文件名字 pidfile /var/run/redis_6379.pid , 依次类推
5、Log文件名字 logfile “6379.log” , 依次类推
6、Dump.rdb 名字 dbfilename dump6379.rdb , 依次类推
上面都配置完毕后,3个服务通过3个不同的配置文件开启,我们的准备环境就OK 了!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9iPv9nUn-1652586387303)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226174108984.png)]
一主二从
1、环境初始化
1、默认三个都是Master 主节点
2、配置为一个Master 两个Slave
slaveof 127.0.0.1 6379
slaveof 127.0.0.1 6379
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DWmBAU8d-1652586387304)(C:/Users/hasee/AppData/Roaming/Typora/typora-user-images/image-20201226174829191.png)]
3、在主机设置值,在从机都可以取到!从机不能写值!
小结
主机宕机:如果主机宕机了,那么从机依然连接到主机,就是没有写的操作了,当主机重新连接,那么从机还可以继续取到主机里写的值。
从机宕机:但是当从机断开了,那么主机中断开了这个从机,重新连接后,也没有主从效应了。但是从新设置它的主机,那么还是可以取到主机的值。
层层链路
上一个Slave 可以是下一个slave 和 Master,Slave 同样可以接收其他 slaves 的连接和同步请求,那么该 slave 作为了链条中下一个的master,可以有效减轻 master 的写压力!
谋朝篡位
一主二从的情况下,如果主机断了,从机可以使用命令 SLAVEOF NO ONE 将自己改为主机!这个时候其余的从机链接到这个节点。对一个从属服务器执行命令 SLAVEOF NO ONE 将使得这个从属服务器关闭复制功能,并从从属服务器转变回主服务器,原来同步所得的数据集不会被丢弃。
主机再回来,也只是一个光杆司令了,从机为了正常使用跑到了新的主机上!
主从复制原理
Slave 启动成功连接到 master 后会发送一个sync命令
Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
哨兵模式
概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
配置测试
1、调整结构,6379带着80、81
2、自定义的 /myredis 目录下新建 sentinel.conf 文件,名字千万不要错
3、配置哨兵,填写内容sentinel monitor 被监控主机名字 127.0.0.1 6379 1上面最后一个数字1,表示主机挂掉后slave投票看让谁接替成为主机,得票数多少后成为主机
4、启动哨兵Redis-sentinel /myredis/sentinel.conf上述目录依照各自的实际情况配置,可能目录不同
5、正常主从演示
6、原有的Master 挂了
7、投票新选
8、重新主从继续开工,info replication 查查看
9、问题:如果之前的master 重启回来,会不会双master 冲突? 之前的回来只能做小弟了
哨兵模式的优缺点
优点
-
哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。 -
主从可以切换,故障可以转移,系统可用性更好。 -
哨兵模式是主从模式的升级,系统更健壮,可用性更高。
缺点
-
Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。 -
实现哨兵模式的配置也不简单,甚至可以说有些繁琐
哨兵配置说明
port 26379
dir /tmp
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel notification-script mymaster /var/redis/notify.sh
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
Redis缓存穿透和雪崩
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
缓存穿透
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿
概述
这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。
加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概念
缓存雪崩,是指在某一个时间段,缓存集中过期失效。
产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
|