State Proxy#
How to read and write persistent state from your actor handlers using standard Python file operations.
What is the state proxy?#
The state proxy gives your actor handlers persistent state via the filesystem — without StatefulSets, without volumes, and without installing an SDK.
Your handler code reads and writes files under /state/:
# Read state
with open("/state/model.bin", "rb") as f:
    weights = f.read()

# Write state
with open("/state/cache/result.json", "w") as f:
    f.write(json.dumps(result))
Behind the scenes, the Asya runtime intercepts these file operations and forwards them to a storage backend (S3, GCS, Redis) via a sidecar connector. Your handler sees a filesystem; the platform stores data in object storage or a key-value store. The runtime is a dumb translator — all intelligence (buffering, CAS, retries) lives in the connector sidecar.
Key benefits:
- No imports, no SDK — just standard Python open(), os.stat(), os.listdir()
- Actors remain stateless Deployments (no StatefulSets, no PVCs)
- Storage backend is swappable (S3, GCS, Redis) without changing handler code
- Conflict-free or CAS guarantees depending on connector choice
- Extended attributes (os.getxattr) expose backend metadata (URLs, ETags, TTL)
How it works#
When you configure stateProxy in your AsyncActor spec, the Crossplane composition:
- Adds a connector sidecar container to the pod (one per mount)
- Mounts a shared Unix socket volume between the runtime and connectors
- Sets the ASYA_STATE_PROXY_MOUNTS env var on the runtime container
At startup, the runtime patches Python builtins (open, os.stat, os.listdir, etc.) to intercept file operations on configured mount paths and translate them to HTTP requests over Unix sockets to the connector sidecars.
Handler code
open("/state/weights/model.bin", "rb")
│
▼
Patched builtins.open (runtime hooks)
│ HTTP GET /keys/model.bin
▼
Connector sidecar (s3-buffered-lww)
│
▼
Storage backend (S3)
Configuration#
Add stateProxy entries to your AsyncActor spec:
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: model-inference
  namespace: prod
spec:
  actor: model-inference
  stateProxy:
    - name: weights              # Unique mount identifier
      mount:
        path: /state/weights     # Path in runtime container
        writeMode: buffered      # buffered (default) or passthrough
      connector:
        image: ghcr.io/deliveryhero/asya-state-proxy-s3-buffered-lww:v1.0.0
        env:
          - name: STATE_BUCKET
            value: my-model-weights
          - name: AWS_REGION
            value: us-east-1
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
Multiple mounts#
You can configure multiple mounts with different backends:
stateProxy:
  - name: weights
    mount:
      path: /state/weights
    connector:
      image: ghcr.io/deliveryhero/asya-state-proxy-s3-buffered-lww:v1.0.0
      env:
        - name: STATE_BUCKET
          value: my-model-weights
  - name: cache
    mount:
      path: /state/cache
    connector:
      image: ghcr.io/deliveryhero/asya-state-proxy-redis-buffered-cas:v1.0.0
      env:
        - name: REDIS_URL
          value: redis://redis.default:6379/0
Each mount is independent — you can mix S3 and Redis, or use different buckets for different data types.
Basic usage#
Reading state#
import json
import os

async def my_handler(payload: dict):
    # Check if state exists
    if os.path.exists("/state/cache/config.json"):
        with open("/state/cache/config.json") as f:
            config = json.load(f)
    else:
        config = {"default": True}

    # Use the config
    result = await process(payload, config)
    payload["result"] = result
    yield payload
Writing state#
import json

async def my_handler(payload: dict):
    result = await compute(payload)

    # Cache the result
    with open("/state/cache/result.json", "w") as f:
        json.dump(result, f)

    payload["result"] = result
    yield payload
Binary data#
async def my_handler(payload: dict):
    # Read binary data
    with open("/state/weights/model.bin", "rb") as f:
        weights = f.read()

    model = load_model_from_bytes(weights)
    result = await model.predict(payload["input"])
    payload["result"] = result
    yield payload
Common patterns#
1. Caching model weights#
Load model weights from state on first call, cache in memory:
class Inference:
    def __init__(self):
        self.model = None  # Lazy-loaded

    async def process(self, payload: dict):
        if self.model is None:
            # Load weights from state proxy
            with open("/state/weights/llama3.bin", "rb") as f:
                weights = f.read()
            self.model = load_model(weights)

        result = await self.model.predict(payload["prompt"])
        payload["response"] = result
        yield payload
Why this works: The class handler is instantiated once per pod. self.model persists across messages, so weights are loaded only on the first call.
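One caveat, assuming the runtime can dispatch messages to the same instance concurrently (an assumption; check your concurrency settings): two messages could both observe `self.model is None` and load the weights twice. A double-checked lock avoids that. The sketch below uses a generic `loader` coroutine as a stand-in for the read-from-/state-and-load step; `LazyModel` is not part of the Asya API.

```python
import asyncio

class LazyModel:
    """Double-checked lazy load guarded by an asyncio.Lock."""
    def __init__(self, loader):
        self._loader = loader   # stand-in for: read /state weights, load_model()
        self._model = None
        self._lock = asyncio.Lock()

    async def get(self):
        if self._model is None:
            async with self._lock:
                if self._model is None:   # second waiter skips the load
                    self._model = await self._loader()
        return self._model
```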
2. Storing conversation history#
Append to conversation state for multi-turn agents:
import json
import os

async def conversational_agent(payload: dict):
    session_id = payload["session_id"]
    state_path = f"/state/sessions/{session_id}.json"

    # Load existing conversation
    if os.path.exists(state_path):
        with open(state_path) as f:
            conversation = json.load(f)
    else:
        conversation = {"messages": []}

    # Add user message
    conversation["messages"].append({
        "role": "user",
        "content": payload["message"]
    })

    # Call LLM with full history
    response = await llm.chat(conversation["messages"])

    # Append assistant response
    conversation["messages"].append({
        "role": "assistant",
        "content": response
    })

    # Persist updated conversation
    with open(state_path, "w") as f:
        json.dump(conversation, f)

    payload["response"] = response
    yield payload
3. Result caching with TTL#
Cache expensive computations with a timestamp:
import json
import os
from datetime import datetime, timedelta

async def cached_computation(payload: dict):
    cache_key = payload["input_hash"]
    cache_path = f"/state/cache/{cache_key}.json"

    # Check cache
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            cached = json.load(f)
        cached_at = datetime.fromisoformat(cached["timestamp"])
        if datetime.utcnow() - cached_at < timedelta(hours=1):
            # Cache hit
            payload["result"] = cached["result"]
            payload["cached"] = True
            yield payload
            return

    # Cache miss — compute
    result = await expensive_operation(payload["input"])

    # Store in cache
    with open(cache_path, "w") as f:
        json.dump({
            "result": result,
            "timestamp": datetime.utcnow().isoformat()
        }, f)

    payload["result"] = result
    payload["cached"] = False
    yield payload
4. Listing and deleting state#
import os

async def cleanup_handler(payload: dict):
    # List all cache files
    cache_files = os.listdir("/state/cache/")

    deleted = []
    for filename in cache_files:
        path = f"/state/cache/{filename}"
        # Delete old files (simple heuristic: starts with "temp-")
        if filename.startswith("temp-"):
            os.remove(path)
            deleted.append(filename)

    payload["deleted_files"] = deleted
    yield payload
LWW vs CAS guarantees#
Asya provides two conflict-resolution strategies via different connector images:
Last-Write-Wins (LWW)#
Image suffix: s3-buffered-lww, s3-passthrough, gcs-buffered-lww
Behavior: Writes always overwrite the existing object. No conflict detection.
Use when:
- State is written by a single actor instance (Deployment with replicas: 1)
- You don't care about lost updates (e.g., ephemeral cache)
- Simplicity is more important than correctness
Example: Model weights loaded at startup (read-only after initialization).
Check-And-Set (CAS)#
Image suffix: s3-buffered-cas, gcs-buffered-cas, redis-buffered-cas
Behavior: On write, the connector checks if the object has changed since the last read. If it has, the write fails with FileExistsError. CAS uses ETag-based conflict detection for S3, generation-based preconditions for GCS, and WATCH/MULTI/EXEC optimistic locking for Redis.
Use when:
- Multiple actor replicas may write to the same key
- You need to detect concurrent modifications
- You can handle write conflicts in your handler code
Example: Conversation history updated by multiple replicas.
Handler code:
CAS retries are handled by the sidecar. On a CAS conflict (FileExistsError),
the sidecar requeues the message with exponential backoff, and the handler runs
again from scratch with a fresh read() that sees the latest value. In most
cases, your handler does not need explicit retry logic:
import json
import os

async def update_state_with_cas(payload: dict):
    state_path = "/state/shared/counter.json"

    # Read current state (connector caches the revision internally)
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    else:
        state = {"count": 0}

    # Modify
    state["count"] += 1

    # Write — connector uses cached revision for conditional write.
    # On conflict, raises FileExistsError (HTTP 409).
    # The sidecar catches this, nacks the message, and it is retried from scratch.
    with open(state_path, "w") as f:
        json.dump(state, f)

    payload["new_count"] = state["count"]
    yield payload
CAS guarantees: With s3-buffered-cas, gcs-buffered-cas, or redis-buffered-cas, concurrent writes to the same key are detected — one succeeds, others raise FileExistsError and the sidecar requeues the message.
LWW behavior: With s3-buffered-lww, the same code would allow concurrent writes — both would succeed, but one would silently overwrite the other.
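If you would rather resolve a conflict in place than rely on the sidecar requeue (for example, to avoid re-running expensive work that precedes the write), an explicit read-modify-write retry loop also works. A sketch; `increment_with_retry` is a hypothetical helper, not part of the runtime:

```python
import json
import os

def increment_with_retry(path: str, attempts: int = 5) -> int:
    """Read-modify-write with in-place CAS retry (sketch)."""
    for _ in range(attempts):
        # Fresh read each attempt, so the connector caches the latest revision.
        if os.path.exists(path):
            with open(path) as f:
                state = json.load(f)
        else:
            state = {"count": 0}
        state["count"] += 1
        try:
            with open(path, "w") as f:   # conditional write under a CAS connector
                json.dump(state, f)
            return state["count"]
        except FileExistsError:
            continue                     # another writer won; re-read and retry
    raise RuntimeError(f"CAS conflict persisted after {attempts} attempts")
```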
Write modes#
Write mode controls how data is buffered before being sent to the storage backend.
buffered#
writeMode: buffered (default)
Data is collected in memory (or spilled to disk above 4 MiB) and sent as a single PUT when the file is closed.
Pros:
- Supports seek() and tell() before close
- Connector receives Content-Length upfront (required by some S3 SDKs)
Cons:
- Not suitable for very large files (spills to disk)
Use for: Small-to-medium files (model configs, conversation history, cache entries).
passthrough#
writeMode: passthrough
Data is sent to the connector immediately on each write() call using HTTP chunked transfer encoding.
Pros:
- No buffering — suitable for very large files
- Streaming writes (e.g., model weights downloaded from external source)
Cons:
- Does not support seek() or tell()
Use for: Large files (multi-GB model weights, video files, large datasets).
Configuration:
stateProxy:
  - name: weights
    mount:
      path: /state/weights
      writeMode: passthrough  # Stream large files
    connector:
      image: ghcr.io/deliveryhero/asya-state-proxy-s3-passthrough:v1.0.0
      env:
        - name: STATE_BUCKET
          value: my-large-weights
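Because each `write()` is forwarded immediately under passthrough mode, a large source can be streamed through without ever holding it in memory. A minimal sketch, where `source` stands for any readable file-like object (e.g. an HTTP response body):

```python
import shutil

def stream_to_state(source, dest_path: str, chunk_size: int = 8 * 1024 * 1024):
    """Copy `source` into state in fixed-size chunks (sketch).

    Under passthrough mode, each chunk is sent to the connector as it is
    written, so memory use stays bounded by chunk_size.
    """
    with open(dest_path, "wb") as out:
        shutil.copyfileobj(source, out, length=chunk_size)
```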
Library compatibility#
The state proxy works with pure-Python libraries that use builtins.open(). Libraries with C extensions that perform system-level I/O (e.g., pandas parquet, PyTorch, NumPy) require a workaround.
Works out of the box#
import json, csv, pickle, yaml

# json
with open("/state/meta/config.json") as f:
    config = json.load(f)

# csv
with open("/state/meta/data.csv") as f:
    rows = list(csv.reader(f))

# pickle
with open("/state/meta/model.pkl", "rb") as f:
    model = pickle.load(f)

# yaml (PyYAML)
with open("/state/meta/spec.yaml") as f:
    spec = yaml.safe_load(f)
Requires BytesIO workaround#
Libraries with C extensions (pandas parquet, PyTorch, NumPy) bypass Python-level open():
import io
import pandas as pd
import torch
import numpy as np

# pandas parquet
with open("/state/meta/data.parquet", "rb") as f:
    df = pd.read_parquet(io.BytesIO(f.read()))

# torch
with open("/state/weights/model.pt", "rb") as f:
    model = torch.load(io.BytesIO(f.read()))

# numpy
with open("/state/meta/array.npy", "rb") as f:
    arr = np.load(io.BytesIO(f.read()))
Why: These libraries use C-level system calls (open(2), read(2)) that bypass the Python builtin patches. Reading through open() first, then wrapping in io.BytesIO, forces the data through the patched path.
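A tiny helper (hypothetical, not part of the runtime API) keeps the pattern from repeating in every handler:

```python
import io

def state_bytes(path: str) -> io.BytesIO:
    """Read a state key through the patched open() and return a BytesIO,
    suitable for handing to C-extension readers (pandas, torch, numpy)."""
    with open(path, "rb") as f:
        return io.BytesIO(f.read())

# Example uses (assuming pandas / torch are installed):
# df = pd.read_parquet(state_bytes("/state/meta/data.parquet"))
# model = torch.load(state_bytes("/state/weights/model.pt"))
```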
Error handling#
HTTP error responses from the connector are mapped to standard Python exceptions:
import json
import os

async def my_handler(payload: dict):
    try:
        with open("/state/cache/result.json", "rb") as f:
            cached = json.load(f)
    except FileNotFoundError:
        # Cache miss
        cached = None
    except PermissionError:
        # Connector denied access (e.g., IAM permissions)
        raise
    except FileExistsError:
        # CAS conflict (s3-buffered-cas, redis-buffered-cas)
        # Retry or fail
        raise
| Exception | HTTP status | Meaning |
|---|---|---|
| `FileNotFoundError` | 404 | Key does not exist |
| `PermissionError` | 403 | Connector lacks access (IAM, Redis ACL) |
| `FileExistsError` | 409 | CAS conflict (key modified since read) |
| `OSError` | 500 | Backend error (S3 throttling, Redis down) |
| `ConnectionError` | 503 | Connector unavailable |
| `TimeoutError` | 504 | Backend timeout |
Limitations#
The state proxy patches Python builtins at the interpreter level. This intercepts Python-level file operations but cannot intercept C-level system calls.
Not patched#
| API | Workaround |
|---|---|
| `os.open()` | Use builtins.open() instead |
| `os.rename()` | Read + write + delete manually |
| `os.walk()` | Use os.listdir() recursively |
| `shutil.copy2()` | Fails on metadata; use shutil.copyfileobj() |
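For the `os.walk()` gap, a recursive listing can be built from the patched `os.listdir()` plus `os.path.isdir()` (which delegates to the patched `os.stat()` and its synthetic `st_mode`). A sketch, assuming the simulated directory entries are classifiable this way:

```python
import os

def walk_state(root: str) -> list[str]:
    """Recursively collect file paths under a state mount (sketch).

    Built only on os.listdir() and os.path.isdir(), both of which the
    state proxy patches; os.walk() itself is not intercepted.
    """
    files = []
    for name in os.listdir(root):
        path = os.path.join(root, name)
        if os.path.isdir(path):
            files.extend(walk_state(path))   # recurse into simulated subdir
        else:
            files.append(path)
    return files
```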
Note: pathlib.Path.open(), pathlib.Path.read_bytes(), and
pathlib.Path.exists() all delegate to builtins.open or os.stat internally,
so they work transparently with the state proxy.
Filesystem metadata#
os.stat() returns synthetic metadata:
| Field | Value | Real? |
|---|---|---|
| `st_size` | Actual content size | ✅ |
| `st_mode` | 0o644 (file) / 0o755 (dir) | ❌ Synthetic |
| `st_mtime` | 0 | ❌ Not available |
Libraries that depend on modification times (e.g., caching based on mtime) will not work correctly.
Directory semantics#
The state proxy is a flat key-value store, not a real filesystem. Paths like /state/meta/subdir/file.txt are stored as the key subdir/file.txt — there is no actual subdir/ directory.
- os.makedirs() is a no-op for mount paths
- os.listdir() uses prefix-based listing to simulate directory entries
Extended attributes (xattr)#
Connectors expose backend-specific metadata through Python's os.getxattr,
os.setxattr, and os.listxattr APIs. The runtime intercepts these calls on
state mount paths and translates them to connector requests.
All Asya xattrs use the user.asya.{attr} naming convention:
import os

async def handler(payload):
    # Write a file
    with open("/state/media/report.pdf", "wb") as f:
        f.write(generate_pdf(payload))

    # Discover available attributes
    attrs = os.listxattr("/state/media/report.pdf")
    # ["user.asya.url", "user.asya.presigned_url", "user.asya.etag", ...]

    # Read the external URL (S3 URI or GCS URI)
    url = os.getxattr("/state/media/report.pdf", "user.asya.url")
    # b"s3://my-bucket/prefix/media/report.pdf"

    # Read a presigned URL for external access
    presigned = os.getxattr("/state/media/report.pdf", "user.asya.presigned_url")

    # Set content type (writable attribute)
    os.setxattr("/state/media/report.pdf", "user.asya.content_type",
                b"application/pdf")

    return payload
Available attributes#
| Attribute | S3 | GCS | Redis | Access |
|---|---|---|---|---|
| `url` | `s3://...` | `gs://...` | - | Read |
| `presigned_url` (S3) / `signed_url` (GCS) | HTTPS URL | HTTPS URL | - | Read |
| `etag` / `generation` | Content hash | Generation number | - | Read |
| `content_type` | MIME type | MIME type | - | Read/Write |
| `version` / `metageneration` | S3 version ID | Metageneration | - | Read |
| `storage_class` | Storage tier | Storage tier | - | Read |
| `ttl` | - | - | Seconds to expiry | Read/Write |
Note: The presigned URL attribute name differs between backends: use
user.asya.presigned_url for S3 connectors and user.asya.signed_url for GCS
connectors.
For Redis, set TTL after writing a key:
import json
import os

async def handler(payload):
    data = payload["data"]
    with open("/state/cache/temp.json", "w") as f:
        f.write(json.dumps(data))

    # Set TTL to 1 hour
    os.setxattr("/state/cache/temp.json", "user.asya.ttl", b"3600")

    return payload
See also#
| Topic | Document |
|---|---|
| State proxy architecture | docs/reference/components/core-state-proxy.md |
| Available connector images | docs/reference/components/core-state-proxy.md |
| Crossplane composition rendering | docs/reference/components/core-state-proxy.md |
Platform configuration: To configure state proxy storage backends, see setup/guide-state-proxy.md.