2021SC@SDUSC
storm代码阅读(四)
2021SC@SDUSC
事务topology
根据topology中Spout类型不同,topology可分为非事务topology和事务topology两种类型。
非事务topology:Spout类型为IRichSpout。Storm并不保证消息的可靠传输,消息可能会丢失。
事务topology:Spout类型为ITransactionalSpout。Storm负责初始化一个事务并负责在事务失败时进行重传,事务topology保证了消息的可靠传输,以及在事务提交节点处事务能够按顺序被提交。
事务topology又根据ITransactionalSpout接口的不同实现分为基本事务topology和模糊事务topology两种类型。
基本事务topology:每个事务所对应的数据在事务被重传时不发生变化。用户只要保证数据的元数据不变就可以每次获取到相同的数据集合。
模糊事务topology:每个事务所对应的数据在事务被重传时可能发生变化,但会保证事务中的消息只属于某一个事务,也就是保证同一条消息不会同时属于多个事务。
在这里,我们主要介绍一下事务topology。 图片里展示的是事务拓扑的接口以及实现类,了解这些类是理解事务topology的关键。
ITransactionalSpout
ITransactionalSpout接口包含两部分接口,一部分用于提供协调Spout节点的逻辑,一部分用于提供消息发送Bolt结点的逻辑。用户实现这些接口后Storm便会负责将这些逻辑部署到合适的节点上运行。
ITransactionalSpout接口具体实现如下:
1 public interface ITransactionalSpout<T> extends IComponent {
2 public interface Coordinator<X> {
3
4 X initializeTransaction(BigInteger txid, X prevMetadata);
5
6 boolean isReady();
7
8 void close();
9 }
10
11 public interface Emitter<X> {
12
13 void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
14
15 void cleanupBefore(BigInteger txid);
16
17 void close();
18 }
19
20 Coordinator<T> getCoordinator(Map conf, TopologyContext context);
21
22 Emitter<T> getEmitter(Map conf, TopologyContext context);
23 }
第4行的initializeTransaction方法用于产生新事物的元数据,它可以基于上一个事务的元数据来产生。初始化第一个事务时prevMetadata为空。要注意的是该方法仅当isReady方法返回true时才有可能被调用到,并且,对于每个事务它只会调用一次。
第6行的isReady方法用来检测当前是否开始一个新事务。Storm中许多地方都可能是产生新事物的合适时间点,如在Spout收到ack消息后,上一个事务可能已经处理结束,此时可以调用isReady方法来判断是否可以开始一个新事务。由于Spout的主循环线程会依次处理输入消息以及产生新消息,这就要求其中调用的方法时非阻塞的。因此isReady方法需要设计为非阻塞的,即如果新事物不就绪就立即返回false,而不是调用sleep方法。
第13行的emitBatch方法是消息发送Bolt结点的接口中最重要的方法。协调Spout节点发送的事务尝试消息都会到达消息发送Bolt节点,然后该节点会调用emitBatch方法来发送一批数据,这个过程要保证同一个事务序号对应于相同的数据(模糊事务类型的Spout除外)。要注意对于同一个事务序号该方法可能被调用多次,如事务被重传时。
第15行的cleanupBefore方法负责给用户提供合适的时间点来清理和事务相关的数据,该方法只有在输入的事务序号相对应的事务全被成功处理后来才被调用。
|