2021SC@SDUSC
spout源码分析(一)
2021SC@SDUSC
2021SC@SDUSC
核心概念介绍
1、结构: Spout是storm的核心组件之一,最源头的接口是IComponent。
2、发送: 当Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。Spout可以发射多个流,可以定义多个流(即定义多个stream),也可以使用方法来发射指定的流。
3、重要结构: Spout的重要方法是nextTuple,nextTuple方法发射一个新的元组到Topology,如果没有新的元组发射,则直接返回。注意任务Spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。 Spout的另外两个重要方法是ack和fail方法,当spout发射的元组被拓扑成功处理时,调用ack方法,当处理失败时,调用fail方法,此外,ack和fail方法仅被可靠的spout调用。
ISpout.java
ISpout接口: storm实现主要依靠以下几个函数,全局代码如下:
package org.apache.storm.spout;
import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
public interface ISpout extends Serializable {
void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
Strom支持所有的基本类型,当它使用元组作为数据模型,元组中的每个字段都可以是任何类型的对象。而如果要使用自己定义的类型,需要为自己定义的类型实现并且注册一个serializer。每个节点还必须要为输出的元组定义字段名称。 部分函数解释: open(): 当该组件的任务在集群上初始化时调用。它为spout提供了执行spout的环境。 close(): 当ISpout即将关闭时调用。不能保证会调用close,因为supervisor会杀死集群上的的worker进程。 activate(): 当spout从非激活模式被激活时调用。 deactivate(): 当spout失效时调用。
ShellSpout.java
重载函数如下:
public void open(Map<String, Object> topoConf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.context = context;
if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
} else {
workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
}
process = new ShellProcess(command);
if (!env.isEmpty()) {
process.setEnv(env);
}
Number subpid = process.launch(topoConf, context, changeDirectory);
LOG.info("Launched subprocess with pid " + subpid);
logHandler = ShellUtils.getLogHandler(topoConf);
logHandler.setUpContext(ShellSpout.class, process, this.context);
heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
@Override
public void close() {
heartBeatExecutorService.shutdownNow();
process.destroy();
running = false;
}
@Override
public void nextTuple() {
this.sendSyncCommand("next", "");
}
@Override
public void ack(Object msgId) {
this.sendSyncCommand("ack", msgId);
}
@Override
public void fail(Object msgId) {
this.sendSyncCommand("fail", msgId);
}
@Override
public void activate() {
LOG.info("Start checking heartbeat...");
// prevent timer to check heartbeat based on last thing before activate
setHeartbeat();
if (heartBeatExecutorService.isShutdown()) {
//In case deactivate was called before
heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
this.sendSyncCommand("activate", "");
}
@Override
public void deactivate() {
this.sendSyncCommand("deactivate", "");
heartBeatExecutorService.shutdownNow();
}
void open(Map<String, Object> topoConf, TopologyContext context,SpoutOutputCollector collector) 参数: topoconf : Storm关于这个Spout的配置 context : 用来获取该Spout任务的信息,包括任务id,组件id,输入输出信息等等 collector : 用来从这个Spout里发送元组,元组可以在任何时间里发送,包括open和close函数里。collector是线程安全的,应该被作为一个实例对象保存到Spout对象里。
void ack(Object msgId): 以msgId消息告诉Storm这个Spout已经成功输出了该元组 void activate(): 激活Spout,Spout从deactivate模式转化为activate模式,Spout开始调用nextTuple输出数据。 void close(): 关闭Spout void deactivate(): 解除激活Spout,Spout从activate模式转化为deactivate模式,Spout停止调用nextTuple输出数据 void fail(Object msgId): 以msgId消息告诉Storm这个Spout输出该元组失败,主要用于将该元组重新放回消息队列,以在一段时间后重发该元组 void nextTuple(): 调用该函数请求Storm发送元组到Output Collector,这个函数不应该是阻塞的,当没有元组发送时,一般调用sleep,以充分利用CPU。
参考链接: https://blog.csdn.net/wdasdaw/article/details/48896321 https://xlucas.blog.csdn.net/article/details/55301577
|