2021SC@SDUSC
spout源码分析(二)
2021SC@SDUSC
在计算任务时需要的数据是由Spout提供的,所以Spout可以说是Storm中的消息源,它一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据,然后发送给tuple元组的。
输出是通过Spout输出收集器发送的,即SpoutOutputCollector,而SpoutOutputCollector的接口是ISpoutOutputCollector。
编程人员一般可通过OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送的流,然后使用SpoutOutputCollector的emit方法将数据发送
ISpoutOutputCollector.java
package org.apache.storm.spout;
import java.util.List;
import org.apache.storm.task.IErrorReporter;
public interface ISpoutOutputCollector extends IErrorReporter {
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
long getPendingCount();
void flush();
}
emit方法: 用来向外发送数据,它的返回值是该消息所有发送目标的Taskld集合。 输入参数streamld表示消息将被输出到的流; tuple为要输出的消息列表; messageld表示输出消息的标记信息。如果messageld被设置为null, Storm将不会追踪该消息,否则它会被用来追踪所发出消息的处理情况。
emitDirect方法: 参数与emit方法相似,主要区别在于使用emitDirect时, 只有由参数taskld所指定的Task才可以接收这条消息。 这个方法要求与参数streamld相对应的流必须被定义为直接流,同时接收端的Task也必须以直接分组 ( Direct Grouping ) 的方式来接收消息,否则会有异常抛出。另外,需要注意的是,如果没有下游节点接收该消息,那么该消息其实也就没有被真正发送。
SpoutOutputCollector.java
在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。
package org.apache.storm.spout;
import java.util.List;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.utils.Utils;
public class SpoutOutputCollector implements ISpoutOutputCollector {
ISpoutOutputCollector delegate;
public SpoutOutputCollector(ISpoutOutputCollector delegate) {
this.delegate = delegate;
}
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
return delegate.emit(streamId, tuple, messageId);
}
public List<Integer> emit(List<Object> tuple, Object messageId) {
return delegate.emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
}
public List<Integer> emit(String streamId, List<Object> tuple) {
return emit(streamId, tuple, null);
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
delegate.emitDirect(taskId, streamId, tuple, messageId);
}
public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
}
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
emitDirect(taskId, streamId, tuple, null);
}
public void emitDirect(int taskId, List<Object> tuple) {
emitDirect(taskId, tuple, null);
}
@Override
public void flush() {
delegate.flush();
}
@Override
public void reportError(Throwable error) {
delegate.reportError(error);
}
@Override
public long getPendingCount() {
return delegate.getPendingCount();
}
}
List emit(String streamId, List tuple, Object messageId): 指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息完全处理了,就会回调ack方法,否则会回调fail方法。
List emit(List tuple, Object messageId): emit的重载方法,这没有指定streamid,故采用默认的streamid
List emit(String streamId, List tuple): emit的重载方法,这没有指定streamid,故采用默认的streamid,因为没有messageid,故ack方法和fail方法不会被调用
reportError(Throwable error): 处理异常
代理模式: SpoutOutputCollector实际上是一个代理类,持有ISpoutOutputCollector类型的对象delegate,具体的执行都是通过_delegate调用相应的方法来实现的。
参考链接:https://blog.csdn.net/qq_29201447/article/details/81667876
|