1. 前言
最近项目团队遇到一个问题,一个用Java开发的定时任务执行效率达不到要求,凌晨2点开始运行,执行到早上9点客户上班了,程序还没处理完,影响了客户白天正常的业务。和负责开发的工程师聊了下,他反馈主要原因有三个:
- 需要处理的数据量较大,大概有6000条。
- 每条数据处理时间需要5秒。
- 处理是串行的,处理完一条才开始处理第二条。
和他一起分析了下,可以从2、3入手手去提升。由于程序执行时服务器的资源情况是比较空闲的,所以将串行处理改为并行执应该能解决这个问题。
从Java8 开始,Java已经提供了很多针对并行处理的高级能力,能让我们轻松地编写并行处理的代码,充分利用现代计算机的硬件资源。
下面我们将以一个订单列表的处理为例子,说明下如何利用Java8的并行处理能力提升程序的执行效率。
2. 并行(Parallelism) 与并发(Concurrency)处理
2.1 串行执行
假设我们由A、B、C、D 4长订单要处理,每张订单的处理时间为1秒,串行执行情况下,处理完成一张订单后再开始另一张订单的处理,所以一共需要4秒完成所有的处理。
如果订单处理过程中有很多I/O 操作,那么处理中需要有很多I/O 等待时间,那么这些时间都浪费掉了。
另外如果执行的机器上有多个CPU/内核,那么其他的CPU资源也浪费掉了。
2.2 并行执行
并行执行是将处理分配到每个CPU上,每个CPU 同一时间处理一张订单。
从下图可见,如果每张订单的处理时间为1秒,执行的机器上有2个CPU,忽略并行任务分配的开销,那么一共需要约2秒完成所有的处理。
所以并行执行可以充分利用机器上的多CPU内核资源,提高执行效率。
2.3 并发执行
并发执行是利用多线程执行能力,并发处理多个任务。线程可以被操作系统分配到多个CPU内核上执行,线程被操作系统调度进行执行和等待,从而达到多个任务行发执行的效果。
从下图可见,如果每张订单的处理时间为1秒,使用4个线程来执行,忽略线程分配和调度的开销,那么一共需要约1秒完成所有的处理。
3. 并行处理样例代码
3.1 订单处理服务
订单实体
/**
* 订单
*/
public class Order {
private String id;
public Order(String id){
this.id = id;
}
public String getId(){
return id;
}
public void setId(String id){
this.id = id;
}
}
订单处理服务
模拟每个订单处理需要1秒时间
/**
* 订单服务
*/
public class OrderService {
/**
* 订单处理
* @param order 订单
* @return 处理结果
*/
public static ProcessResult Process(Order order)
{
ProcessResult result = new ProcessResult();
result.setOrderId(order.getId());
try {
//模拟订单处理需要1秒时间
Thread.sleep(1000L);
result.setCode(ResultCode.SUCCESS);
result.setMessage("Process Success");
} catch (Exception ex) {
//记录订单处理异常日志
result.setCode(ResultCode.FAILURE);
result.setMessage("Process Failure");
}
return result;
}
}
/**
* 处理结果
*/
public class ProcessResult {
private int code;
private String message;
private String orderId;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
}
3.2 准备测试数据
我们准备了20条订单数据来进行测试
/**
* 准备用于测试的订单列表
* */
private static List<Order> prepareOrders(){
List<Order> orders = new ArrayList<Order>();
for (int i = 0; i < 20; i++) {
Order order = new Order(UUID.randomUUID().toString());
orders.add(order);
}
return orders;
}
3.3 串行处理
这个是一个对订单列表进行串行处理的样例代码。
/**
* 串行执行
*/
private static void runSerial(){
//开始时间
long start = System.currentTimeMillis();
//准备用于测试的订单
List<Order> orders = prepareOrders();
//处理
List<ProcessResult> results = orders.stream()
.map(OrderService::Process)
.collect(Collectors.toList());
//结束时间
long end = System.currentTimeMillis();
//输出处理时间
System.out.printf("串行处理订单,处理时间为 %s ms%n", end - start);
// System.out.println("处理状态为: " + results);
}
3.4 Parallel Stream 并行处理
Parallel Stream 是Java8 的一个并行处理功能,可以利用多核CPU并行对Stream中的数据进行处理。并行度由执行程序机器的CPU内核数量决定。例如在一台有8个内核的机器上,Parallel Stream 的并行度是8。
/**
* Parallel Stream 并行处理
*/
private static void runParallel(){
//开始时间
long start = System.currentTimeMillis();
//准备用于测试的订单
List<Order> orders = prepareOrders();
//处理
List<ProcessResult> results = orders.stream()
.parallel()
.map(OrderService::Process)
.collect(Collectors.toList());
//结束时间
long end = System.currentTimeMillis();
//输出处理时间
System.out.printf("Parallel Stream 并行处理订单,处理时间为 %s ms%n", end - start);
//System.out.println("处理状态为: " + results);
}
3.5 CompletableFuture 多线程并发处理
CompletableFuture 是Java8中对多线程处理的一个高级封装,可以使用线程池来并行执行处理任务。
/**
* CompletableFuture 多线程并发处理
* @param executor 线程池
*/
private static void runCompletableFuture(Executor executor){
//开始时间
long start = System.currentTimeMillis();
//准备用于测试的订单
List<Order> orders = prepareOrders();
//处理
List<CompletableFuture<ProcessResult>> futureResults = orders.stream()
.map(o -> CompletableFuture.supplyAsync(
() -> OrderService.Process(o),executor)
).collect(Collectors.toList());
//收集结果
List<ProcessResult> results = futureResults.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
//结束时间
long end = System.currentTimeMillis();
//输出处理时间
System.out.printf("CompletableFuture 多线程并发处理订单,处理时间为 %s ms%n", end - start);
//System.out.println("处理状态为: " + results);
}
3.6 测试程序
我们将上面的代码集成起来,编写一个测试程序,分别按三种方式对订单进行处理,并输出各自的处理时间。
public static void main(String[] args) {
//准备一个线程池,线程数量为20
Executor executor = Executors.newFixedThreadPool(20);
//串行执行
runSerial();
//Parallel Stream 并行处理
runParallel();
//CompletableFuture 多线程并发处理
runCompletableFuture(executor);
System.exit(0);
}
3.7 运行结果
在一台8个CPU内核的机器上运行该程序,输出结果如下:
串行处理订单,处理时间为 20150 ms
Parallel Stream 并行处理订单,处理时间为 3016 ms
CompletableFuture 多线程并发处理订单,处理时间为 1013 ms
Process finished with exit code 0
对20条订单,每条订单处理时间为1秒,运行结果如下:
串行处理需要20秒。
这是没有使用任何并行处理的情况,每条订单的处理结束后,再处理下一条,所以一共耗时20秒。
Parallel Stream 并行处理需要3秒。
因为机器有8个内核,所以对于Parallel Stream并行处理时并行度为8,20个订单需要分3次才能处理完成,所以一共耗时3秒。
CompletableFuture 多线程并发处理需要1秒
我们使用了一个有20个线程的线程池来执行,所以20个订单的处理一共耗时1秒。
4. 总结
从上面的样例代码可见:
- 使用Java8的并行处理能力,可以提升我们程序的执行效率,充分利用计算机的硬件资源。
- Parallel Stream 利用CPU的多内核来进行并行处理,并行度由CPU的内核数量决定。
- CompletableFuture 使用线程池来进行并发处理,并发数由线程池的线程数量决定。