IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 高性能内存队列Disruptor -> 正文阅读

[Java知识库]高性能内存队列Disruptor

一、简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

注意,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列

Github:https://github.com/LMAX-Exchange/disruptor

Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型

JUC包下队列存在的问题:

队列描述
ArrayBlockingQueue基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列
PriorityBlockingQueue支持按优先级排序的无界阻塞队列
DelayQueue基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue不存储元素的阻塞队列
LinkedTransferQueue基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque基于链表结构实现的一个双端阻塞队列
  • JUC下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
  • 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
  • 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。

关于伪共享的介绍,可以参考《Java内存模型》中缓存一致性协议部分的内容,里面有对伪共享以及解决方案做详细的介绍

二、基本原理

2.1 设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。

  • 元素位置定位

    数组长度2^n,通过位运算,加快定位的速度(计算元素的索引下标)。每个元素都有一个序列号sequence,采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。通过CAS和自旋的方式实现线程安全。

  • 利用缓存行填充结局了伪共享问题

  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

    消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费

2.2 RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)。这种计算方式与HashMap计算槽位的方式是一样的,效率更高。
  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

思考:覆盖数据会不会导致数据丢失呢?只从上面的设计上来看,于ArrayBlockingQueue挺像的,都是采用环形数据存取数据,不过ArrayBlockingQueue同其他JUC中的有界阻塞队列一样,当队列满或空时,就直接会被阻塞,而这一点也是Disruptor与它们不同的地方,Disruptor通过提供多种策略,由开发人员根据业务需要使用相应的策略,针对队列满这种情况,只提供了一种策略:短暂阻塞当前线程

LockSupport.parkNanos(1)

而针对队列为空的情况,提供了多种策略,常用的有如下几种:

  • BlockingWaitStrategy:常见且默认的等待策略,当队列为空时,进行阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧张,而吞吐量和延迟并不重要的场景
  • SleepingWaitStrategy:在循环中不断等待数据,先进行自旋,如果不成功则使用Thread.yield()让出CPU,如果还不行就通过LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此该策略会产生比较高的平均延时。典型的应用场景就是异步日志
  • YieldingWaitStrategy:该策略适用于低延时的场景。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
  • BusySpinWaitStrategy: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用

一共有如下这些覆盖策略,这里就不一一列举了。

2.3 元素写入与读取

2.3.1 单线程写数据

  1. 申请写入m个元素
  2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断写入m个元素会不会覆盖未读的元素
  3. 如果返回正确,则生产者开始写入元素

其大概流程如下:

2.3.2 多线程读写数据

在多个生产者存在的情况下,如果防止多个线程重复写同一个槽位,就是解决问题的关键。普通的内存队列,大多采用ReentrantLock来实现,这种实现在高并发环境中效率是很低的,因为涉及到太多线程的阻塞与唤醒操作。而Disruptor的解决办法是每个生产者获取不同的数组空间进行操作。通过CAS很容易实现,只需要在分配槽位的时候,通过CAS判断一下这段空间是否已经被分配出去即可。

但这种实现有一个新的问题:如何防止读取的时候,读取到还未写入元素的槽位。Disruptor在多生产者的情况下,引入了一个与Ring Buffer大小相同的bufferavailable Buffer。当某个位置写入成功时,便把available Buffer中对应的位置,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

消费者读数据

多生产者线程写入的情况下,读数据也会复杂一些:

  1. 申请读取到序号n
  2. 如果此时writer cursor >= n,仍然无法确定连续可读的最大下标。从read cursor开始读取available Buffer,一直读到第一个不可用的元素,然后返回最大连续可读元素的位置
  3. 消费者读取元素

如下图所示:

读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写元素,写线程被分配到最大元素下标为11。读线程申请读取从下标3到下标11的元素,判断writer cursor >=11成立。然后开始读取availableBuffer,从3开始往后读,读取到下标为7的槽位时,发现元素还没有生产成功,于是waitFor(11)只能返回6。然后消费者读取下标从3到6的四个元素。

生产者写数据

多个生产者写入数据的过程大致如下:

  1. 申请写入m个元素
  2. 如果m个元素可以写入,则返回最大的序列号。每个生产者都会被分配一段独享的空间(通过CAS+自旋实现)
  3. 生产者写入元素,同时设置availableBuffer里面相应的位置,以标记哪些槽位已经有元素写入成功。

如下图所示:

writer1writer2两个线程同时要向数组写元素,通过CAS+自旋都申请到可用的数组空间。Writer1被分配了下标3到下标5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时将availableBuffer相应位置标记为写入成功,往后移一位,开始写下标为4位置的元素。Writer2以同样的方式进行写入,最终都写入完成

2.4 Disruptor的核心概念

  • RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。
  • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等引用。
  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
  • SequenceBarrier(消费者屏障):用于控制RingBufferProducerConsumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

Disruptor的构造方法:

public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final ThreadFactory threadFactory,
    final ProducerType producerType,
    final WaitStrategy waitStrategy)
{
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
  • EventFactory:创建事件(任务)的工厂类。
  • ringBufferSize:容器的长度。
  • ThreadFactory :用于创建执行任务的线程。
  • ProductType:生产者类型:单生产者、多生产者。
  • WaitStrategy:等待策略。

三、使用场景

在使用的时候需要引入其依赖,如果引入了Log4j2的依赖,它自带的就有Disruptor的依赖包

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.7</version>
</dependency>

先创建OrderEvent(消息载体/事件)和OrderEventFactory(事件工厂)

OrderEvent类将会被放入环形队列中作为消息内容,而OrdreEventFactory类用于创建OrderEvent事件

@Data
@ToString
public class OrderEvent {
    private String phone;
    private String address;
}

public class OrderEventFactory implements EventFactory {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

创建生产者

@Log4j2
public class OrderEventProducer {

    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer ringBuffer){
        this.ringBuffer = ringBuffer;
    }

    public void produce(String username,String phone,String address){
        // 获取事件队列下一个槽位
        long sequence = ringBuffer.next();
        try {
            // 获取消息/事件
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 填充消息数据
            orderEvent.setUsername(username);
            orderEvent.setPhone(phone);
            orderEvent.setAddress(address);
        } catch (Exception e) {
            log.error("write event error[{}]",e);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

创建消费者

@Log4j2
public class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("Order Details[{}]",event);
    }
}

3.1 单生产者消费者

Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
    new OrderEventFactory(),
    1024 * 1024,
    Executors.defaultThreadFactory(),
    ProducerType.SINGLE, // 单生产者
    new YieldingWaitStrategy() // 等待策略
);

// 设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();

// 创建RingBuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
// 创建生产者
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for (int index = 0; index < 10; index++){
    orderEventProducer.produce(RandomStringUtils.randomAlphabetic(4), "1592751"+(int)(Math.random()*10000),"lizhi"+index);
}

disruptor.shutdown();

3.2 单生产者多消费者

如果有多个消费者,只需要在调用handleEventsWith()时将多个消费者传递进去即可

// 设置消费者用于处理RingBuffer的事件
// 消息会被多个消费者重复消费
disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());

上面的这种方式,多个消费者会重复消费每一条消息。如果要实现多消费者情况下,每条消息只会被一个消费者消费,那么就需要调用handleEventsWithWorkerPool(),同时消费者需要实现WorkHandler接口

@Log4j2
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("Order Details[{}]",event);
    }

    @Override
    public void onEvent(OrderEvent event) throws Exception {
        System.out.println(event);
    }
}
// 设置多个消费者不重复消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());

3.3 多生产者多消费者

在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。

Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
    new OrderEventFactory(),
    1024 * 1024,
    Executors.defaultThreadFactory(),
    ProducerType.MULTI, // 多生产者
    new YieldingWaitStrategy() // 等待策略
);

// 设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(),new OrderEventHandler());
disruptor.start();

// 创建RingBuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(() ->{
    // 创建生产者
    OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
    // 发送消息
    for (int index = 0; index < 10; index++){
        orderEventProducer.produce(RandomStringUtils.randomAlphabetic(4), "1592751"+(int)(Math.random()*10000),"lizhi"+index);
    }
},"producer1").start();

new Thread(() -> {
    // 创建生产者
    OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
    // 发送消息
    for (int index = 0; index < 10; index++){
        orderEventProducer.produce(RandomStringUtils.randomAlphabetic(6), "1234567"+(int)(Math.random()*10000),"lizhi"+index);
    }
},"producer2").start();

disruptor.shutdown();

3.4 消费则优先级模式

在有些场景中,因为业务需要而形成一条消费链,比如一个消息由消费者A -> 消费者B -> 消费者C的顺序依次进行消费。在配置消费者时可以通过then()方法去实现顺序消费

disruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler())

handleEventsWithhandleEventsWithWorkerPool都是支持 then 的,它们可以结合使用。比如可以按照 消费者A -> (消费者B 消费者C) -> 消费者D 的消费顺序

disruptor.handleEventsWith(new OrderEventHandler())
    .thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
    .then(new OrderEventHandler());
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:17:51  更:2022-03-10 22:19:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 12:06:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码