Skip to Content
Mélodium 0.10.1 is now available!
DocsExemplesPipeline worker cloud

Pipeline worker cloud

Source: 14_cloud_worker_pipeline

Lit un fichier texte local, l’envoie à un runner cloud pour un traitement de comptage de mots (un compte par ligne), et écrit les résultats localement. Le runner cloud est libéré explicitement une fois le pipeline terminé.

Exécution

melodium run 14_cloud_worker_pipeline/Compo.toml \ --api_token "my-api-token" \ --input data.txt
[…] info: cloud: provisioning cloud runner… […] info: cloud: runner provisioned, distributor connecting… […] info: cloud: pipeline complete

Fonctionnement

Le flux global :

See in Compositeur Studio

Nettoyage explicite

Contrairement aux autres exemples distribués où le runner tourne indéfiniment, ce pipeline appelle stop[distributor=distributor]() une fois l’écriture terminée :

write.completed -> distribStop.trigger

Cela libère le runner cloud et arrête la facturation. Le max_duration = 600 sur provisionRunner sert de sécurité si le nettoyage échoue.

Types d’envoi et de réception différents

Le sous-traitement dispatch envoie des byte en amont (contenu brut du fichier) et reçoit des string en retour (comptages de mots), avec des noms correspondants sur le traitement distant :

sendStream<byte>[distributor=distributor](name="data") recvStream<string>[distributor=distributor](name="result")

Le traitement distant transform

Le modèle WordCounter embarque une fonction JavaScript qui compte les mots séparés par des espaces par ligne. Il est défini dans le même fichier que main mais s’exécute sur le runner distant :

model WordCounter() : JavaScriptEngine { code = ${{function countWords(line) { var s = line.trim(); if (s.length === 0) return '0'; return s.split(/\s+/).length.toString(); }}} }

Dépendances

[dependencies] std = "0.10.1" # flux de base, journalisation, structures de données fs = "0.10.1" # lecture/écriture de fichiers locaux encoding = "0.10.1" # encodage / décodage UTF-8 javascript = "0.10.1" # moteur JavaScript embarqué json = "0.10.1" # parsing et sérialisation JSON work = "0.10.1" # provisionnement de runners cloud distrib = "0.10.1" # distribution de flux entre runners