IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> storm源码分析研究(三) -> 正文阅读

[大数据]storm源码分析研究(三)

2021SC@SDUSC

spout源码分析(二)

2021SC@SDUSC

在计算任务时需要的数据是由Spout提供的,所以Spout可以说是Storm中的消息源,它一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据,然后发送给tuple元组的。

输出是通过Spout输出收集器发送的,即SpoutOutputCollector,而SpoutOutputCollector的接口是ISpoutOutputCollector。

编程人员一般可通过OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送的流,然后使用SpoutOutputCollector的emit方法将数据发送

ISpoutOutputCollector.java

package org.apache.storm.spout;

import java.util.List;
import org.apache.storm.task.IErrorReporter;


public interface ISpoutOutputCollector extends IErrorReporter {

    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

    long getPendingCount();

    void flush();
}

emit方法:
用来向外发送数据,它的返回值是该消息所有发送目标的Taskld集合。
输入参数streamld表示消息将被输出到的流;
tuple为要输出的消息列表;
messageld表示输出消息的标记信息。如果messageld被设置为null, Storm将不会追踪该消息,否则它会被用来追踪所发出消息的处理情况。

emitDirect方法:
参数与emit方法相似,主要区别在于使用emitDirect时, 只有由参数taskld所指定的Task才可以接收这条消息。
这个方法要求与参数streamld相对应的流必须被定义为直接流,同时接收端的Task也必须以直接分组 ( Direct Grouping ) 的方式来接收消息,否则会有异常抛出。另外,需要注意的是,如果没有下游节点接收该消息,那么该消息其实也就没有被真正发送。

SpoutOutputCollector.java

在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。

package org.apache.storm.spout;

import java.util.List;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.utils.Utils;

public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return delegate.emit(streamId, tuple, messageId);
    }
    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return delegate.emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
        public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        delegate.emitDirect(taskId, streamId, tuple, messageId);
    }
    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }
  public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, tuple, null);
    }

    @Override
    public void flush() {
        delegate.flush();
    }

    @Override
    public void reportError(Throwable error) {
        delegate.reportError(error);
    }

    @Override
    public long getPendingCount() {
        return delegate.getPendingCount();
    }
}

List emit(String streamId, List tuple, Object messageId):
指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息完全处理了,就会回调ack方法,否则会回调fail方法。

List emit(List tuple, Object messageId):
emit的重载方法,这没有指定streamid,故采用默认的streamid

List emit(String streamId, List tuple):
emit的重载方法,这没有指定streamid,故采用默认的streamid,因为没有messageid,故ack方法和fail方法不会被调用

reportError(Throwable error):
处理异常

代理模式:
SpoutOutputCollector实际上是一个代理类,持有ISpoutOutputCollector类型的对象delegate,具体的执行都是通过_delegate调用相应的方法来实现的。

参考链接:https://blog.csdn.net/qq_29201447/article/details/81667876

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-15 11:51:47  更:2021-10-15 11:53:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 7:21:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码