Distributed Work
An HTTP front-end that dispatches each POST /process request body to a remote worker node, which applies a line-level text transformation and streams the result back. The worker is provisioned on demand using DistantEngine — no second process needs to be started manually.
Running
melodium run 06_distributed_work/Compo.toml \
  --api_token "my-api-token"

$ curl -X POST http://127.0.0.1:8080/process -d "hello world"
[WORKER] hello world
How it works
This example introduces the two key distributed primitives:
DistantEngine: contacts the Mélodium Services API and provisions a cloud runner
DistributionEngine: connects to the runner and routes work to a named treatment
model runner: DistantEngine(
    api_url = |wrap<string>("https://api.melodium.tech/0.1"),
    api_token = |wrap<string>(api_token)
)
model distributor: DistributionEngine(
    treatment = "distributed_work/worker::process",
    version = "0.1.0"
)
Startup sequence
The HTTP server only starts once the distribution engine signals ready — meaning the remote worker is connected and the treatment is reachable.
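The ordering described here can be pictured with a small Python analogue. This is not Mélodium code and does not use any Mélodium API; the names `start_in_order`, `connect_worker`, and `serve_http` are hypothetical, and a `threading.Event` stands in for the engine's ready signal.

```python
import threading


def start_in_order() -> list:
    """Start a pretend server only after a pretend worker reports ready."""
    ready = threading.Event()  # stands in for the engine's ready signal (assumption)
    log = []

    def connect_worker() -> None:
        # Record the connection first, then raise the ready signal.
        log.append("worker connected")
        ready.set()

    def serve_http() -> None:
        # Block until the ready signal arrives; give up after 5 seconds.
        if ready.wait(timeout=5):
            log.append("server started")

    server = threading.Thread(target=serve_http)
    connector = threading.Thread(target=connect_worker)
    server.start()  # started first, yet it still waits for the signal
    connector.start()
    server.join()
    connector.join()
    return log


print(start_in_order())
```

Even though the server thread is started first, it records its start only after the worker has connected, which mirrors the gating described above.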
Dispatching work
dispatchToWorker allocates a distribution_id per request using distribute, then uses named sendStream and recvStream to exchange byte streams with the remote treatment:
distribute.distribution_id -> sendStream.distribution_id
distribute.distribution_id -> recvStream.distribution_id
Self.data -> sendStream.data
recvStream.data -> Self.data
The stream names ("data") must match the input/output port names of the remote treatment.
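As a rough mental model of the wiring above, the following Python sketch pairs each request with its own id and rejects stream names that do not match the treatment's ports. The `Distributor` class and its methods are hypothetical stand-ins written for illustration, not the Mélodium `distrib` API, and the real engine exchanges streams with a remote runner rather than calling a local function.

```python
import itertools
from typing import Callable, Dict


class Distributor:
    """Toy stand-in for a distribution engine (hypothetical, for illustration)."""

    def __init__(self, treatment: Callable[[bytes], bytes],
                 input_port: str, output_port: str) -> None:
        self._treatment = treatment
        self._input_port = input_port
        self._output_port = output_port
        self._ids = itertools.count(1)
        self._results: Dict[int, bytes] = {}

    def distribute(self) -> int:
        """Allocate a fresh distribution id for one request."""
        return next(self._ids)

    def send_stream(self, distribution_id: int, name: str, data: bytes) -> None:
        """Send data to the named input port of the treatment."""
        if name != self._input_port:
            raise KeyError(f"treatment has no input port named {name!r}")
        self._results[distribution_id] = self._treatment(data)

    def recv_stream(self, distribution_id: int, name: str) -> bytes:
        """Receive the result from the named output port for this id."""
        if name != self._output_port:
            raise KeyError(f"treatment has no output port named {name!r}")
        return self._results.pop(distribution_id)


# Two requests in flight at once; each result is found through its own id.
distributor = Distributor(lambda body: body.upper(), "data", "data")
first = distributor.distribute()
second = distributor.distribute()
distributor.send_stream(first, "data", b"hello")
distributor.send_stream(second, "data", b"world")
print(distributor.recv_stream(second, "data"))
print(distributor.recv_stream(first, "data"))
```

The id keeps concurrent requests from mixing their results, and the name check shows why a stream called anything other than "data" would fail to reach the treatment in this model.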
The worker (worker.mel)
The worker file defines two treatments: runWorker (the entrypoint, logs a ready message) and process (the actual transformation). process splits the incoming byte stream into lines, prepends [WORKER] to each using entry + format, and re-encodes the result.
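The effect of process can be approximated in a few lines of Python. This is an analogue of the transformation only, not the contents of worker.mel; the function name `process` is borrowed from the treatment, while `WORKER_PREFIX` and the choice to rejoin lines with a newline are assumptions made for the sketch.

```python
WORKER_PREFIX = "[WORKER] "  # assumed prefix, including the trailing space


def process(data: bytes) -> bytes:
    """Decode a UTF-8 payload, tag every line, and re-encode the result."""
    text = data.decode("utf-8")
    tagged = [WORKER_PREFIX + line for line in text.splitlines()]
    # Rejoining with "\n" is an assumption; the source does not show
    # how the real treatment terminates lines.
    return "\n".join(tagged).encode("utf-8")


print(process(b"hello world").decode("utf-8"))
```

For the single-line body used in the curl example, this produces the same text as the response shown in the Running section.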
Dependencies
[dependencies]
std = "0.10.1" # core flows, logging, data structures
http = "0.10.1" # HTTP server and client
net = "0.10.1" # IP address helpers
encoding = "0.10.1" # UTF-8 encode / decode
distrib = "0.10.1" # stream distribution across runners
work = "0.10.1" # cloud runner provisioning