Skip to content

Resource

The hera.workflows.resource module provides functionality for creating K8s resources via workflows inside task/steps.

Resource

Resource is a representation of a K8s resource that can be created by Argo.

The resource is a callable step that can be invoked in a DAG/Workflow. The resource can create any K8s resource, such as other workflows, workflow templates, daemons, etc., as specified by the manifest field of the resource. The manifest field is a canonical YAML that is submitted to K8s by Argo. Note that the manifest is a union of multiple types. The manifest can be a string, in which case it is assumed to be YAML. Otherwise, if it is a Hera object, it is automatically converted to the corresponding YAML representation.

Source code in src/hera/workflows/resource.py
@dataclass(kw_only=True)
class Resource(CallableTemplateMixin, TemplateMixin, SubNodeMixin, IOMixin):
    """A template describing a K8s resource that Argo creates or acts upon.

    A `Resource` is a callable step that can be invoked in a DAG/Workflow. It can target any K8s resource
    (other workflows, workflow templates, daemons, etc.), as described by the `manifest` field, which holds the
    canonical YAML that Argo submits to K8s.

    `manifest` accepts a union of types:

    * a string, which is assumed to already be YAML and is passed through unchanged;
    * a Hera `Workflow`, `CronWorkflow` or `WorkflowTemplate`, which is converted to its YAML representation
      when the template is built.
    """

    # Fields forwarded one-to-one to the resource section of the template model.
    action: str
    failure_condition: Optional[str] = None
    flags: Optional[List[str]] = None
    manifest: Optional[Union[str, "Workflow", "CronWorkflow", "WorkflowTemplate"]] = None
    manifest_from: Optional[ManifestFrom] = None
    merge_strategy: Optional[str] = None
    set_owner_reference: Optional[bool] = None
    success_condition: Optional[str] = None

    def _build_manifest(self) -> Optional[str]:
        """Return the manifest as a YAML string, or the raw value if it is not a Hera object.

        Hera workflow-like objects are rendered through `to_yaml()`; anything else (a string or `None`) is
        returned exactly as it was supplied.
        """
        # Imported lazily inside the method rather than at module level.
        from hera.workflows.cron_workflow import CronWorkflow
        from hera.workflows.workflow import Workflow
        from hera.workflows.workflow_template import WorkflowTemplate

        manifest = self.manifest
        if not isinstance(manifest, (Workflow, CronWorkflow, WorkflowTemplate)):
            return manifest

        rendered = manifest.to_yaml()
        # hack to appease raw yaml string comparison: drop the single quotes wrapped around `{{...}}` expressions
        return rendered.replace("'{{", "{{").replace("}}'", "}}")

    def _build_resource_template(self) -> _ModelResourceTemplate:
        """Assemble the resource-specific model from this object's resource fields."""
        rendered_manifest = self._build_manifest()
        return _ModelResourceTemplate(
            action=self.action,
            failure_condition=self.failure_condition,
            flags=self.flags,
            manifest=rendered_manifest,
            manifest_from=self.manifest_from,
            merge_strategy=self.merge_strategy,
            set_owner_reference=self.set_owner_reference,
            success_condition=self.success_condition,
        )

    def _build_template(self) -> _ModelTemplate:
        """Assemble the full template model, embedding the resource model under `resource`."""
        # A falsy deadline (None, 0, "") is omitted from the template; otherwise it is wrapped in IntOrString.
        deadline = None
        if self.active_deadline_seconds:
            deadline = IntOrString(root=self.active_deadline_seconds)

        return _ModelTemplate(
            active_deadline_seconds=deadline,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self._build_init_containers(),
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self._build_metrics(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority_class_name=self.priority_class_name,
            resource=self._build_resource_template(),
            retry_strategy=self._build_retry_strategy(),
            scheduler_name=self.scheduler_name,
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self._build_sidecars(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

action

action: str

active_deadline_seconds

active_deadline_seconds: Optional[int | str] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

archive_location

archive_location: Optional[ArtifactLocation] = None

automount_service_account_token

automount_service_account_token: Optional[bool] = None

daemon

daemon: Optional[bool] = None

executor

executor: Optional[ExecutorConfig] = None

fail_fast

fail_fast: Optional[bool] = None

failure_condition

failure_condition: Optional[str] = None

flags

flags: Optional[List[str]] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

init_containers

init_containers: Optional[
    List[Union[UserContainer, UserContainer]]
] = None

inputs

inputs: InputsT = None

labels

labels: Optional[Dict[str, str]] = None

manifest

manifest: Optional[Union[str, Workflow, CronWorkflow, WorkflowTemplate]] = None

manifest_from

manifest_from: Optional[ManifestFrom] = None

memoize

memoize: Optional[Memoize] = None

merge_strategy

merge_strategy: Optional[str] = None

metrics

metrics: Optional[MetricsT] = None

name

name: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

outputs

outputs: OutputsT = None

parallelism

parallelism: Optional[int] = None

plugin

plugin: Optional[Plugin] = None

pod_security_context

pod_security_context: Optional[PodSecurityContext] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority_class_name

priority_class_name: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

set_owner_reference

set_owner_reference: Optional[bool] = None

sidecars

sidecars: Optional[
    OneOrMany[UserContainer | UserContainer]
] = None

success_condition

success_condition: Optional[str] = None

synchronization

synchronization: Optional[Synchronization] = None

timeout

timeout: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

get_artifact

get_artifact(name: str) -> Artifact

Finds and returns the artifact with the supplied name.

Note that this method will raise an error if the artifact is not found.

Parameters:

Name Type Description Default
name str

name of the input artifact to find and return.

required

Returns:

Name Type Description
Artifact Artifact

the artifact with the supplied name.

Raises:

Type Description
KeyError

if the artifact is not found.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Look up an input artifact by name and return a reference to it.

    The returned artifact does not copy the stored definition; it carries the requested name and a `from_`
    expression of the form `{{inputs.artifacts.<name>}}`.

    Args:
        name: name of the input artifact to find and return.

    Returns:
        Artifact: an artifact referencing the input artifact with the supplied name.

    Raises:
        KeyError: if no inputs are set, no artifacts are set, or no artifact has the supplied name.
    """
    built = self._build_inputs()
    if built is None:
        raise KeyError(f"No inputs set. Artifact {name} not found.")

    candidates = built.artifacts
    if candidates is None:
        raise KeyError(f"No artifacts set. Artifact {name} not found.")

    # First artifact whose name matches, or None when the search is exhausted.
    match = next((candidate for candidate in candidates if candidate.name == name), None)
    if match is None:
        raise KeyError(f"Artifact {name} not found.")
    return Artifact(name=name, from_=f"{{{{inputs.artifacts.{match.name}}}}}")

get_parameter

get_parameter(name: str) -> Parameter

Finds and returns the parameter with the supplied name.

Note that this method will raise an error if the parameter is not found.

Parameters:

Name Type Description Default
name str

name of the input parameter to find and return.

required

Returns:

Name Type Description
Parameter Parameter

the parameter with the supplied name.

Raises:

Type Description
KeyError

if the parameter is not found.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Look up an input parameter by name and return a reference to it.

    The matching model parameter is converted with `Parameter.from_model`, and its value is then replaced by
    the expression `{{inputs.parameters.<name>}}`.

    Args:
        name: name of the input parameter to find and return.

    Returns:
        Parameter: the parameter with the supplied name, with its value set to the input reference expression.

    Raises:
        KeyError: if no inputs are set, no parameters are set, or no parameter has the supplied name.
    """
    built = self._build_inputs()
    if built is None:
        raise KeyError(f"No inputs set. Parameter {name} not found.")

    declared = built.parameters
    if declared is None:
        raise KeyError(f"No parameters set. Parameter {name} not found.")

    for model in declared:
        if model.name != name:
            continue
        found = Parameter.from_model(model)
        # Overwrite whatever value the model carried with a reference to the input itself.
        found.value = f"{{{{inputs.parameters.{found.name}}}}}"
        return found

    raise KeyError(f"Parameter {name} not found.")

Comments