广水天气,今日影视,咖啡-雷竞技app_雷竞技app官网

频道:小编推荐 日期: 浏览:308

RingBuffer 怎样确保数据不丢掉

由于ringbuffer是一个环形的行列,那么出产者和顾客在遍历这个行列的时分,怎样制衡呢?

1、出产快,消费慢,数据丢掉?

出产者速度画债肉偿过快,导致一个目标还没消费完,就循环出产了一个新的目标要参加ringbuffer,导致消费不完整,构成数据丢掉?

咱们留意到,在咱们获取出产者下一个方位的时分,是经过ringbuffer的next办法,而这个next办法是调用了sequencer的next办法

这个目标,在咱们创立disruptor目标的时分,创立的

所以这个ringbuffer便是disruptor中的sequencer目标,那么在进行获取next的时分,这里是怎样获取下一个的呢?是否会对这个出产获取下一个序列进行相应的等候战略,防止发作相应的搅扰!!!

这个各位看官还需多看看里边的代码以及封装(特别是封装,真是九转十八弯),多了解,我这绕着绕着很简略就绕晕了,刚开始也是云里雾里。

EventProcessor接口概览

EventProcessor望文生义,便是事情处理器(handle和process都能够翻译为“处理”,可是process侧重于机器的处理,而handle侧重于有人工的处理,所以运用handle表明用户逻辑的处理,运用process表明机器的处理),这个接口有两个完成类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理顾客分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口界说。



/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcess错爱天使or#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 处理事情的回调接口
*/
public interface EventHandler
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequ万界造化珠ence of the event being processed
* @param endOfBatch flag to indicate if this is the last e罗萍简历vent in a batch from the {@link RingBuffer}
* @throws Exception if the EventHa广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网ndler would like the exception handled further up the chain.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuf柯润东fer}
*


* An EventProcessor will generally be associated with a Thread for execution.
* 事情履行器,等女生湿了待RingBuffer有可用消费事情。一个事情处理器相关一个履行线程
*/
public interface EventProcessor extends Runnable
{
/**
* Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
*
* @return reference to the {@link Sequence} for this {@link EventProcessor}
*/
Sequence getSequence();
/**
* Signal that this EventProcessor should stop when it has finished consuming at the next 热河杆子帮clean break.
* It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
*/
void halt();
boolean isRunning();
}

EventProcessor接口承继了Runnable接口,首要有两种完成:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor

在运用Disruptor协助类构建顾客时,运用handleEventsWith办法传入多个EventHandler,内部运用多个BatchEventProcessor相关多个线程履行。这种状况相似JMS中的发布订阅方式,同一事情会被多个顾客并行消费。适用于同一事情触发多种操作。

而使kb2699988用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部运用多个WorkProcessor相关多个线程履行。这种状况相似JMS的点对点方式,同一事情会被一组顾客其中之一消费。适用于提高顾客并行处理才干。

消费技能完成

咱们先回忆下Disruptor顾客的两个特色:顾客依靠图(即下文所谓的“消费链”)和事情多播。

假定现在有A,B,C,D四个顾客,它们都能组成什么样的方式呢?从很多的排列组合中,我挑了4组比较有代表性的消费链方式。



image.png

  • 第1组中,顾客A消费按成后,B、C、D可一起消费;
  • 第2组中,顾客A、B、C、D次序消费;
  • 第3组中,顾客A、B次序消费后,C、D一起消费;
  • 第4组中,顾客A在消费完成后,B和C能够一起消费,可是有必要在都消费完成后,D才干消费。

标号为1、3、4的消费链都运用了事情多播,可见事情多播归于消费链的一种组合方式。留意,在上面4种组合中,每个组合的每一水平行,都归于一个顾客组。

这些还仅仅较为简略的消费链组成,实践中消费链或许会更杂乱。

那么在Disruptor内部是怎样完成消费链的呢?

咱们能够先考虑下。假如想把独立的顾客组成消费链,那么后方的顾客(组)必定要知道在它前方的顾客(组)的处理状况,不然就做不到次序消费。一起,顾客也要了解出产者的方位,来判别是否有可用事情。之前咱们剖析出产者代码的时分,现已讲过,出产者为了不掩盖没有消费彻底的事情,有必要知道最慢顾客的处理状况

做到了这些才会有才干去广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网操控顾客组成消费链。下面让咱们详细看Disruptor中的完成。

单出产者,多顾客方式。多顾客关于音讯不重复消费。

package liuqiang.complex.multi;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerTy广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网pe;
import liuqiang.complex.common.*;
import java.util.c鞠重理oncurrent.Executors;
public class Main3 {
//单出产者,多顾客方式。多顾客关于音讯不重复消费。例如:1线程消费了音讯0,则2线程只能从0后边的音讯消费,不能对音讯0进行消费。
public static void main(String[] args) throws Exception {
EventFactory factory = new OrderFactory();
int ringBufferSize = 1024 * 1024;
Disruptor disruptor =
new Disruptor(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWa凤霸全国txtitStrategy());
/*
* 该办法传入的顾客需求完成WorkHandler接口,办法的内部完成是:先创立WorkPool,然后封装WorkPool为EventHandlerPool回来。
* 顾客1、2关于音讯的消费有时有竞赛,确保同一音讯只能有一个顾客消费
*/
disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2"));
disruptor.start();
RingBuffer ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
//单出产者,出产3条数据
for (int l = 0; l < 3; l++) {
producer.onData(l + "");
}
//为了确保顾客线程现已发动,留足满足的时刻。详细原因详见另一篇博客:disruptor的shutdown失效问题
Thread.sleep(1000);
disruptor.shutdown();
}
}

调用handleEventsWithWorkerPool构成WorkerPool,并进一步封装成EventHandlerGroup。关于同一条音讯,两顾客不重复消费。

或许输出成果如下:

OrderHandler1 1,消费信息:0

OrderHandler1 2,消费信息:1

OrderHandler1 1,消费信息:2

顾客可用序列屏障-SequenceBarrier

咱们要点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的首要作用是和谐获取顾客可处理到的最大序号,内部持有着出产者和其依靠的顾客序列。它的接口界说如下。

public interface SequenceBarrier
{
/**
* Wait for the given sequence to be available for consumption.

* 等候指定序列可用
* @param 单纯蓝优惠码sequence to wait for
* @return the sequence up to which is available
* @throws AlertException if a status change has occurred for the Disruptor
* @throws InterruptedException if the thread needs awaking on a condition variable.
* @throws TimeoutException
*
*/
long waitFor(long sequence) throws谭静逝世现场相片 AlertException, InterruptedException, TimeoutException;
/**
* Get指铐 the current cursor value that can be read.

* 获取当时可读游标值
*
* @return value of the curso广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网r for entries that have been published.
*
*/
long getCursor();
/**
* The current alert status for the barrier.

* 当时的alert状况
*
* @return true if in alert otherwise false.
*/
boolean isAlerted();
/**
* Alert the {@link EventProcessor}s of a status change广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网 and stay in this status until cleared.

*
* 告诉顾客状况改变。当调用EventProcessor#halt()将调用此办法。
*/
void alert();
/**
* Clear the current alert status.

* 清楚alert状况
*/
void clearAlert();
/**
* Check if an alert has been raised and throw an {@link AlertException} if it has.
* 查看是否发作alert,发作将抛出反常
* @throws AlertException if alert has been raised.
*/
void checkAlert() throws AlertException;
}
SequenceBarrier实例引证被EventProcessor持有,用于等候并获取可用的消费事情,首要体现在waitFor这个办法。

要完成这个功用,需求3点条件:

  1. 知道出产者的方位。
  2. 由于Disruptor支撑顾客链,在不同的顾客组之间,要确保后边的消 费者组只要在前顾客组中的顾客都处理完毕后,才干进行处理。
  3. 暂时没有事情可消费,在等候可用消费时,还需求运用某种等候战略进行等候。

看下SequenceBarrier完成类ProcessingSequenceBarrier的代码是怎样完成waitFor办法。

final class ProcessingSequenceBarrier implements SequenceBarrier
{
private final WaitStrategy waitStrategy; // 等候可用消费时,指定的等候战略
private final Sequence dependentSequence; // 依靠的上组顾客的序号,假如当时为第一组则为cursorSequence(即出产者发布游标序列),不然运用FixedSequenceGroup封装上组顾客序列
private volatile boolean alerte广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网d = false; // 当触发halt时,将符号alerted为true
private final Sequence cursorSe段晓岩quence; // A广水气候,今天影视,咖啡-雷竞技app_雷竞技app官网bstractSequencer中的cursor引证,记载当时发布者发布的最新方位
private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
public ProcessingSequenceBarrier(
final Sequenc许念游天恒er sequencer,
final WaitStrategy waitStrategy,
夫妻同床final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) // 依靠的上一组序列长度,第一次是0
{
dependentSequence = cursorSequence;
}
else // 将上一组序列数组复制成新数组保存,引证不变
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{蒙眼王后
// 查看是否中止服务
checkAlert();
// 获取最大可用序号 sequence为给定序号,一般为亚洲男同志当时序号+1,cursorSequence记载出产者最新方位,
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this)丝袜内裤;
if (availableSequence < sequence)
{
return availableSequence;
}
// 回来已发布最高的序列值,将对每个序号进行校验
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
// ...
}