Skip to content

Steps

The hera.workflows.steps module provides the Steps, Step and Parallel classes.

See https://argoproj.github.io/argo-workflows/walk-through/steps for more on Steps.

Step

A step runs a given template.

It must be instantiated under a Steps or Parallel context, or outside a Workflow.

Source code in src/hera/workflows/steps.py
@dataclass(kw_only=True)
class Step(
    TemplateInvocatorSubNodeMixin,
    ArgumentsMixin,
    WithParamMixin,
    WithItemsMixin,
):
    """Invocation of a single template as one step.

    A Step has to be created under a Steps or Parallel context, or outside a Workflow.
    """

    @property
    def _subtype(self) -> str:
        """Subtype label of this node, always ``"steps"``."""
        return "steps"

    def _build_as_workflow_step(self) -> _ModelWorkflowStep:
        """Convert this step into its `_ModelWorkflowStep` model."""
        # `template` is either a plain name or an object carrying a `name`;
        # any other value leaves the template reference unset.
        referenced = self.template
        if isinstance(referenced, str):
            template_name = referenced
        elif isinstance(referenced, (_ModelTemplate, TemplateMixin)):
            template_name = referenced.name
        else:
            template_name = None

        # A model template given inline is used as-is, while a Templatable is
        # built into a model template first.
        embedded = self.inline
        if isinstance(embedded, _ModelTemplate):
            inline_template = embedded
        elif isinstance(embedded, Templatable):
            inline_template = embedded._build_template()
        else:
            inline_template = None

        return _ModelWorkflowStep(
            arguments=self._build_arguments(),
            continue_on=self.continue_on,
            hooks=self.hooks,
            inline=inline_template,
            name=self.name,
            on_exit=self._build_on_exit(),
            template=template_name,
            template_ref=self.template_ref,
            when=self.when,
            with_items=self._build_with_items(),
            with_param=self._build_with_param(),
            with_sequence=self.with_sequence,
        )

    def _build_step(self) -> List[_ModelWorkflowStep]:
        """Return this step's model wrapped in a single-element list."""
        workflow_step = self._build_as_workflow_step()
        return [workflow_step]

arguments

arguments: ArgumentsT = None

continue_on

continue_on: Optional[ContinueOn] = None

exit_code

exit_code: str

ExitCode holds the exit code of a script template.

finished_at

finished_at: str

Time at which this node completed.

hooks

hooks: Optional[Dict[str, LifecycleHook]] = None

id

id: str

ID of this node.

inline

inline: Optional[Union[Template, Templatable]] = None

ip

ip: str

IP of this node.

name

name: str

on_exit

on_exit: Optional[Union[str, Templatable]] = None

result

result: str

Result holds the result (stdout) of a script template.

started_at

started_at: str

Time at which this node started.

status

status: str

Status of this node.

template

template: Optional[Union[str, Template, Templatable]] = None

template_ref

template_ref: Optional[TemplateRef] = None

when

when: Optional[str] = None

with_items

with_items: Optional[OneOrMany[Any]] = None

with_param

with_param: Optional[Any] = None

with_sequence

with_sequence: Optional[Sequence] = None

get_artifact

get_artifact(name: str) -> Artifact

Gets an artifact from the outputs of this subnode.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Look up the artifact called `name` among the outputs of this subnode."""
    lookup = self._get_artifact
    return lookup(name=name, subtype=self._subtype)

get_outputs_as_arguments

get_outputs_as_arguments() -> List[
    Union[Parameter, Artifact]
]

Get all output parameters and artifacts as a combined list from this task/step for use as arguments.

This is useful for when all the inputs of another template match all the outputs of this template. It is also possible to combine the outputs of multiple templates if they collectively match the inputs of another template.

Source code in src/hera/workflows/_mixins.py
def get_outputs_as_arguments(self) -> List[Union[Parameter, Artifact]]:
    """Collect every output parameter and artifact of this task/step into one list of arguments.

    Handy when the inputs of another template line up exactly with the outputs of this one. The
    outputs of several templates can also be combined when together they match the inputs of
    another template.

    Raises:
        ValueError: if the template was given by name, if neither a `Templatable` nor a `Template`
            was set as `template`, or if the template declares no outputs.
    """
    source = self.template
    if isinstance(source, str):
        raise ValueError(f"Cannot get outputs when the template was set via a name: {source}")

    # The template is built early here so that its outputs can be inspected.
    if isinstance(source, Templatable):
        built = source._build_template()
    elif isinstance(source, Template):
        built = source
    else:
        raise ValueError("Only 'template' is supported (not inline or template_ref)")

    declared = built.outputs
    if declared is None:
        raise ValueError(f"Template '{built.name}' has no outputs")

    # Parameters come first, followed by artifacts.
    combined = []
    for param in declared.parameters or []:
        combined.append(self.get_parameter(param.name))
    for art in declared.artifacts or []:
        combined.append(self.get_artifact(art.name))

    if len(combined) == 0:
        raise ValueError(f"Template '{built.name}' has no outputs")

    return combined

get_parameter

get_parameter(name: str) -> Parameter

Gets a parameter from the outputs of this subnode.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Look up the parameter called `name` among the outputs of this subnode."""
    lookup = self._get_parameter
    return lookup(name=name, subtype=self._subtype)

get_parameters_as

get_parameters_as(name: str) -> Parameter

Returns a Parameter that represents all the outputs of this subnode.

Parameters

name: str The name of the parameter to search for.

Returns:

Parameter The parameter, named based on the given name, along with a value that references all outputs.

Source code in src/hera/workflows/_mixins.py
def get_parameters_as(self, name: str) -> Parameter:
    """Build a `Parameter` that stands for all the outputs of this subnode.

    Parameters
    ----------
    name: str
        Name forwarded to `_get_parameters_as`; the returned parameter is named based on it.

    Returns
    -------
    Parameter
        The parameter, named based on the given `name`, with a value that references all outputs.
    """
    builder = self._get_parameters_as
    return builder(name=name, subtype=self._subtype)

get_result_as

get_result_as(name: str) -> Parameter

Returns a Parameter specification with the given name containing the results of self.

Source code in src/hera/workflows/_mixins.py
def get_result_as(self, name: str) -> Parameter:
    """Build a `Parameter` called `name` whose value is the `result` of `self`."""
    result_parameter = Parameter(name=name, value=self.result)
    return result_parameter

Steps

A Steps template invocator is used to define a sequence of steps which can run sequentially or in parallel.

Steps implements the context manager interface, so it can be used in a with statement; any hera.workflows.steps.Step objects instantiated under it are added to the Steps' list of sub_steps.

  • Step and Parallel objects initialised within a Steps context will be added to the list of sub_steps in the order they are initialised.
  • All Step objects initialised within a Parallel context will run in parallel.
Source code in src/hera/workflows/steps.py
@dataclass(kw_only=True)
class Steps(
    IOMixin,
    TemplateMixin,
    CallableTemplateMixin,
    ContextMixin,
):
    """Template invocator describing a sequence of steps that can run sequentially or in parallel.

    Steps implements the context manager interface, so it can be used with `with`; any
    `hera.workflows.steps.Step` objects instantiated under it are added to the Steps' list of
    sub_steps.

    * Step and Parallel objects initialised within a Steps context are added to the list of
      sub_steps in the order they are initialised.
    * All Step objects initialised within a Parallel context will run in parallel.
    """

    # Names of the Step nodes registered so far; used to reject duplicates.
    _node_names: Set[str] = field(default_factory=set)

    sub_steps: List[
        Union[
            Step,
            Parallel,
            List[Step],
            _ModelWorkflowStep,
            List[_ModelWorkflowStep],
        ]
    ] = field(default_factory=list)

    def _build_steps(self) -> Optional[List[ParallelSteps]]:
        """Turn every entry of `sub_steps` into a `ParallelSteps`; returns None when there are none."""
        built = []
        for entry in self.sub_steps:
            if isinstance(entry, Steppable):
                group = entry._build_step()
            elif isinstance(entry, _ModelWorkflowStep):
                group = [entry]
            elif isinstance(entry, List):
                # A plain list may mix Step objects and already-built models.
                group = []
                for member in entry:
                    if isinstance(member, Step):
                        group.append(member._build_as_workflow_step())
                    elif isinstance(member, _ModelWorkflowStep):
                        group.append(member)
                    else:
                        raise InvalidType(type(member))
            else:
                raise InvalidType(type(entry))
            built.append(ParallelSteps(root=group))

        return built if built else None

    def _add_sub(self, node: Any):
        """Register `node` under this Steps object.

        Templatable nodes are handed to the first piece of the current context instead. Only Step
        and Parallel nodes are accepted otherwise, and Step names must be unique.
        """
        if isinstance(node, Templatable):
            from hera.workflows.workflow import Workflow

            # We must be under a workflow context due to checks in _HeraContext.add_sub_node
            assert _context.pieces and isinstance(_context.pieces[0], Workflow)
            _context.pieces[0]._add_sub(node)
            return

        is_step = isinstance(node, Step)
        is_parallel = isinstance(node, Parallel)
        if not (is_step or is_parallel):
            raise InvalidType(type(node))
        if is_step:
            if node.name in self._node_names:
                raise NodeNameConflict(f"Found multiple Step nodes with name: {node.name}")
            self._node_names.add(node.name)
        if is_parallel:
            # The Parallel shares this object's name set.
            node._node_names = self._node_names
        self.sub_steps.append(node)

    def parallel(self) -> Parallel:
        """Returns a Parallel object which can be used in a sub-context manager."""
        parallel_context = Parallel()
        return parallel_context

    def _build_template(self) -> _ModelTemplate:
        """Assemble the `_ModelTemplate` for this Steps object."""
        # A falsy deadline (None, 0, "") is emitted as None.
        deadline = IntOrString(root=self.active_deadline_seconds) if self.active_deadline_seconds else None

        return _ModelTemplate(
            active_deadline_seconds=deadline,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self._build_init_containers(),
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self._build_metrics(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority_class_name=self.priority_class_name,
            retry_strategy=self._build_retry_strategy(),
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self._build_sidecars(),
            steps=self._build_steps(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
            # Fields explicitly left unset on a steps template.
            container=None,
            container_set=None,
            dag=None,
            data=None,
            http=None,
            resource=None,
            script=None,
            suspend=None,
        )

active_deadline_seconds

active_deadline_seconds: Optional[int | str] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

archive_location

archive_location: Optional[ArtifactLocation] = None

automount_service_account_token

automount_service_account_token: Optional[bool] = None

daemon

daemon: Optional[bool] = None

executor

executor: Optional[ExecutorConfig] = None

fail_fast

fail_fast: Optional[bool] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

init_containers

init_containers: Optional[
    List[Union[UserContainer, UserContainer]]
] = None

inputs

inputs: InputsT = None

labels

labels: Optional[Dict[str, str]] = None

memoize

memoize: Optional[Memoize] = None

metrics

metrics: Optional[MetricsT] = None

name

name: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

outputs

outputs: OutputsT = None

parallelism

parallelism: Optional[int] = None

plugin

plugin: Optional[Plugin] = None

pod_security_context

pod_security_context: Optional[PodSecurityContext] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority_class_name

priority_class_name: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

sidecars

sidecars: Optional[
    OneOrMany[UserContainer | UserContainer]
] = None

sub_steps

sub_steps: List[
    Union[
        Step,
        Parallel,
        List[Step],
        WorkflowStep,
        List[WorkflowStep],
    ]
] = field(default_factory=list)

synchronization

synchronization: Optional[Synchronization] = None

timeout

timeout: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

get_artifact

get_artifact(name: str) -> Artifact

Finds and returns the artifact with the supplied name.

Note that this method will raise an error if the artifact is not found.

Parameters:

Name Type Description Default
name str

name of the input artifact to find and return.

required

Returns:

Name Type Description
Artifact Artifact

the artifact with the supplied name.

Raises:

Type Description
KeyError

if the artifact is not found.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Find the input artifact with the given name and return a reference to it.

    An error is raised when no such artifact exists.

    Args:
        name: name of the input artifact to find and return.

    Returns:
        Artifact: an artifact with the supplied name whose `from_` points at the matching input.

    Raises:
        KeyError: if the artifact is not found.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Artifact {name} not found.")

    candidates = built_inputs.artifacts
    if candidates is None:
        raise KeyError(f"No artifacts set. Artifact {name} not found.")

    # Take the first input artifact whose name matches.
    found = next((candidate for candidate in candidates if candidate.name == name), None)
    if found is None:
        raise KeyError(f"Artifact {name} not found.")
    return Artifact(name=name, from_=f"{{{{inputs.artifacts.{found.name}}}}}")

get_parameter

get_parameter(name: str) -> Parameter

Finds and returns the parameter with the supplied name.

Note that this method will raise an error if the parameter is not found.

Parameters:

Name Type Description Default
name str

name of the input parameter to find and return.

required

Returns:

Name Type Description
Parameter Parameter

the parameter with the supplied name.

Raises:

Type Description
KeyError

if the parameter is not found.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Find the input parameter with the given name and return a reference to it.

    An error is raised when no such parameter exists.

    Args:
        name: name of the input parameter to find and return.

    Returns:
        Parameter: the parameter with the supplied name, its value set to the matching input reference.

    Raises:
        KeyError: if the parameter is not found.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Parameter {name} not found.")

    declared = built_inputs.parameters
    if declared is None:
        raise KeyError(f"No parameters set. Parameter {name} not found.")

    # Take the first input parameter whose name matches.
    found = next((model for model in declared if model.name == name), None)
    if found is None:
        raise KeyError(f"Parameter {name} not found.")

    reference = Parameter.from_model(found)
    reference.value = f"{{{{inputs.parameters.{reference.name}}}}}"
    return reference

parallel

parallel() -> Parallel

Returns a Parallel object which can be used in a sub-context manager.

Source code in src/hera/workflows/steps.py
def parallel(self) -> Parallel:
    """Create a new Parallel object, usable as a sub-context manager."""
    parallel_context = Parallel()
    return parallel_context

Parallel

Parallel is a context manager used to create a list of steps to run in parallel.

Parallel implements the context manager interface, so it can be used in a with statement; any hera.workflows.steps.Step objects instantiated under it are added to Parallel's list of sub_steps.

Source code in src/hera/workflows/steps.py
@dataclass(kw_only=True)
class Parallel(
    SubNodeMixin,
    ContextMixin,
):
    """Context manager that collects a list of steps to run in parallel.

    Parallel implements the context manager interface, so it can be used with `with`; any
    `hera.workflows.steps.Step` objects instantiated under it are added to Parallel's list of
    sub_steps.
    """

    sub_steps: List[Union[Step, _ModelWorkflowStep]] = field(default_factory=list)

    # Names of the steps registered so far; used to reject duplicates.
    _node_names: Set[str] = field(default_factory=set)

    def _add_sub(self, node: Any):
        """Register `node` under this Parallel object.

        Templatable nodes are handed to the first piece of the current context instead. Only Step
        nodes are accepted otherwise, and their names must be unique.
        """
        if isinstance(node, Templatable):
            from hera.workflows.workflow import Workflow

            # We must be under a workflow context due to checks in _HeraContext.add_sub_node
            assert _context.pieces and isinstance(_context.pieces[0], Workflow)
            _context.pieces[0]._add_sub(node)
            return

        if not isinstance(node, Step):
            raise InvalidType(type(node))

        known_names = self._node_names
        if node.name in known_names:
            raise NodeNameConflict(f"Found multiple Steps named: {node.name}")
        known_names.add(node.name)
        self.sub_steps.append(node)

    def _build_step(self) -> List[_ModelWorkflowStep]:
        """Build every collected sub step into a `_ModelWorkflowStep`, keeping their order."""

        def as_model(member):
            # Step objects are built; ready-made models pass through unchanged.
            if isinstance(member, Step):
                return member._build_as_workflow_step()
            if isinstance(member, _ModelWorkflowStep):
                return member
            raise InvalidType(type(member))

        return [as_model(member) for member in self.sub_steps]

sub_steps

sub_steps: List[Union[Step, WorkflowStep]] = field(
    default_factory=list
)

parallel

parallel()

Open a parallel context within a steps-decorator function.

When running locally, the context will be a no-op.

Source code in src/hera/workflows/steps.py
def parallel():
    """Open a parallel context within a steps-decorator function.

    A `Parallel` is returned while `_context.declaring` is truthy. Otherwise (when running
    locally) a `DummyContext` is returned, so the context is a no-op.
    """
    return Parallel() if _context.declaring else DummyContext()

Comments