Featured image of post 多线程之 completableFuture

多线程之 completableFuture

Image

先谈谈Future

     Callable与Runnable的功能大致相似,但是call()函数有返回值. Callable一般是和ExecutorService配合来使用的

    Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成

    在Future接口中声明了5个方法
  •    cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
    
  •    isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。 
    
  •   isDone方法表示任务是否已经完成,若任务完成,则返回true; 
    
  •   get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回; 
    
  •   get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
    

    也就是说Future提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

      因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了FutureTask。
    

    来两个demo:

  1 public static void futureDemo1() throws ExecutionException, InterruptedException {
  2
  3    ThreadPoolExecutor pool = CommonThreadPool.getPool();
  4    Future<Integer> f = pool.submit(() -> {
  5      // 长时间的异步计算
  6      Thread.sleep(2000);
  7      // 然后返回结果
  8      return 100;
  9    });
 10    while (!f.isDone()) {
 11      System.out.println(System.currentTimeMillis() + " 还没结束");
 12    }
 13    //结束后,获取结果
 14    System.out.println(f.get());
 15
 16  }
 17
 18```java
 19
 20        Future只实现了异步,而没有实现回调,主线程get时会阻塞,可以轮询以便获取异步调用是否完成。在实际的使用中建议使用Guava ListenableFuture 来实现异步非阻塞,目的就是多任务异步执行,通过回调的方式来获取执行结果而不需轮询任务状态。
 21
 22```java
 23 public static void futureDemo2() {
 24
 25    ListeningExecutorService executorService = MoreExecutors
 26        .listeningDecorator(CommonThreadPool.getPool());
 27
 28    IntStream.rangeClosed(1, 10).forEach(i -> {
 29      ListenableFuture<Integer> listenableFuture = executorService
 30          .submit(() -> {
 31            // 长时间的异步计算
 32            // Thread.sleep(3000);
 33            // 然后返回结果
 34            return 100;
 35          });
 36
 37      Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
 38        @Override
 39        public void onSuccess(Integer result) {
 40          System.out.println("get listenable future's result with callback " + result);
 41        }
 42
 43        @Override
 44        public void onFailure(Throwable t) {
 45          t.printStackTrace();
 46        }
 47      }, executorService);
 48    });
 49  }
 50
 51```java
 52
 53CompletableFuture
 54
 55        Futrue对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
 56
 57Java 8, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能
 58
 59        CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。
 60
 61        CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAcceptthenApplythenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理
 62
 63       下面将会一个个的例子来说明CompletableFuture
 64
 65异步执行
 66
 67```cs
 68/**
 69   *
 70   * public static CompletableFuture<Void>   runAsync(Runnable runnable)
 71   * public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
 72   * public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
 73   * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
 74   *
 75   *
 76   * 以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
 77   *
 78   * runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
 79   *
 80   * supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
 81   */
 82public static void runAsyncExample() throws ExecutionException, InterruptedException {
 83
 84    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
 85      System.out.println("异常执行代码");
 86    });
 87
 88    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
 89      //长时间的计算任务
 90      return "·00";
 91    });
 92
 93    System.out.println(future.join());
 94
 95  }
 96
 97```java
 98
 99计算结果完成时的处理
100
101```xml
102 /**
103   *
104   * 当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:
105   *
106   * whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T>
107   * whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T>
108   * whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public
109   * CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)
110   *
111   * 不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。
112   * Java的CompletableFuture类总是遵循这样的原则
113   *
114   * 如果你希望不管 CompletableFuture 运行正常与否 都执行一段代码,如释放资源,更新状态,记录日志等,但是同时不影响原来的执行结果。
115   * 那么你可以使用 whenComplete 方法。exceptionally非常类似于 catch(),而 whenComplete 则非常类似于 finally:
116   */
117  public static void whenComplete() throws ExecutionException, InterruptedException {
118
119    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
120      @Override
121      public Integer get() {
122        return 2323;
123      }
124    });
125    Future<Integer> f = future.whenComplete((v, e) -> {
126      System.out.println(v);
127      System.out.println(e);
128    });
129    System.out.println(f.get());
130
131  }

handle 是执行任务完成时对结果的处理

  1private static class HttpResponse {
  2
  3    private final int status;
  4    private final String body;
  5
  6    public HttpResponse(final int status, final String body) {
  7      this.status = status;
  8      this.body = body;
  9    }
 10
 11    @Override
 12    public String toString() {
 13      return status + " - " + body;
 14    }
 15  }
 16
 17  /**
 18   * handle 是执行任务完成时对结果的处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。
 19   * 这组方法兼有whenComplete和转换的两个功能
 20   *
 21   * public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
 22   * public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
 23   * public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
 24   *
 25   * thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
 26   * public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
 27   * public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
 28   * public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
 29   */
 30  public static void handle() throws ExecutionException, InterruptedException {
 31
 32    for (final boolean failure : new boolean[]{false, true}) {
 33
 34      CompletableFuture<Integer> x = CompletableFuture.supplyAsync(() -> {
 35        if (failure) {
 36          throw new RuntimeException("Oops, something went wrong");
 37        }
 38        return 42;
 39      });
 40
 41      /** * Returns a new CompletableFuture that, when this CompletableFuture completes either normally or exceptionally, * is executed with this stage's result and exception as arguments to the supplied function. */
 42      CompletableFuture<HttpResponse> tryX = x
 43          // Note that tryX and x are of different type.
 44          .handle((value, ex) -> {
 45            if (value != null) {
 46              // We get a chance to transform the result...
 47              return new HttpResponse(200, value.toString());
 48            } else {
 49              // ... or return details on the error using the ExecutionException's message:
 50              return new HttpResponse(500, ex.getMessage());
 51            }
 52          });
 53
 54      // Blocks (avoid this in production code!), and either returns the promise's value:
 55      System.out.println(tryX.get());
 56      System.out.println("isCompletedExceptionally = " + tryX.isCompletedExceptionally());
 57
 58    }
 59
 60```java
 61
 62转换
 63
 64```cs
 65 /**
 66   * 转换
 67   * @throws ExecutionException
 68   * @throws InterruptedException
 69   */
 70  public static void thenApply() throws ExecutionException, InterruptedException {
 71    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
 72      return 100;
 73    });
 74    CompletableFuture<String> f =  future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString());
 75    //"1000"
 76    System.out.println(f.get());
 77  }
 78
 79```java
 80
 81Action
 82
 83```xml
 84/**
 85   * 上面的方法是当计算完成的时候,会生成新的计算结果(thenApply, handle),或者返回同样的计算结果whenComplete
 86   * CompletableFuture还提供了一种处理结果的方法,只对结果执行Action,而不返回新的计算值,因此计算值为Void:
 87   *
 88   * public CompletableFuture<Void>   thenAccept(Consumer<? super T> action)
 89   * public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action)
 90   * public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action, Executor executor)
 91   */
 92  public static void action() throws ExecutionException, InterruptedException {
 93
 94    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
 95      return 100;
 96    });
 97    CompletableFuture<Void> f =  future.thenAccept(System.out::println);
 98    System.out.println(f.get());
 99
100  }
101
102```java
103
104thenAccept
105
106```xml
107 /**
108   * thenAcceptBoth以及相关方法提供了类似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。
109   * runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。
110   *
111   * public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
112   * public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
113   * public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
114   * public     CompletableFuture<Void>   runAfterBoth(CompletionStage<?> other,  Runnable action)
115   */
116  public static void thenAcceptBoth() throws ExecutionException, InterruptedException {
117    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
118      return 100;
119    });
120    CompletableFuture<Void> f =  future.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y));
121    System.out.println(f.get());
122
123  }

thenRun

  1/**
  2   * 当计算完成的时候会执行一个Runnable,与thenAccept不同,Runnable并不使用CompletableFuture计算的结果。
  3   *
  4   * public CompletableFuture<Void>   thenRun(Runnable action)
  5   * public CompletableFuture<Void>   thenRunAsync(Runnable action)
  6   * public CompletableFuture<Void>   thenRunAsync(Runnable action, Executor executor)
  7   */
  8  public static void  thenRun() throws ExecutionException, InterruptedException {
  9
 10    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
 11      return 100;
 12    });
 13    CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished"));
 14    System.out.println(f.get());
 15
 16  }
 17
 18```java
 19
 20复合
 21
 22```xml
 23/**
 24   * thenCombine用来复合另外一个CompletionStage的结果。它的功能类似
 25   *
 26   * A +
 27   *   |
 28   *   +------> C
 29   *   +------^
 30   * B +
 31   *
 32   * 两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
 33   *
 34   * public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
 35   * public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
 36   * public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
 37   *
 38   * 其实从功能上来讲,它们的功能更类似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。
 39   */
 40  public static void thenCombine() throws ExecutionException, InterruptedException {
 41
 42    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
 43      return 100;
 44    });
 45    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
 46      return "abc";
 47    });
 48    CompletableFuture<String> f =  future.thenCombine(future2, (x,y) -> y + "-" + x);
 49    System.out.println(f.get()); //abc-100
 50
 51  }
 52
 53```java
 54
 55组合
 56
 57```cs
 58/**
 59   * 组合
 60   * 这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,
 61   * 这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。因此它的功能类似: A +--> B +---> C
 62   *
 63   * thenCompose返回的对象并不是函数fn返回的对象,如果原来的CompletableFuture还没有计算出来,它就会生成一个新的组合后的CompletableFuture。
 64   */
 65  public static void thenCompose() throws ExecutionException, InterruptedException {
 66
 67    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
 68      return 100;
 69    });
 70    CompletableFuture<String> f =  future.thenCompose( i -> {
 71      return CompletableFuture.supplyAsync(() -> {
 72        return (i * 10) + "";
 73      });
 74    });
 75    System.out.println(f.get()); //1000
 76
 77  }
 78
 79```java
 80
 81Either
 82
 83```xml
 84/**
 85   * Either 语义:表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。
 86   *
 87   * public CompletableFuture<Void>   acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
 88   * public CompletableFuture<Void>   acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
 89   * public CompletableFuture<Void>   acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
 90   * public <U> CompletableFuture<U>   applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
 91   * public <U> CompletableFuture<U>   applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
 92   * public <U> CompletableFuture<U>   applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
 93   *
 94   *
 95   * acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture<Void>
 96   *
 97   * applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。
 98   */
 99  public static void either() {
100
101    Random random = new Random();
102
103    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
104
105      try {
106        Thread.sleep(random.nextInt(1000));
107      } catch (InterruptedException e) {
108        e.printStackTrace();
109      }
110
111      return "from future1";
112    });
113
114    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
115
116      try {
117        Thread.sleep(random.nextInt(1000));
118      } catch (InterruptedException e) {
119        e.printStackTrace();
120      }
121
122      return "from future2";
123    });
124
125    CompletableFuture<Void> haha = future1
126        .acceptEitherAsync(future2, str -> System.out.println("The future is " + str));
127
128    try {
129      System.out.println(haha.get());
130    } catch (InterruptedException e) {
131      e.printStackTrace();
132    } catch (ExecutionException e) {
133      e.printStackTrace();
134    }
135
136  }

All

  1 /**
  2   * allOf方法是当所有的CompletableFuture都执行完后执行计算。
  3   * anyOf接受任意多的CompletableFuture
  4   *
  5   * anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。
  6   */
  7  public static void allOfAndAnyOf() throws ExecutionException, InterruptedException {
  8
  9    Random rand = new Random();
 10    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
 11      try {
 12        Thread.sleep(10000 + rand.nextInt(1000));
 13      } catch (InterruptedException e) {
 14        e.printStackTrace();
 15      }
 16      return 100;
 17    });
 18    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
 19      try {
 20        Thread.sleep(10000 + rand.nextInt(1000));
 21      } catch (InterruptedException e) {
 22        e.printStackTrace();
 23      }
 24      return "abc";
 25    });
 26    //CompletableFuture<Void> f =  CompletableFuture.allOf(future1,future2);
 27    CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2);
 28    System.out.println(f.get());
 29
 30  }
 31
 32```java
 33
 34allOf 如果其中一个失败了如何快速结束所有?
 35
 36```cs
 37/**
 38   * allOf 如果其中一个失败了如何快速结束所有?
 39   *
 40   * 默认情况下,allOf 会等待所有的任务都完成,即使其中有一个失败了,也不会影响其他任务继续执行。但是大部分情况下,一个任务的失败,往往意味着整个任务的失败,继续执行完剩余的任务意义并不大。
 41   * 在 谷歌的 Guava 的 allAsList 如果其中某个任务失败整个任务就会取消执行:
 42   *
 43   * 一种做法就是对 allOf 数组中的每个 CompletableFuture 的 exceptionally 方法进行捕获处理:如果有异常,那么整个 allOf 就直接抛出那个异常:
 44   */
 45
 46  public static void allOfOneFail(){
 47    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
 48      System.out.println("-- future1 -->");
 49      try {
 50        Thread.sleep(1000);
 51      } catch (InterruptedException e) {
 52        // TODO Auto-generated catch block
 53        e.printStackTrace();
 54      }
 55      System.out.println("<-- future1 --");
 56      return "Hello";
 57    });
 58
 59    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
 60      System.out.println("-- future2 -->");
 61      try {
 62        Thread.sleep(2000);
 63      } catch (InterruptedException e) {
 64        // TODO Auto-generated catch block
 65        e.printStackTrace();
 66      }
 67      System.out.println("<-- future2 --");
 68      throw new RuntimeException("Oops!");
 69    });
 70
 71    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
 72      System.out.println("-- future3 -->");
 73      try {
 74        Thread.sleep(4000);
 75      } catch (InterruptedException e) {
 76        // TODO Auto-generated catch block
 77        e.printStackTrace();
 78      }
 79      System.out.println("<-- future3 --");
 80      return "world";
 81    });
 82
 83    // CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
 84    // combinedFuture.join();
 85
 86    CompletableFuture<Void> allWithFailFast = CompletableFuture.allOf(future1, future2, future3);
 87    Stream.of(future1, future2, future3).forEach(f -> f.exceptionally(e -> {
 88      allWithFailFast.completeExceptionally(e);
 89      return null;
 90    }));
 91
 92    allWithFailFast.join();
 93  }
 94
 95```java
 96
 97我自己的一个demo
 98
 99```cpp
100 /**
101   * 假设你有一个集合,需要请求N个接口,接口数据全部返回后进行后续操作。
102   */
103  public static void myDemo(){
104
105    ArrayList<String> strings = Lists.newArrayList("1", "2", "3", "4");
106
107    CompletableFuture[] cfs = strings.stream()
108        .map(s -> CompletableFuture.supplyAsync(() -> {
109          return s + " $";
110        }).thenAccept(s1 -> {
111          System.out.println(s1+ " #");
112        }).exceptionally(t -> {
113          return null;
114        })).toArray(CompletableFuture[]::new);
115
116    // 等待future全部执行完
117    CompletableFuture.allOf(cfs).join();
118
119  }

Image

关注公众号 获取更多精彩内容

位旅人路过 次翻阅 初次见面