Developing Workflows

Introduction

This dev-guide introduces how to integrate your custom workflow into your Kaapana platform. By doing so, you can leverage Kaapana’s infrastructure and extend the platform’s capabilities to process data with your workflow. The platform’s data (provided by Kaapana’s storage stack) can be explored and curated into specific datasets in the “Datasets” view, or further inspected via the Meta-Dashboard. In order to integrate your custom workflow, we will use the Python API for Apache Airflow to create Directed Acyclic Graphs (DAGs).

Write your first own DAG

Aim: In this chapter we create a DAG that converts DICOMs to .nrrd-files.

Hint

DAGs are stored on the host machine inside the subdirectory workflows/dags of the FAST_DATA_DIR. By default FAST_DATA_DIR=/home/kaapana. Any file in this directory that defines a DAG is automatically identified, and the DAG becomes available on the platform.

In order to deploy a new DAG that converts DICOMs to .nrrd-files, create a file called dag_example_dcm2nrrd.py inside the dags-folder with the following content. You can add this file directly to the folder if you have access to the host machine, or use the internal code-server of Kaapana. You can install the code-server-chart in the extensions tab and open it with the blue icon right next to it.

from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.models import DAG
from kaapana.operators.DcmConverterOperator import DcmConverterOperator
from kaapana.operators.LocalGetInputDataOperator import LocalGetInputDataOperator
from kaapana.operators.LocalWorkflowCleanerOperator import LocalWorkflowCleanerOperator
from kaapana.operators.LocalMinioOperator import LocalMinioOperator


log = LoggingMixin().log

ui_forms = {
    "workflow_form": {
        "type": "object",
        "properties": {
            "single_execution": {
                "title": "single execution",
                "description": "Should each series be processed separately?",
                "type": "boolean",
                "default": False,
                "readOnly": False,
            }
        },
    }
}

args = {
    "ui_forms": ui_forms,
    "ui_visible": True,
    "owner": "kaapana",
    "start_date": days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}

dag = DAG(dag_id="example-dcm2nrrd", default_args=args, schedule_interval=None)


get_input = LocalGetInputDataOperator(dag=dag)
convert = DcmConverterOperator(dag=dag, input_operator=get_input, output_format="nrrd")
put_to_minio = LocalMinioOperator(
    dag=dag, action="put", action_operators=[convert], file_white_tuples=(".nrrd")
)
clean = LocalWorkflowCleanerOperator(dag=dag, clean_workflow_dir=True)

get_input >> convert >> put_to_minio >> clean

That’s basically it. Now we can check whether the DAG was successfully added to Airflow and then test our workflow!

  • Go to System -> Airflow and check if your newly added DAG example-dcm2nrrd appears in the DAG list (it might take up to five minutes until Airflow recognizes the DAG! Alternatively, you could restart the Airflow pod in Kubernetes)

  • If there is an error in the created DAG file, e.g. wrong indentation or failing library imports, you will see an error message at the top of the Airflow page

  • Go to Workflows -> Workflow Execution

  • Select your workflow from the workflow drop-down menu

  • Optionally set a custom workflow name for your workflow execution

  • Select an available dataset from the dataset drop-down menu

  • Submit your workflow to start the execution

  • You will be redirected to the Workflow List view, where your workflow run example-dcm2nrrd-<workflow-id> appears as an additional row with further details about the workflow and the jobs running as part of it.

  • If everything was successful you can go to Store -> Minio where you will find a bucket called example-dcm2nrrd. Inside this bucket you will find the .nrrd files of the selected images (if you prefer to fetch them programmatically, see the sketch below).
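If you prefer to fetch the converted files from Minio programmatically instead of via the web UI, the following minimal sketch uses the minio Python client. The endpoint, the credentials and whether the MinIO API is reachable from your machine are deployment-specific assumptions, so treat this as an illustration rather than a fixed part of the platform API.

from minio import Minio

# Placeholders: use the MinIO endpoint and credentials of your own deployment.
client = Minio(
    "<kaapana-host>:<minio-port>",
    access_key="<access-key>",
    secret_key="<secret-key>",
    secure=True,
)

# List and download all .nrrd results produced by the example-dcm2nrrd DAG.
for obj in client.list_objects("example-dcm2nrrd", recursive=True):
    if obj.object_name.endswith(".nrrd"):
        print(f"Downloading {obj.object_name}")
        client.fget_object("example-dcm2nrrd", obj.object_name, obj.object_name)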

Write a DAG that utilizes a new processing algorithm

Aim: Create a new operator that executes a processing algorithm. Create a DAG that utilizes this new operator.

We will focus on developing the algorithm directly on the Kaapana platform. This means we will utilize Kaapana’s ability to start a code-server that mounts files from within the container in which the algorithm should run. We will use this code-server to develop the algorithm directly in the environment of this container. The subsection Alternative: Develop the workflow locally explains how the environment of the container can be mimicked in order to develop an algorithm locally.

In this example the algorithm is a script that extracts the study ID of a DICOM study. We embed this algorithm into a DAG that performs the following tasks:

  1. Get DICOM data from the Kaapana PACS

  2. Extract the study IDs

  3. Save the study IDs as a JSON file in a Minio bucket

  4. Clean the workflow directory

We will develop the DAG in the following steps:

  1. Build an empty base image and push it to the private registry

  2. Create an operator that pulls this image and starts the container. Create the DAG file that executes the operator

  3. Start the workflow, enter the code-server via Pending Applications and develop the processing algorithm

  4. Adjust the image from step 1 to execute the algorithm developed in step 3. Build and push again

Step 1: Provide an empty base image

To develop an algorithm within a container, we have to provide an image to start with. Since we provide the algorithm as a Python script, we start with a minimal Python image:

Dockerfile
FROM local-only/base-python-cpu:latest
LABEL IMAGE="python-template"
LABEL VERSION="0.1.0"
LABEL BUILD_IGNORE="True"

Important

To access the base images for our containers (like local-only/base-python-cpu:latest) we can either build them individually following How to build the base images or build the whole platform (Build Kaapana). Building only the base images instead of the whole platform is generally recommended, as it is faster and takes up less disk space on your system.

Hint

If Docker images are built on a system with a proxy configured, please make sure to configure Docker accordingly.

To utilize our base image, we have to build and push it to our registry.

docker build -t <docker-registry>/<docker-repo>/example-extract-study-id:0.1.0 .
docker push <docker-registry>/<docker-repo>/example-extract-study-id:0.1.0

Since we just used a generic Python image as a template for our algorithm and made it available in the registry, we can also reuse it for any other Python-based algorithm (see the sketch below).
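As an illustration, the following hypothetical operator reuses the same image for a different Python script by overriding the container’s start command with the cmds and arguments parameters of KaapanaBaseOperator (the same mechanism the OtsusNotebookOperator uses later in this guide). The operator name and the script path /kaapana/app/my_other_script.py are made up for this example; the registry variables are introduced in Step 2 below.

from datetime import timedelta
from kaapana.operators.KaapanaBaseOperator import KaapanaBaseOperator
from kaapana.blueprints.kaapana_global_variables import (
    DEFAULT_REGISTRY,
    KAAPANA_BUILD_VERSION,
)


class MyOtherPythonOperator(KaapanaBaseOperator):
    # Hypothetical operator: reuses the generic Python image pushed above and
    # starts a different script that was added to (or mounted into) the image.
    def __init__(
        self,
        dag,
        name="my-other-python-task",
        execution_timeout=timedelta(seconds=60),
        *args,
        **kwargs,
    ):
        super().__init__(
            dag=dag,
            name=name,
            image=f"{DEFAULT_REGISTRY}/example-extract-study-id:{KAAPANA_BUILD_VERSION}",
            image_pull_secrets=["registry-secret"],
            cmds=["python3"],
            arguments=["-u", "/kaapana/app/my_other_script.py"],
            execution_timeout=execution_timeout,
            *args,
            **kwargs,
        )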

Step 2: Create the operator and the DAG

Now we create the operator that pulls the base image and starts the container. Additionally, we create the DAG. We can create both files via the code-server extension, analogous to how we wrote the DAG in Write your first own DAG.

We define the operator in a file ExtractStudyIdOperator.py in dags/example:

ExtractStudyIdOperator.py
from datetime import timedelta
from kaapana.operators.KaapanaBaseOperator import KaapanaBaseOperator
from kaapana.blueprints.kaapana_global_variables import (
    DEFAULT_REGISTRY,
    KAAPANA_BUILD_VERSION,
)


class ExtractStudyIdOperator(KaapanaBaseOperator):
    def __init__(
        self,
        dag,
        name="extract-study-id",
        execution_timeout=timedelta(seconds=30),
        *args,
        **kwargs,
    ):
        super().__init__(
            dag=dag,
            name=name,
            image=f"{DEFAULT_REGISTRY}/example-extract-study-id:{KAAPANA_BUILD_VERSION}",
            image_pull_secrets=["registry-secret"],
            execution_timeout=execution_timeout,
            *args,
            **kwargs,
        )

The DAG file dag_example_extract_study_id.py has to be stored in dags/ and can look like this:

dag_example_extract_study_id.py
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.models import DAG

from kaapana.operators.LocalGetInputDataOperator import LocalGetInputDataOperator
from kaapana.operators.LocalMinioOperator import LocalMinioOperator
from kaapana.operators.LocalWorkflowCleanerOperator import LocalWorkflowCleanerOperator
from example.ExtractStudyIdOperator import ExtractStudyIdOperator


ui_forms = {
    "workflow_form": {
        "type": "object",
        "properties": {
            "single_execution": {
                "title": "single execution",
                "description": "Should each series be processed separately?",
                "type": "boolean",
                "default": False,
                "readOnly": False,
            }
        },
    }
}

log = LoggingMixin().log

args = {
    "ui_forms": ui_forms,
    "ui_visible": True,
    "owner": "kaapana",
    "start_date": days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}

dag = DAG(
    dag_id="example-dcm-extract-study-id", default_args=args, schedule_interval=None
)


get_input = LocalGetInputDataOperator(dag=dag)
extract = ExtractStudyIdOperator(
    dag=dag, input_operator=get_input, dev_server="code-server"
)
put_to_minio = LocalMinioOperator(dag=dag, action="put", action_operators=[extract])
clean = LocalWorkflowCleanerOperator(dag=dag, clean_workflow_dir=True)

get_input >> extract >> put_to_minio >> clean

The DAG is just a sequence of different operators. In this example the LocalGetInputDataOperator loads the data we want to work with. The ExtractStudyIdOperator loads (so far) our empty base image and utilizes the Kaapana code-server as a development server to implement our algorithm inside the running container. We enable this behavior by setting the argument dev_server="code-server" when we initialize the ExtractStudyIdOperator:

extract = ExtractStudyIdOperator(dag=dag, input_operator=get_input, dev_server="code-server")

Step 3: Start the workflow and implement the algorithm

Now it’s time to trigger the development workflow. To do so, go to Workflows -> Workflow Execution, select our developed workflow example-dcm-extract-study-id from the workflow drop-down menu and start the workflow. The workflow will run all operators until it reaches the ExtractStudyIdOperator, which we have set into dev-mode by adding the dev_server="code-server" argument. Now we navigate to Workflows -> Pending Applications and click on the blue link icon beside the operator’s name; a dev-code-server opens up in which we can create, modify and run files inside the container.

https://www.kaapana.ai/kaapana-downloads/kaapana-docs/stable/img/dev_guide_pending.png

We can now implement and test our algorithm. In this example the algorithm is a Python script that extracts the study ID from the loaded DICOM data and saves it as a JSON file in the operator’s output directory.

Note

The code-server looks for the app directory by default. When we use it as a dev-server inside the Docker container, it will show an error message that app does not exist. You can safely ignore this message and open the root directory of the container.

The python code of the algorithm which we want to integrate into our Kaapana platform is the following:

extract_study_id.py
import sys, os
import glob
import json
import pydicom
from datetime import datetime

# For local testing

# os.environ["WORKFLOW_DIR"] = "<your data directory>"
# os.environ["BATCH_NAME"] = "batch"
# os.environ["OPERATOR_IN_DIR"] = "get-input-data"
# os.environ["OPERATOR_OUT_DIR"] = "output"

# From the template
batch_folders = sorted(
    [
        f
        for f in glob.glob(
            os.path.join("/", os.environ["WORKFLOW_DIR"], os.environ["BATCH_NAME"], "*")
        )
    ]
)

for batch_element_dir in batch_folders:
    element_input_dir = os.path.join(batch_element_dir, os.environ["OPERATOR_IN_DIR"])
    element_output_dir = os.path.join(batch_element_dir, os.environ["OPERATOR_OUT_DIR"])

    # The processing algorithm
    print(f"Checking {element_input_dir} for dcm files")
    dcm_files = sorted(
        glob.glob(os.path.join(element_input_dir, "*.dcm*"), recursive=True)
    )

    if len(dcm_files) == 0:
        print("No dicom file found!")
        exit(1)
    else:
        print(f"Writing results to {element_output_dir}")
        print(("Extracting study_id: %s" % dcm_files[0]))

        incoming_dcm = pydicom.dcmread(dcm_files[0])
        json_dict = {
            "study_id": incoming_dcm.StudyInstanceUID,
            "series_uid": incoming_dcm.SeriesInstanceUID,
        }

        if not os.path.exists(element_output_dir):
            os.makedirs(element_output_dir)

        json_file_path = os.path.join(
            element_output_dir, "{}.json".format(os.path.basename(batch_element_dir))
        )

        with open(json_file_path, "w", encoding="utf-8") as jsonData:
            json.dump(json_dict, jsonData, indent=4, sort_keys=True, ensure_ascii=True)

We simply store the Python file as extract_study_id.py in the directory /mounted/workflows/mounted_scripts inside the Docker container.

To check if everything works as expected, open a terminal in the code-server and run python3 extract_study_id.py.

Example standard output
Checking /kaapana/mounted/data/example-dcm-extract-study-id-230619080048556028/batch/1.3.12.2.1107.5.1.4.73104.30000020081307523376400012735/get-input-data for dcm files
Writing results to /kaapana/mounted/data/example-dcm-extract-study-id-230619080048556028/batch/1.3.12.2.1107.5.1.4.73104.30000020081307523376400012735/extract-study-id
Extracting study_id: /kaapana/mounted/data/example-dcm-extract-study-id-230619080048556028/batch/1.3.12.2.1107.5.1.4.73104.30000020081307523376400012735/get-input-data/1.3.12.2.1107.5.1.4.73104.30000020081307523376400012736.dcm

After we are finished, we can close the code-server browser tab and terminate the dev-code-server in the Workflows -> Pending Applications tab of Kaapana with the “FINISHED MANUAL INTERACTION” button.

Hint

The directory /mounted_scripts in the container of any operator initialized with the parameter dev_server="code-server" is also available in the code-server extension. Hence, you don’t have to worry that your files in this directory are lost after the container finishes.

Step 4: Adjust the base image from step 1 to execute the processing script

When we are finished with the implementation, we adjust the image from step 1 to execute the algorithm. To do so, we create a files directory beside the Dockerfile of the original image and put the extract_study_id.py script inside it. Then we adjust the Dockerfile such that the image contains the script and the container executes it on startup.

Dockerfile
FROM local-only/base-python-cpu:latest

LABEL IMAGE="example-extract-study-id"
LABEL VERSION="0.1.0"
LABEL BUILD_IGNORE="False"

COPY files/extract_study_id.py /kaapana/app/
WORKDIR /kaapana/app/

CMD ["python3","-u","/kaapana/app/extract_study_id.py"]

Afterwards build and push the finished image again.

docker build -t <docker-registry>/<docker-repo>/example-extract-study-id:0.1.0 .
docker push <docker-registry>/<docker-repo>/example-extract-study-id:0.1.0

Since we have finished the implementation, we no longer want the DAG to start a dev-server every time. Hence, we should remove the dev_server="code-server" option from the initialization of the ExtractStudyIdOperator in dag_example_extract_study_id.py.

extract = ExtractStudyIdOperator(dag=dag, input_operator=get_input)

Congrats! You developed your first workflow in Kaapana :).

Alternative: Develop the workflow locally

As an alternative to the steps above, you can also develop the algorithm on your local machine. In this case, skip Step 1: Provide an empty base image. To debug and test the algorithm you need data that is structured in the same way as on the platform. You can obtain such data by downloading suitable data from the platform:

  1. Go to Workflows -> Datasets and select the data you want to use.

  2. Click on the play-button and start the workflow download-selected-files.

  3. After it has finished, go to Store -> Minio and browse the downloads bucket for the object with the correct timestamp.

  4. Click on the download action to download the data.

  5. Extract the archive at the place where you want to develop the algorithm.

Additionally, you need to emulate the Kaapana environment on your local machine. You can simply do this by setting the required environment variables at the beginning of the algorithm script extract_study_id.py:

import os

os.environ["WORKFLOW_DIR"] = "<your data directory>"
os.environ["BATCH_NAME"] = "batch"
os.environ["OPERATOR_IN_DIR"] = "get-input-data"
os.environ["OPERATOR_OUT_DIR"] = "output"

Change <your data directory> to the local path of the directory where you extracted the data.

After developing the algorithm build and push the docker image as described in Step 4: Adjust the base image from step 1 to execute the processing script. Then create the operator and the DAG analogously to Step 2: Create the operator and the DAG, but without setting the ExtractStudyIdOperator into dev-mode.

Provide a workflow as an extension

The previous section Write a DAG that utilizes a new processing algorithm gives an introduction on how to integrate a processing algorithm into a Kaapana platform. If you want to make your algorithm easily installable and available on multiple Kaapana platforms, you have to provide it as an installable extension.

Goal: We write a workflow that applies Otsu’s method to create a segmentation of DICOM data. We provide this workflow as an extension to the Kaapana platform.

Requirements: You need the image local-only/base-installer:latest locally available in order to build the image for the DAG.

All files used in this tutorial can be found in the repository under templates_and_examples.

The final directory structure has to look like this:

otsus-method/
├── extension
│   ├── docker
│   │   ├── Dockerfile
│   │   └── files
│   │       ├── dag_otsus_method.py
│   │       └── otsus-method
│   │           ├── OtsusMethodOperator.py
│   │           └── OtsusNotebookOperator.py
│   └── otsus-method-workflow
│       ├── Chart.yaml
│       ├── requirements.yaml
│       └── values.yaml
└── processing-containers
    └── otsus-method
        ├── Dockerfile
        └── files
            ├── otsus_method.py
            ├── otsus_notebooks
            │   ├── run_otsus_report_notebook.ipynb
            │   └── run_otsus_report_notebook.sh
            └── requirements.txt

Step 1: Create, build and push a docker image to containerize the processing algorithm

First you need to create a directory for the processing algorithm. To remain consistent with the structure of Kaapana we recommend creating the new folder in kaapana/data-processing/processing-pipelines/, but it can be located anywhere.

mkdir -p otsus-method/processing-containers/otsus-method/files/otsus_notebooks

In the files directory create a file called otsus_method.py that contains the segmentation algorithm based on Otsu’s method:

otsus_method.py
import SimpleITK as sitk
import os
import glob

### For local testing you can uncomment the following lines
# os.environ["WORKFLOW_DIR"] = "<your data directory>"
# os.environ["BATCH_NAME"] = "batch"
# os.environ["OPERATOR_IN_DIR"] = "get-input-data"
# os.environ["OPERATOR_OUT_DIR"] = "output"

### From the template
batch_folders = sorted(
    [
        f
        for f in glob.glob(
            os.path.join("/", os.environ["WORKFLOW_DIR"], os.environ["BATCH_NAME"], "*")
        )
    ]
)

for batch_element_dir in batch_folders:
    element_input_dir = os.path.join(batch_element_dir, os.environ["OPERATOR_IN_DIR"])
    element_output_dir = os.path.join(batch_element_dir, os.environ["OPERATOR_OUT_DIR"])

    if not os.path.exists(element_output_dir):
        os.makedirs(element_output_dir)

    nddr_files = sorted(
        glob.glob(os.path.join(element_input_dir, "*.nrrd"), recursive=True)
    )

    if len(nddr_files) == 0:
        print("No nrrd file found!")
        exit(1)
    else:
        for file in nddr_files:
            print(("Applying Otsus method to: %s" % file))

            ### Load image
            img = sitk.ReadImage(file)

            ### Apply Otsus method
            seg = sitk.OtsuThreshold(img, 0, 1)

            ### Save the segmentation as .nrrd file
            output_file = os.path.join(
                element_output_dir,
                "{}.nrrd".format(os.path.basename(batch_element_dir)),
            )
            sitk.WriteImage(seg, output_file, useCompression=True)

In otsus_notebooks/ include the files run_otsus_report_notebook.sh and run_otsus_report_notebook.ipynb. These files generate an example Jupyter notebook report (as PDF and HTML) from the algorithm results, which becomes available under Workflows -> Workflow Results.

run_otsus_report_notebook.sh
#!/bin/bash
set -euf -o pipefail

if [ ! -v WORKFLOW_NAME ]; then
    echo WORKFLOW_NAME not set, setting it to the RUN_ID
    WORKFLOW_NAME=$RUN_ID
    echo $WORKFLOW_NAME
fi


echo 'Converting jupyter notebook file'
TIMESTAMP=$(date +%y-%m-%d-%H:%M:%S)
jupyter nbconvert --to pdf --execute --no-input /kaapana/app/otsus_notebooks/run_otsus_report_notebook.ipynb  --output-dir /$WORKFLOW_DIR/$OPERATOR_OUT_DIR --output otsus_report_${WORKFLOW_NAME// /_}.pdf
jupyter nbconvert --to html --execute --no-input /kaapana/app/otsus_notebooks/run_otsus_report_notebook.ipynb  --output-dir /$WORKFLOW_DIR/$OPERATOR_OUT_DIR --output otsus_report_${WORKFLOW_NAME// /_}.html
run_otsus_report_notebook.ipynb
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "88a34a3f-1761-41b7-a69a-d12cb20d6e95",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import glob\n",
    "from pathlib import Path\n",
    "import SimpleITK as sitk\n",
    "# import nibabel as nib\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3817256a-3a1b-496c-beed-daf9e1c7d182",
   "metadata": {},
   "source": [
    "# Visualizations of images"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d5bea737-899d-41a3-a8ec-28d76a3b5d1d",
   "metadata": {},
   "outputs": [],
   "source": [
    "batch_folders = sorted([f for f in glob.glob(os.path.join('/', os.environ['WORKFLOW_DIR'], os.environ['BATCH_NAME'], '*'))])\n",
    "\n",
    "for batch_element_dir in batch_folders:\n",
    "\n",
    "    element_input_dir = os.path.join(batch_element_dir, os.environ['OPERATOR_IN_DIR'])\n",
    "    element_output_dir = os.path.join(batch_element_dir, os.environ['OPERATOR_OUT_DIR'])\n",
    "\n",
    "    if not os.path.exists(element_output_dir):\n",
    "        os.makedirs(element_output_dir)\n",
    "\n",
    "\n",
    "    nddr_files = sorted(glob.glob(os.path.join(element_input_dir, \"*.nrrd\"), recursive=True))\n",
    "\n",
    "    if len(nddr_files) == 0:\n",
    "        print(\"No nrrd file found!\")\n",
    "        exit(1)\n",
    "    else:\n",
    "        for file in nddr_files:\n",
    "            print((\"Applying Otsus method to: %s\" % file))\n",
    "\n",
    "            ### Load image\n",
    "            img = sitk.ReadImage(file)\n",
    "\n",
    "            z = img.GetSize()[2]\n",
    "            nda = sitk.GetArrayFromImage(img)\n",
    "            nda_slice = nda[:, :, int(z/2)]\n",
    "\n",
    "            # display the slice\n",
    "            plt.imshow(nda_slice)\n",
    "            plt.show()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "kaapana-venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10 (default, Mar 15 2022, 12:22:08) \n[GCC 9.4.0]"
  },
  "vscode": {
   "interpreter": {
    "hash": "4e29a91940ee7feabbc4958f4d0059fed19524df8c0a80d2d55f4c5325b75cae"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}

In the files directory create requirements.txt, which contains all dependencies the image needs to run the above scripts:

requirements.txt
pynrrd==1.0.0
matplotlib==3.6.3

In otsus-method/processing-containers/otsus-method create the Dockerfile for the otsus-method algorithm.

Dockerfile
FROM local-only/base-python-cpu:latest

LABEL IMAGE="otsus-method"
LABEL VERSION="0.1.0"
LABEL BUILD_IGNORE="False"

WORKDIR /kaapana/app

COPY files/requirements.txt /kaapana/app/
RUN python -m pip install -c https://codebase.helmholtz.cloud/kaapana/constraints/-/raw/0.3.0/constraints.txt --no-cache-dir -r /kaapana/app/requirements.txt

COPY files/ /kaapana/app/
CMD ["python3","-u","/kaapana/app/otsus_method.py"]

Starting this container will execute the segmentation algorithm.

To build the image and push it to the registry, run the following commands inside the otsus-method/processing-containers/otsus-method directory.

docker build -t <docker-registry>/<docker-repo>/otsus-method:0.1.0 .
docker push <docker-registry>/<docker-repo>/otsus-method:0.1.0

Hint

If not already done, you have to log in to your Docker registry with docker login <docker-registry>/<docker-repo> before you can push the image.

Step 2: Create, build and push a docker image for the workflow’s Airflow DAG

We also need an image that installs the DAG on the platform. This image is based on local-only/base-installer:latest and contains the operators as well as the DAG file. Create the folder for the DAG image; inside the outer otsus-method directory run:

mkdir -p extension/docker/files/otsus-method

Inside the folder otsus-method/extension/docker/files/otsus-method create two files: OtsusMethodOperator.py and OtsusNotebookOperator.py. These two files define the operators that execute the code in otsus_method.py and run_otsus_report_notebook.sh, respectively.

OtsusMethodOperator.py
from datetime import timedelta
from kaapana.operators.KaapanaBaseOperator import KaapanaBaseOperator
from kaapana.blueprints.kaapana_global_variables import (
    DEFAULT_REGISTRY,
    KAAPANA_BUILD_VERSION,
)


class OtsusMethodOperator(KaapanaBaseOperator):
    def __init__(
        self,
        dag,
        name="otsus-method",
        execution_timeout=timedelta(seconds=120),
        *args,
        **kwargs,
    ):
        super().__init__(
            dag=dag,
            name=name,
            image=f"{DEFAULT_REGISTRY}/otsus-method:{KAAPANA_BUILD_VERSION}",
            image_pull_secrets=["registry-secret"],
            execution_timeout=execution_timeout,
            # operator_out_dir="otsus-method/",
            *args,
            **kwargs,
        )
OtsusNotebookOperator.py
import os
import glob
from datetime import timedelta

from kaapana.operators.KaapanaBaseOperator import KaapanaBaseOperator
from kaapana.blueprints.kaapana_global_variables import (
    DEFAULT_REGISTRY,
    KAAPANA_BUILD_VERSION,
)


class OtsusNotebookOperator(KaapanaBaseOperator):
    def __init__(
        self,
        dag,
        name="otsus-notebook-operator",
        execution_timeout=timedelta(minutes=20),
        *args,
        **kwargs,
    ):
        super().__init__(
            dag=dag,
            name=name,
            image=f"{DEFAULT_REGISTRY}/otsus-method:{KAAPANA_BUILD_VERSION}",
            image_pull_secrets=["registry-secret"],
            execution_timeout=execution_timeout,
            ram_mem_mb=1000,
            ram_mem_mb_lmt=3000,
            *args,
            **kwargs,
        )

The DAG file dag_otsus_method.py should be created in otsus-method/extension/docker/files/.

dag_otsus_method.py
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.models import DAG

from kaapana.operators.LocalGetInputDataOperator import LocalGetInputDataOperator
from kaapana.operators.DcmConverterOperator import DcmConverterOperator
from kaapana.operators.Itk2DcmSegOperator import Itk2DcmSegOperator
from kaapana.operators.DcmSendOperator import DcmSendOperator
from kaapana.operators.LocalMinioOperator import LocalMinioOperator
from kaapana.operators.LocalWorkflowCleanerOperator import LocalWorkflowCleanerOperator
from otsus_method.OtsusMethodOperator import OtsusMethodOperator
from otsus_method.OtsusNotebookOperator import OtsusNotebookOperator


ui_forms = {
    "workflow_form": {
        "type": "object",
        "properties": {
            "single_execution": {
                "title": "single execution",
                "description": "Should each series be processed separately?",
                "type": "boolean",
                "default": False,
                "readOnly": False,
            }
        },
    }
}

log = LoggingMixin().log

args = {
    "ui_forms": ui_forms,
    "ui_visible": True,
    "owner": "kaapana",
    "start_date": days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}

dag = DAG(dag_id="otsus-method", default_args=args, schedule_interval=None)


get_input = LocalGetInputDataOperator(dag=dag)

convert = DcmConverterOperator(dag=dag, input_operator=get_input)

otsus_method = OtsusMethodOperator(
    dag=dag,
    input_operator=convert,
    # dev_server='code-server'
)

seg_to_dcm = Itk2DcmSegOperator(
    dag=dag,
    segmentation_operator=otsus_method,
    single_label_seg_info="abdomen",
    input_operator=get_input,
    series_description="Otsu's method",
)

dcm_send = DcmSendOperator(dag=dag, input_operator=seg_to_dcm)

generate_report = OtsusNotebookOperator(
    dag=dag,
    name="generate-otsus-report",
    input_operator=otsus_method,
    cmds=["/bin/bash"],
    arguments=["/kaapana/app/otsus_notebooks/run_otsus_report_notebook.sh"],
)

put_report_to_minio = LocalMinioOperator(
    dag=dag,
    name="upload-to-staticwebsite",
    bucket_name="staticwebsiteresults",
    action="put",
    action_operators=[generate_report],
    file_white_tuples=(".html", ".pdf"),
)

clean = LocalWorkflowCleanerOperator(dag=dag, clean_workflow_dir=True)

get_input >> convert >> otsus_method

otsus_method >> seg_to_dcm >> dcm_send >> clean
otsus_method >> generate_report >> put_report_to_minio >> clean

Hint

The DAG will perform the following steps:
  • Get the DICOM files (LocalGetInputDataOperator),

  • Convert the DICOM files to .nrrd files (DcmConverterOperator),

  • Apply the segmentation (OtsusMethodOperator),

  • Create a DICOM segmentation from the .nrrd segmentation (Itk2DcmSegOperator),

  • Send the data back to the PACS (DcmSendOperator),

  • Generate a Jupyter notebook report from the algorithm results (OtsusNotebookOperator),

  • Store the report in a Minio bucket (LocalMinioOperator),

  • Clean the workflow dir (LocalWorkflowCleanerOperator).

Note: If you want to use this DAG as a template for your own segmentation algorithm, keep in mind that Itk2DcmSegOperator requires the arguments segmentation_operator and single_label_seg_info (see the sketch below).
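To make these requirements explicit, here is a stripped-down, hypothetical sketch of the wiring a custom segmentation DAG needs around Itk2DcmSegOperator. It reuses OtsusMethodOperator as a stand-in for your own segmentation operator; the DAG id, label name and series description are placeholders, so treat this as an illustration rather than a second ready-made workflow.

from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.models import DAG

from kaapana.operators.LocalGetInputDataOperator import LocalGetInputDataOperator
from kaapana.operators.DcmConverterOperator import DcmConverterOperator
from kaapana.operators.Itk2DcmSegOperator import Itk2DcmSegOperator
from kaapana.operators.DcmSendOperator import DcmSendOperator
from kaapana.operators.LocalWorkflowCleanerOperator import LocalWorkflowCleanerOperator
from otsus_method.OtsusMethodOperator import OtsusMethodOperator  # stand-in for your own operator

args = {
    # ui_forms omitted for brevity, see dag_otsus_method.py above
    "ui_visible": True,
    "owner": "kaapana",
    "start_date": days_ago(0),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}

dag = DAG(dag_id="my-segmentation-method", default_args=args, schedule_interval=None)

get_input = LocalGetInputDataOperator(dag=dag)
convert = DcmConverterOperator(dag=dag, input_operator=get_input)

# Replace OtsusMethodOperator with the operator that runs your segmentation algorithm.
segmentation = OtsusMethodOperator(dag=dag, input_operator=convert)

# Itk2DcmSegOperator needs the original DICOM series (input_operator), the operator
# that produced the .nrrd segmentation (segmentation_operator) and a label name.
seg_to_dcm = Itk2DcmSegOperator(
    dag=dag,
    input_operator=get_input,
    segmentation_operator=segmentation,
    single_label_seg_info="my-label",
    series_description="My segmentation method",
)

dcm_send = DcmSendOperator(dag=dag, input_operator=seg_to_dcm)
clean = LocalWorkflowCleanerOperator(dag=dag, clean_workflow_dir=True)

get_input >> convert >> segmentation >> seg_to_dcm >> dcm_send >> clean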

In otsus-method/extension/docker/ create the Dockerfile for the image that will install the DAG on the platform.

Dockerfile
FROM local-only/base-installer:latest

LABEL IMAGE="dag-otsus-method"
LABEL VERSION="0.1.0"
LABEL BUILD_IGNORE="False"

COPY files/dag_otsus_method.py /kaapana/tmp/dags/
COPY files/otsus-method/OtsusMethodOperator.py /kaapana/tmp/dags/otsus_method/
COPY files/otsus-method/OtsusNotebookOperator.py /kaapana/tmp/dags/otsus_method/

Build the image and push it to the registry. In otsus-method/extension/docker/, next to the Dockerfile, execute:

docker build -t <docker-registry>/<docker-repo>/dag-otsus-method:<version-tag> .
docker push <docker-registry>/<docker-repo>/dag-otsus-method:<version-tag>

Important

Setting the correct <version-tag> is important, because Kaapana will pull images according to its own version. You can find the platform version of your instance at the bottom of the web interface: kaapana-admin-chart:<version-tag>

Step 3: Create the helm chart

The helm chart for the extension contains all information the platform needs to pull the images from the registry and make the extension available.

Create a folder for the chart. Inside otsus-method/extension/ run mkdir -p otsus-method-workflow.

Create the three files Chart.yaml, requirements.yaml and values.yaml in otsus-method-workflow/.

Chart.yaml
---
apiVersion: v1
appVersion: "0.1.0"
description: thresholding segmentation workflow for Otsus method
name: otsus-method-workflow
version: "0.0.0"
keywords:
- kaapanaworkflow
- kaapanaexperimental
requirements.yaml
---
dependencies:
  - name: dag-installer-chart
    version: 0.0.0
    repository: file://../../../../../../services/utils/dag-installer-chart/
    
values.yaml
---
global:
    image: "dag-otsus-method"
    action: "copy"

Important

The field repository in requirements.yaml must be the relative path to the directory that contains the Chart.yaml file for the dag-installer chart. This file is located in the subdirectory services/utils/dag-installer-chart/ of the kaapana repository.

Update the helm dependencies and package the chart. Inside otsus-method/extension/otsus-method-workflow/ run:

helm dep up
helm package .

This will create the file otsus-method-workflow-0.0.0.tgz (the version in the file name comes from the version field in Chart.yaml), which contains all the required information.

Step 4: Add extension to the platform

There are three ways to add an extension to the platform. The first two options, Option 1: Add the extension via the UI and Option 2: Add the extension to the file system of the host machine, make the extension available only on a single instance. The third approach, Option 3: (Persistent alternative) Build the platform, adds the extension as a dependency to the helm chart of the platform. Therefore, every platform deployed based on this helm chart will have the extension available.

Option 1: Add the extension via the UI

You can simply drag and drop the file otsus-method-workflow-0.0.0.tgz into the Upload field on the Extensions page.

https://www.kaapana.ai/kaapana-downloads/kaapana-docs/stable/img/extensions_upload_chart.png

Option 2: Add the extension to the file system of the host machine

Warning

This approach requires root permissions on the host server of the platform.

Copy the file otsus-method-workflow-0.0.0.tgz to the subdirectory extensions/ of the FAST_DATA_DIR directory on the host machine of your instance. By default, FAST_DATA_DIR=/home/kaapana.

Hint

The FAST_DATA_DIR can be configured by editing the deploy_platform.sh script before the installation of the platform.

Option 3: (Persistent alternative) Build the platform

The Kaapana repository contains a special collection chart that depends on multiple other charts. You can add the new helm chart to the list of dependencies in the requirements.yaml file of this collection chart. If you build the platform again, the new extension will automatically be part of it.

First, add the following two lines to the file collections/kaapana-collection/requirements.yaml in your Kaapana repository:

- name: otsus-method-workflow
  version: 0.0.0

Then build the platform by running python3 build-scripts/start_build.py in the root directory of your repository.

To make the extension available on a running platform you only need to restart the kaapana-extension-collection pod. You can do this in the Kaapana GUI by clicking the cloud button next to Applications and workflows on the Extensions page.

Alternatively, you can delete the pod manually on your host machine. First, get the name of the pod:

kubectl get pods -n admin | grep copy-kube-helm-collections

Then delete the pod

kubectl delete pod -n admin <pod-name>

After a few seconds the extension list should contain otsus-method-workflow.

Debugging

This short section will show you how to debug in case a workflow throws an error.

Syntax errors

If there is a syntax error in the implementation of a DAG or of an operator, the error is normally shown directly at the top of the Airflow DAGs view in red. For further information, you can also consult the log of the container that runs Airflow. To do so, go to Kubernetes, select the namespace services and click on the Airflow pod. At the top right there is a button to view the logs. Since Airflow starts two containers at the same time, you can switch between the two outputs at the top under ‘Logs from…’.

Operator errors during execution

  • Via Workflow List: When you click on the red bubble within the workflow list, all failed workflow runs appear underneath the workflow. Within the ‘Logs’ column you find two buttons linking directly to the logs in Airflow and to the task view.

  • Via Airflow: When you click on the DAG in Airflow, you are guided to the ‘Graph View’. Clicking on the red, failed operator opens a pop-up dialog where you can click on ‘View Log’ to see what happened.

  • Via Kubernetes: In the namespace jobs, you should find the running pod that was triggered by Airflow. Here you can view the logs to see why the container failed. If the container is still running, you can also click on ‘Exec into pod’ to debug directly inside the container.

After you have resolved the bug in the operator, you can either restart the whole workflow, or click on the operator in the ‘Graph View’, select ‘Clear’ in the pop-up dialog and confirm the next dialog. This will restart the operator.