Putting it together#

So far this has been a complicated (and slow) way to compute the mean of some small arrays, so let’s look at a real example where we compute ENSO from a model run. Since this is essentially a linear pipeline we could write it as a single Python function, but it demonstrates how to compose DAGs and use xarray functionality. First, we create some JSON files that define the different processes we want to apply:

load_region_data.json#
{
    "dag": [
        {
            "name": "data",
            "function": "xr.open_mfdataset",
            "args": ["input_file"],
            "kwargs": {"engine": "netcdf4", "parallel": true}
        },
        {
            "name": "region_data",
            "function": "select_region",
            "args": ["data"],
            "kwargs": {"region": {"lat": [-10, 10], "lon": [120, 300]}}
        }
    ],
    "output": "region_data"
}
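The `select_region` step above is a user-supplied function rather than something built into xarray. A minimal sketch of what it might do, assuming `data` is an xarray `Dataset` and `region` maps dimension names to `[min, max]` bounds (the implementation here is an illustration, not canproc's actual function):

```python
def select_region(data, region):
    """Subset a dataset to a geographic box.

    Hypothetical implementation of the `select_region` step used in
    load_region_data.json. `data` is assumed to be an xarray Dataset,
    and `region` maps dimension names to [min, max] bounds.
    """
    # Turn each [min, max] pair into a slice and do a label-based selection.
    bounds = {dim: slice(lo, hi) for dim, (lo, hi) in region.items()}
    return data.sel(**bounds)
```

With the kwargs from the JSON file, this reduces to `data.sel(lat=slice(-10, 10), lon=slice(120, 300))`.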

Now we have a set of procedures we can apply to some data in a variety of ways:

Python#

from canproc import DAG, merge
from canproc.runners import DaskRunner
import json

# Generate a DAG from the JSON files
dags = []
for file in ["load_region_data.json", "monthly_anomalies.json", "enso.json", "to_netcdf.json"]:
    with open(file) as f:
        dags.append(DAG(**json.load(f)))
dag = merge(dags)

# run the DAG using dask
runner = DaskRunner()
runner.run(dag)

Command Line#

This can be run from the command line using:

canproc-run "load_region_data.json" "monthly_anomalies.json" "enso.json" "to_netcdf.json"

Remote Procedure Calls#

Or, if we wanted we could spin up a small web server so we could compute DAGs remotely. For FastAPI, a simple endpoint might look like:

from fastapi import FastAPI
from canproc import DAG
from canproc.runners import DaskRunner

app = FastAPI()

@app.post("/dag")
async def run_dag(dag: DAG):
    runner = DaskRunner("threads")
    return runner.run(dag)
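On the client side, the per-stage JSON files could be merged and posted to that endpoint. The sketch below is an assumption: `merge_specs` mimics what `merge` does for a linear pipeline by concatenating the steps and keeping the last file's output, and the payload shape mirrors the `DAG(**json.load(...))` pattern used earlier.

```python
import json
import urllib.request


def merge_specs(paths):
    """Combine per-stage JSON files into a single DAG specification.

    A client-side stand-in for canproc's merge(), valid for a linear
    pipeline where each file's output feeds the next stage.
    """
    merged = {"dag": [], "output": None}
    for path in paths:
        with open(path) as f:
            spec = json.load(f)
        merged["dag"].extend(spec["dag"])
        merged["output"] = spec["output"]  # final stage defines the output
    return merged


def submit(url, paths):
    """POST the merged specification to the /dag endpoint."""
    payload = json.dumps(merge_specs(paths)).encode()
    req = urllib.request.Request(
        f"{url}/dag", data=payload, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

Whether merging should happen client-side or on the server is a design choice; doing it client-side keeps the endpoint to a single `DAG` payload.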

Note

This is more useful if you have something like to_geojson as the final stage of the DAG instead of to_netcdf. See Extending canesm-processor for more information on how to include your own functions.
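As an illustration, a hypothetical `to_geojson` stage (the name and signature are assumptions, not part of canproc) might package the result as a GeoJSON `FeatureCollection` that a web endpoint can return directly:

```python
def to_geojson(values, lats, lons):
    """Hypothetical final DAG stage: package point values as GeoJSON.

    Builds a FeatureCollection of Point features, which FastAPI can
    serialize straight to a JSON response.
    """
    features = [
        {
            "type": "Feature",
            # GeoJSON orders coordinates as [longitude, latitude].
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": {"value": float(val)},
        }
        for val, lat, lon in zip(values, lats, lons)
    ]
    return {"type": "FeatureCollection", "features": features}
```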