Transferring data over RDMA
Margo inherits from Mercury the possibility to do RDMA operations. In this tutorial, we will revisit our sum example and have the client send a bunch of values to the server by exposing a buffer of memory where these values are located, and have the server pull from this memory.
Input/Output with hg_bulk_t
Let’s first take a look at the types.
types.h (show/hide)
/* Input/output types shared by the client and the server of the "sum" RPC. */
#ifndef PARAM_H
#define PARAM_H
#include <mercury.h>
#include <mercury_macros.h>
/* sum_in_t: RPC input. n is the number of int32_t values held in the exposed
 * buffer; bulk is the handle to the memory region that contains them. */
MERCURY_GEN_PROC(sum_in_t,
((int32_t)(n))\
((hg_bulk_t)(bulk)))
/* sum_out_t: RPC output. ret carries the sum computed by the server. */
MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret)))
#endif
The hg_bulk_t
opaque type represents a handle to
a region of memory in a process. In addition to this handle,
we add a field n
that will tell us how many values
are in the buffer.
Client exposing memory
This time, let’s start with the client code.
client.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"
int main(int argc, char** argv)
{
if(argc != 2) {
fprintf(stderr,"Usage: %s <server address>\n", argv[0]);
exit(0);
}
margo_instance_id mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0);
margo_set_log_level(mid, MARGO_LOG_DEBUG);
hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL);
hg_addr_t svr_addr;
margo_addr_lookup(mid, argv[1], &svr_addr);
int i;
sum_in_t args;
for(i=0; i<4; i++) {
int32_t values[10] = { 1,4,2,5,6,3,5,3,2,5 };
hg_size_t segment_sizes[1] = { 10*sizeof(int32_t) };
void* segment_ptrs[1] = { (void*)values };
hg_bulk_t local_bulk;
margo_bulk_create(mid, 1, segment_ptrs, segment_sizes, HG_BULK_READ_ONLY, &local_bulk);
args.n = 10;
args.bulk = local_bulk;
hg_handle_t h;
margo_create(mid, svr_addr, sum_rpc_id, &h);
margo_forward(h, &args);
sum_out_t resp;
margo_get_output(h, &resp);
margo_debug(mid, "Got response: %d", resp.ret);
margo_free_output(h,&resp);
margo_destroy(h);
margo_bulk_free(local_bulk);
}
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return 0;
}
We allocate the values
buffer as an array of 10 integers
(this array is on the stack in this example; an array allocated
on the heap would work just the same).
margo_bulk_create
is used to create an hg_bulk_t
handle representing the segment of memory exposed by the client.
Its first parameter is the margo_instance_id
. Then come
the number of segments to expose, a void**
array of
addresses pointing to each segment, a hg_size_t*
array
of sizes for each segment, and the mode used to expose the
memory region. HG_BULK_READ_ONLY
indicates that Margo
will only read (i.e., the server will only pull) from this segment.
HG_BULK_WRITE_ONLY
indicates that Margo will only write
to the segment and HG_BULK_READWRITE
indicates that both
operations may happen.
The bulk handle is freed after being used, using
margo_bulk_free
.
Server pulling from client
Let’s now take a look at the server.
server.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <margo.h>
#include "types.h"
/* Number of "sum" RPCs after which the handler finalizes the Margo instance. */
static const int TOTAL_RPCS = 16;
/* Count of "sum" RPCs received so far; incremented at the start of each handler call. */
static int num_rpcs = 0;
/* RPC handler, defined further down in this file. */
static void sum(hg_handle_t h);
/* NOTE(review): presumably declares the Margo wrapper for sum() that
 * MARGO_REGISTER refers to; paired with DEFINE_MARGO_RPC_HANDLER(sum)
 * after the handler body -- confirm against margo.h. */
DECLARE_MARGO_RPC_HANDLER(sum)
int main(int argc, char** argv)
{
margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, 0);
assert(mid);
margo_set_log_level(mid, MARGO_LOG_INFO);
hg_addr_t my_address;
margo_addr_self(mid, &my_address);
char addr_str[128];
size_t addr_str_size = 128;
margo_addr_to_string(mid, addr_str, &addr_str_size, my_address);
margo_addr_free(mid,my_address);
margo_info(mid, "Server running at address %s\n", addr_str);
MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);
margo_wait_for_finalize(mid);
return 0;
}
/**
 * RPC handler for "sum".
 *
 * Pulls in.n int32_t values from the memory region exposed by the client
 * (in.bulk) into a local buffer, adds them up and responds with the total
 * in out.ret.
 *
 * in.n comes from the client and is validated: a non-positive count is
 * treated as an empty buffer, so no allocation or transfer is attempted and
 * the response carries a sum of 0. If the local buffer cannot be allocated
 * the server aborts, matching the abort-on-error policy of the asserts used
 * for the Margo calls.
 *
 * After TOTAL_RPCS requests have been handled, the Margo instance is
 * finalized.
 *
 * @param h  Handle of the incoming RPC; destroyed before returning.
 */
static void sum(hg_handle_t h)
{
    hg_return_t ret;
    sum_in_t in;
    sum_out_t out;

    num_rpcs += 1;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    /* The client's address is needed to pull from its exposed memory. */
    const struct hg_info* info = margo_get_info(h);
    hg_addr_t client_addr = info->addr;

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    out.ret = 0;
    if (in.n > 0) {
        hg_bulk_t local_bulk;
        hg_size_t buf_size = (hg_size_t)in.n * sizeof(int32_t);

        int32_t* values = calloc((size_t)in.n, sizeof(*values));
        if (values == NULL) {
            /* Never expose a NULL buffer for RDMA. */
            fprintf(stderr, "Error: could not allocate buffer for %d values\n",
                    (int)in.n);
            abort();
        }

        /* Expose the local buffer as write-only: the pull writes into it. */
        ret = margo_bulk_create(mid, 1, (void**)&values, &buf_size,
                                HG_BULK_WRITE_ONLY, &local_bulk);
        assert(ret == HG_SUCCESS);

        ret = margo_bulk_transfer(mid, HG_BULK_PULL, client_addr,
                                  in.bulk, 0, local_bulk, 0, buf_size);
        assert(ret == HG_SUCCESS);

        for (int32_t i = 0; i < in.n; i++) {
            out.ret += values[i];
        }

        ret = margo_bulk_free(local_bulk);
        assert(ret == HG_SUCCESS);
        free(values);
    }

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* Also releases the bulk handle carried in the input structure. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    if (num_rpcs == TOTAL_RPCS) {
        margo_finalize(mid);
    }
}
DEFINE_MARGO_RPC_HANDLER(sum)
Within the RPC handler, after deserializing the RPC’s input, we allocate an array of appropriate size:
values = calloc(in.n, sizeof(*values));
We then expose it the same way as we did on the client side,
to get a local bulk handle, using margo_bulk_create
.
This time we specify that this handle will only be written to (HG_BULK_WRITE_ONLY).
margo_bulk_transfer
is used to do the transfer.
Here we pull (HG_BULK_PULL
) the data from the client’s memory
to the server’s local memory. We provide the client’s address
(obtained from the hg_info structure of the RPC handle),
the offset in the client’s memory region (here 0)
and on the local memory region (0 as well),
as well as the size in bytes.
Once the transfer is completed, we perform the sum and
return it to the client. We don’t forget to use margo_bulk_free
to free the bulk handle we created (the bulk handle in the in
structure will be freed by margo_free_input
, which is why
it is so important that this function be called).