diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index 239f6a28b88dad5b6757533e906c007f043c99e7..6d8aa4cb3dc9f4f688bb3cb8bc3c26cadb709101 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -25,8 +25,11 @@ import com.yomahub.liteflow.spi.holder.ContextAwareHolder; import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorCondition; import com.yomahub.liteflow.thread.ExecutorCondition.ExecutorConditionBuilder; +import java.lang.ref.WeakReference; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -42,10 +45,17 @@ public class ExecutorHelper { /** * 此处使用Map缓存线程池信息 key - 线程池构建者的Class全类名 value - 线程池对象 */ - private final Map executorServiceMap; + private final Map> executorServiceMap; + + /** + * 定时任务线程池 + */ + private static ScheduledThreadPoolExecutor cleanupExecutor = new ScheduledThreadPoolExecutor(1); + private ExecutorHelper() { executorServiceMap = MapUtil.newConcurrentHashMap(); + cleanupExecutor.scheduleAtFixedRate(this::startCleanupTask, 1, 1, TimeUnit.MINUTES); } /** @@ -140,17 +150,19 @@ public class ExecutorHelper { /** * 根据线程执行构建者Class类名获取ExecutorService实例 - */ - private ExecutorService getExecutorService(String clazz, String hash) { + */ + public ExecutorService getExecutorService(String clazz, String hash) { try { String key; if (StrUtil.isBlank(hash)) { key = clazz; }else{ key = StrUtil.format("{}_{}", clazz, hash); - } + } + + WeakReference weakRef = executorServiceMap.get(key); + ExecutorService executorServiceFromCache = (weakRef != null) ? weakRef.get() : null; - ExecutorService executorServiceFromCache = executorServiceMap.get(key); if (ObjectUtil.isNotNull(executorServiceFromCache)) { return executorServiceFromCache; } @@ -158,7 +170,7 @@ public class ExecutorHelper { Class executorClass = (Class) Class.forName(clazz); ExecutorBuilder executorBuilder = ContextAwareHolder.loadContextAware().registerBean(executorClass); ExecutorService executorService = executorBuilder.buildExecutor(); - executorServiceMap.put(key, executorService); + executorServiceMap.put(key, new WeakReference<>(executorService)); return executorService; } } @@ -212,7 +224,27 @@ public class ExecutorHelper { } return executor; - } - - + } + + + // 定期清理未被引用的线程池 + private void startCleanupTask() { + Iterator>> iterator = + executorServiceMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + WeakReference weakRef = entry.getValue(); + + // 如果 WeakReference 指向的线程池已经被回收 + if (weakRef == null || weakRef.get() == null) { + System.out.println("Cleaning up unused thread pool: " + entry.getKey()); + iterator.remove(); + } + } + } + + //获取executorServiceMap + public Map> getExecutorServiceMap() { + return executorServiceMap; + } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CleanUpThreadTest.java b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CleanUpThreadTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8b2caefcc11ef0e1754394ebee7c986f38d86db6 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CleanUpThreadTest.java @@ -0,0 +1,49 @@ +package com.yomahub.liteflow.test.thread; + +import com.yomahub.liteflow.test.BaseTest; +import com.yomahub.liteflow.thread.ExecutorHelper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.concurrent.ExecutorService; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * 清理不被引用的线程 + * + * @author jason + */ +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = CleanUpThreadTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.thread"}) +public class CleanUpThreadTest extends BaseTest { + @Test + void testCleanupTask_RemovesUnusedThreadPools() throws Exception { + // 获取线程池并模拟使用 + ExecutorService pool1 = ExecutorHelper.loadInstance().getExecutorService("com.yomahub.liteflow.test.thread" + + ".CustomThreadExecutor", + "hash1"); + + // 验证线程池已经被存储 + assertEquals(1, ExecutorHelper.loadInstance().getExecutorServiceMap().size()); + + // 模拟线程池不再被使用 + pool1 = null; + + // 强制触发垃圾回收 + System.gc(); + + // 等待清理任务运行,清理任务默认 1 分钟运行一次,这里适当调整测试的时间间隔 + Thread.sleep(60000); + + // 验证线程池已经被清理(`WeakReference` 被移除) + assertEquals(0, ExecutorHelper.loadInstance().getExecutorServiceMap().size()); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CustomThreadExecutor.java b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CustomThreadExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..5b9049f610ad83cddbc80114fb2c5377373780e3 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/thread/CustomThreadExecutor.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.thread; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(), + liteflowConfig.getGlobalThreadPoolSize(), "customer-when-1-thead-"); + } + +}