IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 深入浅出理解kafka原理系列之:java实现kafka生产者 -> 正文阅读

[Java知识库]深入浅出理解kafka原理系列之:java实现kafka生产者

深入浅出理解kafka原理系列之:java实现kafka生产者

一、引入pom.xml依赖

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.7.2</version>
		</dependency>

二、java实现kafka生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        //ack模式,all是最慢但最安全的
        props.put("acks", "-1");
        //失败重试次数
        props.put("retries", 0);
        //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
        props.put("batch.size", 10);
        //props.put("max.request.size",10);
        //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        props.put("linger.ms", 10000);
        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        props.put("buffer.memory", 10240);
        //序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
        for (int i = 0; i < 10; i++) {
            RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
            System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + "  | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
        }
    }
}

输出如下所示:

同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:10

三、发送消息到指定分区上

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
        for (int i = 0; i < 10; i++) {
            RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get();
            System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + "  | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
        }

输出如下所示:

同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:10
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:11
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:12
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:13
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:14
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:15
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:16
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:17
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-12-18 15:50:19  更:2021-12-18 15:52:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:56:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码