Threads Synchronization in Unix/Linux

Since threads execute in the same address space of a process, they share all the global variables and data structures in that address space. When several threads try to modify the same shared variable or data structure and the outcome depends on the execution order of the threads, the situation is called a race condition. Race conditions must not exist in concurrent programs; otherwise, the results may be inconsistent. In addition to the join operation, concurrently executing threads often need to cooperate with one another. To prevent race conditions, as well as to support threads cooperation, threads need synchronization. In general, synchronization refers to the mechanisms and rules used to ensure the integrity of shared data objects and the coordination of concurrently executing entities. It can be applied to either processes in kernel mode or threads in user mode. In the following, we shall discuss the specific problem of threads synchronization in Pthreads.

1. Mutex Locks

The simplest kind of synchronizing tool is a lock, which allows an execution entity to proceed only if it possesses the lock. In Pthreads, locks are called mutexes, which stands for Mutual Exclusion. Mutex variables are declared with the type pthread_mutex_t, and they must be initialized before use. There are two ways to initialize a mutex variable.

(1) Statically, as in

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

which defines a mutex variable m and initializes it with default attributes.

(2) Dynamically with the pthread_mutex_init() function, which allows setting the mutex attributes by an attr parameter, as in

int pthread_mutex_init (pthread_mutex_t *m, const pthread_mutexattr_t *attr);

As usual, the attr parameter can be set to NULL for default attributes.

After initialization, mutex variables can be used by threads via the following functions.

int pthread_mutex_lock (pthread_mutex_t *m);

int pthread_mutex_unlock (pthread_mutex_t *m);

int pthread_mutex_trylock (pthread_mutex_t *m);

int pthread_mutex_destroy (pthread_mutex_t *m);
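The calls listed above can be exercised in a few lines. The following is a minimal sketch, assuming a default (non-recursive) mutex as provided on Linux; the helper name mutex_demo and the variable names are hypothetical. It initializes one mutex statically and one dynamically, and shows that pthread_mutex_trylock() returns EBUSY instead of blocking when the mutex is already locked.

```c
#include <errno.h>
#include <pthread.h>

/* Statically initialized mutex with default attributes. */
static pthread_mutex_t m_static = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical helper: returns 0 if every call behaved as expected. */
int mutex_demo(void)
{
    pthread_mutex_t m_dyn;
    int r;

    /* Dynamic initialization; NULL selects the default attributes. */
    if (pthread_mutex_init(&m_dyn, NULL) != 0)
        return -1;

    pthread_mutex_lock(&m_static);      /* unlocked, so this returns at once */
    pthread_mutex_unlock(&m_static);

    pthread_mutex_lock(&m_dyn);         /* m_dyn is now locked */
    r = pthread_mutex_trylock(&m_dyn);  /* does not block; expected to fail */
    pthread_mutex_unlock(&m_dyn);

    pthread_mutex_destroy(&m_dyn);      /* done with the dynamic mutex */
    return (r == EBUSY) ? 0 : -1;
}
```

Calling mutex_demo() from main() should return 0 on such a system. A statically initialized mutex with default attributes does not require a matching pthread_mutex_init() call.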

Threads use mutexes as locks to protect shared data objects. A typical usage of mutex is as follows. A thread first creates a mutex and initializes it once. A newly created mutex is in the unlocked state and without an owner. Each thread tries to access a shared data object by

pthread_mutex_lock(&m); // lock mutex

access shared data object; // access shared data in a critical region

pthread_mutex_unlock(&m); // unlock mutex

When a thread executes pthread_mutex_lock(&m), if the mutex is unlocked, it locks the mutex, becomes the mutex owner and continues. Otherwise, it becomes blocked and waits in the mutex waiting queue. Only the thread which has acquired the mutex lock can access the shared data object. A sequence of executions which can only be performed by one execution entity at a time is commonly known as a Critical Region (CR). In Pthreads, mutexes are used as locks to protect Critical Regions to ensure at most one thread can be inside a CR at any time. When the thread finishes with the shared data object, it exits the CR by calling pthread_mutex_unlock(&m) to unlock the mutex. A locked mutex can only be unlocked by the current owner. When unlocking a mutex, if there are no blocked threads in the mutex waiting queue, it unlocks the mutex and the mutex has no owner. Otherwise, it unblocks a waiting thread from the mutex waiting queue, which becomes the new owner, and the mutex remains locked. When all the threads are finished, the mutex may be destroyed if it was dynamically allocated. We demonstrate threads synchronization using mutex lock by an example.

Example 4.3: This example is a modified version of Example 4.1. As before, we shall use N working threads to compute the sum of all the elements of an N x N matrix of integers. Each working thread computes the partial sum of a row. Instead of depositing the partial sums in a global sum[ ] array, each working thread tries to update a global variable, total, by adding its partial sum to it. Since all the working threads try to update the same global variable, they must be synchronized to prevent race conditions. This can be achieved by a mutex lock, which ensures that only one working thread can update the total variable at a time in a Critical Region. The following shows the Example Program C4.3.

/** C4.3.c: matrix sum by threads with mutex lock **/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define N 4

int A[N][N];
int total = 0;                // global total
pthread_mutex_t *m;           // mutex pointer

void *func(void *arg)         // working thread function
{
    int i, row, sum = 0;
    pthread_t tid = pthread_self();   // get thread ID number
    row = (int)(long)arg;             // get row number from arg
    printf("Thread %d [%lu] computes sum of row %d\n", row, tid, row);
    for (i=0; i<N; i++)               // compute partial sum of A[row]
        sum += A[row][i];
    printf("Thread %d [%lu] update total with %d : ", row, tid, sum);
    pthread_mutex_lock(m);
    total += sum;                     // update global total inside a CR
    printf("total = %d\n", total);    // read total while still holding the lock
    pthread_mutex_unlock(m);
    pthread_exit((void *)0);          // exit status 0, collected by join
}

int main (int argc, char *argv[])
{
    pthread_t thread[N];
    int i, j;
    void *status;

    printf("Main: initialize A matrix\n");
    for (i=0; i<N; i++){
        for (j=0; j<N; j++){
            A[i][j] = i*N + j + 1;
            printf("%4d ", A[i][j]);
        }
        printf("\n");
    }
    // create a mutex m
    m = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(m, NULL);      // initialize mutex m
    printf("Main: create %d threads\n", N);
    for (i=0; i<N; i++){
        pthread_create(&thread[i], NULL, func, (void *)(long)i);
    }
    printf("Main: try to join with threads\n");
    for (i=0; i<N; i++){
        pthread_join(thread[i], &status);
        printf("Main: joined with %d [%lu]: status=%d\n", i, thread[i], (int)(long)status);
    }
    printf("Main: total = %d\n", total);
    pthread_mutex_destroy(m);         // destroy mutex m
    free(m);                          // release the memory allocated for m
    pthread_exit(NULL);
}

Figure 4.3 shows the sample outputs of running the Example 4.3 program, which demonstrates mutex lock in Pthreads.

2. Deadlock Prevention

Mutexes use the locking protocol. If a thread cannot acquire a mutex lock, it becomes blocked, waiting for the mutex to be unlocked before continuing. In any locking protocol, misuse of locks may lead to problems. The best-known problem is deadlock, a condition in which several execution entities mutually wait for one another so that none of them can proceed. To illustrate this, assume that a thread T1 has acquired a mutex lock m1 and tries to lock another mutex m2. Another thread T2 has acquired the mutex lock m2 and tries to lock the mutex m1, as shown in the following diagram.

    Thread T1:            Thread T2:
    lock(m1);             lock(m2);
    . . . . .             . . . . .
    lock(m2);             lock(m1);

In this case, T1 and T2 would mutually wait for each other forever, so they are in a deadlock due to crossed locking requests. Like race conditions, deadlocks must not exist in concurrent programs. There are many ways to deal with possible deadlocks, which include deadlock prevention, deadlock avoidance, deadlock detection and recovery, etc. In real systems, the only practical way is deadlock prevention, which tries to prevent deadlocks from occurring when designing parallel algorithms. A simple way to prevent deadlock is to order the mutexes and ensure that every thread requests mutex locks only in a single direction, so that there are no loops in the request sequences.
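The ordering rule can be sketched as follows. This is only an illustration under stated assumptions: the helpers lock_both() and unlock_both(), the counter shared_count and the choice of ordering the mutexes by address are all hypothetical, and any other fixed global order would serve equally well. Both threads name the two mutexes in opposite order, but the helper always acquires them in the same order, so no request loop can form.

```c
#include <pthread.h>
#include <stdint.h>

#define ROUNDS 10000

static pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
static long shared_count;

/* Lock two mutexes in one fixed global order (here: lower address first). */
static void lock_both(pthread_mutex_t *a, pthread_mutex_t *b)
{
    if ((uintptr_t)a > (uintptr_t)b) {
        pthread_mutex_t *t = a; a = b; b = t;
    }
    pthread_mutex_lock(a);
    pthread_mutex_lock(b);
}

static void unlock_both(pthread_mutex_t *a, pthread_mutex_t *b)
{
    pthread_mutex_unlock(a);
    pthread_mutex_unlock(b);
}

static void *t1(void *arg)            /* asks for m1, then m2 */
{
    (void)arg;
    for (int i = 0; i < ROUNDS; i++) {
        lock_both(&m1, &m2);
        shared_count++;
        unlock_both(&m1, &m2);
    }
    return NULL;
}

static void *t2(void *arg)            /* asks for m2, then m1 */
{
    (void)arg;
    for (int i = 0; i < ROUNDS; i++) {
        lock_both(&m2, &m1);
        shared_count++;
        unlock_both(&m2, &m1);
    }
    return NULL;
}

/* Runs both threads to completion and returns the final count. */
long ordered_locking_demo(void)
{
    pthread_t a, b;
    shared_count = 0;
    pthread_create(&a, NULL, t1, NULL);
    pthread_create(&b, NULL, t2, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return shared_count;
}
```

If the two threads locked the mutexes directly in the order they name them, the crossed requests described above could occur; with the common order, ordered_locking_demo() always terminates.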

However, it may not be possible to design every parallel algorithm with only uni-directional locking requests. In such cases, the conditional locking function, pthread_mutex_trylock(), may be used to prevent deadlocks. The trylock() function returns immediately with an error if the mutex is already locked. In that case, the calling thread may back-off by releasing some of the locks it already holds, allowing other threads to continue. In the above crossed locking example, we may redesign one of the threads, e.g. T1, as follows, which uses conditional locking and back-off to prevent the deadlock.
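A minimal sketch of conditional locking with back-off is shown below. It assumes two mutexes m1 and m2 as in the crossed locking example; the function names, the ROUNDS constant, the work_done counter and the use of sched_yield() as the delay before retrying are illustrative choices rather than a prescribed design. T1 holds m1 and only tries m2; if that fails, it releases m1 and retries, so T2, which locks m2 and then m1 unconditionally, can always make progress.

```c
#include <pthread.h>
#include <sched.h>

#define ROUNDS 10000

static pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
static long work_done;

/* T1: lock m1, then try m2 conditionally; back off if m2 is busy. */
static void *t1_backoff(void *arg)
{
    (void)arg;
    for (int n = 0; n < ROUNDS; n++) {
        for (;;) {
            pthread_mutex_lock(&m1);
            if (pthread_mutex_trylock(&m2) == 0)
                break;                    /* got both locks */
            pthread_mutex_unlock(&m1);    /* back off: release m1 */
            sched_yield();                /* let other threads run, then retry */
        }
        work_done++;                      /* CR protected by m1 and m2 */
        pthread_mutex_unlock(&m2);
        pthread_mutex_unlock(&m1);
    }
    return NULL;
}

/* T2: unchanged, locks m2 and then m1 unconditionally. */
static void *t2_plain(void *arg)
{
    (void)arg;
    for (int n = 0; n < ROUNDS; n++) {
        pthread_mutex_lock(&m2);
        pthread_mutex_lock(&m1);
        work_done++;
        pthread_mutex_unlock(&m1);
        pthread_mutex_unlock(&m2);
    }
    return NULL;
}

/* Runs both threads to completion and returns the final count. */
long backoff_demo(void)
{
    pthread_t a, b;
    work_done = 0;
    pthread_create(&a, NULL, t1_backoff, NULL);
    pthread_create(&b, NULL, t2_plain, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return work_done;
}
```

Only one of the two threads needs the back-off. If both used it, the deadlock would still be prevented, but the threads could repeatedly back off from each other, which is why a delay before retrying is usually added.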

3. Condition Variables

Mutexes are used only as locks, which ensure that threads access shared data objects exclusively in Critical Regions. Condition variables provide a means for threads cooperation. Condition variables are always used in conjunction with mutex locks. This is no surprise because mutual exclusion is the basis of all synchronizing mechanisms. In Pthreads, condition variables are declared with the type pthread_cond_t, and must be initialized before use. Like mutexes, condition variables can also be initialized in two ways.

  • Statically, when it is declared, as in

pthread_cond_t con = PTHREAD_COND_INITIALIZER;

which defines a condition variable, con, and initializes it with default attributes.

  • Dynamically with the pthread_cond_init() function, which allows setting a condition variable with an attr parameter. For simplicity, we shall always use a NULL attr parameter for default attributes.

The following code segments show how to define a condition variable with an associated mutex lock.

pthread_mutex_t con_mutex; // mutex for a condition variable

pthread_cond_t con;    //a condition variable that relies on con_mutex

pthread_mutex_init(&con_mutex, NULL); // initialize mutex

pthread_cond_init(&con, NULL);    // initialize con

When using a condition variable, a thread must acquire the associated mutex lock first. Then it performs operations within the Critical Region of the mutex lock and releases the mutex lock, as in

pthread_mutex_lock(&con_mutex);

modify or test shared data objects

use condition variable con to wait or signal conditions

pthread_mutex_unlock(&con_mutex);

Within the CR of the mutex lock, threads may use condition variables to cooperate with one another via the following functions.

pthread_cond_wait(condition, mutex): This function blocks the calling thread until the specified condition is signaled. This routine must be called while mutex is locked. It atomically releases the mutex lock while the thread waits. After a signal is received and the blocked thread is awakened, mutex is automatically locked again before the call returns.

pthread_cond_signal(condition): This function is used to signal, i.e. to wake up or unblock, a thread which is waiting on the condition variable. It should be called while mutex is locked, and the signaling thread must then unlock mutex in order for pthread_cond_wait() to complete.

pthread_cond_broadcast(condition) : This function unblocks all threads that are blocked on the condition variable. All unblocked threads will compete for the same mutex to access the condition variable. Their order of execution depends on threads scheduling.
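The wait and signal calls can be put together in a small sketch. The shared flag ready, the value payload and the function names are hypothetical. The waiting thread tests the shared condition in a while loop rather than an if statement, because POSIX allows pthread_cond_wait() to return without a matching signal (a spurious wakeup), so the condition should be re-tested after every wakeup.

```c
#include <pthread.h>

static pthread_mutex_t con_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  con = PTHREAD_COND_INITIALIZER;
static int ready;      /* shared condition tested by the waiter */
static int payload;    /* shared data published by the signaler */

static void *waiter(void *arg)
{
    int *out = arg;
    pthread_mutex_lock(&con_mutex);
    while (!ready)                            /* re-test after every wakeup */
        pthread_cond_wait(&con, &con_mutex);  /* releases con_mutex while waiting */
    *out = payload;                           /* con_mutex is held again here */
    pthread_mutex_unlock(&con_mutex);
    return NULL;
}

/* Starts a waiter, publishes a value, and returns what the waiter saw. */
int cond_demo(void)
{
    pthread_t t;
    int seen = -1;

    ready = 0;
    pthread_create(&t, NULL, waiter, &seen);

    pthread_mutex_lock(&con_mutex);
    payload = 42;                   /* modify shared data inside the CR */
    ready = 1;
    pthread_cond_signal(&con);      /* wake up the waiter, if it is waiting */
    pthread_mutex_unlock(&con_mutex);

    pthread_join(t, NULL);
    return seen;
}
```

Because the waiter checks ready before waiting, the sketch works whether the signal is sent before or after the waiter reaches pthread_cond_wait(); a signal sent while no thread is waiting is simply lost, and only the shared flag records that the event happened.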

We demonstrate thread cooperation using condition variables by an example.

4. Producer-Consumer Problem

Example 4.4: In this example, we shall implement a simplified version of the producer-consumer problem, which is also known as the bounded buffer problem, using threads and condition variables. The producer-consumer problem is usually defined with processes as executing entities, which can be regarded as threads in the current context. The problem is defined as follows.

A set of producer and consumer processes share a finite number of buffers. Each buffer contains a unique item at a time. Initially, all the buffers are empty. When a producer puts an item into an empty buffer, the buffer becomes full. When a consumer gets an item from a full buffer, the buffer becomes empty, etc. A producer must wait if there are no empty buffers. Similarly, a consumer must wait if there are no full buffers. Furthermore, waiting processes must be allowed to continue when their awaited events occur.

In the example program, we shall assume that each buffer holds an integer value. The shared global variables are defined as

// shared global variables

int buf[NBUF];        // circular buffers

int head, tail;       // indices

int data;            // number of full buffers

The buffers are used as a set of circular buffers. The index variable head is for putting an item into an empty buffer, and tail is for taking an item out of a full buffer. The variable data is the number of full buffers. To support cooperation between producer and consumer, we define a mutex and two condition variables.

pthread_mutex_t mutex;        // mutex lock

pthread_cond_t empty, full; // condition variables

where empty represents the condition of any empty buffers, and full represents the condition of any full buffers. When a producer finds there are no empty buffers, it waits on the empty condition variable, which is signaled whenever a consumer has consumed a full buffer. Likewise, when a consumer finds there are no full buffers, it waits on the full condition variable, which is signaled whenever a producer puts an item into an empty buffer.

The program starts with the main thread, which initializes the buffer control variables and the condition variables. After initialization, it creates a producer and a consumer thread and waits for the threads to join. The buffer size is set to NBUF=5 but the producer tries to put N=10 items into the buffer area, which would cause it to wait when all the buffers are full. Similarly, the consumer tries to get N=10 items from the buffers, which would cause it to wait when all the buffers are empty. In either case, a waiting thread is woken up by the other thread when the awaited condition is met. Thus, the two threads cooperate with each other through the condition variables. The following shows the program code of Example Program C4.4, which implements a simplified version of the producer-consumer problem with only one producer and one consumer.

/* C4.4.c: producer-consumer by threads with condition variables */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NBUF    5
#define N       10

// shared global variables
int buf[NBUF];                // circular buffers
int head, tail;               // indices
int data;                     // number of full buffers
pthread_mutex_t mutex;        // mutex lock
pthread_cond_t empty, full;   // condition variables

void init(void)
{
    head = tail = data = 0;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&full, NULL);
    pthread_cond_init(&empty, NULL);
}

void *producer(void *arg)
{
    int i;
    pthread_t me = pthread_self();
    for (i=0; i<N; i++){                  // try to put N items into buf[ ]
        pthread_mutex_lock(&mutex);       // lock mutex
        while (data == NBUF){             // re-test the condition after each wakeup
            printf("producer %lu: all bufs FULL: wait\n", me);
            pthread_cond_wait(&empty, &mutex);   // wait
        }
        buf[head++] = i+1;                // item = 1,2,..,N
        head %= NBUF;                     // circular bufs
        data++;                           // inc data by 1
        printf("producer %lu: data=%d value=%d\n", me, data, i+1);
        pthread_mutex_unlock(&mutex);     // unlock mutex
        pthread_cond_signal(&full);       // unblock a consumer, if any
    }
    printf("producer %lu: exit\n", me);
    return NULL;
}

void *consumer(void *arg)
{
    int i, c;
    pthread_t me = pthread_self();
    for (i=0; i<N; i++){
        pthread_mutex_lock(&mutex);       // lock mutex
        while (data == 0){                // re-test the condition after each wakeup
            printf("consumer %lu: all bufs EMPTY: wait\n", me);
            pthread_cond_wait(&full, &mutex);    // wait
        }
        c = buf[tail++];                  // get an item
        tail %= NBUF;
        data--;                           // dec data by 1
        printf("consumer %lu: value=%d\n", me, c);
        pthread_mutex_unlock(&mutex);     // unlock mutex
        pthread_cond_signal(&empty);      // unblock a producer, if any
    }
    printf("consumer %lu: exit\n", me);
    return NULL;
}

int main()
{
    pthread_t pro, con;
    init();
    printf("main: create producer and consumer threads\n");
    pthread_create(&pro, NULL, producer, NULL);
    pthread_create(&con, NULL, consumer, NULL);
    printf("main: join with threads\n");
    pthread_join(pro, NULL);
    pthread_join(con, NULL);
    printf("main: exit\n");
    return 0;
}

Figure 4.4 shows the output of running the producer-consumer example program.

5. Semaphores

Semaphores are general mechanisms for process synchronization. A (counting) semaphore is a data structure

struct sem{

int value;             // semaphore (counter) value;

struct process *queue; // a queue of blocked processes

}s;

Before use, a semaphore must be initialized with an initial value and an empty waiting queue. Regardless of the hardware platform, i.e. whether on single CPU systems or multiprocessing systems, the low level implementation of semaphores guarantees that each semaphore can only be operated on by one executing entity at a time and that operations on semaphores are atomic (indivisible) or primitive from the viewpoint of executing entities. The reader may ignore such details here and focus on the high level operations on semaphores and their usage as a mechanism for process synchronization. The best-known operations on semaphores are P and V (Dijkstra 1965), which are defined as follows.

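P(struct sem *s)
{
    s->value--;
    if (s->value < 0)
        BLOCK(s);
}

V(struct sem *s)
{
    s->value++;
    if (s->value <= 0)
        SIGNAL(s);
}

where BLOCK(s) blocks the calling process in the semaphore's waiting queue, and SIGNAL(s) unblocks a process from the semaphore's waiting queue.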

Semaphores are not part of the original Pthreads standard. However, most Pthreads implementations now support the semaphores of POSIX 1003.1b. POSIX semaphores include the following functions

int sem_init(sem, pshared, value) : initialize sem with an initial value (pshared=0 for threads of the same process)

int sem_wait(sem)        : similar to P(sem)

int sem_post(sem)        : similar to V(sem)

The major difference between semaphores and condition variables is that the former include a counter and manipulate the counter, test the counter value to make decisions, etc., all in the Critical Region of atomic or primitive operations, whereas the latter require a specific mutex lock to enforce the Critical Region. In Pthreads, mutexes are strictly for locking and condition variables are for threads cooperation. In contrast, counting semaphores with initial value 1 can be used as locks. Semaphores with other initial values can be used for cooperation. Therefore, semaphores are more general and flexible than condition variables. The following example illustrates the advantages of semaphores over condition variables.

Example 4.5: The producer-consumer problem can be solved more efficiently by using semaphores. In this example, empty=N and full=0 are semaphores for producers and consumers to cooperate with one another, and mutex =1 is a lock semaphore for processes to access shared buffers one at a time in a Critical Region. The following shows the pseudo code of the producer-consumer problem using semaphores.

6. Barriers

The threads join operation allows a thread (usually the main thread) to wait for the termination of other threads. After all awaited threads have terminated, the main thread may create new threads to continue executing the next parts of a parallel program. This requires the overhead of creating new threads. There are situations in which it would be better to keep the threads alive but require them not to go on until all of them have reached a prescribed point of synchronization. In Pthreads, the mechanism is the barrier, along with a set of barrier functions. First, the main thread creates a barrier object

pthread_barrier_t barrier;

and calls

pthread_barrier_init(&barrier, NULL, nthreads);

to initialize it with the number of threads that are to be synchronized at the barrier. Then the main thread creates working threads to perform tasks. The working threads use

pthread_barrier_wait( &barrier)

to wait at the barrier until the specified number of threads have reached the barrier. When the last thread arrives at the barrier, all the threads resume execution again. In this case, a barrier acts as a rendezvous point, rather than as a graveyard, of threads. We illustrate the use of barriers by an example.

7. Solve System of Linear Equations by Concurrent Threads

We demonstrate applications of concurrent threads and threads join and barrier operations by an example.

Example 4.6: The example is to solve a system of linear equations by concurrent threads. Assume AX = B is a system of linear equations, where A is an N x N matrix of real numbers, X is a column vector of N unknowns and B is a column vector of constants. The problem is to compute the solution vector X. The best-known algorithm for solving systems of linear equations is Gauss elimination. The algorithm consists of 2 major steps: row reduction, which reduces the combined matrix [A|B] to an upper-triangular form, followed by back substitution, which computes the solution vector X. In the row-reduction steps, partial pivoting is a scheme which ensures the leading element of the row used to reduce other rows has the maximum absolute value. Partial pivoting helps improve the accuracy of the numerical computations. The following shows a Gauss elimination algorithm with partial pivoting.

/******** Gauss Elimination Algorithm with Partial Pivoting *******/

Step 1: Row reduction: reduce [A|B] to upper triangular form

Step 2: Back Substitution: compute xN-1, xN-2, … , x0 in that order
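The two steps can be sketched sequentially as follows. This is a plain single-threaded illustration, assuming the augmented matrix [A|B] is stored as an N x (N+1) array; the function name gauss_solve, its parameters and its return convention are hypothetical.

```c
#include <math.h>

#define N 4

/* Solves the system held in the augmented matrix a = [A|B], in place.
   On success returns 0 and stores the solution in x;
   returns -1 if a zero pivot is found (singular matrix). */
int gauss_solve(double a[N][N+1], double x[N])
{
    /* Step 1: row reduction to upper triangular form */
    for (int i = 0; i < N - 1; i++) {
        int prow = i;                        /* partial pivoting: find the row */
        for (int j = i + 1; j < N; j++)      /* with the largest |a[j][i]|     */
            if (fabs(a[j][i]) > fabs(a[prow][i]))
                prow = j;
        if (a[prow][i] == 0.0)
            return -1;
        if (prow != i) {                     /* swap rows i and prow */
            for (int k = i; k <= N; k++) {
                double t = a[i][k];
                a[i][k] = a[prow][k];
                a[prow][k] = t;
            }
        }
        for (int j = i + 1; j < N; j++) {    /* reduce the rows below row i */
            double factor = a[j][i] / a[i][i];
            for (int k = i; k <= N; k++)
                a[j][k] -= a[i][k] * factor;
        }
    }
    if (a[N-1][N-1] == 0.0)
        return -1;

    /* Step 2: back substitution, x[N-1] first, x[0] last */
    for (int i = N - 1; i >= 0; i--) {
        double sum = 0.0;
        for (int j = i + 1; j < N; j++)
            sum += a[i][j] * x[j];
        x[i] = (a[i][N] - sum) / a[i][i];
    }
    return 0;
}
```

For each pivot row i, the reductions of the rows below it are independent of one another, which is what makes the row reduction step a candidate for concurrent threads; the back substitution depends on previously computed unknowns and stays sequential.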

The Gauss elimination algorithm can be parallelized as follows. The algorithm starts with a main thread, which creates N working threads to execute the ge(thread_id) function and waits for all the working threads to join. The threads function ge() implements the row reduction step of the Gauss elimination algorithm. In the ge() function, in each iteration i = 0 to N-2, the thread whose ID equals i does the partial pivoting on row i. All other threads wait at a barrier (1) until partial pivoting is complete. Then each working thread does row reduction on a unique row equal to its ID number. Since all working threads must have finished the current row reductions before the iteration on the next row can start, they wait at another barrier (2). After reducing the matrix [A|B] to upper triangular form, the final step is to compute the solutions x[N-i], for i=1 to N in that order, which is inherently sequential. The main thread must wait until all the working threads have ended before starting the back substitution. This is achieved by join operations by the main thread. The following shows the complete code of the Example Program C4.5.

/** C4.5.c: Gauss Elimination with Partial Pivoting **/
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <pthread.h>

#define N 4
double A[N][N+1];
pthread_barrier_t barrier;

void print_matrix(void)
{
    int i, j;
    printf("------------------------------------\n");
    for (i=0; i<N; i++){
        for (j=0; j<N+1; j++)
            printf("%6.2f ", A[i][j]);
        printf("\n");
    }
}

void *ge(void *arg)   // threads function: Gauss elimination
{
    int i, j, k, prow;
    int myid = (int)(long)arg;
    double temp, factor;
    for (i=0; i<N-1; i++){
        if (i == myid){
            printf("partial pivoting by thread %d on row %d: ", myid, i);
            temp = 0.0; prow = i;
            for (j=i; j<N; j++){          // search rows i to N-1 only
                if (fabs(A[j][i]) > temp){
                    temp = fabs(A[j][i]);
                    prow = j;
                }
            }
            printf("pivot_row=%d pivot=%6.2f\n", prow, A[prow][i]);
            if (prow != i){               // swap rows
                for (j=i; j<N+1; j++){
                    temp = A[i][j];
                    A[i][j] = A[prow][j];
                    A[prow][j] = temp;
                }
            }
        }
        // wait for partial pivoting done
        pthread_barrier_wait(&barrier);
        for (j=i+1; j<N; j++){
            if (j == myid){
                printf("thread %d do row %d\n", myid, j);
                factor = A[j][i]/A[i][i];
                for (k=i+1; k<=N; k++)
                    A[j][k] -= A[i][k]*factor;
                A[j][i] = 0.0;
            }
        }
        // wait for current row reductions to finish
        pthread_barrier_wait(&barrier);
        if (i == myid)
            print_matrix();
    }
    return NULL;
}

int main(int argc, char *argv[])
{
    int i, j;
    double sum;
    pthread_t threads[N];

    printf("main: initialize matrix A[N][N+1] as [A|B]\n");
    for (i=0; i<N; i++)
        for (j=0; j<N; j++)
            A[i][j] = 1.0;
    for (i=0; i<N; i++)
        A[i][N-i-1] = 1.0*N;
    for (i=0; i<N; i++){
        A[i][N] = 2.0*N - 1;
    }
    print_matrix();   // show initial matrix [A|B]
    pthread_barrier_init(&barrier, NULL, N);   // set up barrier
    printf("main: create N=%d working threads\n", N);
    for (i=0; i<N; i++){
        pthread_create(&threads[i], NULL, ge, (void *)(long)i);
    }
    printf("main: wait for all %d working threads to join\n", N);
    for (i=0; i<N; i++){
        pthread_join(threads[i], NULL);
    }
    printf("main: back substitution : ");
    for (i=N-1; i>=0; i--){
        sum = 0.0;
        for (j=i+1; j<N; j++)
            sum += A[i][j]*A[j][N];
        A[i][N] = (A[i][N] - sum)/A[i][i];
    }
    // print solution
    printf("The solution is :\n");
    for (i=0; i<N; i++){
        printf("%6.2f ", A[i][N]);
    }
    printf("\n");
    return 0;
}

Figure 4.5 shows the sample outputs of running the Example 4.6 program, which solves a system of equations with N=4 unknowns by Gauss elimination with partial pivoting.

8. Threads in Linux

Unlike many other operating systems, Linux does not distinguish processes from threads. To the Linux kernel, a thread is merely a process that shares certain resources with other processes. In Linux both processes and threads are created by the clone() system call, which has the prototype

int clone(int (*fn)(void *), void *child_stack, int flags, void *arg)

As can be seen, clone() is more like a thread creation function. It creates a child process to execute a function fn(arg) with a child_stack. The flags field specifies the resources to be shared by the parent and child, which includes

CLONE_VM: parent and child share address space

CLONE_FS: parent and child share file system information, e.g. root, CWD

CLONE_FILES: parent and child share opened files

CLONE_SIGHAND: parent and child share signal handlers and blocked signals

If any of the flags is specified, both processes share exactly the SAME resource, not a separate copy of the resource. If a flag is not specified, the child process usually gets a separate copy of the resource. In this case, changes made to a resource by one process do not affect the resource of the other process. The Linux kernel retains fork() as a system call but it may be implemented as a library wrapper that calls clone() with appropriate flags. An average user does not have to worry about such details. It suffices to say that Linux has an efficient way of supporting threads. Moreover, most current Linux kernels support Symmetric Multiprocessing (SMP). In such Linux systems, processes (threads) are scheduled to run on multiprocessors in parallel.

Source: Wang K.C. (2018), Systems Programming in Unix/Linux, Springer; 1st ed. 2018 edition.
