IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手 -> 正文阅读

[大数据]Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手

一、Producer-生产者

在这里插入图片描述

1.producer类

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;
    /**
     * 构建方法,初始化生产者对象
     * @param topic
     * @param isAsync
     */
    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        //  指定kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        //   client.id一般不做设置
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        //  设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //todo: 初始化KafkaProducer对象
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        // todo: 一直会往kafka发送数据
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            //isAsync , kafka发送数据的时候,有两种方式
            //todo:1: 异步发送   isAsync=true
            //todo:2: 同步发送   isAsync=false
            if (isAsync) { // Send asynchronously
                //todo:异步发送
                //这样的方式,性能比较好,我们生产代码用的就是这种方式。
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    //todo:同步发送
                    //todo:发送一条消息,等这条消息所有的后续工作都完成以后才继续下一条消息的发送。
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();  //阻塞方法
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}

二、Producer核心流程

  1. ProducerInterceptors是一个拦截器,对发送的数据进行拦截处理
  2. Serializer对消息的key和value进行序列化
  3. 通过使用分区器作用在每一条消息上,实现数据分发进入到topic不同的分区之中
  4. RecordAccumulator缓存消息,实现批量发送
  5. Sender从RecordAccumulator中获取消息
  6. 构建ClientRequest对象
  7. ClientRequest交到NetWorkClient准备发送
  8. ClientRequest将请求放入到KafkaChannel的缓存
  9. 发送请求到kafka集群
  10. Sender线程接受服务端发送的响应
  11. 执行绑定的回调函数

三、Producer初始化

1.初始化KafkaProducer对象

 private final KafkaProducer<Integer, String> producer;
 //todo: 初始化KafkaProducer对象
        producer = new KafkaProducer<>(props);

2.初始化KafkaProducer重要参数

 Properties props = new Properties();
        //  指定kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        //   client.id一般不做设置
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        //  设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-17 12:03:34  更:2021-10-17 12:05:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:04:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码