Skip to content

Container set

The hera.workflows.container_set module provides Argo’s container set and container node.

ContainerNode

A regular container that can be used as part of a hera.workflows.ContainerSet.

See Also

Container Set Template

Source code in src/hera/workflows/container_set.py
@dataclass(kw_only=True)
class ContainerNode(ContainerMixin, VolumeMountMixin, ResourceMixin, EnvMixin, SubNodeMixin):
    """A regular container that can be used as part of a `hera.workflows.ContainerSet`.

    Ordering between nodes is expressed by name: `dependencies` holds the names of the nodes that this
    node depends on. The `next` method and the `>>` operator fill that list in.

    See Also:
        [Container Set Template](https://argoproj.github.io/argo-workflows/container-set-template/)
    """

    name: str
    args: Optional[List[str]] = None
    command: Optional[List[str]] = None
    dependencies: Optional[List[str]] = None
    lifecycle: Optional[Lifecycle] = None
    security_context: Optional[SecurityContext] = None
    working_dir: Optional[str] = None

    def next(self, other: ContainerNode) -> ContainerNode:
        """Sets this container as a dependency of the given container and returns the given container.

        The name of `self` is merged into `other.dependencies`, which ends up de-duplicated and sorted.
        A new list is always assigned: the list previously held by `other.dependencies` is left
        untouched, so a list shared between several nodes (or kept by the caller) is not modified.

        Args:
            other: the container that should depend on `self`.

        Returns:
            ContainerNode: `other`, so that calls can be chained.

        Raises:
            AssertionError: if `other` is not a `ContainerNode`.

        Examples:
            >>> from hera.workflows import ContainerNode
            >>> a, b = ContainerNode(name="a"), ContainerNode(name="b")
            >>> _ = a.next(b)
            >>> b.dependencies
            ['a']
        """
        assert isinstance(other, ContainerNode)
        # Build a fresh list rather than appending in place, so the caller's original list is not mutated.
        other.dependencies = sorted({*(other.dependencies or ()), self.name})
        return other

    def __rrshift__(self, other: List[ContainerNode]) -> ContainerNode:
        """Sets `self` as a dependent of the given list of other `hera.workflows.ContainerNode`.

        Practically, the `__rrshift__` allows us to express statements such as `[a, b, c] >> d`, where `d` is `self`.

        Args:
            other: the containers that `self` should depend on.

        Returns:
            ContainerNode: `self`, so that the expression can be chained further.

        Raises:
            AssertionError: if `other` is not a list.

        Examples:
            >>> from hera.workflows import ContainerNode
            >>> a, b, c = ContainerNode(name="a"), ContainerNode(name="b"), ContainerNode(name="c")
            >>> _ = [a, b] >> c
            >>> c.dependencies
            ['a', 'b']
        """
        assert isinstance(other, list), f"Unknown type {type(other)} specified using reverse right bitshift operator"
        for o in other:
            o.next(self)
        return self

    def __rshift__(
        self, other: Union[ContainerNode, List[ContainerNode]]
    ) -> Union[ContainerNode, List[ContainerNode]]:
        """Sets this container as a dependency of the given container(s) and returns what was given.

        Args:
            other: a single container, or a list of containers, that should depend on `self`.

        Returns:
            The `other` argument (a single node or the same list), so that `>>` can be chained.

        Raises:
            AssertionError: if `other` is a list containing something that is not a `ContainerNode`.
            ValueError: if `other` is neither a `ContainerNode` nor a list.

        Examples:
            >>> from hera.workflows import ContainerNode
            >>> a, b = ContainerNode(name="a"), ContainerNode(name="b")
            >>> _ = a >> b
            >>> b.dependencies
            ['a']
        """
        if isinstance(other, ContainerNode):
            return self.next(other)
        elif isinstance(other, list):
            # Items are checked one at a time, so nodes before an invalid item have already been linked.
            for o in other:
                assert isinstance(o, ContainerNode), (
                    f"Unknown list item type {type(o)} specified using right bitshift operator `>>`"
                )
                self.next(o)
            return other
        raise ValueError(f"Unknown type {type(other)} provided to `__rshift__`")

    def _build_container_node(self) -> _ModelContainerNode:
        """Builds the generated `ContainerNode` model from this node's fields.

        `env`, `env_from`, `image_pull_policy`, `resources` and `volume_mounts` are produced by the
        corresponding `_build_*` helpers; every other field is passed through unchanged.
        """
        return _ModelContainerNode(
            args=self.args,
            command=self.command,
            dependencies=self.dependencies,
            env=self._build_env(),
            env_from=self._build_env_from(),
            image=self.image,
            image_pull_policy=self._build_image_pull_policy(),
            lifecycle=self.lifecycle,
            liveness_probe=self.liveness_probe,
            name=self.name,
            ports=self.ports,
            readiness_probe=self.readiness_probe,
            resize_policy=self.resize_policy,
            resources=self._build_resources(),
            restart_policy=self.restart_policy,
            security_context=self.security_context,
            startup_probe=self.startup_probe,
            stdin=self.stdin,
            stdin_once=self.stdin_once,
            termination_message_path=self.termination_message_path,
            termination_message_policy=self.termination_message_policy,
            tty=self.tty,
            volume_devices=self.volume_devices,
            volume_mounts=self._build_volume_mounts(),
            working_dir=self.working_dir,
        )

args

args: Optional[List[str]] = None

command

command: Optional[List[str]] = None

dependencies

dependencies: Optional[List[str]] = None

env

env: EnvT = None

env_from

env_from: EnvFromT = None

image

image: Optional[str] = None

image_pull_policy

image_pull_policy: Optional[Union[str, ImagePullPolicy]] = (
    None
)

lifecycle

lifecycle: Optional[Lifecycle] = None

liveness_probe

liveness_probe: Optional[Probe] = None

name

name: str

ports

ports: Optional[List[ContainerPort]] = None

readiness_probe

readiness_probe: Optional[Probe] = None

resize_policy

resize_policy: Optional[List[ContainerResizePolicy]] = None

resources

resources: Optional[
    Union[ResourceRequirements, Resources]
] = None

restart_policy

restart_policy: Optional[str] = None

security_context

security_context: Optional[SecurityContext] = None

startup_probe

startup_probe: Optional[Probe] = None

stdin

stdin: Optional[bool] = None

stdin_once

stdin_once: Optional[bool] = None

termination_message_path

termination_message_path: Optional[str] = None

termination_message_policy

termination_message_policy: Optional[str] = None

tty

tty: Optional[bool] = None

volume_devices

volume_devices: Optional[List[VolumeDevice]] = None

volume_mounts

volume_mounts: Optional[List[VolumeMount]] = None

volumes

volumes: Optional[VolumesT] = None

working_dir

working_dir: Optional[str] = None

next

next(other: ContainerNode) -> ContainerNode

Sets this container as a dependency of the given container and returns the given container.

Examples:

>>> from hera.workflows import ContainerNode
>>> a, b = ContainerNode(name="a"), ContainerNode(name="b")
>>> a.next(b)
>>> b.dependencies
['a']
Source code in src/hera/workflows/container_set.py
def next(self, other: ContainerNode) -> ContainerNode:
    """Sets this container as a dependency of the given container and returns the given container.

    The name of `self` is merged into `other.dependencies`, which ends up de-duplicated and sorted.
    A new list is always assigned: the list previously held by `other.dependencies` is left
    untouched, so a list shared between several nodes (or kept by the caller) is not modified.

    Args:
        other: the container that should depend on `self`.

    Returns:
        ContainerNode: `other`, so that calls can be chained.

    Raises:
        AssertionError: if `other` is not a `ContainerNode`.

    Examples:
        >>> from hera.workflows import ContainerNode
        >>> a, b = ContainerNode(name="a"), ContainerNode(name="b")
        >>> _ = a.next(b)
        >>> b.dependencies
        ['a']
    """
    assert isinstance(other, ContainerNode)
    # Build a fresh list rather than appending in place, so the caller's original list is not mutated.
    other.dependencies = sorted({*(other.dependencies or ()), self.name})
    return other

ContainerSet

ContainerSet is the implementation of a set of containers that can be run in parallel on Kubernetes.

The containers are run within the same pod.

Examples:

>>> with ContainerSet(...) as cs:
>>>     ContainerNode(...)
>>>     ContainerNode(...)
Source code in src/hera/workflows/container_set.py
@dataclass(kw_only=True)
class ContainerSet(
    EnvIOMixin,
    ContainerMixin,
    TemplateMixin,
    CallableTemplateMixin,
    VolumeMountMixin,
    ContextMixin,
):
    """A template describing a set of containers that can be run in parallel on Kubernetes.

    All containers of the set are run within the same pod.

    Examples:
        >>> with ContainerSet(...) as cs:
        >>>     ContainerNode(...)
        >>>     ContainerNode(...)
    """

    # Members of the set: either Hera `ContainerNode`s or already-built model container nodes.
    containers: List[Union[ContainerNode, _ModelContainerNode]] = field(default_factory=list)
    # Passed as `retry_strategy` of the generated container set template (not of the outer template).
    container_set_retry_strategy: Optional[ContainerSetRetryStrategy] = None

    def _add_sub(self, node: Any):
        """Appends `node` to `containers`; raises `InvalidType` unless it is a `ContainerNode`.

        NOTE(review): presumably called by the context machinery for nodes created inside the `with`
        block - confirm against `ContextMixin`.
        """
        if isinstance(node, ContainerNode):
            self.containers.append(node)
            return
        raise InvalidType(type(node))

    def _build_container_set(self) -> _ModelContainerSetTemplate:
        """Builds the generated `ContainerSetTemplate`.

        Hera `ContainerNode`s are converted with `_build_container_node`; any other entry is used as is.
        """
        built_nodes = []
        for entry in self.containers:
            if isinstance(entry, ContainerNode):
                entry = entry._build_container_node()
            built_nodes.append(entry)

        return _ModelContainerSetTemplate(
            containers=built_nodes,
            retry_strategy=self.container_set_retry_strategy,
            volume_mounts=self.volume_mounts,
        )

    def _build_template(self) -> _ModelTemplate:
        """Builds the generated `Template` representation of the container set."""
        # A falsy deadline (unset, 0 or an empty string) is emitted as `None`.
        deadline = None
        if self.active_deadline_seconds:
            deadline = IntOrString(root=self.active_deadline_seconds)

        return _ModelTemplate(
            active_deadline_seconds=deadline,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            container_set=self._build_container_set(),
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self._build_init_containers(),
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self._build_metrics(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority_class_name=self.priority_class_name,
            retry_strategy=self._build_retry_strategy(),
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self._build_sidecars(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
            volumes=self._build_volumes(),
        )

active_deadline_seconds

active_deadline_seconds: Optional[int | str] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

archive_location

archive_location: Optional[ArtifactLocation] = None

automount_service_account_token

automount_service_account_token: Optional[bool] = None

container_set_retry_strategy

container_set_retry_strategy: Optional[
    ContainerSetRetryStrategy
] = None

containers

containers: List[Union[ContainerNode, _ModelContainerNode]] = (
    field(default_factory=list)
)

daemon

daemon: Optional[bool] = None

env

env: EnvT = None

env_from

env_from: EnvFromT = None

executor

executor: Optional[ExecutorConfig] = None

fail_fast

fail_fast: Optional[bool] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

image

image: Optional[str] = None

image_pull_policy

image_pull_policy: Optional[Union[str, ImagePullPolicy]] = (
    None
)

init_containers

init_containers: Optional[
    List[Union[UserContainer, UserContainer]]
] = None

inputs

inputs: InputsT = None

labels

labels: Optional[Dict[str, str]] = None

liveness_probe

liveness_probe: Optional[Probe] = None

memoize

memoize: Optional[Memoize] = None

metrics

metrics: Optional[MetricsT] = None

name

name: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

outputs

outputs: OutputsT = None

parallelism

parallelism: Optional[int] = None

plugin

plugin: Optional[Plugin] = None

pod_security_context

pod_security_context: Optional[PodSecurityContext] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

ports

ports: Optional[List[ContainerPort]] = None

priority_class_name

priority_class_name: Optional[str] = None

readiness_probe

readiness_probe: Optional[Probe] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

sidecars

sidecars: Optional[
    OneOrMany[UserContainer | UserContainer]
] = None

startup_probe

startup_probe: Optional[Probe] = None

stdin

stdin: Optional[bool] = None

stdin_once

stdin_once: Optional[bool] = None

synchronization

synchronization: Optional[Synchronization] = None

termination_message_path

termination_message_path: Optional[str] = None

termination_message_policy

termination_message_policy: Optional[str] = None

timeout

timeout: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

tty

tty: Optional[bool] = None

volume_devices

volume_devices: Optional[List[VolumeDevice]] = None

volume_mounts

volume_mounts: Optional[List[VolumeMount]] = None

volumes

volumes: Optional[VolumesT] = None

get_artifact

get_artifact(name: str) -> Artifact

Finds and returns the artifact with the supplied name.

Note that this method will raise an error if the artifact is not found.

Parameters:

Name Type Description Default
name str

name of the input artifact to find and return.

required

Returns:

Name Type Description
Artifact Artifact

the artifact with the supplied name.

Raises:

Type Description
KeyError

if the artifact is not found.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Looks up an input artifact by name and returns a reference to it.

    The built inputs are searched for an artifact called `name`. On a match, a new `Artifact` is
    returned whose `from_` is the template expression `{{inputs.artifacts.<name>}}`.

    Args:
        name: name of the input artifact to find and return.

    Returns:
        Artifact: an artifact with the supplied name that references the matching input.

    Raises:
        KeyError: if no inputs are set, no artifacts are set, or no artifact has the supplied name.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Artifact {name} not found.")

    available = built_inputs.artifacts
    if available is None:
        raise KeyError(f"No artifacts set. Artifact {name} not found.")

    for candidate in available:
        if candidate.name == name:
            reference = f"{{{{inputs.artifacts.{candidate.name}}}}}"
            return Artifact(name=name, from_=reference)

    raise KeyError(f"Artifact {name} not found.")

get_parameter

get_parameter(name: str) -> Parameter

Finds and returns the parameter with the supplied name.

Note that this method will raise an error if the parameter is not found.

Parameters:

Name Type Description Default
name str

name of the input parameter to find and return.

required

Returns:

Name Type Description
Parameter Parameter

the parameter with the supplied name.

Raises:

Type Description
KeyError

if the parameter is not found.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Looks up an input parameter by name and returns a reference to it.

    The built inputs are searched for a parameter called `name`. On a match, the model is converted
    with `Parameter.from_model` and its `value` is replaced by the template expression
    `{{inputs.parameters.<name>}}`.

    Args:
        name: name of the input parameter to find and return.

    Returns:
        Parameter: the parameter with the supplied name, with its value set to the input reference.

    Raises:
        KeyError: if no inputs are set, no parameters are set, or no parameter has the supplied name.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Parameter {name} not found.")

    available = built_inputs.parameters
    if available is None:
        raise KeyError(f"No parameters set. Parameter {name} not found.")

    for model in available:
        if model.name == name:
            found = Parameter.from_model(model)
            found.value = f"{{{{inputs.parameters.{found.name}}}}}"
            return found

    raise KeyError(f"Parameter {name} not found.")

Comments