网站首页 > 精选教程 正文
一、前言
Semaphore(信号量)是J.U.C包中比较常用到的一个类,是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效控制并发数,实现流量控制。
Semaphore提供了一个许可的概念,可以把这个许可看做是车票,只有成功买到车票的人才能够上车,并且车票是有数量限制的,否则会导致车辆超载。所以当车票卖完的时候其他人只能等下一趟车,如果中途有人下车,那么他的位置将空闲出来,如果此时有人想上车就又可以买到车票了。
下面就来深入了解下Semaphore的知识。
二、构造器
/**
* 默认使用非公平机制(非公平信号量)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 可以选择公平或非公平机制(公平信号量or非公平信号量)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore提供了两个带参构造器,没有提供无参构造器。这两个构造器都必须传入一个初始的信号量许可数,默认是采用非公平方式获取信号量,但也可以通过参数指定获取公平信号量还是非公平信号量。
三、公平信号量
3.1、公平信号量的获取
Semaphore中的公平信号量是FairSync。它的获取API源码如下:
/**
* 默认获取1个信号量许可
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 获取指定的permits个信号量许可
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
获取信号量的方法实际上是调用AQS中的acquireSharedInterruptibly()。acquireSharedInterruptibly()的源码在分析AQS共享模式的时候已经讲过了,这里做个回顾。
acquireSharedInterruptibly()的源码如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//首先判断线程是否被中断, 如果被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//1.尝试获取锁,如果获取锁成功,流程结束
if (tryAcquireShared(arg) < 0)
//2.如果获取失败需要将当前线程封装成Node节点放到等待队列的末尾并按照条件判断是否挂起,等待被唤醒
doAcquireSharedInterruptibly(arg);
}
acquireSharedInterruptibly方法首先就是去调用tryAcquireShared方法去尝试获取,tryAcquireShared在AQS里面是抽象方法,需要被子类重写,FairSync实现的是公平逻辑,而NonfairSync实现的是非公平逻辑,下面来看下公平逻辑下的tryAcquireShared()方法实现:
protected int tryAcquireShared(int acquires) {
//自旋,成功才会退出
for (;;) {
//首先判断当前线程是不是等待队列中的第一个线程
if (hasQueuedPredecessors())
//如果不是则返回-1,表示尝试获取失败
return -1;
//获取可以获得的信号量许可数
int available = getState();
//获取acquires个信号量许可之后剩余的信号量许可数
int remaining = available - acquires;
//如果remaining小于0,则直接返回remaining
//如果remaining大于等于0,则先更新同步状态再返回remaining
if (remaining < 0 ||
compareAndSetState(available, remaining))
//返回剩余信号量许可数
return remaining;
}
}
tryAcquireShared()的作用是尝试获取acquires个信号量许可数。对于Semaphore而言,state表示的是就是当前可获得的信号量许可数。返回负数表示获取失败,零表示当前线程获取成功但后续线程不能再获取,正数表示当前线程获取成功并且后续线程也能够获取。
所以如果返回的remaining小于0就代表获取失败,因此acquireSharedInterruptibly(arg)方法中的tryAcquireShared(arg) < 0就成立了,所以接下来就会调用doAcquireSharedInterruptibly方法,这个方法在讲AQS的共享模式源码的时候有说过,它会将当前线程包装成节点添加到同步队列的尾部,并且有可能挂起线程,这也是当remaining小于0时线程会排队阻塞的原因。
而如果返回的remaining >= 0就代表当前线程获取成功,因此 tryAcquireShared(arg) < 0就不成立,所以就不会再调用doAcquireSharedInterruptibly方法阻塞当前线程了。
3.2、公平信号量的释放
Semaphore中公平信号量(FairSync)的释放源码如下:
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
信号量的releases()释放函数,实际上是调用的AQS中的releaseShared()。releaseShared()在AQS中实现,源码如下:
public final boolean releaseShared(int arg) {
//尝试释放锁,该方法需要AQS的子类去实现
if (tryReleaseShared(arg)) {
//如果释放成功就进入此方法唤醒后继线程
doReleaseShared();
return true;
}
return false;
}
releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
Semaphore重写了tryReleaseShared(),它的源码如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取可以获得的信号量许可数
int current = getState();
//释放releases个信号量许可之后,剩余的信号量许可数
int next = current + releases;
//如果相加结果小于当前可以获得的信号量许可数的话就报错
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//以CAS方式更新同步状态的值, 更新成功则返回true, 否则继续循环
if (compareAndSetState(current, next))
return true;
}
}
如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:
private void doReleaseShared() {
//自旋
for (;;) {
//获取同步队列的头节点
Node h = head;
//判断头节点不为空且同步队列中不止一个节点
if (h != null && h != tail) {
//获取头节点的等待状态
int ws = h.waitStatus;
//如果头节点的等待状态为SIGNAL,表示有后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//CAS方式更新头节点的状态为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
//如果头节点的状态为0表示后面没有节点在排队,就CAS方式更新头节点的状态为PROPAGATE,告诉后来的节点可以直接获取锁
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//只有保证在此期间头节点没有被修改过才能跳出循环
//可能其他线程获取了锁把head改变了,为了使唤醒的动作传递,必须重试
if (h == head)
break;
}
}
doReleaseShared()会释放共享锁,它会从前往后遍历同步队列,依次唤醒然后执行队列中每个节点对应的线程,最终是让这些线程释放它们持有的信号量。
四、非公平信号量
Semaphore中的非公平信号量是NonFairSync,非公平信号量的释放与公平释放量的释放是一样的,不同的是获取信号量的机制不同,主要是tryAcquireShared()实现不同。
非公平信号量的tryAcquireShared()实现如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
nonfairTryAcquireShared()的源码如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取可以获得的信号量的许可数
int available = getState();
//获得acquires个信号量许可之后,剩余的信号量许可数
int remaining = available - acquires;
//如果remaining小于0,则直接返回remaining
//如果remaining大于等于0,则先更新同步状态再返回remaining
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平信号量的tryAcquireShared()方法在获取信号量的时候不需要判断当前线程是否是同步队列的头节点,而是直接获取。
公平信号量的tryAcquireShared()方法需要先判断当前线程是否是同步队列的头节点,如果不是,则返回-1,表示尝试获取失败。
五、示例
下面看个Semaphore使用的示例:?
public class SemaphoreTest {
private static final int SEM_MAX = 10;
public static void main(String[] args) {
Semaphore sem = new Semaphore(SEM_MAX);
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//在线程池中执行任务
threadPool.execute(new MyThread(sem, 5));
threadPool.execute(new MyThread(sem, 4));
threadPool.execute(new MyThread(sem, 7));
//关闭线程池
threadPool.shutdown();
}
static class MyThread extends Thread {
/**
* 信号量
*/
private volatile Semaphore sem;
/**
* 申请信号量大小
*/
private int count;
public MyThread(Semaphore sem, int count) {
this.sem = sem;
this.count = count;
}
@Override
public void run() {
try {
//从信号量中获取count个许可
sem.acquire(count);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 获取信号量许可数 = " + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放给定数目的许可
sem.release(count);
System.out.println(Thread.currentThread().getName() + " 释放信号量许可数 = " + count);
}
}
}
}
执行结果如下:
结果说明:
信号量sem的许可总数是10个,一共3个线程,分别需要获取的信号量许可数是5、4、7。前面两个线程获取到信号量的许可后,sem中剩余的可用的许可数是1,因此,最后一个线程必须等前两个线程释放了它们所持有的信号量许可之后,才能获取到7个信号量许可。
猜你喜欢
- 2024-11-06 信号量限流,高并发场景不得不说的秘密
- 2024-11-06 面试卡在多线程?那就分享几道Java多线程高频面试题,面试不用愁
- 2024-11-06 Java并发系列之Semaphore源码分析
- 2024-11-06 Java多线程与并发 java的并发,多线程,线程模型
- 2024-11-06 66.java并发编程之Semaphore和CountDownLatch使用
- 2024-11-06 Java基础笔试练习(十五) java基础知识试题
- 2024-11-06 72道Java线程面试题,这些面试官必问
- 2024-11-06 Java并发基础-锁详细分析(可重入锁、读写锁、信号量等)
- 2024-11-06 Java并发工具:CountDownLatch CyclicBarrier Semaphore快速掌握
- 2024-11-06 死磕 java同步系列之Semaphore源码解析
你 发表评论:
欢迎- 04-11Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- 04-11Java中你知道几种从字符串中找指定的字符的数量
- 04-11探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- 04-11Python字符串详解与示例(python字符串的常见操作)
- 04-11java正则-取出指定字符串之间的内容
- 04-11String s1 = new String("abc");这句话创建了几个字符串对象?
- 04-11java判断字符串中是否包含某个字符
- 04-11关于java开发中正确的发牌逻辑编写规范
- 最近发表
-
- Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- Java中你知道几种从字符串中找指定的字符的数量
- 探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- Python字符串详解与示例(python字符串的常见操作)
- java正则-取出指定字符串之间的内容
- String s1 = new String("abc");这句话创建了几个字符串对象?
- java判断字符串中是否包含某个字符
- 关于java开发中正确的发牌逻辑编写规范
- windows、linux如何后台运行jar(并且显示进程名)
- 腾讯大佬私人收藏,GitHub上最受欢迎的100个JAVA库,值得学习
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)