JAVA和Nginx 教程大全

网站首页 > 精选教程 正文

Java并发-Semaphore源码分析 java并发编程实战 看不懂

wys521 2024-11-06 20:35:47 精选教程 59 ℃ 0 评论

一、前言

Semaphore(信号量)是J.U.C包中比较常用到的一个类,是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效控制并发数,实现流量控制。

Semaphore提供了一个许可的概念,可以把这个许可看做是车票,只有成功买到车票的人才能够上车,并且车票是有数量限制的,否则会导致车辆超载。所以当车票卖完的时候其他人只能等下一趟车,如果中途有人下车,那么他的位置将空闲出来,如果此时有人想上车就又可以买到车票了。

下面就来深入了解下Semaphore的知识。

二、构造器

/**
 * 默认使用非公平机制(非公平信号量)
 */
public Semaphore(int permits) {
	sync = new NonfairSync(permits);
}
/**
 * 可以选择公平或非公平机制(公平信号量or非公平信号量)
 */
public Semaphore(int permits, boolean fair) {
	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供了两个带参构造器,没有提供无参构造器。这两个构造器都必须传入一个初始的信号量许可数,默认是采用非公平方式获取信号量,但也可以通过参数指定获取公平信号量还是非公平信号量。

三、公平信号量

3.1、公平信号量的获取

Semaphore中的公平信号量是FairSync。它的获取API源码如下:

/**
 * 默认获取1个信号量许可
 */
public void acquire() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}
/**
 * 获取指定的permits个信号量许可
 */
public void acquire(int permits) throws InterruptedException {
	if (permits < 0) throw new IllegalArgumentException();
	sync.acquireSharedInterruptibly(permits);
}

获取信号量的方法实际上是调用AQS中的acquireSharedInterruptibly()。acquireSharedInterruptibly()的源码在分析AQS共享模式的时候已经讲过了,这里做个回顾。

acquireSharedInterruptibly()的源码如下:

public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException {
	//首先判断线程是否被中断, 如果被中断则抛出异常	
	if (Thread.interrupted())
		throw new InterruptedException();
	//1.尝试获取锁,如果获取锁成功,流程结束
	if (tryAcquireShared(arg) < 0)
		 //2.如果获取失败需要将当前线程封装成Node节点放到等待队列的末尾并按照条件判断是否挂起,等待被唤醒
		doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly方法首先就是去调用tryAcquireShared方法去尝试获取,tryAcquireShared在AQS里面是抽象方法,需要被子类重写,FairSync实现的是公平逻辑,而NonfairSync实现的是非公平逻辑,下面来看下公平逻辑下的tryAcquireShared()方法实现:

protected int tryAcquireShared(int acquires) {
	//自旋,成功才会退出
	for (;;) {
		//首先判断当前线程是不是等待队列中的第一个线程
		if (hasQueuedPredecessors())
			//如果不是则返回-1,表示尝试获取失败
			return -1;
		//获取可以获得的信号量许可数	
		int available = getState();
		//获取acquires个信号量许可之后剩余的信号量许可数
		int remaining = available - acquires;
		//如果remaining小于0,则直接返回remaining
		//如果remaining大于等于0,则先更新同步状态再返回remaining
		if (remaining < 0 ||
			compareAndSetState(available, remaining))
			//返回剩余信号量许可数
			return remaining;
	}
}

tryAcquireShared()的作用是尝试获取acquires个信号量许可数。对于Semaphore而言,state表示的是就是当前可获得的信号量许可数。返回负数表示获取失败,零表示当前线程获取成功但后续线程不能再获取,正数表示当前线程获取成功并且后续线程也能够获取。

所以如果返回的remaining小于0就代表获取失败,因此acquireSharedInterruptibly(arg)方法中的tryAcquireShared(arg) < 0就成立了,所以接下来就会调用doAcquireSharedInterruptibly方法,这个方法在讲AQS的共享模式源码的时候有说过,它会将当前线程包装成节点添加到同步队列的尾部,并且有可能挂起线程,这也是当remaining小于0时线程会排队阻塞的原因。

而如果返回的remaining >= 0就代表当前线程获取成功,因此 tryAcquireShared(arg) < 0就不成立,所以就不会再调用doAcquireSharedInterruptibly方法阻塞当前线程了。

3.2、公平信号量的释放

Semaphore中公平信号量(FairSync)的释放源码如下:

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

信号量的releases()释放函数,实际上是调用的AQS中的releaseShared()。releaseShared()在AQS中实现,源码如下:

public final boolean releaseShared(int arg) {
	//尝试释放锁,该方法需要AQS的子类去实现
	if (tryReleaseShared(arg)) {
		//如果释放成功就进入此方法唤醒后继线程
		doReleaseShared();
		return true;
	}
	return false;
}

releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

Semaphore重写了tryReleaseShared(),它的源码如下:

protected final boolean tryReleaseShared(int releases) {
	for (;;) {
		//获取可以获得的信号量许可数
		int current = getState();
		//释放releases个信号量许可之后,剩余的信号量许可数
		int next = current + releases;
		//如果相加结果小于当前可以获得的信号量许可数的话就报错
		if (next < current) // overflow
			throw new Error("Maximum permit count exceeded");
		//以CAS方式更新同步状态的值, 更新成功则返回true, 否则继续循环	
		if (compareAndSetState(current, next))
			return true;
	}
}

如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:

private void doReleaseShared() {
	//自旋
	for (;;) {
		//获取同步队列的头节点
		Node h = head;
		//判断头节点不为空且同步队列中不止一个节点
		if (h != null && h != tail) {
			//获取头节点的等待状态
			int ws = h.waitStatus;
			//如果头节点的等待状态为SIGNAL,表示有后继节点需要被唤醒
			if (ws == Node.SIGNAL) {
				//CAS方式更新头节点的状态为0
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				//唤醒后继节点	
				unparkSuccessor(h);
			}
			//如果头节点的状态为0表示后面没有节点在排队,就CAS方式更新头节点的状态为PROPAGATE,告诉后来的节点可以直接获取锁
			else if (ws == 0 &&
					 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                
		}
		//只有保证在此期间头节点没有被修改过才能跳出循环
		//可能其他线程获取了锁把head改变了,为了使唤醒的动作传递,必须重试
		if (h == head)                   
			break;
	}
}

doReleaseShared()会释放共享锁,它会从前往后遍历同步队列,依次唤醒然后执行队列中每个节点对应的线程,最终是让这些线程释放它们持有的信号量。

四、非公平信号量

Semaphore中的非公平信号量是NonFairSync,非公平信号量的释放与公平释放量的释放是一样的,不同的是获取信号量的机制不同,主要是tryAcquireShared()实现不同。

非公平信号量的tryAcquireShared()实现如下:

protected int tryAcquireShared(int acquires) {
	return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared()的源码如下:

final int nonfairTryAcquireShared(int acquires) {
	for (;;) {
		//获取可以获得的信号量的许可数
		int available = getState();
		//获得acquires个信号量许可之后,剩余的信号量许可数
		int remaining = available - acquires;
		//如果remaining小于0,则直接返回remaining
		//如果remaining大于等于0,则先更新同步状态再返回remaining
		if (remaining < 0 ||
			compareAndSetState(available, remaining))
			return remaining;
	}
}

非公平信号量的tryAcquireShared()方法在获取信号量的时候不需要判断当前线程是否是同步队列的头节点,而是直接获取。

公平信号量的tryAcquireShared()方法需要先判断当前线程是否是同步队列的头节点,如果不是,则返回-1,表示尝试获取失败。

五、示例

下面看个Semaphore使用的示例:?

public class SemaphoreTest {
    private static final int SEM_MAX = 10;
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(SEM_MAX);
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        //在线程池中执行任务
        threadPool.execute(new MyThread(sem, 5));
        threadPool.execute(new MyThread(sem, 4));
        threadPool.execute(new MyThread(sem, 7));
        //关闭线程池
        threadPool.shutdown();
    }
    static class MyThread extends Thread {
        /**
         * 信号量
         */
        private volatile Semaphore sem;
        /**
         * 申请信号量大小
         */
        private int count;
        public MyThread(Semaphore sem, int count) {
            this.sem = sem;
            this.count = count;
        }
        @Override
        public void run() {
            try {
                //从信号量中获取count个许可
                sem.acquire(count);
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " 获取信号量许可数 = " + count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放给定数目的许可
                sem.release(count);
                System.out.println(Thread.currentThread().getName() + " 释放信号量许可数 = " + count);
            }
        }
    }
}

执行结果如下:

结果说明:

信号量sem的许可总数是10个,一共3个线程,分别需要获取的信号量许可数是5、4、7。前面两个线程获取到信号量的许可后,sem中剩余的可用的许可数是1,因此,最后一个线程必须等前两个线程释放了它们所持有的信号量许可之后,才能获取到7个信号量许可。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表