Source code for kaapana.operators.LocalDicomSendOperator

import glob
import os
from datetime import timedelta
from pathlib import Path
from subprocess import PIPE, run

import pydicom
from kaapana.blueprints.kaapana_global_variables import SERVICES_NAMESPACE
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator
from kaapana.operators.HelperDcmWeb import HelperDcmWeb


[docs]class LocalDicomSendOperator(KaapanaPythonBaseOperator): """ Operator sends data to the platform locally. This operator is used for sending data to the platform locally. For dcmsend documentation please have a look at https://support.dcmtk.org/docs/dcmsend.html. """
[docs] def send_dicom_data(self, send_dir, series_uid): if len(list(Path(send_dir).rglob("*.dcm"))) == 0: print(send_dir) print("############### no dicoms found...!") raise FileNotFoundError print( f"Sending {send_dir} to {self.host} {self.port} with aetitle {self.aetitle} and aetitle_send {self.aetitle_send}" ) command = [ "dcmsend", "-v", f"{self.host}", f"{self.port}", "-aet", f"{self.aetitle_send}", "-aec", f"{self.aetitle}", "--scan-directories", "--scan-pattern", "*.dcm", "--recurse", f"{send_dir}", ] timeout = self.execution_timeout.seconds - 60 print("The timeout is set to: ", timeout) output = run( command, stdout=PIPE, stderr=PIPE, universal_newlines=True, timeout=timeout ) if output.returncode != 0 or "with status SUCCESS" not in str(output): print("############### Something went wrong with dcmsend!") for line in str(output).split("\\n"): print(line) print("##################################################") raise ValueError( "dcmsend exited with non-zero code or without success message!" ) else: print(f"Success! output: {output}") print("") if self.check_arrival and not self.dcmweb_helper.check_if_series_in_archive( seriesUID=series_uid ): print(f"Arrival check failed!") raise ValueError("Checkick arrival of dicoms failed!")
[docs] def start(self, **kwargs): self.conf = kwargs["dag_run"].conf self.dcmweb_helper = HelperDcmWeb( application_entity=self.aetitle, dag_run=kwargs["dag_run"] ) run_dir = os.path.join(self.airflow_workflow_dir, kwargs["dag_run"].run_id) batch_folders = [ f for f in glob.glob(os.path.join(run_dir, self.batch_name, "*")) ] study_instance_uids = set() for batch_element_dir in batch_folders: print("input operator ", self.operator_in_dir) element_input_dir = os.path.join(batch_element_dir, self.operator_in_dir) print(element_input_dir) dcm_files = sorted( glob.glob(os.path.join(element_input_dir, "**/*.dcm*"), recursive=True) ) number_of_instances = len(dcm_files) if number_of_instances == 0: continue print("Number of instances to send: ", number_of_instances) dcm_file = dcm_files[0] print("dcm-file: {}".format(dcm_file)) ds = pydicom.dcmread(dcm_file) series_uid = str(ds[0x0020, 0x000E].value) self.send_dicom_data(element_input_dir, series_uid) study_instance_uids.add(str(ds[0x0020, 0x000D].value)) for study_instance_uid in study_instance_uids: self.dcmweb_helper.set_access_control_id_of_study(study_instance_uid) self.dcmweb_helper.add_access_control_id_to_ae(study_instance_uid)
def __init__( self, dag, pacs_host: str = f"dcm4chee-service.{SERVICES_NAMESPACE}.svc", pacs_port: str = "11115", ae_title: str = "KAAPANA", aetitle_send: str = "kaapana", check_arrival: str = "False", **kwargs, ): """ :param pacs_host: Host of PACS :param pacs_port: Port of PACS :param ae_title: calling Application Entity (AE) title :param aetitle_send: called AE title of peer :param check_arrival: Verifies if data transfer was successful """ # The use of the local operator, to be able to set the aetitle_send! self.host = pacs_host self.port = pacs_port self.aetitle = ae_title self.aetitle_send = aetitle_send self.check_arrival = check_arrival self.name = "local-dcm-send" self.task_id = self.name super().__init__( dag=dag, task_id=self.task_id, name=self.name, python_callable=self.start, execution_timeout=timedelta(minutes=60), **kwargs, )