Non-blocking RPC

Sometimes we want to send an RPC, carry on with other work, and check later whether the RPC has completed. This is done with margo_iforward, together with margo_wait and margo_test, as described in this tutorial.

Input and output structures

We will reuse the example of a sum RPC and make the RPC non-blocking. The header below is a reminder of what the input and output structures look like.

types.h

#ifndef PARAM_H
#define PARAM_H

#include <mercury.h>
#include <mercury_macros.h>

MERCURY_GEN_PROC(sum_in_t,
        ((int32_t)(x))\
        ((int32_t)(y)))

MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))

#endif

Non-blocking forward on client

The following code illustrates the use of a non-blocking RPC on the client.

client.c

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"

int main(int argc, char** argv)
{
    if(argc != 2) {
        fprintf(stderr,"Usage: %s <server address>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
    assert(mid);
    margo_set_log_level(mid, MARGO_LOG_DEBUG);
    hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL);

    hg_addr_t svr_addr;
    hg_return_t ret = margo_addr_lookup(mid, argv[1], &svr_addr);
    assert(ret == HG_SUCCESS);

    int i;
    sum_in_t args;
    for(i=0; i<4; i++) {
        args.x = 42+i*2;
        args.y = 42+i*2+1;

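        hg_handle_t h;
        margo_create(mid, svr_addr, sum_rpc_id, &h);

        /* Send the RPC without blocking; req tracks its completion. */
        margo_request req;
        margo_iforward(h, &args, &req);

        margo_debug(mid, "Waiting for reply...");

        /* ... other work could be done here ... */

        /* Block until the server's response has arrived. */
        margo_wait(req);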

        sum_out_t resp;
        margo_get_output(h, &resp);

        margo_debug(mid, "Got response: %d+%d = %d", args.x, args.y, resp.ret);

        margo_free_output(h,&resp);
        margo_destroy(h);
    }

    margo_addr_free(mid, svr_addr);

    margo_finalize(mid);

    return 0;
}

Instead of using margo_forward, we use margo_iforward. This function returns immediately after having sent the RPC to the server. It also takes an extra argument of type margo_request*. The client will use this request object to check the status of the RPC.

We then use margo_wait on the request to block until we have received a response from the server. Alternatively, margo_test can be used to check whether the server has sent a response, without blocking if it hasn’t.
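As a rough sketch of the polling alternative, the hypothetical helper poll_until_done below calls margo_test in a loop and does a unit of work between polls. It assumes that margo_test(req, &flag) returns 0 on success and sets flag to 1 once the request has completed. It still calls margo_wait at the end, on the assumption that margo_wait is what reports the final status and releases the request. The margo_thread_sleep call makes the loop yield between polls; with a client initialized as in client.c (no dedicated progress thread), a loop that never yields would likely keep the response from being processed. The block under #ifndef HAVE_MARGO contains stand-ins that are not part of Margo and exist only so that the sketch compiles without it.

```c
#include <assert.h>
#include <stddef.h>

#if defined(__has_include)
#  if __has_include(<margo.h>)
#    include <margo.h>
#    define HAVE_MARGO 1
#  endif
#endif

#ifndef HAVE_MARGO
/* Stand-ins, NOT part of Margo: they only let this sketch compile and run
 * without a Margo installation. The fake request reports completion after
 * `polls_left` unsuccessful tests. */
typedef enum { HG_SUCCESS = 0 } hg_return_t;
typedef void* margo_instance_id;
typedef struct fake_request { int polls_left; }* margo_request;

static int margo_test(margo_request req, int* flag)
{
    *flag = (req->polls_left <= 0);
    if (!*flag) req->polls_left--;
    return 0;
}
static hg_return_t margo_wait(margo_request req)
{
    (void)req;
    return HG_SUCCESS;
}
static void margo_thread_sleep(margo_instance_id mid, double timeout_ms)
{
    (void)mid;
    (void)timeout_ms;
}
#endif

/* Hypothetical helper: poll `req` every `poll_ms` milliseconds, calling
 * `work` (may be NULL) between polls. Stores the number of unsuccessful
 * polls in *polls and returns the status reported by margo_wait.
 * If margo_test itself fails, the loop stops and margo_wait blocks instead. */
static hg_return_t poll_until_done(margo_instance_id mid, margo_request req,
                                   double poll_ms, void (*work)(void),
                                   int* polls)
{
    int done = 0;
    *polls = 0;
    while (margo_test(req, &done) == 0 && !done) {
        if (work) work();                 /* useful work while the RPC is in flight */
        margo_thread_sleep(mid, poll_ms); /* yield so that progress can be made */
        (*polls)++;
    }
    return margo_wait(req);
}
```

In client.c, the call to margo_wait(req) could then be replaced with something like poll_until_done(mid, req, 10.0, NULL, &polls), where polls is an int declared by the caller.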

Note

It is safe to delete or modify the RPC’s input right after the call to margo_iforward, because margo_iforward returns only after it has serialized this input into its send buffer.
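One consequence, sketched below under stated assumptions, is that a client can reuse a single input structure to issue several RPCs back to back and only then wait for the responses. The helper sum_batch and the constant BATCH are hypothetical names introduced for this illustration. The block under #ifndef HAVE_MARGO contains stand-ins that are not part of Margo: they only let the sketch compile without it, and the fake margo_iforward plays the role of the server by computing the sum immediately.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#if defined(__has_include)
#  if __has_include(<margo.h>)
#    include <margo.h>
#    include "types.h" /* sum_in_t and sum_out_t from this tutorial */
#    define HAVE_MARGO 1
#  endif
#endif

#ifndef HAVE_MARGO
/* Stand-ins, NOT part of Margo: just enough to compile and run the sketch. */
typedef enum { HG_SUCCESS = 0, HG_NOMEM = 1 } hg_return_t;
typedef void* margo_instance_id;
typedef void* hg_addr_t;
typedef int   hg_id_t;
typedef struct { int32_t x, y; } sum_in_t;
typedef struct { int32_t ret; } sum_out_t;
typedef struct fake_handle { int32_t result; }* hg_handle_t;
typedef hg_handle_t margo_request;

static hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr,
                                hg_id_t id, hg_handle_t* h)
{
    (void)mid; (void)addr; (void)id;
    *h = malloc(sizeof(**h));
    return *h ? HG_SUCCESS : HG_NOMEM;
}
static hg_return_t margo_iforward(hg_handle_t h, void* in, margo_request* req)
{
    const sum_in_t* a = in;
    h->result = a->x + a->y; /* the fake "server" answers right away */
    *req = h;
    return HG_SUCCESS;
}
static hg_return_t margo_wait(margo_request req) { (void)req; return HG_SUCCESS; }
static hg_return_t margo_get_output(hg_handle_t h, void* out)
{
    ((sum_out_t*)out)->ret = h->result;
    return HG_SUCCESS;
}
static hg_return_t margo_free_output(hg_handle_t h, void* out)
{
    (void)h; (void)out;
    return HG_SUCCESS;
}
static hg_return_t margo_destroy(hg_handle_t h) { free(h); return HG_SUCCESS; }
#endif

#define BATCH 4 /* hypothetical number of RPCs kept in flight */

/* Hypothetical helper: send BATCH sum RPCs without waiting, then collect
 * the responses. Returns the number of RPCs that completed successfully. */
static int sum_batch(margo_instance_id mid, hg_addr_t svr_addr,
                     hg_id_t sum_rpc_id, int32_t results[BATCH])
{
    hg_handle_t   h[BATCH];
    margo_request req[BATCH];
    int           sent[BATCH] = {0};
    sum_in_t      args; /* one input struct, overwritten for every RPC */
    int           i, ok = 0;

    /* Phase 1: send all requests. */
    for (i = 0; i < BATCH; i++) {
        args.x = 42 + i * 2;
        args.y = 42 + i * 2 + 1;
        if (margo_create(mid, svr_addr, sum_rpc_id, &h[i]) != HG_SUCCESS)
            continue;
        if (margo_iforward(h[i], &args, &req[i]) == HG_SUCCESS)
            sent[i] = 1;
        else
            margo_destroy(h[i]);
    }

    /* Phase 2: wait for each response and read its output. */
    for (i = 0; i < BATCH; i++) {
        sum_out_t resp;
        if (!sent[i]) continue;
        if (margo_wait(req[i]) == HG_SUCCESS
            && margo_get_output(h[i], &resp) == HG_SUCCESS) {
            results[i] = resp.ret;
            margo_free_output(h[i], &resp);
            ok++;
        }
        margo_destroy(h[i]);
    }
    return ok;
}
```

Compared with the loop in client.c, which waits for each response before sending the next request, this arrangement lets the server work on several requests while the client is still sending.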

Non-blocking response on server

Although generally less useful than non-blocking forwards, non-blocking responses are also available on servers. The margo_irespond function can be used for this purpose. It returns as soon as the response has been posted to the Mercury queue.

server.c

#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include "types.h"

static const int TOTAL_RPCS = 16;
static int num_rpcs = 0;

static void sum(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(sum)

int main(int argc, char** argv)
{
    margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
    assert(mid);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_addr_t my_address;
    margo_addr_self(mid, &my_address);
    char addr_str[128];
    size_t addr_str_size = 128;
    margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
    margo_addr_free(mid,my_address);
    margo_info(mid, "Server running at address %s", addr_str);

    MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);

    margo_wait_for_finalize(mid);

    return 0;
}

static void sum(hg_handle_t h)
{
    hg_return_t ret;
    num_rpcs += 1;

    sum_in_t in;
    sum_out_t out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    out.ret = in.x + in.y;
    margo_info(mid, "Computed %d + %d = %d", in.x, in.y, out.ret);

    margo_thread_sleep(mid, 1000);

    margo_request req;

    ret = margo_irespond(h, &out, &req);
    assert(ret == HG_SUCCESS);

    /* ... do other work ... */

    ret = margo_wait(req);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    if(num_rpcs == TOTAL_RPCS) {
        margo_finalize(mid);
    }
}
DEFINE_MARGO_RPC_HANDLER(sum)

Note

margo_respond (the blocking version) returns when the response has been sent, but does not guarantee that the client has received it. Its behavior is not very different from that of margo_irespond, which returns as soon as the response has been scheduled for sending. Hence it is unlikely that you will ever need margo_irespond.

Timeout

Just as margo_forward has a timed variant, margo_forward_timed, margo_iforward has margo_iforward_timed, which takes an additional parameter (before the request pointer) giving a timeout in milliseconds. The timeout is counted from the call to margo_iforward_timed. If the server does not respond within this limit, the call to margo_wait on the resulting request returns HG_TIMEOUT.
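A minimal sketch of this pattern follows. The helper forward_with_deadline is a hypothetical name, and the signature margo_iforward_timed(handle, input, timeout_ms, &req) with a double timeout is assumed from the description above. The block under #ifndef HAVE_MARGO contains stand-ins that are not part of Margo: they only let the sketch compile without it, and they simulate a server that answers after latency_ms milliseconds.

```c
#include <assert.h>
#include <stddef.h>

#if defined(__has_include)
#  if __has_include(<margo.h>)
#    include <margo.h>
#    define HAVE_MARGO 1
#  endif
#endif

#ifndef HAVE_MARGO
/* Stand-ins, NOT part of Margo: just enough to compile and run the sketch.
 * The numeric values of the status codes are arbitrary here. */
typedef enum { HG_SUCCESS = 0, HG_TIMEOUT = 1 } hg_return_t;
typedef struct fake_handle {
    double latency_ms; /* simulated server response time */
    int    timed_out;  /* set by the fake margo_iforward_timed */
}* hg_handle_t;
typedef hg_handle_t margo_request;

static hg_return_t margo_iforward_timed(hg_handle_t h, void* in,
                                        double timeout_ms, margo_request* req)
{
    (void)in;
    h->timed_out = (h->latency_ms > timeout_ms);
    *req = h;
    return HG_SUCCESS;
}
static hg_return_t margo_wait(margo_request req)
{
    return req->timed_out ? HG_TIMEOUT : HG_SUCCESS;
}
#endif

/* Hypothetical helper: forward `in` on handle `h` with a timeout.
 * Returns 1 if a response arrived in time, 0 on timeout,
 * and -1 on any other error. */
static int forward_with_deadline(hg_handle_t h, void* in, double timeout_ms)
{
    margo_request req;
    hg_return_t ret = margo_iforward_timed(h, in, timeout_ms, &req);
    if (ret != HG_SUCCESS) return -1;

    /* ... other work can be done here while the RPC is in flight ... */

    ret = margo_wait(req); /* the timeout, if any, is reported here */
    if (ret == HG_TIMEOUT) return 0;
    return (ret == HG_SUCCESS) ? 1 : -1;
}
```

On a return value of 1 the caller would go on to margo_get_output as in client.c; on 0 it can decide whether to retry or give up, and in either case the handle still has to be destroyed.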