IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 7.Kafka生产者API -> 正文阅读

[大数据]7.Kafka生产者API

环境准备

  • java环境
  • kafka环境
  • kafka-clients jar包
    或者依赖:
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

Kafka API

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
在这里插入图片描述
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

package com.huazai.zookeeper.example;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @author pyh
 * @date 2021/7/25 12:43
 */
public class CustomProducer {
    private final static String TOPIC = "TEST_TOPIC";

    private final static Integer COUNT = 100;

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // kafka连接地址,多个地址用“,”隔开
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.132:9092,192.168.64.132:9093,192.168.64.132:9094");
        // 应答策略,all相当于-1
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 序列化key所用到的类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 序列化value所用到的类
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) {
            producer.send(new ProducerRecord<>(TOPIC, "TEST_TOPIC message:" + String.valueOf(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    try {
                        // 回调一遍倒数一遍,直至循环完才让主线程继续往下走
                        countDownLatch.wait();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            });
        }
        // 由于kafka是异步发送的,需要等到发送回调完成之后才能让程序结束,否则程序提前结束会导致发送失败。
        countDownLatch.await();
        System.out.println("发送完毕!");
        producer.close();

    }
}

启动程序之前先执行消费者命令监控消息消费情况。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic TEST_TOPIC

启动程序,客户端监控结果如下:
在这里插入图片描述

遇到的问题

连接超时

在这里插入图片描述
解决方案:

  1. 在kafka/config目录下的server.properties配置advertised.listeners或listeners的ip地址需与kafka所在主机的hostname保持一致)
    在这里插入图片描述
  2. 网络问题。检查/etc/hosts中的主机ip映射的hostname与配置的listeners中的hostname是否保持一致。
    在这里插入图片描述
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:11:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年3日历 -2024/3/29 19:19:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码