Skip to content

Workflow status

The hera.workflows.workflow_status module provides lightweight functionality for representing and interacting with workflow statuses.

See the Workflow Status example for usage.

WorkflowStatus

Placeholder for workflow statuses.

Source code in src/hera/workflows/workflow_status.py
class WorkflowStatus(str, Enum):
    """Placeholder for workflow statuses."""

    pending = "Pending"
    running = "Running"
    succeeded = "Succeeded"
    failed = "Failed"
    error = "Error"
    terminated = "Terminated"

    def __str__(self) -> str:
        """Returns the value representation of the workflow status enum."""
        return str(self.value)

    @classmethod
    def from_argo_status(cls, s: str) -> "WorkflowStatus":
        """Turns an Argo status into a Hera workflow status representation."""
        switch = {
            "Pending": WorkflowStatus.pending,
            "Running": WorkflowStatus.running,
            "Succeeded": WorkflowStatus.succeeded,
            "Failed": WorkflowStatus.failed,
            "Error": WorkflowStatus.error,
            "Terminated": WorkflowStatus.terminated,
        }

        ss = switch.get(s)
        if not ss:
            raise KeyError(f"Unrecognized status {s}. Available Argo statuses are: {list(switch.keys())}")
        return ss

error

error = 'Error'

failed

failed = 'Failed'

pending

pending = 'Pending'

running

running = 'Running'

succeeded

succeeded = 'Succeeded'

terminated

terminated = 'Terminated'

from_argo_status

from_argo_status(s: str) -> WorkflowStatus

Turns an Argo status into a Hera workflow status representation.

Source code in src/hera/workflows/workflow_status.py
@classmethod
def from_argo_status(cls, s: str) -> "WorkflowStatus":
    """Turns an Argo status into a Hera workflow status representation."""
    switch = {
        "Pending": WorkflowStatus.pending,
        "Running": WorkflowStatus.running,
        "Succeeded": WorkflowStatus.succeeded,
        "Failed": WorkflowStatus.failed,
        "Error": WorkflowStatus.error,
        "Terminated": WorkflowStatus.terminated,
    }

    ss = switch.get(s)
    if not ss:
        raise KeyError(f"Unrecognized status {s}. Available Argo statuses are: {list(switch.keys())}")
    return ss

Comments