Non-blocking calls
PyMargo RPC calls, as well as calls to respond and
transfer, can be made non-blocking by passing blocking=False
as a parameter. This makes these functions return a Request object,
which has a test() and a wait() method. The former
returns whether the request has completed, without blocking. The latter
blocks until the request has completed.
The server and client code below exemplifies the use of non-blocking calls.
from mochi.margo import Engine, Handle
from mochi.margo.bulk import Bulk
from mochi.margo.margo.bulk import write_only, pull
class Receiver:
    """RPC provider that pulls a caller's buffer with non-blocking calls.

    Both the bulk transfer and the response are issued with
    ``blocking=False`` and completed later through the returned request.
    """

    def __init__(self, engine):
        """Keep the engine used to create bulk handles and issue transfers."""
        self.engine = engine

    def do_bulk_transfer(self, handle: Handle, remote_bulk: Bulk, bulk_size: int):
        """Pull ``bulk_size`` bytes from ``remote_bulk``, print them, respond.

        Args:
            handle: Handle of the incoming RPC; supplies the caller's
                address and is used to send the response.
            remote_bulk: Bulk handle sent by the caller, used as the
                origin of the transfer.
            bulk_size: Number of bytes to transfer.
        """
        # Zero-filled destination of ``bulk_size`` bytes, exposed as a bulk.
        landing_buffer = bytes(bulk_size)
        landing_bulk = self.engine.create_bulk(landing_buffer, write_only)
        caller_addr = handle.address

        # blocking=False returns a request object instead of waiting here.
        pending = self.engine.transfer(
            op=pull,
            origin_addr=caller_addr,
            origin_handle=remote_bulk,
            origin_offset=0,
            local_handle=landing_bulk,
            local_offset=0,
            size=bulk_size,
            blocking=False,
        )

        # Other work can be done in parallel here.
        pending.test()  # returns True if the request has completed
        # wait() blocks until the request has completed. It is still required
        # even if test() returned True, because it frees the internal request.
        pending.wait()
        print(landing_buffer)

        pending = handle.respond("RDMA completed", blocking=False)
        # Other work can be done here while the response is being sent.
        pending.wait()  # block until the response is sent
if __name__ == "__main__":
    # Server entry point: the engine lives for the duration of the `with`.
    with Engine("tcp") as engine:
        provider = Receiver(engine)
        # Register the bound method under the RPC name "do_bulk_transfer".
        engine.register("do_bulk_transfer", provider.do_bulk_transfer)
        print(f"Service running at {engine.address}")
        # NOTE(review): presumably lets a remote process shut this server
        # down (the client example calls address.shutdown()) -- confirm.
        engine.enable_remote_shutdown()
        # NOTE(review): presumably blocks until the engine is finalized.
        engine.wait_for_finalize()
import sys
from mochi.margo import Engine
import mochi.margo as pymargo
if __name__ == "__main__":
    # Client entry point: the server address is the first CLI argument.
    with Engine("tcp", mode=pymargo.client) as engine:
        remote_rpc = engine.register("do_bulk_transfer")
        server_addr = engine.lookup(sys.argv[1])

        # Expose the payload as a read-only bulk handle.
        payload = b"This is a bytes buffer"
        payload_bulk = engine.create_bulk(payload, pymargo.bulk.read_only)

        # blocking=False makes the RPC call return a request object.
        call_on_server = remote_rpc.on(server_addr)
        pending = call_on_server(payload_bulk, len(payload), blocking=False)

        # Other work can be done in parallel here.
        reply = pending.wait()  # wait for completion; returns the response
        assert reply == "RDMA completed"
        server_addr.shutdown()
Important
wait needs to be called even if test returned True.
For starters, calling wait on an RPC request is what actually
returns the RPC’s response, but it is also what makes the Request
object free its internal resources.