IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka原理1 -> 正文阅读

[大数据]kafka原理1

kafka消费者api

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MsgConsumer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
    // 消费分组名
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    // 是否自动提交offset,默认就是true  当消费者的代码执行完 不管有没有异常都会把这次从broker中拉取的消息偏移量给提交了 如果偏移量被提交了 消费者就不会再次的消费这条消息了
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 自动提交offset的间隔时间  如果关闭了自动提交这个配置就会失效  这个表示两次自动提交的间隔
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// 一般使用手动提交
    //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    /*
    当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
    latest(默认) :只消费自己启动之后发送到主题的消息
    earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
    */
    //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  /*
  consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
  rebalance方案下发给consumer,这个时间可以稍微短一点
  */
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    /*
    服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组, 被踢出消费组 这个消费者就无法再消费消息 除非重启
    对应的Partition也会被重新分配给其他consumer,默认是10秒
    */
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
    //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    /*
    如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 也就是说当消费者从	broker中poll到消息然后执行业务代码 执行的时间超
过了30秒也就是props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);配置的时间broker会认为这个消费者的机器是垃圾就会把这个消费者
踢出消费者组 在broker只会保留性能高的消费者
    
    */
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
// key的序列号方式
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value的序列号方式
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    // 消费指定分区
    //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

    //消息回溯消费
    /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
    consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/

    //指定offset消费
    /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
    consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/

    /*从指定时间点开始消费 kafka没有根据时间消费的api 它只是根据一个时间 去查找这个时间对应的那条消息的偏移量 然后最终根据偏移量消费消息 kafka保存消息的时候 会有.log、.index、.timeindex这三个文件所有的消息都会保存在log文件中为了提升效率会把log文件中的消息每存满4kb把第4kb的那条消息分别放到index、timeindex文件中用来做索引 当使用指定时间消费 就会去timeindex文件中查找这个时间然后找到它的偏移量 然后返回 然后再根据这个偏移量去消费消息指定偏移量就是去index文件中找index、timeindex会保存这个消息的偏移量、磁盘上所在的地址这两个都是根据log文件中得到的*/
    /*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
    //从1小时前开始消费
    long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
    Map<TopicPartition, Long> map = new HashMap<>();
    for (PartitionInfo par : topicPartitions) {
        map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
    }
    Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndTimestamp value = entry.getValue();
        if (key == null || value == null) continue;
        Long offset = value.offset();
        System.out.println("partition-" + key.partition() + "|offset-" + offset);
        System.out.println();
        //根据消费里的timestamp确定offset
        if (value != null) {
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
    }*/

    while (true) {
        /*
         * poll() API 是拉取消息的长轮询 也就是说会循环1秒钟从broker中拉取消息 如果在1秒钟拉取到了消息就返回 如果没拉取到消息就返回空(null)
         */
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                    record.offset(), record.key(), record.value());
        }

        /*if (records.count() > 0) {
            // 手动同步提交offset,当前线程会阻塞直到offset提交成功
            // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
            consumer.commitSync();

            // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                        System.err.println("Commit failed exception: " + exception.getStackTrace());
                    }
                }
            });

        }*/
    }
}
}

spring boot整合kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml配置文件

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

发送者代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

private final static String TOPIC_NAME = "my-replicated-topic";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("/send")
public void send() {
    kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}

}

消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

/**
 * @KafkaListener(groupId = "testGroup", topicPartitions = {
 *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
 *             @TopicPartition(topic = "topic2", partitions = "0",
 *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
 *     },concurrency = "6")
 *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
 * @param record
 */
@KafkaListener(topics = "my-replicated-topic",groupId = "zhugeGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    //手动提交offset
    ack.acknowledge();
}

/*//配置多个消费组
@KafkaListener(topics = "my-replicated-topic",groupId = "tulingGroup")
public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    ack.acknowledge();
}*/
}

kafka的总控制器

kafka集群中一个或者多个broker 在这之间会有一个被选举为控制器(跟leader一样为了好区分这里叫控制器) 它负责管理整个集群所有主题分区的状态(就是这个分区是不是leader或者follower)控制器的选举方式就是当kafka启动的时候会给zookeeper发送create 创建一个临时节点controller 谁先创建成功这个节点谁就是控制器 如果使用脚本同时启动多台kafka 这多台kafka机器就会同时发送create这个命令 但是zookeeper执行命令的时候是一个一个的执行 也就是说第一个创建了controller这个节点后面的都会创建失败 这也就选出了控制器 如果控制器挂了会利用zookeeper监听机制发送给其它机器的kafka然后重新创建controller节点

控制器监听的节点
控制器会监听broker的变化 /brokers/ids/这层的节点也都是临时节点根据zookeeper监听机制处理相关逻辑
监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
更新集群的元数据信息,同步到其他普通的broker节点中

根据上面说的控制器监听的节点 当分区中的leader挂了 上面的ids也就会感知到 ids下面都是临时节点 节点挂了controller就会感知到 就可以得到这个节点的一些信息 这些信息就包含 这个节点是哪些分区的leader然后 对应的leader分区就会从isr列表中获取第一个 让它升级为leader 为什么是第一个?第一个可以说它最先跟leader数据同步完的节点 也就是说这个节点跟leader节点通信、传输效率最高让它当leader很明显是最好的 如果isr列表中的节点都挂了它会阻塞客户端 等待节点重新启动 也可以不等待unclean.leader.election.enable=false通过这个参数设置 为false等待 为true不等待 不等待就会丢失数据
这里还有一个消费者Rebalance分区分配策略这个东西 后续会写
副本进入ISR列表有两个条件:
副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)

消费者消费消息的偏移量(offset)记录机制

查看kafka用来保存topic的目录下会有许多以__consumer_offsets开头的文件这个文件就是用来保存偏移量 默认50个 __consumer_offsets可能会接收高并发的请求kafka默认给它分配了50个分区 当消费者提交了偏移量就会往__consumer_offsets开头的文件中存储一个key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区
公式:hash(consumerGroupId) % __consumer_offsets主题的分区数

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-14 14:08:30  更:2021-08-14 14:10:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:17:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码