Mutexes and Condition Variables

Mutexes and condition variables are traditional synchronization primitives for protecting shared data and coordinating work units. They are essential for thread-safe data structures and complex coordination patterns.

Key Concepts

Mutexes (Mutual Exclusion)

A mutex ensures that only one work unit accesses a critical section at a time.
  • ABT_mutex_lock(): Acquire exclusive access (blocks if held by another)

  • ABT_mutex_unlock(): Release access

  • Protected region: Code between lock and unlock

Condition Variables

Condition variables allow work units to wait for specific conditions.
  • ABT_cond_wait(cond, mutex): Release mutex, wait for signal, reacquire mutex

  • ABT_cond_signal(): Wake one waiter

  • ABT_cond_broadcast(): Wake all waiters

Critical Pattern:
ABT_mutex_lock(mutex);
while (!condition) {
    ABT_cond_wait(cond, mutex);  /* Atomically unlocks mutex and waits */
}
/* Condition is now true, mutex is locked */
ABT_mutex_unlock(mutex);

Example: Producer-Consumer Pattern

The classic producer-consumer pattern uses one mutex and two condition variables (not_full and not_empty) for coordination:

/*
 * Producer-consumer pattern with mutex and condition variable
 */

#include <stdio.h>
#include <abt.h>

#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2
#define ITEMS_PER_PRODUCER 5
#define BUFFER_SIZE 3

typedef struct {
    int buffer[BUFFER_SIZE];
    int count;
    int in;
    int out;
    ABT_mutex mutex;
    ABT_cond not_full;
    ABT_cond not_empty;
} shared_buffer_t;

typedef struct {
    int id;
    shared_buffer_t *shared;
} worker_arg_t;

void producer(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_buffer_t *buf = worker->shared;

    for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
        int item = worker->id * 100 + i;

        ABT_mutex_lock(buf->mutex);

        /* Wait while buffer is full */
        while (buf->count == BUFFER_SIZE) {
            printf("Producer %d: buffer full, waiting...\n", worker->id);
            ABT_cond_wait(buf->not_full, buf->mutex);
        }

        /* Add item to buffer */
        buf->buffer[buf->in] = item;
        buf->in = (buf->in + 1) % BUFFER_SIZE;
        buf->count++;
        printf("Producer %d: produced item %d (buffer: %d/%d)\n",
               worker->id, item, buf->count, BUFFER_SIZE);

        /* Signal consumers */
        ABT_cond_signal(buf->not_empty);

        ABT_mutex_unlock(buf->mutex);
    }
}

void consumer(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_buffer_t *buf = worker->shared;
    int num_items = (NUM_PRODUCERS * ITEMS_PER_PRODUCER) / NUM_CONSUMERS;

    for (int i = 0; i < num_items; i++) {
        ABT_mutex_lock(buf->mutex);

        /* Wait while buffer is empty */
        while (buf->count == 0) {
            printf("Consumer %d: buffer empty, waiting...\n", worker->id);
            ABT_cond_wait(buf->not_empty, buf->mutex);
        }

        /* Remove item from buffer */
        int item = buf->buffer[buf->out];
        buf->out = (buf->out + 1) % BUFFER_SIZE;
        buf->count--;
        printf("  Consumer %d: consumed item %d (buffer: %d/%d)\n",
               worker->id, item, buf->count, BUFFER_SIZE);

        /* Signal producers */
        ABT_cond_signal(buf->not_full);

        ABT_mutex_unlock(buf->mutex);
    }
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread producers[NUM_PRODUCERS];
    ABT_thread consumers[NUM_CONSUMERS];
    worker_arg_t prod_args[NUM_PRODUCERS];
    worker_arg_t cons_args[NUM_CONSUMERS];
    shared_buffer_t shared;

    ABT_init(argc, argv);

    printf("=== Producer-Consumer Pattern ===\n");
    printf("Producers: %d, Consumers: %d, Buffer: %d\n\n",
           NUM_PRODUCERS, NUM_CONSUMERS, BUFFER_SIZE);

    /* Initialize shared buffer */
    shared.count = 0;
    shared.in = 0;
    shared.out = 0;
    ABT_mutex_create(&shared.mutex);
    ABT_cond_create(&shared.not_full);
    ABT_cond_create(&shared.not_empty);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create producers */
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        prod_args[i].id = i;
        prod_args[i].shared = &shared;
        ABT_thread_create(pool, producer, &prod_args[i],
                          ABT_THREAD_ATTR_NULL, &producers[i]);
    }

    /* Create consumers */
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        cons_args[i].id = i;
        cons_args[i].shared = &shared;
        ABT_thread_create(pool, consumer, &cons_args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Wait for all (ABT_thread_free joins the ULT before freeing it) */
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        ABT_thread_free(&producers[i]);
    }
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }

    /* Cleanup */
    ABT_cond_free(&shared.not_empty);
    ABT_cond_free(&shared.not_full);
    ABT_mutex_free(&shared.mutex);

    printf("\nAll items produced and consumed successfully\n");

    ABT_finalize();
    return 0;
}

Key Points

Protecting Shared State

All accesses to the shared buffer are protected by mutex lock/unlock.

Waiting on Conditions
while (buf->count == BUFFER_SIZE) {
    ABT_cond_wait(buf->not_full, buf->mutex);
}

Producer waits while buffer is full. ABT_cond_wait() atomically releases the mutex and blocks. When signaled, it reacquires the mutex before returning.

Signaling

After producing/consuming, signal the opposite side that the condition changed.

Optimizing memory

ABT_mutex is an opaque pointer to a heap-allocated object that must be created with ABT_mutex_create(). To avoid this indirection, you may use an ABT_mutex_memory instead, initialized with ABT_MUTEX_INITIALIZER. This type is a placeholder of the same size as the mutex's underlying implementation, and can be converted into an ABT_mutex by using ABT_MUTEX_MEMORY_GET_HANDLE(&mutex_memory). This eliminates the need for ABT_mutex_create/free and their corresponding heap allocation/deallocation.

The same applies to ABT_cond_memory, using ABT_COND_INITIALIZER and ABT_COND_MEMORY_GET_HANDLE.

Important

An ABT_mutex_memory should not be moved in memory. An ABT_mutex can be moved (it’s a pointer to an ABT_mutex_memory, which doesn’t move).

Example: Thread-Safe Queue

This example builds a reusable thread-safe queue protected by a mutex, using static initialization (ABT_mutex_memory instead of ABT_mutex):

/*
 * Thread-safe queue implementation with mutexes using static initialization
 * Demonstrates ABT_mutex_memory to avoid heap allocation
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <abt.h>

#define QUEUE_SIZE 10
#define NUM_WORKERS 4
#define ITEMS_PER_WORKER 5

typedef struct {
    int *data;
    int capacity;
    int size;
    int head;
    int tail;
    ABT_mutex_memory mutex_mem;  /* Static mutex memory */
} thread_safe_queue_t;

void queue_init(thread_safe_queue_t *q, int capacity)
{
    q->data = malloc(sizeof(int) * capacity);
    q->capacity = capacity;
    q->size = 0;
    q->head = 0;
    q->tail = 0;
    /* Initialize mutex memory
     * This is equivalent to setting it to ABT_MUTEX_INITIALIZER */
    memset(&q->mutex_mem, 0, sizeof(q->mutex_mem));
}

int queue_push(thread_safe_queue_t *q, int value)
{
    /* Convert mutex_memory to ABT_mutex handle */
    ABT_mutex mutex = ABT_MUTEX_MEMORY_GET_HANDLE(&q->mutex_mem);

    ABT_mutex_lock(mutex);

    if (q->size == q->capacity) {
        ABT_mutex_unlock(mutex);
        return -1;  /* Queue full */
    }

    q->data[q->tail] = value;
    q->tail = (q->tail + 1) % q->capacity;
    q->size++;

    ABT_mutex_unlock(mutex);
    return 0;
}

int queue_pop(thread_safe_queue_t *q, int *value)
{
    /* Convert mutex_memory to ABT_mutex handle */
    ABT_mutex mutex = ABT_MUTEX_MEMORY_GET_HANDLE(&q->mutex_mem);

    ABT_mutex_lock(mutex);

    if (q->size == 0) {
        ABT_mutex_unlock(mutex);
        return -1;  /* Queue empty */
    }

    *value = q->data[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->size--;

    ABT_mutex_unlock(mutex);
    return 0;
}

void queue_destroy(thread_safe_queue_t *q)
{
    /* No need to free mutex_mem - no heap allocation */
    free(q->data);
}

typedef struct {
    int worker_id;
    thread_safe_queue_t *queue;
} worker_arg_t;

void worker_thread(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;

    /* Each worker pushes some items */
    for (int i = 0; i < ITEMS_PER_WORKER; i++) {
        int item = worker->worker_id * 100 + i;

        while (queue_push(worker->queue, item) != 0) {
            /* Queue full, yield and retry */
            ABT_thread_yield();
        }

        printf("Worker %d: pushed %d\n", worker->worker_id, item);
    }

    /* Then pop some items */
    for (int i = 0; i < ITEMS_PER_WORKER; i++) {
        int item;

        while (queue_pop(worker->queue, &item) != 0) {
            /* Queue empty, yield and retry */
            ABT_thread_yield();
        }

        printf("  Worker %d: popped %d\n", worker->worker_id, item);
    }
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread threads[NUM_WORKERS];
    worker_arg_t worker_args[NUM_WORKERS];
    thread_safe_queue_t queue;

    ABT_init(argc, argv);

    printf("=== Thread-Safe Queue (Static Mutex) ===\n");
    printf("Workers: %d, Queue capacity: %d\n", NUM_WORKERS, QUEUE_SIZE);
    printf("Using ABT_mutex_memory for zero-overhead initialization\n\n");

    queue_init(&queue, QUEUE_SIZE);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create worker threads */
    for (int i = 0; i < NUM_WORKERS; i++) {
        worker_args[i].worker_id = i;
        worker_args[i].queue = &queue;
        ABT_thread_create(pool, worker_thread, &worker_args[i],
                          ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    /* Wait for all workers */
    for (int i = 0; i < NUM_WORKERS; i++) {
        ABT_thread_free(&threads[i]);
    }

    queue_destroy(&queue);

    printf("\nAll operations completed safely with mutex protection\n");
    printf("No heap allocation needed for mutex (ABT_mutex_memory)\n");

    ABT_finalize();
    return 0;
}

Key Points

Encapsulated Locking

Lock/unlock happens inside queue operations. Users don’t need to know about the mutex.

Short Critical Sections

Mutex is held only during the actual queue manipulation, not during application logic.

Yielding on Busy

When the queue is full or empty, the caller yields so that other work units can run before it retries.

Pthread Interoperability

Critical for Mochi: Argobots mutexes and condition variables can also be used from pthreads, which is needed when integrating with MPI or other pthread-based libraries:

/*
 * Interoperability: Argobots ULTs and pthreads sharing mutex/cond
 * Critical for Mochi services that integrate with MPI or other pthread-based libraries
 */

#include <stdio.h>
#include <pthread.h>
#include <abt.h>

#define NUM_ULT_WORKERS 2
#define NUM_PTHREAD_WORKERS 2

typedef struct {
    int counter;
    ABT_mutex mutex;
    ABT_cond cond;
    int target;
} shared_data_t;

typedef struct {
    int worker_id;
    shared_data_t *shared;
} worker_arg_t;

void ult_worker(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_data_t *shared = worker->shared;

    ABT_mutex_lock(shared->mutex);
    shared->counter++;
    printf("ULT worker %d: incremented counter to %d\n",
           worker->worker_id, shared->counter);

    /* Signal if target reached */
    if (shared->counter >= shared->target) {
        ABT_cond_broadcast(shared->cond);
    }
    ABT_mutex_unlock(shared->mutex);
}

void *pthread_worker(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_data_t *shared = worker->shared;

    /* pthreads can use Argobots mutex/cond */
    ABT_mutex_lock(shared->mutex);
    shared->counter++;
    printf("  pthread worker %d: incremented counter to %d\n",
           worker->worker_id, shared->counter);

    /* Signal if target reached */
    if (shared->counter >= shared->target) {
        ABT_cond_broadcast(shared->cond);
    }
    ABT_mutex_unlock(shared->mutex);

    return NULL;
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread ult_threads[NUM_ULT_WORKERS];
    pthread_t pthreads[NUM_PTHREAD_WORKERS];
    worker_arg_t ult_args[NUM_ULT_WORKERS];
    worker_arg_t pthread_args[NUM_PTHREAD_WORKERS];
    shared_data_t shared;

    ABT_init(argc, argv);

    printf("=== Pthread Interoperability ===\n");
    printf("Mixing Argobots ULTs and pthreads with shared synchronization\n\n");

    /* Initialize shared data */
    shared.counter = 0;
    shared.target = NUM_ULT_WORKERS + NUM_PTHREAD_WORKERS;
    ABT_mutex_create(&shared.mutex);
    ABT_cond_create(&shared.cond);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create ULT workers */
    for (int i = 0; i < NUM_ULT_WORKERS; i++) {
        ult_args[i].worker_id = i;
        ult_args[i].shared = &shared;
        ABT_thread_create(pool, ult_worker, &ult_args[i],
                          ABT_THREAD_ATTR_NULL, &ult_threads[i]);
    }

    /* Create pthread workers */
    for (int i = 0; i < NUM_PTHREAD_WORKERS; i++) {
        pthread_args[i].worker_id = i;
        pthread_args[i].shared = &shared;
        pthread_create(&pthreads[i], NULL, pthread_worker, &pthread_args[i]);
    }

    /* Wait for target in main thread */
    ABT_mutex_lock(shared.mutex);
    while (shared.counter < shared.target) {
        printf("Main: waiting for all workers (counter=%d/%d)\n",
               shared.counter, shared.target);
        ABT_cond_wait(shared.cond, shared.mutex);
    }
    ABT_mutex_unlock(shared.mutex);

    printf("\nAll workers completed! Counter: %d\n", shared.counter);

    /* Cleanup */
    for (int i = 0; i < NUM_ULT_WORKERS; i++) {
        ABT_thread_free(&ult_threads[i]);
    }
    for (int i = 0; i < NUM_PTHREAD_WORKERS; i++) {
        pthread_join(pthreads[i], NULL);
    }

    ABT_cond_free(&shared.cond);
    ABT_mutex_free(&shared.mutex);

    printf("Argobots mutex/cond worked with both ULTs and pthreads\n");

    ABT_finalize();
    return 0;
}

Key Points

Shared Synchronization

Pthreads can lock Argobots mutexes and wait on Argobots condition variables. This is essential when mixing Mochi services with pthread-based libraries (like MPI).

Common Use Case in Mochi:
  • Margo RPC handlers (ULTs) and other multi-threaded libraries (using pthreads) sharing data

  • Progress threads (pthreads) signaling Argobots work units

  • Integrating Mochi with pthread-based I/O libraries

Important

While blocking on an ABT_mutex makes the execution stream yield back to its scheduler to look for other ULTs to run, this is not the case with a POSIX mutex (pthread_mutex_t): a POSIX mutex causes the entire execution stream to block. It is therefore important to rely on ABT_mutex as much as possible.

Mutex Priority Levels

Besides ABT_mutex_lock(), Argobots provides lock variants that carry a priority hint or spin instead of blocking:

ABT_mutex_lock_high(mutex);    /* High priority lock */
ABT_mutex_lock_low(mutex);     /* Low priority lock */
ABT_mutex_spinlock(mutex);     /* Spin instead of blocking */

Use priority locks when you need to influence scheduling decisions based on critical section importance.

Best Practices

Keep Critical Sections Short
/* Bad: Long critical section */
ABT_mutex_lock(mutex);
shared_data = expensive_computation();  /* Don't do this under lock */
ABT_mutex_unlock(mutex);

/* Good: Minimal critical section */
result = expensive_computation();
ABT_mutex_lock(mutex);
shared_data = result;
ABT_mutex_unlock(mutex);

Always Use while for Condition Variables
/* Wrong */
if (!ready) {
    ABT_cond_wait(cond, mutex);
}

/* Right */
while (!ready) {
    ABT_cond_wait(cond, mutex);
}

Unlock in Reverse Order

If you lock multiple mutexes, unlock in reverse order:

ABT_mutex_lock(mutex1);
ABT_mutex_lock(mutex2);
/* ... */
ABT_mutex_unlock(mutex2);
ABT_mutex_unlock(mutex1);

Signal vs Broadcast
  • ABT_cond_signal(): Wake one waiter (efficient for single consumer)

  • ABT_cond_broadcast(): Wake all waiters (needed when condition affects all)

Static initialization
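  • Prefer ABT_mutex_memory and ABT_cond_memory when the object stays at a fixed address, since this avoids the heap allocation done by ABT_mutex_create() and ABT_cond_create().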

Common Pitfalls

Deadlock
/* Thread A */
ABT_mutex_lock(mutex1);
ABT_mutex_lock(mutex2);

/* Thread B */
ABT_mutex_lock(mutex2);  /* Locks in opposite order */
ABT_mutex_lock(mutex1);  /* Deadlock! */

Fix: Always lock mutexes in the same order.

Forgetting to Unlock
ABT_mutex_lock(mutex);
if (error) {
    return;  /* Wrong! Mutex still locked */
}
ABT_mutex_unlock(mutex);

Fix: Route every exit path through a single unlock, or use RAII-style wrappers when calling from C++.

Not Holding Mutex During cond_wait
/* Wrong */
ABT_cond_wait(cond, mutex);  /* Mutex not locked! */

ABT_cond_wait() requires the mutex to be locked when called.

Using if Instead of while

Spurious wakeups and broadcast signals mean you must recheck the condition:

/* Wrong */
ABT_mutex_lock(mutex);
if (!ready) {
    ABT_cond_wait(cond, mutex);
}
/* ready might still be false! */

API Reference

Mutex Functions
  • int ABT_mutex_create(ABT_mutex *newmutex)

  • int ABT_mutex_lock(ABT_mutex mutex)

  • int ABT_mutex_trylock(ABT_mutex mutex)

  • int ABT_mutex_spinlock(ABT_mutex mutex)

  • int ABT_mutex_lock_high(ABT_mutex mutex)

  • int ABT_mutex_lock_low(ABT_mutex mutex)

  • int ABT_mutex_unlock(ABT_mutex mutex)

  • int ABT_mutex_free(ABT_mutex *mutex)

Condition Variable Functions
  • int ABT_cond_create(ABT_cond *newcond)

  • int ABT_cond_wait(ABT_cond cond, ABT_mutex mutex)

  • int ABT_cond_timedwait(ABT_cond cond, ABT_mutex mutex, const struct timespec *abstime)

  • int ABT_cond_signal(ABT_cond cond)

  • int ABT_cond_broadcast(ABT_cond cond)

  • int ABT_cond_free(ABT_cond *cond)
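ABT_cond_timedwait() takes an absolute deadline rather than a relative timeout, so callers usually need a small amount of timespec arithmetic. The helper below is a sketch of that computation in plain C; deadline_from is a hypothetical name, and it assumes a non-negative timeout given in milliseconds.

```c
#include <time.h>

/* Hypothetical helper: return the absolute time "now + timeout_ms",
 * keeping tv_nsec normalized to the range [0, 1e9).
 * Assumes timeout_ms >= 0 and a normalized input. */
struct timespec deadline_from(struct timespec now, long timeout_ms)
{
    struct timespec d = now;

    d.tv_sec += timeout_ms / 1000;
    d.tv_nsec += (timeout_ms % 1000) * 1000000L;

    /* At most one carry is possible because both terms are below 1e9. */
    if (d.tv_nsec >= 1000000000L) {
        d.tv_sec += 1;
        d.tv_nsec -= 1000000000L;
    }
    return d;
}
```

A caller would typically fill now with clock_gettime(CLOCK_REALTIME, &now), compute the deadline once, and pass its address to ABT_cond_timedwait() inside the usual while loop, leaving the loop when the call returns ABT_ERR_COND_TIMEDOUT. The use of CLOCK_REALTIME as the time base is an assumption here; check the Argobots documentation for the version you are using.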

Static Initializers
  • ABT_MUTEX_INITIALIZER: Static mutex initialization

  • ABT_COND_INITIALIZER: Static condition variable initialization