From 8df5d228f79efbfced00e0b2249b44500fe646e2 Mon Sep 17 00:00:00 2001 From: zhaoxiaohu Date: Fri, 23 Aug 2024 16:55:52 +0800 Subject: [PATCH] reduce configmap and secret watch of kubelet Signed-off-by: zhaoxiaohu --- ...onfigmap-and-secret-watch-of-kubelet.patch | 524 ++++++++++++++++++ kubernetes.spec | 6 +- 2 files changed, 529 insertions(+), 1 deletion(-) create mode 100644 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch diff --git a/0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch b/0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch new file mode 100644 index 0000000..582f86e --- /dev/null +++ b/0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch @@ -0,0 +1,524 @@ +From 8e4558ba2469a92a215f43c027a4e8e082b38fe1 Mon Sep 17 00:00:00 2001 +From: zhaoxiaohu +Date: Fri, 23 Aug 2024 14:52:03 +0800 +Subject: [PATCH] reduce configmap and secret watch of kubelet + +Reference: https://github.com/kubernetes/kubernetes/pull/99393/commits/57a3b0abd678c66a9a04e553b6d6ae49671a4779 + +Signed-off-by: zhaoxiaohu +Signed-off-by: chenyw1990 +Signed-off-by: yuwang +--- + pkg/kubelet/configmap/configmap_manager.go | 4 +- + pkg/kubelet/kubelet.go | 4 +- + pkg/kubelet/secret/secret_manager.go | 4 +- + .../util/manager/watch_based_manager.go | 166 +++++++++++++++--- + .../util/manager/watch_based_manager_test.go | 101 +++++++++-- + .../integration/kubelet/watch_manager_test.go | 4 +- + 6 files changed, 245 insertions(+), 38 deletions(-) + +diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go +index 2f525357..5466dd8b 100644 +--- a/pkg/kubelet/configmap/configmap_manager.go ++++ b/pkg/kubelet/configmap/configmap_manager.go +@@ -135,7 +135,7 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G + // - whenever a pod is created or updated, we start individual watches for all + // referenced objects that aren't referenced from other registered pods + // - every GetObject() returns a value from local cache propagated via watches +-func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager { ++func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager { + listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts) + } +@@ -153,6 +153,6 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager { + } + gr := corev1.Resource("configmap") + return &configMapManager{ +- manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames), ++ manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames), + } + } +diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go +index 9cb92eda..c6f9e6e5 100644 +--- a/pkg/kubelet/kubelet.go ++++ b/pkg/kubelet/kubelet.go +@@ -526,8 +526,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, + var configMapManager configmap.Manager + switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy { + case kubeletconfiginternal.WatchChangeDetectionStrategy: +- secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient) +- configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient) ++ secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval) ++ configMapManager = 
configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval) + case kubeletconfiginternal.TTLCacheChangeDetectionStrategy: + secretManager = secret.NewCachingSecretManager( + kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) +diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go +index 3aa132a5..af17711e 100644 +--- a/pkg/kubelet/secret/secret_manager.go ++++ b/pkg/kubelet/secret/secret_manager.go +@@ -136,7 +136,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO + // - whenever a pod is created or updated, we start individual watches for all + // referenced objects that aren't referenced from other registered pods + // - every GetObject() returns a value from local cache propagated via watches +-func NewWatchingSecretManager(kubeClient clientset.Interface) Manager { ++func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager { + listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) { + return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts) + } +@@ -154,6 +154,6 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager { + } + gr := corev1.Resource("secret") + return &secretManager{ +- manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames), ++ manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames), + } + } +diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go +index 4dfe1037..cee515ca 100644 +--- a/pkg/kubelet/util/manager/watch_based_manager.go ++++ b/pkg/kubelet/util/manager/watch_based_manager.go +@@ -31,6 +31,7 @@ import ( + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ++ "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +@@ -46,25 +47,108 @@ type isImmutableFunc func(runtime.Object) bool + // objectCacheItem is a single item stored in objectCache. + type objectCacheItem struct { + refCount int +- store cache.Store ++ store *cacheStore ++ reflector *cache.Reflector ++ + hasSynced func() (bool, error) + +- // lock is protecting from closing stopCh multiple times. +- lock sync.Mutex +- stopCh chan struct{} ++ // waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run ++ waitGroup sync.WaitGroup ++ ++ // lock is to ensure the access and modify of lastAccessTime, stopped, and immutable are thread safety, ++ // and protecting from closing stopCh multiple times. ++ lock sync.Mutex ++ lastAccessTime time.Time ++ stopped bool ++ immutable bool ++ stopCh chan struct{} + } + + func (i *objectCacheItem) stop() bool { + i.lock.Lock() + defer i.lock.Unlock() +- select { +- case <-i.stopCh: +- // This means that channel is already closed. 
++ return i.stopThreadUnsafe() ++} ++ ++func (i *objectCacheItem) stopThreadUnsafe() bool { ++ if i.stopped { + return false +- default: +- close(i.stopCh) +- return true + } ++ i.stopped = true ++ close(i.stopCh) ++ if !i.immutable { ++ i.store.unsetInitialized() ++ } ++ return true ++} ++ ++func (i *objectCacheItem) setLastAccessTime(time time.Time) { ++ i.lock.Lock() ++ defer i.lock.Unlock() ++ i.lastAccessTime = time ++} ++ ++func (i *objectCacheItem) setImmutable() { ++ i.lock.Lock() ++ defer i.lock.Unlock() ++ i.immutable = true ++} ++ ++func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool { ++ i.lock.Lock() ++ defer i.lock.Unlock() ++ if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) { ++ return i.stopThreadUnsafe() ++ } ++ return false ++} ++ ++func (i *objectCacheItem) restartReflectorIfNeeded() { ++ i.lock.Lock() ++ defer i.lock.Unlock() ++ if i.immutable || !i.stopped { ++ return ++ } ++ i.stopCh = make(chan struct{}) ++ i.stopped = false ++ go i.startReflector() ++} ++ ++func (i *objectCacheItem) startReflector() { ++ i.waitGroup.Wait() ++ i.waitGroup.Add(1) ++ defer i.waitGroup.Done() ++ i.reflector.Run(i.stopCh) ++} ++ ++// cacheStore is in order to rewrite Replace function to mark initialized flag ++type cacheStore struct { ++ cache.Store ++ lock sync.Mutex ++ initialized bool ++} ++ ++func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error { ++ c.lock.Lock() ++ defer c.lock.Unlock() ++ err := c.Store.Replace(list, resourceVersion) ++ if err != nil { ++ return err ++ } ++ c.initialized = true ++ return nil ++} ++ ++func (c *cacheStore) hasSynced() bool { ++ c.lock.Lock() ++ defer c.lock.Unlock() ++ return c.initialized ++} ++ ++func (c *cacheStore) unsetInitialized() { ++ c.lock.Lock() ++ defer c.lock.Unlock() ++ c.initialized = false + } + + // objectCache is a local cache of objects propagated via +@@ -75,35 +159,53 @@ type objectCache struct { + newObject newObjectFunc + isImmutable isImmutableFunc + groupResource schema.GroupResource ++ clock clock.Clock ++ maxIdleTime time.Duration + + lock sync.RWMutex + items map[objectKey]*objectCacheItem + } + ++const minIdleTime = 1 * time.Minute ++ + // NewObjectCache returns a new watch-based instance of Store interface. + func NewObjectCache( + listObject listObjectFunc, + watchObject watchObjectFunc, + newObject newObjectFunc, + isImmutable isImmutableFunc, +- groupResource schema.GroupResource) Store { +- return &objectCache{ ++ groupResource schema.GroupResource, ++ clock clock.Clock, ++ maxIdleTime time.Duration) Store { ++ ++ if maxIdleTime < minIdleTime { ++ maxIdleTime = minIdleTime ++ } ++ ++ store := &objectCache{ + listObject: listObject, + watchObject: watchObject, + newObject: newObject, + isImmutable: isImmutable, + groupResource: groupResource, ++ clock: clock, ++ maxIdleTime: maxIdleTime, + items: make(map[objectKey]*objectCacheItem), + } ++ ++ // TODO propagate stopCh from the higher level. ++ go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop) ++ return store + } + +-func (c *objectCache) newStore() cache.Store { ++func (c *objectCache) newStore() *cacheStore { + // TODO: We may consider created a dedicated store keeping just a single + // item, instead of using a generic store implementation for this purpose. + // However, simple benchmarks show that memory overhead in that case is + // decrease from ~600B to ~300B per object. So we are not optimizing it + // until we will see a good reason for that. 
+- return cache.NewStore(cache.MetaNamespaceKeyFunc) ++ store := cache.NewStore(cache.MetaNamespaceKeyFunc) ++ return &cacheStore{store, sync.Mutex{}, false} + } + + func (c *objectCache) newReflector(namespace, name string) *objectCacheItem { +@@ -124,14 +226,15 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem { + store, + 0, + ) +- stopCh := make(chan struct{}) +- go reflector.Run(stopCh) +- return &objectCacheItem{ ++ item := &objectCacheItem{ + refCount: 0, + store: store, +- hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil }, +- stopCh: stopCh, ++ reflector: reflector, ++ hasSynced: func() (bool, error) { return store.hasSynced(), nil }, ++ stopCh: make(chan struct{}), + } ++ go item.startReflector() ++ return item + } + + func (c *objectCache) AddReference(namespace, name string) { +@@ -186,10 +289,11 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { + if !exists { + return nil, fmt.Errorf("object %q/%q not registered", namespace, name) + } ++ item.restartReflectorIfNeeded() + if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil { + return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err) + } +- ++ item.setLastAccessTime(c.clock.Now()) + obj, exists, err := item.store.GetByKey(c.key(namespace, name)) + if err != nil { + return nil, err +@@ -209,6 +313,7 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { + // - doing that would require significant refactoring to reflector + // we limit ourselves to just quickly stop the reflector here. + if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) { ++ item.setImmutable() + if item.stop() { + klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name) + } +@@ -218,6 +323,17 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { + return nil, fmt.Errorf("unexpected object type: %v", obj) + } + ++func (c *objectCache) startRecycleIdleWatch() { ++ c.lock.Lock() ++ defer c.lock.Unlock() ++ ++ for key, item := range c.items { ++ if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) { ++ klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime) ++ } ++ } ++} ++ + // NewWatchBasedManager creates a manager that keeps a cache of all objects + // necessary for registered pods. + // It implements the following logic: +@@ -230,7 +346,15 @@ func NewWatchBasedManager( + newObject newObjectFunc, + isImmutable isImmutableFunc, + groupResource schema.GroupResource, ++ resyncInterval time.Duration, + getReferencedObjects func(*v1.Pod) sets.String) Manager { +- objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource) ++ ++ // If a configmap/secret is used as a volume, the volumeManager will visit the objectCacheItem every resyncInterval cycle, ++ // We just want to stop the objectCacheItem referenced by environment variables, ++ // So, maxIdleTime is set to an integer multiple of resyncInterval, ++ // We currently set it to 5 times. 
++ maxIdleTime := resyncInterval * 5 ++ ++ objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime) + return NewCacheBasedManager(objectStore, getReferencedObjects) + } +diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go +index 1f103404..17a8b01a 100644 +--- a/pkg/kubelet/util/manager/watch_based_manager_test.go ++++ b/pkg/kubelet/util/manager/watch_based_manager_test.go +@@ -28,6 +28,7 @@ import ( + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ++ "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + +@@ -62,13 +63,15 @@ func isSecretImmutable(object runtime.Object) bool { + return false + } + +-func newSecretCache(fakeClient clientset.Interface) *objectCache { ++func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache { + return &objectCache{ + listObject: listSecret(fakeClient), + watchObject: watchSecret(fakeClient), + newObject: func() runtime.Object { return &v1.Secret{} }, + isImmutable: isSecretImmutable, + groupResource: corev1.Resource("secret"), ++ clock: fakeClock, ++ maxIdleTime: maxIdleTime, + items: make(map[objectKey]*objectCacheItem), + } + } +@@ -88,7 +91,8 @@ func TestSecretCache(t *testing.T) { + fakeWatch := watch.NewFake() + fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) + +- store := newSecretCache(fakeClient) ++ fakeClock := clock.NewFakeClock(time.Now()) ++ store := newSecretCache(fakeClient, fakeClock, time.Minute) + + store.AddReference("ns", "name") + _, err := store.Get("ns", "name") +@@ -157,7 +161,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) { + fakeWatch := watch.NewFake() + fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) + +- store := newSecretCache(fakeClient) ++ fakeClock := clock.NewFakeClock(time.Now()) ++ store := newSecretCache(fakeClient, fakeClock, time.Minute) + + store.AddReference("ns", "name") + // This should trigger List and Watch actions eventually. +@@ -264,7 +269,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { + fakeWatch := watch.NewFake() + fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) + +- store := newSecretCache(fakeClient) ++ fakeClock := clock.NewFakeClock(time.Now()) ++ store := newSecretCache(fakeClient, fakeClock, time.Minute) + + key := objectKey{namespace: "ns", name: "name"} + itemExists := func() (bool, error) { +@@ -280,12 +286,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { + + item.lock.Lock() + defer item.lock.Unlock() +- select { +- case <-item.stopCh: +- return false +- default: +- return true +- } ++ return !item.stopped + } + + // AddReference should start reflector. 
+@@ -330,3 +331,83 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { + }) + } + } ++ ++func TestMaxIdleTimeStopsTheReflector(t *testing.T) { ++ secret := &v1.Secret{ ++ ObjectMeta: metav1.ObjectMeta{ ++ Name: "name", ++ Namespace: "ns", ++ ResourceVersion: "200", ++ }, ++ } ++ ++ fakeClient := &fake.Clientset{} ++ listReactor := func(a core.Action) (bool, runtime.Object, error) { ++ result := &v1.SecretList{ ++ ListMeta: metav1.ListMeta{ ++ ResourceVersion: "200", ++ }, ++ Items: []v1.Secret{*secret}, ++ } ++ ++ return true, result, nil ++ } ++ ++ fakeClient.AddReactor("list", "secrets", listReactor) ++ fakeWatch := watch.NewFake() ++ fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) ++ ++ fakeClock := clock.NewFakeClock(time.Now()) ++ store := newSecretCache(fakeClient, fakeClock, time.Minute) ++ ++ key := objectKey{namespace: "ns", name: "name"} ++ itemExists := func() (bool, error) { ++ store.lock.Lock() ++ defer store.lock.Unlock() ++ _, ok := store.items[key] ++ return ok, nil ++ } ++ ++ reflectorRunning := func() bool { ++ store.lock.Lock() ++ defer store.lock.Unlock() ++ item := store.items[key] ++ ++ item.lock.Lock() ++ defer item.lock.Unlock() ++ return !item.stopped ++ } ++ ++ // AddReference should start reflector. ++ store.AddReference("ns", "name") ++ if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil { ++ t.Errorf("item wasn't added to cache") ++ } ++ ++ obj, _ := store.Get("ns", "name") ++ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj)) ++ ++ assert.True(t, reflectorRunning()) ++ ++ fakeClock.Step(90 * time.Second) ++ store.startRecycleIdleWatch() ++ ++ // Reflector should already be stopped for maxIdleTime exceeded. ++ assert.False(t, reflectorRunning()) ++ ++ obj, _ = store.Get("ns", "name") ++ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj)) ++ // Reflector should reRun after get secret again. ++ assert.True(t, reflectorRunning()) ++ ++ fakeClock.Step(20 * time.Second) ++ _, _ = store.Get("ns", "name") ++ fakeClock.Step(20 * time.Second) ++ _, _ = store.Get("ns", "name") ++ fakeClock.Step(20 * time.Second) ++ _, _ = store.Get("ns", "name") ++ store.startRecycleIdleWatch() ++ ++ // Reflector should be running when the get function is called periodically. ++ assert.True(t, reflectorRunning()) ++} +diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go +index a065bc26..07c934b5 100644 +--- a/test/integration/kubelet/watch_manager_test.go ++++ b/test/integration/kubelet/watch_manager_test.go +@@ -27,6 +27,7 @@ import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ++ "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" +@@ -61,7 +62,8 @@ func TestWatchBasedManager(t *testing.T) { + // We want all watches to be up and running to stress test it. + // So don't treat any secret as immutable here. 
+ isImmutable := func(_ runtime.Object) bool { return false } +- store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}) ++ fakeClock := clock.NewFakeClock(time.Now()) ++ store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute) + + // create 1000 secrets in parallel + t.Log(time.Now(), "creating 1000 secrets") +-- +2.33.0 + diff --git a/kubernetes.spec b/kubernetes.spec index 875d87f..be49c31 100644 --- a/kubernetes.spec +++ b/kubernetes.spec @@ -3,7 +3,7 @@ Name: kubernetes Version: 1.20.2 -Release: 22 +Release: 23 Summary: Container cluster management License: ASL 2.0 URL: https://k8s.io/kubernetes @@ -41,6 +41,7 @@ Patch6013: 0014-fix-node-address-validation.patch Patch6014: 0015-Add-ephemeralcontainer-to-imagepolicy-securityaccoun.patch Patch6015: 0016-Add-envFrom-to-serviceaccount-admission-plugin.patch Patch6016: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch +Patch6017: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch %description Container cluster management. @@ -272,6 +273,9 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \ %systemd_postun kubelet kube-proxy %changelog +* Fri Aug 23 2024 zhaoxiaohu - 1.20.2-23 +- DESC:reduce configmap and secret watch of kubelet + * Thu Aug 22 2024 zhaoxiaohu - 1.20.2-22 - Type:bugfix - CVE:NA
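Reviewer note (not part of the patch): the core of the backported change is an idle-watch recycling pattern — each watched configmap/secret records when it was last read via setLastAccessTime, a periodic sweep (wait.Until calling startRecycleIdleWatch) stops reflectors that have not been read within maxIdleTime (5 × resyncInterval), and the next Get() restarts the watch via restartReflectorIfNeeded. The following is a minimal standalone Go sketch of that pattern only, assuming simplified hypothetical names (idleItem, newIdleItem); it is not the kubelet code itself and omits the reflector, cacheStore, and immutability handling.

package main

import (
	"fmt"
	"sync"
	"time"
)

// idleItem is a hypothetical, simplified analogue of objectCacheItem:
// it tracks the last access time and can be stopped and restarted.
type idleItem struct {
	mu         sync.Mutex
	lastAccess time.Time
	stopped    bool
	stopCh     chan struct{}
}

func newIdleItem() *idleItem {
	return &idleItem{
		lastAccess: time.Now(),
		stopCh:     make(chan struct{}),
	}
}

// get stands in for objectCache.Get: it restarts the "watch" if it was
// stopped for idleness and records the access time.
func (it *idleItem) get() {
	it.mu.Lock()
	defer it.mu.Unlock()
	if it.stopped {
		it.stopCh = make(chan struct{})
		it.stopped = false
		// In the real code this is where the reflector goroutine is re-run.
	}
	it.lastAccess = time.Now()
}

// stopIfIdle stands in for objectCacheItem.stopIfIdle: stop the "watch"
// when it has not been accessed within maxIdleTime.
func (it *idleItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	it.mu.Lock()
	defer it.mu.Unlock()
	if !it.stopped && now.After(it.lastAccess.Add(maxIdleTime)) {
		it.stopped = true
		close(it.stopCh)
		return true
	}
	return false
}

func main() {
	item := newIdleItem()
	maxIdle := 50 * time.Millisecond

	item.get()
	time.Sleep(100 * time.Millisecond)

	// Periodic sweep (wait.Until in the real code) finds the item idle and stops it.
	fmt.Println("stopped:", item.stopIfIdle(time.Now(), maxIdle)) // true

	// A later read transparently restarts the watch, so it is no longer idle.
	item.get()
	fmt.Println("stopped again:", item.stopIfIdle(time.Now(), maxIdle)) // false
}

This mirrors why the patch threads resyncInterval down to NewWatchBasedManager: volume-mounted objects are touched every resync cycle and so never go idle, while objects referenced only through environment variables stop being read after pod startup and have their watches reclaimed.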