Sunday, September 25, 2016

Java 8 Concurrency: LongAdder and LongAccumulator

Java 8 introduces LongAdder, LongAccumulator, DoubleAdder and DoubleAccumulator, which are recommended instead of the Atomic classes when multiple threads update frequently but read less frequently (for example, in the context of gathering statistics). Under high thread contention, these new classes are designed to grow dynamically and, therefore, there is higher throughput at the expense of higher space consumption.

The "Adder" classes support operations for additions, whereas the "Accumulator" classes are given a function to combine values.

To understand how these classes work, you need to take a look at the implementation of the base class, Striped64. It has a "base" field, plus a table of "cells", each of which is a padded variant of AtomicLong to reduce cache contention. It first tries to update the base field using the usual AtomicLong CAS operation. If there is no contention, CAS will succeed. However, if there is contention (i.e. the CAS update of "base" failed), it will use the "probe" value (hash value) of the thread to map to a cell in the table. If a cell does not exist, it will be created. New cells are created upon contention until the number of cells reaches the nearest power of two greater than or equal to the number of CPUs. If the cell exists, CAS will be tried to update the value of the cell. If CAS fails, the probe value will be updated using a secondary hash until an uncontended cell is found. Finally, the sum method simply adds the base field and the elements in the cells array.
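
To make the striping idea more concrete, here is a heavily simplified sketch. It is not the real Striped64 code: the class name StripedCounterSketch, the fixed-size AtomicLongArray and the use of the thread's hashCode as a "probe" are all assumptions made for illustration. The real class grows its table lazily, pads its cells and rehashes the probe on collisions.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Simplified illustration only (hypothetical class, not the JDK implementation)
class StripedCounterSketch {
  private final AtomicLong base = new AtomicLong();
  private final AtomicLongArray cells; // fixed size here; Striped64 grows its table

  StripedCounterSketch(int stripes) {
    this.cells = new AtomicLongArray(stripes);
  }

  void add(long x) {
    long b = base.get();
    // fast path: a single CAS on base succeeds when there is no contention
    if (!base.compareAndSet(b, b + x)) {
      // slow path: fall back to a cell chosen from a per-thread hash
      int index = Math.floorMod(Thread.currentThread().hashCode(), cells.length());
      cells.addAndGet(index, x);
    }
  }

  // adds the base field and every cell, like the sum method described above
  long sum() {
    long total = base.get();
    for (int i = 0; i < cells.length(); i++) {
      total += cells.get(i);
    }
    return total;
  }

  // starts the given number of threads, each adding 1 perThread times
  static long runDemo(int threads, int perThread) {
    StripedCounterSketch counter = new StripedCounterSketch(8);
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          counter.add(1);
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return counter.sum();
  }

  public static void main(String[] args) {
    System.out.println(runDemo(4, 100_000)); // 400000
  }
}
```

No update is lost because every add lands either on base or on exactly one cell, and both paths are atomic.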

The code below shows how you can use LongAdder to calculate the sum of several values:

LongAdder adder = new LongAdder();

// do some addition in different threads
adder.add(10);
adder.increment();
// ...

// get the current sum
long sum = adder.sum();

Or you can use LongAccumulator as follows:

LongAccumulator acc = new LongAccumulator(Long::sum, 0);

acc.accumulate(10);
acc.accumulate(1);
// ...

// get the current sum
long sum = acc.get();
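
Since an accumulator accepts any combining function, it is not limited to sums. As a small sketch, the example below tracks a running maximum; the class name MaxAccumulatorDemo and the method maxOf are made up for illustration, and a parallel LongStream merely stands in for "several threads".

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.LongStream;

class MaxAccumulatorDemo {
  static long maxOf(long... values) {
    // Long.MIN_VALUE is the identity value for Long::max
    LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
    // the parallel stream may call accumulate from several threads
    LongStream.of(values).parallel().forEach(max::accumulate);
    return max.get();
  }

  public static void main(String[] args) {
    System.out.println(maxOf(3, 42, 7, 19)); // 42
  }
}
```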

You can also use LongAdder in conjunction with a ConcurrentHashMap to maintain a frequency map:

ConcurrentMap<String, LongAdder> freqs = new ConcurrentHashMap<>();
freqs.computeIfAbsent("foo", k -> new LongAdder()).increment();
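
Here is a slightly fuller sketch of the same idea, counting words from several threads. The class FrequencyMapDemo and its count method are hypothetical names, and the parallel stream is just a convenient way of getting concurrent updates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

class FrequencyMapDemo {
  static Map<String, Long> count(List<String> words) {
    ConcurrentMap<String, LongAdder> freqs = new ConcurrentHashMap<>();
    // each word is counted by whichever thread happens to process it
    words.parallelStream()
         .forEach(w -> freqs.computeIfAbsent(w, k -> new LongAdder()).increment());

    // copy into a sorted map of plain longs for reading
    Map<String, Long> result = new TreeMap<>();
    freqs.forEach((word, adder) -> result.put(word, adder.sum()));
    return result;
  }

  public static void main(String[] args) {
    System.out.println(count(Arrays.asList("foo", "bar", "foo", "baz", "foo")));
    // prints {bar=1, baz=1, foo=3}
  }
}
```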

Sunday, September 18, 2016

Power Set Algorithm

The power set of a set S is the set of all subsets of S including the empty set and S itself. For example, the power set of {1, 2, 3} is {{}, {1}, {2}, {3}, {1, 2}, {1, 3}, {2, 3}, {1, 2, 3}}.

The problem can be solved using recursion. We know that if the initial set is empty, then its power set is {{}} and if it has only one element {a} then its power set is {{}, {a}}.

Here is the algorithm:

public static <E extends Comparable<E>> List<List<E>> subsets(final List<E> list) {
  final List<List<E>> result = new ArrayList<>();

  // if the list is empty there is only one subset, which is the empty set
  if (list.isEmpty()) {
    result.add(Collections.emptyList());
    return result;
  }

  // otherwise, get the first element
  final E first = list.get(0);

  // find the subsets of the rest of the list
  final List<E> rest = list.subList(1, list.size());
  final List<List<E>> subsets = subsets(rest);
  result.addAll(subsets);

  // create new subsets by inserting the first element into each subset
  for (final List<E> subset : subsets) {
    final List<E> newSubset = new ArrayList<>(subset);
    newSubset.add(0, first);
    result.add(newSubset);
  }

  return result;
}
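
For comparison, the same result can be produced without recursion. The sketch below is a different technique from the one above: it treats each number from 0 to 2^n - 1 as a bitmask saying which elements are in the subset. The class name PowerSetBitmask is made up, and the code assumes the list has at most 30 elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PowerSetBitmask {
  static <E> List<List<E>> subsets(final List<E> list) {
    final int n = list.size(); // assumption: n <= 30, so that 1 << n fits in an int
    final List<List<E>> result = new ArrayList<>(1 << n);

    // every mask from 0 to 2^n - 1 describes exactly one subset
    for (int mask = 0; mask < (1 << n); mask++) {
      final List<E> subset = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        // bit i set means element i belongs to this subset
        if ((mask & (1 << i)) != 0) {
          subset.add(list.get(i));
        }
      }
      result.add(subset);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(subsets(Arrays.asList(1, 2, 3)));
    // prints [[], [1], [2], [1, 2], [3], [1, 3], [2, 3], [1, 2, 3]]
  }
}
```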

Sunday, September 11, 2016

Factorial Using Iteration, Recursion and Java 8 Streams

In this post, we will solve the classic school problem of calculating the factorial of a positive integer, using different Java algorithms.

Note: In all the algorithms below, we assume that the input is at least 1. Also, because the result is an int, it overflows for inputs greater than 12.

1. Iterative Factorial

In this algorithm, we use a standard for-loop, incrementing i and multiplying result by it at each iteration:

static int iterativeFactorial(final int n) {
  int result = 1;
  for (int i = 1; i <= n; i++) {
    result *= i;
  }
  return result;
}

2. Recursive Factorial

Here is the recursive version (the function calls itself):

static int recursiveFactorial(final int n) {
  return n == 1 ? 1 : n * recursiveFactorial(n - 1);
}

Recursion lets you get rid of variables that are updated at each iteration. Typically, making a recursive function call is more expensive than iteration because each time the function is called, a new stack frame has to be created on the call stack to hold the state of each function call. Therefore, the memory consumed by the recursive function is proportional to the size of its input. You may even get a StackOverflowError with a large input.

3. Tail-recursive Factorial

Here's a tail-recursive definition of factorial. The recursive call is the last thing that happens in the function. In contrast, in our previous definition of factorial, the last thing was n multiplied by the result of the recursive call.

static int tailRecursiveFactorial(final int n) {
  return tailRecursiveFactorialHelper(n, 1);
}

private static int tailRecursiveFactorialHelper(final int n, final int acc) {
  return n == 1 ? acc : tailRecursiveFactorialHelper(n - 1, n * acc);
}

Tail-recursive functions can be optimised by the compiler in some programming languages. Instead of storing each intermediate result of the recursion in a separate stack frame, a single stack frame can be reused, because there's no need to keep track of intermediate results. Unfortunately, Java does not support this kind of optimisation, but you might want to take this approach anyway, in case support is added in the future. Other JVM languages (e.g. Scala) can optimise tail-recursion.
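
To get a feel for what such an optimisation amounts to, here is a hand-written sketch of the loop that a tail-call-optimising compiler could in principle turn the helper into. The class name TailCallAsLoop is made up, and this is only an illustration of the idea, not the output of any real compiler. Like the other versions, it assumes the input is at least 1.

```java
class TailCallAsLoop {
  static int factorial(final int n) {
    // the two parameters of the tail-recursive helper become local variables
    int current = n;
    int acc = 1;
    // each "recursive call" becomes one pass through the loop,
    // reusing the same stack frame
    while (current != 1) {
      acc = current * acc;
      current = current - 1;
    }
    return acc;
  }

  public static void main(String[] args) {
    System.out.println(factorial(5)); // 120
  }
}
```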

4. Stream Factorial

Finally, Java 8 streams provide a simple, declarative way of defining factorial:

static int streamFactorial(final int n) {
  return IntStream.rangeClosed(1, n).reduce(1, (a, b) -> a * b);
}
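
All four versions return an int, so they overflow once the result no longer fits in 32 bits (13! is already too large). If you need bigger inputs, one possible variation of the stream version uses BigInteger. This is only a sketch and the class name BigFactorial is made up.

```java
import java.math.BigInteger;
import java.util.stream.IntStream;

class BigFactorial {
  static BigInteger factorial(final int n) {
    return IntStream.rangeClosed(1, n)
                    .mapToObj(BigInteger::valueOf)  // box each int as a BigInteger
                    .reduce(BigInteger.ONE, BigInteger::multiply);
  }

  public static void main(String[] args) {
    System.out.println(factorial(13)); // 6227020800
    System.out.println(factorial(20)); // 2432902008176640000
  }
}
```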

Sunday, July 31, 2016

Java 8: Implementing a custom TemporalAdjuster

TemporalAdjusters, introduced in Java 8's new Date and Time API, allow you to perform complex date manipulations. For example, you can adjust a date to the next Friday, or to the last day of the month. There are already several pre-defined TemporalAdjusters, which can be accessed using the static factory methods in the TemporalAdjusters class, as shown below:

import java.time.DayOfWeek;
import java.time.LocalDate;
import static java.time.temporal.TemporalAdjusters.*;

LocalDate date = LocalDate.of(2016, 7, 30);
LocalDate nextFriday = date.with(nextOrSame(DayOfWeek.FRIDAY)); // 2016-08-05
LocalDate monthEnd = date.with(lastDayOfMonth()); // 2016-07-31

If you can't find a suitable TemporalAdjuster, it's quite easy to create your own, by implementing the TemporalAdjuster interface.

Here is an example of a custom TemporalAdjuster, which moves the date forward to the next working day:

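import java.time.DayOfWeek;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;

public class NextWorkingDay implements TemporalAdjuster {
  @Override
  public Temporal adjustInto(Temporal temporal) {
    DayOfWeek dayOfWeek = DayOfWeek.of(temporal.get(ChronoField.DAY_OF_WEEK));
    int daysToAdd = dayOfWeek == DayOfWeek.FRIDAY ? 3 :
                   (dayOfWeek == DayOfWeek.SATURDAY ? 2 : 1);
    return temporal.plus(daysToAdd, ChronoUnit.DAYS);
  }
}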

Since TemporalAdjuster is a functional interface, you could use a lambda expression, but it is very likely that you will want to use this adjuster in other parts of your code, so it is better to encapsulate the logic in a proper class, which can then be re-used.

To define a TemporalAdjuster with a lambda expression, it is more convenient to use the ofDateAdjuster static factory method because it allows you to work with a LocalDate object instead of a low-level Temporal.

static final TemporalAdjuster NEXT_WORKING_DAY = TemporalAdjusters.ofDateAdjuster(
  date -> {
   DayOfWeek dayOfWeek = date.getDayOfWeek();
   int daysToAdd = dayOfWeek == DayOfWeek.FRIDAY ? 3 :
                  (dayOfWeek == DayOfWeek.SATURDAY ? 2 : 1);
   return date.plusDays(daysToAdd);
});
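
To see the adjuster in action, the sketch below applies it to a few dates around a weekend. The wrapper class NextWorkingDayDemo is a made-up name, and the adjuster is repeated inside it only so that the example compiles on its own.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjuster;
import java.time.temporal.TemporalAdjusters;

class NextWorkingDayDemo {
  static final TemporalAdjuster NEXT_WORKING_DAY = TemporalAdjusters.ofDateAdjuster(
    date -> {
      DayOfWeek dayOfWeek = date.getDayOfWeek();
      int daysToAdd = dayOfWeek == DayOfWeek.FRIDAY ? 3 :
                     (dayOfWeek == DayOfWeek.SATURDAY ? 2 : 1);
      return date.plusDays(daysToAdd);
    });

  public static void main(String[] args) {
    // Thursday -> Friday
    System.out.println(LocalDate.of(2016, 7, 28).with(NEXT_WORKING_DAY)); // 2016-07-29
    // Friday, Saturday and Sunday all move to the following Monday
    System.out.println(LocalDate.of(2016, 7, 29).with(NEXT_WORKING_DAY)); // 2016-08-01
    System.out.println(LocalDate.of(2016, 7, 30).with(NEXT_WORKING_DAY)); // 2016-08-01
    System.out.println(LocalDate.of(2016, 7, 31).with(NEXT_WORKING_DAY)); // 2016-08-01
  }
}
```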

Sunday, June 26, 2016

Java 8: CompletableFuture vs Parallel Stream

This post shows how Java 8's CompletableFuture compares with parallel streams when performing asynchronous computations.

We will use the following class to model a long-running task:

class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000L);
    } catch (final InterruptedException e) {
      // restore the interrupt flag before propagating
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return duration;
  }
}

Let's create ten tasks, each with a duration of 1 second:

List<MyTask> tasks = IntStream.range(0, 10)
                              .mapToObj(i -> new MyTask(1))
                              .collect(toList());

How can we calculate the list of tasks efficiently?

Approach 1: Sequentially

Your first thought might be to calculate the tasks sequentially, as follows:

public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

As you might expect, this takes 10 seconds to run, because each task is run one after the other on the main thread.

Approach 2: Using a parallel stream

A quick improvement is to convert your code to use a parallel stream, as shown below:

public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

The output is

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis

This time it took 3 seconds because 4 tasks were run in parallel (using three threads from the ForkJoinPool, plus the main thread).
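
The three pool threads are consistent with a machine that reports four processors: as far as I know, the common pool's default parallelism is one less than the number of available processors (with a minimum of 1), unless the java.util.concurrent.ForkJoinPool.common.parallelism system property overrides it. The sketch below prints the values on your own machine; the class CommonPoolInfo and the helper defaultCommonPoolParallelism are made-up names that simply restate that rule.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
  // Restates the default sizing rule; assumes the
  // java.util.concurrent.ForkJoinPool.common.parallelism property is not set.
  static int defaultCommonPoolParallelism(int processors) {
    return Math.max(1, processors - 1);
  }

  public static void main(String[] args) {
    int processors = Runtime.getRuntime().availableProcessors();
    System.out.println("Available processors:    " + processors);
    System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    System.out.println("Expected by default:     " + defaultCommonPoolParallelism(processors));
  }
}
```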

Approach 3: Using CompletableFutures

Let's see if CompletableFutures perform any better:

public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

In the code above, we first obtain a list of CompletableFutures and then invoke the join method on each future to wait for them to complete one by one. Note that join is the same as get, with the only difference being that the former doesn't throw any checked exception, so it's more convenient in a lambda expression.

Also, you must use two separate stream pipelines, as opposed to putting the two map operations after each other, because intermediate stream operations are lazy and you would have ended up processing your tasks sequentially! That's why you first need to collect your CompletableFutures in a list to allow them to start before waiting for their completion.
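
The difference is easy to see if you log when each future is submitted and when it is joined. The sketch below does that for both variants; the class LazyPipelineDemo and its helper methods are made-up names, and the tasks just return their id instead of sleeping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class LazyPipelineDemo {
  private static CompletableFuture<Integer> submit(int id, List<String> events) {
    events.add("submit " + id);
    return CompletableFuture.supplyAsync(() -> id);
  }

  private static Integer joinAndLog(CompletableFuture<Integer> future, List<String> events) {
    Integer id = future.join();
    events.add("join " + id);
    return id;
  }

  // one pipeline: each element is submitted and joined before the next is submitted
  static List<String> singlePipeline(List<Integer> ids) {
    List<String> events = new ArrayList<>();
    ids.stream()
       .map(id -> submit(id, events))
       .map(future -> joinAndLog(future, events))
       .collect(Collectors.toList());
    return events;
  }

  // two pipelines: everything is submitted first, then joined
  static List<String> twoPipelines(List<Integer> ids) {
    List<String> events = new ArrayList<>();
    List<CompletableFuture<Integer>> futures =
        ids.stream()
           .map(id -> submit(id, events))
           .collect(Collectors.toList());
    futures.stream()
           .map(future -> joinAndLog(future, events))
           .collect(Collectors.toList());
    return events;
  }

  public static void main(String[] args) {
    List<Integer> ids = Arrays.asList(1, 2, 3);
    System.out.println(singlePipeline(ids));
    // prints [submit 1, join 1, submit 2, join 2, submit 3, join 3]
    System.out.println(twoPipelines(ids));
    // prints [submit 1, submit 2, submit 3, join 1, join 2, join 3]
  }
}
```

In the single pipeline, every join blocks before the next task has even been submitted, which is why the tasks would run one after the other.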

The output is

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis

It took 4 seconds to process 10 tasks. You will notice that only 3 ForkJoinPool threads were used and that, unlike the parallel stream, the main thread was not used.

Approach 4: Using CompletableFutures with a custom Executor

One of the advantages of CompletableFutures over parallel streams is that they allow you to specify a different Executor to submit their tasks to. This means that you can choose a more suitable number of threads based on your application. Since my example is not very CPU-intensive, I can choose to increase the number of threads to be greater than Runtime.getRuntime().availableProcessors(), as shown below:

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}

The output is

pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis

After this improvement, it now takes only 1 second to process 10 tasks.

As you can see, CompletableFutures provide more control over the size of the thread pool and should be used if your tasks involve I/O. However, if you're doing CPU-intensive operations, there's no point in having more threads than processors, so go for a parallel stream, as it is easier to use.