这一章主要解决发送系统级消息或通知的问题
1 阻塞队列
2 Kafka入门
http://kafka.apache.org/ Broker:Kafka中的每台服务器记为一个Broker Zookeeper:用来管理集群 Topic:主题,生产者-消费者模式是“发布订阅”模式,生产者发布消息的地方就是Topic Partition:分区,对主题的消息分区,可以增强服务器的并发能力(如上图右侧) Offset:消息在分区内 存放的索引 Replication:副本,对消息备份,Kafka是分布式消息引擎 Leader Replic主副本,可以响应查询
3 Spring整合Kafka
生产者发消息是我们主动调用,消费者取消息是自动的
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test","你好!");
kafkaProducer.sendMessage("test","hello!");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String content){
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer{
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
【此处运行时电脑的小电扇嗷嗷吹然后运行失败了,提示找不到topic,我开动我的小脑袋瓜子想是不是因为我把启动Kafka的命令行关掉了?打开之后再运行就成功了~】 开启Kafka的命令:(两个窗口都在安装目录下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
不想打字的话直接把文件拖到窗口即可
4 发送系统通知
系统发布通知是非常频繁的行为,重点要考虑性能问题 要用到Kafka的消息队列解决问题 为什么要用消息队列解决问题呢? 评论、点赞、关注是三类不同的通知,可以用三个不同的主题,某件事发生后,就把消息扔到队列里,当前线程(生产者)就可以去处理别的事情。后续的业务由消费者处理。 这种并发方式为异步方式
业务角度:以事件为主体来解决
4.1 封装事件(实体)
知识点: 1.private Map<String, Object> data = new HashMap<>(); //实体最后有个map属性,使程序具有扩展性 2.set方法将无返回值改为返回自身,可以循环调用
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
代码:
package com.nowcoder.community.entity;
import java.util.HashMap;
import java.util.Map;
public class Event {
private String topic;
private int userId;
private int entityType;
private int entityId;
private int entityUserId;
private Map<String, Object> data = new HashMap<>();
public String getTopic() {
return topic;
}
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
public int getUserId() {
return userId;
}
public Event setUserId(int userId) {
this.userId = userId;
return this;
}
public int getEntityType() {
return entityType;
}
public Event setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public Event setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityUserId() {
return entityUserId;
}
public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;
return this;
}
public Map<String, Object> getData() {
return data;
}
public Event setData(String key,Object value) {
this.data.put(key, value);
return this;
}
}
4.2 写生产者与消费者
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void fireEvent(Event event){
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record){
if(record == null || record.value()==null){
logger.error("消息的内容为空");
return;
}
Event event = JSONObject.parseObject(record.value().toString(),Event.class);
if(event==null){
logger.error("消息格式错误");
return ;
}
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
Map<String, Object> content = new HashMap<>();
content.put("userId", event.getUserId());
content.put("entityType", event.getEntityType());
content.put("entityId", event.getEntityId());
if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry : event.getData().entrySet()){
content.put(entry.getKey(), entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
4.3 在对应事件发生时调用生产者(Controller里修改)
调用生产者即:触发事件,并将事件扔到消息队列
4.4 启动
Kafka如果启动报错什么锁死,把kafka-logs删掉再重启 报错了,看第一句话报的什么错 往下拉看到自己编写的程序,点进去看报什么错
5 显示系统通知
5.1 通知列表
数据层-业务层-视图层-动态模板,分别加方法
5.2 通知详情
5.3 未读消息
|