Skip to content

Commit 9252ce9

Browse files
authored
Enable/disable np controller based on configmap (#8)
* enable/disable controller based on configmap Use the common configmap kube-system/amazon-vpc-cni to check feature enablement. The controller will be enabled only if the configmap exists and contains the key enable-network-policy-controller in the data and the value is set to "true". * pass context from main
1 parent 726257d commit 9252ce9

File tree

4 files changed

+269
-3
lines changed

4 files changed

+269
-3
lines changed

cmd/main.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"os"
2122

2223
"github.com/go-logr/logr"
@@ -29,6 +30,7 @@ import (
2930

3031
"k8s.io/apimachinery/pkg/runtime"
3132
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33+
"k8s.io/client-go/kubernetes"
3234
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3335
ctrl "sigs.k8s.io/controller-runtime"
3436
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -39,6 +41,7 @@ import (
3941
"github.com/aws/amazon-network-policy-controller-k8s/pkg/config"
4042
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
4143
"github.com/aws/amazon-network-policy-controller-k8s/pkg/policyendpoints"
44+
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/configmap"
4245
"github.com/aws/amazon-network-policy-controller-k8s/version"
4346
//+kubebuilder:scaffold:imports
4447
)
@@ -82,17 +85,36 @@ func main() {
8285
setupLog.Error(err, "unable to create controller manager")
8386
os.Exit(1)
8487
}
88+
clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
89+
if err != nil {
90+
setupLog.Error(err, "unable to obtain clientSet")
91+
os.Exit(1)
92+
}
93+
8594
ctx := ctrl.SetupSignalHandler()
86-
enablePolicyController := true
95+
enableNetworkPolicyController := true
96+
if controllerCFG.EnableConfigMapCheck {
97+
var cancelFn context.CancelFunc
98+
ctx, cancelFn = context.WithCancel(ctx)
99+
setupLog.Info("Enable network policy controller based on configuration", "configmap", configmap.GetControllerConfigMapId())
100+
configMapManager := config.NewConfigmapManager(configmap.GetControllerConfigMapId(),
101+
clientSet, cancelFn, configmap.GetConfigmapCheckFn(), ctrl.Log.WithName("configmap-manager"))
102+
if err := configMapManager.MonitorConfigMap(ctx); err != nil {
103+
setupLog.Error(err, "Unable to monitor configmap for checking if controller is enabled")
104+
os.Exit(1)
105+
}
106+
enableNetworkPolicyController = configMapManager.IsControllerEnabled()
107+
}
108+
87109
policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(),
88110
controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager"))
89111
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager"))
90112
policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager,
91113
controllerCFG, finalizerManager, ctrl.Log.WithName("controllers").WithName("policy"))
92-
if enablePolicyController {
114+
if enableNetworkPolicyController {
93115
setupLog.Info("Network Policy controller is enabled, starting watches")
94116
if err := policyController.SetupWithManager(ctx, mgr); err != nil {
95-
setupLog.Error(err, "unable to create controller", "controller", "policy")
117+
setupLog.Error(err, "Unable to setup network policy controller")
96118
os.Exit(1)
97119
}
98120
}

pkg/config/configmap_manager.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package config
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
8+
"github.com/go-logr/logr"
9+
corev1 "k8s.io/api/core/v1"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/fields"
12+
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/apimachinery/pkg/types"
14+
"k8s.io/apimachinery/pkg/watch"
15+
"k8s.io/client-go/kubernetes"
16+
"k8s.io/client-go/tools/cache"
17+
)
18+
19+
// +kubebuilder:rbac:groups="",resources=configmaps,namespace="system",resourceNames=amazon-vpc-cni,verbs=get;list;watch
20+
21+
type ConfigmapManager interface {
22+
MonitorConfigMap(ctx context.Context) error
23+
IsControllerEnabled() bool
24+
}
25+
26+
var _ ConfigmapManager = (*defaultConfigmapManager)(nil)
27+
28+
type defaultConfigmapManager struct {
29+
initialState bool
30+
resourceRef types.NamespacedName
31+
store cache.Store
32+
rt *cache.Reflector
33+
clientSet *kubernetes.Clientset
34+
logger logr.Logger
35+
cancelFn context.CancelFunc
36+
monitorStopChan chan struct{}
37+
storeNotifyChan chan struct{}
38+
configMapCheckFunction func(*corev1.ConfigMap) bool
39+
}
40+
41+
func NewConfigmapManager(resourceRef types.NamespacedName, clientSet *kubernetes.Clientset,
42+
cancelFn context.CancelFunc, configmapCheckFunction func(configMap *corev1.ConfigMap) bool, logger logr.Logger) *defaultConfigmapManager {
43+
storeNotifyChan := make(chan struct{})
44+
cmStore := k8s.NewConfigMapStore(storeNotifyChan)
45+
return &defaultConfigmapManager{
46+
clientSet: clientSet,
47+
resourceRef: resourceRef,
48+
store: cmStore,
49+
logger: logger,
50+
cancelFn: cancelFn,
51+
monitorStopChan: make(chan struct{}),
52+
storeNotifyChan: storeNotifyChan,
53+
configMapCheckFunction: configmapCheckFunction,
54+
}
55+
}
56+
57+
// IsControllerEnabled returns the initial state of the policy controller.
58+
func (m *defaultConfigmapManager) IsControllerEnabled() bool {
59+
m.logger.V(1).Info("IsControllerEnabled() returning", "value", m.initialState)
60+
return m.initialState
61+
}
62+
63+
// MonitorConfigMap starts cache reflector and watches for configmap updates.
64+
func (m *defaultConfigmapManager) MonitorConfigMap(ctx context.Context) error {
65+
fieldSelector := fields.Set{"metadata.name": m.resourceRef.Name}.AsSelector().String()
66+
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
67+
options.FieldSelector = fieldSelector
68+
return m.clientSet.CoreV1().ConfigMaps(m.resourceRef.Namespace).List(ctx, options)
69+
}
70+
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
71+
options.FieldSelector = fieldSelector
72+
return m.clientSet.CoreV1().ConfigMaps(m.resourceRef.Namespace).Watch(ctx, options)
73+
}
74+
m.rt = cache.NewReflector(&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
75+
&corev1.ConfigMap{},
76+
m.store,
77+
0,
78+
)
79+
go m.rt.Run(m.monitorStopChan)
80+
go m.listenForConfigMapUpdates()
81+
82+
if _, err := m.setInitialControllerState(); err != nil {
83+
m.logger.Info("Failed to set initial state", "err", err)
84+
return err
85+
}
86+
return nil
87+
}
88+
89+
// listen for the messages in the storeNotifyChan in a loop and update the state of the policy controller accordingly.
90+
func (m *defaultConfigmapManager) listenForConfigMapUpdates() {
91+
defer func() {
92+
m.logger.Info("Controller detected changes to the configmap, cancelling manager context")
93+
close(m.monitorStopChan)
94+
m.cancelFn()
95+
}()
96+
97+
for {
98+
select {
99+
case <-m.storeNotifyChan:
100+
enabled, err := m.getCurrentEnabledConfig()
101+
if err != nil {
102+
m.logger.Error(err, "Failed to get controller state from configmap")
103+
return
104+
}
105+
m.logger.V(1).Info("Received configmap notification", "initial", m.initialState,
106+
"new", enabled)
107+
if m.initialState != enabled {
108+
m.logger.Info("Controller state changed", "initial", m.initialState,
109+
"new", enabled)
110+
return
111+
}
112+
}
113+
}
114+
}
115+
116+
// getCurrentEnabledConfig gets the current state of the policy controller from the configmap
117+
func (m *defaultConfigmapManager) getCurrentEnabledConfig() (bool, error) {
118+
cm, exists, err := m.store.GetByKey(m.resourceRef.String())
119+
if err != nil {
120+
return false, err
121+
}
122+
if !exists {
123+
return false, nil
124+
}
125+
return m.configMapCheckFunction(cm.(*corev1.ConfigMap)), nil
126+
}
127+
128+
// setInitialControllerState sets the initial state of the policy controller based on the configmap
129+
func (m *defaultConfigmapManager) setInitialControllerState() (retVal bool, err error) {
130+
defer func() {
131+
m.logger.V(1).Info("setInitialControllerState", "retVal", retVal, "err", err)
132+
m.initialState = retVal
133+
}()
134+
// Wait for cache sync
135+
if !cache.WaitForCacheSync(m.monitorStopChan, func() bool {
136+
return m.rt.LastSyncResourceVersion() != ""
137+
}) {
138+
return false, errors.New("failed to sync configmap cache")
139+
}
140+
return m.getCurrentEnabledConfig()
141+
}

pkg/k8s/configmap_store.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package k8s
2+
3+
import (
4+
"k8s.io/client-go/tools/cache"
5+
)
6+
7+
// NewConfigMapStore constructs new conversionStore
8+
func NewConfigMapStore(notifyChan chan<- struct{}) *ConfigMapStore {
9+
return &ConfigMapStore{
10+
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
11+
notifyChannel: notifyChan,
12+
}
13+
}
14+
15+
var _ cache.Store = &ConfigMapStore{}
16+
17+
type ConfigMapStore struct {
18+
store cache.Store
19+
notifyChannel chan<- struct{}
20+
}
21+
22+
// Add adds the given object to the accumulator associated with the given object's key
23+
func (s *ConfigMapStore) Add(obj interface{}) error {
24+
if err := s.store.Add(obj); err != nil {
25+
return err
26+
}
27+
s.notifyChannel <- struct{}{}
28+
return nil
29+
}
30+
31+
// Update updates the given object in the accumulator associated with the given object's key
32+
func (s *ConfigMapStore) Update(obj interface{}) error {
33+
if err := s.store.Update(obj); err != nil {
34+
return err
35+
}
36+
s.notifyChannel <- struct{}{}
37+
return nil
38+
}
39+
40+
// Delete deletes the given object from the accumulator associated with the given object's key
41+
func (s *ConfigMapStore) Delete(obj interface{}) error {
42+
if err := s.store.Delete(obj); err != nil {
43+
return err
44+
}
45+
s.notifyChannel <- struct{}{}
46+
return nil
47+
}
48+
49+
// List returns a list of all the objects
50+
func (s *ConfigMapStore) List() []interface{} {
51+
return s.store.List()
52+
}
53+
54+
// ListKeys returns a list of all the keys
55+
func (s *ConfigMapStore) ListKeys() []string {
56+
return s.store.ListKeys()
57+
}
58+
59+
// Get returns the object with the given key
60+
func (s *ConfigMapStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
61+
return s.store.Get(obj)
62+
}
63+
64+
// GetByKey returns the object with the given key
65+
func (s *ConfigMapStore) GetByKey(key string) (item interface{}, exists bool, err error) {
66+
return s.store.GetByKey(key)
67+
}
68+
69+
// Replace will delete the contents of the store, using instead the given list.
70+
func (s *ConfigMapStore) Replace(list []interface{}, resourceVersion string) error {
71+
return s.store.Replace(list, resourceVersion)
72+
}
73+
74+
// Resync invokes the cache.store Resync method
75+
func (s *ConfigMapStore) Resync() error {
76+
return s.store.Resync()
77+
}

pkg/utils/configmap/configmap.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package configmap
2+
3+
import (
4+
corev1 "k8s.io/api/core/v1"
5+
"k8s.io/apimachinery/pkg/types"
6+
)
7+
8+
const (
9+
controllerConfigMapKey = "enable-network-policy-controller"
10+
controllerConfigEnabledValue = "true"
11+
)
12+
13+
// GetControllerConfigMapId returns the id for the configmap resource containing the controller config
14+
func GetControllerConfigMapId() types.NamespacedName {
15+
return types.NamespacedName{
16+
Namespace: "kube-system",
17+
Name: "amazon-vpc-cni",
18+
}
19+
}
20+
21+
// GetConfigmapCheckFn returns a function that checks if controller is enabled in the configmap
22+
func GetConfigmapCheckFn() func(configMap *corev1.ConfigMap) bool {
23+
return func(configMap *corev1.ConfigMap) bool {
24+
return configMap.Data[controllerConfigMapKey] == controllerConfigEnabledValue
25+
}
26+
}

0 commit comments

Comments
 (0)