1.  前言

最近项目团队遇到一个问题,一个用Java开发的定时任务执行效率达不到要求,凌晨2点开始运行,执行到早上9点客户上班了,程序还没处理完,影响了客户白天正常的业务。和负责开发的工程师聊了下,他反馈主要原因有三个:

  1. 需要处理的数据量较大,大概有6000条。
  2. 每条数据处理时间需要5秒。
  3. 处理是串行的,处理完一条才开始处理第二条。

和他一起分析了下,可以从2、3入手手去提升。由于程序执行时服务器的资源情况是比较空闲的,所以将串行处理改为并行执应该能解决这个问题。

从Java8 开始,Java已经提供了很多针对并行处理的高级能力,能让我们轻松地编写并行处理的代码,充分利用现代计算机的硬件资源。

下面我们将以一个订单列表的处理为例子,说明下如何利用Java8的并行处理能力提升程序的执行效率。

2. 并行(Parallelism) 与并发(Concurrency)处理

2.1 串行执行

假设我们由A、B、C、D 4长订单要处理,每张订单的处理时间为1秒,串行执行情况下,处理完成一张订单后再开始另一张订单的处理,所以一共需要4秒完成所有的处理。

Sequential

如果订单处理过程中有很多I/O 操作,那么处理中需要有很多I/O 等待时间,那么这些时间都浪费掉了。

另外如果执行的机器上有多个CPU/内核,那么其他的CPU资源也浪费掉了。

2.2 并行执行

并行执行是将处理分配到每个CPU上,每个CPU 同一时间处理一张订单。

从下图可见,如果每张订单的处理时间为1秒,执行的机器上有2个CPU,忽略并行任务分配的开销,那么一共需要约2秒完成所有的处理。

Parallelism

所以并行执行可以充分利用机器上的多CPU内核资源,提高执行效率。

2.3 并发执行

并发执行是利用多线程执行能力,并发处理多个任务。线程可以被操作系统分配到多个CPU内核上执行,线程被操作系统调度进行执行和等待,从而达到多个任务行发执行的效果。

从下图可见,如果每张订单的处理时间为1秒,使用4个线程来执行,忽略线程分配和调度的开销,那么一共需要约1秒完成所有的处理。

Concurrency

3. 并行处理样例代码

3.1 订单处理服务

订单实体

/**
 * 订单
 */
public class Order {
    private String id;
    public Order(String id){
        this.id = id;
    }
    public String getId(){
        return id;
    }
    public  void setId(String id){
        this.id = id;
    }
}

订单处理服务

模拟每个订单处理需要1秒时间

/**
 * 订单服务
 */
public class OrderService {
    /**
     * 订单处理
     * @param order 订单
     * @return 处理结果
     */
    public static ProcessResult Process(Order order)
    {
        ProcessResult result = new ProcessResult();
        result.setOrderId(order.getId());
        try {
            //模拟订单处理需要1秒时间
            Thread.sleep(1000L);
            result.setCode(ResultCode.SUCCESS);
            result.setMessage("Process Success");
        } catch (Exception ex) {
            //记录订单处理异常日志
            result.setCode(ResultCode.FAILURE);
            result.setMessage("Process Failure");
        }
        return result;
    }
}

/**
 * 处理结果
 */
public class ProcessResult {
    private int code;
    private String message;
    private String orderId;

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
}

3.2 准备测试数据

我们准备了20条订单数据来进行测试

    /**
     * 准备用于测试的订单列表
     * */
    private static List<Order> prepareOrders(){
        List<Order>  orders = new ArrayList<Order>();
        for (int i = 0; i < 20; i++) {
            Order order = new Order(UUID.randomUUID().toString());
            orders.add(order);
        }
        return  orders;
    }

3.3 串行处理

这个是一个对订单列表进行串行处理的样例代码。

    /**
     * 串行执行
     */
    private  static void runSerial(){
        //开始时间
        long start = System.currentTimeMillis();
        //准备用于测试的订单
        List<Order> orders = prepareOrders();
        //处理
        List<ProcessResult> results = orders.stream()
                .map(OrderService::Process)
                .collect(Collectors.toList());
        //结束时间
        long end = System.currentTimeMillis();
        //输出处理时间
        System.out.printf("串行处理订单,处理时间为 %s ms%n", end - start);
        // System.out.println("处理状态为: " + results);
    }

3.4 Parallel Stream 并行处理

Parallel Stream 是Java8 的一个并行处理功能,可以利用多核CPU并行对Stream中的数据进行处理。并行度由执行程序机器的CPU内核数量决定。例如在一台有8个内核的机器上,Parallel Stream 的并行度是8。

    /**
     *  Parallel Stream 并行处理
     */
    private  static void runParallel(){
        //开始时间
        long start = System.currentTimeMillis();
        //准备用于测试的订单
        List<Order> orders = prepareOrders();
        //处理
        List<ProcessResult> results = orders.stream()
                .parallel()
                .map(OrderService::Process)
                .collect(Collectors.toList());
        //结束时间
        long end = System.currentTimeMillis();
        //输出处理时间
        System.out.printf("Parallel Stream 并行处理订单,处理时间为 %s ms%n", end - start);
        //System.out.println("处理状态为: " + results);
    }

3.5 CompletableFuture 多线程并发处理

CompletableFuture 是Java8中对多线程处理的一个高级封装,可以使用线程池来并行执行处理任务。

    /**
     * CompletableFuture 多线程并发处理
     * @param executor 线程池
     */
    private  static void runCompletableFuture(Executor executor){
        //开始时间
        long start = System.currentTimeMillis();
        //准备用于测试的订单
        List<Order> orders = prepareOrders();
        //处理
        List<CompletableFuture<ProcessResult>> futureResults = orders.stream()
                .map(o -> CompletableFuture.supplyAsync(
                        () -> OrderService.Process(o),executor)
                ).collect(Collectors.toList());
        //收集结果
        List<ProcessResult> results = futureResults.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        //结束时间
        long end = System.currentTimeMillis();
        //输出处理时间
        System.out.printf("CompletableFuture 多线程并发处理订单,处理时间为 %s ms%n", end - start);
        //System.out.println("处理状态为: " + results);
    }

3.6 测试程序

我们将上面的代码集成起来,编写一个测试程序,分别按三种方式对订单进行处理,并输出各自的处理时间。

    public static void main(String[] args) {
        //准备一个线程池,线程数量为20
        Executor executor = Executors.newFixedThreadPool(20);
        //串行执行
        runSerial();
        //Parallel Stream 并行处理
        runParallel();
        //CompletableFuture 多线程并发处理
        runCompletableFuture(executor);
        System.exit(0);
    }

3.7 运行结果

在一台8个CPU内核的机器上运行该程序,输出结果如下:

串行处理订单,处理时间为 20150 ms
Parallel Stream 并行处理订单,处理时间为 3016 ms
CompletableFuture 多线程并发处理订单,处理时间为 1013 ms

Process finished with exit code 0

对20条订单,每条订单处理时间为1秒,运行结果如下:

串行处理需要20秒。

这是没有使用任何并行处理的情况,每条订单的处理结束后,再处理下一条,所以一共耗时20秒。

Parallel Stream 并行处理需要3秒。

因为机器有8个内核,所以对于Parallel Stream并行处理时并行度为8,20个订单需要分3次才能处理完成,所以一共耗时3秒。

CompletableFuture 多线程并发处理需要1秒

我们使用了一个有20个线程的线程池来执行,所以20个订单的处理一共耗时1秒。

4. 总结

从上面的样例代码可见:

  • 使用Java8的并行处理能力,可以提升我们程序的执行效率,充分利用计算机的硬件资源。
  • Parallel Stream 利用CPU的多内核来进行并行处理,并行度由CPU的内核数量决定。
  • CompletableFuture 使用线程池来进行并发处理,并发数由线程池的线程数量决定。