Timed RDMA transfers
This tutorial extends Transferring data over RDMA by showing how to attach a deadline to bulk RDMA operations. Two new Margo functions underlie this feature:
- margo_bulk_transfer_timed: synchronous transfer with a millisecond deadline
- margo_bulk_itransfer_timed: asynchronous transfer with a millisecond deadline
Both are exposed through the timed() method on thallium::remote_bulk.
Synchronous timed transfers
Calling .timed(deadline) on a remote_bulk object returns a
timed_remote_bulk proxy. The proxy supports the same >> and <<
operators as remote_bulk, but routes every transfer through the
timed Margo functions.
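To make the proxy idea concrete without a Thallium installation, here is a minimal plain-C++ model. It is a sketch, not Thallium's implementation: remote_buffer, timed_remote_buffer, the free function timed(), timeout_error and the simulated latency_ms field are all hypothetical stand-ins. The proxy holds only a reference to its target plus a deadline in milliseconds, and its >> (pull) and << (push) operators forward to one shared copy routine with that deadline attached. Copying the smaller of the two sizes and treating 0 as "no deadline" are choices made for this model.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for tl::timeout.
struct timeout_error : std::runtime_error {
    timeout_error() : std::runtime_error("transfer timed out") {}
};

// Hypothetical stand-in for a remote bulk region: a buffer plus a
// simulated transfer latency so that deadlines can be exercised.
struct remote_buffer {
    std::vector<char> data;
    double latency_ms = 0.0;
};

// Shared copy routine. In this model, timeout_ms == 0 means "no deadline".
inline std::size_t copy_with_deadline(const char* src, std::size_t src_size,
                                      char* dst, std::size_t dst_size,
                                      double latency_ms, double timeout_ms) {
    assert(timeout_ms >= 0.0);
    if (timeout_ms > 0.0 && latency_ms > timeout_ms) throw timeout_error{};
    std::size_t n = std::min(src_size, dst_size);  // copy the overlapping prefix
    std::copy(src, src + n, dst);
    return n;
}

// Proxy returned by timed(): a reference to the target plus a deadline.
class timed_remote_buffer {
public:
    timed_remote_buffer(remote_buffer& r, double timeout_ms)
        : m_remote(r), m_timeout_ms(timeout_ms) {}

    // Pull: remote -> local.
    std::size_t operator>>(std::vector<char>& local) const {
        return copy_with_deadline(m_remote.data.data(), m_remote.data.size(),
                                  local.data(), local.size(),
                                  m_remote.latency_ms, m_timeout_ms);
    }

    // Push: local -> remote.
    std::size_t operator<<(const std::vector<char>& local) const {
        return copy_with_deadline(local.data(), local.size(),
                                  m_remote.data.data(), m_remote.data.size(),
                                  m_remote.latency_ms, m_timeout_ms);
    }

private:
    remote_buffer& m_remote;
    double m_timeout_ms;
};

// Hypothetical counterpart of remote_bulk::timed(): builds the proxy.
inline timed_remote_buffer timed(remote_buffer& r, double timeout_ms) {
    return timed_remote_buffer(r, timeout_ms);
}
```

As in the tutorial's own usage, the proxy here is a cheap temporary: timed(r, 10.0) >> local creates it, performs the pull, and discards it within one expression.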
Server
#include <iostream>
#include <chrono>
#include <functional>
#include <utility>
#include <vector>
#include <thallium.hpp>

namespace tl = thallium;

int main() {
    tl::engine myEngine("tcp://127.0.0.1:1234", THALLIUM_SERVER_MODE);
    std::function<void(const tl::request&, tl::bulk&)> f =
        [&myEngine](const tl::request& req, tl::bulk& b) {
            tl::endpoint ep = req.get_endpoint();
            // Local 6-byte buffer that the client's data is pulled into
            std::vector<char> v(6);
            std::vector<std::pair<void*,std::size_t>> segments(1);
            segments[0].first  = (void*)(&v[0]);
            segments[0].second = v.size();
            tl::bulk local = myEngine.expose(segments, tl::bulk_mode::write_only);
            try {
                // Pull from the client, giving up after 5 seconds
                b.on(ep).timed(std::chrono::seconds(5)) >> local;
                std::cout << "Received: ";
                for(auto c : v) std::cout << c;
                std::cout << std::endl;
            } catch(const tl::timeout&) {
                std::cout << "Transfer timed out!" << std::endl;
            } catch(const tl::margo_exception& ex) {
                // Any other Margo error; keep it from escaping the handler
                std::cout << "Transfer failed: " << ex.what() << std::endl;
            }
            // Respond in every case so the client's RPC call returns
            req.respond();
        };
    myEngine.define("do_timed_rdma", f);
    myEngine.wait_for_finalize();
}
The key line is:
b.on(ep).timed(std::chrono::seconds(5)) >> local;
timed() accepts either a double (milliseconds) or any
std::chrono::duration:
b.on(ep).timed(5000.0) >> local;                          // double overload: 5000 ms = 5 s
b.on(ep).timed(std::chrono::seconds(5)) >> local;         // std::chrono overload
b.on(ep).timed(std::chrono::milliseconds(500)) >> local;  // sub-second deadline
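Both overloads end up as a deadline in milliseconds. A plausible way to implement the std::chrono overload is to convert the duration to a floating-point millisecond count and forward it to the double overload. The helper below sketches that conversion under this assumption; to_timeout_ms is a hypothetical name, and Thallium's actual code may do it differently.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: convert any std::chrono::duration into the
// double-milliseconds value accepted by the double overload of timed().
template <typename Rep, typename Period>
double to_timeout_ms(const std::chrono::duration<Rep, Period>& d) {
    // Converting to a floating-point representation is implicit in
    // std::chrono, so no duration_cast is required and sub-millisecond
    // durations keep their fractional part.
    double ms = std::chrono::duration<double, std::milli>(d).count();
    assert(ms >= 0.0);  // a negative deadline is meaningless
    return ms;
}
```

With this helper, timed(std::chrono::seconds(5)) and timed(5000.0) would reach the same code path with the same value.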
If the deadline expires before the transfer completes, >> (or <<)
throws tl::timeout. Any other Margo error throws tl::margo_exception.
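This rule amounts to inspecting the return code of the underlying Margo call and choosing an exception type from it. The self-contained sketch below models that translation; status, timeout_error, margo_error and check are hypothetical stand-ins for Mercury's return codes, tl::timeout and tl::margo_exception.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical subset of return codes (Mercury's hg_return_t plays this role).
enum class status { success, timeout, other_error };

// Hypothetical stand-ins for tl::timeout and tl::margo_exception.
struct timeout_error : std::runtime_error {
    timeout_error() : std::runtime_error("bulk transfer timed out") {}
};
struct margo_error : std::runtime_error {
    explicit margo_error(const std::string& what) : std::runtime_error(what) {}
};

// Translate a return code into the exception the caller should see.
inline void check(status s, const char* fn) {
    assert(fn != nullptr);
    switch (s) {
        case status::success:
            return;
        case status::timeout:
            throw timeout_error{};  // deadline expired: its own exception type
        default:
            throw margo_error(std::string(fn) + " failed");  // every other error
    }
}
```

Because the two exception types are unrelated, a caller can handle a missed deadline separately from other failures, as the catch clauses in the server do.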
Client
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <thallium.hpp>

namespace tl = thallium;

int main() {
    tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
    tl::remote_procedure remote_do_rdma = myEngine.define("do_timed_rdma");
    tl::endpoint server_endpoint = myEngine.lookup("tcp://127.0.0.1:1234");
    // Expose the string, including its terminating null, for the server to read
    std::string buffer = "Matthieu";
    std::vector<std::pair<void*,std::size_t>> segments(1);
    segments[0].first  = (void*)(&buffer[0]);
    segments[0].second = buffer.size()+1;
    tl::bulk myBulk = myEngine.expose(segments, tl::bulk_mode::read_only);
    // Blocks until the server calls req.respond()
    remote_do_rdma.on(server_endpoint)(myBulk);
    return 0;
}
The client is identical to a regular RDMA client. The deadline is attached by whichever side issues the bulk transfer; here the server pulls the data, so only the server decides whether the transfer is timed.
Asynchronous timed transfers
timed_remote_bulk also exposes pull_to and push_from,
which start the transfer without blocking and return an async_bulk_op.
Calling wait() on the returned object blocks until the transfer
completes or the deadline expires.
tl::async_bulk_op op = b.on(ep).timed(std::chrono::seconds(5)).pull_to(local);
// ... overlap other computation here ...
try {
std::size_t n = op.wait(); // blocks; throws tl::timeout if deadline expired
} catch(const tl::timeout&) {
// handle timeout
}
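The start-now, wait-later contract can be modeled with std::async and std::future::wait_until. In this sketch, async_op, start_transfer and timeout_error are hypothetical, the transfer is simulated by a sleeping task, and the deadline is fixed when the operation starts, so time spent on overlapped computation counts against it. That last point is an assumption of the model, not a statement about how Margo tracks deadlines. A zero timeout means "no deadline", mirroring the convention used for the non-timed variants.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <optional>
#include <stdexcept>
#include <thread>
#include <utility>

struct timeout_error : std::runtime_error {
    timeout_error() : std::runtime_error("bulk transfer timed out") {}
};

// Hypothetical stand-in for tl::async_bulk_op.
class async_op {
public:
    using clock = std::chrono::steady_clock;

    async_op(std::future<std::size_t> f, std::optional<clock::time_point> deadline)
        : m_future(std::move(f)), m_deadline(deadline) {}

    // Block until the transfer finishes or the deadline passes.
    std::size_t wait() {
        if (m_deadline &&
            m_future.wait_until(*m_deadline) == std::future_status::timeout) {
            throw timeout_error{};
        }
        return m_future.get();  // without a deadline, blocks until done
    }

private:
    std::future<std::size_t> m_future;
    std::optional<clock::time_point> m_deadline;
};

// Start a simulated transfer of `bytes` bytes that needs `work` to finish.
inline async_op start_transfer(std::size_t bytes,
                               std::chrono::milliseconds work,
                               std::chrono::milliseconds timeout) {
    assert(timeout.count() >= 0);
    std::optional<async_op::clock::time_point> deadline;
    if (timeout.count() > 0) deadline = async_op::clock::now() + timeout;  // fixed at start
    auto fut = std::async(std::launch::async, [bytes, work] {
        std::this_thread::sleep_for(work);  // stands in for the RDMA transfer
        return bytes;
    });
    return async_op(std::move(fut), deadline);
}
```

Unlike a real transfer, the simulated task is not cancelled on timeout: the std::future returned by std::async waits for it when destroyed.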
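Similarly for push_from, which copies local → remote. A push reads from local and writes into the remote buffer, so local must be exposed with read access and the remote bulk with write access (tl::bulk_mode::read_only / write_only, or read_write for both). The exposures used in the server and client above would therefore need to be swapped for this call: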
tl::async_bulk_op op = b.on(ep).timed(5000.0).push_from(local);
std::size_t n = op.wait();
Note
A zero deadline (timed(0.0)) means no deadline at all: it is
equivalent to calling the non-timed variants (operator>> /
operator<< / pull_to / push_from) directly, since the underlying
margo_bulk_transfer and margo_bulk_itransfer functions are
themselves thin wrappers around their timed counterparts with
timeout_ms = 0.
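The wrapper relationship described in this note can be sketched as follows. transfer_timed and transfer are hypothetical stand-ins for margo_bulk_transfer_timed and margo_bulk_transfer, reduced to a byte count and a simulated latency so the deadline can be exercised; they are not the real Margo signatures.

```cpp
#include <cassert>
#include <cstddef>

enum class status { success, timeout };

// Hypothetical timed primitive: completes unless a positive deadline is
// shorter than the (simulated) time the transfer needs.
inline status transfer_timed(std::size_t bytes, double latency_ms,
                             double timeout_ms, std::size_t* done) {
    assert(timeout_ms >= 0.0 && done != nullptr);
    if (timeout_ms > 0.0 && latency_ms > timeout_ms) return status::timeout;
    *done = bytes;
    return status::success;
}

// Non-timed variant: a thin wrapper that passes timeout_ms = 0,
// so it can never report a timeout.
inline status transfer(std::size_t bytes, double latency_ms, std::size_t* done) {
    return transfer_timed(bytes, latency_ms, 0.0, done);
}
```

Since the non-timed call is literally the timed call with 0, timed(0.0) and the plain operators take the same path.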
API summary
// -- on remote_bulk --
// Returns a timed_remote_bulk proxy (double overload)
timed_remote_bulk remote_bulk::timed(double timeout_ms) const noexcept;
// Returns a timed_remote_bulk proxy (std::chrono::duration overload)
template<typename Rep, typename Period>
timed_remote_bulk remote_bulk::timed(
const std::chrono::duration<Rep,Period>& d) const noexcept;
// -- on timed_remote_bulk --
// Synchronous pull (remote → local): throws tl::timeout on expiry
std::size_t timed_remote_bulk::operator>>(const bulk_segment& dest) const;
// Synchronous push (local → remote): throws tl::timeout on expiry
std::size_t timed_remote_bulk::operator<<(const bulk_segment& src) const;
// Asynchronous pull: wait() throws tl::timeout on expiry
async_bulk_op timed_remote_bulk::pull_to(const bulk_segment& dest) const;
// Asynchronous push: wait() throws tl::timeout on expiry
async_bulk_op timed_remote_bulk::push_from(const bulk_segment& src) const;