Skip to Content
Mélodium 0.10.1 is now available!
DocsExamplesDistributed Work

Distributed Work

Source: 06_distributed_work

An HTTP front-end that dispatches each POST /process request body to a remote worker node, which applies a line-level text transformation and streams the result back. The worker is provisioned on demand using DistantEngine; no second process needs to be started manually.

Running

melodium run 06_distributed_work/Compo.toml \ --api_token "my-api-token"
$ curl -X POST http://127.0.0.1:8080/process -d "hello world" [WORKER] hello world

How it works

This example introduces the two key distributed primitives:

  • DistantEngine: contacts the Mélodium Services API and provisions a cloud runner
  • DistributionEngine: connects to the runner and routes work to a named treatment
model runner: DistantEngine( api_url = |wrap<string>("https://api.melodium.tech/0.1"), api_token = |wrap<string>(api_token) ) model distributor: DistributionEngine( treatment = "distributed_work/worker::process", version = "0.1.0" )

Startup sequence

The HTTP server only starts once the distribution engine signals ready, meaning the remote worker is connected and the treatment is reachable.

See in Compositeur Studio

Dispatching work

dispatchToWorker allocates a distribution_id per request using distribute, then uses named sendStream and recvStream to exchange byte streams with the remote treatment:

distribute.distribution_id -> sendStream.distribution_id distribute.distribution_id -> recvStream.distribution_id Self.data -> sendStream.data recvStream.data -> Self.data

The stream names ("data") must match the input/output port names of the remote treatment.

The worker (worker.mel)

The worker file defines two treatments: runWorker (the entrypoint, logs a ready message) and process (the actual transformation). process splits the incoming byte stream into lines, prepends [WORKER] to each using entry + format, and re-encodes the result.

Dependencies

[dependencies] std = "0.10.1" # core flows, logging, data structures http = "0.10.1" # HTTP server and client net = "0.10.1" # IP address helpers encoding = "0.10.1" # UTF-8 encode / decode distrib = "0.10.1" # stream distribution across runners work = "0.10.1" # cloud runner provisioning