Observing an SSG group
It may be useful for a process to be aware of an SSG group (i.e. be able to access the address of its members) without being itself a member of the group. We call such a process an observer.
Contrary to group members, which are continuously updated of membership changes, observers must periodically poll the group to refresh their view.
In the code samples bellow, Process 1 creates a group and stays alive
for 10 seconds. Process 2 reads the group id from the file created
by Process 1, and uses ssg_group_refresh
to request an up-to-date
view to a randomly-selected member.
proc1.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <ssg.h>
static void my_membership_update_cb(void* uargs,
ssg_member_id_t member_id,
ssg_member_update_type_t update_type)
{
switch(update_type) {
case SSG_MEMBER_JOINED:
printf("Member %ld joined\n", member_id);
break;
case SSG_MEMBER_LEFT:
printf("Member %ld left\n", member_id);
break;
case SSG_MEMBER_DIED:
printf("Member %ld died\n", member_id);
break;
}
}
int main(int argc, char** argv)
{
margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 1, 0);
assert(mid);
int ret = ssg_init();
assert(ret == SSG_SUCCESS);
hg_addr_t my_addr;
margo_addr_self(mid, &my_addr);
char my_addr_str[128];
size_t my_addr_str_size = 128;
margo_addr_to_string(mid, my_addr_str, &my_addr_str_size, my_addr);
margo_addr_free(mid, my_addr);
const char* group_addr_strs[] = { my_addr_str };
ssg_group_config_t config = {
.swim_period_length_ms = 1000,
.swim_suspect_timeout_periods = 5,
.swim_subgroup_member_count = -1,
.swim_disabled = 0,
.ssg_credential = -1
};
ssg_group_id_t gid;
ret = ssg_group_create(
mid, "mygroup", group_addr_strs, 1,
&config, my_membership_update_cb, NULL,
&gid);
ret = ssg_group_id_store("mygroup.ssg", gid, 1);
// ...
// do stuff using the group
// ...
margo_thread_sleep(mid, 10000);
ret = ssg_group_leave(gid);
assert(ret == SSG_SUCCESS);
ret = ssg_group_destroy(gid);
assert(ret == SSG_SUCCESS);
ret = ssg_finalize();
assert(ret == SSG_SUCCESS);
margo_finalize(mid);
return 0;
}
proc2.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <ssg.h>
static void my_membership_update_cb(void* uargs,
ssg_member_id_t member_id,
ssg_member_update_type_t update_type)
{
switch(update_type) {
case SSG_MEMBER_JOINED:
printf("Member %ld joined\n", member_id);
break;
case SSG_MEMBER_LEFT:
printf("Member %ld left\n", member_id);
break;
case SSG_MEMBER_DIED:
printf("Member %ld died\n", member_id);
break;
}
}
int main(int argc, char** argv)
{
margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 1, 0);
assert(mid);
int ret = ssg_init();
assert(ret == SSG_SUCCESS);
ssg_group_id_t gid;
int num_addrs = 1;
ret = ssg_group_id_load("mygroup.ssg", &num_addrs, &gid);
assert(ret == SSG_SUCCESS);
ret = ssg_group_refresh(mid, gid);
assert(ret == SSG_SUCCESS);
ret = ssg_group_destroy(gid);
assert(ret == SSG_SUCCESS);
ret = ssg_finalize();
assert(ret == SSG_SUCCESS);
margo_finalize(mid);
return 0;
}
Note
If all the group members have changed between two calls to ssg_group_refresh
,
the observer will obviously not be able to update its view. If this is a likely scenario,
we recommand that group members periodically store the group’s state into a file so
that observers can reload a proper group state ssg_group_refresh
fails.