Working in terms of providers

In this tutorial, we will learn about providers. This is probably the most important tutorial on Margo since it also describe the design patterns and methodology to use to develop a Margo-based service.

We will take again the sum example developped in earlier tutorials, but this time we will give it a proper microservice interface. The result will be composed of two libraries: one for clients, one for servers, as well as their respective headers. We will call this microservice Alpha.

Terminology

Although C is not an object-oriented programming language, the best way to understand providers is to think of them an object that can receive RPCs. Rather than targetting a server, as we did in previous tutorials, a client’s RPC will target a specific provider in this server.

Multiple providers belonguing to the same service and living at the same address will expose the same set of RPCs, but each provider will be distinguished from others using its unique provider id (an uint16_t).

Clients will now use provider handles rather than addresses to communicate with a particular provider at a given address. A provider handle encapsulates the address of the server as well as the provider id of the provider inside this server.

Input and output structures

Our alpha service will use the same input and output structures as in earlier tutorials. We put it here for completeness.

types.h (show/hide)

#ifndef PARAM_H
#define PARAM_H

#include <mercury.h>
#include <mercury_macros.h>

MERCURY_GEN_PROC(sum_in_t,
        ((int32_t)(x))\
        ((int32_t)(y)))

MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))

#endif

Alpha client interface

First let’s start with a header that will be common to the server and the client. This header will simply define the ALPHA_SUCCESS and ALPHA_FAILURE return codes.

alpha-common.h (show/hide)

#ifndef __ALPHA_COMMON_H
#define __ALPHA_COMMON_H

#define ALPHA_SUCCESS  0
#define ALPHA_FAILURE -1

#endif

Let’s now declare our client interface.

alpha-client.h (show/hide)

#ifndef __ALPHA_CLIENT_H
#define __ALPHA_CLIENT_H

#include <margo.h>
#include <alpha-common.h>

#if defined(__cplusplus)
extern "C" {
#endif

typedef struct alpha_client* alpha_client_t;
#define ALPHA_CLIENT_NULL ((alpha_client_t)NULL)

typedef struct alpha_provider_handle *alpha_provider_handle_t;
#define ALPHA_PROVIDER_HANDLE_NULL ((alpha_provider_handle_t)NULL)

/**
 * @brief Creates a ALPHA client.
 *
 * @param[in] mid Margo instance
 * @param[out] client ALPHA client
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_client_init(margo_instance_id mid, alpha_client_t* client);

/**
 * @brief Finalizes a ALPHA client.
 *
 * @param[in] client ALPHA client to finalize
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_client_finalize(alpha_client_t client);

/**
 * @brief Creates a ALPHA provider handle.
 *
 * @param[in] client ALPHA client responsible for the provider handle
 * @param[in] addr Mercury address of the provider
 * @param[in] provider_id id of the provider
 * @param[in] handle provider handle
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_provider_handle_create(
        alpha_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        alpha_provider_handle_t* handle);

/**
 * @brief Increments the reference counter of a provider handle.
 *
 * @param handle provider handle
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_provider_handle_ref_incr(
        alpha_provider_handle_t handle);

/**
 * @brief Releases the provider handle. This will decrement the
 * reference counter, and free the provider handle if the reference
 * counter reaches 0.
 *
 * @param[in] handle provider handle to release.
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_provider_handle_release(alpha_provider_handle_t handle);

/**
 * @brief Makes the target ALPHA provider compute the sum of the
 * two numbers and return the result.
 *
 * @param[in] handle provide handle.
 * @param[in] x first number.
 * @param[in] y second number.
 * @param[out] result resulting value.
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_compute_sum(
        alpha_provider_handle_t handle,
        int32_t x,
        int32_t y,
        int32_t* result);

#endif

The client interface defines two opaque pointers to structures.

  • The alpha_client_t handle will be pointing to an object keeping track of registered RPC identifiers for our microservice. An object of this type will be created using alpha_client_init and destroyed using alpha_client_finalize.

  • The alpha_provider_handle_t handle will be pointing to a provider handle for a provider of the Alpha service. An object of this type will be created using alpha_provider_handle_create function and destroyed using alpha_provider_handle_release. This object will have an internal reference count. alpha_provider_handle_ref_incr will be used to manually increment this reference count.

The alpha_compute_sum function will be in charge of sending a sum RPC to the provider designated by the provider handle.

Client implementation

The following code shows the implementation of our client interface.

alpha-client.c (show/hide)

#include "types.h"
#include "alpha-client.h"
#include <stdlib.h>

struct alpha_client {
   margo_instance_id mid;
   hg_id_t           sum_id;
   uint64_t          num_prov_hdl;
};

struct alpha_provider_handle {
    alpha_client_t client;
    hg_addr_t      addr;
    uint16_t       provider_id;
    uint64_t       refcount;
};

int alpha_client_init(margo_instance_id mid, alpha_client_t* client)
{
    int ret = ALPHA_SUCCESS;

    alpha_client_t c = (alpha_client_t)calloc(1, sizeof(*c));
    if(!c) return ALPHA_FAILURE;

    c->mid = mid;

    hg_bool_t flag;
    hg_id_t id;
    margo_registered_name(mid, "alpha_sum", &id, &flag);

    if(flag == HG_TRUE) {
        margo_registered_name(mid, "alpha_sum", &c->sum_id, &flag);
    } else {
        c->sum_id = MARGO_REGISTER(mid, "alpha_sum", sum_in_t, sum_out_t, NULL);
    }

    *client = c;
    return ALPHA_SUCCESS;
}

int alpha_client_finalize(alpha_client_t client)
{
    if(client->num_prov_hdl != 0) {
        margo_warning(client->mid,
            "%d provider handles not released when alpha_client_finalize was called",
            client->num_prov_hdl);
    }
    free(client);
    return ALPHA_SUCCESS;
}

int alpha_provider_handle_create(
        alpha_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        alpha_provider_handle_t* handle)
{
    if(client == ALPHA_CLIENT_NULL)
        return ALPHA_FAILURE;

    alpha_provider_handle_t ph =
        (alpha_provider_handle_t)calloc(1, sizeof(*ph));

    if(!ph) return ALPHA_FAILURE;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(ph->addr));
    if(ret != HG_SUCCESS) {
        free(ph);
        return ALPHA_FAILURE;
    }

    ph->client      = client;
    ph->provider_id = provider_id;
    ph->refcount    = 1;

    client->num_prov_hdl += 1;

    *handle = ph;
    return ALPHA_SUCCESS;
}

int alpha_provider_handle_ref_incr(
        alpha_provider_handle_t handle)
{
    if(handle == ALPHA_PROVIDER_HANDLE_NULL)
        return ALPHA_FAILURE;
    handle->refcount += 1;
    return ALPHA_SUCCESS;
}

int alpha_provider_handle_release(alpha_provider_handle_t handle)
{
    if(handle == ALPHA_PROVIDER_HANDLE_NULL)
        return ALPHA_FAILURE;
    handle->refcount -= 1;
    if(handle->refcount == 0) {
        margo_addr_free(handle->client->mid, handle->addr);
        handle->client->num_prov_hdl -= 1;
        free(handle);
    }
    return ALPHA_SUCCESS;
}

int alpha_compute_sum(
        alpha_provider_handle_t handle,
        int32_t x,
        int32_t y,
        int32_t* result)
{
    hg_handle_t   h;
    sum_in_t     in;
    sum_out_t   out;
    hg_return_t ret;

    in.x = x;
    in.y = y;

    ret = margo_create(handle->client->mid, handle->addr, handle->client->sum_id, &h);
    if(ret != HG_SUCCESS)
        return ALPHA_FAILURE;

    ret = margo_provider_forward(handle->provider_id, h, &in);
    if(ret != HG_SUCCESS) {
        margo_destroy(h);
        return ALPHA_FAILURE;
    }

    ret = margo_get_output(h, &out);
    if(ret != HG_SUCCESS) {
        margo_destroy(h);
        return ALPHA_FAILURE;
    }

    *result = out.ret;

    margo_free_output(h, &out);
    margo_destroy(h);
    return ALPHA_SUCCESS;
}

When initializing the client, margo_registered_name is used to check whether the RPC has been defined already. If it has, we use this function to retrieve its id. Otherwise, we use the usual MARGO_REGISTER macro.

Notice the use of margo_provider_forward in alpha_compute_sum, which uses the provider id to send the RPC to a specific provider.

Alpha server interface

Moving on to the server’s side, the following code shows how to define the server’s interface.

alpha-server.h (show/hide)

#ifndef __ALPHA_SERVER_H
#define __ALPHA_SERVER_H

#include <margo.h>
#include <alpha-common.h>

#ifdef __cplusplus
extern "C" {
#endif

#define ALPHA_ABT_POOL_DEFAULT ABT_POOL_NULL

typedef struct alpha_provider* alpha_provider_t;
#define ALPHA_PROVIDER_NULL ((alpha_provider_t)NULL)
#define ALPHA_PROVIDER_IGNORE ((alpha_provider_t*)NULL)

/**
 * @brief Creates a new ALPHA provider. If ALPHA_PROVIDER_IGNORE
 * is passed as last argument, the provider will be automatically
 * destroyed when calling :code:`margo_finalize`.
 *
 * @param[in] mid Margo instance
 * @param[in] provider_id provider id
 * @param[in] pool Argobots pool
 * @param[out] provider provider handle
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_provider_register(
        margo_instance_id mid,
        uint16_t provider_id,
        ABT_pool pool,
        alpha_provider_t* provider);

/**
 * @brief Destroys the Alpha provider and deregisters its RPC.
 *
 * @param[in] provider Alpha provider
 *
 * @return ALPHA_SUCCESS or error code defined in alpha-common.h
 */
int alpha_provider_destroy(
        alpha_provider_t provider);

#ifdef __cplusplus
}
#endif

#endif

This interface contains the definition of an opaque pointer type, alpha_provider_t, which will be used to hide the implementation of our Alpha provider. Our interface contains the alpha_provider_register function, which creates an Alpha provider and registers its RPCs, and the alpha_provider_destroy function, which destroys it and deregisters the corresponding RPCs. The former also allows users to pass ALPHA_PROVIDER_IGNORE as last argument, when we don’t expect to do anything with the provider after registration.

This interface would also be the place where to put other functions that configure or modify the Alpha provider once created.

Note

The alpha_provider_register function also takes an Argobots pool as argument. We will discuss this in a following tutorial.

Server implementation

The following code shows the implementation of the interface we just defined.

alpha-server.c (show/hide)

#include "alpha-server.h"
#include "types.h"
#include <stdlib.h>

struct alpha_provider {
    margo_instance_id mid;
    hg_id_t sum_id;
    /* other provider-specific data */
};

static void alpha_finalize_provider(void* p);

DECLARE_MARGO_RPC_HANDLER(alpha_sum_ult);
static void alpha_sum_ult(hg_handle_t h);
/* add other RPC declarations here */

int alpha_provider_register(
        margo_instance_id mid,
        uint16_t provider_id,
        ABT_pool pool,
        alpha_provider_t* provider)
{
    alpha_provider_t p;
    hg_id_t id;
    hg_bool_t flag;

    flag = margo_is_listening(mid);
    if(flag == HG_FALSE) {
        margo_error(mid, "alpha_provider_register(): margo instance is not a server");
        return ALPHA_FAILURE;
    }

    margo_provider_registered_name(mid, "alpha_sum", provider_id, &id, &flag);
    if(flag == HG_TRUE) {
        margo_error(mid, "alpha_provider_register(): a provider with the same provider id (%d) already exists", provider_id);
        return ALPHA_FAILURE;
    }

    p = (alpha_provider_t)calloc(1, sizeof(*p));
    if(p == NULL) {
        margo_error(mid, "alpha_provider_register(): failed to allocate memory for provider");
        return ALPHA_FAILURE;
    }

    p->mid = mid;

    id = MARGO_REGISTER_PROVIDER(mid, "alpha_sum",
            sum_in_t, sum_out_t,
            alpha_sum_ult, provider_id, pool);
    margo_register_data(mid, id, (void*)p, NULL);
    p->sum_id = id;
    /* add other RPC registration here */

    margo_provider_push_finalize_callback(mid, p, &alpha_finalize_provider, p);

    if(provider)
        *provider = p;
    return ALPHA_SUCCESS;
}

static void alpha_finalize_provider(void* p)
{
    alpha_provider_t provider = (alpha_provider_t)p;
    margo_deregister(provider->mid, provider->sum_id);
    /* deregister other RPC ids ... */
    free(provider);
}

int alpha_provider_destroy(
        alpha_provider_t provider)
{
    /* pop the finalize callback */
    margo_provider_pop_finalize_callback(provider->mid, provider);
    /* call the callback */
    alpha_finalize_provider(provider);

    return ALPHA_SUCCESS;
}


static void alpha_sum_ult(hg_handle_t h)
{
    hg_return_t ret;
    sum_in_t     in;
    sum_out_t   out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    const struct hg_info* info = margo_get_info(h);
    alpha_provider_t provider = (alpha_provider_t)margo_registered_data(mid, info->id);

    ret = margo_get_input(h, &in);

    out.ret = in.x + in.y;
    margo_trace(mid, "Computed %d + %d = %d", in.x, in.y, out.ret);

    ret = margo_respond(h, &out);
    ret = margo_free_input(h, &in);
    margo_destroy(h);
}
DEFINE_MARGO_RPC_HANDLER(alpha_sum_ult)

We start by defining the alpha_provider structure. It may contain the RPC ids as well as any data you may need as context for your RPCs.

The alpha_provider_register function starts by checking that the Margo instance is in server mode by using margo_is_listening. It then checks that there isn’t already an alpha provider with the same id. It does so by using margo_provider_registered_name to check whether the sum RPC has already been registered with the same provider id.

We then use MARGO_REGISTER_PROVIDER instead of MARGO_REGISTER. This macro takes a provider id and an Argobots pool in addition to the parameters of MARGO_REGISTER.

Finally, we call margo_provider_push_finalize_callback to setup a callback that Margo should call when calling margo_finalize. This callback will deregister the RPCs and free the provider.

The alpha_provider_destroy function is pretty simple but important to understand. In most cases the user will create a provider and leave it running until something calls margo_finalize, at which point the provider’s finalization callback will be called. If the user wants to destroy the provider before Margo is finalized, it is important to tell Margo not to call the provider’s finalization callback when margo_finalize. Hence, we use margo_provider_pop_finalize_callback. This function takes a Margo instance, and an owner for the callback (here the provider). If the provider registered multiple callbacks using margo_provider_push_finalize_callback, margo_provider_pop_finalize_callback will pop the last one pushed, and should therefore be called as many time as needed to pop all the finalization callbacks corresponding to the provider.

Warning

Finalization callbacks are called after the Mercury progress loop is terminated. Hence, you cannot send RPCs from them. If you need a finalization callback to be called before the progress loop is terminated, use margo_push_prefinalize_callback or margo_provider_push_prefinalize_callback.

Using the Alpha client

The previous codes can be compiled into two libraries, libalpha-client.{a,so} and libalpha-server.{a,so}. The former will be used by client codes to use the Alpha microservice as follows.

client.c (show/hide)

#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include <alpha-client.h>

int main(int argc, char** argv)
{
    if(argc != 3) {
        fprintf(stderr,"Usage: %s <server address> <provider id>\n", argv[0]);
        exit(0);
    }

    const char* svr_addr_str = argv[1];
    uint16_t    provider_id  = atoi(argv[2]);

    margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_addr_t svr_addr;
    margo_addr_lookup(mid, svr_addr_str, &svr_addr);

    alpha_client_t alpha_clt;
    alpha_provider_handle_t alpha_ph;

    alpha_client_init(mid, &alpha_clt);

    alpha_provider_handle_create(alpha_clt, svr_addr, provider_id, &alpha_ph);

    int32_t result;
    alpha_compute_sum(alpha_ph, 45, 23, &result);

    alpha_provider_handle_release(alpha_ph);

    alpha_client_finalize(alpha_clt);

    margo_addr_free(mid, svr_addr);

    margo_finalize(mid);

    return 0;
}

Notice how simple such an interface is for end users.

Using the Alpha server

A server can be written that spins up an Alpha providervas follows.

server.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <alpha-server.h>

int main(int argc, char** argv)
{
    margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
    assert(mid);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_addr_t my_address;
    margo_addr_self(mid, &my_address);
    char addr_str[128];
    size_t addr_str_size = 128;
    margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
    margo_addr_free(mid,my_address);
    margo_info(mid, "Server running at address %s, with provider id 42", addr_str);

    alpha_provider_register(mid, 42, ALPHA_ABT_POOL_DEFAULT, ALPHA_PROVIDER_IGNORE);

    margo_wait_for_finalize(mid);

    return 0;
}

A typical Mochi service will consist of a composition of multiple providers spin up in the same program.

Tip

To avoid conflicts with other microservices, it is recommended to prefix the name of the RPCs with the name of the service, as we did here with “alpha_sum”.

Note

Providers declaring RPCs with distinct names (i.e. providers from distinct microservices) can have the same provider ids. The provider id is here to distinguish providers of the same type within a given server.

Timeout

The margo_provider_forward_timed and margo_provider_iforward_timed can be used when sending RPCs (in a blocking or non-blocking manner) to specify a timeout in milliseconds after which the call (or result of margo_wait) will be HG_TIMEOUT.