State Proxy#
Overview#
The state proxy gives actors persistent state access via standard Python file
operations (open, os.stat, os.listdir, os.remove). Handlers read and
write state as if it were a local directory — no SDK, no special imports.
The runtime patches Python builtins at startup to intercept file operations on configured mount paths and translates them to HTTP requests over Unix sockets to connector sidecar processes. Each connector adapts those HTTP requests to a specific storage backend (S3, GCS, Redis).
The runtime is a deliberately thin translator: it intercepts Python file I/O and forwards each operation to the proxy connector. The connector holds the policy: it decides buffering strategy, atomicity guarantees, CAS behavior, and retries. Adding a new guarantee type therefore means building a new connector, not modifying the runtime.
Architecture#
Handler code (actor runtime)
    open("/state/model.bin", "rb")
        │
        ▼
    patched builtins.open
    (installed by _install_state_proxy_hooks)
        │
        ▼
    HTTP GET /keys/model.bin
    over Unix socket: /var/run/asya/state/{name}.sock
        │
        ▼
    Connector sidecar (e.g. s3-buffered-lww)
        │
        ▼
    Storage backend (S3, GCS, Redis)
Pod layout#
When stateProxy is configured, the Crossplane composition adds one connector container per
mount to the actor pod. All connector containers share the state-sockets
emptyDir volume with the runtime container.
Pod
├── asya-runtime (runtime + user handler)
│   ├── /var/run/asya/state/            ← state-sockets volume
│   └── /state/meta/                    ← logical mount path (no real FS)
│
├── asya-state-proxy-meta (connector sidecar)
│   ├── /var/run/asya/state/meta.sock   ← Unix socket
│   └── env: CONNECTOR_SOCKET, STATE_BUCKET, ...
│
└── asya-state-proxy-media (another mount, if configured)
    └── /var/run/asya/state/media.sock
Configuration#
AsyncActor CRD#
Configure state proxy in spec.stateProxy:
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: model-inference
  namespace: prod
spec:
  actor: model-inference
  stateProxy:
    - name: weights               # DNS label, becomes socket name: weights.sock
      mount:
        path: /state/weights      # Absolute path intercepted in runtime container
      writeMode: buffered         # buffered (default) or passthrough
      connector:
        image: ghcr.io/deliveryhero/asya-state-proxy-s3-buffered-lww:v1.0.0
        env:
          - name: STATE_BUCKET
            value: my-model-weights
          - name: AWS_REGION
            value: us-east-1
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
    - name: cache
      mount:
        path: /state/cache
      writeMode: passthrough
      connector:
        image: ghcr.io/deliveryhero/asya-state-proxy-s3-passthrough:v1.0.0
        env:
          - name: STATE_BUCKET
            value: my-inference-cache
XRD fields#
Defined in deploy/helm-charts/asya-crossplane/templates/xrd-asyncactor.yaml:
| Field | Type | Description |
|---|---|---|
| stateProxy[].name | string | Unique mount identifier (DNS label, 1-63 chars) |
| stateProxy[].mount.path | string | Absolute path in runtime container |
| stateProxy[].writeMode | enum | buffered (default) or passthrough |
| stateProxy[].connector.image | string | Connector container image |
| stateProxy[].connector.env | array | Backend-specific environment variables |
| stateProxy[].connector.resources | object | Kubernetes resource requests/limits |
Crossplane Composition Step#
When the Crossplane composition processes an AsyncActor with stateProxy entries, it:
- Adds a state-sockets emptyDir volume to the pod.
- For each mount entry, adds a connector container:
  - Name: asya-state-proxy-{name}
  - Env: CONNECTOR_SOCKET=/var/run/asya/state/{name}.sock plus any connector.env values from the spec
  - VolumeMount: state-sockets at /var/run/asya/state
- On the asya-runtime container:
  - Mounts state-sockets at /var/run/asya/state
  - Sets ASYA_STATE_PROXY_MOUNTS env var
The ASYA_STATE_PROXY_MOUNTS format is semicolon-separated entries:
{name}:{mountPath}:write={writeMode}[;{name}:{mountPath}:write={writeMode}]*
Example:
weights:/state/weights:write=buffered;cache:/state/cache:write=passthrough
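The format above is simple enough to parse by hand. The following is a minimal sketch of such a parser, not the runtime's actual code: the `Mount` dataclass and the `parse_mounts` function are hypothetical names introduced for illustration, and rejecting unknown write modes is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Mount:
    name: str        # socket name, e.g. "weights" -> weights.sock
    path: str        # absolute mount path intercepted by the runtime
    write_mode: str  # "buffered" or "passthrough"


def parse_mounts(value: str) -> list[Mount]:
    """Parse '{name}:{mountPath}:write={writeMode}' entries separated by ';'."""
    mounts = []
    for entry in filter(None, (part.strip() for part in value.split(";"))):
        # Split into at most three fields: name, path, option.
        name, path, option = entry.split(":", 2)
        key, _, mode = option.partition("=")
        if key != "write" or mode not in ("buffered", "passthrough"):
            raise ValueError(f"invalid mount entry: {entry!r}")
        mounts.append(Mount(name, path, mode))
    return mounts
```

Called on the example value, this yields one `Mount` per entry, in order.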
Runtime Hooks#
Source: src/asya-runtime/asya_runtime.py, function _install_state_proxy_hooks
When ASYA_STATE_PROXY_MOUNTS is set, the runtime installs hooks before the
handler initialises:
builtins.open → _patched_open
os.stat → _patched_stat
os.listdir → _patched_listdir
os.unlink → _patched_unlink
os.remove → _patched_unlink (alias)
os.makedirs → _patched_makedirs (no-op for mount paths)
os.listxattr → _patched_listxattr
os.getxattr → _patched_getxattr
os.setxattr → _patched_setxattr
Calls to non-mount paths are forwarded to the original functions unchanged.
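pathlib.Path.open() delegates to builtins.open, and pathlib.Path.exists()
delegates to os.stat, so patching the low-level functions also covers the
high-level wrappers built on them, including these pathlib methods. Wrappers
built on unpatched functions, such as os.walk() (which uses os.scandir()), are
not covered.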
Mount resolution#
_resolve_mount normalises the path and checks whether it starts with a
configured mount prefix. If it matches, the key (path relative to mount) and the
mount's socket path are returned.
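The resolution step can be sketched as follows. This is an illustration under stated assumptions rather than the real `_resolve_mount`: the function name `resolve_mount`, the `mounts` dict shape (mount name to mount path), and the choice to match only on whole path components are hypothetical, and relative paths are not handled.

```python
import posixpath

SOCKET_DIR = "/var/run/asya/state"


def resolve_mount(path, mounts):
    """Return (key, socket_path) if path is under a mount, else None.

    mounts maps mount name -> absolute mount path.
    """
    normalised = posixpath.normpath(path)
    for name, mount_path in mounts.items():
        root = posixpath.normpath(mount_path)
        socket_path = f"{SOCKET_DIR}/{name}.sock"
        if normalised == root:
            return "", socket_path  # the mount root itself
        # Require a "/" boundary so /state/weights2 does not match /state/weights.
        if normalised.startswith(root + "/"):
            return normalised[len(root) + 1:], socket_path
    return None
```

Matching on a component boundary is a design choice in this sketch: a plain `startswith(root)` would wrongly route sibling directories that merely share a name prefix.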
Handler usage#
import json
import os

async def handler(payload):
    # Read state
    with open("/state/weights/model.bin", "rb") as f:
        weights = f.read()

    # Write state (result is a placeholder for whatever the handler computes)
    result = {"weights_bytes": len(weights)}
    with open("/state/cache/result.json", "w") as f:
        f.write(json.dumps(result))

    # Check existence
    if os.path.exists("/state/cache/result.json"):
        ...

    # List directory
    files = os.listdir("/state/weights/")

    # Delete
    os.remove("/state/cache/stale.json")

    # Extended attributes (xattr)
    url = os.getxattr("/state/weights/model.bin", "user.asya.url")
    attrs = os.listxattr("/state/weights/model.bin")
    os.setxattr("/state/weights/model.bin", "user.asya.content_type",
                b"application/octet-stream")

    return payload
Limitations and Compatibility#
The state proxy works by patching Python builtins at the interpreter level. This means it intercepts Python-level file operations but cannot intercept C-level system calls made by native extensions.
What is patched#
| Python API | Patched | Notes |
|---|---|---|
| builtins.open() | ✅ | Primary file I/O, used by most libraries |
| pathlib.Path.open() | ✅ | Delegates to builtins.open internally |
| pathlib.Path.read_bytes() / read_text() | ✅ | Delegates to builtins.open internally |
| pathlib.Path.write_bytes() / write_text() | ✅ | Delegates to builtins.open internally |
| pathlib.Path.exists() | ✅ | Delegates to os.stat internally |
| os.stat() | ✅ | Returns synthetic stat_result (see below) |
| os.path.exists() | ✅ | Works via patched os.stat() internally |
| os.path.isfile() / os.path.isdir() | ✅ | Works via patched os.stat() internally |
| os.path.getsize() | ✅ | Works via patched os.stat() internally |
| os.listdir() | ✅ | Lists keys from connector |
| os.remove() / os.unlink() | ✅ | Deletes key via connector |
| os.makedirs() | ✅ | No-op for mount paths (flat key-value store) |
| os.listxattr() | ✅ | Lists available metadata attributes from connector |
| os.getxattr() | ✅ | Reads metadata attribute from connector |
| os.setxattr() | ✅ | Sets metadata attribute on connector |
What is NOT patched#
| Python API | Patched | Workaround |
|---|---|---|
| os.scandir() | ❌ | Use os.listdir() instead |
| os.open() | ❌ | Use builtins.open() instead |
| os.rename() / os.replace() | ❌ | Read + write + delete manually |
| os.walk() | ❌ | Use os.listdir() recursively |
| os.chmod() / os.chown() | ❌ | Not applicable (no real filesystem) |
| mmap.mmap() | ❌ | Read into io.BytesIO instead |
| shutil.copy2() | ❌ | Fails copying filesystem metadata |
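Two of the workarounds in the table, for os.rename() and os.walk(), can be sketched with only the patched calls. The helper names `rename_via_copy` and `walk_files` are hypothetical, and the sketch assumes that os.listdir() on a mount returns plain entry names and that os.path.isdir() identifies simulated subdirectories. Unlike a real rename, the read + write + delete sequence is not atomic.

```python
import os


def rename_via_copy(src, dst):
    """Emulate os.rename(): read, write, then delete. Not atomic."""
    with open(src, "rb") as f:
        data = f.read()
    with open(dst, "wb") as f:
        f.write(data)
    os.remove(src)


def walk_files(root):
    """Yield file paths under root using only os.listdir() and os.path.isdir()."""
    for name in os.listdir(root):
        path = os.path.join(root, name)
        if os.path.isdir(path):
            yield from walk_files(path)
        else:
            yield path
```

If the process stops between the write and the delete, both keys exist; callers that need stronger guarantees would have to rely on connector-level semantics instead.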
Filesystem metadata#
os.stat() returns a synthetic os.stat_result with limited fields:
| Field | Value | Real? |
|---|---|---|
| st_size | Actual content size | ✅ |
| st_mode | 0o644 (file) / 0o755 (dir) | ❌ Synthetic |
| st_uid / st_gid | Current process user | ❌ Synthetic |
| st_ino | 0 | ❌ Not available |
| st_dev | 0 | ❌ Not available |
| st_nlink | 1 | ❌ Always 1 |
| st_atime / st_mtime / st_ctime | 0 | ❌ Not available |
Libraries that depend on modification times (e.g. caching based on mtime) will
not work correctly.
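A synthetic result with the values from the table can be built directly from a 10-tuple. This is a sketch, not the runtime's code: `synthetic_stat` is a hypothetical name, and OR-ing the file-type bits (`S_IFREG` / `S_IFDIR`) into the permission bits is an assumption made here so that os.path.isfile() and os.path.isdir() can distinguish the two cases.

```python
import os
import stat


def synthetic_stat(size, is_file=True):
    """Build an os.stat_result carrying only a real size and a synthetic mode."""
    mode = (stat.S_IFREG | 0o644) if is_file else (stat.S_IFDIR | 0o755)
    # Field order: mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
    return os.stat_result(
        (mode, 0, 0, 1, os.getuid(), os.getgid(), size, 0, 0, 0)
    )
```

Because every timestamp is 0, comparing `st_mtime` values across two keys, or across two reads of the same key, carries no information.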
Libraries that work#
Any pure-Python library that reads or writes via builtins.open() works
transparently:
import csv, json, pickle

# json: json.load() reads from the file object returned by open()
with open("/state/meta/config.json") as f:
    config = json.load(f)

# csv: wraps a file object from open()
with open("/state/meta/data.csv") as f:
    reader = csv.reader(f)
    rows = list(reader)

# pickle: reads from a file object opened in binary mode
with open("/state/meta/model.pkl", "rb") as f:
    model = pickle.load(f)

# yaml (PyYAML): wraps a file object from open()
import yaml
with open("/state/meta/spec.yaml") as f:
    spec = yaml.safe_load(f)

# PIL/Pillow: pass a file object (not a path string)
from PIL import Image
with open("/state/meta/photo.png", "rb") as f:
    img = Image.open(f)
    img.load()  # Force read before file closes
Libraries that do NOT work (with path strings)#
Libraries with C extensions that perform their own system-level I/O bypass the Python-level patch:
# These will FAIL — they use C-level file I/O, not builtins.open()
import pandas as pd
df = pd.read_parquet("/state/meta/data.parquet") # pyarrow C extension
df = pd.read_csv("/state/meta/data.csv") # C parser by default
import torch
model = torch.load("/state/meta/model.pt") # C extension
import numpy as np
arr = np.load("/state/meta/array.npy") # C extension
import h5py
f = h5py.File("/state/meta/data.h5") # HDF5 C library
import cv2
img = cv2.imread("/state/meta/photo.png") # OpenCV C library
Workaround: read into BytesIO#
Read the data through open() first, then pass an io.BytesIO buffer to the
library:
import io

import numpy as np
import pandas as pd
import torch
from PIL import Image

# pandas
with open("/state/meta/data.parquet", "rb") as f:
    df = pd.read_parquet(io.BytesIO(f.read()))

# pandas CSV (force Python parser, or use BytesIO)
with open("/state/meta/data.csv", "rb") as f:
    df = pd.read_csv(io.BytesIO(f.read()))

# torch
with open("/state/meta/model.pt", "rb") as f:
    model = torch.load(io.BytesIO(f.read()))

# numpy
with open("/state/meta/array.npy", "rb") as f:
    arr = np.load(io.BytesIO(f.read()))

# Pillow (already works with file objects)
with open("/state/meta/photo.png", "rb") as f:
    img = Image.open(io.BytesIO(f.read()))
For writes, buffer first and write through open():
# pandas to parquet
buf = io.BytesIO()
df.to_parquet(buf)
with open("/state/meta/data.parquet", "wb") as f:
    f.write(buf.getvalue())

# torch save
buf = io.BytesIO()
torch.save(model.state_dict(), buf)
with open("/state/meta/model.pt", "wb") as f:
    f.write(buf.getvalue())
Directory semantics#
The state proxy is a flat key-value store, not a real filesystem. Paths like
/state/meta/subdir/file.txt are stored as the key subdir/file.txt — there is
no actual subdir/ directory. os.makedirs() is a no-op for mount paths.
os.listdir() uses the connector's prefix-based listing to simulate directory
entries.
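Prefix-based listing over a flat key space can be sketched like this. The function `list_keys` is a hypothetical stand-in for what a connector does, and returning names relative to the prefix is an assumption; the real connectors may return full keys.

```python
def list_keys(all_keys, prefix="", delimiter="/"):
    """Split flat keys under prefix into direct keys and simulated subdirectories."""
    keys, prefixes = [], set()
    for key in sorted(all_keys):
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        head, sep, _ = rest.partition(delimiter)
        if sep:
            # A delimiter remains, so the key lives in a "subdirectory".
            prefixes.add(head + delimiter)
        elif rest:
            keys.append(rest)
    return {"keys": keys, "prefixes": sorted(prefixes)}
```

A "directory" therefore exists only as long as at least one key carries its prefix; deleting the last such key makes it disappear.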
HTTP Protocol#
Source: src/asya-state-proxy/asya_state_proxy/server.py
Each connector runs ConnectorServer on a Unix socket. The runtime connects
via _UnixHTTPClient (a subclass of http.client.HTTPConnection).
Socket path: /var/run/asya/state/{name}.sock
| Method | Path | Description |
|---|---|---|
| GET | /keys/{key} | Read key, returns body bytes |
| PUT | /keys/{key} | Write key, body is the data |
| HEAD | /keys/{key} | Stat key, returns Content-Length and X-Is-File headers |
| DELETE | /keys/{key} | Delete key |
| GET | /keys/?prefix=X&delimiter=Y | List keys under prefix |
| GET | /meta/{key} | List available xattr names |
| GET | /meta/{key}?attr=X | Read xattr value |
| PUT | /meta/{key}?attr=X | Set xattr value (JSON body {"value": "..."}) |
| GET | /healthz | Liveness check, returns {"status": "ready"} |
List response body:
{"keys": ["file.txt", "other.bin"], "prefixes": ["subdir/"]}
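For readers who want to call a connector by hand, for example while debugging, the client side of this protocol can be sketched with the standard library. This is not the runtime's `_UnixHTTPClient`: the class name `UnixHTTPConnection`, the helper `read_key`, the 10-second timeout, and the URL-quoting of the key are assumptions made for the example.

```python
import http.client
import socket
import urllib.parse


class UnixHTTPConnection(http.client.HTTPConnection):
    """HTTPConnection that connects to a Unix domain socket instead of TCP."""

    def __init__(self, socket_path, timeout=10.0):
        # The host only fills the Host header; no DNS lookup takes place.
        super().__init__("localhost", timeout=timeout)
        self._socket_path = socket_path

    def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        sock.connect(self._socket_path)
        self.sock = sock


def read_key(socket_path, key):
    """GET /keys/{key} and return the body bytes."""
    conn = UnixHTTPConnection(socket_path)
    try:
        conn.request("GET", "/keys/" + urllib.parse.quote(key))
        response = conn.getresponse()
        body = response.read()
        if response.status != 200:
            raise OSError(f"unexpected status {response.status}")
        return body
    finally:
        conn.close()
```

A call such as `read_key("/var/run/asya/state/weights.sock", "model.bin")` would then issue the same request as the first row of the table.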
Write Modes#
Write mode controls how the runtime buffers data before sending it to the connector.
buffered#
_BufferedWriteFile collects all writes into a SpooledTemporaryFile
(spills to disk above 4 MiB). On close(), it sends a single PUT with
Content-Length.
- Supports seek() and tell() before close
- Suitable for small-to-medium files where the full size is needed upfront
- Default for all connectors except passthrough
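The buffered strategy can be sketched as a small file-like class. This is an illustration, not `_BufferedWriteFile` itself: the class name `BufferedWriteFile` and the `send(fileobj, size)` callback, which stands in for the single PUT with Content-Length, are hypothetical.

```python
import tempfile


class BufferedWriteFile:
    """Collect writes locally, then hand the whole payload to `send` on close()."""

    def __init__(self, send, max_memory=4 * 1024 * 1024):
        self._send = send  # hypothetical callable(fileobj, size)
        # Stays in memory up to max_memory bytes, then spills to disk.
        self._buf = tempfile.SpooledTemporaryFile(max_size=max_memory)
        self.closed = False

    def write(self, data):
        return self._buf.write(data)

    def seek(self, offset, whence=0):
        return self._buf.seek(offset, whence)

    def tell(self):
        return self._buf.tell()

    def close(self):
        if self.closed:
            return
        self.closed = True
        try:
            self._buf.seek(0, 2)     # jump to the end to measure the payload
            size = self._buf.tell()  # value for Content-Length
            self._buf.seek(0)
            self._send(self._buf, size)
        finally:
            self._buf.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
```

Because nothing is sent until close(), a handler can seek back and overwrite earlier bytes, and a failed upload surfaces at close() rather than at write().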
passthrough#
_PassthroughWriteFile opens the HTTP connection immediately, sends each
write() call as an HTTP chunk using chunked transfer encoding, and finalises on
close().
- Does not buffer in memory — suitable for large files
- Does not support seek() or tell()
- Used when writeMode: passthrough is set in the AsyncActor spec
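Chunked transfer encoding frames each payload with its length in hexadecimal, which is why passthrough mode never needs the total size. The sketch below shows only the wire framing; `encode_chunk` and `encode_stream` are hypothetical helper names, and skipping empty writes is a choice made here because a zero-length chunk marks the end of the body.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one write() payload as an HTTP/1.1 chunk: hex length, CRLF, data, CRLF."""
    return b"%x\r\n%s\r\n" % (len(data), data)


LAST_CHUNK = b"0\r\n\r\n"  # zero-length chunk that finalises the body on close()


def encode_stream(writes):
    """Yield the wire bytes for a sequence of write() payloads."""
    for data in writes:
        if data:  # an empty chunk would terminate the body early
            yield encode_chunk(data)
    yield LAST_CHUNK
```

Each chunk goes out as soon as it is written, so memory use stays bounded by the size of a single write() call.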
Connector Types#
Source: src/asya-state-proxy/asya_state_proxy/connectors/
All connectors implement the StateProxyConnector abstract base class
(src/asya-state-proxy/asya_state_proxy/interface.py):
class StateProxyConnector(ABC):
    def read(self, key: str) -> BinaryIO: ...
    def write(self, key: str, data: BinaryIO, size: int | None = None, *, exclusive: bool = False) -> None: ...
    def exists(self, key: str) -> bool: ...
    def stat(self, key: str) -> KeyMeta | None: ...
    def list(self, key_prefix: str, delimiter: str = "/") -> ListResult: ...
    def delete(self, key: str) -> None: ...
When exclusive=True, the write succeeds only if the key does not already exist
(atomic create-if-absent). If the key is present, the connector raises
FileExistsError. An exclusive write is requested when the HTTP PUT request
includes an If-None-Match: * header, which the runtime sends when a handler
opens a file with mode "x" in Python (open(path, "x")).
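The create-if-absent contract can be illustrated with a toy connector. `InMemoryConnector` is hypothetical and implements only the `read` and `write` parts of the interface; the lock is there so that the existence check and the store happen as one step, which is the property the real backends have to provide by their own means.

```python
import io
import threading
from typing import BinaryIO


class InMemoryConnector:
    """Toy connector that keeps values in a dict (illustration only)."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def write(self, key: str, data: BinaryIO, size=None, *, exclusive: bool = False) -> None:
        payload = data.read()
        with self._lock:  # check and store under one lock so the create is atomic
            if exclusive and key in self._data:
                raise FileExistsError(key)
            self._data[key] = payload

    def read(self, key: str) -> BinaryIO:
        try:
            return io.BytesIO(self._data[key])
        except KeyError:
            raise FileNotFoundError(key) from None
```

From handler code, the same behaviour would be reached with `open(path, "x")`: the first caller creates the key and any later caller gets FileExistsError, which makes the pattern usable as a simple claim or lock marker.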
s3-buffered-lww#
Image suffix: s3-buffered-lww
Last-Write-Wins semantics. Writes always overwrite the existing object. No conflict detection. Suitable for state that is written by a single actor instance.
Required env: STATE_BUCKET. Optional: STATE_PREFIX, AWS_REGION,
AWS_ENDPOINT_URL.
s3-passthrough#
Image suffix: s3-passthrough
Streaming writes directly to S3 via upload_fileobj. Reads return a
StreamingBody wrapper without buffering. No conflict detection.
Required env: STATE_BUCKET. Optional: STATE_PREFIX, AWS_REGION,
AWS_ENDPOINT_URL.
s3-buffered-cas#
Image suffix: s3-buffered-cas
Check-And-Set with ETag-based conflict detection. On read, the ETag is cached
in memory. On write, a conditional PutObject with IfMatch: {cached_etag} is
sent. If the object was modified externally since the last read, S3 returns
PreconditionFailed, which the connector maps to FileExistsError.
Write is unconditional for keys that have never been read (new key path).
Required env: STATE_BUCKET. Optional: STATE_PREFIX, AWS_REGION,
AWS_ENDPOINT_URL.
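The ETag-caching logic described for s3-buffered-cas can be sketched without S3. Everything here is an assumption made for illustration: `FakeObjectStore` stands in for the bucket, its ETag is a SHA-256 of the content rather than whatever S3 computes, and `CasConnector` forgets the cached ETag after a successful write, which the text above does not specify.

```python
import hashlib


def _etag(body: bytes) -> str:
    return hashlib.sha256(body).hexdigest()


class FakeObjectStore:
    """Stand-in for S3: stores bytes and derives an ETag from the content."""

    def __init__(self):
        self.objects = {}

    def get(self, key):
        body = self.objects[key]
        return body, _etag(body)

    def put(self, key, body, if_match=None):
        if if_match is not None:
            current = self.objects.get(key)
            if current is None or _etag(current) != if_match:
                # The real connector maps PreconditionFailed to this exception.
                raise FileExistsError(f"precondition failed for {key!r}")
        self.objects[key] = body


class CasConnector:
    def __init__(self, store):
        self._store = store
        self._etags = {}  # key -> ETag seen on the last read

    def read(self, key):
        body, etag = self._store.get(key)
        self._etags[key] = etag
        return body

    def write(self, key, body):
        # Keys never read through this connector are written unconditionally.
        self._store.put(key, body, if_match=self._etags.get(key))
        self._etags.pop(key, None)
```

With two connectors sharing one store, a write from the first fails once the second has modified the object after the first one's last read; a typical caller would re-read and retry.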
gcs-buffered-lww#
Image suffix: gcs-buffered-lww
Last-Write-Wins semantics for Google Cloud Storage. Writes always overwrite the existing blob. No conflict detection. Suitable for state written by a single actor instance.
Required env: STATE_BUCKET. Optional: STATE_PREFIX, GCS_PROJECT,
STORAGE_EMULATOR_HOST.
gcs-buffered-cas#
Image suffix: gcs-buffered-cas
Check-And-Set with GCS generation-based conflict detection. On read, the object
generation number is cached in memory. On write, if_generation_match is used
as a precondition. If the object was modified externally since the last read,
GCS returns PreconditionFailed (412), which the connector maps to
FileExistsError.
Write is unconditional for keys that have never been read (new key path).
Required env: STATE_BUCKET. Optional: STATE_PREFIX, GCS_PROJECT,
STORAGE_EMULATOR_HOST.
redis-buffered-cas#
Image suffix: redis-buffered-cas
Check-And-Set with Redis WATCH/MULTI/EXEC optimistic locking. On write, the key
is watched; if it changes before the transaction executes, WatchError is raised
and mapped to FileExistsError.
Required env: REDIS_URL (e.g. redis://localhost:6379/0). Optional:
STATE_PREFIX.
Extended Attributes (xattr)#
Connectors expose backend-specific metadata through Python's os.getxattr,
os.setxattr, and os.listxattr APIs. The runtime intercepts these calls on
state mount paths and translates them to HTTP requests on the connector's
/meta/ endpoints.
All Asya xattrs use the user.asya.{attr} naming convention. The runtime strips
the user.asya. prefix before sending to the connector and prepends it when
returning results from os.listxattr.
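The prefix handling amounts to two small string transformations. The helper names `to_connector_attr` and `to_python_attrs` are hypothetical, and raising ValueError for names outside the user.asya. namespace is an assumption; the runtime may report that case differently.

```python
XATTR_PREFIX = "user.asya."


def to_connector_attr(name: str) -> str:
    """Strip the prefix before calling the connector: 'user.asya.url' -> 'url'."""
    if not name.startswith(XATTR_PREFIX) or name == XATTR_PREFIX:
        raise ValueError(f"not an Asya attribute: {name!r}")
    return name[len(XATTR_PREFIX):]


def to_python_attrs(names):
    """Prepend the prefix to connector attribute names for os.listxattr results."""
    return [XATTR_PREFIX + name for name in names]
```

So a handler calling `os.getxattr(path, "user.asya.etag")` would result in a request for `attr=etag` on the /meta/ endpoint.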
Attributes by connector#
| Connector | url | presigned_url / signed_url | etag / generation | content_type | version | ttl |
|---|---|---|---|---|---|---|
| s3-* | R | R (presigned_url) | R (etag) | RW | R | - |
| gcs-* | R | R (signed_url) | R (generation) | RW | R (metageneration) | - |
| redis-* | - | - | - | - | - | RW |
R = read-only, RW = read-write, - = not supported.
Error Mapping#
Source: src/asya-runtime/asya_runtime.py, _raise_for_status
HTTP error responses from the connector are mapped to standard Python exceptions:
| HTTP status | Python exception |
|---|---|
| 400 | ValueError |
| 403 | PermissionError |
| 404 | FileNotFoundError |
| 409 | FileExistsError (CAS conflict) |
| 413 | OSError(errno.EFBIG, ...) |
| 500 | OSError |
| 503 | ConnectionError |
| 504 | TimeoutError |
Handler code can catch these exceptions directly:
import json

try:
    with open("/state/cache/result.json", "rb") as f:
        cached = json.load(f)
except FileNotFoundError:
    cached = None  # Cache miss
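The status table above can be expressed as a single function. This is a sketch and not the `_raise_for_status` in asya_runtime.py: the name `raise_for_status`, the errno values attached to each exception, and the fallback to a generic OSError for unlisted 4xx/5xx codes are assumptions.

```python
import errno


def raise_for_status(status: int, message: str = "") -> None:
    """Raise the Python exception that the table assigns to an HTTP status."""
    if status < 400:
        return
    if status == 400:
        raise ValueError(message)
    if status == 403:
        raise PermissionError(errno.EACCES, message)
    if status == 404:
        raise FileNotFoundError(errno.ENOENT, message)
    if status == 409:
        raise FileExistsError(errno.EEXIST, message)  # CAS conflict
    if status == 413:
        raise OSError(errno.EFBIG, message)
    if status == 503:
        raise ConnectionError(message)
    if status == 504:
        raise TimeoutError(message)
    # 500 and, by assumption, any other unlisted error status.
    raise OSError(errno.EIO, message)
```

Since FileNotFoundError, FileExistsError, PermissionError, ConnectionError, and TimeoutError are all subclasses of OSError, a handler that only wants a coarse failure path can catch OSError alone.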
Connector Environment Variables#
| Variable | Connectors | Description |
|---|---|---|
| CONNECTOR_SOCKET | all | Unix socket path (set by Crossplane composition) |
| STATE_BUCKET | s3-*, gcs-* | S3 bucket or GCS bucket name |
| STATE_PREFIX | s3-*, gcs-*, redis-* | Key prefix within bucket or namespace |
| AWS_REGION | s3-* | AWS region (default: us-east-1) |
| AWS_ENDPOINT_URL | s3-* | Custom endpoint for MinIO/LocalStack |
| GCS_PROJECT | gcs-* | GCP project ID (auto-detected from credentials) |
| STORAGE_EMULATOR_HOST | gcs-* | Override for fake-gcs-server in testing |
| STATE_PRESIGN_TTL | s3-*, gcs-* | Presigned/signed URL expiration in seconds (default: 3600) |
| REDIS_URL | redis-* | Redis connection URL |
Related Components#
- Crossplane Compositions: reads stateProxy from the AsyncActor XR and renders connector containers
- Runtime: installs file I/O hooks from ASYA_STATE_PROXY_MOUNTS
- XRD definition: deploy/helm-charts/asya-crossplane/templates/xrd-asyncactor.yaml
- Connector server: src/asya-state-proxy/asya_state_proxy/server.py
- Connector interface: src/asya-state-proxy/asya_state_proxy/interface.py