Cloud Worker Pipeline
Source: 14_cloud_worker_pipeline
Reads a local text file, sends it to a cloud runner for word-count processing (one count per line), and writes the results locally. The cloud runner is released explicitly after the pipeline completes.
Running
melodium run 14_cloud_worker_pipeline/Compo.toml \
--api_token "my-api-token" \
--input data.txt

[…] info: cloud: provisioning cloud runner…
[…] info: cloud: runner provisioned, distributor connecting…
[…] info: cloud: pipeline complete
How it works
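The overall flow: the local file is read, its content is sent to the cloud runner for word counting, and the returned counts are written locally.
Explicit cleanup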
Unlike other distributed examples where the runner runs indefinitely, this pipeline calls stop[distributor=distributor]() once writing is complete:
write.completed -> distribStop.trigger
This releases the cloud runner and stops billing. The max_duration = 600 on provisionRunner acts as a safety cap if cleanup fails.
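The cleanup pattern described here (release the runner as soon as the work is done, and treat the duration cap only as a backstop) can be sketched outside Mélodium. The JavaScript below is an analogy and not the Mélodium API: provisionFakeRunner, releaseFakeRunner and runPipeline are hypothetical names, and the unit of the 600 cap is not stated in this example, so it is carried through as a bare number.

```javascript
// Hypothetical stand-in for a billed cloud runner; not the Mélodium API.
function provisionFakeRunner(maxDuration) {
  // maxDuration mirrors max_duration = 600; its unit is an open assumption.
  return { maxDuration, released: false };
}

function releaseFakeRunner(runner) {
  runner.released = true; // in the real pipeline, billing would stop here
}

// Run `work` on a freshly provisioned runner and always release it afterwards,
// whether `work` returns normally or throws.
function runPipeline(work) {
  const runner = provisionFakeRunner(600);
  try {
    return { result: work(runner), runner };
  } finally {
    releaseFakeRunner(runner);
  }
}

const { result, runner } = runPipeline(() => "done");
console.log(result, runner.released); // done true
```

In the actual example the release is driven by the write.completed signal rather than a finally block; the sketch only shows why an explicit release plus a cap gives two independent ways for the runner to end.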
Different send and receive types
The dispatch sub-treatment sends byte upstream (raw file content) and receives string back (word counts), with matching names on the remote treatment:
sendStream<byte>[distributor=distributor](name="data")
recvStream<string>[distributor=distributor](name="result")
The remote transform treatment
The WordCounter model embeds a JavaScript function that counts whitespace-separated words per line. It is defined in the same file as main but runs on the remote runner:
model WordCounter() : JavaScriptEngine {
  code = ${{function countWords(line) {
    var s = line.trim();
    if (s.length === 0) return '0';
    return s.split(/\s+/).length.toString();
  }}}
}
Dependencies
[dependencies]
std = "0.10.1" # core flows, logging, data structures
fs = "0.10.1" # local file I/O
encoding = "0.10.1" # UTF-8 encode / decode
javascript = "0.10.1" # embedded JavaScript engine
json = "0.10.1" # JSON parsing and serialisation
work = "0.10.1" # cloud runner provisioning
distrib = "0.10.1" # stream distribution across runners
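
To check what the remote treatment returns, the countWords function from the WordCounter model can be run on its own in Node.js. The sketch below is not Mélodium code. The function body is copied from the model, while the sample input and the way bytes are decoded and split into lines are assumptions made for illustration; the real pipeline does this through its own treatments.

```javascript
// Same function body as in the WordCounter model.
function countWords(line) {
  var s = line.trim();
  if (s.length === 0) return '0';
  return s.split(/\s+/).length.toString();
}

// Hypothetical input standing in for the raw bytes read from data.txt.
const bytes = new TextEncoder().encode("hello cloud worker\n\n  two   words  \n");

// Decode the bytes as UTF-8 and split into lines (assumed line handling).
const lines = new TextDecoder("utf-8").decode(bytes).split("\n");
// Drop the empty element produced by the trailing newline.
if (lines[lines.length - 1] === "") lines.pop();

// One string count per line, matching the byte-in, string-out shape of dispatch.
const counts = lines.map(countWords);
console.log(counts); // [ '3', '0', '2' ]
```

An empty or whitespace-only line yields '0' because of the explicit length check; without it, splitting an empty string would return a one-element array and report 1.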