How to stream live progress updates from your actor handlers to gateway clients in real time.


What are FLY events?#

FLY events are ephemeral streaming messages sent upstream from your actor handler to the gateway. They bypass message queues and are delivered directly to connected clients via Server-Sent Events (SSE).

Think of FLY as the fast lane for live updates:

  • FLY events: Real-time, ephemeral, reach only connected SSE clients
  • Payload data: Persistent, routed through queues, visible to all downstream actors

Use FLY for:

  • Streaming LLM tokens as they're generated
  • Progress updates ("Processing chunk 3/10...")
  • Live status messages ("Connecting to database...", "Model loaded")
  • Debug markers during development

Do NOT use FLY for:

  • Data that downstream actors need to read
  • Information that must survive pause/resume
  • Final results (use yield payload instead)


How to use FLY#

FLY is an ABI verb. Yield a tuple with "FLY" as the first element and a dict as the second:

```python
async def my_handler(payload: dict):
    # Stream a status update
    yield "FLY", {"type": "status", "message": "Starting processing..."}

    # Do work
    result = await process_data(payload["input"])

    # Emit final result downstream
    payload["result"] = result
    yield payload
```

Signature#

```
yield "FLY", <dict>
```

The dict can contain any JSON-serializable data. Common patterns:

```python
# Status update
yield "FLY", {"type": "status", "message": "Loading model..."}

# Progress percentage
yield "FLY", {"type": "progress", "percent": 45, "step": "preprocessing"}

# LLM token (partial=True mirrors Google ADK's Event(partial=True))
yield "FLY", {"partial": True, "text": "Hello"}
```

FLY vs History#

FLY events are ephemeral — they reach only clients connected at the moment they're sent. If a client disconnects and reconnects, FLY events are gone.

For data that must survive pause/resume or be readable by downstream actors, append to payload.a2a.task.history[] instead:

```python
from datetime import datetime

async def analyst(payload: dict):
    # Ephemeral streaming (live clients only)
    yield "FLY", {"type": "status", "message": "Analyzing risk..."}

    result = await analyze_risk(payload)

    # Persistent history (survives pause/resume, visible to downstream)
    history = (
        payload.setdefault("a2a", {})
               .setdefault("task", {})
               .setdefault("history", [])
    )
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "actor": "analyst",
        "event": "risk_analysis_complete",
        "data": {"risk_level": result["risk_level"]}
    })

    payload["result"] = result
    yield payload
```

| Use case | FLY | History |
| --- | --- | --- |
| Live token streaming (LLMs) | ✓ | |
| Progress bars for connected clients | ✓ | |
| Debug logs during development | ✓ | |
| Audit trail (who did what when) | | ✓ |
| Data for downstream actors | | ✓ |
| Data that survives pause/resume | | ✓ |

Common Patterns#

1. Streaming LLM tokens#

Stream tokens as they're generated, then send the complete response downstream:

```python
async def llm_handler(payload: dict):
    tokens = []

    async for token in call_llm_stream(payload["prompt"]):
        # partial=True mirrors ADK's Event(partial=True, content=Part(text=token))
        yield "FLY", {"partial": True, "text": token}
        tokens.append(token)

    # Final response (non-partial)
    payload["response"] = "".join(tokens)
    yield payload
```

ADK equivalent:

```python
# Google ADK
async for token in llm.stream(prompt):
    yield Event(partial=True, content=Part(text=token))
yield Event(partial=False, content=Part(text=full_response))
```

Asya:

```python
# Asya
async for token in llm.stream(prompt):
    yield "FLY", {"partial": True, "text": token}
yield payload  # downstream frame is the final (non-partial) event
```

2. Progress updates for long-running jobs#

```python
async def batch_processor(payload: dict):
    items = payload["items"]
    total = len(items)
    results = []

    for i, item in enumerate(items):
        # Stream progress
        percent = int((i / total) * 100)
        yield "FLY", {
            "type": "progress",
            "percent": percent,
            "current": i + 1,
            "total": total,
            "message": f"Processing item {i+1}/{total}"
        }

        result = await process_item(item)
        results.append(result)

    payload["results"] = results
    yield payload
```

3. Multi-stage status updates#

```python
async def data_pipeline(payload: dict):
    yield "FLY", {"stage": "download", "message": "Downloading data..."}
    data = await download(payload["url"])

    yield "FLY", {"stage": "validate", "message": "Validating schema..."}
    validated = await validate(data)

    yield "FLY", {"stage": "transform", "message": "Transforming data..."}
    transformed = await transform(validated)

    yield "FLY", {"stage": "upload", "message": "Uploading results..."}
    result_url = await upload(transformed)

    payload["result_url"] = result_url
    yield payload
```

4. Debug markers#

```python
async def debug_handler(payload: dict):
    yield "FLY", {"debug": "handler_start", "payload_keys": list(payload.keys())}

    intermediate = await step_one(payload)
    yield "FLY", {"debug": "step_one_complete", "output": intermediate}

    final = await step_two(intermediate)
    yield "FLY", {"debug": "step_two_complete", "output": final}

    payload["result"] = final
    yield payload
```

How FLY reaches clients#

```
Actor handler
    yield "FLY", {...}
         │
         ▼
    Runtime
         │ sends FLY frame over Unix socket
         ▼
    Sidecar
         │ HTTP POST /mesh/{task_id}/fly
         ▼
    Gateway (mesh mode)
         │ broadcasts via pg_notify (no DB write)
         ▼
    PG LISTEN/NOTIFY
         │ cross-process delivery
         ▼
    Gateway (api mode)
         │ in-process subscribers
         ▼
    Connected SSE clients (/stream/{id} or /mesh/{id}/stream)
```

FLY events bypass message queues entirely — they travel from the sidecar directly to the mesh gateway via HTTP, then are broadcast to the API gateway via PG LISTEN/NOTIFY (in-memory, no disk persistence), and finally streamed to SSE clients. This makes them fast but ephemeral — clients connecting after task completion will NOT see historical FLY events.


MCP vs A2A: when to use which#

Both protocols expose your flows to external clients, but they serve different audiences and have different streaming semantics.

| Aspect | MCP | A2A |
| --- | --- | --- |
| Audience | LLM clients, developer tools | AI agents, orchestrators |
| Interaction | Tool invocation (call + result) | Task lifecycle (send, subscribe, cancel) |
| Streaming | SSE on same connection | SSE via subscribe endpoint |
| FLY events | Delivered as event: partial (or artifact_update/status_update if payload matches A2A shapes) | Same SSE event types |
| Session | Requires MCP session (initialize handshake) | Stateless per-request |
| Pause/resume | Not supported | Built-in (input_required state) |
| Multi-turn | Not natively supported | Context ID tracks conversation |

Use MCP when:

  • Building LLM tool integrations (Claude, GPT, Gemini)
  • Your flow is a single request-response operation
  • You want standard MCP client compatibility

Use A2A when:

  • Building agent-to-agent communication
  • Your flow involves pause/resume (human-in-the-loop)
  • You need multi-turn conversations
  • You want structured streaming with TaskArtifactUpdateEvent

Use both when:

  • You want maximum interoperability

Register your flow with both protocols:

```shell
asya expose my-flow -d "Description" --mcp --a2a
```

FLY event shapes for A2A compatibility#

If you want FLY events to appear as standard A2A SSE event types (not just event: partial), structure the payload with the appropriate top-level key:

```python
# Standard A2A artifact streaming (event type: artifact_update)
yield "FLY", {
    "artifact_update": {
        "artifact": {
            "parts": [{"kind": "text", "text": "token"}]
        }
    }
}

# Standard A2A status update (event type: status_update)
yield "FLY", {
    "status_update": {
        "state": "working",
        "message": {"role": "agent", "parts": [{"kind": "text", "text": "Processing..."}]}
    }
}

# Non-standard (event type: partial) — works with custom clients
yield "FLY", {"partial": True, "text": "token"}
```
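If you emit these shapes often, small wrapper functions keep handlers readable. These are sketches of our own (`fly_artifact` and `fly_status` are not Asya APIs); they simply wrap a token or status message in the shapes above:

```python
def fly_artifact(text: str) -> tuple:
    """Wrap a text chunk in the A2A artifact_update shape."""
    return "FLY", {
        "artifact_update": {
            "artifact": {"parts": [{"kind": "text", "text": text}]}
        }
    }

def fly_status(state: str, text: str) -> tuple:
    """Wrap a status message in the A2A status_update shape."""
    return "FLY", {
        "status_update": {
            "state": state,
            "message": {"role": "agent", "parts": [{"kind": "text", "text": text}]},
        }
    }

# Inside a handler:
#   yield fly_artifact("token")
#   yield fly_status("working", "Processing...")
```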

Dual-gateway streaming latency#

In dual-gateway mode (separate api + mesh pods), FLY events are relayed via PostgreSQL:

```
Sidecar ---POST /mesh/{id}/fly---> Gateway mesh
                                       |
                                   writes to PostgreSQL
                                       |
Gateway api <---polls DB (500ms)-------+
    |
SSE client
```

This adds up to 500ms latency per FLY event. For progress updates and stage transitions this is acceptable. For per-token LLM streaming (~30 tokens/sec), events arrive in batches rather than individually.

Workarounds:

  • Run in single-gateway mode (api + mesh in one pod) for near-zero FLY latency via in-process channels
  • A future improvement will add a direct pub/sub relay (Redis/NATS) between mesh and api pods for sub-millisecond FLY delivery (see aint debt/jhre)


Client-side consumption#

Protocol-agnostic SSE#

Connect to /stream/{task_id} (API gateway) for protocol-agnostic FLY event streaming:

```shell
curl -N -H "Accept: text/event-stream" \
  http://gateway/stream/{task_id}
```

Or use /mesh/{task_id}/stream (mesh gateway) for internal mesh access:

```shell
curl -N -H "Accept: text/event-stream" \
  http://gateway/mesh/{task_id}/stream
```

Stream format:

```
event: partial
data: {"partial":true,"text":"Hello"}

event: partial
data: {"partial":true,"text":" world"}

event: update
data: {"id":"task-123","status":"succeeded","result":{...}}
```

FLY events arrive with event: partial by default. If the FLY payload contains A2A-specific keys (artifact_update, status_update, or message), the event type matches that key instead. Progress updates from the sidecar arrive with event: update. The final result arrives as the last update event with status: succeeded.
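For a custom client, the stream above is plain SSE and can be parsed in a few lines of Python. This is a minimal sketch that assumes complete, well-formed event blocks; a production client should use an SSE library and handle reconnects and partial reads:

```python
import json

def parse_sse(raw: str):
    """Parse a raw SSE stream into (event_type, data) pairs.

    Events are separated by blank lines; each carries an `event:`
    line and a `data:` line, as in the stream format shown above.
    """
    events = []
    for block in raw.strip().split("\n\n"):
        event_type, data = "message", None  # SSE default type is "message"
        for line in block.splitlines():
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data = json.loads(line[len("data:"):].strip())
        events.append((event_type, data))
    return events

raw = (
    'event: partial\ndata: {"partial":true,"text":"Hello"}\n\n'
    'event: update\ndata: {"id":"task-123","status":"succeeded"}\n'
)
for event_type, data in parse_sse(raw):
    print(event_type, data)
```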

A2A Subscribe#

A2A clients receive FLY events via the Subscribe RPC:

```json
{
  "jsonrpc": "2.0",
  "method": "subscribe",
  "params": {"task_id": "task-123"}
}
```

Response stream:

```
{"type": "fly", "data": {"partial": true, "text": "Hello"}}
{"type": "fly", "data": {"partial": true, "text": " world"}}
{"type": "update", "status": "succeeded", "result": {...}}
```
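A client consuming this stream typically demultiplexes on the type field. A minimal sketch, assuming the newline-delimited JSON framing shown above (`route_subscribe_line` is a name of ours):

```python
import json

def route_subscribe_line(line: str, on_fly, on_update):
    """Dispatch one line of the Subscribe response stream to a callback."""
    msg = json.loads(line)
    if msg.get("type") == "fly":
        on_fly(msg["data"])
    elif msg.get("type") == "update":
        on_update(msg)

tokens = []
final = {}
route_subscribe_line(
    '{"type": "fly", "data": {"partial": true, "text": "Hello"}}',
    on_fly=lambda d: tokens.append(d.get("text", "")),
    on_update=final.update,
)
route_subscribe_line(
    '{"type": "update", "status": "succeeded", "result": {"ok": 1}}',
    on_fly=lambda d: tokens.append(d.get("text", "")),
    on_update=final.update,
)
# tokens now holds ["Hello"]; final["status"] is "succeeded"
```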

Testing FLY locally#

FLY events are filtered out by the actor() test wrapper (they're tuples, not dicts). To verify FLY emissions in tests, collect all yields:

```python
async def collect_all(gen):
    """Collect all yields: both ABI commands and emitted frames."""
    return [e async for e in gen]

async def test_streaming():
    events = await collect_all(llm_handler({"prompt": "hello"}))

    # Verify FLY events
    fly_events = [e for e in events if isinstance(e, tuple) and e[0] == "FLY"]
    assert len(fly_events) > 0
    assert all(e[1].get("partial") is True for e in fly_events)
    assert all("text" in e[1] for e in fly_events)

    # Verify final payload
    payloads = [e for e in events if isinstance(e, dict)]
    assert payloads[-1]["response"] is not None
```
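To run this pattern end-to-end without a live LLM, substitute a stub handler. `fake_llm_handler` below is ours, but it follows the same FLY-then-payload shape as the llm_handler pattern above:

```python
import asyncio

async def fake_llm_handler(payload: dict):
    """Stub handler: streams two FLY tokens, then the final payload."""
    for token in ["Hello", " world"]:
        yield "FLY", {"partial": True, "text": token}
    payload["response"] = "Hello world"
    yield payload

async def collect_all(gen):
    """Collect all yields: both ABI commands and emitted frames."""
    return [e async for e in gen]

async def main():
    events = await collect_all(fake_llm_handler({"prompt": "hello"}))

    fly_events = [e for e in events if isinstance(e, tuple) and e[0] == "FLY"]
    assert [e[1]["text"] for e in fly_events] == ["Hello", " world"]

    payloads = [e for e in events if isinstance(e, dict)]
    assert payloads[-1]["response"] == "Hello world"

asyncio.run(main())
```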

Performance considerations#

  • FLY events are not queued — they're delivered synchronously via HTTP from the sidecar to the gateway
  • High-frequency FLY (e.g., per-character token streaming) may add latency to each handler iteration
  • For extremely high-frequency updates (>100/sec), consider batching or sampling

Best practice: Stream tokens as they arrive (natural LLM cadence), but avoid yielding FLY in tight CPU-bound loops.
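A minimal batching sketch along these lines (`token_source` stands in for your LLM stream; `batched_handler` is a name of ours, not an Asya API): buffer tokens and flush one FLY event per batch_size tokens instead of one per token.

```python
import asyncio

async def token_source(prompt: str):
    """Stub token source standing in for your LLM stream."""
    for ch in "hello world, streaming in batches":
        yield ch

async def batched_handler(payload: dict, batch_size: int = 8):
    """Buffer tokens and emit one FLY event per batch, not per token."""
    buffer, full = [], []
    async for token in token_source(payload["prompt"]):
        buffer.append(token)
        full.append(token)
        if len(buffer) >= batch_size:
            yield "FLY", {"partial": True, "text": "".join(buffer)}
            buffer.clear()
    if buffer:  # flush any remainder
        yield "FLY", {"partial": True, "text": "".join(buffer)}
    payload["response"] = "".join(full)
    yield payload
```

The same idea works with a time threshold (flush every ~100ms) if you prefer latency-bounded batches over size-bounded ones.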


See also#

| Topic | Document |
| --- | --- |
| Streaming events reference (all event types, protocol mapping) | docs/reference/specs/streaming-events.md |
| ABI protocol reference | docs/reference/specs/abi-protocol.md |
| Gateway SSE endpoints | docs/reference/components/core-gateway.md |
| Agentic patterns (pause/resume, history) | docs/usage/guide-agentic-patterns.md |
| Testing handlers locally | docs/reference/specs/abi-protocol.md |