Skip to content

Dag

The hera.workflows.dag module provides the DAG class.

See https://argoproj.github.io/argo-workflows/walk-through/dag for more on DAGs (Directed Acyclic Graphs) in Argo Workflows.

DAG

A DAG template invocator is used to define Task dependencies as an acyclic graph.

DAG implements the context manager interface, so it can be used in a `with` block; any Task objects instantiated inside that block are added to the DAG’s list of Tasks.

See the DAG examples for usage.

Source code in src/hera/workflows/dag.py
@dataclass(kw_only=True)
class DAG(
    IOMixin,
    TemplateMixin,
    CallableTemplateMixin,
    ContextMixin,
):
    """Template invocator that declares `Task` dependencies as an acyclic graph.

    `DAG` is a context manager: any `Task` object instantiated inside a `with` block on a
    `DAG` is added to that DAG's list of tasks.

    See the [DAG examples](../../../examples/workflows/dags/dag_diamond_with_script.md) for usage.
    """

    fail_fast: Optional[bool] = None
    target: Optional[str] = None
    tasks: List[Union[Task, DAGTask]] = field(default_factory=list)

    # Names of the Task nodes already added, used to reject duplicates in `_add_sub`.
    _node_names: Set[str] = field(default_factory=set)
    _current_task_depends: Set[str] = field(default_factory=set)

    def _add_sub(self, node: Any):
        """Registers `node` on this DAG, or forwards it to the workflow in the context.

        A `Templatable` node is handed to the `_add_sub` of the first piece of the active
        context, which is asserted to be a `Workflow`. Any other node must be a `Task`
        whose name has not been added to this DAG before.

        Raises:
            InvalidType: if `node` is neither a `Templatable` nor a `Task`.
            NodeNameConflict: if a `Task` with the same name was already added.
        """
        if isinstance(node, Templatable):
            # Imported here rather than at module level.
            from hera.workflows.workflow import Workflow

            # The checks in _HeraContext.add_sub_node mean a workflow context must be active here.
            assert _context.pieces and isinstance(_context.pieces[0], Workflow)
            root_piece = _context.pieces[0]
            root_piece._add_sub(node)
            return

        if not isinstance(node, Task):
            raise InvalidType(type(node))

        task_name = node.name
        if task_name in self._node_names:
            raise NodeNameConflict(f"Found multiple Task nodes with name: {task_name}")

        self._node_names.add(task_name)
        self.tasks.append(node)

    def _build_template(self) -> _ModelTemplate:
        """Assembles the `Template` model that represents this `DAG`."""
        # `Task` objects are converted through `_build_dag_task`; anything else is passed through as-is.
        dag_tasks = [
            entry._build_dag_task() if isinstance(entry, Task) else entry
            for entry in self.tasks
        ]

        # A falsy deadline (e.g. None) is emitted as None instead of being wrapped.
        if self.active_deadline_seconds:
            deadline = IntOrString(root=self.active_deadline_seconds)
        else:
            deadline = None

        dag_model = _ModelDAGTemplate(
            fail_fast=self.fail_fast,
            target=self.target,
            tasks=dag_tasks,
        )

        return _ModelTemplate(
            active_deadline_seconds=deadline,
            affinity=self.affinity,
            archive_location=self.archive_location,
            automount_service_account_token=self.automount_service_account_token,
            daemon=self.daemon,
            dag=dag_model,
            executor=self.executor,
            fail_fast=self.fail_fast,
            host_aliases=self.host_aliases,
            init_containers=self._build_init_containers(),
            inputs=self._build_inputs(),
            memoize=self.memoize,
            metadata=self._build_metadata(),
            metrics=self._build_metrics(),
            name=self.name,
            node_selector=self.node_selector,
            outputs=self._build_outputs(),
            parallelism=self.parallelism,
            plugin=self.plugin,
            pod_spec_patch=self.pod_spec_patch,
            priority_class_name=self.priority_class_name,
            retry_strategy=self._build_retry_strategy(),
            scheduler_name=self.scheduler_name,
            # The template field is `security_context`; the DAG attribute is `pod_security_context`.
            security_context=self.pod_security_context,
            service_account_name=self.service_account_name,
            sidecars=self._build_sidecars(),
            synchronization=self.synchronization,
            timeout=self.timeout,
            tolerations=self.tolerations,
        )

active_deadline_seconds

active_deadline_seconds: Optional[int | str] = None

affinity

affinity: Optional[Affinity] = None

annotations

annotations: Optional[Dict[str, str]] = None

archive_location

archive_location: Optional[ArtifactLocation] = None

automount_service_account_token

automount_service_account_token: Optional[bool] = None

daemon

daemon: Optional[bool] = None

executor

executor: Optional[ExecutorConfig] = None

fail_fast

fail_fast: Optional[bool] = None

host_aliases

host_aliases: Optional[List[HostAlias]] = None

init_containers

init_containers: Optional[
    List[Union[UserContainer, UserContainer]]
] = None

inputs

inputs: InputsT = None

labels

labels: Optional[Dict[str, str]] = None

memoize

memoize: Optional[Memoize] = None

metrics

metrics: Optional[MetricsT] = None

name

name: Optional[str] = None

node_selector

node_selector: Optional[Dict[str, str]] = None

outputs

outputs: OutputsT = None

parallelism

parallelism: Optional[int] = None

plugin

plugin: Optional[Plugin] = None

pod_security_context

pod_security_context: Optional[PodSecurityContext] = None

pod_spec_patch

pod_spec_patch: Optional[str] = None

priority_class_name

priority_class_name: Optional[str] = None

retry_strategy

retry_strategy: Optional[
    Union[RetryStrategy, RetryStrategy]
] = None

scheduler_name

scheduler_name: Optional[str] = None

service_account_name

service_account_name: Optional[str] = None

sidecars

sidecars: Optional[
    OneOrMany[UserContainer | UserContainer]
] = None

synchronization

synchronization: Optional[Synchronization] = None

target

target: Optional[str] = None

tasks

tasks: List[Union[Task, DAGTask]] = field(
    default_factory=list
)

timeout

timeout: Optional[str] = None

tolerations

tolerations: Optional[List[Toleration]] = None

get_artifact

get_artifact(name: str) -> Artifact

Finds and returns the artifact with the supplied name.

Note that this method will raise an error if the artifact is not found.

Parameters:

Name Type Description Default
name str

name of the input artifact to find and return.

required

Returns:

Name Type Description
Artifact Artifact

the artifact with the supplied name.

Raises:

Type Description
KeyError

if the artifact is not found.

Source code in src/hera/workflows/_mixins.py
def get_artifact(self, name: str) -> Artifact:
    """Looks up an input artifact by name and returns a reference to it.

    The returned `Artifact` carries the requested name and a `from_` expression that
    points at the matching input artifact. A missing artifact is reported by raising,
    never by returning `None`.

    Args:
        name: name of the input artifact to find and return.

    Returns:
        Artifact: the artifact with the supplied name.

    Raises:
        KeyError: if no inputs are set, no artifacts are set, or no artifact has that name.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Artifact {name} not found.")

    candidates = built_inputs.artifacts
    if candidates is None:
        raise KeyError(f"No artifacts set. Artifact {name} not found.")

    # First artifact whose name matches, or None when there is no match.
    match = next((entry for entry in candidates if entry.name == name), None)
    if match is None:
        raise KeyError(f"Artifact {name} not found.")
    return Artifact(name=name, from_=f"{{{{inputs.artifacts.{match.name}}}}}")

get_parameter

get_parameter(name: str) -> Parameter

Finds and returns the parameter with the supplied name.

Note that this method will raise an error if the parameter is not found.

Parameters:

Name Type Description Default
name str

name of the input parameter to find and return.

required

Returns:

Name Type Description
Parameter Parameter

the parameter with the supplied name.

Raises:

Type Description
KeyError

if the parameter is not found.

Source code in src/hera/workflows/_mixins.py
def get_parameter(self, name: str) -> Parameter:
    """Looks up an input parameter by name and returns a reference to it.

    The matching input is converted with `Parameter.from_model`, and its `value` is
    replaced by an expression that points at that input parameter. A missing parameter
    is reported by raising, never by returning `None`.

    Args:
        name: name of the input parameter to find and return.

    Returns:
        Parameter: the parameter with the supplied name.

    Raises:
        KeyError: if no inputs are set, no parameters are set, or no parameter has that name.
    """
    built_inputs = self._build_inputs()
    if built_inputs is None:
        raise KeyError(f"No inputs set. Parameter {name} not found.")

    model_params = built_inputs.parameters
    if model_params is None:
        raise KeyError(f"No parameters set. Parameter {name} not found.")

    for model in model_params:
        if model.name != name:
            continue
        found = Parameter.from_model(model)
        # Overwrite the value with a reference to the input of the same name.
        found.value = f"{{{{inputs.parameters.{found.name}}}}}"
        return found

    raise KeyError(f"Parameter {name} not found.")

Comments