SparkKubernetesOperator error related to self.launcher #58228

Apache Airflow version

3.1.2

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

Sometimes the task enters a weird state, which seems to arise from the following chain of events:

  • The first retry fails on the Airflow side for some reason, but not on the Kubernetes side, where the job is still running. I guess this can be called a zombie task.
  • The second retry will, in my case, find a driver pod matching the job, and so it will start listening to it. The problem happens at the end, when the job finishes and Airflow tries to delete the pod: there is no self.launcher object available.

We can see that here:

    def execute(self, context: Context):
        self.name = self.create_job_name()

        self._setup_spark_configuration(context)

        if self.deferrable:
            self.execute_async(context)

        return super().execute(context)

    def _setup_spark_configuration(self, context: Context):
        """Set up Spark-specific configuration including reattach logic."""
        import copy

        template_body = copy.deepcopy(self.template_body)

        if self.reattach_on_restart:
            task_context_labels = self._get_ti_pod_labels(context)

            existing_pod = self.find_spark_job(context)
            if existing_pod:
                self.log.info(
                    "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                )
                self.pod = existing_pod
                self.pod_request_obj = None
                return

            if "spark" not in template_body:
                template_body["spark"] = {}
            if "spec" not in template_body["spark"]:
                template_body["spark"]["spec"] = {}

            spec_dict = template_body["spark"]["spec"]

            if "labels" not in spec_dict:
                spec_dict["labels"] = {}
            spec_dict["labels"].update(task_context_labels)

            for component in ["driver", "executor"]:
                if component not in spec_dict:
                    spec_dict[component] = {}

                if "labels" not in spec_dict[component]:
                    spec_dict[component]["labels"] = {}

                spec_dict[component]["labels"].update(task_context_labels)

        self.log.info("Creating sparkApplication.")
        self.launcher = CustomObjectLauncher(
            name=self.name,
            namespace=self.namespace,
            kube_client=self.client,
            custom_obj_api=self.custom_obj_api,
            template_body=template_body,
        )
        self.pod = self.get_or_create_spark_crd(self.launcher, context)
        self.pod_request_obj = self.launcher.pod_spec

Notice that, in my example, the second retry will enter the if block below and therefore return early, without ever initializing the self.launcher object:

            existing_pod = self.find_spark_job(context)
            if existing_pod:
                self.log.info(
                    "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                )
                self.pod = existing_pod
                self.pod_request_obj = None
                return

This explains why self.launcher is missing when Airflow later tries to clean up.
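
A minimal sketch of one possible fix, reusing the CustomObjectLauncher constructor call already shown above, would be to build the launcher in the reattach branch as well before returning, so the deletion path has something to call into (hypothetical; I have not verified which name the launcher needs here, nor whether other launcher state such as launcher.pod_spec matters):

            existing_pod = self.find_spark_job(context)
            if existing_pod:
                self.log.info(
                    "Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
                )
                # Sketch: initialize the launcher in the reattach branch too, so
                # pod/CRD deletion at the end of the run has a launcher to use.
                # self.name was regenerated with a random suffix above, so the
                # job name is derived from the existing driver pod instead
                # (the "-driver" suffix convention is an assumption on my part).
                self.launcher = CustomObjectLauncher(
                    name=existing_pod.metadata.name.removesuffix("-driver"),
                    namespace=self.namespace,
                    kube_client=self.client,
                    custom_obj_api=self.custom_obj_api,
                    template_body=template_body,
                )
                self.pod = existing_pod
                self.pod_request_obj = None
                return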

Here is a typical task I run that can enter this state:

maintenance_job = SparkKubernetesOperator(
    task_id="maintenance_job",
    namespace="spark-operator",
    application_file="spark_app/maintenance_job/spark_application_config.yml",
    kubernetes_conn_id="kubernetes_default",
    random_name_suffix=True,
    get_logs=True,
    reattach_on_restart=True,
    delete_on_termination=True,
    do_xcom_push=False,
    deferrable=False,
    retries=1,
    trigger_rule="none_failed_min_one_success",
    on_execute_callback=upload_spark_config_to_gcs,
)

What you think should happen instead?

The job should succeed, and the pod should be deleted.

How to reproduce

Enter a state where you have a zombie task that triggered a Spark job on retry number 0. Then, on retry 1, the Spark driver pod will be found and Airflow will listen to it. Once the Spark job completes, Airflow will try to delete the pod and the other objects associated with the job, but it won't be able to, since the self.launcher object is missing.
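
Alternatively, the deletion path itself could be made defensive. I have not traced the exact cleanup code, but assuming it goes through something like the provider's process_pod_deletion (the method name and signature here are my assumption, not verified against the provider source), a guard could look like this:

    def process_pod_deletion(self, pod, *, reraise=True):
        # Defensive sketch: self.launcher is never set when we reattached to an
        # existing driver pod, so fall back gracefully instead of raising
        # AttributeError during cleanup.
        launcher = getattr(self, "launcher", None)
        if pod is not None and self.delete_on_termination:
            if launcher is not None:
                # Strip the "-driver" suffix to get the job name (assumption).
                launcher.delete_spark_job(pod.metadata.name.replace("-driver", ""))
            else:
                self.log.warning(
                    "No launcher available for pod %s; skipping Spark job deletion.",
                    pod.metadata.name,
                )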

Operating System

Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12

Versions of Apache Airflow Providers

See the deployment details below; for reference, the cncf-kubernetes provider version is 10.8.1.

Deployment

Official Apache Airflow Helm Chart

Deployment details

I use Helm with a custom image built from this Dockerfile:

FROM docker.io/apache/airflow:3.1.1-python3.12

USER root

# Copy requirements to working directory
COPY requirements.txt /var/airflow/requirements.txt

# Set the working directory in the container
WORKDIR /var/airflow

USER airflow

RUN pip install --upgrade pip

# Install the necessary dependencies
RUN pip install \
    --no-cache-dir \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.1/constraints-3.12.txt" \
    -r /var/airflow/requirements.txt

The requirements.txt file is:

apache-airflow[amazon,google,postgres,async,cncf.kubernetes,celery,slack,http,fab,standard,openlineage]==3.1.1

Anything else?

I opened another issue regarding the deferrable mode not working on this operator, and I believe these two issues are connected, in the sense that something is being done incorrectly in the operator.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
