Barriers and Futures

In this tutorial, you will learn about two important synchronization primitives for parallel algorithms: barriers for bulk-synchronous patterns and futures for fine-grained dependency management.

Understanding the Synchronization Spectrum

Together with the eventual from Tutorial 06, these primitives form a spectrum from coarse-grained to fine-grained synchronization:

Barrier (all-to-all, N-to-N):
  • N work units all synchronize with each other

  • Everyone waits for everyone

  • Use for: bulk-synchronous algorithms, phase transitions

Future (many-to-one, N-to-1):
  • N producers each set one compartment

  • One consumer waits for all N compartments

  • Use for: gathering results from multiple workers, parallel reduction

Eventual (one-to-many, 1-to-N):
  • One producer sets a value

  • Multiple consumers can wait for that value

  • Use for: broadcast pattern (Tutorial 06)

Barriers

A barrier is a synchronization point where multiple work units wait for each other. When a work unit reaches a barrier, it blocks until all other work units also reach the barrier. Once all work units arrive, they all proceed together.

Barrier Count

When creating a barrier, you specify how many work units must arrive before the barrier releases. This count is fixed when the barrier is created.

Bulk-Synchronous Parallel (BSP) Model

Barriers enable the BSP programming model:
  1. Parallel computation phase
  2. Barrier synchronization
  3. Communication/data exchange phase
  4. Barrier synchronization
  5. Repeat

Use Cases:
  • Stencil computations (iterative PDE solvers)

  • Synchronous iterative algorithms

  • Phase-based simulations

  • Parallel sorting algorithms

  • Matrix operations
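Parallel sorting is one of the use cases listed above. As a rough sketch of how a sort can decompose into barrier-separated phases, the following plain C function implements odd-even transposition sort. It runs sequentially and does not call Argobots; the function names are illustrative, and the comment only marks where a parallel version would presumably place ABT_barrier_wait().

```c
#include <assert.h>
#include <stddef.h>

/* Swap a[i] and a[i + 1] if they are out of order. */
static void compare_exchange(int *a, size_t i)
{
    if (a[i] > a[i + 1]) {
        int tmp = a[i];
        a[i] = a[i + 1];
        a[i + 1] = tmp;
    }
}

/* Odd-even transposition sort, written as n phases.
 * Within one phase every compare_exchange touches a disjoint pair, so the
 * pairs could be divided among work units. This sketch runs them in a
 * plain loop; a parallel version would need a barrier where marked. */
void odd_even_sort(int *a, size_t n)
{
    assert(a != NULL || n == 0);
    for (size_t phase = 0; phase < n; phase++) {
        /* Even phases pair (0,1), (2,3), ...; odd phases pair (1,2), (3,4), ... */
        for (size_t i = phase % 2; i + 1 < n; i += 2) {
            compare_exchange(a, i);
        }
        /* <-- barrier here: all pairs of this phase done before the next */
    }
}
```

Each phase only reads values written in the previous phase, which is the property a barrier between phases would protect.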

Stencil Example with Barriers

Stencil computations update each array element based on its neighbors. This requires synchronization to ensure all threads read the same consistent array state:

/*
 * Stencil computation with barriers
 * Demonstrates bulk-synchronous parallel pattern
 */

#include <stdio.h>
#include <stdlib.h>
#include <abt.h>

#define NUM_THREADS 4
#define ARRAY_SIZE 16
#define NUM_ITERATIONS 3

typedef struct {
    int thread_id;
    int num_threads;
    double *array;
    double *temp;
    ABT_barrier barrier;
} work_arg_t;

void stencil_worker(void *arg)
{
    work_arg_t *work = (work_arg_t *)arg;
    int id = work->thread_id;
    int num = work->num_threads;
    int chunk_size = ARRAY_SIZE / num;
    int start = id * chunk_size;
    int end = (id == num - 1) ? ARRAY_SIZE : start + chunk_size;

    for (int iter = 0; iter < NUM_ITERATIONS; iter++) {
        /* Compute stencil: temp[i] = average of array[i-1], array[i], array[i+1] */
        for (int i = start; i < end; i++) {
            int left = (i == 0) ? 0 : i - 1;
            int right = (i == ARRAY_SIZE - 1) ? ARRAY_SIZE - 1 : i + 1;
            work->temp[i] = (work->array[left] + work->array[i] + work->array[right]) / 3.0;
        }

        /* Barrier: Wait for all threads to finish computation */
        ABT_barrier_wait(work->barrier);

        /* Copy temp back to array */
        for (int i = start; i < end; i++) {
            work->array[i] = work->temp[i];
        }

        /* Barrier: Wait for all threads to finish copying */
        ABT_barrier_wait(work->barrier);

        if (id == 0) {
            printf("Iteration %d completed\n", iter);
        }
    }
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread threads[NUM_THREADS];
    ABT_barrier barrier;
    work_arg_t work_args[NUM_THREADS];
    double array[ARRAY_SIZE];
    double temp[ARRAY_SIZE];

    ABT_init(argc, argv);

    printf("=== Stencil Computation with Barriers ===\n");
    printf("Array size: %d, Threads: %d, Iterations: %d\n\n",
           ARRAY_SIZE, NUM_THREADS, NUM_ITERATIONS);

    /* Initialize array */
    for (int i = 0; i < ARRAY_SIZE; i++) {
        array[i] = i * 10.0;
    }

    printf("Initial array:\n");
    for (int i = 0; i < ARRAY_SIZE; i++) {
        printf("%.1f ", array[i]);
    }
    printf("\n\n");

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create barrier for NUM_THREADS threads */
    ABT_barrier_create(NUM_THREADS, &barrier);

    /* Create threads */
    for (int i = 0; i < NUM_THREADS; i++) {
        work_args[i].thread_id = i;
        work_args[i].num_threads = NUM_THREADS;
        work_args[i].array = array;
        work_args[i].temp = temp;
        work_args[i].barrier = barrier;

        ABT_thread_create(pool, stencil_worker, &work_args[i],
                          ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    /* Wait for all threads: ABT_thread_free blocks until the thread terminates */
    for (int i = 0; i < NUM_THREADS; i++) {
        ABT_thread_free(&threads[i]);
    }

    printf("\nFinal array:\n");
    for (int i = 0; i < ARRAY_SIZE; i++) {
        printf("%.1f ", array[i]);
    }
    printf("\n\n");

    /* Free barrier */
    ABT_barrier_free(&barrier);

    printf("Barriers ensured all threads completed each phase before proceeding\n");

    ABT_finalize();
    return 0;
}
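The range computation at the top of stencil_worker can be factored into a small helper. The sketch below assumes the same rule as the example (equal chunks, with the last work unit absorbing any remainder); chunk_t and chunk_bounds are hypothetical names, not part of Argobots.

```c
#include <assert.h>

/* Half-open range [start, end) of the elements owned by one work unit. */
typedef struct {
    int start;
    int end;
} chunk_t;

/* Same rule as stencil_worker: total / num_units elements per unit,
 * and the last unit also takes whatever is left over.
 * chunk_bounds is a hypothetical helper, not an Argobots function. */
chunk_t chunk_bounds(int id, int num_units, int total)
{
    assert(num_units > 0 && id >= 0 && id < num_units && total >= 0);
    int chunk_size = total / num_units;
    chunk_t c;
    c.start = id * chunk_size;
    c.end = (id == num_units - 1) ? total : c.start + chunk_size;
    return c;
}
```

With 16 elements and 4 work units every unit gets 4 elements. With 18 elements the last unit gets 6, so this rule is best suited to sizes where the remainder is small relative to the chunk size.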

Key Points

Two Barriers per Iteration
ABT_barrier_wait(work->barrier);  /* After computation */
ABT_barrier_wait(work->barrier);  /* After copying */

We need two barriers:
  1. After computation: ensures all threads have finished computing before any thread copies
  2. After copying: ensures all threads have finished copying before the next iteration begins

Why two barriers? Without the second barrier, some threads could start the next iteration’s computation while others are still copying, and would read a mix of old and new values from the array.
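To see what inconsistent data means here, the following sequential sketch applies the same 3-point average in two ways: reading from one buffer and writing to another (what the two barriers guarantee), and updating in place (a stand-in for reads that overlap with writes from the same step). stencil_step is a hypothetical helper and the sketch does not use Argobots; it only models the hazard, not the actual thread interleaving.

```c
#include <assert.h>

/* One 3-point averaging step with clamped boundaries, reading `in` and
 * writing `out`. With in != out, every read sees the previous iteration's
 * values, as in the compute-then-copy scheme. With in == out, later
 * elements read neighbors that were already updated in this step. */
void stencil_step(const double *in, double *out, int n)
{
    assert(n > 0);
    for (int i = 0; i < n; i++) {
        int left = (i == 0) ? 0 : i - 1;
        int right = (i == n - 1) ? n - 1 : i + 1;
        out[i] = (in[left] + in[i] + in[right]) / 3.0;
    }
}
```

For the input {0, 30, 0}, calling stencil_step with separate buffers yields {10, 10, 10}. Calling it with the same array as both arguments yields roughly {10, 13.33, 4.44}, because elements 1 and 2 read a neighbor that had already been overwritten.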

Barrier Creation
ABT_barrier_create(NUM_THREADS, &barrier);

The barrier is created for NUM_THREADS threads. Exactly this many threads must call ABT_barrier_wait() for the barrier to release.

Data Dependencies

Barriers enforce happens-before relationships:
  • All computations happen before any copying
  • All copying happens before the next iteration’s computations

Multiple Barriers per Work Unit

Each work unit may wait on the same barrier multiple times. The barrier synchronizes all threads at each wait point.

Reinitialization

ABT_barrier_reinit may be used to re-initialize a barrier with a different number of waiters. There is no need to reinitialize a barrier to wait multiple times on it with the same number of waiters.

Futures

Futures provide multiple-producer-single-consumer synchronization. Multiple work units (producers) can contribute values to compartments of a future, and a single consumer waits for all compartments to be filled.

You can think of a future as an eventual with multiple values (compartments). However, the future does not store the values; it stores pointers to them, so the caller must ensure that the pointed-to memory remains valid. Also, ABT_future_wait, which blocks until all the compartments are filled, does not return the contained values. Instead, a callback provided to ABT_future_create is invoked when all the compartments are filled.

Compartments

A future with N compartments requires N ABT_future_set() calls before the consumer is unblocked. Each producer sets exactly one compartment.

/* Create future with 4 compartments (4 producers) */
ABT_future_create(4, callback, &future);

/* Worker 0 fills one compartment (the next unfilled one) */
ABT_future_set(future, &result0);

/* Worker 1 fills another compartment */
ABT_future_set(future, &result1);

/* ... workers 2 and 3 ... */

/* Consumer waits for all 4 */
ABT_future_wait(future);  /* Blocks until all 4 compartments set */

Callback for Value Retrieval

The only way to retrieve values from a future is via the callback provided to ABT_future_create(). There is no ABT_future_get() function.

void gather_callback(void **args) {
    /* args[i] is the pointer passed by i-th ABT_future_set() */
    int *val0 = (int *)args[0];
    int *val1 = (int *)args[1];
    /* ... process values ... */
}

ABT_future_create(num_compartments, gather_callback, &future);

The callback is invoked automatically when all compartments are set, before ABT_future_wait() returns.
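The bookkeeping described above can be pictured with a small single-threaded model. This is a sketch under stated assumptions, not Argobots' implementation: every name starting with model_ is hypothetical, there is no blocking or locking, and the fixed capacity exists only to keep the example short.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_MAX_COMPARTMENTS 8

/* Simplified model of a future: it stores pointers, never copies. */
typedef struct {
    void *slots[MODEL_MAX_COMPARTMENTS];
    size_t num_compartments;
    size_t num_set;
    void (*callback)(void **args);
} model_future_t;

void model_future_init(model_future_t *f, size_t n, void (*cb)(void **))
{
    assert(n > 0 && n <= MODEL_MAX_COMPARTMENTS);
    f->num_compartments = n;
    f->num_set = 0;
    f->callback = cb;
}

/* Fill the next empty compartment. The call that fills the last one runs
 * the callback. Returns 0 on success, -1 if the future was already full. */
int model_future_set(model_future_t *f, void *value)
{
    if (f->num_set == f->num_compartments)
        return -1;
    f->slots[f->num_set++] = value;
    if (f->num_set == f->num_compartments && f->callback != NULL)
        f->callback(f->slots);
    return 0;
}

/* Non-blocking readiness check, similar in spirit to ABT_future_test(). */
int model_future_is_ready(const model_future_t *f)
{
    return f->num_set == f->num_compartments;
}

/* Example callback: adds two int compartments into a file-scope variable. */
static int model_sum;
static void model_sum_two(void **args)
{
    model_sum = *(int *)args[0] + *(int *)args[1];
}
```

In this model, initializing a future with two compartments and model_sum_two, then setting pointers to 3 and 4, leaves model_sum at 0 after the first set and at 7 after the second. A third set is rejected until the future is reset.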

Futures vs Barriers
  • Barriers: All work units wait for all others (N-to-N synchronization)

  • Futures: One consumer waits for N producers (N-to-1 synchronization)

Futures expose more parallelism when only one work unit needs to wait for results, while producers can immediately move on to other work.

Parallel Reduction with Futures

This example demonstrates the core use case: multiple workers computing partial results, one coordinator gathering them.

/*
 * Parallel reduction with futures
 * Demonstrates multiple-producer-single-consumer synchronization
 */

#include <stdio.h>
#include <stdlib.h>
#include <abt.h>

#define NUM_WORKERS 4
#define ARRAY_SIZE 1000

typedef struct {
    int worker_id;
    int *data;
    int start;
    int end;
    ABT_future result_future;
} worker_arg_t;

/* Storage for partial results - must persist beyond worker lifetime */
int partial_results[NUM_WORKERS];

/* Callback invoked when all workers complete
 * args[i] contains the pointer passed by the i-th ABT_future_set() call,
 * which is not necessarily the call made by worker i
 */
void reduction_callback(void **args)
{
    int total = 0;
    printf("\n=== Callback invoked - all workers done ===\n");

    /* Sum up all partial results */
    for (int i = 0; i < NUM_WORKERS; i++) {
        int *partial = (int *)args[i];
        printf("  Compartment %d holds: %d\n", i, *partial);
        total += *partial;
    }

    printf("  Total sum: %d\n", total);
}

void worker_func(void *arg)
{
    worker_arg_t *warg = (worker_arg_t *)arg;
    int sum = 0;

    /* Compute partial sum for assigned range */
    for (int i = warg->start; i < warg->end; i++) {
        sum += warg->data[i];
    }

    /* Store result in persistent location */
    partial_results[warg->worker_id] = sum;

    printf("Worker %d computed partial sum: %d (range %d-%d)\n",
           warg->worker_id, sum, warg->start, warg->end);

    /* Set this worker's compartment in the future
     * This is the ONLY way to pass the value - via the callback
     */
    ABT_future_set(warg->result_future, &partial_results[warg->worker_id]);
}

int main(int argc, char **argv)
{
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread workers[NUM_WORKERS];
    ABT_future result_future;
    worker_arg_t worker_args[NUM_WORKERS];

    /* Initialize test data */
    int *data = (int *)malloc(ARRAY_SIZE * sizeof(int));
    for (int i = 0; i < ARRAY_SIZE; i++) {
        data[i] = i + 1;  /* 1, 2, 3, ... */
    }

    ABT_init(argc, argv);

    printf("=== Multiple-Producer-Single-Consumer with Futures ===\n");
    printf("Computing sum of array with %d workers\n\n", NUM_WORKERS);

    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    /* Create future with NUM_WORKERS compartments
     * One compartment per worker (multiple producers)
     * Main thread is the single consumer
     * Callback will be invoked when ALL workers complete
     */
    ABT_future_create(NUM_WORKERS, reduction_callback, &result_future);

    /* Launch workers */
    int chunk_size = ARRAY_SIZE / NUM_WORKERS;
    for (int i = 0; i < NUM_WORKERS; i++) {
        worker_args[i].worker_id = i;
        worker_args[i].data = data;
        worker_args[i].start = i * chunk_size;
        worker_args[i].end = (i == NUM_WORKERS - 1) ? ARRAY_SIZE : (i + 1) * chunk_size;
        worker_args[i].result_future = result_future;

        ABT_thread_create(pool, worker_func, &worker_args[i],
                          ABT_THREAD_ATTR_NULL, &workers[i]);
    }

    /* Main thread waits for ALL workers to complete
     * This blocks until all NUM_WORKERS compartments are set
     * The callback has already run by the time this call returns
     */
    printf("\nMain thread waiting for all workers...\n");
    ABT_future_wait(result_future);
    printf("Main thread unblocked - all workers completed!\n");

    /* Note: There is NO ABT_future_get() function!
     * The only way to retrieve values is via the callback.
     * If you need the result in main, the callback must store it somewhere.
     */

    /* Verify against sequential computation */
    int expected = 0;
    for (int i = 0; i < ARRAY_SIZE; i++) {
        expected += data[i];
    }
    printf("\nExpected sum (sequential): %d\n", expected);

    /* Wait for workers to join */
    for (int i = 0; i < NUM_WORKERS; i++) {
        ABT_thread_free(&workers[i]);
    }

    ABT_future_free(&result_future);
    free(data);

    printf("\nKey pattern demonstrated:\n");
    printf("- Multiple producers (workers) each set one compartment\n");
    printf("- Single consumer (main) waits for all compartments\n");
    printf("- Callback receives all values when complete\n");
    printf("- This is between eventual (1-to-N) and barrier (N-to-N)\n");

    ABT_finalize();
    return 0;
}

Key Points

Multiple Producers
ABT_future_create(NUM_WORKERS, reduction_callback, &result_future);

for (int i = 0; i < NUM_WORKERS; i++) {
    /* Each worker sets one compartment */
    ABT_thread_create(pool, worker_func, &worker_args[i], ...);
}

The future has one compartment per worker. This is the multiple-producer pattern.

Callback Receives All Values
void reduction_callback(void **args) {
    int total = 0;
    for (int i = 0; i < NUM_WORKERS; i++) {
        int *partial = (int *)args[i];
        total += *partial;
    }
}

The callback is invoked once all workers have set their compartments. args[] holds the pointers passed via ABT_future_set(), in the order the calls were made.
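Because the callback receives only the compartment pointers, one way to hand the reduced value back to the consumer is to carry the destination inside each compartment's payload. The sketch below assumes that approach; slot_t, total_out, NUM_SLOTS, and store_total_callback are hypothetical names, and only the callback's signature is taken from ABT_future_create().

```c
#include <assert.h>
#include <stddef.h>

#define NUM_SLOTS 4  /* assumed to equal the future's compartment count */

/* Payload for one compartment: the partial value plus the place where the
 * final total should be written. Hypothetical type for this sketch. */
typedef struct {
    int partial;
    int *total_out;  /* same destination in every slot */
} slot_t;

/* Has the void (*)(void **) shape that ABT_future_create() expects. */
void store_total_callback(void **args)
{
    assert(args != NULL);
    int total = 0;
    for (int i = 0; i < NUM_SLOTS; i++) {
        const slot_t *slot = (const slot_t *)args[i];
        total += slot->partial;
    }
    /* Publish the result where the consumer can read it after the wait. */
    *((slot_t *)args[0])->total_out = total;
}
```

In a program like the one above, each worker would fill its own slot_t and pass its address to ABT_future_set(), and main would read the destination variable after ABT_future_wait() returns. This avoids a global result variable at the cost of one extra pointer per slot.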

Single Consumer Waits
ABT_future_wait(result_future);

Main thread (single consumer) blocks until all NUM_WORKERS compartments are set.

Persistent Storage Required
int partial_results[NUM_WORKERS];  /* Must outlive workers */

Values passed to ABT_future_set() must remain valid until the callback executes. A stack variable in a worker function goes out of scope when the worker returns, which can happen before the last compartment is set.
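A global array is one option for persistent storage. Another, sketched here under the assumption that the consumer takes ownership, is to allocate each partial result on the heap so that it outlives the worker that produced it. heap_partial_sum is a hypothetical helper and does not call Argobots.

```c
#include <assert.h>
#include <stdlib.h>

/* Sum data[start..end) into heap storage that stays valid after the
 * calling function returns. Returns NULL if allocation fails.
 * The caller (or the future's callback) must free() the result. */
int *heap_partial_sum(const int *data, int start, int end)
{
    assert(data != NULL && start <= end);
    int *result = malloc(sizeof *result);
    if (result == NULL)
        return NULL;
    *result = 0;
    for (int i = start; i < end; i++)
        *result += data[i];
    return result;
}
```

A worker could pass the returned pointer to ABT_future_set(), and the callback could free each pointer after reading it. The trade-off is one allocation per worker and an ownership rule that has to be followed on every path.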

When to Use Futures

Use Futures When:
  • Multiple producers, one consumer (gather/reduce pattern)

  • Sparse dependency graphs (not all-to-all)

  • Each consumer waits for a specific subset of producers

  • You want to expose maximum parallelism

Use Eventuals When:
  • Single producer, single or multiple consumers

  • Simpler one-to-many broadcast (Tutorial 06)

Use Barriers When:
  • All work units must synchronize (N-to-N)

  • Bulk-synchronous parallel algorithms

  • Phase-based computation

Common Patterns

Parallel Reduction (Futures)
/* N workers compute partial results */
ABT_future_create(N, reduction_callback, &future);
for (i = 0; i < N; i++) {
    ABT_thread_create(pool, worker, &args[i], ...);
}
ABT_future_wait(future);  /* Wait for all N */

Bulk-Synchronous Iteration (Barriers)
ABT_barrier_create(N, &barrier);
for (i = 0; i < N; i++) {
    ABT_thread_create(pool, worker, ...);
}
/* Workers do: compute(), barrier_wait(), communicate(), barrier_wait() */

Divide and Conquer (Futures)
/* Fork into N subproblems */
ABT_future_create(N, merge_callback, &future);
for (i = 0; i < N; i++) {
    ABT_thread_create(pool, subproblem, &args[i], ...);
}
ABT_future_wait(future);  /* Wait for all subproblems */

Common Pitfalls

Wrong Barrier Count
/* WRONG: Barrier count doesn't match thread count */
ABT_barrier_create(NUM_THREADS - 1, &barrier);
for (int i = 0; i < NUM_THREADS; i++) {
    /* Last thread will wait forever */
}

No ABT_future_get() Function
/* WRONG: This function does not exist! */
int *value;
ABT_future_get(future, &value);  /* Build fails: no such function in Argobots */

The only way to get values is via the callback provided to ABT_future_create().

Forgetting to Reset Futures
/* WRONG: Reusing future without reset */
ABT_future_set(future, &value);
/* ... later in next iteration ... */
ABT_future_set(future, &value);  /* Error! Already set */

Always reset between uses:

ABT_future_set(future, &value);
ABT_future_wait(future);
ABT_future_reset(future);  /* Reset for next use */

Mismatched Compartment Count
/* Create with 4 compartments */
ABT_future_create(4, callback, &future);

/* Only 3 workers set compartments */
/* Waiter blocks forever! Needs 4 sets */
ABT_future_wait(future);

The number of compartments must equal the number of ABT_future_set() calls the producers will make.

Forgetting to Free Resources
ABT_barrier_create(n, &barrier);
/* ... use barrier ... */
ABT_barrier_free(&barrier);  /* Don't forget! */

ABT_future_create(n, callback, &future);
/* ... use future ... */
ABT_future_free(&future);  /* Don't forget! */

API Reference

Barrier Functions
  • int ABT_barrier_create(uint32_t num_waiters, ABT_barrier *newbarrier)

    Create a barrier for num_waiters work units.

  • int ABT_barrier_wait(ABT_barrier barrier)

    Wait at the barrier. Blocks until all num_waiters work units call wait.

  • int ABT_barrier_reinit(ABT_barrier barrier, uint32_t num_waiters)

Reinitialize a barrier with a new waiter count. Resets internal state. Not needed when reusing a barrier with the same number of waiters.

  • int ABT_barrier_free(ABT_barrier *barrier)

    Free a barrier. Must not be called while work units are waiting.

Future Functions
  • int ABT_future_create(uint32_t compartments, void (*cb_func)(void **arg), ABT_future *newfuture)

    Create a future with the specified number of compartments. cb_func is invoked when all compartments are set and receives the array of value pointers. Pass NULL for cb_func if you only need synchronization without value gathering.

  • int ABT_future_wait(ABT_future future)

    Wait for all compartments to be set. Blocks until ready. The callback (if provided) has already run by the time this returns.

  • int ABT_future_test(ABT_future future, ABT_bool *flag)

    Non-blocking test if future is ready (all compartments set).

  • int ABT_future_set(ABT_future future, void *value)

    Set one compartment with a value pointer. Sets the next unfilled compartment. If this fills the last compartment, invokes the callback and wakes waiters.

  • int ABT_future_reset(ABT_future future)

    Reset future for reuse. All compartments become unfilled again.

  • int ABT_future_free(ABT_future *future)

    Free a future object.

Important: There is no ABT_future_get() function. Values are retrieved only via the callback.