Python Bindings

Warabi provides comprehensive Python bindings for both server and client libraries, enabling blob storage functionality in Python applications.

Installation

To use Warabi’s Python bindings, install Warabi with the +python variant in Spack:

spack install mochi-warabi+python

The Python bindings are available in the mochi.warabi package.

Quick Start

Here’s a simple example of using Warabi from Python:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

# Create a Margo engine
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Start a Warabi provider with memory backend
provider = Provider(
    engine=engine,
    provider_id=42,
    config={
        "target": {
            "type": "memory"
        }
    }
)

# Create a client
client = Client(engine=engine)

# Get a target handle
target = client.make_target_handle(
    address=str(engine.addr()),
    provider_id=42
)

# Create a region and write data
data = b"Hello, Warabi!"
region = target.create_and_write(data, persist=True)
print(f"Created region: {region}")

# Read the data back
result = target.read(region, offset=0, size=len(data))
print(f"Read data: {result}")

# Verify
assert result == data
print("Success!")

# Cleanup
engine.finalize()

This example demonstrates:

  1. Creating a Margo engine

  2. Starting a Warabi provider with memory backend

  3. Creating a client and target handle

  4. Creating regions and writing/reading data

Server API

The mochi.warabi.server module provides the Provider class for creating Warabi providers:

Basic Provider Creation

import mochi.margo
from mochi.warabi.server import Provider

# Create a Margo engine
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Create a Warabi provider with memory backend
provider = Provider(
    engine=engine,
    provider_id=42,
    config={
        "target": {
            "type": "memory"
        }
    }
)

print(f"Warabi provider started at {engine.addr()}")
print(f"Provider ID: 42")
print("Provider is now accepting requests...")

# In a real application, you would wait here
# For this example, we'll just finalize
engine.finalize()

Backend Configuration

Warabi supports multiple storage backends:

import mochi.margo
from mochi.warabi.server import Provider

engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Memory backend (volatile, fast)
memory_provider = Provider(
    engine=engine,
    provider_id=1,
    config={
        "target": {
            "type": "memory"
        }
    }
)
print("Memory backend provider created (ID: 1)")

# ABT-IO file-based backend (persistent)
abtio_provider = Provider(
    engine=engine,
    provider_id=2,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_storage.dat",
                "create_if_missing": True
            }
        }
    }
)
print("ABT-IO backend provider created (ID: 2)")

# Persistent memory backend (if available)
try:
    pmem_provider = Provider(
        engine=engine,
        provider_id=3,
        config={
            "target": {
                "type": "pmem",
                "config": {
                    "path": "/mnt/pmem/warabi_data",
                    "create_if_missing_with_size": 1073741824  # 1GB
                }
            }
        }
    )
    print("PMem backend provider created (ID: 3)")
except Exception as e:
    print(f"PMem backend not available: {e}")

engine.finalize()

Available backends:

  • memory: Fast in-memory storage (volatile)

  • pmem: Persistent memory storage

  • abtio: File-based storage using ABT-IO

Client API

The mochi.warabi.client module provides the Client and TargetHandle classes for interacting with Warabi providers.

Basic Operations

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Create a region
region = target.create(size=4096)
print(f"Created region: {region}")

# Write data to the region
data = b"Hello, Warabi from Python!"
target.write(region, offset=0, data=data, persist=False)
print(f"Wrote {len(data)} bytes")

# Read data back
result = target.read(region, offset=0, size=len(data))
print(f"Read back: {result}")
assert result == data

# Create and write in one operation
data2 = b"Combined operation!"
region2 = target.create_and_write(data2, persist=True)
print(f"Created and wrote region: {region2}")

# Verify
result2 = target.read(region2, offset=0, size=len(data2))
assert result2 == data2

# Persist data explicitly
target.persist(region, offset=0, size=len(data))
print("Data persisted")

# Erase a region
target.erase(region)
print("Region erased")

engine.finalize()

Key operations:

  • create(size): Create a new region

  • write(region, offset, data, persist=False): Write data to a region

  • read(region, offset, size): Read data from a region

  • create_and_write(data, persist=False): Create region and write in one operation

  • erase(region): Delete a region

  • persist(region, offset, size): Ensure data is persisted to storage

Working with Regions

Regions are logical containers for blob data:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Create a 1MB region
region = target.create(size=1024 * 1024)
print(f"Created 1MB region: {region}")

# Write data at different offsets
chunk1 = b"First chunk of data"
chunk2 = b"Second chunk"
chunk3 = b"Third chunk at the end"

target.write(region, offset=0, data=chunk1, persist=False)
target.write(region, offset=1000, data=chunk2, persist=False)
target.write(region, offset=1000000, data=chunk3, persist=False)

print("Wrote data at offsets: 0, 1000, 1000000")

# Read back specific chunks
result1 = target.read(region, offset=0, size=len(chunk1))
result2 = target.read(region, offset=1000, size=len(chunk2))
result3 = target.read(region, offset=1000000, size=len(chunk3))

assert result1 == chunk1
assert result2 == chunk2
assert result3 == chunk3
print("All chunks verified!")

# Persist the entire region
target.persist(region, offset=0, size=1024 * 1024)
print("Region persisted")

# Create multiple regions for different purposes
metadata_region = target.create(size=4096)  # 4KB for metadata
data_region = target.create(size=10 * 1024 * 1024)  # 10MB for data

print(f"Metadata region: {metadata_region}")
print(f"Data region: {data_region}")

# Clean up
target.erase(region)
target.erase(metadata_region)
target.erase(data_region)

engine.finalize()

Regions support:

  • Fixed-size allocation

  • Offset-based read/write operations

  • Persistence control

  • Efficient data management

Buffer Protocol Support

Like Yokan, Warabi supports Python’s buffer protocol for efficient zero-copy operations:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Using bytes (simple but creates copies)
data_bytes = b"Hello from bytes"
region1 = target.create_and_write(data_bytes)
print("Wrote using bytes")

# Using bytearray (more efficient for large data)
data_bytearray = bytearray(b"Hello from bytearray")
region2 = target.create_and_write(data_bytearray)
print("Wrote using bytearray")

# Reading into pre-allocated buffer (zero-copy)
buffer = bytearray(len(data_bytes))
target.read_into(region1, offset=0, buffer=buffer)
print(f"Read into buffer: {bytes(buffer)}")
assert bytes(buffer) == data_bytes

# Using memoryview for large data
large_data = bytearray(10 * 1024 * 1024)  # 10MB
large_data[0:5] = b"START"
large_data[-5:] = b"END!!"

region3 = target.create_and_write(memoryview(large_data))
print(f"Wrote 10MB using memoryview (zero-copy)")

# Read back into preallocated buffer
read_buffer = bytearray(10 * 1024 * 1024)
target.read_into(region3, offset=0, buffer=read_buffer)

# Verify
assert read_buffer[0:5] == b"START"
assert read_buffer[-5:] == b"END!!"
print("Large data verified!")

# Partial reads with buffer
partial_buffer = bytearray(5)
target.read_into(region3, offset=0, buffer=partial_buffer)
assert bytes(partial_buffer) == b"START"
print("Partial read successful")

engine.finalize()

Using buffer protocol objects (bytearray, memoryview, NumPy arrays) avoids memory copies and improves performance for large data transfers.

Asynchronous Operations

Warabi provides async operations for non-blocking I/O:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Asynchronous create
print("Creating region asynchronously...")
create_req = target.create_async(size=4096)
# Do other work while region is being created
print("Doing other work...")
# Wait for creation to complete
region = create_req.wait()
print(f"Region created: {region}")

# Asynchronous write
data = b"Async write data"
print("Writing asynchronously...")
write_req = target.write_async(region, offset=0, data=data, persist=False)
# Do other work
print("Doing other work while writing...")
# Wait for write to complete
write_req.wait()
print("Write completed")

# Asynchronous read
buffer = bytearray(len(data))
print("Reading asynchronously...")
read_req = target.read_async(region, offset=0, buffer=buffer)
# Do other work
print("Doing other work while reading...")
# Wait for read to complete
read_req.wait()
print(f"Read completed: {bytes(buffer)}")
assert bytes(buffer) == data

# Asynchronous persist
print("Persisting asynchronously...")
persist_req = target.persist_async(region, offset=0, size=len(data))
# Do other work
print("Doing other work while persisting...")
# Wait for persist to complete
persist_req.wait()
print("Persist completed")

# Multiple concurrent async operations
print("\nConcurrent async operations:")
regions = []
requests = []

# Issue multiple async creates
for i in range(5):
    req = target.create_async(size=1024)
    requests.append(req)

# Wait for all to complete
for i, req in enumerate(requests):
    region = req.wait()
    regions.append(region)
    print(f"Region {i} created: {region}")

# Write to all regions concurrently
write_requests = []
for i, region in enumerate(regions):
    data = f"Data for region {i}".encode()
    req = target.write_async(region, offset=0, data=data, persist=False)
    write_requests.append(req)

# Wait for all writes
for req in write_requests:
    req.wait()
print("All writes completed")

# Test completion without waiting
print("\nTesting completion status:")
req = target.create_async(size=512)
print(f"Requested completed: {req.completed()}")

# IMPORTANT: the following loop may spin indefinitely if the
# Warabi provider and the progress loop run in the same xstream
# as the Python code, because the loop never yields to them.

# while not req.completed():
#    print("Not yet complete, doing other work...")
#    # In real code, do actual work here
#    pass
region = req.wait()  # Get the result
print(f"Async create tested and completed: {region}")

engine.finalize()

Async operations return AsyncRequest or AsyncCreateRequest objects that can be used to:

  • Wait for completion with wait()

  • Test completion with completed()

Persistence Control

Control when data is persisted to storage:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Use ABT-IO backend for persistent storage
provider = Provider(
    engine=engine,
    provider_id=42,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_persist_demo.dat",
                "create_if_missing": True
            }
        }
    }
)

client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Strategy 1: Immediate persistence
print("=== Immediate Persistence ===")
data1 = b"Immediately persisted data"
region1 = target.create_and_write(data1, persist=True)
print("Data written and persisted immediately")

# Strategy 2: Explicit persistence
print("\n=== Explicit Persistence ===")
region2 = target.create(size=4096)
data2 = b"Write first, persist later"
target.write(region2, offset=0, data=data2, persist=False)
print("Data written (not yet persisted)")
# Do more writes if needed
target.persist(region2, offset=0, size=len(data2))
print("Data now persisted explicitly")

# Strategy 3: Batch persistence
print("\n=== Batch Persistence ===")
region3 = target.create(size=10240)

# Multiple writes without persistence
chunks = [
    (0, b"Chunk 1"),
    (100, b"Chunk 2"),
    (200, b"Chunk 3"),
    (300, b"Chunk 4"),
]

for offset, data in chunks:
    target.write(region3, offset=offset, data=data, persist=False)
    print(f"Wrote chunk at offset {offset} (not persisted)")

# Persist all at once
target.persist(region3, offset=0, size=400)
print("All chunks persisted in one operation")

# Strategy 4: Asynchronous persistence
print("\n=== Asynchronous Persistence ===")
region4 = target.create(size=4096)
data4 = b"Async persisted data"

# Write without persistence
target.write(region4, offset=0, data=data4, persist=False)
print("Data written (not persisted)")

# Async persist (non-blocking)
req = target.persist_async(region4, offset=0, size=len(data4))
print("Async persist issued, doing other work...")
# Do other work here
req.wait()
print("Async persist completed")

# Verification: Read back all data
print("\n=== Verification ===")
result1 = target.read(region1, 0, len(data1))
assert result1 == data1
print("Region 1 verified")

result2 = target.read(region2, 0, len(data2))
assert result2 == data2
print("Region 2 verified")

for offset, expected in chunks:
    result = target.read(region3, offset, len(expected))
    assert result == expected
print("Region 3 (all chunks) verified")

result4 = target.read(region4, 0, len(data4))
assert result4 == data4
print("Region 4 verified")

print("\nAll data successfully persisted and verified!")

engine.finalize()

Persistence strategies:

  • Immediate: persist=True in write operations

  • Explicit: Call persist() after writes

  • Batch: Group multiple writes, then persist once

  • Async: Use persist_async() for non-blocking persistence

Working with NumPy Arrays

Warabi integrates seamlessly with NumPy:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client
import numpy as np

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Store NumPy arrays as raw bytes (tobytes() copies the array data)
print("=== Storing NumPy Arrays ===")

# Small array
small_array = np.array([1, 2, 3, 4, 5], dtype=np.float64)
region1 = target.create_and_write(small_array.tobytes(), persist=False)
print(f"Stored small array: shape={small_array.shape}, dtype={small_array.dtype}")

# Large 2D array
large_array = np.random.rand(1000, 1000).astype(np.float32)
region2 = target.create_and_write(large_array.tobytes(), persist=False)
print(f"Stored large array: shape={large_array.shape}, dtype={large_array.dtype}")

# 3D array
array_3d = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)
region3 = target.create_and_write(array_3d.tobytes(), persist=False)
print(f"Stored 3D array: shape={array_3d.shape}, dtype={array_3d.dtype}")

# Retrieve and reconstruct arrays
print("\n=== Retrieving NumPy Arrays ===")

# Small array
data1 = target.read(region1, 0, small_array.nbytes)
reconstructed1 = np.frombuffer(data1, dtype=np.float64)
assert np.array_equal(small_array, reconstructed1)
print(f"Reconstructed small array: {reconstructed1}")

# Large array
data2 = target.read(region2, 0, large_array.nbytes)
reconstructed2 = np.frombuffer(data2, dtype=np.float32).reshape(1000, 1000)
assert np.array_equal(large_array, reconstructed2)
print(f"Reconstructed large array: shape={reconstructed2.shape}")

# 3D array
data3 = target.read(region3, 0, array_3d.nbytes)
reconstructed3 = np.frombuffer(data3, dtype=np.uint8).reshape(100, 100, 3)
assert np.array_equal(array_3d, reconstructed3)
print(f"Reconstructed 3D array: shape={reconstructed3.shape}")

# Efficient reading into preallocated arrays
print("\n=== Zero-Copy with Pre-allocated Buffers ===")

# Create array to read into
buffer_array = np.zeros_like(large_array)
target.read_into(region2, 0, buffer_array.data)
assert np.array_equal(large_array, buffer_array)
print("Read into preallocated array (zero-copy)")

# Batch storage of multiple arrays
print("\n=== Batch Storage ===")

arrays = [
    np.random.rand(100, 100) for _ in range(10)
]

regions = []
for i, arr in enumerate(arrays):
    region = target.create_and_write(arr.tobytes(), persist=False)
    regions.append((region, arr.shape, arr.dtype))
    print(f"Stored array {i}: shape={arr.shape}")

# Retrieve all arrays
print("\n=== Batch Retrieval ===")
retrieved_arrays = []
for i, (region, shape, dtype) in enumerate(regions):
    size = np.prod(shape) * np.dtype(dtype).itemsize
    data = target.read(region, 0, size)
    arr = np.frombuffer(data, dtype=dtype).reshape(shape)
    retrieved_arrays.append(arr)
    assert np.array_equal(arrays[i], arr)
    print(f"Retrieved array {i}: verified")

print("\nAll NumPy arrays successfully stored and retrieved!")

engine.finalize()

This is useful for:

  • Scientific computing workflows

  • Machine learning model storage

  • Large array datasets

  • HPC applications

Advanced Patterns

Batch Operations

Efficient batch processing:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

# Batch create regions
print("=== Batch Creating Regions ===")
regions = []
create_requests = []

# Issue async creates
for i in range(10):
    req = target.create_async(size=4096)
    create_requests.append(req)

# Wait for all to complete
for req in create_requests:
    region = req.wait()
    regions.append(region)

print(f"Created {len(regions)} regions")

# Batch write to all regions
print("\n=== Batch Writing ===")
write_requests = []

for i, region in enumerate(regions):
    data = f"Data for region {i}".encode()
    req = target.write_async(region, 0, data, persist=False)
    write_requests.append((req, data))

# Wait for all writes
for req, _ in write_requests:
    req.wait()

print(f"Wrote to {len(regions)} regions")

# Batch read from all regions
print("\n=== Batch Reading ===")
read_requests = []

for region in regions:
    buffer = bytearray(100)  # Large enough buffer
    req = target.read_async(region, 0, buffer)
    read_requests.append((req, buffer))

# Wait and verify
for i, (req, buffer) in enumerate(read_requests):
    req.wait()
    expected = f"Data for region {i}".encode()
    # The buffer is larger than the payload: compare only the expected prefix
    result = bytes(buffer[:len(expected)])
    assert result == expected
    print(f"Region {i} verified: {result}")

# Batch persist
print("\n=== Batch Persisting ===")
persist_requests = []

for region in regions:
    req = target.persist_async(region, 0, 100)
    persist_requests.append(req)

for req in persist_requests:
    req.wait()

print(f"Persisted {len(regions)} regions")

# Batch erase
print("\n=== Batch Erasing ===")
for region in regions:
    target.erase(region)

print(f"Erased {len(regions)} regions")
print("Batch operations completed!")

engine.finalize()

Data Migration Pattern

Migrate data between storage backends:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client

# Setup source and destination providers
engine = mochi.margo.Engine("na+sm", mochi.margo.server)

# Source: memory backend
source_provider = Provider(
    engine=engine,
    provider_id=1,
    config={"target": {"type": "memory"}}
)

# Destination: persistent ABT-IO backend
dest_provider = Provider(
    engine=engine,
    provider_id=2,
    config={
        "target": {
            "type": "abtio",
            "config": {
                "path": "/tmp/warabi_migrated.dat",
                "create_if_missing": True
            }
        }
    }
)

client = Client(engine=engine)
source = client.make_target_handle(str(engine.addr()), 1)
dest = client.make_target_handle(str(engine.addr()), 2)

print("=== Data Migration Pattern ===")

# Create data in source (memory)
print("\nCreating data in source (memory backend)...")
source_regions = []
for i in range(5):
    data = f"Data item {i}".encode()
    region = source.create_and_write(data, persist=False)
    source_regions.append((region, data))
    print(f"Created source region {i}: {region}")

# Migrate data to destination (persistent)
print("\nMigrating to destination (ABT-IO backend)...")
dest_regions = []

for i, (src_region, original_data) in enumerate(source_regions):
    # Read from source
    data = source.read(src_region, 0, len(original_data))

    # Write to destination with persistence
    dst_region = dest.create_and_write(data, persist=True)
    dest_regions.append(dst_region)

    print(f"Migrated region {i}: {src_region} -> {dst_region}")

# Verify migration
print("\nVerifying migration...")
for i, (dst_region, (_, original_data)) in enumerate(zip(dest_regions, source_regions)):
    data = dest.read(dst_region, 0, len(original_data))
    assert data == original_data
    print(f"Region {i} verified: {data}")

# Clean up source after successful migration
print("\nCleaning up source...")
for src_region, _ in source_regions:
    source.erase(src_region)
print("Source regions cleaned up")

print("\nMigration completed successfully!")
print(f"Data now persisted in destination backend")

engine.finalize()

Performance Tips

  1. Use buffer protocol objects instead of strings for large data

  2. Pre-allocate regions when sizes are known to avoid multiple allocations

  3. Batch writes when possible to reduce network round-trips

  4. Use async operations to overlap I/O with computation

  5. Choose appropriate backends:

       • Memory backend for temporary data

       • ABT-IO backend for persistent storage

       • PMem backend for byte-addressable persistence

  6. Control persistence explicitly for better performance:

       • Use persist=False for writes

       • Batch multiple writes

       • Call persist() once at the end

  7. Preallocate buffers for read operations to avoid allocations

Integration Examples

With Bedrock

Using Warabi through Bedrock in Python:

"""
Example of using Warabi through Bedrock configuration.

This demonstrates how to configure and use Warabi when deployed
via Bedrock's configuration system.
"""
import sys

# Example Bedrock configuration for Warabi
bedrock_config = {
    "libraries": [
        "libwarabi-bedrock-module.so"
    ],
    "providers": [
        {
            "name": "warabi_memory",
            "type": "warabi",
            "provider_id": 1,
            "pool": "default_pool",
            "config": {
                "target": {
                    "type": "memory"
                }
            }
        },
        {
            "name": "warabi_persistent",
            "type": "warabi",
            "provider_id": 2,
            "pool": "io_pool",
            "config": {
                "target": {
                    "type": "abtio",
                    "config": {
                        "path": "/data/warabi_storage.dat",
                        "create_if_missing": True
                    }
                }
            }
        }
    ]
}

# Client code to use Bedrock-deployed Warabi
import mochi.margo
from mochi.warabi.client import Client

# Address of the Bedrock server hosting the providers.
# (In real usage, you'd get the server address from Bedrock)
# Checked before the engine is created so a bad invocation exits cleanly.
if len(sys.argv) < 2:
    sys.exit(f"Usage: {sys.argv[0]} <server_address>")
server_addr = sys.argv[1]

# Create a client-mode Margo engine and a Warabi client on top of it
engine = mochi.margo.Engine("na+sm", mochi.margo.client)
client = Client(engine=engine)

# Connect to providers (IDs match the "provider_id" fields in bedrock_config)
memory_target = client.make_target_handle(server_addr, 1)
persistent_target = client.make_target_handle(server_addr, 2)

# Use memory target for temporary data
temp_data = b"Temporary data"
temp_region = memory_target.create_and_write(temp_data, persist=False)
print(f"Stored temp data in memory: {temp_region}")

# Use persistent target for important data
important_data = b"Important persistent data"
persist_region = persistent_target.create_and_write(important_data, persist=True)
print(f"Stored important data persistently: {persist_region}")

print("\nBedrock integration example (configuration shown)")

# Cleanup, as in the other examples
engine.finalize()

Building Higher-Level APIs

Create application-specific abstractions:

import mochi.margo
from mochi.warabi.server import Provider
from mochi.warabi.client import Client, Exception as WarabiException
from typing import Optional, Dict
import pickle

class BlobStore:
    """
    Higher-level blob storage interface keyed by strings.

    Wraps a Warabi target handle and records, for every key, the region
    that holds the blob and the number of bytes written to it. The
    key -> region mapping is kept only in this object's memory.
    """

    def __init__(self, target):
        """Wrap *target*, the handle used for every region operation."""
        self.target = target
        self._metadata = {}  # key -> {'region': ..., 'size': ...}

    def put(self, key: str, data: bytes, persist: bool = True) -> None:
        """Store *data* under *key*, replacing any blob already stored there.

        The new region is written first; the region previously bound to
        *key* is erased only afterwards. An overwrite therefore does not
        leak the old region, and a failed write leaves the old blob intact.
        """
        region = self.target.create_and_write(data, persist=persist)
        previous = self._metadata.get(key)
        self._metadata[key] = {
            'region': region,
            'size': len(data)
        }
        if previous is not None:
            # Without this the old region would become unreachable.
            self.target.erase(previous['region'])

    def get(self, key: str) -> Optional[bytes]:
        """Return the blob stored under *key*.

        Returns None when the key is unknown or when the read raises
        WarabiException.
        """
        if key not in self._metadata:
            return None

        meta = self._metadata[key]
        try:
            return self.target.read(meta['region'], 0, meta['size'])
        except WarabiException:
            return None

    def delete(self, key: str) -> bool:
        """Erase the blob stored under *key*.

        Returns True on success, False when the key is unknown or the
        erase raises WarabiException (the key is then kept).
        """
        if key not in self._metadata:
            return False

        meta = self._metadata[key]
        try:
            self.target.erase(meta['region'])
            del self._metadata[key]
            return True
        except WarabiException:
            return False

    def exists(self, key: str) -> bool:
        """Return True if *key* is currently tracked by this store."""
        return key in self._metadata

    def keys(self):
        """Return a view of all keys tracked by this store."""
        return self._metadata.keys()


class ObjectStore(BlobStore):
    """
    BlobStore variant that stores arbitrary Python objects using pickle.
    """

    def put_object(self, key: str, obj, persist: bool = True) -> None:
        """Pickle *obj* and store the resulting bytes under *key*."""
        self.put(key, pickle.dumps(obj), persist=persist)

    def get_object(self, key: str):
        """Return the unpickled object stored under *key*.

        Returns None when get() returns None for *key*.

        NOTE: unpickling can execute code embedded in the payload, so this
        should only be used on data the application wrote itself.
        """
        payload = self.get(key)
        return None if payload is None else pickle.loads(payload)


# Example usage
engine = mochi.margo.Engine("na+sm", mochi.margo.server)
provider = Provider(engine=engine, provider_id=42,
                   config={"target": {"type": "memory"}})
client = Client(engine=engine)
target = client.make_target_handle(str(engine.addr()), 42)

print("=== BlobStore API ===")
blob_store = BlobStore(target)

# Store blobs with string keys
blob_store.put("file1", b"Contents of file 1")
blob_store.put("file2", b"Contents of file 2")
blob_store.put("image", b"\x89PNG\r\n\x1a\n...")  # Binary data

print(f"Stored keys: {list(blob_store.keys())}")

# Retrieve blobs
data = blob_store.get("file1")
print(f"Retrieved file1: {data}")

# Check existence
if blob_store.exists("file2"):
    print("file2 exists")

# Delete
blob_store.delete("file1")
print(f"After delete: {list(blob_store.keys())}")


print("\n=== ObjectStore API ===")
obj_store = ObjectStore(target)

# Store Python objects
obj_store.put_object("config", {
    "host": "localhost",
    "port": 8080,
    "debug": True
})

obj_store.put_object("users", [
    {"name": "Alice", "id": 1},
    {"name": "Bob", "id": 2},
    {"name": "Carol", "id": 3}
])

obj_store.put_object("matrix", [[1, 2, 3], [4, 5, 6], [7, 8, 9]])

print(f"Stored objects: {list(obj_store.keys())}")

# Retrieve and use objects
config = obj_store.get_object("config")
print(f"Config: {config}")
print(f"Debug mode: {config['debug']}")

users = obj_store.get_object("users")
print(f"Users: {users}")
print(f"First user: {users[0]['name']}")

matrix = obj_store.get_object("matrix")
print(f"Matrix: {matrix}")


print("\n=== Advanced Pattern: Versioned Storage ===")

class VersionedStore(BlobStore):
    """BlobStore extension that keeps every version written under a key."""

    def __init__(self, target):
        super().__init__(target)
        # key -> list of {'region', 'size'} records, oldest first
        self._versions = {}

    def put_version(self, key: str, data: bytes) -> int:
        """Write *data* as a new version of *key* and return its index."""
        new_region = self.target.create_and_write(data, persist=True)

        history = self._versions.setdefault(key, [])
        history.append({'region': new_region, 'size': len(data)})

        # Index of the record just appended
        return len(history) - 1

    def get_version(self, key: str, version: int = -1) -> Optional[bytes]:
        """Return the bytes of one version of *key* (-1 selects the latest).

        Returns None when the key has no versions, when *version* is out of
        range, or when the read raises WarabiException.
        """
        history = self._versions.get(key)
        if not history:
            # Covers both an unknown key and an empty version list
            return None

        try:
            record = history[version]
            return self.target.read(record['region'], 0, record['size'])
        except (IndexError, WarabiException):
            return None

    def version_count(self, key: str) -> int:
        """Return how many versions are recorded for *key* (0 if none)."""
        return len(self._versions.get(key, ()))


versioned = VersionedStore(target)

# Store multiple versions
v0 = versioned.put_version("document", b"Version 1 content")
v1 = versioned.put_version("document", b"Version 2 content")
v2 = versioned.put_version("document", b"Version 3 content")

print(f"Stored {versioned.version_count('document')} versions")

# Get specific versions
print(f"Version 0: {versioned.get_version('document', 0)}")
print(f"Version 1: {versioned.get_version('document', 1)}")
print(f"Latest: {versioned.get_version('document', -1)}")

print("\nHigher-level API examples completed!")

engine.finalize()

This allows you to:

  • Add domain-specific interfaces

  • Implement caching strategies

  • Add validation and type checking

  • Create convenient abstractions