7 changes: 5 additions & 2 deletions ray-operator/main.go
@@ -12,6 +12,7 @@ import (
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -216,8 +217,9 @@ func main() {
// This improves the scalability of the system, both for KubeRay itself by reducing the size of the
// informers cache, and for the API server / etcd, by reducing the number of watch events.
// For example, KubeRay is only interested in the batch Jobs it creates when reconciling RayJobs,
- // so the controller sets the app.kubernetes.io/created-by=kuberay-operator label on any Job it creates,
- // and that label is provided to the manager cache as a selector for Job resources.
+ // and the Pods it creates when reconciling RayClusters,
+ // so the controller sets the app.kubernetes.io/created-by=kuberay-operator label on any Job and Pod it creates,
+ // and that label is provided to the manager cache as a selector for Job and Pod resources.
selectorsByObject, err := cacheSelectors()
exitOnError(err, "unable to create cache selectors")
options.Cache.ByObject = selectorsByObject
@@ -317,6 +319,7 @@ func cacheSelectors() (map[client.Object]cache.ByObject, error) {

return map[client.Object]cache.ByObject{
&batchv1.Job{}: {Label: selector},
+ &corev1.Pod{}: {Label: selector},

Overridable cache-critical label can hide Pods from operator

Medium Severity

The new Pod cache selector filters on KubernetesCreatedByLabelKey, but labelPod doesn't protect this label from user override — only ray.io/node-type, ray.io/group, and ray.io/cluster are guarded. If a user specifies a different value for app.kubernetes.io/created-by in their pod template labels, the created Pod becomes invisible to the informer cache. The operator would then repeatedly create new Pods it can never observe, causing unbounded Pod creation. Unlike the existing Job cache filter (where the label is set directly by the operator with no override path), the Pod path passes user-provided labels through labelPod's override loop.
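The failure mode can be illustrated with a small stdlib-only sketch (hypothetical types, not the controller-runtime API): a cache that only admits objects matching a required label silently drops a Pod whose template overrode that label, so the operator can never observe its own creation.

```go
package main

import "fmt"

// filteredCache mimics an informer cache configured with a label selector:
// only objects carrying requiredKey=requiredValue are admitted.
type filteredCache struct {
	requiredKey, requiredValue string
	store                      map[string]map[string]string // name -> labels
}

// add simulates a watch event arriving at the cache; objects that do not
// match the selector are dropped before they ever reach the store.
func (c *filteredCache) add(name string, podLabels map[string]string) {
	if podLabels[c.requiredKey] == c.requiredValue {
		c.store[name] = podLabels
	}
}

func (c *filteredCache) has(name string) bool {
	_, ok := c.store[name]
	return ok
}

func main() {
	cache := &filteredCache{
		requiredKey:   "app.kubernetes.io/created-by",
		requiredValue: "kuberay-operator",
		store:         map[string]map[string]string{},
	}

	// Pod labeled by the operator: visible to the cache.
	cache.add("head-pod", map[string]string{"app.kubernetes.io/created-by": "kuberay-operator"})

	// Pod whose template overrode the label: the operator created it, but the
	// filtered cache never sees it, so the reconciler would keep creating replacements.
	cache.add("rogue-pod", map[string]string{"app.kubernetes.io/created-by": "my-team"})

	fmt.Println(cache.has("head-pod"))  // true
	fmt.Println(cache.has("rogue-pod")) // false
}
```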


Based on the code, the KubernetesCreatedByLabelKey label can indeed be overridden by the user. The ray.io/node-type, ray.io/group, and ray.io/cluster labels, by contrast, cannot be overridden by user-provided RayCluster labels, and they are set on both head and worker Pods, so any one of them can serve as the cache selector. This also keeps the operator's cache small. A possible change:

```go
rayPodLabel, err := labels.NewRequirement(utils.RayNodeLabelKey, selection.Equals, []string{"yes"})
if err != nil {
    return nil, err
}
rayPodSelector := labels.NewSelector().Add(*rayPodLabel).Add(*label)

return map[client.Object]cache.ByObject{
    &batchv1.Job{}: {Label: selector},
    &corev1.Pod{}:  {Label: rayPodSelector},
}, nil
```

}, nil
}

29 changes: 29 additions & 0 deletions ray-operator/test/e2e/raycluster_test.go
@@ -264,3 +264,32 @@ func TestRayClusterUpgradeStrategy(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(newWorkerPods).To(HaveLen(1))
}

func TestRayClusterPodCacheSelector(t *testing.T) {
test := With(t)
g := NewWithT(t)

// Create a namespace
namespace := test.NewTestNamespace()

rayClusterAC := rayv1ac.RayCluster("raycluster-pod-cache", namespace.Name).
WithSpec(NewRayClusterSpec())

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)

LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name)
g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium).
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))

// Verify all Pods carry the kuberay-operator label
pods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{})
Member

I think this test would be improved significantly if we created a Pod that did not have the KubeRay label and proved that KubeRay does not see it in its informer cache. Otherwise we are just testing that the labels are applied correctly.

Contributor

The current test uses the Kubernetes API client directly, which bypasses the informer cache so it only proves that labels are applied correctly, not that the cache selector is actually filtering as intended.

Rather than adding this to the e2e test suite (where we don't have access to the manager's internal cache), I'd suggest adding an integration test in raycluster_controller_test.go, which already uses the envtest environment from suite_test.go. We'd need to expose mgr as a package-level variable, apply cacheSelectors() in BeforeSuite, then add a Describe block that creates one labeled and one unlabeled Pod and verifies that mgr.GetAPIReader().List() sees both while mgr.GetClient().List() only sees the labeled one.
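As a rough sketch of the assertion shape (stdlib only; the real integration test would run mgr.GetAPIReader() and mgr.GetClient() against envtest): an unfiltered reader lists every Pod, while a selector-filtered cache lists only the labeled ones.

```go
package main

import "fmt"

type pod struct {
	name   string
	labels map[string]string
}

// listAll mimics a direct API reader (mgr.GetAPIReader().List in the real
// test): it reads straight from the API server, bypassing any cache selector.
func listAll(pods []pod) []string {
	names := []string{}
	for _, p := range pods {
		names = append(names, p.name)
	}
	return names
}

// listCached mimics a cached client (mgr.GetClient().List) whose Pod informer
// is configured with a label selector: unlabeled Pods are invisible.
func listCached(pods []pod, key, value string) []string {
	names := []string{}
	for _, p := range pods {
		if p.labels[key] == value {
			names = append(names, p.name)
		}
	}
	return names
}

func main() {
	pods := []pod{
		{name: "labeled", labels: map[string]string{"app.kubernetes.io/created-by": "kuberay-operator"}},
		{name: "unlabeled", labels: map[string]string{}},
	}
	// The API reader sees both Pods; the filtered cache sees only the labeled one.
	fmt.Println(len(listAll(pods)))                                                       // 2
	fmt.Println(len(listCached(pods, "app.kubernetes.io/created-by", "kuberay-operator"))) // 1
}
```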

g.Expect(err).NotTo(HaveOccurred())
g.Expect(pods.Items).NotTo(BeEmpty())

for _, pod := range pods.Items {
g.Expect(pod.Labels).To(HaveKeyWithValue(utils.KubernetesCreatedByLabelKey, utils.ComponentName),
"Pod %s/%s should have label %s=%s", pod.Namespace, pod.Name, utils.KubernetesCreatedByLabelKey, utils.ComponentName)
}
}