IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka 2.8.0 JAVA API基本使用 -> 正文阅读

[大数据]Kafka 2.8.0 JAVA API基本使用

以下测试皆在windows下进行,请根据自己情况酌情配置kafka zookeeper等环境

本人使用的是jdk11,代码中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友请自行转换

kafka环境变量等暂时略过

1.java导入依赖

        <!--导入kafka依赖-->
 ? ? ? ?<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
 ? ? ? ?<dependency>
 ? ? ? ? ? ?<groupId>org.apache.kafka</groupId>
 ? ? ? ? ? ?<artifactId>kafka-clients</artifactId>
 ? ? ? ? ? ?<version>2.8.0</version>
 ? ? ? ?</dependency>
 ? ? ? ?<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
 ? ? ? ?<dependency>
 ? ? ? ? ? ?<groupId>org.apache.kafka</groupId>
 ? ? ? ? ? ?<artifactId>kafka_2.13</artifactId>
 ? ? ? ? ? ?<version>2.8.0</version>
 ? ? ? ?</dependency>

kafka Producer

导入相关依赖后,创建测试类ProducerDemo;

  • 创建生产者对象

    使用KafkaProducer 创建kafka生产者对象,这时可以发现kafka不允许我们使用空构造来创建对象;

    那么我们就选用传入properties的方式创建kafka生产者

    创建生产者的时候,跟控制台命令一样,我们需要指定集群名称以及序列化器,而这些相关设置都会存储在我们的配置文件中;

    kafka给我们提供了ProducerConfig类,并在其中已经给我们提前准备好了我们所需要的key,在向properties中put键值时,可以直接使用producerConfig的静态常量作为key;并传入相应value

  • 向kafka中发送信息

    使用kafkaProducer向kafka中发送信息,可以使用其提供的send()方法 ;使用时可以看到其需要传入ProducerRecord以及一个可选的Callback

    ProducerRecord: 即为每条数据所封装成的对象

    CallBack:可选;获取函数的回调

  • close()

    在真实生产环境中,我们可能不需要手动调用close方法关闭kafkaProducer,但是目前的测试阶段,如果不使用close关闭,可能会导致发送的信息在设置等待的时间内,不会被真正的发送;

    流在关闭的时候会对数据进行回收操作

/**
 * 描述:kafkaProducer生产者
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID ? ? DATE ? ? ? ?  PERSON ? ? ? ?  REASON
 *  1 ? ?  2021/8/10 23:14 ?  Bambi ? ? ?  Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ProducerPartitionerDemo01 {
 ? ?public static void main(String[] args) {
        
 ? ? ? ?Properties properties = new Properties();
 ? ? ? ?//自行修改为对应的集群地址 kafka默认为9092,此处我没有更改
 ? ? ? ?properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");   
 ? ? ? ?//需要传入序列化器的全类名,kafka需要通过反射全类名去获取序列化器
 ? ? ? ?properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 ? ? ? ?properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
?
 ? ? ? ?KafkaProducer kafkaProducer = new KafkaProducer(properties);
 ? ? ? ?for (int i = 0; i < 10; i++) {
 ? ? ? ? ? ?//使用callBack收集回调信息,使用了lamdba表达式
 ? ? ? ? ? ?kafkaProducer.send(new ProducerRecord("此处使用自己存在的主题","value"),((metadata, exception) ->{
 ? ? ? ? ? ? ? ?if(exception==null){
 ? ? ? ? ? ? ? ? ? ?System.out.println("没有错误,数据添加成功");
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  } ));
 ? ? ?  }
        //关闭
 ? ? ? ?kafkaProducer.close();
 ?  }
}

自定义分区器

如果想要自己根据业务需求编写自定义的分区规则,可以自定义分区器;

说到自定义,就势必需要去实现某个接口或者继承某个类

这里, 我们需要实现的是kafka给我们的Partitioner接口,实现后重写方法

/**
 * 描述: 自定义分区器
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID ? ? DATE ? ? ? ?  PERSON ? ? ? ?  REASON
 *  1 ? ?  2021/8/10 22:24 ?  Bambi ? ? ?  Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class MyPartitioner implements Partitioner {
?
 ? ?/**
 ? ? * 编写分区规则
 ? ? */
 ? ?@Override
 ? ?public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 ? ? ? ?//根据业务需求编写分区规则
 ? ? ? ?return 0;
 ?  }
?
 ? ?@Override
 ? ?public void close() {
?
 ?  }
?
 ? ?/**
 ? ? * 读取配置信息
 ? ? * @param configs
 ? ? */
 ? ?@Override
 ? ?public void configure(Map<String, ?> configs) {
?
 ?  }
}

在编写规则时可以参考Kafka对Partitioner的默认实现 DefaultPartitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
 ? ? ? ? ? ? ? ? ? ? ? ? int numPartitions) {
 ?      //如果key也不存在,则会对可用分区进行轮询
 ? ? ? ?if (keyBytes == null) {
 ? ? ? ? ? ?return stickyPartitionCache.partition(topic, cluster);
 ? ? ?  }
 ?      //如果没有指定分区,且存在key值,则会根据key的hash进行取模来选择分区
 ? ? ? ?// hash the keyBytes to choose a partition
 ? ? ? ?return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
 ?  }

实现同步发送

正常情况下kafka生产者发送信息采用的是异步发送的方式,主线程将信息发送给共享变量 RecordAccumulator ,Sender线程不同的从共享变量中拉取数据发送到broker上;

实现逻辑

在两个线程其中一个执行的时候去阻塞另一个线程,实现串行

我们可以发现kafkaProducer的send()方法是存在返回值 Future 的;

而我们知道,当future对象调用get()方法时,不仅会获得当前线程回调的对象,还会阻塞当前线程

我们便使用这个方法来实现同步发送

同步发送的使用场景相对较少,我们可以使用同步发送来确保区内有序,即当上一条信息发送后,未接收到ack之前,阻塞发送线程,不继续发送,从而实现有序


消费者API

编写消费者api的逻辑与生产者十分的相像,使用kafka提供的 KafkaConsumer 来创建消费者对象

并在配置文件中传递对应信息,可以使用ConsumerConfig中的静态属性充当key值

/**
 * 描述:kafka消费者
 *
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID ? ? DATE ? ? ? ?  PERSON ? ? ? ?  REASON
 *  1 ? ?  2021/8/11 0:21 ?  Bambi ? ? ?  Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerTest {
?
 ? ?public static void main(String[] args) {
 ? ? ? ?Properties properties = new Properties();
 ? ? ? ?//连接的集群
 ? ? ? ?properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 ? ? ? ?//生产者需要指定序列化器,那么消费者就需要指定对应的反序列化器
 ? ? ? ?properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
 ? ? ? ?properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
?
 ? ? ? ?//自动提交
 ? ? ? ?properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 ? ? ? ?//自动提交的延迟,提交的是消费者的offset
 ? ? ? ?properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
?
 ? ? ? ?//消费者组
 ? ? ? ?properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
?
 ? ? ? ?//创建消费者
 ? ? ? ?KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
 ? ? ? ?//订阅主题
 ? ? ? ?//此处可以添加多个集群
 ? ? ? ?//可以看到这里没有返回值,也就是说,这里只是单纯的指定了主题,如果想获取主题中的信息,需要使用别的方法
 ? ? ? ?kafkaConsumer.subscribe(List.of("你的主题"));
?
?
 ? ? ? ?while (true){
 ? ? ? ? ? ?//获取的类型与Producer类似,不过为ConsumerRecords类,想要得到单个数据,需要遍历输出
 ? ? ? ? ? ?//新版本建议使用传入Duration的方式,直接传入毫秒数的方式以过时
 ? ? ? ? ? ?ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
 ? ? ? ? ? ?consumerRecords.forEach(stringStringConsumerRecord -> {
 ? ? ? ? ? ? ? ?//可以看到,使用consumerRecord去调用方法的时候,可以获取到Key,所以key并不只是用来划分分区之用,如果没有指定key,会输出null
 ? ? ? ? ? ? ? ?System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+" ?  :"+stringStringConsumerRecord.offset());
 ? ? ? ? ?  });
 ? ? ?  }
 ? ? ? ?//consumer进行订阅拉去信息的时候不需要手动关闭,因为顺序执行完毕后,jvm会关闭;所以可以使用一个while循环来持续消费
 ?  }
}

启动消费者,会发现我们可以连接到对应的主题,但是不会获取到先前已经存在的信息;

在我们使用控制台调用消费者时,如果我们想获取该分区已经存在的信息,我们可以使用 --from-beginning指令将offset放到最前端从头获取;

java api中也是一样;

kafka中 命令行能做的事情,在配置文件中应该都有相关的配置

我们进入ConsumerConfig,可以查看到其已经给我们提供了 AUTO_OFFSET_RESET_CONFIG这个属性;

根据下方的doc描述,该属性默认值为lastest,这也是我们为什么在不设置的时候会无法获取已存在信息的原因,我们可以手动在配置文件中传入

earierlast

此处注意这个指定的生效条件:

  • 只有当当前消费者/消费者组第一次消费(即还没有offset时),或当前的offset在这个server中不存在时,指令才会生效

    这里解释一下为什么会不存在,kafka的数据默认时7天清空一次,如果我们拿着已经清空的数据的offset去寻找数据,就会出现offset在server中不存在的现象,此时AUTO_OFFSET_RESET_CONFIG就会生效

关于offset的手动提交

我们为什么需要手动提交? 自动提交无法保证准确的提交时机

  • 如果设置的提交延时过短,会丢是数据

  • 如果设置的延时过长,会导致数据重复

1.在配置文件中关闭自动提交

既然我们需要手动提交,则必然需要在配置文件中将自动提交置为false

ENABLE_AUTO_COMMIT_CONFIG,<----将它改成false

  1. 在消费结束后进行手动提交

    使用consumer的 commitSync() 同步提交,或commitAsync() 异步提交

    • commitSyn:

      相比于异步提交,因为其提交offset时自带失败重试的机制,相对更加可靠

    • commitAsync:

      同步提交相对可靠,但是会阻塞当先线程,影响吞吐量;

      在大多数情况下,我们会选用异步提交的方式

自定义存储offset

手动提交虽然可以解决丢是数据的问题,但是仍然会存在数据重复的现象;

kafka也早已考虑到这种情况,所以允许我们自定义存储offset的规则,(比如我们可以和MySQL的写入操作进行事务绑定...)

但是相对于自定义分区器,自定义存储offset要相对麻烦一些;在0.9版本之后,kafka会将offset暂存在kafka内置的一个主题中,想要去维护一个offset,就需要考虑到消费者的Rebalance问题

即,如果当前消费者所消费的分区挂掉了,消费者需要转移到另一分区去消费,此时的offset需要定位到这个分区最近提交的offset

为此,我们需要实现kafka提供的ConsumerRebalanceListener

/**
 * 描述: 自定义存储Offset
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID ? ? DATE ? ? ? ?  PERSON ? ? ? ?  REASON
 *  1 ? ?  2021/8/11 23:26 ?  Bambi ? ? ?  Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerConfigOffset {
 ? ?//创建一个Map在暂存当前offset
 ? ?private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
 ? ?
 ? ?public static void main(String[] args) {
 ? ? ? ?//配置文件较为冗长,我写了个工具类进行配置,相关配置内容已经提到过,就不再赘述
 ? ? ? ?PropertiesUtils propertiesUtils = new PropertiesUtils();
 ? ? ? ?Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
?
 ? ? ? ?KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
?
 ? ? ? ?//在此处创建ConsumerRebalanceListener类
 ? ? ? ?kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
 ? ? ? ? ? ?//在Rebalance之前调用
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 ? ? ? ? ? ? ? ?commitOffset(currentOffset);
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?//在Rebalance之后调用
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 ? ? ? ? ? ? ? ?currentOffset.clear();
 ? ? ? ? ? ? ? ?partitions.forEach(partition -> {
 ? ? ? ? ? ? ? ? ? ?//定位到分区中最近的offset,继续消费
 ? ? ? ? ? ? ? ? ? ?kafkaConsumer.seek(partition,getOffset(partition));
 ? ? ? ? ? ? ?  });
 ? ? ? ? ?  }
 ? ? ?  });
?
 ? ? ? ?while (true){
 ? ? ? ? ? ?ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
 ? ? ? ? ? ?poll.forEach(consumerRecord ->{
 ? ? ? ? ? ? ? ?System.out.printf("offset =  %d %n",consumerRecord.offset());
 ? ? ? ? ? ? ? ?System.out.printf("key = %s %n",consumerRecord.key());
 ? ? ? ? ? ? ? ?System.out.printf("value = %s %n",consumerRecord.value());
?
 ? ? ? ? ? ? ? ?//将下标缓存到offset中
 ? ? ? ? ? ? ? ?currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
 ? ? ? ? ?  });
 ? ? ? ? ? ?commitOffset(currentOffset);
 ? ? ?  }
 ?  }
?
 ? ?/**
 ? ? * 提交当前offset
 ? ? * @param currentOffset
 ? ? */
 ? ?private static void commitOffset(Map<TopicPartition , Long> currentOffset){
 ? ? ? ?//处理异步提交的业务逻辑
 ?  }
?
 ? ?//获取当前分区的offset
 ? ?private static long getOffset(TopicPartition partition){
 ? ? ? ?return 0;
 ?  }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-15 15:39:36  更:2021-08-15 15:40:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:14:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码