程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

结合AQS对Semaphore进行源码解读

发布于2021-05-29 20:31     阅读(1154)     评论(0)     点赞(21)     收藏(3)


一、概述

源码解读java 并发包 AQS(阳哥讲解整理)通过ReentrantLock对AQS的独占模式非公平锁进行了讲解,本篇文章,我们将已Semaphore讲解下共享模式下的AQS实现
Semaphore就是信号量,我们可以用一个桥梁上可通行的最大车数来表示,入某大桥一次只能承载2辆汽车同时通行,当桥梁上的车小于2时,其他车辆才可以进入

二、Semaphore框架

  • acquire() //开启通道,默认为1
  • release() //释放通道,默认为1
  • acquire(int arg) //开启通道,动态开启
  • release(int arg) //释放通道,动态释放

三、源码讲解

我们来模拟一个案例,5个线程A,B,C,D,E去执行任务,定义一个Semaphore,通道为2,5个线程开始只能两个抢到

Semaphore semaphore = new Semaphore(2);
		new Thread(()->{
			try {
                semaphore.acquire();
                System.out.println("A thread come in ---------");
                TimeUnit.MINUTES.sleep(5);
                semaphore.release();
			}catch (Exception e){
			}
		},"A").start();

		new Thread(()->{
			try {
				semaphore.acquire();
                System.out.println("B thread come in ---------");
                TimeUnit.MINUTES.sleep(5);
                semaphore.release();
			}catch (Exception e){
			}
		},"B").start();

		new Thread(()->{
			try {
                semaphore.acquire();
                System.out.println("C thread come in ---------");
                TimeUnit.MINUTES.sleep(5);
                semaphore.release();
			}catch (Exception e){
			}
		},"C").start();
		等等。。。。。。

A,B进入acquire()方法
Semaphore .java

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

AbstractQueuedSynchronizer.java

	public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//获取锁
            doAcquireSharedInterruptibly(arg);
    }

先进入tryAcquireShared(arg)方法尝试获取资源转到Semaphore 中的实现方法

		final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();//available =2
                int remaining = available - acquires;//remaining=2-1
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

A进入获取通道为2,remaining为1返回,A线程可执行 返回false
B进入获取通道为1,remaining为0返回,B线程可执行 返回false
C进入获取通道为0,remaining为-1返回,C线程不可执行 返回true
同理DEF获取通道为0,remaining为-1返回,都不可执行 返回true
进入
doAcquireSharedInterruptibly(arg);
AbstractQueuedSynchronizer.java

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
        boolean failed = true;//等待过程中是否被中断过的标志
        try {
            for (;;) {
                final Node p = node.predecessor();//前驱
                if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                    int r = tryAcquireShared(arg);//再次尝试获取资源
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

addWaiter(Node.SHARED);//添加一个共享节点到队列尾部在前一篇文章已经讲解,过程一直,不在赘述,前驱节点为head,再次尝试获取资源,明显失败,r<0走下面的if判断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
shouldParkAfterFailedAcquire(p, node)同理也讲过,循环设置哨兵head节点waitStatus为SIGNEL(-1)返回true
进入parkAndCheckInterrupt()方法,挂机CDEF线程

	private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

此时A,B线程进入执行 release();
AQS.java

	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

进入判断 tryReleaseShared(arg)方法
Semaphore .java

	protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

A 获取状态current =0,nextc =0+1=1,设置状态为1,返回true
同理(B 获取状态current =1,nextc =1+1=2,设置状态为2,返回true)
此时进入doReleaseShared();方法

	private void doReleaseShared() {        
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;//哨兵节点状态为-1
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//哨兵节点状态设置为0
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒后继
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

把哨兵节点状态设置为0,唤醒之前的DEF线程
D线程被唤醒继续执行方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
        boolean failed = true;//等待过程中是否被中断过的标志
        try {
            for (;;) {
                final Node p = node.predecessor();//前驱
                if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                    int r = tryAcquireShared(arg);//再次尝试获取资源
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

得到前驱为head,再次尝试获取资源成功,r>0,执行
setHeadAndPropagate(node, r);方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 拿到head哨兵节点
        setHead(node);//把哨兵节点置为刚才的A节点
        //propagate>0
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//找到后继几点
            if (s == null || s.isShared())//不为空或者是共享节点
                doReleaseShared();//获取通道唤醒
        }
    }

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,共享模式
移除原来的哨兵节点,设置当前的节点为哨兵节点,propagate为1继续执行获取获取通道判断是否可以继续唤醒,不断唤醒所有等待中的共享节点线程

至此Semaphore 源码分析完毕

总结

Semaphore 使用的AQS的共享模式实现主要就是继承实现
tryAcquireShared(int),tryReleaseShared(int)这两方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//获取锁
            doAcquireSharedInterruptibly(arg);
    }
  • tryAcquireShared(arg)
  • doAcquireSharedInterruptibly(arg)
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • tryReleaseShared(arg)
  • doReleaseShared()
    老实说我就是把上篇的CountDownLatch拿来改的,因为都是共享模式,不一样的只是自己实现同步器时的tryAcquireShared()和tryReleaseShared()不同而已

原文链接:https://blog.csdn.net/qq_25210899/article/details/117333640



所属网站分类: 技术文章 > 博客

作者:我是小豆丁

链接:http://www.javaheidong.com/blog/article/207234/64edf9c1561354791241/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

21 0
收藏该文
已收藏

评论内容:(最多支持255个字符)