Non-blocking calls
PyMargo RPC calls, as well as calls to respond
and
transfer
, can be made non-blocking by passing blocking=False
as parameter. This will make these functions return a Request
object,
which has a test()
and a wait()
function. The former
returns whether the request has completed, without blocking. The latter
actually blocks until the request completed.
The server and client codes bellow examplify the use of non-blocking calls.
from pymargo.core import Engine, Handle
from pymargo.bulk import Bulk
import pymargo.bulk
class Receiver:
def __init__(self, engine):
self.engine = engine
def do_bulk_transfer(self, handle: Handle, remote_bulk: Bulk, bulk_size: int):
local_buffer = bytes(bulk_size)
local_bulk = self.engine.create_bulk(local_buffer, pymargo.bulk.write_only)
req = self.engine.transfer(
op=pymargo.bulk.pull,
origin_addr=handle.address,
origin_handle=remote_bulk,
origin_offset=0,
local_handle=local_bulk,
local_offset=0,
size=bulk_size,
blocking=False)
# do something in parallel here
req.test() # will return True if the request completed
req.wait() # will block until request is completed
# note: req.wait() is still necessary even if the req.test()
# returned true, as it frees the internal request
print(local_buffer)
req = handle.respond("RDMA completed", blocking=False)
# do more things will response is sent
req.wait() # block until response is sent
if __name__ == "__main__":
with Engine("tcp") as engine:
receiver = Receiver(engine)
engine.register("do_bulk_transfer", receiver.do_bulk_transfer)
print(f"Service running at {engine.address}")
engine.enable_remote_shutdown()
engine.wait_for_finalize()
import sys
from pymargo.core import Engine
import pymargo
if __name__ == "__main__":
with Engine("tcp", mode=pymargo.client) as engine:
do_bulk_transfer = engine.register("do_bulk_transfer")
address = engine.lookup(sys.argv[1])
buffer = b"This is a bytes buffer"
local_bulk = engine.create_bulk(buffer, pymargo.bulk.read_only)
req = do_bulk_transfer.on(address)(local_bulk, len(buffer), blocking=False)
# do something in parallel
response = req.wait() # wait for the request to complete
assert response == "RDMA completed"
address.shutdown()
Important
wait
needs to be called even if test
returned True
.
For starter, calling wait
on an RPC request is what will actually
return the RPC’s response, but this is also what will make the Request
object free its internal resources.