Asynchronous Computations in Java

So far, our approach to concurrent computation has been to break up a task, and then wait until all pieces have completed. But waiting is not always a good idea. In the following sections, you will see how to implement wait-free, or asynchronous, computations.

1. Completable Futures

When you have a Future object, you need to call get to obtain the value, blocking until the value is available. The CompletableFuture class implements the Future interface, and it provides a second mechanism for obtaining the result. You register a callback that will be invoked (in some thread) with the result once it is available.

CompletableFuture<String> f = …;

f.thenAccept(s -> { /* Process the result string s */ });

In this way, you can process the result without blocking once it is available.

There are a few API methods that return CompletableFuture objects. For example, you can fetch a web page asynchronously with the HttpClient class (experimental in Java 9 and 10, standard since Java 11) that you will encounter in Chapter 4 of Volume II:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(URI.create(urlString)).GET().build();
CompletableFuture<HttpResponse<String>> f = client.sendAsync(
   request, HttpResponse.BodyHandlers.ofString());

It is nice if there is a method that produces a ready-made CompletableFuture, but most of the time, you need to make your own. To run a task asynchronously and obtain a CompletableFuture, you don’t submit it directly to an executor service. Instead, you call the static method CompletableFuture.supplyAsync. Here is how to read the web page without the benefit of the HttpClient class:

public CompletableFuture<String> readPage(URL url)
{
   return CompletableFuture.supplyAsync(() ->
      {
         try
         {
            return new String(url.openStream().readAllBytes(), "UTF-8");
         }
         catch (IOException e)
         {
            throw new UncheckedIOException(e);
         }
      }, executor);
}

If you omit the executor, the task is run on a default executor (namely the executor returned by ForkJoinPool.commonPool()). You usually don’t want to do that.
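To make the supplyAsync pattern concrete, here is a minimal sketch that runs a task on an explicitly supplied executor and registers a thenAccept callback for the result. The class name SupplyAsyncDemo and the methods slowGreeting and greetAsync are hypothetical names invented for this illustration. The sleep merely stands in for slow work such as a network read.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncDemo
{
   // Hypothetical stand-in for a slow computation such as a network read.
   static String slowGreeting(String name)
   {
      try { Thread.sleep(100); }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      return "Hello, " + name;
   }

   // Runs the slow task on the supplied executor instead of the common pool.
   static CompletableFuture<String> greetAsync(String name, ExecutorService executor)
   {
      return CompletableFuture.supplyAsync(() -> slowGreeting(name), executor);
   }

   public static void main(String[] args)
   {
      ExecutorService executor = Executors.newFixedThreadPool(2);
      CompletableFuture<String> f = greetAsync("World", executor);
      // The callback runs once the result is available; main is not blocked here.
      CompletableFuture<Void> printed = f.thenAccept(s -> System.out.println(s));
      System.out.println("Callback registered");
      printed.join(); // wait only so that the demo can shut down the executor
      executor.shutdown();
   }
}
```

The final join is there only so that the demo can shut down its executor. In a real program, you would usually let the callback do its work without waiting for it.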

A CompletableFuture can complete in two ways: either with a result, or with an uncaught exception. In order to handle both cases, use the whenComplete method. The supplied function is called with the result (or null if none) and the exception (or null if none).

f.whenComplete((s, t) ->
   {
      if (t == null) { /* Process the result s */ }
      else { /* Process the Throwable t */ }
   });
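The following sketch shows whenComplete handling both outcomes. WhenCompleteDemo and describe are hypothetical names, and parsing an integer is just a convenient task that can fail. Note one assumption about the details: when the task passed to supplyAsync throws, the callback may receive the exception wrapped in a CompletionException, so the sketch unwraps the cause before looking at it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteDemo
{
   // Parses the input asynchronously and describes how the future completed.
   static CompletableFuture<String> describe(String input)
   {
      var description = new CompletableFuture<String>();
      CompletableFuture.supplyAsync(() -> Integer.parseInt(input))
         .whenComplete((n, t) ->
            {
               if (t == null)
               {
                  description.complete("result " + n);
               }
               else
               {
                  // Unwrap the CompletionException, if there is one.
                  Throwable cause = t instanceof CompletionException && t.getCause() != null
                     ? t.getCause() : t;
                  description.complete("failed: " + cause.getClass().getSimpleName());
               }
            });
      return description;
   }

   public static void main(String[] args)
   {
      System.out.println(describe("42").join());
      System.out.println(describe("forty-two").join());
   }
}
```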

The CompletableFuture is called completable because you can manually set a completion value. (In other concurrency libraries, such an object is called a promise.) Of course, when you create a CompletableFuture with supplyAsync, the completion value is implicitly set when the task has finished. But setting the result explicitly gives you additional flexibility. For example, two tasks can work simultaneously on computing an answer:

var f = new CompletableFuture<Integer>();
executor.execute(() ->
   {
      int n = workHard(arg);
      f.complete(n);
   });
executor.execute(() ->
   {
      int n = workSmart(arg);
      f.complete(n);
   });

To instead complete a future with an exception, call

Throwable t = . . .;

f.completeExceptionally(t);

The isDone method tells you whether a Future object has been completed (normally or with an exception). In the preceding example, the workHard and workSmart methods can use that information to stop working when the result has been determined by the other method.
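Here is one way the two racing tasks might look when they check isDone. This is only a sketch under some assumptions: unlike the fragment above, the workers here receive the future as a parameter so that they can poll it, and both compute the sum 1 + 2 + ... + n so that the answer is the same no matter who wins. The class name RaceDemo and the method sumTo are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RaceDemo
{
   // Slow strategy: adds the numbers one by one, giving up as soon as
   // some other task has completed the future.
   static void workHard(int n, CompletableFuture<Integer> f)
   {
      int sum = 0;
      for (int i = 1; i <= n; i++)
      {
         if (f.isDone()) return; // result already determined elsewhere
         sum += i;
      }
      f.complete(sum); // has no effect if the future is already completed
   }

   // Fast strategy: uses the closed formula.
   static void workSmart(int n, CompletableFuture<Integer> f)
   {
      if (f.isDone()) return;
      f.complete(n * (n + 1) / 2);
   }

   static int sumTo(int n, ExecutorService executor)
   {
      var f = new CompletableFuture<Integer>();
      executor.execute(() -> workHard(n, f));
      executor.execute(() -> workSmart(n, f));
      return f.join(); // whichever task finishes first supplies the value
   }

   public static void main(String[] args)
   {
      ExecutorService executor = Executors.newFixedThreadPool(2);
      System.out.println(sumTo(10_000, executor));
      executor.shutdown();
   }
}
```

Only the first call to complete sets the value. A later call returns false and leaves the result unchanged, so it is safe for both tasks to call it.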

2. Composing Completable Futures

Nonblocking calls are implemented through callbacks. The programmer registers a callback for the action that should occur after a task completes. Of course, if the next action is also asynchronous, the next action after that is in a different callback. Even though the programmer thinks in terms of “first do step 1, then step 2, then step 3,” the program logic can become dispersed in “callback hell.” It gets even worse when one has to add error handling. Suppose step 2 is “the user logs in.” You may need to repeat that step since the user can mistype the credentials. Trying to implement such a control flow in a set of callbacks, or to understand it once it has been implemented, can be quite challenging.

The CompletableFuture class solves this problem by providing a mechanism for composing asynchronous tasks into a processing pipeline.

For example, suppose we want to extract all images from a web page. Let’s say we have a method

public CompletableFuture<String> readPage(URL url)

that yields the text of a web page when it becomes available. If the method

public List<URL> getImageURLs(String page)

yields the URLs of images in an HTML page, you can schedule it to be called when the page is available:

CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URL>> imageURLs = contents.thenApply(this::getImageURLs);

The thenApply method doesn’t block either. It returns another future. When the first future has completed, its result is fed to the getImageURLs method, and the return value of that method becomes the final result.
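As a self-contained illustration of this step, the sketch below feeds the result of one future into a plain method with thenApply. Everything in it is an assumption made for the example: readPage returns canned HTML instead of touching the network, image sources are kept as strings rather than URL objects, and the regular expression is far too naive for real HTML.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ThenApplyDemo
{
   // Naive pattern for the src attribute of an img tag; for illustration only.
   private static final Pattern IMG = Pattern.compile("<img\\s+[^>]*src=\"([^\"]*)\"");

   // Hypothetical stand-in for readPage: yields canned HTML.
   // For brevity, this sketch uses the default executor.
   static CompletableFuture<String> readPage(String url)
   {
      return CompletableFuture.supplyAsync(() ->
         "<html><img src=\"a.png\"><p>text</p><img alt=\"b\" src=\"b.jpg\"></html>");
   }

   // An ordinary, synchronous method that extracts the image sources.
   static List<String> getImageSources(String page)
   {
      var result = new ArrayList<String>();
      Matcher m = IMG.matcher(page);
      while (m.find()) result.add(m.group(1));
      return result;
   }

   // Schedules getImageSources to run when the page becomes available.
   static CompletableFuture<List<String>> imageSources(String url)
   {
      return readPage(url).thenApply(ThenApplyDemo::getImageSources);
   }

   public static void main(String[] args)
   {
      System.out.println(imageSources("https://example.com").join());
   }
}
```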

With completable futures, you just specify what you want to have done and in which order. It won’t all happen right away, of course, but what is important is that all the code is in one place.

Conceptually, CompletableFuture is a simple API, but there are many variants of methods for composing completable futures. Let us first look at those that deal with a single future (see Table 12.3). (For each method shown, there are also two Async variants that I don’t show. One of them uses a shared ForkJoinPool, and the other has an Executor parameter.) In the table, I use a shorthand notation for the ponderous functional interfaces, writing T -> U instead of Function<? super T, U>. These aren’t actual Java types, of course.

You have already seen the thenApply method. Suppose f is a function that receives values of type T and returns values of type U. The calls

CompletableFuture<U> future.thenApply(f);
CompletableFuture<U> future.thenApplyAsync(f);

return a future that applies the function f to the result of future when it is available. The second call runs f in yet another thread.

The thenCompose method, instead of taking a function mapping the type T to the type U, receives a function mapping T to CompletableFuture<U>. That sounds rather abstract, but it can be quite natural. Consider the action of reading a web page from a given URL. Instead of supplying a method

public String blockingReadPage(URL url)

it is more elegant to have that method return a future:

public CompletableFuture<String> readPage(URL url)

Now, suppose we have another method that gets the URL from user input, perhaps from a dialog that won’t reveal the answer until the user has clicked the OK button. That, too, is an event in the future:

public CompletableFuture<URL> getURLInput(String prompt)

Here we have two functions T -> CompletableFuture<U> and U -> CompletableFuture<V>. Clearly, they compose to a function T -> CompletableFuture<V> if the second function is called when the first one has completed. That is exactly what thenCompose does.
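A small sketch may help to see the composition at work. Both asynchronous methods below are hypothetical stand-ins: getURLInput fabricates a URL string from the prompt instead of asking a user, and readPage fabricates page contents instead of reading from the network.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeDemo
{
   // Hypothetical stand-in for a dialog: String -> CompletableFuture<String>
   static CompletableFuture<String> getURLInput(String prompt)
   {
      return CompletableFuture.supplyAsync(() -> "https://example.com/" + prompt);
   }

   // Hypothetical stand-in for a page reader: String -> CompletableFuture<String>
   static CompletableFuture<String> readPage(String url)
   {
      return CompletableFuture.supplyAsync(() -> "<html>contents of " + url + "</html>");
   }

   static CompletableFuture<String> promptAndRead(String prompt)
   {
      // With thenApply, the result type would be the nested
      // CompletableFuture<CompletableFuture<String>>.
      // thenCompose calls readPage when the URL is available and flattens the result.
      return getURLInput(prompt).thenCompose(ThenComposeDemo::readPage);
   }

   public static void main(String[] args)
   {
      System.out.println(promptAndRead("index.html").join());
   }
}
```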

In the preceding section, you saw the whenComplete method for handling exceptions. There is also a handle method that requires a function processing the result or exception and computing a new result. In many cases, it is simpler to call the exceptionally method instead. That method computes a dummy value when an exception occurs:

CompletableFuture<List<URL>> imageURLs = readPage(url)
   .exceptionally(ex -> "<html></html>")
   .thenApply(this::getImageURLs);

You can handle a timeout in the same way:

CompletableFuture<List<URL>> imageURLs = readPage(url)
   .completeOnTimeout("<html></html>", 30, TimeUnit.SECONDS)
   .thenApply(this::getImageURLs);

Alternatively, you can throw an exception on timeout:

CompletableFuture<String> contents = readPage(url).orTimeout(30, TimeUnit.SECONDS);
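The next sketch exercises all three recovery mechanisms. It rests on assumptions chosen to keep it short: the hypothetical readPage method fails for the URL string "bad" and never completes for "slow", and the timeout is 200 milliseconds rather than 30 seconds. The completeOnTimeout and orTimeout methods require Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FallbackDemo
{
   static final String EMPTY_PAGE = "<html></html>";

   // Hypothetical page reader: fails for "bad", never completes for "slow".
   static CompletableFuture<String> readPage(String url)
   {
      if (url.equals("slow")) return new CompletableFuture<>(); // never completed
      return CompletableFuture.supplyAsync(() ->
         {
            if (url.equals("bad")) throw new IllegalStateException("cannot read " + url);
            return "<html>" + url + "</html>";
         });
   }

   // Substitutes an empty page on timeout or failure.
   static String readOrEmpty(String url)
   {
      return readPage(url)
         .completeOnTimeout(EMPTY_PAGE, 200, TimeUnit.MILLISECONDS)
         .exceptionally(ex -> EMPTY_PAGE)
         .join();
   }

   // Reports whether reading the page ends in a TimeoutException.
   static boolean timesOut(String url)
   {
      try
      {
         readPage(url).orTimeout(200, TimeUnit.MILLISECONDS).join();
         return false;
      }
      catch (CompletionException e)
      {
         // join wraps the TimeoutException in a CompletionException
         return e.getCause() instanceof TimeoutException;
      }
   }

   public static void main(String[] args)
   {
      System.out.println(readOrEmpty("ok"));
      System.out.println(readOrEmpty("bad"));
      System.out.println(readOrEmpty("slow"));
      System.out.println(timesOut("slow"));
   }
}
```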

The methods in Table 12.3 with void result are normally used at the end of a processing pipeline.

Now let us turn to methods that combine multiple futures (see Table 12.4).

The first three methods run a CompletableFuture<T> and a CompletableFuture<U> action concurrently and combine the results.

The next three methods run two CompletableFuture<T> actions concurrently. As soon as one of them finishes, its result is passed on, and the other result is ignored.

Finally, the static allOf and anyOf methods take a variable number of completable futures. The allOf method yields a CompletableFuture<Void> that completes when all of them complete; it does not yield a result. The anyOf method yields a CompletableFuture<Object> that completes when any one of them completes; it does not terminate the remaining tasks.
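Here is a brief sketch of thenCombine, allOf, and anyOf. The class CombineDemo and its methods totalLength and allResults are hypothetical helpers written for this illustration. Since allOf yields no result, the sketch collects the values from the original futures once all of them have completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CombineDemo
{
   // Combines the results of two futures when both are available.
   static CompletableFuture<Integer> totalLength(
      CompletableFuture<String> a, CompletableFuture<String> b)
   {
      return a.thenCombine(b, (s, t) -> s.length() + t.length());
   }

   // Waits for all futures with allOf, then gathers their results.
   static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures)
   {
      return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
         .thenApply(v -> futures.stream()
            .map(CompletableFuture::join) // does not block; all futures are complete
            .collect(Collectors.toList()));
   }

   public static void main(String[] args)
   {
      var a = CompletableFuture.supplyAsync(() -> "Hello");
      var b = CompletableFuture.supplyAsync(() -> "World");
      System.out.println(totalLength(a, b).join());
      System.out.println(allResults(List.of(a, b)).join());

      // anyOf completes as soon as one future does; the other is left alone.
      var never = new CompletableFuture<String>();
      Object first = CompletableFuture.anyOf(never, a).join();
      System.out.println(first);
   }
}
```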

Listing 12.10 shows a complete program that reads a web page, scans it for images, loads the images and saves them locally. Note how all time-consuming methods return a CompletableFuture. To kick off the asynchronous computation, we use a little trick. Rather than calling the readPage method directly, we make a completed future with the URL argument, and then compose that future with this::readPage. That way, the pipeline has a very uniform appearance:

CompletableFuture.completedFuture(url)
   .thenComposeAsync(this::readPage, executor)
   .thenApply(this::getImageURLs)
   .thenCompose(this::getImages)
   .thenAccept(this::saveImages);

3. Long-Running Tasks in User Interface Callbacks

One of the reasons to use threads is to make your programs more responsive. This is particularly important in an application with a user interface. When your program needs to do something time-consuming, you cannot do the work in the user-interface thread, or the user interface will be frozen. Instead, fire up another worker thread.

For example, if you want to read a file when the user clicks a button, don’t do this:

var open = new JButton("Open");
open.addActionListener(event ->
   { // BAD: long-running action is executed on UI thread
      var in = new Scanner(file);
      while (in.hasNextLine())
      {
         String line = in.nextLine();
      }
   });

Instead, do the work in a separate thread.

open.addActionListener(event ->
   { // GOOD: long-running action in separate thread
      Runnable task = () ->
         {
            var in = new Scanner(file);
            while (in.hasNextLine())
            {
               String line = in.nextLine();
            }
         };
      executor.execute(task);
   });

However, you cannot directly update the user interface from the worker thread that executes the long-running task. User interfaces such as Swing, JavaFX, or Android are not thread-safe. You cannot manipulate user interface elements from multiple threads, or they risk becoming corrupted. In fact, JavaFX and Android check for this, and throw an exception if you try to access the user interface from a thread other than the UI thread.

Therefore, you need to schedule any UI updates to happen on the UI thread. Each user interface library provides some mechanism to schedule a Runnable for execution on the UI thread. For example, in Swing, you call

EventQueue.invokeLater(() -> label.setText(percentage + "% complete"));
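The sketch below shows a worker thread handing its progress updates to the UI thread. To keep it runnable without a visible window, it makes a simplifying assumption: a StringBuilder named status stands in for the label, so where the sketch modifies status, a real program would call label.setText. The class name InvokeLaterDemo and the method runAndReport are hypothetical.

```java
import java.awt.EventQueue;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InvokeLaterDemo
{
   // Runs a pretend task on a worker thread and returns the status text
   // that the UI thread ends up with.
   static String runAndReport()
   {
      // Stand-in for a label; only ever touched on the event dispatch thread.
      var status = new StringBuilder("0% complete");
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.execute(() ->
         {
            for (int i = 1; i <= 4; i++)
            {
               int percentage = 25 * i; // effectively final, for use in the lambda
               // Don't update the UI here; hand the update to the UI thread.
               EventQueue.invokeLater(() ->
                  {
                     status.setLength(0);
                     status.append(percentage).append("% complete");
                  });
            }
         });
      executor.shutdown();
      try
      {
         executor.awaitTermination(5, TimeUnit.SECONDS);
         // Events are processed in order, so this read runs after all updates.
         var text = new String[1];
         EventQueue.invokeAndWait(() -> text[0] = status.toString());
         return text[0];
      }
      catch (InterruptedException | InvocationTargetException e)
      {
         throw new IllegalStateException(e);
      }
   }

   public static void main(String[] args)
   {
      System.out.println(runAndReport());
   }
}
```

The call to invokeAndWait is used here only so that the demo can read the final status from the main thread. Never call invokeAndWait from the UI thread itself.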

It is tedious to implement user feedback in a worker thread, so each user interface library provides some kind of helper class for managing the details, such as SwingWorker in Swing, Task in JavaFX, and AsyncTask in Android. You specify actions for the long-running task (which is run on a separate thread), as well as progress updates and the final disposition (which are run on the UI thread).

The program in Listing 12.11 has commands for loading a text file and for canceling the file loading process. You should try the program with a long file, such as the full text of The Count of Monte Cristo, supplied in the gutenberg directory of the book’s companion code. The file is loaded in a separate thread. While the file is being read, the Open menu item is disabled and the Cancel item is enabled (see Figure 12.6). After each line is read, a line counter in the status bar is updated. After the reading process is complete, the Open menu item is reenabled, the Cancel item is disabled, and the status line text is set to Done.

This example shows the typical UI activities of a background task:

  • After each work unit, update the UI to show progress.
  • After the work is finished, make a final change to the UI.

The SwingWorker class makes it easy to implement such a task. Override the doInBackground method to do the time-consuming work and occasionally call publish to communicate work progress. This method is executed in a worker thread. The publish method causes a process method to execute in the event dispatch thread to deal with the progress data. When the work is complete, the done method is called in the event dispatch thread so that you can finish updating the UI.

Whenever you want to do some work in the worker thread, construct a new worker. (Each worker object is meant to be used only once.) Then call the execute method. You will typically call execute on the event dispatch thread, but that is not a requirement.

It is assumed that a worker produces a result of some kind; therefore, SwingWorker<T, V> implements Future<T>. This result can be obtained by the get method of the Future interface. Since the get method blocks until the result is available, you don’t want to call it immediately after calling execute. It is a good idea to call it only when you know that the work has been completed. Typically, you call get from the done method. (There is no requirement to call get. Sometimes, processing the progress data is all you need.)

Both the intermediate progress data and the final result can have arbitrary types. The SwingWorker class has these types as type parameters. A SwingWorker<T, V> produces a result of type T and progress data of type V.

To cancel the work in progress, use the cancel method of the Future interface. When the work is canceled, the get method throws a CancellationException.

As already mentioned, the worker thread’s call to publish will cause calls to process on the event dispatch thread. For efficiency, the results of several calls to publish may be batched up in a single call to process. The process method receives a List<V> containing all intermediate results.
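Before turning to the file-loading program, here is a stripped-down sketch of the same mechanism. The hypothetical CountingWorker is a SwingWorker<Integer, Integer> that adds the numbers from 1 to a given limit, publishes each number as progress data, and returns the sum. It prints to the console instead of updating UI components, and its sumTo helper calls get right after execute, which is acceptable only because it is not called from the event dispatch thread.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.swing.SwingWorker;

class CountingWorker extends SwingWorker<Integer, Integer>
{
   private final int limit;

   CountingWorker(int limit) { this.limit = limit; }

   // Runs in a worker thread.
   @Override protected Integer doInBackground()
   {
      int sum = 0;
      for (int i = 1; i <= limit && !isCancelled(); i++)
      {
         sum += i;
         publish(i); // progress data, delivered to process
      }
      return sum;
   }

   // Runs in the event dispatch thread, possibly with several values at once.
   @Override protected void process(List<Integer> chunks)
   {
      int last = chunks.get(chunks.size() - 1);
      System.out.println("progress: " + last + " (batch of " + chunks.size() + ")");
   }

   // Runs in the event dispatch thread when the work is finished or canceled.
   @Override protected void done()
   {
      System.out.println(isCancelled() ? "canceled" : "done");
   }

   // Starts a new worker and blocks until its result is available.
   // Call this only from a thread other than the event dispatch thread.
   static int sumTo(int limit)
   {
      var worker = new CountingWorker(limit);
      worker.execute();
      try
      {
         return worker.get();
      }
      catch (InterruptedException | ExecutionException e)
      {
         throw new IllegalStateException(e);
      }
   }

   public static void main(String[] args)
   {
      System.out.println(sumTo(1000));
   }
}
```

How the published values are grouped into batches depends on timing, so the progress lines can differ from run to run.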

Let us put this mechanism to work for reading in a text file. As it turns out, a JTextArea is quite slow. Appending lines from a long text file (such as all lines in The Count of Monte Cristo) takes considerable time.

To show the user that progress is being made, we want to display the number of lines read in a status line. Thus, the progress data consist of the current line number and the current line of text. We package these into a trivial inner class:

private class ProgressData

{

public int number;

public String line;

}

The final result is the text that has been read into a StringBuilder. Thus, we need a SwingWorker<StringBuilder, ProgressData>.

In the doInBackground method, we read a file, a line at a time. After each line, we call publish to publish the line number and the text of the current line.

@Override public StringBuilder doInBackground() throws IOException, InterruptedException
{
   int lineNumber = 0;
   var in = new Scanner(new FileInputStream(file), StandardCharsets.UTF_8);
   while (in.hasNextLine())
   {
      String line = in.nextLine();
      lineNumber++;
      text.append(line).append("\n");
      var data = new ProgressData();
      data.number = lineNumber;
      data.line = line;
      publish(data);
      Thread.sleep(1); // to test cancellation; no need to do this in your programs
   }
   return text;
}

We also sleep for a millisecond after every line so that you can test cancellation without getting stressed out, but you wouldn’t want to slow down your own programs by sleeping. If you comment out this line, you will find that The Count of Monte Cristo loads quite quickly, with only a few batched user interface updates.

In the process method, we ignore all line numbers but the last one, and we concatenate all lines for a single update of the text area.

@Override public void process(List<ProgressData> data)
{
   if (isCancelled()) return;
   var b = new StringBuilder();
   statusLine.setText("" + data.get(data.size() - 1).number);
   for (ProgressData d : data) b.append(d.line).append("\n");
   textArea.append(b.toString());
}

In the done method, the text area is updated with the complete text, and the Cancel menu item is disabled.

Note how the worker is started in the event listener for the Open menu item.

This simple technique allows you to execute time-consuming tasks while keeping the user interface responsive.

Source: Horstmann Cay S. (2019), Core Java. Volume I – Fundamentals, Pearson; 11th edition.
