Storm作为分布式实时计算框架,已广泛使用多年,形成成熟的大数据分析和实时计算平台体系。本文简要介绍Storm的架构和一些概念如Topology、Spout和Bolt,以作了解。
1、Storm基本介绍
1.1 Storm特性
Storm是开源的分布式实时计算系统,在实时分析、在线机器学习、连续计算、分布式RPC、ETL等场景中广泛使用。Storm集成了多种消息队列技术和数据库技术,其中的Topology消耗数据流,以任意复杂的方式处理这些流。Storm具有以下特性:
- 用例广泛:Storm可以用户处理消息和更新数据库,完成数据流的持续查询任务,然后将结果流送到客户端。
- 可伸缩性:Storm每秒处理大量的信息,当需要拓展拓扑时,只需要向拓扑中添加主机,然后增加拓扑的并行设置。
- 保证数据没有丢失:通过ACK机制,Storm能保证每条消息都会被处理
- 鲁棒性:Storm集群非常稳健,能够有效地运行
- 容错性:当计算中有错误发生时,Storm只需要重新分配任务再执行即可。
- 高性能低延迟:原生的流处理系统,可以做到毫秒级处理
1.2 Storm基本架构
Storm集群中存在两种类型的节点:运行Nimbus服务的主节点和运行Supervisor服务的工作节点。Storm集群由一个主节点和多个工作节点组成,主节点上运行一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测;每个工作节点上则运行一个名为“Supervisor”的守护进程,用于监听工作、开始并终止工作进程。Nimbus和Supervisor之间的协调工作是通过zookeeper来完成的。Nimbus和Supervisor是无状态的,其元数据存储在Zookeeper中,使得系统具有很高的容错性。
Worker由Supervisor负责启动,一个worker可以有多个Executor执行线程,每个Executor又可以包含一个或多个Task,其中Task为Storm中最小处理单元。每个Executor都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到属于其下某一Task的消息后,就会调用该Task对应的处理逻辑对消息进行处理。
1.2.1 Nimbus进程
Nimbus进程运行在Master Node,是Storm集群工作的全局指挥官,负责在集群中分发代码、对节点分配任务并监控异常。主要功能如下:
- 通过Thrift接口,监听并接收客户端提交的Topology;
- 根据集群Workers的资源情况,将客户端提交的Topology进行任务分配,分配结果写入Zookeeper;
- 通过Thrift接口,监听Supervisor下载Topology代码的请求,并提供下载;
- 通过Thrift接口,监听UI对统计信息的读取,从Zookeeper上读取统计信息,返回给UI;
- 若进程退出后,立即在本机重启,则不影响集群运行。
1.2.2 Supervisor进程
Supervisor进程运行在Worker Node , 是Storm集群的资源管理者,负责监听其主机上已经分配的主机的作业,启停已经分配的Worker进程。主要功能如下:
- 定时从Zookeeper检查是否有新Topology代码未下载到本地,并定时删除旧Topology代码;
- 根据Nimbus的任务分配计划,在本机按需启动1个或多个Worker进程,并监控所有的Worker进程的情况;
- 若进程退出,立即在本机重启,则不影响集群运行。
1.2.3 Zookeeper的作用
Nimbus和Supervisor进程都被设计为快速失败(遇到任何意外情况时进程自毁)和无状态(所有状态保存在Zookeeper或磁盘上)。这样设计的好处就是如果它们的进程被意外销毁,那么在重新启动后,就只需要从Zookeeper上获取之前的状态数据即可,并不会造成任何数据丢失。
1.2.4 Worker进程
Worker进程是Storm集群的任务构造者,构造Spoult或Bolt的Task实例运行其中的处理逻辑,启动Executor线程。主要功能如下:
- 根据Zookeeper上分配的Task,在本进程中启动1个或多个Executor线程,将构造好的Task实例交给Executor去运行;
- 向Zookeeper写入心跳;
- 维持传输队列,发送Tuple到其他的Worker;
- 若进程退出,立即在本机重启,则不影响集群运行。
1.2.5 Executor线程
Executor线程是Storm集群的任务执行者,循环执行Task代码。主要功能如下:
- 执行1个或多个Task;
- 执行Acker机制,负责发送Task处理状态给对应Spout所在的worker;
- Worker中每一个Spout/Bolt的线程称为一个任务Task,每个Spout或Bolt在集群中执行许多任务,每个任务对应一个线程的执行。
1.3 Storm中元数据
Storm使用Zookeeper来存储Nimbus、Supervisor、Worker以及Executor之间的共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复,因此Storm的模块是无状态的。Storm在Zookeeper中存储数据的目录结构如下所示,这是一个根目录为/storm的树形结构,树中的每一个节点代表Zookeeper中的一个节点,每个叶子节点是Storm真正存储数据的地方。
- /storm/workerbeats//node-port:存储由node和port指定的Worker的运行状态和一些统计信息,主要包括storm-id、当前Worker上所有Executor的统计信息、当前Worker的启动时间以及最后一次更新这些信息的时间。
- /storm/storms/:存储Topology本身的信息,包括名字、启动时间、运行状态、要使用的Worker数目以及每个组件的并行度设置。这些内容在运行过程中是不变的
- /storm/assignments/:存储了Nimbus为每个Topology分配的任务信息,包括该Topology在Nimbus机器本地的存储目录、被分配到的Supervisor机器到主机名的映射关系、每个Executor运行在哪个Worker上以及每个Executor的启动时间。
- /storm/supervisors/:存储Supervisor机器本身的运行统计信息,包括最近一次更新时间、主机名、supervisor-id、已经使用的端口列表、所有端口列表以及运行时间
- /storm/errors/:存储运行过程中每个组件上发生的错误信息
2、Storm基本概念
Storm中涉及的概念包括Tuple、Stream、Stream Grouping 、Spout、Bolt和Topology。
1)元组Tuple
元组是消息传递的基本单元,是一个命名的值列表,元组中的字段可以是任何类型的对象。Storm使用元组作为其数据模型,而元组支持所有的基本类型、字符串和字符数组作为字段值,元组其实是一个value list。
2)流Stream
流是Storm的核心抽象,是一个无界的元组序列。源源不断传递的元组就组成了流,在分布式环境中进行创建和处理。
3)Stream Grouping
Stream Grouping定义了消息分发策略,定义了Bolt节点以何种方式接收数据。消息分发的策略包括:随机分配、根据字段值分配、广播全部分组、发给同一个Task以及无分组等。
4)Spout
Spout是拓扑中stream的来源,是拓扑中产生源数据流的组件。通常Spout会从外部数据源中读取数据,然后转化为拓扑内部的源数据。
5)Bolt
在拓扑中,所有的处理都是在Bolt中完成的,Bolt是stream的处理节点,可以完成过滤、业务处理、连接运算、连接与访问数据库等任何操作。
6)拓扑Topology
拓扑是Storm中运行的一个实时的应用程序,因为各个组件中间的消息流动而形成逻辑上的拓扑结构。Storm的拓扑类似于MapReduce的作业,主要区别是MapReduce作业最终会完成,而拓扑永远在运行直到被杀死。一个拓扑就是Spout和Bolt的连接流分组。
7)Storm和Hadoop中基本概念对比
类型 | Hadoop | Storm |
---|
系统角色 | JobTracker | Nimbus | 系统角色 | TaskTracker | Supervisor | 系统角色 | Child | Worker | 应用名称 | Job | Topology | 组件接口 | Map/Reduce | Spout/Bolt |
2.1 拓扑详解
使用Storm做实时计算,首先需要创建拓扑,一个拓扑是一个有向图的计算。在一个拓扑中每个节点包含处理逻辑,节点之间的连接显示数据应该如何在节点之间传递。如下图所示,其中包含Spout和Bolt:
- Spout:Storm中的消息源,为Topology生产消息,一般是从外部数据源不间断的读取数据并发送给Topology消息(Tuple)
- Bolt:Storm中的消息处理者,为Topology进行消息处理,Bolt可以执行过滤、聚合、查询数据库等操作,最终会被Topology提交到Storm集群中执行
2.2 Spout详解
在Storm中,Spout有3中获取数据的模式:直接连接、消息队列和DRPC。
1)直接连接
在直接连接的架构中,Spout直接与数据源相连接,这种架构实现起来很容易,特别当消息源是一个已知的设备或设备组。另外,也可以使用多个Spout从多个消息源中获取消息,这样可以均匀地分发收集器访问数据源,比如从Web服务器收集日志文件。
2)消息队列
消息发射器把消息发送到消息队列系统,Spout从消息队列系统中获取消息。使用消息队列的优势是可以作为Spout和数据源之间的中间件,也就是Spout不需要知道关于消息发射器的任何东西,添加和删除发射器的过程比直接连接更容易。当然也会带来问题,比如消息队列系统会成为故障点、在处理流程中增加了一层。
3)分布式RPC
DRPCSpout是一个Spout实现,从DRPC服务器接收数据流并处理。DRPC工作流如下:
- 客户端向DRPC服务器发送一个RPC请求
- DRPC服务器接收到请求并发送请求到Topology
- DRPC服务器从Topology接收结果
- DRPC服务器把结果返回给等待的客户端
2.3 Bolt详解
Bolt是一个组件,以元组作为输入,生成元组作为输出。在客户端主机中创建Bolt,序列化到拓扑,并提交到集群的主控节点。集群启动Worker,反序列化Bolt,调用并处理元组。
2.4 Stream Groupings分组策略
Spouts和Bolts在storm集群上执行任务时,是由多个Tasks并行执行,如上图所示,每一个圆圈代表一个 Task。当一个Tuple需要从Bolt A发送给Bolt B执行的时候,由Stream groupings分组策略来决定应该发送给Bolt B的哪一个Task执行。Storm中一共有如下8个内置的 Stream Grouping。
1)Shuffle grouping随机分组
Tuples随机的分发到每个Bolt的每个Task上,每个Bolt获取到等量的Tuples。
2)Fields grouping按字段分组
Streams通过grouping指定的字段(field)来分组。假设通过user-id字段进行分区,那么具有相同user-id的Tuples就会发送到同一个Task。
3)Partial Key grouping
Streams通过grouping中指定的字段(field)来分组,与Fields Grouping相似。但是对于两个下游的Bolt来说是负载均衡的,可以在输入数据不平均的情况下提供更好的优化。
4)All grouping广播分组
Streams会被所有的Bolt的Tasks进行复制。由于存在数据重复处理,所以需要谨慎使用。
5)Global grouping全局分组
整个Streams会进入Bolt的其中一个Task,通常会进入id最小的Task。
6)None grouping不分组
不分组意思是说stream不关心到底谁会收到它tuple,当前None grouping和Shuffle grouping等价,都是进行随机分发。
7)Direct grouping直接分组
Direct grouping只能被用于direct streams,使用这种方式需要由Tuple的生产者直接指定由哪个Task进行处理。只有被声明为Direct Stream的消息流可以声明这种分组方法,而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)
8)Local or shuffle grouping
如果目标Bolt有Tasks和当前Bolt的Tasks处在同一个Worker进程中,那么则优先将Tuple Shuffled到处于同一个进程的目标Bolt的Tasks上,这样可以最大限度地减少网络传输。否则,就和普通的Shuffle Grouping行为一致。
2.5 并行度
在Storm中运行Topology任务主要依赖下面三个实体:
- 工作进程(Worker processes)
- 执行线程(Executor)
- 任务(Task)
- 1个Worker进程执行的是1个Topology的子集,不会出现1个Worker为多个Topology服务的情况。1个运行中的Topology由集群中多台物理机上的多个Worker进程组成的。1个Worker进程可以启动多个Executor线程来执行1个Topology的组件(Spout或Bolt)。
- Executor是被Worker进程启动的单独线程。每个Executor会运行一个或者多个Task。
- Task是具体执行数据处理的单元。每个组件的Task数在Topology启动后固定不变的,但是可以修改执行Task的Executor线程数,来动态调整为该拓扑分配的资源。默认情况下,每个Executor会对应一个task。
- 配置拓扑的并行度:配置Executor的数量、Worker进程的数量和拓扑Task的数量
- Worker进程的数量:配置项TOPOLOGY_WORKERS或setNumWorkers指定拓扑可以创建多少个Worker进程
- Executor线程的数量:通过setSpout或SetBolt指定Executor的数量
- Task的数量:配置项TOPOLOGY_TASKS或 setNumTasks指定每个组件创建多少Tasks
2.6 拓扑示例
以下定义一个Topology,由1个Spout和2个Bolt组成
#配置Topology
conf.setNumWorkers(2); //使用两个Worker进程
topologyBuilder.setSpout(“read-spout”,new ReadSpout(),2); //设置并行度为2
topologyBuilder.setBolt(“norm-bolt”,new NormBolt(),2).setNumTasks(4).shuffleGrouping(“read-spout”); //使用4个Tasks
topologyBuilder.setBolt(“write-bolt”,new YellowBolt(),6).shuffleGrouping(“norm-blot”);
- Topology使用2个worker进程
- Spout是并行度为2的ReadSpout实例(生成2个Executor线程和2个Tasks)
- 第一个Bolt是id为“norm-blot”,并行度为2、Tasks数为4、使用随机分组的方式接收blue-spout发送的元组的NormBolt实例(生成2个Executor和4个Tasks)
- 第一个Bolt是id为“write-blot”,并行度为6、Tasks数为4、使用随机分组的方式接收norm-blot发送的元组的WriteBolt实例(生成6个Executor和6个Tasks)
因此该Topology一共有两个Worker进程,2+2+6=10个Executor线程,2+4+6=12个Tasks,每个Worker进程会分配到5个Executor和6个Tasks。
3、总结
上文简要介绍了Storm的基本架构和一些基本概念,与其它大数据组件比较,Storm有它的优势,技术架构上也更为成熟。
- MapReduce:批处理,适合海量离线处理场景
- Spark Streaming:并非真正意义上的流处理,而是微批处理,对数据流进行极小粒度的拆分,近似达到流处理的效果
- Flink是原生的流式处理系统,支持流处理和批处理。Flink中的组件是有状态的,通过检查点机制对数据流和算子状态进行保存
- Storm是原生的流式处理系统,支持毫秒级处理。Storm组件是无状态的,通过ACK机制在失败或超时时候进行重传
参考资料:
- 《从零开始学Storm》,赵必夏编著
- 《Storm源码分析》,李明编著
- http://storm.apache.org/releases/current/Tutorial.html
- https://toutiao.io/posts/r296zu/preview
- https://blog.csdn.net/u013851082/article/details/62429476
- https://blog.csdn.net/GAOXINXINGgaoxinxing/article/details/118367242
|