1.nginx->openResty
?1.1 openResty nginx.conf
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}
?lua_shared_dict item_cache 150m;
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server {
listen 8081;
server_name localhost;
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
#共享词典,本地缓存
lua_shared_dict item_cache 150m;
server {
listen 8081;
server_name localhost;
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
1.2 nginx
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
upstream openRestry-cluster {
server 116.62.72.238:8081;
}
server {
listen 8081;
server_name localhost;
location /api {
proxy_pass http://openRestry-cluster;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
1.3 ?item.lua
--cjson
local cjson = require('cjson')
--local dict
local item_cache = ngx.shared.item_cache
--return val
function read_data ( key ,expire )
--search local buff
local val = item_cache:get(key)
if not val then
-- set key
item_cache:set(key , "the key of the value is :" .. key , expire)
val = "缓存没有值"
end
return val
end
--get the first parameter
local id = ngx.var[1]
local data = read_data ( id , 60 )
ngx.say(data)
?2.nginx->openResty->Redis
?2.1 common.lua
都是工具类,我是真的不想背着写下来了
- 关闭Redis连接
- ?连接Redis
- 发起http请求
- 导出上面3个方法
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
-- 建立连接的超时时间 发送请求的超时时间 相应结果的超时时间
red:set_timeouts(1000 , 1000 , 1000)
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end
-- 发起http请求
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
--导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
?2.2 item.lua
- 先查询本地缓存,如果没有再查询Redis,如果Redis也没有则设置一个默认值
--导入common函数库
local common = require('common')
--导入redis连接客户端
local read_redis = common.read_redis
--cjson
local cjson = require('cjson')
--local dict
local item_cache = ngx.shared.item_cache
--return val
function read_data ( key ,expire )
--search local buff
local val = item_cache:get(key)
-- 本地没有缓存
if not val then
ngx.log(ngx.ERR,"本地没有缓存取Redis读取 : key " , key)
-- 去Redis 查询
val = read_redis("127.0.0.1",6379,key)
--判断 是否从redis得到了缓存
if not val then
val = "假设从tomcat客户端取得的数据"
end
item_cache:set(key,val,expire)
end
return val
end
--get the first parameter
local id = ngx.var[1]
local data = read_data ("item:id:" .. id , 60 )
ngx.say(data)
3.nginx->openResty->Redis->http
3.1 item.lua
--导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
--导入cjson库
local cjson = require('cjson')
--导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache
--?封装函数,先查询redis,再查询http
function read_data(key , expire , path , params)
--查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR,"本地缓存查询失败,尝试查询Redis , key: ", key)
--查询Redis
val = read_redis("127.0.0.1" , 6379 , key)
--判断查询结果
if not val then
ngx.log(ngx.ERR,"redis查询失败,尝试查询http,key:" , key)
-- redis查询失败,去查询http
val = read_http(path , params)
end
end
--查询成功,把数据写入本地缓存
item_cache:set(key, val ,expire)
--返回数据
return val
end
--获取路径参数
local id = ngx.var[1]
--查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id , nil)
--查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)
--JSON 转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
--把item序列化为json返回结果
ngx.say(cjson.encode(item))
3.2 nginx.conf
- 发送http请求需要自己的nginx做代理
- upstream
upstream tomcat-cluster{
hash $request_uri;
server localhost:9000; # 因为我只有一台服务器,openResty,Redis,tomcat全部放一起了,这里就不做集群了
}
location /item/ {
proxy_pass http://tomcat-cluster;
}
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#添加共享词典,本地缓存
lua_shared_dict item_cache 150m;
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
#tomcat集群配置
upstream tomcat-cluster{
hash $request_uri;
server localhost:9000;
}
server {
listen 8081;
server_name localhost;
location /item {
proxy_pass http://tomcat-cluster; #访问本机的tomcat
}
location ~ /api/item/(\d+) {
default_type application/json;
content_by_lua_file lua/item.lua;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
4. nginx->openResty->Redis->http->Caffeine
4.1 dependency
<!--caffeine-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
4.2 Bean
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder().
initialCapacity(100).
maximumSize(10_000).build();
}
@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder().
initialCapacity(100).
maximumSize(10000).
build();
}
}
4.3 controller
//select one item
@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){
//有就从本地进程缓存中取,没有才查数据库
return itemCache.get(id,key->itemService.query()
.ne("status", 3).eq("id", id)
.one());
}
//select one stock
@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){
return stockCache.get(id,key->stockService.getById(id));
}
4.4 redis
- 关于redis,在openResty的本地缓存未命中的情况下,就去查询Redis,但是Redis如果依然未命中的情况下,openResty未进行set操作,所以既然流程能走到这里,就说明Redis未命中,所以应该在controller里面实现存key的操作,但是不需要实现去操作
- 自己去实现吧
5. 缓存预热
项目启动的时候直接往redis里面存一些热点数据,为了防止雪崩,击穿等情况
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService iItemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
//缓存预热 这里正常应该配合大数据做分析,分析哪些数据应该做缓存预热
//这里是demo直接把所有信息全部缓存好了
@Override
public void afterPropertiesSet() throws Exception {
//初始化缓存
//1.查询商品信息
List<Item> itemList = iItemService.list();
//2.放入缓存
for (Item item : itemList) {
//2.1 item序列化 json
String itemJSON = MAPPER.writeValueAsString(item);
//2.2存入redis
redisTemplate.opsForValue().set("item:id:"+item.getId(),itemJSON);
}
//3.查询库存信息
List<ItemStock> stockList = stockService.list();
//4.放入缓存
for (ItemStock itemStock : stockList) {
//序列化
String stockJSON = MAPPER.writeValueAsString(itemStock);
//存入Redis
redisTemplate.opsForValue().set("item:stock:id:"+itemStock.getId(), stockJSON);
}
}
}
6.Canal
缓存同步
6.1 pojo mapping
- pojo?
- ?帮助Canal 建立 pojo 与 数据库一条数据的对应关系
- ? @Id : 标记表中的id字段
- ? @Column (name = "name") : 标记表中与属性名不一致的字段
- ? @Transient : 标记不属于表中的字段
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id //标记表中的id字段
private Long id;//商品id
@Column(name = "name") //标记表中与属性名不一致订字段
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient //标记不适于表中的字段
private Integer stock;
@TableField(exist = false)
@Transient //标记不适于表中的字段
private Integer sold;
}
?6.2 watch
监听数据库的变化,进行动作
@CanalTable("tb_item") //指定要监听的表
@Component //指定表关联的实体类
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long,Item> itemCache;
@Override
public void insert(Item item) { //监听到数据库的增
//写数据到JVM进程缓存
itemCache.put(item.getId(), item);
//写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) { //监听到数据库的改
//写数据到redis
redisHandler.saveItem(after);
//写数据到JVM进程缓存
itemCache.put(after.getId(), after);
}
@Override
public void delete(Item item) { //删
//删除数据到redis
itemCache.invalidate(item.getId());
//删除据到JVM进程缓存
redisHandler.deleteItemById(item.getId());
}
}
?7.Caffeine?
缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:
- ?分布式缓存,例如Redis:
- ? ?优点:存储容量更大、可靠性更好、可以在集群间共享
- ? ?缺点:访问缓存有网络开销
- ? ?场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
- ?进程本地缓存,例如HashMap、GuavaCache:
- ??优点:读取本地内存,没有网络开销,速度更快
- ??缺点:存储容量有限、可靠性较低、无法共享
- ??场景:性能要求较高,缓存数据量较小
7.1 入门案例
@Test
public void test1(){
Cache<String, String> cache = Caffeine.newBuilder().build();
cache.put("key", "value");
String value = cache.getIfPresent("key");
System.out.println(value);
String value2 = cache.get("key2", key -> {
return "value2";
});
System.out.println(value2);
}
?7.2 Caffeine缓存驱逐策略
?7.2.1 基于容量
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存大小上限为 1
.maximumSize(1)
.build();
@Test
public void test2() throws InterruptedException {
Cache<String, String> cache = Caffeine.newBuilder().maximumSize(1).build();
cache.put("k1", "v1");
cache.put("k2", "v2");
cache.put("k3", "v3");
// 延迟10ms,给清理线程一点时间 , 驱逐需要时间
Thread.sleep(10L);
System.out.println(cache.getIfPresent("k1"));
System.out.println(cache.getIfPresent("k2"));
System.out.println(cache.getIfPresent("k3"));
}
7.2.2 基于时间
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(10)) // 设置缓存有效期为 10 秒 , key创建后10秒后无人访问,则进行驱逐
.build();
@Test
public void test3() throws InterruptedException {
Cache<String, String> cache =
Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(2L)).build();
cache.put("k1", "k2");
cache.put("k2", "k2");
TimeUnit.SECONDS.sleep(3);
System.out.println(cache.getIfPresent("k1"));
System.out.println(cache.getIfPresent("k2"));
}
7.2.3 基于引用
? ? 设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。
? ? 在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。(和Redis很像吗)
7.3 实现进程缓存
利用Caffeine实现下列需求:
- 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
- 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
- 缓存初始大小为100
- 缓存上限为10000
7.3.1 将缓存加入到Spring容器中
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder().initialCapacity(100).maximumSize(10000).build();
}
@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder().initialCapacity(100).maximumSize(10000).build();
}
}
7.3.2 controller
@RestController
@RequestMapping("item")
public class ItemController {
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Autowired
private Cache<Long,Item> itemCache;
@Autowired
private Cache<Long,ItemStock> stockCache;
//select one item
@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){
return itemCache.get(id,key->itemService.query().ne("status", 3).eq("id",key).one());
}
//select one stock
@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){
return stockCache.get(id,key->stockService.getById(key));
}
}
?8.OpenResty
8.1 安装OpenResty
自行安装
8.2 获取请求参数
OpenResty提供了各种API用来获取不同类型的请求参数:
我们这里采用路径占位符来获取
路径占位符
- ?~ : 表示采用正则表达式模式
- ?( ) : 一组括号表示,一组正则表达式值 => \d : 整数 ?
- ?ngx.var[1] : 表示第一个正则表达式.
8.3 nginx 发起 http请求
local?resp?=?ngx.location.capture("/path",{
????method?=?ngx.HTTP_GET,???--?请求方式
????args?=?{a=1,b=2},??--?get方式传参数
????body?=?"c=3&d=4"?--?post方式传参数
})
返回的响应内容包括:
- resp.status:响应状态码
- resp.header:响应头,是一个table
- resp.body:响应体,就是响应数据
注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。 但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server(在openResty的nginx.conf文件)来对这个路径做反向代理:
比如 path部分传入了 , /item/10001,则会被nginx代理到如下拼接的代理地址localhost:9000/item/10001 ,指向本地的tomcat服务器
openResty nginx.conf
#tomcat集群配置
upstream tomcat-cluster{
hash $request_uri;
server localhost:9000;
}
location /item {
proxy_pass http://tomcat-cluster; #访问本机的tomcat
}
9.缓存预热
我们数据量较少,可以在启动时将所有数据都放入缓存中。- - !
9.1 实现
docker?run?--name?redis?-p?6379:6379?-d?redis?redis-server?--appendonly?yes
- 在item-service服务中引入Redis依赖
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
host: 116.62.71.237:6379 #准备好你自己的服务器或者虚拟机
- 编写初始化类
- InitializingBean 会在 Bean 创建之后 , Autowired 注入之后执行.
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService iItemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
//缓存预热 这里正常应该配合大数据做分析,分析哪些数据应该做缓存预热
//这里是demo直接把所有信息全部缓存好了
@Override
public void afterPropertiesSet() throws Exception {
//初始化缓存
//1.查询商品信息
List<Item> itemList = iItemService.list();
//2.放入缓存
for (Item item : itemList) {
//2.1 item序列化 json
String itemJSON = MAPPER.writeValueAsString(item);
//2.2存入redis
redisTemplate.opsForValue().set("item:id:"+item.getId(),itemJSON);
}
//3.查询库存信息
List<ItemStock> stockList = stockService.list();
//4.放入缓存
for (ItemStock itemStock : stockList) {
//序列化
String stockJSON = MAPPER.writeValueAsString(itemStock);
//存入Redis
redisTemplate.opsForValue().set("item:stock:id:"+itemStock.getId(), stockJSON);
}
}
}
10.缓存同步
10.1 缓存数据同步的常见方式
缓存数据同步的常见方式有三种:
10.2 基于Canal的异步通知
Canal
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
- MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
- MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据?
Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
10.3 安装canal
查阅资料自行安装 : 安装Canal.md
10.4 异步通知
- Canal提供各种语言的客户端,当Canal监听到binlog变化时,会通知Canal客户端.
- Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。不过这里我们会使用GitHub上的第三方开源的canal-starter。
- 引入依赖
-
<!--canal-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency> - 编写配置
-
canal:
destination: heima #canal实例名称,要跟canan-server运行时设置的destination一致,就是docker run 那一步
server: locahost:11111 - 编写监听器,监听Canal
-
@CanalTable("tb_item") //指定要监听的表
@Component //指定表关联的实体类
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long,Item> itemCache;
@Override
public void insert(Item item) { //监听到数据库的增
//写数据到JVM进程缓存
itemCache.put(item.getId(), item);
//写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) { //监听到数据库的改
//写数据到JVM进程缓存
itemCache.put(after.getId(), after);
//写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) { //删
//删除据到JVM进程缓存
itemCache.invalidate(item.getId());
//删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
- 这里有一个问题 canal对于数据库的监控实时对缓存进行更改,up主是对新增的数据全部添加了缓存对吧?这里我感觉是不是应该先判断缓存中是否有没有然后,再进行缓存的同步操作呢? ?对于之后的业务 岂不是缓存与数据库的数据完全一样了?内存吃不消吧....
- Canal推送给canal-client的是被修改的这一行数据(row),而我们引入的canal-client则会帮我们把行数据封装到Item实体类中。这个过程中需要知道数据库与实体的映射关系,要用到JPA的几个注解
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
- 在前面有一个RedisHandler 的 Bean 新增代码 实现 Redis 对key的 set 和 del
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService iItemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
//缓存预热 这里正常应该配合大数据做分析,分析哪些数据应该做缓存预热
//这里是demo直接把所有信息全部缓存好了
@Override
public void afterPropertiesSet() throws Exception {
//初始化缓存
//1.查询商品信息
List<Item> itemList = iItemService.list();
//2.放入缓存
for (Item item : itemList) {
//2.1 item序列化 json
String itemJSON = MAPPER.writeValueAsString(item);
//2.2存入redis
redisTemplate.opsForValue().set("item:id:"+item.getId(),itemJSON);
}
//3.查询库存信息
List<ItemStock> stockList = stockService.list();
//4.放入缓存
for (ItemStock itemStock : stockList) {
//序列化
String stockJSON = MAPPER.writeValueAsString(itemStock);
//存入Redis
redisTemplate.opsForValue().set("item:stock:id:"+itemStock.getId(), stockJSON);
}
}
public void saveItem(Item item){
try {
String itemJSON = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:"+item.getId(),itemJSON);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id){
redisTemplate.delete("item:id:" + id);
}
}
|