1、什么是 Storm?
Storm是一个分布式实时 大数据处理系统,它是一个流数据框架 ,具有最高的摄取率,虽然Storm是无状态 的,它通过ZooKeeper管理分布式环境和集群状态 ,保证每个消息 将通过拓扑 至少处理一次 关键字:实时 、流数据 storm类似图片的电梯,一直往上传送数据 ,数据 一上去就被传送、处理
2、Storm核心概念
tuple | 元组,数据结构,有序的元素列表 ,通常是任意类型的数据,outputCollector.emit(new Values(s1));,这里的new Value(s1)就是一个tuple |
---|
Stream | 流,一序列的tuple,拓扑图中的线 | Spouts | 流的源 ,storm从原始数据源接收输入数据,可以编写以获取数据源读取数据 ,实现IRichSpout接口 ,继承BaseRichSpout | Bolts | Bolts是逻辑处理单元,Spout将数据传递到Bolts的过程以及Bolts之间 ,并产生新的输出流,一些常见的接口:IRichBolt、IBasicBolt等 | 拓扑 | Spouts和Bolts连接在一起,形成拓扑结构,简单来说拓扑是有向图 ,其中顶点是计算 ,边缘时数据流 Spouts将数据发射到一个或者多个Bolts,bolt表示拓扑中最小的逻辑节点,Bolts的输入可以发射到另一个Bolts作为输入 | 进程 | 拓扑在多个工作节点上以分布式方式运行,Storm将所有的工作节点上的任务均匀分布 | 任务 | spout 和bolt 的执行的过程就是任务 | 流分组 | 控制tuple如何进行路由,数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts ,内置4个分组策略 |
2.1、节点类型
Nimbus:master node,主要工作运行拓扑,分析元组tuple,收集执行的task,将task分给supervisor supervisor:工作节点,有多个处理进程,代理任务给所有的wokr进程,在Nimbus和supervisor之间使用内部的消息系统通信
2.2、组件
Nimbus :master node ,在work node间分发数据,指派task给work node,监控故障 supervisor :接收nimbus的指令,有多个work进程,监视work进程,完成task work process :执行相关的task,本身不执行,创建executor(执行线程),可以有多个执行线程 executor :执行线程 task :处理数据 zookeeper :维持状态
2.2、流分组
1)随机分组 (Shuffle Grouping )
随机分组,随机派发stream里面的tuple,保证每个bolt task 接收到tuple数目一样大 ,平均分配 ,轮询 一个spout,有两个Bolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);
builder.setBolt("bolt",new MyBolt(),2).shuffleGrouping("spout");
输出的结果:
default Thread-9-bolt--id=83 line :1 session_id:123
default Thread-11-bolt--id=85 line :1 session_id:dfghj
default Thread-9-bolt--id=83 line :2 session_id:34567
default Thread-11-bolt--id=85 line :2 session_id:2345
default Thread-9-bolt--id=83 line :3 session_id:234fv
default Thread-9-bolt--id=83 line :4 session_id:456yg
default Thread-9-bolt--id=83 line :5 session_id:23456ygh
default Thread-11-bolt--id=85 line :3 session_id:werfg
default Thread-11-bolt--id=85 line :4 session_id:345
default Thread-11-bolt--id=85 line :5 session_id:23456ygh
default Thread-11-bolt--id=85 line :6 session_id:1
default Thread-9-bolt--id=83 line :6 session_id:456yg
default Thread-9-bolt--id=83 line :7 session_id:2
default Thread-11-bolt--id=85 line :7 session_id:2
其中Thread-9-bolt的id为83,另一个是85,两个随机分 配,但是保证了平均分配
2)字段分组 (Fields Grouping ) 一般用在计数
按照字段 分组,比如,按照"session_id"这个字段来分组,那么具有相同的值的"session_id"的tuple被分配到相同的Bolt里的一个task ,而不同的"session_id",不同的"session_id"的可能被分配到不同的task.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);
builder.setBolt("bolt",new MyBolt(),2).fieldsGrouping("spout",new Fi
输出结果:
11 2 3 default Thread-11-bolt--id=85 line :6 session_id:2
1 2 3 default Thread-11-bolt--id=85 line :7 session_id:2
1 2 3 default Thread-11-bolt--id=85 line :8 session_id:2
1 2 3 default Thread-11-bolt--id=85 line :9 session_id:2
1 2 3 default Thread-11-bolt--id=85 line :10 session_id:2
11 2 33 default Thread-11-bolt--id=85 line :11 session_id:2
数据格式中的第二个 是session_id,session_id值相同的交给同一个 Thread-11-bolt–id
3)所有分组 (All Grouping )常用来发送信号
广播发送 ,对于每一个tuple,所有的bolt都会收到
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);
builder.setBolt("bolt",new MyBolt(),2).allGrouping("spout");
输出结果:
ssssssssss 123 sdfv default Thread-9-bolt--id=83 line :1 session_id:123
ssssssssss 123 sdfv default Thread-11-bolt--id=85 line :1 session_id:123
sdfgh dfghj sdfghj default Thread-9-bolt--id=83 line :2 session_id:dfghj
sdfgh dfghj sdfghj default Thread-11-bolt--id=85 line :2 session_id:dfghj
每个tuple都会被每个bolt处理一边
4)全局分组 (Global Grouping )一般用来汇总
全局分组 ,把tuple分配给task_id 最低的task.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);
builder.setBolt("bolt",new MyBolt(),3).globalGrouping("spout");
输出结果:
12 1 1 default Thread-9-bolt--id=82 line :12 session_id:1
11 2 3 default Thread-9-bolt--id=82 line :13 session_id:2
1 2 3 default Thread-9-bolt--id=82 line :14 session_id:2
1 2 3 default Thread-9-bolt--id=82 line :15 session_id:2
1 2 3 default Thread-9-bolt--id=82 line :16 session_id:2
1 2 3 default Thread-9-bolt--id=82 line :17 session_id:2
11 2 33 default Thread-9-bolt--id=82 line :18 session_id:2
每次都是最小的id执行task
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wordSpout",new WordSpout());
builder.setBolt("splitBolt",new SplitBolt(),2).shuffleGrouping("wordSpout");
builder.setBolt("countBolt",new CountBolt(),4).fieldsGrouping("splitBolt",new Fields("word"));
builder.setBolt("reportBolt",new ReportBolt()).globalGrouping("countBolt");
最后4个countBolt 被汇总到countBolt 其他的就做演示
|