Source code for kaapana.operators.LocalDeleteFromMetaOperator
import glob
import os
import pydicom
import json
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator
from kaapana.operators.HelperOpensearch import HelperOpensearch
class LocalDeleteFromMetaOperator(KaapanaPythonBaseOperator):
    """
    Operator to remove series from OpenSearch's index.

    This operator removes either selected series or whole studies from
    OpenSearch's index for the functional unit Meta.
    The operator relies on OpenSearch's "delete_by_query" function.

    **Inputs:**

    * Input data which should be removed is read from ``operator_in_dir`` of
      every batch element, either as DICOM files (``*.dcm*``) or, if none are
      present, as JSON metadata files (``*.json*``).
    """

    def start(self, ds, **kwargs):
        """
        Build the delete query for the current DAG run and send it to OpenSearch.

        :param ds: Execution date string passed in by the caller; not used here.
        :param kwargs: Task context. ``kwargs["dag_run"]`` must provide ``conf``
            and ``run_id``. If ``conf["form_data"]["delete_complete_study"]`` is
            present it overrides the value given to the constructor.
        """
        # dag_run.conf may be None when the run was triggered without a payload.
        conf = kwargs["dag_run"].conf or {}
        form_data = conf.get("form_data")
        if form_data is not None and "delete_complete_study" in form_data:
            self.delete_complete_study = form_data["delete_complete_study"]
            print("Delete entire study set to ", self.delete_complete_study)

        if self.delete_all_documents:
            print("Deleting all documents from META ...")
            query = {"query": {"match_all": {}}}
            HelperOpensearch.delete_by_query(query)
            return

        run_dir = os.path.join(self.airflow_workflow_dir, kwargs["dag_run"].run_id)
        batch_folder = glob.glob(os.path.join(run_dir, self.batch_name, "*"))

        # Flat list of UID strings: study UIDs when whole studies are deleted,
        # series UIDs otherwise. A "terms" query only accepts scalar values.
        dicoms_to_delete = []
        for batch_element_dir in batch_folder:
            dicoms_to_delete.extend(self._collect_uids(batch_element_dir))

        if self.delete_complete_study:
            query = {
                "query": {
                    "terms": {"0020000D StudyInstanceUID_keyword": dicoms_to_delete}
                }
            }
        else:
            # The series UIDs are matched against the document "_id" field.
            query = {"query": {"terms": {"_id": dicoms_to_delete}}}
        HelperOpensearch.delete_by_query(query)

    def _collect_uids(self, batch_element_dir):
        """
        Return the UIDs to delete for one batch element.

        The first DICOM file found is used if there is one; otherwise every
        JSON metadata file in the input directory contributes one UID.
        The study UID is returned when ``delete_complete_study`` is set,
        the series UID otherwise.
        """
        input_dir = os.path.join(batch_element_dir, self.operator_in_dir)

        dcm_files = sorted(glob.glob(os.path.join(input_dir, "*.dcm*"), recursive=True))
        if dcm_files:
            # Only the first file is read, as in the original implementation.
            incoming_dcm = pydicom.dcmread(dcm_files[0])
            if self.delete_complete_study:
                return [incoming_dcm.StudyInstanceUID]
            return [incoming_dcm.SeriesInstanceUID]

        if self.delete_complete_study:
            uid_key = "0020000D StudyInstanceUID_keyword"
        else:
            uid_key = "0020000E SeriesInstanceUID_keyword"

        uids = []
        json_files = sorted(
            glob.glob(os.path.join(input_dir, "*.json*"), recursive=True)
        )
        for meta_file in json_files:
            with open(meta_file) as fs:
                metadata = json.load(fs)
            # Append the UID string itself (not a dict) so the resulting list
            # is a valid value for the "terms" query built in start().
            uids.append(metadata[uid_key])
        return uids

    def __init__(
        self,
        dag,
        delete_operator=None,
        delete_all_documents=False,
        delete_complete_study=False,
        **kwargs
    ):
        """
        :param dag: DAG this operator belongs to; forwarded to the base class.
        :param delete_operator: Accepted for interface compatibility; it is not
            referenced anywhere in this class. NOTE(review): confirm how the
            input operator is expected to be wired (e.g. via ``**kwargs``).
        :param delete_all_documents: Specifies the amount of removed data to all documents.
        :param delete_complete_study: Specifies the amount of removed data to all series of a specified study.
        :param kwargs: Additional keyword arguments forwarded to the base class.
        """
        self.delete_all_documents = delete_all_documents
        self.delete_complete_study = delete_complete_study

        super().__init__(
            dag=dag, name="delete-meta", python_callable=self.start, **kwargs
        )