RDMA transfers

Mercury can use RDMA to transfer large amounts of data. In this tutorial we demonstrate how to use this feature by transferring the content of a file from a client to a server.

Input/output structures

As in our earlier examples, we need to define the structures used for RPC inputs and outputs. They are as follows.

types.h (show/hide)

/*
 * types.h - input/output types of the "save" RPC, shared by client and server.
 *
 * Each MERCURY_GEN_PROC invocation generates the struct type and the
 * serialization (proc) routine Mercury uses to encode/decode it.
 */
#ifndef TYPES_H
#define TYPES_H

#include <mercury.h>
#include <mercury_bulk.h>
#include <mercury_proc_string.h>
#include <mercury_macros.h>

/* Input of "save": name of the file, number of bytes to transfer, and a bulk
 * handle exposing the client's buffer that holds those bytes. */
MERCURY_GEN_PROC(save_in_t,
    ((hg_string_t)(filename))\
    ((hg_size_t)(size))\
    ((hg_bulk_t)(bulk_handle)))

/* Output of "save": status code chosen by the server. */
MERCURY_GEN_PROC(save_out_t, ((int32_t)(ret)))

#endif /* TYPES_H */

The client will send the name of the file (hg_string_t), its size (hg_size_t), and a bulk handle representing the region of memory exposed by the client and containing the content of the file.

The server will simply respond with an integer indicating whether the operation was successful.

Client code

The client code is as follows.

client.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <mercury.h>
#include "types.h"

/* Client-wide state, shared by main() and the Mercury callbacks. */
typedef struct {
    hg_class_t *hg_class;     /* Mercury class returned by HG_Init */
    hg_context_t *hg_context; /* context driven by the progress/trigger loop */
    hg_id_t save_rpc_id;      /* id of the registered "save" RPC */
    int completed;            /* main()'s event loop runs until this is non-zero */
} client_state;

typedef struct {
    client_state*   state;
    hg_bulk_t       bulk_handle;
    void*           buffer;
    size_t          size;
    char*           filename;
} save_operation;

/* Invoked once HG_Addr_lookup has resolved the server address; callback_info->arg is the save_operation. */
hg_return_t lookup_callback(const struct hg_cb_info *callback_info);
/* Invoked once the response to the "save" RPC has arrived; info->arg is the save_operation. */
hg_return_t save_completed(const struct hg_cb_info *info);

int main(int argc, char** argv)
{
    if(argc != 4) {
        fprintf(stderr,"Usage: %s <protocol> <server address> <filename>\n", argv[0]);
        exit(0);
    }

    hg_return_t ret;

    const char* protocol = argv[1];

    /* Local instance of the client_state. */
    client_state state;
    state.completed = 0;
    // Initialize an hg_class.
    state.hg_class = HG_Init(protocol, HG_FALSE);
    assert(state.hg_class != NULL);

    // Creates a context for the hg_class.
    state.hg_context = HG_Context_create(state.hg_class);
    assert(state.hg_context != NULL);

    // Register a RPC function
    state.save_rpc_id = MERCURY_REGISTER(state.hg_class, "save", save_in_t, save_out_t, NULL);

    // Create the save_operation structure
    save_operation save_op;
    save_op.state = &state;
    save_op.filename = argv[3];
    if(access(save_op.filename, F_OK) == -1) {
        fprintf(stderr,"File %s doesn't exist or cannot be accessed.\n",save_op.filename);
        exit(-1);
    } 

    char* server_address = argv[2];
    ret = HG_Addr_lookup(state.hg_context, lookup_callback, &save_op, server_address, HG_OP_ID_IGNORE);

    // Main event loop
    while(!state.completed)
    {
        unsigned int count;
        do {
            ret = HG_Trigger(state.hg_context, 0, 1, &count);
        } while((ret == HG_SUCCESS) && count && !state.completed);
        HG_Progress(state.hg_context, 100);
    }

    // Destroy the context
    ret = HG_Context_destroy(state.hg_context);
    assert(ret == HG_SUCCESS);

    // Finalize the hg_class.
    hg_return_t err = HG_Finalize(state.hg_class);
    assert(err == HG_SUCCESS);
    return 0;
}


hg_return_t lookup_callback(const struct hg_cb_info *callback_info)
{
    hg_return_t ret;

    assert(callback_info->ret == 0);

    /* We get the pointer to the client_state here. */
    save_operation* save_op = (save_operation*)(callback_info->arg);
    client_state* state = save_op->state;

    /* Check file size to allocate buffer. */
    FILE* file = fopen(save_op->filename,"r");
    fseek(file, 0L, SEEK_END);
    save_op->size = ftell(file);
    fseek(file, 0L, SEEK_SET);
    save_op->buffer = calloc(1, save_op->size);
    size_t bytes_read = fread(save_op->buffer,1,save_op->size,file);
    fclose(file);

    hg_addr_t addr = callback_info->info.lookup.addr;
    hg_handle_t handle;
    ret = HG_Create(state->hg_context, addr, state->save_rpc_id, &handle);
    assert(ret == HG_SUCCESS);

    save_in_t in;
    in.filename = save_op->filename;
    in.size     = save_op->size; 

    ret = HG_Bulk_create(state->hg_class, 1, (void**) &(save_op->buffer), &(save_op->size),
            HG_BULK_READ_ONLY, &(save_op->bulk_handle));
    assert(ret == HG_SUCCESS);
    in.bulk_handle = save_op->bulk_handle;

    /* The state pointer is passed along as user argument. */
    ret = HG_Forward(handle, save_completed, save_op, &in);
    assert(ret == HG_SUCCESS);

    /* Free the address. */
    ret = HG_Addr_free(state->hg_class, addr);
    assert(ret == HG_SUCCESS);

    return HG_SUCCESS;
}

hg_return_t save_completed(const struct hg_cb_info *info)
{
    hg_return_t ret;

    /* Get the state pointer from the user-provided arguments. */
    save_operation* save_op = (save_operation*)(info->arg);
    client_state* state = (client_state*)(save_op->state);

    save_out_t out;
    assert(info->ret == HG_SUCCESS);

    ret = HG_Get_output(info->info.forward.handle, &out);
    assert(ret == HG_SUCCESS);

    printf("Got response: %d\n", out.ret);

    ret = HG_Bulk_free(save_op->bulk_handle);
    assert(ret == HG_SUCCESS);

    ret = HG_Free_output(info->info.forward.handle, &out);
    assert(ret == HG_SUCCESS);

    ret = HG_Destroy(info->info.forward.handle);
    assert(ret == HG_SUCCESS);

    state->completed = 1;

    return HG_SUCCESS;
}

We define a save_operation structure to keep information about the ongoing operation. A pointer to this structure is passed as the user-provided argument to the callbacks.

In the lookup callback, we open the file and read its content into a buffer. We then use HG_Bulk_create to expose the buffer for RDMA operations. This gives us an hg_bulk_t object that can be sent over RPC to the server.

Once the RPC has completed and a response has been received, the hg_bulk_t object is freed using HG_Bulk_free.

Server code

The following code corresponds to the server.

server.c (show/hide)

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mercury.h>
#include "types.h"

/* Server-wide data, attached to the "save" RPC with HG_Register_data so the
 * RPC handler can retrieve it with HG_Registered_data. */
typedef struct {
    hg_class_t *hg_class;     /* Mercury class returned by HG_Init */
    hg_context_t *hg_context; /* context used for trigger, progress and bulk transfers */
} server_state;

/* Per-request state of one "save" RPC: allocated in save() and released in
 * save_bulk_completed() after the client has been answered. */
typedef struct {
    char *filename;        /* heap copy of the file name sent by the client */
    hg_size_t size;        /* number of bytes to pull from the client */
    void *buffer;          /* local receive buffer */
    hg_bulk_t bulk_handle; /* bulk handle exposing `buffer` for RDMA */
    hg_handle_t handle;    /* RPC handle, kept to respond later */
} rpc_state;

/* Completion callback of the HG_BULK_PULL transfer started by save(); info->arg is the rpc_state. */
static hg_return_t save_bulk_completed(const struct hg_cb_info *info);
/* Handler registered for the "save" RPC. */
static hg_return_t save(hg_handle_t h);

/*
 * Server entry point.
 *
 * Usage: <program> <server address>
 *
 * Initializes Mercury in listening mode on the given address and prints the
 * address the server is reachable at. Registers the "save" RPC with the
 * server_state attached to it, then runs the trigger/progress loop forever.
 * The cleanup after the loop is unreachable; it shows the teardown order a
 * server with a shutdown condition would use.
 */
int main(int argc, char** argv)
{
    hg_return_t ret;

    if(argc != 2) {
        /* A wrong invocation is an error: use stderr and a failure status. */
        fprintf(stderr, "Usage: %s <server address>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    const char* server_address = argv[1];

    server_state state; // Instance of the server's state

    state.hg_class = HG_Init(server_address, HG_TRUE);
    assert(state.hg_class != NULL);

    /* Get the address of the server and print it so clients know where to connect. */
    char hostname[128] = "";
    hg_size_t hostname_size = sizeof(hostname);
    hg_addr_t self_addr;
    ret = HG_Addr_self(state.hg_class, &self_addr);
    assert(ret == HG_SUCCESS);
    ret = HG_Addr_to_string(state.hg_class, hostname, &hostname_size, self_addr);
    assert(ret == HG_SUCCESS);
    printf("Server running at address %s\n",hostname);
    /* The self address was only needed for printing; release it. */
    ret = HG_Addr_free(state.hg_class, self_addr);
    assert(ret == HG_SUCCESS);

    state.hg_context = HG_Context_create(state.hg_class);
    assert(state.hg_context != NULL);

    hg_id_t rpc_id = MERCURY_REGISTER(state.hg_class, "save", save_in_t, save_out_t, save);

    /* Attach the local server_state to the RPC so we can get a pointer to it when
     * the RPC is invoked. */
    ret = HG_Register_data(state.hg_class, rpc_id, &state, NULL);
    assert(ret == HG_SUCCESS);

    /* Event loop: run every ready callback (RPC handlers, bulk completions),
     * then make progress with a 100 ms timeout. */
    do
    {
        unsigned int count;
        do {
            ret = HG_Trigger(state.hg_context, 0, 1, &count);
        } while((ret == HG_SUCCESS) && count);

        HG_Progress(state.hg_context, 100);
    } while(1);

    ret = HG_Context_destroy(state.hg_context);
    assert(ret == HG_SUCCESS);

    ret = HG_Finalize(state.hg_class);
    assert(ret == HG_SUCCESS);

    return 0;
}

hg_return_t save(hg_handle_t handle)
{
    hg_return_t ret;
    save_in_t in;

    // Get the server_state attached to the RPC.
    const struct hg_info* info = HG_Get_info(handle);
    server_state* stt = HG_Registered_data(info->hg_class, info->id);

    ret = HG_Get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    rpc_state* my_rpc_state = (rpc_state*)calloc(1,sizeof(rpc_state));
    my_rpc_state->handle = handle;
    my_rpc_state->filename = strdup(in.filename);
    my_rpc_state->size = in.size;
    my_rpc_state->buffer = calloc(1,in.size);

    ret = HG_Bulk_create(stt->hg_class, 1, &(my_rpc_state->buffer),
            &(my_rpc_state->size), HG_BULK_WRITE_ONLY, &(my_rpc_state->bulk_handle));
    assert(ret == HG_SUCCESS);

    /* initiate bulk transfer from client to server */
    ret = HG_Bulk_transfer(stt->hg_context, save_bulk_completed,
            my_rpc_state, HG_BULK_PULL, info->addr, in.bulk_handle, 0,
            my_rpc_state->bulk_handle, 0, my_rpc_state->size, HG_OP_ID_IGNORE);
    assert(ret == HG_SUCCESS);

    ret = HG_Free_input(handle, &in);
    assert(ret == HG_SUCCESS);
    return HG_SUCCESS;
}

/*
 * Writes `size` bytes from `buffer` to the file at `path`, replacing any
 * previous content. Returns 0 on success, or -1 if the file cannot be
 * opened, fully written, or closed.
 */
static int write_whole_file(const char* path, const void* buffer, size_t size)
{
    FILE* f = fopen(path, "wb");
    if(f == NULL) {
        return -1;
    }
    size_t written = size > 0 ? fwrite(buffer, 1, size, f) : 0;
    /* fclose flushes, so it also reports write errors deferred by buffering. */
    int close_failed = (fclose(f) != 0);
    return (written == size && !close_failed) ? 0 : -1;
}

/*
 * Completion callback of the HG_BULK_PULL started by save().
 *
 * Writes the received bytes to my_rpc_state->filename. Responds to the client
 * with ret = 0 on success, or ret = -1 if the transfer or the write failed.
 * Then releases the bulk handle, the RPC handle and the rpc_state.
 *
 * NOTE(review): the file name comes from the client and is opened for writing
 * as-is. A client can therefore overwrite any file this process can write
 * (absolute paths, ".."). Sanitize it before reusing this code outside a
 * tutorial.
 */
static hg_return_t save_bulk_completed(const struct hg_cb_info *info)
{
    rpc_state* my_rpc_state = info->arg;
    hg_return_t ret;

    save_out_t out;
    out.ret = 0;

    if(info->ret != HG_SUCCESS) {
        /* The buffer holds no (or only partial) client data: do not write it. */
        fprintf(stderr, "Bulk transfer for file %s failed\n", my_rpc_state->filename);
        out.ret = -1;
    } else {
        printf("Writing file %s\n", my_rpc_state->filename);
        if(write_whole_file(my_rpc_state->filename, my_rpc_state->buffer,
                            (size_t)my_rpc_state->size) != 0) {
            fprintf(stderr, "Could not write file %s\n", my_rpc_state->filename);
            out.ret = -1;
        }
    }

    ret = HG_Respond(my_rpc_state->handle, NULL, NULL, &out);
    assert(ret == HG_SUCCESS);
    (void)ret;

    HG_Bulk_free(my_rpc_state->bulk_handle);
    HG_Destroy(my_rpc_state->handle);
    free(my_rpc_state->filename);
    free(my_rpc_state->buffer);
    free(my_rpc_state);

    return HG_SUCCESS;
}

On the server, the rpc_state structure is used to keep track of information about an ongoing operation. In particular, it contains the hg_handle_t object of the ongoing RPC and the hg_bulk_t object of the local buffer exposed to receive the data.

Upon receiving an RPC, we enter the save callback. This function allocates a local buffer to receive the data and exposes it using HG_Bulk_create.

We issue the RDMA operation using HG_Bulk_transfer, specifying the HG_BULK_PULL type of operation and save_bulk_completed as the callback to invoke once the RDMA operation has completed. Note that this function returns immediately, so the RDMA operation has not necessarily completed at this point. The save callback will return and the Mercury progress loop will keep running, eventually calling save_bulk_completed when the RDMA operation has finished.

Note that we don’t respond to the client in the save callback but in the save_bulk_completed callback; hence the save callback does not destroy the RPC’s hg_handle_t object. This object is kept and destroyed in save_bulk_completed.

Ahhh, callbacks… (now you understand how much easier Margo and Thallium are).