Source code for kaapana.operators.LocalTaggingOperator

import glob
import json
import os
from enum import Enum
from typing import List, Optional

from opensearchpy import OpenSearch

from kaapana.blueprints.kaapana_global_variables import SERVICES_NAMESPACE
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator


class LocalTaggingOperator(KaapanaPythonBaseOperator):
    """Add or remove tags on series documents stored in an OpenSearch index.

    For every ``*.json*`` metadata file found in the operator input directory
    of each batch element, the tag list of the corresponding series document
    is read from OpenSearch, modified according to the selected
    :class:`Action` and written back.
    """

    # Metadata fields whose values are used as tags when ``add_tags_from_file``
    # is enabled and the caller did not pass an explicit list. Kept as an
    # immutable tuple so it can never be shared-and-mutated between instances.
    DEFAULT_TAGS_TO_ADD_FROM_FILE = ("00120020 ClinicalTrialProtocolID_keyword",)

    class Action(Enum):
        """Tagging modes selectable through ``form_data["action"]``."""

        ADD = "add"
        DELETE = "delete"
        ADD_FROM_FILE = "add_from_file"

    @staticmethod
    def _as_tag_list(value) -> list:
        """Return ``value`` as a list of tags.

        ``None`` and the empty string give an empty list. A non-empty string
        is treated as ONE tag (iterating it would yield single characters).
        Any other iterable is copied into a list; a non-iterable scalar is
        wrapped into a one-element list.
        """
        if value is None:
            return []
        if isinstance(value, (str, bytes)):
            return [value] if value else []
        try:
            return list(value)
        except TypeError:
            return [value]

    def tagging(
        self,
        series_instance_uid: str,
        tags: List[str],
        tags2add: Optional[List[str]] = None,
        tags2delete: Optional[List[str]] = None,
    ):
        """Update the tag field of one series document in OpenSearch.

        The stored result is
        ``((tags | tags_in_index) - tags2delete) | tags2add``, so a tag that
        appears in both ``tags2delete`` and ``tags2add`` is kept.

        :param series_instance_uid: id of the document in ``self.opensearch_index``
        :param tags: tags already known for the series (e.g. from the metadata file)
        :param tags2add: tags to add; ``None`` means none
        :param tags2delete: tags to remove; ``None`` means none
        """
        tags2add = self._as_tag_list(tags2add)
        tags2delete = self._as_tag_list(tags2delete)

        print(series_instance_uid)
        print(f"Tags 2 add: {tags2add}")
        print(f"Tags 2 delete: {tags2delete}")

        # Read Tags
        auth = None
        os_client = OpenSearch(
            hosts=[{"host": self.opensearch_host, "port": self.opensearch_port}],
            http_compress=True,  # enables gzip compression for request bodies
            http_auth=auth,
            use_ssl=False,
            verify_certs=False,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
            timeout=2,
        )

        doc = os_client.get(index=self.opensearch_index, id=series_instance_uid)
        print(doc)

        # Normalise both sources: a field holding a single string must count
        # as one tag instead of being exploded into characters by set().
        index_tags = self._as_tag_list(doc["_source"].get(self.tag_field, []))
        known_tags = self._as_tag_list(tags)

        final_tags = list(
            set(known_tags)
            .union(index_tags)
            .difference(tags2delete)
            .union(tags2add)
        )
        print(f"Final tags: {final_tags}")

        # Write Tags back
        body = {"doc": {self.tag_field: final_tags}}
        os_client.update(index=self.opensearch_index, id=series_instance_uid, body=body)

    def start(self, ds, **kwargs):
        """Python callable of the operator: tag every series of the DAG run.

        Reads the optional ``form_data`` entry of the DAG run configuration:
        ``tags`` is a comma separated string of tags and ``action`` is one of
        the :class:`Action` values (case-insensitive). Without an ``action``
        the operator falls back to ``Action.ADD_FROM_FILE``.

        :param ds: unused here; part of the callable signature
        :param kwargs: must contain ``dag_run``
        :raises ValueError: if ``action`` is not a valid :class:`Action` value
        """
        print("Start tagging")
        tags = []
        action = self.Action.ADD_FROM_FILE

        # A run triggered without configuration may carry ``conf = None``.
        conf = kwargs["dag_run"].conf or {}
        if "form_data" in conf:
            form_data = conf["form_data"]
            if "tags" in form_data:
                # Trim whitespace around each entry and drop empty entries so
                # "a, b" or a trailing comma does not create " b" / "" tags.
                tags = [
                    tag.strip()
                    for tag in form_data["tags"].split(",")
                    if tag.strip()
                ]
            if "action" in form_data:
                action_param = form_data["action"].lower().strip()
                action = self.Action(action_param)

        print(f"Action: {action}")
        print(f"Tags from form: {tags}")

        run_dir = os.path.join(self.airflow_workflow_dir, kwargs["dag_run"].run_id)
        batch_folder = glob.glob(os.path.join(run_dir, self.batch_name, "*"))

        for batch_element_dir in batch_folder:
            json_files = sorted(
                glob.glob(
                    os.path.join(batch_element_dir, self.operator_in_dir, "*.json*"),
                    recursive=True,
                )
            )
            for meta_file in json_files:
                print(f"Do tagging for file {meta_file}")
                with open(meta_file, encoding="utf-8") as fs:
                    metadata = json.load(fs)

                series_uid = metadata["0020000E SeriesInstanceUID_keyword"]
                existing_tags = metadata.get(self.tag_field, [])

                # Adding tags based on other fields of the file
                file_tags = []
                if self.add_tags_from_file:
                    for tag_from_file in self.tags_to_add_from_file:
                        value = metadata.get(tag_from_file)
                        if value:
                            # A scalar string value is a single tag.
                            file_tags.extend(self._as_tag_list(value))

                if action == self.Action.ADD_FROM_FILE:
                    self.tagging(series_uid, tags=existing_tags, tags2add=file_tags)
                elif action == self.Action.ADD:
                    self.tagging(series_uid, tags=existing_tags, tags2add=tags)
                elif action == self.Action.DELETE:
                    self.tagging(series_uid, tags=existing_tags, tags2delete=tags)

    def __init__(
        self,
        dag,
        tag_field: str = "00000000 Tags_keyword",
        name: str = "tagging",
        add_tags_from_file: bool = False,
        tags_to_add_from_file: Optional[List[str]] = None,
        opensearch_host=f"opensearch-service.{SERVICES_NAMESPACE}.svc",
        opensearch_port=9200,
        opensearch_index="meta-index",
        *args,
        **kwargs,
    ):
        """
        :param dag: the DAG this operator belongs to
        :param tag_field: the field of the opensearch object where the tags are stored
        :param name: operator name passed on to the base operator
        :param add_tags_from_file: determines if the content of the fields specified by tags_to_add_from_file are added as tags
        :param tags_to_add_from_file: a list of fields from the input json where the values are added as tags if add_tags_from_file is true; ``None`` selects ``DEFAULT_TAGS_TO_ADD_FROM_FILE``
        :param opensearch_host: host name of the OpenSearch service
        :param opensearch_port: port of the OpenSearch service
        :param opensearch_index: index that holds the series documents
        """
        self.tag_field = tag_field
        self.add_tags_from_file = add_tags_from_file
        # Each instance gets its own list, so no mutable default is shared.
        if tags_to_add_from_file is None:
            self.tags_to_add_from_file = list(self.DEFAULT_TAGS_TO_ADD_FROM_FILE)
        else:
            self.tags_to_add_from_file = self._as_tag_list(tags_to_add_from_file)
        self.opensearch_host = opensearch_host
        self.opensearch_port = opensearch_port
        self.opensearch_index = opensearch_index

        # NOTE: positional ``*args`` are accepted for signature compatibility
        # but, as before, are not forwarded to the base class.
        super().__init__(dag=dag, name=name, python_callable=self.start, **kwargs)