Saturday, November 09, 2013

Throttling Task Submission with a BlockingExecutor

The JDK's java.util.concurrent.ThreadPoolExecutor lets you submit tasks to a thread pool and uses a BlockingQueue to hold tasks that are waiting to run. If you have thousands of tasks to submit, you should specify a "bounded" queue (i.e. one with a maximum capacity); otherwise your JVM may run out of memory. You can set a RejectedExecutionHandler to decide what happens when the queue is full but there are still tasks left to submit.
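
To make the role of the bounded queue concrete, here is a minimal sketch of what happens when no handler is set and the default ThreadPoolExecutor.AbortPolicy applies. The class name RejectionDemo, the method countAcceptedTasks and the tiny sizes are made up for illustration. Every task blocks on a latch, so the pool threads stay busy, the queue fills up, and the next submission is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK or of this post's executor.
class RejectionDemo {

  /** Submits blocking tasks until one is rejected; returns how many were accepted. */
  static int countAcceptedTasks(int poolSize, int queueCapacity) {
    // No handler is passed, so the default AbortPolicy is used.
    ThreadPoolExecutor exec = new ThreadPoolExecutor(
        poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(queueCapacity));
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await(); // keep the worker busy until the demo is over
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    int accepted = 0;
    try {
      while (true) {
        exec.execute(blocker);
        accepted++;
      }
    } catch (RejectedExecutionException e) {
      // All pool threads are busy and the queue is full.
    } finally {
      release.countDown(); // let the blocked tasks finish
      exec.shutdown();
    }
    return accepted;
  }

  public static void main(String[] args) {
    // One worker thread plus two queue slots.
    System.out.println("Accepted " + countAcceptedTasks(1, 2) + " tasks before rejection");
    // prints: Accepted 3 tasks before rejection
  }
}
```

The number of accepted tasks is the pool size plus the queue capacity, which is the same bound the semaphore-based executor later in this post enforces by blocking instead of throwing.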

Here is a simple example that creates a ThreadPoolExecutor backed by a BlockingQueue of capacity 1000. The CallerRunsPolicy ensures that, when the queue is full, additional tasks are run by the submitting thread.

int numThreads = 5;
// fixed-size pool: core and maximum pool size are both numThreads
ExecutorService exec = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
                                 new ArrayBlockingQueue<Runnable>(1000),
                                 new ThreadPoolExecutor.CallerRunsPolicy());

The problem with this approach is that, when the queue is full, the thread submitting tasks to the pool becomes busy executing a task itself. During this time the queue could drain and the threads in the pool could become idle. This is not very efficient: we want to keep the thread pool busy and the work queue saturated at all times.
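
The behaviour is easy to observe in a small experiment. The sketch below uses made-up names (CallerRunsDemo, threadThatRanOverflowTask) and a pool of one thread with a queue of capacity one. It keeps the worker busy, fills the queue, and then records which thread runs the next task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class used only to illustrate CallerRunsPolicy.
class CallerRunsDemo {

  /** Returns the thread that ran the task submitted once pool and queue were full. */
  static Thread threadThatRanOverflowTask() {
    ThreadPoolExecutor exec = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(1),
        new ThreadPoolExecutor.CallerRunsPolicy());
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await(); // hold the worker until the demo is over
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    AtomicReference<Thread> runner = new AtomicReference<>();
    try {
      exec.execute(blocker); // occupies the single worker thread
      exec.execute(blocker); // fills the single queue slot
      // Pool and queue are full, so the policy runs this task in the caller.
      exec.execute(() -> runner.set(Thread.currentThread()));
    } finally {
      release.countDown();
      exec.shutdown();
    }
    return runner.get();
  }

  public static void main(String[] args) {
    boolean ranInCaller = threadThatRanOverflowTask() == Thread.currentThread();
    System.out.println("Overflow task ran in submitting thread: " + ranInCaller);
    // prints: Overflow task ran in submitting thread: true
  }
}
```

While the submitting thread is inside that third task it cannot submit anything else, which is exactly the window in which the pool can run dry.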

There are various solutions to this problem. One of them is to use a custom Executor which blocks (and thus prevents further tasks from being submitted to the pool) when the queue is full. The code for BlockingExecutor is shown below. It is based on the BoundedExecutor example from Brian Goetz, 2006. Java Concurrency in Practice. 1st Edition. Addison-Wesley Professional. (Section 8.3.3).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An executor which blocks and prevents further tasks from
 * being submitted to the pool when the queue is full.
 * <p>
 * Based on the BoundedExecutor example in:
 * Brian Goetz, 2006. Java Concurrency in Practice. (Listing 8.4)
 */
public class BlockingExecutor extends ThreadPoolExecutor {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(BlockingExecutor.class);
  private final Semaphore semaphore;

  /**
   * Creates a BlockingExecutor which will block and prevent further
   * submission to the pool when the specified queue size has been reached.
   *
   * @param poolSize the number of threads in the pool
   * @param queueSize the maximum number of tasks allowed to wait in the queue
   */
  public BlockingExecutor(final int poolSize, final int queueSize) {
    super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());

    // the semaphore is bounding both the number of tasks currently executing
    // and those queued up
    semaphore = new Semaphore(poolSize + queueSize);
  }

  /**
   * Executes the given task.
   * This method will block when the semaphore has no permits,
   * i.e. when poolSize + queueSize tasks are already running or queued.
   */
  @Override
  public void execute(final Runnable task) {
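    // Keep waiting for a permit even if interrupted, but remember the
    // interruption so the thread's interrupt status can be restored.
    boolean acquired = false;
    boolean interrupted = false;
    do {
        try {
            semaphore.acquire();
            acquired = true;
        } catch (final InterruptedException e) {
            interrupted = true;
            LOGGER.warn("InterruptedException whilst acquiring semaphore", e);
        }
    } while (!acquired);
    if (interrupted) {
        Thread.currentThread().interrupt();
    }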

    try {
        super.execute(task);
    } catch (final RejectedExecutionException e) {
        semaphore.release();
        throw e;
    }
  }

  /**
   * Method invoked upon completion of execution of the given Runnable,
   * by the thread that executed the task.
   * Releases a semaphore permit.
   */
  @Override
  protected void afterExecute(final Runnable r, final Throwable t) {
    super.afterExecute(r, t);
    semaphore.release();
  }
}
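
To watch the throttling in action without the SLF4J dependency, here is a self-contained sketch that applies the same semaphore idea around a plain fixed thread pool, using composition rather than subclassing. The names ThrottleDemo and maxInFlight, and the one-millisecond sleep standing in for real work, are assumptions made for illustration and are not part of the BlockingExecutor class above. The method counts how many tasks are submitted but not yet finished and returns the highest value seen.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class illustrating semaphore-based throttling.
class ThrottleDemo {

  /**
   * Submits numTasks short tasks, guarded by a semaphore with 'bound' permits,
   * and returns the largest number of tasks that were ever running or queued.
   */
  static int maxInFlight(int poolSize, int bound, int numTasks) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Semaphore semaphore = new Semaphore(bound);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    try {
      for (int i = 0; i < numTasks; i++) {
        // Blocks the submitter while 'bound' tasks are running or queued.
        semaphore.acquireUninterruptibly();
        max.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        pool.execute(() -> {
          try {
            Thread.sleep(1); // simulated work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            semaphore.release(); // frees a slot for the submitter
          }
        });
      }
      pool.shutdown();
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow(); // no-op if the pool has already terminated
    }
    return max.get();
  }

  public static void main(String[] args) {
    // 2 threads plus 2 queue slots, mirroring poolSize + queueSize above.
    System.out.println("Max tasks in flight: " + maxInFlight(2, 4, 200));
  }
}
```

The reported maximum never exceeds the number of permits, because a task is only counted after a permit has been acquired and is uncounted before the permit is released. The BlockingExecutor above is used like any other ThreadPoolExecutor: construct it with, say, new BlockingExecutor(5, 1000), call execute for each task, and call shutdown() once everything has been submitted.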