reduce configmap and secret watch of kubelet

Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
zhaoxiaohu 2024-08-23 16:55:52 +08:00 committed by liuxu
parent cefe431cf4
commit 8df5d228f7
2 changed files with 529 additions and 1 deletion

0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch

@@ -0,0 +1,524 @@
From 8e4558ba2469a92a215f43c027a4e8e082b38fe1 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Fri, 23 Aug 2024 14:52:03 +0800
Subject: [PATCH] reduce configmap and secret watch of kubelet
Reference: https://github.com/kubernetes/kubernetes/pull/99393/commits/57a3b0abd678c66a9a04e553b6d6ae49671a4779
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: chenyw1990 <chenyawei1@huawei.com>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
pkg/kubelet/configmap/configmap_manager.go | 4 +-
pkg/kubelet/kubelet.go | 4 +-
pkg/kubelet/secret/secret_manager.go | 4 +-
.../util/manager/watch_based_manager.go | 166 +++++++++++++++---
.../util/manager/watch_based_manager_test.go | 101 +++++++++--
.../integration/kubelet/watch_manager_test.go | 4 +-
6 files changed, 245 insertions(+), 38 deletions(-)
diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go
index 2f525357..5466dd8b 100644
--- a/pkg/kubelet/configmap/configmap_manager.go
+++ b/pkg/kubelet/configmap/configmap_manager.go
@@ -135,7 +135,7 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G
// - whenever a pod is created or updated, we start individual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
+func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts)
}
@@ -153,6 +153,6 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
}
gr := corev1.Resource("configmap")
return &configMapManager{
- manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
+ manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames),
}
}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 9cb92eda..c6f9e6e5 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -526,8 +526,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
- secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
- configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
+ secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
+ configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go
index 3aa132a5..af17711e 100644
--- a/pkg/kubelet/secret/secret_manager.go
+++ b/pkg/kubelet/secret/secret_manager.go
@@ -136,7 +136,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO
// - whenever a pod is created or updated, we start individual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
+func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
}
@@ -154,6 +154,6 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
}
gr := corev1.Resource("secret")
return &secretManager{
- manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
+ manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
}
}
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index 4dfe1037..cee515ca 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@@ -46,25 +47,108 @@ type isImmutableFunc func(runtime.Object) bool
// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
refCount int
- store cache.Store
+ store *cacheStore
+ reflector *cache.Reflector
+
hasSynced func() (bool, error)
- // lock is protecting from closing stopCh multiple times.
- lock sync.Mutex
- stopCh chan struct{}
+ // waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
+ waitGroup sync.WaitGroup
+
+ // lock guards access to and modification of lastAccessTime, stopped, and immutable,
+ // and protects against closing stopCh multiple times.
+ lock sync.Mutex
+ lastAccessTime time.Time
+ stopped bool
+ immutable bool
+ stopCh chan struct{}
}
func (i *objectCacheItem) stop() bool {
i.lock.Lock()
defer i.lock.Unlock()
- select {
- case <-i.stopCh:
- // This means that channel is already closed.
+ return i.stopThreadUnsafe()
+}
+
+func (i *objectCacheItem) stopThreadUnsafe() bool {
+ if i.stopped {
return false
- default:
- close(i.stopCh)
- return true
}
+ i.stopped = true
+ close(i.stopCh)
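+ // For mutable objects, clear the store's initialized flag so that a later
+ // Get blocks until a restarted reflector has re-listed, instead of serving
+ // potentially stale data.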
+ if !i.immutable {
+ i.store.unsetInitialized()
+ }
+ return true
+}
+
+func (i *objectCacheItem) setLastAccessTime(time time.Time) {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ i.lastAccessTime = time
+}
+
+func (i *objectCacheItem) setImmutable() {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ i.immutable = true
+}
+
+func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+ return i.stopThreadUnsafe()
+ }
+ return false
+}
+
+func (i *objectCacheItem) restartReflectorIfNeeded() {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ if i.immutable || !i.stopped {
+ return
+ }
+ i.stopCh = make(chan struct{})
+ i.stopped = false
+ go i.startReflector()
+}
+
+func (i *objectCacheItem) startReflector() {
+ i.waitGroup.Wait()
+ i.waitGroup.Add(1)
+ defer i.waitGroup.Done()
+ i.reflector.Run(i.stopCh)
+}
+
+// cacheStore wraps cache.Store so that Replace can be overridden to mark the initialized flag
+type cacheStore struct {
+ cache.Store
+ lock sync.Mutex
+ initialized bool
+}
+
+func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ err := c.Store.Replace(list, resourceVersion)
+ if err != nil {
+ return err
+ }
+ c.initialized = true
+ return nil
+}
+
+func (c *cacheStore) hasSynced() bool {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ return c.initialized
+}
+
+func (c *cacheStore) unsetInitialized() {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ c.initialized = false
}
// objectCache is a local cache of objects propagated via
@@ -75,35 +159,53 @@ type objectCache struct {
newObject newObjectFunc
isImmutable isImmutableFunc
groupResource schema.GroupResource
+ clock clock.Clock
+ maxIdleTime time.Duration
lock sync.RWMutex
items map[objectKey]*objectCacheItem
}
+const minIdleTime = 1 * time.Minute
+
// NewObjectCache returns a new watch-based instance of Store interface.
func NewObjectCache(
listObject listObjectFunc,
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
- groupResource schema.GroupResource) Store {
- return &objectCache{
+ groupResource schema.GroupResource,
+ clock clock.Clock,
+ maxIdleTime time.Duration) Store {
+
+ if maxIdleTime < minIdleTime {
+ maxIdleTime = minIdleTime
+ }
+
+ store := &objectCache{
listObject: listObject,
watchObject: watchObject,
newObject: newObject,
isImmutable: isImmutable,
groupResource: groupResource,
+ clock: clock,
+ maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
+
+ // TODO propagate stopCh from the higher level.
+ go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
+ return store
}
-func (c *objectCache) newStore() cache.Store {
+func (c *objectCache) newStore() *cacheStore {
// TODO: We may consider creating a dedicated store keeping just a single
// item, instead of using a generic store implementation for this purpose.
// However, simple benchmarks show that the memory overhead in that case
// drops from ~600B to ~300B per object, so we are not optimizing it
// until we see a good reason to.
- return cache.NewStore(cache.MetaNamespaceKeyFunc)
+ store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+ return &cacheStore{store, sync.Mutex{}, false}
}
func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
@@ -124,14 +226,15 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
store,
0,
)
- stopCh := make(chan struct{})
- go reflector.Run(stopCh)
- return &objectCacheItem{
+ item := &objectCacheItem{
refCount: 0,
store: store,
- hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
- stopCh: stopCh,
+ reflector: reflector,
+ hasSynced: func() (bool, error) { return store.hasSynced(), nil },
+ stopCh: make(chan struct{}),
}
+ go item.startReflector()
+ return item
}
func (c *objectCache) AddReference(namespace, name string) {
@@ -186,10 +289,11 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
+ item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
-
+ item.setLastAccessTime(c.clock.Now())
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
@@ -209,6 +313,7 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
// - doing that would require significant refactoring to reflector
// we limit ourselves to just quickly stop the reflector here.
if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
+ item.setImmutable()
if item.stop() {
klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
}
@@ -218,6 +323,17 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
return nil, fmt.Errorf("unexpected object type: %v", obj)
}
+func (c *objectCache) startRecycleIdleWatch() {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ for key, item := range c.items {
+ if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
+ klog.V(4).InfoS("Not accessed for a long time, stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
+ }
+ }
+}
+
// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
@@ -230,7 +346,15 @@ func NewWatchBasedManager(
newObject newObjectFunc,
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
+ resyncInterval time.Duration,
getReferencedObjects func(*v1.Pod) sets.String) Manager {
- objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
+
+ // If a configmap/secret is used as a volume, the volumeManager will visit the objectCacheItem every resyncInterval cycle,
+ // so we only want to stop the objectCacheItems that are referenced solely by environment variables.
+ // For that reason, maxIdleTime is set to an integer multiple of resyncInterval; we currently use 5 times.
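+ // For example, assuming the kubelet's default sync frequency of 1m, maxIdleTime
+ // works out to 5m: an object referenced only by env vars stops being watched
+ // five minutes after its last Get, while volume-backed objects are visited
+ // every resync cycle and never go idle.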
+ maxIdleTime := resyncInterval * 5
+
+ objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
index 1f103404..17a8b01a 100644
--- a/pkg/kubelet/util/manager/watch_based_manager_test.go
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@@ -62,13 +63,15 @@ func isSecretImmutable(object runtime.Object) bool {
return false
}
-func newSecretCache(fakeClient clientset.Interface) *objectCache {
+func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache {
return &objectCache{
listObject: listSecret(fakeClient),
watchObject: watchSecret(fakeClient),
newObject: func() runtime.Object { return &v1.Secret{} },
isImmutable: isSecretImmutable,
groupResource: corev1.Resource("secret"),
+ clock: fakeClock,
+ maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
}
@@ -88,7 +91,8 @@ func TestSecretCache(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
_, err := store.Get("ns", "name")
@@ -157,7 +161,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
// This should trigger List and Watch actions eventually.
@@ -264,7 +269,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
key := objectKey{namespace: "ns", name: "name"}
itemExists := func() (bool, error) {
@@ -280,12 +286,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
item.lock.Lock()
defer item.lock.Unlock()
- select {
- case <-item.stopCh:
- return false
- default:
- return true
- }
+ return !item.stopped
}
// AddReference should start reflector.
@@ -330,3 +331,83 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
})
}
}
+
+func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
+ secret := &v1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "name",
+ Namespace: "ns",
+ ResourceVersion: "200",
+ },
+ }
+
+ fakeClient := &fake.Clientset{}
+ listReactor := func(a core.Action) (bool, runtime.Object, error) {
+ result := &v1.SecretList{
+ ListMeta: metav1.ListMeta{
+ ResourceVersion: "200",
+ },
+ Items: []v1.Secret{*secret},
+ }
+
+ return true, result, nil
+ }
+
+ fakeClient.AddReactor("list", "secrets", listReactor)
+ fakeWatch := watch.NewFake()
+ fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+ key := objectKey{namespace: "ns", name: "name"}
+ itemExists := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ _, ok := store.items[key]
+ return ok, nil
+ }
+
+ reflectorRunning := func() bool {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return !item.stopped
+ }
+
+ // AddReference should start reflector.
+ store.AddReference("ns", "name")
+ if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+ t.Errorf("item wasn't added to cache")
+ }
+
+ obj, _ := store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+
+ assert.True(t, reflectorRunning())
+
+ fakeClock.Step(90 * time.Second)
+ store.startRecycleIdleWatch()
+
+ // Reflector should already be stopped because maxIdleTime was exceeded.
+ assert.False(t, reflectorRunning())
+
+ obj, _ = store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+ // Reflector should restart once the secret is fetched again.
+ assert.True(t, reflectorRunning())
+
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ store.startRecycleIdleWatch()
+
+ // Reflector should keep running while Get is called periodically.
+ assert.True(t, reflectorRunning())
+}
diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go
index a065bc26..07c934b5 100644
--- a/test/integration/kubelet/watch_manager_test.go
+++ b/test/integration/kubelet/watch_manager_test.go
@@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -61,7 +62,8 @@ func TestWatchBasedManager(t *testing.T) {
// We want all watches to be up and running to stress test it.
// So don't treat any secret as immutable here.
isImmutable := func(_ runtime.Object) bool { return false }
- store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"})
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)
// create 1000 secrets in parallel
t.Log(time.Now(), "creating 1000 secrets")
--
2.33.0
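
The heart of the patch is the idle stop/restart lifecycle around each reflector. Below is a minimal, standalone Go sketch (hypothetical names and intervals, not the kubelet code) of that pattern: every access stamps a last-access time, a periodic sweeper stops watchers that have been idle past a threshold, and the next access transparently restarts them.

package main

import (
	"fmt"
	"sync"
	"time"
)

// watcher plays the role of objectCacheItem: a stoppable background loop
// plus the bookkeeping needed to stop it when idle and restart it on access.
type watcher struct {
	mu         sync.Mutex
	lastAccess time.Time
	stopped    bool
	stopCh     chan struct{}
}

func (w *watcher) run(stopCh chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(100 * time.Millisecond):
			// ...receive watch events and update the local cache...
		}
	}
}

// get stamps the access time and restarts the loop if it was recycled,
// mirroring objectCache.Get -> setLastAccessTime/restartReflectorIfNeeded.
func (w *watcher) get() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lastAccess = time.Now()
	if w.stopped {
		w.stopCh = make(chan struct{})
		w.stopped = false
		go w.run(w.stopCh)
	}
}

// stopIfIdle stops the loop once no get() has arrived within maxIdleTime,
// mirroring objectCacheItem.stopIfIdle driven by startRecycleIdleWatch.
func (w *watcher) stopIfIdle(maxIdleTime time.Duration) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.stopped && time.Since(w.lastAccess) > maxIdleTime {
		w.stopped = true
		close(w.stopCh)
		return true
	}
	return false
}

func main() {
	w := &watcher{stopCh: make(chan struct{}), lastAccess: time.Now()}
	go w.run(w.stopCh)

	time.Sleep(300 * time.Millisecond)
	fmt.Println("stopped while idle:", w.stopIfIdle(200*time.Millisecond)) // true
	w.get() // transparently restarts the watch loop
	fmt.Println("stopped right after access:", w.stopIfIdle(200*time.Millisecond)) // false
}

Restart-on-access is what keeps the optimization invisible to callers: a recycled watch costs one extra LIST on the next Get instead of holding a WATCH open indefinitely, which is the trade the patch makes for secrets and configmaps referenced only through environment variables.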

kubernetes.spec

@@ -3,7 +3,7 @@
Name: kubernetes
Version: 1.20.2
-Release: 22
+Release: 23
Summary: Container cluster management
License: ASL 2.0
URL: https://k8s.io/kubernetes
@@ -41,6 +41,7 @@ Patch6013: 0014-fix-node-address-validation.patch
Patch6014: 0015-Add-ephemeralcontainer-to-imagepolicy-securityaccoun.patch
Patch6015: 0016-Add-envFrom-to-serviceaccount-admission-plugin.patch
Patch6016: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch
+Patch6017: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch
%description
Container cluster management.
@@ -272,6 +273,9 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \
%systemd_postun kubelet kube-proxy
%changelog
+* Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-23
+- DESC:reduce configmap and secret watch of kubelet
* Thu Aug 22 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-22
- Type:bugfix
- CVE:NA