import os
import shutil
import glob
import time
import secrets
import json
import requests
from airflow.exceptions import AirflowException
from datetime import timedelta
from kaapana.operators.KaapanaPythonBaseOperator import KaapanaPythonBaseOperator
from kaapana.blueprints.kaapana_global_variables import (
PROCESSING_WORKFLOW_DIR,
ADMIN_NAMESPACE,
SERVICES_NAMESPACE,
JOBS_NAMESPACE,
)
from kaapana.blueprints.kaapana_utils import cure_invalid_name, get_release_name
[docs]class KaapanaApplicationOperator(KaapanaPythonBaseOperator):
HELM_API = f"http://kube-helm-service.{ADMIN_NAMESPACE}.svc:5000"
TIMEOUT = 60 * 60 * 12
[docs] def start(self, ds, **kwargs):
print(kwargs)
release_name = (
get_release_name(kwargs) if self.release_name is None else self.release_name
)
dynamic_volumes_dict = {
"af-data-jobs": PROCESSING_WORKFLOW_DIR,
"minio-jobs": "/minio",
"mounted-scripts-jobs": "/kaapana/mounted/workflows/mounted_scripts",
}
dynamic_volumes = {}
for idx, (name, mount_path) in enumerate(dynamic_volumes_dict.items()):
dynamic_volumes.update(
{
f"global.dynamicVolumes[{idx}].name": name,
f"global.dynamicVolumes[{idx}].mount_path": mount_path,
}
)
payload = {
"name": f"{self.chart_name}",
"version": self.version,
"release_name": release_name,
"sets": {
"global.namespace": JOBS_NAMESPACE,
**dynamic_volumes,
"mount_path": f'{self.data_dir}/{kwargs["run_id"]}',
"workflow_dir": f'{str(PROCESSING_WORKFLOW_DIR)}/{kwargs["run_id"]}',
"batch_name": str(self.batch_name),
"operator_out_dir": str(self.operator_out_dir),
"operator_in_dir": str(self.operator_in_dir),
"batches_input_dir": f'{str(PROCESSING_WORKFLOW_DIR)}/{kwargs["run_id"]}/{self.batch_name}',
},
}
conf = kwargs["dag_run"].conf
if "form_data" in conf:
form_data = conf["form_data"]
if "annotator" in form_data:
payload["sets"]["annotator"] = form_data["annotator"]
for set_key, set_value in self.sets.items():
payload["sets"][set_key] = set_value
url = f"{KaapanaApplicationOperator.HELM_API}/helm-install-chart"
print("payload")
print(payload)
r = requests.post(url, json=payload)
print(r)
print(r.text)
r.raise_for_status()
t_end = time.time() + KaapanaApplicationOperator.TIMEOUT
while time.time() < t_end:
time.sleep(15)
url = f"{KaapanaApplicationOperator.HELM_API}/view-chart-status"
r = requests.get(url, params={"release_name": release_name})
if r.status_code == 500 or r.status_code == 404:
print(f"Release {release_name} was uninstalled. My job is done here!")
break
r.raise_for_status()
[docs] @staticmethod
def uninstall_helm_chart(kwargs):
release_name = get_release_name(kwargs)
url = f"{KaapanaApplicationOperator.HELM_API}/helm-delete-chart"
r = requests.post(url, params={"release_name": release_name})
r.raise_for_status()
print(r)
print(r.text)
[docs] @staticmethod
def on_failure(context):
"""
Use this method with caution, because it unclear at which state the context object is updated!
"""
print("##################################################### ON FAILURE!")
[docs] @staticmethod
def on_retry(context):
"""
Use this method with caution, because it unclear at which state the context object is updated!
"""
print("##################################################### ON RETRY!")
def __init__(
self,
dag,
chart_name,
version,
name="helm-chart",
data_dir=None,
sets=None,
release_name=None,
**kwargs,
):
self.chart_name = chart_name
self.version = version
self.sets = sets or dict()
self.data_dir = data_dir or os.getenv("DATADIR", "")
self.release_name = release_name
super().__init__(
dag=dag,
name=name,
python_callable=self.start,
execution_timeout=timedelta(seconds=KaapanaApplicationOperator.TIMEOUT),
**kwargs,
)