Working in terms of providers
In this tutorial, we will learn about providers. This is probably the most important tutorial on Margo, since it also describes the design patterns and methodology to use when developing a Margo-based service.
We will revisit the sum example developed in earlier tutorials, but this time we will give it a proper microservice interface. The result will be composed of two libraries, one for clients and one for servers, as well as their respective headers. We will call this microservice Alpha.
Terminology
Although C is not an object-oriented programming language, the best way to understand providers is to think of them as objects that can receive RPCs. Rather than targeting a server, as we did in previous tutorials, a client’s RPC will target a specific provider in this server.
Multiple providers belonging to the same service and living at the same address will expose the same set of RPCs, but each provider will be distinguished from the others by its unique provider id (a uint16_t).
Clients will now use provider handles rather than addresses to communicate with a particular provider at a given address. A provider handle encapsulates the address of the server as well as the provider id of the provider inside this server.
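The relationship between an address, a provider id, and a provider handle can be sketched with a toy model in plain C. This sketch does not use Margo at all: toy_provider, toy_server, toy_provider_handle, and toy_sum are hypothetical names introduced only for illustration, and the "address" is simply a pointer to an in-memory table.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for a provider: an id plus provider-specific state. */
typedef struct {
    uint16_t provider_id;
    uint32_t num_calls;   /* hypothetical per-provider state */
} toy_provider;

/* Toy stand-in for a server process hosting several providers. */
typedef struct {
    toy_provider* providers;
    size_t        count;
} toy_server;

/* Toy stand-in for a provider handle: a server "address" plus a provider id. */
typedef struct {
    toy_server* addr;
    uint16_t    provider_id;
} toy_provider_handle;

/* Routes a "sum" request to the provider designated by the handle.
 * Returns 0 on success, -1 if no provider at that address has this id. */
int toy_sum(toy_provider_handle ph, int32_t x, int32_t y, int32_t* result)
{
    assert(ph.addr != NULL && result != NULL);
    for (size_t i = 0; i < ph.addr->count; i++) {
        toy_provider* p = &ph.addr->providers[i];
        if (p->provider_id == ph.provider_id) {
            p->num_calls += 1;   /* only the targeted provider's state changes */
            *result = x + y;
            return 0;
        }
    }
    return -1;
}
```

Two handles built from the same toy_server but with different provider ids reach different toy_provider entries, which is the property the real provider handles are meant to give.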
Input and output structures
Our Alpha service will use the same input and output structures as in earlier tutorials. We repeat them here for completeness.
types.h
#ifndef PARAM_H
#define PARAM_H
#include <mercury.h>
#include <mercury_macros.h>
MERCURY_GEN_PROC(sum_in_t,
((int32_t)(x))\
((int32_t)(y)))
MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))
#endif
Alpha client interface
First, let’s start with a header that will be common to the server and the client. This header will simply define the ALPHA_SUCCESS and ALPHA_FAILURE return codes.
alpha-common.h
#ifndef __ALPHA_COMMON_H
#define __ALPHA_COMMON_H
#define ALPHA_SUCCESS 0
#define ALPHA_FAILURE (-1)
#endif
Let’s now declare our client interface.
alpha-client.h
#ifndef __ALPHA_CLIENT_H
#define __ALPHA_CLIENT_H
#include <margo.h>
#include <alpha-common.h>
#if defined(__cplusplus)
extern "C" {
#endif
typedef struct alpha_client* alpha_client_t;
#define ALPHA_CLIENT_NULL ((alpha_client_t)NULL)
typedef struct alpha_provider_handle *alpha_provider_handle_t;
#define ALPHA_PROVIDER_HANDLE_NULL ((alpha_provider_handle_t)NULL)
/**
* @brief Creates an ALPHA client.
*
* @param[in] mid Margo instance
* @param[out] client ALPHA client
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_client_init(margo_instance_id mid, alpha_client_t* client);
/**
* @brief Finalizes an ALPHA client.
*
* @param[in] client ALPHA client to finalize
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_client_finalize(alpha_client_t client);
/**
* @brief Creates an ALPHA provider handle.
*
* @param[in] client ALPHA client responsible for the provider handle
* @param[in] addr Mercury address of the provider
* @param[in] provider_id id of the provider
* @param[out] handle provider handle
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_provider_handle_create(
alpha_client_t client,
hg_addr_t addr,
uint16_t provider_id,
alpha_provider_handle_t* handle);
/**
* @brief Increments the reference counter of a provider handle.
*
* @param handle provider handle
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_provider_handle_ref_incr(
alpha_provider_handle_t handle);
/**
* @brief Releases the provider handle. This will decrement the
* reference counter, and free the provider handle if the reference
* counter reaches 0.
*
* @param[in] handle provider handle to release.
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_provider_handle_release(alpha_provider_handle_t handle);
/**
* @brief Makes the target ALPHA provider compute the sum of the
* two numbers and return the result.
*
* @param[in] handle provider handle.
* @param[in] x first number.
* @param[in] y second number.
* @param[out] result resulting value.
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_compute_sum(
alpha_provider_handle_t handle,
int32_t x,
int32_t y,
int32_t* result);
#if defined(__cplusplus)
}
#endif
#endif
The client interface defines two opaque pointers to structures.
The alpha_client_t handle points to an object keeping track of registered RPC identifiers for our microservice. An object of this type is created using alpha_client_init and destroyed using alpha_client_finalize.
The alpha_provider_handle_t handle points to a provider handle for a provider of the Alpha service. An object of this type is created using alpha_provider_handle_create and destroyed using alpha_provider_handle_release. This object has an internal reference count; alpha_provider_handle_ref_incr can be used to manually increment it.
The alpha_compute_sum function is in charge of sending a sum RPC to the provider designated by the provider handle.
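The reference-counting contract described above (create sets the count to 1, ref_incr adds one, release subtracts one and frees at zero) can be sketched in isolation. This is a minimal toy in plain C, not the Alpha implementation: toy_handle and its functions are hypothetical names, and the freed_flag field exists only so the effect can be observed from outside.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Toy handle: only the reference count matters here. */
typedef struct toy_handle {
    uint64_t refcount;
    int*     freed_flag;   /* set to 1 when the handle is freed (illustration only) */
} toy_handle;

/* Creates a handle with a reference count of 1. Returns NULL on allocation failure. */
toy_handle* toy_handle_create(int* freed_flag)
{
    assert(freed_flag != NULL);
    toy_handle* h = calloc(1, sizeof(*h));
    if (!h) return NULL;
    h->refcount   = 1;
    h->freed_flag = freed_flag;
    *freed_flag   = 0;
    return h;
}

/* Takes an additional reference on the handle. */
void toy_handle_ref_incr(toy_handle* h)
{
    assert(h != NULL);
    h->refcount += 1;
}

/* Drops one reference; frees the handle when the count reaches 0. */
void toy_handle_release(toy_handle* h)
{
    assert(h != NULL && h->refcount > 0);
    h->refcount -= 1;
    if (h->refcount == 0) {
        *h->freed_flag = 1;
        free(h);
    }
}
```

A component that stores a copy of the handle would call the ref_incr function when it takes the copy and the release function when it is done, so the handle stays valid until its last user lets go of it.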
Client implementation
The following code shows the implementation of our client interface.
alpha-client.c
#include "types.h"
#include "alpha-client.h"
#include <stdlib.h>
struct alpha_client {
margo_instance_id mid;
hg_id_t sum_id;
uint64_t num_prov_hdl;
};
struct alpha_provider_handle {
alpha_client_t client;
hg_addr_t addr;
uint16_t provider_id;
uint64_t refcount;
};
int alpha_client_init(margo_instance_id mid, alpha_client_t* client)
{
int ret = ALPHA_SUCCESS;
alpha_client_t c = (alpha_client_t)calloc(1, sizeof(*c));
if(!c) return ALPHA_FAILURE;
c->mid = mid;
hg_bool_t flag;
hg_id_t id;
margo_registered_name(mid, "alpha_sum", &id, &flag);
if(flag == HG_TRUE) {
margo_registered_name(mid, "alpha_sum", &c->sum_id, &flag);
} else {
c->sum_id = MARGO_REGISTER(mid, "alpha_sum", sum_in_t, sum_out_t, NULL);
}
*client = c;
return ALPHA_SUCCESS;
}
int alpha_client_finalize(alpha_client_t client)
{
if(client->num_prov_hdl != 0) {
margo_warning(client->mid,
"%lu provider handles not released when alpha_client_finalize was called",
(unsigned long)client->num_prov_hdl);
}
free(client);
return ALPHA_SUCCESS;
}
int alpha_provider_handle_create(
alpha_client_t client,
hg_addr_t addr,
uint16_t provider_id,
alpha_provider_handle_t* handle)
{
if(client == ALPHA_CLIENT_NULL)
return ALPHA_FAILURE;
alpha_provider_handle_t ph =
(alpha_provider_handle_t)calloc(1, sizeof(*ph));
if(!ph) return ALPHA_FAILURE;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(ph->addr));
if(ret != HG_SUCCESS) {
free(ph);
return ALPHA_FAILURE;
}
ph->client = client;
ph->provider_id = provider_id;
ph->refcount = 1;
client->num_prov_hdl += 1;
*handle = ph;
return ALPHA_SUCCESS;
}
int alpha_provider_handle_ref_incr(
alpha_provider_handle_t handle)
{
if(handle == ALPHA_PROVIDER_HANDLE_NULL)
return ALPHA_FAILURE;
handle->refcount += 1;
return ALPHA_SUCCESS;
}
int alpha_provider_handle_release(alpha_provider_handle_t handle)
{
if(handle == ALPHA_PROVIDER_HANDLE_NULL)
return ALPHA_FAILURE;
handle->refcount -= 1;
if(handle->refcount == 0) {
margo_addr_free(handle->client->mid, handle->addr);
handle->client->num_prov_hdl -= 1;
free(handle);
}
return ALPHA_SUCCESS;
}
int alpha_compute_sum(
alpha_provider_handle_t handle,
int32_t x,
int32_t y,
int32_t* result)
{
hg_handle_t h;
sum_in_t in;
sum_out_t out;
hg_return_t ret;
in.x = x;
in.y = y;
ret = margo_create(handle->client->mid, handle->addr, handle->client->sum_id, &h);
if(ret != HG_SUCCESS)
return ALPHA_FAILURE;
ret = margo_provider_forward(handle->provider_id, h, &in);
if(ret != HG_SUCCESS) {
margo_destroy(h);
return ALPHA_FAILURE;
}
ret = margo_get_output(h, &out);
if(ret != HG_SUCCESS) {
margo_destroy(h);
return ALPHA_FAILURE;
}
*result = out.ret;
margo_free_output(h, &out);
margo_destroy(h);
return ALPHA_SUCCESS;
}
When initializing the client, margo_registered_name is used to check whether the RPC has already been registered. If it has, we use this function to retrieve its id. Otherwise, we use the usual MARGO_REGISTER macro.
Notice the use of margo_provider_forward in alpha_compute_sum, which uses the provider id to send the RPC to a specific provider.
Alpha server interface
Moving on to the server’s side, the following code shows how to define the server’s interface.
alpha-server.h
#ifndef __ALPHA_SERVER_H
#define __ALPHA_SERVER_H
#include <margo.h>
#include <alpha-common.h>
#ifdef __cplusplus
extern "C" {
#endif
#define ALPHA_ABT_POOL_DEFAULT ABT_POOL_NULL
typedef struct alpha_provider* alpha_provider_t;
#define ALPHA_PROVIDER_NULL ((alpha_provider_t)NULL)
#define ALPHA_PROVIDER_IGNORE ((alpha_provider_t*)NULL)
/**
* @brief Creates a new ALPHA provider. If ALPHA_PROVIDER_IGNORE
* is passed as last argument, the provider will be automatically
* destroyed when calling margo_finalize.
*
* @param[in] mid Margo instance
* @param[in] provider_id provider id
* @param[in] pool Argobots pool
* @param[out] provider provider handle
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_provider_register(
margo_instance_id mid,
uint16_t provider_id,
ABT_pool pool,
alpha_provider_t* provider);
/**
* @brief Destroys the Alpha provider and deregisters its RPC.
*
* @param[in] provider Alpha provider
*
* @return ALPHA_SUCCESS or error code defined in alpha-common.h
*/
int alpha_provider_destroy(
alpha_provider_t provider);
#ifdef __cplusplus
}
#endif
#endif
This interface contains the definition of an opaque pointer type, alpha_provider_t, which will be used to hide the implementation of our Alpha provider.
Our interface contains the alpha_provider_register function, which creates an Alpha provider and registers its RPCs, and the alpha_provider_destroy function, which destroys it and deregisters the corresponding RPCs. The former also allows users to pass ALPHA_PROVIDER_IGNORE as last argument, when we don’t expect to do anything with the provider after registration.
This interface is also the place to put other functions that configure or modify the Alpha provider once created.
Note
The alpha_provider_register function also takes an Argobots pool as argument. We will discuss this in a following tutorial.
Server implementation
The following code shows the implementation of the interface we just defined.
alpha-server.c
#include "alpha-server.h"
#include "types.h"
#include <stdlib.h>
struct alpha_provider {
margo_instance_id mid;
hg_id_t sum_id;
/* other provider-specific data */
};
static void alpha_finalize_provider(void* p);
DECLARE_MARGO_RPC_HANDLER(alpha_sum_ult);
static void alpha_sum_ult(hg_handle_t h);
/* add other RPC declarations here */
int alpha_provider_register(
margo_instance_id mid,
uint16_t provider_id,
ABT_pool pool,
alpha_provider_t* provider)
{
alpha_provider_t p;
hg_id_t id;
hg_bool_t flag;
flag = margo_is_listening(mid);
if(flag == HG_FALSE) {
margo_error(mid, "alpha_provider_register(): margo instance is not a server");
return ALPHA_FAILURE;
}
margo_provider_registered_name(mid, "alpha_sum", provider_id, &id, &flag);
if(flag == HG_TRUE) {
margo_error(mid, "alpha_provider_register(): a provider with the same provider id (%d) already exists", provider_id);
return ALPHA_FAILURE;
}
p = (alpha_provider_t)calloc(1, sizeof(*p));
if(p == NULL) {
margo_error(mid, "alpha_provider_register(): failed to allocate memory for provider");
return ALPHA_FAILURE;
}
p->mid = mid;
id = MARGO_REGISTER_PROVIDER(mid, "alpha_sum",
sum_in_t, sum_out_t,
alpha_sum_ult, provider_id, pool);
margo_register_data(mid, id, (void*)p, NULL);
p->sum_id = id;
/* add other RPC registration here */
margo_provider_push_finalize_callback(mid, p, &alpha_finalize_provider, p);
if(provider)
*provider = p;
return ALPHA_SUCCESS;
}
static void alpha_finalize_provider(void* p)
{
alpha_provider_t provider = (alpha_provider_t)p;
margo_deregister(provider->mid, provider->sum_id);
/* deregister other RPC ids ... */
free(provider);
}
int alpha_provider_destroy(
alpha_provider_t provider)
{
/* pop the finalize callback */
margo_provider_pop_finalize_callback(provider->mid, provider);
/* call the callback */
alpha_finalize_provider(provider);
return ALPHA_SUCCESS;
}
static void alpha_sum_ult(hg_handle_t h)
{
hg_return_t ret;
sum_in_t in;
sum_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(h);
const struct hg_info* info = margo_get_info(h);
alpha_provider_t provider = (alpha_provider_t)margo_registered_data(mid, info->id);
ret = margo_get_input(h, &in);
out.ret = in.x + in.y;
margo_trace(mid, "Computed %d + %d = %d", in.x, in.y, out.ret);
ret = margo_respond(h, &out);
ret = margo_free_input(h, &in);
margo_destroy(h);
}
DEFINE_MARGO_RPC_HANDLER(alpha_sum_ult)
We start by defining the alpha_provider structure. It may contain the RPC ids as well as any data you may need as context for your RPCs.
The alpha_provider_register function starts by checking that the Margo instance is in server mode by using margo_is_listening. It then checks that there isn’t already an Alpha provider with the same id. It does so by using margo_provider_registered_name to check whether the sum RPC has already been registered with the same provider id.
We then use MARGO_REGISTER_PROVIDER instead of MARGO_REGISTER. This macro takes a provider id and an Argobots pool in addition to the parameters of MARGO_REGISTER.
Finally, we call margo_provider_push_finalize_callback to set up a callback that Margo should call when margo_finalize is called. This callback will deregister the RPCs and free the provider.
The alpha_provider_destroy function is pretty simple but important to understand. In most cases the user will create a provider and leave it running until something calls margo_finalize, at which point the provider’s finalization callback will be called. If the user wants to destroy the provider before Margo is finalized, it is important to tell Margo not to call the provider’s finalization callback when margo_finalize is called. Hence, we use margo_provider_pop_finalize_callback. This function takes a Margo instance and an owner for the callback (here the provider).
If the provider registered multiple callbacks using margo_provider_push_finalize_callback, margo_provider_pop_finalize_callback will pop the last one pushed, and should therefore be called as many times as needed to pop all the finalization callbacks corresponding to the provider.
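The push/pop behaviour described above can be illustrated with a small toy model of an owner-tagged callback stack. This is a sketch in plain C, not Margo's implementation: all toy_ names are hypothetical, the fixed capacity is arbitrary, and running the remaining callbacks last-in first-out at finalization is an assumption of this sketch.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_CALLBACKS 16

typedef void (*toy_callback)(void*);

/* One registered callback, tagged with the object that owns it. */
typedef struct {
    const void*  owner;
    toy_callback cb;
    void*        arg;
} toy_entry;

/* Toy stand-in for the per-instance stack of finalization callbacks. */
typedef struct {
    toy_entry entries[TOY_MAX_CALLBACKS];
    size_t    count;
} toy_instance;

/* Pushes a callback tagged with its owner. Returns 0 on success, -1 if full. */
int toy_push_finalize_callback(toy_instance* inst, const void* owner,
                               toy_callback cb, void* arg)
{
    assert(inst != NULL && cb != NULL);
    if (inst->count == TOY_MAX_CALLBACKS) return -1;
    inst->entries[inst->count++] = (toy_entry){ owner, cb, arg };
    return 0;
}

/* Removes the most recently pushed callback of this owner without calling it.
 * Returns 1 if one was removed, 0 if the owner had none left. */
int toy_pop_finalize_callback(toy_instance* inst, const void* owner)
{
    assert(inst != NULL);
    for (size_t i = inst->count; i > 0; i--) {
        if (inst->entries[i - 1].owner == owner) {
            /* close the gap left by the removed entry */
            for (size_t j = i; j < inst->count; j++)
                inst->entries[j - 1] = inst->entries[j];
            inst->count--;
            return 1;
        }
    }
    return 0;
}

/* Calls the remaining callbacks, last pushed first, then leaves the stack empty. */
void toy_finalize(toy_instance* inst)
{
    assert(inst != NULL);
    while (inst->count > 0) {
        toy_entry e = inst->entries[--inst->count];
        e.cb(e.arg);
    }
}

/* Example callback: increments the int counter it is given. */
void toy_count_call(void* arg) { *(int*)arg += 1; }
```

In this model, destroying a provider early corresponds to popping its callbacks until the pop function returns 0 and then running the cleanup by hand, so that finalization does not touch the already-freed provider a second time.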
Warning
Finalization callbacks are called after the Mercury progress loop is terminated. Hence, you cannot send RPCs from them. If you need a finalization callback to be called before the progress loop is terminated, use margo_push_prefinalize_callback or margo_provider_push_prefinalize_callback.
Using the Alpha client
The code above can be compiled into two libraries, libalpha-client.{a,so} and libalpha-server.{a,so}. The former will be used by client programs to access the Alpha microservice as follows.
client.c
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include <alpha-client.h>
int main(int argc, char** argv)
{
if(argc != 3) {
fprintf(stderr,"Usage: %s <server address> <provider id>\n", argv[0]);
exit(EXIT_FAILURE);
}
const char* svr_addr_str = argv[1];
uint16_t provider_id = atoi(argv[2]);
margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
margo_set_log_level(mid, MARGO_LOG_INFO);
hg_addr_t svr_addr;
margo_addr_lookup(mid, svr_addr_str, &svr_addr);
alpha_client_t alpha_clt;
alpha_provider_handle_t alpha_ph;
alpha_client_init(mid, &alpha_clt);
alpha_provider_handle_create(alpha_clt, svr_addr, provider_id, &alpha_ph);
int32_t result;
alpha_compute_sum(alpha_ph, 45, 23, &result);
alpha_provider_handle_release(alpha_ph);
alpha_client_finalize(alpha_clt);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return 0;
}
Notice how simple such an interface is for end users.
Using the Alpha server
A server that spins up an Alpha provider can be written as follows.
server.c
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <alpha-server.h>
int main(int argc, char** argv)
{
margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
assert(mid);
margo_set_log_level(mid, MARGO_LOG_INFO);
hg_addr_t my_address;
margo_addr_self(mid, &my_address);
char addr_str[128];
size_t addr_str_size = 128;
margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
margo_addr_free(mid,my_address);
margo_info(mid, "Server running at address %s, with provider id 42", addr_str);
alpha_provider_register(mid, 42, ALPHA_ABT_POOL_DEFAULT, ALPHA_PROVIDER_IGNORE);
margo_wait_for_finalize(mid);
return 0;
}
A typical Mochi service will consist of a composition of multiple providers spun up in the same program.
Tip
To avoid conflicts with other microservices, it is recommended to prefix the name of the RPCs with the name of the service, as we did here with “alpha_sum”.
Note
Providers declaring RPCs with distinct names (i.e. providers from distinct microservices) can have the same provider ids. The provider id is here to distinguish providers of the same type within a given server.
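One way to picture the tip and the note above is to treat each registration as a (RPC name, provider id) pair, with a conflict only when both components match. The following is a toy registry in plain C under that assumption; it is not how Margo stores registrations, and toy_registry, toy_is_registered, and toy_register are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_MAX_RPCS 16

/* A registration is identified by the RPC name and the provider id together. */
typedef struct {
    const char* name;
    uint16_t    provider_id;
} toy_rpc_key;

typedef struct {
    toy_rpc_key keys[TOY_MAX_RPCS];
    size_t      count;
} toy_registry;

/* Returns 1 if this (name, provider id) pair is already registered, 0 otherwise. */
int toy_is_registered(const toy_registry* reg, const char* name, uint16_t provider_id)
{
    assert(reg != NULL && name != NULL);
    for (size_t i = 0; i < reg->count; i++) {
        if (reg->keys[i].provider_id == provider_id
            && strcmp(reg->keys[i].name, name) == 0)
            return 1;
    }
    return 0;
}

/* Registers the pair. Returns 0 on success, -1 on conflict or if the table is full. */
int toy_register(toy_registry* reg, const char* name, uint16_t provider_id)
{
    if (toy_is_registered(reg, name, provider_id)) return -1;
    if (reg->count == TOY_MAX_RPCS) return -1;
    reg->keys[reg->count].name        = name;
    reg->keys[reg->count].provider_id = provider_id;
    reg->count++;
    return 0;
}
```

With this model, "alpha_sum" and a hypothetical "beta_sum" can both be registered with provider id 42, while a second "alpha_sum" with id 42 is rejected, which mirrors the duplicate check performed in alpha_provider_register.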
Timeout
The margo_provider_forward_timed and margo_provider_iforward_timed functions can be used when sending RPCs (in a blocking or non-blocking manner, respectively) to specify a timeout in milliseconds after which the call (or the result of margo_wait) will be HG_TIMEOUT.