发布于2021-06-12 14:53 阅读(58) 评论(0) 点赞(19) 收藏(1)
导语
上次分享中讲到了比较常用的几个Map相关的集合。这篇分享主要来记录一下,剩下的关于队列的一些信息以及其他补充的信息。
JCTools 所使用的队列是基于Lamport的无等待SPSC算法,然后稍微做了改进。Lamport算法是在顺序一致性内存模型下可以实现单生产者/单消费者的循环缓冲区。在做了调整之后,在总存储顺序和其他较弱的一致性模型下也是正确的。使用CAS的方式完成实现,相较于使用锁的方式可以减少很多性能的开支,抛弃了锁的申请与切换,可以带来很大的性能提升。
SPSC-单一生产者单一消费者(有界和无界)
只有同步,没有互斥。只有一个生产者,不存在同时有两个生产者使用缓冲区资源造成数据不一致的状态。只需要控制好在缓冲区满的时候不再继续添加元素。有界与无界的区别是一个有缓冲区上限,一个缓冲区可以不断向后延伸。
有界PscArrayQueue
初始化
public SpscArrayQueue(int capacity) {
super(Math.max(capacity, 4));
}
传入值进行初始化,初始化时最小容量大小为4
Offer方法
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException();
} else {
E[] buffer = this.buffer;
long mask = this.mask;
long producerIndex = this.producerIndex;
if (producerIndex >= this.producerLimit && !this.offerSlowPath(buffer, mask, producerIndex)) {
return false;
} else {
long offset = calcElementOffset(producerIndex, mask);
UnsafeRefArrayAccess.soElement(buffer, offset, e);
this.soProducerIndex(producerIndex + 1L);
return true;
}
}
}
添加元素,添加的元素不可为空,添加的生产者的索引小于缓冲区的长度限制,CAS执行添加元素。只有当生产者索引大于缓冲区长度限制并且生产者索引的位置处有值时添加失败。
OfferSlowPath方法
private boolean offerSlowPath(E[] buffer, long mask, long producerIndex) {
int lookAheadStep = this.lookAheadStep;
if (null == UnsafeRefArrayAccess.lvElement(buffer, calcElementOffset(producerIndex + (long)lookAheadStep, mask))) {
this.producerLimit = producerIndex + (long)lookAheadStep;
} else {
long offset = calcElementOffset(producerIndex, mask);
if (null != UnsafeRefArrayAccess.lvElement(buffer, offset)) {
return false;
}
}
return true;
}
计算生产者索引处的偏移,获取偏移处缓冲区的值,如果为空就调整生产者的最大索引,如果不为空,则说明已经有值了,返回false。
Poll方法取值并弹出
public E poll() {
long consumerIndex = this.consumerIndex;
long offset = this.calcElementOffset(consumerIndex);
E[] buffer = this.buffer;
E e = UnsafeRefArrayAccess.lvElement(buffer, offset);
if (null == e) {
return null;
} else {
UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);
this.soConsumerIndex(consumerIndex + 1L);
return e;
}
}
获取消费者索引,计算消费者在缓冲区中的下标,获取到下标处的值,如果不为空,更新该处的值为null,调整消费者索引+1 ,返回元素。
Peek方法是获取元素不弹出
public E peek() {
return UnsafeRefArrayAccess.lvElement(this.buffer, this.calcElementOffset(this.consumerIndex));
}
Drain方法
public int drain(Consumer<E> c, int limit) {
E[] buffer = this.buffer;
long mask = this.mask;
long consumerIndex = this.consumerIndex;
for(int i = 0; i < limit; ++i) {
long index = consumerIndex + (long)i;
long offset = calcElementOffset(index, mask);
E e = UnsafeRefArrayAccess.lvElement(buffer, offset);
if (null == e) {
return i;
}
UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);
this.soConsumerIndex(index + 1L);
c.accept(e);
}
return limit;
}
给定一个值或者成为范围,清除缓冲区中这个范围的值
Fill方法
public int fill(Supplier<E> s, int limit) {
E[] buffer = this.buffer;
long mask = this.mask;
int lookAheadStep = this.lookAheadStep;
long producerIndex = this.producerIndex;
for(int i = 0; i < limit; ++i) {
long index = producerIndex + (long)i;
long lookAheadElementOffset = calcElementOffset(index + (long)lookAheadStep, mask);
if (null != UnsafeRefArrayAccess.lvElement(buffer, lookAheadElementOffset)) {
long offset = calcElementOffset(index, mask);
if (null != UnsafeRefArrayAccess.lvElement(buffer, offset)) {
return i;
}
UnsafeRefArrayAccess.soElement(buffer, offset, s.get());
this.soProducerIndex(index + 1L);
} else {
int lookAheadLimit = Math.min(lookAheadStep, limit - i);
for(int j = 0; j < lookAheadLimit; ++j) {
long offset = calcElementOffset(index + (long)j, mask);
UnsafeRefArrayAccess.soElement(buffer, offset, s.get());
this.soProducerIndex(index + (long)j + 1L);
}
i += lookAheadLimit - 1;
}
}
给一个数组和范围,fill方法会将给的缓冲区范围内的值填充到有界缓冲区中,添加了等待策略的drain和fill,drain在循环等待4096次后放弃,fill在循环等待lookAheadStep次后放弃。
父类是BaseSpscLinkedArrayQueue,采用链表式的队列,方便无界的扩展。初始化时给了很大的值,而且是2的倍数。
&emps;其中放值取值等都是使用父类BaseSpscLinkedArrayQueue的方法,只有一个初始化和offerColdPath两个私有方法,offerColdPath方法是在offer添加元素缓冲区已满的情况下调用,方法中判断空间是否充足,缓冲区是否还有空间,都不满足即空间充足,缓冲区没有空间然后申请新的空间,并把元素添加进去。
MPSC-多生产者单一消费者(有界和无界)
多生产者需要考虑到生产者的互斥问题,不仅要控制好不能在空间已满的情况下添加并且还要考虑到是否有其他生产者在同时修改缓冲值,如果有修改的需要循环的方式重新尝试。
有界
构造方法使用的是父类的构造方法,环形的数组队列,初始容量取比传入数值略大的2的次方值
Offer方法
public boolean offer(E e) {
if (null == e) {
传入的元素不能为空
throw new NullPointerException();
} else {
long mask = this.mask;
生产者的最大索引,初始时为传入的容量
long producerLimit = this.lvProducerLimit();
long pIndex;
long offset;
do {
生产者索引
pIndex = this.lvProducerIndex();
如果生产者索引达到最大值,要调整消费者的大小
if (pIndex >= producerLimit) {
获取消费者索引
offset = this.lvConsumerIndex();
调整生产者上限为消费者索引+容器大小
producerLimit = offset + mask + 1L;
调整完依然还是达到上限,返回false
if (pIndex >= producerLimit) {
return false;
}
更新生产者上限
this.soProducerLimit(producerLimit);
}
Cas更新成功就跳出循环,失败就再次尝试
} while(!this.casProducerIndex(pIndex, pIndex + 1L));
计算索引,插入到相应的位置
offset = calcElementOffset(pIndex, mask);
UnsafeRefArrayAccess.soElement(this.buffer, offset, e);
return true;
}
}
Poll方法
public E poll() {
获取消费者索引
long cIndex = this.lpConsumerIndex();
计算在数组中的位置
long offset = this.calcElementOffset(cIndex);
E[] buffer = this.buffer;
获取缓冲区此位置的元素值
E e = UnsafeRefArrayAccess.lvElement(buffer, offset);
if (null == e) {
判断数组是否为空
if (cIndex == this.lvProducerIndex()) {
return null;
}
do {
e = UnsafeRefArrayAccess.lvElement(buffer, offset);
} while(e == null);
}
UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null);
this.soConsumerIndex(cIndex + 1L);
return e;
}
初始化通过父类的有参构造方法
public MpscUnboundedArrayQueue(int chunkSize) {
super(chunkSize);
}
BaseMpscLinkedArrayQueue.Class
public BaseMpscLinkedArrayQueue(int initialCapacity) {
验证初始容量值,必须大于等于2
RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
传入初始参数计算得出最接近initialCapacity的2的N次方值
int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
通过p2capacity计算mask值,用作扩容大小
long mask = (long)(p2capacity - 1 << 1);
默认分配p2capacity+1大小的缓冲区
E[] buffer = CircularArrayOffsetCalculator.allocate(p2capacity + 1);
this.producerBuffer = buffer;
this.producerMask = mask;
this.consumerBuffer = buffer;
this.consumerMask = mask;
用mask作为初始化队列的limit值,producerIndex大于limit就扩容
this.soProducerLimit(mask);
}
RangeUtil.class
public static int checkGreaterThanOrEqual(int n, int expected, String name) {
if (n < expected) {
throw new IllegalArgumentException(name + ": " + n + " (expected: >= " + expected + ')');
} else {
return n;
}
}
要求队列的值不小于expected的大小,但是这里expected就为2
Offer方法
public boolean offer(E e) {
if (null == e) { 添加的元素e不能为空
throw new NullPointerException();
} else {
while(true) {
while(true) {
队列的上限
long offset = this.lvProducerLimit();
生产者指针
long pIndex = this.lvProducerIndex();
if ((pIndex & 1L) != 1L) {
long mask = this.producerMask;
E[] buffer = this.producerBuffer;
如果上限小于生产者指针时,去扩容
if (offset <= pIndex) {
通过offerSlowPath返回状态值来查看怎么处理这个待添加的元素
int result = this.offerSlowPath(mask, pIndex, offset);
switch(result) {
case 0:
default:
break;
case 1:
continue;
case 2:
队列已满,返回false
return false;
case 3:
扩容
this.resize(mask, buffer, pIndex, e);
return true;
}
}
生产者指针没有超过上限的时候,cas方式对生产者指针+2
if (this.casProducerIndex(pIndex, pIndex + 2L)) {
增加成功之后,获取添加元素的位置,将新元素进行添加
offset = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, mask);
UnsafeRefArrayAccess.soElement(buffer, offset, e);
return true;
}
}
}
}
}
}
Resize方法
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) {
获取oldBuffer长度值
int newBufferLength = this.getNextBufferSize(oldBuffer);
将oldBuffer长度值设置给新的缓冲区
E[] newBuffer = CircularArrayOffsetCalculator.allocate(newBufferLength);
新建的缓冲区赋值给生产者缓冲区
this.producerBuffer = newBuffer;
int newMask = newBufferLength - 2 << 1;
this.producerMask = (long)newMask;
根据oldmask计算偏移位置
long offsetInOld = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, oldMask);
根据newmask计算偏移位置
long offsetInNew = LinkedArrayQueueUtil.modifiedCalcElementOffset(pIndex, (long)newMask);
将新元素e放置在新缓冲区offsetInNew的位置
UnsafeRefArrayAccess.soElement(newBuffer, offsetInNew, e);
将新的缓冲区放到旧的缓冲区nextArrayOffset(oldmask)处
将oldBuffer中最后一个元素指向新的缓冲区newBuffer,构成单向链表的样子
UnsafeRefArrayAccess.soElement(oldBuffer, this.nextArrayOffset(oldMask), newBuffer);
获取消费者索引
long cIndex = this.lvConsumerIndex();
获取可用容量
long availableInQueue = this.availableInQueue(pIndex, cIndex);
RangeUtil.checkPositive(availableInQueue, "availableInQueue");
扩容生产者上限,扩容大小为新增的缓冲区的大小
this.soProducerLimit(pIndex + Math.min((long)newMask, availableInQueue));
生产者指针加2
this.soProducerIndex(pIndex + 2L);
当在一个缓冲区中遇到jump,就应该考虑取下一个缓冲区取值,衔接新老缓冲区的标志。
UnsafeRefArrayAccess.soElement(oldBuffer, offsetInOld, JUMP);
}
Poll方法
public E poll() {
获取消费者缓冲区
E[] buffer = this.consumerBuffer;
获取消费者指针
long index = this.consumerIndex;
long mask = this.consumerMask;
根据消费者指针和掩码计算从哪里开始取值
long offset = LinkedArrayQueueUtil.modifiedCalcElementOffset(index, mask);
取值
Object e = UnsafeRefArrayAccess.lvElement(buffer, offset);
if (e == null) {
取得值为空,先判断消费者指针和生产者指针是不是相同,相同说明队列为空
if (index == this.lvProducerIndex()) {
return null;
}
do {
e = UnsafeRefArrayAccess.lvElement(buffer, offset);
} while(e == null);
}
if (e == JUMP) {
如果元素为jump,需要取下一个缓冲区的值
E[] nextBuffer = this.getNextBuffer(buffer, mask);
return this.newBufferPoll(nextBuffer, index);
} else {
一般情况下,设置offset位置处值为null,消费者指针加2
UnsafeRefArrayAccess.soElement(buffer, offset, (Object)null);
this.soConsumerIndex(index + 2L);
return e;
}
}
Drain方法和pscArrayQueue的drain方法相同
初始化调用父类的构造器
public ConcurrentCircularArrayQueue(int capacity) {
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = (long)(actualCapacity - 1);
this.buffer = CircularArrayOffsetCalculator.allocate(actualCapacity);
}
计算得出略大于传入值的2次方值作为队列的大小
Offer方法
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
final E[] buffer = this.buffer;
final long mask = this.mask;
final long currProducerIndex = lvProducerIndex();
final long offset = calcElementOffset(currProducerIndex, mask);
if (null != lvElement(buffer, offset))
{
long size = currProducerIndex - lvConsumerIndex();
if (size > mask)
{
return false;
}
else
{
// spin wait for slot to clear, buggers wait freedom
while (null != lvElement(buffer, offset))
{
;
}
}
}
spElement(buffer, offset, e);
// single producer, so store ordered is valid. It is also required to correctly publish the element
// and for the consumers to pick up the tail value.
soProducerIndex(currProducerIndex + 1);
return true;
}
当前获取到的生产者指针指向的位置不为空时,循环直到该位置为空,然后将值插入到该位置。
Poll方法
public E poll()
{
long currentConsumerIndex;
获取共享缓存中的生产者指针
long currProducerIndexCache = lvProducerIndexCache();
do
{
获取消费者指针
currentConsumerIndex = lvConsumerIndex();
如果消费者指针大于当前共享缓存的生产者指针
if (currentConsumerIndex >= currProducerIndexCache)
{
long currProducerIndex = lvProducerIndex();
如果当前生产者指针确实小于等于消费者指针,说明缓冲区中没有元素
if (currentConsumerIndex >= currProducerIndex)
{
return null;
}
else
{
当前生产者指针被更新,或者说刚刚添加了新值,同步共享缓存中的生产者指针为当前生产者指针,并保存
currProducerIndexCache = currProducerIndex;
svProducerIndexCache(currProducerIndex);
}
}
}
循环,当前消费者指针被其他消费者更改时进行循环尝试
while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));
// consumers are gated on latest visible tail, and so can't see a null value in the queue or overtake
// and wrap to hit same location.
正常执行下来会把消费者指针位置的值删除掉
return removeElement(buffer, currentConsumerIndex, mask);
}
Peek方法,大概套路是获取消费者指针,判断和共享缓存中的生产者指针的距离,如果相等或大于,就获取当前生产者指针的位置,再进行判断,如果还是一样的结果,就说明没有新值的添加,队列确实为空,有新值加入了,生产者指针就会更改,保存到共享缓存中,此时如果消费者指针被其他消费者更改,循环尝试。没有问题后返回元素值。
Offer方法
public boolean offer(final E e)
{
if (null == e)
{
throw new NullPointerException();
}
final long mask = this.mask;
final long capacity = mask + 1;
final long[] sBuffer = sequenceBuffer;
long pIndex;
long seqOffset;
long seq;
long cIndex = Long.MIN_VALUE;// start with bogus value, hope we don't need it
do
{
pIndex = lvProducerIndex();
seqOffset = calcSequenceOffset(pIndex, mask);
获取到生产者指针在队列中的位置值
seq = lvSequence(sBuffer, seqOffset);
// consumer has not moved this seq forward, it's as last producer left
计算出的位置小于指针的位置,说明队列中的位置需要移动了
if (seq < pIndex)
{
// Extra check required to ensure [Queue.offer == false iff queue is full]
说明队列满了
if (pIndex - capacity >= cIndex && // test against cached cIndex
pIndex - capacity >= (cIndex = lvConsumerIndex()))
{ // test against latest cIndex
return false;
}
else
{
队列未满,队列中的位置+1
seq = pIndex + 1; // (+) hack to make it go around again without CAS
}
}
}
其他生产者调整过队列中位置或者生产者指针添加失败都会进行循环重试
while (seq > pIndex || // another producer has moved the sequence(or +)
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment
soElement(buffer, calcElementOffset(pIndex, mask), e);
soSequence(sBuffer, seqOffset, pIndex + 1); // seq++;
return true;
}
Poll方法类似offer方法,做了对生产者指针位置的CAS校验,做了对消费者指针位置的CAS校验
多生产者就对多生产者可能出现并发现问题的地方执行CAS检测,多消费者就对多消费这可能出现并发问题的地方进行CAS检测。单生产者或单消费者就需要做好同步控制。
原文链接:https://blog.csdn.net/nihui123/article/details/117648989
作者:java战神
链接:http://www.javaheidong.com/blog/article/222341/028d875044a755073aea/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!