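Disruptor structure
False sharing
The value field always has a cache line to itself. In the extreme case it is the first or the last element of its cache line, so an update to a neighboring field never invalidates the line that holds value and forces it to be reloaded, which would degrade performance.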
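The padding idea can be sketched as follows, assuming 64-byte cache lines and 8-byte longs. The class names PaddedLeft, PaddedValue and PaddedCounter are hypothetical; they only mirror the superclass chain that Sequence extends, and the JVM is not obliged to lay fields out exactly this way, so treat this as an illustration rather than a guarantee.

```java
// Hypothetical names; the layout mirrors the padding chain that Sequence extends.
class PaddedLeft {
    // 7 longs = 56 bytes in front of the hot field
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class PaddedValue extends PaddedLeft {
    // The only field that is actually read and written at runtime
    protected volatile long value;
}

final class PaddedCounter extends PaddedValue {
    // 7 longs = 56 bytes behind the hot field
    protected long p9, p10, p11, p12, p13, p14, p15;

    PaddedCounter(long initial) {
        value = initial;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }

    /** Bytes taken by padding plus value, ignoring the object header. */
    static int paddedBytes() {
        return (7 + 1 + 7) * Long.BYTES;
    }
}
```

With 56 bytes of padding on each side, any 64-byte line that contains value can only contain padding next to it, whether value ends up first, last, or somewhere in between.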
Sequence - an enhanced AtomicLong (with cache-line padding)

    import sun.misc.Unsafe;
    import com.lmax.disruptor.util.Util;

    public class Sequence extends RhsPadding {
        static final long INITIAL_VALUE = -1L;
        private static final Unsafe UNSAFE;
        private static final long VALUE_OFFSET;

        static {
            UNSAFE = Util.getUnsafe();
            try {
                // Memory offset of the padded "value" field declared in the Value superclass.
                VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Sequence() {
            this(INITIAL_VALUE);
        }

        public Sequence(final long initialValue) {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
        }

        public long get() {
            return value;
        }

        // Ordered write: cheaper than a full volatile write.
        public void set(final long value) {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
        }

        // Full volatile write.
        public void setVolatile(final long value) {
            UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
        }

        public boolean compareAndSet(final long expectedValue, final long newValue) {
            return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
        }

        public long incrementAndGet() {
            return addAndGet(1L);
        }

        // CAS retry loop: recompute and retry until no other thread interfered.
        public long addAndGet(final long increment) {
            long currentValue;
            long newValue;
            do {
                currentValue = get();
                newValue = currentValue + increment;
            } while (!compareAndSet(currentValue, newValue));
            return newValue;
        }

        @Override
        public String toString() {
            return Long.toString(get());
        }
    }
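The CAS retry loop in addAndGet can be tried out without Unsafe. The sketch below is an assumption-laden stand-in, not the Disruptor class: CasCounter is a hypothetical name, AtomicLong replaces the Unsafe calls, and lazySet plays roughly the role of putOrderedLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Sequence, built on AtomicLong instead of Unsafe.
final class CasCounter {
    private final AtomicLong value = new AtomicLong(-1L);

    long get() {
        return value.get();
    }

    // Ordered write, comparable in spirit to putOrderedLong
    void set(long newValue) {
        value.lazySet(newValue);
    }

    // Same retry loop as Sequence.addAndGet: read, compute, CAS, repeat on failure
    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = value.get();
            next = current + increment;
        } while (!value.compareAndSet(current, next));
        return next;
    }

    /** Increments one counter from several threads and returns the final value. */
    static long countWithThreads(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.addAndGet(1L);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }
}
```

Because the counter starts at -1, four threads adding 1000 each should leave it at 3999; no increment is lost even though no lock is taken.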
Producer
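Cursored interface: implemented by classes that record a sequence position. For example, while producing messages a producer calls getCursor to find the current position in the RingBuffer, from which the next slot to produce into follows; this position has to be kept up to date at all times.
Sequenced interface: implemented by classes that provide an ordered storage structure, which is one of the properties of the RingBuffer.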
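getBufferSize returns the size of the ring buffer
hasAvailableCapacity checks whether enough space is available
remainingCapacity returns the remaining space in the ring buffer
next claims the next sequence, or the next n sequences, as the position(s) for producing events
tryNext tries to claim the next sequence, or the next n sequences, as the position(s) for producing events; it throws InsufficientCapacityException when capacity is insufficient
publish publishes the event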
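The claim/publish cycle described by these methods can be sketched with a toy, single-threaded sequencer. ToySequencer and markConsumed are hypothetical names, there is exactly one consumer position, and an unchecked IllegalStateException stands in for InsufficientCapacityException; none of this is the library's implementation.

```java
// Hypothetical single-threaded model of the Sequenced contract.
final class ToySequencer {
    private final int bufferSize;
    private long nextValue = -1L; // last claimed sequence
    private long cursor = -1L;    // last published sequence
    private long consumed = -1L;  // last sequence the single consumer has finished

    ToySequencer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    int getBufferSize() {
        return bufferSize;
    }

    long remainingCapacity() {
        return bufferSize - (nextValue - consumed);
    }

    boolean hasAvailableCapacity(int requiredCapacity) {
        return remainingCapacity() >= requiredCapacity;
    }

    /** Claims the next sequence, failing fast instead of waiting when the ring is full. */
    long tryNext() {
        if (!hasAvailableCapacity(1)) {
            throw new IllegalStateException("insufficient capacity");
        }
        return ++nextValue;
    }

    /** Makes the claimed sequence visible to consumers. */
    void publish(long sequence) {
        cursor = sequence;
    }

    long getCursor() {
        return cursor;
    }

    /** Stand-in for a consumer advancing its own sequence. */
    void markConsumed(long sequence) {
        consumed = sequence;
    }
}
```

A producer would call tryNext, fill the slot for the returned sequence, and then publish it; once the ring is full, further claims fail until the consumer advances.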
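Sequencer interface: extends the Cursored and Sequenced interfaces and adds methods that relate consumers to producers.
INITIAL_CURSOR_VALUE: -1, the initial value of a sequence
claim: claims one specific sequence; only used when the ring buffer is initialized to a specific starting value (generally only with multiple producers)
isAvailable: non-blocking check of whether a sequence has been published and can be consumed
addGatingSequences: adds the given sequences to gatingSequences, the set of sequences that must be tracked
removeGatingSequence: removes one sequence from that set
newBarrier: given a list of sequences to track, creates a SequenceBarrier. Consumers use the SequenceBarrier to determine whether a position is ready to be consumed
getMinimumSequence: returns the smallest sequence among this ring buffer's gatingSequences
getHighestPublishedSequence: returns the highest sequence that can be read
newPoller: creates an EventPoller; it is not used here, so EventPoller is not covered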
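GatingSequence: the head of the RingBuffer is maintained by a Sequence named cursor, which coordinates producers filling the RingBuffer with data. The Sequence that marks the tail of the queue is not kept in the RingBuffer; it is maintained by the consumers. That makes maintaining the tail lock-free. The price is that the producer side has to track more information to decide whether the RingBuffer is full, and the gating sequences exist to track the relevant consumer Sequences.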
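How a producer uses the gating sequences to detect a full ring can be sketched like this. GatingDemo and wouldWrap are hypothetical names, and plain long values stand in for consumer Sequence objects; the fallback parameter mimics the way the minimum defaults to the producer's own position when no consumers are registered.

```java
// Hypothetical helper; plain longs stand in for the consumers' Sequence objects.
final class GatingDemo {
    /** Smallest consumer position, or fallback when no consumer is registered. */
    static long minimumSequence(long[] gatingSequences, long fallback) {
        long minimum = fallback;
        for (long position : gatingSequences) {
            minimum = Math.min(minimum, position);
        }
        return minimum;
    }

    /**
     * True when claiming nextSequence would overwrite a slot that the
     * slowest consumer has not finished reading yet.
     */
    static boolean wouldWrap(long nextSequence, int bufferSize, long[] gatingSequences, long cursor) {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint > minimumSequence(gatingSequences, cursor);
    }
}
```

With a buffer of 8 slots and consumers at 5 and 3, sequence 11 reuses the slot of sequence 3, which has already been consumed, so it can be claimed; sequence 12 would overwrite the unread slot of sequence 4, so the producer has to wait.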
AbstractSequencer
AbstractSequencer implements the Sequencer interface and has five fields:

    // Atomically swaps the gatingSequences array when consumers are added or removed.
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    // Producer position, starts at -1.
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // Consumer positions that the producer must not overtake.
    protected volatile Sequence[] gatingSequences = new Sequence[0];
The class implements some of the methods of the interfaces above:

    @Override
    public final long getCursor() {
        return cursor.get();
    }

    @Override
    public final int getBufferSize() {
        return bufferSize;
    }

    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    @Override
    public boolean removeGatingSequence(Sequence sequence) {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }

    @Override
    public long getMinimumSequence() {
        // Falls back to the cursor when there are no gating sequences.
        return Util.getMinimumSequence(gatingSequences, cursor.get());
    }

    @Override
    public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

    @Override
    public <T> EventPoller<T> newPoller(DataProvider<T> dataProvider, Sequence... gatingSequences) {
        return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences);
    }
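SingleProducerSequencer (not thread-safe)

    // Last sequence claimed by the producer.
    protected long nextValue = Sequence.INITIAL_VALUE;
    // Cached minimum of the gating (consumer) sequences.
    protected long cachedValue = Sequence.INITIAL_VALUE;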
    @Override
    public boolean hasAvailableCapacity(int requiredCapacity) {
        return hasAvailableCapacity(requiredCapacity, false);
    }

    private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {
        long nextValue = this.nextValue;
        // Sequence whose slot would be overwritten by the requested claim.
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            if (doStore) {
                cursor.setVolatile(nextValue);
            }
            // The cache is stale: re-read the real consumer positions.
            long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
            this.cachedValue = minSequence;
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    @Override
    public long next() {
        return next(1);
    }

    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextValue = this.nextValue;
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            cursor.setVolatile(nextValue);
            long minSequence;
            // Wait until the slowest consumer has freed the required slots.
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                LockSupport.parkNanos(1L);
            }
            this.cachedValue = minSequence;
        }
        this.nextValue = nextSequence;
        return nextSequence;
    }

    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        if (!hasAvailableCapacity(n, true)) {
            throw InsufficientCapacityException.INSTANCE;
        }
        long nextSequence = this.nextValue += n;
        return nextSequence;
    }

    @Override
    public long remainingCapacity() {
        long nextValue = this.nextValue;
        long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
        long produced = nextValue;
        return getBufferSize() - (produced - consumed);
    }

    @Override
    public void claim(long sequence) {
        this.nextValue = sequence;
    }

    @Override
    public void publish(long sequence) {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        publish(hi);
    }

    @Override
    public boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        // With one producer everything up to the cursor is published.
        return availableSequence;
    }
MultiProducerSequencer

    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    // One availability flag per slot.
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;
    @Override
    public boolean hasAvailableCapacity(final int requiredCapacity) {
        return hasAvailableCapacity(gatingSequences, requiredCapacity, cursor.get());
    }

    private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
        long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
            long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
            gatingSequenceCache.set(minSequence);
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    @Override
    public long next() {
        return next(1);
    }

    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    // Ring is full: back off briefly and retry.
                    LockSupport.parkNanos(1);
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                // This producer won the race for [current + 1, next].
                break;
            }
        } while (true);
        return next;
    }

    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        for (long l = lo; l <= hi; l++) {
            setAvailable(l);
        }
        waitStrategy.signalAllWhenBlocking();
    }

    private void setAvailable(final long sequence) {
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    private void setAvailableBufferValue(int index, int flag) {
        long bufferAddress = (index * SCALE) + BASE;
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    @Override
    public boolean isAvailable(long sequence) {
        int index = calculateIndex(sequence);
        int flag = calculateAvailabilityFlag(sequence);
        long bufferAddress = (index * SCALE) + BASE;
        return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
    }

    @Override
    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        // Stop at the first gap: a claimed but not yet published sequence.
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

    // Number of times the ring has wrapped for this sequence.
    private int calculateAvailabilityFlag(final long sequence) {
        return (int) (sequence >>> indexShift);
    }

    // Slot index of this sequence.
    private int calculateIndex(final long sequence) {
        return ((int) sequence) & indexMask;
    }
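The availability buffer is easier to follow with concrete numbers. The sketch below is a simplified model under stated assumptions: AvailabilityDemo is a hypothetical name, AtomicIntegerArray replaces the Unsafe array access, indexShift is taken to be log2(bufferSize), and every flag starts at -1 so that nothing looks published initially.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical model of the per-slot availability flags of a multi-producer sequencer.
final class AvailabilityDemo {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailabilityDemo(int bufferSize) { // bufferSize must be a power of 2
        availableBuffer = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // assumption: -1 means "never published"
        }
    }

    /** Slot that the sequence maps to. */
    int index(long sequence) {
        return ((int) sequence) & indexMask;
    }

    /** How many times the ring has wrapped when it reaches this sequence. */
    int flag(long sequence) {
        return (int) (sequence >>> indexShift);
    }

    void publish(long sequence) {
        availableBuffer.lazySet(index(sequence), flag(sequence));
    }

    boolean isAvailable(long sequence) {
        return availableBuffer.get(index(sequence)) == flag(sequence);
    }

    /** Highest sequence in [lowerBound, availableSequence] with no unpublished gap before it. */
    long highestPublished(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }
}
```

Storing the wrap count rather than a boolean means a slot published in an earlier lap is never mistaken for the current one, so the flags do not have to be reset when a slot is reused.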
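RingBuffer: a lock-free ring queue
The RingBuffer class holds the Event object of every slot (entry) of the ring in the field private final Object[] entries. These objects are created only once, when the RingBuffer is initialized; afterwards they are only modified (initializing and filling the Events) and never re-created. A RingBuffer can have multiple producers and consumers, so entries is accessed frequently by many threads, but the array itself is never modified: it stores references to the Event objects, and because no new objects are created those references never change.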
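A minimal sketch of this pre-allocation, assuming a plain Supplier in place of EventFactory and leaving out padding and all concurrency control. PreallocatedRing and LongEvent are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical event type for the sketch.
final class LongEvent {
    long value;
}

// Hypothetical ring that allocates every event once and reuses it forever.
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(int bufferSize, Supplier<E> factory) {
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        // The only place where event objects are ever created.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }
}
```

With four slots, sequences 1 and 5 map to the same object: the producer overwrites the fields of the existing event instead of allocating a new one, so the array contents never change after construction.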
Modulo by bit operation
m % 2^n = m & ( 2^n - 1 ) // the array size in the RingBuffer must be a power of 2
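The identity is easy to check in code. MaskModulo is a hypothetical helper; it also shows why the size has to be a power of two and not merely an even number.

```java
// Hypothetical helper showing slot calculation by bit mask.
final class MaskModulo {
    /** Slot of a non-negative sequence in a ring whose size is a power of 2. */
    static int slot(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }
}
```

For a size of 8 the mask is binary 111, which keeps exactly the three low bits, that is, the remainder of a division by 8. For a size of 6 the mask would be binary 101, which drops bit 1 and therefore no longer yields the remainder.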
Source code

    abstract class RingBufferFields<E> extends RingBufferPad {
        private static final int BUFFER_PAD;
        private static final long REF_ARRAY_BASE;
        private static final int REF_ELEMENT_SHIFT;
        private static final Unsafe UNSAFE = Util.getUnsafe();

        static {
            // Size of one array reference: 4 bytes (compressed oops) or 8 bytes.
            final int scale = UNSAFE.arrayIndexScale(Object[].class);
            if (4 == scale) {
                REF_ELEMENT_SHIFT = 2;
            } else if (8 == scale) {
                REF_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("Unknown pointer size");
            }
            // 128 bytes of padding on each side of the array.
            BUFFER_PAD = 128 / scale;
            REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
        }

        private final long indexMask;
        private final Object[] entries;
        protected final int bufferSize;
        protected final Sequencer sequencer;

        RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
            this.sequencer = sequencer;
            this.bufferSize = sequencer.getBufferSize();

            if (bufferSize < 1) {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }

            this.indexMask = bufferSize - 1;
            this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
            fill(eventFactory);
        }

        // Pre-allocates every event; the padding slots stay null.
        private void fill(EventFactory<E> eventFactory) {
            for (int i = 0; i < bufferSize; i++) {
                entries[BUFFER_PAD + i] = eventFactory.newInstance();
            }
        }

        @SuppressWarnings("unchecked")
        protected final E elementAt(long sequence) {
            return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
        }
    }
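Most of the methods in RingBuffer are wrappers around EventTranslator and around the methods of Sequencer.
Constructing a RingBuffer requires the following: an EventFactory implementation that creates the Events, a Sequencer implementation for the producer side, a wait strategy (waitStrategy), and the bufferSize.
An EventTranslator is mainly used to publish an Event; EventTranslatorTwoArg, EventTranslatorVararg and similar interfaces serve the same purpose.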
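The way a translator is wrapped by a publish call can be sketched as follows. ToyTranslator, ToyRing and TextEvent are hypothetical, single-threaded stand-ins with no capacity check; only the shape (claim, translate, publish in a finally block) is meant to resemble what the text describes.

```java
import java.util.function.Supplier;

// Hypothetical translator: copies data into a pre-allocated event.
interface ToyTranslator<E> {
    void translateTo(E event, long sequence);
}

// Hypothetical event type for the sketch.
final class TextEvent {
    String text;
}

// Hypothetical single-threaded ring without capacity checks.
final class ToyRing<E> {
    private final Object[] entries;
    private final int indexMask;
    private long nextValue = -1L; // last claimed sequence
    private long cursor = -1L;    // last published sequence

    ToyRing(int bufferSize, Supplier<E> factory) {
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }

    long getCursor() {
        return cursor;
    }

    /** Claims a slot, lets the translator fill it, and always publishes it. */
    void publishEvent(ToyTranslator<E> translator) {
        final long sequence = ++nextValue;
        try {
            translator.translateTo(get(sequence), sequence);
        } finally {
            // Publish even if the translator throws, so consumers are not stuck on a gap.
            cursor = sequence;
        }
    }
}
```

The caller only supplies the code that fills the event, for example ring.publishEvent((event, sequence) -> event.text = "msg-" + sequence), and cannot forget the publish step.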
WaitStrategy
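BlockingWaitStrategy: blocks the thread and waits for the producer to wake it up.
BusySpinWaitStrategy: the thread spins continuously while waiting, which is CPU-intensive.
LiteBlockingWaitStrategy: also blocks the thread and waits for the producer to wake it up, but is lighter than BlockingWaitStrategy; in some situations it reduces the number of blocking operations.
PhasedBackoffWaitStrategy: decides which kind of waiting to use based on the configured time periods and the wait strategy it is given as a fallback.
SleepingWaitStrategy: configurable through its parameters; the thread first spins, then gives up the CPU with Thread.yield() so that the scheduler can reschedule it, and finally sleeps briefly between checks.
TimeoutBlockingWaitStrategy: blocks for a configurable time and throws an exception if the timeout expires.
YieldingWaitStrategy: gives up the CPU with Thread.yield() so that the thread scheduler can reschedule the thread.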
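As an illustration of the non-blocking end of this list, here is a sketch of a spin-then-yield wait. SpinThenYield is a hypothetical class, an AtomicLong stands in for the cursor Sequence, and the spin count of 100 is just an example value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical spin-then-yield wait loop against an AtomicLong cursor.
final class SpinThenYield {
    /** Returns once cursor has reached sequence; spins first, then yields. */
    static long waitFor(long sequence, AtomicLong cursor, int spinTries) {
        int counter = spinTries;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 0) {
                counter--;      // busy spin for the first few attempts
            } else {
                Thread.yield(); // then let other threads run
            }
        }
        return available;
    }

    /** Starts a producer that publishes 0..5 and waits for sequence 5. */
    static long demo() {
        AtomicLong cursor = new AtomicLong(-1L);
        Thread producer = new Thread(() -> {
            for (long i = 0; i <= 5; i++) {
                cursor.set(i);
            }
        });
        producer.start();
        long seen = waitFor(5L, cursor, 100);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

The trade-off is latency against CPU usage: pure spinning reacts fastest, yielding gives other threads a chance, and the blocking strategies free the core entirely at the cost of a wake-up.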
WaitStrategy interface

    public interface WaitStrategy {
        // Waits until the given sequence is available and returns the available sequence.
        long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException, TimeoutException;

        // Wakes up consumers that are blocked waiting for the cursor to advance.
        void signalAllWhenBlocking();
    }
BlockingWaitStrategy

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public final class BlockingWaitStrategy implements WaitStrategy {
        private final Lock lock = new ReentrantLock();
        private final Condition processorNotifyCondition = lock.newCondition();

        @Override
        public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException {
            long availableSequence;
            if (cursorSequence.get() < sequence) {
                lock.lock();
                try {
                    // Block until the producer has published the requested sequence.
                    while (cursorSequence.get() < sequence) {
                        barrier.checkAlert();
                        processorNotifyCondition.await();
                    }
                } finally {
                    lock.unlock();
                }
            }
            // Then spin until the consumers this one depends on have caught up.
            while ((availableSequence = dependentSequence.get()) < sequence) {
                barrier.checkAlert();
            }
            return availableSequence;
        }

        @Override
        public void signalAllWhenBlocking() {
            lock.lock();
            try {
                processorNotifyCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

The original listing contained a second waitFor with the same signature, which cannot be declared in the same class. It matches the busy-spin loop of BusySpinWaitStrategy and is shown separately for comparison:

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException {
        long availableSequence;
        // No lock and no condition: just spin on the dependent sequence.
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }
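The lock-and-condition handshake can be exercised in isolation. BlockingWaitDemo is a hypothetical, stripped-down model: an AtomicLong plays the cursor, there are no dependent sequences and no alert handling.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of a blocking wait between one producer and one consumer.
final class BlockingWaitDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private final AtomicLong cursor = new AtomicLong(-1L);

    /** Blocks until the cursor has reached the requested sequence. */
    long waitFor(long sequence) throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop: await() may wake up spuriously.
            while (cursor.get() < sequence) {
                published.await();
            }
        } finally {
            lock.unlock();
        }
        return cursor.get();
    }

    /** Advances the cursor and wakes up every blocked consumer. */
    void publish(long sequence) {
        cursor.set(sequence);
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** One consumer waits for sequence 0 while the calling thread publishes it. */
    static long demo() {
        BlockingWaitDemo strategy = new BlockingWaitDemo();
        long[] seen = {-1L};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = strategy.waitFor(0L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        strategy.publish(0L);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

The consumer checks the cursor while holding the lock and the producer signals while holding the same lock, so a publish that happens between the check and the await cannot be missed.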
SequenceBarrier
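SequenceBarrier has only one implementation class, ProcessingSequenceBarrier. A ProcessingSequenceBarrier is made up of the producer's Sequencer, the producer position cursorSequence, the wait strategy waitStrategy, and a group of sequences it depends on, dependentSequence: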

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;

        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
                                  final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                // No upstream consumers: depend directly on the producer cursor.
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }

        @Override
        public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
            checkAlert();
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
            if (availableSequence < sequence) {
                return availableSequence;
            }
            // With multiple producers, skip back to the last sequence without a gap.
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
        }

        @Override
        public long getCursor() {
            return dependentSequence.get();
        }

        @Override
        public boolean isAlerted() {
            return alerted;
        }

        @Override
        public void alert() {
            alerted = true;
            waitStrategy.signalAllWhenBlocking();
        }

        @Override
        public void clearAlert() {
            alerted = false;
        }

        @Override
        public void checkAlert() throws AlertException {
            if (alerted) {
                throw AlertException.INSTANCE;
            }
        }
    }
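How the choice between the cursor and the dependent sequences plays out can be sketched with a toy barrier. ToyBarrier is a hypothetical class: AtomicLong values stand in for Sequence objects, the minimum over the dependents imitates a sequence group, and an unchecked IllegalStateException replaces AlertException.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical barrier: gates a consumer on the cursor or on upstream consumers.
final class ToyBarrier {
    private final AtomicLong cursor;
    private final AtomicLong[] dependents;
    private volatile boolean alerted;

    ToyBarrier(AtomicLong cursor, AtomicLong... dependents) {
        this.cursor = cursor;
        this.dependents = dependents;
    }

    /** The cursor when nothing is upstream, otherwise the slowest upstream consumer. */
    long available() {
        if (dependents.length == 0) {
            return cursor.get();
        }
        long minimum = Long.MAX_VALUE;
        for (AtomicLong dependent : dependents) {
            minimum = Math.min(minimum, dependent.get());
        }
        return minimum;
    }

    /** Spins until the requested sequence is safe to read, unless alerted. */
    long waitFor(long sequence) {
        checkAlert();
        long available;
        while ((available = available()) < sequence) {
            checkAlert();
            Thread.yield();
        }
        return available;
    }

    void alert() {
        alerted = true;
    }

    void checkAlert() {
        if (alerted) {
            throw new IllegalStateException("alerted");
        }
    }
}
```

A first-stage consumer would use a barrier without dependents and follow the producer directly; a second-stage consumer passes the first stage's sequences as dependents and can never overtake the slowest of them. The alert flag gives a way to break a waiting consumer out of the loop, for example on shutdown.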