10小节:如何实现线程池参数的并发安全刷新?
作者:程序员马丁
热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。
如何实现线程池参数的并发安全刷新?元数据信息:
- 什么是线程池oneThread:https://t.zsxq.com/5GfrN
- 代码仓库:https://gitcode.net/nageoffer/onethread —— 申请项目权限参考上述线程池项目链接
- 章节难度:★★☆☆☆ - 中等
- 视频地址:本章节内容简单,无
©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》和《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。
内容摘要:本文聚焦 oneThread 动态线程池的参数刷新机制,详细拆解“检测差异 → 热更新参数 → 更新元数据 → 消息通知与审计日志”全链路实现。通过逐行解析关键代码,帮助读者理解如何在运行期安全、无感知地将远程配置同步到线程池。
课程目录 如下所示:
- 业务时序
- 配置前置验证
- 线程池配置变更
- 如何保障线程池配置刷新的并发安全?
- 文末总结
在上一章节,咱们已经实现了从配置中心获取最新的变更内容,并将对应的字符串序列化为 Java 对象供后续流程使用。接下来会从最新配置和内存中的线程池配置进行比较,如果有变化则更新,没有变化则跳过。
业务时序
动态线程池的核心能力之一,就是运行时可以自动感知配置变化并热更新,无需重启服务。为实现这一能力,我们需要:
- 获取远程最新线程池配置;
- 对比当前内存中已有的线程池配置;
- 如果检测到配置发生变更,则执行更新;
- 存储最新配置,方便下次配置更新时比对;
- 通知各方配置已变更,并打印变更日志。
配置前置验证
1. 判断是否有线程池配置
代码首先使用检查远程配置是否包含线程池设置。如果为空,则直接返回,跳过后续处理。这确保了只有当有有效的线程池配置时才会继续执行。
if (CollUtil.isEmpty(refresherProperties.getExecutors())) {
return;
}
2. 判断线程池配置是否发生变化
我们逐个遍历线程池 ID,并通过 hasThreadPoolConfigChanged()
进行对比。该方法会检查核心线程数、最大线程数、拒绝策略、队列容量等多个关键参数是否发生了变化。
for (ThreadPoolExecutorProperties remoteProperties : refresherProperties.getExecutors()) {
boolean changed = hasThreadPoolConfigChanged(remoteProperties);
if (!changed) {
continue;
}
...
}
根据线程池 ID 获取注册中心中的 ThreadPoolExecutorHolder
。如果未找到线程池实例,记录警告日志并返回 false
。若找到实例,调用 hasDifference
对比新旧配置。
private boolean hasThreadPoolConfigChanged(ThreadPoolExecutorProperties remoteProperties) {
String threadPoolId = remoteProperties.getThreadPoolId();
ThreadPoolExecutorHolder holder = OneThreadRegistry.getHolder(threadPoolId);
if (holder == null) {
log.warn("No thread pool found for thread pool id: {}", threadPoolId);
return false;
}
ThreadPoolExecutor executor = holder.getExecutor();
ThreadPoolExecutorProperties originalProperties = holder.getExecutorProperties();
return hasDifference(originalProperties, remoteProperties, executor);
}
该方 法逐字段比较关键属性是否发生变化,判断逻辑为:
- 只要有一个字段不相等,则认为配置已变更。
- 字段包括:核心线程数、最大线程数、是否允许超时、存活时间、拒绝策略、队列容量等。
private boolean hasDifference(ThreadPoolExecutorProperties original,
ThreadPoolExecutorProperties remote,
ThreadPoolExecutor executor) {
return isChanged(original.getCorePoolSize(), remote.getCorePoolSize())
|| isChanged(original.getMaximumPoolSize(), remote.getMaximumPoolSize())
|| isChanged(original.getAllowCoreThreadTimeOut(), remote.getAllowCoreThreadTimeOut())
|| isChanged(original.getKeepAliveTime(), remote.getKeepAliveTime())
|| isChanged(original.getRejectedHandler(), remote.getRejectedHandler())
|| isQueueCapacityChanged(original, remote, executor);
}
isChanged
是一个辅助方法,用于比较两个值是否不同:
- 忽略空值,仅对非 null 字段进行判断;
- 使用
Objects.equals()
保证泛型类型安全比较。
private <T> boolean isChanged(T before, T after) {
return after != null && !Objects.equals(before, after);
}
网上也有一些专业做 Java 对象比对是否一致的框架:dadiyang/equator,因为我懒得去看里面的 API,所以就自己进行的判断。如果有复杂对象且字段多的,推荐使用现成工具类。
对于队列容量,有一个特殊的检查 isQueueCapacityChanged
,因为不是所有队列都支持动态调整容量。队列容量仅支持我们自定义的 ResizableCapacityLinkedBlockingQueue
,否则不可动态调整。
private boolean isQueueCapacityChanged(...){
Integer remoteCapacity = remoteProperties.getQueueCapacity();
Integer originalCapacity = originalProperties.getQueueCapacity();
BlockingQueue<?> queue = executor.getQueue();
return remoteCapacity != null
&& !Objects.equals(remoteCapacity, originalCapacity)
&& Objects.equals("ResizableCapacityLinkedBlockingQueue", queue.getClass().getSimpleName());
}
只有远程配置中的队列容量不为空,并且与当前内存中的配置不一致,同时当前线程池使用的是支持动态扩容的队列(如ResizableCapacityLinkedBlockingQueue
),才认为容量确实发生了变化。
默认的 ArrayBlockingQueue 和 LinkedBlockingQueue 为什么不能改变容量,下一章节会详细说明。
线程池配置变更
如果检测到变化,调用线程池刷新方法应用新的配置:
1. 核心 / 最大线程数
线程数更新的原则是:先最大后核心。如果新核心线程数大于当前最大线程数,必须先调大 maximumPoolSize
,否则 JDK 会抛 IllegalArgumentException
。
Integer remoteCorePoolSize = remoteProperties.getCorePoolSize();
Integer remoteMaximumPoolSize = remoteProperties.getMaximumPoolSize();
if (remoteCorePoolSize != null && remoteMaximumPoolSize != null) {
int originalMaximumPoolSize = executor.getMaximumPoolSize();
if (remoteCorePoolSize > originalMaximumPoolSize) {
executor.setMaximumPoolSize(remoteMaximumPoolSize);
executor.setCorePoolSize(remoteCorePoolSize);
} else {
executor.setCorePoolSize(remoteCorePoolSize);
executor.setMaximumPoolSize(remoteMaximumPoolSize);
}
} else {
if (remoteMaximumPoolSize != null) {
executor.setMaximumPoolSize(remoteMaximumPoolSize);
}
if (remoteCorePoolSize != null) {
executor.setCorePoolSize(remoteCorePoolSize);
}
}
之所以会抛出异常,是因为 JDK17 线程池底层在设置核心线程数时做了参数限制校验。
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
有些同学可能打开自己项目一看,发现代码里并没有相关校验逻辑,这是因为 JDK8 并未引入这段线程数的校验机制。我在开发 Hippo4j 的过程中也曾踩过这个坑,感同身受。
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
具体可以参考这个 Issue 👉 Hippo4j Issue#1063,里面有详细的分析过程和复现场景。
2. 拒绝策略
借助枚举工厂方法把字符串策略名转换为真正的 RejectedExecutionHandler
实例,保证可插拔。
if (remoteProperties.getRejectedHandler() != null &&
!Objects.equals(remoteProperties.getRejectedHandler(), originalProperties.getRejectedHandler())) {
RejectedExecutionHandler handler = RejectedPolicyTypeEnum.createPolicy(remoteProperties.getRejectedHandler());
executor.setRejectedExecutionHandler(handler);
}
3. 队列容量动态扩容
只有当队列实例实现了可扩容接口时才可以修改容量,避免 LinkedBlockingQueue
等 JDK 原生队列不支持容量变化导致的风险。
if (isQueueCapacityChanged(originalProperties, remoteProperties, executor)) {
BlockingQueue<Runnable> queue = executor.getQueue();
ResizableCapacityLinkedBlockingQueue<?> resizableQueue = (ResizableCapacityLinkedBlockingQueue<?>) queue;
resizableQueue.setCapacity(remoteProperties.getQueueCapacity());
}
4. 刷新元数据、发送通知、打印审计日志
线程池运行时参数变更后,还有一些后置逻辑需要处理:
- 把最新配置写回注册表,保证后续再读取时就是新的值;
- 然后会把“旧值 → 新值”的映射封装成 DTO,通过钉钉、企业微信、邮件等渠道推送给开发 / 运维,做到即时可见。
- 为实现日志留痕,会通过
log.info
统一打印所有关键字段的 “旧值 -> 新值”。
代码如下所示:
// 线程池参数变更后进行日志打印
String threadPoolId = remoteProperties.getThreadPoolId();
ThreadPoolExecutorHolder holder = OneThreadRegistry.getHolder(threadPoolId);
ThreadPoolExecutorProperties originalProperties = holder.getExecutorProperties();
holder.setExecutorProperties(remoteProperties);
// 发送线程池配置变更消息通知
sendThreadPoolConfigChangeMessage(originalProperties, remoteProperties);
// 打印线程池配置变更日志
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId,
String.format(CHANGE_DELIMITER, originalProperties.getCorePoolSize(), remoteProperties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, originalProperties.getMaximumPoolSize(), remoteProperties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, originalProperties.getQueueCapacity(), remoteProperties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, originalProperties.getKeepAliveTime(), remoteProperties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, originalProperties.getRejectedHandler(), remoteProperties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, originalProperties.getAllowCoreThreadTimeOut(), remoteProperties.getAllowCoreThreadTimeOut()));
如何保障线程池配置刷新的并发安全?
在动态线程池的配置刷新过程中,我们需要支持多个线程同时触发配置变更(比如配置中心推送受网络影响重复推送、多用户手动调用重复、定时校验等),但必须保证同一个线程池的参数刷新是串行、原子、安全的。
否则就可能导致:
- 参数错乱:两个线程同时修改 corePoolSize 和 maximumPoolSize,最终值不可预期;
- 日志混乱:原始值和新值打印错位;
- 队列扩容失败:并发修改
ResizableCapacityLinkedBlockingQueue
会抛异常; - 不一致性:通知发送和实际线程池状态不一致。
在处理并发安全问题时,我考虑了两种方案:
- 对整个方法加锁:实现最简单,能快速解决线程安全问题;
- 仅对指定的线程池 ID 加锁:粒度更细,性能更优。
虽然第一种方式实现起来最省事,但独 属于程序员的“强迫症”让我不太能接受这种处理方式。因此,最终我选择了第二种方案,对线程池 ID 维度加锁。
1. 使用 synchronized (id)
代码示例:
synchronized (threadPoolId) {
// do refresh
}
存在的问题:
- 如果
threadPoolId
是从对象字段获取的(例如.getThreadPoolId()
),多个对象即使返回相同内容,也可能是不同的 String 实例。 - JVM 会对不同的引用分配不同的锁 → 锁不生效,并发冲突依然会发生。