1、导读
KAFKA?是基于?Scala?语言开发的一个多分区、多副本且基于?ZooKeeper?协调的分布式消息系统,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用,越来越多的开源分布式处理系统如?Cloudera、Storm、Spark、Flink?等都支持与?KAFKA?集成。本文将基于?KAFKA?v1.1.0?版本源码,探讨?KafkaController?的启动流程、选举流程、脑裂问题和事件队列模型。笔者水平有限,若有不当之处,敬请指正。
2、Controller?是什么
很多时候业务为了保证?7x24?小时提供服务,假如一台机器宕机,仍然能有其它机器顶替它继续工作,一般采用?Leader/Follower?模式,即常说的主从模式。正常情况下主机提供服务,备机只负责监听主机状态,当主机故障时,业务可以自动切换到备机继续提供服务,而在切换过程中从备机中选出主机的过程就是?Leader?选举。
KAFKA?集群的多个服务代理节点(节点,Broker)中,有一个?Broker?会被选举为控制器(Controller)。Controller?负责管理整个集群中所有分区和副本的状态变化,并时刻检测集群状态的变化,例如?Broker?故障、分区的?Leader?副本选举和分区重分配。每个?KAFKA?集群只有一个?Controller,以维护集群的单一一致视图。虽然?Controller?成为单点,但?KAFKA?有处理单点故障的机制。
3、Controller?的启动流程
KAFKA?Server?启动时会做一系列的初始化,例如:初始化?ZooKeeper、启动?KafkaController、启动?GroupCoordinator、启动?ReplicaManager?等等。
3.1、初始化?ZooKeeper
初始化?ZooKeeper?主要是建立连接,注册监听器,其中值得关注的是在初始化原生ZooKeeper对象的同时会注册一个全局的事件监听器(Watcher),负责监听ZK节点数据变更、ZK的会话状态等等。
// sessionTimeoutMs: server.properties 配置文件中的 zookeeper.session.timeout.ms
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
ZooKeeperClientWatcher?负责监听ZK节点数据变更、会话状态等等。
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug(s"Received event: $event")
Option(event.getPath) match {
case None =>
val state = event.getState
stateToMeterMap.get(state).foreach(_.mark())
inLock(isConnectedOrExpiredLock) {
isConnectedOrExpiredCondition.signalAll()
}
if (state == KeeperState.AuthFailed) {
error("Auth failed.")
stateChangeHandlers.values.foreach(_.onAuthFailure())
} else if (state == KeeperState.Expired) {
// 监听Broker与ZK的会话过期事件
scheduleSessionExpiryHandler()
}
case Some(path) =>
// 监听ZK节点状态,将事件分发至相应的Handler
(event.getType: @unchecked) match {
case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
}
}
}
}
其中?zNodeChangeHandlers、zNodeChildChangeHandlers?均是?ConcurrentHashMap?结构,在后续的?KAFKA?的启动过程中会将相应的?Handler?添加进来,ZK节点监听器监听到节点发生变化后,KafkaController?负责从?ConcurrentHashMap?结构中根据?ZK?节点路径取出对应的处理器(Handler)处理事件。
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
3.2、启动内存队列异步事件管理器
所有?Broker?会定期检查?ZooKeeper?以表明自己是健康的。如果?Broker?的响应时间超过?zookeeper.session.timeout.ms(默认18,000?毫秒),则意味着?Broker?与?ZooKeeper?的会话过期,随后将触发?Controller?选举。
eventManager?的对象类型是?ControllerEventManager,用于统一管理监听器触发事件,当ZK监听到节点变化后,其运用单线程方式从内存队列中取出对应的?Handler?处理事件。「Controller?的事件队列模型」章节会展开讲述。
def startup() = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
override def beforeInitializingSession(): Unit = {
// 初始化 KafkaController 和 ZK 的会话信息
// 1、重置内存中的ActiveControllerId
// 2、重置内存中的依赖对象信息,例如监听器触发的事件、上下文信息
// 3、重置Kafka Broker状态
val expireEvent = new Expire
eventManager.clearAndPut(expireEvent)
expireEvent.waitUntilProcessed()
}
override def afterInitializingSession(): Unit = {
// 事件管理器添加「注册Broker和选举Controller」事件
// 1、ZK注册Broker信息
// 2、选举Controller
eventManager.put(RegisterBrokerAndReelect)
}
})
// 事件管理器添加KafkaController的启动事件
eventManager.put(Startup)
// 单线程启动事件管理器
eventManager.start()
}
3.3、选举?Controller
每个?Broker?启动过程中,都会尝试在?ZK?创建?/controller?临时节点,也有可能其它?Broker?同时尝试创建,只有创建成功的那个?Broker?才会成为?Controller,而创建失败的?Broker?选举失败。「Controller?的选举流程」章节会展开讲述。
case object Startup extends ControllerEvent {
...
override def process(): Unit = {
// 1、/controller 节点添加ControllerChangeHandler,监听Controller的变更事件
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
// 2、选举Controller
elect()
}
}
3.3、初始化上下文信息
Controller?在选举成功后会读取?ZK?中各节点的数据来完成一系列的初始化工作,包括初始化上下文信息(从ZK读取ControllerEpoch、Topic、分区、副本相关的各种元数据信息)、启动并管理分区和副本的状态机、定义监听器触发事件、定义周期性任务触发事件(如分区自平衡)。不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新?Controller?中的上下文信息。
private def onControllerFailover() {
// 1、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
// 2、递增 ControllerEpoch,并更新到 /controller_epoch
readControllerEpochFromZooKeeper()
incrementControllerEpoch()
// 3、注册 ZK 监听器触发事件(逻辑上基于事件队列模型实现,并非真正在ZK上增加Watcher)
// - Broker 相关变化的事件处理器
// - Topic 相关变化的事件处理器
// - ISR 集合变更的事件处理器
// - 优先副本选举的事件处理器
// - 分区重分配的事件处理器
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
...
// 4、初始化上下文信息,ControllerContext用于维护Topic、分区和副本相关的各种元数据,
//
initializeControllerContext()
...
}
private def initializeControllerContext() {
// 读取 /brokers/ids,初始化可用 Broker 集合
controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
// 读取 /broker/topics,初始化集群中所有 Topic 集合
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
// 所有 Topic 对应ZK中 /brokers/topics/<topic> 节点添加PartitionModificationsHandler,用于监听Topic中的分区分配变化
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
// 读取 /broker/topics/<topic>/partitions,初始化每个Topic每个分区的AR集合
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// 所有可用 Broker 对应ZK中 /brokers/ids/<id> 节点添加BrokerModificationsHandler,用于监听Broker增减的变化
registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
// 初始化每个分区的 Leader、ISR 集合等信息
updateLeaderAndIsrCache()
// 启动 ControllerChannelManager
startChannelManager()
// 读取 /admin/reassign_partitions,初始化需要进行副本重分配的分区
initializePartitionReassignment()
...
}
KafkaController?相关的?ZooKeeper?路径如下:
-
/controller:存放?Controller?的信息。记录?Controller?所在的?brokerid,节点类型是临时节点,与ZK会话失效时自动删除。ControllerChangeHandler?监听?Controller?的变更事件; -
/brokers/ids:存放集群中所有可用的?Broker。BrokerChangeHandler?负责处理?Broker?的增减事件; -
/brokers/topics:存放集群中所有的?Topic。TopicChangeHandler?负责处理?Topic?的增减事件; -
/admin/delete_topics:存放删除的?Topic。TopicDeletionHandler?负责处理?Topic?的删除事件; -
/brokers/topics/<topic>/partitions:存放?Topic?的分区。PartitionModificationsHandler?负责处理分区分配事件; -
/brokers/topics/<topic>/partitions/<partition>/state:存放的是分区的?Leader?副本和?ISR?集合信息。PartitionReassignmentIsrChangeHandler?负责在副本分配到分区时,需要等待新副本追上Leader?副本后才能执行后续操作; -
/admin/reassign_partitions:存放重新分配AR的路径,通过命令修改AR时会写入此路径。PartitionReassignmentHandler?负责执行分区重新分配; -
/admin/preferred_replica_election:存放需要选举副本?Leader?的分区信息。PreferredReplicaElectionHandler?负责对指定分区进行有限副本选举。
4、Controller?的选举流程
KafkaController?触发选举的时机有三个:
每个?Broker?启动过程中,都会尝试在?ZK?创建?/controller?临时节点,同时也有可能其它?Broker?尝试创建,但只有创建成功的那个?Broker?才会成为?Controller,而创建失败的?Broker?选举失败。图?4-1?展示了?Broker0?成功创建临时节点并写入?Controller?信息;图?4-2?根据展示了当?Broker0?竞选成为?Controller,而?Broker1?和?Broker?2?竞选失败后作为?Follower?后不工作;
图?4-1
图?4-2
每个?Broker?都会在内存中保存当前?Controller?的?brokerid?的值,标识为?activeControllerId。
private def elect(): Unit = {
...
// 步骤1、获取ZK的/controller临时节点中brokerid的值,若节点不存在则取-1
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1) {
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
// 步骤2:尝试在ZK中创建/controller临时节点,若创建成功则竞选成为Controller
zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
info(s"${config.brokerId} successfully elected as the controller")
activeControllerId = config.brokerId
// 步骤3:更新/controller_epoch等信息
onControllerFailover()
} catch {
...
}
}
private def onControllerFailover() {
// 步骤4、读取 /controller_epoch 的值到 ControllerContext(上下文信息)
// 步骤5、递增 ControllerEpoch,并更新到 /controller_epoch
info("Reading controller epoch from ZooKeeper")
readControllerEpochFromZooKeeper()
info("Incrementing controller epoch in ZooKeeper")
incrementControllerEpoch()
info("Registering handlers")
...
}
4.1、旧版的脑裂问题
早期?KAFKA?版本(例如?0.8.x)的脑裂问题?ISSUE:KAFKA-Split-Brain
上文介绍过,KAFKA?是依赖?ZooKeeper?来做?Leader/Follower?HA:每个节点都尝试注册一个象征??Leader?的临时节点,其他未注册成功的则成为?Follower,并且通过监听器机制监听着?Leader?所创建的临时节点,ZooKeeper?通过内部心跳机制来确定?Leader?的状态,一旦?Leader?出现故障,ZooKeeper?能很快获悉并且通知其他的?Follower,其他?Follower?尝试竞选,这样就完成了一个新的?Leader?切换。但值得注意的是,短暂的时间内可能由于?Leader?的?GC?时间过长或者?ZooKeeper?节点间网络抖动导致心跳超时(Leader?假死),ZooKeeper?与?Leader?的会话超时随后?ZooKeeper?通知剩余的?Follower?重新竞选,Follower?中就有一个成为了“新Leader”,但“旧Leader”并未故障,此时可能有一部分的客户端已收到?ZooKeeper?的通知并连接到“新Leader”,有一部分客户端仍然连接在“旧Leader”,如果同时两个客户端分别对新旧?Leader?的同一个数据进行更新,就会出现很严重问题。
ZooKeeper?会话过期后通知?Follower?重新竞选的代码入口:kafka.zookeeper.ZooKeeperClient#scheduleSessionExpiryHandler
图?4-3?展示了一种可能发生脑裂过程,Broker?0?是?Controller,但由于负载过高发生长时间?FGC,耗时超过?zookeeper.session.timeout.ms?导致与?ZooKeeper?的心跳超时,ZooKeeper?监听到会话过期事件并通知?Broker?1?和?Broker?2,Broker?1?竞选成功成为新的?Leader(Controller),Broker?2?竞选失败继续成为?Follower,随后?Broker?0?GC?结束仍然以?Leader(Controller)?身份运行,此时集群中就有多个?Leader(Controller);
图?4-3
为了解决脑裂问题,新版本的?KAFKA?中新增了“控制器纪元”(ZooKeeper?中是持久节点,路径是?/controller_epoch),Integer?类型,用于跟踪并记录?Controller?发生变更的次数,表示当前的?Controller?是第几代控制器。controller_epoch?的初始值为?1,当?Controller?发生变更,每选出一个新的?Controller?就将该字段自增。每个和?Controller?交互的请求都会携带?controller_epoch?字段:
由此可见,KAFKA?通过?controller_epoch?来保证?Controller?的唯一性;
5、Controller?的事件队列模型
从上文可知,Controller?在选举成功后会读取?ZK?中各节点的数据来完成一系列的初始化工作,不管是监听器触发的事件,还是周期性任务触发事件,都会读取或更新?Controller?中的上下文信息,这样就涉及到了多线程同步,如果单纯使用锁机制来控制,那么整体性能会大打折扣,实际上,在?0.10?版本之前,都是通过锁机制的方式来处理上下文的多线程同步,在?0.11+?版本以后改用单线程基于事件队列的模型,将每个事件都做一层封装,然后按事件发生的先后顺序暂存到?LinkedBlockingQueue?中,最后使用?ControllerEventThread?线程按照?FIFO?的原则来处理各个事件,这样不需要锁机制就可以在多线程间维护线程安全。
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer], eventProcessedListener: ControllerEvent => Unit) {
...
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[ControllerEvent]
private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
def start(): Unit = thread.start()
def put(event: ControllerEvent): Unit = inLock(putLock) {
queue.put(event)
}
class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
override def doWork(): Unit = {
queue.take() match {
...
case controllerEvent =>
try {
rateAndTimeMetrics(state).time {
controllerEvent.process()
}
} catch {
case e: Throwable => error(s"Error processing event $controllerEvent", e)
}
...
}
}
}
}
在?KAFKA?早期版本中,并没有采用?KafkaController?这样一个概念来统一管理分区和副本的状态,而是依赖ZK,每个?Broker?都会在?ZK?上为分区和副本注册大量的?Watcher,当分区或副本状态变化时,会唤醒很多不必要的?Watcher,这种严重依赖?ZK?的设计会有脑裂、羊群效应,以及造成?ZK?过载的隐患;在?0.11+?版本以后,只有?KafkaController?在?ZK?上注册相应的?Watcher,其它的?Broker?极少需要再监听?ZK?中的数据变化,不过每个?Broker?还是会对?/controller?节点添加监听器,用于监听?Controller?是否存活;
图?5-1
|