Tasks and Thread Pools in Java

Constructing a new thread is somewhat expensive because it involves interaction with the operating system. If your program creates a large number of short-lived threads, you should not map each task to a separate thread, but use a thread pool instead. A thread pool contains a number of threads that are ready to run. You give a Runnable to the pool, and one of the threads calls the run method. When the run method exits, the thread doesn’t die but stays around to serve the next request.

In the following sections, you will see the tools that the Java concurrency framework provides for coordinating concurrent tasks.

1. Callables and Futures

A Runnable encapsulates a task that runs asynchronously; you can think of it as an asynchronous method with no parameters and no return value. A Callable is similar to a Runnable, but it returns a value. The Callable interface is a parameterized type, with a single method call.

public interface Callable<V>
{
   V call() throws Exception;
}

The type parameter is the type of the returned value. For example, a Callable<Integer> represents an asynchronous computation that eventually returns an Integer object.
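Since Callable is a functional interface, you can write one as a lambda expression. Here is a purely illustrative Callable<Integer> whose sleep and return value are placeholders for real work:

Callable<Integer> task = () -> {
   Thread.sleep(1000); // simulate a lengthy computation (may throw InterruptedException)
   return 42;          // the eventual result
};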

A Future holds the result of an asynchronous computation. You start a computation, give someone the Future object, and forget about it. The owner of the Future object can obtain the result when it is ready.

The Future<V> interface has the following methods:

V get()
V get(long timeout, TimeUnit unit)
void cancel(boolean mayInterrupt)
boolean isCancelled()
boolean isDone()

A call to the first get method blocks until the computation is finished. The second get method also blocks, but it throws a TimeoutException if the call timed out before the computation finished. If the thread running the computation is interrupted, both methods throw an InterruptedException. If the computation has already finished, get returns immediately.

The isDone method returns false if the computation is still in progress, true if it is finished.

You can cancel the computation with the cancel method. If the computation has not yet started, it is canceled and will never start. If the computation is currently in progress, it is interrupted if the mayInterrupt parameter is true.
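As an illustration (not one of the book’s listings), a caller might combine the timed get with cancel like this, assuming future is a Future<Integer> obtained from one of the mechanisms described below; TimeUnit, TimeoutException, and ExecutionException come from java.util.concurrent:

try
{
   Integer result = future.get(10, TimeUnit.SECONDS); // wait at most 10 seconds
   System.out.println("Result: " + result);
}
catch (TimeoutException ex)
{
   future.cancel(true); // give up and interrupt the computation if it is running
}
catch (InterruptedException | ExecutionException ex)
{
   ex.printStackTrace();
}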

One way to execute a Callable is to use a FutureTask, which implements both the Future and Runnable interfaces, so that you can construct a thread for running it:

Callable<Integer> task = . . .;
var futureTask = new FutureTask<Integer>(task);
var t = new Thread(futureTask); // it's a Runnable
t.start();
Integer result = futureTask.get(); // it's a Future

More commonly, you will pass a Callable to an executor. That is the topic of the next section.

2. Executors

The Executors class has a number of static factory methods for constructing thread pools; see Table 12.2 for a summary.

The newCachedThreadPool method constructs a thread pool that executes each task immediately, using an existing idle thread when available and creating a new thread otherwise. The newFixedThreadPool method constructs a thread pool with a fixed size. If more tasks are submitted than there are idle threads, the unserved tasks are placed on a queue. They are run when other tasks have completed. The newSingleThreadExecutor method constructs a degenerate pool of size 1 in which a single thread executes the submitted tasks, one after another. These three methods return an object of the ThreadPoolExecutor class that implements the ExecutorService interface.

Use a cached thread pool when you have threads that are short-lived or spend a lot of time blocking. However, if you have threads that are working hard without blocking, you don’t want to run a large number of them together.

For optimum speed, the number of concurrent threads should equal the number of processor cores. In such a situation, you should use a fixed thread pool that bounds the total number of concurrent threads.
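For example, a fixed pool bounded by the core count can be constructed like this (a small sketch; availableProcessors reports the number of processors available to the JVM):

int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(cores);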

The single-thread executor is useful for performance analysis. If you temporarily replace a cached or fixed thread pool with a single-thread pool, you can measure how much slower your application runs without the benefit of concurrency.

You can submit a Runnable or Callable to an ExecutorService with one of the following methods:

Future<T> submit(Callable<T> task)
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result)

The pool will run the submitted task at its earliest convenience. When you call submit, you get back a Future object that you can use to get the result or cancel the task.

The second submit method returns an odd-looking Future<?>. You can use such an object to call isDone, cancel, or isCancelled, but the get method simply returns null upon completion.

The third version of submit yields a Future whose get method returns the given result object upon completion.
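The following sketch (with trivial placeholder tasks) shows the three variants side by side:

ExecutorService executor = Executors.newCachedThreadPool();

Callable<Integer> compute = () -> 6 * 7;             // a task that produces a value
Runnable log = () -> System.out.println("working");  // a task with no result

Future<Integer> f1 = executor.submit(compute);       // f1.get() yields 42
Future<?> f2 = executor.submit(log);                 // f2.get() yields null upon completion
Future<String> f3 = executor.submit(log, "done");    // f3.get() yields "done" upon completion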

When you are done with a thread pool, call shutdown. This method initiates the shutdown sequence for the pool. An executor that is shut down accepts no new tasks. When all tasks are finished, the threads in the pool die. Alterna­tively, you can call shutdownNow. The pool then cancels all tasks that have not yet begun.
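A common shutdown idiom (not shown in this section’s listings) combines shutdown with awaitTermination, which blocks until the submitted tasks have finished or a timeout expires:

executor.shutdown();                                  // stop accepting new tasks
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) // wait for running tasks to finish
   executor.shutdownNow();                            // give up and cancel pending tasks

Like get, awaitTermination can throw InterruptedException, which the surrounding code must handle or declare.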

Here, in summary, is what you do to use a thread pool (a short sketch follows the list):

  1. Call the static newCachedThreadPool or newFixedThreadPool method of the Executors class.
  2. Call submit to submit Callable or Runnable objects.
  3. Hang on to the returned Future objects so that you can get the results or cancel the tasks.
  4. Call shutdown when you no longer want to submit any tasks.
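Put together, a minimal sketch of these four steps might look as follows (the Callable here is a trivial placeholder):

ExecutorService executor = Executors.newFixedThreadPool(4);  // step 1
Future<Integer> future = executor.submit(() -> 2 + 2);       // step 2
Integer result = future.get();                               // step 3
executor.shutdown();                                         // step 4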

The ScheduledExecutorService interface has methods for scheduled or repeated execution of tasks. It is a generalization of java.util.Timer that allows for thread pooling. The newScheduledThreadPool and newSingleThreadScheduledExecutor methods of the Executors class return objects that implement the ScheduledExecutorService interface.

You can schedule a Runnable or Callable to run once, after an initial delay. You can also schedule a Runnable to run periodically. See the API notes for details.
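For instance (purely as a sketch, not one of the book’s listings), the following schedules a Runnable to print a message every second after an initial one-second delay:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
   () -> System.out.println("tick"),  // the task to repeat
   1, 1, TimeUnit.SECONDS);           // initial delay, period, time unit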

3. Controlling Groups of Tasks

You have seen how to use an executor service as a thread pool to increase the efficiency of task execution. Sometimes, an executor is used for a more tactical reason—simply to control a group of related tasks. For example, you can cancel all tasks in an executor with the shutdownNow method.

The invokeAny method submits all objects in a collection of Callable objects and returns the result of a completed task. You don’t know which task that is—presumably, it is the one that finished most quickly. Use this method for a search problem in which you are willing to accept any solution. For example, suppose that you need to factor a large integer—a computation that is required for breaking the RSA cipher. You could submit a number of tasks, each attempting a factorization with numbers in a different range. As soon as one of these tasks has an answer, your computation can stop.
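In outline, the call looks like this (a hypothetical sketch; the tasks list is assumed to hold the competing Callable objects, for example one factoring attempt per range):

List<Callable<BigInteger>> tasks = . . .;  // one factoring attempt per range (elided)
// invokeAny blocks until some task completes normally, returns its result,
// and cancels the tasks that have not yet finished
BigInteger factor = executor.invokeAny(tasks);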

The invokeAll method submits all objects in a collection of Callable objects, blocks until all of them complete, and returns a list of Future objects that represent the solutions to all tasks. You can process the results of the computation, when they are available, like this:

List<Callable<T>> tasks = . . .;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
   processFurther(result.get());

In the for loop, the first call result.get() blocks until the first result is available. That is not a problem if all tasks finish in about the same time. However, it may be worth obtaining the results in the order in which they are available. This can be arranged with the ExecutorCompletionService.

Start with an executor, obtained in the usual way. Then construct an ExecutorCompletionService. Submit tasks to the completion service. The service manages a blocking queue of Future objects, containing the results of the submitted tasks as they become available. Thus, a more efficient organization for the preceding computation is the following:

var service = new ExecutorCompletionService<T>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++)
   processFurther(service.take().get());

The program in Listing 12.8 shows how to use callables and executors. In the first computation, we count how many files in a directory tree contain a given word. We make a separate task for each file:

Set<Path> files = descendants(Path.of(start));
var tasks = new ArrayList<Callable<Long>>();
for (Path file : files)
{
   Callable<Long> task = () -> occurrences(word, file);
   tasks.add(task);
}

Then we pass the tasks to an executor service:

ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Long>> results = executor.invokeAll(tasks);

To get the combined count, we add all results, blocking until they are available:

long total = 0;
for (Future<Long> result : results)
   total += result.get();

The program also displays the time spent during the search. Unzip the source code for the JDK somewhere and run the search. Then replace the executor service with a single-thread executor and try again to see whether the concurrent computation was faster.

In the second part of the program, we search for the first file that contains the given word. We use invokeAny to parallelize the search. Here, we have to be more careful about formulating the tasks. The invokeAny method terminates as soon as any task returns. So we cannot have the search tasks return a boolean to indicate success or failure, since we don’t want to stop searching just because one task fails. Instead, a failing task throws a NoSuchElementException. Also, when one task has succeeded, the others are canceled. Therefore, we monitor the interrupted status. If the underlying thread is interrupted, the search task prints a message before terminating, so that you can see that the cancellation is effective.

public static Callable<Path> searchForTask(String word, Path path)
{
   return () -> {
      try (var in = new Scanner(path))
      {
         while (in.hasNext())
         {
            if (in.next().equals(word)) return path;
            if (Thread.currentThread().isInterrupted())
            {
               System.out.println("Search in " + path + " canceled.");
               return null;
            }
         }
         throw new NoSuchElementException();
      }
   };
}

For informational purposes, this program prints out the largest pool size during execution. This information is not available through the ExecutorService interface. For that reason, we had to cast the pool object to the ThreadPoolExecutor class.
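The cast is a one-liner; a minimal sketch, assuming the pool was obtained from newCachedThreadPool or newFixedThreadPool (which return ThreadPoolExecutor instances):

ExecutorService executor = Executors.newCachedThreadPool();
. . .
int largestPoolSize = ((ThreadPoolExecutor) executor).getLargestPoolSize();
System.out.println("largest pool size = " + largestPoolSize);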

4. The Fork-Join Framework

Some applications use a large number of threads that are mostly idle. An example would be a web server that uses one thread per connection. Other applications use one thread per processor core, in order to carry out computationally intensive tasks, such as image or video processing. The fork-join framework, which appeared in Java 7, is designed to support the latter. Suppose you have a processing task that naturally decomposes into subtasks, like this:

if (problemSize < threshold)
   solve problem directly
else
{
   break problem into subproblems
   recursively solve each subproblem
   combine the results
}

One example is image processing. To enhance an image, you can transform the top half and the bottom half. If you have enough idle processors, those operations can run in parallel. (You will need to do a bit of extra work along the strip that separates the two halves, but that’s a technical detail.)

Here, we discuss a simpler example. Suppose we want to count how many elements of an array fulfill a particular property. We cut the array in half, compute the counts of each half, and add them up.

To put the recursive computation in a form that is usable by the framework, supply a class that extends RecursiveTask<T> (if the computation produces a result of type T) or RecursiveAction (if it doesn’t produce a result). Override the compute method to generate and invoke subtasks, and to combine their results.

class Counter extends RecursiveTask<Integer>
{
   protected Integer compute()
   {
      if (to - from < THRESHOLD)
      {
         solve problem directly
      }
      else
      {
         int mid = (from + to) / 2;
         var first = new Counter(values, from, mid, filter);
         var second = new Counter(values, mid, to, filter);
         invokeAll(first, second);
         return first.join() + second.join();
      }
   }
}

Here, the invokeAll method receives a number of tasks and blocks until all of them have completed. The join method yields the result; we apply join to each subtask and return the sum.

Listing 12.9 shows the complete example.
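Listing 12.9 itself is not reproduced here. Purely as an illustration, a self-contained version of the counter might be fleshed out as follows, assuming we count the elements of a double array that are accepted by a DoublePredicate (these types are assumptions of this sketch, not necessarily those of the listing):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;

class Counter extends RecursiveTask<Integer>
{
   public static final int THRESHOLD = 1000;
   private double[] values;
   private int from;
   private int to;
   private DoublePredicate filter;

   public Counter(double[] values, int from, int to, DoublePredicate filter)
   {
      this.values = values;
      this.from = from;
      this.to = to;
      this.filter = filter;
   }

   protected Integer compute()
   {
      if (to - from < THRESHOLD)
      {
         // Small enough: count the matching elements directly
         int count = 0;
         for (int i = from; i < to; i++)
            if (filter.test(values[i])) count++;
         return count;
      }
      else
      {
         // Split the range in half, solve each half, and combine the counts
         int mid = (from + to) / 2;
         var first = new Counter(values, from, mid, filter);
         var second = new Counter(values, mid, to, filter);
         invokeAll(first, second);
         return first.join() + second.join();
      }
   }
}

The task can then be run on the common fork-join pool, for example:

var values = new double[10_000_000];
// fill values, for example with random numbers
var counter = new Counter(values, 0, values.length, x -> x > 0.5);
int count = ForkJoinPool.commonPool().invoke(counter);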

Behind the scenes, the fork-join framework uses an effective heuristic, called work stealing, for balancing the workload among available threads. Each worker thread has a deque (double-ended queue) for tasks. A worker thread pushes subtasks onto the head of its own deque. (Only one thread accesses the head, so no locking is required.) When a worker thread is idle, it “steals” a task from the tail of another deque. Since large subtasks are at the tail, such stealing is rare.

Source: Horstmann, Cay S. (2019), Core Java, Volume I – Fundamentals, Pearson, 11th edition.
