Thursday, August 16, 2012

How to use Fork/Join Framework's RecursiveAction feature in JDK7?

JDK 7 adds support for parallelism through the new ForkJoinPool executor, which is dedicated to running instances of ForkJoinTask.

ForkJoinTask objects provide two key methods:
  1. The fork() method schedules a ForkJoinTask for asynchronous execution, which allows a new ForkJoinTask to be launched from an existing one. fork() only schedules the task within a ForkJoinPool; no child Java Virtual Machine is ever created.
  2. In turn, the join() method waits for another ForkJoinTask to complete and returns its result.
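The sample further down calls invokeAll(), which forks the given tasks and waits for all of them. To show fork() and join() on their own, here is a minimal sketch (not from the original post) of the common "fork one half, compute the other half, then join" pattern. DoubleAction, ForkJoinDemo and the 1,000-element threshold are hypothetical choices for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of an array in parallel,
// calling fork() and join() explicitly instead of invokeAll().
class DoubleAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1_000; // assumed cut-off, not tuned
    private final int[] array;
    private final int low;
    private final int high;

    DoubleAction(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected void compute() {
        if (high - low <= THRESHOLD) {
            // Small range: do the work directly
            for (int i = low; i < high; i++) {
                array[i] *= 2;
            }
            return;
        }
        int mid = (low + high) >>> 1;
        DoubleAction left = new DoubleAction(array, low, mid);
        DoubleAction right = new DoubleAction(array, mid, high);
        left.fork();     // schedule the left half asynchronously in the same pool
        right.compute(); // process the right half in the current thread
        left.join();     // wait until the forked left half has finished
    }
}

class ForkJoinDemo {
    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        new ForkJoinPool().invoke(new DoubleAction(data, 0, data.length));
        System.out.println(data[9_999]); // prints 19998
    }
}
```

Computing one half in the current thread, rather than forking both halves, keeps that thread busy instead of leaving it idle while it waits in join().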
There are two types of ForkJoinTask specializations:
  1. Instances of RecursiveAction represent executions that do not yield a return value.
  2. In contrast, instances of RecursiveTask yield return values. You can check out a RecursiveTask code sample in this post: Performance comparison of Executor framework vs ForkJoin framework’s RecursiveTask feature in java or JDK7
The following example illustrates how to use the RecursiveAction type of ForkJoinTask:
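For contrast with RecursiveAction, here is a minimal RecursiveTask sketch (not the code from the linked post): compute() returns a value, and join() hands the forked half's result back to the caller. SumTask, SumDemo and the 1,000-element threshold are hypothetical names and values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums an int array. Unlike RecursiveAction,
// compute() returns a result, which join() returns to the caller.
class SumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1_000; // assumed cut-off, not tuned
    private final int[] array;
    private final int low;
    private final int high;

    SumTask(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Long compute() {
        if (high - low <= THRESHOLD) {
            // Small range: sum it sequentially
            long sum = 0;
            for (int i = low; i < high; i++) {
                sum += array[i];
            }
            return sum;
        }
        int mid = (low + high) >>> 1;
        SumTask left = new SumTask(array, low, mid);
        SumTask right = new SumTask(array, mid, high);
        left.fork();                     // run the left half asynchronously
        long rightSum = right.compute(); // compute the right half here
        return left.join() + rightSum;   // join() returns the left half's sum
    }
}

class SumDemo {
    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total); // prints 5000050000
    }
}
```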

Code Sample:

import static java.util.Arrays.asList;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinRandomFillAction {
    Random random = new Random();
   
    public void loadArray(int[] array) {
        for (int i=0; i<array.length; i++) {
            array[i] = random.nextInt(10000); // Generates numbers from 0 to 9999
        }
    }
   
    public static void main(String[] args) {
        ForkJoinRandomFillAction filler = new ForkJoinRandomFillAction();

        int arrayLength = 100_000_000; // 100 million ints, about 400 MB of heap
        int[] array = new int[arrayLength];

        // Number of times the sequential and parallel runs are repeated so the HotSpot JIT can warm up
        final int iterations = 10;

        for (int i = 0; i < iterations; i++) {
            long start = System.currentTimeMillis();
            filler.loadArray(array);

            System.out.println("Sequential processing time: " + (System.currentTimeMillis() - start) + " ms");
          
        }
      
        System.out.println("Number of processor available: " + Runtime.getRuntime().availableProcessors());
      
        ForkJoinPool fjpool = new ForkJoinPool(64); // Parallelism level 64; the no-arg constructor defaults to Runtime.getRuntime().availableProcessors()
      
        for (int i = 0; i < iterations; i++) {
            // Create a task with the complete array
            RecursiveAction task = new RandomFillAction(array, 0, array.length);
            long start = System.currentTimeMillis();
            fjpool.invoke(task);

            System.out.println("Parallel processing time: " + (System.currentTimeMillis() - start) + " ms");
        }
      
        System.out.println("Number of steals: " + fjpool.getStealCount() + "\n");
    }
}

class RandomFillAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    final int low;
    final int high;
    private final int[] array;
    static final int splitSize = 40000; // Threshold below which a range is filled directly instead of being split
   
    public RandomFillAction(int[] array, int low, int high) {
        this.low = low;
        this.high = high;
        this.array = array;
    }

    @Override
    protected void compute() {
        if (high - low > splitSize) {
            // task is huge so divide in half
            int mid = (low + high) >>> 1;
            invokeAll(asList(new RandomFillAction(array, low, mid), new RandomFillAction(array, mid, high)));
        } else {
            // Range is small enough: fill it sequentially with a generator local to this task
            Random random = new Random();
            for (int i = low; i < high; i++) {
                array[i] = random.nextInt(10000);
            }
        }
    }
}
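A side note on the leaf computation: RandomFillAction creates a new Random in every leaf task. JDK 7 also added java.util.concurrent.ThreadLocalRandom, whose Javadoc describes it as particularly appropriate when many ForkJoinTasks use random numbers in parallel. The sketch below is a hypothetical variant (ThreadLocalRandomFillAction and ThreadLocalRandomFillDemo are made-up names) that keeps the same splitting logic but draws numbers from the worker thread's own generator. It has not been benchmarked here, and the output that follows comes from the original program above, not from this variant.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical variant of RandomFillAction: same splitting logic, but the
// leaves use the current thread's ThreadLocalRandom instead of new Random().
class ThreadLocalRandomFillAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private static final int SPLIT_SIZE = 40_000; // same threshold as the sample above
    private final int[] array;
    private final int low;
    private final int high;

    ThreadLocalRandomFillAction(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected void compute() {
        if (high - low > SPLIT_SIZE) {
            int mid = (low + high) >>> 1;
            // Two-argument overload of ForkJoinTask.invokeAll: no List needed
            invokeAll(new ThreadLocalRandomFillAction(array, low, mid),
                      new ThreadLocalRandomFillAction(array, mid, high));
        } else {
            // current() returns the generator of the worker thread running this task
            ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int i = low; i < high; i++) {
                array[i] = random.nextInt(10000); // 0 to 9999
            }
        }
    }
}

class ThreadLocalRandomFillDemo {
    public static void main(String[] args) {
        int[] array = new int[1_000_000]; // smaller than the post's array for a quick run
        new ForkJoinPool().invoke(new ThreadLocalRandomFillAction(array, 0, array.length));
        int max = 0;
        for (int value : array) {
            max = Math.max(max, value);
        }
        System.out.println("Largest value: " + max); // always below 10000
    }
}
```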

Output:

Sequential processing time: 1180 ms
Sequential processing time: 1178 ms
Sequential processing time: 1167 ms
Sequential processing time: 1165 ms
Sequential processing time: 1162 ms
Sequential processing time: 1158 ms
Sequential processing time: 1154 ms
Sequential processing time: 1169 ms
Sequential processing time: 1168 ms
Sequential processing time: 1161 ms
Number of processor available: 4
Parallel processing time: 469 ms
Parallel processing time: 393 ms
Parallel processing time: 389 ms
Parallel processing time: 373 ms
Parallel processing time: 375 ms
Parallel processing time: 371 ms
Parallel processing time: 587 ms
Parallel processing time: 384 ms
Parallel processing time: 385 ms
Parallel processing time: 376 ms
Number of steals: 1498
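To turn these timings into a single speedup figure, one option is to compare the median of each series, so that occasional slow runs (469 ms and 587 ms above) do not skew the result. The helper below is a hypothetical worked check, not part of the original program; it only uses the numbers printed above.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical helper: computes a median-based speedup from the timings above.
class SpeedupFromTimings {

    // Median of a series of timings in milliseconds
    static double median(long[] timesMs) {
        long[] sorted = timesMs.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 0
                ? (sorted[mid - 1] + sorted[mid]) / 2.0
                : sorted[mid];
    }

    public static void main(String[] args) {
        long[] sequential = {1180, 1178, 1167, 1165, 1162, 1158, 1154, 1169, 1168, 1161};
        long[] parallel = {469, 393, 389, 373, 375, 371, 587, 384, 385, 376};
        // Medians are 1166.0 ms and 384.5 ms
        double speedup = median(sequential) / median(parallel);
        System.out.printf(Locale.ROOT, "Median speedup: %.2fx%n", speedup); // prints "Median speedup: 3.03x"
    }
}
```

A speedup of about 3x means each parallel pass takes roughly one third of the sequential time on this 4-processor machine.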

Conclusion:
  1. On this 4-processor machine, the ForkJoin version filled the array roughly 3 times faster than the sequential loop: most parallel runs took 371-393 ms, versus 1,154-1,180 ms for the sequential runs.
  2. I was just trying to demonstrate the ForkJoin framework in the simplest way possible. You can also use the Executor framework in Java to split the task across different threads and gain better performance.
  3. This post focused on the new fork/join tasks that Java SE 7 provides to make parallel programs easier to write. Although it is not as easy as marking some methods or long-running loops with annotations to make them run in parallel, it is still a good addition to JDK 7.
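Point 2 mentions that the Executor framework can also split the work across threads. A minimal sketch of that approach follows; it is not taken from this post or from the referenced comparison article. ExecutorRandomFill and its fill() method are hypothetical names, and giving each thread one contiguous chunk is an assumed design choice. It uses an anonymous Callable rather than a lambda so that it stays JDK 7 compatible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: fills an array with random numbers by splitting it
// into one contiguous chunk per thread and running the chunks on a fixed pool.
class ExecutorRandomFill {

    static void fill(final int[] array, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> chunks = new ArrayList<>();
            int chunkSize = (array.length + threads - 1) / threads; // ceiling division
            for (int start = 0; start < array.length; start += chunkSize) {
                final int low = start;
                final int high = Math.min(start + chunkSize, array.length);
                chunks.add(new Callable<Void>() {
                    @Override
                    public Void call() {
                        Random random = new Random(); // one generator per chunk
                        for (int i = low; i < high; i++) {
                            array[i] = random.nextInt(10000);
                        }
                        return null;
                    }
                });
            }
            // invokeAll blocks until every chunk has finished
            for (Future<Void> result : executor.invokeAll(chunks)) {
                result.get(); // rethrows any exception thrown inside a chunk
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] array = new int[1_000_000];
        fill(array, Runtime.getRuntime().availableProcessors());
        System.out.println("First value: " + array[0]);
    }
}
```

Unlike the ForkJoin version, the chunks here are fixed up front, so there is no work stealing to rebalance a chunk that runs slowly.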
References:
  1. Performance comparison of Executor framework vs ForkJoin framework’s RecursiveTask feature in java or JDK7
  2. Fork and Join: Java Can Excel at Painless Parallel Programming
  3. JDK 7 Adoption Guide
