Don't prematurely close reflectors in case of slow initialization in watch based manager

Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
This commit is contained in:
zhaoxiaohu 2024-08-23 16:59:03 +08:00 committed by liuxu
parent 8df5d228f7
commit ef0ef3875b
2 changed files with 156 additions and 1 deletions

View File

@ -0,0 +1,151 @@
From 033bafcda990c254a53794556ff5aa9d2fceb1f7 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Fri, 23 Aug 2024 15:04:42 +0800
Subject: [PATCH] Don't prematurely close reflectors in case of slow
initialization in watch based manager
Reference: https://github.com/kubernetes/kubernetes/pull/104604/commits/515106b7957c4da6fdbbb7e474d19bad0fa3f57f
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: wojtekt <wojtekt@google.com>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
.../util/manager/watch_based_manager.go | 11 ++-
.../util/manager/watch_based_manager_test.go | 91 +++++++++++++++++++
2 files changed, 100 insertions(+), 2 deletions(-)
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index cee515ca..d4e7597b 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -97,7 +97,11 @@ func (i *objectCacheItem) setImmutable() {
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
i.lock.Lock()
defer i.lock.Unlock()
- if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+ // Ensure that we don't try to stop not yet initialized reflector.
+ // In case of overloaded kube-apiserver, if the list request is
+ // already being processed, all the work would be lost and would have
+ // to be retried.
+ if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
return i.stopThreadUnsafe()
}
return false
@@ -289,11 +293,14 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
+ // Record last access time regardless of whether it succeeded or not.
+ // This protects from premature (racy) reflector closure.
+ item.setLastAccessTime(c.clock.Now())
+
item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
- item.setLastAccessTime(c.clock.Now())
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
index 17a8b01a..0fc9915c 100644
--- a/pkg/kubelet/util/manager/watch_based_manager_test.go
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -411,3 +411,94 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
// Reflector should be running when the get function is called periodically.
assert.True(t, reflectorRunning())
}
+
+func TestReflectorNotStopedOnSlowInitialization(t *testing.T) {
+ secret := &v1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "name",
+ Namespace: "ns",
+ ResourceVersion: "200",
+ },
+ }
+
+ fakeClock := clock.NewFakeClock(time.Now())
+
+ fakeClient := &fake.Clientset{}
+ listReactor := func(a core.Action) (bool, runtime.Object, error) {
+ <-fakeClock.After(120 * time.Second)
+
+ result := &v1.SecretList{
+ ListMeta: metav1.ListMeta{
+ ResourceVersion: "200",
+ },
+ Items: []v1.Secret{*secret},
+ }
+
+ return true, result, nil
+ }
+
+ fakeClient.AddReactor("list", "secrets", listReactor)
+ fakeWatch := watch.NewFake()
+ fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+ key := objectKey{namespace: "ns", name: "name"}
+ itemExists := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ _, ok := store.items[key]
+ return ok, nil
+ }
+
+ reflectorRunning := func() bool {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return !item.stopped
+ }
+
+ reflectorInitialized := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return item.store.hasSynced(), nil
+ }
+
+ // AddReference should start reflector.
+ store.AddReference("ns", "name")
+ if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+ t.Errorf("item wasn't added to cache")
+ }
+
+ fakeClock.Step(90 * time.Second)
+ store.startRecycleIdleWatch()
+
+ // Reflector didn't yet initialize, so it shouldn't be stopped.
+ // However, Get should still be failing.
+ assert.True(t, reflectorRunning())
+ initialized, _ := reflectorInitialized()
+ assert.False(t, initialized)
+ _, err := store.Get("ns", "name")
+ if err == nil || !strings.Contains(err.Error(), "failed to sync") {
+ t.Errorf("Expected failed to sync error, got: %v", err)
+ }
+
+ // Initialization should successfully finish.
+ fakeClock.Step(30 * time.Second)
+ if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
+ t.Errorf("reflector didn't initialize correctly")
+ }
+
+ // recycling shouldn't stop the reflector because it was accessed within last minute.
+ store.startRecycleIdleWatch()
+ assert.True(t, reflectorRunning())
+
+ obj, _ := store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+}
--
2.33.0

View File

@ -3,7 +3,7 @@
Name: kubernetes Name: kubernetes
Version: 1.20.2 Version: 1.20.2
Release: 23 Release: 24
Summary: Container cluster management Summary: Container cluster management
License: ASL 2.0 License: ASL 2.0
URL: https://k8s.io/kubernetes URL: https://k8s.io/kubernetes
@ -42,6 +42,7 @@ Patch6014: 0015-Add-ephemeralcontainer-to-imagepolicy-securityaccoun.patch
Patch6015: 0016-Add-envFrom-to-serviceaccount-admission-plugin.patch Patch6015: 0016-Add-envFrom-to-serviceaccount-admission-plugin.patch
Patch6016: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch Patch6016: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch
Patch6017: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch Patch6017: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch
Patch6018: 0019-backport-Don-t-prematurely-close-reflectors-in-case-of-slow-i.patch
%description %description
Container cluster management. Container cluster management.
@ -273,6 +274,9 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \
%systemd_postun kubelet kube-proxy %systemd_postun kubelet kube-proxy
%changelog %changelog
* Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-24
- DESC:Don't prematurely close reflectors in case of slow initialization in watch based manager
* Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-23 * Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-23
- DESC:reduce configmap and secret watch of kubelet - DESC:reduce configmap and secret watch of kubelet