Full Example & Dynamic Condition Variables

30.2.4 / 30.2.5 — Full Example & Dynamic Condition Variables

thread_multijoin: joining any terminated thread + pthread_cond_init/destroy

Key Terms in This File

thread_multijoin TS_ALIVE TS_TERMINATED TS_JOINED pthread_cond_init pthread_cond_destroy pthread_condattr_t dynamic allocation thread state array

30.2.4 — Example: Joining Any Terminated Thread

POSIX provides pthread_join() to wait for a specific thread to finish. But what if you want to join whichever thread finishes first, like waitpid(-1, ...) does for child processes?

There is no pthread_join_any() in POSIX. The TLPI example program thread_multijoin shows how to build this functionality yourself using a mutex + condition variable.

The idea: each worker thread, just before it returns, sets its own state to TS_TERMINATED (while holding the mutex) and signals a shared condition variable. A joiner (the main thread in this example) waits on that condition variable and joins whichever thread it finds in the TS_TERMINATED state.

Thread State Design

State          Value  Meaning
TS_ALIVE       1      Thread is currently running
TS_TERMINATED  2      Thread has finished but has not yet been joined
TS_JOINED      3      Thread has been joined; its resources have been freed

Code Example 1: thread_multijoin (adapted from TLPI Section 30.2.4)

/*
 * thread_multijoin.c — Join any terminated thread using condvar
 * Based on TLPI Listing 30-4
 *
 * Compile: gcc -o thread_multijoin thread_multijoin.c -lpthread
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>

/* Thread state constants */
#define TS_ALIVE      1
#define TS_TERMINATED 2
#define TS_JOINED     3

/* Per-thread information */
typedef struct {
    pthread_t  tid;         /* Thread ID */
    int        state;       /* TS_ALIVE / TS_TERMINATED / TS_JOINED */
    int        sleep_time;  /* How long this thread sleeps */
} ThreadInfo;

/* Shared state protected by this mutex/condvar pair */
static pthread_mutex_t mtx  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

static int tot_threads;        /* Total number of threads created */
static ThreadInfo *thread_array; /* Array of ThreadInfo structs */

/* Each worker thread sleeps for its sleep_time then terminates.
 * Before exiting it sets its state to TS_TERMINATED and signals. */
static void *thread_func(void *arg)
{
    ThreadInfo *ti = (ThreadInfo *) arg;

    sleep(ti->sleep_time);  /* Simulate work */

    /* Update our own state to TERMINATED, then signal */
    pthread_mutex_lock(&mtx);
    ti->state = TS_TERMINATED;
    printf("Thread %d (sleep=%ds): set state to TS_TERMINATED\n",
           (int)(ti - thread_array), ti->sleep_time);
    pthread_mutex_unlock(&mtx);

    pthread_cond_signal(&cond);  /* Wake up the joiner */
    return NULL;
}

/* Joiner: waits for any thread to reach TS_TERMINATED, then joins it */
static void join_any_terminated(void)
{
    int num_unjoined = tot_threads;

    while (num_unjoined > 0) {
        pthread_mutex_lock(&mtx);

        /* Wait until at least one thread is TS_TERMINATED */
        while (1) {
            /* Scan for a terminated-but-unjoined thread */
            int found = -1;
            for (int i = 0; i < tot_threads; i++) {
                if (thread_array[i].state == TS_TERMINATED) {
                    found = i;
                    break;
                }
            }

            if (found != -1) {
                /* Found one — mark as joined before releasing mutex */
                thread_array[found].state = TS_JOINED;
                pthread_mutex_unlock(&mtx);

                /* Now join it (outside the lock — join may block briefly) */
                void *res;
                int s = pthread_join(thread_array[found].tid, &res);
                if (s != 0) {
                    fprintf(stderr, "pthread_join error for thread %d\n",
                            found);
                } else {
                    printf("Joiner: joined thread %d\n", found);
                }
                num_unjoined--;
                break;  /* Back to outer while loop */
            }

            /* No terminated thread found — sleep until one signals */
            printf("Joiner: waiting (num_unjoined=%d)\n", num_unjoined);
            pthread_cond_wait(&cond, &mtx);
            printf("Joiner: woke up\n");
        }
    }

    printf("Joiner: all %d threads joined\n", tot_threads);
}

int main(int argc, char *argv[])
{
    /* Usage: ./thread_multijoin sleep1 sleep2 sleep3 ... */
    if (argc < 2) {
        fprintf(stderr, "Usage: %s sleep-time...\n", argv[0]);
        fprintf(stderr, "  e.g.: %s 3 1 4 2\n", argv[0]);
        return 1;
    }

    tot_threads = argc - 1;
    thread_array = calloc(tot_threads, sizeof(ThreadInfo));
    if (thread_array == NULL) { perror("calloc"); return 1; }

    /* Create all threads */
    for (int i = 0; i < tot_threads; i++) {
        thread_array[i].sleep_time = atoi(argv[i + 1]);
        thread_array[i].state     = TS_ALIVE;

        int s = pthread_create(&thread_array[i].tid, NULL,
                               thread_func, &thread_array[i]);
        if (s != 0) {
            fprintf(stderr, "pthread_create failed for thread %d\n", i);
            return 1;
        }
        printf("Created thread %d (sleep=%ds)\n",
               i, thread_array[i].sleep_time);
    }

    join_any_terminated();  /* Join all threads as they finish */

    free(thread_array);
    return 0;
}

/*
 * Example run: ./thread_multijoin 3 1 4 2
 * Expected output (a spurious wakeup would add an extra
 * "woke up" / "waiting" pair):
 *   Created thread 0 (sleep=3s)
 *   Created thread 1 (sleep=1s)
 *   Created thread 2 (sleep=4s)
 *   Created thread 3 (sleep=2s)
 *   Joiner: waiting (num_unjoined=4)
 *   Thread 1 (sleep=1s): set state to TS_TERMINATED
 *   Joiner: woke up
 *   Joiner: joined thread 1
 *   Joiner: waiting (num_unjoined=3)
 *   Thread 3 (sleep=2s): set state to TS_TERMINATED
 *   Joiner: woke up
 *   Joiner: joined thread 3
 *   Joiner: waiting (num_unjoined=2)
 *   Thread 0 (sleep=3s): set state to TS_TERMINATED
 *   Joiner: woke up
 *   Joiner: joined thread 0
 *   Joiner: waiting (num_unjoined=1)
 *   Thread 2 (sleep=4s): set state to TS_TERMINATED
 *   Joiner: woke up
 *   Joiner: joined thread 2
 *   Joiner: all 4 threads joined
 */
Key design decisions in this example:
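1. The worker sets state = TS_TERMINATED while holding the mutex and signals only after making that change. Because the joiner rescans the state array under the same mutex before every pthread_cond_wait(), a signal sent before the joiner starts waiting cannot be lost.
2. The joiner sets state = TS_JOINED while still holding the mutex, so no other joiner (if there were several) could claim the same terminated thread.
3. pthread_join() is called after the mutex has been released. The worker may still be between unlocking the mutex and returning, so the join can block briefly; doing it outside the critical section keeps other workers from stalling on the mutex in the meantime.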

30.2.5 — Dynamically Allocated Condition Variables

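As with mutexes, you must use pthread_cond_init() whenever the static initializer is not allowed: when the condition variable is heap-allocated, when it is an automatic (stack) variable, or when it needs non-default attributes. PTHREAD_COND_INITIALIZER is only for statically allocated condition variables with default attributes.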

#include <pthread.h>

int pthread_cond_init(pthread_cond_t *cond,
                      const pthread_condattr_t *attr);

int pthread_cond_destroy(pthread_cond_t *cond);

/* Both return 0 on success, or a positive error number on error */

Rules for pthread_cond_destroy() — same principles as mutex destroy:

  • Only destroy a condition variable that no thread is currently waiting on.
  • Destroy heap-allocated condition variables before freeing their memory.
  • Destroy stack-allocated condition variables before the function returns.
  • After destroying, a condition variable can be reinitialized with pthread_cond_init().
  • You do NOT need to call pthread_cond_destroy() on a condition variable initialized with PTHREAD_COND_INITIALIZER.

Code Example 2: Dynamic Condition Variable in a Heap-Allocated Struct

#include <stdio.h>
#include <stdlib.h>
#include <time.h>      /* nanosleep(), struct timespec */
#include <pthread.h>

/* A work queue — each queue has its own mutex and condition variable */
typedef struct {
    int        *items;
    int         head, tail, count, capacity;
    pthread_mutex_t lock;
    pthread_cond_t  not_empty;  /* Signaled when items are added */
    pthread_cond_t  not_full;   /* Signaled when items are consumed */
} WorkQueue;

/* Create a queue on the heap with dynamic mutex/condvar init */
WorkQueue *queue_create(int capacity)
{
    WorkQueue *q = malloc(sizeof(WorkQueue));
    if (!q) return NULL;

    q->items    = malloc(capacity * sizeof(int));
    if (!q->items) { free(q); return NULL; }
    q->head     = q->tail = q->count = 0;
    q->capacity = capacity;

    /* Must use pthread_mutex_init / pthread_cond_init for heap objects */
    if (pthread_mutex_init(&q->lock, NULL) != 0) {
        free(q->items); free(q); return NULL;
    }
    if (pthread_cond_init(&q->not_empty, NULL) != 0) {
        pthread_mutex_destroy(&q->lock);
        free(q->items); free(q); return NULL;
    }
    if (pthread_cond_init(&q->not_full, NULL) != 0) {
        pthread_cond_destroy(&q->not_empty);
        pthread_mutex_destroy(&q->lock);
        free(q->items); free(q); return NULL;
    }
    return q;
}

/* Enqueue an item — blocks if queue is full */
void queue_push(WorkQueue *q, int item)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == q->capacity)
        pthread_cond_wait(&q->not_full, &q->lock);

    q->items[q->tail] = item;
    q->tail = (q->tail + 1) % q->capacity;
    q->count++;
    printf("Pushed %d (count=%d)\n", item, q->count);
    pthread_mutex_unlock(&q->lock);

    pthread_cond_signal(&q->not_empty);
}

/* Dequeue an item — blocks if queue is empty */
int queue_pop(WorkQueue *q)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);

    int item = q->items[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    printf("Popped %d (count=%d)\n", item, q->count);
    pthread_mutex_unlock(&q->lock);

    pthread_cond_signal(&q->not_full);
    return item;
}

/* Destroy queue — destroy condvars and mutex BEFORE freeing memory */
void queue_destroy(WorkQueue *q)
{
    pthread_cond_destroy(&q->not_full);   /* 1. destroy condvars */
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->lock);      /* 2. destroy mutex */
    free(q->items);                        /* 3. free memory */
    free(q);
}

/* --- Test --- */
static WorkQueue *gq;

void *prod(void *arg) {
    for (int i = 1; i <= 8; i++) {
        queue_push(gq, i);
        struct timespec ts = {0, 100000000};
        nanosleep(&ts, NULL);
    }
    return NULL;
}

void *cons(void *arg) {
    for (int i = 0; i < 8; i++) {
        queue_pop(gq);
        struct timespec ts = {0, 200000000};
        nanosleep(&ts, NULL);
    }
    return NULL;
}

int main(void)
{
    gq = queue_create(3);  /* Queue capacity = 3 */
    if (gq == NULL) { fprintf(stderr, "queue_create failed\n"); return 1; }
    pthread_t p, c;
    pthread_create(&p, NULL, prod, NULL);
    pthread_create(&c, NULL, cons, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    queue_destroy(gq);
    printf("Done\n");
    return 0;
}

/*
 * Compile: gcc -o work_queue work_queue.c -lpthread
 *
 * This demonstrates a bounded producer-consumer queue with two
 * condition variables: not_empty (consumer waits on this) and
 * not_full (producer waits on this).
 */

Chapter 30 Summary

Mutexes ensure only one thread at a time accesses a shared variable (mutual exclusion). Key points:

  • Static init: PTHREAD_MUTEX_INITIALIZER. Dynamic init: pthread_mutex_init().
  • Always destroy dynamically created mutexes with pthread_mutex_destroy().
  • All threads must follow the same lock ordering to avoid deadlocks.
  • Keep critical sections short to minimize contention.
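  • Mutex types: NORMAL (the default on Linux; no deadlock detection), ERRORCHECK (returns an error on relock or on an invalid unlock; useful for debugging), RECURSIVE (the owning thread may lock it repeatedly).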

Condition variables allow threads to efficiently wait for a state change, without busy-waiting. Key points:

  • Always used with a paired mutex. Static init: PTHREAD_COND_INITIALIZER.
  • pthread_cond_wait() atomically unlocks the mutex and sleeps; re-locks on wakeup.
  • Always use while (not if) to recheck the predicate after waking.
  • pthread_cond_signal() wakes at least one waiter; pthread_cond_broadcast() wakes all waiters.
  • Spurious wakeups are permitted by POSIX — the while loop handles them.

Interview Questions — Example Program & Dynamic Condition Variables

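Q1. Why doesn’t POSIX provide a pthread_join_any() function? Threads are peers: there is no parent/child hierarchy among them. A “join any” operation could therefore reap any thread in the process, including one created privately by a library function that intends to join it itself. Restricting pthread_join() to a specific thread ID means a program joins only the threads it knows about. When “join any” behavior is genuinely needed, it can be built from a mutex and a condition variable, as in thread_multijoin, and that version joins only the threads recorded in the program’s own state array.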
Q2. In the thread_multijoin example, why is the state set to TS_JOINED before calling pthread_join()? To prevent a race in which two joiner threads (if there were more than one) both see the same thread in the TS_TERMINATED state and both try to join it. Joining a thread that has already been joined is undefined behavior. Marking it TS_JOINED while still holding the mutex ensures that only one joiner can claim each terminated thread.
Q3. When must you use pthread_cond_init() instead of PTHREAD_COND_INITIALIZER? When the condition variable is heap-allocated (e.g., inside a struct created with malloc), when it is a local variable on the stack, or when you need non-default attributes (e.g., using CLOCK_MONOTONIC for timedwait instead of the default CLOCK_REALTIME).
Q4. What must you do before destroying a condition variable? Ensure no thread is currently blocked in pthread_cond_wait() on it. Destroying a condition variable while threads are waiting on it produces undefined behavior. In practice, this means all consumers/waiters must be done before cleanup.
Q5. In a bounded buffer (producer-consumer with capacity limit), how many condition variables do you need? Two: one for “not empty” (consumers wait when buffer is empty) and one for “not full” (producers wait when buffer is full). Using two separate condition variables avoids waking the wrong type of thread — e.g., a signal on not_empty only wakes consumers, not a waiting producer.
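Q6. Can you use a single condition variable for both “not empty” and “not full” conditions? Yes, but then you must use broadcast() instead of signal(). With one condition variable, signal() may wake a thread of the wrong kind (for example, another consumer when only a waiting producer can make progress); that thread rechecks its predicate and goes back to sleep, while the thread that could proceed stays blocked. Broadcasting avoids this but is less efficient: every waiter wakes, and those whose condition is still false go back to sleep. Two separate condition variables with signal() avoid that overhead.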
Q7. What are the complete steps to properly clean up a heap-allocated struct that contains a mutex and condition variable? (1) pthread_cond_destroy() on each condition variable. (2) pthread_mutex_destroy() on the mutex. (3) free() the memory. This order ensures the synchronization objects are cleanly released before the memory holding them is freed.
Q8. What is the difference between static and dynamic initialization of a condition variable in terms of cleanup? A statically initialized condition variable (PTHREAD_COND_INITIALIZER) does not need to be destroyed — calling pthread_cond_destroy() on it is optional and safe to skip. A dynamically initialized condition variable (pthread_cond_init()) must always be destroyed with pthread_cond_destroy() to free any resources allocated internally.
