Skip to content

Task

The hera.workflows.task module provides the Task and TaskResult classes.

See https://argoproj.github.io/argo-workflows/walk-through/dag for more on using Tasks within a DAG.

Task

Task is used to run a given template within a DAG. Must be instantiated under a DAG context.

Source code in src/hera/workflows/task.py
@dataclass(kw_only=True)
class Task(
    TemplateInvocatorSubNodeMixin,
    ArgumentsMixin,
    WithParamMixin,
    WithItemsMixin,
):
    """Task is used to run a given template within a DAG. Must be instantiated under a DAG context."""

    dependencies: Optional[List[str]] = None
    depends: Optional[str] = None

    def _get_dependency_tasks(self) -> List[str]:
        if self.depends is None:
            return []

        # filter out operators
        all_operators = [o for o in Operator]
        tasks = [t for t in self.depends.split() if t not in all_operators]

        # remove dot suffixes
        task_names = [t.split(".")[0] for t in tasks]
        return task_names

    @property
    def _subtype(self) -> str:
        return "tasks"

    def next(
        self,
        other: Task,
        operator: Optional[Operator] = None,
        on: OnType = None,
    ) -> Task:
        """Set self as a dependency of `other`."""
        operator = operator or _default_next_operator.get()
        on_list = _normalise_on(on, _default_next_on.get())

        # Build condition string:
        # - If multiple on-conditions: OR them and wrap in parens
        # - If single on-condition: A.succeeded
        # - If none: just "A"
        if on_list and len(on_list) > 1:
            condition_str = " || ".join(f"{self.name}.{c.value}" for c in on_list)
            condition_str = f"({condition_str})"
        elif on_list and len(on_list) == 1:
            condition_str = f"{self.name}.{on_list[0].value}"
        else:
            condition_str = self.name

        if other.depends is None:
            # First dependency
            other.depends = condition_str
        elif self.name in other._get_dependency_tasks():
            raise ValueError(f"{self.name} already in {other.name}'s depends: {other.depends}")
        else:
            # Add follow-up dependency
            other.depends += f" {operator} {condition_str}"

        return other

    @classmethod
    @contextmanager
    def set_next_defaults(
        cls,
        operator: Optional[Operator] = None,
        on: Union[TaskResult, Iterable[TaskResult], None] = None,
    ):
        """Temporarily modify the default behaviour of `next` and `>>`."""
        on_list = _normalise_on(on)

        operator_token = _default_next_operator.set(operator or _default_next_operator.get())
        on_token = _default_next_on.set(on_list)
        try:
            yield
        finally:
            _default_next_operator.reset(operator_token)
            _default_next_on.reset(on_token)

    def __rrshift__(self, other: List[Union[Task, str]]) -> Task:
        """Set `other` as a dependency self."""
        assert isinstance(other, list), f"Unknown type {type(other)} specified using reverse right bitshift operator"
        for o in other:
            if isinstance(o, Task):
                o.next(self)
            else:
                assert isinstance(o, str), (
                    f"Unknown list item type {type(o)} specified using reverse right bitshift operator"
                )
                if self.depends is None:
                    self.depends = o
                else:
                    self.depends += f" && {o}"
        return self

    def __rshift__(self, other: Union[Task, List[Task]]) -> Union[Task, List[Task]]:
        """Set self as a dependency of `other` which can be a single Task or list of Tasks."""
        if isinstance(other, Task):
            return self.next(other)
        elif isinstance(other, list):
            for o in other:
                assert isinstance(o, Task), (
                    f"Unknown list item type {type(o)} specified using right bitshift operator `>>`"
                )
                self.next(o)
            return other
        raise ValueError(f"Unknown type {type(other)} provided to `__rshift__`")

    def __or__(self, other: Union[Task, str]) -> str:
        """Return a condition of `self || other`."""
        if isinstance(other, Task):
            return f"({self.name} || {other.name})"
        assert isinstance(other, str), f"Unknown type {type(other)} specified using `|` operator"
        return f"{self.name} || {other}"

    def on_workflow_status(self, status: WorkflowStatus, op: Operator = Operator.equals) -> Task:
        """Sets the current task to run when the workflow finishes with the specified status."""
        expression = f"{{{{workflow.status}}}} {op} {status}"
        if self.when:
            self.when += f" {Operator.and_} {expression}"
        else:
            self.when = expression
        return self

    def on_success(self, other: Task) -> Task:
        """Sets the current task to run when the given `other` task succeeds."""
        return self.next(other, on=TaskResult.succeeded)

    def on_failure(self, other: Task) -> Task:
        """Sets the current task to run when the given `other` task fails."""
        return self.next(other, on=TaskResult.failed)

    def on_error(self, other: Task) -> Task:
        """Sets the current task to run when the given `other` task errors."""
        return self.next(other, on=TaskResult.errored)

    def on_other_result(self, other: Task, value: str, operator: Operator = Operator.equals) -> Task:
        """Sets the current task to run when the given `other` task results in the specified `value` result."""
        expression = f"{other.result} {operator} {value}"
        if self.when:
            self.when += f" {Operator.and_} {expression}"
        else:
            self.when = expression
        other.next(self)
        return self

    def when_any_succeeded(self, other: Task) -> Task:
        """Sets the current task to run when the given `other` task succeedds."""
        assert (self.with_param is not None) or (self.with_sequence is not None), (
            "Can only use `when_all_failed` when using `with_param` or `with_sequence`"
        )

        return self.next(other, on=TaskResult.any_succeeded)

    def when_all_failed(self, other: Task) -> Task:
        """Sets the current task to run when the given `other` task has failed."""
        assert (self.with_param is not None) or (self.with_sequence is not None), (
            "Can only use `when_all_failed` when using `with_param` or `with_sequence`"
        )

        return self.next(other, on=TaskResult.all_failed)

    def _build_dag_task(self) -> _ModelDAGTask:
        _template = None
        if isinstance(self.template, str):
            _template = self.template
        elif isinstance(self.template, (Template, TemplateMixin)):
            _template = self.template.name

        _inline = None
        if isinstance(self.inline, Template):
            _inline = self.inline
        elif isinstance(self.inline, Templatable):
            _inline = self.inline._build_template()

        return _ModelDAGTask(
            arguments=self._build_arguments(),
            continue_on=self.continue_on,
            dependencies=self.dependencies,
            depends=self.depends,
            hooks=self.hooks,
            inline=_inline,
            name=self.name,
            on_exit=self._build_on_exit(),
            template=_template,
            template_ref=self.template_ref,
            when=self.when,
            with_items=self._build_with_items(),
            with_param=self._build_with_param(),
            with_sequence=self.with_sequence,
        )

arguments

arguments: ArgumentsT = None

continue_on

continue_on: Optional[ContinueOn] = None

dependencies

dependencies: Optional[List[str]] = None

depends

depends: Optional[str] = None

exit_code

exit_code: str

ExitCode holds the exit code of a script template.

finished_at

finished_at: str

Time at which this node completed.

hooks

hooks: Optional[Dict[str, LifecycleHook]] = None

id

id: str

ID of this node.

inline

inline: Optional[Union[Template, Templatable]] = None

ip

ip: str

IP of this node.

name

name: str

on_exit

on_exit: Optional[Union[str, Templatable]] = None

result

result: str

Result holds the result (stdout) of a script template.

started_at

started_at: str

Time at which this node started.

status

status: str

Status of this node.

template

template: Optional[Union[str, Template, Templatable]] = None

template_ref

template_ref: Optional[TemplateRef] = None

when

when: Optional[str] = None

with_items

with_items: Optional[OneOrMany[Any]] = None

with_param

with_param: Optional[Any] = None

with_sequence

with_sequence: Optional[Sequence] = None

get_artifact

get_artifact(name: str) -> Artifact

Gets an artifact from the outputs of this subnode.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Gets an artifact from the outputs of this subnode."""
    return self._get_artifact(name=name, subtype=self._subtype)

get_outputs_as_arguments

get_outputs_as_arguments() -> List[
    Union[Parameter, Artifact]
]

Get all output parameters and artifacts as a combined list from this task/step for use as arguments.

This is useful for when all the inputs of another template match all the outputs of this template. It is also possible to combine the outputs of multiple templates if they collectively match the inputs of another template.

Source code in src/hera/workflows/_mixins.py
def get_outputs_as_arguments(self) -> List[Union[Parameter, Artifact]]:
    """Get all output parameters and artifacts as a combined list from this task/step for use as arguments.

    This is useful for when all the inputs of another template match all the outputs of this template. It
    is also possible to combine the outputs of multiple templates if they collectively match the inputs of
    another template.
    """
    if isinstance(self.template, str):
        raise ValueError(f"Cannot get outputs when the template was set via a name: {self.template}")

    # here, we build the template early to verify that we can get the outputs
    if isinstance(self.template, Templatable):
        template = self.template._build_template()
    elif isinstance(self.template, Template):
        template = self.template
    else:
        raise ValueError("Only 'template' is supported (not inline or template_ref)")

    if template.outputs is None:
        raise ValueError(f"Template '{template.name}' has no outputs")

    parameters = [self.get_parameter(p.name) for p in template.outputs.parameters or []]
    artifacts = [self.get_artifact(art.name) for art in template.outputs.artifacts or []]

    result = parameters + artifacts
    if not result:
        raise ValueError(f"Template '{template.name}' has no outputs")

    return result

get_parameter

get_parameter(name: str) -> Parameter

Gets a parameter from the outputs of this subnode.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Gets a parameter from the outputs of this subnode."""
    return self._get_parameter(name=name, subtype=self._subtype)

get_parameters_as

get_parameters_as(name: str) -> Parameter

Returns a Parameter that represents all the outputs of this subnode.

Parameters

name: str The name of the parameter to search for.

Returns:

Parameter The parameter, named based on the given name, along with a value that references all outputs.

Source code in src/hera/workflows/_mixins.py
def get_parameters_as(self, name: str) -> Parameter:
    """Returns a `Parameter` that represents all the outputs of this subnode.

    Parameters
    ----------
    name: str
        The name of the parameter to search for.

    Returns:
    -------
    Parameter
        The parameter, named based on the given `name`, along with a value that references all outputs.
    """
    return self._get_parameters_as(name=name, subtype=self._subtype)

get_result_as

get_result_as(name: str) -> Parameter

Returns a Parameter specification with the given name containing the results of self.

Source code in src/hera/workflows/_mixins.py
def get_result_as(self, name: str) -> Parameter:
    """Returns a `Parameter` specification with the given name containing the `results` of `self`."""
    return Parameter(name=name, value=self.result)

next

next(
    other: Task,
    operator: Optional[Operator] = None,
    on: OnType = None,
) -> Task

Set self as a dependency of other.

Source code in src/hera/workflows/task.py
def next(
    self,
    other: Task,
    operator: Optional[Operator] = None,
    on: OnType = None,
) -> Task:
    """Set self as a dependency of `other`."""
    operator = operator or _default_next_operator.get()
    on_list = _normalise_on(on, _default_next_on.get())

    # Build condition string:
    # - If multiple on-conditions: OR them and wrap in parens
    # - If single on-condition: A.succeeded
    # - If none: just "A"
    if on_list and len(on_list) > 1:
        condition_str = " || ".join(f"{self.name}.{c.value}" for c in on_list)
        condition_str = f"({condition_str})"
    elif on_list and len(on_list) == 1:
        condition_str = f"{self.name}.{on_list[0].value}"
    else:
        condition_str = self.name

    if other.depends is None:
        # First dependency
        other.depends = condition_str
    elif self.name in other._get_dependency_tasks():
        raise ValueError(f"{self.name} already in {other.name}'s depends: {other.depends}")
    else:
        # Add follow-up dependency
        other.depends += f" {operator} {condition_str}"

    return other

on_error

on_error(other: Task) -> Task

Sets the current task to run when the given other task errors.

Source code in src/hera/workflows/task.py
def on_error(self, other: Task) -> Task:
    """Sets the current task to run when the given `other` task errors."""
    return self.next(other, on=TaskResult.errored)

on_failure

on_failure(other: Task) -> Task

Sets the current task to run when the given other task fails.

Source code in src/hera/workflows/task.py
def on_failure(self, other: Task) -> Task:
    """Sets the current task to run when the given `other` task fails."""
    return self.next(other, on=TaskResult.failed)

on_other_result

on_other_result(
    other: Task, value: str, operator: Operator = equals
) -> Task

Sets the current task to run when the given other task results in the specified value result.

Source code in src/hera/workflows/task.py
def on_other_result(self, other: Task, value: str, operator: Operator = Operator.equals) -> Task:
    """Sets the current task to run when the given `other` task results in the specified `value` result."""
    expression = f"{other.result} {operator} {value}"
    if self.when:
        self.when += f" {Operator.and_} {expression}"
    else:
        self.when = expression
    other.next(self)
    return self

on_success

on_success(other: Task) -> Task

Sets the current task to run when the given other task succeeds.

Source code in src/hera/workflows/task.py
def on_success(self, other: Task) -> Task:
    """Sets the current task to run when the given `other` task succeeds."""
    return self.next(other, on=TaskResult.succeeded)

on_workflow_status

on_workflow_status(
    status: WorkflowStatus, op: Operator = equals
) -> Task

Sets the current task to run when the workflow finishes with the specified status.

Source code in src/hera/workflows/task.py
def on_workflow_status(self, status: WorkflowStatus, op: Operator = Operator.equals) -> Task:
    """Sets the current task to run when the workflow finishes with the specified status."""
    expression = f"{{{{workflow.status}}}} {op} {status}"
    if self.when:
        self.when += f" {Operator.and_} {expression}"
    else:
        self.when = expression
    return self

set_next_defaults

set_next_defaults(
    operator: Optional[Operator] = None,
    on: Union[
        TaskResult, Iterable[TaskResult], None
    ] = None,
)

Temporarily modify the default behaviour of next and >>.

Source code in src/hera/workflows/task.py
@classmethod
@contextmanager
def set_next_defaults(
    cls,
    operator: Optional[Operator] = None,
    on: Union[TaskResult, Iterable[TaskResult], None] = None,
):
    """Temporarily modify the default behaviour of `next` and `>>`."""
    on_list = _normalise_on(on)

    operator_token = _default_next_operator.set(operator or _default_next_operator.get())
    on_token = _default_next_on.set(on_list)
    try:
        yield
    finally:
        _default_next_operator.reset(operator_token)
        _default_next_on.reset(on_token)

when_all_failed

when_all_failed(other: Task) -> Task

Sets the current task to run when the given other task has failed.

Source code in src/hera/workflows/task.py
def when_all_failed(self, other: Task) -> Task:
    """Sets the current task to run when the given `other` task has failed."""
    assert (self.with_param is not None) or (self.with_sequence is not None), (
        "Can only use `when_all_failed` when using `with_param` or `with_sequence`"
    )

    return self.next(other, on=TaskResult.all_failed)

when_any_succeeded

when_any_succeeded(other: Task) -> Task

Sets the current task to run when the given other task succeedds.

Source code in src/hera/workflows/task.py
def when_any_succeeded(self, other: Task) -> Task:
    """Sets the current task to run when the given `other` task succeedds."""
    assert (self.with_param is not None) or (self.with_sequence is not None), (
        "Can only use `when_all_failed` when using `with_param` or `with_sequence`"
    )

    return self.next(other, on=TaskResult.any_succeeded)

TaskResult

The enumeration of Task Results.

See Also

Argo Depends Docs

Source code in src/hera/workflows/task.py
class TaskResult(Enum):
    """The enumeration of Task Results.

    See Also:
        [Argo Depends Docs](https://argoproj.github.io/argo-workflows/enhanced-depends-logic/#depends)
    """

    failed = "Failed"
    succeeded = "Succeeded"
    errored = "Errored"
    skipped = "Skipped"
    omitted = "Omitted"
    daemoned = "Daemoned"
    any_succeeded = "AnySucceeded"
    all_failed = "AllFailed"

    def __or__(self, other: TaskResult) -> _TaskResultGroup:
        """Create an "or" condition over multiple TaskResults."""
        return _TaskResultGroup([self, other])

all_failed

all_failed = 'AllFailed'

any_succeeded

any_succeeded = 'AnySucceeded'

daemoned

daemoned = 'Daemoned'

errored

errored = 'Errored'

failed

failed = 'Failed'

omitted

omitted = 'Omitted'

skipped

skipped = 'Skipped'

succeeded

succeeded = 'Succeeded'

Comments