Skip to content

Cluster workflow template

ClusterWorkflowTemplate

ClusterWorkflowTemplates are cluster scoped templates.

Since cluster workflow templates are scoped at the cluster level, they are available globally in the cluster.

Source code in src/hera/workflows/cluster_workflow_template.py
@dataclass(kw_only=True)
class ClusterWorkflowTemplate(WorkflowTemplate):
    """ClusterWorkflowTemplates are cluster scoped templates.

    Since cluster workflow templates are scoped at the cluster level, they are available globally in the cluster.
    """

    def __post_init__(self):
        """Set class defaults via __post_init__, then ensure namespace is not set."""
        super().__post_init__()

        if self.namespace is not None:
            raise ValueError("namespace is not a valid field on a ClusterWorkflowTemplate")

    def create(self) -> TWorkflow:  # type: ignore
        """Creates the ClusterWorkflowTemplate on the Argo cluster."""
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        return self.workflows_service.create_cluster_workflow_template(
            ClusterWorkflowTemplateCreateRequest(template=self.build())  # type: ignore
        )

    def get(self) -> TWorkflow:
        """Attempts to get a workflow template based on the parameters of this template e.g. name + namespace."""
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.name, "workflow name not defined"
        return self.workflows_service.get_cluster_workflow_template(name=self.name)

    def update(self) -> TWorkflow:
        """Attempts to perform a workflow template update based on the parameters of this template.

        Note that this creates the template if it does not exist. In addition, this performs
        a get prior to updating to get the resource version to update in the first place. If you know the template
        does not exist ahead of time, it is more efficient to use `create()` directly to avoid one round trip.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.name, "workflow name not defined"
        # we always need to do a get prior to updating to get the resource version to update in the first place
        # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

        template = self.build()
        try:
            curr = self.get()
            template.metadata.resource_version = curr.metadata.resource_version
        except NotFound:
            return self.create()
        return self.workflows_service.update_cluster_workflow_template(
            self.name,
            ClusterWorkflowTemplateUpdateRequest(template=template),  # type: ignore
        )

    def lint(self) -> TWorkflow:
        """Lints the ClusterWorkflowTemplate using the Argo cluster."""
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        return self.workflows_service.lint_cluster_workflow_template(
            ClusterWorkflowTemplateLintRequest(template=self.build())  # type: ignore
        )

    async def async_create(self) -> TWorkflow:  # type: ignore
        """Creates the ClusterWorkflowTemplate on the Argo cluster."""
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        return await self.workflows_service.create_cluster_workflow_template(
            ClusterWorkflowTemplateCreateRequest(template=self.build())  # type: ignore
        )

    async def async_get(self) -> TWorkflow:
        """Attempts to get a workflow template based on the parameters of this template e.g. name + namespace."""
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.name, "workflow name not defined"
        return await self.workflows_service.get_cluster_workflow_template(name=self.name)

    async def async_update(self) -> TWorkflow:
        """Attempts to perform a workflow template update based on the parameters of this template.

        Note that this creates the template if it does not exist. In addition, this performs
        a get prior to updating to get the resource version to update in the first place. If you know the template
        does not exist ahead of time, it is more efficient to use `create()` directly to avoid one round trip.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.name, "workflow name not defined"
        # we always need to do a get prior to updating to get the resource version to update in the first place
        # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

        template = self.build()
        try:
            curr = await self.async_get()
            template.metadata.resource_version = curr.metadata.resource_version
        except NotFound:
            return await self.async_create()
        return await self.workflows_service.update_cluster_workflow_template(
            self.name,
            ClusterWorkflowTemplateUpdateRequest(template=template),  # type: ignore
        )

    async def async_lint(self) -> TWorkflow:
        """Lints the ClusterWorkflowTemplate using the Argo cluster."""
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        return await self.workflows_service.lint_cluster_workflow_template(
            ClusterWorkflowTemplateLintRequest(template=self.build())  # type: ignore
        )

    def build(self) -> TWorkflow:
        """Builds the ClusterWorkflowTemplate and its components into an Argo schema ClusterWorkflowTemplate object."""
        # Note that ClusterWorkflowTemplates are exactly the same as WorkflowTemplates except for the kind which is
        # handled in Workflow._set_kind (by __name__). When using ClusterWorkflowTemplates via templateRef, clients
        # should specify cluster_scope=True, but that is an intrinsic property of ClusterWorkflowTemplates from our
        # perspective.
        return _ModelClusterWorkflowTemplate(**super().build().model_dump())

active_deadline_seconds

active_deadline_seconds: Optional[int] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

api_version

api_version: Optional[str] = None

archive_logs

archive_logs: Optional[bool] = None

arguments

arguments: ArgumentsT = None

artifact_gc

artifact_gc: Optional[
    ArtifactGC | WorkflowLevelArtifactGC
] = None

artifact_repository_ref

artifact_repository_ref: Optional[ArtifactRepositoryRef] = (
    None
)

automount_service_account_token

automount_service_account_token: Optional[bool] = None

creation_timestamp

creation_timestamp: Optional[Time] = None

deletion_grace_period_seconds

deletion_grace_period_seconds: Optional[int] = None

deletion_timestamp

deletion_timestamp: Optional[Time] = None

dns_config

dns_config: Optional[PodDNSConfig] = None

dns_policy

dns_policy: Optional[str] = None

entrypoint

entrypoint: Optional[str] = None

executor

executor: Optional[ExecutorConfig] = None

finalizers

finalizers: Optional[List[str]] = None

generate_name

generate_name: Optional[str] = None

generation

generation: Optional[int] = None

hooks

hooks: Optional[Dict[str, LifecycleHook]] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

host_network

host_network: Optional[bool] = None

image_pull_secrets

image_pull_secrets: ImagePullSecretsT = None

kind

kind: Optional[str] = None

labels

labels: Optional[Dict[str, str]] = None

managed_fields

managed_fields: Optional[List[ManagedFieldsEntry]] = None

metrics

metrics: MetricsT = None

name

name: Optional[str] = None

namespace

namespace: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

on_exit

on_exit: Optional[Union[str, Templatable]] = None

owner_references

owner_references: Optional[List[OwnerReference]] = None

parallelism

parallelism: Optional[int] = None

pod_disruption_budget

pod_disruption_budget: Optional[PodDisruptionBudgetSpec] = (
    None
)

pod_gc

pod_gc: Optional[PodGC] = None

pod_metadata

pod_metadata: Optional[Metadata] = None

pod_priority_class_name

pod_priority_class_name: Optional[str] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority

priority: Optional[int] = None

resource_version

resource_version: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

security_context

security_context: Optional[PodSecurityContext] = None
self_link: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

shutdown

shutdown: Optional[str] = None

status

status: Optional[WorkflowStatus] = None

suspend

suspend: Optional[bool] = None

synchronization

synchronization: Optional[Synchronization] = None

template_defaults

template_defaults: Optional[Template] = None

templates

templates: List[Union[Template, Templatable]] = field(
    default_factory=list
)

tolerations

tolerations: Optional[List[Toleration]] = None

ttl_strategy

ttl_strategy: Optional[TTLStrategy] = None

uid

uid: Optional[str] = None

volume_claim_gc

volume_claim_gc: Optional[VolumeClaimGC] = None

volume_claim_templates

volume_claim_templates: Optional[
    List[PersistentVolumeClaim]
] = None

volumes

volumes: VolumesT = None

workflow_metadata

workflow_metadata: Optional[WorkflowMetadata] = None

workflow_template_ref

workflow_template_ref: Optional[WorkflowTemplateRef] = None

workflows_service

workflows_service: Optional[
    Union[WorkflowsService, AsyncWorkflowsService]
] = None

ModelMapper

Source code in src/hera/workflows/_meta_mixins.py
class ModelMapper:
    def __init__(self, model_path: str, hera_builder: Optional[Callable] = None):
        self.model_path = []
        self.builder = hera_builder

        if not model_path:
            # Allows overriding parent attribute annotations to remove the mapping
            return

        self.model_path = model_path.split(".")
        curr_class: Type[APIBaseModel] = self._get_model_class()
        for key in self.model_path:
            fields = get_fields(curr_class)
            if key not in fields:
                raise ValueError(f"Model key '{key}' does not exist in class {curr_class}")
            curr_class = fields[key].annotation  # type: ignore

    @classmethod
    def _get_model_class(cls) -> Type[APIBaseModel]:
        raise NotImplementedError

    @classmethod
    def build_model(
        cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
    ) -> TWorkflow:
        for attr, annotation in hera_class._get_all_annotations().items():
            if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper):
                if len(mappers) != 1:
                    raise ValueError("Expected only one ModelMapper")

                # Value comes from builder function if it exists on hera_obj, otherwise directly from the attr
                value = (
                    getattr(hera_obj, mappers[0].builder.__name__)()
                    if mappers[0].builder is not None
                    else getattr(hera_obj, attr)
                )
                if value is not None:
                    _set_model_attr(model, mappers[0].model_path, value)

        return model

builder

builder = hera_builder

model_path

model_path = split('.')

build_model

build_model(
    hera_class: Type[ModelMapperMixin],
    hera_obj: ModelMapperMixin,
    model: TWorkflow,
) -> TWorkflow
Source code in src/hera/workflows/_meta_mixins.py
@classmethod
def build_model(
    cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
) -> TWorkflow:
    for attr, annotation in hera_class._get_all_annotations().items():
        if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper):
            if len(mappers) != 1:
                raise ValueError("Expected only one ModelMapper")

            # Value comes from builder function if it exists on hera_obj, otherwise directly from the attr
            value = (
                getattr(hera_obj, mappers[0].builder.__name__)()
                if mappers[0].builder is not None
                else getattr(hera_obj, attr)
            )
            if value is not None:
                _set_model_attr(model, mappers[0].model_path, value)

    return model

async_create

async_create() -> TWorkflow

Creates the ClusterWorkflowTemplate on the Argo cluster.

Source code in src/hera/workflows/cluster_workflow_template.py
async def async_create(self) -> TWorkflow:  # type: ignore
    """Creates the ClusterWorkflowTemplate on the Argo cluster."""
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    return await self.workflows_service.create_cluster_workflow_template(
        ClusterWorkflowTemplateCreateRequest(template=self.build())  # type: ignore
    )

async_get

async_get() -> TWorkflow

Attempts to get a workflow template based on the parameters of this template e.g. name + namespace.

Source code in src/hera/workflows/cluster_workflow_template.py
async def async_get(self) -> TWorkflow:
    """Attempts to get a workflow template based on the parameters of this template e.g. name + namespace."""
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.name, "workflow name not defined"
    return await self.workflows_service.get_cluster_workflow_template(name=self.name)

async_lint

async_lint() -> TWorkflow

Lints the ClusterWorkflowTemplate using the Argo cluster.

Source code in src/hera/workflows/cluster_workflow_template.py
async def async_lint(self) -> TWorkflow:
    """Lints the ClusterWorkflowTemplate using the Argo cluster."""
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    return await self.workflows_service.lint_cluster_workflow_template(
        ClusterWorkflowTemplateLintRequest(template=self.build())  # type: ignore
    )

async_update

async_update() -> TWorkflow

Attempts to perform a workflow template update based on the parameters of this template.

Note that this creates the template if it does not exist. In addition, this performs a get prior to updating to get the resource version to update in the first place. If you know the template does not exist ahead of time, it is more efficient to use create() directly to avoid one round trip.

Source code in src/hera/workflows/cluster_workflow_template.py
async def async_update(self) -> TWorkflow:
    """Attempts to perform a workflow template update based on the parameters of this template.

    Note that this creates the template if it does not exist. In addition, this performs
    a get prior to updating to get the resource version to update in the first place. If you know the template
    does not exist ahead of time, it is more efficient to use `create()` directly to avoid one round trip.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.name, "workflow name not defined"
    # we always need to do a get prior to updating to get the resource version to update in the first place
    # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

    template = self.build()
    try:
        curr = await self.async_get()
        template.metadata.resource_version = curr.metadata.resource_version
    except NotFound:
        return await self.async_create()
    return await self.workflows_service.update_cluster_workflow_template(
        self.name,
        ClusterWorkflowTemplateUpdateRequest(template=template),  # type: ignore
    )

async_wait

async_wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters

poll_interval: int = 5 The interval in seconds to poll the workflow status.

Source code in src/hera/workflows/workflow.py
async def async_wait(self, poll_interval: int = 5) -> TWorkflow:
    """Waits for the Workflow to complete execution.

    Parameters
    ----------
    poll_interval: int = 5
        The interval in seconds to poll the workflow status.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    # here we use the sleep interval to wait for the workflow post creation. This is to address a potential
    # race conditions such as:
    # 1. Argo server says "workflow was accepted" but the workflow is not yet created
    # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
    # 3. Argo finally creates the workflow
    # 4. Hera throws an `AssertionError` because the phase assertion fails
    await asyncio.sleep(poll_interval)
    wf = await self.workflows_service.get_workflow(self.name, namespace=self.namespace)
    assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

    assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
    assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
    status = WorkflowStatus.from_argo_status(wf.status.phase)

    # keep polling for workflow status until completed, at the interval dictated by the user
    while status == WorkflowStatus.running:
        await asyncio.sleep(poll_interval)
        wf = await self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace)
        assert wf.metadata.name is not None
        assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
        assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
        status = WorkflowStatus.from_argo_status(wf.status.phase)
    return wf

build

build() -> TWorkflow

Builds the ClusterWorkflowTemplate and its components into an Argo schema ClusterWorkflowTemplate object.

Source code in src/hera/workflows/cluster_workflow_template.py
def build(self) -> TWorkflow:
    """Builds the ClusterWorkflowTemplate and its components into an Argo schema ClusterWorkflowTemplate object."""
    # Note that ClusterWorkflowTemplates are exactly the same as WorkflowTemplates except for the kind which is
    # handled in Workflow._set_kind (by __name__). When using ClusterWorkflowTemplates via templateRef, clients
    # should specify cluster_scope=True, but that is an intrinsic property of ClusterWorkflowTemplates from our
    # perspective.
    return _ModelClusterWorkflowTemplate(**super().build().model_dump())

create

create() -> TWorkflow

Creates the ClusterWorkflowTemplate on the Argo cluster.

Source code in src/hera/workflows/cluster_workflow_template.py
def create(self) -> TWorkflow:  # type: ignore
    """Creates the ClusterWorkflowTemplate on the Argo cluster."""
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    return self.workflows_service.create_cluster_workflow_template(
        ClusterWorkflowTemplateCreateRequest(template=self.build())  # type: ignore
    )

create_as_workflow

create_as_workflow(
    generate_name: Optional[str] = None,
    wait: bool = False,
    poll_interval: int = 5,
) -> TWorkflow

Run this WorkflowTemplate instantly as a Workflow.

If generate_name is given, the workflow created uses generate_name as a prefix, as per the usual for hera.workflows.Workflow.generate_name. If not given, the WorkflowTemplate’s name will be used, truncated to 57 chars and appended with a hyphen.

Note: this function does not require the WorkflowTemplate to already exist on the cluster

Source code in src/hera/workflows/workflow_template.py
def create_as_workflow(
    self,
    generate_name: Optional[str] = None,
    wait: bool = False,
    poll_interval: int = 5,
) -> TWorkflow:
    """Run this WorkflowTemplate instantly as a Workflow.

    If generate_name is given, the workflow created uses generate_name as a prefix, as per the usual for
    hera.workflows.Workflow.generate_name. If not given, the WorkflowTemplate's name will be used, truncated to 57
    chars and appended with a hyphen.

    Note: this function does not require the WorkflowTemplate to already exist on the cluster
    """
    workflow = self._get_as_workflow(generate_name)
    return workflow.create(wait=wait, poll_interval=poll_interval)

from_dict

from_dict(model_dict: Dict) -> ModelMapperMixin

Create a WorkflowTemplate from a WorkflowTemplate contained in a dict.

Examples:

>>> my_workflow_template = WorkflowTemplate(name="my-wft")
>>> my_workflow_template == WorkflowTemplate.from_dict(my_workflow_template.to_dict())
True
Source code in src/hera/workflows/workflow_template.py
@classmethod
def from_dict(cls, model_dict: Dict) -> ModelMapperMixin:
    """Create a WorkflowTemplate from a WorkflowTemplate contained in a dict.

    Examples:
        >>> my_workflow_template = WorkflowTemplate(name="my-wft")
        >>> my_workflow_template == WorkflowTemplate.from_dict(my_workflow_template.to_dict())
        True
    """
    return cls._from_dict(model_dict, _ModelWorkflowTemplate)

from_file

from_file(yaml_file: Union[Path, str]) -> ModelMapperMixin

Create a WorkflowTemplate from a WorkflowTemplate contained in a YAML file.

Examples:

>>> yaml_file = Path(...)
>>> my_workflow_template = WorkflowTemplate.from_file(yaml_file)
Source code in src/hera/workflows/workflow_template.py
@classmethod
def from_file(cls, yaml_file: Union[Path, str]) -> ModelMapperMixin:
    """Create a WorkflowTemplate from a WorkflowTemplate contained in a YAML file.

    Examples:
        >>> yaml_file = Path(...)
        >>> my_workflow_template = WorkflowTemplate.from_file(yaml_file)
    """
    return cls._from_file(yaml_file, _ModelWorkflowTemplate)

from_yaml

from_yaml(yaml_str: str) -> ModelMapperMixin

Create a WorkflowTemplate from a WorkflowTemplate contained in a YAML string.

Examples:

>>> my_workflow_template = WorkflowTemplate.from_yaml(yaml_str)
Source code in src/hera/workflows/workflow_template.py
@classmethod
def from_yaml(cls, yaml_str: str) -> ModelMapperMixin:
    """Create a WorkflowTemplate from a WorkflowTemplate contained in a YAML string.

    Examples:
        >>> my_workflow_template = WorkflowTemplate.from_yaml(yaml_str)
    """
    return cls._from_yaml(yaml_str, _ModelWorkflowTemplate)

get

get() -> TWorkflow

Attempts to get a workflow template based on the parameters of this template e.g. name + namespace.

Source code in src/hera/workflows/cluster_workflow_template.py
def get(self) -> TWorkflow:
    """Attempts to get a workflow template based on the parameters of this template e.g. name + namespace."""
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.name, "workflow name not defined"
    return self.workflows_service.get_cluster_workflow_template(name=self.name)

get_parameter

get_parameter(name: str) -> Parameter

Attempts to find and return a Parameter of the specified name.

Source code in src/hera/workflows/workflow.py
def get_parameter(self, name: str) -> Parameter:
    """Attempts to find and return a `Parameter` of the specified name."""
    arguments = self._build_arguments()
    if arguments is None:
        raise KeyError("Workflow has no arguments set")
    if arguments.parameters is None:
        raise KeyError("Workflow has no argument parameters set")

    parameters = arguments.parameters
    if next((p for p in parameters if p.name == name), None) is None:
        raise KeyError(f"`{name}` is not a valid workflow parameter")
    return Parameter(name=name, value=f"{{{{workflow.parameters.{name}}}}}")
get_workflow_link() -> str

Returns the workflow link for the workflow.

Source code in src/hera/workflows/workflow.py
def get_workflow_link(self) -> str:
    """Returns the workflow link for the workflow."""
    assert self.workflows_service is not None, "Cannot fetch a workflow link without a service"
    assert self.name is not None, "Cannot fetch a workflow link without a workflow name"
    return self.workflows_service.get_workflow_link(self.name)

lint

lint() -> TWorkflow

Lints the ClusterWorkflowTemplate using the Argo cluster.

Source code in src/hera/workflows/cluster_workflow_template.py
def lint(self) -> TWorkflow:
    """Lints the ClusterWorkflowTemplate using the Argo cluster."""
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    return self.workflows_service.lint_cluster_workflow_template(
        ClusterWorkflowTemplateLintRequest(template=self.build())  # type: ignore
    )

to_dict

to_dict() -> Any

Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

Source code in src/hera/workflows/workflow.py
def to_dict(self) -> Any:
    """Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary."""
    return self.build().model_dump(exclude_none=True, by_alias=True)

to_file

to_file(
    output_directory: Union[Path, str] = ".",
    name: str = "",
    *args,
    **kwargs,
) -> Path

Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

Parameters:

Name Type Description Default
output_directory Union[Path, str]

The directory to write the file to. Defaults to the current working directory.

'.'
name str

The name of the file to write without the file extension. Defaults to the Workflow’s name or a generated name.

''
*args

Additional arguments to pass to yaml.dump.

()
**kwargs

Additional keyword arguments to pass to yaml.dump.

{}
Source code in src/hera/workflows/workflow.py
def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
    """Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

    Args:
        output_directory: The directory to write the file to. Defaults to the current working directory.
        name: The name of the file to write without the file extension.  Defaults to the Workflow's name or a
              generated name.
        *args: Additional arguments to pass to `yaml.dump`.
        **kwargs: Additional keyword arguments to pass to `yaml.dump`.
    """
    workflow_name = self.name or (self.generate_name or "workflow").rstrip("-")
    name = name or workflow_name
    output_directory = Path(output_directory)
    output_path = Path(output_directory) / f"{name}.yaml"
    output_directory.mkdir(parents=True, exist_ok=True)
    output_path.write_text(self.to_yaml(*args, **kwargs))
    return output_path.absolute()

to_yaml

to_yaml(*args, **kwargs) -> str

Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

Source code in src/hera/workflows/workflow.py
def to_yaml(self, *args, **kwargs) -> str:
    """Builds the Workflow as an Argo schema Workflow object and returns it as yaml string."""

    def human_readable_ordering(kv: tuple) -> int:
        """Key ordering function for ordering in a more human-readable fashion.

        Ordering is:
        1. "name" keys always first (if present)
        2. Primitives (not dicts/lists)
        3. lists
        4. dict
        """
        k, v = kv
        if k == "name" and isinstance(v, str):
            return 0
        if not isinstance(v, (dict, list)):
            return 1
        if isinstance(v, list):
            return 2
        return 3

    def order_dict(d: dict) -> dict[str, Any]:
        """Recursively orders `d` by the custom_ordering function by inserting them into a copy of the dict in order."""
        d_copy: dict[str, Any] = dict()
        for k, v in sorted(d.items(), key=lambda x: (human_readable_ordering(x), x[0])):
            if isinstance(v, dict):
                d_copy[k] = order_dict(v)
            elif isinstance(v, list):
                if v and isinstance(v[0], dict):
                    d_copy[k] = [order_dict(i) if isinstance(i, dict) else i for i in v]
                elif v and isinstance(v[0], list):
                    d_copy[k] = [[order_dict(i) for i in sublist] for sublist in v]
                else:
                    d_copy[k] = v
            else:
                d_copy[k] = v
        return d_copy

    human_ordered_dict = order_dict(self.to_dict())
    return _yaml.dump(human_ordered_dict, *args, **kwargs)

update

update() -> TWorkflow

Attempts to perform a workflow template update based on the parameters of this template.

Note that this creates the template if it does not exist. In addition, this performs a get prior to updating to get the resource version to update in the first place. If you know the template does not exist ahead of time, it is more efficient to use create() directly to avoid one round trip.

Source code in src/hera/workflows/cluster_workflow_template.py
def update(self) -> TWorkflow:
    """Attempts to perform a workflow template update based on the parameters of this template.

    Note that this creates the template if it does not exist. In addition, this performs
    a get prior to updating to get the resource version to update in the first place. If you know the template
    does not exist ahead of time, it is more efficient to use `create()` directly to avoid one round trip.
    """
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.name, "workflow name not defined"
    # we always need to do a get prior to updating to get the resource version to update in the first place
    # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

    template = self.build()
    try:
        curr = self.get()
        template.metadata.resource_version = curr.metadata.resource_version
    except NotFound:
        return self.create()
    return self.workflows_service.update_cluster_workflow_template(
        self.name,
        ClusterWorkflowTemplateUpdateRequest(template=template),  # type: ignore
    )

wait

wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters

poll_interval: int = 5 The interval in seconds to poll the workflow status.

Source code in src/hera/workflows/workflow.py
def wait(self, poll_interval: int = 5) -> TWorkflow:
    """Waits for the Workflow to complete execution.

    Parameters
    ----------
    poll_interval: int = 5
        The interval in seconds to poll the workflow status.
    """
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    # here we use the sleep interval to wait for the workflow post creation. This is to address a potential
    # race conditions such as:
    # 1. Argo server says "workflow was accepted" but the workflow is not yet created
    # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
    # 3. Argo finally creates the workflow
    # 4. Hera throws an `AssertionError` because the phase assertion fails
    time.sleep(poll_interval)
    wf = self.workflows_service.get_workflow(self.name, namespace=self.namespace)
    assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

    assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
    assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
    status = WorkflowStatus.from_argo_status(wf.status.phase)

    # keep polling for workflow status until completed, at the interval dictated by the user
    while status == WorkflowStatus.running:
        time.sleep(poll_interval)
        wf = self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace)
        assert wf.metadata.name is not None
        assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
        assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
        status = WorkflowStatus.from_argo_status(wf.status.phase)
    return wf

Comments