Apache Airflow version
3.1.2
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
Sometimes I end up in a weird state, and it seems to happen through the following chain of events:
- The first retry fails on the Airflow side for some reason, but not on the Kubernetes side, where the job keeps running. So I guess this can be called a zombie task.
- The second retry will, in my case, find a driver pod matching the job, and so it will start listening to it. The problem happens at the end, when the job finishes and Airflow tries to delete the pod: there is no self.launcher object available.
We can see that here:
def execute(self, context: Context):
    self.name = self.create_job_name()
    self._setup_spark_configuration(context)
    if self.deferrable:
        self.execute_async(context)
    return super().execute(context)

def _setup_spark_configuration(self, context: Context):
    """Set up Spark-specific configuration including reattach logic."""
    import copy

    template_body = copy.deepcopy(self.template_body)

    if self.reattach_on_restart:
        task_context_labels = self._get_ti_pod_labels(context)
        existing_pod = self.find_spark_job(context)
        if existing_pod:
            self.log.info(
                "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
            )
            self.pod = existing_pod
            self.pod_request_obj = None
            return

        if "spark" not in template_body:
            template_body["spark"] = {}
        if "spec" not in template_body["spark"]:
            template_body["spark"]["spec"] = {}

        spec_dict = template_body["spark"]["spec"]
        if "labels" not in spec_dict:
            spec_dict["labels"] = {}
        spec_dict["labels"].update(task_context_labels)

        for component in ["driver", "executor"]:
            if component not in spec_dict:
                spec_dict[component] = {}
            if "labels" not in spec_dict[component]:
                spec_dict[component]["labels"] = {}
            spec_dict[component]["labels"].update(task_context_labels)

    self.log.info("Creating sparkApplication.")
    self.launcher = CustomObjectLauncher(
        name=self.name,
        namespace=self.namespace,
        kube_client=self.client,
        custom_obj_api=self.custom_obj_api,
        template_body=template_body,
    )
    self.pod = self.get_or_create_spark_crd(self.launcher, context)
    self.pod_request_obj = self.launcher.pod_spec

Notice that, in my example, the second retry enters the if block below, and so it returns early without initializing the self.launcher object:
existing_pod = self.find_spark_job(context)
if existing_pod:
    self.log.info(
        "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
    )
    self.pod = existing_pod
    self.pod_request_obj = None
    return

This explains why self.launcher is missing.
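I am not sure what the right fix is, but as a rough sketch (reusing only the arguments already visible in the code above, not a tested patch), the reattach branch could initialize the launcher before returning, so that later cleanup still has self.launcher:

if existing_pod:
    self.log.info(
        "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
    )
    # Sketch: build the launcher even when reattaching, so that deleting the
    # SparkApplication/pod at the end of the run does not hit a missing attribute.
    self.launcher = CustomObjectLauncher(
        name=self.name,
        namespace=self.namespace,
        kube_client=self.client,
        custom_obj_api=self.custom_obj_api,
        template_body=template_body,
    )
    self.pod = existing_pod
    self.pod_request_obj = None
    return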
Here is a typical task I run that can enter this state:
maintenance_job = SparkKubernetesOperator(
    task_id="maintenance_job",
    namespace="spark-operator",
    application_file="spark_app/maintenance_job/spark_application_config.yml",
    kubernetes_conn_id="kubernetes_default",
    random_name_suffix=True,
    get_logs=True,
    reattach_on_restart=True,
    delete_on_termination=True,
    do_xcom_push=False,
    deferrable=False,
    retries=1,
    trigger_rule="none_failed_min_one_success",
    on_execute_callback=upload_spark_config_to_gcs,
)

What you think should happen instead?
The job should succeed, and the pod should be deleted.
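Alternatively, the cleanup path could degrade gracefully when no launcher was ever created. This is only my own illustration of the idea, not the provider's actual cleanup code:

# Hypothetical guard inside the operator's cleanup logic: skip instead of failing
# when self.launcher was never initialized (e.g. after reattaching to an existing pod).
launcher = getattr(self, "launcher", None)
if launcher is None:
    self.log.warning("No launcher available after reattach; skipping SparkApplication cleanup.")
else:
    ...  # delete the SparkApplication / driver pod through the launcher as usual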
How to reproduce
Enter a state where you have a zombie task that triggered a Spark job on retry number 0. Then, on retry 1, the Spark driver pod will be found and Airflow will start listening to it. Once the Spark job completes, Airflow will try to delete the pod and the other objects associated with the job, but it cannot, because the self.launcher object is missing.
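To confirm the zombie state before retry 1 starts, I check that the driver pod from the first try is still running. A small standalone helper (my own snippet, not part of the DAG; the spark-role=driver label is, as far as I know, the label the Spark operator sets on driver pods):

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# List Spark driver pods left behind in the namespace used by the DAG above.
pods = v1.list_namespaced_pod(
    namespace="spark-operator",
    label_selector="spark-role=driver",
)
for pod in pods.items:
    print(pod.metadata.name, pod.status.phase)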
Operating System
Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12
Versions of Apache Airflow Providers
See deployment details; for reference, the cncf-kubernetes provider version is 10.8.1.
Deployment
Official Apache Airflow Helm Chart
Deployment details
I use helm with a custom image built via this Dockerfile:
FROM docker.io/apache/airflow:3.1.1-python3.12
USER root
# Copy requirements to working directory
COPY requirements.txt /var/airflow/requirements.txt
# Set the working directory in the container
WORKDIR /var/airflow
USER airflow
RUN pip install --upgrade pip
# Install the necessary dependencies
RUN pip install \
    --no-cache-dir \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.1/constraints-3.12.txt" \
    -r /var/airflow/requirements.txt
The requirements.txt file is:
apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.1
Anything else?
I opened another issue regarding the deferrable mode not working on this operator, and I believe these two issues are connected, in the sense that something is wrong in how the operator sets up its state.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct