Skip to Content
Mélodium 0.10.1 is now available!
DocsExamplesDistributed Work

Distributed Work

Source: 06_distributed_work

An HTTP front-end that dispatches each POST /process request body to a remote worker node, which applies a line-level text transformation and streams the result back. The worker is provisioned on demand using DistantEngine — no second process needs to be started manually.

Running

melodium run 06_distributed_work/Compo.toml \ --api_token "my-api-token"
$ curl -X POST http://127.0.0.1:8080/process -d "hello world" [WORKER] hello world

How it works

This example introduces the two key distributed primitives:

  • DistantEngine — contacts the Mélodium Services API and provisions a cloud runner
  • DistributionEngine — connects to the runner and routes work to a named treatment
model runner: DistantEngine( api_url = |wrap<string>("https://api.melodium.tech/0.1"), api_token = |wrap<string>(api_token) ) model distributor: DistributionEngine( treatment = "distributed_work/worker::process", version = "0.1.0" )

Startup sequence

The HTTP server only starts once the distribution engine signals ready — meaning the remote worker is connected and the treatment is reachable.

See in Compositeur Studio

Dispatching work

dispatchToWorker allocates a distribution_id per request using distribute, then uses named sendStream and recvStream to exchange byte streams with the remote treatment:

distribute.distribution_id -> sendStream.distribution_id distribute.distribution_id -> recvStream.distribution_id Self.data -> sendStream.data recvStream.data -> Self.data

The stream names ("data") must match the input/output port names of the remote treatment.

The worker (worker.mel)

The worker file defines two treatments: runWorker (the entrypoint, logs a ready message) and process (the actual transformation). process splits the incoming byte stream into lines, prepends [WORKER] to each using entry + format, and re-encodes the result.

Dependencies

[dependencies] std = "0.10.1" # core flows, logging, data structures http = "0.10.1" # HTTP server and client net = "0.10.1" # IP address helpers encoding = "0.10.1" # UTF-8 encode / decode distrib = "0.10.1" # stream distribution across runners work = "0.10.1" # cloud runner provisioning