Python Bindings
Warabi provides comprehensive Python bindings for both server and client libraries, enabling blob storage functionality in Python applications.
Installation
To use Warabi’s Python bindings, install Warabi with the +python variant in Spack:
spack install mochi-warabi+python
The Python bindings are available in the mochi.warabi package.
Quick Start
Here’s a simple example of using Warabi from Python:
# Quick start: run a Warabi provider and a client inside one process.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

# One Margo engine (server mode) both hosts the provider and sends RPCs
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Provider 42, backed by a volatile in-memory target
target_config = {"target": {"type": "memory"}}
provider = Provider(engine=engine, provider_id=42, config=target_config)

# Client-side handle that addresses provider 42 at this engine's address
client = Client(engine=engine)
self_address = str(engine.addr())
target = client.make_target_handle(address=self_address, provider_id=42)

# Allocate a region sized to the payload, fill it and persist it in one call
payload = b"Hello, Warabi!"
region = target.create_and_write(payload, persist=True)
print(f"Created region: {region}")

# Read exactly len(payload) bytes from the start of that region
result = target.read(region, offset=0, size=len(payload))
print(f"Read data: {result}")

# The round trip must return the bytes we wrote
assert result == payload
print("Success!")

# Shut the engine down once the example is finished
engine.finalize()
This example demonstrates:
Creating a Margo engine
Starting a Warabi provider with memory backend
Creating a client and target handle
Creating regions and writing/reading data
Server API
The mochi.warabi.server module provides the Provider class for creating
Warabi providers:
Basic Provider Creation
# Start a standalone Warabi provider with an in-memory target.
import mochi.margo
from mochi.warabi.server import Provider

# Create a Margo engine in server mode so it can accept RPCs
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Keep the provider ID in one place: clients must address the same value
provider_id = 42

# Create a Warabi provider with memory backend
provider = Provider(
    engine=engine,
    provider_id=provider_id,
    config={
        "target": {
            "type": "memory"
        }
    }
)

print(f"Warabi provider started at {engine.addr()}")
print(f"Provider ID: {provider_id}")
print("Provider is now accepting requests...")

# In a real application, you would keep the engine running here until
# shutdown is requested; for this example, we finalize immediately.
engine.finalize()
Backend Configuration
Warabi supports multiple storage backends:
# Create one provider per storage backend on the same engine.
import mochi.margo
from mochi.warabi.server import Provider

engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Memory backend (volatile, fast)
memory_provider = Provider(
    engine=engine,
    provider_id=1,
    config={
        "target": {
            "type": "memory"
        }
    }
)
print("Memory backend provider created (ID: 1)")

# ABT-IO file-based backend (persistent)
abtio_provider = Provider(
    engine=engine,
    provider_id=2,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_storage.dat",
                "create_if_missing": True
            }
        }
    }
)
print("ABT-IO backend provider created (ID: 2)")

# Persistent memory backend (if available)
try:
    pmem_provider = Provider(
        engine=engine,
        provider_id=3,
        config={
            "target": {
                "type": "pmem",
                "config": {
                    "path": "/mnt/pmem/warabi_data",
                    "create_if_missing_with_size": 1073741824  # 1GB
                }
            }
        }
    )
except Exception as e:
    # Creation may fail on machines without a usable PMem path or backend;
    # report it and continue with the providers that did start.
    print(f"PMem backend not available: {e}")
else:
    print("PMem backend provider created (ID: 3)")

engine.finalize()
Available backends:
memory: Fast in-memory storage (volatile)
pmem: Persistent memory storage
abtio: File-based storage using ABT-IO
Client API
The mochi.warabi.client module provides the Client and TargetHandle
classes for interacting with Warabi providers.
Basic Operations
# Core TargetHandle operations: create, write, read, persist, erase.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# --- Two steps: allocate a 4096-byte region, then write into it ---
scratch = target.create(size=4096)
print(f"Created region: {scratch}")

greeting = b"Hello, Warabi from Python!"
target.write(scratch, offset=0, data=greeting, persist=False)
print(f"Wrote {len(greeting)} bytes")

echoed = target.read(scratch, offset=0, size=len(greeting))
print(f"Read back: {echoed}")
assert echoed == greeting

# --- One step: allocate a region sized to the payload and fill it ---
combined = b"Combined operation!"
combined_region = target.create_and_write(combined, persist=True)
print(f"Created and wrote region: {combined_region}")

combined_echo = target.read(combined_region, offset=0, size=len(combined))
assert combined_echo == combined

# The first write used persist=False, so flush those bytes explicitly
target.persist(scratch, offset=0, size=len(greeting))
print("Data persisted")

# Release the first region
target.erase(scratch)
print("Region erased")

engine.finalize()
Key operations:
create(size): Create a new region
write(region, offset, data, persist=False): Write data to a region
read(region, offset, size): Read data from a region
create_and_write(data, persist=False): Create a region and write data in one operation
erase(region): Delete a region
persist(region, offset, size): Ensure data is persisted to storage
Working with Regions
Regions are logical containers for blob data:
# Working with regions: one region, several offsets, explicit persist.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

MiB = 1024 * 1024

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# A single 1 MiB region is a fixed-size, offset-addressed blob
region = target.create(size=MiB)
print(f"Created 1MB region: {region}")

# (offset, payload) pairs spread across the region, the last near its end
placements = [
    (0, b"First chunk of data"),
    (1000, b"Second chunk"),
    (1000000, b"Third chunk at the end"),
]
for where, chunk in placements:
    target.write(region, offset=where, data=chunk, persist=False)
offsets = ", ".join(str(where) for where, _ in placements)
print(f"Wrote data at offsets: {offsets}")

# Read every chunk back from its own offset with its own length
readback = [target.read(region, offset=where, size=len(chunk))
            for where, chunk in placements]
for got, (_, chunk) in zip(readback, placements):
    assert got == chunk
print("All chunks verified!")

# Flush the whole region, not only the bytes that were written
target.persist(region, offset=0, size=MiB)
print("Region persisted")

# Separate regions for separate purposes
metadata_region = target.create(size=4 * 1024)  # 4KB for metadata
data_region = target.create(size=10 * MiB)      # 10MB for data
print(f"Metadata region: {metadata_region}")
print(f"Data region: {data_region}")

# Clean up, in creation order
for doomed in (region, metadata_region, data_region):
    target.erase(doomed)

engine.finalize()
Regions support:
Fixed-size allocation
Offset-based read/write operations
Persistence control
Efficient data management
Buffer Protocol Support
Like Yokan, Warabi supports Python’s buffer protocol for efficient zero-copy operations:
# Feed Warabi from buffer-protocol objects and read into preallocated
# buffers with read_into().
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

LARGE_SIZE = 10 * 1024 * 1024  # 10MB, used for both the write and the read

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Using bytes (simple but creates copies)
data_bytes = b"Hello from bytes"
region1 = target.create_and_write(data_bytes)
print("Wrote using bytes")

# Using bytearray (more efficient for large data)
data_bytearray = bytearray(b"Hello from bytearray")
region2 = target.create_and_write(data_bytearray)
print("Wrote using bytearray")

# Reading into pre-allocated buffer (zero-copy)
buffer = bytearray(len(data_bytes))
target.read_into(region1, offset=0, buffer=buffer)
print(f"Read into buffer: {bytes(buffer)}")
assert bytes(buffer) == data_bytes

# Using memoryview for large data
large_data = bytearray(LARGE_SIZE)
large_data[0:5] = b"START"
large_data[-5:] = b"END!!"
region3 = target.create_and_write(memoryview(large_data))
print(f"Wrote {LARGE_SIZE // (1024 * 1024)}MB using memoryview (zero-copy)")

# Read back into preallocated buffer of the same size
read_buffer = bytearray(LARGE_SIZE)
target.read_into(region3, offset=0, buffer=read_buffer)

# Verify both ends of the large blob
assert read_buffer[0:5] == b"START"
assert read_buffer[-5:] == b"END!!"
print("Large data verified!")

# Partial reads: the buffer's length decides how many bytes are read
partial_buffer = bytearray(5)
target.read_into(region3, offset=0, buffer=partial_buffer)
assert bytes(partial_buffer) == b"START"
print("Partial read successful")

engine.finalize()
Using buffer protocol objects (bytearray, memoryview, NumPy arrays) avoids
memory copies and improves performance for large data transfers.
Asynchronous Operations
Warabi provides async operations for non-blocking I/O:
# Non-blocking Warabi operations: each *_async call returns a request
# object; wait() blocks until the operation finishes.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Asynchronous create
print("Creating region asynchronously...")
create_req = target.create_async(size=4096)
# Do other work while region is being created
print("Doing other work...")
# Wait for creation to complete; for creates, wait() returns the region
region = create_req.wait()
print(f"Region created: {region}")

# Asynchronous write
data = b"Async write data"
print("Writing asynchronously...")
write_req = target.write_async(region, offset=0, data=data, persist=False)
# Do other work
print("Doing other work while writing...")
# Wait for write to complete
write_req.wait()
print("Write completed")

# Asynchronous read into a caller-provided buffer
buffer = bytearray(len(data))
print("Reading asynchronously...")
read_req = target.read_async(region, offset=0, buffer=buffer)
# Do other work (only rely on the buffer's contents after wait() returns)
print("Doing other work while reading...")
# Wait for read to complete
read_req.wait()
print(f"Read completed: {bytes(buffer)}")
assert bytes(buffer) == data

# Asynchronous persist
print("Persisting asynchronously...")
persist_req = target.persist_async(region, offset=0, size=len(data))
# Do other work
print("Doing other work while persisting...")
# Wait for persist to complete
persist_req.wait()
print("Persist completed")

# Multiple concurrent async operations
print("\nConcurrent async operations:")
# Issue every create before waiting on any, so the requests can overlap
requests = [target.create_async(size=1024) for _ in range(5)]
# Wait for all to complete
regions = []
for i, req in enumerate(requests):
    region = req.wait()
    regions.append(region)
    print(f"Region {i} created: {region}")

# Write to all regions concurrently. Keep every payload referenced until
# all writes have completed (the request may presumably still read from it).
payloads = [f"Data for region {i}".encode() for i in range(len(regions))]
write_requests = [
    target.write_async(region, offset=0, data=payload, persist=False)
    for region, payload in zip(regions, payloads)
]
# Wait for all writes
for req in write_requests:
    req.wait()
print("All writes completed")

# Test completion without waiting
print("\nTesting completion status:")
req = target.create_async(size=512)
print(f"Request completed: {req.completed()}")
# IMPORTANT: the following busy-wait may loop indefinitely if the Warabi
# provider and the progress loop run in the same xstream as the Python
# code, because the loop never yields to them.
# while not req.completed():
#     print("Not yet complete, doing other work...")
#     # In real code, do actual work here
#     pass
region = req.wait()  # Get the result
print(f"Async create tested and completed: {region}")

engine.finalize()
Async operations return AsyncRequest or AsyncCreateRequest objects that
can be used to:
Wait for completion with wait() (for AsyncCreateRequest, wait() returns the new region)
Test completion without blocking with completed()
Persistence Control
Control when data is persisted to storage:
# Four ways to control when written data is persisted to storage.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Use ABT-IO backend for persistent storage
provider = Provider(
    engine=engine,
    provider_id=42,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_persist_demo.dat",
                "create_if_missing": True
            }
        }
    }
)
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Strategy 1: Immediate persistence
print("=== Immediate Persistence ===")
data1 = b"Immediately persisted data"
region1 = target.create_and_write(data1, persist=True)
print("Data written and persisted immediately")

# Strategy 2: Explicit persistence
print("\n=== Explicit Persistence ===")
region2 = target.create(size=4096)
data2 = b"Write first, persist later"
target.write(region2, offset=0, data=data2, persist=False)
print("Data written (not yet persisted)")
# Do more writes if needed
target.persist(region2, offset=0, size=len(data2))
print("Data now persisted explicitly")

# Strategy 3: Batch persistence
print("\n=== Batch Persistence ===")
region3 = target.create(size=10240)
# Multiple writes without persistence
chunks = [
    (0, b"Chunk 1"),
    (100, b"Chunk 2"),
    (200, b"Chunk 3"),
    (300, b"Chunk 4"),
]
for offset, chunk in chunks:
    target.write(region3, offset=offset, data=chunk, persist=False)
    print(f"Wrote chunk at offset {offset} (not persisted)")
# Persist all at once: cover byte 0 through the end of the furthest chunk,
# derived from the chunks so it stays correct if they change
batch_end = max(offset + len(chunk) for offset, chunk in chunks)
target.persist(region3, offset=0, size=batch_end)
print("All chunks persisted in one operation")

# Strategy 4: Asynchronous persistence
print("\n=== Asynchronous Persistence ===")
region4 = target.create(size=4096)
data4 = b"Async persisted data"
# Write without persistence
target.write(region4, offset=0, data=data4, persist=False)
print("Data written (not persisted)")
# Async persist (non-blocking)
req = target.persist_async(region4, offset=0, size=len(data4))
print("Async persist issued, doing other work...")
# Do other work here
req.wait()
print("Async persist completed")

# Verification: Read back all data
print("\n=== Verification ===")
result1 = target.read(region1, 0, len(data1))
assert result1 == data1
print("Region 1 verified")

result2 = target.read(region2, 0, len(data2))
assert result2 == data2
print("Region 2 verified")

for offset, expected in chunks:
    result = target.read(region3, offset, len(expected))
    assert result == expected
print("Region 3 (all chunks) verified")

result4 = target.read(region4, 0, len(data4))
assert result4 == data4
print("Region 4 verified")

print("\nAll data successfully persisted and verified!")

engine.finalize()
Persistence strategies:
Immediate: pass persist=True in write operations
Explicit: call persist() after writes
Batch: group multiple writes, then persist once
Async: use persist_async() for non-blocking persistence
Working with NumPy Arrays
Warabi integrates seamlessly with NumPy:
# Store NumPy arrays in Warabi regions and rebuild them on read.
# Only raw bytes are stored, so the caller must keep each shape and dtype.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client
import numpy as np

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Store NumPy arrays directly
print("=== Storing NumPy Arrays ===")

# Small array
small_array = np.array([1, 2, 3, 4, 5], dtype=np.float64)
region1 = target.create_and_write(small_array.tobytes(), persist=False)
print(f"Stored small array: shape={small_array.shape}, dtype={small_array.dtype}")

# Large 2D array
large_array = np.random.rand(1000, 1000).astype(np.float32)
region2 = target.create_and_write(large_array.tobytes(), persist=False)
print(f"Stored large array: shape={large_array.shape}, dtype={large_array.dtype}")

# 3D array
array_3d = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)
region3 = target.create_and_write(array_3d.tobytes(), persist=False)
print(f"Stored 3D array: shape={array_3d.shape}, dtype={array_3d.dtype}")

# Retrieve and reconstruct arrays
print("\n=== Retrieving NumPy Arrays ===")

# Small array
data1 = target.read(region1, 0, small_array.nbytes)
reconstructed1 = np.frombuffer(data1, dtype=np.float64)
assert np.array_equal(small_array, reconstructed1)
print(f"Reconstructed small array: {reconstructed1}")

# Large array
data2 = target.read(region2, 0, large_array.nbytes)
reconstructed2 = np.frombuffer(data2, dtype=np.float32).reshape(1000, 1000)
assert np.array_equal(large_array, reconstructed2)
print(f"Reconstructed large array: shape={reconstructed2.shape}")

# 3D array
data3 = target.read(region3, 0, array_3d.nbytes)
reconstructed3 = np.frombuffer(data3, dtype=np.uint8).reshape(100, 100, 3)
assert np.array_equal(array_3d, reconstructed3)
print(f"Reconstructed 3D array: shape={reconstructed3.shape}")

# Efficient reading into preallocated arrays
print("\n=== Zero-Copy with Pre-allocated Buffers ===")
# Same shape/dtype/layout as the stored array; .data exposes its memory
# as a writable memoryview that read_into() fills in place
buffer_array = np.zeros_like(large_array)
target.read_into(region2, offset=0, buffer=buffer_array.data)
assert np.array_equal(large_array, buffer_array)
print("Read into preallocated array (zero-copy)")

# Batch storage of multiple arrays
print("\n=== Batch Storage ===")
arrays = [np.random.rand(100, 100) for _ in range(10)]
regions = []
for i, arr in enumerate(arrays):
    region = target.create_and_write(arr.tobytes(), persist=False)
    # Record everything needed to rebuild the array: byte count, shape, dtype
    regions.append((region, arr.nbytes, arr.shape, arr.dtype))
    print(f"Stored array {i}: shape={arr.shape}")

# Retrieve all arrays
print("\n=== Batch Retrieval ===")
retrieved_arrays = []
for i, (region, nbytes, shape, dtype) in enumerate(regions):
    data = target.read(region, 0, nbytes)
    arr = np.frombuffer(data, dtype=dtype).reshape(shape)
    retrieved_arrays.append(arr)
    assert np.array_equal(arrays[i], arr)
    print(f"Retrieved array {i}: verified")

print("\nAll NumPy arrays successfully stored and retrieved!")

engine.finalize()
This is useful for:
Scientific computing workflows
Machine learning model storage
Large array datasets
HPC applications
Advanced Patterns
Batch Operations
Efficient batch processing:
# Batch processing: issue many async requests, then wait on all of them.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Batch create regions
print("=== Batch Creating Regions ===")
# Issue all creates first, then wait, so the requests can overlap
create_requests = [target.create_async(size=4096) for _ in range(10)]
regions = [req.wait() for req in create_requests]
print(f"Created {len(regions)} regions")

# Batch write to all regions
print("\n=== Batch Writing ===")
# Keep every payload referenced until all writes complete; the lengths are
# also reused below to size read buffers and persist ranges exactly.
payloads = [f"Data for region {i}".encode() for i in range(len(regions))]
write_requests = [
    target.write_async(region, 0, payload, persist=False)
    for region, payload in zip(regions, payloads)
]
# Wait for all writes
for req in write_requests:
    req.wait()
print(f"Wrote to {len(regions)} regions")

# Batch read from all regions
print("\n=== Batch Reading ===")
# One buffer per region, sized to exactly what was written
read_buffers = [bytearray(len(payload)) for payload in payloads]
read_requests = [
    target.read_async(region, 0, buffer)
    for region, buffer in zip(regions, read_buffers)
]
# Wait and verify
for i, (req, buffer, expected) in enumerate(
        zip(read_requests, read_buffers, payloads)):
    req.wait()
    result = bytes(buffer)
    assert result == expected
    print(f"Region {i} verified: {result}")

# Batch persist: only the bytes that were actually written
print("\n=== Batch Persisting ===")
persist_requests = [
    target.persist_async(region, 0, len(payload))
    for region, payload in zip(regions, payloads)
]
for req in persist_requests:
    req.wait()
print(f"Persisted {len(regions)} regions")

# Batch erase
print("\n=== Batch Erasing ===")
for region in regions:
    target.erase(region)
print(f"Erased {len(regions)} regions")

print("Batch operations completed!")

engine.finalize()
Data Migration Pattern
Migrate data between storage backends:
# Migrate regions from a volatile memory target to a persistent ABT-IO one.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

# Setup source and destination providers
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Source: memory backend
source_provider = Provider(
    engine=engine,
    provider_id=1,
    config={"target": {"type": "memory"}}
)

# Destination: persistent ABT-IO backend
dest_provider = Provider(
    engine=engine,
    provider_id=2,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_migrated.dat",
                "create_if_missing": True
            }
        }
    }
)

client = Client(engine=engine)
source = client.make_target_handle(str(engine.addr()), 1)
dest = client.make_target_handle(str(engine.addr()), 2)

print("=== Data Migration Pattern ===")

# Create data in source (memory)
print("\nCreating data in source (memory backend)...")
source_regions = []
for i in range(5):
    data = f"Data item {i}".encode()
    region = source.create_and_write(data, persist=False)
    source_regions.append((region, data))
    print(f"Created source region {i}: {region}")

# Migrate data to destination (persistent)
print("\nMigrating to destination (ABT-IO backend)...")
dest_regions = []
for i, (src_region, original_data) in enumerate(source_regions):
    # Read from source
    data = source.read(src_region, 0, len(original_data))
    # Write to destination with persistence
    dst_region = dest.create_and_write(data, persist=True)
    dest_regions.append(dst_region)
    print(f"Migrated region {i}: {src_region} -> {dst_region}")

# Verify migration before touching the source
print("\nVerifying migration...")
for i, (dst_region, (_, original_data)) in enumerate(
        zip(dest_regions, source_regions)):
    data = dest.read(dst_region, 0, len(original_data))
    assert data == original_data
    print(f"Region {i} verified: {data}")

# Clean up source only after successful verification
print("\nCleaning up source...")
for src_region, _ in source_regions:
    source.erase(src_region)
print("Source regions cleaned up")

print("\nMigration completed successfully!")
print("Data now persisted in destination backend")

engine.finalize()
Performance Tips
Use buffer-protocol objects (bytearray, memoryview, NumPy arrays) and read_into() for large data to avoid extra copies
Pre-allocate regions when sizes are known to avoid multiple allocations
Batch writes when possible to reduce network round-trips
Use async operations to overlap I/O with computation
Choose appropriate backends:
- Memory backend for temporary data
- ABT-IO backend for persistent storage
- PMem backend for byte-addressable persistence
Control persistence explicitly for better performance:
- Use persist=False for writes
- Batch multiple writes
- Call persist() once at the end
Preallocate buffers for read operations to avoid allocations
Integration Examples
With Bedrock
Using Warabi through Bedrock in Python:
"""
Example of using Warabi through Bedrock configuration.

This demonstrates how to configure and use Warabi when deployed
via Bedrock's configuration system. Pass the Bedrock server's address
as the first command-line argument.
"""
import sys

import mochi.margo
from mochi.warabi.client import Client

# Example Bedrock configuration for Warabi. It is shown for reference: a
# Bedrock server would be started with it; this client does not read it.
bedrock_config = {
    "libraries": [
        "libwarabi-bedrock-module.so"
    ],
    "providers": [
        {
            "name": "warabi_memory",
            "type": "warabi",
            "provider_id": 1,
            "pool": "default_pool",
            "config": {
                "target": {
                    "type": "memory"
                }
            }
        },
        {
            "name": "warabi_persistent",
            "type": "warabi",
            "provider_id": 2,
            # NOTE: "io_pool" is not a default pool; it presumably needs to
            # be declared in the "margo" section of the full Bedrock config.
            "pool": "io_pool",
            "config": {
                "target": {
                    "type": "abtio",
                    "config": {
                        "path": "/data/warabi_storage.dat",
                        "create_if_missing": True
                    }
                }
            }
        }
    ]
}

# Fail with a usage message instead of an IndexError when no address is given
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} <bedrock-server-address>")
# (In real usage, you'd get the server address from Bedrock)
server_addr = sys.argv[1]

# Connect to Bedrock server (client mode: this process only issues RPCs)
engine = mochi.margo.Engine("na+sm", mochi.margo.client)
client = Client(engine=engine)

# Provider IDs match the "provider_id" entries in bedrock_config
memory_target = client.make_target_handle(server_addr, 1)
persistent_target = client.make_target_handle(server_addr, 2)

# Use memory target for temporary data
temp_data = b"Temporary data"
temp_region = memory_target.create_and_write(temp_data, persist=False)
print(f"Stored temp data in memory: {temp_region}")

# Use persistent target for important data
important_data = b"Important persistent data"
persist_region = persistent_target.create_and_write(important_data, persist=True)
print(f"Stored important data persistently: {persist_region}")

print("\nBedrock integration example (configuration shown)")

# Release the client engine, as the other examples do
engine.finalize()
Building Higher-Level APIs
Create application-specific abstractions:
# Build application-level storage abstractions on top of a Warabi target.
import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client, Exception as WarabiException
from typing import Optional, Dict
import pickle


class BlobStore:
    """
    Higher-level blob storage interface keyed by strings.

    Each value lives in its own Warabi region. The key -> region mapping is
    kept only in this object's memory, so it is lost when the object is.
    """

    def __init__(self, target):
        self.target = target
        # key -> {'region': region returned by Warabi, 'size': bytes stored}
        self._metadata: Dict[str, dict] = {}

    def put(self, key: str, data: bytes, persist: bool = True) -> None:
        """Store data under key, replacing (and erasing) any previous value."""
        region = self.target.create_and_write(data, persist=persist)
        previous = self._metadata.get(key)
        self._metadata[key] = {
            'region': region,
            'size': len(data)
        }
        if previous is not None:
            # Reclaim the overwritten value's region; otherwise every re-put
            # of a key leaks a region. Best-effort: the new value is already
            # recorded, so a failed erase only leaves unused space behind.
            try:
                self.target.erase(previous['region'])
            except WarabiException:
                pass

    def get(self, key: str) -> Optional[bytes]:
        """Retrieve data by key, or None if absent or unreadable."""
        if key not in self._metadata:
            return None
        meta = self._metadata[key]
        try:
            return self.target.read(meta['region'], 0, meta['size'])
        except WarabiException:
            return None

    def delete(self, key: str) -> bool:
        """Delete data by key; return True only if its region was erased."""
        if key not in self._metadata:
            return False
        meta = self._metadata[key]
        try:
            self.target.erase(meta['region'])
        except WarabiException:
            return False
        del self._metadata[key]
        return True

    def exists(self, key: str) -> bool:
        """Check if key exists."""
        return key in self._metadata

    def keys(self):
        """Get all keys."""
        return self._metadata.keys()


class ObjectStore(BlobStore):
    """
    Object store with automatic Python object serialization (pickle).
    """

    def put_object(self, key: str, obj, persist: bool = True) -> None:
        """Store a Python object (uses pickle)."""
        data = pickle.dumps(obj)
        self.put(key, data, persist=persist)

    def get_object(self, key: str):
        """Retrieve and deserialize a Python object, or None if absent.

        WARNING: pickle.loads can execute arbitrary code. Only use this on
        stores whose contents come from trusted writers.
        """
        data = self.get(key)
        if data is None:
            return None
        return pickle.loads(data)


# Example usage
engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                    config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

print("=== BlobStore API ===")
blob_store = BlobStore(target)

# Store blobs with string keys
blob_store.put("file1", b"Contents of file 1")
blob_store.put("file2", b"Contents of file 2")
blob_store.put("image", b"\x89PNG\r\n\x1a\n...")  # Binary data
print(f"Stored keys: {list(blob_store.keys())}")

# Retrieve blobs
data = blob_store.get("file1")
print(f"Retrieved file1: {data}")

# Check existence
if blob_store.exists("file2"):
    print("file2 exists")

# Delete
blob_store.delete("file1")
print(f"After delete: {list(blob_store.keys())}")

print("\n=== ObjectStore API ===")
obj_store = ObjectStore(target)

# Store Python objects
obj_store.put_object("config", {
    "host": "localhost",
    "port": 8080,
    "debug": True
})
obj_store.put_object("users", [
    {"name": "Alice", "id": 1},
    {"name": "Bob", "id": 2},
    {"name": "Carol", "id": 3}
])
obj_store.put_object("matrix", [[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print(f"Stored objects: {list(obj_store.keys())}")

# Retrieve and use objects
config = obj_store.get_object("config")
print(f"Config: {config}")
print(f"Debug mode: {config['debug']}")

users = obj_store.get_object("users")
print(f"Users: {users}")
print(f"First user: {users[0]['name']}")

matrix = obj_store.get_object("matrix")
print(f"Matrix: {matrix}")

print("\n=== Advanced Pattern: Versioned Storage ===")


class VersionedStore(BlobStore):
    """Append-only store that keeps every version written under a key."""

    def __init__(self, target):
        super().__init__(target)
        # key -> list of {'region', 'size'} dicts, oldest version first
        self._versions: Dict[str, list] = {}

    def put_version(self, key: str, data: bytes) -> int:
        """Store a new version of data and return its version number."""
        region = self.target.create_and_write(data, persist=True)
        versions = self._versions.setdefault(key, [])
        versions.append({
            'region': region,
            'size': len(data)
        })
        return len(versions) - 1  # Return version number

    def get_version(self, key: str, version: int = -1) -> Optional[bytes]:
        """Get a specific version (-1 for latest), or None if unavailable."""
        versions = self._versions.get(key)
        if not versions:
            return None
        try:
            meta = versions[version]
        except IndexError:
            return None
        try:
            return self.target.read(meta['region'], 0, meta['size'])
        except WarabiException:
            return None

    def version_count(self, key: str) -> int:
        """Get number of versions for a key."""
        return len(self._versions.get(key, []))

    def exists(self, key: str) -> bool:
        """Check if key exists as a plain blob or as a versioned entry."""
        return key in self._versions or super().exists(key)

    def keys(self):
        """Get all keys, both plain blobs and versioned entries."""
        return self._metadata.keys() | self._versions.keys()


versioned = VersionedStore(target)

# Store multiple versions
v0 = versioned.put_version("document", b"Version 1 content")
v1 = versioned.put_version("document", b"Version 2 content")
v2 = versioned.put_version("document", b"Version 3 content")
print(f"Stored {versioned.version_count('document')} versions")
print(f"document exists: {versioned.exists('document')}")

# Get specific versions
print(f"Version 0: {versioned.get_version('document', 0)}")
print(f"Version 1: {versioned.get_version('document', 1)}")
print(f"Latest: {versioned.get_version('document', -1)}")

print("\nHigher-level API examples completed!")

engine.finalize()
This allows you to:
Add domain-specific interfaces
Implement caching strategies
Add validation and type checking
Create convenient abstractions