V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
HeyHudy
V2EX  ›  Java

自荐 Java 多线程神器——ThreadForge ,让多线程从此简单

  •  
  •   HeyHudy · 21 小时 8 分钟前 · 1212 次点击

    新春临近,先住各位 V 友新年快乐~

    从场景切入

    传统的 ExecutorServiceFutureCompletableFuture 非常强大,但写起来比较麻烦:

    • 线程池要手动创建和关闭
    • 超时逻辑每个任务都要写一遍
    • 失败了要不要取消其他任务?得自己判断
    • 异常怎么传播?要么吞掉,要么手动包装
    • 想知道任务跑了多久?自己打日志

    我一直在思考怎么能让 Javaer 用多线程的时候能简单点,少点弯弯绕绕,于是诞生了 ThreadForge 。

    ThreadForge:把复杂度收敛到一个可推理的模型里

    ThreadForge 的设计哲学很简单:先降低认知成本,再追求性能。

    可以把它理解成一个结构化并发框架——让你用写同步代码的思维写并发代码,同时自动处理那些容易遗漏的边界情况。

    也可以把它理解成对于 Java 内置并发工具的二次包装,目标是让 Java 并发更简单、更清晰。

    什么是结构化?

    看一个最简单的例子:

    try (ThreadScope scope = ThreadScope.open()) {
        Task<String> user = scope.submit("load-user", () -> fetchUser());
        Task<Integer> orders = scope.submit("load-orders", () -> fetchOrders());
        
        scope.await(user, orders);
        
        // 到这里,两个任务肯定都结束了(成功、失败或超时)
        String result = user.await() + ":" + orders.await();
    }
    // scope 关闭时,所有任务自动取消、资源自动清理
    

    这段代码有几个关键点:

    1. 所有任务都绑定在 ThreadScope,生命周期有边界,不会泄漏
    2. 默认就是安全的:默认超时、默认失败传播、自动取消
    3. 代码结构就是任务关系:读代码的人一眼就能看出两个任务是并发的,且必须都完成才能继续

    对比传统写法,你需要:

    • 创建线程池,配置核心线程数、队列大小
    • 提交任务,手动处理 Future
    • 写 try-finally 确保 shutdown
    • 手动处理超时和异常传播

    这里其实就能看出来 ThreadForge 的设计初衷和目标了,就是努力让我们省掉这些重复劳动,专注业务逻辑。

    五个让你省脑力的设计

    1. 默认行为就是正确的

    // 默认:FAIL_FAST + 30 秒超时 + 自动取消其他任务
    try (ThreadScope scope = ThreadScope.open()) {
        Task<Integer> a = scope.submit(() -> riskyRpc());
        Task<Integer> b = scope.submit(() -> anotherRpc());
        scope.await(a, b);
    } catch (ScopeTimeoutException timeout) {
        // 超时了,所有任务已被自动取消
        fallback();
    } catch (FailurePropagationException failed) {
        // 某个任务失败了,其他任务已被自动取消
        handleError(failed);
    }
    

    不需要配置,不需要思考,开箱即用。

    2. 失败策略明确且统一

    不同场景对失败的容忍度不同,ThreadForge 提供了 5 种明确的策略:

    • FAIL_FAST:快速失败,立即取消其他任务(默认)
    • COLLECT_ALL:等所有任务结束,汇总所有失败
    • SUPERVISOR:不自动取消,失败信息收集到 Outcome
    • CANCEL_OTHERS:失败后取消其余任务,但不抛异常
    • IGNORE_ALL:忽略失败,只返回成功的结果
    // 场景:批量导入,即使部分失败也要知道哪些成功了
    try (ThreadScope scope = ThreadScope.open()
            .withFailurePolicy(FailurePolicy.SUPERVISOR)) {
        
        List<Task<Void>> tasks = ids.stream()
            .map(id -> scope.submit(() -> importData(id)))
            .collect(toList());
        
        Outcome outcome = scope.await(tasks);
        
        // 明确知道哪些成功、哪些失败
        log.info("成功: {}, 失败: {}", 
            outcome.successCount(), outcome.failureCount());
    }
    

    3. 并发度控制不再需要手动管理队列

    // 场景:调用外部 API,最多同时 50 个请求
    try (ThreadScope scope = ThreadScope.open()
            .withConcurrencyLimit(50)) {
        
        List<Task<Result>> tasks = hugeIdList.stream()
            .map(id -> scope.submit(() -> externalApi.call(id)))
            .collect(toList());
        
        List<Result> results = scope.awaitAll(tasks);
    }
    // 自动限流,不会把外部服务打爆
    

    不需要自己写信号量,也不需要手动分批,框架自动处理。

    4. 生命周期观测统一收口

    ThreadScope scope = ThreadScope.open()
        .withHook(new ThreadHook() {
            @Override
            public void onStart(TaskInfo info) {
                metrics.taskStarted(info.name());
            }
            
            @Override
            public void onSuccess(TaskInfo info, Duration duration) {
                metrics.taskSuccess(info.name(), duration.toMillis());
            }
            
            @Override
            public void onFailure(TaskInfo info, Throwable error, Duration duration) {
                log.error("Task {} failed after {}", info.name(), duration, error);
                metrics.taskFailed(info.name());
            }
        });
    

    不需要在每个任务里重复写日志和监控代码,同时新的 1.0.2 版本中内置了 ScopeMetricsSnapshot 作为观测点,直接 .toString() 就能看到完整的调用耗时等情况 。

    5. 跨 JDK 版本的一致体验

    // 同一套 API
    try (ThreadScope scope = ThreadScope.open()) {
        // JDK 21+: 自动使用虚拟线程
        // JDK 8-20: 自动降级到线程池
        Task<String> task = scope.submit(() -> longRunningTask());
        return task.await();
    }
    

    不需要分叉代码,不需要写 if-else,框架自动适配。

    适用场景

    ThreadForge 特别适合这些场景:

    并发 RPC 聚合

    try (ThreadScope scope = ThreadScope.open()) {
        Task<User> user = scope.submit(() -> userService.get(uid));
        Task<List<Order>> orders = scope.submit(() -> orderService.list(uid));
        Task<Profile> profile = scope.submit(() -> profileService.get(uid));
        
        scope.await(user, orders, profile);
        
        return buildResponse(user.await(), orders.await(), profile.await());
    }
    

    批量数据处理

    try (ThreadScope scope = ThreadScope.open()
            .withConcurrencyLimit(100)
            .withDeadline(Duration.ofMinutes(5))) {
        
        List<Task<Void>> tasks = records.stream()
            .map(r -> scope.submit(() -> process(r)))
            .collect(toList());
        
        scope.awaitAll(tasks);
    }
    

    生产者-消费者模式

    try (ThreadScope scope = ThreadScope.open()) {
        Channel<Data> channel = Channel.bounded(1000);
        
        scope.submit(() -> {
            for (Data d : datasource) {
                channel.send(d);
            }
            channel.close();
            return null;
        });
        
        List<Task<Void>> consumers = IntStream.range(0, 4)
            .mapToObj(i -> scope.submit(() -> {
                for (Data d : channel) {
                    process(d);
                }
                return null;
            }))
            .collect(toList());
        
        scope.awaitAll(consumers);
    }
    

    开始使用

    Maven:

    <dependency>
        <groupId>pub.lighting</groupId>
        <artifactId>threadforge-core</artifactId>
        <version>1.0.2</version>
    </dependency>
    

    Gradle:

    implementation("pub.lighting:threadforge-core:1.0.2")
    

    最小示例:

    try (ThreadScope scope = ThreadScope.open()) {
        Task<String> task = scope.submit(() -> "Hello, ThreadForge");
        System.out.println(task.await());
    }
    

    📦 GitHub: github.com/wuuJiawei/ThreadForge
    📖 文档: 见项目 docs/api/README.md
    📄 License: MIT

    最后

    感谢所有看到这里的朋友。

    JDK21 之后,官方团队也跟进了结构化并发类,可以称这个项目是又一个轮子,也可以称它是在工程化里面的一次探讨和另一种解决方案,毕竟给低版本的 JDK 也提供了可能性。

    欢迎点赞、评论,如果有任何问题,也欢迎提出您的宝贵意见。

    jimeng-2026-02-10-3757-「 ThreadForge 」 这是我新开发的开源项目,帮我做一个 logo 。Java....png

    这是让即梦画的 logo ,看起来有点意思,像是个老派的项目。

    4 条回复    2026-02-13 11:45:58 +08:00
    cloudzhou
        1
    cloudzhou  
       20 小时 50 分钟前
    大同小异,但是最终要走向 Java 结构化并发类,如果真要实现,那么就基于 Java 结构化并发类二次开发

    这是我以前的理念:
    https://www.sunp.eu.org/t/916816#r_12705296
    keepfun
        2
    keepfun  
       20 小时 29 分钟前
    good
    1ffree
        3
    1ffree  
       19 小时 45 分钟前
    有点意思
    andforce
        4
    andforce  
       18 小时 24 分钟前
    Kotlin 的协程,我已经很久没有手动写多线程了
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   Solana   ·   769 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 22:10 · PVG 06:10 · LAX 14:10 · JFK 17:10
    ♥ Do have faith in what you're doing.