Source code for kaapana.operators.LocalCtpQuarantineCheckOperator

import glob
import os
import shutil
from datetime import timedelta
from pathlib import Path

import pydicom
from airflow.api.common.trigger_dag import trigger_dag as trigger

from kaapana.blueprints.kaapana_utils import generate_run_id
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator


class LocalCtpQuarantineCheckOperator(KaapanaPythonBaseOperator):
    """
    Operator that checks the CTP quarantine folder (FASTDATADIR/ctp/incoming/.quarantines) for DICOM files.
    If files are found, trigger_dag_id is triggered.

    **Inputs:**

    * Quarantine folder of the CTP

    Quarantine files that are found are processed as incoming files and added to the PACS and to Meta.
    """
    def check(self, **kwargs):
        conf = kwargs["dag_run"].conf
        if conf and "dataInputDirs" in conf:
            # Prevent recursion: this DAG run was itself triggered by this operator.
            print("This is already a DAG run triggered by this operator")
            return
        quarantine_path = os.path.join(
            "/kaapana/mounted/ctpinput", "incoming", ".quarantines"
        )
        path_list = [p for p in Path(quarantine_path).rglob("*.dcm") if p.is_file()]
        if path_list:
            print("Files found in quarantine!")
            # Limit the number of files handled in a single DAG run
            divided_path_list = [
                path_list[x : x + self.max_number_of_batch_files]
                for x in range(0, len(path_list), self.max_number_of_batch_files)
            ]
            for path_list_part in divided_path_list:
                dag_run_id = generate_run_id(self.trigger_dag_id)
                print("MOVE with dag run id: ", dag_run_id)
                target_list = set()
                try:
                    for dcm_file in path_list_part:
                        # Sort each file into the batch structure of the triggered DAG by its SeriesInstanceUID
                        series_uid = pydicom.dcmread(dcm_file, force=True)[
                            0x0020, 0x000E
                        ].value
                        target = os.path.join(
                            self.airflow_workflow_dir,
                            dag_run_id,
                            "batch",
                            series_uid,
                            self.target_dir,
                        )
                        if not os.path.exists(target):
                            os.makedirs(target)
                        print("SRC: {}".format(dcm_file))
                        print("TARGET: {}".format(target))
                        target_list.add(target)
                        shutil.move(str(dcm_file), target)
                    conf = {"dataInputDirs": list(target_list)}
                except Exception as e:
                    print("An exception occurred while moving files:")
                    print(e)
                    print("Please have a look at this file: " + str(dcm_file))
                    print("Remove the invalid file or process it separately")
                    if target_list:
                        # Still trigger the DAG for the files that were already moved successfully.
                        print("Triggering for all files that were already moved.")
                        conf = {"dataInputDirs": list(target_list)}
                        trigger(
                            dag_id=self.trigger_dag_id,
                            run_id=dag_run_id,
                            conf=conf,
                            replace_microseconds=False,
                        )
                    exit(1)
                print(
                    "TRIGGERING! DAG-ID: %s RUN_ID: %s"
                    % (self.trigger_dag_id, dag_run_id)
                )
                trigger(
                    dag_id=self.trigger_dag_id,
                    run_id=dag_run_id,
                    conf=conf,
                    replace_microseconds=False,
                )
    def __init__(
        self,
        dag,
        trigger_dag_id="service-process-incoming-dcm",
        max_number_of_batch_files=2000,
        target_dir="get-input-data",
        **kwargs
    ):
        """
        :param trigger_dag_id: Defaults to "service-process-incoming-dcm"; has to be set for a different incoming process.
        :param max_number_of_batch_files: Defaults to 2000; the maximum number of files handled in a single DAG trigger.
        :param target_dir: The input directory of the trigger_dag_id. Has to be set for a different incoming process.
        """
        name = "ctp-quarantine-check"
        self.trigger_dag_id = trigger_dag_id
        self.max_number_of_batch_files = max_number_of_batch_files
        self.target_dir = target_dir

        super().__init__(
            dag=dag,
            name=name,
            python_callable=self.check,
            execution_timeout=timedelta(minutes=180),
            **kwargs
        )
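
For context, below is a minimal sketch of how this operator could be wired into an Airflow DAG. The dag_id, schedule_interval and default_args shown here are illustrative assumptions, not part of the Kaapana source; only the operator defaults (trigger_dag_id "service-process-incoming-dcm" and target_dir "get-input-data") come from the class above.

# Hypothetical DAG wiring for illustration only; dag_id, schedule_interval and
# default_args are assumptions, not taken from the Kaapana repository.
from datetime import datetime, timedelta

from airflow.models import DAG

from kaapana.operators.LocalCtpQuarantineCheckOperator import (
    LocalCtpQuarantineCheckOperator,
)

dag = DAG(
    dag_id="example-ctp-quarantine-check",  # assumed name for this sketch
    default_args={"retries": 1, "retry_delay": timedelta(seconds=30)},
    schedule_interval="@hourly",  # assumed polling interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# With the operator defaults, quarantined DICOM files are re-injected via the
# "service-process-incoming-dcm" DAG into its "get-input-data" directory.
check_quarantine = LocalCtpQuarantineCheckOperator(dag=dag)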