Skip to main content

12小节:阻塞队列容量热更新策略下的“坑”

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

阻塞队列容量热更新策略下的“坑”,元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:本篇文章围绕阻塞队列容量动态调整的可行性展开,结合 Java 原生 LinkedBlockingQueue 存在的反射修改问题,深入剖析了其底层设计的局限性,并介绍了 RabbitMQ 的解决方案 —— VariableLinkedBlockingQueue

课程目录如下所示:

  • 前言
  • 反射方案的问题
  • RabbitMQ 如何解决热更新问题
  • 文末总结

前言

阻塞队列动态更新不止是咱们需要,RabbitMQ 同样需要。在实际运行中,RabbitMQ 的工作线程池会处理来自大量客户端的请求,这些操作的压力往往具有明显的波峰波谷特征,例如在早晚高峰、电商秒杀等期间,通过动态扩容任务队列,防止因短期流量冲击造成系统雪崩。RabbitMQ 运行过程中,会监控如线程池活跃线程数、队列使用情况、任务阻塞率等指标,达到设置阈值自动进行扩容。

RabbitMQ 我没用过不太熟,客户端通过自动扩容这里大家有兴趣可以具体看看。

RabbitMQ 团队在面临同样需求时,选择了一个更优雅的做法:直接复制并修改 LinkedBlockingQueue 源码,做成可变容量版本

本章节我们先通过讲解反射带来的问题,然后再看 RabbitMQ 怎么做的。

反射方案的问题

虽然我们可以通过反射手段绕过 final 限制,修改 LinkedBlockingQueue 中的 capacity 字段,但仅仅修改这个字段值并不能真正实现预期行为。我们来看两个非常典型的问题场景:

1. 队列已满修改容量无效

当队列已满,调用线程正阻塞在 put() 方法上等待空位:

while (count.get() == capacity) {
notFull.await(); // 阻塞在这里
}

此时,我们通过反射动态将 capacity 从 10 修改为 100,期望 unblock 等待线程。但线程仍然阻塞在 notFull.await(),无法及时感知容量变化!

下方截图取自 LinkedBlockingQueue#put 方法:

capacity 虽然被改大了,但线程已经卡在了 await() 上,如果没有人手动调用 signalNotFull(),它永远不会被唤醒。

⚠️ 注意:反射只改字段,不会自动触发条件变量通知(Condition#signal)

以下是一个测试用例,我放在了 core 模块 test 目录下,展示通过反射修改 LinkedBlockingQueue 容量的尝试及其问题:

public class ResizableCapacityLinkedBlockingQueueV1Test {

public static void main(String[] args) throws Exception {
ResizableCapacityLinkedBlockingQueueV1<String> queue = new ResizableCapacityLinkedBlockingQueueV1<>(2);

// 填充队列至满
queue.put("Element 1");
System.out.println("入队列成功,当前大小:" + queue.size());
queue.put("Element 2");
System.out.println("入队列成功,当前大小:" + queue.size());

// 尝试添加第三个元素,预期阻塞
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
System.out.println("尝试添加 Element 3,队列已满,线程将被阻塞");
queue.put("Element 3");
System.out.println("成功添加 Element 3,队列大小:" + queue.size());
} catch (InterruptedException e) {
System.out.println("添加 Element 3 失败");
}
});

// 等待 2 秒,确保线程阻塞
TimeUnit.SECONDS.sleep(2);

// 通过反射修改容量
try {
queue.setCapacity(3);
System.out.println("通过反射修改容量为:3");
} catch (Exception e) {
System.out.println("反射修改容量失败:" + e.getMessage());
}

// 等待 2 秒,观察是否成功添加
TimeUnit.SECONDS.sleep(2);

executor.shutdownNow();
}
}

⚠️ 注意:如果想在 IDEA 里跑这个单元测试,需要在 IDEA 单元测试中设置 VM 参数:

--add-opens java.base/java.util.concurrent=ALL-UNNAMED

我们期望的日志输出如下:

解锁付费内容,👉 戳