Source code for kaapana.operators.LocalMinioOperator

import datetime
import glob
import os
from datetime import timedelta
from zipfile import ZipFile

from kaapana.blueprints.kaapana_global_variables import SERVICES_NAMESPACE
from kaapana.operators.HelperCaching import cache_operator_output
from kaapana.operators.HelperMinio import HelperMinio
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator
from minio import Minio


[docs]class LocalMinioOperator(KaapanaPythonBaseOperator): """ Operator to communicate with MinIO buckets """
[docs] @cache_operator_output def start(self, ds, **kwargs): dag_run = kwargs["dag_run"] conf = kwargs["dag_run"].conf print("conf", conf) if ( conf is not None and "form_data" in conf and conf["form_data"] is not None and "zip_files" in conf["form_data"] ): self.zip_files = conf["form_data"]["zip_files"] print("Zip files set by form data", self.zip_files) if conf is not None and "data_form" in conf: for attr in [ "bucket_name", "action_operator_dirs", "action_operators", "action_files", ]: if attr in conf["data_form"]: print(f'From data_form {attr}={conf["data_form"][attr]}') setattr(self, attr, conf["data_form"][attr]) ################### # TODO: Can't be used like this, since token expires, we should use presigned_urls, which should be generated when the airflow is triggered # if 'conf' in conf: # if 'x_auth_token' in conf: # access_key, secret_key, session_token = generate_minio_credentials(conf['x_auth_token']) # else: # access_key = os.environ.get('MINIOUSER'), # secret_key = os.environ.get('MINIOPASSWORD') # session_token = None ################### # Todo: actually should be in pre_execute, however, when utilizing # Airflow PythonOperator pre_execute seems to have no effect... # For files coming from Minio hooks! if conf is not None and "Key" in conf: self.bucket_name = conf["Key"].split("/")[0] object_names = ["/".join(conf["Key"].split("/")[1:])] else: object_names = [] minioClient = HelperMinio(dag_run=dag_run) run_dir = os.path.join(self.airflow_workflow_dir, kwargs["dag_run"].run_id) local_root_dir = self.local_root_dir.format(run_dir=run_dir) print(f"Working relative to the following director: {local_root_dir}") batch_folder = [ f for f in glob.glob(os.path.join(local_root_dir, self.batch_name, "*")) ] print(batch_folder) if self.bucket_name is None: print("No BUCKETID env set!") self.bucket_name = kwargs["dag"].dag_id print("Generated Bucket-Id: %s" % self.bucket_name) object_dirs = [] # Get contents from run_dir object_dirs = object_dirs + self.action_operator_dirs for action_operator in self.action_operators: object_dirs.append( os.path.relpath( os.path.join(run_dir, action_operator.operator_out_dir), local_root_dir, ) ) # Get contents from batch_elements for batch_element_dir in batch_folder: for operator_dir in self.action_operator_dirs: object_dirs.append( os.path.relpath( os.path.join(batch_element_dir, operator_dir), local_root_dir ) ) for action_operator in self.action_operators: object_dirs.append( os.path.relpath( os.path.join( batch_element_dir, action_operator.operator_out_dir ), local_root_dir, ) ) # Files to apply action # Add object_names object_names = object_names + self.action_files # Add relative file paths from operators for object_dir in object_dirs: for action_file in self.action_files: object_names.append(os.path.join(object_dir, action_file)) if self.zip_files: timestamp = (datetime.datetime.now()).strftime("%y-%m-%d-%H:%M:%S%f") target_dir = os.path.join(run_dir, self.operator_out_dir) if not os.path.exists(target_dir): os.makedirs(target_dir) zip_object_name = ( f"{kwargs['dag'].dag_id}_{kwargs['dag_run'].run_id}_{timestamp}.zip" ) zip_file_path = os.path.join(target_dir, zip_object_name) with ZipFile(zip_file_path, "w") as zipObj: if not object_dirs: print(f"Zipping everything from {local_root_dir}") object_dirs = [""] else: print(f'Zipping everything from {", ".join(object_dirs)}') for object_dir in object_dirs: for path, _, files in os.walk( os.path.join(local_root_dir, object_dir) ): for name in files: file_path = os.path.join(path, name) rel_dir = os.path.relpath(path, local_root_dir) rel_dir = "" if rel_dir == "." else rel_dir if rel_dir == self.operator_out_dir: print( "Skipping files in {rel_dir}, due to " "recursive zipping!" ) continue object_name = os.path.join(rel_dir, name) zipObj.write(os.path.join(path, name), object_name) minioClient.apply_action_to_file( "put", self.bucket_name, zip_object_name, zip_file_path, self.file_white_tuples, ) return if object_names: print(f'Applying action "{self.action}" to files {object_names}') minioClient.apply_action_to_object_names( self.action, self.bucket_name, local_root_dir, object_names, self.file_white_tuples, ) else: if not object_dirs: print(f"Applying action to whole bucket") else: print(f'Applying action "{self.action}" to ' f"files in: {object_dirs}") minioClient.apply_action_to_object_dirs( self.action, self.bucket_name, local_root_dir, object_dirs, self.file_white_tuples, ) return
def __init__( self, dag, action="get", # 'get', 'remove' or 'put' name=None, local_root_dir="{run_dir}", bucket_name=None, action_operators=None, action_operator_dirs=None, action_files=None, minio_host: str = f"minio-service.{SERVICES_NAMESPACE}.svc", minio_port: str = "9000", file_white_tuples=None, zip_files: bool = False, **kwargs, ): """ :param action: Action to execute ('get', 'remove' or 'put') :param local_root_dir: Workflow directory :param bucket_name: Name of the Bucket to interact with :param action_operators: Operator to use the output data from :param action_operator_dirs: (Additional) directory to apply MinIO action on. :param action_files: (Additional) files to apply MinIO action on. :param minio_host: MinIO host :param minio_port: MinIO port :param file_white_tuples: Optional whitelisting for files :param zip_files: If files should be zipped """ if action not in ["get", "remove", "put"]: raise AssertionError("action must be get, remove or put") if action == "put": file_white_tuples = file_white_tuples or ( ".json", ".mat", ".py", ".zip", ".txt", ".gz", ".csv", "pdf", "png", "jpg", ) name = name or f"minio-actions-{action}" self.action = action self.local_root_dir = local_root_dir self.bucket_name = bucket_name self.action_operator_dirs = action_operator_dirs or [] self.action_operators = action_operators or [] self.action_files = action_files or [] self.minio_host = minio_host self.minio_port = minio_port self.file_white_tuples = file_white_tuples self.zip_files = zip_files super(LocalMinioOperator, self).__init__( dag=dag, name=name, python_callable=self.start, execution_timeout=timedelta(minutes=30), **kwargs, )