Java多线程(12)

为什么用Disruptor?

       传统阻塞的队列使用锁保证线程安全,而锁通过操作系统内核上下文切换实现,会暂停线程去等待锁,直到锁释放。

执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态,这种状态会导致高水平的写入争用。

       毕竟锁的技术会导致性能变差,而且还有可能会造成死锁。

 

什么是Disruptor?

       Disruptor的流行起源于Martin Fowloer在自己的网站上写了一篇LMAX架构的文章,文章中介绍了LMAX是一种新型零售金融交易平台,能够以很低的延迟产生大量的交易,这个系统是建立在JVM平台之上的,其核心是一个业务逻辑处理器,一个线程里每秒处理六百万订单。业务逻辑处理器完全是允许在内存中,使用事件源驱动方式,而这个处理器的核心就是Disruptor。

       Disruptor是一个多线程并发框架,能够在无锁的情况下实现网络的Queue并发操作,是一个高性能的异步处理框架,也可以认为是一个观察者或事件监听模式的实现。

       Disruptor根本不用锁,取而代之的是,在需要确保操作是线程安全的(特别是,在多生产者场景下,更新下一个可用的序列号)地方,使用CAS(Compare And Swap/Set)操作。这是一个CPU级别的指令,工作方式有点像乐观锁,CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其他操作先改变了这个值。

       CAS操作比锁消耗的资源少很多,主要不涉及操作系统,直接在CPU上操作。但并非是没有才加的,只是比使用锁耗时少,比不需要考虑竞争的单线程耗时多。

       在Disruptor中,多线程那个使用了AtomicLong(Java提供的CAS操作),而单线程使用long,没有锁也没有CAS,意味着单线程版本非常快,不会产生序号上的冲突。在整个框架中,只有一个地方出现多线程竞争修改同一个变量值,如果只有一个生产者,那么系统中国的每一个序号由一个线程写入,这意味着没有竞争,也不需要锁、甚至不需要CAS。如果存在多个生产者,唯一会被多线程写入的序号就是ClaimStrate对象里的那个。

 

核心组件

       RingBuffer被看做Disruptor最主要的组件,在3.X后,RingBuffer仅仅负责存储和更新在Disruptor中流通的数据,对一些特殊使用场景能够被其他数据结构完全替代。

       SequenceDisruptor使用Sequence来表示一个特殊组织间处理的序号,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码都依赖这些Sequence值运算,因此Sequence支持多种当前为AtomicLong类的特性。

       Sequencer这是Disruptor真正的核心,实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确的数据传递。

       SequenceBarrirer由Sequence生产,包含已经发布的Sequence的引用,这些Sequence源于Sequencer和一些独立的消费者的Sequence,包含了决定是否有供消费者来消费的Event的逻辑。

       WaitStrategy决定一个消费者将如何等待生产者将Event置入Disruptor中。

     Event从生产者到消费者的过程中所处理的数据单元。Disruptor中没有代表标识Event,完全由用户定义的。

     EventProcessor主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。有一个实现类就是BatchEventProcessor,包含了EventLoop有效的实现,并且将回调到一个EventHandler接口的实现对象。

       EventHandler用户实现,代表了Disruptor中的一个消费者的接口。

       Producer用户实现,调用RingBuffer来插入Event(事件),在Disruptor中没有相应的实现代码。

       WorkProcessor确保每个Sequence只被一个Processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的Sequence。

       WorkerPool一个WorkProcessor池,其中的WorkerProcess将消费Sequence,所以任务可以在实现WorkHandler接口的Worker之间移交。

       LifecycleAware但BatchEventProcessor启动和停止时,与实现这个接口用于接收通知。

import java.nio.ByteBuffer;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import com.lmax.disruptor.*;

import com.lmax.disruptor.dsl.*;

 

public class D01HelloWorld {

    public static void main(String[] args) {

         //线程池

         ExecutorService pool = Executors.newCachedThreadPool();

         //工厂,生成数据

         EventFactoryData factory = new DataFactory();

         //buffSize,也就是RingBuffer2N次方是最好的

         int ringBufferSize = 1024 * 1024;

        

         /*

          * BlockingWaitStrategy:最低效的策略,但对CPU的消耗最小,并且在各种不通过部署环境中能提供更加一致的性能。

          * SleepingWaitStreategy:性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程影响最小,适合异步日志类似的场景。

          * YieldingWaitStrategy:性能最好,适合用于低延迟的系统,在要求极高性能而且事件处理线程数小于CPU逻辑核心数的场景中,例如CPU开启超特性。

          */

        

         //创建Disruptor

         DisruptorData disruptor = new Disruptor(factory, ringBufferSize, pool,

                                                         ProducerType.SINGLE,new YieldingWaitStrategy());

        

         //处理事件的消费者

         disruptor.handleEventsWith(new DataConsumer());

        

         //启动

         disruptor.start();

        

         //发布事件

         RingBufferData ringBuffer = disruptor.getRingBuffer();

        

         //生产者

         DataProducer dataProducer = new DataProducer(ringBuffer);

//       DataProducer2 dataProducer = new DataProducer2(ringBuffer);

        

         //预先分配空间

         ByteBuffer byteBuffer = ByteBuffer.allocate(10);

         for (byte i = 0; i 100; i++) {

             byteBuffer.put(0,i);

             //生产者发布数据

             dataProducer.publish(byteBuffer);

         }

        

         disruptor.shutdown(); //所有事件处理完毕才关闭

         pool.shutdown();

        

         //1、启动项目

         //2、注释DataProducer dataProducer,打开新的DataProducer2 dataProducer,查看简化的发布

    }

}

/**

 * 传输的数据

 */

class Data {

    private String value;

 

    public String getValue() {

         return value;

    }

 

    public void setValue(String value) {

         this.value = value;

    }

}

/**

 * 生产者

 */

class DataProducer {

    //环(可以当成队列),存储数据的地方

    private RingBufferData ringBuffer;

 

    public DataProducer(RingBufferData ringBuffer) {

         super();

         this.ringBuffer = ringBuffer;

    }

   

    /**

     * 调用一次就发布一次数据,会用事件的形式传递给消费者。

     * 用一个简单队列来发布事件(数据)的时候会涉及很多细节,这是因为事件(数据)对象需要预先创建好。

     * 发布事件(数据)至少要两个操作:

     * 1、获取下一个事件(数据)槽并发布事件。

     * 如果使用RingBuffer.next()获取事件(数据)槽,那么一定要发布对应的事件。

     * 如果不能发布事件(数据),那么就会引起Disruptor状态的混乱。

     * 尤其在多个事件生产者的情况下会阀值事件消费者失速,从而不得不重启应用才能恢复。

     */

    public void publish(ByteBuffer byteBuffer) {

         //可以把RingBuffer看成一个队列。

         long nextIndex = ringBuffer.next();

        

         try {

             //根据索引获取都数据

             Data data = ringBuffer.get(nextIndex);

             data.setValue("生产者生产的数据:" + byteBuffer.get(0));

         } finally {

             //发布事件(数据)给消费者去消费

             //最好是包含在finally中,确保必须得到调用,如果某个请求的nextIndex未被提交,将会堵塞,后续的发布操作或者其他的producer

             ringBuffer.publish(nextIndex);

         }

    }

}

/**

 * 消费者:也就是事件处理器

 */

class DataConsumer implements EventHandlerData {

    @Override

    public void onEvent(Data data, long l, boolean arg2) throws Exception {

         System.out.println("消费到:" + data.getValue() + " - " + l);

    }

}

/**

 * Disruptor创建事件,同时还声明了一个Factory来实例化事件(数据)对象填充到RingBuffer

 */

class DataFactory implements EventFactoryData {

    @Override

    public Data newInstance() {

         return new Data();

    }

}

/**

 * 简化发布:3.x中提供Lambda表达式API,可以把一些复杂的操作放在RingBuffer

 * 所以3.x以后的版本最好用Event Publisher或者Event Translator来发布

 */

class DataProducer2 {

    private static final EventTranslatorOneArgData, ByteBuffer TRANSLATOR =

                          new EventTranslatorOneArgData, ByteBuffer() {

 

         @Override

         public void translateTo(Data data, long index, ByteBuffer byteBuffer) {

             data.setValue(byteBuffer.get(0) + "");

         }

    };

   

    private RingBufferData ringBuffer;

 

    public DataProducer2(RingBufferData ringBuffer) {

         super();

         this.ringBuffer = ringBuffer;

    }

   

    /**

     * 发布数据

     */

    public void publish(ByteBuffer byteBuffer) {

         ringBuffer.publishEvent(TRANSLATOR,byteBuffer);

    }

}

 

RingBuffer

       RingBuffer就是一个环,每个槽都有一个序号,序号指向数组中下一个可用的元素,随着不断的填充这个Buffer,这个序号会一直增长,直到绕过这个环。

       要找到数组中的当前序号指向的元素,可通过取模操作,所以这个槽的个数是2的N次方更有利于基于二进制的计算。

       环是没有尾指针的,维护了一个指向下一个可用位置的序号,最初的原因就是想要提供可靠的消息传递。

       环和队列的区别:不删除环中的数据,也就是说数据一直存在环中,知道新的数据覆盖,这也是不需要尾指针的原因。因为是数组,所以比链表快,而且有一个容易预测的访问模式,这对CPU缓存友好,在硬件级别上,数组中的元素是会被预加载的,因此在RingBuffer中,CPU不需要时不时去主内存在加载数组中的下一个元素。环也可以为数组预先分配内存,使得数组对象一直存在,这意味着不需要大量的数据用于垃圾回收,不像链表那样,需要尾每一个添加到其上面的对象创造节点对象对应的。当删除节点时,需要执行相应的内存清理操作。

       为什么这么快:一是Disruptor通过将基本对象填充荣誉基本类型变量来充满整个缓存行,就是一个缓存行8个变量,预设7个变量,然后保存一个唯一变量,这样就不会出现相同的变量。二是无锁队列的实现,对于传统并发队列,至少要维护两个指针,头指针和尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁,Disruptor由于是环状队列,对于Produce而言只有头指针而且锁是乐观锁,在标准的Disruptor应用中,只有一个生产者,避免了头指针锁的争用,这也是为什么理解Disruptor为无锁队列。

/**

 * 直接使用RingBuffer发布,简化操作

 */

public class D02RingBuffer {

   

    public static void main(String[] args) throws InterruptedException, ExecutionException {

   

         D02RingBuffer d02RingBuffer = new D02RingBuffer();

         d02RingBuffer.main1();

//       d02RingBuffer.main2();

        

    }

   

    /*

     * 创建单个生产者的RingBuffer

     * 参数1:产生数据,填充到RingBuffer

     * 参数2:必须是2的指数倍,目的是为了将取模运算转为运算提高效率

     * 参数3:等待策略

     */

    RingBufferRData ringBuffer = RingBuffer.createSingleProducer(new EventFactoryRData() {

         @Override

         public RData newInstance() {

             return new RData();

         }

   

    }, 1024);

   

    //线程池

    ExecutorService pool = Executors.newFixedThreadPool(5);

   

    //创建SequenceBarrier,决定是否有供消费者来消费的Event的逻辑

    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

   

    /**

     * 写法1

     */

    public void main1() throws InterruptedException, ExecutionException {

        

         //创建消息处理器,指定消费者,单个消费者

         BatchEventProcessorRData batchEventProcessor = new BatchEventProcessorRData(ringBuffer, sequenceBarrier, new RConsumer());

        

         //把消费者的位置信息引入注入到生产者,让生产者可以根据消费者的情况决定生产的速度,避免一个快、一个慢,如果只有一个消费者的情况可以省略

         ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        

         //把消息处理器给线程池

         pool.submit(batchEventProcessor);

        

         //如果有多个消费者则重复执行BatchEventProcessorringBuffer.addGatingSequencespool.submit这三行代码

        

         FutureVoid future = pool.submit(new CallableVoid() {

             @Override

             public Void call() throws Exception {

                  //发布数据给消费者消费

                  publish();

                  return null;

             }

         });

        

         future.get();//等待生产者结束

         Thread.sleep(1000); //等待处理结束

         batchEventProcessor.halt();//通知事件(或者说消息)处理器,可用结束了

         pool.shutdown();//关闭线程池

    }

   

    /**

     * 写法2

     * @throws InterruptedException

     */

    public void main2() throws InterruptedException {

         WorkHandlerRData workHandler = new RConsumer();

        

         //配置都是类似的

         WorkerPoolRData workerPool = new WorkerPoolRData(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),workHandler);

         workerPool.start(pool);

        

         //发布数据给消费者消费

         publish();

        

         Thread.sleep(1000); //等待处理结束

         workerPool.halt();//通知事件(或者说消息)处理器,可用结束了

         pool.shutdown();//关闭线程池

    }

   

    /**

     * 发布

     */

    public void publish() {

         for (int i = 0; i 10; i++) {

             long nextSeq = ringBuffer.next(); //占坑,ringBuffer一个可用区块

             RData rData = ringBuffer.get(nextSeq);//给这个位置放入数据

             rData.setAge(i);

             rData.setName("Test " + i);

             ringBuffer.publish(nextSeq);

         }

    }

}

class RData {

    private String name;

   

    private int age;

 

    public String getName() {

         return name;

    }

 

    public void setName(String name) {

         this.name = name;

    }

 

    public int getAge() {

         return age;

    }

 

    public void setAge(int age) {

         this.age = age;

    }

}

/**

 * 消费者,实现那个都是可以的

 * @author suzhiwei

 */

class RConsumer implements EventHandlerRData,WorkHandlerRData {

 

    @Override

    public void onEvent(RData data, long arg1, boolean arg2) throws Exception {

         onEvent(data);

    }

   

    @Override

    public void onEvent(RData data) throws Exception {

//       data.setAge(new Random().nextInt(),);

         System.out.println(data.getName() + " - " + data.getAge());

    }

}

 

最新回复(0)
/jishuBBmp2RhjlFA5xbP_2FP_2BexUwL3cqDFkUWmhUc4YoZoZvA_3D4795086
8 简首页