Event Notifications with Wait/Notify
Yokan provides a powerful event notification mechanism through the
YOKAN_MODE_WAIT and YOKAN_MODE_NOTIFY modes. This allows clients
to coordinate asynchronously by waiting for specific keys to appear in
the database and being notified when they do.
This feature is particularly useful for:
Producer/consumer patterns
Distributed coordination
Event-driven workflows
Barrier synchronization
Pipeline processing
Basic Concepts
The wait/notify mechanism works as follows:
Waiting clients use YOKAN_MODE_WAIT when performing get operations on keys that don’t yet exist. Instead of returning an error, the operation blocks until the key appears.
Notifying clients use YOKAN_MODE_NOTIFY when putting values. This wakes up any clients waiting for those specific keys.
The key advantage is that waiting clients don’t need to poll the database repeatedly - they simply block until the data is available, reducing network traffic and improving efficiency.
Important
Not all backends support wait/notify modes. If you attempt to use
these modes with an unsupported backend, you’ll receive a
YOKAN_ERR_MODE error. The in-memory backends (map, unordered_map)
support this feature.
Basic Wait/Notify Example
Here’s a simple example demonstrating the wait/notify pattern:
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Arguments handed to consumer_thread() through its void* parameter. */
struct consumer_args {
yk_database_handle_t db; /* database handle the consumer calls yk_get() on */
const char* key; /* key the consumer waits for */
size_t ksize; /* size of the key passed to yk_get(); main sets it to strlen(key) */
};
static void consumer_thread(void* arg)
{
struct consumer_args* args = (struct consumer_args*)arg;
printf("[Consumer] Waiting for key '%s'...\n", args->key);
char buffer[256];
size_t vsize = sizeof(buffer);
/* Use YOKAN_MODE_WAIT to block until key appears */
yk_return_t ret = yk_get(args->db, YOKAN_MODE_WAIT,
args->key, args->ksize,
buffer, &vsize);
if(ret == YOKAN_SUCCESS) {
printf("[Consumer] Received: %.*s\n", (int)vsize, buffer);
} else {
fprintf(stderr, "[Consumer] Error getting key\n");
}
}
/*
 * Basic wait/notify example.
 *
 * Usage: <program> <server_addr> <provider_id>
 *
 * Starts a consumer thread that blocks in yk_get(YOKAN_MODE_WAIT), then
 * puts the key with YOKAN_MODE_NOTIFY and waits for the consumer to end.
 *
 * Returns 0 when every step up to and including the put succeeded,
 * 1 otherwise.
 */
int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }
    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo and Argobots */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    /* Pessimistic default: only the fully successful path clears it, so
     * every early jump to `cleanup` (including a failed address lookup,
     * which does not touch `ret`) yields a non-zero exit status. */
    int exit_code = 1;
    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread consumer;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool for threading */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "notification_key";
    const char* value = "notification_value";

    /* Launch consumer thread that waits for key */
    struct consumer_args args = { db, key, strlen(key) };
    ABT_thread_create(pool, consumer_thread, &args,
                      ABT_THREAD_ATTR_NULL, &consumer);

    /* Give consumer time to start waiting.
     * NOTE(review): usleep() does not yield to Argobots; the consumer
     * shares this execution stream's main pool, so it presumably does
     * not run during this sleep. margo_thread_sleep() may be what is
     * intended here; confirm. */
    usleep(100000); /* 100ms */

    /* Producer: put value with notification */
    printf("[Producer] Putting value with notification...\n");
    /* Use YOKAN_MODE_NOTIFY to wake up waiting consumers */
    ret = yk_put(db, YOKAN_MODE_NOTIFY,
                 key, strlen(key),
                 value, strlen(value));
    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Value sent\n");
        exit_code = 0;
    } else {
        /* NOTE(review): after a failed put nothing notifies the consumer,
         * so the join below may not return; confirm desired handling. */
        fprintf(stderr, "[Producer] Failed to put value\n");
    }

    /* Wait for consumer thread to finish */
    ABT_thread_free(&consumer);
    if(exit_code == 0) {
        printf("\n=== Wait/Notify completed successfully ===\n");
    }

    /* Cleanup */
    yk_database_handle_release(db);
cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);
    return exit_code;
}
In this example:
The consumer starts waiting for a key that doesn’t exist yet
The producer puts the value with YOKAN_MODE_NOTIFY
The consumer’s get() operation completes and receives the value
The consumer thread blocks at the get() call until the producer
puts the value, providing efficient event-driven coordination.
Multiple Waiters
Multiple clients can wait for the same key. When a put operation
uses YOKAN_MODE_NOTIFY, all waiting clients are woken up:
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Per-consumer arguments handed to consumer_thread() through its void* parameter. */
struct consumer_args {
yk_database_handle_t db; /* database handle the consumer calls yk_get() on */
const char* key; /* key the consumer waits for (same key for every consumer) */
size_t ksize; /* size of the key passed to yk_get(); main sets it to strlen(key) */
int id; /* consumer index, used only to label log output */
};
static void consumer_thread(void* arg)
{
struct consumer_args* args = (struct consumer_args*)arg;
printf("[Consumer %d] Waiting for event...\n", args->id);
char buffer[256];
size_t vsize = sizeof(buffer);
/* All consumers wait for the same key */
yk_return_t ret = yk_get(args->db, YOKAN_MODE_WAIT,
args->key, args->ksize,
buffer, &vsize);
if(ret == YOKAN_SUCCESS) {
printf("[Consumer %d] Received: %.*s\n", args->id, (int)vsize, buffer);
} else {
fprintf(stderr, "[Consumer %d] Error getting key\n", args->id);
}
}
/*
 * Multiple-waiters example.
 *
 * Usage: <program> <server_addr> <provider_id>
 *
 * Starts NUM_CONSUMERS consumer threads that all wait on the same key,
 * then performs a single put with YOKAN_MODE_NOTIFY and joins them.
 *
 * Returns 0 when every step up to and including the put succeeded,
 * 1 otherwise.
 */
int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }
    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    /* Pessimistic default: a failed address lookup does not modify `ret`,
     * so the exit status is tracked separately and only cleared once the
     * broadcast put has succeeded. */
    int exit_code = 1;
    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "broadcast_event";
    const char* value = "event_data";
    printf("Starting multiple waiters...\n");

    /* Create multiple consumer threads */
#define NUM_CONSUMERS 5
    ABT_thread consumers[NUM_CONSUMERS];
    struct consumer_args args[NUM_CONSUMERS];
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        args[i].db = db;
        args[i].key = key;
        args[i].ksize = strlen(key);
        args[i].id = i;
        ABT_thread_create(pool, consumer_thread, &args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Give consumers time to start waiting.
     * NOTE(review): usleep() does not yield to Argobots; the consumers
     * share this execution stream's main pool, so they presumably do
     * not run during this sleep. margo_thread_sleep() may be what is
     * intended here; confirm. */
    usleep(200000); /* 200ms */

    /* Single notification wakes all waiters */
    printf("\n[Producer] Broadcasting event to all waiters...\n");
    ret = yk_put(db, YOKAN_MODE_NOTIFY,
                 key, strlen(key),
                 value, strlen(value));
    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Broadcast sent\n");
        exit_code = 0;
    } else {
        /* NOTE(review): after a failed put nothing notifies the waiters,
         * so the joins below may not return; confirm desired handling. */
        fprintf(stderr, "[Producer] Failed to broadcast event\n");
    }

    /* Wait for all consumers to complete */
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }
    if(exit_code == 0) {
        printf("\n=== All consumers received the broadcast ===\n");
    }

    /* Cleanup */
    yk_database_handle_release(db);
cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);
    return exit_code;
}
This is useful for broadcast-style notifications where multiple workers need to be triggered by the same event.
Producer/Consumer Pattern
A common use case is implementing producer/consumer queues where consumers wait for producers to provide data:
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define NUM_PRODUCERS 3
#define NUM_CONSUMERS 2
#define ITEMS_PER_PRODUCER 5
/* State shared by all producer and consumer threads of this example. */
struct shared_state {
yk_database_handle_t db; /* database handle used for every yk_put()/yk_get() */
ABT_mutex mutex; /* locked around every read/update of the two counters below */
int items_produced; /* next item id a producer will claim; incremented under mutex */
int items_consumed; /* next item id a consumer will claim; incremented under mutex */
};
/* Per-consumer arguments handed to consumer_thread() through its void* parameter. */
struct consumer_args {
struct shared_state* state; /* state shared with all other threads */
int id; /* consumer index, used only to label log output */
};
/* Per-producer arguments handed to producer_thread() through its void* parameter. */
struct producer_args {
struct shared_state* state; /* state shared with all other threads */
int id; /* producer index, used in log output and in the work item text */
};
/*
 * Consumer thread body for the producer/consumer example.
 *
 * arg points to a struct consumer_args. The consumer repeatedly claims
 * the next unclaimed item id from the shared state, waits for the key
 * "work_item_<id>" with YOKAN_MODE_WAIT | YOKAN_MODE_CONSUME, and
 * "processes" the value. It stops once all
 * NUM_PRODUCERS * ITEMS_PER_PRODUCER item ids have been claimed.
 */
static void consumer_thread(void* arg)
{
    struct consumer_args* args = (struct consumer_args*)arg;
    struct shared_state* state = args->state;
    int id = args->id;
    const int total_items = NUM_PRODUCERS * ITEMS_PER_PRODUCER;

    while(1) {
        /* Claim the next work item ID (atomically). The bound is checked
         * BEFORE claiming: otherwise a consumer could claim an id that no
         * producer will ever create and block forever in the waiting get
         * below, and items_consumed would overshoot the real total. */
        ABT_mutex_lock(state->mutex);
        if(state->items_consumed >= total_items) {
            ABT_mutex_unlock(state->mutex);
            break;
        }
        int item_id = state->items_consumed++;
        ABT_mutex_unlock(state->mutex);

        /* Generate unique work item key */
        char key[64];
        snprintf(key, sizeof(key), "work_item_%d", item_id);

        char buffer[256];
        size_t vsize = sizeof(buffer);

        /* Wait for work item and consume it atomically */
        yk_return_t ret = yk_get(state->db, YOKAN_MODE_WAIT | YOKAN_MODE_CONSUME,
                                 key, strlen(key),
                                 buffer, &vsize);
        if(ret == YOKAN_SUCCESS) {
            printf("[Consumer %d] Processing: %.*s\n", id, (int)vsize, buffer);
            /* Simulate work */
            usleep(50000); /* 50ms */
        } else {
            /* Report the failure instead of dropping the item silently */
            fprintf(stderr, "[Consumer %d] Error getting key '%s'\n", id, key);
        }
    }
    printf("[Consumer %d] Finished\n", id);
}
static void producer_thread(void* arg)
{
struct producer_args* args = (struct producer_args*)arg;
struct shared_state* state = args->state;
int id = args->id;
for(int j = 0; j < ITEMS_PER_PRODUCER; j++) {
/* Get next item ID (atomically) */
ABT_mutex_lock(state->mutex);
int item_id = state->items_produced++;
ABT_mutex_unlock(state->mutex);
char key[64], work[256];
snprintf(key, sizeof(key), "work_item_%d", item_id);
snprintf(work, sizeof(work), "Task %d from producer %d", item_id, id);
printf("[Producer %d] Creating: %s\n", id, work);
/* Put work item with notification */
yk_put(state->db, YOKAN_MODE_NOTIFY,
key, strlen(key),
work, strlen(work));
/* Simulate production delay */
usleep(100000); /* 100ms */
}
printf("[Producer %d] Finished\n", id);
}
/*
 * Producer/consumer example.
 *
 * Usage: <program> <server_addr> <provider_id>
 *
 * Starts NUM_CONSUMERS consumer threads and NUM_PRODUCERS producer
 * threads sharing one struct shared_state, joins them, and prints the
 * final counters.
 *
 * Returns 0 when setup succeeded and all threads were joined,
 * 1 otherwise.
 */
int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }
    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    /* Pessimistic default: a failed address lookup does not modify `ret`,
     * so the exit status is tracked separately and only cleared once the
     * whole workflow has run. */
    int exit_code = 1;
    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_mutex mutex;

    /* Initialize shared state */
    struct shared_state state = { .items_produced = 0, .items_consumed = 0 };

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }
    state.db = db;

    /* Create mutex for atomic counters */
    ABT_mutex_create(&mutex);
    state.mutex = mutex;

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    printf("Starting producer/consumer workflow...\n");

    /* Create consumer threads */
    ABT_thread consumers[NUM_CONSUMERS];
    struct consumer_args consumer_args[NUM_CONSUMERS];
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        consumer_args[i].state = &state;
        consumer_args[i].id = i;
        ABT_thread_create(pool, consumer_thread, &consumer_args[i],
                          ABT_THREAD_ATTR_NULL, &consumers[i]);
    }

    /* Give consumers time to start waiting.
     * NOTE(review): usleep() does not yield to Argobots; the consumers
     * share this execution stream's main pool, so they presumably do
     * not run during this sleep. margo_thread_sleep() may be what is
     * intended here; confirm. */
    usleep(100000); /* 100ms */

    /* Create producer threads */
    ABT_thread producers[NUM_PRODUCERS];
    struct producer_args producer_args[NUM_PRODUCERS];
    for(int i = 0; i < NUM_PRODUCERS; i++) {
        producer_args[i].state = &state;
        producer_args[i].id = i;
        ABT_thread_create(pool, producer_thread, &producer_args[i],
                          ABT_THREAD_ATTR_NULL, &producers[i]);
    }

    /* Wait for all threads to complete */
    for(int i = 0; i < NUM_PRODUCERS; i++) {
        ABT_thread_free(&producers[i]);
    }
    for(int i = 0; i < NUM_CONSUMERS; i++) {
        ABT_thread_free(&consumers[i]);
    }

    printf("\n=== Producer/Consumer workflow completed ===\n");
    printf("Items produced: %d\n", state.items_produced);
    printf("Items consumed: %d\n", state.items_consumed);
    exit_code = 0;

    /* Cleanup */
    ABT_mutex_free(&mutex);
    yk_database_handle_release(db);
cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);
    return exit_code;
}
This example demonstrates:
Multiple producers creating work items
Multiple consumers waiting for and processing items
Using key naming to create a work queue
Consuming items with YOKAN_MODE_CONSUME to prevent duplicate processing
Distributed Coordination
Wait/notify can coordinate distributed processes across different nodes:
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define NUM_RANKS 4
/* Simulate distributed barrier synchronization */
/*
 * Key-based barrier across num_ranks participants.
 *
 * db        database handle shared by all participants
 * rank      index of the calling participant, in [0, num_ranks)
 * num_ranks total number of participants
 *
 * The caller publishes "barrier_<rank>" with YOKAN_MODE_NOTIFY, then
 * performs a waiting get (YOKAN_MODE_WAIT) on the key of every other
 * rank. The success message is printed only if every operation
 * succeeded; on the first failure an error is reported on stderr and
 * the function returns early.
 */
static void barrier_sync(yk_database_handle_t db, int rank, int num_ranks)
{
    char my_key[64];
    const char* ready_value = "ready";

    snprintf(my_key, sizeof(my_key), "barrier_%d", rank);
    printf("[Rank %d] Announcing readiness\n", rank);

    /* Announce that this rank is ready */
    if(yk_put(db, YOKAN_MODE_NOTIFY,
              my_key, strlen(my_key),
              ready_value, strlen(ready_value)) != YOKAN_SUCCESS) {
        fprintf(stderr, "[Rank %d] Failed to announce readiness\n", rank);
        return;
    }

    /* Wait for all other ranks to be ready */
    for(int i = 0; i < num_ranks; i++) {
        if(i == rank) continue;

        char other_key[64];
        char buffer[256];
        size_t vsize = sizeof(buffer);

        snprintf(other_key, sizeof(other_key), "barrier_%d", i);
        printf("[Rank %d] Waiting for rank %d\n", rank, i);

        /* A failed get must not be mistaken for "rank i is ready" */
        if(yk_get(db, YOKAN_MODE_WAIT,
                  other_key, strlen(other_key),
                  buffer, &vsize) != YOKAN_SUCCESS) {
            fprintf(stderr, "[Rank %d] Failed waiting for rank %d\n", rank, i);
            return;
        }
    }
    printf("[Rank %d] All ranks synchronized!\n", rank);
}
/* Per-rank arguments handed to rank_thread() through its void* parameter. */
struct rank_args {
yk_database_handle_t db; /* database handle forwarded to barrier_sync() */
int rank; /* index of this simulated rank; also scales its start-up delay */
int num_ranks; /* total number of ranks taking part in the barrier */
};
static void rank_thread(void* arg)
{
struct rank_args* args = (struct rank_args*)arg;
/* Simulate different arrival times */
usleep(args->rank * 100000); /* rank * 100ms */
/* Execute barrier synchronization */
barrier_sync(args->db, args->rank, args->num_ranks);
/* Continue with synchronized work */
printf("[Rank %d] Proceeding with synchronized work\n", args->rank);
}
/*
 * Distributed-coordination (barrier) example.
 *
 * Usage: <program> <server_addr> <provider_id>
 *
 * Simulates NUM_RANKS processes with one thread each, lets them meet at
 * a key-based barrier, then erases the barrier keys.
 *
 * Returns 0 when setup succeeded and all rank threads were joined,
 * 1 otherwise.
 */
int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }
    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    /* Pessimistic default: a failed address lookup does not modify `ret`,
     * so the exit status is tracked separately and only cleared once all
     * ranks have been joined. */
    int exit_code = 1;
    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    printf("Simulating distributed barrier with %d ranks\n", NUM_RANKS);

    /* Simulate multiple distributed processes */
    ABT_thread ranks[NUM_RANKS];
    struct rank_args args[NUM_RANKS];
    for(int rank = 0; rank < NUM_RANKS; rank++) {
        args[rank].db = db;
        args[rank].rank = rank;
        args[rank].num_ranks = NUM_RANKS;
        ABT_thread_create(pool, rank_thread, &args[rank],
                          ABT_THREAD_ATTR_NULL, &ranks[rank]);
    }

    /* Wait for all ranks to complete */
    for(int rank = 0; rank < NUM_RANKS; rank++) {
        ABT_thread_free(&ranks[rank]);
    }
    printf("\n=== Distributed coordination completed ===\n");
    exit_code = 0;

    /* Cleanup barrier keys. This is best effort: a failed erase is
     * reported but does not change the exit status. */
    for(int i = 0; i < NUM_RANKS; i++) {
        char key[64];
        snprintf(key, sizeof(key), "barrier_%d", i);
        ret = yk_erase(db, YOKAN_MODE_DEFAULT, key, strlen(key));
        if(ret != YOKAN_SUCCESS) {
            fprintf(stderr, "Failed to erase key '%s'\n", key);
        }
    }

    /* Cleanup */
    yk_database_handle_release(db);
cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);
    return exit_code;
}
This pattern is useful for:
Distributed barriers
Checkpoint synchronization
Multi-stage pipelines
Leader election
Without Wait/Notify
For comparison, here’s what coordination looks like without wait/notify, using polling instead:
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <yokan/database.h>
#include <yokan/client.h>
#include <margo.h>
#include <abt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Arguments handed to the polling consumer_thread() through its void* parameter. */
struct consumer_args {
yk_database_handle_t db; /* database handle the consumer calls yk_get() on */
const char* key; /* key the consumer polls for */
size_t ksize; /* size of the key passed to yk_get(); main sets it to strlen(key) */
};
static void consumer_thread(void* arg)
{
struct consumer_args* args = (struct consumer_args*)arg;
printf("[Consumer] Polling for key '%s'...\n", args->key);
char buffer[256];
size_t vsize;
int poll_count = 0;
/* Keep polling until key exists */
while(1) {
poll_count++;
vsize = sizeof(buffer);
/* Try to get the key (no WAIT mode) */
yk_return_t ret = yk_get(args->db, YOKAN_MODE_DEFAULT,
args->key, args->ksize,
buffer, &vsize);
if(ret == YOKAN_SUCCESS) {
/* Success! Key found */
break;
} else {
/* Key doesn't exist yet, sleep and retry */
usleep(50000); /* 50ms */
}
}
printf("[Consumer] Received after %d poll attempts: %.*s\n",
poll_count, (int)vsize, buffer);
}
/*
 * Polling example (coordination without wait/notify).
 *
 * Usage: <program> <server_addr> <provider_id>
 *
 * Starts a consumer thread that polls for a key, puts the key with
 * YOKAN_MODE_DEFAULT, joins the consumer and erases the key.
 *
 * Returns 0 when every step up to and including the put succeeded,
 * 1 otherwise.
 */
int main(int argc, char** argv) {
    if(argc != 3) {
        fprintf(stderr, "Usage: %s <server_addr> <provider_id>\n", argv[0]);
        return 1;
    }
    const char* server_addr_str = argv[1];
    uint16_t provider_id = (uint16_t)atoi(argv[2]);

    /* Initialize Margo */
    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        fprintf(stderr, "Failed to initialize Margo\n");
        return 1;
    }

    /* Pessimistic default: a failed address lookup does not modify `ret`,
     * so the exit status is tracked separately and only cleared once the
     * put has succeeded. */
    int exit_code = 1;
    yk_return_t ret;
    yk_client_t client;
    yk_database_handle_t db;
    hg_addr_t server_addr = HG_ADDR_NULL;
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread consumer;

    /* Initialize Yokan client */
    ret = yk_client_init(mid, &client);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to initialize Yokan client\n");
        margo_finalize(mid);
        return 1;
    }

    /* Look up address */
    hg_return_t hret = margo_addr_lookup(mid, server_addr_str, &server_addr);
    if(hret != HG_SUCCESS) {
        fprintf(stderr, "Failed to lookup server address\n");
        goto cleanup;
    }

    /* Create database handle */
    ret = yk_database_handle_create(client, server_addr, provider_id, 1, &db);
    if(ret != YOKAN_SUCCESS) {
        fprintf(stderr, "Failed to create database handle\n");
        goto cleanup;
    }

    /* Get default execution stream and pool */
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    const char* key = "polling_key";
    const char* value = "polling_value";

    /* Launch consumer thread that polls for key */
    struct consumer_args args = { db, key, strlen(key) };
    ABT_thread_create(pool, consumer_thread, &args,
                      ABT_THREAD_ATTR_NULL, &consumer);

    /* Give consumer time to start polling.
     * NOTE(review): usleep() does not yield to Argobots; the consumer
     * shares this execution stream's main pool, so it presumably does
     * not run during this sleep. margo_thread_sleep() may be what is
     * intended here; confirm. */
    usleep(100000); /* 100ms */

    /* Producer: put value (no notify needed) */
    printf("[Producer] Putting value...\n");
    ret = yk_put(db, YOKAN_MODE_DEFAULT,
                 key, strlen(key),
                 value, strlen(value));
    if(ret == YOKAN_SUCCESS) {
        printf("[Producer] Value sent (consumer will find it eventually)\n");
        exit_code = 0;
    } else {
        /* NOTE(review): after a failed put the key never appears, so the
         * polling consumer may not finish and the join below may not
         * return; confirm desired handling. */
        fprintf(stderr, "[Producer] Failed to put value\n");
    }

    /* Wait for consumer thread to finish */
    ABT_thread_free(&consumer);
    printf("\n=== Polling approach completed ===\n");
    printf("Note: Multiple network round-trips wasted on polling\n");
    printf("Compare this to wait/notify for better efficiency\n");

    /* Cleanup */
    yk_erase(db, YOKAN_MODE_DEFAULT, key, strlen(key));
    yk_database_handle_release(db);
cleanup:
    if(server_addr != HG_ADDR_NULL) margo_addr_free(mid, server_addr);
    yk_client_finalize(client);
    margo_finalize(mid);
    return exit_code;
}
The polling approach:
Wastes CPU and network resources
Has unpredictable latency based on poll interval
Doesn’t scale well with many waiters
Can miss rapid updates
The wait/notify approach is more efficient and responsive.
Timeouts
There is currently no way to time out a waiting client; a waiter stays blocked until a writer puts the key with YOKAN_MODE_NOTIFY.