Existing Operators

This section lists the ready-to-use operators available in Kaapana. Note that all operators might not be documented yet. For the full list, please refer to the kaapana.operators package.

Base operators

Base operators serve as foundational classes for task-specific operators. When developing your own customized operator leverage these operators as base classes.

KaapanaBaseOperator

class kaapana.operators.KaapanaBaseOperator.KaapanaBaseOperator(*args: Any, **kwargs: Any)[source]

Bases: BaseOperator, SkipMixin

Execute a task in a Kubernetes Pod

Parameters:
  • image (str) – Docker image you wish to launch. Defaults to dockerhub.io, but fully qualified URLS will point to custom repositories

  • namespace (str) – the namespace to run within kubernetes

  • cmds (list of str) – entrypoint of the container. (templated) The docker images’s entrypoint is used if this is not provide.

  • arguments (list of str) – arguments of to the entrypoint. (templated) The docker image’s CMD is used if this is not provided.

  • volume_mounts (list of VolumeMount) – volumeMounts for launched pod

  • volumes (list of Volume) – volumes for launched pod. Includes ConfigMaps and PersistentVolumes

  • labels (dict) – labels to apply to the Pod

  • startup_timeout_seconds (int) – timeout in seconds to startup the pod

  • name (str) – name of the task you want to run, will be used to generate a pod id

  • env_vars (dict) – Environment variables initialized in the container. (templated)

  • secrets (list of Secret) – Kubernetes secrets to inject in the container, They can be exposed as environment vars or files in a volume.

  • in_cluster (bool) – run kubernetes client with in_cluster configuration

  • cluster_context (string) – context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used.

  • get_logs (bool) – get the stdout of the container as logs of the tasks

  • affinity (dict) – A dict containing a group of affinity scheduling rules

  • node_selectors (list[V1Toleration]) – A dict containing a group of scheduling rules

  • tolerations – A V1Toleration list specifying pod tolerations

  • config_file (str) – The path to the Kublernetes config file

  • xcom_push (bool) – If xcom_push is True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.

CURE_INVALID_NAME_REGEX = '[a-z]([-a-z0-9]*[a-z0-9])?'
HELM_API = 'http://kube-helm-service.None.svc:5000'
TIMEOUT = 43200
clear(start_date=None, end_date=None, upstream=False, downstream=False, session=None)[source]
delete_operator_out_dir(run_id, operator_dir)[source]
execute(context)[source]
static extract_timestamp(s)[source]

Accepts timestamp in the forms: - dcm2nifti-200831164505663620 - manual__2020-08-31T16:58:05.469533+00:00 if the format is not provided,the last 10 characters of the run_id will be added

static on_execute(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

static on_failure(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

on_kill() None[source]
static on_retry(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

static on_success(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

post_execute(context, result=None)[source]
set_context_variables(context)[source]
static set_defaults(obj, name, task_id, operator_out_dir, input_operator, operator_in_dir, parallel_id, keep_parallel_id, trigger_rule, pool, pool_slots, ram_mem_mb, ram_mem_mb_lmt, cpu_millicores, cpu_millicores_lmt, gpu_mem_mb, gpu_mem_mb_lmt, manage_cache, allow_federated_learning, whitelist_federated_learning, batch_name, airflow_workflow_dir, delete_input_on_success, delete_output_on_start, priority_class_name)[source]

KaapanaPythonBaseOperator

class kaapana.operators.KaapanaPythonBaseOperator.KaapanaPythonBaseOperator(*args: Any, **kwargs: Any)[source]

Bases: PythonOperator, SkipMixin

post_execute(context, result=None)[source]

KaapanaApplicationOperator

class kaapana.operators.KaapanaApplicationOperator.KaapanaApplicationOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

HELM_API = 'http://kube-helm-service.None.svc:5000'
TIMEOUT = 43200
static on_failure(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

static on_retry(context)[source]

Use this method with caution, because it unclear at which state the context object is updated!

start(ds, **kwargs)[source]
static uninstall_helm_chart(kwargs)[source]

DICOM operators

We provide a range of operators designed to handle and manage DICOM data efficiently. These operators offer various capabilities, such as modifying DICOM tags, generating DICOM data from binaries or JSON, and much more.

Bin2DcmOperator

class kaapana.operators.Bin2DcmOperator.Bin2DcmOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to encode binary data into DICOM or to decode binary data from DICOM.

This operator encodes or decodes files using a container. The container uses the dcmtk tool xml2dcm https://support.dcmtk.org/docs/xml2dcm.html Binary files can be split up in chunks of a specific size [size_limit]

Inputs:

  • The input_operator

  • When decoding: file_extension *.dcm

  • When encoding: file_extensions, size_limit and additional encoded string parameters

Outputs:

  • When encoding: DICOM files

  • When decoding: binary file (e.g. zip-file)

Parameters:
  • dataset_info_operator – Only encoding. Input operator producing dataset.json as output.

  • dataset_info_operator_in_dir – Only encoding. Directory with a dataset.json file used in study_description.

  • file_extensions – for decoding .dcm, for encoding list of extensions to include seperated by ‘,’ eg: “.zip,*.bin”

  • size_limit – Only for decoding. Allows to split up the binary in chunks of a specific size in bytes.

  • patient_id – Only for decoding. Name written in DICOM tag.

  • patient_name – Only for decoding. Name written in DICOM tag.

  • protocol_name – Only for decoding. Name written in DICOM tag.

  • instance_name – Only for decoding. Name written in DICOM tag.

  • version – Only for decoding. Name written in DICOM tag.

  • manufacturer – Only for decoding. Name written in DICOM tag.

  • manufacturer_model – Only for decoding. Name written in DICOM tag.

  • study_description – Only for decoding. Name written in DICOM tag.

  • series_description – Only for decoding. Name written in DICOM tag.

  • study_id – Only for decoding. Name written in DICOM tag.

  • study_uid – Only for decoding. Name written in DICOM tag.

  • sop_class_uid – Only for decoding. Name written in DICOM tag.

execution_timeout = datetime.timedelta(seconds=600)

Dcm2MetaJsonConverter

DcmConverterOperator

class kaapana.operators.DcmConverterOperator.DcmConverterOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to convert dcm files to nrrd or nii.gz files.

This operator converts incoming DICOM (dcm) files to nrrd or NIFTI (nii.gz) files. By dafult, the files are converted to nrrd file format. The conversion operation is executed using MITK’s FileConverter.

Parameters:

output_format – File format to which the incoming DICOM files are converted to. Possible values: “nrrd” (default), nii.gz, nii

Parallel_processes:

Defines how many processes are executed in parallel (default: 3)

DcmModifyOperator

class kaapana.operators.DcmModifyOperator.DcmModifyOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to modify DICOM tags.

This operator serves to modify DICOM tags of DICOM files. The operator relies on DCMTK’s “dcmodify” function.

Note: This operator edits the DICOM files in-place. Therefore, the original DICOM files are modified. Furthermore, it creates a backup of the original DICOM files in the same directory with ending “{filename}.bak”. If the filename ends with “.dcm”, the backup file will be named “{filename}.dcm.bak”.

Inputs:

  • DICOM files which should be modified.

  • DICOM tags which should be modified in given DICOM files.

Outputs:

  • Modified DICOM files according to DICOM tags.

Parameters:

dicom_tags_to_modify – List of all DICOM tags which should be modified in given DICOM file. Specify using the following syntax: eg: “(0008,0016)=1.2.840.10008.5.1.4.1.1.88.11;(0008,0017)=1.2.840.10008.5.1.4.1.1.88.11; … .

DcmQueryOperator

class kaapana.operators.DcmQueryOperator.DcmQueryOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

DcmSeg2ItkOperator

class kaapana.operators.DcmSeg2ItkOperator.DcmSeg2ItkOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator converts .dcm segmentation data into other file formats.

This operator takes a .dcm segmentation file and creates a segmentation file in a different file format e.g. .nrrd file format. The operator uses the dcmqi libary. For dcmqi documentation please have a look at https://qiicr.gitbook.io/dcmqi-guide/.

Inputs: * .dcm segmentation file

Outputs: * segmentation file in a file format specified by output_format

Parameters:
  • output_format – File format of the created segmentation file If not specified “nrrd”

  • seg_filter – A bash list of organs, that should be filtered from the segmentation e.g. “liver,aorta”

  • env_vars – Environmental variables

  • execution_timeout – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail

Json2DcmSROperator

class kaapana.operators.Json2DcmSROperator.Json2DcmSROperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to generate DICOM Structured Reports.

This operator extracts information given by input json-files, dicom-files and dicom-seg-files. The extracted information is collected in a tid_template downloaded from https://raw.githubusercontent.com/qiicr/dcmqi/master/doc/schemas/sr-tid1500-schema.json# . The modified tid_template is then passed to DCMQI’s tid1500writer which converts and saves the previously extracted and summarized information into a DICOM Structured Report following the template TID1500. Furter information about DCMQI’s tid1500writer: https://qiicr.gitbook.io/dcmqi-guide/opening/cmd_tools/sr/tid1500writer

Inputs:

  • src_dicom_operator: Input operator which provides the processed DICOM file.

  • seg_dicom_operator: Input operator which provides the processed DICOM_SEG file.

  • input_file_extension: “.json” by default

*Outputs:

  • DcmSR-file: .dcm-file which contains the DICOM Structured Report.

Parameters:
  • dag – DAG in which the operator is executed.

  • src_dicom_operator – Input operator which provides the processed DICOM file.

  • seg_dicom_operator – Input operator which provides the processed DICOM_SEG file.

  • input_file_extension – “.json” by default

LocalDcm2JsonOperator

LocalDcmAnonymizerOperator

class kaapana.operators.LocalDcmAnonymizerOperator.LocalDcmAnonymizerOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to anonymize sensitive information.

Inputs:

  • Input data which should be anonymized is given via input parameter: input_operator

Outputs:

  • Dicom file: Anonymized .dcm file(s)

Parameters:
  • bulk – process all files of a series or only the first one (default: False).

  • overwrite – should overwrite or not (default: True).

  • single_slice – only single slice to be processed or not (default: False).

start(ds, **kwargs)[source]

Mask2nifitiOperator

class kaapana.operators.Mask2nifitiOperator.Mask2nifitiOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

This operator takes a .dcm SEG or RTSTRUCT file and converts it into the Nifti file format. For DICOM SEG, the operator uses the dcmqi libary (https://qiicr.gitbook.io/dcmqi-guide/). For DICOM RTSTRUCT, the operator uses the dcmrtstruct2nii libary (https://github.com/Sikerdebaard/dcmrtstruct2nii).

Inputs: * .dcm mask-file (SEG and RTSTRUCT are supported)

Outputs: * Nifti file with the segmentation mask

Parameters:
  • seg_filter – A bash list of organs, that should be filtered from the segmentation e.g. “liver,aorta”

  • env_vars – Environmental variables

  • execution_timeout – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail

Pdf2DcmOperator

class kaapana.operators.Pdf2DcmOperator.Pdf2DcmOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator saves DICOM meta information in a DICOM Encapsulated PDF Storage SOP instance.

This operator either takes DICOM meta information or reads the DICOM meta information from existing DICOM files. The DICOM meta information are converted to a DICOM Encapsulated PDF Storage SOP instance and are stored in a specified ouput directory. The converting process is done by DCMTK’s pdf2dcm function: https://support.dcmtk.org/docs/pdf2dcm.html .

Inputs: * (either) PDF file containing all DICOM meta information * (or) DICOM file to extract DICOM meta information from

Outputs: * DICOM Encapsulated PDF Storage SOP instance containing the extracted DICOM meta information

Parameters:

pdf_title – Title of PDF file that is DICOM encapsulated.

Dicom_operator:

Used to find DICOM files from which DICOM meta information is extracted.

Aetitle:

DICOM meta information.

Study_uid:

DICOM meta information.

Study_description:

DICOM meta information.

Patient_id:

DICOM meta information.

Patient_name:

DICOM meta information.

Manufacturer:

DICOM meta information.

Sop_class_uid:

DICOM meta information.

Env_vars:

Dictionary to store environmental variables.

Execution_timeout:

File-based operators

This is a collection of operators that can be utilized for various file-based operations.

LocalConcatJsonOperator

class kaapana.operators.LocalConcatJsonOperator.LocalConcatJsonOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Concatenate data from multiple json files in the dag_run directory into a single json file.

Inputs:

  • Multiple json files

Outputs

  • A single json file in the outpot directory of the operator

Parameters:

name (str) – Used in the filename of the output json file i.e. timestamp-name.json. Defaults to “concatenated”.

start(ds, **kwargs)[source]

ZipUnzipOperator

class kaapana.operators.ZipUnzipOperator.ZipUnzipOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to extract or pack zip archives.

This operator packs or extracts a set of files using a container. Its execution fails no files have been processed. When extracting it extracts all files provided in the output of its input operator. When packing it packs all files but those extracted and included by a white or blacklist into a single zip file in it’s output directory.

Inputs:

  • When packing (e.g. mode is zip) the operator which files should be packed.

  • When extracting (e.g. mode is unzip) nothing.

Outputs:

  • When packing the packed zipfile under the given name.

  • When extracting the extracted files

Parameters:
  • target_filename – Only for packing. The created file.

  • subdir – Only for packing. Subdir used to pack, if empty all data are packed.

  • mode – “zip” for packing “unzip” for extracting

  • whitelist_files – Only for packing. List of files to include seperated by ‘,’ eg: “.txt,.png” or whole filenames

  • blacklist_files – Only for packing. List of files to exclude seperated by ‘,’ eg: “.txt,.png” or whole filenames

  • info_file – additional files to add to the target zip files

NIFTI and nrrd operators

We offer operators specifically designed for NIFTI and nrrd data, enabling functionalities such as radiomics, resampling, and more.

MergeMasksOperator

class kaapana.operators.MergeMasksOperator.MergeMasksOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Searches for NIFTI files (expects segmentation masks) in operator input-dir and merges them according to the given mode. mode can either be “combine” (default) or “fuse”.

mode = “combine”: combines NIFTI files into a single NIFTI-file; labels that overlap with already combined labels are sorted out. mode = “fuse”: fuses label masks of specified labels (via label_name) into a single NIFTI file.

Inputs: * mode: either “combine” or “fuse” * fuse_label_names: label_names of segmentation masks which should be fused * fused_label_name: new label name of fused segmentation masks * input Nifti files

Outputs: * Nifti file(s) with merged (combined or fused) segmentation masks * adapted seg_info and meta_info JSON files

Itk2DcmOperator

class kaapana.operators.Itk2DcmOperator.Itk2DcmOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator which converts nifti files to dicom files.

Inputs:

Nifti files in a dataset directory structure specified by the nnU-Net project (https://github.com/MIC-DKFZ/nnUNet)

Unimodal datasets can also be parsed directly with the following structure.

` path |----dataset |    | meta_data.json |    | seg_info.json |    | series1.nii.gz |    | series1_seg.nii.gz |    | series2.nii.gz |    | series2_seg.nii.gz |    | ... `

Outputs:

Converted Dicoms. Associated segmentations are not converted yet, but prepared to be converted by the Itk2DcmSegOperator.

Itk2DcmSegOperator

class kaapana.operators.Itk2DcmSegOperator.Itk2DcmSegOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator converts segmentation data into .dcm segmentation files.

This operator takes .nrrd segmentation files and creates .dcm segmentation files. The operator uses the dcmqi libary. For dcmqi documentation please have a look at https://qiicr.gitbook.io/dcmqi-guide/.

Inputs: * .nrrd segmentation file

Outputs: * .dcm segmentation file

Parameters:
  • segmentation_operator – Operator object used for segmentation

  • input_type – “multi_label_seg” or “single_label_segs” If “multi_label_seg”, a json file inside OPERATOR_IMAGE_LIST_INPUT_DIR must exist containing parts as follows {“seg_info”: [“spleen”, “right@kidney”]}

  • alg_name – Name of the algorithm

  • creator_name – Name of the creator

  • alg_type – Type of the algorithm. Default: “AUTOMATIC”

  • single_label_seg_info – “from_file_name” or e.g. “right@kidney” or “abdomen”

  • create_multi_label_dcm_from_single_label_segs – True or False

  • multi_label_seg_info_json – name of json file inside OPERATOR_IMAGE_LIST_INPUT_DIR that contains the organ seg infos e.g. {“seg_info”: [“spleen”, “right@kidney”]}

  • multi_label_seg_name – Name used for multi-label segmentation object, if it will be created

  • series_description – Description of the series

  • skip_empty_slices – Whether empty sclices of the series should be skipped. Default: False

  • env_vars – Environment variables

  • execution_timeout – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail

PyRadiomicsOperator

class kaapana.operators.PyRadiomicsOperator.PyRadiomicsOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Calculates Radiomics feature for given nifti segmentation masks using pyradiomics.

The operator can work with both, multi-mask and single-mask nifti files, but the indicated use is with multiple single-mask nifti files, so that the radiomics features will be calculated per class.

Default behaviour is only shape and first order features.

  • Publication: Van Griethuysen, J. J., Fedorov, A., Parmar, C., Hosny, A., Aucoin, N., Narayan, V., … & Aerts, H. J. (2017). Computational radiomics system to decode the radiographic phenotype. Cancer research, 77(21), e104-e107.

  • Code: https://pyradiomics.readthedocs.io/en/latest/

ResampleOperator

class kaapana.operators.ResampleOperator.ResampleOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to resample NIFTI images.

This operator resamples NIFTI images (.nii.gz file format) according to a defined shape served by a template image. The resampling is executed using MITK’s “MitkCLResampleImageToReference.sh”. The resampling is executed without additional registration. Possible interpolator types are a linear interpolator (0), a nearest neighbor interpolator (1) and a sinc interpolator (2).

Inputs:

  • input_operator: Data which should be resampled.

  • original_img_operator: Data element which serves as a shape template for the to-be-resampled input data.

  • operator_out_dir: Directory in which the resampled data is saved.

Outputs:

  • resampled NIFTI images

Parameters:
  • original_img_operator – Data element which serves as a shape template for the to-be-resampled input data.

  • original_img_batch_name – Batch nahem of template data element.

  • format – = .nii.gz NIFTI image format.

  • interpolator – Interpolation mechanism to resample input images according to reference image’s shape. 0 = linear (default), 1 = nearest neighbor, 2 = sinc.

Opensearch operators

Utilize the following operators to establish communication with Opensearch and effectively manage metadata information.

LocalDeleteFromMetaOperator

class kaapana.operators.LocalDeleteFromMetaOperator.LocalDeleteFromMetaOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to remove series from OpenSearch’s index.

This operator removes either selected series or whole studies from OpenSearch’s index for the functional unit Meta. The operator relies on OpenSearch’s “delete_by_query” function.

Inputs:

  • Input data which should be removed is given via input parameter: delete_operator.

Parameters:
  • delete_operator

  • delete_all_documents – Specifies the amount of removed data to all documents.

  • delete_complete_study – Specifies the amount of removed data to all series of a specified study.

start(ds, **kwargs)[source]

LocalJson2MetaOperator

class kaapana.operators.LocalJson2MetaOperator.LocalJson2MetaOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

This operater pushes JSON data to OpenSearch.

Pushes JSON data to the specified OpenSearch instance. If meta-data already exists, it can either be updated or replaced, depending on the no_update parameter. If the operator fails, some or no data is pushed to OpenSearch. Further information about OpenSearch can be found here: https://opensearch.org/docs/latest/

Inputs:

  • JSON data that should be pushed to OpenSearch

Outputs:

  • If successful, the given JSON data is included in OpenSearch

Parameters:
  • dicom_operator – Used to get OpenSearch document ID from dicom data. Only used with json_operator.

  • json_operator – Provides json data, use either this one OR jsonl_operator.

  • jsonl_operator – Provides json data, which is read and pushed line by line. This operator is prioritized over json_operator.

  • set_dag_id – Only used with json_operator. Setting this to True will use the dag run_id as the OpenSearch document ID when dicom_operator is not given.

  • no_update – If there is a series found with the same ID, setting this to True will replace the series with new data instead of updating it.

  • avalability_check_delay – When checking for series availability in PACS, this parameter determines how many seconds are waited between checks in case series is not found.

  • avalability_check_max_tries – When checking for series availability in PACS, this parameter determines how often to check for series in case it is not found.

  • check_in_pacs – Determines whether or not to search for series in PACS. If set to True and series is not found in PACS, the data will not be put into OpenSearch.

check_pacs_availability(instanceUID: str)[source]
produce_inserts(new_json)[source]
push_json(json_dict)[source]
set_id(dcm_file=None)[source]
start(ds, **kwargs)[source]

Service operators

These operators are used in service DAGs which run automatically, e.g. if data arrives at the platform or nightly to perform a cleanup.

GenerateThumbnailOperator

class kaapana.operators.GenerateThumbnailOperator.GenerateThumbnailOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator which generates a thumbnail from a DICOM-SEG or -RTStruct file.

Given the DICOM-SEG or -RTStruct file, this operator generates a thumbnail image of the segmentation overlayed of the original image.

Inputs:

  • DICOM-SEG or -RTStruct file.

  • Original image operator directory.

Outputs:

  • Thumbnail: Segmentation overlayed on original image.

Parameters:

orig_image_operator – Operator of the original image, usually CT or MRI.

LocalAddToDatasetOperator

class kaapana.operators.LocalAddToDatasetOperator.LocalAddToDatasetOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to assign series to dataset.

Given the metadata of a series, this operator assigns the series to a dataset based on the tags_to_add_from_file parameter.

Inputs: * Metadata of a series

Parameters:

tags_to_add_from_file – a list of fields form the metadata json which holds the dataset name.

start(ds, **kwargs)[source]

LocalAutoTriggerOperator

class kaapana.operators.LocalAutoTriggerOperator.LocalAutoTriggerOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to trigger workflows from within a workflow. To automatically trigger workflows configuration JSON files with the name /*trigger_rule.json are needed. The operator search for all files with this extension in the dags folder.

Inputs:

JSON file example:

[{
    "search_tags": {
        "0x0008,0x0060": "SEG,RTSTRUCT" # comma separated values represent 'or'-logic
    },
    "dag_ids": {
            <dag id to trigger>: {
            "fetch_method": "copy",
            "single_execution" : false
            }
    }
}]

Outputs:

  • all workflows with predefined trigger rules are triggered

set_data_input(dag_id, dcm_path, dag_run_id, series_uid, conf={})[source]
start(ds, **kwargs)[source]
trigger_it(triggering)[source]

LocalCleanUpExpiredWorkflowDataOperator

class kaapana.operators.LocalCleanUpExpiredWorkflowDataOperator.LocalCleanUpExpiredWorkflowDataOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to cleanup/remove the expired workflows data directories

Inputs:

  • dag: DAG to be cleaned.

  • expired_period: Clean items that have expired since a certain period.

Parameters:
  • dag – DAG in which the operator has to be executed.

  • expired_period – Clean items that have expired since in day(s), default: 60 days

start(ds, **kwargs)[source]

LocalCtpQuarantineCheckOperator

class kaapana.operators.LocalCtpQuarantineCheckOperator.LocalCtpQuarantineCheckOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to check the CTP quarantine folder (FASTDATADIR/ctp/incoming/.quarantines), for dicom files. If files are found, trigger_dag_id is triggered.

Inputs:

  • Quarantine folder of the CTP

Found quarantine files are processed as incoming files and added to the PACs and meta.

Parameters:
  • trigger_dag_id – Is by default “service-process-incoming-dcm”, has to be set for a different incoming process.

  • max_number_of_batch_files – default 2000, defines the maximum of files handled in a single dag trigger.

  • target_dir – The input dir of the trigger_dag_id. Has to be set, for a different incoming process.

check(**kwargs)[source]

LocalServiceSyncDagsDbOperator

class kaapana.operators.LocalServiceSyncDagsDbOperator.LocalServiceSyncDagsDbOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to synchronize DAG-files with Airflow.

This operator synchronizes DAG-files on the file system with the Airflow API. So far, this operator supports the synchronization of DAGs in terms of removing DAGs from the Airflow API. The operator checks whether a DAG-file is still present in the Airflow API but not anymore in the DAG-files on the file-system, and removes the DAG from the Airflow API. This operator is applied when uninstalling extensions from the platform.

Parameters:

dag

start(ds, **kwargs)[source]

Store operators (MinIO and PACS)

You can utilize existing operators to communicate with the MinIO storage and the interal PACS of Kaapana. Use these operators e.g. to send, retrieve or delete data.

DcmSendOperator

class kaapana.operators.DcmSendOperator.DcmSendOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator sends data to the platform.

This operator is used for sending data to the platform. For dcmsend documentation please have a look at https://support.dcmtk.org/docs/dcmsend.html.

Parameters:
  • ae_title – calling Application Entity (AE) title

  • pacs_host – Host of PACS

  • pacs_port – Port of PACS

  • env_vars – Environment variables

  • level – ‘element’ or batch’ If batch, an operator folder next to the batch folder with .dcm files is expected. If element, *.dcm are expected in the corresponding operator with .dcm files is expected.

  • execution_timeout – timeout for connection requests

LocalDeleteFromPacsOperator

class kaapana.operators.LocalDeleteFromPacsOperator.LocalDeleteFromPacsOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to remove series from PACS system.

This operator removes either selected series or whole studies from Kaapana’s integrated research PACS system by DCM4Chee. The operaator relies on “delete_study” function of Kaapana’s “HelperDcmWeb” operator.

Inputs:

  • Input data which should be removed given by input parameter: input_operator.

Parameters:
  • pacs_host – Needed to specify “pacs_dcmweb_endpoint”.

  • pacs_port – Needed to specify “pacs_dcmweb_endpoint”.

  • pacs_aet – Is by default set to “KAAPANA” and is used as a template AE-title for “HelperDcmWeb“‘s “delete_study” function.

  • delete_complete_study – Specifies the amount of removed data to all series of a specified study.

start(ds, **kwargs)[source]

LocalDicomSendOperator

class kaapana.operators.LocalDicomSendOperator.LocalDicomSendOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator sends data to the platform locally.

This operator is used for sending data to the platform locally. For dcmsend documentation please have a look at https://support.dcmtk.org/docs/dcmsend.html.

Parameters:
  • pacs_host – Host of PACS

  • pacs_port – Port of PACS

  • ae_title – calling Application Entity (AE) title

  • aetitle_send – called AE title of peer

  • check_arrival – Verifies if data transfer was successful

send_dicom_data(send_dir, series_uid)[source]
start(**kwargs)[source]

LocalGetRefSeriesOperator

class kaapana.operators.LocalGetRefSeriesOperator.LocalGetRefSeriesOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to get DICOM series.

This operator downloads DICOM series from a given PACS system according to specified search filters. The downloading is executed in a structured manner such that the downloaded data is saved to target directories named with the series_uid.

Parameters:
  • name – “get-ref-series” (default)

  • search_policy – reference_uid

  • data_type – ‘dicom’ or ‘json’

  • modality – None (defalut)

  • taget_level – “batch_element” (default)

  • dicom_tags – (empty list by default)

  • expected_file_count – either number of files (type: int) or “all”

  • limit_file_count – to limit number of files

  • parallel_downloads – number of files to download in parallel (default: 3)

  • pacs_dcmweb_host – “http://dcm4chee-service.{SERVICES_NAMESPACE}.svc” (default)

  • pacs_dcmweb_port – 8080 (default)

  • aetitle – “KAAPANA” (default)

  • batch_name – None (default)

download_series(series)[source]
get_files(ds, **kwargs)[source]

LocalMinioOperator

class kaapana.operators.LocalMinioOperator.LocalMinioOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to communicate with MinIO buckets

Parameters:
  • action – Action to execute (‘get’, ‘remove’ or ‘put’)

  • local_root_dir – Workflow directory

  • bucket_name – Name of the Bucket to interact with

  • action_operators – Operator to use the output data from

  • action_operator_dirs – (Additional) directory to apply MinIO action on.

  • action_files – (Additional) files to apply MinIO action on.

  • minio_host – MinIO host

  • minio_port – MinIO port

  • file_white_tuples – Optional whitelisting for files

  • zip_files – If files should be zipped

start(ds, **kwargs)[source]

LocalTaggingOperator

class kaapana.operators.LocalTaggingOperator.LocalTaggingOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Parameters:
  • tag_field – the field of the opensearch object where the tags are stored

  • add_tags_from_file – determines if the content of the fields specified by tags_to_add_from_file are added as tags

  • tags_to_add_from_file – a list of fields form the input json where the values are added as tags if add_tags_from_file is true

class Action(value)[source]

Bases: Enum

An enumeration.

ADD = 'add'
ADD_FROM_FILE = 'add_from_file'
DELETE = 'delete'
start(ds, **kwargs)[source]
tagging(series_instance_uid: str, tags: List[str], tags2add: List[str] = [], tags2delete: List[str] = [])[source]

Uncategorized operators

We have even more operators for different usecases. Just take a look if there is something you can utilize in your own workflow.

LocalDagTriggerOperator

class kaapana.operators.LocalDagTriggerOperator.LocalDagTriggerOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

check_cache(dicom_series, cache_operator)[source]
copy(src, target)[source]
get_dicom_list()[source]
trigger_dag(ds, **kwargs)[source]

Orchestrates triggering of Airflow workflows.

Parameters:

ds (str): Datestamp parameter for Airflow. kwargs: Additional keyword arguments, including dag_run information.

Returns:

None

Raises:

ValueError: If any triggered DAG fails or encounters unexpected behavior.

trigger_dag_dicom_helper()[source]

Triggers Airflow workflows passed with the input DICOM data.

Returns:

pending_dags (list): List containing information about triggered DAGs that are still pending.

TrainTestSplitOperator

class kaapana.operators.TrainTestSplitOperator.TrainTestSplitOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Workflow management operators

This section comprises commonly used operators for obtaining input data, downloading models, and performing a workflow directory cleanup during the final step of a dag.

GetZenodoModelOperator

class kaapana.operators.GetZenodoModelOperator.GetZenodoModelOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaBaseOperator

Operator to download models from https://zenodo.org/ and extract them into the desired location on the file system.

Inputs:

  • model_dir: Location where to extract the model archive

  • task_ids: List of comma separated tasks that should be downloaded

Outputs:

  • The downloaded model is available for inference.

Parameters:
  • model_dir (str) – The directory relative to SLOW_DATA_DIR/workflows where the downloaded models should be extracted. Defaults to “/models/nnUNet”.

  • name (str) – The base name of the pod. Defaults to “get-zenodo-models”.

  • task_ids (str) – A comma separated list of the task IDs associated with the models that should be downloaded and extracted. Defaults to None.

  • enable_proxy (bool) – Determines if the proxy should be enabled. Defaults to True.

  • delete_output_on_start (bool) – Determines if the operator output directory should be deleted on start. Defaults to False.

execution_timeout = datetime.timedelta(seconds=14400)

LocalGetInputDataOperator

class kaapana.operators.LocalGetInputDataOperator.LocalGetInputDataOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Operator to get input data for a workflow/dag.

This operator pulls all defined files from it’s defined source and stores the files in the workflow directory. All subsequent operators can get and process the files from within the workflow directory. Typically, this operator can be used as the first operator in a workflow.

Inputs:

  • data_form: ‘json’

  • data_type: ‘dicom’ or ‘json’

  • dataset_limit: limit the download series list number

  • include_custom_tag_property: Key in workflow_form used to specify tags that must be present in the data for inclusion

  • exclude_custom_tag_property: Key in workflow_form used to specify tags that, if present in the data, lead to exclusion

Outputs:

  • Stores downloaded ‘dicoms’ or ‘json’ files in the ‘operator_out_dir’

Parameters:
  • data_type – ‘dicom’ or ‘json’

  • data_form – ‘json’

  • check_modality – ‘True’ or ‘False’

  • dataset_limit – limits the download list

  • include_custom_tag_property – key in workflow_form for filtering with tags that must exist

  • exclude_custom_tag_property – key in workflow_form for filtering with tags that must not exist

  • parallel_downloads – default 3, number of parallel downloads

check_dag_modality(input_modality)[source]
get_data(series_dict)[source]
move_series(src_dcm_path: str, target: str)[source]
start(ds, **kwargs)[source]

LocalWorkflowCleanerOperator

class kaapana.operators.LocalWorkflowCleanerOperator.LocalWorkflowCleanerOperator(*args: Any, **kwargs: Any)[source]

Bases: KaapanaPythonBaseOperator

Cleans a local workflow.

Cleans a local workflow and optionally deleting the workflow directory.

Parameters:
  • run_dir – Path to directory of the workflow

  • clean_workflow_dir – Bool if workflow directory should be deleted

start(ds, **kwargs)[source]