TaskRuntime API 入门

TaskRuntime API 入门

【开发总结】Disruptor 使用简介

在极客时间看到王宝令老师关于 Disruptor 的一篇文章,觉得很有意思。看完之后又在网上找到一些其他关于Disruptor 的资料看了一下。

现在写篇文章总结一下。

使用

Disruptor 百度翻译是干扰者,分裂器的意思。
在这里它其实是一个高性能队列,一个queue。所以我有点想不通为什么名字取成这样。有清楚的同学可以知会我一生。

Disruptor 的使用相对Java集合类中的队列,会更加复杂。

第一步,引入jar包.

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

第二步,生成 Disruptor 对象

第三步,设置队列中消息消费的handler.

第四步,启动 Disruptor 线程。

第五步,获取ringbuffer。生产者通过向 Disruptor 的ringbuffer 来发布消息的。所以事先要先获取ringbuffer。

第六步,发布消息。

/**
 * @description:
 * @author: lkb
 * @create: 2020-10-28 19:46
 */
@Slf4j
public class MyTest {


    public static void main(String[] args) {
        //指定RingBuffer大小,
        //必须是2的N次方
        int bufferSize = 1024;

        //构建Disruptor
        Disruptor<LongEvent> disruptor
                = new Disruptor<>(
                LongEvent::new,
                bufferSize,
                DaemonThreadFactory.INSTANCE);

        //注册事件处理器
        disruptor.handleEventsWith(
                (event, sequence, endOfBatch) ->
                        System.out.println("E: " + event));

        //启动Disruptor
        disruptor.start();

        //获取RingBuffer
        RingBuffer<LongEvent> ringBuffer
                = disruptor.getRingBuffer();
        //生产Event
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            //生产者生产消息
            ringBuffer.publishEvent(
                    (event, sequence, buffer) ->
                            event.set(buffer.getLong(0)), bb);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}

其中LongEvent 是一个普通的POJO对象

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
}

Disruptor 的使用是典型的生产者-消费者模式。

Java集合中的队列更符合我们对队列的操作习惯。以ArrayBlockingQueue为例,我们可以把ArrayBlockingQueue想象为一个队列管道,生产者线程生产完数据后,将数据丢到队列中,消费者线程从另外一端取出数据,进行消费。

而Disruptor 相对其他传统的队列而言更像一个“大家长”,生成者需要通过这位“大家长”的ringbuffer将消息发送出去,消费者需要将处理操作注册到“大家长”这里。

这样的队列操作不太符合我们的习惯,所以使用上会不那么顺手。

高效的秘诀

在不顺手的情况下,为什么还是有很多系统用到它呢?原因在于它非常高效。

Disruptor 使用简介-LMLPHP

上面是网上找到的性能对比图。可以看到Disruptor性能上是非常高的。
那它是如何实现高效的呢?

  1. 内存分配更加合理,使用 RingBuffer 数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;
  2. 对象循环利用,避免频繁 GC。
  3. 能够避免伪共享,提升缓存利用率。
  4. 采用无锁算法,避免频繁加锁、解锁的性能消耗。支持批量消费,消费者可以无锁方式消费多个消息。

对于第四点,相信大家都很清楚。锁操作涉及到操作系统状态切换,这个操作是非常耗时耗资源的。无锁操作可以避免状态切换。

对于前面三点,涉及到一个非常重要的概念,就是缓存。CPU有三级缓存。离CPU越近的缓存,速度越快,但是容量越小。因为CPU的速度远远大于其他硬件的速度,设置缓存能够减小CPU和其他硬件的速度差。这个缓存和生产者消费者中间的队列有异曲同工之妙。

为了提高缓存的命中率,硬件通过局部性原理,在加载一个数据的同时将它周围的数据也加载进去。

上诉的第1、2条,通过将数据设置进连续相邻的内存位置,CPU在读取了一个数据的时候,发现第二个数据已经因为“局部性”原理加载进缓存,就不需要再次去寻址,直接从缓存中获取数据。

第1,2条是对缓存的高效利用,第3条就是对缓存低效使用的规避。
有一种缓存低效使用的方式是“伪共享”。内存是按照缓存行进行管理的。缓存行的大小通常是64个字节。

Disruptor 使用简介-LMLPHP

例如一个缓存行存储了两个对象,对其中一个对象的操作会使得整个缓存行失效。也就是说即使对象B被加入了缓存,但是因为其他对象的操作无效了。
第3条中,Disruptor 中通过将对象包裹,让一个对象充满整个缓存行,避免了伪共享的问题。

还有一点就是,相对于其他阻塞队列,Disruptor 的等待策略更多,功能更加强大。

通过对缓存的利用和无锁操作,Disruptor 成为一个高效队列。

一些思考

Disruptor 的一些思想其实在其他框架上也是常见的。

避免伪共享问题上,MySQL 8.0 版本直接将查询缓存的整块功能删掉了;在高效利用缓存上,线程池、队列等都多算缓存概念的受益者;避免锁操作上,Java的底层的各种锁优化,也是利用这点,比如轻量级锁。

为什么这么多框架会不约而同地想到这些问题呢?

因为计算机、操作系统是非常成熟的,底层都是非常相似的架构。了解计算机底层原理,对这些知识才能触类旁通。所以,啥不说,计算机基础课,我打算再上一遍。

10-29 21:51