Timed RDMA transfers

This tutorial extends the "Transferring data over RDMA" tutorial by showing how to attach a deadline to bulk RDMA operations. Two new Margo functions underlie this feature:

  • margo_bulk_transfer_timed — synchronous transfer with a millisecond deadline

  • margo_bulk_itransfer_timed — asynchronous transfer with a millisecond deadline

Both are exposed through the timed() method on thallium::remote_bulk.

Synchronous timed transfers

Calling .timed(deadline) on a remote_bulk object returns a timed_remote_bulk proxy. The proxy offers the same transfer operators as remote_bulk (>> and <<), but routes every transfer through the timed Margo functions and passes the deadline along.

Server

#include <iostream>
#include <chrono>
#include <functional>
#include <utility>
#include <vector>
#include <thallium.hpp>

namespace tl = thallium;

int main() {

    tl::engine myEngine("tcp://127.0.0.1:1234", THALLIUM_SERVER_MODE);

    std::function<void(const tl::request&, tl::bulk&)> f =
        [&myEngine](const tl::request& req, tl::bulk& b) {
            tl::endpoint ep = req.get_endpoint();
            std::vector<char> v(6);
            std::vector<std::pair<void*,std::size_t>> segments(1);
            segments[0].first  = (void*)(&v[0]);
            segments[0].second = v.size();
            tl::bulk local = myEngine.expose(segments, tl::bulk_mode::write_only);
            try {
                b.on(ep).timed(std::chrono::seconds(5)) >> local;
                std::cout << "Received: ";
                for(auto c : v) std::cout << c;
                std::cout << std::endl;
            } catch(const tl::timeout&) {
                std::cout << "Transfer timed out!" << std::endl;
            }
            req.respond();
        };
    myEngine.define("do_timed_rdma", f);

    myEngine.wait_for_finalize();
}

The key line is:

b.on(ep).timed(std::chrono::seconds(5)) >> local;

timed() accepts either a double (milliseconds) or any std::chrono::duration:

b.on(ep).timed(5000.0)                         >> local;  // 5 s as a double (ms)
b.on(ep).timed(std::chrono::seconds(5))        >> local;  // std::chrono overload
b.on(ep).timed(std::chrono::milliseconds(500)) >> local;  // 500 ms

If the deadline expires before the transfer completes, >> (or <<) throws tl::timeout. Any other Margo error throws tl::margo_exception.

Client

#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <thallium.hpp>

namespace tl = thallium;

int main() {

    tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
    tl::remote_procedure remote_do_rdma = myEngine.define("do_timed_rdma");
    tl::endpoint server_endpoint = myEngine.lookup("tcp://127.0.0.1:1234");

    std::string buffer = "Matthieu";
    std::vector<std::pair<void*,std::size_t>> segments(1);
    segments[0].first  = (void*)(&buffer[0]);
    segments[0].second = buffer.size()+1;

    tl::bulk myBulk = myEngine.expose(segments, tl::bulk_mode::read_only);

    remote_do_rdma.on(server_endpoint)(myBulk);

    return 0;
}

The client is identical to a regular RDMA client. Only the server decides whether to use a timed transfer, because the server is the side that issues the RDMA operation.

Asynchronous timed transfers

timed_remote_bulk also exposes pull_to and push_from, which start the transfer without blocking and return an async_bulk_op. Calling wait() on the returned object blocks until the transfer completes or the deadline expires.

tl::async_bulk_op op = b.on(ep).timed(std::chrono::seconds(5)).pull_to(local);
// ... overlap other computation here ...
try {
    std::size_t n = op.wait();  // blocks; throws tl::timeout if the deadline expired
    std::cout << "Pulled " << n << " bytes" << std::endl;
} catch(const tl::timeout&) {
    // handle timeout
}

Similarly for push_from:

tl::async_bulk_op op = b.on(ep).timed(5000.0).push_from(local);
std::size_t n = op.wait();

Note

A zero deadline (timed(0.0)) is equivalent to calling the non-timed variants (operator>>, operator<<, pull_to, push_from) directly, because the underlying margo_bulk_transfer and margo_bulk_itransfer functions are themselves thin wrappers around their timed counterparts with timeout_ms = 0, that is, with no deadline.

API summary

// -- on remote_bulk --

// Returns a timed_remote_bulk proxy (double overload)
timed_remote_bulk remote_bulk::timed(double timeout_ms) const noexcept;

// Returns a timed_remote_bulk proxy (std::chrono::duration overload)
template<typename Rep, typename Period>
timed_remote_bulk remote_bulk::timed(
    const std::chrono::duration<Rep,Period>& d) const noexcept;

// -- on timed_remote_bulk --

// Synchronous pull (remote → local): throws tl::timeout on expiry
std::size_t timed_remote_bulk::operator>>(const bulk_segment& dest) const;

// Synchronous push (local → remote): throws tl::timeout on expiry
std::size_t timed_remote_bulk::operator<<(const bulk_segment& src) const;

// Asynchronous pull: wait() throws tl::timeout on expiry
async_bulk_op timed_remote_bulk::pull_to(const bulk_segment& dest) const;

// Asynchronous push: wait() throws tl::timeout on expiry
async_bulk_op timed_remote_bulk::push_from(const bulk_segment& src) const;