网站首页 > 精选教程 正文
背景
线程池是一种基于池化思想管理线程的工具,使用线程池可以减少创建销毁线程的开销,避免线程过多导致系统资源耗尽。在高并发的任务处理场景,线程池的使用是必不可少的。在双11主图价格表达项目中为了提升处理性能,很多地方使用到了线程池。随着线程池的使用,逐渐发现一个问题,线程池的参数如何设置?
线程池参数中有三个比较关键的参数,分别是corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、workQueueSzie(工作队列大小)。根据任务的类型可以区分为IO密集型和CPU密集型,对于CPU密集型,一般经验是设置corePoolSize=CPU核数+1,对于IO密集型需要根据具体的RT和流量来设置,没有普适的经验值。然而,我们一般遇到的情况多数是处理IO密集型任务,如果线程池参数不可动态调节,就没办法根据实际情况实时调整处理速度,只能通过发布代码调整参数。
如果线程池参数不合理会导致什么问题呢?下面列举几种可能出现的场景:
- 最大线程数设置偏小,工作队列大小设置偏小,导致服务接口大量抛出RejectedExecutionException。
- 最大线程数设置偏小,工作队列大小设置过大,任务堆积过度,接口响应时长变长。
- 最大线程数设置过大,线程调度开销增大,处理速度反而下降。
- 核心线程数设置过小,流量突增时需要先创建线程,导致响应时长过大。
- 核心线程数设置过大,空闲线程太多,占用系统资源。
线程池任务调度机制
要明白线程池参数对运行时的影响,就必须理解其中的原理,所以下面先简单总结了线程池的核心原理。
Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:
所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:
动态调节线程池参数实现
线程池相关的重要参数有三个,分别是核心线程数、最大线程数和工作队列大小,接下来将阐述如何实现动态调节线程池参数。
调节核心和最大线程数的原理
ThreadPoolExecutor已经提供了两个方法在运行时设置核心线程数和最大线程数,分别是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。
setCorePoolSize方法的执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:
setMaximumPoolSize方法执行流程是:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。
调节工作队列大小的原理
线程池中是以生产者消费者模式,通过一个阻塞队列来缓存任务,工作线程从阻塞队列中获取任务。工作队列的接口是阻塞队列(BlockingQueue),在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用。
目前JDK提供了以下阻塞队列的实现:
但是很不幸,这些阻塞队列的实现都不支持动态调整大小,那么为什么不自己实现一个可动态调整大小的阻塞队列呢。重复造轮子是不可取的,所以我选择改造轮子。LinkedBlockingQueue是比较常用的一个阻塞队列,它无法修改大小的原因是capacity字段设置成了final private final int capacity;。如果我把final去掉,并提供修改capacity的方法,是不是就满足我们的需求呢?事实证明是可行的,文章末尾上传了ResizeLinkedBlockingQueue的实现。
结合Diamond进行实现
Diamond可以管理我们的配置,如果可以通过Diamond实现线程池参数管理那就再好不过了。接下来就开始上代码了,首先实现一个Diamond配置管理类DispatchConfig,然后,实现一个线程池管理的工厂方法StreamExecutorFactory。
DispatchConfig类是一个静态类,在初始化的时候获取了对应Diamond的内容并设置了监听,使用的时候只需要DispatchConfig.getConfig().getCorePoolSize()。
/**
* @author moda
*/
@Slf4j
@Data
public class DispatchConfig {
public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";
public static final String GROUP_ID = "mkt-turbo";
private static DispatchConfig config;
static {
try {
String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
config = JSON.parseObject(content, DispatchConfig.class);
Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
config = JSON.parseObject(content, DispatchConfig.class);
} catch (Throwable t) {
log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
}
}
});
} catch (Exception e) {
log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
}
}
public static DispatchConfig getConfig() {
return config;
}
private int corePoolSize = 10;
private int maximumPoolSize = 30;
private int workQueueSize = 1024;
/**
* 商品分批处理每批大小
*/
private int itemBatchProcessPageSize = 200;
}
StreamExecutorFactory是一个静态类,维护了一个静态属性executor,并通过initExecutor()进行初始化。在初始化的时候,工作队列使用了可调节大小的阻塞队列ResizeLinkedBlockingQueue,并设置了监听Diamond变更。Diamond发生变更的时候通过在callback中对比值是否发生改变,如果发生改变则调整workQueueSize、corePoolSize、maximumPoolSize。使用的时候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能动态修改线程池参数。
/**
* @author moda
*/
@Slf4j
public class StreamExecutorFactory {
private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";
private static ThreadPoolExecutor executor = initExecutor();
private static ThreadPoolExecutor initExecutor() {
ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
//拒绝策略,调用者线程处理
RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
r.run();
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
DispatchConfig.getConfig().getCorePoolSize(),
DispatchConfig.getConfig().getMaximumPoolSize(),
10,
TimeUnit.SECONDS,
workQueue,
nameThreadFactory,
rejectedExecutionHandler
);
Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);
if (workQueue.getCapacity() != config.getWorkQueueSize()) {
workQueue.setCapacity(config.getWorkQueueSize());
}
if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {
threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
}
if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {
threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());
}
} catch (Throwable t) {
log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);
}
}
});
return threadPoolExecutor;
}
public static Executor getExecutor() {
return executor;
}
}
本文为阿里云原创内容,未经允许不得转载。
猜你喜欢
- 2024-11-18 Java并发编程线程状态转换
- 2024-11-18 Java面试必考问题:线程的生命周期
- 2024-11-18 Java 19 虚拟线程的状态变化,停驻与锁定
- 2024-11-18 Java线程生命周期详解?
- 2024-11-18 线程从创建最终消亡,要经历的若干状态,你了解其中的多少?
- 2024-11-18 java线程状态之WAITING(等待)
- 2024-11-18 浅谈Java线程:线程基础知识扫盲
- 2024-11-18 Java线程在各个状态下调用start方法会发生什么事情?
- 2024-11-18 为什么Java中线程没有Running状态
- 2024-11-18 阻塞模型将会使线程休眠,为什么 Java 线程状态却是 RUNNABLE?
你 发表评论:
欢迎- 04-11Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- 04-11Java中你知道几种从字符串中找指定的字符的数量
- 04-11探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- 04-11Python字符串详解与示例(python字符串的常见操作)
- 04-11java正则-取出指定字符串之间的内容
- 04-11String s1 = new String("abc");这句话创建了几个字符串对象?
- 04-11java判断字符串中是否包含某个字符
- 04-11关于java开发中正确的发牌逻辑编写规范
- 最近发表
-
- Java面试“字符串三兄弟”String、StringBuilder、StringBuffer
- Java中你知道几种从字符串中找指定的字符的数量
- 探秘Java面试中问的最多的String、StringBuffer、StringBuilder
- Python字符串详解与示例(python字符串的常见操作)
- java正则-取出指定字符串之间的内容
- String s1 = new String("abc");这句话创建了几个字符串对象?
- java判断字符串中是否包含某个字符
- 关于java开发中正确的发牌逻辑编写规范
- windows、linux如何后台运行jar(并且显示进程名)
- 腾讯大佬私人收藏,GitHub上最受欢迎的100个JAVA库,值得学习
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)