Non-blocking RPC
We may sometimes want to send an RPC and carry on with other work,
checking later whether the RPC has completed. This is done
using margo_iforward and a few related functions that
are described in this tutorial.
Input and output structures
We will again use the example of a sum RPC, this time making the RPC non-blocking. The header below is a reminder of what the input and output structures look like.
types.h
#ifndef PARAM_H
#define PARAM_H
#include <mercury.h>
#include <mercury_macros.h>
MERCURY_GEN_PROC(sum_in_t,
    ((int32_t)(x))\
    ((int32_t)(y)))
MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))
#endif
Non-blocking forward on client
The following code illustrates the use of a non-blocking RPC on a client.
client.c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"

int main(int argc, char** argv)
{
    if(argc != 2) {
        fprintf(stderr,"Usage: %s <server address>\n", argv[0]);
        /* A usage error is reported with a non-zero exit status. */
        exit(EXIT_FAILURE);
    }

    margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
    margo_set_log_level(mid, MARGO_LOG_DEBUG);

    hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL);

    hg_addr_t svr_addr;
    margo_addr_lookup(mid, argv[1], &svr_addr);

    int i;
    sum_in_t args;
    for(i=0; i<4; i++) {
        args.x = 42+i*2;
        args.y = 42+i*2+1;

        hg_handle_t h;
        margo_create(mid, svr_addr, sum_rpc_id, &h);

        /* Send the RPC without blocking; req tracks its completion. */
        margo_request req;
        margo_iforward(h, &args, &req);

        margo_debug(mid, "Waiting for reply...");

        /* Block until the response from the server has been received. */
        margo_wait(req);

        sum_out_t resp;
        margo_get_output(h, &resp);

        margo_debug(mid, "Got response: %d+%d = %d", args.x, args.y, resp.ret);

        margo_free_output(h,&resp);
        margo_destroy(h);
    }

    margo_addr_free(mid, svr_addr);
    margo_finalize(mid);

    return 0;
}
Instead of using margo_forward, we use margo_iforward.
This function returns immediately after having sent the RPC to the server.
It also takes an extra argument of type margo_request*.
The client will use this request object to check the status of the RPC.
We then use margo_wait on the request to block until we have
received a response from the server. Alternatively, margo_test
can be used to check whether the server has sent a response, without
blocking if it hasn’t.
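The polling pattern that margo_test enables can be sketched without a running server. The sketch below is not Margo code: test_fn_t, work_fn_t, poll_while_working, fake_request, fake_test, and count_work are hypothetical names introduced for illustration. It assumes a test function that returns 0 on success and reports completion through an integer flag, which is how margo_test is expected to behave; with Margo, the adapter would call margo_test on the margo_request.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical adapter type: returns 0 on success and sets *flag to a
 * non-zero value once the operation has completed. With Margo, such an
 * adapter would be expected to call margo_test on a margo_request. */
typedef int (*test_fn_t)(void* request, int* flag);

/* Hypothetical callback representing one slice of unrelated work. */
typedef void (*work_fn_t)(void* work_state);

/* Polls `test` until it reports completion, running one slice of work
 * between two polls. Returns the number of work slices executed, or -1
 * if the test call itself failed. */
int poll_while_working(test_fn_t test, void* request,
                       work_fn_t work, void* work_state)
{
    assert(test != NULL);
    int slices = 0;
    for (;;) {
        int flag = 0;
        if (test(request, &flag) != 0) return -1;
        if (flag) return slices;          /* the operation has completed */
        if (work) work(work_state);       /* overlap work with the RPC */
        slices++;
    }
}

/* Stand-in request used only to exercise the loop without a server:
 * it reports completion after `polls_left` unsuccessful polls. */
struct fake_request { int polls_left; };

int fake_test(void* request, int* flag)
{
    struct fake_request* r = (struct fake_request*)request;
    *flag = (r->polls_left == 0);
    if (r->polls_left > 0) r->polls_left--;
    return 0;
}

/* Stand-in work slice: increments a counter. */
void count_work(void* state) { (*(int*)state)++; }
```

Once completion is reported, a call to margo_wait on the same request should return without blocking, after which the output can be retrieved as in client.c.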
Note
It is safe to delete or modify the RPC’s input right after the call to
margo_iforward, because margo_iforward returns only after
having serialized this input into its send buffer.
Non-blocking response on server
Although generally less useful than non-blocking forwards,
non-blocking responses are also available on servers.
The margo_irespond function can be used for this purpose.
It returns as soon as the response has been posted to the Mercury queue.
server.c
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include "types.h"

/* The server finalizes itself after handling this many RPCs. */
static const int TOTAL_RPCS = 16;
static int num_rpcs = 0;

static void sum(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(sum)

int main(int argc, char** argv)
{
    margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
    assert(mid);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_addr_t my_address;
    margo_addr_self(mid, &my_address);
    char addr_str[128];
    size_t addr_str_size = 128;
    margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
    margo_addr_free(mid,my_address);

    margo_info(mid, "Server running at address %s", addr_str);

    MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);

    margo_wait_for_finalize(mid);

    return 0;
}

static void sum(hg_handle_t h)
{
    hg_return_t ret;
    num_rpcs += 1;

    sum_in_t in;
    sum_out_t out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    out.ret = in.x + in.y;
    margo_info(mid, "Computed %d + %d = %d", in.x, in.y, out.ret);

    margo_thread_sleep(mid, 1000);

    /* Post the response without blocking; req tracks its completion. */
    margo_request req;
    ret = margo_irespond(h, &out, &req);
    assert(ret == HG_SUCCESS);

    /* ... do other work ... */

    /* Block until the response has been sent. */
    ret = margo_wait(req);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    if(num_rpcs == TOTAL_RPCS) {
        margo_finalize(mid);
    }
}
DEFINE_MARGO_RPC_HANDLER(sum)
Note
margo_respond (the blocking version) returns when the
response has been sent, but does not guarantee that the client
has received it. Its behavior is therefore not very different
from that of margo_irespond, which returns as soon as the
response has been scheduled for sending. Hence it is unlikely
that you will ever need margo_irespond.
Timeout
Just like there is a margo_forward_timed, there is a
margo_iforward_timed, which takes an additional parameter
(before the request pointer) indicating a timeout in milliseconds.
This timeout applies from the time of the call to margo_iforward_timed.
Should the server not respond within this time limit, the call to
margo_wait on the resulting request will return HG_TIMEOUT.
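Because the timeout is counted from the call to margo_iforward_timed rather than from the call to margo_wait, any work done in between reduces how long margo_wait can block. The helper below, remaining_wait_ms, is a hypothetical name that only illustrates this arithmetic; it is not part of Margo, and it assumes the timeout is expressed as a floating-point number of milliseconds.

```c
#include <assert.h>

/* Upper bound, in milliseconds, on how long a wait can still block when
 * the timeout clock started at the time of the forward call and
 * `elapsed_ms` milliseconds of other work have passed since then.
 * Hypothetical helper for illustration only; not a Margo function. */
double remaining_wait_ms(double timeout_ms, double elapsed_ms)
{
    assert(timeout_ms >= 0.0 && elapsed_ms >= 0.0);
    double remaining = timeout_ms - elapsed_ms;
    /* Once the time limit has passed, the wait has nothing left to wait for. */
    return remaining > 0.0 ? remaining : 0.0;
}
```

For instance, with a 1000 ms timeout and 250 ms of work between the forward and the wait, the wait would be expected to block for at most 750 ms before reporting a timeout.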