IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> storm源码分析研究(五) -> 正文阅读

[大数据]storm源码分析研究(五)

2021SC@SDUSC

spout源码分析(四)

2021SC@SDUSC

spout:
spout
ack机制
为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。

Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。

在这里插入图片描述

RandomSentenceSpout.java

表示数据源,这里用从数组随机获取一个元素作为模拟数据源获取,日常开发通常是从MQ中获取相应数据进行数据流驱动。

public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{
            sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
            sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
        };
        final String sentence = sentences[rand.nextInt(sentences.length)];

        LOG.debug("Emitting tuple: {}", sentence);

        collector.emit(new Values(sentence));
    }

    protected String sentence(String input) {
        return input;
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }
@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

RandomSentenceSpout实现了IRichSpout接口

而Spout需要实现的接口可以是:
IRichSpout:最基本的Spout,继承自ISpout, IComponent,沒有任何特殊方法
IControlSpout:继承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法

public void open():
Spout初始化的时候调用

系统框架会不断调用public void nextTuple() ,其中,定义String[] sentences用来模拟数据源,String sentence是随机取出字符串, collector.emit(new Values(sentence))是将得到的字符串输出到下一个组件,需要注意的是,这里Values中值填充顺序要和下面declareOutputFields中字段声明顺序一致。

对spout来说,需要在emit的时候要指定msgId,然后需要缓存数据,在ack时删除,在fail的时候重新发送进行重试。

SpoutExecutor.java

@Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getRootId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
                taskData.getTaskMetrics().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getRootId(), tupleInfo, reason);
            }
            spout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            if (timeDelta != null) {
                executor.getStats().spoutFailedTuple(tupleInfo.getStream());
                taskData.getTaskMetrics().spoutFailedTuple(tupleInfo.getStream());
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }


SpoutExecutor在tupleActionFn内,
如果接收到ACKER_ACK_STREAM_ID,则进行ackSpoutMsg操作;
如果接收到ACKER_FAIL_STREAM_ID,则进行failSpoutMsg操作。

SpoutExecutor的ackSpoutMsg及failSpoutMsg里头分别调用了具体spout的ack及fail方法,将ack的结果通知到原始的spout

参考链接:
https://www.jianshu.com/p/3964e0d0705b
https://www.jianshu.com/p/d26f24d7eb1c

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-29 13:06:54  更:2021-10-29 13:07:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 1:52:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码