Skip to content

Suspend

The hera.workflows.suspend module provides the Suspend class.

The Suspend template in Hera provides a convenience wrapper around the “Intermediate Parameters” Argo feature.

Note

Suspend

The Suspend template allows the user to pause a workflow for a specified length of time.

The workflow can pause based on the given by duration or indefinitely (i.e. until manually resumed). The Suspend template also allows you to specify intermediate_parameters which will replicate the given parameters to the “inputs” and “outputs” of the template, resulting in a Suspend template that pauses and waits for values from the user for the given list of parameters.

Source code in src/hera/workflows/suspend.py
@dataclass(kw_only=True)
class Suspend(
    TemplateMixin,
    CallableTemplateMixin,
    IOMixin,
):
    """The Suspend template allows the user to pause a workflow for a specified length of time.

    The workflow can pause based on the given by `duration` or indefinitely (i.e. until manually resumed).
    The Suspend template also allows you to specify `intermediate_parameters` which will replicate the given
    parameters to the "inputs" and "outputs" of the template, resulting in a Suspend template that pauses and
    waits for values from the user for the given list of parameters.
    """

    duration: Optional[Union[int, str]] = None
    intermediate_parameters: List[Parameter] = field(default_factory=list)

    def _build_suspend_template(self) -> _ModelSuspendTemplate:
        return _ModelSuspendTemplate(
            duration=str(self.duration) if self.duration else None,
        )

    def _build_outputs(self) -> Optional[Outputs]:
        intermediate_param_outputs = []
        for param in self.intermediate_parameters:
            intermediate_param_outputs.append(
                Parameter(
                    name=param.name,
                    value_from=ValueFrom(supplied=SuppliedValueFrom()),
                    description=param.description,
                ).as_output()
            )

        all_outputs = super()._build_outputs()
        if all_outputs is not None:
            if all_outputs.parameters is None:
                all_outputs.parameters = []
            all_outputs.parameters.extend(intermediate_param_outputs)
            return all_outputs
        else:
            return Outputs(parameters=intermediate_param_outputs) if intermediate_param_outputs else None

    def _build_inputs(self) -> Optional[Inputs]:
        intermediate_param_inputs = []
        for param in self.intermediate_parameters:
            intermediate_param_inputs.append(param.as_input())

        all_inputs = super()._build_inputs()
        if all_inputs is not None:
            if all_inputs.parameters is None:
                all_inputs.parameters = []
            all_inputs.parameters.extend(intermediate_param_inputs)
            return all_inputs
        else:
            return Inputs(parameters=intermediate_param_inputs) if intermediate_param_inputs else None

    def _build_template(self) -> _ModelTemplate:
        return _ModelTemplate(
            active_deadline_seconds=IntOrString(root=self.active_deadline_seconds)
            if self.active_deadline_seconds
            else None,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self._build_init_containers(),
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            priority_class_name=self.priority_class_name,
            retry_strategy=self._build_retry_strategy(),
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self._build_sidecars(),
            suspend=self._build_suspend_template(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

active_deadline_seconds

active_deadline_seconds: Optional[int | str] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

archive_location

archive_location: Optional[ArtifactLocation] = None

automount_service_account_token

automount_service_account_token: Optional[bool] = None

daemon

daemon: Optional[bool] = None

duration

duration: Optional[Union[int, str]] = None

executor

executor: Optional[ExecutorConfig] = None

fail_fast

fail_fast: Optional[bool] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

init_containers

init_containers: Optional[
    List[Union[UserContainer, UserContainer]]
] = None

inputs

inputs: InputsT = None

intermediate_parameters

intermediate_parameters: List[Parameter] = field(
    default_factory=list
)

labels

labels: Optional[Dict[str, str]] = None

memoize

memoize: Optional[Memoize] = None

metrics

metrics: Optional[MetricsT] = None

name

name: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

outputs

outputs: OutputsT = None

parallelism

parallelism: Optional[int] = None

plugin

plugin: Optional[Plugin] = None

pod_security_context

pod_security_context: Optional[PodSecurityContext] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority_class_name

priority_class_name: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

sidecars

sidecars: Optional[
    OneOrMany[UserContainer | UserContainer]
] = None

synchronization

synchronization: Optional[Synchronization] = None

timeout

timeout: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

get_artifact

get_artifact(name: str) -> Artifact

Finds and returns the artifact with the supplied name.

Note that this method will raise an error if the artifact is not found.

Parameters:

Name Type Description Default
name str

name of the input artifact to find and return.

required

Returns:

Name Type Description
Artifact Artifact

the artifact with the supplied name.

Raises:

Type Description
KeyError

if the artifact is not found.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Finds and returns the artifact with the supplied name.

    Note that this method will raise an error if the artifact is not found.

    Args:
        name: name of the input artifact to find and return.

    Returns:
        Artifact: the artifact with the supplied name.

    Raises:
        KeyError: if the artifact is not found.
    """
    inputs = self._build_inputs()
    if inputs is None:
        raise KeyError(f"No inputs set. Artifact {name} not found.")
    if inputs.artifacts is None:
        raise KeyError(f"No artifacts set. Artifact {name} not found.")
    for artifact in inputs.artifacts:
        if artifact.name == name:
            return Artifact(name=name, from_=f"{{{{inputs.artifacts.{artifact.name}}}}}")
    raise KeyError(f"Artifact {name} not found.")

get_parameter

get_parameter(name: str) -> Parameter

Finds and returns the parameter with the supplied name.

Note that this method will raise an error if the parameter is not found.

Parameters:

Name Type Description Default
name str

name of the input parameter to find and return.

required

Returns:

Name Type Description
Parameter Parameter

the parameter with the supplied name.

Raises:

Type Description
KeyError

if the parameter is not found.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Finds and returns the parameter with the supplied name.

    Note that this method will raise an error if the parameter is not found.

    Args:
        name: name of the input parameter to find and return.

    Returns:
        Parameter: the parameter with the supplied name.

    Raises:
        KeyError: if the parameter is not found.
    """
    inputs = self._build_inputs()
    if inputs is None:
        raise KeyError(f"No inputs set. Parameter {name} not found.")
    if inputs.parameters is None:
        raise KeyError(f"No parameters set. Parameter {name} not found.")
    for p in inputs.parameters:
        if p.name == name:
            param = Parameter.from_model(p)
            param.value = f"{{{{inputs.parameters.{param.name}}}}}"
            return param
    raise KeyError(f"Parameter {name} not found.")

Comments