Transferring data over RDMA

Margo inherits from Mercury the ability to perform RDMA operations. In this tutorial, we will revisit our sum example: the client will send a set of values to the server by exposing the buffer of memory where these values are located, and the server will pull the values from this memory.

Input/Output with hg_bulk_t

Let’s first take a look at the types.

types.h (show/hide)

/*
 * types.h -- input/output types of the "sum" RPC.
 *
 * The include guard is named after this file so that it cannot collide
 * with the guard of an unrelated header (e.g. a "param.h").
 */
#ifndef TYPES_H
#define TYPES_H

#include <mercury.h>
#include <mercury_macros.h>

/*
 * Input of "sum":
 *   n    - number of int32_t values the server should pull;
 *   bulk - handle to the client memory region holding those values.
 */
MERCURY_GEN_PROC(sum_in_t,
        ((int32_t)(n))
        ((hg_bulk_t)(bulk)))

/* Output of "sum": ret holds the sum computed by the server. */
MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))

#endif /* TYPES_H */

The hg_bulk_t opaque type represents a handle to a region of memory in a process. In addition to this handle, we add a field n that will tell us how many values are in the buffer.

Client exposing memory

For once, let’s start with the client code.

client.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"

int main(int argc, char** argv)
{
    if(argc != 2) {
        fprintf(stderr,"Usage: %s <server address>\n", argv[0]);
        exit(0);
    }

    margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
    margo_set_log_level(mid, MARGO_LOG_DEBUG);

    hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL);

    hg_addr_t svr_addr;
    margo_addr_lookup(mid, argv[1], &svr_addr);

    int i;
    sum_in_t args;
    for(i=0; i<4; i++) {

        int32_t values[10] = { 1,4,2,5,6,3,5,3,2,5 };
        hg_size_t segment_sizes[1] = { 10*sizeof(int32_t) };
        void* segment_ptrs[1] = { (void*)values };

        hg_bulk_t local_bulk;
        margo_bulk_create(mid, 1, segment_ptrs, segment_sizes, HG_BULK_READ_ONLY, &local_bulk);

        args.n = 10;
        args.bulk = local_bulk;

        hg_handle_t h;
        margo_create(mid, svr_addr, sum_rpc_id, &h);
        margo_forward(h, &args);

        sum_out_t resp;
        margo_get_output(h, &resp);

        margo_debug(mid, "Got response: %d", resp.ret);

        margo_free_output(h,&resp);
        margo_destroy(h);

        margo_bulk_free(local_bulk);
    }

    margo_addr_free(mid, svr_addr);

    margo_finalize(mid);

    return 0;
}

We allocate the values buffer as an array of 10 integers (in this example the array is on the stack; an array allocated on the heap would work just the same). margo_bulk_create is used to create an hg_bulk_t handle representing the segment of memory exposed by the client. Its first parameter is the margo_instance_id. The remaining parameters are:
- the number of segments to expose;
- a void** array of addresses pointing to each segment;
- a hg_size_t* array of sizes (in bytes) for each segment;
- the mode used to expose the memory region;
- a pointer to the resulting hg_bulk_t handle.

HG_BULK_READ_ONLY indicates that Margo will only read from this segment (i.e., the server will only pull from it). HG_BULK_WRITE_ONLY indicates that Margo will only write to the segment, and HG_BULK_READWRITE indicates that both operations may happen.

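The bulk handle is freed with margo_bulk_free once the RPC has completed. The exposed buffer must remain valid until then, since the server reads from it while handling the RPC.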

Server pulling from client

Let’s now take a look at the server.

server.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"

/* Number of "sum" RPCs after which the server finalizes itself. */
enum { TOTAL_RPCS = 16 };

/* Count of "sum" RPCs handled so far; static storage starts it at zero. */
static int num_rpcs;

/* Forward declaration of the "sum" RPC handler (defined further down, followed
 * by DEFINE_MARGO_RPC_HANDLER(sum)) so that main() can pass it to
 * MARGO_REGISTER. The DECLARE/DEFINE macro pair must name the same function. */
static void sum(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(sum)

int main(int argc, char** argv)
{
    margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
    assert(mid);
    margo_set_log_level(mid, MARGO_LOG_INFO);

    hg_addr_t my_address;
    margo_addr_self(mid, &my_address);
    char addr_str[128];
    size_t addr_str_size = 128;
    margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
    margo_addr_free(mid,my_address);

    margo_info(mid, "Server running at address %s\n", addr_str);

    MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);

    margo_wait_for_finalize(mid);

    return 0;
}

/*
 * Handler of the "sum" RPC.
 *
 * Allocates a local buffer of in.n int32_t values and exposes it write-only.
 * It then pulls the values from the client's memory region (in.bulk),
 * responds with their sum, and releases every resource it acquired.
 * Once TOTAL_RPCS requests have been handled, it finalizes the Margo
 * instance, which lets main() return from margo_wait_for_finalize().
 *
 * in.n comes from the network. A non-positive count, or a failure to
 * allocate the local buffer, yields a response of 0 without any bulk
 * transfer.
 */
static void sum(hg_handle_t h)
{
    hg_return_t ret;
    num_rpcs += 1;

    sum_in_t in;
    sum_out_t out;
    int32_t* values = NULL;
    hg_bulk_t local_bulk = HG_BULK_NULL;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    /* The client's address is the target of the bulk pull below. */
    const struct hg_info* info = margo_get_info(h);
    hg_addr_t client_addr = info->addr;

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    out.ret = 0;

    /* A negative in.n would convert to a huge size_t in calloc() and in
     * buf_size, so only allocate for strictly positive counts. */
    if(in.n > 0) {
        values = calloc(in.n, sizeof(*values));
        if(values == NULL) {
            margo_error(mid, "Could not allocate a buffer for %d values", in.n);
        }
    }

    if(values != NULL) {
        hg_size_t buf_size = in.n * sizeof(*values);

        /* The server only writes into this buffer (via the pull). */
        ret = margo_bulk_create(mid, 1, (void**)&values, &buf_size,
                HG_BULK_WRITE_ONLY, &local_bulk);
        assert(ret == HG_SUCCESS);

        /* Pull buf_size bytes from offset 0 of the client's region into
         * offset 0 of the local region. */
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, client_addr,
                in.bulk, 0, local_bulk, 0, buf_size);
        assert(ret == HG_SUCCESS);

        int i;
        for(i = 0; i < in.n; i++) {
            out.ret += values[i];
        }
    }

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    if(local_bulk != HG_BULK_NULL) {
        ret = margo_bulk_free(local_bulk);
        assert(ret == HG_SUCCESS);
    }

    free(values);

    /* Also releases the client's bulk handle deserialized into in.bulk. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    if(num_rpcs == TOTAL_RPCS) {
        margo_finalize(mid);
    }
}
DEFINE_MARGO_RPC_HANDLER(sum)

Within the RPC handler, after deserializing the RPC’s input, we allocate an array of appropriate size:

values = calloc(in.n, sizeof(*values));

We then expose it in the same way as on the client side, using margo_bulk_create to get a local bulk handle. This time we specify that this handle will only be written to (HG_BULK_WRITE_ONLY).

margo_bulk_transfer is used to do the transfer. Here we pull (HG_BULK_PULL) the data from the client’s memory to the server’s local memory. We provide:
- the client’s address (obtained from the hg_info structure of the RPC handle);
- the client’s bulk handle and the offset in the client’s memory region (here 0);
- the local bulk handle and the offset in the local memory region (0 as well);
- the size of the transfer in bytes.

Once the transfer has completed, we compute the sum and return it to the client. We must not forget to call margo_bulk_free to free the bulk handle we created. The bulk handle in the in structure is freed by margo_free_input, which is why it is so important that this function be called.