Mutexes and Condition Variables
Mutexes and condition variables are traditional synchronization primitives for protecting shared data and coordinating work units. They are essential for thread-safe data structures and complex coordination patterns.
Key Concepts
- Mutexes (Mutual Exclusion)
A mutex ensures only one work unit accesses a critical section at a time.
  - ABT_mutex_lock(): Acquire exclusive access (blocks if held by another)
  - ABT_mutex_unlock(): Release access
  - Protected region: Code between lock and unlock
- Condition Variables
Condition variables allow work units to wait for specific conditions.
  - ABT_cond_wait(cond, mutex): Release mutex, wait for signal, reacquire mutex
  - ABT_cond_signal(): Wake one waiter
  - ABT_cond_broadcast(): Wake all waiters
- Critical Pattern:
    ABT_mutex_lock(mutex);
    while (!condition) {
        ABT_cond_wait(cond, mutex); /* Atomically unlocks mutex and waits */
    }
    /* Condition is now true, mutex is locked */
    ABT_mutex_unlock(mutex);
Example: Producer-Consumer Pattern
The classic producer-consumer pattern uses mutex and condition variables for coordination:
/*
 * Producer-consumer pattern with mutex and condition variable
 */

#include <stdio.h>
#include <abt.h>

#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2
#define ITEMS_PER_PRODUCER 5
#define BUFFER_SIZE 3

typedef struct {
    int buffer[BUFFER_SIZE];
    int count;
    int in;
    int out;
    ABT_mutex mutex;
    ABT_cond not_full;
    ABT_cond not_empty;
} shared_buffer_t;

typedef struct {
    int id;
    shared_buffer_t *shared;
} worker_arg_t;

void producer(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_buffer_t *buf = worker->shared;

    for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
        int item = worker->id * 100 + i;

        ABT_mutex_lock(buf->mutex);

        /* Wait while buffer is full */
        while (buf->count == BUFFER_SIZE) {
            printf("Producer %d: buffer full, waiting...\n", worker->id);
            ABT_cond_wait(buf->not_full, buf->mutex);
        }

        /* Add item to buffer */
        buf->buffer[buf->in] = item;
        buf->in = (buf->in + 1) % BUFFER_SIZE;
        buf->count++;
        printf("Producer %d: produced item %d (buffer: %d/%d)\n",
               worker->id, item, buf->count, BUFFER_SIZE);

        /* Signal consumers */
        ABT_cond_signal(buf->not_empty);

        ABT_mutex_unlock(buf->mutex);
    }
}

void consumer(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_buffer_t *buf = worker->shared;
    int num_items = (NUM_PRODUCERS * ITEMS_PER_PRODUCER) / NUM_CONSUMERS;

    for (int i = 0; i < num_items; i++) {
        ABT_mutex_lock(buf->mutex);

        /* Wait while buffer is empty */
        while (buf->count == 0) {
            printf("Consumer %d: buffer empty, waiting...\n", worker->id);
            ABT_cond_wait(buf->not_empty, buf->mutex);
        }

        /* Remove item from buffer */
        int item = buf->buffer[buf->out];
        buf->out = (buf->out + 1) % BUFFER_SIZE;
        buf->count--;
        printf(" Consumer %d: consumed item %d (buffer: %d/%d)\n",
               worker->id, item, buf->count, BUFFER_SIZE);

        /* Signal producers */
        ABT_cond_signal(buf->not_full);

        ABT_mutex_unlock(buf->mutex);
    }
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread producers[NUM_PRODUCERS];
    ABT_thread consumers[NUM_CONSUMERS];
    worker_arg_t prod_args[NUM_PRODUCERS];
    worker_arg_t cons_args[NUM_CONSUMERS];
    shared_buffer_t shared;

    ABT_init(argc, argv);

    printf("=== Producer-Consumer Pattern ===\n");
    printf("Producers: %d, Consumers: %d, Buffer: %d\n\n",
           NUM_PRODUCERS, NUM_CONSUMERS, BUFFER_SIZE);

    /* Initialize shared buffer */
    shared.count = 0;
    shared.in = 0;
    shared.out = 0;
    ABT_mutex_create(&shared.mutex);
    ABT_cond_create(&shared.not_full);
    ABT_cond_create(&shared.not_empty);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create producers */
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        prod_args[i].id = i;
        prod_args[i].shared = &shared;
        ABT_thread_create(pool, producer, &prod_args[i],
                          ABT_THREAD_ATTR_NULL, &producers[i]);
    }

    /* Create consumers */
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        cons_args[i].id = i;
        cons_args[i].shared = &shared;
        ABT_thread_create(pool, consumer, &cons_args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Wait for all */
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        ABT_thread_free(&producers[i]);
    }
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }

    /* Cleanup */
    ABT_cond_free(&shared.not_empty);
    ABT_cond_free(&shared.not_full);
    ABT_mutex_free(&shared.mutex);

    printf("\nAll items produced and consumed successfully\n");

    ABT_finalize();
    return 0;
}
Key Points
- Protecting Shared State
All accesses to the shared buffer are protected by mutex lock/unlock.
- Waiting on Conditions
    while (buf->count == BUFFER_SIZE) {
        ABT_cond_wait(buf->not_full, buf->mutex);
    }
Producer waits while buffer is full.
ABT_cond_wait() atomically releases the mutex and blocks. When signaled, it reacquires the mutex before returning.
- Signaling
After producing/consuming, signal the opposite side that the condition changed.
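One detail of the example is easy to miss: each consumer computes its share as (NUM_PRODUCERS * ITEMS_PER_PRODUCER) / NUM_CONSUMERS, which accounts for every item only when the total divides evenly. With other constants, the integer division would leave items unconsumed, and producers could then block on a full buffer. The sketch below shows one possible way to spread the remainder across consumers. The helper consumer_quota() is hypothetical and not part of the example or of Argobots.

```c
#include <assert.h>

/* Hypothetical helper: number of items consumer `id` should take so that
 * all `total` items are consumed, even when `total` is not a multiple of
 * `num_consumers`. */
int consumer_quota(int total, int num_consumers, int id)
{
    assert(total >= 0 && num_consumers > 0);
    assert(id >= 0 && id < num_consumers);

    int base = total / num_consumers;  /* share every consumer gets */
    int extra = total % num_consumers; /* leftover items to hand out */

    /* The first `extra` consumers each take one additional item. */
    return base + (id < extra ? 1 : 0);
}
```

In the consumer function, num_items could then be computed as consumer_quota(NUM_PRODUCERS * ITEMS_PER_PRODUCER, NUM_CONSUMERS, worker->id), so that the quotas always add up to the number of items produced.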
Optimizing memory
ABT_mutex is an opaque pointer to some heap-allocated object that must be
created with ABT_mutex_create. To avoid this indirection, you may use
an ABT_mutex_memory instead, initialized with ABT_MUTEX_INITIALIZER.
This type is a placeholder of the same size as a mutex's underlying implementation,
and can be converted into an ABT_mutex by using
ABT_MUTEX_MEMORY_GET_HANDLE(&mutex_memory). This eliminates the need for
ABT_mutex_create/free and their corresponding heap allocation/deallocation.
The same applies to ABT_cond_memory, using ABT_COND_INITIALIZER and
ABT_COND_MEMORY_GET_HANDLE.
Important
An ABT_mutex_memory should not be moved in memory. An ABT_mutex can
be moved (it’s a pointer to an ABT_mutex_memory, which doesn’t move).
Example: Thread-Safe Queue
Building reusable thread-safe data structures with mutexes, using static initialization
(ABT_mutex_memory instead of ABT_mutex):
/*
 * Thread-safe queue implementation with mutexes using static initialization
 * Demonstrates ABT_mutex_memory to avoid heap allocation
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <abt.h>

#define QUEUE_SIZE 10
#define NUM_WORKERS 4
#define ITEMS_PER_WORKER 5

typedef struct {
    int *data;
    int capacity;
    int size;
    int head;
    int tail;
    ABT_mutex_memory mutex_mem; /* Static mutex memory */
} thread_safe_queue_t;

void queue_init(thread_safe_queue_t *q, int capacity)
{
    q->data = malloc(sizeof(int) * capacity);
    q->capacity = capacity;
    q->size = 0;
    q->head = 0;
    q->tail = 0;
    /* Initialize mutex memory
     * This is equivalent to setting it to ABT_MUTEX_INITIALIZER */
    memset(&q->mutex_mem, 0, sizeof(q->mutex_mem));
}

int queue_push(thread_safe_queue_t *q, int value)
{
    /* Convert mutex_memory to ABT_mutex handle */
    ABT_mutex mutex = ABT_MUTEX_MEMORY_GET_HANDLE(&q->mutex_mem);

    ABT_mutex_lock(mutex);

    if (q->size == q->capacity) {
        ABT_mutex_unlock(mutex);
        return -1; /* Queue full */
    }

    q->data[q->tail] = value;
    q->tail = (q->tail + 1) % q->capacity;
    q->size++;

    ABT_mutex_unlock(mutex);
    return 0;
}

int queue_pop(thread_safe_queue_t *q, int *value)
{
    /* Convert mutex_memory to ABT_mutex handle */
    ABT_mutex mutex = ABT_MUTEX_MEMORY_GET_HANDLE(&q->mutex_mem);

    ABT_mutex_lock(mutex);

    if (q->size == 0) {
        ABT_mutex_unlock(mutex);
        return -1; /* Queue empty */
    }

    *value = q->data[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->size--;

    ABT_mutex_unlock(mutex);
    return 0;
}

void queue_destroy(thread_safe_queue_t *q)
{
    /* No need to free mutex_mem - no heap allocation */
    free(q->data);
}

typedef struct {
    int worker_id;
    thread_safe_queue_t *queue;
} worker_arg_t;

void worker_thread(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;

    /* Each worker pushes some items */
    for (int i = 0; i < ITEMS_PER_WORKER; i++) {
        int item = worker->worker_id * 100 + i;

        while (queue_push(worker->queue, item) != 0) {
            /* Queue full, yield and retry */
            ABT_thread_yield();
        }

        printf("Worker %d: pushed %d\n", worker->worker_id, item);
    }

    /* Then pop some items */
    for (int i = 0; i < ITEMS_PER_WORKER; i++) {
        int item;

        while (queue_pop(worker->queue, &item) != 0) {
            /* Queue empty, yield and retry */
            ABT_thread_yield();
        }

        printf(" Worker %d: popped %d\n", worker->worker_id, item);
    }
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread threads[NUM_WORKERS];
    worker_arg_t worker_args[NUM_WORKERS];
    thread_safe_queue_t queue;

    ABT_init(argc, argv);

    printf("=== Thread-Safe Queue (Static Mutex) ===\n");
    printf("Workers: %d, Queue capacity: %d\n", NUM_WORKERS, QUEUE_SIZE);
    printf("Using ABT_mutex_memory for zero-overhead initialization\n\n");

    queue_init(&queue, QUEUE_SIZE);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create worker threads */
    for (int i = 0; i < NUM_WORKERS; i++) {
        worker_args[i].worker_id = i;
        worker_args[i].queue = &queue;
        ABT_thread_create(pool, worker_thread, &worker_args[i],
                          ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    /* Wait for all workers */
    for (int i = 0; i < NUM_WORKERS; i++) {
        ABT_thread_free(&threads[i]);
    }

    queue_destroy(&queue);

    printf("\nAll operations completed safely with mutex protection\n");
    printf("No heap allocation needed for mutex (ABT_mutex_memory)\n");

    ABT_finalize();
    return 0;
}
Key Points
- Encapsulated Locking
Lock/unlock happens inside queue operations. Users don’t need to know about the mutex.
- Short Critical Sections
Mutex is held only during the actual queue manipulation, not during application logic.
- Yielding on Busy
When queue is full/empty, yield to let other work units run before retrying.
Pthread Interoperability
Critical for Mochi: Argobots mutexes work with pthreads (needed for MPI integration):
/*
 * Interoperability: Argobots ULTs and pthreads sharing mutex/cond
 * Critical for Mochi services that integrate with MPI or other pthread-based libraries
 */

#include <stdio.h>
#include <pthread.h>
#include <abt.h>

#define NUM_ULT_WORKERS 2
#define NUM_PTHREAD_WORKERS 2

typedef struct {
    int counter;
    ABT_mutex mutex;
    ABT_cond cond;
    int target;
} shared_data_t;

typedef struct {
    int worker_id;
    shared_data_t *shared;
} worker_arg_t;

void ult_worker(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_data_t *shared = worker->shared;

    ABT_mutex_lock(shared->mutex);
    shared->counter++;
    printf("ULT worker %d: incremented counter to %d\n",
           worker->worker_id, shared->counter);

    /* Signal if target reached */
    if (shared->counter >= shared->target) {
        ABT_cond_broadcast(shared->cond);
    }
    ABT_mutex_unlock(shared->mutex);
}

void *pthread_worker(void *arg)
{
    worker_arg_t *worker = (worker_arg_t *)arg;
    shared_data_t *shared = worker->shared;

    /* pthreads can use Argobots mutex/cond */
    ABT_mutex_lock(shared->mutex);
    shared->counter++;
    printf(" pthread worker %d: incremented counter to %d\n",
           worker->worker_id, shared->counter);

    /* Signal if target reached */
    if (shared->counter >= shared->target) {
        ABT_cond_broadcast(shared->cond);
    }
    ABT_mutex_unlock(shared->mutex);

    return NULL;
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread ult_threads[NUM_ULT_WORKERS];
    pthread_t pthreads[NUM_PTHREAD_WORKERS];
    worker_arg_t ult_args[NUM_ULT_WORKERS];
    worker_arg_t pthread_args[NUM_PTHREAD_WORKERS];
    shared_data_t shared;

    ABT_init(argc, argv);

    printf("=== Pthread Interoperability ===\n");
    printf("Mixing Argobots ULTs and pthreads with shared synchronization\n\n");

    /* Initialize shared data */
    shared.counter = 0;
    shared.target = NUM_ULT_WORKERS + NUM_PTHREAD_WORKERS;
    ABT_mutex_create(&shared.mutex);
    ABT_cond_create(&shared.cond);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create ULT workers */
    for (int i = 0; i < NUM_ULT_WORKERS; i++) {
        ult_args[i].worker_id = i;
        ult_args[i].shared = &shared;
        ABT_thread_create(pool, ult_worker, &ult_args[i],
                          ABT_THREAD_ATTR_NULL, &ult_threads[i]);
    }

    /* Create pthread workers */
    for (int i = 0; i < NUM_PTHREAD_WORKERS; i++) {
        pthread_args[i].worker_id = i;
        pthread_args[i].shared = &shared;
        pthread_create(&pthreads[i], NULL, pthread_worker, &pthread_args[i]);
    }

    /* Wait for target in main thread */
    ABT_mutex_lock(shared.mutex);
    while (shared.counter < shared.target) {
        printf("Main: waiting for all workers (counter=%d/%d)\n",
               shared.counter, shared.target);
        ABT_cond_wait(shared.cond, shared.mutex);
    }
    ABT_mutex_unlock(shared.mutex);

    printf("\nAll workers completed! Counter: %d\n", shared.counter);

    /* Cleanup */
    for (int i = 0; i < NUM_ULT_WORKERS; i++) {
        ABT_thread_free(&ult_threads[i]);
    }
    for (int i = 0; i < NUM_PTHREAD_WORKERS; i++) {
        pthread_join(pthreads[i], NULL);
    }

    ABT_cond_free(&shared.cond);
    ABT_mutex_free(&shared.mutex);

    printf("Argobots mutex/cond worked with both ULTs and pthreads\n");

    ABT_finalize();
    return 0;
}
Key Points
- Shared Synchronization
Pthreads can lock Argobots mutexes and wait on Argobots condition variables. This is essential when mixing Mochi services with pthread-based libraries (like MPI).
- Common Use Case in Mochi:
  - Margo RPC handlers (ULTs) and other multi-threaded libraries (using pthreads) sharing data
  - Progress threads (pthreads) signaling Argobots work units
  - Integrating Mochi with pthread-based I/O libraries
Important
While blocking on an ABT_mutex makes the execution stream yield back to
its scheduler to look for other ULTs to run, this is not the case with a POSIX mutex
(pthread_mutex_t). A POSIX mutex will cause the entire execution stream to block.
It is therefore important to rely on ABT_mutex as much as possible.
Mutex Priority Levels
Argobots mutexes offer lock variants that differ in priority or in waiting strategy:
    ABT_mutex_lock_high(mutex); /* High priority lock */
    ABT_mutex_lock_low(mutex);  /* Low priority lock */
    ABT_mutex_spinlock(mutex);  /* Spin instead of blocking */
Use priority locks when you need to influence scheduling decisions based on critical section importance. ABT_mutex_spinlock() is not a priority level: it busy-waits for the mutex rather than blocking the caller.
Best Practices
- Keep Critical Sections Short
    /* Bad: Long critical section */
    ABT_mutex_lock(mutex);
    shared_data = expensive_computation(); /* Don't do this under lock */
    ABT_mutex_unlock(mutex);

    /* Good: Minimal critical section */
    result = expensive_computation();
    ABT_mutex_lock(mutex);
    shared_data = result;
    ABT_mutex_unlock(mutex);
- Always Use while for Condition Variables
    /* Wrong */
    if (!ready) {
        ABT_cond_wait(cond, mutex);
    }

    /* Right */
    while (!ready) {
        ABT_cond_wait(cond, mutex);
    }
- Unlock in Reverse Order
If you lock multiple mutexes, unlock in reverse order:
    ABT_mutex_lock(mutex1);
    ABT_mutex_lock(mutex2);
    /* ... */
    ABT_mutex_unlock(mutex2);
    ABT_mutex_unlock(mutex1);
- Signal vs Broadcast
  - ABT_cond_signal(): Wake one waiter (efficient for single consumer)
  - ABT_cond_broadcast(): Wake all waiters (needed when condition affects all)
- Static initialization
Use ABT_*_memory whenever possible.
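For the multiple-mutex practice above (acquire in a fixed order, release in reverse), the sketch below shows one way to enforce the order mechanically by sorting the two locks by address. It uses pthread_mutex_t purely as a stand-in so that it compiles without Argobots; in Argobots code, the lock and unlock calls would be ABT_mutex_lock() and ABT_mutex_unlock(), and the ordering key would have to be chosen for ABT_mutex handles. The helpers lock_pair() and unlock_pair() are hypothetical names.

```c
#include <pthread.h>
#include <stdint.h>

/* Put the two mutex pointers into a canonical (address) order. */
static void order_pair(pthread_mutex_t **a, pthread_mutex_t **b)
{
    if ((uintptr_t)*a > (uintptr_t)*b) {
        pthread_mutex_t *tmp = *a;
        *a = *b;
        *b = tmp;
    }
}

/* Hypothetical helper: lock two mutexes in the same order for every
 * caller, whatever order the arguments are passed in. */
void lock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
    order_pair(&a, &b);
    pthread_mutex_lock(a);
    if (b != a) /* same mutex passed twice: lock it only once */
        pthread_mutex_lock(b);
}

/* Hypothetical helper: release in reverse order of acquisition. */
void unlock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
    order_pair(&a, &b);
    if (b != a)
        pthread_mutex_unlock(b);
    pthread_mutex_unlock(a);
}
```

Because both helpers sort their arguments, a caller using lock_pair(&m1, &m2) and another using lock_pair(&m2, &m1) acquire the mutexes in the same order, which removes the opposite-order acquisition that leads to deadlock.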
Common Pitfalls
- Deadlock
    /* Thread A */
    ABT_mutex_lock(mutex1);
    ABT_mutex_lock(mutex2);

    /* Thread B */
    ABT_mutex_lock(mutex2); /* Locks in opposite order */
    ABT_mutex_lock(mutex1); /* Deadlock! */
Fix: Always lock mutexes in the same order.
- Forgetting to Unlock
    ABT_mutex_lock(mutex);
    if (error) {
        return; /* Wrong! Mutex still locked */
    }
    ABT_mutex_unlock(mutex);
Fix: Use error handling that ensures unlock, such as a single exit path through the unlock call, or RAII-style wrappers where the language provides them.
- Not Holding Mutex During cond_wait
    /* Wrong */
    ABT_cond_wait(cond, mutex); /* Mutex not locked! */
ABT_cond_wait() requires the mutex to be locked when called.
- Using if Instead of while
Spurious wakeups and broadcast signals mean you must recheck the condition:
    /* Wrong */
    ABT_mutex_lock(mutex);
    if (!ready) {
        ABT_cond_wait(cond, mutex);
    }
    /* ready might still be false! */
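For the forgotten-unlock pitfall above, a common C idiom is to route every path, including error paths, through a single unlock label. The sketch below illustrates this under stated assumptions: pthread_mutex_t is used only as a stand-in so the code compiles without Argobots (with Argobots, the calls would be ABT_mutex_lock() and ABT_mutex_unlock()), and update_checked() with its "negative values are errors" rule is a hypothetical example.

```c
#include <pthread.h>

/* Hypothetical example: store `value` into `*shared` under the lock,
 * rejecting negative values. Returns 0 on success, -1 on error. */
int update_checked(pthread_mutex_t *m, int *shared, int value)
{
    int ret = 0;

    pthread_mutex_lock(m);

    if (value < 0) {
        ret = -1;    /* error path: record the failure ... */
        goto unlock; /* ... but do not return while holding the lock */
    }

    *shared = value;

unlock:
    pthread_mutex_unlock(m); /* the only unlock, reached on every path */
    return ret;
}
```

Adding further error checks later only requires another goto unlock, so the lock and unlock calls stay paired as the function grows.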
API Reference
- Mutex Functions
  - int ABT_mutex_create(ABT_mutex *newmutex)
  - int ABT_mutex_lock(ABT_mutex mutex)
  - int ABT_mutex_trylock(ABT_mutex mutex)
  - int ABT_mutex_spinlock(ABT_mutex mutex)
  - int ABT_mutex_lock_high(ABT_mutex mutex)
  - int ABT_mutex_lock_low(ABT_mutex mutex)
  - int ABT_mutex_unlock(ABT_mutex mutex)
  - int ABT_mutex_free(ABT_mutex *mutex)
- Condition Variable Functions
  - int ABT_cond_create(ABT_cond *newcond)
  - int ABT_cond_wait(ABT_cond cond, ABT_mutex mutex)
  - int ABT_cond_timedwait(ABT_cond cond, ABT_mutex mutex, const struct timespec *abstime)
  - int ABT_cond_signal(ABT_cond cond)
  - int ABT_cond_broadcast(ABT_cond cond)
  - int ABT_cond_free(ABT_cond *cond)
- Static Initializers
  - ABT_MUTEX_INITIALIZER: Static mutex initialization
  - ABT_COND_INITIALIZER: Static condition variable initialization