Skip to content

Cron workflow

CronWorkflow

CronWorkflow allows a user to run a Workflow on a recurring basis.

Note

Hera’s CronWorkflow is a subclass of Workflow which means certain fields are renamed for compatibility, see cron_suspend and cron_status which are different from the Argo spec. See CronWorkflowSpec for more details.

Source code in src/hera/workflows/cron_workflow.py
@dataclass(kw_only=True)
class CronWorkflow(Workflow):
    """CronWorkflow allows a user to run a Workflow on a recurring basis.

    Note:
        Hera's CronWorkflow is a subclass of Workflow which means certain fields are renamed
        for compatibility, see `cron_suspend` and `cron_status` which are different from the Argo
        spec. See [CronWorkflowSpec](https://argoproj.github.io/argo-workflows/fields/#cronworkflow) for more details.
    """

    concurrency_policy: Annotated[Optional[str], _CronWorkflowModelMapper("spec.concurrency_policy")] = None
    failed_jobs_history_limit: Annotated[Optional[int], _CronWorkflowModelMapper("spec.failed_jobs_history_limit")] = (
        None
    )
    schedule: Optional[str] = None
    """Deprecated: `spec.schedule` was removed in Argo Workflows v4. Use `schedules` instead.

    For backwards compatibility, Hera v7 still accepts `schedule`: setting it emits a
    DeprecationWarning and appends the value to `schedules` (initialising the list if needed).
    The shim will be removed in Hera v8.
    """
    schedules: Annotated[Optional[List[str]], _CronWorkflowModelMapper("spec.schedules")] = None
    starting_deadline_seconds: Annotated[Optional[int], _CronWorkflowModelMapper("spec.starting_deadline_seconds")] = (
        None
    )
    stop_strategy: Annotated[Optional[StopStrategy], _CronWorkflowModelMapper("spec.stop_strategy")] = None
    successful_jobs_history_limit: Annotated[
        Optional[int], _CronWorkflowModelMapper("spec.successful_jobs_history_limit")
    ] = None
    # Named `cron_suspend` (not `suspend`) because it maps to `spec.suspend` of the CronWorkflow itself.
    cron_suspend: Annotated[Optional[bool], _CronWorkflowModelMapper("spec.suspend")] = None
    timezone: Annotated[Optional[str], _CronWorkflowModelMapper("spec.timezone")] = None
    when: Annotated[Optional[str], _CronWorkflowModelMapper("spec.when")] = None
    # Named `cron_status` (not `status`) because it maps to the CronWorkflow's top-level `status`.
    cron_status: Annotated[Optional[CronWorkflowStatus], _CronWorkflowModelMapper("status")] = None

    def __post_init__(self) -> None:
        """Apply parent post-init logic and translate the deprecated `schedule` field."""
        super().__post_init__()
        self._translate_legacy_schedule()

    def _translate_legacy_schedule(self) -> None:
        """Translate `schedule` (removed in Argo Workflows v4) to `schedules` and warn.

        If both `schedule` and `schedules` are set, the legacy value is placed after the
        existing schedules: e.g. `schedule="X"` + `schedules=["Y"]` becomes
        `schedules=["Y", "X"]`. The legacy attribute is then cleared so the v4 wire payload
        contains only `schedules`.

        A new list is assigned instead of appending in place, so a list object that the caller
        passed as `schedules` (and may reuse elsewhere) is never mutated.
        """
        if self.schedule is None:
            return
        warnings.warn(
            "CronWorkflowSpec.schedule was removed in Argo Workflows v4; use `schedules=[...]` "
            "instead. Hera will translate this for you in v7 but will remove the shim in v8.",
            DeprecationWarning,
            stacklevel=3,
        )
        # Build a fresh list: `self.schedules` may be the very object the caller handed in.
        self.schedules = [*(self.schedules or []), self.schedule]
        self.schedule = None

    def create(self) -> TWorkflow:  # type: ignore
        """Creates the CronWorkflow on the Argo cluster.

        The name on the returned object is stored on `self.name` so that a later `get`/`update` can use it.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"

        wf = self.workflows_service.create_cron_workflow(
            CreateCronWorkflowRequest(cron_workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )
        # set the name on the object so that we can do a get/update later
        self.name = wf.metadata.name
        return wf

    def get(self) -> TWorkflow:
        """Attempts to get a cron workflow based on the parameters of this object e.g. name + namespace."""
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        assert self.name, "workflow name not defined"
        return self.workflows_service.get_cron_workflow(name=self.name, namespace=self.namespace)

    def update(self) -> TWorkflow:
        """Attempts to update the cron workflow based on the parameters of this object.

        Note that this creates the cron workflow if it does not exist. In addition, this performs
        a get prior to updating to get the resource version to update in the first place. If you know the
        cron workflow does not exist ahead of time, it is more efficient to use `create()` directly to avoid
        one round trip.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        assert self.name, "workflow name not defined"
        # we always need to do a get prior to updating to get the resource version to update in the first place
        # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

        template = self.build()
        try:
            # only the lookup is expected to raise NotFound, so keep the guarded region to that call
            curr = self.get()
        except NotFound:
            return self.create()
        template.metadata.resource_version = curr.metadata.resource_version
        return self.workflows_service.update_cron_workflow(
            self.name,
            UpdateCronWorkflowRequest(cron_workflow=template),  # type: ignore
            namespace=self.namespace,
        )

    def lint(self) -> TWorkflow:
        """Lints the CronWorkflow using the Argo cluster."""
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.lint_cron_workflow(
            LintCronWorkflowRequest(cron_workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )

    async def async_create(self) -> TWorkflow:  # type: ignore
        """Creates the CronWorkflow on the Argo cluster using the async service.

        The name on the returned object is stored on `self.name` so that a later get/update can use it.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"

        wf = await self.workflows_service.create_cron_workflow(
            CreateCronWorkflowRequest(cron_workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )
        # set the name on the object so that we can do a get/update later
        self.name = wf.metadata.name
        return wf

    async def async_get(self) -> TWorkflow:
        """Attempts to get a cron workflow based on the parameters of this object e.g. name + namespace."""
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        assert self.name, "workflow name not defined"
        return await self.workflows_service.get_cron_workflow(name=self.name, namespace=self.namespace)

    async def async_update(self) -> TWorkflow:
        """Attempts to update the cron workflow based on the parameters of this object.

        Note that this creates the cron workflow if it does not exist. In addition, this performs
        a get prior to updating to get the resource version to update in the first place. If you know the
        cron workflow does not exist ahead of time, it is more efficient to use `async_create()` directly to
        avoid one round trip.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        assert self.name, "workflow name not defined"
        # we always need to do a get prior to updating to get the resource version to update in the first place
        # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052

        template = self.build()
        try:
            # only the lookup is expected to raise NotFound, so keep the guarded region to that call
            curr = await self.async_get()
        except NotFound:
            return await self.async_create()
        template.metadata.resource_version = curr.metadata.resource_version
        return await self.workflows_service.update_cron_workflow(
            self.name,
            UpdateCronWorkflowRequest(cron_workflow=template),  # type: ignore
            namespace=self.namespace,
        )

    async def async_lint(self) -> TWorkflow:
        """Lints the CronWorkflow using the Argo cluster via the async service."""
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return await self.workflows_service.lint_cron_workflow(
            LintCronWorkflowRequest(cron_workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )

    def build(self) -> TWorkflow:
        """Builds the CronWorkflow and its components into an Argo schema CronWorkflow object.

        The parent Workflow is built first; its metadata is reused and its spec becomes the
        `workflow_spec` of the cron workflow. The cron-specific fields are then copied onto the
        model through `_CronWorkflowModelMapper.build_model`.
        """
        self = self._dispatch_hooks()

        model_workflow = cast(_ModelWorkflow, super().build())
        model_cron_workflow = _ModelCronWorkflow(
            metadata=model_workflow.metadata,
            spec=CronWorkflowSpec(
                # The v4 CronWorkflowSpec requires `schedules` (previously `schedule` was an alternative).
                # Seed with a copy of `self.schedules` so the ModelMapper-driven assignment that follows
                # does not mutate the caller's list, and so a missing list does not blow up validation.
                schedules=list(self.schedules or []),
                workflow_spec=model_workflow.spec,
            ),
        )

        return _CronWorkflowModelMapper.build_model(CronWorkflow, self, model_cron_workflow)

    @classmethod
    def _from_model(cls, model: APIBaseModel) -> ModelMapperMixin:
        """Parse from given model to cls's type.

        Every annotated attribute carrying exactly one ModelMapper with a non-empty path is read from
        `model` and, when the value is not None, set on a freshly constructed instance.

        Raises:
            ValueError: if an attribute is annotated with more than one ModelMapper.
        """
        assert isinstance(model, _ModelCronWorkflow)
        hera_cron_workflow = cls()

        for attr, annotation in cls._get_all_annotations().items():
            if mappers := get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper):
                if len(mappers) != 1:
                    raise ValueError("Expected only one ModelMapper")

                mapper = mappers[0]
                if mapper.model_path:
                    value = None

                    # Cron-level attributes, and Workflow attributes living under "metadata", sit at the
                    # same path on the CronWorkflow model. Parentheses make the `and`-before-`or`
                    # precedence explicit.
                    if isinstance(mapper, _CronWorkflowModelMapper) or (
                        isinstance(mapper, _WorkflowModelMapper) and mapper.model_path[0] == "metadata"
                    ):
                        value = _get_model_attr(model, mapper.model_path)

                    elif isinstance(mapper, _WorkflowModelMapper) and mapper.model_path[0] == "spec":
                        # We map "spec.workflow_spec" from the model CronWorkflow to "spec" for Hera's Workflow (used
                        # as the parent class of Hera's CronWorkflow)
                        value = _get_model_attr(model.spec.workflow_spec, mapper.model_path[1:])

                    if value is not None:
                        setattr(hera_cron_workflow, attr, value)

        return hera_cron_workflow

    @classmethod
    def from_dict(cls, model_dict: Dict) -> ModelMapperMixin:
        """Create a CronWorkflow from a CronWorkflow contained in a dict.

        Examples:
            >>> my_cron_workflow = CronWorkflow(name="my-cron-wf")
            >>> my_cron_workflow == CronWorkflow.from_dict(my_cron_workflow.to_dict())
            True
        """
        return cls._from_dict(model_dict, _ModelCronWorkflow)

    @classmethod
    def from_yaml(cls, yaml_str: str) -> ModelMapperMixin:
        """Create a CronWorkflow from a CronWorkflow contained in a YAML string.

        Examples:
            >>> my_cron_workflow = CronWorkflow.from_yaml(yaml_str)
        """
        return cls._from_yaml(yaml_str, _ModelCronWorkflow)

    @classmethod
    def from_file(cls, yaml_file: Union[Path, str]) -> ModelMapperMixin:
        """Create a CronWorkflow from a CronWorkflow contained in a YAML file.

        Examples:
            >>> yaml_file = Path(...)
            >>> my_cron_workflow = CronWorkflow.from_file(yaml_file)
        """
        return cls._from_file(yaml_file, _ModelCronWorkflow)

    def get_workflow_link(self) -> str:
        """Returns the cron workflow link for this cron workflow, as reported by the workflows service."""
        assert self.workflows_service is not None, "Cannot fetch a cron workflow link without a service"
        assert self.name is not None, "Cannot fetch a cron workflow link without a cron workflow name"
        return self.workflows_service.get_cron_workflow_link(self.name)

active_deadline_seconds

active_deadline_seconds: Optional[int] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

api_version

api_version: Optional[str] = None

archive_logs

archive_logs: Optional[bool] = None

arguments

arguments: ArgumentsT = None

artifact_gc

artifact_gc: Optional[
    ArtifactGC | WorkflowLevelArtifactGC
] = None

artifact_repository_ref

artifact_repository_ref: Optional[ArtifactRepositoryRef] = (
    None
)

automount_service_account_token

automount_service_account_token: Optional[bool] = None

concurrency_policy

concurrency_policy: Optional[str] = None

creation_timestamp

creation_timestamp: Optional[Time] = None

cron_status

cron_status: Optional[CronWorkflowStatus] = None

cron_suspend

cron_suspend: Optional[bool] = None

deletion_grace_period_seconds

deletion_grace_period_seconds: Optional[int] = None

deletion_timestamp

deletion_timestamp: Optional[Time] = None

dns_config

dns_config: Optional[PodDNSConfig] = None

dns_policy

dns_policy: Optional[str] = None

entrypoint

entrypoint: Optional[str] = None

executor

executor: Optional[ExecutorConfig] = None

failed_jobs_history_limit

failed_jobs_history_limit: Optional[int] = None

finalizers

finalizers: Optional[List[str]] = None

generate_name

generate_name: Optional[str] = None

generation

generation: Optional[int] = None

hooks

hooks: Optional[Dict[str, LifecycleHook]] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

host_network

host_network: Optional[bool] = None

image_pull_secrets

image_pull_secrets: ImagePullSecretsT = None

kind

kind: Optional[str] = None

labels

labels: Optional[Dict[str, str]] = None

managed_fields

managed_fields: Optional[List[ManagedFieldsEntry]] = None

metrics

metrics: MetricsT = None

name

name: Optional[str] = None

namespace

namespace: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

on_exit

on_exit: Optional[Union[str, Templatable]] = None

owner_references

owner_references: Optional[List[OwnerReference]] = None

parallelism

parallelism: Optional[int] = None

pod_disruption_budget

pod_disruption_budget: Optional[PodDisruptionBudgetSpec] = (
    None
)

pod_gc

pod_gc: Optional[PodGC] = None

pod_metadata

pod_metadata: Optional[Metadata] = None

pod_priority_class_name

pod_priority_class_name: Optional[str] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority

priority: Optional[int] = None

resource_version

resource_version: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

schedule

schedule: Optional[str] = None

Deprecated: spec.schedule was removed in Argo Workflows v4. Use schedules instead.

For backwards compatibility, Hera v7 still accepts schedule: setting it emits a DeprecationWarning and appends the value to schedules (initialising the list if needed). The shim will be removed in Hera v8.

scheduler_name

scheduler_name: Optional[str] = None

schedules

schedules: Optional[List[str]] = None

security_context

security_context: Optional[PodSecurityContext] = None

self_link

self_link: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

shutdown

shutdown: Optional[str] = None

starting_deadline_seconds

starting_deadline_seconds: Optional[int] = None

status

status: Optional[WorkflowStatus] = None

stop_strategy

stop_strategy: Optional[StopStrategy] = None

successful_jobs_history_limit

successful_jobs_history_limit: Optional[int] = None

suspend

suspend: Optional[bool] = None

synchronization

synchronization: Optional[Synchronization] = None

template_defaults

template_defaults: Optional[Template] = None

templates

templates: List[Union[Template, Templatable]] = field(
    default_factory=list
)

timezone

timezone: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

ttl_strategy

ttl_strategy: Optional[TTLStrategy] = None

uid

uid: Optional[str] = None

volume_claim_gc

volume_claim_gc: Optional[VolumeClaimGC] = None

volume_claim_templates

volume_claim_templates: Optional[
    List[PersistentVolumeClaim]
] = None

volumes

volumes: VolumesT = None

when

when: Optional[str] = None

workflow_metadata

workflow_metadata: Optional[WorkflowMetadata] = None

workflow_template_ref

workflow_template_ref: Optional[WorkflowTemplateRef] = None

workflows_service

workflows_service: Optional[
    Union[WorkflowsService, AsyncWorkflowsService]
] = None

ModelMapper

Source code in src/hera/workflows/_meta_mixins.py
class ModelMapper:
    """Links a Hera attribute to a dotted path on an API model.

    On construction the dotted path is split into segments and each segment is checked against the
    fields of the model class returned by `_get_model_class` (which subclasses must provide).
    """

    def __init__(self, model_path: str, hera_builder: Optional[Callable] = None):
        """Store the builder and the split path, validating each path segment.

        Raises:
            ValueError: if a path segment is not a field of the class reached so far.
        """
        self.builder = hera_builder
        # An empty path allows overriding parent attribute annotations to remove the mapping
        self.model_path = model_path.split(".") if model_path else []
        if not model_path:
            return

        model_class: Type[APIBaseModel] = self._get_model_class()
        for segment in self.model_path:
            available = get_fields(model_class)
            if segment not in available:
                raise ValueError(f"Model key '{segment}' does not exist in class {model_class}")
            # descend into the annotation of the field just matched
            model_class = available[segment].annotation  # type: ignore

    @classmethod
    def _get_model_class(cls) -> Type[APIBaseModel]:
        """Return the root model class for path validation; must be overridden."""
        raise NotImplementedError

    @classmethod
    def build_model(
        cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
    ) -> TWorkflow:
        """Copy every mapped, non-None attribute of `hera_obj` onto `model` and return `model`.

        Raises:
            ValueError: if an attribute is annotated with more than one ModelMapper.
        """
        for attr_name, annotation in hera_class._get_all_annotations().items():
            found = get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper)
            if not found:
                continue
            if len(found) != 1:
                raise ValueError("Expected only one ModelMapper")

            mapper = found[0]
            # Value comes from builder function if it exists on hera_obj, otherwise directly from the attr
            if mapper.builder is not None:
                value = getattr(hera_obj, mapper.builder.__name__)()
            else:
                value = getattr(hera_obj, attr_name)

            if value is not None:
                _set_model_attr(model, mapper.model_path, value)

        return model

builder

builder = hera_builder

model_path

model_path = split('.')

build_model

build_model(
    hera_class: Type[ModelMapperMixin],
    hera_obj: ModelMapperMixin,
    model: TWorkflow,
) -> TWorkflow
Source code in src/hera/workflows/_meta_mixins.py
@classmethod
def build_model(
    cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
) -> TWorkflow:
    """Copy every mapped, non-None attribute of `hera_obj` onto `model` and return `model`.

    Raises:
        ValueError: if an attribute is annotated with more than one ModelMapper.
    """
    for attr_name, annotation in hera_class._get_all_annotations().items():
        found = get_annotated_metadata(annotation, ModelMapperMixin.ModelMapper)
        if not found:
            continue
        if len(found) != 1:
            raise ValueError("Expected only one ModelMapper")

        mapper = found[0]
        # Value comes from builder function if it exists on hera_obj, otherwise directly from the attr
        if mapper.builder is not None:
            value = getattr(hera_obj, mapper.builder.__name__)()
        else:
            value = getattr(hera_obj, attr_name)

        if value is not None:
            _set_model_attr(model, mapper.model_path, value)

    return model

async_create

async_create() -> TWorkflow

Creates the CronWorkflow on the Argo cluster.

Source code in src/hera/workflows/cron_workflow.py
async def async_create(self) -> TWorkflow:  # type: ignore
    """Create this CronWorkflow on the Argo cluster through the async service.

    The name found on the returned object is written back to `self.name` before returning it.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    request = CreateCronWorkflowRequest(cron_workflow=self.build())  # type: ignore
    created = await self.workflows_service.create_cron_workflow(request, namespace=self.namespace)
    # keep the name around so that a later get/update can address the resource
    self.name = created.metadata.name
    return created

async_get

async_get() -> TWorkflow

Attempts to get a cron workflow based on the parameters of this template e.g. name + namespace.

Source code in src/hera/workflows/cron_workflow.py
async def async_get(self) -> TWorkflow:
    """Fetch the cron workflow identified by this object's `name` and `namespace` via the async service."""
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    assert self.name, "workflow name not defined"
    lookup = {"name": self.name, "namespace": self.namespace}
    return await self.workflows_service.get_cron_workflow(**lookup)

async_lint

async_lint() -> TWorkflow

Lints the CronWorkflow using the Argo cluster.

Source code in src/hera/workflows/cron_workflow.py
async def async_lint(self) -> TWorkflow:
    """Send the built CronWorkflow to the async service's lint endpoint and return its response."""
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    request = LintCronWorkflowRequest(cron_workflow=self.build())  # type: ignore
    return await self.workflows_service.lint_cron_workflow(request, namespace=self.namespace)

async_update

async_update() -> TWorkflow

Attempts to update the cron workflow on the cluster based on the parameters of this object.

Note that this creates the cron workflow if it does not exist. In addition, this performs a get prior to updating to get the resource version to update in the first place. If you know the cron workflow does not exist ahead of time, it is more efficient to use async_create() directly to avoid one round trip.

Source code in src/hera/workflows/cron_workflow.py
async def async_update(self) -> TWorkflow:
    """Update the cron workflow on the cluster, creating it when it does not exist yet.

    The current object is fetched first so its resource version can be copied onto the payload;
    if that lookup raises `NotFound`, the cron workflow is created via `async_create()` instead.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    assert self.name, "workflow name not defined"

    payload = self.build()
    try:
        # a get must precede the update: the server needs the current resource version
        # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052
        existing = await self.async_get()
        payload.metadata.resource_version = existing.metadata.resource_version
    except NotFound:
        created = await self.async_create()
        return created

    request = UpdateCronWorkflowRequest(cron_workflow=payload)  # type: ignore
    return await self.workflows_service.update_cron_workflow(self.name, request, namespace=self.namespace)

async_wait

async_wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters

poll_interval: int = 5 The interval in seconds to poll the workflow status.

Source code in src/hera/workflows/workflow.py
async def async_wait(self, poll_interval: int = 5) -> TWorkflow:
    """Poll the workflow until its status is no longer running, then return it.

    Parameters
    ----------
    poll_interval: int = 5
        Seconds to sleep before the first status check and between subsequent checks.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    # Sleep once before the first lookup to sidestep a creation race:
    # 1. Argo server says "workflow was accepted" but the workflow is not yet created
    # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
    # 3. Argo finally creates the workflow
    # 4. Hera throws an `AssertionError` because the phase assertion fails
    await asyncio.sleep(poll_interval)
    polled = await self.workflows_service.get_workflow(self.name, namespace=self.namespace)
    assert polled.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

    assert polled.status is not None, f"workflow status not defined for workflow {polled.metadata.name}"
    assert polled.status.phase is not None, f"workflow phase not defined for workflow status {polled.status}"

    # re-fetch at the user's interval for as long as the reported phase maps to "running"
    while WorkflowStatus.from_argo_status(polled.status.phase) == WorkflowStatus.running:
        await asyncio.sleep(poll_interval)
        polled = await self.workflows_service.get_workflow(polled.metadata.name, namespace=self.namespace)
        assert polled.metadata.name is not None
        assert polled.status is not None, f"workflow status not defined for workflow {polled.metadata.name}"
        assert polled.status.phase is not None, f"workflow phase not defined for workflow status {polled.status}"
    return polled

build

build() -> TWorkflow

Builds the CronWorkflow and its components into an Argo schema CronWorkflow object.

Source code in src/hera/workflows/cron_workflow.py
def build(self) -> TWorkflow:
    """Assemble the Argo schema CronWorkflow object for this CronWorkflow.

    The parent Workflow is built first; its metadata is reused and its spec becomes the
    `workflow_spec` of the cron spec. Remaining cron fields are applied by
    `_CronWorkflowModelMapper.build_model`.
    """
    self = self._dispatch_hooks()

    base_model = cast(_ModelWorkflow, super().build())

    # The v4 CronWorkflowSpec requires `schedules` (previously `schedule` was an alternative).
    # A copy of `self.schedules` is used so the ModelMapper-driven assignment that follows
    # does not mutate the caller's list, and so a missing list does not blow up validation.
    cron_spec = CronWorkflowSpec(
        schedules=list(self.schedules or []),
        workflow_spec=base_model.spec,
    )
    cron_model = _ModelCronWorkflow(metadata=base_model.metadata, spec=cron_spec)

    return _CronWorkflowModelMapper.build_model(CronWorkflow, self, cron_model)

create

create() -> TWorkflow

Creates the CronWorkflow on the Argo cluster.

Source code in src/hera/workflows/cron_workflow.py
def create(self) -> TWorkflow:  # type: ignore
    """Create this CronWorkflow on the Argo cluster.

    The name found on the returned object is written back to `self.name` before returning it.
    """
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    request = CreateCronWorkflowRequest(cron_workflow=self.build())  # type: ignore
    created = self.workflows_service.create_cron_workflow(request, namespace=self.namespace)
    # keep the name around so that a later get/update can address the resource
    self.name = created.metadata.name
    return created

from_dict

from_dict(model_dict: Dict) -> ModelMapperMixin

Create a CronWorkflow from a CronWorkflow contained in a dict.

Examples:

>>> my_cron_workflow = CronWorkflow(name="my-cron-wf")
>>> my_cron_workflow == CronWorkflow.from_dict(my_cron_workflow.to_dict())
True
Source code in src/hera/workflows/cron_workflow.py
@classmethod
def from_dict(cls, model_dict: Dict) -> ModelMapperMixin:
    """Build a CronWorkflow from a dict that holds a CronWorkflow.

    Examples:
        >>> my_cron_workflow = CronWorkflow(name="my-cron-wf")
        >>> my_cron_workflow == CronWorkflow.from_dict(my_cron_workflow.to_dict())
        True
    """
    parsed = cls._from_dict(model_dict, _ModelCronWorkflow)
    return parsed

from_file

from_file(yaml_file: Union[Path, str]) -> ModelMapperMixin

Create a CronWorkflow from a CronWorkflow contained in a YAML file.

Examples:

>>> yaml_file = Path(...)
>>> my_workflow_template = CronWorkflow.from_file(yaml_file)
Source code in src/hera/workflows/cron_workflow.py
@classmethod
def from_file(cls, yaml_file: Union[Path, str]) -> ModelMapperMixin:
    """Build a CronWorkflow from a YAML file that holds a CronWorkflow.

    Examples:
        >>> yaml_file = Path(...)
        >>> my_cron_workflow = CronWorkflow.from_file(yaml_file)
    """
    parsed = cls._from_file(yaml_file, _ModelCronWorkflow)
    return parsed

from_yaml

from_yaml(yaml_str: str) -> ModelMapperMixin

Create a CronWorkflow from a CronWorkflow contained in a YAML string.

Examples:

>>> my_cron_workflow = CronWorkflow.from_yaml(yaml_str)
Source code in src/hera/workflows/cron_workflow.py
@classmethod
def from_yaml(cls, yaml_str: str) -> ModelMapperMixin:
    """Build a CronWorkflow from a YAML string that holds a CronWorkflow.

    Examples:
        >>> my_cron_workflow = CronWorkflow.from_yaml(yaml_str)
    """
    parsed = cls._from_yaml(yaml_str, _ModelCronWorkflow)
    return parsed

get

get() -> TWorkflow

Attempts to get a cron workflow based on the parameters of this template e.g. name + namespace.

Source code in src/hera/workflows/cron_workflow.py
def get(self) -> TWorkflow:
    """Fetch the cron workflow identified by this object's `name` and `namespace`."""
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    assert self.name, "workflow name not defined"
    lookup = {"name": self.name, "namespace": self.namespace}
    return self.workflows_service.get_cron_workflow(**lookup)

get_parameter

get_parameter(name: str) -> Parameter

Attempts to find and return a Parameter of the specified name.

Source code in src/hera/workflows/workflow.py
def get_parameter(self, name: str) -> Parameter:
    """Return a `Parameter` referencing the workflow argument parameter called `name`.

    Raises:
        KeyError: if the workflow has no arguments, no argument parameters, or none named `name`.
    """
    built = self._build_arguments()
    if built is None:
        raise KeyError("Workflow has no arguments set")

    declared = built.parameters
    if declared is None:
        raise KeyError("Workflow has no argument parameters set")

    if not any(param.name == name for param in declared):
        raise KeyError(f"`{name}` is not a valid workflow parameter")

    # the value is the Argo template expression that resolves to the workflow-level parameter
    reference = f"{{{{workflow.parameters.{name}}}}}"
    return Parameter(name=name, value=reference)

get_workflow_link

get_workflow_link() -> str

Returns the workflow link for the workflow.

Source code in src/hera/workflows/cron_workflow.py
def get_workflow_link(self) -> str:
    """Return the link that the workflows service reports for this cron workflow's name."""
    service = self.workflows_service
    assert service is not None, "Cannot fetch a cron workflow link without a service"
    cron_name = self.name
    assert cron_name is not None, "Cannot fetch a cron workflow link without a cron workflow name"
    return service.get_cron_workflow_link(cron_name)

lint

lint() -> TWorkflow

Lints the CronWorkflow using the Argo cluster.

Source code in src/hera/workflows/cron_workflow.py
def lint(self) -> TWorkflow:
    """Lints the CronWorkflow using the Argo cluster.

    The cron workflow is built and wrapped in a `LintCronWorkflowRequest`, which is then submitted to
    the workflows service for this object's namespace.

    Returns:
        Whatever `WorkflowsService.lint_cron_workflow` returns.

    Raises:
        AssertionError: If the workflows service is not a `WorkflowsService` or the namespace is
            unset/empty.
    """
    service = self.workflows_service
    assert isinstance(service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    lint_request = LintCronWorkflowRequest(cron_workflow=self.build())  # type: ignore
    return service.lint_cron_workflow(lint_request, namespace=self.namespace)

to_dict

to_dict() -> Any

Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

Source code in src/hera/workflows/workflow.py
def to_dict(self) -> Any:
    """Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

    Returns:
        The built model dumped with `exclude_none=True` and `by_alias=True`.
    """
    built_model = self.build()
    return built_model.model_dump(exclude_none=True, by_alias=True)

to_file

to_file(
    output_directory: Union[Path, str] = ".",
    name: str = "",
    *args,
    **kwargs,
) -> Path

Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

Parameters:

Name Type Description Default
output_directory Union[Path, str]

The directory to write the file to. Defaults to the current working directory.

'.'
name str

The name of the file to write without the file extension. Defaults to the Workflow’s name or a generated name.

''
*args

Additional arguments to pass to yaml.dump.

()
**kwargs

Additional keyword arguments to pass to yaml.dump.

{}
Source code in src/hera/workflows/workflow.py
def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
    """Dumps the Workflow as YAML into `<output_directory>/<name>.yaml` and returns the file's absolute path.

    Args:
        output_directory: The directory to write the file to. It is created (including parents) when
            missing. Defaults to the current working directory.
        name: The file name without the extension. When empty, the Workflow's name is used, then its
            `generate_name` with trailing dashes stripped, and finally the literal "workflow".
        *args: Additional arguments to pass to `to_yaml` (and from there to `yaml.dump`).
        **kwargs: Additional keyword arguments to pass to `to_yaml` (and from there to `yaml.dump`).

    Returns:
        The absolute path of the written YAML file.
    """
    fallback_name = self.name or (self.generate_name or "workflow").rstrip("-")
    file_stem = name if name else fallback_name

    target_directory = Path(output_directory)
    target_directory.mkdir(parents=True, exist_ok=True)

    destination = target_directory / f"{file_stem}.yaml"
    destination.write_text(self.to_yaml(*args, **kwargs))
    return destination.absolute()

to_yaml

to_yaml(*args, **kwargs) -> str

Builds the Workflow as an Argo schema Workflow object and returns it as a YAML string.

Source code in src/hera/workflows/workflow.py
def to_yaml(self, *args, **kwargs) -> str:
    """Builds the Workflow as an Argo schema Workflow object and returns it as a YAML string.

    The dictionary produced by `to_dict` is re-ordered for readability before it is dumped; see
    `human_readable_ordering` below for the ordering rules.

    Args:
        *args: Additional arguments to pass to `_yaml.dump`.
        **kwargs: Additional keyword arguments to pass to `_yaml.dump`.

    Returns:
        The dumped YAML document.
    """

    def human_readable_ordering(kv: tuple) -> int:
        """Key ordering function for ordering in a more human-readable fashion.

        Ordering is:
        1. "name" keys always first (if present)
        2. Primitives (not dicts/lists)
        3. lists
        4. dict
        """
        k, v = kv
        if k == "name" and isinstance(v, str):
            return 0
        if not isinstance(v, (dict, list)):
            return 1
        if isinstance(v, list):
            return 2
        return 3

    def order_item(item: Any) -> Any:
        """Orders `item` when it is a dict; any other value is returned untouched."""
        return order_dict(item) if isinstance(item, dict) else item

    def order_dict(d: dict) -> dict[str, Any]:
        """Recursively orders `d` by `human_readable_ordering` by inserting its items into a new dict in order."""
        d_copy: dict[str, Any] = dict()
        # ties within the same ordering bucket are broken alphabetically by key
        for k, v in sorted(d.items(), key=lambda x: (human_readable_ordering(x), x[0])):
            if isinstance(v, dict):
                d_copy[k] = order_dict(v)
            elif isinstance(v, list):
                if v and isinstance(v[0], dict):
                    d_copy[k] = [order_item(i) for i in v]
                elif v and isinstance(v[0], list):
                    # nested lists may hold non-dict items (or the outer list may mix in non-list
                    # entries); those are passed through as-is instead of being treated as dicts
                    d_copy[k] = [
                        [order_item(i) for i in sublist] if isinstance(sublist, list) else order_item(sublist)
                        for sublist in v
                    ]
                else:
                    d_copy[k] = v
            else:
                d_copy[k] = v
        return d_copy

    human_ordered_dict = order_dict(self.to_dict())
    return _yaml.dump(human_ordered_dict, *args, **kwargs)

update

update() -> TWorkflow

Attempts to perform a cron workflow update based on the parameters of this cron workflow.

Note that this creates the cron workflow if it does not exist. In addition, this performs a get prior to updating in order to obtain the resource version required for the update. If you know the cron workflow does not exist ahead of time, it is more efficient to use create() directly to avoid one round trip.

Source code in src/hera/workflows/cron_workflow.py
def update(self) -> TWorkflow:
    """Attempts to update the cron workflow based on the parameters of this object.

    The cron workflow is created instead when the preceding lookup reports that it does not exist.
    Because the lookup always happens first (to obtain the resource version for the update), calling
    `create()` directly saves one round trip when the cron workflow is known not to exist yet.

    Returns:
        The result of `create()` when the lookup raises `NotFound`, otherwise the result of
        `WorkflowsService.update_cron_workflow`.

    Raises:
        AssertionError: If the workflows service is not a `WorkflowsService`, or if the namespace or
            name is unset/empty.
    """
    service = self.workflows_service
    assert isinstance(service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"
    assert self.name, "workflow name not defined"

    cron_workflow = self.build()

    # we always need to do a get prior to updating to get the resource version to update in the first place
    # https://github.com/argoproj/argo-workflows/pull/5465#discussion_r597797052
    try:
        existing = self.get()
    except NotFound:
        return self.create()

    cron_workflow.metadata.resource_version = existing.metadata.resource_version
    update_request = UpdateCronWorkflowRequest(cron_workflow=cron_workflow)  # type: ignore
    return service.update_cron_workflow(self.name, update_request, namespace=self.namespace)

wait

wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters:

poll_interval (int, default 5): The interval in seconds at which to poll the workflow status.

Source code in src/hera/workflows/workflow.py
def wait(self, poll_interval: int = 5) -> TWorkflow:
    """Waits for the Workflow to complete execution.

    Args:
        poll_interval: The interval in seconds to poll the workflow status. The same interval is also
            slept once before the first status check.

    Returns:
        The most recently fetched workflow, i.e. the first one whose status is no longer running.

    Raises:
        AssertionError: If the workflows service is not a `WorkflowsService`, if the namespace or name
            is `None`, or if a fetched workflow lacks a name, status or phase.
    """
    service = self.workflows_service
    assert isinstance(service, WorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    def current_status(workflow):
        """Validates that `workflow` reports a phase and converts that phase into a `WorkflowStatus`."""
        assert workflow.status is not None, f"workflow status not defined for workflow {workflow.metadata.name}"
        assert workflow.status.phase is not None, f"workflow phase not defined for workflow status {workflow.status}"
        return WorkflowStatus.from_argo_status(workflow.status.phase)

    # here we use the sleep interval to wait for the workflow post creation. This is to address a potential
    # race conditions such as:
    # 1. Argo server says "workflow was accepted" but the workflow is not yet created
    # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
    # 3. Argo finally creates the workflow
    # 4. Hera throws an `AssertionError` because the phase assertion fails
    time.sleep(poll_interval)
    wf = service.get_workflow(self.name, namespace=self.namespace)
    assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"
    state = current_status(wf)

    # keep polling for workflow status until completed, at the interval dictated by the user
    while state == WorkflowStatus.running:
        time.sleep(poll_interval)
        wf = service.get_workflow(wf.metadata.name, namespace=self.namespace)
        assert wf.metadata.name is not None
        state = current_status(wf)
    return wf

Comments