Sending arguments, returning values

In the previous tutorial we saw how to send a simple RPC. This RPC was not taking any argument and the server was not sending any response to the client. In this tutorial we will see how to add arguments and return values to an RPC. We will build an RPC handler that receives two integers and returns their sum to the client.

Input and output structures

First, we need to define structures encapsulating the input and output data. This is done using Mercury macros as shown in the following code, which we will place in a types.h private header file.

types.h (show/hide)

#ifndef PARAM_H
#define PARAM_H

#include <mercury.h>
#include <mercury_macros.h>

/* We use the Mercury macros to define the input
 * and output structures along with the serialization
 * functions.
 */
MERCURY_GEN_PROC(sum_in_t,
        ((int32_t)(x))\
        ((int32_t)(y)))

MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))

#endif

We include <mercury.h> and <mercury_macros.h>. The latter contains the necessary macros. We then use MERCURY_GEN_PROC to declare our structures. This macro expends not only to the definition of the sum_in_t and sum_out_t structures, but also to that of serialization functions that Mercury (hence Margo) can use to serialize these structures into a buffer, and deserialize them from a buffer.

Note

Once these types are defined using the macro, you can use them as members of other types.

Note

The <mercury_proc_string.h> may also be included. It provides the hg_string_t and hg_const_string_t types, which are typedefs of char* and const char* respectively and must be used to represent null-terminated strings.

Important

The structures defined with the MERCURY_GEN_PROC cannot contain pointers (apart from the hg_string_t and hg_const_string_t types). We will see in a future tutorial how to define serialization functions for more complex structures.

Sum server code

The following shows the server code.

server.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include "types.h"

typedef struct {
    int max_rpcs;
    int num_rpcs;
} server_data;

static void sum(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(sum)

int main(int argc, char** argv)
{
    margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
    assert(mid);

    server_data svr_data = {
        .max_rpcs = 4,
        .num_rpcs = 0
    };

    hg_addr_t my_address;
    margo_addr_self(mid, &my_address);
    char addr_str[128];
    size_t addr_str_size = 128;
    margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
    margo_addr_free(mid,my_address);

    margo_info(mid, "Server running at address %s", addr_str);

    hg_id_t rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);
    margo_register_data(mid, rpc_id, &svr_data, NULL);

    margo_wait_for_finalize(mid);

    return 0;
}

static void sum(hg_handle_t h)
{
    hg_return_t ret;

    sum_in_t in;
    sum_out_t out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    const struct hg_info* info = margo_get_info(h);
    server_data* svr_data = (server_data*)margo_registered_data(mid, info->id);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    out.ret = in.x + in.y;
    margo_trace(mid, "Computed %d + %d = %d", in.x, in.y, out.ret);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    svr_data->num_rpcs += 1;
    if(svr_data->num_rpcs == svr_data->max_rpcs) {
        margo_finalize(mid);
    }
}
DEFINE_MARGO_RPC_HANDLER(sum)

This code is very similar to our earlier code (you will notice that we have attached data to the RPC to avoid using global variables, as advised at the end of the previous tutorial).

Now MARGO_REGISTER takes the types of the arguments being sent and received, rather than void. Notice that we are also not calling margo_registered_disable_response anymore since this time the server will send a response to the client.

Two structures in and out are declared at the beginning of the RPC handler. in will be used to deserialize the arguments sent by the client, while out will be used to send a response.

margo_get_input is used to deserialize the content of the RPC’s data into the variable in. The out variable is then modified with out.ret = in.x + in.y; before being sent back to the client using margo_respond.

Important

An input deserialized using margo_get_input should be freed using margo_free_input, even if the structure is on the stack and does not contain pointers.

Client code

Let’s now take a look at the client’s code.

client.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"

int main(int argc, char** argv)
{
    if(argc != 2) {
        fprintf(stderr,"Usage: %s <server address>\n", argv[0]);
        exit(0);
    }

    margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL);

    hg_addr_t svr_addr;
    margo_addr_lookup(mid, argv[1], &svr_addr);

    int i;
    sum_in_t args;
    for(i=0; i<4; i++) {
        args.x = 42+i*2;
        args.y = 42+i*2+1;

        hg_handle_t h;
        margo_create(mid, svr_addr, sum_rpc_id, &h);
        margo_forward(h, &args);

        sum_out_t resp;
        margo_get_output(h, &resp);

        margo_info(mid, "Got response: %d+%d = %d\n", args.x, args.y, resp.ret);

        margo_free_output(h,&resp);
        margo_destroy(h);
    }

    margo_addr_free(mid, svr_addr);

    margo_finalize(mid);

    return 0;
}

Again, MARGO_REGISTER takes the types of the arguments being sent and received. We initialize the sum_in_t args; and sum_out_t resp; to hold respectively the arguments of the RPC (what will become the in variable on the server side) and the return value (out on the server side).

margo_forward now takes a pointer to the input argument as second parameter, and margo_get_output is used to deserialized the value returned by the server into the resp variable.

Important

Just like we called margo_free_input on the server because the input had been obtained using margo_get_input, we must call margo_free_output on the client side because the output has been obtained using margo_get_output.

Timeout

It can sometimes be important for the client to be able to timeout if an operation takes too long. This can be done using margo_forward_timed, which takes an extra parameter: a timeout (double) value in milliseconds. If the server has not responded to the RPC after this timeout expires, margo_forward_timed will return HG_TIMEOUT and the RPC will be cancelled.

Important

The fact that a call has timed out does not mean that the server hasn’t received the RPC or hasn’t processed it. It simply means that, should the server send a reponse back, this response will be ignored by the client. Worse: the server will not be aware that the client has cancelled the operation. It is up to the developer to make sure that such a behavior is consistent with the semantics of her protocol.