kafka启动流程分析
KafkaServer.startup()来完成KafkaServer的启动工作。startup方法会完成核心组件的初始化并启动这些组件。包括组件任务调度器( KafkaScheduler)、日志管理器 (LogManager)、 网络通信服务器 (SockeServer)、副本管理器 ( ReplicaManager)、 控制器 (KafkaController)、组协调器 ( GroupCoordinator)、动态配置管 理器 (DynamicConfigManager)以及 Kafka 健康状态检测 (KafkaHealthcheck)等。 (1)设置代理的状态为Starting 表示启动代理 ( 代理状态机有6种状态: a、NotRunning 0 代理未启动, b、Starting 1 代理正在启动中 , c、 RecoveringFromUncleanShutdown 2 代理非正常关闭,在${1og.dir}配置的每个路 径下存在.katka cleanshutdown 文件 d、RunningAsBroker 3 代理己正常启动 e、PendingControlledShutdown 6 KatkaController 被关闭 f、BrokerShuttingDown 7 代理正在准备关闭 )
(2)启动任务调度器( KafkaScheduler ) 构造一个线程总数为KaTeX parse error: Expected '}', got 'EOF' at end of input: …dlerPool 中主要是创建{ num.io.threads }个 KafkaRequestHandler, Handler 循环从 Request Channel 中取出 Request 并交给 kafka.server.KafkaApis 来处理具体的业务逻辑。 在实例化 KafkaRequestHandlerPool 之前先要实例化 KafkaApis, Kafka 将所有请求的 requestld 封装成一个 枚举类 ApiKeys。当前版本的 Kafka 支持 21 种类型的请求。 ( 11 )实例化动态配置管理器。 注册监听 ZooKeeper 的/config 路径下各子节点信息变化。 (12) 实例化井启动 Kafka 健康状态检查 (KafkaHealthcheck)。 Kafka 健康检查机制主要是 在 ZooKeeper 的/brokers/ids 路径下创建一个与当前代理的 id 同名的节点,该节点也是一个临时 节点。当代理离线时,该节点会被删除,其他代理或者消费者通过判断/brokers/ids 路径下是否 有某个代理的 brokerId 来确定该代理的健康状态。 (13 )向 meta.prope叫es 文件中写入当前代理的 id 以及固定版本号为 0 的 version 信息。 ( 14)注册 Kafka 的 m由ics 信息,在 KafkaServer 启动时将一些动态的 JMX Beans 进行注 册,以便于对 Kafka 进行跟踪监控。 最后将当前代理的状态设置为 RunningAsBroker,表示当前 KafkaServer 己正常启动完成,
|