2021SC@SDUSC
Spout Source Code Analysis (Part 4)
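Spout: the ack mechanism
To guarantee that no data is lost, Storm/JStorm uses an elegant reliability mechanism. When a topology is defined with Ackers configured, JStorm starts, in addition to the topology's own tasks, a group of special tasks called Ackers that track every message in the topology DAG. Whenever an Acker finds that a DAG has been processed successfully, it sends an ack signal to the Spout task that created the root message.
Ackers track messages as tuple trees. When a Spout emits a message, it notifies the corresponding Acker that a new root message has been created, and the Acker creates a new tuple tree. Once the Acker finds that the tree has been completely processed, it notifies the corresponding Spout task.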
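The text above does not say how an Acker decides that a tree is completely processed. Storm's documented approach is to keep one 64-bit value per root message and XOR each tuple id into it once when the tuple is emitted and once when it is acked, so the value returns to zero when every tuple in the tree has been acked. The following is a minimal sketch of that idea only; AckerSketch and its method names are hypothetical and are not the real Acker code, which receives the emit and ack updates combined in a single message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of an acker; not Storm's actual Acker class.
class AckerSketch {
    // rootId -> running XOR of every tuple id reported for that tree
    private final Map<Long, Long> ackVals = new HashMap<>();

    // Called when the spout emits the root tuple, and again whenever a bolt
    // anchors a new tuple to the same tree.
    void emitted(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // Called when a bolt acks a tuple. Returns true once the XOR value is
    // back to zero, meaning every emitted tuple has also been acked.
    boolean acked(long rootId, long tupleId) {
        long value = ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
        if (value == 0L) {
            ackVals.remove(rootId);
            return true;
        }
        return false;
    }
}

class AckerSketchDemo {
    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 7L;
        acker.emitted(root, 0x11L);   // spout emits the root tuple
        acker.emitted(root, 0x22L);   // a bolt emits two anchored children
        acker.emitted(root, 0x44L);
        System.out.println(acker.acked(root, 0x11L));
        System.out.println(acker.acked(root, 0x22L));
        System.out.println(acker.acked(root, 0x44L)); // tree is complete here
    }
}
```

In the real system tuple ids are random 64-bit numbers, which makes it very unlikely that the value reaches zero before the tree is actually finished. The fixed ids in the demo were picked so that no partial XOR is zero.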
RandomSentenceSpout.java
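This class represents the data source. Here it simulates one by picking a random element from an array; in day-to-day development the data would usually be pulled from a message queue (MQ) to drive the stream.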
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    rand = new Random();
}

@Override
public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{
        sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
        sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
    };
    final String sentence = sentences[rand.nextInt(sentences.length)];
    LOG.debug("Emitting tuple: {}", sentence);
    collector.emit(new Values(sentence));
}

protected String sentence(String input) {
    return input;
}

@Override
public void ack(Object id) {
}

@Override
public void fail(Object id) {
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
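RandomSentenceSpout implements the IRichSpout interface.
A Spout can implement one of the following interfaces.
IRichSpout: the most basic Spout interface; it extends ISpout and IComponent and adds no methods of its own.
IControlSpout: extends IComponent and includes methods such as open, close, activate, deactivate, nextTuple, ack(Object msgId), and fail.
public void open(): called when the Spout is initialized.
The framework calls public void nextTuple() repeatedly. In it, String[] sentences simulates the data source, String sentence is the randomly chosen string, and collector.emit(new Values(sentence)) sends that string to the next component. Note that the order of the values passed to Values must match the order of the fields declared in declareOutputFields.
For reliable processing, a Spout must specify a msgId when it calls emit and cache the emitted data; it deletes the cached entry in ack and re-sends the data in fail to retry. The RandomSentenceSpout above emits without a msgId, so its tuples are not tracked, and its ack and fail methods are empty.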
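The cache-and-replay pattern just described can be sketched without any Storm dependency. This is a hedged illustration, not code from RandomSentenceSpout: EmitterSketch is a hypothetical stand-in for SpoutOutputCollector.emit(List<Object> tuple, Object messageId), and ReliableSpoutSketch, emitSentence and pendingCount are names invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for SpoutOutputCollector.emit(tuple, messageId).
interface EmitterSketch {
    void emit(List<Object> values, Object msgId);
}

// Hypothetical sketch of a reliable spout's bookkeeping; not a real Storm class.
class ReliableSpoutSketch {
    // msgId -> payload that has been emitted but not yet acked
    private final Map<Object, List<Object>> pending = new HashMap<>();
    private final EmitterSketch emitter;

    ReliableSpoutSketch(EmitterSketch emitter) {
        this.emitter = emitter;
    }

    // Mirrors nextTuple(): emit with a msgId and remember the payload.
    Object emitSentence(String sentence) {
        Object msgId = UUID.randomUUID();
        List<Object> values = new ArrayList<>();
        values.add(sentence);
        pending.put(msgId, values);
        emitter.emit(values, msgId);
        return msgId;
    }

    // Mirrors ack(Object id): the tuple tree completed, so drop the cached copy.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Mirrors fail(Object id): re-emit the cached payload under the same msgId.
    void fail(Object msgId) {
        List<Object> values = pending.get(msgId);
        if (values != null) {
            emitter.emit(values, msgId);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A production spout would normally also cap the number of retries and bound the cache size; both are left out here to keep the sketch short.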
SpoutExecutor.java
@Override
public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
    String streamId = tuple.getSourceStreamId();
    if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
        spoutOutputCollector.flush();
    } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        pending.rotate();
    } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
        metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
    } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
        Long id = (Long) tuple.getValue(0);
        TupleInfo pendingForId = pending.get(id);
        if (pendingForId != null) {
            pending.put(id, pendingForId);
        }
    } else {
        Long id = (Long) tuple.getValue(0);
        Long timeDeltaMs = (Long) tuple.getValue(1);
        TupleInfo tupleInfo = pending.remove(id);
        if (tupleInfo != null && tupleInfo.getMessageId() != null) {
            if (taskId != tupleInfo.getTaskId()) {
                throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
            }
            Long timeDelta = null;
            if (hasAckers) {
                long startTimeMs = tupleInfo.getTimestamp();
                if (startTimeMs != 0) {
                    timeDelta = timeDeltaMs;
                }
            }
            if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
            } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
            }
        }
    }
}

public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
    try {
        ISpout spout = (ISpout) taskData.getTaskObject();
        int taskId = taskData.getTaskId();
        if (executor.getIsDebug()) {
            LOG.info("SPOUT Acking message {} {}", tupleInfo.getRootId(), tupleInfo.getMessageId());
        }
        spout.ack(tupleInfo.getMessageId());
        if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
            new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
        }
        if (hasAckers && timeDelta != null) {
            executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
            taskData.getTaskMetrics().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}

public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
    try {
        ISpout spout = (ISpout) taskData.getTaskObject();
        int taskId = taskData.getTaskId();
        if (executor.getIsDebug()) {
            LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getRootId(), tupleInfo, reason);
        }
        spout.fail(tupleInfo.getMessageId());
        new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
        if (timeDelta != null) {
            executor.getStats().spoutFailedTuple(tupleInfo.getStream());
            taskData.getTaskMetrics().spoutFailedTuple(tupleInfo.getStream());
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
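Inside tupleActionFn, SpoutExecutor calls ackSpoutMsg when the tuple arrives on ACKER_ACK_STREAM_ID and failSpoutMsg when it arrives on ACKER_FAIL_STREAM_ID.
ackSpoutMsg and failSpoutMsg in turn call the ack and fail methods of the concrete spout, so the outcome is reported back to the spout that originally emitted the message.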
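tupleActionFn also calls pending.rotate() on every system tick and pending.put(id, pendingForId) on ACKER_RESET_TIMEOUT_STREAM_ID. Assuming pending behaves like a bucketed rotating map, which the rotate() call suggests, the sketch below shows why those two calls amount to a timeout and a timeout reset. RotatingPendingSketch is a hypothetical, simplified model written for this article and is not Storm's own map implementation; the onExpire callback stands in for whatever the executor does with timed-out entries.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified model of a rotating (bucketed) pending map.
class RotatingPendingSketch<K, V> {
    // Newest bucket at the head, oldest at the tail.
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();
    private final BiConsumer<K, V> onExpire;

    RotatingPendingSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // New or refreshed entries always land in the newest bucket, which is
    // why re-putting an existing entry resets its timeout.
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> bucket : buckets) {
            V value = bucket.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    // On each tick: drop the oldest bucket, open a fresh newest bucket,
    // and report everything that was still in the dropped bucket as expired.
    void rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        expired.forEach(onExpire);
    }
}
```

With two buckets, an entry that is never acked, failed, or refreshed is expired on the second rotate() after it was put, while an entry that is re-put in between survives one more rotation.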
References: https://www.jianshu.com/p/3964e0d0705b https://www.jianshu.com/p/d26f24d7eb1c