Source code for kaapana.operators.LocalWorkflowCleanerOperator

import shutil
from pathlib import Path

from kaapana.blueprints.kaapana_utils import (
    get_operator_properties,
    clean_previous_dag_run,
)
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator


class LocalWorkflowCleanerOperator(KaapanaPythonBaseOperator):
    """
    Cleans a local workflow.

    Runs the previous-DAG-run cleanup helper and optionally deletes the
    workflow directory.
    """

    def __init__(
        self, dag, run_dir: str = None, clean_workflow_dir: bool = True, **kwargs
    ):
        """
        :param dag: DAG object, passed on to the base operator.
        :param run_dir: Path to directory of the workflow. If ``None``, the
            run directory resolved at execution time is used instead.
        :param clean_workflow_dir: Bool if workflow directory should be deleted
        :param kwargs: Additional keyword arguments passed on to the base operator.
        """
        # Attributes are assigned before the base initializer runs, which
        # receives the bound ``self.start`` as the callable to execute.
        self.run_dir = run_dir
        self.clean_workflow_dir = clean_workflow_dir

        super().__init__(
            dag=dag, name="workflow-cleaner", python_callable=self.start, **kwargs
        )

    def start(self, ds, **kwargs):
        """
        Run the cleanup for the current DAG run.

        Calls ``clean_previous_dag_run`` twice with the DAG run's ``conf``
        (once for ``"from_previous_dag_run"`` and once for
        ``"before_previous_dag_run"``) and then, if ``clean_workflow_dir`` is
        set and the directory exists, removes the run directory recursively.

        :param ds: Not used by this method.
        :param kwargs: Passed unchanged to ``get_operator_properties``.
        """
        # Only the run directory and the DAG run object are needed here; the
        # remaining tuple members (run id, downstream tasks) are ignored.
        # NOTE(review): ``self.airflow_workflow_dir`` is not set in this class;
        # presumably it is provided by KaapanaPythonBaseOperator - confirm.
        _run_id, dag_run_dir, dag_run, _downstream_tasks = get_operator_properties(
            self.airflow_workflow_dir, **kwargs
        )
        conf = dag_run.conf

        clean_previous_dag_run(self.airflow_workflow_dir, conf, "from_previous_dag_run")
        clean_previous_dag_run(
            self.airflow_workflow_dir, conf, "before_previous_dag_run"
        )

        # An explicitly configured run_dir takes precedence over the resolved one.
        run_dir = dag_run_dir if self.run_dir is None else self.run_dir
        if self.clean_workflow_dir and Path(run_dir).exists():
            print("Cleaning dir: %s" % run_dir)
            shutil.rmtree(run_dir)