Skip to Content
Mélodium 0.10.1 is now available!
DocsExamplesCloud Worker Pipeline

Cloud Worker Pipeline

Source: 14_cloud_worker_pipeline

Reads a local text file, sends it to a cloud runner for word-count processing (one count per line), and writes the results locally. The cloud runner is released explicitly after the pipeline completes.

Running

melodium run 14_cloud_worker_pipeline/Compo.toml \ --api_token "my-api-token" \ --input data.txt
[…] info: cloud: provisioning cloud runner… […] info: cloud: runner provisioned, distributor connecting… […] info: cloud: pipeline complete

How it works

The overall flow:

See in Compositeur Studio

Explicit cleanup

Unlike other distributed examples where the runner runs indefinitely, this pipeline calls stop[distributor=distributor]() once writing is complete:

write.completed -> distribStop.trigger

This releases the cloud runner and stops billing. The max_duration = 600 on provisionRunner acts as a safety cap if cleanup fails.

Different send and receive types

The dispatch sub-treatment sends byte upstream (raw file content) and receives string back (word counts), with matching names on the remote treatment:

sendStream<byte>[distributor=distributor](name="data") recvStream<string>[distributor=distributor](name="result")

The remote transform treatment

The WordCounter model embeds a JavaScript function that counts whitespace-separated words per line. It is defined in the same file as main but runs on the remote runner:

model WordCounter() : JavaScriptEngine { code = ${{function countWords(line) { var s = line.trim(); if (s.length === 0) return '0'; return s.split(/\s+/).length.toString(); }}} }

Dependencies

[dependencies] std = "0.10.1" # core flows, logging, data structures fs = "0.10.1" # local file I/O encoding = "0.10.1" # UTF-8 encode / decode javascript = "0.10.1" # embedded JavaScript engine json = "0.10.1" # JSON parsing and serialisation work = "0.10.1" # cloud runner provisioning distrib = "0.10.1" # stream distribution across runners