12小节:阻塞队列容量热更新策略下的“坑”
作者:程序员马丁
热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。
阻塞队列容量热更新策略下的“坑”,元数据信息:
- 什么是线程池oneThread:https://t.zsxq.com/5GfrN
- 代码仓库:https://gitcode.net/nageoffer/onethread —— 申请项目权限参考上述线程池项目链接
- 章节难度:★★★☆☆ - 较难
- 视频地址:文档先行视频次之
©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》和《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。
内容摘要:本篇文章围绕阻塞队列容量动态调整的可行性展开,结合 Java 原生 LinkedBlockingQueue
存在的反射修改问题,深入剖析了其底层设计的局限性,并介绍了 RabbitMQ 的解决方案 —— VariableLinkedBlockingQueue
。
课程目录如下所示:
- 前 言
- 反射方案的问题
- RabbitMQ 如何解决热更新问题
- 文末总结
前言
阻塞队列动态更新不止是咱们需要,RabbitMQ 同样需要。在实际运行中,RabbitMQ 的工作线程池会处理来自大量客户端的请求,这些操作的压力往往具有明显的波峰波谷特征,例如在早晚高峰、电商秒杀等期间,通过动态扩容任务队列,防止因短期流量冲击造成系统雪崩。RabbitMQ 运行过程中,会监控如线程池活跃线程数、队列使用情况、任务阻塞率等指标,达到设置阈值自动进行扩容。
RabbitMQ 我没用过不太熟,客户端通过自动扩容这里大家有兴趣可以具体看看。
RabbitMQ 团队在面临同样需求时,选择了一个更优雅的做法:直接复制并修改 LinkedBlockingQueue
源码,做成可变容量版本。
本章节我们先通过讲解反射带来的问题,然后再看 RabbitMQ 怎么做的。
反射方案的问题
虽然我们可以通过反射手段绕过 final
限制,修改 LinkedBlockingQueue
中的 capacity
字段,但仅仅修改这个字段值并不能真正实现预期行为。我们来看两个非常典型的问题场景:
1. 队列已满修改容量无效
当队列已满,调用线程正阻塞在 put()
方法上等待空位:
while (count.get() == capacity) {
notFull.await(); // 阻塞在这里
}
此时,我们通过反射动态将 capacity
从 10 修改为 100,期望 unblock 等待线程。但线程仍然阻塞在 notFull.await()
,无法及时感知容量变化!
下方截图取自 LinkedBlockingQueue#put
方法:
capacity
虽然被改大了,但线程已经卡在了 await()
上,如果没有人手动调用 signalNotFull()
,它永远不会被唤醒。
⚠️ 注意:反射只改字段,不会自动触发条件变量通知(Condition#signal)!
以下是一个测试用例,我放在了 core
模块 test 目录下,展示通过反射修改 LinkedBlockingQueue 容量的尝试及其问题:
public class ResizableCapacityLinkedBlockingQueueV1Test {
public static void main(String[] args) throws Exception {
ResizableCapacityLinkedBlockingQueueV1<String> queue = new ResizableCapacityLinkedBlockingQueueV1<>(2);
// 填充队列至满
queue.put("Element 1");
System.out.println("入队列成功,当前大小:" + queue.size());
queue.put("Element 2");
System.out.println("入队列成功,当前大小:" + queue.size());
// 尝试添加第三个元素,预期阻塞
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
System.out.println("尝试添加 Element 3,队列已满,线程将被阻塞");
queue.put("Element 3");
System.out.println("成功添加 Element 3,队列大小:" + queue.size());
} catch (InterruptedException e) {
System.out.println("添加 Element 3 失败");
}
});
// 等待 2 秒,确保线程阻塞
TimeUnit.SECONDS.sleep(2);
// 通过反射修改容量
try {
queue.setCapacity(3);
System.out.println("通过反射修改容量为:3");
} catch (Exception e) {
System.out.println("反射修改容量失败:" + e.getMessage());
}
// 等待 2 秒,观察是否成功添加
TimeUnit.SECONDS.sleep(2);
executor.shutdownNow();
}
}
⚠️ 注意:如果想在 IDEA 里跑这个单元测试,需要在 IDEA 单元测试中设置 VM 参数:
--add-opens java.base/java.util.concurrent=ALL-UNNAMED
我们期望的日志输出如下: