Thread-Safe Collections in Java

If multiple threads concurrently modify a data structure, such as a hash table, it is easy to damage that data structure. (See Chapter 9 for more information on hash tables.) For example, one thread may begin to insert a new element. Suppose it is preempted in the middle of rerouting the links between the hash table’s buckets. If another thread starts traversing the same list, it may follow invalid links and create havoc, perhaps throwing exceptions or getting trapped in an infinite loop.

You can protect a shared data structure by supplying a lock, but it is usually easier to choose a thread-safe implementation instead. In the following sections, we discuss the other thread-safe collections that the Java library provides.

1. Blocking Queues

Many threading problems can be formulated elegantly and safely by using one or more queues. Producer threads insert items into the queue, and consumer threads retrieve them. The queue lets you safely hand over data from one thread to another. For example, consider our bank transfer program. Instead of accessing the bank object directly, the transfer threads insert transfer instruction objects into a queue. Another thread removes the instructions from the queue and carries out the transfers. Only that thread has access to the internals of the bank object. No synchronization is necessary. (Of course, the implementors of the thread-safe queue classes had to worry about locks and conditions, but that was their problem, not yours.)

A blocking queue causes a thread to block when you try to add an element when the queue is currently full or to remove an element when the queue is empty. Blocking queues are a useful tool for coordinating the work of multiple threads. Worker threads can periodically deposit intermediate results into a blocking queue. Other worker threads remove the intermediate results and modify them further. The queue automatically balances the workload. If the first set of threads runs slower than the second, the second set blocks while waiting for the results. If the first set of threads runs faster, the queue fills up until the second set catches up. Table 12.1 shows the methods for blocking queues.

The blocking queue methods fall into three categories that differ by the action they perform when the queue is full or empty. If you use the queue as a thread management tool, use the put and take methods. The add, remove, and element operations throw an exception when you try to add to a full queue or get the head of an empty queue. Of course, in a multithreaded program, the queue might become full or empty at any time, so you will instead want to use the offer, poll, and peek methods. These methods simply return with a failure indicator instead of throwing an exception if they cannot carry out their tasks.

There are also variants of the offer and poll methods with a timeout. For example, the call

boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);

tries for 100 milliseconds to insert an element at the tail of the queue. If it succeeds, it returns true; otherwise, it returns false when it times out. Similarly, the call

Object head = q.poll(100, TimeUnit.MILLISECONDS);

tries for 100 milliseconds to remove the head of the queue. If it succeeds, it returns the head; otherwise, it returns null when it times out.

The put method blocks if the queue is full, and the take method blocks if the queue is empty. These are the equivalents of offer and poll with no timeout.
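To see put and take in action, here is a minimal producer/consumer sketch. The class and method names (BlockingQueueDemo, sum) and the use of -1 as a completion sentinel are our own illustration, not taken from the book's listings:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    // A producer thread puts the numbers 1..count into a bounded queue;
    // the calling thread takes and sums them. The sentinel -1 signals completion.
    public static long sum(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // small capacity forces blocking
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++)
                    queue.put(i); // blocks when the queue is full
                queue.put(-1); // sentinel: no more items
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long total = 0;
        while (true) {
            int n = queue.take(); // blocks when the queue is empty
            if (n == -1) break;
            total += n;
        }
        producer.join();
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sum(100)); // 5050
    }
}
```

Because put and take block, neither thread ever sees a partially updated queue, and no explicit lock is needed in our code.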

The java.util.concurrent package supplies several variations of blocking queues. By default, the LinkedBlockingQueue has no upper bound on its capacity, but a maximum capacity can be optionally specified. The LinkedBlockingDeque is a double-ended version. The ArrayBlockingQueue is constructed with a given capacity and an optional parameter to require fairness. If fairness is specified, then the longest-waiting threads are given preferential treatment. As always, fairness exacts a significant performance penalty, and you should only use it if your problem specifically requires it.

The PriorityBlockingQueue is a priority queue, not a first-in/first-out queue. Elements are removed in order of their priority. The queue has unbounded capacity, but retrieval will block if the queue is empty. (See Chapter 9 for more information on priority queues.)

A DelayQueue contains objects that implement the Delayed interface:

interface Delayed extends Comparable<Delayed>
{
   long getDelay(TimeUnit unit);
}

The getDelay method returns the remaining delay of the object. A negative value indicates that the delay has elapsed. Elements can only be removed from a DelayQueue if their delay has elapsed. You also need to implement the compareTo method. The DelayQueue uses that method to sort the entries.
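A Delayed implementation might look like the following sketch. The DelayedMessage class and its timing values are our own invention for illustration; any element type that tracks an absolute expiration time works the same way:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // A hypothetical delayed message that becomes available delayMillis after creation.
    static class DelayedMessage implements Delayed {
        private final String text;
        private final long readyAt; // absolute time in milliseconds

        DelayedMessage(String text, long delayMillis) {
            this.text = text;
            this.readyAt = System.currentTimeMillis() + delayMillis;
        }

        public long getDelay(TimeUnit unit) {
            // Convert the remaining milliseconds to the requested unit; negative means elapsed
            return unit.convert(readyAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            // Sort by remaining delay so the queue yields the soonest element first
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static String firstReady() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.add(new DelayedMessage("later", 200));
        queue.add(new DelayedMessage("sooner", 50));
        return queue.take().text; // blocks until the shortest delay has elapsed
    }
}
```

Note that take blocks for about 50 milliseconds here even though both elements are already in the queue: neither delay has elapsed yet.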

Java 7 adds a TransferQueue interface that allows a producer thread to wait until a consumer is ready to take on an item. When a producer calls

q.transfer(item);

the call blocks until another thread removes it. The LinkedTransferQueue class implements this interface.
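The hand-off behavior of transfer can be sketched as follows; the class name and the StringBuilder used to capture the consumer's result are our own scaffolding, not part of the TransferQueue API:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferDemo {
    public static String handOff() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        StringBuilder received = new StringBuilder();
        Thread consumer = new Thread(() -> {
            try {
                received.append(queue.take()); // takes the transferred element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.transfer("item"); // blocks until the consumer has taken the element
        consumer.join();        // join also guarantees we see the consumer's write
        return received.toString();
    }
}
```

By contrast, an ordinary put on a LinkedTransferQueue would return immediately, since the queue is unbounded; only transfer waits for a consumer.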

The program in Listing 12.6 shows how to use a blocking queue to control a set of threads. The program searches through all files in a directory and its subdirectories, printing lines that contain a given keyword.

A producer thread enumerates all files in all subdirectories and places them in a blocking queue. This operation is fast, and the queue would quickly fill up with all files in the file system if it were not bounded.

We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file. We use a trick to terminate the application when no further work is required. In order to signal completion, the enumeration thread places a dummy object into the queue. (This is similar to a dummy suitcase with a label “last bag” in a baggage claim belt.) When a search thread takes the dummy, it puts it back and terminates.

Note that no explicit thread synchronization is required. In this application, we use the queue data structure as a synchronization mechanism.

2. Efficient Maps, Sets, and Queues

The java.util.concurrent package supplies efficient implementations for maps, sorted sets, and queues: ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, and ConcurrentLinkedQueue.

These collections use sophisticated algorithms that minimize contention by allowing concurrent access to different parts of the data structure.

Unlike most collections, the size method of these classes does not necessarily operate in constant time. Determining the current size of one of these collections usually requires traversal.

The collections return weakly consistent iterators. That means that the iterators may or may not reflect all modifications that are made after they were con­structed, but they will not return a value twice and they will not throw a ConcurrentModificationException.

The concurrent hash map can efficiently support a large number of readers and a bounded number of writers.

3. Atomic Update of Map Entries

The original version of ConcurrentHashMap only had a few methods for atomic updates, which made for somewhat awkward programming. Suppose we want to count how often certain features are observed. As a simple example, suppose multiple threads encounter words, and we want to count their frequencies.

Can we use a ConcurrentHashMap<String, Long>? Consider the code for incrementing a count. Obviously, the following is not thread-safe:

Long oldValue = map.get(word);

Long newValue = oldValue == null ? 1 : oldValue + 1;

map.put(word, newValue); // ERROR–might not replace oldValue

Another thread might be updating the exact same count at the same time.

In old versions of Java, it was necessary to use the replace method, which atomically replaces an old value with a new one, provided that no other thread has come before and replaced the old value with something else. You had to keep doing it until the attempt succeeded:

do
{
   oldValue = map.get(word);
   newValue = oldValue == null ? 1 : oldValue + 1;
}
while (!map.replace(word, oldValue, newValue));

An alternative was to use a ConcurrentHashMap<String, AtomicLong> and the following update code:

map.putIfAbsent(word, new AtomicLong());

map.get(word).incrementAndGet();

Unfortunately, a new AtomicLong is constructed for each increment, whether or not it is needed.

Nowadays, the Java API provides methods that make atomic updates more convenient. The compute method is called with a key and a function to compute the new value. That function receives the key and the associated value, or null if there is none, and it computes the new value. For example, here is how we can update a map of integer counters:

map.compute(word, (k, v) -> v == null ? 1 : v + 1);

There are also variants computeIfPresent and computeIfAbsent that only compute a new value when there is already an old one, or when there isn’t yet one. A map of LongAdder counters can be updated with

map.computeIfAbsent(word, k -> new LongAdder()).increment();

That is almost like the call to putIfAbsent that you saw before, but the LongAdder constructor is only called when a new counter is actually needed.

You often need to do something special when a key is added for the first time. The merge method makes this particularly convenient. It has a parameter for the initial value that is used when the key is not yet present. Otherwise, the function that you supplied is called, combining the existing value and the initial value. (Unlike compute, the function does not process the key.)

map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);

or simply

map.merge(word, 1L, Long::sum);

It doesn’t get more concise than that.
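To convince yourself that merge really is atomic, consider this small sketch in which two threads count the same words concurrently; the class and method names are our own, not the book's Listing 12.7:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class MergeCountDemo {
    // Two threads each count the same word list; merge makes every increment atomic.
    public static long countWord(List<String> words, String target) throws InterruptedException {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        Runnable task = () -> {
            for (String w : words)
                map.merge(w, 1L, Long::sum); // atomic: insert 1 or add 1 to the existing count
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        return map.getOrDefault(target, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        // "a" occurs twice per thread, and both threads run, so the count is 4
        System.out.println(countWord(List.of("a", "b", "a"), "a")); // 4
    }
}
```

With the naive get/put sequence shown earlier, some increments could be lost; with merge, the total is always exact.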

The program in Listing 12.7 uses a concurrent hash map to count all words in the Java files of a directory tree.

Listing 12.7 concurrentHashMap/CHMDemo.java

4. Bulk Operations on Concurrent Hash Maps

The Java API provides bulk operations on concurrent hash maps that can safely execute even while other threads operate on the map. The bulk operations traverse the map and operate on the elements they find as they go along. No effort is made to freeze a snapshot of the map in time. Unless you happen to know that the map is not being modified while a bulk operation runs, you should treat its result as an approximation of the map’s state.

There are three kinds of operations:

  • search applies a function to each key and/or value, until the function yields a non-null result. Then the search terminates and the function’s result is returned.
  • reduce combines all keys and/or values, using a provided accumulation function.
  • forEach applies a function to all keys and/or values.

Each operation has four versions:

  • operationKeys: operates on keys.
  • operationValues: operates on values.
  • operation: operates on keys and values.
  • operationEntries: operates on Map.Entry objects.

With each of the operations, you need to specify a parallelism threshold. If the map contains more elements than the threshold, the bulk operation is parallelized. If you want the bulk operation to run in a single thread, use a threshold of Long.MAX_VALUE. If you want the maximum number of threads to be made available for the bulk operation, use a threshold of 1.

Let’s look at the search methods first. Here are the versions:

U searchKeys(long threshold, Function<? super K, ? extends U> f)

U searchValues(long threshold, Function<? super V, ? extends U> f)

U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)

U searchEntries(long threshold, Function<Map.Entry<K, V>, ? extends U> f)

For example, suppose we want to find the first word that occurs more than 1,000 times. We need to search keys and values:

String result = map.search(threshold, (k, v) -> v > 1000 ? k : null);

Then result is set to the first match, or to null if the search function returns null for all inputs.

The forEach methods have two variants. The first one simply applies a consumer function for each map entry, for example

map.forEach(threshold,
   (k, v) -> System.out.println(k + " -> " + v));

The second variant takes an additional transformer function, which is applied first, and its result is passed to the consumer:

map.forEach(threshold,
   (k, v) -> k + " -> " + v, // transformer
   System.out::println); // consumer

The transformer can be used as a filter. Whenever the transformer returns null, the value is silently skipped. For example, here we only print the entries with large values:

map.forEach(threshold,
   (k, v) -> v > 1000 ? k + " -> " + v : null, // filter and transformer
   System.out::println); // the nulls are not passed to the consumer

The reduce operations combine their inputs with an accumulation function. For example, here is how you can compute the sum of all values:

Long sum = map.reduceValues(threshold, Long::sum);

As with forEach, you can also supply a transformer function. Here we compute the length of the longest key:

Integer maxLength = map.reduceKeys(threshold,
   String::length, // transformer
   Integer::max); // accumulator

The transformer can act as a filter, by returning null to exclude unwanted inputs. Here, we count how many entries have value > 1000:

Long count = map.reduceValues(threshold,
   v -> v > 1000 ? 1L : null,
   Long::sum);

There are specializations for int, long, and double outputs with suffixes ToInt, ToLong, and ToDouble. You need to transform the input to a primitive value and specify a default value and an accumulator function. The default value is returned when the map is empty.

long sum = map.reduceValuesToLong(threshold,
   Long::longValue, // transformer to primitive type
   0, // default value for empty map
   Long::sum); // primitive type accumulator
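Putting these pieces together, here is a runnable sketch of a reduceKeys call with a threshold of Long.MAX_VALUE, so the operation runs in the calling thread; the sample map contents are our own:

```java
import java.util.concurrent.ConcurrentHashMap;

public class BulkOpsDemo {
    // Compute the length of the longest key, using a bulk reduce over the keys.
    public static int longestKeyLength() {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        map.put("alpha", 5L);
        map.put("be", 2L);
        map.put("gamma", 7L);
        // Long.MAX_VALUE threshold: never parallelize, run in the calling thread
        Integer max = map.reduceKeys(Long.MAX_VALUE,
            String::length, // transformer
            Integer::max);  // accumulator
        return max == null ? 0 : max; // reduceKeys returns null for an empty map
    }
}
```

Note the null check: unlike the primitive specializations, the object-valued reductions return null, not a default value, when the map is empty.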

5. Concurrent Set Views

Suppose you want a large, thread-safe set instead of a map. There is no ConcurrentHashSet class, and you know better than trying to create your own. Of course, you can use a ConcurrentHashMap with bogus values, but then you get a map, not a set, and you can’t apply operations of the Set interface.

The static newKeySet method yields a Set<K> that is actually a wrapper around a ConcurrentHashMap<K, Boolean>. (All map values are Boolean.TRUE, but you don’t actually care since you just use it as a set.)

Set<String> words = ConcurrentHashMap.<String>newKeySet();

Of course, if you have an existing map, the keySet method yields the set of keys. That set is mutable. If you remove the set’s elements, the keys (and their values) are removed from the map. But it doesn’t make sense to add elements to the key set, because there would be no corresponding values to add. There is a second keySet method to ConcurrentHashMap, with a default value, to be used when adding elements to the set:

Set<String> words = map.keySet(1L);

words.add("Java");

If “Java” wasn’t already present in words, it now has a value of one.
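You can verify that effect on the backing map directly; this small sketch (class name ours) adds through the set view and reads through the map:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class KeySetDemo {
    public static long demo() {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        Set<String> words = map.keySet(1L); // 1L is the value used when adding to the set
        words.add("Java"); // inserts the entry ("Java", 1L) into the map
        return map.get("Java");
    }
}
```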

6. Copy on Write Arrays

The CopyOnWriteArrayList and CopyOnWriteArraySet are thread-safe collections in which all mutators make a copy of the underlying array. This arrangement is useful if the threads that iterate over the collection greatly outnumber the threads that mutate it. When you construct an iterator, it contains a reference to the current array. If the array is later mutated, the iterator still has the old array, but the collection’s array is replaced. As a consequence, the older iterator has a consistent (but potentially outdated) view that it can access without any synchronization expense.
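The snapshot behavior of the iterators is easy to demonstrate; in this sketch (class name ours), an iterator obtained before a mutation still sees the old array:

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static int snapshotSize() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");
        Iterator<String> iter = list.iterator(); // captures a reference to the current array
        list.add("c"); // the mutator copies the array; the iterator keeps the old one
        int count = 0;
        while (iter.hasNext()) {
            iter.next();
            count++;
        }
        return count; // 2, not 3: the iterator never sees "c"
    }
}
```

No ConcurrentModificationException is thrown, and no locking is needed while iterating; the cost is paid by the mutators, which copy the array on every change.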

7. Parallel Array Algorithms

The Arrays class has a number of parallelized operations. The static Arrays.parallelSort method can sort an array of primitive values or objects. For example,

var contents = new String(Files.readAllBytes(
   Path.of("alice.txt")), StandardCharsets.UTF_8); // read file into string
String[] words = contents.split("[\\P{L}]+"); // split along nonletters
Arrays.parallelSort(words);

When you sort objects, you can supply a Comparator.

Arrays.parallelSort(words, Comparator.comparing(String::length));

With all methods, you can supply the bounds of a range, such as

Arrays.parallelSort(words, words.length / 2, words.length); // sort the upper half

The parallelSetAll method fills an array with values that are computed from a function. The function receives the element index and computes the value at that location.

Arrays.parallelSetAll(values, i -> i % 10);

// fills values with 0 1 2 3 4 5 6 7 8 9 0 1 2 . . .

Clearly, this operation benefits from being parallelized. There are versions for all primitive type arrays and for object arrays.

Finally, there is a parallelPrefix method that replaces each array element with the accumulation of the prefix for a given associative operation. Huh? Here is an example. Consider the array [1, 2, 3, 4, . . .] and the × operation. After executing Arrays.parallelPrefix(values, (x, y) -> x * y), the array contains

[1, 1 × 2, 1 × 2 × 3, 1 × 2 × 3 × 4, . . .]

Perhaps surprisingly, this computation can be parallelized. First, join neighboring elements, as indicated here:

[1, 1 × 2, 3, 3 × 4, 5, 5 × 6, 7, 7 × 8]

The elements at even positions are left alone. Clearly, one can make this computation in parallel in separate regions of the array. In the next step, update the indicated elements by multiplying them with elements that are one or two positions below:

[1, 1 × 2, 1 × 2 × 3, 1 × 2 × 3 × 4, 5, 5 × 6, 5 × 6 × 7, 5 × 6 × 7 × 8]

This, again, can be done in parallel. After log n steps, the process is complete. This is a win over the straightforward linear computation if sufficient processors are available. On special-purpose hardware, this algorithm is commonly used, and users of such hardware are quite ingenious in adapting it to a variety of problems.
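The running-product example is easy to try out; this sketch (class name ours) applies parallelPrefix to a small array of longs:

```java
import java.util.Arrays;

public class PrefixDemo {
    // Replace each element with the product of all elements up to and including it.
    public static long[] runningProducts() {
        long[] values = {1, 2, 3, 4, 5};
        Arrays.parallelPrefix(values, (x, y) -> x * y); // associative accumulator
        return values; // {1, 2, 6, 24, 120}
    }
}
```

The same call with Long::sum (on a suitable overload) yields running sums; any associative operation works, which is exactly what the parallel decomposition above requires.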

8. Older Thread-Safe Collections

Ever since the initial release of Java, the Vector and Hashtable classes provided thread-safe implementations of a dynamic array and a hash table. These classes are now considered obsolete, having been replaced by the ArrayList and HashMap classes. Those classes are not thread-safe. Instead, a different mechanism is supplied in the collections library. Any collection class can be made thread-safe by means of a synchronization wrapper:

List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());

Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());

The methods of the resulting collections are protected by a lock, providing thread-safe access.

You should make sure that no thread accesses the data structure through the original unsynchronized methods. The easiest way to ensure this is not to save any reference to the original object. Simply construct a collection and immediately pass it to the wrapper, as we did in our examples.

You still need to use “client-side” locking if you want to iterate over the collection while another thread has the opportunity to mutate it:

synchronized (synchHashMap)
{
   Iterator<K> iter = synchHashMap.keySet().iterator();
   while (iter.hasNext()) . . .;
}

You must use the same code if you use a “for each” loop because the loop uses an iterator. Note that the iterator actually fails with a ConcurrentModificationException if another thread mutates the collection while the iteration is in progress. The synchronization is still required so that the concurrent modification can be reliably detected.

You are usually better off using the collections defined in the java.util.concurrent package instead of the synchronization wrappers. In particular, the ConcurrentHashMap has been carefully implemented so that multiple threads can access it without blocking each other, provided they access different buckets. One exception is an array list that is frequently mutated. In that case, a synchronized ArrayList can outperform a CopyOnWriteArrayList.

Source: Horstmann Cay S. (2019), Core Java. Volume I – Fundamentals, Pearson; 11th edition.
