免费凡客建站官网,清远短视频推广,网络推广软件全邀zjkwlgs,制定推广方案Java8实战-总结50 CompletableFuture#xff1a;组合式异步编程对多个异步任务进行流水线操作对 Future 和 CompletableFuture 的回顾 响应 CompletableFuture 的 completion 事件对最佳价格查询器应用的优化 CompletableFuture#xff1a;组合式异步编程
对多个异步任务进行… Java8实战-总结50 CompletableFuture组合式异步编程对多个异步任务进行流水线操作对 Future 和 CompletableFuture 的回顾 响应 CompletableFuture 的 completion 事件对最佳价格查询器应用的优化 CompletableFuture组合式异步编程
对多个异步任务进行流水线操作
对 Future 和 CompletableFuture 的回顾
CompletableFuture利用Lambda表达式以声明式的API提供了一种机制能够用最有效的方式非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升可以尝试仅使用Java 7中提供的特性重新实现前面代码的功能。下面的代码展示了如何实现这一效果利用Java 7的方法合并两个Future对象
ExecutorService executor Executors.newCachedThreadPool(); //创建一个ExecutorService将任务提交到线程池//创建一个查询欧元到美元转换汇率的Future
final FutureDouble futureRate executor.submit(new CallableDouble() {public Double call() { return exchangeService.getRate(Money.EUR, Money.USD); }}); FutureDouble futurePriceInUSD executor.submit(new CallableDouble() {public Double call() {double priceInEUR shop.getPrice(product); //在第二个 Future中查询指定商店中特定商品的价格return priceInEUR * futureRate.get(); //在查找价格操作的同一个Future中将价格和汇率做乘法计算出汇后价格}}); 在上面的代码中通过向执行器提交一个Callable对象的方式创建了第一个Future对象向外部服务查询欧元和美元之间的转换汇率。紧接着你创建了第二个Future对象查询指定商店中特定商品的欧元价格。最终用与之前代码一样的方式在同一个Future中通过查询商店得到的欧元商品价格乘以汇率得到了最终的价格。注意之前的代码中中如果使用thenCombineAsync不使用thenCombine像上面的代码一样采用第三个Future单独进行商品价格和汇率的乘法运算效果是几乎相同的。这两种实现看起来没太大区别原因是你只对两个Future进行了合并。通过这两段代码我们能看到创建流水线对同步和异步操作进行混合操作有多么简单随着处理任务和需要合并结果数目的增加这种声明式程序设计的优势也愈发明显。
你的“最佳价格查询器”应用基本已经完成不过还缺失了一些元素。你会希望尽快将不同商店中的商品价格呈现给你的用户这是车辆保险或者机票比价网站的典型需求而不是像你之前那样等所有的数据都完备之后再呈现。接下来的一节你会了解如何通过响应CompletableFuture的completion事件实现这一功能与此相反“调用get或者join方法只会造成阻塞直到CompletableFuture完成才能继续往下运行。
响应 CompletableFuture 的 completion 事件
下面这部分的所有示例代码都是通过在响应之前添加1秒钟的等待延迟模拟方法的远程调用。毫无疑问现实世界中你的应用访问各个远程服务时很可能遭遇无法预知的延迟触发的原因多种多样从服务器的负荷到网络的延迟有些甚至是源于远程服务如何评估你应用的商业价值即可能相对于其他的应用你的应用每次查询的消耗时间更长。
由于这些原因你希望购买的商品在某些商店的查询速度要比另一些商店更快。以下面的代码清单为例使用randomDelay方法取代原来的固定延迟。下面的代码是一个模拟生成0.5秒至2.5秒随机延迟的方法
private static final Random random new Random();
public static void randomDelay() {int delay 500 random.nextInt(2000); try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); }
} 目前为止你实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。而你希望的效果是只要有商店返回商品价格就在第一时间显示返回值不再等待那些还未返回的商店有些甚至会发生超时。你如何实现这种更进一步的改进要求呢
对最佳价格查询器应用的优化
要避免的首要问题是等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理CompletableFuture流这样每个CompletableFuture都在为某个商店执行必要的操作。为了实现这一目标在下面的代码清单中会对前面代码实现的第一部分进行重构实现findPricesStream方法来生成一个由CompletableFuture构成的流。重构findPrices方法返回一个由Future构成的流
public StreamCompletableFutureString findPricesStream(String product) {return shops.stream().map(shop - CompletableFuture.supplyAsync( () - shop.getPrice(product), executor)).map(future - future.thenApply(Quote::parse)).map(future - future.thenCompose(quote - CompletableFuture.supplyAsync(() - Discount.applyDiscount(quote), executor)));
} 现在你为findPricesStream方法返回的Stream添加了第四个map操作在此之前你已经在该方法内部调用了三次 map 。这个新添加的操作其实很简单只是在每个CompletableFuture上注册一个操作该操作会在CompletableFuture完成执行后使用它的返回值。Java 8的CompletableFuture通过thenAccept方法提供了这一功能它接收CompletableFuture执行完毕后的返回值做参数。在这里的例子中该值是由Discount服务返回的字符串值它包含了提供请求商品的商店名称及折扣价格你想要做的操作也很简单只是将结果打印输出
findPricesStream(myPhone).map(f - f.thenAccept(System.out::println)); 注意和之前的thenCompose和thenCombine方法一样thenAccept方法也提供了一个异步版本名为thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度从线程池中选择一个新的线程继续执行不再由同一个线程完成CompletableFuture的所有任务。因为你想要避免不必要的上下文切换更重要的是你希望避免在等待线程上浪费时间尽快响应CompletableFuture的completion事件所以这里没有采用异步版本。由 于thenAccept方法已经定义了如何处理CompletableFuture返回的结果一旦CompletableFuture计算得到结果它就返回一个CompletableFutureVoid。所以map操作返回的是一个StreamCompletableFutureVoid。对这个CompletableFutureVoid对象你能做的事非常有限只能等待其运行结束不过这也是你所期望的。你还希望能给最慢的商店一些机会让它有机会打印输出返回的价格。为了实现这一目的你可以把构成Stream的所有CompletableFutureVoid对象放到一个数组中等待所有的任务执行完成代码如下所示响应CompletableFuture的completion事件:
CompletableFuture[] futures findPricesStream(myPhone).map(f - f.thenAccept(System.out::println)).toArray(size - new CompletableFuture[size]);
CompletableFuture.allOf(futures).join(); allOf工厂方法接收一个由CompletableFuture构成的数组数组中的所有CompletableFuture对象执行完成之后它返回一个CompletableFutureVoid对象。这意味着如果你需要等待最初Stream中的所有 CompletableFuture对象执行完毕对 allOf方法返回的CompletableFuture执行join操作是个不错的主意。这个方法对“最佳价格查询器”应用也是有用的因为你的用户可能会困惑是否后面还有一些价格没有返回使用这个方法你可以在执行完毕之后打印输出一条消息“All shops returned results or timed out”。
然而在另一些场景中你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待比如你正在查询两个汇率服务器任何一个返回了结果都能满足你的需求。在这种情况下你可以使用一个类似的工厂方法anyOf。该方法接收一个CompletableFuture对象构成的数组返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFutureObject。