博客论坛
简介
基于 SpringBoot + FreeMarker 开发的博客论坛项目。前端使用 layui + fly 模板,使用自定义 Freemarker 标签,使用 shiro+redis 完成了会话共享,redis 的 zset 结构完成本周热议排行榜,t-io+websocket 完成即时消息通知和群聊,rabbitmq+elasticsearch 完成博客内容搜索引擎,使用 mybatis plus 方便数据库操作,使用 mysql 作为数据库。
功能
前端
- 登录注册
- 文章管理
- 评论管理
- 收藏、精选、置顶
- 搜索
- 个人中心
- 群聊
安装 rabbitmq https://blog.csdn.net/qq_45803593/article/details/124922690
安装 elasticsearch https://blog.csdn.net/qq_45803593/article/details/124895016
代码
1、自定义 freemarker 方法
我们想要博客在显示时间的时候,将时间显示为 秒前;分钟前;小时前;天前;月前;年前;未知;
1、导入工具类包
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.1.17</version>
</dependency>
2、导入模板工具类
DirectiveHandler
package com.manster.common.templates;
import freemarker.core.Environment;
import freemarker.template.*;
import org.springframework.util.Assert;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class DirectiveHandler {
private Environment env;
private Map<String, TemplateModel> parameters;
private TemplateModel[] loopVars;
private TemplateDirectiveBody body;
private Environment.Namespace namespace;
public DirectiveHandler(Environment env, Map<String, TemplateModel> parameters, TemplateModel[] loopVars,
TemplateDirectiveBody body) {
this.env = env;
this.loopVars = loopVars;
this.parameters = parameters;
this.body = body;
this.namespace = env.getCurrentNamespace();
}
public void render() throws IOException, TemplateException {
Assert.notNull(body, "must have template directive body");
body.render(env.getOut());
}
public void renderString(String text) throws Exception {
StringWriter writer = new StringWriter();
writer.append(text);
env.getOut().write(text);
}
public DirectiveHandler put(String key, Object value) throws TemplateModelException {
namespace.put(key, wrap(value));
return this;
}
public String getString(String name) throws TemplateModelException {
return TemplateModelUtils.converString(getModel(name));
}
public Integer getInteger(String name) throws TemplateModelException {
return TemplateModelUtils.converInteger(getModel(name));
}
public Short getShort(String name) throws TemplateModelException {
return TemplateModelUtils.converShort(getModel(name));
}
public Long getLong(String name) throws TemplateModelException {
return TemplateModelUtils.converLong(getModel(name));
}
public Double getDouble(String name) throws TemplateModelException {
return TemplateModelUtils.converDouble(getModel(name));
}
public String[] getStringArray(String name) throws TemplateModelException {
return TemplateModelUtils.converStringArray(getModel(name));
}
public Boolean getBoolean(String name) throws TemplateModelException {
return TemplateModelUtils.converBoolean(getModel(name));
}
public Date getDate(String name) throws TemplateModelException {
return TemplateModelUtils.converDate(getModel(name));
}
public String getString(String name, String defaultValue) throws Exception {
String result = getString(name);
return null == result ? defaultValue : result;
}
public Integer getInteger(String name, int defaultValue) throws Exception {
Integer result = getInteger(name);
return null == result ? defaultValue : result;
}
public Long getLong(String name, long defaultValue) throws Exception {
Long result = getLong(name);
return null == result ? defaultValue : result;
}
public String getContextPath() {
String ret = null;
try {
ret = TemplateModelUtils.converString(getEnvModel("base"));
} catch (TemplateModelException e) {
}
return ret;
}
public TemplateModel wrap(Object object) throws TemplateModelException {
return env.getObjectWrapper().wrap(object);
}
public TemplateModel getEnvModel(String name) throws TemplateModelException {
return env.getVariable(name);
}
public void write(String text) throws IOException {
env.getOut().write(text);
}
private TemplateModel getModel(String name) {
return parameters.get(name);
}
public abstract static class BaseMethod implements TemplateMethodModelEx {
public String getString(List<TemplateModel> arguments, int index) throws TemplateModelException {
return TemplateModelUtils.converString(getModel(arguments, index));
}
public Integer getInteger(List<TemplateModel> arguments, int index) throws TemplateModelException {
return TemplateModelUtils.converInteger(getModel(arguments, index));
}
public Long getLong(List<TemplateModel> arguments, int index) throws TemplateModelException {
return TemplateModelUtils.converLong(getModel(arguments, index));
}
public Date getDate(List<TemplateModel> arguments, int index) throws TemplateModelException {
return TemplateModelUtils.converDate(getModel(arguments, index));
}
public TemplateModel getModel(List<TemplateModel> arguments, int index) {
if (index < arguments.size()) {
return arguments.get(index);
}
return null;
}
}
}
TemplateDirective
package com.manster.common.templates;
import freemarker.core.Environment;
import freemarker.template.TemplateDirectiveBody;
import freemarker.template.TemplateDirectiveModel;
import freemarker.template.TemplateException;
import freemarker.template.TemplateModel;
import java.io.IOException;
import java.util.Map;
public abstract class TemplateDirective implements TemplateDirectiveModel {
protected static String RESULT = "result";
protected static String RESULTS = "results";
@Override
public void execute(Environment env, Map parameters,
TemplateModel[] loopVars, TemplateDirectiveBody body) throws TemplateException, IOException {
try {
execute(new DirectiveHandler(env, parameters, loopVars, body));
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new TemplateException(e, env);
}
}
abstract public String getName();
abstract public void execute(DirectiveHandler handler) throws Exception;
}
TemplateModelUtils
package com.manster.common.templates;
import freemarker.template.*;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.apache.commons.lang3.StringUtils.*;
public class TemplateModelUtils {
public static final DateFormat FULL_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static final int FULL_DATE_LENGTH = 19;
public static final DateFormat SHORT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
public static final int SHORT_DATE_LENGTH = 10;
public static String converString(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateScalarModel) {
return ((TemplateScalarModel) model).getAsString();
} else if ((model instanceof TemplateNumberModel)) {
return ((TemplateNumberModel) model).getAsNumber().toString();
}
}
return null;
}
public static TemplateHashModel converMap(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateHashModelEx) {
return (TemplateHashModelEx) model;
} else if (model instanceof TemplateHashModel) {
return (TemplateHashModel) model;
}
}
return null;
}
public static Integer converInteger(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateNumberModel) {
return ((TemplateNumberModel) model).getAsNumber().intValue();
} else if (model instanceof TemplateScalarModel) {
String s = ((TemplateScalarModel) model).getAsString();
if (isNotBlank(s)) {
try {
return Integer.parseInt(s);
} catch (NumberFormatException e) {
}
}
}
}
return null;
}
public static Short converShort(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateNumberModel) {
return ((TemplateNumberModel) model).getAsNumber().shortValue();
} else if (model instanceof TemplateScalarModel) {
String s = ((TemplateScalarModel) model).getAsString();
if (isNotBlank(s)) {
try {
return Short.parseShort(s);
} catch (NumberFormatException e) {
}
}
}
}
return null;
}
public static Long converLong(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateNumberModel) {
return ((TemplateNumberModel) model).getAsNumber().longValue();
} else if (model instanceof TemplateScalarModel) {
String s = ((TemplateScalarModel) model).getAsString();
if (isNotBlank(s)) {
try {
return Long.parseLong(s);
} catch (NumberFormatException e) {
}
}
}
}
return null;
}
public static Double converDouble(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateNumberModel) {
return ((TemplateNumberModel) model).getAsNumber().doubleValue();
} else if (model instanceof TemplateScalarModel) {
String s = ((TemplateScalarModel) model).getAsString();
if (isNotBlank(s)) {
try {
return Double.parseDouble(s);
} catch (NumberFormatException ignored) {
}
}
}
}
return null;
}
public static String[] converStringArray(TemplateModel model) throws TemplateModelException {
if (model instanceof TemplateSequenceModel) {
TemplateSequenceModel smodel = (TemplateSequenceModel) model;
String[] values = new String[smodel.size()];
for (int i = 0; i < smodel.size(); i++) {
values[i] = converString(smodel.get(i));
}
return values;
} else {
String str = converString(model);
if (isNotBlank(str)) {
return split(str,',');
}
}
return null;
}
public static Boolean converBoolean(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateBooleanModel) {
return ((TemplateBooleanModel) model).getAsBoolean();
} else if (model instanceof TemplateNumberModel) {
return !(0 == ((TemplateNumberModel) model).getAsNumber().intValue());
} else if (model instanceof TemplateScalarModel) {
String temp = ((TemplateScalarModel) model).getAsString();
if (isNotBlank(temp)) {
return Boolean.valueOf(temp);
}
}
}
return null;
}
public static Date converDate(TemplateModel model) throws TemplateModelException {
if (null != model) {
if (model instanceof TemplateDateModel) {
return ((TemplateDateModel) model).getAsDate();
} else if (model instanceof TemplateScalarModel) {
String temp = trimToEmpty(((TemplateScalarModel) model).getAsString());
return parseDate(temp);
}
}
return null;
}
public static Date parseDate(String date) {
Date ret = null;
try {
if (FULL_DATE_LENGTH == date.length()) {
ret = FULL_DATE_FORMAT.parse(date);
} else if (SHORT_DATE_LENGTH == date.length()) {
ret = SHORT_DATE_FORMAT.parse(date);
}
} catch (ParseException e) {
}
return ret;
}
}
3、自定义方法
package com.manster.template;
import com.manster.common.templates.DirectiveHandler;
import freemarker.template.TemplateModelException;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
@Component
public class TimeAgoMethod extends DirectiveHandler.BaseMethod {
private static final long ONE_MINUTE = 60000L;
private static final long ONE_HOUR = 3600000L;
private static final long ONE_DAY = 86400000L;
private static final long ONE_WEEK = 604800000L;
private static final String ONE_SECOND_AGO = "秒前";
private static final String ONE_MINUTE_AGO = "分钟前";
private static final String ONE_HOUR_AGO = "小时前";
private static final String ONE_DAY_AGO = "天前";
private static final String ONE_MONTH_AGO = "月前";
private static final String ONE_YEAR_AGO = "年前";
private static final String ONE_UNKNOWN = "未知";
@Override
public Object exec(List arguments) throws TemplateModelException {
Date time = getDate(arguments, 0);
return format(time);
}
public static String format(Date date) {
if (null == date) {
return ONE_UNKNOWN;
}
long delta = new Date().getTime() - date.getTime();
if (delta < 1L * ONE_MINUTE) {
long seconds = toSeconds(delta);
return (seconds <= 0 ? 1 : seconds) + ONE_SECOND_AGO;
}
if (delta < 45L * ONE_MINUTE) {
long minutes = toMinutes(delta);
return (minutes <= 0 ? 1 : minutes) + ONE_MINUTE_AGO;
}
if (delta < 24L * ONE_HOUR) {
long hours = toHours(delta);
return (hours <= 0 ? 1 : hours) + ONE_HOUR_AGO;
}
if (delta < 48L * ONE_HOUR) {
return "昨天";
}
if (delta < 30L * ONE_DAY) {
long days = toDays(delta);
return (days <= 0 ? 1 : days) + ONE_DAY_AGO;
}
if (delta < 12L * 4L * ONE_WEEK) {
long months = toMonths(delta);
return (months <= 0 ? 1 : months) + ONE_MONTH_AGO;
} else {
long years = toYears(delta);
return (years <= 0 ? 1 : years) + ONE_YEAR_AGO;
}
}
private static long toSeconds(long date) {
return date / 1000L;
}
private static long toMinutes(long date) {
return toSeconds(date) / 60L;
}
private static long toHours(long date) {
return toMinutes(date) / 60L;
}
private static long toDays(long date) {
return toHours(date) / 24L;
}
private static long toMonths(long date) {
return toDays(date) / 30L;
}
private static long toYears(long date) {
return toMonths(date) / 365L;
}
}
4、将方法注入到 freemarker 中
作用: @PostConstruct注解的方法在项目启动的时候执行这个方法,也可以理解为在spring容器启动的时候执行,可作为一些数据的常规化加载,比如数据字典之类的。
执行顺序: 其实从依赖注入的字面意思就可以知道,要将对象p注入到对象a,那么首先就必须得生成对象a和对象p,才能执行注入。所以,如果一个类A中有个成员变量p被@Autowried注解,那么@Autowired注入是发生在A的构造方法执行完之后的。
如果想在生成对象时完成某些初始化操作,而偏偏这些初始化操作又依赖于依赖注入,那么久无法在构造函数中实现。为此,可以使用@PostConstruct注解一个方法来完成初始化,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
Constructor >> @Autowired >> @PostConstruct
FreeMarkerConfig
package com.manster.config;
import com.manster.template.TimeAgoMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class FreeMarkerConfig {
@Autowired
private freemarker.template.Configuration configuration;
@PostConstruct
public void setUp(){
configuration.setSharedVariable("timeAgo", new TimeAgoMethod());
}
}
2、自定义 freemarker 标签
1、自定义标签
@Component
public class BlogsTemplate extends TemplateDirective {
@Autowired
BlogService blogService;
@Override
public String getName() {
return "blogs";
}
@Override
public void execute(DirectiveHandler handler) throws Exception {
Integer level = handler.getInteger("level");
Integer pn = handler.getInteger("pn",1);
Integer size = handler.getInteger("size",2);
Long categoryId = handler.getLong("categoryId");
IPage<BlogVo> page = blogService.paging(new Page(pn, size), categoryId, null, level, null, "created");
handler.put(RESULTS, page).render();
}
}
2、配置标签
@Configuration
public class FreeMarkerConfig {
@Autowired
private freemarker.template.Configuration configuration;
@Autowired
BlogsTemplate blogsTemplate;
@PostConstruct
public void setUp(){
configuration.setSharedVariable("timeAgo", new TimeAgoMethod());
configuration.setSharedVariable("blogs", blogsTemplate);
}
}
3、使用标签
common.ftl 首先我们将一篇博客的简要信息进行封装
<#macro blisting blog>
<li>
<a href="/user/${blog.authorId}" class="fly-avatar">
<img src="${blog.authorAvatar}" alt="${blog.authorName}">
</a>
<h2>
<a class="layui-badge">${blog.categoryName}</a>
<a href="/blog/${blog.id}">${blog.title}</a>
</h2>
<div class="fly-list-info">
<a href="/user/${blog.authorId}" link>
<cite>${blog.authorName}</cite>
</a>
<span>${timeAgo(blog.created)}</span>
<span class="fly-list-nums">
<i class="iconfont icon-pinglun1" title="回答"></i> ${blog.commentCount}
</span>
</div>
<div class="fly-list-badge">
</div>
</li>
</#macro>
index.ftl 然后我们将获取到的列表进行遍历
<ul class="fly-list">
<@blogs size=3 level=1>
<#list results.records as blog>
<@blisting blog></@blisting>
</#list>
</@blogs>
</ul>
3、redis 的 zset 实现排行榜
1、redis操作分析
我们需要使用的 redis 命令有:
- 新增:ZADD key score member
- 自增:ZINCRBY key increment member
- 并集:ZUNIONSTORE key numkeys key [key …]
- 排序:ZREVRANGE key 0 -1 withscores
使用 redis 的有序聚合 zset 来进行排行榜的统计,我们将每七天作为一个周期来进行排序
3号博客1评论10条
3号博客2评论6条
首先我们将每天的评论数量进行存储,其对应的 redis 命令为
zadd day:3 10 blog:1 6 blog:2
zadd day:4 10 blog:1 6 blog:2
zadd day:5 10 blog:1 6 blog:2
...
zadd day:10 10 blog:1 6 blog:2
然后我们完成七天每篇博客的评论数相加,其形式应该为
70 post:1
42 post:2
这里我们需要使用并集,将集合中相同的成员的值进行相加合并
Redis Zunionstore 命令计算给定的一个或多个有序集的并集,其中给定 key 的数量必须以 numkeys 参数指定,并将该并集(结果集)储存到 destination 。
默认情况下,结果集中某个成员的分数值是所有给定集下该成员分数值之和 。
redis 127.0.0.1:6379> ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
redis> ZADD zset1 1 "one"
(integer) 1
redis> ZADD zset1 2 "two"
(integer) 1
redis> ZADD zset2 1 "one"
(integer) 1
redis> ZADD zset2 2 "two"
(integer) 1
redis> ZADD zset2 3 "three"
(integer) 1
redis> ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3
(integer) 3
redis> ZRANGE out 0 -1 WITHSCORES
1) "one"
2) "5"
3) "three"
4) "9"
5) "two"
6) "10"
redis>
使用 WEIGHTS 选项时,可以为各个有序集合输入指定一个乘法系数(Multiplication factor )。这意味着在将每个有序集合输入中的每个元素的分值传递给聚合函数(Aggregation function)之前,会将该分值乘以对应的系数。当未给定 WEIGHTS 选项时,乘法系数默认为 1。
使用 AGGREGATE 选项时,可以指定并集运算结果的聚合方式。该选项默认值为 SUM,即将输入中所有存在该元素的集合中对应的分值全部加一起。当选项被设置为 MIN 或 MAX 任意值时,结果集合将保存输入中所有存在该元素的集合中对应的分值的最小或最大值。
我们这里需要进行的操作就是七个集合进行并集
ZUNIONSTORE week:rank 7 day:18 day19 ... day25
最终我们的测试命令集合为
127.0.0.1:6379> zadd day:3 10 blog:1
(integer) 1
127.0.0.1:6379> zadd day:4 10 blog:1
(integer) 1
127.0.0.1:6379> zadd day:5 10 blog:1
(integer) 1
127.0.0.1:6379> zadd day:3 3 blog:2
(integer) 1
127.0.0.1:6379> zadd day:4 11 blog:2
(integer) 1
127.0.0.1:6379> zadd day:5 5 blog:2
(integer) 1
127.0.0.1:6379> keys *
1) "day:5"
2) "day:3"
3) "day:4"
127.0.0.1:6379> zrevrange day:3 0 -1 withscores
1) "blog:1"
2) "10"
3) "blog:2"
4) "3"
127.0.0.1:6379> ZUNIONSTORE week:rank 3 day:3 day:4 day:5
(integer) 2
127.0.0.1:6379> zrevrange week:rank 0 -1 withscores
1) "blog:1"
2) "30"
3) "blog:2"
4) "19"
127.0.0.1:6379> zincrby day:5 10 blog:2
"15"
至此我们就得到了我们想要的结果
2、项目代码实现
1、初始化数据
首先我们需要在项目启动以后就将数据进行初始化,所以我们还是写在 ContextStartup 中,同时我们需要实现 redis 序列化,不适用java 默认的序列化方式
package com.manster.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.net.UnknownHostException;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
jackson2JsonRedisSerializer.setObjectMapper(new ObjectMapper());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
return redisTemplate;
}
}
我们在项目启动时就将近7天的数据进行初始化
@Component
public class ContextStartup implements ApplicationRunner, ServletContextAware {
@Autowired
CategoryService categoryService;
ServletContext servletContext;
@Autowired
BlogService blogService;
@Override
public void run(ApplicationArguments args) throws Exception {
List<Category> categories = categoryService.list(new QueryWrapper<Category>().eq("status", 0));
servletContext.setAttribute("categories", categories);
blogService.initWeekRank();
}
@Override
public void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
}
然后我们在 blog 服务中实现初始化,
- 首先我们查出 7 天内发布的所有文章
- 然后设置每天日期为 key,评论数为 score,文章id 为 member;同时设置过期时间
- 设置完成后我们将这些文章的简要信息都存入一个 hash 中
- 最后我们对这 7 天的每篇文章的评论数做一个并集得出排行榜
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements BlogService {
@Autowired
RedisUtils redisUtils;
@Override
public BlogVo getOneBlog(QueryWrapper<Blog> wrapper) {
return baseMapper.selectOneBlog(wrapper);
}
@Override
public void initWeekRank() {
List<Blog> blogList = this.list(new QueryWrapper<Blog>()
.ge("created", DateUtil.offsetDay(new Date(), -7))
.select("id,title,user_id,comment_count,view_count,created")
);
for (Blog blog : blogList) {
String key = "day:rank:" + DateUtil.format(blog.getCreated(), DatePattern.PURE_DATE_FORMAT);
redisUtils.zSet(key, blog.getId(), blog.getCommentCount());
long between = DateUtil.between(new Date(), blog.getCreated(), DateUnit.DAY);
long expireTime = (7 - between) * 24 * 60 * 60;
redisUtils.expire(key, expireTime);
this.hashCacheBlog(blog, expireTime);
}
this.zunionAndStoreForWeekRank();
}
private void zunionAndStoreForWeekRank() {
String key = "day:rank:" + DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT);
String destKey = "week:rank";
List<String> otherKeys = new ArrayList<>();
for(int i=-6; i<0; i++){
String tempKey = "day:rank:" +
DateUtil.format(DateUtil.offsetDay(new Date(), i), DatePattern.PURE_DATE_FORMAT);
otherKeys.add(tempKey);
}
redisUtils.zUnionAndStore(key, otherKeys, destKey);
}
private void hashCacheBlog(Blog blog, long expireTime) {
String key = "rank:blog:" + blog.getId();
boolean hasKey = redisUtils.hasKey(key);
if(!hasKey){
redisUtils.hset(key, "blog:id", blog.getId(), expireTime);
redisUtils.hset(key, "blog:title", blog.getTitle(), expireTime);
redisUtils.hset(key, "blog:commentCount", blog.getCommentCount(), expireTime);
}
}
}
测试结果如下
127.0.0.1:6379> keys *
1) "day:rank:20220427"
2) "day:rank:20220428"
3) "rank:blog:3"
4) "rank:blog:1"
5) "rank:blog:2"
6) "week:rank"
127.0.0.1:6379> zrevrange week:rank 0 -1 withscores
1) "2"
2) "3"
3) "1"
4) "2"
5) "3"
6) "0"
127.0.0.1:6379> zrange day:rank:20220428 0 -1 withscores
1) "1"
2) "2"
3) "2"
4) "3"
127.0.0.1:6379> hgetall rank:blog:3
1) "blog:id"
2) "3"
3) "blog:title"
4) "\"111111111111\""
5) "blog:commentCount"
6) "0"
2、获取本周热议
这里我们还是使用自定义标签的方式来进行获取
package com.manster.template;
import com.manster.common.templates.DirectiveHandler;
import com.manster.common.templates.TemplateDirective;
import com.manster.util.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import java.util.*;
@Component
public class HotsTemplate extends TemplateDirective {
@Autowired
RedisUtils redisUtils;
@Override
public String getName() {
return "hots";
}
@Override
public void execute(DirectiveHandler handler) throws Exception {
Set<ZSetOperations.TypedTuple> typedTuples = redisUtils.getZSetRank("week:rank", 0, 6);
List<Map> hotBlogs = new ArrayList<>();
for (ZSetOperations.TypedTuple typedTuple : typedTuples) {
Map<String, Object> map = new HashMap<>();
Object value = typedTuple.getValue();
String blogKey = "rank:blog:" + value;
map.put("id", value);
map.put("title", redisUtils.hget(blogKey, "blog:title"));
map.put("commentCount", typedTuple.getScore());
hotBlogs.add(map);
}
handler.put(RESULTS, hotBlogs).render();
}
}
FreeMarkerConfig 然后我们在配置类中增加这个标签
@PostConstruct
public void setUp(){
configuration.setSharedVariable("timeAgo", new TimeAgoMethod());
configuration.setSharedVariable("blogs", blogsTemplate);
configuration.setSharedVariable("hots", hotsTemplate);
}
right.ftl
<@hots>
<#list results as blog>
<dd>
<a href="/blog/${blog.id}">${blog.title}</a>
<span><i class="iconfont icon-pinglun1"></i> ${blog.commentCount}</span>
</dd>
</#list>
</@hots>
3、增加评论数
接下来我们需要做的就是在增加评论的时候将这篇文章的评论数+1,同步到 redis 中并且重新进行并集计算。
- 在我们的缓存中,启动时就缓存了近7天内发布的文章,但是新增的评论的文章可能是之前没有的,所以我们还需要注意
@Override
public void incrCommentCount(long blogId, boolean isIncr) {
String key = "day:rank:" + DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT);
redisUtils.zIncrementScore(key, blogId, isIncr ? 1 : -1);
Blog blog = this.getById(blogId);
long between = DateUtil.between(new Date(), blog.getCreated(), DateUnit.DAY);
long expireTime = (7 - between) * 24 * 60 * 60;
redisUtils.expire(key, expireTime);
this.hashCacheBlog(blog, expireTime);
this.zunionAndStoreForWeekRank();
}
3、文章阅读量同步
我们将文章的阅读量进行缓存,避免过多的操作数据库,但是也要定时将数据同步到数据库里面
1、缓存阅读量
每当我们请求博客详情时,我们就执行这个操作
BlogController
@Override
public void putViewCount(BlogVo blogVo) {
String key = "blog:viewCount:" + blogVo.getId();
Integer viewCount = (Integer) redisUtils.get(key);
if(viewCount !=null){
blogVo.setViewCount(viewCount + 1);
}else {
blogVo.setViewCount(blogVo.getViewCount() + 1);
}
redisUtils.set(key, blogVo.getViewCount());
}
2、同步数据
我们需要做一个定时器来将其同步到数据库中
package com.manster.schedules;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.manster.entity.Blog;
import com.manster.service.BlogService;
import com.manster.util.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Component
public class ViewCountSyncTask {
@Autowired
RedisUtils redisUtils;
@Autowired
RedisTemplate redisTemplate;
@Autowired
BlogService blogService;
@Scheduled(cron = "0 0/1 * * * *")
public void task(){
Set<String> keys = redisTemplate.keys("blog:viewCount:*");
List<String> ids = new ArrayList<>();
for (String key : keys) {
String blogId = key.substring("blog:viewCount:".length());
if(redisUtils.hasKey(key)){
ids.add(blogId);
}
}
if(ids.isEmpty()) return;
List<Blog> blogs = blogService.list(new QueryWrapper<Blog>().in("id", ids));
blogs.stream().forEach(blog -> blog.setViewCount((Integer) redisUtils.get("blog:viewCount:"+blog.getId())));
if(blogs.isEmpty()) return;
boolean isSucc = blogService.updateBatchById(blogs);
if (isSucc){
ids.stream().forEach(id -> redisUtils.del("blog:viewCount:" + id));
System.out.println("----------------同步成功-------------");
}
}
}
4、及时通知作者评论内容
1、回复评论
package com.manster.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.manster.common.lang.Result;
import com.manster.entity.Blog;
import com.manster.entity.Comment;
import com.manster.entity.User;
import com.manster.entity.UserMessage;
import com.manster.service.CommentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.manster.controller.BaseController;
@RestController
@RequestMapping("/comment")
public class CommentController extends BaseController {
@Autowired
CommentService commentService;
@ResponseBody
@PostMapping("/reply")
public Result reply(Long bid, String content){
Assert.notNull(bid, "找不到对应的文章");
Assert.hasLength(content, "评论内容不能为空");
Blog blog = blogService.getById(bid);
Assert.isTrue(blog!=null, "该文章已被删除");
Comment comment = new Comment();
comment.setBlogId(bid);
comment.setContent(content);
comment.setUserId(getProfileId());
comment.setLevel(0);
comment.setVoteDown(0);
comment.setVoteUp(0);
commentService.save(comment);
blog.setCommentCount(blog.getCommentCount() + 1);
blogService.updateById(blog);
blogService.incrCommentCount(bid, true);
if(!comment.getUserId().equals(blog.getUserId())){
UserMessage message = new UserMessage();
message.setBlogId(bid);
message.setCommentId(comment.getId());
message.setFromUserId(getProfileId());
message.setToUserId(blog.getUserId());
message.setContent(content);
message.setType(1);
message.setStatus(0);
userMessageService.save(message);
}
if (content.startsWith("@")){
String username = content.substring(1, content.indexOf(""));
User user = userService.getOne(new QueryWrapper<User>().eq("username", username));
if(user != null){
UserMessage message = new UserMessage();
message.setBlogId(bid);
message.setCommentId(comment.getId());
message.setFromUserId(getProfileId());
message.setToUserId(user.getId());
message.setContent(content);
message.setType(2);
message.setStatus(0);
userMessageService.save(message);
}
}
return Result.success().action("/blog/"+bid);
}
}
2、导入 webscoket
因为评论后我们会发送消息给作者或者@的用户,为了即时通知,我们需要使用 websocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
然后我们配置 websocket
package com.manster.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@EnableAsync
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/user/","/topic/");
registry.setApplicationDestinationPrefixes("/app");
}
}
在前端页面也进行添加,在登陆后加载 websocket,我们需要在 layout.ftl 先导入 sock.js 、stomp.js
<script src="/res/js/sock.js"></script>
<script src="/res/js/stomp.js"></script>
然后我们编写 js 来加载 websocket
<script>
function showTips(count) {
var msg = $('<a class="fly-nav-msg" href="javascript:;">'+ count +'</a>');
var elemUser = $('.fly-nav-user');
elemUser.append(msg);
msg.on('click', function(){
location.href = "/user/message";
});
layer.tips('你有 '+ count +' 条未读消息', msg, {
tips: 3
,tipsMore: true
,fixed: true
});
msg.on('mouseenter', function(){
layer.closeAll('tips');
})
}
$(function () {
var elemUser = $('.fly-nav-user');
if(layui.cache.user.uid !== -1 && elemUser[0]){
var socket = new SockJS("/websocket");
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
stompClient.subscribe("/user/" + <@shiro.principal property="id"/> + "/messageCount", function (res) {
showTips(res.body);
})
});
}
})
</script>
- 当我们发送消息到
/user/id/messageCount 这个通道的时候,前端就可以实时收到消息进行展示
3、发送消息
if(!comment.getUserId().equals(blog.getUserId())){
UserMessage message = new UserMessage();
message.setBlogId(bid);
message.setCommentId(comment.getId());
message.setFromUserId(getProfileId());
message.setToUserId(blog.getUserId());
message.setContent(content);
message.setType(1);
message.setStatus(0);
userMessageService.save(message);
webSocketService.sendMessageCountToUser(message.getToUserId());
}
创建 WebSocketService,WebSocketServiceImpl 来实现消息发送
package com.manster.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.manster.entity.UserMessage;
import com.manster.service.UserMessageService;
import com.manster.service.WebSocketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.messaging.simp.SimpMessagingTemplate;
@Service
public class WebSocketServiceImpl implements WebSocketService {
@Autowired
UserMessageService userMessageService;
@Autowired
SimpMessagingTemplate messagingTemplate;
@Async
@Override
public void sendMessageCountToUser(Long toUserId) {
int count = userMessageService.count(new QueryWrapper<UserMessage>()
.eq("to_user_id", toUserId)
.eq("status", 0));
messagingTemplate.convertAndSendToUser(toUserId.toString(), "/messageCount", count);
System.out.println(toUserId);
}
}
5、elasticsearch 实现搜索功能
1、简介
结合我们学习过的内容,我们之前学习搜索引擎,学过 lucene 还有 elasticsearch,lucene 比较适合单体项目,不适合分布式。
对于数据之间的同步,比如修改或者删除了数据,es是基于内存的,我们需要通知它来进行相应数据的同步,我们就需要加入一些别的中间件来通知进行同步了,有以下三种方案:
- elasticsearch + RabbitMq
- elasticsearch + canal
- elasticsearch + logstash
这次搜索我们用的是 es,es 与数据库之间的内容同步我们用的是 RabbitMq 进行一步同步。下面我们一一来实现这些功能。
首先我们来分析一下我们要开发的功能。
- 搜索功能
- es 数据初始化
- es 与数据库的异步同步功能
集成 elasticsearch 的方式有很多,
- 比较原生的 TransportClient client
- spring 提供的 ElasticsearchTemplate
- spring jpa 提供的 ElasticsearchRepository
其中使用 ElasticsearchRepository 应该是开发量最小的一种方式,使用 template 或者 TransportClient client 方式可能会更灵活。
我们之前有学过 spring data jpa,一种可以按照命名规则就可以查库的方式,在搜索单表时候特别方便。
这次开发,我们使用 ElasticsearchRepository 的方式,当然,引入了这个包之后,你也可以使用 ElasticsearchTemplate 来开发。spring 都会自动帮你注入生成。
2、配置
1、首先我们导入 jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.modelmapper</groupId>
<artifactId>modelmapper</artifactId>
<version>1.1.0</version>
</dependency>
windows下安装 elasticsearch
2、然后我们对 es,rabbitmq 进行配置
spring:
data:
elasticsearch:
cluster-name: elasticsearch
cluster-nodes: localhost:9300
repositories:
enabled: true
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
在这里,可能会产生一个依赖冲突,因为 redis 和 es 底层都要依赖 netty,当这两个依赖的 netty 版本不一致的时候就会产生冲突,对此我们在启动类中加入
System.setProperty("es.set.netty.runtime.available.processors", "false");
3、前端分析
首先我们分析一下前端代码 mods/index.js
$('.fly-search').on('click', function(){
layer.open({
type: 1
,title: false
,closeBtn: false
,shadeClose: true
,maxWidth: 10000
,skin: 'fly-layer-search'
,content: ['<form action="/search">'
,'<input autocomplete="off" placeholder="搜索内容,回车跳转" type="text" name="q">'
,'</form>'].join('')
,success: function(layero){
var input = layero.find('input');
input.focus();
layero.find('form').submit(function(){
var val = input.val();
if(val.replace(/\s/g, '') === ''){
return false;
}
});
}
})
});
- 我们可以看出它是弹个窗跳到了 bing 去进行搜索
- 我们将其进行修改为直接写一个 /search 接口,而关键字 name 为 q
- 我们注释掉搜索后的前缀值
4、接口编写
然后我们进行接口的编写
@RequestMapping("/search")
public String search(String q){
request.setAttribute("q", q);
request.setAttribute("pageData", null);
return "search";
}
我们返回一个 search 页面,就照着 index 页面即可,这里要注意我们封装的分页组件请求路径后面直接拼接 pn,但是我们这里还需要原来的 q 查询关键字条件,所以我们不直接使用组件了,而是根据这个页面的特点来重新写请求路径
<#include "/inc/layout.ftl" />
<@layout "搜索 - ${q}">
<#include "/inc/header-panel.ftl" />
<div class="layui-container">
<div class="layui-row layui-col-space15">
<div class="layui-col-md8">
<div class="fly-panel">
<div class="fly-panel-title fly-filter">
<a>正在搜索关键字: “ <strong style="color: #FF7200">${q}</strong> ” - 共有 <strong style="color: #FF7200">${pageData.total}</strong> 条记录</a>
</div>
<ul class="fly-list">
<#list pageData.records as blog>
<@blisting blog></@blisting>
</#list>
</ul>
<div style="text-align: center">
<div id="laypage-main">
</div>
<script>
layui.use('laypage', function(){
var laypage = layui.laypage;
laypage.render({
elem: 'laypage-main'
,count: ${pageData.total}
,curr: ${pageData.current}
,limit: ${pageData.size}
,jump: function(obj, first){
if(!first){
location.href = "?q=${q}&pn=" + obj.curr;
}
}
});
});
</script>
</div>
</div>
</div>
<#include "/inc/right.ftl" />
</div>
</div>
</@layout>
5、搜索类
1、实体
我们将所有关于搜索的东西都放在 search 包下,首先我们需要一个 model 类来存储 es 对象
package com.manster.search.model;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
@Data
@Document(indexName = "blog", type = "blog", createIndex = true)
public class BlogDocument implements Serializable {
@Id
private Long id;
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
private Long authorId;
@Field(type = FieldType.Keyword)
private String authorName;
private String authorAvatar;
private Long categoryId;
@Field(type = FieldType.Keyword)
private String categoryName;
private Boolean recommend;
private Integer level;
private Integer commentCount;
private Integer viewCount;
@Field(type = FieldType.Date)
private Date created;
}
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart") 表示使用 ik 分词器@Field(type = FieldType.Keyword) 表明这是一个关键字分词,不需要进行切分分词
2、持久
我们还需要持久层操作,repository,这里就使用符合 JPA 命名规范的操作语法就可以进行操作了
package com.manster.search.repository;
import com.manster.search.model.BlogDocument;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface BlogRepository extends ElasticsearchRepository<BlogDocument, Long> {
}
这时,我们就可以启动 head 插件来看一下是否启动是就帮助我们创建了 Blog 的 index(类似于数据库)
6、实现搜索
1、搜索
我们创建 SearchService 以及其实现类,并将其注入到 BaseController 中
@RequestMapping("/search")
public String search(String q){
IPage pageData = searchService.search(getPage(), q);
request.setAttribute("q", q);
request.setAttribute("pageData", pageData);
return "search";
}
然后我们实现 search 方法
package com.manster.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.manster.search.model.BlogDocument;
import com.manster.search.repository.BlogRepository;
import com.manster.service.SearchService;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
@Service
public class SearchServiceImpl implements SearchService {
@Autowired
BlogRepository blogRepository;
@Override
public IPage search(Page page, String keyword) {
Long current = page.getCurrent() - 1;
Long size = page.getSize();
Pageable pageable = PageRequest.of(current.intValue(), size.intValue());
MultiMatchQueryBuilder builder = QueryBuilders.multiMatchQuery(keyword, "title", "authorName", "categoryName");
org.springframework.data.domain.Page<BlogDocument> documents = blogRepository.search(builder, pageable);
IPage pageData = new Page(page.getCurrent(), page.getSize(), documents.getTotalElements());
pageData.setRecords(documents.getContent());
return pageData;
}
}
2、全量同步
在项目启动的时候,我们就将全部博客信息都存入 es 中,在这里我们决定给管理员一个同步的按钮,让其进行同步
AdminController
@ResponseBody
@PostMapping("/initEsData")
public Result initEsData(){
int size = 10000;
Page page = new Page();
page.setSize(size);
long total = 0;
for (int i = 1; i < 1000; i++) {
page.setCurrent(i);
IPage<BlogVo> paging = blogService.paging(page, null, null, null, null, null);
int num = searchService.initEsData(paging.getRecords());
total += num;
if(paging.getRecords().size() < size){
break;
}
}
return Result.success("ES索引初始化成功,共" + total + "条记录", null);
}
初始化操作 SearchServiceImpl
@Override
public int initEsData(List<BlogVo> records) {
if(records == null || records.isEmpty()){
return 0;
}
List<BlogDocument> documents = new ArrayList<>();
for (BlogVo record : records) {
BlogDocument blogDocument = new BlogDocument();
BeanUtils.copyProperties(record, blogDocument);
documents.add(blogDocument);
}
blogRepository.saveAll(documents);
return documents.size();
}
前端增加一个按钮
<@shiro.hasRole name="admin">
<li lay-id="es">ES数据同步</li>
</@shiro.hasRole>
......
<#-- es数据同步 -->
<@shiro.hasRole name="admin">
<div class="layui-form layui-form-pane layui-tab-item">
<form action="/admin/initEsData" method="post">
<div class="layui-form-item">
<button class="layui-btn" key="set-mine" lay-filter="*" lay-submit alert="true">确认同步</button>
</div>
</form>
</div>
</@shiro.hasRole>
此时我们就可以实现数据的同步与搜索了
6、rabbitmq 实现 es 数据同步
1、配置mq
当将文章进行编辑以后,我们将其更新到 es 中,所以我们首先要对 rabbitmq 进行配置
package com.manster.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public final static String ES_QUEUE = "es_queue";
public final static String ES_EXCHANGE = "es_exchange";
public final static String ES_BIND_KEY = "es_exchange";
@Bean
public Queue exQueue(){
return new Queue(ES_QUEUE);
}
@Bean
DirectExchange exchange(){
return new DirectExchange(ES_EXCHANGE);
}
@Bean
Binding binding(Queue exQueue, DirectExchange exchange){
return BindingBuilder.bind(exQueue).to(exchange).with(ES_BIND_KEY);
}
}
当我们发送一条消息到交换机 es_exchange 并指定 RoutingKey 为 es_exchange 后,会把消息存到队列 es_queue 里面,然后我们的消费者监听到队列消息后我们就可以进行消费了。
2、生产消息
然后我们就需要定义发送包含什么内容的消息到消息队列
package com.manster.search.mq;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class BlogMqIndexMessage implements Serializable {
public static final String CREATE_OR_UPDATE = "create_update";
public static final String REMOVE = "remove";
private Long blogId;
private String type;
}
然后我们就在修改博客时发送消息
amqpTemplate.convertAndSend(RabbitConfig.ES_EXCHANGE, RabbitConfig.ES_BIND_KEY,
new BlogMqIndexMessage(blog.getId(), BlogMqIndexMessage.CREATE_OR_UPDATE));
amqpTemplate.convertAndSend(RabbitConfig.ES_EXCHANGE, RabbitConfig.ES_BIND_KEY,
new BlogMqIndexMessage(blog.getId(), BlogMqIndexMessage.REMOVE));
3、消费消息
然后我们对消息进行消费
package com.manster.search.mq;
import com.manster.config.RabbitConfig;
import com.manster.service.SearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RabbitListener(queues = RabbitConfig.ES_QUEUE)
public class MqMessageHandler {
@Autowired
SearchService searchService;
@RabbitHandler
public void handler(BlogMqIndexMessage message){
switch (message.getType()){
case BlogMqIndexMessage.CREATE_OR_UPDATE:
searchService.createOrUpdate(message);
break;
case BlogMqIndexMessage.REMOVE:
searchService.removeIndex(message);
break;
default:
log.error("没有对应的消息类型 -->> {}", message.toString());
break;
}
}
}
然后我们实现这两种消息的消费
@Override
public void createOrUpdate(BlogMqIndexMessage message) {
Long blogId = message.getBlogId();
BlogVo blogVo = blogService.getOneBlog(new QueryWrapper<Blog>().eq("b.id", blogId));
BlogDocument blogDocument = new BlogDocument();
BeanUtils.copyProperties(blogVo, blogDocument);
blogRepository.save(blogDocument);
log.info("es 更新一条数据 -->> {}", message.toString());
}
@Override
public void removeIndex(BlogMqIndexMessage message) {
Long blogId = message.getBlogId();
blogRepository.deleteById(blogId);
log.info("es 删除一条数据 -->> {}", message.toString());
}
至此,我们的同步到 es 的目的就到达了
7、群聊
1、群聊Demo
今天就来完成一个聊天室的功能。
技术选型:
- 前端 layim、websocket
- 后端 t-io websocekt 版
首先我们先来把 layim 的界面先运行起来。layim 是 layui 的一个付费模块,首先我们把 layui 的静态资源包放到 static 中,关于 layim,因为不是完全开源的产品,所以我就不给出具体的包了。
layim 官网
1、引入 layIM
首先引进相关 layim 模块插件,这个插件不是开源的,线上使用需要进行捐赠,如果需要在商业中使用最好进行捐赠哈。
然后按照官网给出的例子,我们来搞个最简单的 hello word。在这之前,我们先要获取一下插件,原则上我们应该通过捐赠形式获取,不过只为了学习,所以就直接从网络上搜索了一个,相关的 js 如下:
将 layim.js 导入到 resources\static\res\layui\lay\modules 目录下
将 layim 样式文件夹 导入到 resources\static\res\layui\css\modules 目录下
2、编写demo
然后根据官方文档,我们在首页的正下方有个群聊按钮,点击之后可以打开群聊窗口进行群聊。所以为了所有页面都能聊天,所以把 js 写在全局模板中
<script>
$(function () {
layui.use('layim', function(layim){
layim.config({
brief: true
,min: true
}).chat({
name: '客服姐姐'
,type: 'friend'
,avatar: 'https://picsum.photos/id/1027/40'
,id: -2
});
layim.setChatMin();
});
});
</script>
这段 js 的效果如下,我们来分析一下:
- layim.config 表示进行初始化配置
- brief: true 表示简约模式,只有一个聊天窗口
- .chat 是声明并打开一个聊天窗口
- layim.setChatMin(); 表示收缩聊天面板。
点击之后的效果:
ok,上面是我们最简单的一个聊天窗口已经可以展示出来了,不过现在还没有功能,还不能相互聊天,接下来我们会给每个窗口一个身份,然后进行相互聊天。
2、封装js
我们在 resources\static\res\js 目录下新建一个 chat.js来进行封装
layui.use('layim', function(layim){
layim.config({
brief: true
,min: true
}).chat({
name: '客服姐姐'
,type: 'friend'
,avatar: 'https://picsum.photos/id/1027/40'
,id: -2
});
layim.setChatMin();
});
然后在 layout.ftl 中进行引入
<script src="/res/js/chat.js"></script>
然后我们分析一下要做的事情
- 建立 ws 连接
- 历史聊天信息回显
- 获取个人、群聊信息并打开聊天窗口
- 发送消息
- 接受消息
- 心跳、断开重连机制
由于这些方法有的需要相互调用,所以我们最好再建一个 js 文件进行封装,方便我们调用,创建 im.js,将其也导入 layout.ftl
<script src="/res/js/im.js"></script>
3、t-io 集成 websocket
1、tio简介
(初始化服务器)
(客户端与服务端通讯流程)
然后集成 t-io 之后要去实现的消息逻辑处理:
常用类说明:
通过以上内容我们知道了几个比较关键的类,也是我们再初始化启动 t-io 服务的几个关键类。
我们先来说明一下几个比较重要的类
以上就是我们需要清楚的 3 个类。有了这 3 个类之后我们就可以启动我们的服务,进行 ws 的连接了。
2、集成
因为我们这次要实现的功能是 t-io 集成 websocket。而 t-io 为我们已经帮我们集成了一套代码,帮我们省去了协议升级等步骤,这样我我们就不需要去手动写很多升级协议等代码了。
我们接着集成 t-io 的 websocket。因为是直接有一套集成框架,所以我这里直接引入版本:https://mvnrepository.com/artifact/org.t-io/tio-websocket-server
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
<version>3.2.5.v20190101-RELEASE</version>
</dependency>
4、实现
首先我们整理一下思路
- 后端服务
- 我们需要一个配置
ImServerConfig ,将启动类进行初始化 - 启动类需要端口号和消息的处理器(
ImWsMsgHandler implements IWsMsgHandler ) - 消息处理器中需要鉴别消息类型,对此我们写一个
MsgHandlerFactory ,使用 map 来进行存储与鉴别 - 而真正进行消息处理我们就在
ImWsMsgHandler 中
- 握手前
handshake ,我们获取用户的 id 来绑定个人通道 - 握手后
onAfterHandshaked ,我们将根据群聊名来绑定群聊通道 - 在接受消息时
onText ,我们将其传输来的 JSON 数据转化为 map 进行接收,然后根据 MsgHandlerFactory 获取消息类型对应的处理器,然后调用处理器的处理方法 handler 来进行处理
PingMsgHandler , 我们就简单打印一下ChatMsgHandler , 我们对消息进行一些处理。
- 首先我们还是将数据由 json 转为 map,获取发送用户与接受目标
- 然后装载实体
- 其次将消息进行发送到群聊,在发送时我们需要一个过滤器,不将群组消息发送到自己所在的通道(因为 layim 会自动将自己发送的消息直接显示,我们将不需要后端再给自己显示一遍了)
- 最后我们将该条消息存入 redis,方便查看聊天历史记录
- 前端
- 首先我们使用 layui 的 layim 组件,而我们需要的许多方法之间需要互相调用,为了方便调用我们再封装一个 js 来编写方法
- 该对象中,我们首先需要进行获取个人、群聊信息,并打开聊天窗口
- 然后我们需要进行与后端 tio 建立连接,主要包括
onopen ,onclose ,onmessage 三个方法,同时我们也要设计心跳消息与重连 - 其次我们实现消息的发送
- 最后我们实现历史聊天记录的回显
1、后端服务
首先我们需要对 tio 服务进行配置启动
package com.manster.config;
import com.manster.im.handler.MsgHandlerFactory;
import com.manster.im.server.ImServerStarter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Slf4j
@Configuration
public class ImServerConfig {
@Value("${im.server.port}")
private int imPort;
@Bean
ImServerStarter imServerStarter(){
try {
ImServerStarter serverStarter = new ImServerStarter(imPort);
serverStarter.start();
MsgHandlerFactory.init();
return serverStarter;
} catch (IOException e) {
log.error("tio server 启动失败", e);
}
return null;
}
}
关于以上这段配置,我们需要三个东西,首先就是配置端口,然后是启动器 ImServerStarter ,最后是 MsgHandlerFactory 消息分类处理器
我们在 application.yml 中进行消息处理的端口配置
im:
server:
port: 9326
这里我们首先需要我们的 tio 服务进行启动 ImServerStarter
package com.manster.im.server;
import lombok.extern.slf4j.Slf4j;
import org.tio.server.ServerGroupContext;
import org.tio.websocket.server.WsServerStarter;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.io.IOException;
@Slf4j
public class ImServerStarter {
private WsServerStarter starter;
public ImServerStarter(int port) throws IOException {
IWsMsgHandler handler = new ImWsMsgHandler();
starter = new WsServerStarter(port, handler);
ServerGroupContext serverGroupContext = starter.getServerGroupContext();
serverGroupContext.setHeartbeatTimeout(5000);
}
public void start() throws IOException {
starter.start();
log.info("tio server start !!");
}
}
根据以上代码,我们又需要一个消息处理器,也就是关于消息的处理器
其中 ImWsMsgHandler 为我们要对消息进行处理的地方,比如握手前,握手后,接受消息,关闭时要进行的操作
package com.manster.im.server;
import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.manster.common.lang.Consts;
import com.manster.im.handler.MsgHandler;
import com.manster.im.handler.MsgHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.util.Map;
@Slf4j
public class ImWsMsgHandler implements IWsMsgHandler {
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
String userId = httpRequest.getParam("userId");
log.info("{} -------------> 正在握手!", userId);
Tio.bindUser(channelContext, userId);
return httpResponse;
}
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
Tio.bindGroup(channelContext, Consts.IM_GROUP_NAME);
log.info("{} -------------> 已绑定群!", channelContext.getId());
}
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
return null;
}
@Override
public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
log.info("接收到信息---------->{}", s);
Map map = JSONUtil.toBean(s, Map.class);
String type = MapUtil.getStr(map, "type");
String data = MapUtil.getStr(map, "data");
MsgHandler handler = MsgHandlerFactory.getMsgHandler(type);
handler.handler(data, wsRequest, channelContext);
return null;
}
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
return null;
}
}
然后我们需要进行初始化消息处理器类别,也就是判别传来的消息是一个什么类型的,我们针对类型进行不同的处理
比如 ping 心跳,就不进行处理,或者只进行日志的打印;但是针对 chat 型消息,我们需要对其进行转换、发送等处理。
package com.manster.im.handler;
import com.manster.common.lang.Consts;
import com.manster.im.handler.impl.ChatMsgHandler;
import com.manster.im.handler.impl.PingMsgHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class MsgHandlerFactory {
private static Map<String, MsgHandler> handlerMap = new HashMap<>();
public static void init(){
handlerMap.put(Consts.IM_MESS_TYPE_CHAT, new ChatMsgHandler());
handlerMap.put(Consts.IM_MESS_TYPE_PING, new PingMsgHandler());
log.info("handler factory init!!");
}
public static MsgHandler getMsgHandler(String type){
return handlerMap.get(type);
}
}
public interface MsgHandler {
void handler(String data, WsRequest wsRequest, ChannelContext channelContext);
}
PingMsgHandler ,ping 消息处理器
package com.manster.im.handler.impl;
import com.manster.im.handler.MsgHandler;
import org.tio.core.ChannelContext;
import org.tio.websocket.common.WsRequest;
public class PingMsgHandler implements MsgHandler {
@Override
public void handler(String data, WsRequest wsRequest, ChannelContext channelContext) {
System.out.println("ping~~");
}
}
ChatMsgHandler chat 消息处理器
package com.manster.im.handler.impl;
import cn.hutool.json.JSONUtil;
import com.manster.common.lang.Consts;
import com.manster.im.handler.MsgHandler;
import com.manster.im.handler.filter.ExcludeMineChannelContextFilter;
import com.manster.im.message.ChatInMess;
import com.manster.im.message.ChatOutMess;
import com.manster.im.vo.ImMess;
import com.manster.im.vo.ImTo;
import com.manster.im.vo.ImUser;
import com.manster.service.ChatService;
import com.manster.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import java.util.Date;
@Slf4j
public class ChatMsgHandler implements MsgHandler {
@Override
public void handler(String data, WsRequest wsRequest, ChannelContext channelContext) {
ChatInMess chatInMess = JSONUtil.toBean(data, ChatInMess.class);
ImUser mine = chatInMess.getMine();
ImTo to = chatInMess.getTo();
ImMess imMess = new ImMess();
imMess.setContent(mine.getContent());
imMess.setAvatar(mine.getAvatar());
imMess.setId(Consts.IM_DEFAULT_GROUP_ID);
imMess.setMine(false);
imMess.setUsername(mine.getUsername());
imMess.setFromid(mine.getId());
imMess.setTimestamp(new Date());
imMess.setType(to.getType());
ChatOutMess chatOutMess = new ChatOutMess();
chatOutMess.setEmit("chatMessage");
chatOutMess.setData(imMess);
String result = JSONUtil.toJsonStr(chatOutMess);
log.info("群聊消息---------> {}", result);
WsResponse wsResponse = WsResponse.fromText(result, "utf-8");
ExcludeMineChannelContextFilter filter = new ExcludeMineChannelContextFilter();
filter.setCurrentContext(channelContext);
Tio.sendToGroup(channelContext.getGroupContext(), Consts.IM_GROUP_NAME, wsResponse, filter);
ChatService chatService = (ChatService) SpringUtils.getBean("chatService");
chatService.setGroupHistoryMsg(imMess);
}
}
其中的 ChatInMess,ChatOutMess,ImUser,ImTo,ImMess 都是根据 layim 传来的消息或者接受的接口类型进行封装接受的实体类
在群聊时,我们自己发送的消息直接根据前端显示,所以我们自己就不需要后端传递来的消息了,对此我们在发送群组消息时进行过滤,在发送群组消息时,我们不给自己这一个成员发送消息 ExcludeMineChannelContextFilter 。
package com.manster.im.handler.filter;
import lombok.Data;
import org.tio.core.ChannelContext;
import org.tio.core.ChannelContextFilter;
@Data
public class ExcludeMineChannelContextFilter implements ChannelContextFilter {
private ChannelContext currentContext;
@Override
public boolean filter(ChannelContext channelContext) {
if(currentContext.userid.equals(channelContext.userid)){
return false;
}
return true;
}
}
然后当我们在查看历史消息的时候,我们需要一些历史消息,我们就在 redis 中存储一段时间 ChatServiceImpl
@Override
public void setGroupHistoryMsg(ImMess imMess) {
redisUtils.lSet(Consts.IM_GROUP_HISTROY_MSG_KEY, imMess, 24 * 60 * 60);
}
而由于我们的 ChatMsgHandler 没有被 spring 管理,所以我们需要 SpringUtils 工具类去 spring 中获取到我们的 ChatServiceImpl ,我们也为这个实现类自己命名 @Service("chatService")
至此消息的后端处理便完成了。
2、前端处理
首先我们根据 layui 的 layim 开发文档进行调用
layui.use('layim', function(layim){
var $ = layui.jquery;
layim.config({
brief: true
,voice: false
,chatLog: layui.cache.dir + 'css/modules/layim/html/chatlog.html'
});
var tiows = new tio.ws($, layim);
tiows.openChatWindow()
tiows.connect();
layim.on('sendMessage', function (mess) {
tiows.sendMessage(mess);
})
});
然后我们真正开始实现这些方法。
首先为了方便调用我们将这个 js 写为一个对象,然后编写其构造方法
if (typeof (tio) == "undefined") {
tio = {};
}
tio.ws = {};
tio.ws = function ($, layim) {
var self = this;
}
首先我们与后端建立连接,并实现监听的方法
this.connect = function () {
var url = "ws://localhost:9326?userId=" + self.userId;
var socket = new WebSocket(url);
self.socket = socket;
socket.onopen = function () {
console.log("tio ws 启动 ~")
self.lastInteractionTime(new Date().getTime());
self.ping();
};
socket.onclose = function () {
console.log("tio ws 关闭 ~")
self.reconn();
};
socket.onmessage = function (res) {
console.log("接受到了消息!!")
console.log(res)
res = eval('(' + res.data + ')');
if(res.emit === 'chatMessage'){
layim.getMessage(res.data);
}
self.lastInteractionTime(new Date().getTime());
};
}
其中,这时我们就可以进行心跳消息的发送了
this.heartbeatTimeout = 5000;
this.heartbeatSendInterval = this.heartbeatTimeout / 2;
......
this.lastInteractionTime = function () {
if (arguments.length == 1) {
this.lastInteractionTimeValue = arguments[0]
}
return this.lastInteractionTimeValue
}
this.ping = function () {
console.log("------------->准备心跳中~");
self.pingIntervalId = setInterval(function () {
var iv = new Date().getTime() - self.lastInteractionTime();
if ((self.heartbeatSendInterval + iv) >= self.heartbeatTimeout) {
self.socket.send(JSON.stringify({
type: 'pingMessage'
,data: 'ping'
}))
console.log("------------->心跳中~")
}
}, self.heartbeatSendInterval)
};
this.reconn = function () {
clearInterval(self.pingIntervalId);
self.connect();
};
然后我们进行获取个人的信息,群聊信息并打开聊天窗口
this.openChatWindow = function () {
$.ajax({
url: "/chat/getMineAndGroupData",
async: false,
success: function (res) {
self.group = res.data.group;
self.mine = res.data.mine;
self.userId = self.mine.id;
}
});
console.log(self.group);
console.log(self.mine);
var cache = layui.layim.cache();
cache.mine = self.mine;
layim.chat(self.group);
layim.setChatMin();
}
在后端我们也进行相应的接口编写,并赋予默认值
@RestController
@RequestMapping("/chat")
public class ChatController extends BaseController {
@Autowired
ChatService chatService;
@GetMapping("/getMineAndGroupData")
public Result getMineAndGroupData(){
Map<String, Object> group = new HashMap<>();
group.put("name", "社区群聊");
group.put("type", "group");
group.put("id", Consts.IM_DEFAULT_GROUP_ID);
group.put("avatar", "https://picsum.photos/id/625/200");
group.put("members", 0);
ImUser user = chatService.getCurrentUser();
return Result.success(MapUtil.builder()
.put("group", group)
.put("mine", user)
.map());
}
我们实现获取用户信息的接口,ChatServiceImpl
@Override
public ImUser getCurrentUser() {
AccountProfile principal = (AccountProfile) SecurityUtils.getSubject().getPrincipal();
ImUser user = new ImUser();
if(principal != null){
user.setId(principal.getId());
user.setAvatar(principal.getAvatar());
user.setUsername(principal.getUsername());
user.setStatus(ImUser.ONLINE_STATUS);
}else {
user.setAvatar("https://picsum.photos/id/1011/200");
Long imUserId = (Long) SecurityUtils.getSubject().getSession().getAttribute("imUserId");
user.setId(imUserId != null ? imUserId : RandomUtil.randomLong());
SecurityUtils.getSubject().getSession().setAttribute("imUserId", user.getId());
user.setUsername("匿名用户");
user.setStatus(ImUser.ONLINE_STATUS);
}
return user;
}
然后我们实现消息的发送
this.sendMessage = function (mess) {
self.socket.send(JSON.stringify({
type: 'chatMessage'
,data: mess
}));
}
最后我们实现前端的历史消息回显
this.initHistoryMess = function () {
localStorage.clear();
$.ajax({
url: '/chat/getGroupHistoryMsg',
success: function (res){
var data = res.data;
if(data.length < 1){
return;
}
for(var i in data){
layim.getMessage(data[i])
}
}
});
}
后端也编写相应的接口 ChatController
@GetMapping("/getGroupHistoryMsg")
public Result getGroupHistoryMsg(){
List<Object> messages = chatService.getGroupHistoryMsg(20);
return Result.success(messages);
}
实现获取历史聊天记录,而我们在 tio 返回消息的时候就已经将消息都存入 redis 中,所以我们直接取出即可 ChatServiceImpl
@Override
public void setGroupHistoryMsg(ImMess imMess) {
redisUtils.lSet(Consts.IM_GROUP_HISTROY_MSG_KEY, imMess, 24 * 60 * 60);
}
@Override
public List<Object> getGroupHistoryMsg(int count) {
long length = redisUtils.lGetListSize(Consts.IM_GROUP_HISTROY_MSG_KEY);
return redisUtils.lGet(Consts.IM_GROUP_HISTROY_MSG_KEY, length - count < 0 ? 0 : length - count, length);
}
总体的 im.js 如下:
if (typeof (tio) == "undefined") {
tio = {};
}
tio.ws = {};
tio.ws = function ($, layim) {
this.heartbeatTimeout = 5000;
this.heartbeatSendInterval = this.heartbeatTimeout / 2;
var self = this;
this.connect = function () {
var url = "ws://localhost:9326?userId=" + self.userId;
var socket = new WebSocket(url);
self.socket = socket;
socket.onopen = function () {
console.log("tio ws 启动 ~")
self.lastInteractionTime(new Date().getTime());
self.ping();
};
socket.onclose = function () {
console.log("tio ws 关闭 ~")
self.reconn();
};
socket.onmessage = function (res) {
console.log("接受到了消息!!")
console.log(res)
res = eval('(' + res.data + ')');
if(res.emit === 'chatMessage'){
layim.getMessage(res.data);
}
self.lastInteractionTime(new Date().getTime());
};
}
this.openChatWindow = function () {
$.ajax({
url: "/chat/getMineAndGroupData",
async: false,
success: function (res) {
self.group = res.data.group;
self.mine = res.data.mine;
self.userId = self.mine.id;
}
});
console.log(self.group);
console.log(self.mine);
var cache = layui.layim.cache();
cache.mine = self.mine;
layim.chat(self.group);
layim.setChatMin();
}
this.initHistoryMess = function () {
localStorage.clear();
$.ajax({
url: '/chat/getGroupHistoryMsg',
success: function (res){
var data = res.data;
if(data.length < 1){
return;
}
for(var i in data){
layim.getMessage(data[i])
}
}
});
}
this.sendMessage = function (mess) {
self.socket.send(JSON.stringify({
type: 'chatMessage'
,data: mess
}));
}
this.lastInteractionTime = function () {
if (arguments.length == 1) {
this.lastInteractionTimeValue = arguments[0]
}
return this.lastInteractionTimeValue
}
this.ping = function () {
console.log("------------->准备心跳中~");
self.pingIntervalId = setInterval(function () {
var iv = new Date().getTime() - self.lastInteractionTime();
if ((self.heartbeatSendInterval + iv) >= self.heartbeatTimeout) {
self.socket.send(JSON.stringify({
type: 'pingMessage'
,data: 'ping'
}))
console.log("------------->心跳中~")
}
}, self.heartbeatSendInterval)
};
this.reconn = function () {
clearInterval(self.pingIntervalId);
self.connect();
};
}
示例
登录
注册
首页
个人中心
搜索
聊天
聊天历史
博客详情
博客编辑
|