Storm🥇
Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,
1、DAG:有向无循环图
它由有限个顶点和有向边组成,每条有向边都从一个顶点指向另一个顶点;从任意一个顶点出发都不能通过这些有向边回到原来的顶点。有向无环图就是从一个图中的任何一点出发,不管走过多少个分叉路口,都没有回到原来这个点的可能性。
2、Storm的特性
1.适用场景广泛:
storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算)
对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。
2、可伸缩性高:
Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。
Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。
3、保证无数据丢失:
实时系统必须保证所有的数据被成功的处理。storm保证每一条消息都会被处理。
4、异常健壮:
storm集群非常容易管理,轮流重启节点不影响应用。
5、容错性好:
在消息处理过程中出现异常, storm会进行重试
6、语言无关性:
Storm的topology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.
3、Storm物理架构
1、nimbus:
Storm的Master,负责资源分配和任务调度,一个Storm集群只有一个Nimbus,此节点是一个无状态节点,所有的一切都存储再Zookeeper
2、supervisor:
Storm的Slave,负责接受Nimbus分配的任务管理所有的Worker,一个Supervisor节点包含多个Workers进程,默认4个,一般情况一个topology对应一个worker
3、workers:
工作进程(Process),每个工作进程中都有多个Task。
4、Task:
在 Storm 集群中每个 Spout 和 Bolt 都由若干个任务(tasks)来执行。
worker中每一个spout/bolt的线程称为一个task
同一个spout/bolt的task可能会共享一个物理线程(Thread),该线程称为executor
5、Storm的并行机制:
Topology由一个或多个Spout/Bolt组件构成。运行中的Topology由一个或多个Supervisor节点的Worker构成
6、流式计算框架(计算框架)
客户端将数据发送给MQ(消息队列),然后传递到Storm中进行计算
最终计算的结果存储到数据库中(HBase,Mysql)
客户端不要求服务器返回结果,客户端可以一直向Storm发送数据
客户端相当于生产者,Storm相当于消费者
4、Storm的数据分发策略
--->
1、ShuffleGrouping
随机分组,随机派发stream里面的tuple,保证每个bolttask接收到的tuple数目大致相同。轮询,平均分配
优点:
为tuple选择task的代价小;
bolt的tasks之间的负载比较均衡;
缺点:
上下游components之间的逻辑组织关系不明显;
--->
2、FieldsGrouping
按字段分组
比如,按"user-id"这个字段来分组,那么具有同样"user-id"的tuple会被分到相同的Bolt里的一个task,而不同的"user-id"则可能会被分配到不同的task。
优点:
上下游components之间的逻辑组织关系显著;
缺点:
付出为tuple选择task的代价;
bolt的tasks之间的负载可能不均衡,根据field字段而定;
--->
3、AllGrouping
广播发送,对于每一个tuple,所有的bolts都会收到
优点:
上游事件可以通知下游bolt中所有task;
缺点:
tuple消息冗余,对性能有损耗,请谨慎使用;
--- >
4、GlobalGrouping
全局分组,把tuple分配给taskid最低的task。
优点:
所有上游消息全部汇总,便于合并、统计等;
缺点:
bolt的tasks之间的负载可能不均衡,id最小的task负载过重;
--- >
5、DirectGrouping
指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。
只有被声明为DirectStream的消息流可以声明这种分组方法。
而且这种消息tuple必须使用emitDirect方法来发射。
消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)
优点:
Topology的可控性强,且组件的各task的负载可控;
缺点:
当实际负载与预估不符时性能削弱;
--->
6、Localorshufflegrouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致
优点:
相对于ShuffleGrouping,因优先选择同进程task间传输而降低tuple网络传输代价,但因寻找同进程的task而消耗CPU和内存资源,因此应视情况来确定选择ShuffleGrouping或LocalOrShuffleGrouping;
缺点:
上下游components之间的逻辑组织关系不明显;
--->
7、NoneGrouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shufflegrouping是一样的效果。有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
8、customGrouping
自定义,相当于mapreduce那里自己去实现一个partition一样。
简单案例
测试小案例:
-- NumberTopology --
public class NumberTopology {
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("numberspout", new NumberSpout());
topologyBuilder.setBolt("numberBolt", new NumberBolt()).shuffleGrouping("numberspout");
Config conf = new Config();
StormTopology topology = topologyBuilder.createTopology();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("numberTopology", conf, topology);
}
}
-- NumberBolt --
public class NumberBolt extends BaseBasicBolt {
private static int sum;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("从上游获取的数据:" + input);
System.out.println("从上游获取的数据为:" + input.getInteger(0) + "--" + input.getIntegerByField("num"));
sum += input.getInteger(0);
System.out.println("截止到本次,共获取数据和为:" + sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
-- NumberSpout --
public class NumberSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int number;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
this.collector.emit(new Values(number++));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
5、Storm的通信机制
① Worker进程间通信原理
宏观
① 每个worker进程都一个独立的接受线程和发送线程
② 接收线程接受外部发送的消息到执行器(executor)的收集队列(incoming-queue)中
③ 发送线程从worker的执行器发送队列(transfer-queue)中读取消息,通过网络发给其他worker
内部微观
① worker中有多个执行器executor,每个执行器都有对应的接受进程和发送线程,
② worker接受线程将消息分配给指定taskid的executor的接受器中
③ executor会有段单独的线程进行处理,最后结果outgoing-queue,达到阈值进行批量发送tuple(元组)
② Worker进程内通信原理
Disruptor是一个Queue队列。
Disruptor一种线程之间信息无锁的交换方式。
Disruptor主要特点:
1、 没有竞争=没有锁=非常快。
2、 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结
Disruptor 核心技术点:
Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另外一边消费者并行取出处理.
底层是单个数据结构:一个ring buffer(环形数据缓冲区)
③ Storm的容错机制
节点故障
Nimbus宕机
单点故障,1.0后便是高可用的了
非Nimbus宕机
故障时,所有任务超时,Nimbus将会将任务重新分配到别的服务器上
进程故障
1、Worker
worker由Supervisor负责监控,一旦worker故障,会尝试重启,如果还是失败并且失去心跳机制,那么Nimbus将会把worker的任务分配给其他服务器
2、Supervisor
一旦遇到异常情况,直接自动毁灭,无状态(信息存放在zookeeper中)
① 快速失败:在遍历过程中进行增删改,抛出异常 多进程下不能发送修改
② 安全失败:遍历的数据先复制一份,对复制的进行遍历并发修改,来保证安全
④ 任务级容错
1、Bolt 任务冲突crash引起消息未被应答
执行超时,会调用fail方法
2、acker任务失败
首先持有的消息超时失效,Spout的fail方法执行
3、Spout任务失败
此时消息的完整性交给外部设备比如MQ
⑤ 消息容完整性
每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成千上万个新的Tuple
形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被完全处理了。
ACKER:
acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,
如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了
相反则会告知spout该消息处理成功了。
|