IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka-消费者 -> 正文阅读

[大数据]Kafka-消费者

消费者

基本概念

  • 消费者:消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。
  • 消费组:每个消费者都有一个对应的消费者组。也只属于一个消费组。

如果所有消费者都属于同一个消费组,那么所有的消息都会被均衡地投递到每一个消费者,每一个消息只会被一个消费者处理。如果所有的消费者都属于不同消费组,那么消息就会被广播给所有的消费者。

当某个主题中分区数小于小于消费者个数,那么就会出现有的消费者不能接收到消息。

代码

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;

public class MyConsumer1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Map<String,Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        // 配置一次拉取最小拉取数量。默认1B
        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,1);
        // 默认最大量。单位B
        configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,500000);
        // 最大等待毫秒
        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,500);
        // 每个分区最大返回数据量,单位B
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,500000);
        // 一次拉取最大记录数
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
        // 多久关闭闲置的连接
        configs.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,500000);
        // 是否允许通过pattern方式订阅,true-不允许,false-允许
        configs.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,false);
        // 设置Socket接收消息缓冲区的大小,单位B,-1则为操作系统默认值
        configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,65565);
        // 发送消息缓冲区大小
        configs.put(ConsumerConfig.SEND_BUFFER_CONFIG,65565);
        // 等待请求响应的最长时间
        configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
        // 元数据过期时间。单位ms
        configs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG,60000);
        // 重连等待时间
        configs.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG,30);
        // 重新发送失败的请求到指定主题分区之前的等待时间
        configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,30);
        // 事务隔离级别  read_uncommitted  read_committed
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_uncommitted");


        // 消费者组名字
        configs.put(ConsumerConfig.GROUP_ID_CONFIG,"consumer.demo");
        // 手动进行位移提交
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        KafkaConsumer<Integer,String> consumer = new KafkaConsumer<Integer, String>(configs);
        // 通过pattern的模式订阅主题
        final Pattern pattern = Pattern.compile("topic_[0-9]");
        // 通过集合的方式订阅多个主题
        final List<String> topics = Arrays.asList("topic_1");
        // 消费者再均衡-分区的所属权从一个消费者转移到另一个消费者的行为。
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            // 再均衡开始之前和消费者停止读取之后被调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                collection.forEach(tp ->{
                    System.out.println("剥夺的分区:" + tp.partition());
                });
            }
            // 重新分配分区之后和消费者开始读取消费之前被调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(tp->{
                    System.out.println(tp.partition());
                });
            }
        });
        final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
        final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
        topicIterable.forEach(record ->{
            System.out.println("===============================");
            System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
            System.out.println("消息的key:" + record.key());
            System.out.println("消息的偏移量(当前消费到的位置):" + record.offset());
            System.out.println("消息的分区号:" + record.partition());
            System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
            System.out.println("消息的时间戳:" + record.timestamp());
            System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
            System.out.println("消息的时间戳类型:" + record.timestampType());
            System.out.println("消息的主题:" + record.topic());
            System.out.println("消息的值:" + record.value());
        });
        // 同步提交的方式
        consumer.commitSync();
    }
}

订阅及取消订阅方式

public interface Consumer<K, V> extends Closeable {
    // 按照主题的方式进行订阅
    void subscribe(Collection<String> topics);

    // 按照主题的方式进行订阅,并设置再均衡监听器
    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
    // 按照Pattern进行订阅,并设置再均衡监听器
    void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
    // 按照Pattern进行订阅
    void subscribe(Pattern pattern);
    // 用于指定订阅的主题及分区集合
    void assign(Collection<TopicPartition> partitions);
    // --- 取消订阅
    void unsubscribe();
}
  • 查询某个主题的分区信息
public interface Consumer<K, V> extends Closeable {
    List<PartitionInfo> partitionsFor(String topic);
}
  • 取消订阅的其他方式
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
  • 获取消费者所分配到的分区信息
consumer.assignment();

位移提交

  • 同步位移提交
consumer.commitSync();
  • 带参数的同步位移提交
final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
final Iterable<ConsumerRecord<Integer,String>> topicIterable = records.records("topic_1");
topicIterable.forEach(record ->{
    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()),
                                     new OffsetAndMetadata(record.offset()+1)));
});
  • 按分区粒度同步提交消费位移
final  ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<Integer, String>> consumerRecords = records.records(partition);
    for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {

    }
    consumer.commitSync(Collections.singletonMap(partition,
	new OffsetAndMetadata(consumerRecords.get(consumerRecords.size()-1).offset())));
}
  • 异步提交
consumer.commitAsync();

拦截器

public interface ConsumerInterceptor<K, V> extends Configurable {
	// 消息会在调用poll方法返回之前进行拦截处理
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

   	// 提交完消费者位移之后
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when interceptor is closed
     */
    public void close();
}
  • 配置拦截器类
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"类名");
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-08 10:48:29  更:2021-09-08 10:49:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 14:33:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码