from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, S3Artifact, Workflow, script
@script(
    image="python:alpine3.6",
    outputs=S3Artifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
)
def split(num_parts: int) -> None:
    """Fan-out step: write one JSON part file per requested part under /mnt/out.

    Each file "<part_id>.json" holds {"foo": i} with i counting from 1.
    The list of part ids is dumped to stdout so the DAG can consume it as
    the `with_param` fan-out list (via `s.result`).
    """
    import json
    import os
    import sys

    os.mkdir("/mnt/out")
    # Idiomatic comprehension instead of list(map(lambda x: str(x), ...)).
    part_ids = [str(i) for i in range(num_parts)]
    for i, part_id in enumerate(part_ids, start=1):
        with open("/mnt/out/" + part_id + ".json", "w") as f:
            json.dump({"foo": i}, f)
    # Emit the id list as this step's result (read by the DAG as s.result).
    json.dump(part_ids, sys.stdout)
@script(
    image="python:alpine3.6",
    inputs=[
        Parameter(name="part_id"),
        Artifact(name="part", path="/mnt/in/part.json"),
    ],
    outputs=S3Artifact(
        name="part",
        path="/mnt/out/part.json",
        archive=NoneArchiveStrategy(),
        key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
    ),
)
def map_() -> None:
    """Map step: read one part file, double its "foo" value, write it as "bar"."""
    import json
    import os

    os.mkdir("/mnt/out")
    with open("/mnt/in/part.json") as src:
        doubled = {"bar": json.load(src)["foo"] * 2}
    with open("/mnt/out/part.json", "w") as dst:
        json.dump(doubled, dst)
@script(
    image="python:alpine3.6",
    inputs=S3Artifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
    outputs=S3Artifact(
        name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
    ),
)
def reduce() -> None:
    """Reduce step: sum the "bar" values of every result file in /mnt/in.

    The total is written to /mnt/out/total.json as {"total": <sum>}.
    Fix: the original opened every input file via map(lambda x: open(...))
    and never closed the handles — each file is now opened in a `with`
    block so it is closed deterministically.
    """
    import json
    import os

    os.mkdir("/mnt/out")
    total = 0
    for name in os.listdir("/mnt/in"):
        with open("/mnt/in/" + name) as f:
            total += json.load(f)["bar"]
    with open("/mnt/out/total.json", "w") as f:
        json.dump({"total": total}, f)
# Assemble the map-reduce workflow: split fans out part ids, map_ runs once
# per id, and reduce aggregates every mapper's result pulled back from S3.
with Workflow(generate_name="map-reduce-", entrypoint="main", arguments=Parameter(name="num_parts", value="4")) as w:
    with DAG(name="main"):
        s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))
        # Fan out: one map_ task per part id emitted on split's stdout (s.result).
        m = map_(
            with_param=s.result,
            arguments=[
                Parameter(name="part_id", value="{{item}}"),
                # Each mapper fetches its own part artifact, keyed by {{item}}.
                S3Artifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),
            ],
        )
        # Execution order: split -> all map_ fan-out tasks -> reduce.
        s >> m >> reduce()