This example shows how clients can use Hera to dynamically generate tasks that process the output of one task in
parallel. This is useful for batch jobs and situations where clients do not know ahead of time how many tasks or
entities they need to process. In addition to the fanout itself, the example shows how to set up extra parameters
that dictate what the fanout should execute over.
```python
from hera.workflows import DAG, Workflow, script


@script()
def generate():
    import json
    import sys

    # this can be anything! e.g. fetch from some API, then in parallel process all entities; chunk database records
    # and process them in parallel, etc.
    json.dump([i for i in range(10)], sys.stdout)


@script()
def consume(value: int, extra_param1: str, extra_param2: int = 42):
    print(
        "Received value={value}, extra_param1={extra_param1}, extra_param2={extra_param2}!".format(
            value=value,
            extra_param1=extra_param1,
            extra_param2=extra_param2,
        )
    )


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow(generate_name="dynamic-fanout-", entrypoint="d") as w:
    with DAG(name="d"):
        g = generate()
        # the following fanout will occur over the items in the list that is returned from the generate script;
        # `extra_param1` takes the value `hello world` while `extra_param2` keeps its default value of 42
        c1 = consume(name="c1", with_param=g.result, arguments={"value": "{{item}}", "extra_param1": "hello world"})
        # this fanout also occurs over the items returned from the generate script, but here
        # `extra_param2` is explicitly overridden with the value 123 instead of using its default
        c2 = consume(
            name="c2",
            with_param=g.result,
            arguments={"value": "{{item}}", "extra_param1": "hello world", "extra_param2": "123"},
        )
        g >> c1
        g >> c2
```
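To see what the fanout does conceptually, the same data flow can be sketched in plain Python, without Argo or Hera. This is a hypothetical local simulation for illustration only: the generator writes a JSON list to its output, and the orchestrator (Argo, in the real workflow) parses that list and invokes `consume` once per item, with the extra parameters either left at their defaults or overridden per call.

```python
import io
import json


def generate() -> str:
    # emit a JSON list, mimicking the generate script writing to stdout
    buf = io.StringIO()
    json.dump([i for i in range(10)], buf)
    return buf.getvalue()


def consume(value: int, extra_param1: str, extra_param2: int = 42) -> str:
    return "Received value={value}, extra_param1={extra_param1}, extra_param2={extra_param2}!".format(
        value=value, extra_param1=extra_param1, extra_param2=extra_param2
    )


# the "fanout": parse the generator's output and call consume once per item,
# as Argo would do for each `{{item}}` produced by `with_param`
items = json.loads(generate())
c1_results = [consume(v, "hello world") for v in items]          # extra_param2 defaults to 42
c2_results = [consume(v, "hello world", 123) for v in items]     # extra_param2 overridden to 123
```

In the real workflow each `consume` call runs as a separate pod in parallel; the list comprehensions above only illustrate the per-item invocation, not the scheduling.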