Source code for kaapana.operators.LocalCleanUpExpiredWorkflowDataOperator

from minio import Minio
import os
import time
import glob
from datetime import timedelta
from datetime import datetime
import shutil
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator


class LocalCleanUpExpiredWorkflowDataOperator(KaapanaPythonBaseOperator):
    """
    Operator to cleanup/remove the expired workflows data directories

    **Inputs:**

    * dag: DAG to be cleaned.
    * expired_period: Clean items that have expired since a certain period.
    """

    # strftime layout used for the "last changed" / "last modified" log lines.
    _TIMESTAMP_FORMAT = "%A, %B %d, %Y %I:%M:%S"

    @staticmethod
    def _latest_mtime(target_dir):
        """
        Return the most recent modification time found for ``target_dir``.

        The directory entry itself and every path matched by a recursive
        ``**/*`` glob below it are inspected.

        :param target_dir: Path of the directory to inspect.
        :return: Newest mtime as seconds since the epoch, or ``None`` when
            no path (not even ``target_dir``) could be stat'ed.
        """
        # Escape the directory so glob metacharacters in its name are taken
        # literally; only the trailing "**/*" is meant to be a pattern.
        pattern = os.path.join(glob.escape(target_dir), "**", "*")
        # Including the directory itself keeps an empty directory from being
        # treated as if it had been last modified at the epoch.
        candidates = [target_dir, *glob.glob(pattern, recursive=True)]

        newest = None
        for path in candidates:
            try:
                mtime = os.path.getmtime(path)
            except OSError:
                # Path vanished since it was listed, or is a broken symlink;
                # one unreadable entry should not abort the whole cleanup.
                continue
            if newest is None or mtime > newest:
                newest = mtime
        return newest

    def start(self, ds, **kwargs):
        """
        Remove every entry of the workflow directory that has expired.

        For each entry in ``self.airflow_workflow_dir`` the newest
        modification time of the entry and its contents is determined; when
        that is older than ``self.expired_period`` the entry is removed with
        ``shutil.rmtree(..., ignore_errors=True)``.

        :param ds: Unused; accepted because the method is registered as the
            operator's ``python_callable``.
        :param kwargs: Unused additional context passed by the caller.
        """
        # NOTE(review): airflow_workflow_dir is not assigned in this class;
        # presumably the base operator provides it - confirm.
        workflow_dir = self.airflow_workflow_dir
        expiry_seconds = self.expired_period.total_seconds()

        print(f"Expired time {self.expired_period}")
        print(f"Working in {workflow_dir}")

        for dag_id in os.listdir(workflow_dir):
            target_dir = os.path.join(workflow_dir, dag_id)

            youngest_time = self._latest_mtime(target_dir)
            if youngest_time is None:
                # Nothing could be stat'ed (e.g. the entry was removed
                # concurrently), so there is nothing to judge or delete.
                continue

            age_in_seconds = time.time() - youngest_time
            # Report the same timestamp the expiry decision is based on.
            last_changed = datetime.fromtimestamp(youngest_time).strftime(
                self._TIMESTAMP_FORMAT
            )

            print(f"Checking in {dag_id}")
            print(f"Age of directory {timedelta(seconds=age_in_seconds)}")
            print(f"Last changed {last_changed}")

            if age_in_seconds > expiry_seconds:
                print(
                    f"Removing folder since it was last modified on the {last_changed}"
                )
                # Best effort: errors during removal are deliberately ignored.
                shutil.rmtree(target_dir, ignore_errors=True)

    def __init__(self, dag, expired_period=timedelta(days=60), **kwargs):
        """
        :param dag: DAG in which the operator has to be executed.
        :param expired_period: ``timedelta`` after which unmodified workflow
            data is considered expired, default: 60 days
        """
        self.expired_period = expired_period

        super().__init__(
            dag=dag, name="clean-up", python_callable=self.start, **kwargs
        )