一、简介
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。
基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
注意,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。
Github:https://github.com/LMAX-Exchange/disruptor
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型 。
JUC包下队列存在的问题:
队列 | 描述 |
---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 | LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 | PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 | DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 | SynchronousQueue | 不存储元素的阻塞队列 | LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 | LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
- JUC下的队列大部分采用加
ReentrantLock 锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。 - 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
- 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题
false sharing (伪共享)。
关于伪共享的介绍,可以参考《Java内存模型》中缓存一致性协议部分的内容,里面有对伪共享以及解决方案做详细的介绍
二、基本原理
2.1 设计方案
Disruptor 通过以下设计来解决队列速度慢的问题:
-
环形数组结构 为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。 -
元素位置定位 数组长度2^n ,通过位运算,加快定位的速度(计算元素的索引下标)。每个元素都有一个序列号sequence ,采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。 -
无锁设计 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。通过CAS 和自旋的方式实现线程安全。 -
利用缓存行填充结局了伪共享问题 -
实现了基于事件驱动的生产者消费者模型(观察者模式) 消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费
2.2 RingBuffer数据结构
使用RingBuffer 来作为队列的数据结构,RingBuffer 就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence ),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
Disruptor 要求设置数组长度为2的n次幂。在知道索引(index )下标的情况下,存与取数组上的元素时间复杂度只有O(1) ,而这个index 可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length 。也可以用位运算来计算效率更高,此时array.length 必须是2的幂次方,index=sequece&(entries.length-1) 。这种计算方式与HashMap 计算槽位的方式是一样的,效率更高。- 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉
思考:覆盖数据会不会导致数据丢失呢?只从上面的设计上来看,于ArrayBlockingQueue 挺像的,都是采用环形数据存取数据,不过ArrayBlockingQueue 同其他JUC中的有界阻塞队列一样,当队列满或空时,就直接会被阻塞,而这一点也是Disruptor 与它们不同的地方,Disruptor 通过提供多种策略,由开发人员根据业务需要使用相应的策略,针对队列满这种情况,只提供了一种策略:短暂阻塞当前线程
LockSupport.parkNanos(1)
而针对队列为空的情况,提供了多种策略,常用的有如下几种:
BlockingWaitStrategy :常见且默认的等待策略,当队列为空时,进行阻塞等待。使用ReentrantLock+Condition 实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧张,而吞吐量和延迟并不重要的场景SleepingWaitStrategy :在循环中不断等待数据,先进行自旋,如果不成功则使用Thread.yield() 让出CPU,如果还不行就通过LockSupport.parkNanos(1L) 进行线程休眠,以确保不占用太多的CPU资源。因此该策略会产生比较高的平均延时。典型的应用场景就是异步日志。YieldingWaitStrategy :该策略适用于低延时的场景。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield ()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。BusySpinWaitStrategy : 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用
一共有如下这些覆盖策略,这里就不一一列举了。
2.3 元素写入与读取
2.3.1 单线程写数据
- 申请写入m个元素
- 若是有m个元素可以写入,则返回最大的序列号。这里主要判断写入m个元素会不会覆盖未读的元素
- 如果返回正确,则生产者开始写入元素
其大概流程如下:
2.3.2 多线程读写数据
在多个生产者存在的情况下,如果防止多个线程重复写同一个槽位,就是解决问题的关键。普通的内存队列,大多采用ReentrantLock 来实现,这种实现在高并发环境中效率是很低的,因为涉及到太多线程的阻塞与唤醒操作。而Disruptor 的解决办法是每个生产者获取不同的数组空间进行操作。通过CAS 很容易实现,只需要在分配槽位的时候,通过CAS 判断一下这段空间是否已经被分配出去即可。
但这种实现有一个新的问题:如何防止读取的时候,读取到还未写入元素的槽位。Disruptor 在多生产者的情况下,引入了一个与Ring Buffer 大小相同的buffer :available Buffer 。当某个位置写入成功时,便把available Buffer 中对应的位置,标记为写入成功。读取的时候,会遍历available Buffer ,来判断元素是否已经就绪。
消费者读数据
多生产者线程写入的情况下,读数据也会复杂一些:
- 申请读取到序号n
- 如果此时
writer cursor >= n ,仍然无法确定连续可读的最大下标。从read cursor 开始读取available Buffer ,一直读到第一个不可用的元素,然后返回最大连续可读元素的位置 - 消费者读取元素
如下图所示:
读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3 正在向RingBuffer 相应位置写元素,写线程被分配到最大元素下标为11。读线程申请读取从下标3到下标11的元素,判断writer cursor >=11 成立。然后开始读取availableBuffer ,从3开始往后读,读取到下标为7的槽位时,发现元素还没有生产成功,于是waitFor(11) 只能返回6。然后消费者读取下标从3到6的四个元素。
生产者写数据
多个生产者写入数据的过程大致如下:
- 申请写入m个元素
- 如果m个元素可以写入,则返回最大的序列号。每个生产者都会被分配一段独享的空间(通过
CAS+自旋 实现) - 生产者写入元素,同时设置
availableBuffer 里面相应的位置,以标记哪些槽位已经有元素写入成功。
如下图所示:
writer1 和writer2 两个线程同时要向数组写元素,通过CAS+自旋 都申请到可用的数组空间。Writer1 被分配了下标3到下标5的空间,Writer2 被分配了下标6到下标9的空间。Writer1 写入下标3位置的元素,同时将availableBuffer 相应位置标记为写入成功,往后移一位,开始写下标为4位置的元素。Writer2 以同样的方式进行写入,最终都写入完成
2.4 Disruptor的核心概念
RingBuffer (环形缓冲区):基于数组的内存级别缓存,是创建sequencer (序号)与定义WaitStrategy (拒绝策略)的入口。Disruptor (总体执行入口):对RingBuffer 的封装,持有RingBuffer 、消费者线程池Executor 、消费者集合ConsumerRepository 等引用。Sequence (序号分配器):对RingBuffer 中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence 可以跟踪标识某个事件的处理进度,同时还能消除伪共享。Sequencer (数据传输器):Sequencer 里面包含了Sequence ,是Disruptor 的核心,Sequencer 有两个实现类:SingleProducerSequencer (单生产者实现)、MultiProducerSequencer (多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法SequenceBarrier (消费者屏障):用于控制RingBuffer 的Producer 和Consumer 之间的平衡关系,并且决定了Consumer 是否还有可处理的事件的逻辑。WaitStrategy (消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略Event :从生产者到消费者过程中所处理的数据单元,Event 由使用者自定义。EventHandler :由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor 中的一个消费者的接口。EventProcessor :这是个事件处理器接口,实现了Runnable ,处理主要事件循环,处理Event ,拥有消费者的Sequence
Disruptor 的构造方法:
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
- EventFactory:创建事件(任务)的工厂类。
- ringBufferSize:容器的长度。
- ThreadFactory :用于创建执行任务的线程。
- ProductType:生产者类型:单生产者、多生产者。
- WaitStrategy:等待策略。
三、使用场景
在使用的时候需要引入其依赖,如果引入了Log4j2 的依赖,它自带的就有Disruptor 的依赖包
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.7</version>
</dependency>
先创建OrderEvent (消息载体/事件)和OrderEventFactory (事件工厂)
OrderEvent 类将会被放入环形队列中作为消息内容,而OrdreEventFactory 类用于创建OrderEvent 事件
@Data
@ToString
public class OrderEvent {
private String phone;
private String address;
}
public class OrderEventFactory implements EventFactory {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
创建生产者
@Log4j2
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer ringBuffer){
this.ringBuffer = ringBuffer;
}
public void produce(String username,String phone,String address){
long sequence = ringBuffer.next();
try {
OrderEvent orderEvent = ringBuffer.get(sequence);
orderEvent.setUsername(username);
orderEvent.setPhone(phone);
orderEvent.setAddress(address);
} catch (Exception e) {
log.error("write event error[{}]",e);
} finally {
ringBuffer.publish(sequence);
}
}
}
创建消费者
@Log4j2
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("Order Details[{}]",event);
}
}
3.1 单生产者消费者
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE,
new YieldingWaitStrategy()
);
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
for (int index = 0; index < 10; index++){
orderEventProducer.produce(RandomStringUtils.randomAlphabetic(4), "1592751"+(int)(Math.random()*10000),"lizhi"+index);
}
disruptor.shutdown();
3.2 单生产者多消费者
如果有多个消费者,只需要在调用handleEventsWith() 时将多个消费者传递进去即可
disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
上面的这种方式,多个消费者会重复消费每一条消息。如果要实现多消费者情况下,每条消息只会被一个消费者消费,那么就需要调用handleEventsWithWorkerPool() ,同时消费者需要实现WorkHandler 接口
@Log4j2
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("Order Details[{}]",event);
}
@Override
public void onEvent(OrderEvent event) throws Exception {
System.out.println(event);
}
}
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
3.3 多生产者多消费者
在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
disruptor.start();
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(() ->{
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
for (int index = 0; index < 10; index++){
orderEventProducer.produce(RandomStringUtils.randomAlphabetic(4), "1592751"+(int)(Math.random()*10000),"lizhi"+index);
}
},"producer1").start();
new Thread(() -> {
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
for (int index = 0; index < 10; index++){
orderEventProducer.produce(RandomStringUtils.randomAlphabetic(6), "1234567"+(int)(Math.random()*10000),"lizhi"+index);
}
},"producer2").start();
disruptor.shutdown();
3.4 消费则优先级模式
在有些场景中,因为业务需要而形成一条消费链,比如一个消息由消费者A -> 消费者B -> 消费者C的顺序依次进行消费。在配置消费者时可以通过then() 方法去实现顺序消费
disruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler())
handleEventsWith 与handleEventsWithWorkerPool 都是支持 then 的,它们可以结合使用。比如可以按照 消费者A -> (消费者B 消费者C) -> 消费者D 的消费顺序
disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
.then(new OrderEventHandler());
|