DAG Functionality#

DAG Process#

class canproc.DAGProcess(*, name: str, function: Callable | str, args: list | None = None, kwargs: dict | None = None)#

A single processing node in a DAG

name#

Name of the node. Other nodes refer to this name to connect to it within a DAG.

Type:

str

function#

The function that the node runs. If a string is given instead of a callable, it is converted to a function.

Type:

Callable | str

args#

List of input arguments to the function.

Type:

list

kwargs#

Dictionary of keyword arguments to the function.

Type:

dict

Examples

>>> import numpy as np
>>> from canproc import DAGProcess
>>> proc = DAGProcess(name='make_array', function=np.arange, args=[8])
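The description of `function` says a string is converted to a function but does not say how. One plausible approach is to treat the string as a dotted import path. The sketch below illustrates only that idea: `resolve_function` is a hypothetical helper, not part of canproc, and the library may use a different lookup scheme.

```python
from __future__ import annotations

import importlib
from collections.abc import Callable


def resolve_function(function: Callable | str) -> Callable:
    """Return a callable, importing it from a dotted path if given a string.

    Hypothetical helper for illustration; not canproc's actual conversion.
    """
    if callable(function):
        return function
    # Split "package.module.attr" into the module path and the attribute name.
    module_name, _, attr = function.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path like 'math.sqrt', got {function!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


sqrt = resolve_function("math.sqrt")
print(sqrt(9.0))  # 3.0
```

Accepting either form keeps node definitions serialisable (a string can live in a config file) while still allowing callables to be passed directly from Python.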

DAG#

class canproc.DAG(*, dag: list[DAGProcess], output: str | list[str])#

A directed acyclic graph (DAG) that defines the program flow. Each node in the graph is a process to be run, and each edge (a connection between nodes) is an input or output of a process. A DAG need not be a simple linear pipeline: it may branch and run branches in parallel, so long as it contains no cycle, since a cycle would cause infinite recursion.

dag#

List of DAGProcess nodes that make up the directed acyclic graph.

Type:

list[DAGProcess]

output#

Name(s) of the graph edges to return

Type:

str | list[str]
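To make the node, edge, and output ideas concrete, here is a minimal, self-contained sketch of evaluating a branching graph. It does not use canproc: `Node` and `run` are hypothetical stand-ins, and the rule that a string argument matching a node name refers to that node's result is an assumption made for this example.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Node:
    """Hypothetical stand-in for a DAG node (not canproc.DAGProcess)."""
    name: str
    function: Callable
    args: list = field(default_factory=list)


def run(nodes: list[Node], output: str | list[str]) -> Any:
    """Evaluate the requested output node(s), computing each node once.

    Assumption: a string argument equal to a node name refers to that
    node's result; any other argument is passed through unchanged.
    """
    by_name = {node.name: node for node in nodes}
    results: dict[str, Any] = {}

    def evaluate(name: str) -> Any:
        if name not in results:
            node = by_name[name]
            # Resolve upstream nodes first, then call this node's function.
            inputs = [
                evaluate(a) if isinstance(a, str) and a in by_name else a
                for a in node.args
            ]
            results[name] = node.function(*inputs)
        return results[name]

    if isinstance(output, str):
        return evaluate(output)
    return [evaluate(name) for name in output]


# Two branches ("doubled" and "squared") share one input and rejoin at "total".
nodes = [
    Node("numbers", lambda: [1, 2, 3]),
    Node("doubled", lambda xs: [2 * x for x in xs], ["numbers"]),
    Node("squared", lambda xs: [x * x for x in xs], ["numbers"]),
    Node("total", lambda a, b: sum(a) + sum(b), ["doubled", "squared"]),
]
print(run(nodes, "total"))  # 26
```

Because `evaluate` recurses into a node's inputs before calling its function, a graph containing a cycle would never terminate normally in this sketch, which is the failure mode the acyclic requirement guards against.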

TODO: Add a validator to check for cyclic graphs.
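The cycle check is not specified further here. As one possible approach, the standard library's `graphlib.TopologicalSorter` raises `CycleError` when a graph contains a cycle. The sketch below assumes the graph has already been reduced to a mapping from each node name to the names it depends on; `find_cycle` is a hypothetical helper and not part of canproc.

```python
from __future__ import annotations

from graphlib import CycleError, TopologicalSorter


def find_cycle(dependencies: dict[str, list[str]]) -> list[str] | None:
    """Return the nodes of one cycle, or None if the graph is acyclic.

    `dependencies` maps each node name to the names it takes input from.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        sorter.prepare()  # raises CycleError if any cycle exists
    except CycleError as err:
        return list(err.args[1])  # second argument holds the cycle's nodes
    return None


acyclic = {"a": [], "b": ["a"], "c": ["a", "b"]}
cyclic = {"a": ["c"], "b": ["a"], "c": ["b"]}

print(find_cycle(acyclic))  # None
print(find_cycle(cyclic) is not None)  # True
```

Returning the offending nodes rather than a bare boolean lets a validator produce an error message that names the nodes involved.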

Merge#

canproc.merge(dags: list[DAG], keep_intermediate: bool = False) → DAG#

Merge multiple DAGs into a single graph

Parameters:
  • dags (list[DAG]) – the DAGs to be merged

  • keep_intermediate (bool, optional) – if True, the outputs of all DAGs are returned. If False, only the output of the final DAG is kept. Defaults to False.

Returns:

DAG – Merged DAG
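The effect of `keep_intermediate` on the merged output can be sketched without canproc. In the example below, `SimpleDAG` and `merge_sketch` are hypothetical stand-ins that represent nodes as plain name strings. How the real `merge` handles duplicate node names or connects one DAG's outputs to the next is not covered here and is not assumed.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SimpleDAG:
    """Hypothetical stand-in for canproc.DAG; nodes are plain name strings."""
    dag: list[str]
    output: str | list[str]


def _as_list(output: str | list[str]) -> list[str]:
    """Normalise a single output name or a list of names to a list."""
    return [output] if isinstance(output, str) else list(output)


def merge_sketch(dags: list[SimpleDAG], keep_intermediate: bool = False) -> SimpleDAG:
    """Combine the nodes of several DAGs and choose which outputs to keep."""
    nodes = [node for d in dags for node in d.dag]
    if keep_intermediate:
        # Keep the outputs of every DAG, in order.
        output = [name for d in dags for name in _as_list(d.output)]
    else:
        # Keep only the output of the final DAG.
        output = _as_list(dags[-1].output)
    return SimpleDAG(dag=nodes, output=output)


load = SimpleDAG(dag=["read"], output="read")
stats = SimpleDAG(dag=["mean"], output="mean")

print(merge_sketch([load, stats]).output)  # ['mean']
print(merge_sketch([load, stats], keep_intermediate=True).output)  # ['read', 'mean']
```

This sketch always returns the output as a list for simplicity; the library's own return shape may differ.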