CompletableFuture使用示例

CompletableFuture/CompletionStage

Posted by Jay on July 10, 2019

CompletableFuture使用示例

一、示例1

// {@link CompletableFuture} 使用实例
public class CompletableFutureTest {

    /* ------主动完成计算join/get/getNow/complete/completeExceptionally------ */

    private static void test01() throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 1 / 0;
            return 100;
        });
        // join()/get()抛出不同的异常
         future.join();
//        future.get();
    }

    private static void test02() throws Exception {
        final CompletableFuture<Integer> f = compute(); // 未完成的future
        class Client extends Thread {
            CompletableFuture<Integer> f;

            public Client(String name, CompletableFuture<Integer> f) {
                super(name);
                this.f = f;
            }

            @Override
            public void run() {
                try {
                    System.out.println(this.getName() + ": " + f.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
        new Client("Client1", f).start();
        new Client("Client2", f).start();
        System.out.println("waiting");
        TimeUnit.SECONDS.sleep(2);
        // 主动完成future
         f.complete(1000);
//        f.completeExceptionally(new RuntimeException("test exception"));
    }

    // 未完成的future
    private static CompletableFuture<Integer> compute() {
        return new CompletableFuture<>();
    }

    /* ------创建CompletableFuture supplyAsync------ */

    private static void test03() {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            // 长时间的异步计算
            return ".00";
        });
    }

    /* ------计算结果完成时的处理whenComplete------*/

    private static void test04() throws Exception {
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
            System.out.println("f1 done...");
        });
        CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1.0;
        });

        f2.whenComplete((result, e) -> {
            System.out.println("result: " + result);
        });

        System.in.read();
    }

    private static class CompletableFutureInnerTest {
        private static Random rand = new Random();
        private static long t = System.currentTimeMillis();

        private static int getMoreData() {
            System.out.println("begin to start compute");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("end to start compute. passed " + (System.currentTimeMillis() - t) / 1000 + " seconds");
            return rand.nextInt(1000);
        }

        public static void test05() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(CompletableFutureInnerTest::getMoreData);
            CompletableFuture<Integer> f2 = f1.whenComplete((v, e) -> {
                System.out.println(v);
                System.out.println(e);
            });
            System.out.println(f2.get()); // 返回与f1相同的结果
            System.out.println("end...");
        }
    }

    /* ------转换thenApplyAsync/thenApply------ */

    private static void test06() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> 100)
                .thenApplyAsync(i -> i * 10).thenApply(Object::toString);
        System.out.println(future.get());
    }

    /* ------纯消费(执行Action) thenAccept/thenAcceptBoth/thenRun------*/

    private static void test07() throws Exception {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> 100).thenAccept(System.out::println);
        System.out.println(f.get());
    }

    private static void test08() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 100)
                .thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y));
        System.out.println(future.get());
    }

    private static void test09() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 100)
                .thenRun(() -> System.out.println("finished"));
        System.out.println(future.get());
    }

    /* ------组合thenCompose/thenCombine------*/

    /**
     * {@link CompletableFuture#thenCompose(Function)} future之间有先后依赖顺序
     */
    private static void test10() throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> 100)
                .thenCompose(i -> CompletableFuture.supplyAsync(() -> String.valueOf(i * 10)));
        System.out.println(future.get());
    }

    private static void test11() {
        final String original = "message";
        CompletableFuture<String> cf = CompletableFuture
                .completedFuture(original)
                .thenApply(String::toUpperCase)
                .thenCompose(upper -> CompletableFuture
                        .completedFuture(original)
                        .thenApply(String::toLowerCase)
                        .thenApply(s -> upper + s)
                );
        System.out.println("MESSAGEmessage: " + cf.join());
    }

    /**
     * {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} 并行执行,future之间没有先后依赖顺序
     */
    private static void test12() throws Exception {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "abc");
        CompletableFuture<String> f = f1.thenCombine(f2, (x, y) -> x + "-" + y);
        System.out.println(f.get());
    }

    /* ------Either applyToEither------*/

    private static void test13() throws Exception {
        Random random = new Random();
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 200;
        });
        // 谁先完成,先用谁
        CompletableFuture<String> f = f1.applyToEither(f2, Object::toString);
        System.out.println(f.get());
    }

    /* ------anyOf 与 allOf------ */

    private static void test14() throws Exception {
        Random random = new Random();
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 200;
        });
        // 谁先完成,就完成计算,返回该结果
        CompletableFuture<Object> f = CompletableFuture.anyOf(f1, f2);
        System.out.println(f.get());
    }

    public static void main(String[] args) throws Exception {

//         test01();
//         test02();
//         test03();
//         test04();
//        CompletableFutureInnerTest.test05();
//         test06();
//         test07();
//         test08();
//         test09();
//         test10();
//         test11();
//         test12();
//         test13();
         test14();
    }

}

二、示例2

// {@link CompletableFuture} Util
public class CompletableFutureUtil {

    // 将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的
    // 计算结果是个List,它包含前面所有的CompletableFuture的计算结果
    //
    // @param futures {@link CompletableFuture<T>}
    // @param <T>     Type Parameter
    // @return {@link CompletableFuture<List<T>>}
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDoneFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        return allDoneFutures.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

     // 将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的
     // 计算结果是个List,它包含前面所有的CompletableFuture的计算结果
     //
     // @param futures {@link CompletableFuture<T>}
     // @param <T>     Type Parameter
     // @return {@link CompletableFuture<List<T>>}
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(Objects::nonNull).collect(Collectors.toList());
        return sequence(futureList);
    }

    // 实现JDK {@code Future<T>} 与 {@code CompletableFuture<T>}的转换
    //
    // @param future   {@link Future}
    // @param executor {@link Executor}
    // @param <T>      Type Parameter
    // @return {@link CompletableFuture<T>}
    public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }

}

三、示例3

// {@link CompletableFuture} 使用实例2
public class CompletableFutureTest2 {

    // 1.创建完成的CompletableFuture
    private static void completedFutureExample() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("message");
        assertTrue(future.isDone());
        assertEquals("message", future.getNow(null));
    }

   // 2.运行简单的异步场景
   // <p>
   // - CompletableFuture 是异步执行方式.
   // - 使用 ForkJoinPool 实现异步执行,这种方式使用了 daemon 线程执行 Runnable 任务.
    private static void runAsyncExample() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            assertTrue(Thread.currentThread().isDaemon()); // ForkJoin pool daemon线程
            randomSleep();
        });

        assertFalse(future.isDone());
        sleepEnough();
        assertTrue(future.isDone());
    }

    private static void sleepEnough() {
        try {
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void randomSleep() {
        Random random = new Random();
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(5) + 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 3.同步执行动作示例
    private static void thenApplyExample() {
        // 同步执行
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(str -> {
            assertFalse(Thread.currentThread().isDaemon()); // main线程
            return str.toUpperCase();
        });
        assertEquals("MESSAGE", cf.getNow(null));
    }

    // 4.异步执行动作示例
    private static void thenApplyAsyncExample() {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            assertTrue(Thread.currentThread().isDaemon()); // ForkJoin pool daemon线程
            randomSleep();
            return s.toUpperCase();
        });
        assertNull(cf.getNow(null));
        assertEquals("MESSAGE", cf.join());
    }

    // 5.使用固定的线程池完成异步执行动作示例
    private static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });

    private static void thenApplyAsyncWithExecutorExample() {
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
            assertFalse(Thread.currentThread().isDaemon());
            randomSleep();
            return s.toUpperCase();
        }, executor);
        assertNull(cf.getNow(null));
        assertEquals("MESSAGE", cf.join());
    }

    // 6.作为消费者消费计算结果示例
    private static void thenAcceptExample() {
        StringBuilder sb = new StringBuilder();
        CompletableFuture.completedFuture("thenAccept message").thenAccept(sb::append);
        assertTrue("Result was empty", sb.length() > 0);
    }

    // 7.异步消费示例
    private static void thenAcceptAsyncExample() {
        StringBuilder result = new StringBuilder();
        CompletableFuture<Void> cf = CompletableFuture.completedFuture("thenAcceptAsync message")
                .thenAcceptAsync(result::append);
        cf.join();
        assertTrue("Result was empty", result.length() > 0);
    }

    // 8.运行两个阶段后执行
    private static void runAfterBothExample() {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                () -> result.append("done"));
        assertTrue("Result was empty", result.length() > 0);
    }

    private static void thenAcceptBothExample() {
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                (s1, s2) -> result.append(s1).append(s2));
        assertEquals("MESSAGEmessage", result.toString());
    }

    public static void main(String[] args) throws Exception {

        // completedFutureExample();
        // runAsyncExample();
        // thenApplyExample();
        // thenApplyAsyncExample();
        // thenApplyAsyncWithExecutorExample();
        // thenAcceptExample();
        // thenAcceptAsyncExample();
        // runAfterBothExample();
        thenAcceptBothExample();

    }

}

参考