First Flow#
What you'll learn#
- How the Flow DSL describes pipelines as Python functions
- How to compile a flow into router actors
- What the compiler generates and why
- How to deploy and run a compiled flow
Prerequisites#
- A running Asya playground cluster (follow the Getting Started guide through step 4)
- Familiarity with actor pipelines (see Build Your First Pipeline)
- asya-lab installed: `pip install git+https://github.com/deliveryhero/asya.git#subdirectory=src/asya-lab`
Why flows?#
In the pipeline tutorial, you set the route manually in the envelope:
```json
{"route": {"prev": [], "curr": "uppercaser", "next": ["word-counter"]}}
```
This works for linear chains, but becomes tedious when you need branching, loops, or conditional routing. The Flow DSL lets you describe the pipeline in Python, and the compiler generates the routing logic for you.
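To see the tedium concretely, here is a sketch in plain Python (no Asya imports) of building such an envelope by hand for an arbitrary chain. `make_envelope` is a hypothetical helper for illustration, not part of Asya:

```python
import json

def make_envelope(payload: dict, chain: list[str]) -> dict:
    """Build a pipeline envelope by hand: the first actor in the chain
    becomes route.curr, the rest go into route.next in execution order."""
    return {
        "id": "manual-1",
        "route": {"prev": [], "curr": chain[0], "next": chain[1:]},
        "headers": {},
        "payload": payload,
    }

envelope = make_envelope({"text": " Hello World "}, ["uppercaser", "word-counter"])
print(json.dumps(envelope["route"]))
# Every actor added to or reordered in the chain means editing this
# list by hand in every message you send.
```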
Step 1: Write a flow#
Create a file called text_pipeline.py:
```python
from asya_lab.flow import flow

@flow
async def text_pipeline(p: dict) -> dict:
    p = await normalize(p)
    p = await analyze(p)
    return p

async def normalize(p: dict) -> dict:
    """Lowercase and strip whitespace."""
    text = p.get("text", "")
    return {**p, "normalized": text.strip().lower()}

async def analyze(p: dict) -> dict:
    """Count characters and words."""
    text = p.get("normalized", "")
    return {
        **p,
        "char_count": len(text),
        "word_count": len(text.split()),
    }
```
The flow function (text_pipeline) describes the execution order: first normalize, then analyze. The handler functions below it are the actual actor implementations.
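Because the handlers are plain async functions, you can sanity-check them locally with `asyncio` before compiling or deploying anything. This is an optional local check, not a required step:

```python
import asyncio

# The same handlers as in text_pipeline.py, run without any Asya runtime.
async def normalize(p: dict) -> dict:
    text = p.get("text", "")
    return {**p, "normalized": text.strip().lower()}

async def analyze(p: dict) -> dict:
    text = p.get("normalized", "")
    return {**p, "char_count": len(text), "word_count": len(text.split())}

async def main() -> dict:
    # Mirror the flow function's execution order by hand.
    p = {"text": "  Hello World  "}
    p = await normalize(p)
    p = await analyze(p)
    return p

result = asyncio.run(main())
print(result)  # expect char_count 11, word_count 2
```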
Step 2: Compile the flow#
Run the compiler:
```shell
asya flow compile text_pipeline.py --output-dir ./compiled/ --plot
```
Expected output:
```
[+] Successfully compiled flow to: compiled/routers.py
[+] Generated graphviz dot file: compiled/flow.dot
[+] Generated graphviz svg plot: compiled/flow.svg
```
Step 3: Inspect the generated code#
Open compiled/routers.py. You will see two router functions:
start_text_pipeline -- the entry point. It prepends the handler actors to route.next:
```python
async def start_text_pipeline(payload: dict):
    """Entrypoint for flow 'text_pipeline'"""
    _next = []
    _next.append(resolve("normalize"))
    _next.append(resolve("analyze"))
    yield "SET", ".route.next[:0]", _next
    yield payload
```
end_text_pipeline -- the exit point. It clears route.next so the message goes to x-sink:
```python
async def end_text_pipeline(payload: dict):
    """Exitpoint for flow 'text_pipeline'"""
    yield "SET", ".route.next", []
    yield payload
```
The routers use the ABI yield protocol (yield "SET", ...) to modify the envelope's routing. The resolve() function maps handler names to deployed actor names at runtime via environment variables.
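The generated code does not include resolve() itself; its real implementation lives in the Asya runtime. A minimal sketch of what such a lookup could look like, assuming it checks the corresponding ASYA_HANDLER_* variable and derives the kebab-case actor name from the handler name (both assumptions, based on the deployment step below):

```python
import os

def resolve(handler: str) -> str:
    """Hypothetical sketch: map a flow handler name to a deployed actor name.

    Verifies the handler was wired up via an ASYA_HANDLER_<NAME> environment
    variable, then returns the kebab-case form of the handler name.
    """
    key = f"ASYA_HANDLER_{handler.upper()}"
    if key not in os.environ:
        raise KeyError(f"handler {handler!r} not configured ({key} is unset)")
    return handler.replace("_", "-")

os.environ["ASYA_HANDLER_NORMALIZE"] = "text_pipeline.normalize"
print(resolve("normalize"))  # -> normalize
```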
If you used --plot, open compiled/flow.svg to see a visual diagram of the flow.
Step 4: Build images#
In production you might build separate images for the router actors and each handler actor. For this tutorial, you can put everything in one image for simplicity.
Create a Dockerfile:
```dockerfile
FROM python:3.13-slim
WORKDIR /app
COPY text_pipeline.py /app/
COPY compiled/routers.py /app/
```
Build and load:
```shell
docker build -t text-pipeline:v1 .
kind load docker-image text-pipeline:v1 --name asya-quickstart
```
Step 5: Deploy the actors#
Every function in the flow becomes a separate AsyncActor. Deploy the routers and handlers:
```yaml
# flow-actors.yaml
# Entry router
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: start-text-pipeline
  namespace: asya-demo
spec:
  actor: start-text-pipeline
  image: text-pipeline:v1
  handler: routers.start_text_pipeline
  env:
    - name: ASYA_HANDLER_NORMALIZE
      value: "text_pipeline.normalize"
    - name: ASYA_HANDLER_ANALYZE
      value: "text_pipeline.analyze"
  resources:
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 500m
      memory: 256Mi
---
# Handler: normalize
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: normalize
  namespace: asya-demo
spec:
  actor: normalize
  image: text-pipeline:v1
  handler: text_pipeline.normalize
  resources:
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 500m
      memory: 256Mi
---
# Handler: analyze
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: analyze
  namespace: asya-demo
spec:
  actor: analyze
  image: text-pipeline:v1
  handler: text_pipeline.analyze
  resources:
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 500m
      memory: 256Mi
```
The ASYA_HANDLER_* environment variables on the router tell resolve() how to map handler names to actor names. The variable name suffix (e.g., NORMALIZE) becomes the actor name in kebab-case (normalize).
Apply:
```shell
kubectl apply -f flow-actors.yaml
```
Step 6: Send a message#
Send a message to the start router. The router will set up the route for the full pipeline:
```shell
kubectl run aws-cli --rm -i --restart=Never --image=amazon/aws-cli \
  --namespace asya-demo \
  --env="AWS_ACCESS_KEY_ID=test" \
  --env="AWS_SECRET_ACCESS_KEY=test" \
  --env="AWS_DEFAULT_REGION=us-east-1" \
  --command -- sh -c "
    aws sqs send-message \
      --endpoint-url=http://localstack-sqs.asya-demo:4566 \
      --queue-url http://localstack-sqs.asya-demo:4566/000000000000/asya-asya-demo-start-text-pipeline \
      --message-body '{\"id\":\"test-flow-1\",\"route\":{\"prev\":[],\"curr\":\"start-text-pipeline\",\"next\":[]},\"headers\":{},\"payload\":{\"text\":\" Hello World \"}}'
  "
```
Step 7: Verify the result#
After the actors scale up and process the message, check x-sink:
```shell
SINK_POD=$(kubectl get pods -n asya-demo -l asya.sh/actor=x-sink -o jsonpath='{.items[0].metadata.name}')
kubectl logs -n asya-demo "$SINK_POD" -c asya-runtime --tail=10
```
The final payload should contain:
```json
{
  "text": " Hello World ",
  "normalized": "hello world",
  "char_count": 11,
  "word_count": 2
}
```
How it works#
Here is the message flow:
- Message arrives at `start-text-pipeline` with empty `route.next`
- The start router sets `route.next` to `["normalize", "analyze"]`
- Sidecar forwards the message to `normalize`
- `normalize` processes the payload; the sidecar forwards to `analyze`
- `analyze` processes the payload; `route.next` is now empty, so the sidecar forwards to `x-sink`
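The routing steps above can be simulated in a few lines of plain Python. This is a sketch of the sidecar's advance logic under the assumptions stated in the comments, not Asya's actual implementation:

```python
def advance(route: dict) -> dict:
    """Sketch of a sidecar-style route advance: the finished actor is
    appended to prev, the head of next becomes curr, and an empty next
    is assumed to route to x-sink."""
    nxt = list(route["next"])
    return {
        "prev": route["prev"] + [route["curr"]],
        "curr": nxt[0] if nxt else "x-sink",
        "next": nxt[1:],
    }

# After the start router has set route.next:
route = {"prev": [], "curr": "start-text-pipeline", "next": ["normalize", "analyze"]}
route = advance(route)  # now at normalize
route = advance(route)  # now at analyze
route = advance(route)  # next is empty -> x-sink
print(route["curr"], route["prev"])
```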
The Flow DSL compiler automated the routing setup. Without it, you would need to manually specify route.next in every envelope you send, as you did in the pipeline tutorial.
Clean up#
```shell
kubectl delete asyncactor start-text-pipeline normalize analyze -n asya-demo
```
What you built#
You used the Flow DSL to:
- Describe a pipeline in familiar Python syntax
- Compile it into router actors that handle envelope routing
- Deploy the routers alongside your handler actors
- Run the pipeline by sending a message to the entry point
The flow compiler transforms Python control flow into message-passing chains. For simple sequences, this saves a few lines of YAML. The real payoff comes with branching and loops -- see the Flow DSL Reference for the full syntax.
Next steps#
- Add Human-in-the-Loop -- pause a pipeline for human approval
- Flow DSL Reference -- conditionals, loops, fan-out, error handling
- ABI Protocol Reference -- the yield protocol used by generated routers