Skip to Content
Mélodium 0.10.1 is now available!
DocsExemplesTravail distribué

Travail distribué

Source: 06_distributed_work

Un serveur HTTP qui achemine chaque corps de requête POST /process vers un nœud worker distant, qui applique une transformation de texte ligne par ligne et renvoie le résultat en flux. Le worker est provisionné à la demande avec DistantEngine ; aucun second processus n’a besoin d’être démarré manuellement.

Exécution

melodium run 06_distributed_work/Compo.toml \ --api_token "my-api-token"
$ curl -X POST http://127.0.0.1:8080/process -d "hello world" [WORKER] hello world

Fonctionnement

Cet exemple introduit les deux primitives distribuées clés :

  • DistantEngine : contacte l’API Mélodium Services et provisionne un runner cloud
  • DistributionEngine : se connecte au runner et route le travail vers un traitement nommé
model runner: DistantEngine( api_url = |wrap<string>("https://api.melodium.tech/0.1"), api_token = |wrap<string>(api_token) ) model distributor: DistributionEngine( treatment = "distributed_work/worker::process", version = "0.1.0" )

Séquence de démarrage

Le serveur HTTP ne démarre que lorsque le moteur de distribution signale ready, ce qui signifie que le worker distant est connecté et que le traitement est accessible.

Voir dans Compositeur Studio

Acheminement du travail

dispatchToWorker alloue un distribution_id par requête via distribute, puis utilise sendStream et recvStream nommés pour échanger des flux d’octets avec le traitement distant :

distribute.distribution_id -> sendStream.distribution_id distribute.distribution_id -> recvStream.distribution_id Self.data -> sendStream.data recvStream.data -> Self.data

Les noms des flux ("data") doivent correspondre aux noms des ports d’entrée/sortie du traitement distant.

Le worker (worker.mel)

Le fichier worker définit deux traitements : runWorker (le point d’entrée, journalise un message de prêt) et process (la transformation réelle). process divise le flux d’octets entrant en lignes, préfixe chacune avec [WORKER] via entry + format, et réencode le résultat.

Dépendances

[dependencies] std = "0.10.1" # flux de base, journalisation, structures de données http = "0.10.1" # serveur et client HTTP net = "0.10.1" # utilitaires d'adresses IP encoding = "0.10.1" # encodage / décodage UTF-8 distrib = "0.10.1" # distribution de flux entre runners work = "0.10.1" # provisionnement de runners cloud