JAVA线程池监控以及动态调整线程池

JAVA线程池监控以及动态调整线程池

关于线程池的基础知识和源码见Java线程池源码分析_出世&入世的博客-CSDN博客

1、背景

Java线程池源码分析里虽然介绍了线程池的核心配置(核心线程数、最大线程数和队列大小)该如何配置,但是实际上业界也没有一个统一的标准。虽然有些所谓的”公式”,但是不同的业务场景复杂多变,配置原则也不尽相同。从实际经验来看,IO密集型、CPU密集型应用在线程配置上就比较悬殊,因此没有一个通用的适合所有场景的公式。

那么我们换一种思路,就是既然不能明确配置,那么能不能支持动态配置呢?答案是肯定的,因为线程池本身就支持核心线程数和最大线程数的修改,而且是实时生效的。 通常在生产环境中,我们可以实时监控线程池的运行状态,随时掌握应用服务的性能状况,以便在系统资源紧张时及时告警,动态调整线程配置,必要时进行人工介入,排查问题,线上修复。

也就是说,通过实时监控,然后动态修改。

2、监控

我们知道,线程池使用不当也会使服务器资源枯竭,导致异常情况的发生,比如固定线程池的阻塞队列任务数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等问题。因此,我们需要一种简单的监控方案来监控线程池的使用情况,比如完成任务数量、未完成任务数量、线程大小等信息。

线程池的监控分为2种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。

2.1、MonitoredThreadPoolStatisticsExecutor全量统计

参数名称说明
poolName线程池的名称
timeout预设的任务超时时间阈值
taskTimeoutFlag是否记录任务超时次数
execTimeout任务执行超时时间阈值
taskExecTimeoutFlag是否记录任务执行超时次数
waitInQueueTimeout任务在队列中等待的时间阈值
taskWaitInQueueTimeoutFlag是否记录任务等待时间超时次数
queueSizeWarningPercent任务队列使用率告警阈值
queueSizeWarningFlag是否进行队列容量告警
queueSizeHasWarningFlag是否需要队列容量告警(队列是否曾经达到过预警值)
taskTotalTime任务总时长,以任务提交时间进行计时,单位 ms
taskTotalExecTime任务总执行时长,以任务开始执行进行计时,单位 ms
minTaskTime最短任务时长,以提交时间计时,单位 ms
maxTaskTime最长任务时长,以提交时间计时,单位 ms
taskTimeoutCount任务超时次数,以任务提交进行计时
taskExecTimeoutCount任务执行超时次数,以任务开始执行时间进行计时
taskWaitInQueueTimeoutCount任务等待时间超过设定的阈值的次数
minTaskExecTime最短任务时长,以执行时间计时,单位 ms
maxTaskExecTime最长任务时长,以执行时间计时,单位 ms
activeCount线程池中正在执行任务的线程数量
completedTaskCount线程池已完成的任务数量,该值小于等于taskCount
corePoolSize线程池的核心线程数量
largestPoolSize线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
maximumPoolSize线程池的最大线程数量
poolSize线程池当前的线程数量
taskCount线程池已经执行的和未执行的任务总数

为了简化,代码中的监控数据都是通过日志打印,实际中是通过kafka收集,然后做出可视化监控。

/**
 * 自定义可监控的线程池
 */
 public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean {
 /**

 * 线程池的名称
 */
 private String poolName;
 /**
 * 预设的任务超时时间阈值,用于统计功能。     * 以任务提交时间进行计时,单位 ms,大于0则记录超时次数。
 */
 private long timeout = 120000l;
 /**
 * 是否记录任务超时次数
 */
 private boolean taskTimeoutFlag = false;
 /**
 * 任务执行超时时间阈值,用于统计功能。     * 以任务开始执行进行计时,单位 ms,大于 0 则记录任务执行超时次数。
 */
 private long execTimeout = 120000l;
 /**
 * 是否记录任务执行超时次数
 */
 private boolean taskExecTimeoutFlag = false;
 /**
 * 任务在队列中等待的时间阈值,用于统计功能。     * 以任务提交时间开始计时到开始执行为止,单位 ms。
 */
 private long waitInQueueTimeout = 60000l;
 /**
 * 是否记录任务等待时间超时次数
 */
 private boolean taskWaitInQueueTimeoutFlag = false;
 /**
 * 任务队列使用率告警阈值
 */
 private int queueSizeWarningPercent = 80;
 /**
 * 是否进行队列容量告警
 */
 private boolean queueSizeWarningFlag = false;
 /**
 * 是否需要队列容量告警(队列是否曾经达到过预警值)
 */
 private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false);
 /**
 * 任务总时长,用于统计功能。以任务提交时间进行计时,单位 ms
 */
 private AtomicLong taskTotalTime = new AtomicLong(0);
 /**
 * 任务总执行时长,用于统计功能。以任务开始执行进行计时,单位 ms
 */
 private AtomicLong taskTotalExecTime = new AtomicLong(0);
 /**
 * 最短任务时长,以提交时间计时,单位 ms
 */
 private long minTaskTime = Long.MAX_VALUE;
 /**
 * 最长任务时长,以提交时间计时,单位 ms
 */
 private long maxTaskTime = 0;
 /**
 * 任务超时次数,以任务提交进行计时
 */
 private AtomicLong taskTimeoutCount = new AtomicLong(0);
 /**
 * 任务执行超时次数,以任务开始执行时间进行计时
 */
 private AtomicLong taskExecTimeoutCount = new AtomicLong(0);
 /**
 * 任务等待时间超过设定的阈值的次数
 */
 private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0);
 /**
 * 最短任务时长,以执行时间计时,单位 ms
 */
 private long minTaskExecTime = Long.MAX_VALUE;
 /**
 * 最长任务时长,以执行时间计时,单位 ms
 */
 private long maxTaskExecTime = 0;
 /**
 * 保存任务信息
 */
 private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>();

 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler);
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }

 ThreadPoolMonitor.monitor(this);
 }

 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName));
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }

 @Override
 public void execute(Runnable command) {
 this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics());
 if (this.queueSizeWarningFlag) {
 float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity());
 BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP);
 int usedPercent = bd.multiply(new BigDecimal(100)).intValue();
 if (usedPercent > this.queueSizeWarningPercent) {
 this.queueSizeHasWarningFlag.set(true);
 System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity());
 }
 }
 super.execute(command);
 }

 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 taskStatistics.setStartExecTime(System.currentTimeMillis());
 }
 super.beforeExecute(t, r);
 }

 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 //重写此方法做一些统计功能
 long endTime = System.currentTimeMillis();
 TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 long taskTotalTime = endTime - taskStatistics.getCommitTime();
 long taskExecTime = endTime - taskStatistics.getStartExecTime();
 long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime();
 this.taskTotalTime.addAndGet(taskTotalTime);
 this.taskTotalExecTime.addAndGet(taskExecTime);
 if (this.minTaskTime > taskTotalTime) {
 this.minTaskTime = taskTotalTime;
 }
 if (this.maxTaskTime < taskTotalTime) {
 this.maxTaskTime = taskTotalTime;
 }
 if (this.taskTimeoutFlag && taskTotalTime > this.timeout) {
 this.taskTimeoutCount.incrementAndGet();
 }
 if (this.minTaskExecTime > taskExecTime) {
 this.minTaskExecTime = taskExecTime;
 }
 if (this.maxTaskExecTime < taskExecTime) {
 this.maxTaskExecTime = taskExecTime;
 }
 if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) {
 this.taskExecTimeoutCount.incrementAndGet();
 }
 if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) {
 this.taskWaitInQueueTimeoutCount.incrementAndGet();
 }
 System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]");

 // 初始线程数、核心线程数、正在执行的任务数量、
 // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
 // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
 LOGGER.info("{}-pool-monitor: " +
 " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
 "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
 "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
 this.poolName,
 this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
 this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
 this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());

 }
 super.afterExecute(r, t);
 }

 /**

 * Spring容器管理线程池的生命周期,线程池Bean销毁之前先关闭     * @throws Exception
 */
 @Override
 public void destroy() throws Exception {
 shutdown();
 }

 /**
 * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
 */
 @Override
 public void shutdown() {
 // 统计已执行任务、正在执行任务、未执行任务数量
 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 super.shutdown();
 }

 /**
 * 线程池立即关闭时,统计线程池情况
 */
 @Override
 public List<Runnable> shutdownNow() {
 // 统计已执行任务、正在执行任务、未执行任务数量
 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 return super.shutdownNow();
 }


 /**

 * 任务平均时长,无已完成任务时,返回 0
 */
 public long getTaskAvgTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }

 /**

 * 任务平均执行时长,无已完成任务时,返回 0
 */
 public long getTaskAvgExecTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 //省略setter/getter方法
} 
public class TaskStatistics {
 /**
 * 任务提交时间
 */
 private long commitTime;
 /**
 * 任务开始执行时间
 */
 private long startExecTime;

 public TaskStatistics() {
 this.commitTime = System.currentTimeMillis();
 }
}
方法含义
shutdown()线程池延迟关闭时(等待线程池里的任务都执行完毕),统计已执行任务、正在执行任务、未执行任务数量
shutdownNow()线程池立即关闭时,统计已执行任务、正在执行任务、未执行任务数量
beforeExecute(Thread t, Runnable r)任务执行之前,记录任务开始时间,startTimes这个HashMap以任务的hashCode为key,开始时间为值
afterExecute(Runnable r, Throwable t)任务执行之后,计算任务结束时间。统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止信息

注意事项:

  1. 在 afterExecute 方法中需要注意,需要调用 ConcurrentHashMap 的 remove 方法移除并返回任务的开始时间信息,而不是调用 get 方法,因为在高并发情况下,线程池里要执行的任务很多,如果只获取值不移除的话,会使 ConcurrentHashMap 越来越大,引发内存泄漏或溢出问题。

2.2、定时采集

public class ThreadPoolMonitor {

 private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
 private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

 private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

 public ThreadPoolMonitor() {
 }

 public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
 if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) {
 throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored.");
 } else {
 monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }
 }

 public static void remove(String name) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
 if (futureWrapper != null) {
 futureWrapper.future.cancel(false);
 }

 }

 public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
 if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
 POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
 futureWrapper.future.cancel(false);
 }

 }

 static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) {
 monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }

 private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
 PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
 POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
 if (v == null) {
 return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
 } else {
 throw new IllegalStateException("duplicate pool name: " + name);
 }
 });
 }

 static {
 Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
 }

 static class FutureWrapper {
 private final Future<?> future;
 private final ThreadPoolExecutor threadPoolExecutor;

 public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
 this.future = future;
 this.threadPoolExecutor = threadPoolExecutor;
 }
 }
}
public class PoolMonitorTask implements Runnable {

 private final ThreadPoolExecutor monitoredThreadPool;
 private final String poolName;
 private volatile long lastTaskCount = 0L;

 public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {
 this.monitoredThreadPool = monitoredThreadPool;
 this.poolName = poolName;
 }

 @Override
 public void run() {
 int activeCount = this.monitoredThreadPool.getActiveCount();
 int corePoolSize = this.monitoredThreadPool.getCorePoolSize();
 int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();
 int queueTaskSize = this.monitoredThreadPool.getQueue().size();
 long taskCount = this.monitoredThreadPool.getTaskCount();
 int executedTask = (int) (taskCount - this.lastTaskCount);
 log.info("线程池名称 = {}, 活跃线程数峰值 = {}, 队列任务数峰值 = {}, 核心线程数 = {}, 最大线程数 = {}, 执行的任务总数 = {}",
 this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);
 this.lastTaskCount = taskCount;
 if (this.monitoredThreadPool.isTerminated()) {
 ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);
 }
 }

}

2.3、可视化

通过Kafka获取到监控数据后,可以做一个可视化页面,比如可以展示下面这这些数据

  • active/coreSize :活动线程数和核心线程数的比值, 其中active = executor.getActiveCount(),表示所有运行中的工作线程的数量,这个比值反应线程池的线程活跃状态,如果一直维持在一个很低的水平,则说明线程池需要进行缩容;如果长时间维持一个很大的数值,说明活跃度好,线程池利用率高。

  • active/maxSize :活动线程数和最大线程数的比值,这个值可以配合上面的 active/coreSize 来看,当active/coreSize大于100%的时候,如果active/maxSize维持在一个较低的值,则说明当前线程池的负载偏低,如果大于60%或者更高,则说明线程池过载,需要及时调整线程池容量配置。

  • completedTaskCount:执行完毕的工作线程的总数,包含历史所有。

  • largestPoolSize:历史上线程池容量触达过的最大值

  • rejectCount:被拒绝的线程的数量,如果大量线程被拒绝,则说明当前线程池已经溢出了,需要及时调整线程池配置

  • queueSize:队列中工作线程的数量,如果大量的线程池在排队,说明coreSize已经不够用了,可以根据实际情况来调整,对于执行时间要求很严格的业务场景,可能需要通过提升coreSize来减少排队情况。

3、动态调整线程池

配置线程池的大小可根据以下几个维度进行分析来配置合理的线程数:

任务性质可分为:CPU密集型任务,IO密集型任务,混合型任务,任务的执行时长,任务是否有依赖——依赖其他系统资源,如数据库连接等。

1、CPU密集型任务 尽量使用较小的线程池,一般为CPU核数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。

2、IO密集型任务 可以使用稍大的线程池,一般为2*CPU核数+1。 因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。

3、混合型任务 可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。 因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失

4、依赖其他资源 如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。

总之,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

但是实践发现,尽管我们经过谨慎的评估,仍然不能够保证一次计算出来合适的参数,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢? 基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

3.1、修改参数

实际应用中主要有下列参数可以支持动态修改。

线程池参数说明
corePoolSize核心线程数
maximumPoolSize最大线程数
queueCapacity等待队列大小
timeout任务超时时间告警阈值
execTimeout任务执行超时时间告警阈值
queuedTaskWarningSize等待队列排队数量告警阈值
checkInterval线程池定时监控时间间隔
autoExtend是否自动扩容

其中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api实现: public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)

从ThreadPoolExecutor源码中可知,

设置新的核心线程数时, 如果设置的新值小于当前值,多余的现有线程将在下一次空闲时终止,如果新设置的corePoolSize值更大,将在需要时启动新线程来执行任何排队的任务;

设置新的最大线程数时,如果新值小于当前值,多余的现有线程将在下一次空闲时终止。

ThreadPoolExecutor没有提供直接修改等待队列大小的api。这就需要我们自定义一个可以修改容量的队列。其实很简单,只要把jdk原生的队列中的容量设置为可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷贝一份,命名为CapacityResizableLinkedBlockingQueue。 将其capacity的属性变为可变的,并提供set方法:

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

将上述原生代码改为:

private volatile int capacity;

public void setCapacity(int capacity) { 
 this.capacity = capacity;
}

3.2、配置监听

可以通过配置中心的动态加载来处理,以Apollo为例,我们可以利用Apollo的ChangeListener来实现对配置变更的监听,(如果是MySQL,可以修改完配置后直接同过HTTP接口通知客户端进行配置刷新),代码片段如下:

public class ThreadPoolConfigUpdateListener {

 @Value("${apollo.bootstrap.namespaces:application}")
 private String namespace;

 @Autowired
 private DynamicThreadPoolFacade dynamicThreadPoolManager;

 @Autowired
 private DynamicThreadPoolProperties poolProperties;

 @PostConstruct
 public void init() {
 initConfigUpdateListener();
 }

 public void initConfigUpdateListener() {
 String apolloNamespace = namespace;
 if (StringUtils.hasText(poolProperties.getApolloNamespace())) {
 apolloNamespace = poolProperties.getApolloNamespace();
 }
 String finalApolloNamespace = apolloNamespace;
 Config config = ConfigService.getConfig(finalApolloNamespace);
 config.addChangeListener(changeEvent -> {
 try {
 Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000);
 } catch (InterruptedException e) {
 log.error("配置刷新异常",e);
 }
 dynamicThreadPoolManager.refreshThreadPoolExecutor();
 log.info("线程池配置有变化,刷新完成");
 });
 }

}

线程池配置的刷新的逻辑如下:

public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
 try {
 dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
 ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor();

 executor.setCorePoolSize(poolProperties.getCorePoolSize());
 executor.setMaxPoolSize(poolProperties.getMaximumPoolSize());
 executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
 });

 }catch(Exception e){
 log.error("Executor 参数设置异常",e);
 }
 }

3.4、后台管理

当然可以通过管理后台来动态修改,如下图,参数可保存在mysql

最后,一个简单的线程池监控以及动态修改架构如下图:

4、线程池如何保持有序/串行

线程池当然是无序的,但是如果我们需要像kafka那样分区有序怎么办呢?

思路:建立多个只有一个线程的线程池,然后按照不同key 在 不同线程池上执行,这样key就相当于kafka的分区,只要key相同,那么提交的任务就会有序执行。

demo如下

 

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
社会演员多的头像社会演员多普通用户
上一篇 2023年12月4日
下一篇 2023年12月4日

相关推荐