IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka,异步消息系统 -> 正文阅读

[大数据]Kafka,异步消息系统

牛客社区网站系统消息发送

阻塞队列

  • BlockingQueue(java接口)
    解决线程通信的问题
    阻塞方法:put、take;阻塞时不会占用系统资源
  • 生产者消费者模式
  • 实现类:
    ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue;

Kafka入门

kafka是分布式的流媒体平台,应用:消息系统,日志收集,用户行为追踪,流式处理。
kafka特点:高吞吐量,消息持久化,高可靠性(分布式保证),高扩展性。
(对硬盘的读取,顺序读取甚至比对内存的随机读取要快,kafka顺序读取硬盘,保证高吞吐量,高速率)
消息队列两种方式:点对点、发布订阅模式(kafka采用)
Kafka术语:

  • Broker(服务器),Zookeeper(用于管理集群)
  • Topic(理解为一个文件夹,生产者把消息发布到的位置)、Partition(对topic的分区)、offset(消息在分区内存放的索引)
  • Leader Replica(主副本)、Follower Replica(从副本,只是用来备份)

cd到kafka相应的目录下
启动zookeeper服务器:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka服务器: bin\windows\kafka-server-start.bat config\server.properties

kafka出错,把kafka-log文件夹删除重新启动试试

Spring整合Kafka

  • 引入依赖 :spring-kafka
  • 配置kafka:配置server、consumer
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true 
# 自动提交的频率
spring.kafka.consumer.auto-commit-interval=3000
  • 访问kafka
    //生产者,生产者发消息是主动调用方法去发送的
    kafkaTemplate.send(topic,data) 
    
    //消费者,监听test的topic,阻塞的读
    @kafkaListener(topics = {"test"})  
    // 会把读取的消息封装成ConsumerRecord,从record读原始消息
    public void handlMessage(ConsumerRecord record){}

服务器启动时,Spring Kafka会自动创建主题。但是测试代码中不会,所以测试代码中你需要用的主题,需要手动创建。

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
    @Autowired
    private kafkaProducer kafkaProducer;

    @Test
    public void testkafka(){
        kafkaProducer.senMessage("test","你好");
        kafkaProducer.senMessage("test","在哪");

        try{
            Thread.sleep(1000*10);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
@Component
class kafkaProducer{
    @Autowired
    private KafkaTemplate kafkaTemplate;
    // 发送的主题和内容
    public void senMessage(String topic,String content){
        kafkaTemplate.send(topic,content);
    }
}
@Component
class kafkaConsumer{

    @KafkaListener(topics = {"test"})
    public void handler(ConsumerRecord record){
        System.out.println(record.value());
    }
}

发送系统通知

评论、点赞、关注等产生的事件发生后,消费者线程异步的从消息队列中取出处理,提高并发性

  • 触发事件
    评论后,发布通知
    点赞后,发布通知
    关注后,发布通知
//在添加评论的方法中加上如下代码,评论后调用发布信息

//触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityUserId(comment.getEntityId())
                .setData("postId",discussPostId);
        // 评论的实体的作者EntityUser,帖子的作者,评论的作者,在不同的表中,分开来查
        if(comment.getEntityType() == ENTITY_TYPE_POST){ //帖子的评论
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }

        eventProducer.fireEvent(event);
  • 处理事件
    封装事件对象
    开发事件的生产者
    开发事件的消费者

消费者监听事件,然后把事件存放进入数据库中

 //消费评论、点赞、关注事件
    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());

        //遍历map对象,再把key value放到content中
        if (!event.getData().isEmpty()) {
            for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
                content.put(entry.getKey(), entry.getValue());
            }
        }

        //把content转为JSON字符串
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }

  • 通知列表
    显示评论、点赞、关注三种类型的通知
  • 通知详情
    分页显示某一类主题所包含的通知
  • 未读消息
    在页面头部显示所有的未读消息数量
 	// 查询某个主题下最新的通知
    Message selectLatestNotice(@Param("userId")int userId, @Param("topic")String topic);

    // 查询某个主题所包含的通知数量
    int selectNoticeCount(@Param("userId")int userId, @Param("topic")String topic);

    // 查询未读的通知的数量
    int selectNoticeUnreadCount(@Param("userId")int userId,@Param("topic") String topic);
 
    <select id="selectLatestNotice" resultType="Message">
        select <include refid="selectFields"></include>
        from message
        where id in (
	        select max(id) from message
	        where status != 2
	        and from_id = 1
	        and to_id = #{userId}
	        and conversation_id = #{topic}
        )
    </select>

    <select id="selectNoticeCount" resultType="int">
        select count(id) from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
    </select>

    <select id="selectNoticeUnreadCount" resultType="int">
        select count(id) from message
        where status = 0
        and from_id = 1
        and to_id = #{userId}
        <if test="topic!=null">
            and conversation_id = #{topic}
        </if>
    </select>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 10:59:11  更:2021-07-14 11:01:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 6:41:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码