RDMA transfers
Mercury can use RDMA to transfer large amounts of data. In this tutorial we will demonstrate how to use this feature by transferring the content of a file from a client to a server.
Input/output structures
Like in our earlier examples, we need to define the structures used for RPC inputs and outputs. These are as follows.
types.h (show/hide)
#ifndef PARAM_H
#define PARAM_H
#include <mercury.h>
#include <mercury_bulk.h>
#include <mercury_proc_string.h>
#include <mercury_macros.h>
/* Input of the "save" RPC, sent by the client:
 *   filename    - name of the file being transferred
 *   size        - size of the file content, in bytes
 *   bulk_handle - bulk handle describing the client memory region that
 *                 holds the file content */
MERCURY_GEN_PROC(save_in_t,
((hg_string_t)(filename))\
((hg_size_t)(size))\
((hg_bulk_t)(bulk_handle)))
/* Output of the "save" RPC: a single integer status sent back by the server
 * (the server in this tutorial sets it to 0 once the file has been written). */
MERCURY_GEN_PROC(save_out_t, ((int32_t)(ret)))
#endif
The client will send the name of the file (`hg_string_t`), its size (`hg_size_t`), and a bulk handle (`hg_bulk_t`) representing the region of memory exposed by the client and containing the content of the file.
The server will simply respond with an integer indicating whether the operation was successful.
Client code
The client code is as follows.
client.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <mercury.h>
#include "types.h"
/* Client-wide Mercury state shared by main() and the callbacks. */
typedef struct {
/* Mercury class returned by HG_Init() in main(). */
hg_class_t* hg_class;
/* Context created from hg_class; driven by the trigger/progress loop in main(). */
hg_context_t* hg_context;
/* Id of the "save" RPC, as returned by MERCURY_REGISTER(). */
hg_id_t save_rpc_id;
/* Set to 1 by save_completed() so that main() leaves its event loop. */
int completed;
} client_state;
typedef struct {
client_state* state;
hg_bulk_t bulk_handle;
void* buffer;
size_t size;
char* filename;
} save_operation;
/* Callback given to HG_Addr_lookup(): reads the file, exposes it through a
 * bulk handle and forwards the "save" RPC. */
hg_return_t lookup_callback(const struct hg_cb_info *callback_info);
/* Callback given to HG_Forward(): prints the server's response, releases the
 * RPC resources and marks the client as completed. */
hg_return_t save_completed(const struct hg_cb_info *info);
/*
 * Client entry point.
 *
 * Usage: <program> <protocol> <server address> <filename>
 *
 * Initializes Mercury in client mode, registers the "save" RPC, starts an
 * asynchronous lookup of the server address (the RPC itself is issued from
 * lookup_callback) and runs the trigger/progress loop until save_completed()
 * sets state.completed. Returns 0 on success; exits with a non-zero status on
 * a usage error or if the file cannot be accessed.
 */
int main(int argc, char** argv)
{
    if(argc != 4) {
        fprintf(stderr,"Usage: %s <protocol> <server address> <filename>\n", argv[0]);
        /* A usage error is a failure: do not report success to the caller. */
        exit(-1);
    }

    hg_return_t ret;
    const char* protocol = argv[1];
    const char* server_address = argv[2];

    /* Local instance of the client_state. */
    client_state state;
    state.completed = 0;

    // Initialize an hg_class (HG_FALSE: we are not listening for incoming RPCs).
    state.hg_class = HG_Init(protocol, HG_FALSE);
    assert(state.hg_class != NULL);

    // Create a context for the hg_class.
    state.hg_context = HG_Context_create(state.hg_class);
    assert(state.hg_context != NULL);

    // Register the RPC; the client has no handler for it, hence NULL.
    state.save_rpc_id = MERCURY_REGISTER(state.hg_class, "save", save_in_t, save_out_t, NULL);

    // Create the save_operation structure; the remaining fields are filled in
    // by lookup_callback.
    save_operation save_op = {0};
    save_op.state = &state;
    save_op.filename = argv[3];
    if(access(save_op.filename, F_OK) == -1) {
        fprintf(stderr,"File %s doesn't exist or cannot be accessed.\n",save_op.filename);
        exit(-1);
    }

    // Start the address lookup. If it cannot even be posted, lookup_callback
    // will never run and the loop below would spin forever, so check it.
    ret = HG_Addr_lookup(state.hg_context, lookup_callback, &save_op, server_address, HG_OP_ID_IGNORE);
    assert(ret == HG_SUCCESS);

    // Main event loop: run every ready callback, then make progress on the
    // network for up to 100 ms.
    while(!state.completed)
    {
        unsigned int count;
        do {
            ret = HG_Trigger(state.hg_context, 0, 1, &count);
        } while((ret == HG_SUCCESS) && count && !state.completed);
        HG_Progress(state.hg_context, 100);
    }

    // Destroy the context.
    ret = HG_Context_destroy(state.hg_context);
    assert(ret == HG_SUCCESS);

    // Finalize the hg_class.
    ret = HG_Finalize(state.hg_class);
    assert(ret == HG_SUCCESS);

    return 0;
}
hg_return_t lookup_callback(const struct hg_cb_info *callback_info)
{
hg_return_t ret;
assert(callback_info->ret == 0);
/* We get the pointer to the client_state here. */
save_operation* save_op = (save_operation*)(callback_info->arg);
client_state* state = save_op->state;
/* Check file size to allocate buffer. */
FILE* file = fopen(save_op->filename,"r");
fseek(file, 0L, SEEK_END);
save_op->size = ftell(file);
fseek(file, 0L, SEEK_SET);
save_op->buffer = calloc(1, save_op->size);
size_t bytes_read = fread(save_op->buffer,1,save_op->size,file);
fclose(file);
hg_addr_t addr = callback_info->info.lookup.addr;
hg_handle_t handle;
ret = HG_Create(state->hg_context, addr, state->save_rpc_id, &handle);
assert(ret == HG_SUCCESS);
save_in_t in;
in.filename = save_op->filename;
in.size = save_op->size;
ret = HG_Bulk_create(state->hg_class, 1, (void**) &(save_op->buffer), &(save_op->size),
HG_BULK_READ_ONLY, &(save_op->bulk_handle));
assert(ret == HG_SUCCESS);
in.bulk_handle = save_op->bulk_handle;
/* The state pointer is passed along as user argument. */
ret = HG_Forward(handle, save_completed, save_op, &in);
assert(ret == HG_SUCCESS);
/* Free the address. */
ret = HG_Addr_free(state->hg_class, addr);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
hg_return_t save_completed(const struct hg_cb_info *info)
{
hg_return_t ret;
/* Get the state pointer from the user-provided arguments. */
save_operation* save_op = (save_operation*)(info->arg);
client_state* state = (client_state*)(save_op->state);
save_out_t out;
assert(info->ret == HG_SUCCESS);
ret = HG_Get_output(info->info.forward.handle, &out);
assert(ret == HG_SUCCESS);
printf("Got response: %d\n", out.ret);
ret = HG_Bulk_free(save_op->bulk_handle);
assert(ret == HG_SUCCESS);
ret = HG_Free_output(info->info.forward.handle, &out);
assert(ret == HG_SUCCESS);
ret = HG_Destroy(info->info.forward.handle);
assert(ret == HG_SUCCESS);
state->completed = 1;
return HG_SUCCESS;
}
We define a `save_operation` structure to keep information about the ongoing operation. This structure will be passed by pointer as the user-provided argument to callbacks.
In the lookup callback, we open the file and read its content into a buffer. We then use `HG_Bulk_create` to expose the buffer for RDMA operations. This gives us an `hg_bulk_t` object that can be sent over RPC to the server.
Once the RPC has completed and a response is received, the `hg_bulk_t` object is freed using `HG_Bulk_free`.
Server code
The following code corresponds to the server.
server.c (show/hide)
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mercury.h>
#include "types.h"
/* This structure will encapsulate data about the server. An instance is
 * attached to the "save" RPC with HG_Register_data() in main() and retrieved
 * in the save() handler with HG_Registered_data(). */
typedef struct {
/* Mercury class returned by HG_Init() in main(). */
hg_class_t* hg_class;
/* Context created from hg_class; driven by the trigger/progress loop in main(). */
hg_context_t* hg_context;
} server_state;
/* State of one in-flight "save" RPC. Allocated in save(), passed as user
 * argument to the bulk transfer, and released in save_bulk_completed(). */
typedef struct {
/* Copy (strdup) of the file name received from the client. */
char* filename;
/* Number of bytes to receive, as announced by the client. */
hg_size_t size;
/* Local buffer of `size` bytes that receives the file content. */
void* buffer;
/* Bulk handle exposing `buffer` for the transfer. */
hg_bulk_t bulk_handle;
/* Handle of the RPC, kept so the response can be sent after the transfer. */
hg_handle_t handle;
} rpc_state;
/* Completion callback of the bulk transfer: writes the received data to the
 * file and responds to the client. */
static hg_return_t save_bulk_completed(const struct hg_cb_info *info);
/* Handler of the "save" RPC: allocates a receive buffer and starts pulling
 * the file content from the client. */
static hg_return_t save(hg_handle_t h);
/*
 * Server entry point.
 *
 * Usage: <program> <server address>
 *
 * Initializes Mercury in listening mode, prints the address the server is
 * reachable at, registers the "save" RPC with save() as its handler, attaches
 * the server_state to that RPC, and then runs the trigger/progress loop
 * forever. Exits with a non-zero status on a usage error.
 */
int main(int argc, char** argv)
{
    hg_return_t ret;

    if(argc != 2) {
        fprintf(stderr, "Usage: %s <server address>\n", argv[0]);
        /* A usage error is a failure: do not report success to the caller. */
        exit(-1);
    }

    const char* server_address = argv[1];

    server_state state; // Instance of the server's state

    /* HG_TRUE: this process listens for incoming RPCs. */
    state.hg_class = HG_Init(server_address, HG_TRUE);
    assert(state.hg_class != NULL);

    /* Get the address of the server. The results are checked so that we never
     * print an uninitialized buffer, and the address is released once it has
     * been converted to a string. */
    char hostname[128];
    hg_size_t hostname_size = sizeof(hostname);
    hg_addr_t self_addr;
    ret = HG_Addr_self(state.hg_class, &self_addr);
    assert(ret == HG_SUCCESS);
    ret = HG_Addr_to_string(state.hg_class, hostname, &hostname_size, self_addr);
    assert(ret == HG_SUCCESS);
    printf("Server running at address %s\n",hostname);
    ret = HG_Addr_free(state.hg_class, self_addr);
    assert(ret == HG_SUCCESS);

    state.hg_context = HG_Context_create(state.hg_class);
    assert(state.hg_context != NULL);

    hg_id_t rpc_id = MERCURY_REGISTER(state.hg_class, "save", save_in_t, save_out_t, save);

    /* Attach the local server_state to the RPC so we can get a pointer to it when
     * the RPC is invoked. */
    ret = HG_Register_data(state.hg_class, rpc_id, &state, NULL);
    assert(ret == HG_SUCCESS);

    /* Event loop: run every ready callback, then make progress on the network
     * for up to 100 ms. The server never leaves this loop. */
    do
    {
        unsigned int count;
        do {
            ret = HG_Trigger(state.hg_context, 0, 1, &count);
        } while((ret == HG_SUCCESS) && count);
        HG_Progress(state.hg_context, 100);
    } while(1);

    /* Not reached as long as the loop above is infinite; kept to show the
     * shutdown sequence. */
    ret = HG_Context_destroy(state.hg_context);
    assert(ret == HG_SUCCESS);

    ret = HG_Finalize(state.hg_class);
    assert(ret == HG_SUCCESS);

    return 0;
}
/*
 * Handler of the "save" RPC.
 *
 * Decodes the input, allocates an rpc_state with a receive buffer of the
 * announced size, exposes that buffer through a bulk handle and starts an
 * asynchronous HG_BULK_PULL transfer from the client's bulk handle. The
 * response is NOT sent here: save_bulk_completed() sends it (and destroys the
 * RPC handle) once the transfer has finished.
 *
 * If the memory for the operation cannot be allocated (the size comes from
 * the remote client), the handler responds immediately with ret = -1 and
 * destroys the handle instead of dereferencing a NULL pointer.
 *
 * Returns HG_SUCCESS.
 */
hg_return_t save(hg_handle_t handle)
{
    hg_return_t ret;
    save_in_t in;

    // Get the server_state attached to the RPC.
    const struct hg_info* info = HG_Get_info(handle);
    server_state* stt = HG_Registered_data(info->hg_class, info->id);

    ret = HG_Get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    rpc_state* my_rpc_state = calloc(1, sizeof(*my_rpc_state));
    if(my_rpc_state != NULL) {
        my_rpc_state->handle = handle;
        my_rpc_state->size = in.size;
        /* The input is freed before the transfer completes, so keep a copy. */
        my_rpc_state->filename = strdup(in.filename);
        my_rpc_state->buffer = calloc(1, in.size);
    }

    if(my_rpc_state == NULL || my_rpc_state->filename == NULL
    || my_rpc_state->buffer == NULL) {
        fprintf(stderr, "Could not allocate memory to receive file %s\n", in.filename);
        if(my_rpc_state != NULL) {
            free(my_rpc_state->filename);
            free(my_rpc_state->buffer);
            free(my_rpc_state);
        }
        /* Tell the client the operation failed and release the RPC. */
        save_out_t out;
        out.ret = -1;
        ret = HG_Respond(handle, NULL, NULL, &out);
        assert(ret == HG_SUCCESS);
        ret = HG_Free_input(handle, &in);
        assert(ret == HG_SUCCESS);
        ret = HG_Destroy(handle);
        assert(ret == HG_SUCCESS);
        return HG_SUCCESS;
    }

    /* Expose the local buffer as a single write-only segment. */
    ret = HG_Bulk_create(stt->hg_class, 1, &(my_rpc_state->buffer),
            &(my_rpc_state->size), HG_BULK_WRITE_ONLY, &(my_rpc_state->bulk_handle));
    assert(ret == HG_SUCCESS);

    /* initiate bulk transfer from client to server */
    ret = HG_Bulk_transfer(stt->hg_context, save_bulk_completed,
            my_rpc_state, HG_BULK_PULL, info->addr, in.bulk_handle, 0,
            my_rpc_state->bulk_handle, 0, my_rpc_state->size, HG_OP_ID_IGNORE);
    assert(ret == HG_SUCCESS);

    ret = HG_Free_input(handle, &in);
    assert(ret == HG_SUCCESS);

    return HG_SUCCESS;
}
/*
 * Called when the bulk transfer started in save() has completed.
 *
 * Writes the received buffer to the file named in the rpc_state, responds to
 * the client (ret = 0 on success, -1 if the file could not be opened or
 * completely written), then releases the bulk handle, the RPC handle and the
 * rpc_state itself.
 *
 * info->arg must point to the rpc_state allocated in save().
 */
hg_return_t save_bulk_completed(const struct hg_cb_info *info)
{
    assert(info->ret == 0);

    rpc_state* my_rpc_state = info->arg;
    hg_return_t ret;

    save_out_t out;
    out.ret = 0;

    /* NOTE(review): the file name comes from the remote client and is used
     * as-is, so a client can make the server write to any path it has access
     * to. Restrict or sanitize it before using this outside of a tutorial. */
    /* Binary write mode: the data is stored byte-for-byte. */
    FILE* f = fopen(my_rpc_state->filename, "wb");
    if(f == NULL) {
        out.ret = -1;
    } else {
        if(fwrite(my_rpc_state->buffer, 1, my_rpc_state->size, f) != my_rpc_state->size) {
            out.ret = -1;
        }
        /* fclose() flushes buffered data, so its failure is a write failure. */
        if(fclose(f) != 0) {
            out.ret = -1;
        }
    }

    if(out.ret == 0) {
        printf("Writing file %s\n", my_rpc_state->filename);
    } else {
        fprintf(stderr, "Could not write file %s\n", my_rpc_state->filename);
    }

    /* Report the actual outcome to the client. */
    ret = HG_Respond(my_rpc_state->handle, NULL, NULL, &out);
    assert(ret == HG_SUCCESS);
    (void)ret;

    HG_Bulk_free(my_rpc_state->bulk_handle);
    HG_Destroy(my_rpc_state->handle);
    free(my_rpc_state->filename);
    free(my_rpc_state->buffer);
    free(my_rpc_state);

    return HG_SUCCESS;
}
On the server, the `rpc_state` structure will be used to keep track of information about an ongoing operation. In particular, it contains the `hg_handle_t` object of the ongoing RPC, and the `hg_bulk_t` object of the local buffer exposed to receive the data.
Upon receiving an RPC, we enter the `save` callback. This function allocates a local buffer to receive the data and exposes it using `HG_Bulk_create`.
We issue the RDMA operation using `HG_Bulk_transfer`, specifying the `HG_BULK_PULL` type of operation, and `save_bulk_completed` as the callback to call once the RDMA operation has completed. It is important to note that this function returns immediately and the RDMA operation has not been completed at this point. The `save` callback will return and the Mercury progress loop will continue running, eventually calling `save_bulk_completed` when the RDMA operation has finished.
Note that we don’t respond to the client in the `save` callback; we do so in the `save_bulk_completed` callback. Hence the `save` callback does not destroy the RPC’s `hg_handle_t` object. This object is kept and freed in `save_bulk_completed`.
Ahhh, callbacks… (now you understand how much easier Margo and Thallium are).