IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> KAFKA 海量吞吐低延迟技术解密:KafkaController -> 正文阅读

[大数据]KAFKA 海量吞吐低延迟技术解密:KafkaController

1、导读

KAFKA?是基于?Scala?语言开发的一个多分区、多副本且基于?ZooKeeper?协调的分布式消息系统,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,越来越多的开源分布式处理系统如?Cloudera、Storm、Spark、Flink?等都支持与?KAFKA?集成。本文将基于?KAFKA?v1.1.0?版本源码,探讨?KafkaController?的启动流程、选举流程、脑裂问题和事件队列模型。笔者水平有限,若有不当之处,敬请指正。

2、Controller?是什么

很多时候业务为了保证?7x24?小时提供服务,假如一台机器宕机,仍然能有其它机器顶替它继续工作,一般采用?Leader/Follower?模式,即常说的主从模式。正常情况下主机提供服务,备机只负责监听主机状态,当主机故障时,业务可以自动切换到备机继续提供服务,而在切换过程中从备机中选出主机的过程就是?Leader?选举。

KAFKA?集群的多个服务代理节点(节点,Broker)中,有一个?Broker?会被选举为控制器(Controller)。Controller?负责管理整个集群中所有分区和副本的状态变化,并时刻检测集群状态的变化,例如?Broker?故障、分区的?Leader?副本选举和分区重分配。每个?KAFKA?集群只有一个?Controller,以维护集群的单一一致视图。虽然?Controller?成为单点,但?KAFKA?有处理单点故障的机制。

3、Controller?的启动流程

KAFKA?Server?启动时会做一系列的初始化,例如:初始化?ZooKeeper、启动?KafkaController、启动?GroupCoordinator、启动?ReplicaManager?等等。

3.1、初始化?ZooKeeper

初始化?ZooKeeper?主要是建立连接,注册监听器,其中值得关注的是在初始化原生ZooKeeper对象的同时会注册一个全局的事件监听器(Watcher),负责监听ZK节点数据变更、ZK的会话状态等等。

// sessionTimeoutMs: server.properties 配置文件中的 zookeeper.session.timeout.ms
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)

ZooKeeperClientWatcher?负责监听ZK节点数据变更、会话状态等等。

private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
    override def process(event: WatchedEvent): Unit = {
      debug(s"Received event: $event")
      Option(event.getPath) match {
        case None =>
          val state = event.getState
          stateToMeterMap.get(state).foreach(_.mark())
          inLock(isConnectedOrExpiredLock) {
            isConnectedOrExpiredCondition.signalAll()
          }
          if (state == KeeperState.AuthFailed) {
            error("Auth failed.")
            stateChangeHandlers.values.foreach(_.onAuthFailure())
          } else if (state == KeeperState.Expired) {
            // 监听Broker与ZK的会话过期事件
            scheduleSessionExpiryHandler()
          }
        case Some(path) =>
          // 监听ZK节点状态,将事件分发至相应的Handler
          (event.getType: @unchecked) match {
            case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
            case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
            case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
            case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
          }
      }
    }
  }

其中?zNodeChangeHandlers、zNodeChildChangeHandlers?均是?ConcurrentHashMap?结构,在后续的?KAFKA?的启动过程中会将相应的?Handler?添加进来,ZK节点监听器监听到节点发生变化后,KafkaController?负责从?ConcurrentHashMap?结构中根据?ZK?节点路径取出对应的处理器(Handler)处理事件。

private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala

3.2、启动内存队列异步事件管理器

所有?Broker?会定期检查?ZooKeeper?以表明自己是健康的。如果?Broker?的响应时间超过?zookeeper.session.timeout.ms(默认18,000?毫秒),则意味着?Broker?与?ZooKeeper?的会话过期,随后将触发?Controller?选举。

eventManager?的对象类型是?ControllerEventManager,用于统一管理监听器触发事件,当ZK监听到节点变化后,其运用单线程方式从内存队列中取出对应的?Handler?处理事件。「Controller?的事件队列模型」章节会展开讲述。

def startup() = {
    zkClient.registerStateChangeHandler(new StateChangeHandler {
      override val name: String = StateChangeHandlers.ControllerHandler
      override def beforeInitializingSession(): Unit = {
        // 初始化 KafkaController 和 ZK 的会话信息
        // 1、重置内存中的ActiveControllerId
        // 2、重置内存中的依赖对象信息,例如监听器触发的事件、上下文信息
        // 3、重置Kafka Broker状态
        val expireEvent = new Expire
        eventManager.clearAndPut(expireEvent)
        expireEvent.waitUntilProcessed()
      }
      override def afterInitializingSession(): Unit = {
        // 事件管理器添加「注册Broker和选举Controller」事件
        // 1、ZK注册Broker信息
        // 2、选举Controller
        eventManager.put(RegisterBrokerAndReelect)
      }
    })

    // 事件管理器添加KafkaController的启动事件
    eventManager.put(Startup)
    // 单线程启动事件管理器
    eventManager.start()
  }

3.3、选举?Controller

每个?Broker?启动过程中,都会尝试在?ZK?创建?/controller?临时节点,也有可能其它?Broker?同时尝试创建,只有创建成功的那个?Broker?才会成为?Controller,而创建失败的?Broker?选举失败。「Controller?的选举流程」章节会展开讲述。

case object Startup extends ControllerEvent {
		...
    override def process(): Unit = {
      // 1、/controller 节点添加ControllerChangeHandler,监听Controller的变更事件
      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
      // 2、选举Controller
      elect()
    }
  }

3.3、初始化上下文信息

Controller?在选举成功后会读取?ZK?中各节点的数据来完成一系列的初始化工作,包括初始化上下文信息(从ZK读取ControllerEpoch、Topic、分区、副本相关的各种元数据信息)、启动并管理分区和副本的状态机、定义监听器触发事件、定义周期性任务触发事件(如分区自平衡)。不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新?Controller?中的上下文信息。

private def onControllerFailover() {
    // 1、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
    // 2、递增 ControllerEpoch,并更新到 /controller_epoch
    readControllerEpochFromZooKeeper()
    incrementControllerEpoch()

    // 3、注册 ZK 监听器触发事件(逻辑上基于事件队列模型实现,并非真正在ZK上增加Watcher)
    //    - Broker 相关变化的事件处理器
    //    - Topic 相关变化的事件处理器
    //    - ISR 集合变更的事件处理器
    //    - 优先副本选举的事件处理器
    //    - 分区重分配的事件处理器
    val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
      isrChangeNotificationHandler)
    childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
    val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
    nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

    ...

    // 4、初始化上下文信息,ControllerContext用于维护Topic、分区和副本相关的各种元数据,
    //
    initializeControllerContext()

    ...
  }
private def initializeControllerContext() {
    // 读取 /brokers/ids,初始化可用 Broker 集合
    controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
    // 读取 /broker/topics,初始化集群中所有 Topic 集合
    controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
    // 所有 Topic 对应ZK中 /brokers/topics/<topic> 节点添加PartitionModificationsHandler,用于监听Topic中的分区分配变化
    registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
    // 读取 /broker/topics/<topic>/partitions,初始化每个Topic每个分区的AR集合
    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
    controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
    // 所有可用 Broker 对应ZK中 /brokers/ids/<id> 节点添加BrokerModificationsHandler,用于监听Broker增减的变化
    registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
    // 初始化每个分区的 Leader、ISR 集合等信息
    updateLeaderAndIsrCache()
    // 启动 ControllerChannelManager
    startChannelManager()
    // 读取 /admin/reassign_partitions,初始化需要进行副本重分配的分区
    initializePartitionReassignment()
	...
}

KafkaController?相关的?ZooKeeper?路径如下:

  • /controller:存放?Controller?的信息。记录?Controller?所在的?brokerid,节点类型是临时节点,与ZK会话失效时自动删除。ControllerChangeHandler?监听?Controller?的变更事件;

  • /brokers/ids:存放集群中所有可用的?Broker。BrokerChangeHandler?负责处理?Broker?的增减事件;

  • /brokers/topics:存放集群中所有的?Topic。TopicChangeHandler?负责处理?Topic?的增减事件;

  • /admin/delete_topics:存放删除的?Topic。TopicDeletionHandler?负责处理?Topic?的删除事件;

  • /brokers/topics/<topic>/partitions:存放?Topic?的分区。PartitionModificationsHandler?负责处理分区分配事件;

  • /brokers/topics/<topic>/partitions/<partition>/state:存放的是分区的?Leader?副本和?ISR?集合信息。PartitionReassignmentIsrChangeHandler?负责在副本分配到分区时,需要等待新副本追上Leader?副本后才能执行后续操作;

  • /admin/reassign_partitions:存放重新分配AR的路径,通过命令修改AR时会写入此路径。PartitionReassignmentHandler?负责执行分区重新分配;

  • /admin/preferred_replica_election:存放需要选举副本?Leader?的分区信息。PreferredReplicaElectionHandler?负责对指定分区进行有限副本选举。

4、Controller?的选举流程

KafkaController?触发选举的时机有三个:

  • KafkaController?启动时;

  • KafkaController?与?ZooKeeper?的会话过期;

  • ZooKeeper?中?/controller?临时节点不存在时;

每个?Broker?启动过程中,都会尝试在?ZK?创建?/controller?临时节点,同时也有可能其它?Broker?尝试创建,但只有创建成功的那个?Broker?才会成为?Controller,而创建失败的?Broker?选举失败。图?4-1?展示了?Broker0?成功创建临时节点并写入?Controller?信息;图?4-2?根据展示了当?Broker0?竞选成为?Controller,而?Broker1?和?Broker?2?竞选失败后作为?Follower?后不工作;

图?4-1

图?4-2

每个?Broker?都会在内存中保存当前?Controller?的?brokerid?的值,标识为?activeControllerId。

private def elect(): Unit = {
	...
	// 步骤1、获取ZK的/controller临时节点中brokerid的值,若节点不存在则取-1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
	  // 步骤2:尝试在ZK中创建/controller临时节点,若创建成功则竞选成为Controller
      zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
	  info(s"${config.brokerId} successfully elected as the controller")
      activeControllerId = config.brokerId
	  // 步骤3:更新/controller_epoch等信息
	  onControllerFailover()
    } catch {
      ...
    }
}

private def onControllerFailover() {
    // 步骤4、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
    // 步骤5、递增 ControllerEpoch,并更新到 /controller_epoch
    info("Reading controller epoch from ZooKeeper")
    readControllerEpochFromZooKeeper()
    info("Incrementing controller epoch in ZooKeeper")
    incrementControllerEpoch()
    info("Registering handlers")

	...
}

4.1、旧版的脑裂问题

早期?KAFKA?版本(例如?0.8.x)的脑裂问题?ISSUE:KAFKA-Split-Brain

上文介绍过,KAFKA?是依赖?ZooKeeper?来做?Leader/Follower?HA:每个节点都尝试注册一个象征??Leader?的临时节点,其他未注册成功的则成为?Follower,并且通过监听器机制监听着?Leader?所创建的临时节点,ZooKeeper?通过内部心跳机制来确定?Leader?的状态,一旦?Leader?出现故障,ZooKeeper?能很快获悉并且通知其他的?Follower,其他?Follower?尝试竞选,这样就完成了一个新的?Leader?切换。但值得注意的是,短暂的时间内可能由于?Leader?的?GC?时间过长或者?ZooKeeper?节点间网络抖动导致心跳超时(Leader?假死),ZooKeeper?与?Leader?的会话超时随后?ZooKeeper?通知剩余的?Follower?重新竞选,Follower?中就有一个成为了“新Leader”,但“旧Leader”并未故障,此时可能有一部分的客户端已收到?ZooKeeper?的通知并连接到“新Leader”,有一部分客户端仍然连接在“旧Leader”,如果同时两个客户端分别对新旧?Leader?的同一个数据进行更新,就会出现很严重问题。

ZooKeeper?会话过期后通知?Follower?重新竞选的代码入口:kafka.zookeeper.ZooKeeperClient#scheduleSessionExpiryHandler

图?4-3?展示了一种可能发生脑裂过程,Broker?0?是?Controller,但由于负载过高发生长时间?FGC,耗时超过?zookeeper.session.timeout.ms?导致与?ZooKeeper?的心跳超时,ZooKeeper?监听到会话过期事件并通知?Broker?1?和?Broker?2,Broker?1?竞选成功成为新的?Leader(Controller),Broker?2?竞选失败继续成为?Follower,随后?Broker?0?GC?结束仍然以?Leader(Controller)?身份运行,此时集群中就有多个?Leader(Controller);

图?4-3

为了解决脑裂问题,新版本的?KAFKA?中新增了“控制器纪元”(ZooKeeper?中是持久节点,路径是?/controller_epoch),Integer?类型,用于跟踪并记录?Controller?发生变更的次数,表示当前的?Controller?是第几代控制器。controller_epoch?的初始值为?1,当?Controller?发生变更,每选出一个新的?Controller?就将该字段自增。每个和?Controller?交互的请求都会携带?controller_epoch?字段:

  • 若请求的?controller_epoch?值小于内存中的?epoch?值,则认为这个请求是发送给已过期的?Controller,那么这个请求会被认为是无效;

  • 若请求的?controller_epoch?值大于内存中的?epoch?值,则说明已经有新的?Controller?当选;

由此可见,KAFKA?通过?controller_epoch?来保证?Controller?的唯一性;

5、Controller?的事件队列模型

从上文可知,Controller?在选举成功后会读取?ZK?中各节点的数据来完成一系列的初始化工作,不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新?Controller?中的上下文信息,这样就涉及到了多线程同步,如果单纯使用锁机制来控制,那么整体性能会大打折扣,实际上,在?0.10?版本之前,都是通过锁机制的方式来处理上下文的多线程同步,在?0.11+?版本以后改用单线程基于事件队列的模型,将每个事件都做一层封装,然后按事件发生的先后顺序暂存到?LinkedBlockingQueue?中,最后使用?ControllerEventThread?线程按照?FIFO?的原则来处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全。

class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer], eventProcessedListener: ControllerEvent => Unit) {
  ...
  private val putLock = new ReentrantLock()
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)

  def start(): Unit = thread.start()

  def put(event: ControllerEvent): Unit = inLock(putLock) {
    queue.put(event)
  }

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
    override def doWork(): Unit = {
      queue.take() match {
				...
        case controllerEvent =>
          try {
            rateAndTimeMetrics(state).time {
              controllerEvent.process()
            }
          } catch {
            case e: Throwable => error(s"Error processing event $controllerEvent", e)
          }
				...
      }
    }
  }
}

在?KAFKA?早期版本中,并没有采用?KafkaController?这样一个概念来统一管理分区和副本的状态,而是依赖ZK,每个?Broker?都会在?ZK?上为分区和副本注册大量的?Watcher,当分区或副本状态变化时,会唤醒很多不必要的?Watcher,这种严重依赖?ZK?的设计会有脑裂、羊群效应,以及造成?ZK?过载的隐患;在?0.11+?版本以后,只有?KafkaController?在?ZK?上注册相应的?Watcher,其它的?Broker?极少需要再监听?ZK?中的数据变化,不过每个?Broker?还是会对?/controller?节点添加监听器,用于监听?Controller?是否存活;

图?5-1

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-13 21:52:47  更:2022-03-13 21:54:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:09:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码