创建主题流程分析
创建主题包括2个阶段 1、客户端创建,就是客户端将主题得元数据写入zk 2、服务端得创建,是服务端控制器创建主题得过程。 客户端创建主题 可以通过kafka的api客户端或者是命令行来创建主题,底层都是调用了TopicCommand.createTopic(zkUtils: ZkUtils, ops:TopicCommandOptions)方法创建主题。该方法的主题流程是,首先是对主题和相关的配置信息进行校验,然后进行分区副本分配。最后在zk的/brokers/topics/路径下创建节点,将分区副本分配方案写入每个分区节点之中。 对主题校验:主题名字由长度不超过 249 个字母、数字、着重号(. )、下划线()、连接号()的字符组成, 正 则表达式为: “[a-zA-Z0-9\\._\\卡”,但不允许主题名字只有着重号(.) 分区副本分配。在创建主题的时候可以指定分区副本的分配方案,也可以使用默认的分区副本分配策略。 在 0.10 版本之后, Kafka 支持指定代理机架信息,如果指定了机架信息在副 本分配时会尽可能地让分区的副本分布到不同的机架上。 当创建一个主题成功的时候,物流是kafka的api还是命令行创建的,返回创建成功的时候,仅仅是在zk上创建/brokers/topics 节点成功创建了该主题对应的子节点成功,服务端的创建是交由控制器异步的去操作的。
服务端创建: 在控制器启动的时候会注册分区状态机和副本状态机,当代理成为控制器的时候就会回调onBecomingLeader()方法,这个方法会调用分区和副本的registerListeners()方法。分区状态机注册了监听zk的路径 的/brokers/topics 子节点的变化的 TopicChangeListener 监听 器用于监听主题及分区变化。副本状态机在 registerListeners()方法中会注册一 个 BrokerChangeListener 监昕器, 该监听器用于监听/brokers/ids 子节点的变化。当创建主题的时候会往/brokers/topics 写数据,这时两个状态机的监听就会触发执行。 TopicChangeListener监听器会调用handleChildChange()方法进行处理,该方法的具体处理逻辑如下: (1)首先获取/brokers/topics节点下所有topic的列表例如为A,然后获取ControllerContext 中缓存的列表B ,集合A-B的差集C 标识为新增的topic,集合 B-A 差值为D 标识为删除的topic集合。 (2)用集合A更新ControllerContext缓存中的B (3)遍历集合C ,获取集合中每个主题下的partions子节点,获取/brokers/topics/ topic-foo/partitons 下各分区副本分配信息,构造一个 Map[TopicAndPartition, Seq[Int]]集合, 以每个主题的每个分区 TopicAndPartition 对象作为 Key,以该分区的 AR 作为 Value。 同时从分 区副本分配信息中过滤掉集合 N 中所有主题对应的分区副本分配信息, 然后更新缓存中分区副 本分配信息。 (4) 调用控制器的 onNewTopicCreation()方法, 实 现真正创建主题的逻辑。onNewTopicCreation()方法操作步骤: 1、遍历集合C。通过分区状态机为每个新创建的主题向 ZooKeeper 注册一个监听分区变化 的监听器PartitionModificationsListener 2、调用控制器的 onNewPartitionCreation()方法创建分区。 具体操作步骤:调用分区状态机的 handleStateChangesO方法, 将新增主题的各分区状态设置为NewPartition 状态。 调用副本状态机的 handleStateChange()方法, 将新增主题的每个分区的副本状态设置 为 NewReplica 状态。 调用分区状态机的 handleStateChanges()方法 , 将新增主题的各分区状态设置为 OnlinePartition 状态,将各分区 AR 中第一个副本选为该分区的 Leader,将 AR 作为 JSR, 然后 创建各分区节点井写入该分区的详细元数据信息 3、调用副本状态机将新增主题的各分区的副本状态从 NewReplica 转换为 OnlineReplica 状态。
|