2021SC@SDUSC
Storm Code Reading (2)
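Introduction to Topology
A job running on a Storm cluster is called a Topology.
A Topology reads data from a data source and then processes it. Within a Topology, the component that pulls data from an external source is called a Spout, and the component that processes the data is called a Bolt. A Topology is made up of one or more Spouts and Bolts. Note in particular that a Topology must contain both Spouts and Bolts, although the number of each is arbitrary. A Topology is a network graph, wired according to defined rules, whose parts are Spouts, Bolts, and Tuples (the units that carry the data). A Storm Topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes after running for some time, whereas a Storm topology keeps running until it is killed.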
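To make the Spout and Bolt roles concrete, here is a minimal sketch in plain Java. It does not use Storm at all: `MiniSpout`, `MiniBolt`, `fixedSpout`, and `run` are hypothetical names invented for illustration, and unlike a real Storm topology this toy version stops once its spout is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, Storm-free model of a topology: one spout feeding a chain of bolts.
final class MiniTopologyDemo {

    /** Source of data; returns null when it has nothing more to emit. */
    interface MiniSpout {
        String nextTuple();
    }

    /** Processing step; maps one input value to one output value. */
    interface MiniBolt {
        String execute(String input);
    }

    /** Pulls every value from the spout and passes it through the bolts in order. */
    static List<String> run(MiniSpout spout, List<MiniBolt> bolts) {
        List<String> results = new ArrayList<>();
        for (String value = spout.nextTuple(); value != null; value = spout.nextTuple()) {
            for (MiniBolt bolt : bolts) {
                value = bolt.execute(value);
            }
            results.add(value);
        }
        return results;
    }

    /** Builds a spout over a fixed list; a real spout would read an external source. */
    static MiniSpout fixedSpout(String... values) {
        Iterator<String> it = Arrays.asList(values).iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        MiniBolt trim = String::trim;
        MiniBolt upper = String::toUpperCase;
        // Prints [STORM, BOLT]
        System.out.println(run(fixedSpout(" storm ", " bolt "), Arrays.asList(trim, upper)));
    }
}
```

The linear chain is only the simplest possible shape. In Storm the components form a graph, and each bolt declares which upstream streams it consumes.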
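TopologyBuilder Code Overview
Storm provides the TopologyBuilder class for creating a Topology. TopologyBuilder is essentially a wrapper around the Thrift interface of a Topology. In other words, a Topology is really a structure defined in Thrift; TopologyBuilder constructs that object, and Nimbus runs a Thrift server that accepts the structure submitted by the user. Because the definition is based on Thrift, users can also build a Topology in other languages, which makes multi-language support fairly convenient.
Its main methods are setSpout, setBolt, and their overloads; the ultimate goal is to create a StormTopology object.
The TopologyBuilder class is defined as follows: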
public class TopologyBuilder {
private final Map<String, IRichBolt> bolts = new HashMap<>();
private final Map<String, IRichSpout> spouts = new HashMap<>();
private final Map<String, ComponentCommon> commons = new HashMap<>();
private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();
private final Map<String, SharedMemory> sharedMemory = new HashMap<>();
private boolean hasStatefulBolt = false;
private Map<String, StateSpoutSpec> stateSpouts = new HashMap<>();
private List<ByteBuffer> workerHooks = new ArrayList<>();
private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
Map<String, Object> res = new HashMap<>(into);
res.putAll(newMap);
return JSONValue.toJSONString(res);
}
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
maybeAddCheckpointSpout();
for (String boltId : bolts.keySet()) {
IRichBolt bolt = bolts.get(boltId);
bolt = maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = getComponentCommon(boltId, bolt);
try {
maybeAddCheckpointInputs(common);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
} catch (RuntimeException wrapperCause) {
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type "
+ wrapperCause.getCause().getMessage() + ", "
+ "which was instantiated prior to topology creation. "
+ wrapperCause.getCause().getMessage()
+ " "
+ "should be instantiated within the prepare method of '"
+ boltId
+ " at the earliest.",
wrapperCause);
}
throw wrapperCause;
}
}
for (String spoutId : spouts.keySet()) {
IRichSpout spout = spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
try {
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
} catch (RuntimeException wrapperCause) {
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
throw new IllegalStateException(
"Spout '" + spoutId + "' contains a non-serializable field of type "
+ wrapperCause.getCause().getMessage()
+ ", which was instantiated prior to topology creation. "
+ wrapperCause.getCause().getMessage()
+ " should be instantiated within the open method of '"
+ spoutId
+ " at the earliest.",
wrapperCause);
}
throw wrapperCause;
}
}
StormTopology stormTopology = new StormTopology(spoutSpecs,
boltSpecs,
new HashMap<>());
stormTopology.set_worker_hooks(workerHooks);
if (!componentToSharedMemory.isEmpty()) {
stormTopology.set_component_to_shared_memory(componentToSharedMemory);
stormTopology.set_shared_memory(sharedMemory);
}
return Utils.addVersions(stormTopology);
}
/**
* Define a new bolt in this topology with parallelism of just one thread.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelismHint);
bolts.put(id, bolt);
return new BoltGetter(id);
}
/**
* Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic
* bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the
* topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic
* bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the
* topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the basic bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
}
/**
* Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The {@link
* IWindowedBolt#execute(TupleWindow)} method is triggered for each window interval with the list of current events in the window.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the windowed bolt
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a windowed bolt, intended for windowing operations. The {@link
* IWindowedBolt#execute(TupleWindow)} method is triggered for each window interval with the list of current events in the window.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the windowed bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint);
}
/**
* Define a new bolt in this topology. This defines a stateful bolt, that requires its state (of computation) to be saved. When this
* bolt is initialized, the {@link IStatefulBolt#initState(State)} method is invoked after {@link IStatefulBolt#prepare(Map,
* TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)} with its previously saved state.
* <p>
* The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
* are expected to anchor the tuples while emitting and ack the input tuples once its processed.
* </p>
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful bolt
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a stateful bolt, that requires its state (of computation) to be saved. When this
* bolt is initialized, the {@link IStatefulBolt#initState(State)} method is invoked after {@link IStatefulBolt#prepare(Map,
* TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)} with its previously saved state.
* <p>
* The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
* are expected to anchor the tuples while emitting and ack the input tuples once its processed.
* </p>
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the stateful bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint) throws
IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelismHint);
}
/**
* Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The {@link
* IStatefulWindowedBolt#execute(TupleWindow)} method is triggered for each window interval with the list of current events in the
* window. During initialization of this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved
* state.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the stateful windowed bolt
* @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
/**
* Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful windowing operations. The {@link
* IStatefulWindowedBolt#execute(TupleWindow)} method is triggered for each window interval with the list of current events in the
* window. During initialization of this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved
* state.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param bolt the stateful windowed bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint) throws
IllegalArgumentException {
hasStatefulBolt = true;
IStatefulBolt<T> executor;
if (bolt.isPersistent()) {
executor = new PersistentWindowedBoltExecutor<>(bolt);
} else {
executor = new StatefulWindowedBoltExecutor<T>(bolt);
}
return setBolt(id, new StatefulBoltExecutor<T>(executor), parallelismHint);
}
/**
* Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler to use but more restricted kind of bolt.
* Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in
* the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
* @param fields fields for tuple that should be emitted to downstream bolts
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields) throws
IllegalArgumentException {
return setBolt(id, biConsumer, null, fields);
}
/**
* Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler to use but more restricted kind of bolt.
* Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in
* the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param biConsumer lambda expression that implements tuple processing for this bolt
* @param fields fields for tuple that should be emitted to downstream bolts
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint,
String... fields) throws IllegalArgumentException {
return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelismHint);
}
/**
* Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler to use but more restricted kind of bolt.
* Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in
* the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param consumer lambda expression that implements tuple processing for this bolt
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer) throws IllegalArgumentException {
return setBolt(id, consumer, null);
}
/**
* Define a new bolt in this topology. This defines a lambda basic bolt, which is a simpler to use but more restricted kind of bolt.
* Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in
* the topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's
* outputs.
* @param consumer lambda expression that implements tuple processing for this bolt
* @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
* somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new LambdaConsumerBolt(consumer), parallelismHint);
}
/**
* Define a new spout in this topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
return setSpout(id, spout, null);
}
/**
* Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the
* parallelism_hint will be ignored and only one task will be allocated to this component.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's
* outputs.
* @param parallelismHint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
* process somewhere around the cluster.
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, spout, parallelismHint);
spouts.put(id, spout);
return new SpoutGetter(id);
}
/**
* Define a new spout in this topology.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
return setSpout(id, supplier, null);
}
/**
* Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the
* parallelism_hint will be ignored and only one task will be allocated to this component.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's
* outputs.
* @param parallelismHint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
* process somewhere around the cluster.
* @param supplier lambda expression that implements tuple generating for this spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint) throws IllegalArgumentException {
return setSpout(id, new LambdaSpout(supplier), parallelismHint);
}
/**
* Add a new worker lifecycle hook.
*
* @param workerHook the lifecycle hook to add
*/
public void addWorkerHook(IWorkerHook workerHook) {
if (null == workerHook) {
throw new IllegalArgumentException("WorkerHook must not be null.");
}
workerHooks.add(ByteBuffer.wrap(Utils.javaSerialize(workerHook)));
}
private void validateUnusedId(String id) {
if (bolts.containsKey(id)) {
throw new IllegalArgumentException("Bolt has already been declared for id " + id);
}
if (spouts.containsKey(id)) {
throw new IllegalArgumentException("Spout has already been declared for id " + id);
}
if (stateSpouts.containsKey(id)) {
throw new IllegalArgumentException("State spout has already been declared for id " + id);
}
}
/**
* If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
*/
private void maybeAddCheckpointSpout() {
if (hasStatefulBolt) {
setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
}
}
private void maybeAddCheckpointInputs(ComponentCommon common) {
if (hasStatefulBolt) {
addCheckPointInputs(common);
}
}
/**
* If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
* checkpoint tuples can flow through the topology.
*/
private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
bolt = new CheckpointTupleForwarder(bolt);
}
return bolt;
}
/**
* For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
* bolts, add checkpoint stream from the previous bolt to its input.
*/
private void addCheckPointInputs(ComponentCommon component) {
Set<GlobalStreamId> checkPointInputs = new HashSet<>();
for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
String sourceId = inputStream.get_componentId();
if (spouts.containsKey(sourceId)) {
checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
} else {
checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
}
}
for (GlobalStreamId streamId : checkPointInputs) {
component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
}
}
private ComponentCommon getComponentCommon(String id, IComponent component) {
ComponentCommon ret = new ComponentCommon(commons.get(id));
OutputFieldsGetter getter = new OutputFieldsGetter();
component.declareOutputFields(getter);
ret.set_streams(getter.getFieldsDeclaration());
return ret;
}
private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
}
Map<String, Object> conf = component.getComponentConfiguration();
if (conf != null) {
common.set_json_conf(JSONValue.toJSONString(conf));
}
commons.put(id, common);
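}
// Remaining members (for example the BoltGetter and SpoutGetter inner classes) are omitted from this excerpt.
}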
Detailed Analysis of TopologyBuilder Code Segments
The individual code segments are analyzed below:
private final Map<String, IRichBolt> bolts = new HashMap<>();
private final Map<String, IRichSpout> spouts = new HashMap<>();
private final Map<String, ComponentCommon> commons = new HashMap<>();
private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();
private final Map<String, SharedMemory> sharedMemory = new HashMap<>();
private boolean hasStatefulBolt = false;
private Map<String, StateSpoutSpec> stateSpouts = new HashMap<>();
private List<ByteBuffer> workerHooks = new ArrayList<>();
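This part defines the class member variables. bolts holds all Bolt objects, each of type IRichBolt; spouts holds all Spout objects, each of type IRichSpout; commons holds the ComponentCommon object of every Bolt and Spout, keyed by component id; stateSpouts holds the StateSpout specifications (StateSpoutSpec), where a StateSpout is a Spout with synchronization capability.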
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
maybeAddCheckpointSpout();
for (String boltId : bolts.keySet()) {
IRichBolt bolt = bolts.get(boltId);
bolt = maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = getComponentCommon(boltId, bolt);
try {
maybeAddCheckpointInputs(common);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
} catch (RuntimeException wrapperCause) {
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type "
+ wrapperCause.getCause().getMessage() + ", "
+ "which was instantiated prior to topology creation. "
+ wrapperCause.getCause().getMessage()
+ " "
+ "should be instantiated within the prepare method of '"
+ boltId
+ " at the earliest.",
wrapperCause);
}
throw wrapperCause;
}
}
for (String spoutId : spouts.keySet()) {
IRichSpout spout = spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
try {
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
} catch (RuntimeException wrapperCause) {
if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
throw new IllegalStateException(
"Spout '" + spoutId + "' contains a non-serializable field of type "
+ wrapperCause.getCause().getMessage()
+ ", which was instantiated prior to topology creation. "
+ wrapperCause.getCause().getMessage()
+ " should be instantiated within the open method of '"
+ spoutId
+ " at the earliest.",
wrapperCause);
}
throw wrapperCause;
}
}
StormTopology stormTopology = new StormTopology(spoutSpecs,
boltSpecs,
new HashMap<>());
stormTopology.set_worker_hooks(workerHooks);
if (!componentToSharedMemory.isEmpty()) {
stormTopology.set_component_to_shared_memory(componentToSharedMemory);
stormTopology.set_shared_memory(sharedMemory);
}
return Utils.addVersions(stormTopology);
}
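This part builds the StormTopology object from the Bolt and Spout objects that have been registered. From boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); and spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); we can see that the Bolts and Spouts inside a StormTopology are stored as byte arrays obtained by serializing the objects. This is also why the catch blocks translate a NotSerializableException into a message telling the user to create the offending field inside prepare (for a Bolt) or open (for a Spout).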
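The serialization constraint can be reproduced with the JDK alone. The sketch below is an illustration and not Storm code: `Connection`, `EagerComponent`, `LazyComponent`, and the local `javaSerialize` helper are hypothetical stand-ins. It shows why a non-serializable field created at construction time fails, while a transient field created later in a prepare-style method does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, Storm-free illustration of the "instantiate it in prepare/open" advice.
final class SerializationDemo {

    /** Stand-in for a resource that cannot be serialized, such as a connection. */
    static final class Connection {
    }

    /** Fails to serialize: the non-serializable field exists before serialization. */
    static final class EagerComponent implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Connection connection = new Connection();
    }

    /** Serializes fine: the field is transient and only created in prepare(). */
    static final class LazyComponent implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Connection connection;

        void prepare() {
            connection = new Connection();
        }

        boolean isPrepared() {
            return connection != null;
        }
    }

    /** Plain Java serialization into a byte array. */
    static byte[] javaSerialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Returns false when the object graph contains a non-serializable object. */
    static boolean canSerialize(Object obj) {
        try {
            javaSerialize(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager: " + canSerialize(new EagerComponent())); // eager: false
        System.out.println("lazy:  " + canSerialize(new LazyComponent()));  // lazy:  true
    }
}
```

Because transient fields are skipped during serialization, the lazy variant arrives on the worker with a null field and must rebuild it there. That is the role the prepare and open methods play in the error messages above.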
public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, bolt, parallelismHint);
bolts.put(id, bolt);
return new BoltGetter(id);
}
public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
}
public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint);
}
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint) throws
IllegalArgumentException {
hasStatefulBolt = true;
return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelismHint);
}
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
return setBolt(id, bolt, null);
}
public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint) throws
IllegalArgumentException {
hasStatefulBolt = true;
IStatefulBolt<T> executor;
if (bolt.isPersistent()) {
executor = new PersistentWindowedBoltExecutor<>(bolt);
} else {
executor = new StatefulWindowedBoltExecutor<T>(bolt);
}
return setBolt(id, new StatefulBoltExecutor<T>(executor), parallelismHint);
}
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields) throws
IllegalArgumentException {
return setBolt(id, biConsumer, null, fields);
}
public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint,
String... fields) throws IllegalArgumentException {
return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelismHint);
}
public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer) throws IllegalArgumentException {
return setBolt(id, consumer, null);
}
public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint) throws IllegalArgumentException {
return setBolt(id, new LambdaConsumerBolt(consumer), parallelismHint);
}
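This part defines setBolt and its overloads. From return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint); we can see that setBolt wraps the supplied IBasicBolt object in a BasicBoltExecutor, which also implements the tracking and sending of messages. The other overloads follow the same pattern with their own executors and all end up in the IRichBolt overload. There, validateUnusedId(id); checks that the supplied component ID has not been used yet, initCommon(id, bolt, parallelismHint); generates the ComponentCommon object, and bolts.put(id, bolt); registers the Bolt under its ID. The final line, return new BoltGetter(id);, returns a BoltGetter object, which is then used to add inputs to the Bolt.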
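The overload pattern described here (wrap the simpler bolt type, then delegate to one core method that validates the ID and registers the component) can be sketched without Storm. Everything in the block is hypothetical: `RichBolt`, `BasicBolt`, `Acker`, `BasicBoltAdapter`, and `MiniBuilder` are simplified stand-ins, and the adapter only imitates the idea of automatic acking rather than BasicBoltExecutor's actual behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Storm-free sketch of the setBolt pattern; none of these types are Storm's.
final class MiniBuilderDemo {

    /** Full-featured bolt: responsible for acknowledging its own input. */
    interface RichBolt {
        String execute(String input, Acker acker);
    }

    /** Simplified bolt: only transforms data and never deals with acking. */
    interface BasicBolt {
        String process(String input);
    }

    /** Counts acknowledged inputs. */
    static final class Acker {
        private int acked;

        void ack() {
            acked++;
        }

        int ackedCount() {
            return acked;
        }
    }

    /** Adapter in the role of an executor wrapper: acks after the basic bolt succeeds. */
    static final class BasicBoltAdapter implements RichBolt {
        private final BasicBolt delegate;

        BasicBoltAdapter(BasicBolt delegate) {
            this.delegate = delegate;
        }

        @Override
        public String execute(String input, Acker acker) {
            String result = delegate.process(input);
            acker.ack();
            return result;
        }
    }

    static final class MiniBuilder {
        private final Map<String, RichBolt> bolts = new LinkedHashMap<>();

        /** Core overload: validate the id, then register the bolt. */
        MiniBuilder setBolt(String id, RichBolt bolt) {
            if (bolts.containsKey(id)) {
                throw new IllegalArgumentException("Bolt has already been declared for id " + id);
            }
            bolts.put(id, bolt);
            return this;
        }

        /** Convenience overload: wrap the basic bolt, then delegate to the core overload. */
        MiniBuilder setBolt(String id, BasicBolt bolt) {
            return setBolt(id, new BasicBoltAdapter(bolt));
        }

        RichBolt get(String id) {
            return bolts.get(id);
        }
    }

    public static void main(String[] args) {
        BasicBolt upper = String::toUpperCase;
        MiniBuilder builder = new MiniBuilder().setBolt("upper", upper);
        Acker acker = new Acker();
        // Prints STORM acked=1
        System.out.println(builder.get("upper").execute("storm", acker) + " acked=" + acker.ackedCount());
    }
}
```

Funnelling every overload into one core method keeps the ID check and the registration in a single place, which is the same reason the real class routes all setBolt variants through the IRichBolt overload.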
public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
return setSpout(id, spout, null);
}
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) throws IllegalArgumentException {
validateUnusedId(id);
initCommon(id, spout, parallelismHint);
spouts.put(id, spout);
return new SpoutGetter(id);
}
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
return setSpout(id, supplier, null);
}
public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint) throws IllegalArgumentException {
return setSpout(id, new LambdaSpout(supplier), parallelismHint);
}
This part defines the setSpout methods. They work much like setBolt and likewise produce a ComponentCommon object.
private ComponentCommon getComponentCommon(String id, IComponent component) {
ComponentCommon ret = new ComponentCommon(commons.get(id));
OutputFieldsGetter getter = new OutputFieldsGetter();
component.declareOutputFields(getter);
ret.set_streams(getter.getFieldsDeclaration());
return ret;
}
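This part defines the getComponentCommon method, whose main job is to define the output streams. It copies the ComponentCommon stored under the given id, asks the component to declare its output fields through an OutputFieldsGetter, and sets the collected declarations as the streams of the copy.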
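The callback style used by getComponentCommon (hand the component a collector object and let the component report its own streams) can be sketched in plain Java. The types below are hypothetical simplifications: `Declarer`, `Component`, and `FieldsGetter` only mimic the shape of the interaction and are not Storm's OutputFieldsDeclarer, IComponent, or OutputFieldsGetter.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch of collecting output stream declarations via a callback.
final class DeclarerDemo {

    /** Passed to a component so it can announce its output streams. */
    interface Declarer {
        void declareStream(String streamId, String... fields);
    }

    /** A component knows which streams and fields it emits. */
    interface Component {
        void declareOutputFields(Declarer declarer);
    }

    /** Collector that records every declaration and rejects duplicate stream ids. */
    static final class FieldsGetter implements Declarer {
        private final Map<String, List<String>> streams = new LinkedHashMap<>();

        @Override
        public void declareStream(String streamId, String... fields) {
            if (streams.containsKey(streamId)) {
                throw new IllegalArgumentException("Fields for " + streamId + " already set");
            }
            streams.put(streamId, Arrays.asList(fields));
        }

        Map<String, List<String>> getFieldsDeclaration() {
            return streams;
        }
    }

    /** Mirrors the flow in the text: create a getter, let the component fill it, read it back. */
    static Map<String, List<String>> collectStreams(Component component) {
        FieldsGetter getter = new FieldsGetter();
        component.declareOutputFields(getter);
        return getter.getFieldsDeclaration();
    }

    public static void main(String[] args) {
        Component wordCounter = d -> d.declareStream("default", "word", "count");
        // Prints {default=[word, count]}
        System.out.println(collectStreams(wordCounter));
    }
}
```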
private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop);
}
Map<String, Object> conf = component.getComponentConfiguration();
if (conf != null) {
common.set_json_conf(JSONValue.toJSONString(conf));
}
commons.put(id, common);
}
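This part defines the initCommon method, which initializes the ComponentCommon object: it creates an empty input map, sets the parallelism hint if one was given (rejecting values below 1), and stores the component configuration as a JSON string.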
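The parallelism check has two details that are easy to miss: a null hint skips validation entirely, and Number.intValue() truncates fractional values before the comparison. The sketch below isolates just that logic; `resolveParallelism` is a hypothetical helper name, not a Storm method.

```java
// Hypothetical helper that isolates the parallelism check performed in initCommon.
final class ParallelismDemo {

    /** Returns the validated hint, or null when no hint was given. */
    static Integer resolveParallelism(Number hint) {
        if (hint == null) {
            return null; // no hint: nothing is set and nothing is validated
        }
        int dop = hint.intValue(); // fractional values are truncated toward zero
        if (dop < 1) {
            throw new IllegalArgumentException("Parallelism must be positive.");
        }
        return dop;
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(null)); // null
        System.out.println(resolveParallelism(4));    // 4
        System.out.println(resolveParallelism(2.9));  // 2
        try {
            resolveParallelism(0.5); // truncates to 0, so it is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Parallelism must be positive.
        }
    }
}
```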