IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> storm源码分析研究(二) -> 正文阅读

[大数据]storm源码分析研究(二)

2021SC@SDUSC

spout源码分析(一)

2021SC@SDUSC


2021SC@SDUSC

核心概念介绍

1、结构:
Spout是storm的核心组件之一,最源头的接口是IComponent。

2、发送:
当Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。Spout可以发射多个流,可以定义多个流(即定义多个stream),也可以使用方法来发射指定的流。

3、重要结构:
Spout的重要方法是nextTuple,nextTuple方法发射一个新的元组到Topology,如果没有新的元组发射,则直接返回。注意任务Spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。
Spout的另外两个重要方法是ack和fail方法,当spout发射的元组被拓扑成功处理时,调用ack方法,当处理失败时,调用fail方法,此外,ack和fail方法仅被可靠的spout调用。

ISpout.java

ISpout接口:
storm实现主要依靠以下几个函数,全局代码如下:

package org.apache.storm.spout;

import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
public interface ISpout extends Serializable {
 
    void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void activate();

    void deactivate();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);
}

Strom支持所有的基本类型,当它使用元组作为数据模型,元组中的每个字段都可以是任何类型的对象。而如果要使用自己定义的类型,需要为自己定义的类型实现并且注册一个serializer。每个节点还必须要为输出的元组定义字段名称。
部分函数解释:
open():
当该组件的任务在集群上初始化时调用。它为spout提供了执行spout的环境。
close():
当ISpout即将关闭时调用。不能保证会调用close,因为supervisor会杀死集群上的的worker进程。
activate():
当spout从非激活模式被激活时调用。
deactivate():
当spout失效时调用。

ShellSpout.java

重载函数如下:

    public void open(Map<String, Object> topoConf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;

        if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
        } else {
            workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
        }

        process = new ShellProcess(command);
        if (!env.isEmpty()) {
            process.setEnv(env);
        }

        Number subpid = process.launch(topoConf, context, changeDirectory);
        LOG.info("Launched subprocess with pid " + subpid);

        logHandler = ShellUtils.getLogHandler(topoConf);
        logHandler.setUpContext(ShellSpout.class, process, this.context);

        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    @Override
    public void close() {
        heartBeatExecutorService.shutdownNow();
        process.destroy();
        running = false;
    }

    @Override
    public void nextTuple() {
        this.sendSyncCommand("next", "");
    }

    @Override
    public void ack(Object msgId) {
        this.sendSyncCommand("ack", msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.sendSyncCommand("fail", msgId);
    }

 @Override
    public void activate() {
        LOG.info("Start checking heartbeat...");
        // prevent timer to check heartbeat based on last thing before activate
        setHeartbeat();
        if (heartBeatExecutorService.isShutdown()) {
            //In case deactivate was called before
            heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        }
        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
        this.sendSyncCommand("activate", "");
    }

    @Override
    public void deactivate() {
        this.sendSyncCommand("deactivate", "");
        heartBeatExecutorService.shutdownNow();
    }



void open(Map<String, Object> topoConf, TopologyContext context,SpoutOutputCollector collector)
参数:
topoconf :
Storm关于这个Spout的配置
context :
用来获取该Spout任务的信息,包括任务id,组件id,输入输出信息等等
collector :
用来从这个Spout里发送元组,元组可以在任何时间里发送,包括open和close函数里。collector是线程安全的,应该被作为一个实例对象保存到Spout对象里。

void ack(Object msgId):
以msgId消息告诉Storm这个Spout已经成功输出了该元组
void activate():
激活Spout,Spout从deactivate模式转化为activate模式,Spout开始调用nextTuple输出数据。
void close():
关闭Spout
void deactivate():
解除激活Spout,Spout从activate模式转化为deactivate模式,Spout停止调用nextTuple输出数据
void fail(Object msgId):
以msgId消息告诉Storm这个Spout输出该元组失败,主要用于将该元组重新放回消息队列,以在一段时间后重发该元组
void nextTuple():
调用该函数请求Storm发送元组到Output Collector,这个函数不应该是阻塞的,当没有元组发送时,一般调用sleep,以充分利用CPU。

参考链接:
https://blog.csdn.net/wdasdaw/article/details/48896321
https://xlucas.blog.csdn.net/article/details/55301577

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-08 11:51:37  更:2021-10-08 11:52:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:57:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码