Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务

Future接口常用实现类FutureTask异步任务

Future接口能干什么

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。

如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。

主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。

代码说话:

Runnable接口

Callable接口

Future接口和FutureTask实现类

目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

本源的Future接口相关架构

Future编码实战和优缺点分析

优点:Future+线程池异步多线程任务配置,能显著提高程序的执行效率

上述案例case

package com.juc.cf;

import java.util.concurrent.*;

public class FutureThreadPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        long startTime = System.currentTimeMillis();
        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            TimeUnit.MICROSECONDS.sleep(500);
            return "task1 over";
        });
        threadPool.submit(futureTask1);

        FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
            TimeUnit.MICROSECONDS.sleep(300);
            return "task2 over";
        });
        threadPool.submit(futureTask2);
        // 加上下面这两个获取异步线程的结果,会比不获取结果要耗时一点但是也比完全同步执行耗时强很多
        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());

        FutureTask<String> futureTask3 = new FutureTask<String>(() -> {
            TimeUnit.MICROSECONDS.sleep(300);
            return "task3 over";
        });
        threadPool.submit(futureTask3);
        long endTime = System.currentTimeMillis();
        System.out.println("-------costTime: " + (endTime - startTime) + "毫秒");

        threadPool.shutdown();
    }
}

缺点:

get()阻塞:

一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞,他会一直等待异步结果的返回。所以在get()方法里面我们一般会设置等待超时时间。到了指定时间还未获取到结果,直接抛出 java.util.concurrent.TimeoutException。

isDone()轮询:

轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果

如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞

	public static void FutureDone() throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        threadPool.submit(futureTask);
        System.out.println("-----执行其他任务");
        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("异步线程暂未执行完毕");
            }
        }
        threadPool.shutdown();
    }

    public static void FutureBlock() throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            TimeUnit.SECONDS.sleep(5);
            return "task over";
        });
        threadPool.submit(futureTask);
        System.out.println("-----执行其他任务");
        futureTask.get(3, TimeUnit.SECONDS);
        threadPool.shutdown();

    }

结论

Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

对于想完成一些复杂的业务

对于简单的业务场景使用Future完全OK

回到通知:对应Future的完成时间,完成了可以告诉我,也就是我们的回调通知,通过轮询的方式去判断任务是否完成这样非常占用CPU并且代码也不优雅

创建异步任务:Future+线程池配合

多个任务前后依赖可以组合处理:

  • 想要将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值。

  • 将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果。

使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture声明式的方式优雅的处理这些需求

Future能干的,CompletableFuture都能干

CompletableFuture为什么出现

get()方法在 Future 计算完成之前会一直处在阻塞状态下,

isDone()方法容易耗费CPU资源,

对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。

CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletableFuture和CompletionStage源码介绍

架构说明

接口CompletionStage

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

CompletableFuture

核心的四个静态方法,来创建一个异步任务

runAsync 无返回值

public static CompletableFuture runAsync(Runnable runnable)

public static CompletableFuture runAsync(Runnable runnable, Executor executor)

supplyAsync 有返回值

public static CompletableFuture supplyAsync(Supplier supplier)

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

上述Executor executor参数说明

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

代码展示:

package com.juc.cf;

import java.util.concurrent.*;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        runAsyncNoExecutor();
        runAsync();
        supplyAsyncNoExecutor();
        supplyAsync();
    }

    public static void supplyAsync() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pool-1-thread-1
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsyncNoExecutor";
        }, executorService);
        // hello supplyAsyncNoExecutor
        System.out.println(completableFuture.get());
        executorService.shutdown();
    }

    public static void supplyAsyncNoExecutor() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // ForkJoinPool.commonPool-worker-1
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsyncNoExecutor";
        });
        // hello supplyAsyncNoExecutor
        System.out.println(completableFuture.get());
    }

    public static void runAsync() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            // pool-1-thread-1
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executorService);
        // null
        System.out.println(completableFuture.get());
        executorService.shutdown();
    }

    public static void runAsyncNoExecutor() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            // ForkJoinPool.commonPool-worker-1
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // null
        System.out.println(completableFuture.get());
    }
}

code之通用演示,减少阻塞和轮询

从Java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

public static void supplyAsync1() {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        // pool-1-thread-1
        System.out.println(Thread.currentThread().getName() + "come in");
        int result = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }, executorService).whenComplete((v, e) -> {
        if (null == e) {
            System.out.println("------计算完成,未发生异常,结果为:" + v);
        }
    }).exceptionally(e -> {
        e.printStackTrace();
        System.out.println("系统发生异常" + e.getCause() + "\t" + e.getMessage());
        return null;
    });

    System.out.println("main主线程执行自己的其他逻辑");
    executorService.shutdown();
}

CompletableFuture的优点

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法

1.获得结果和触发计算

获得结果

public T get()

不见不散,一直等到结果才返回,会一直阻塞

public T get(long timeout, TimeUnit unit)

过时不候,在指定的timeout时间范围内可以正常返回,超过timeout时间,会报异常

public T join()

作用和get()方法一致,只是不抛出异常

public T getNow(T valueIfAbsent)

在异步线程计算还未完成的情况下,直接将入参返回,即:

计算完,返回计算完成后的结果;没算完,返回设定的valueIfAbsent

主动触发计算

public boolean complete(T value)

是否打断get方法立即返回括号中的值,返回true表示打断了获取异步线程结果的操作,直接返回value值

2.对计算结果进行处理

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

计算结果存在依赖关系,将这两个线程串行化

**异常相关:**由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

计算结果存在依赖关系,将这两个线程串行化

**异常相关:**有异常也可以往下一步走,根据带的异常参数可以进一步处理

总结

3.对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

对比补充:Code之任务之间的顺序执行

  • thenRun(Runnable runnable):任务A执行完执行B,并且B不需要A的结果无返回值
  • thenAccept(Consumer<? super T> action):任务A执行完执行B,B需要A的结果,但是任务B无返回值
  • thenApply(Function<? super T,? extends U> fn):任务A执行完执行B,B需要A的结果,同时任务B有返回值

CompletableFuture和线程池说明

  1. 没有传入自定义线程池,都用默认线程池ForkJoinPoal;
  2. 传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
    调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
    调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
  3. 备注
    有可能处理太快,系统优化切换原则,直接使用main线程处理
    其它如: thenAcceptthenAcceptAsyncthenApplythenApplyAsync等,它们之间的区别也是同理

4.对计算速度进行选用

谁快用谁 applyToEither

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)

5.对计算结果进行合并

两个completionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理

先完成的先等着,等待其他分支任务

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)