from hera.workflows import DAG, Script, Task, Workflow
from hera.workflows.models import (
ArchiveStrategy,
Arguments,
Artifact,
Inputs,
NoneStrategy,
Outputs,
Parameter,
S3Artifact,
)
# Map-reduce workflow using "key-only" S3 artifacts.
#
# Flow: "split" writes numParts JSON parts and prints their ids to stdout,
# "map" fans out once per printed id, and "reduce" sums the mapped results.
with Workflow(
    arguments=Arguments(
        parameters=[
            # Number of parts produced by "split"; passed as a string.
            Parameter(
                name="numParts",
                value="4",
            )
        ],
    ),
    api_version="argoproj.io/v1alpha1",
    kind="Workflow",
    annotations={
        # Adjacent literals are concatenated at compile time, so the value is
        # one string with a newline after every sentence.
        "workflows.argoproj.io/description": (
            'This workflow demonstrates map-reduce using "key-only" artifacts.\n'
            'The first task "split" produces a number of parts, each in the form of a JSON document, saving it to a bucket.\n'
            'Each "map" task then reads those documents, performs a map operation, and writes them out to a new bucket.\n'
            'Finally, "reduce" merges all the mapped documents into a final document.\n'
        ),
        "workflows.argoproj.io/version": ">= 3.0.0",
    },
    generate_name="map-reduce-",
    entrypoint="main",
) as w:
    # Entrypoint DAG: split -> map (one task per part id) -> reduce.
    with DAG(
        name="main",
    ) as invocator:
        # Forward the workflow-level numParts to the "split" template.
        Task(
            arguments=Arguments(
                parameters=[
                    Parameter(
                        name="numParts",
                        value="{{workflow.parameters.numParts}}",
                    )
                ],
            ),
            name="split",
            template="split",
        )
        # Fan out over the JSON list that "split" printed to stdout; each
        # {{item}} is a part id, used both as the S3 key of the input part and
        # as the partId parameter that names the mapped result.
        Task(
            with_param="{{tasks.split.outputs.result}}",
            arguments=Arguments(
                artifacts=[
                    Artifact(
                        name="part",
                        s3=S3Artifact(
                            key="{{workflow.name}}/parts/{{item}}.json",
                        ),
                    )
                ],
                parameters=[
                    Parameter(
                        name="partId",
                        value="{{item}}",
                    )
                ],
            ),
            name="map",
            template="map",
            depends="split",
        )
        # Runs after "map"; reads every mapped result under the results key.
        Task(
            name="reduce",
            template="reduce",
            depends="map",
        )

    # "split": writes /mnt/out/<partId>.json containing {"foo": i} (i starts
    # at 1) for each part id in range(numParts), then dumps the list of part
    # ids to stdout so the DAG can iterate over it.
    Script(
        inputs=Inputs(
            parameters=[
                Parameter(
                    name="numParts",
                )
            ],
        ),
        name="split",
        outputs=Outputs(
            artifacts=[
                # The whole /mnt/out directory is saved with the "none"
                # archive strategy under the workflow's parts key.
                Artifact(
                    archive=ArchiveStrategy(
                        none=NoneStrategy(),
                    ),
                    name="parts",
                    path="/mnt/out",
                    s3=S3Artifact(
                        key="{{workflow.name}}/parts/",
                    ),
                )
            ],
        ),
        command=["python"],
        image="python:alpine3.6",
        # The json.dump line must be indented deeper than the enclosing
        # `with`; when both sit at the same level the `with` has no body and
        # the script fails with IndentationError before writing any part.
        source=(
            "import json\n"
            "import os\n"
            "import sys\n"
            'os.mkdir("/mnt/out")\n'
            "partIds = list(map(lambda x: str(x), range({{inputs.parameters.numParts}})))\n"
            "for i, partId in enumerate(partIds, start=1):\n"
            ' with open("/mnt/out/" + partId + ".json", "w") as f:\n'
            '  json.dump({"foo": i}, f)\n'
            "json.dump(partIds, sys.stdout)\n"
        ),
    )

    # "map": loads the single input part and writes {"bar": part["foo"] * 2}
    # to /mnt/out/part.json.
    Script(
        inputs=Inputs(
            artifacts=[
                Artifact(
                    name="part",
                    path="/mnt/in/part.json",
                )
            ],
            parameters=[
                Parameter(
                    name="partId",
                )
            ],
        ),
        name="map",
        outputs=Outputs(
            artifacts=[
                # The result key is derived from partId, so each fanned-out
                # task writes to its own object.
                Artifact(
                    archive=ArchiveStrategy(
                        none=NoneStrategy(),
                    ),
                    name="part",
                    path="/mnt/out/part.json",
                    s3=S3Artifact(
                        key="{{workflow.name}}/results/{{inputs.parameters.partId}}.json",
                    ),
                )
            ],
        ),
        command=["python"],
        image="python:alpine3.6",
        source=(
            "import json\n"
            "import os\n"
            "import sys\n"
            'os.mkdir("/mnt/out")\n'
            'with open("/mnt/in/part.json") as f:\n'
            " part = json.load(f)\n"
            'with open("/mnt/out/part.json", "w") as f:\n'
            ' json.dump({"bar": part["foo"] * 2}, f)\n'
        ),
    )

    # "reduce": sums the "bar" field of every file found in /mnt/in and writes
    # {"total": total} to /mnt/out/total.json.
    Script(
        inputs=Inputs(
            artifacts=[
                # Key-only input pointing at the results prefix, mounted as
                # the /mnt/in directory that the script lists.
                Artifact(
                    name="results",
                    path="/mnt/in",
                    s3=S3Artifact(
                        key="{{workflow.name}}/results",
                    ),
                )
            ],
        ),
        name="reduce",
        outputs=Outputs(
            artifacts=[
                Artifact(
                    archive=ArchiveStrategy(
                        none=NoneStrategy(),
                    ),
                    name="total",
                    path="/mnt/out/total.json",
                    s3=S3Artifact(
                        key="{{workflow.name}}/total.json",
                    ),
                )
            ],
        ),
        command=["python"],
        image="python:alpine3.6",
        source=(
            "import json\n"
            "import os\n"
            "import sys\n"
            "total = 0\n"
            'os.mkdir("/mnt/out")\n'
            'for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):\n'
            " result = json.load(f)\n"
            ' total = total + result["bar"]\n'
            'with open("/mnt/out/total.json" , "w") as f:\n'
            ' json.dump({"total": total}, f)\n'
        ),
    )