Event Notifications with Wait/Notify

Yokan provides a powerful event notification mechanism through the YOKAN_MODE_WAIT and YOKAN_MODE_NOTIFY modes. This allows clients to coordinate asynchronously by waiting for specific keys to appear in the database and being notified when they do.

This feature is particularly useful for:

  • Producer/consumer patterns

  • Distributed coordination

  • Event-driven workflows

  • Barrier synchronization

  • Pipeline processing

Basic Concepts

The wait/notify mechanism works as follows:

  1. Waiting clients use YOKAN_MODE_WAIT when performing get operations on keys that don’t yet exist. Instead of returning an error, the operation blocks until the key appears.

  2. Notifying clients use YOKAN_MODE_NOTIFY when putting values. This wakes up any clients waiting for those specific keys.

The key advantage is that waiting clients don’t need to poll the database repeatedly - they simply block until the data is available, reducing network traffic and improving efficiency.

Important

Not all backends support wait/notify modes. If you attempt to use these modes with an unsupported backend, you’ll receive a YOKAN_ERR_MODE error. The in-memory backends (map, unordered_map) support this feature.

Basic Wait/Notify Example

Here’s a simple example demonstrating the wait/notify pattern:

/*
 * (C) 2024 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct consumer_args {
    yk_database_handle_t db;
    const char* key;
    size_t ksize;
};

static void consumer_thread(void* arg)
{
    struct consumer_args* args = (struct consumer_args*)arg;

    printf("[Consumer] Waiting for key '%s'...\n", args->key);

    char buffer[256];
    size_t vsize = sizeof(buffer);

    /* Use YOKAN_MODE_WAIT to block until key appears */
    yk_return_t ret = yk_get(args->db, YOKAN_MODE_WAIT,
                               args->key, args->ksize,
                               buffer, &vsize);

    if(ret == YOKAN_SUCCESS) {
        printf("[Consumer] Received: %.*s\n", (int)vsize, buffer);
    } else {
        fprintf(stderr, "[Consumer] Error getting key\n");
    }
}

int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }

    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo and Argobots */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread consumer;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool for threading */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "notification_key";
    const char* value = "notification_value";

    /* Launch consumer thread that waits for key */
    struct consumer_args args = { db, key, strlen(key) };
    ABT_thread_create(pool, consumer_thread, &args,
                      ABT_THREAD_ATTR_NULL, &consumer);

    /* Give consumer time to start waiting */
    usleep(100000); /* 100ms */

    /* Producer: put value with notification */
    printf("[Producer] Putting value with notification...\n");

    /* Use YOKAN_MODE_NOTIFY to wake up waiting consumers */
    ret = yk_put(db, YOKAN_MODE_NOTIFY,
                 key, strlen(key),
                 value, strlen(value));

    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Value sent\n");
    }

    /* Wait for consumer thread to finish */
    ABT_thread_free(&consumer);

    printf("\n=== Wait/Notify completed successfully ===\n");

    /* Cleanup */
    yk_database_handle_release(db);

cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);

    return (ret == YOKAN_SUCCESS) ? 0 : 1;
}

In this example:

  • The consumer starts waiting for a key that doesn’t exist yet

  • The producer puts the value with YOKAN_MODE_NOTIFY

  • The consumer’s get() operation completes and receives the value

The consumer thread blocks at the get() call until the producer puts the value, providing efficient event-driven coordination.

Multiple Waiters

Multiple clients can wait for the same key. When a put operation uses YOKAN_MODE_NOTIFY, all waiting clients are woken up:

/*
 * (C) 2024 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct consumer_args {
    yk_database_handle_t db;
    const char* key;
    size_t ksize;
    int id;
};

static void consumer_thread(void* arg)
{
    struct consumer_args* args = (struct consumer_args*)arg;

    printf("[Consumer %d] Waiting for event...\n", args->id);

    char buffer[256];
    size_t vsize = sizeof(buffer);

    /* All consumers wait for the same key */
    yk_return_t ret = yk_get(args->db, YOKAN_MODE_WAIT,
                               args->key, args->ksize,
                               buffer, &vsize);

    if(ret == YOKAN_SUCCESS) {
        printf("[Consumer %d] Received: %.*s\n", args->id, (int)vsize, buffer);
    } else {
        fprintf(stderr, "[Consumer %d] Error getting key\n", args->id);
    }
}

int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }

    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "broadcast_event";
    const char* value = "event_data";

    printf("Starting multiple waiters...\n");

    /* Create multiple consumer threads */
#define NUM_CONSUMERS 5
    ABT_thread consumers[NUM_CONSUMERS];
    struct consumer_args args[NUM_CONSUMERS];

    for(int i = 0; i < NUM_CONSUMERS; i++) {
        args[i].db = db;
        args[i].key = key;
        args[i].ksize = strlen(key);
        args[i].id = i;

        ABT_thread_create(pool, consumer_thread, &args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Give consumers time to start waiting */
    usleep(200000); /* 200ms */

    /* Single notification wakes all waiters */
    printf("\n[Producer] Broadcasting event to all waiters...\n");

    ret = yk_put(db, YOKAN_MODE_NOTIFY,
                 key, strlen(key),
                 value, strlen(value));

    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Broadcast sent\n");
    }

    /* Wait for all consumers to complete */
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }

    printf("\n=== All consumers received the broadcast ===\n");

    /* Cleanup */
    yk_database_handle_release(db);

cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);

    return (ret == YOKAN_SUCCESS) ? 0 : 1;
}

This is useful for broadcast-style notifications where multiple workers need to be triggered by the same event.

Producer/Consumer Pattern

A common use case is implementing producer/consumer queues where consumers wait for producers to provide data:

/*
 * (C) 2024 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NUM_PRODUCERS 3
#define NUM_CONSUMERS 2
#define ITEMS_PER_PRODUCER 5

struct shared_state {
    yk_database_handle_t db;
    ABT_mutex mutex;
    int items_produced;
    int items_consumed;
};

struct consumer_args {
    struct shared_state* state;
    int id;
};

struct producer_args {
    struct shared_state* state;
    int id;
};

static void consumer_thread(void* arg)
{
    struct consumer_args* args = (struct consumer_args*)arg;
    struct shared_state* state = args->state;
    int id = args->id;

    while(1) {
        /* Get next work item ID (atomically) */
        ABT_mutex_lock(state->mutex);
        int item_id = state->items_consumed++;
        int consumed = state->items_consumed;
        ABT_mutex_unlock(state->mutex);

        /* Generate unique work item key */
        char key[64];
        snprintf(key, sizeof(key), "work_item_%d", item_id);

        char buffer[256];
        size_t vsize = sizeof(buffer);

        /* Wait for work item and consume it atomically */
        yk_return_t ret = yk_get(state->db, YOKAN_MODE_WAIT | YOKAN_MODE_CONSUME,
                                   key, strlen(key),
                                   buffer, &vsize);

        if(ret == YOKAN_SUCCESS) {
            printf("[Consumer %d] Processing: %.*s\n", id, (int)vsize, buffer);

            /* Simulate work */
            usleep(50000); /* 50ms */
        }

        /* Stop when we've consumed all expected items */
        if(consumed >= NUM_PRODUCERS * ITEMS_PER_PRODUCER) {
            break;
        }
    }

    printf("[Consumer %d] Finished\n", id);
}

static void producer_thread(void* arg)
{
    struct producer_args* args = (struct producer_args*)arg;
    struct shared_state* state = args->state;
    int id = args->id;

    for(int j = 0; j < ITEMS_PER_PRODUCER; j++) {
        /* Get next item ID (atomically) */
        ABT_mutex_lock(state->mutex);
        int item_id = state->items_produced++;
        ABT_mutex_unlock(state->mutex);

        char key[64], work[256];
        snprintf(key, sizeof(key), "work_item_%d", item_id);
        snprintf(work, sizeof(work), "Task %d from producer %d", item_id, id);

        printf("[Producer %d] Creating: %s\n", id, work);

        /* Put work item with notification */
        yk_put(state->db, YOKAN_MODE_NOTIFY,
               key, strlen(key),
               work, strlen(work));

        /* Simulate production delay */
        usleep(100000); /* 100ms */
    }

    printf("[Producer %d] Finished\n", id);
}

int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }

    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_mutex mutex;

    /* Initialize shared state */
    struct shared_state state = { .items_produced = 0, .items_consumed = 0 };

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    state.db = db;

    /* Create mutex for atomic counters */
    ABT_mutex_create(&mutex);
    state.mutex = mutex;

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    printf("Starting producer/consumer workflow...\n");

    /* Create consumer threads */
    ABT_thread consumers[NUM_CONSUMERS];
    struct consumer_args consumer_args[NUM_CONSUMERS];

    for(int i = 0; i < NUM_CONSUMERS; i++) {
        consumer_args[i].state = &state;
        consumer_args[i].id = i;
        ABT_thread_create(pool, consumer_thread, &consumer_args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Give consumers time to start waiting */
    usleep(100000); /* 100ms */

    /* Create producer threads */
    ABT_thread producers[NUM_PRODUCERS];
    struct producer_args producer_args[NUM_PRODUCERS];

    for(int i = 0; i < NUM_PRODUCERS; i++) {
        producer_args[i].state = &state;
        producer_args[i].id = i;
        ABT_thread_create(pool, producer_thread, &producer_args[i],
                          ABT_THREAD_ATTR_NULL, &producers[i]);
    }

    /* Wait for all threads to complete */
    for(int i = 0; i < NUM_PRODUCERS; i++) {
        ABT_thread_free(&producers[i]);
    }
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }

    printf("\n=== Producer/Consumer workflow completed ===\n");
    printf("Items produced: %d\n", state.items_produced);
    printf("Items consumed: %d\n", state.items_consumed);

    /* Cleanup */
    ABT_mutex_free(&mutex);
    yk_database_handle_release(db);

cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);

    return (ret == YOKAN_SUCCESS) ? 0 : 1;
}

This example demonstrates:

  • Multiple producers creating work items

  • Multiple consumers waiting for and processing items

  • Using key naming to create a work queue

  • Consuming items with YOKAN_MODE_CONSUME to prevent duplicate processing

Distributed Coordination

Wait/notify can coordinate distributed processes across different nodes:

/*
 * (C) 2024 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define NUM_RANKS 4

/* Simulate distributed barrier synchronization */
static void barrier_sync(yk_database_handle_t db, int rank, int num_ranks)
{
    char my_key[64];
    const char* ready_value = "ready";

    snprintf(my_key, sizeof(my_key), "barrier_%d", rank);

    printf("[Rank %d] Announcing readiness\n", rank);

    /* Announce that this rank is ready */
    yk_put(db, YOKAN_MODE_NOTIFY,
           my_key, strlen(my_key),
           ready_value, strlen(ready_value));

    /* Wait for all other ranks to be ready */
    for(int i = 0; i < num_ranks; i++) {
        if(i == rank) continue;

        char other_key[64];
        char buffer[256];
        size_t vsize = sizeof(buffer);

        snprintf(other_key, sizeof(other_key), "barrier_%d", i);

        printf("[Rank %d] Waiting for rank %d\n", rank, i);

        yk_get(db, YOKAN_MODE_WAIT,
               other_key, strlen(other_key),
               buffer, &vsize);
    }

    printf("[Rank %d] All ranks synchronized!\n", rank);
}

struct rank_args {
    yk_database_handle_t db;
    int rank;
    int num_ranks;
};

static void rank_thread(void* arg)
{
    struct rank_args* args = (struct rank_args*)arg;

    /* Simulate different arrival times */
    usleep(args->rank * 100000); /* rank * 100ms */

    /* Execute barrier synchronization */
    barrier_sync(args->db, args->rank, args->num_ranks);

    /* Continue with synchronized work */
    printf("[Rank %d] Proceeding with synchronized work\n", args->rank);
}

int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }

    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    printf("Simulating distributed barrier with %d ranks\n", NUM_RANKS);

    /* Simulate multiple distributed processes */
    ABT_thread ranks[NUM_RANKS];
    struct rank_args args[NUM_RANKS];

    for(int rank = 0; rank < NUM_RANKS; rank++) {
        args[rank].db = db;
        args[rank].rank = rank;
        args[rank].num_ranks = NUM_RANKS;

        ABT_thread_create(pool, rank_thread, &args[rank],
                          ABT_THREAD_ATTR_NULL, &ranks[rank]);
    }

    /* Wait for all ranks to complete */
    for(int rank = 0; rank < NUM_RANKS; rank++) {
        ABT_thread_free(&ranks[rank]);
    }

    printf("\n=== Distributed coordination completed ===\n");

    /* Cleanup barrier keys */
    for(int i = 0; i < NUM_RANKS; i++) {
        char key[64];
        snprintf(key, sizeof(key), "barrier_%d", i);
        yk_erase(db, YOKAN_MODE_DEFAULT, key, strlen(key));
    }

    /* Cleanup */
    yk_database_handle_release(db);

cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);

    return (ret == YOKAN_SUCCESS) ? 0 : 1;
}

This pattern is useful for:

  • Distributed barriers

  • Checkpoint synchronization

  • Multi-stage pipelines

  • Leader election

Without Wait/Notify

For comparison, here’s what coordination looks like without wait/notify, using polling instead:

/*
 * (C) 2024 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct consumer_args {
    yk_database_handle_t db;
    const char* key;
    size_t ksize;
};

static void consumer_thread(void* arg)
{
    struct consumer_args* args = (struct consumer_args*)arg;

    printf("[Consumer] Polling for key '%s'...\n", args->key);

    char buffer[256];
    size_t vsize;
    int poll_count = 0;

    /* Keep polling until key exists */
    while(1) {
        poll_count++;
        vsize = sizeof(buffer);

        /* Try to get the key (no WAIT mode) */
        yk_return_t ret = yk_get(args->db, YOKAN_MODE_DEFAULT,
                                   args->key, args->ksize,
                                   buffer, &vsize);

        if(ret == YOKAN_SUCCESS) {
            /* Success! Key found */
            break;
        } else {
            /* Key doesn't exist yet, sleep and retry */
            usleep(50000); /* 50ms */
        }
    }

    printf("[Consumer] Received after %d poll attempts: %.*s\n",
           poll_count, (int)vsize, buffer);
}

int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }

    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread consumer;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "polling_key";
    const char* value = "polling_value";

    /* Launch consumer thread that polls for key */
    struct consumer_args args = { db, key, strlen(key) };
    ABT_thread_create(pool, consumer_thread, &args,
                      ABT_THREAD_ATTR_NULL, &consumer);

    /* Give consumer time to start polling */
    usleep(100000); /* 100ms */

    /* Producer: put value (no notify needed) */
    printf("[Producer] Putting value...\n");

    ret = yk_put(db, YOKAN_MODE_DEFAULT,
                 key, strlen(key),
                 value, strlen(value));

    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Value sent (consumer will find it eventually)\n");
    }

    /* Wait for consumer thread to finish */
    ABT_thread_free(&consumer);

    printf("\n=== Polling approach completed ===\n");
    printf("Note: Multiple network round-trips wasted on polling\n");
    printf("Compare this to wait/notify for better efficiency\n");

    /* Cleanup */
    yk_erase(db, YOKAN_MODE_DEFAULT, key, strlen(key));
    yk_database_handle_release(db);

cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);

    return (ret == YOKAN_SUCCESS) ? 0 : 1;
}

The polling approach:

  • Wastes CPU and network resources

  • Has unpredictable latency based on poll interval

  • Doesn’t scale well with many waiters

  • Can miss rapid updates

The wait/notify approach is more efficient and responsive.

Timeouts

There is currently no way to timeout a watcher; the watcher will need to be notified by a writer.