IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Sprint Boot论坛项目】5、Kafka,构建TB级异步消息系统 -> 正文阅读

[大数据]【Sprint Boot论坛项目】5、Kafka,构建TB级异步消息系统

这一章主要解决发送系统级消息或通知的问题

1 阻塞队列

在这里插入图片描述

2 Kafka入门

在这里插入图片描述
http://kafka.apache.org/
Broker:Kafka中的每台服务器记为一个Broker
Zookeeper:用来管理集群
Topic:主题,生产者-消费者模式是“发布订阅”模式,生产者发布消息的地方就是Topic
Partition:分区,对主题的消息分区,可以增强服务器的并发能力(如上图右侧)
Offset:消息在分区内 存放的索引
Replication:副本,对消息备份,Kafka是分布式消息引擎
Leader Replic主副本,可以响应查询

3 Spring整合Kafka

在这里插入图片描述
生产者发消息是我们主动调用,消费者取消息是自动的

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;
    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test","你好!");
        kafkaProducer.sendMessage("test","hello!");
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
@Component
class KafkaProducer{
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic,String content){
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer{
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

【此处运行时电脑的小电扇嗷嗷吹然后运行失败了,提示找不到topic,我开动我的小脑袋瓜子想是不是因为我把启动Kafka的命令行关掉了?打开之后再运行就成功了~
开启Kafka的命令:(两个窗口都在安装目录下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties

不想打字的话直接把文件拖到窗口即可

4 发送系统通知

在这里插入图片描述
系统发布通知是非常频繁的行为,重点要考虑性能问题
要用到Kafka的消息队列解决问题
为什么要用消息队列解决问题呢?
评论、点赞、关注是三类不同的通知,可以用三个不同的主题,某件事发生后,就把消息扔到队列里,当前线程(生产者)就可以去处理别的事情。后续的业务由消费者处理。
这种并发方式为异步方式

业务角度:以事件为主体来解决

4.1 封装事件(实体)

知识点:
1.private Map<String, Object> data = new HashMap<>();//实体最后有个map属性,使程序具有扩展性
2.set方法将无返回值改为返回自身,可以循环调用

public Event setTopic(String topic) {//返回自身可以循环调用
        this.topic = topic;
        return this;
    }

代码:

package com.nowcoder.community.entity;

import java.util.HashMap;
import java.util.Map;

public class Event {
    private String topic;//不同的类型:评论、点赞、关注
    private int userId;//事件触发的人
    private int entityType;
    private int entityId;//被触发的实体对象
    private int entityUserId;//实体作者
    private Map<String, Object> data = new HashMap<>();//具有扩展性

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {//返回自身可以循环调用
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key,Object value) {
        this.data.put(key, value);
        return this;
    }
}

4.2 写生产者与消费者

@Component
public class EventProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    //处理事件
    public void fireEvent(Event event){
        //将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}
@Component
public class EventConsumer implements CommunityConstant {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value()==null){
            logger.error("消息的内容为空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(),Event.class);
        if(event==null){
            logger.error("消息格式错误");
            return ;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());

        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry : event.getData().entrySet()){
                content.put(entry.getKey(), entry.getValue());
            }
        }

        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }
}

4.3 在对应事件发生时调用生产者(Controller里修改)

调用生产者即:触发事件,并将事件扔到消息队列

4.4 启动

Kafka如果启动报错什么锁死,把kafka-logs删掉再重启
在这里插入图片描述
报错了,看第一句话报的什么错
在这里插入图片描述
往下拉看到自己编写的程序,点进去看报什么错

5 显示系统通知

在这里插入图片描述

5.1 通知列表

数据层-业务层-视图层-动态模板,分别加方法

5.2 通知详情

5.3 未读消息

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:17:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:54:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码