Skip to content

Workflow

Workflow

The base Workflow class for Hera.

Workflow implements the context manager interface, which allows usage of `with`: any `hera.workflows.protocol.Templatable` object instantiated under the context will be added to the Workflow’s list of templates.

Workflows can be created directly on your Argo cluster via `create`. They can also be dumped to YAML via `to_yaml` or built according to the Argo schema via `build` to get an OpenAPI model object.

Source code in `src/hera/workflows/workflow.py` (lines 82–588)
@dataclass(kw_only=True)
class Workflow(
    ArgumentsMixin,
    ContextMixin,
    HookMixin,
    VolumeMixin,
    MetricsMixin,
    ModelMapperMixin,
):
    """The base Workflow class for Hera.

    Workflow implements the context manager interface, which allows usage of `with`: any
    `hera.workflows.protocol.Templatable` object instantiated under the context will be
    added to the Workflow's list of templates.

    Workflows can be created directly on your Argo cluster via `create`. They can also be dumped
    to YAML via `to_yaml` or built according to the Argo schema via `build` to get an OpenAPI model
    object.
    """

    def _build_volume_claim_templates(self) -> Optional[List]:
        """Builder for `spec.volume_claim_templates`.

        Returns the explicitly set `volume_claim_templates` followed by the claims returned by
        `self._build_persistent_volume_claims()`, or `None` when both are empty.
        """
        return ((self.volume_claim_templates or []) + (self._build_persistent_volume_claims() or [])) or None

    def _build_on_exit(self) -> Optional[str]:
        """Builder for `spec.on_exit`.

        A `Templatable` exit handler is built so its template name can be referenced; a plain string
        (or `None`) is returned unchanged.
        """
        if isinstance(self.on_exit, Templatable):
            return self.on_exit._build_template().name  # type: ignore
        return self.on_exit

    def _build_retry_strategy(self) -> Optional[ModelRetryStrategy]:
        """Builder for `spec.retry_strategy`.

        A Hera `RetryStrategy` is converted via its `build()` method; a model retry strategy or `None`
        is returned unchanged.
        """
        if self.retry_strategy is None:
            return None

        if isinstance(self.retry_strategy, RetryStrategy):
            return self.retry_strategy.build()

        return self.retry_strategy

    def _build_templates(self) -> Optional[List[_ModelTemplate]]:
        """Builds the templates into an Argo schema.

        Each entry of `self.templates` is first passed through `_dispatch_hooks()` when it is a
        `HookMixin`, then built with `_build_template()` when it is `Templatable`, or used as-is when it
        is already a model template.

        Side effect: persistent volume claims of `VolumeClaimable` templates are merged into
        `self.volume_claim_templates` (see `_merge_volume_claim_templates`).

        Returns:
            The built templates, or `None` if there are none.

        Raises:
            InvalidType: If an entry is neither `Templatable` nor a model template.
        """
        templates: List[_ModelTemplate] = []
        for template in self.templates:
            if isinstance(template, HookMixin):
                template = template._dispatch_hooks()

            if isinstance(template, Templatable):
                templates.append(template._build_template())
            elif isinstance(template, _ModelTemplate):
                templates.append(template)
            else:
                raise InvalidType(f"{type(template)} is not a valid template type")

            if isinstance(template, VolumeClaimable):
                self._merge_volume_claim_templates(template._build_persistent_volume_claims())
        return templates or None

    def _merge_volume_claim_templates(self, claims: Optional[List[PersistentVolumeClaim]]) -> None:
        """Merges `claims` into `self.volume_claim_templates`, keeping existing claims on name clashes."""
        # If there are no claims, there is nothing to add
        if not claims:
            return
        # If there are no volume claim templates, set them to the constructed claims
        if self.volume_claim_templates is None:
            self.volume_claim_templates = claims
            return

        # Otherwise, merge the two lists of volume claim templates. This prioritizes the already existing
        # volume claim templates under the assumption that the user has already set a claim template on the
        # workflow intentionally, or the user is sharing the same volumes across different templates. It also
        # keeps repeated builds from appending the same claims again.
        existing_claim_names = set()
        for claim in self.volume_claim_templates:
            assert claim.metadata is not None, "expected a workflow volume claim with metadata"
            assert claim.metadata.name is not None, "expected a named workflow volume claim"
            existing_claim_names.add(claim.metadata.name)

        # De-duplicate the incoming claims by name (the last claim with a given name wins)
        new_volume_claims_map = {}
        for claim in claims:
            assert claim.metadata is not None, "expected a volume claim with metadata"
            assert claim.metadata.name is not None, "expected a named volume claim"
            new_volume_claims_map[claim.metadata.name] = claim

        for claim_name, claim in new_volume_claims_map.items():
            if claim_name not in existing_claim_names:
                self.volume_claim_templates.append(claim)

    # Workflow fields - https://argoproj.github.io/argo-workflows/fields/#workflow
    api_version: Annotated[Optional[str], _WorkflowModelMapper("api_version")] = None
    kind: Annotated[Optional[str], _WorkflowModelMapper("kind")] = None
    status: Annotated[Optional[_ModelWorkflowStatus], _WorkflowModelMapper("status")] = None

    # ObjectMeta fields - https://argoproj.github.io/argo-workflows/fields/#objectmeta
    annotations: Annotated[Optional[Dict[str, str]], _WorkflowModelMapper("metadata.annotations")] = None
    creation_timestamp: Annotated[Optional[Time], _WorkflowModelMapper("metadata.creation_timestamp")] = None
    deletion_grace_period_seconds: Annotated[
        Optional[int], _WorkflowModelMapper("metadata.deletion_grace_period_seconds")
    ] = None
    deletion_timestamp: Annotated[Optional[Time], _WorkflowModelMapper("metadata.deletion_timestamp")] = None
    finalizers: Annotated[Optional[List[str]], _WorkflowModelMapper("metadata.finalizers")] = None
    generate_name: Annotated[Optional[str], _WorkflowModelMapper("metadata.generate_name")] = None
    generation: Annotated[Optional[int], _WorkflowModelMapper("metadata.generation")] = None
    labels: Annotated[Optional[Dict[str, str]], _WorkflowModelMapper("metadata.labels")] = None
    managed_fields: Annotated[Optional[List[ManagedFieldsEntry]], _WorkflowModelMapper("metadata.managed_fields")] = (
        None
    )
    name: Annotated[Optional[str], _WorkflowModelMapper("metadata.name")] = None
    namespace: Annotated[Optional[str], _WorkflowModelMapper("metadata.namespace")] = None
    owner_references: Annotated[Optional[List[OwnerReference]], _WorkflowModelMapper("metadata.owner_references")] = (
        None
    )
    resource_version: Annotated[Optional[str], _WorkflowModelMapper("metadata.resource_version")] = None
    self_link: Annotated[Optional[str], _WorkflowModelMapper("metadata.self_link")] = None
    uid: Annotated[Optional[str], _WorkflowModelMapper("metadata.uid")] = None

    # WorkflowSpec fields - https://argoproj.github.io/argo-workflows/fields/#workflowspec
    active_deadline_seconds: Annotated[Optional[int], _WorkflowModelMapper("spec.active_deadline_seconds")] = None
    affinity: Annotated[Optional[Affinity], _WorkflowModelMapper("spec.affinity")] = None
    archive_logs: Annotated[Optional[bool], _WorkflowModelMapper("spec.archive_logs")] = None
    artifact_gc: Annotated[
        Optional[ArtifactGC | WorkflowLevelArtifactGC], _WorkflowModelMapper("spec.artifact_gc")
    ] = None
    artifact_repository_ref: Annotated[
        Optional[ArtifactRepositoryRef], _WorkflowModelMapper("spec.artifact_repository_ref")
    ] = None
    automount_service_account_token: Annotated[
        Optional[bool], _WorkflowModelMapper("spec.automount_service_account_token")
    ] = None
    dns_config: Annotated[Optional[PodDNSConfig], _WorkflowModelMapper("spec.dns_config")] = None
    dns_policy: Annotated[Optional[str], _WorkflowModelMapper("spec.dns_policy")] = None
    entrypoint: Annotated[Optional[str], _WorkflowModelMapper("spec.entrypoint")] = None
    executor: Annotated[Optional[ExecutorConfig], _WorkflowModelMapper("spec.executor")] = None
    hooks: Annotated[Optional[Dict[str, LifecycleHook]], _WorkflowModelMapper("spec.hooks")] = None
    host_aliases: Annotated[Optional[List[HostAlias]], _WorkflowModelMapper("spec.host_aliases")] = None
    host_network: Annotated[Optional[bool], _WorkflowModelMapper("spec.host_network")] = None
    image_pull_secrets: Annotated[ImagePullSecretsT, _WorkflowModelMapper("spec.image_pull_secrets")] = None
    node_selector: Annotated[Optional[Dict[str, str]], _WorkflowModelMapper("spec.node_selector")] = None
    on_exit: Annotated[Optional[Union[str, Templatable]], _WorkflowModelMapper("spec.on_exit", _build_on_exit)] = None
    parallelism: Annotated[Optional[int], _WorkflowModelMapper("spec.parallelism")] = None
    pod_disruption_budget: Annotated[
        Optional[PodDisruptionBudgetSpec], _WorkflowModelMapper("spec.pod_disruption_budget")
    ] = None
    pod_gc: Annotated[Optional[PodGC], _WorkflowModelMapper("spec.pod_gc")] = None
    pod_metadata: Annotated[Optional[Metadata], _WorkflowModelMapper("spec.pod_metadata")] = None
    pod_priority_class_name: Annotated[Optional[str], _WorkflowModelMapper("spec.pod_priority_class_name")] = None
    pod_spec_patch: Annotated[Optional[str], _WorkflowModelMapper("spec.pod_spec_patch")] = None
    priority: Annotated[Optional[int], _WorkflowModelMapper("spec.priority")] = None
    retry_strategy: Annotated[
        Optional[Union[RetryStrategy, ModelRetryStrategy]],
        _WorkflowModelMapper("spec.retry_strategy", _build_retry_strategy),
    ] = None
    scheduler_name: Annotated[Optional[str], _WorkflowModelMapper("spec.scheduler_name")] = None
    security_context: Annotated[Optional[PodSecurityContext], _WorkflowModelMapper("spec.security_context")] = None
    service_account_name: Annotated[Optional[str], _WorkflowModelMapper("spec.service_account_name")] = None
    shutdown: Annotated[Optional[str], _WorkflowModelMapper("spec.shutdown")] = None
    suspend: Annotated[Optional[bool], _WorkflowModelMapper("spec.suspend")] = None
    synchronization: Annotated[Optional[Synchronization], _WorkflowModelMapper("spec.synchronization")] = None
    template_defaults: Annotated[Optional[_ModelTemplate], _WorkflowModelMapper("spec.template_defaults")] = None
    templates: Annotated[
        List[Union[_ModelTemplate, Templatable]], _WorkflowModelMapper("spec.templates", _build_templates)
    ] = field(default_factory=list)
    tolerations: Annotated[Optional[List[Toleration]], _WorkflowModelMapper("spec.tolerations")] = None
    ttl_strategy: Annotated[Optional[TTLStrategy], _WorkflowModelMapper("spec.ttl_strategy")] = None
    volume_claim_gc: Annotated[Optional[VolumeClaimGC], _WorkflowModelMapper("spec.volume_claim_gc")] = None
    volume_claim_templates: Annotated[
        Optional[List[PersistentVolumeClaim]],
        _WorkflowModelMapper("spec.volume_claim_templates", _build_volume_claim_templates),
    ] = None
    workflow_metadata: Annotated[Optional[WorkflowMetadata], _WorkflowModelMapper("spec.workflow_metadata")] = None
    workflow_template_ref: Annotated[
        Optional[WorkflowTemplateRef], _WorkflowModelMapper("spec.workflow_template_ref")
    ] = None

    # Override types for mixin fields
    arguments: Annotated[
        ArgumentsT,
        _WorkflowModelMapper("spec.arguments", ArgumentsMixin._build_arguments),
    ] = None
    metrics: Annotated[
        MetricsT,
        _WorkflowModelMapper("spec.metrics", MetricsMixin._build_metrics),
    ] = None
    volumes: Annotated[VolumesT, _WorkflowModelMapper("spec.volumes", VolumeMixin._build_volumes)] = None

    # Hera-specific fields
    # Client used by create/wait/lint (sync) or async_create/async_wait/async_lint (async); not mapped to the model.
    workflows_service: Optional[Union[WorkflowsService, AsyncWorkflowsService]] = None

    def __post_init__(self):
        """Set hooks via __post_init__ and perform validation.

        After the parent `__post_init__`, name lengths are validated and unset fields are defaulted:
        `api_version`, `namespace` and `service_account_name` come from `global_config`, `kind` from the
        class name, and `workflows_service` from a new `WorkflowsService()`. `image_pull_secrets` is
        normalized into a list of `LocalObjectReference`.

        Raises:
            ValueError: If `name` or `generate_name` is longer than `NAME_LIMIT` characters.
        """
        super().__post_init__()

        if self.name is not None and len(self.name) > NAME_LIMIT:
            raise ValueError(f"name must be no more than {NAME_LIMIT} characters: {self.name}")

        if self.generate_name is not None and len(self.generate_name) > NAME_LIMIT:
            raise ValueError(f"generate_name must be no more than {NAME_LIMIT} characters: {self.generate_name}")

        if self.api_version is None:
            self.api_version = global_config.api_version

        if self.kind is None:
            # Subclasses get their own class name as kind
            self.kind = self.__class__.__name__

        if self.workflows_service is None:
            self.workflows_service = WorkflowsService()

        if self.namespace is None:
            self.namespace = global_config.namespace

        if self.service_account_name is None:
            self.service_account_name = global_config.service_account_name

        if self.image_pull_secrets is not None:
            self.image_pull_secrets = self._validate_image_pull_secrets(self.image_pull_secrets)

    @staticmethod
    def _validate_image_pull_secrets(image_pull_secrets: ImagePullSecretsT) -> ImagePullSecretsT:
        """Normalizes `image_pull_secrets` into a list of `LocalObjectReference`.

        A single `str` or `LocalObjectReference` is wrapped in a list; strings become references by name.
        List entries that are neither a `str` nor a `LocalObjectReference` are skipped.

        Raises:
            AssertionError: If the value is not a `str`, a `LocalObjectReference`, or a list.
        """
        if isinstance(image_pull_secrets, str):
            image_pull_secrets = [LocalObjectReference(name=image_pull_secrets)]
        elif isinstance(image_pull_secrets, LocalObjectReference):
            image_pull_secrets = [image_pull_secrets]

        assert isinstance(image_pull_secrets, list), (
            "`image_pull_secrets` expected to be either a `str`, a `LocalObjectReferences`, a list of `str`, "
            "or a list of `LocalObjectReferences`"
        )

        result = []
        for secret in image_pull_secrets:
            if isinstance(secret, str):
                result.append(LocalObjectReference(name=secret))
            elif isinstance(secret, LocalObjectReference):
                result.append(secret)

        return result

    def get_parameter(self, name: str) -> Parameter:
        """Attempts to find and return a `Parameter` of the specified name.

        The returned `Parameter` does not copy the argument's value; it references it through the
        `{{workflow.parameters.<name>}}` expression.

        Raises:
            KeyError: If the workflow has no arguments, no argument parameters, or no parameter named `name`.
        """
        arguments = self._build_arguments()
        if arguments is None:
            raise KeyError("Workflow has no arguments set")
        if arguments.parameters is None:
            raise KeyError("Workflow has no argument parameters set")

        if not any(p.name == name for p in arguments.parameters):
            raise KeyError(f"`{name}` is not a valid workflow parameter")
        return Parameter(name=name, value=f"{{{{workflow.parameters.{name}}}}}")

    def build(self) -> TWorkflow:
        """Builds the Workflow and its components into an Argo schema Workflow object.

        Note that building the templates may add volume claim templates to `self.volume_claim_templates`.
        """
        self = self._dispatch_hooks()

        model_workflow = _ModelWorkflow(
            metadata=ObjectMeta(),
            spec=_ModelWorkflowSpec(),
        )
        return _WorkflowModelMapper.build_model(Workflow, self, model_workflow)

    def to_dict(self) -> Any:
        """Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

        `None` values are excluded and keys use the model aliases.
        """
        return self.build().model_dump(exclude_none=True, by_alias=True)

    def __eq__(self, other) -> bool:
        """Verifies equality of `self` with the specified `other`.

        Two objects are equal only if they are of the exact same class and their `to_dict()` outputs match.
        """
        if other.__class__ is self.__class__:
            return self.to_dict() == other.to_dict()

        return False

    def to_yaml(self, *args, **kwargs) -> str:
        """Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

        Every dict in the output, including dicts nested inside lists at any depth, has its keys ordered
        for readability (see `human_readable_ordering`). Ties are broken alphabetically by key.
        `*args` and `**kwargs` are forwarded to the YAML dump call.
        """

        def human_readable_ordering(kv: tuple) -> int:
            """Key ordering function for ordering in a more human-readable fashion.

            Ordering is:
            1. "name" keys always first (if present)
            2. Primitives (not dicts/lists)
            3. lists
            4. dict
            """
            k, v = kv
            if k == "name" and isinstance(v, str):
                return 0
            if not isinstance(v, (dict, list)):
                return 1
            if isinstance(v, list):
                return 2
            return 3

        def order_value(value: Any) -> Any:
            """Recursively orders every dict inside `value`; other values are returned unchanged."""
            if isinstance(value, dict):
                return order_dict(value)
            if isinstance(value, list):
                # Recurse into every element (not just the first) so nested lists of scalars such as
                # `[["a", "b"]]` are kept as-is and mixed lists are handled element by element.
                return [order_value(item) for item in value]
            return value

        def order_dict(d: dict) -> dict[str, Any]:
            """Recursively orders `d` by the custom_ordering function by inserting them into a copy of the dict in order."""
            return {
                k: order_value(v) for k, v in sorted(d.items(), key=lambda x: (human_readable_ordering(x), x[0]))
            }

        human_ordered_dict = order_dict(self.to_dict())
        return _yaml.dump(human_ordered_dict, *args, **kwargs)

    def create(self, wait: bool = False, poll_interval: int = 5) -> TWorkflow:
        """Creates the Workflow on the Argo cluster.

        Parameters
        ----------
        wait: bool = False
            If false then the workflow is created and the function returns immediately after the server
            creates the Workflow.
            If true then the workflow is created and the function blocks until the workflow is done executing.
        poll_interval: int = 5
            The interval in seconds to poll the workflow status if wait is true. Ignored when wait is false.

        Returns
        -------
        The Workflow returned by the server on creation, or the last polled Workflow when `wait` is true.
        `self.name` is updated to the server-assigned name.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"

        wf = self.workflows_service.create_workflow(
            WorkflowCreateRequest(workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )
        # set the workflow name to the name returned by the API, which helps cover the case of users relying on
        # `generate_name=True`
        self.name = wf.metadata.name

        if wait:
            return self.wait(poll_interval=poll_interval)
        return wf

    def wait(self, poll_interval: int = 5) -> TWorkflow:
        """Waits for the Workflow to complete execution.

        Parameters
        ----------
        poll_interval: int = 5
            The interval in seconds to poll the workflow status.

        Returns
        -------
        The last fetched Workflow, once its status is no longer `WorkflowStatus.running`.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace is not None, "workflow namespace not defined"
        assert self.name is not None, "workflow name not defined"

        # here we use the sleep interval to wait for the workflow post creation. This is to address a potential
        # race conditions such as:
        # 1. Argo server says "workflow was accepted" but the workflow is not yet created
        # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
        # 3. Argo finally creates the workflow
        # 4. Hera throws an `AssertionError` because the phase assertion fails
        time.sleep(poll_interval)
        wf = self.workflows_service.get_workflow(self.name, namespace=self.namespace)
        assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

        assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
        assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
        status = WorkflowStatus.from_argo_status(wf.status.phase)

        # keep polling for workflow status until completed, at the interval dictated by the user
        while status == WorkflowStatus.running:
            time.sleep(poll_interval)
            wf = self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace)
            assert wf.metadata.name is not None
            assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
            assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
            status = WorkflowStatus.from_argo_status(wf.status.phase)
        return wf

    def lint(self) -> TWorkflow:
        """Lints the Workflow using the Argo cluster.

        Returns the result of the service's `lint_workflow` call for the built Workflow.
        """
        assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return self.workflows_service.lint_workflow(
            WorkflowLintRequest(workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )

    async def async_create(self, wait: bool = True, poll_interval: int = 5) -> TWorkflow:
        """Creates the Workflow on the Argo cluster. Note that `wait` is `True` by default as this is an async function.

        Parameters
        ----------
        wait: bool = True
            If false then the workflow is created and the function returns immediately.
            If true then the workflow is created and the function blocks until the workflow is done executing.
        poll_interval: int = 5
            The interval in seconds to poll the workflow status if wait is true. Ignored when wait is false.

        Returns
        -------
        The Workflow returned by the server on creation, or the last polled Workflow when `wait` is true.
        `self.name` is updated to the server-assigned name.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"

        wf = await self.workflows_service.create_workflow(
            WorkflowCreateRequest(workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )
        # set the workflow name to the name returned by the API, which helps cover the case of users relying on
        # `generate_name=True`
        self.name = wf.metadata.name

        if wait:
            return await self.async_wait(poll_interval=poll_interval)
        return wf

    async def async_wait(self, poll_interval: int = 5) -> TWorkflow:
        """Waits for the Workflow to complete execution.

        Parameters
        ----------
        poll_interval: int = 5
            The interval in seconds to poll the workflow status.

        Returns
        -------
        The last fetched Workflow, once its status is no longer `WorkflowStatus.running`.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace is not None, "workflow namespace not defined"
        assert self.name is not None, "workflow name not defined"

        # here we use the sleep interval to wait for the workflow post creation. This is to address a potential
        # race conditions such as:
        # 1. Argo server says "workflow was accepted" but the workflow is not yet created
        # 2. Hera wants to verify the status of the workflow, but it's not yet defined because it's not created
        # 3. Argo finally creates the workflow
        # 4. Hera throws an `AssertionError` because the phase assertion fails
        await asyncio.sleep(poll_interval)
        wf = await self.workflows_service.get_workflow(self.name, namespace=self.namespace)
        assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

        assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
        assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
        status = WorkflowStatus.from_argo_status(wf.status.phase)

        # keep polling for workflow status until completed, at the interval dictated by the user
        while status == WorkflowStatus.running:
            await asyncio.sleep(poll_interval)
            wf = await self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace)
            assert wf.metadata.name is not None
            assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
            assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
            status = WorkflowStatus.from_argo_status(wf.status.phase)
        return wf

    async def async_lint(self) -> TWorkflow:
        """Lints the Workflow using the Argo cluster.

        Returns the result of the async service's `lint_workflow` call for the built Workflow.
        """
        assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
        assert self.namespace, "workflow namespace not defined"
        return await self.workflows_service.lint_workflow(
            WorkflowLintRequest(workflow=self.build()),  # type: ignore
            namespace=self.namespace,
        )

    def _add_sub(self, node: Any):
        """Adds the given node (expected to satisfy the `Templatable` protocol) to the context.

        Raises:
            InvalidType: If `node` is neither `Templatable` nor a model template.
        """
        if not isinstance(node, (Templatable, _ModelTemplate)):
            raise InvalidType(type(node))
        self.templates.append(node)

    def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
        """Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

        Args:
            output_directory: The directory to write the file to. Defaults to the current working directory.
                It is created (including parents) if it does not exist.
            name: The name of the file to write without the file extension. Defaults to the Workflow's name,
                else its `generate_name` without trailing dashes, else "workflow".
            *args: Additional arguments to pass to `yaml.dump`.
            **kwargs: Additional keyword arguments to pass to `yaml.dump`.
        """
        workflow_name = self.name or (self.generate_name or "workflow").rstrip("-")
        name = name or workflow_name
        output_directory = Path(output_directory)
        output_path = output_directory / f"{name}.yaml"
        output_directory.mkdir(parents=True, exist_ok=True)
        output_path.write_text(self.to_yaml(*args, **kwargs))
        return output_path.absolute()

    @classmethod
    def from_dict(cls, model_dict: Dict) -> ModelMapperMixin:
        """Create a Workflow from a Workflow contained in a dict.

        Examples:
            >>> my_workflow = Workflow(name="my-workflow")
            >>> my_workflow == Workflow.from_dict(my_workflow.to_dict())
            True
        """
        return cls._from_dict(model_dict, _ModelWorkflow)

    @classmethod
    def from_yaml(cls, yaml_str: str) -> ModelMapperMixin:
        """Create a Workflow from a Workflow contained in a YAML string.

        Examples:
            >>> my_workflow = Workflow.from_yaml(yaml_str)
        """
        return cls._from_yaml(yaml_str, _ModelWorkflow)

    @classmethod
    def from_file(cls, yaml_file: Union[Path, str]) -> ModelMapperMixin:
        """Create a Workflow from a Workflow contained in a YAML file.

        Examples:
            >>> yaml_file = Path(...)
            >>> my_workflow = Workflow.from_file(yaml_file)
        """
        return cls._from_file(yaml_file, _ModelWorkflow)

    def get_workflow_link(self) -> str:
        """Returns the workflow link for the workflow, as provided by `workflows_service.get_workflow_link`."""
        assert self.workflows_service is not None, "Cannot fetch a workflow link without a service"
        assert self.name is not None, "Cannot fetch a workflow link without a workflow name"
        return self.workflows_service.get_workflow_link(self.name)

active_deadline_seconds

active_deadline_seconds: Optional[int] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

api_version

api_version: Optional[str] = None

archive_logs

archive_logs: Optional[bool] = None

arguments

arguments: ArgumentsT = None

artifact_gc

artifact_gc: Optional[
    ArtifactGC | WorkflowLevelArtifactGC
] = None

artifact_repository_ref

artifact_repository_ref: Optional[ArtifactRepositoryRef] = (
    None
)

automount_service_account_token

automount_service_account_token: Optional[bool] = None

creation_timestamp

creation_timestamp: Optional[Time] = None

deletion_grace_period_seconds

deletion_grace_period_seconds: Optional[int] = None

deletion_timestamp

deletion_timestamp: Optional[Time] = None

dns_config

dns_config: Optional[PodDNSConfig] = None

dns_policy

dns_policy: Optional[str] = None

entrypoint

entrypoint: Optional[str] = None

executor

executor: Optional[ExecutorConfig] = None

finalizers

finalizers: Optional[List[str]] = None

generate_name

generate_name: Optional[str] = None

generation

generation: Optional[int] = None

hooks

hooks: Optional[Dict[str, LifecycleHook]] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

host_network

host_network: Optional[bool] = None

image_pull_secrets

image_pull_secrets: ImagePullSecretsT = None

kind

kind: Optional[str] = None

labels

labels: Optional[Dict[str, str]] = None

managed_fields

managed_fields: Optional[List[ManagedFieldsEntry]] = None

metrics

metrics: MetricsT = None

name

name: Optional[str] = None

namespace

namespace: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

on_exit

on_exit: Optional[Union[str, Templatable]] = None

owner_references

owner_references: Optional[List[OwnerReference]] = None

parallelism

parallelism: Optional[int] = None

pod_disruption_budget

pod_disruption_budget: Optional[PodDisruptionBudgetSpec] = (
    None
)

pod_gc

pod_gc: Optional[PodGC] = None

pod_metadata

pod_metadata: Optional[Metadata] = None

pod_priority_class_name

pod_priority_class_name: Optional[str] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority

priority: Optional[int] = None

resource_version

resource_version: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, ModelRetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

security_context

security_context: Optional[PodSecurityContext] = None

self_link

self_link: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

shutdown

shutdown: Optional[str] = None

status

status: Optional[WorkflowStatus] = None

suspend

suspend: Optional[bool] = None

synchronization

synchronization: Optional[Synchronization] = None

template_defaults

template_defaults: Optional[Template] = None

templates

templates: List[Union[Template, Templatable]] = field(
    default_factory=list
)

tolerations

tolerations: Optional[List[Toleration]] = None

ttl_strategy

ttl_strategy: Optional[TTLStrategy] = None

uid

uid: Optional[str] = None

volume_claim_gc

volume_claim_gc: Optional[VolumeClaimGC] = None

volume_claim_templates

volume_claim_templates: Optional[
    List[PersistentVolumeClaim]
] = None

volumes

volumes: VolumesT = None

workflow_metadata

workflow_metadata: Optional[WorkflowMetadata] = None

workflow_template_ref

workflow_template_ref: Optional[WorkflowTemplateRef] = None

workflows_service

workflows_service: Optional[
    Union[WorkflowsService, AsyncWorkflowsService]
] = None

ModelMapper

Source code in src/hera/workflows/_meta_mixins.py
class ModelMapper:
    """Annotation marker that maps a Hera attribute onto a dotted field path of an Argo model.

    The dotted ``model_path`` is checked at construction time: every segment must be a field of the model
    class reached so far, starting from the class returned by ``_get_model_class``. An optional
    ``hera_builder`` names a method on the Hera object whose return value is used instead of the raw
    attribute value when the model is built.
    """

    def __init__(self, model_path: str, hera_builder: Optional[Callable] = None):
        self.builder = hera_builder
        self.model_path = model_path.split(".") if model_path else []

        if not self.model_path:
            # An empty path lets a subclass override a parent attribute annotation and drop the mapping
            return

        model_class: Type[APIBaseModel] = self._get_model_class()
        for segment in self.model_path:
            model_fields = get_fields(model_class)
            if segment not in model_fields:
                raise ValueError(f"Model key '{segment}' does not exist in class {model_class}")
            # Descend into the annotated type of this field for the next segment
            model_class = model_fields[segment].annotation  # type: ignore

    @classmethod
    def _get_model_class(cls) -> Type[APIBaseModel]:
        """Return the root Argo model class that ``model_path`` is resolved against (set by subclasses)."""
        raise NotImplementedError

    @classmethod
    def build_model(
        cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
    ) -> TWorkflow:
        """Copy every ModelMapper-annotated attribute of ``hera_obj`` into ``model`` and return ``model``.

        Raises:
            ValueError: if a single attribute carries more than one ModelMapper annotation.
        """
        for attr_name, attr_annotation in hera_class._get_all_annotations().items():
            found = get_annotated_metadata(attr_annotation, ModelMapperMixin.ModelMapper)
            if not found:
                continue
            if len(found) != 1:
                raise ValueError("Expected only one ModelMapper")

            mapper = found[0]
            if mapper.builder is None:
                value = getattr(hera_obj, attr_name)
            else:
                # Look the builder up by name on hera_obj so the bound method (including any override) is called
                value = getattr(hera_obj, mapper.builder.__name__)()
            # None values are skipped so the model keeps its own default for that field
            if value is not None:
                _set_model_attr(model, mapper.model_path, value)

        return model

builder

builder = hera_builder

model_path

model_path = model_path.split('.')

build_model

build_model(
    hera_class: Type[ModelMapperMixin],
    hera_obj: ModelMapperMixin,
    model: TWorkflow,
) -> TWorkflow
Source code in src/hera/workflows/_meta_mixins.py
@classmethod
def build_model(
    cls, hera_class: Type[ModelMapperMixin], hera_obj: ModelMapperMixin, model: TWorkflow
) -> TWorkflow:
    """Copy every ModelMapper-annotated attribute of ``hera_obj`` into ``model`` and return ``model``.

    Raises:
        ValueError: if a single attribute carries more than one ModelMapper annotation.
    """
    for attr_name, attr_annotation in hera_class._get_all_annotations().items():
        found = get_annotated_metadata(attr_annotation, ModelMapperMixin.ModelMapper)
        if not found:
            continue
        if len(found) != 1:
            raise ValueError("Expected only one ModelMapper")

        mapper = found[0]
        if mapper.builder is None:
            value = getattr(hera_obj, attr_name)
        else:
            # Look the builder up by name on hera_obj so the bound method (including any override) is called
            value = getattr(hera_obj, mapper.builder.__name__)()
        # None values are skipped so the model keeps its own default for that field
        if value is not None:
            _set_model_attr(model, mapper.model_path, value)

    return model

async_create

async_create(
    wait: bool = True, poll_interval: int = 5
) -> TWorkflow

Creates the Workflow on the Argo cluster. Note that wait is True by default as this is an async function.

Parameters

wait: bool = True — If false, the workflow is created and the function returns immediately. If true, the workflow is created and the function blocks until the workflow is done executing.

poll_interval: int = 5 — The interval in seconds at which to poll the workflow status if wait is true. Ignored when wait is false.

Source code in src/hera/workflows/workflow.py
async def async_create(self, wait: bool = True, poll_interval: int = 5) -> TWorkflow:
    """Submit this Workflow to the Argo cluster through the async workflows service.

    Unlike `create`, `wait` defaults to `True` here because this is an async function.

    Parameters
    ----------
    wait: bool = True
        When False, return the server's response as soon as the Workflow has been created.
        When True, block until the workflow is done executing (via `async_wait`) and return that result.
    poll_interval: int = 5
        Seconds between status polls while waiting. Ignored when wait is False.
    """
    service = self.workflows_service
    assert isinstance(service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    request = WorkflowCreateRequest(workflow=self.build())  # type: ignore
    created = await service.create_workflow(request, namespace=self.namespace)
    # Adopt the name the server assigned; this is what lets the wait below (which needs a name) work
    # when the Workflow was submitted with only `generate_name`.
    self.name = created.metadata.name

    if not wait:
        return created
    return await self.async_wait(poll_interval=poll_interval)

async_lint

async_lint() -> TWorkflow

Lints the Workflow using the Argo cluster.

Source code in src/hera/workflows/workflow.py
async def async_lint(self) -> TWorkflow:
    """Send this Workflow to the Argo server's lint endpoint via the async service and return the response."""
    service = self.workflows_service
    assert isinstance(service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    lint_request = WorkflowLintRequest(workflow=self.build())  # type: ignore
    return await service.lint_workflow(lint_request, namespace=self.namespace)

async_wait

async_wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters

poll_interval: int = 5 The interval in seconds to poll the workflow status.

Source code in src/hera/workflows/workflow.py
async def async_wait(self, poll_interval: int = 5) -> TWorkflow:
    """Poll the Argo server until this Workflow leaves the running state, then return its latest model.

    Parameters
    ----------
    poll_interval: int = 5
        Seconds to sleep before the first lookup and between every subsequent lookup.
    """
    assert isinstance(self.workflows_service, AsyncWorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    # Sleep once *before* the first lookup. Argo may report the workflow as accepted before it actually
    # exists with a status, and looking it up immediately would then trip the status/phase assertions below.
    await asyncio.sleep(poll_interval)
    current = await self.workflows_service.get_workflow(self.name, namespace=self.namespace)
    assert current.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

    while True:
        assert current.status is not None, f"workflow status not defined for workflow {current.metadata.name}"
        assert current.status.phase is not None, f"workflow phase not defined for workflow status {current.status}"
        if WorkflowStatus.from_argo_status(current.status.phase) != WorkflowStatus.running:
            return current

        # Still running: wait another interval and re-fetch using the server-side name
        await asyncio.sleep(poll_interval)
        current = await self.workflows_service.get_workflow(current.metadata.name, namespace=self.namespace)
        assert current.metadata.name is not None

build

build() -> TWorkflow

Builds the Workflow and its components into an Argo schema Workflow object.

Source code in src/hera/workflows/workflow.py
def build(self) -> TWorkflow:
    """Run `_dispatch_hooks` and map the resulting Workflow onto a fresh Argo schema Workflow model."""
    # `_dispatch_hooks` returns the object to build from, which is used instead of `self` from here on
    workflow = self._dispatch_hooks()

    empty_model = _ModelWorkflow(metadata=ObjectMeta(), spec=_ModelWorkflowSpec())
    return _WorkflowModelMapper.build_model(Workflow, workflow, empty_model)

create

create(
    wait: bool = False, poll_interval: int = 5
) -> TWorkflow

Creates the Workflow on the Argo cluster.

Parameters

wait: bool = False — If false, the workflow is created and the function returns immediately after the server creates the Workflow. If true, the workflow is created and the function blocks until the workflow is done executing.

poll_interval: int = 5 — The interval in seconds at which to poll the workflow status if wait is true. Ignored when wait is false.

Source code in src/hera/workflows/workflow.py
def create(self, wait: bool = False, poll_interval: int = 5) -> TWorkflow:
    """Submit this Workflow to the Argo cluster through the synchronous workflows service.

    Parameters
    ----------
    wait: bool = False
        When False (the default), return the server's response as soon as the Workflow has been created.
        When True, block until the workflow is done executing (via `wait`) and return that result instead.
    poll_interval: int = 5
        Seconds between status polls while waiting. Ignored when wait is False.
    """
    service = self.workflows_service
    assert isinstance(service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    request = WorkflowCreateRequest(workflow=self.build())  # type: ignore
    created = service.create_workflow(request, namespace=self.namespace)
    # Adopt the name the server assigned; this is what lets the wait below (which needs a name) work
    # when the Workflow was submitted with only `generate_name`.
    self.name = created.metadata.name

    if not wait:
        return created
    return self.wait(poll_interval=poll_interval)

from_dict

from_dict(model_dict: Dict) -> ModelMapperMixin

Create a Workflow from a Workflow contained in a dict.

Examples:

>>> my_workflow = Workflow(name="my-workflow")
>>> my_workflow == Workflow.from_dict(my_workflow.to_dict())
True
Source code in src/hera/workflows/workflow.py
@classmethod
def from_dict(cls, model_dict: Dict) -> ModelMapperMixin:
    """Construct a Workflow from a dict holding an Argo Workflow (for example, the output of `to_dict`).

    Examples:
        >>> my_workflow = Workflow(name="my-workflow")
        >>> my_workflow == Workflow.from_dict(my_workflow.to_dict())
        True
    """
    argo_model_class = _ModelWorkflow
    return cls._from_dict(model_dict, argo_model_class)

from_file

from_file(yaml_file: Union[Path, str]) -> ModelMapperMixin

Create a Workflow from a Workflow contained in a YAML file.

Examples:

>>> yaml_file = Path(...)
>>> my_workflow = Workflow.from_file(yaml_file)
Source code in src/hera/workflows/workflow.py
@classmethod
def from_file(cls, yaml_file: Union[Path, str]) -> ModelMapperMixin:
    """Construct a Workflow from a YAML file (given as a path or path string) holding an Argo Workflow.

    Examples:
        >>> yaml_file = Path(...)
        >>> my_workflow = Workflow.from_file(yaml_file)
    """
    argo_model_class = _ModelWorkflow
    return cls._from_file(yaml_file, argo_model_class)

from_yaml

from_yaml(yaml_str: str) -> ModelMapperMixin

Create a Workflow from a Workflow contained in a YAML string.

Examples:

>>> my_workflow = Workflow.from_yaml(yaml_str)
Source code in src/hera/workflows/workflow.py
@classmethod
def from_yaml(cls, yaml_str: str) -> ModelMapperMixin:
    """Construct a Workflow from a YAML string holding an Argo Workflow.

    Examples:
        >>> my_workflow = Workflow.from_yaml(yaml_str)
    """
    argo_model_class = _ModelWorkflow
    return cls._from_yaml(yaml_str, argo_model_class)

get_parameter

get_parameter(name: str) -> Parameter

Attempts to find and return a Parameter of the specified name.

Source code in src/hera/workflows/workflow.py
def get_parameter(self, name: str) -> Parameter:
    """Return a `Parameter` referencing the workflow-level argument parameter called `name`.

    The returned parameter's value is the Argo expression ``{{workflow.parameters.<name>}}``.

    Raises:
        KeyError: if the workflow has no arguments, no argument parameters, or no parameter named `name`.
    """
    arguments = self._build_arguments()
    if arguments is None:
        raise KeyError("Workflow has no arguments set")

    declared = arguments.parameters
    if declared is None:
        raise KeyError("Workflow has no argument parameters set")

    if not any(param.name == name for param in declared):
        raise KeyError(f"`{name}` is not a valid workflow parameter")
    return Parameter(name=name, value=f"{{{{workflow.parameters.{name}}}}}")

get_workflow_link

get_workflow_link() -> str

Returns the workflow link for the workflow.

Source code in src/hera/workflows/workflow.py
def get_workflow_link(self) -> str:
    """Return the link for this workflow, as produced by `workflows_service.get_workflow_link(name)`."""
    service = self.workflows_service
    workflow_name = self.name
    assert service is not None, "Cannot fetch a workflow link without a service"
    assert workflow_name is not None, "Cannot fetch a workflow link without a workflow name"
    return service.get_workflow_link(workflow_name)

lint

lint() -> TWorkflow

Lints the Workflow using the Argo cluster.

Source code in src/hera/workflows/workflow.py
def lint(self) -> TWorkflow:
    """Send this Workflow to the Argo server's lint endpoint via the sync service and return the response."""
    service = self.workflows_service
    assert isinstance(service, WorkflowsService), "workflows service not initialized"
    assert self.namespace, "workflow namespace not defined"

    lint_request = WorkflowLintRequest(workflow=self.build())  # type: ignore
    return service.lint_workflow(lint_request, namespace=self.namespace)

to_dict

to_dict() -> Any

Builds the Workflow as an Argo schema Workflow object and returns it as a dictionary.

Source code in src/hera/workflows/workflow.py
def to_dict(self) -> Any:
    """Build the Argo schema Workflow and dump it to a dict keyed by field aliases, omitting `None` fields."""
    argo_workflow = self.build()
    return argo_workflow.model_dump(by_alias=True, exclude_none=True)

to_file

to_file(
    output_directory: Union[Path, str] = ".",
    name: str = "",
    *args,
    **kwargs,
) -> Path

Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

Parameters:

Name Type Description Default
output_directory Union[Path, str]

The directory to write the file to. Defaults to the current working directory.

'.'
name str

The name of the file to write without the file extension. Defaults to the Workflow’s name or a generated name.

''
*args

Additional arguments to pass to yaml.dump.

()
**kwargs

Additional keyword arguments to pass to yaml.dump.

{}
Source code in src/hera/workflows/workflow.py
def to_file(self, output_directory: Union[Path, str] = ".", name: str = "", *args, **kwargs) -> Path:
    """Writes the Workflow as an Argo schema Workflow object to a YAML file and returns the path to the file.

    The output directory (and any missing parents) is created if needed. The file is always written as
    UTF-8, the encoding YAML readers expect by default, rather than the platform's locale encoding.

    Args:
        output_directory: The directory to write the file to. Defaults to the current working directory.
        name: The name of the file to write without the file extension. Defaults to the Workflow's name or,
              failing that, its `generate_name` with trailing dashes stripped ("workflow" if neither is set).
        *args: Additional arguments to pass to `yaml.dump`.
        **kwargs: Additional keyword arguments to pass to `yaml.dump`.

    Returns:
        The absolute path of the written file.
    """
    if not name:
        name = self.name or (self.generate_name or "workflow").rstrip("-")

    directory = Path(output_directory)
    directory.mkdir(parents=True, exist_ok=True)
    output_path = directory / f"{name}.yaml"
    # Explicit encoding: the default is locale-dependent and can mangle or reject non-ASCII content
    output_path.write_text(self.to_yaml(*args, **kwargs), encoding="utf-8")
    return output_path.absolute()

to_yaml

to_yaml(*args, **kwargs) -> str

Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

Source code in src/hera/workflows/workflow.py
def to_yaml(self, *args, **kwargs) -> str:
    """Builds the Workflow as an Argo schema Workflow object and returns it as yaml string.

    Before dumping, dict keys are reordered recursively, including dicts nested at any depth inside
    lists, to make the YAML easier to read. Extra `*args`/`**kwargs` are forwarded to `_yaml.dump`.
    """

    def human_readable_ordering(kv: tuple) -> int:
        """Key ordering function for ordering in a more human-readable fashion.

        Ordering is:
        1. a "name" key whose value is a string always comes first (if present)
        2. Primitives (not dicts/lists)
        3. lists
        4. dict
        """
        k, v = kv
        if k == "name" and isinstance(v, str):
            return 0
        if not isinstance(v, (dict, list)):
            return 1
        if isinstance(v, list):
            return 2
        return 3

    def order_value(value: Any) -> Any:
        """Apply `order_dict` to every dict inside `value`, descending through lists of any depth/mix."""
        # A single recursive walk handles every list shape: lists of scalars, lists of lists of scalars
        # (e.g. `withItems: [[1, 2], [3]]`), and mixed lists. Keying off only the first element, or
        # assuming nested lists hold dicts, breaks on those shapes.
        if isinstance(value, dict):
            return order_dict(value)
        if isinstance(value, list):
            return [order_value(item) for item in value]
        return value

    def order_dict(d: dict) -> dict[str, Any]:
        """Return a copy of `d` whose keys (recursively) follow `human_readable_ordering`, ties broken by key."""
        ordered_items = sorted(d.items(), key=lambda kv: (human_readable_ordering(kv), kv[0]))
        return {k: order_value(v) for k, v in ordered_items}

    human_ordered_dict = order_dict(self.to_dict())
    return _yaml.dump(human_ordered_dict, *args, **kwargs)

wait

wait(poll_interval: int = 5) -> TWorkflow

Waits for the Workflow to complete execution.

Parameters

poll_interval: int = 5 The interval in seconds to poll the workflow status.

Source code in src/hera/workflows/workflow.py
def wait(self, poll_interval: int = 5) -> TWorkflow:
    """Poll the Argo server until this Workflow leaves the running state, then return its latest model.

    Parameters
    ----------
    poll_interval: int = 5
        Seconds to sleep before the first lookup and between every subsequent lookup.
    """
    assert isinstance(self.workflows_service, WorkflowsService), "workflows service not initialized"
    assert self.namespace is not None, "workflow namespace not defined"
    assert self.name is not None, "workflow name not defined"

    # Sleep once *before* the first lookup. Argo may report the workflow as accepted before it actually
    # exists with a status, and looking it up immediately would then trip the status/phase assertions below.
    time.sleep(poll_interval)
    current = self.workflows_service.get_workflow(self.name, namespace=self.namespace)
    assert current.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

    while True:
        assert current.status is not None, f"workflow status not defined for workflow {current.metadata.name}"
        assert current.status.phase is not None, f"workflow phase not defined for workflow status {current.status}"
        if WorkflowStatus.from_argo_status(current.status.phase) != WorkflowStatus.running:
            return current

        # Still running: wait another interval and re-fetch using the server-side name
        time.sleep(poll_interval)
        current = self.workflows_service.get_workflow(current.metadata.name, namespace=self.namespace)
        assert current.metadata.name is not None

Comments