IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> KafkaApis处理FetchRequest请求源码解析 -> 正文阅读

[大数据]KafkaApis处理FetchRequest请求源码解析

KafkaApis模块是Kafka中负责不同业务请求的具体实现逻辑,本文主要讲一下KafkaApis处理FetchRequest请求的流程。

当状态为Follower的Replica向状态为Leader的Replica同步数据或者消费者获取数据时,Replica会发送FetchRequest给Leader所在的Broker Server,Broker Server在接收到FetchRequest请求时,会返回相应的数据,同时还会根据情况更新对应的元数据。其详细的过程如下:

def handleFetchRequest(request: RequestChannel.Request) {
    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
    //根据fetchRequest获取指定的数据
    val dataRead = replicaManager.readMessageSets(fetchRequest)

    //如果更新请求是来自follower,还需要更新元数据详细以及响应那些延迟的DelayedFetch和DelayedProduce
    if(fetchRequest.isFromFollower)
      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))

    // 统计获取的数据量
    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
    // 统计异常
    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
      errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
    /* 如果发生以下几种情况,立即响应 
        1) 请求方不希望等待
        2) fetchRequest不想要任何数据
        3) 已经获取到了足够的数据
        4) 获取数据出现异常
    if(fetchRequest.maxWait <= 0 ||
       fetchRequest.numPartitions <= 0 ||
       bytesReadable >= fetchRequest.minBytes ||
       errorReadingData) {
     
      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
    } else {
      
      //如果没有获取到足够的数据,此时不会立即返回相应,而是采用Purgatory策略延迟相应
      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq
      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
        dataRead.mapValues(_.offset))

      
      val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
      if (satisfiedByMe)
        fetchRequestPurgatory.respond(delayedFetch)
    }
  }

对于Broker Server处理FetchRequest请求的过程中,有两个重要的步骤:1)如果是来自Follower的请求,如果更新元数据信息,2)DelayedFetch是如何判断满足条件的。

对于前者主要更新以下几个方面的元数据:1)更新对于Replica的LEO(LogEndOffset),即记录该Replica的当前日志结束偏移量。2)更新Leader Replica 的HighWatermark,3)可能需要扩大ISR列表。更新完数据之后,就去检查该Broker Server上的DelayedProduce是否满足返回条件,因为发生了数据同步,导致DelayedProduce得到了更多的Replica认同,其过程如下:

private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
   
    offsets.foreach {
      case (topicAndPartition, offset) =>
        /*更新replica的的LEO,更新leader replica的highwatermark,更新isr,
          如果Leader replica的highwatermark发生变化,则unblock之前阻塞住的
        DelayedFetchRequest和DelayedProduceRequest
        */
        replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
          topicAndPartition.partition, replicaId, offset)

       /*如果之前的ProduceRequest的ack >1,我们需要检查是否可以unblocked阻塞的 
           DelayedFetchRequest
       */
        replicaManager.unblockDelayedProduceRequests(topicAndPartition)
    }
  }

对于后者主要是判断当前是否有足够的数据可以拉取,如果有足够的数据可以拉取,则相应那些DelayedProduceRequest,其详细过程如下:

def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
    var accumulatedSize = 0
    val fromFollower = fetch.isFromFollower
    partitionFetchOffsets.foreach {
      case (topicAndPartition, fetchOffset) =>
        try {
          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
            //获取leader replica
            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
            /*如果是来自follower replica,则可以同步leader replica的LEO之前的所有数据。
              如果是来自消费者,则只同步leader replica的highwatermark之前的所有数据
            val endOffset =
              if (fromFollower)
                replica.logEndOffset
              else
                replica.highWatermark

            if (endOffset.offsetOnOlderSegment(fetchOffset)) {
               /*此时代表Follower replica正在向被截取的Leader Replica拉取数据,
              
              需要立即返回response
               */
              return true
            } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
              /*此时代表Follower replica正在拉取Leader Replica旧的segment上的数据,
              
              说明follower replica已经落后太多,需要立即返回response
               */
            
              return true
            } else if (fetchOffset.precedes(endOffset)) {
              //在当前segment上,并且fetchoffset < endoffset,说明有数据可以获取
              accumulatedSize += endOffset.positionDiff(fetchOffset)
            }
          }
        } catch {
          case utpe: UnknownTopicOrPartitionException => // Case A
            return true
          case nle: NotLeaderForPartitionException =>  // Case B
            return true
        }
    }

    // 统计可以拉取的数据量是否满足要求
    accumulatedSize >= fetch.minBytes
  }

在kafka集群正常运行过程中,fetchOffset和endOffset都位于当前Segment上,因此主要统计可以fetch的数据量是否满足最小数据量的要求,只要满足,就可以响应那些DelayedFetchRequest。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-07 13:47:24  更:2022-02-07 13:49:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:21:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码