@
最近在学习juc并发编程,于是决定汇总一下并发编程中常用方法,常见问题以及常见考题,今天是第一章—CompletableFuture
CompletableFuture是jdk8版本开始出现的类。目的是为了应用于并发编程状态下遇到的各种场景。CompletableFuture实现了CompletionStage接口和Future接口,对Java7及以前Future接口做了大量的扩展,增加了许多常用方法,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
首先得学会看懂几个函数式接口的特性!!!(重点)
//runAsync方法不支持返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//supplyAsync可以支持返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture创建异步任务主要分为两个方法runAsync和supplyAsync
其中
下面是四种创建方式
public class CompletableFutureTest {
public static void main(String[] args) throws Exception{
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,5,5
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(10));
CompletableFuture future1=CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+"*********future1 coming in");
});
//这里获取到的值是null,无返回值
System.out.println(future1.get());
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
//ForkJoinPool.commonPool-worker-9
System.out.println(Thread.currentThread().getName() + "\t" + "*********future2 coming in");
}, executor);
CompletableFuture<Integer> future3 =CompletableFuture.supplyAsync(()-> {
//pool-1-thread-1
System.out.println(Thread.currentThread().getName() + "\t" + "future3带有返回值");
return "abc";
});
System.out.println(future3.get());
CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "future4带有返回值");
return "abc";
}, executor);
System.out.println(future4.get());
//关闭线程池
executor.shutdown();
}
}
获得结果和触发计算(get、getNow、join、complete)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace();}
return 1;
}).thenApply(s->{
System.out.println("-----1");
//如果加上int error=1/0; 由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
//int error=1/0;
return s+1;
}).thenApply(s->{
System.out.println("-----2");
return s+2;
}).whenComplete((v,e)->{
if(e==null){
System.out.println("result-----"+v);
}
}).exceptionally(e->{
e.printStackTrace();
return null;
});
System.out.println(Thread.currentThread().getName()+"\t"+"over....");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
CompletableFuture.supplyAsync(() -> {
return 1;
}).handle((f,e) -> {
System.out.println("-----1");
return f + 2;
}).handle((f,e) -> {
System.out.println("-----2");
int error=1/0;
return f + 3;
}).handle((f,e) -> {
System.out.println("-----3");
return f + 4;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("----result: " + v);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
}).join());
4. 线程串行化方法
带了Async的方法表示的是:会重新在线程池中启动一个线程来执行任务
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
案例说明:电商比价需求
同一款产品,同时搜索出同款产品在各大电商的售价;
同一款产品,同时搜索出本产品在某一个电商平台下,各个入驻门店的售价是多少
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
in jd price is 88.05
in pdd price is 86.11
in taobao price is 90.43
(String类型)
public class test1 {
public ExecutorService executorService = new ThreadPoolExecutor(40,100,100,TimeUnit.MINUTES,new ArrayBlockingQueue<>(10000));
public static List<mall> list = Arrays.asList(
new mall("jd"),
new mall("tmall"),
new mall("taobao"),
new mall("pdd"),
new mall("elm")
);
public static void main(String[] args) {
showRes3(list,"mysql");
}
public static List<String> showRes(List<mall> list,String productName){
long start = System.currentTimeMillis();
List<String> collect = list.stream().map(mall -> String.format(productName + " in %s price is %.2f", mall.getName(), mall.getPrice(productName))).collect(Collectors.toList());
collect.forEach(System.out::println);
long end = System.currentTimeMillis();
System.out.println("耗时:"+(end-start)+"秒");
return collect;
}
public static List<String> showRes2(List<mall> list,String productName){
long start = System.currentTimeMillis();
List<String> collect = list.parallelStream().map(mall -> CompletableFuture.supplyAsync(() ->
String.format(productName + " in %s price is %.2f", mall.getName(), mall.getPrice(productName))
).join()).collect(Collectors.toList());
long end = System.currentTimeMillis();
collect.forEach(System.out::println);
System.out.println("耗时:"+(end-start)+"秒");
return collect;
}
public static List<String> showRes3(List<mall> list,String productName){
long start = System.currentTimeMillis();
List<String> collect = list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", mall.getName(), mall.getPrice(productName))
)).collect(Collectors.toList()).stream().map(s -> s.join()).collect(Collectors.toList());
long end = System.currentTimeMillis();
collect.forEach(System.out::println);
System.out.println("耗时:"+(end-start)+"毫秒");
return collect;
}
}
class mall{
private String name;
public String getName() {
return name;
}
public mall(String name) {
this.name = name;
}
public double getPrice(String productName){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);
}
}
通过上面代码示例结果得知
通过异步线程方法耗时需要1000毫秒左右
而同步线程方法耗时需要5000毫秒
而且当商城店铺越来越多的时候,异步线程耗时不会增加,同步线程耗时会不断增加,异步的优势就显现出来了。
tips
在使用stream流的时候,如果使用的是集合的stream()方法,再使用异步线程时比如使用join()方法会出现串行,和上面方法三一样得使用两次stream流才能实现异步执行,相当于每一步都得使用stream流。
这个时候可以使用parallelStream并行流就可以解决这个问题
但是使用parallelStream会出现一些问题,所以并行流要谨慎使用!!!