HOOOS

CompletableFuture在Spring和Netty等开源项目中的应用实践

0 67 代码小工 JavaCompletableFuture并发编程
Apple

你好,我是你的Java学习伙伴“代码小工”。今天咱们来聊聊Java并发编程中的一个利器——CompletableFuture,以及它在一些著名开源项目,特别是Spring Framework和Netty中的应用。

1. 为什么要用CompletableFuture?

在咱们日常开发中,经常会遇到需要异步处理任务的场景。比如说,你要从多个数据源获取数据,然后汇总结果;或者你要执行一个耗时操作,不想阻塞主线程。传统的Future虽然能实现异步,但用起来总感觉差点意思,比如:

  • 获取结果需要阻塞等待(get()方法),或者轮询查看是否完成(isDone()方法),不够优雅。
  • 多个Future之间的组合、编排比较麻烦,代码写起来不够简洁。
  • 异常处理也比较繁琐。

CompletableFuture的出现,就是为了解决这些痛点。它是Java 8引入的一个强大的异步编程工具,提供了更灵活、更强大的异步编程能力。它基于回调机制,可以让你以非阻塞的方式处理异步任务的结果,并且支持链式调用、组合多个异步任务、声明式异常处理等特性。

简单来说,CompletableFuture就像一个“任务管家”,你可以把任务交给它,然后告诉它:“任务完成后,帮我做这个、做那个,如果出错了,就那样处理”。它会帮你打理好一切,你只需要关注最终的结果就行了。

2. CompletableFuture基础入门

在深入开源项目之前,咱们先快速过一下CompletableFuture的基础用法,这样后面看源码的时候才能更容易理解。

2.1 创建CompletableFuture

创建CompletableFuture对象主要有以下几种方式:

  • CompletableFuture.runAsync(): 执行一个没有返回值的异步任务。

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务执行完成");
    });
    
  • CompletableFuture.supplyAsync(): 执行一个有返回值的异步任务。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "任务执行结果";
    });
    
  • CompletableFuture.completedFuture(): 直接创建一个已经完成的CompletableFuture

    CompletableFuture<String> future = CompletableFuture.completedFuture("直接返回结果");
    

2.2 获取异步任务的结果

  • get()join():这两个方法都会阻塞当前线程,直到CompletableFuture完成并返回结果。区别在于get()方法会抛出受检异常(InterruptedExceptionExecutionException),而join()方法不会。

        try {
            String result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    
  • getNow(valueIfAbsent): 如果CompletableFuture已经完成,则立即返回结果;否则返回给定的默认值valueIfAbsent

    String result = future.getNow("默认值");
    

2.3 结果处理和转换

CompletableFuture最强大的地方在于它提供了一系列方法,让你可以在异步任务完成后,以非阻塞的方式处理结果,或者对结果进行转换。

  • thenApply(): 对CompletableFuture的结果进行转换。

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "123")
            .thenApply(Integer::parseInt);
    
  • thenAccept(): 消费CompletableFuture的结果,没有返回值。

    CompletableFuture.supplyAsync(() -> "Hello")
            .thenAccept(System.out::println);
    
  • thenRun(): CompletableFuture完成后执行一个Runnable,不关心结果,也没有返回值。

    CompletableFuture.runAsync(() -> System.out.println("任务1"))
            .thenRun(() -> System.out.println("任务2"));
    

2.4 异步任务组合

CompletableFuture还支持将多个异步任务组合起来,形成一个更复杂的异步流程。

  • thenCompose(): 将两个CompletableFuture串联起来,前一个任务的结果作为后一个任务的输入。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
    
  • thenCombine(): 将两个CompletableFuture的结果组合起来。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
    CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
    
  • allOf(): 等待所有给定的CompletableFuture完成。

    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    
  • anyOf(): 只要给定的CompletableFuture中有一个完成,就返回。

    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    

2.5 异常处理

CompletableFuture提供了exceptionally()方法来处理异步任务中发生的异常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("发生异常");
})
.exceptionally(ex -> {
    System.err.println("捕获到异常: " + ex.getMessage());
    return "默认值";
});

3. Spring Framework中的CompletableFuture

Spring Framework从4.2版本开始,对CompletableFuture提供了全面支持。咱们来看看CompletableFuture在Spring中是如何应用的。

3.1 @Async注解

Spring的@Async注解大家应该都不陌生,它可以将一个方法标记为异步执行。在底层,Spring会使用TaskExecutor来执行这些异步方法。从Spring 4.2开始,@Async注解标注的方法可以直接返回CompletableFuture

@Service
public class MyService {

    @Async
    public CompletableFuture<String> doSomethingAsync() {
        // 模拟耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return CompletableFuture.completedFuture("任务完成");
    }
}

这样,你就可以在调用doSomethingAsync()方法时,立即得到一个CompletableFuture对象,然后通过它来处理异步任务的结果。

3.2 ListenableFuture

CompletableFuture出现之前,Spring提供了ListenableFuture接口来实现异步回调。ListenableFuture可以添加回调函数,在异步任务完成或失败时触发。

ListenableFuture<String> future = ...; // 获取ListenableFuture
future.addCallback(new ListenableFutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        // 成功回调
    }

    @Override
    public void onFailure(Throwable ex) {
        // 失败回调
    }
});

Spring提供了CompletableToListenableFutureAdapter类,可以将CompletableFuture转换为ListenableFuture,从而与Spring的异步回调机制集成。

3.3 Spring WebFlux

Spring WebFlux是Spring 5引入的响应式Web框架,它完全基于异步非阻塞的编程模型。在WebFlux中,CompletableFuture得到了广泛应用。Controller方法的返回值可以是CompletableFuture,表示异步处理请求。

@RestController
public class MyController {

    @GetMapping("/hello")
    public CompletableFuture<String> hello() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, World!";
        });
    }
}

WebFlux底层使用Reactor或RxJava等响应式库,它们都与CompletableFuture有很好的集成。

4. Netty中的CompletableFuture

Netty是一个高性能的异步事件驱动的网络编程框架。在Netty中,异步操作非常常见,比如连接建立、数据读写等。Netty 4.0开始引入了FuturePromise接口,用于处理异步操作的结果。虽然Netty的Future和Java的Future同名,但它们是不同的接口。Netty的Future提供了更丰富的异步操作方法。

Netty中的ChannelFuture就是一个Future,它表示一个Channel上的异步操作的结果,比如连接操作、写操作等。

ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8080);
connectFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            // 连接成功
        } else {
            // 连接失败
        }
    }
});

虽然Netty的Future已经很强大了,但CompletableFuture的出现,还是给Netty带来了一些新的可能性。比如,你可以将Netty的ChannelFuture转换为CompletableFuture,然后利用CompletableFuture的链式调用、组合等特性,编写更简洁、更易读的异步代码。

Netty社区也提供了一个netty-incubator-codec-quic的孵化器项目,其中用到了一种QuicFuture extends CompletableFuture,感兴趣的同学可以自行研究。

5. 总结与思考

CompletableFuture是Java并发编程的一个重要工具,它提供了更强大、更灵活的异步编程能力。在Spring Framework和Netty等开源项目中,CompletableFuture都得到了广泛应用。通过学习这些开源项目中的CompletableFuture用法,我们可以更好地理解它的设计思想和最佳实践。

当然了,我还是得强调一下,虽然CompletableFuture很强大,但也不是万能的。在实际开发中,咱们还是要根据具体场景选择合适的异步编程工具。比如,如果你的项目已经大量使用了RxJava或Project Reactor等响应式库,那么直接使用这些库提供的异步编程模型可能更合适。

最后留几个小问题给你思考一下:

  1. 除了文中提到的Spring和Netty,你还在哪些开源项目中看到过CompletableFuture的应用?
  2. CompletableFutureallOf()anyOf()方法在实际开发中有什么应用场景?
  3. CompletableFuture的异常处理机制有什么优缺点?

希望今天的分享对你有所帮助。如果你有任何问题或想法,欢迎在评论区留言,咱们一起交流学习!

点评评价

captcha
健康