!184 fix CVE-2024-10220

From: @liuxu180400617 
Reviewed-by: @xuxuepeng 
Signed-off-by: @xuxuepeng
Committed by openeuler-ci-bot on 2024-12-06 02:08:21 +00:00 via Gitee
commit 7354a5ff44
6 changed files with 981 additions and 1 deletion

File: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch

@@ -0,0 +1,116 @@
From 44140f192be2eea3a71b3b6372ef45e8535dd802 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Thu, 22 Aug 2024 16:39:45 +0800
Subject: [PATCH] Fix kubelet panic when allocate resource for pod.
Reference: https://github.com/kubernetes/kubernetes/pull/119561/commits/d6b8a660b081916f3fae3319581ec2c49a2f5a05
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: payall4u <payall4u@qq.com>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
pkg/kubelet/cm/devicemanager/manager.go | 12 ++--
pkg/kubelet/cm/devicemanager/manager_test.go | 60 ++++++++++++++++++++
2 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 95cf058f..1370675b 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -667,6 +667,13 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
// Create a closure to help with device allocation
// Returns 'true' once no more devices need to be allocated.
allocateRemainingFrom := func(devices sets.String) bool {
+ // When we call callGetPreferredAllocationIfAvailable below, we will release
+ // the lock and call the device plugin. If someone calls ListResource concurrently,
+ // device manager will recalculate the allocatedDevices map. Some entries with
+ // empty sets may be removed, so we reinit here.
+ if m.allocatedDevices[resource] == nil {
+ m.allocatedDevices[resource] = sets.NewString()
+ }
for device := range devices.Difference(allocated) {
m.allocatedDevices[resource].Insert(device)
allocated.Insert(device)
@@ -683,11 +690,6 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return allocated, nil
}
- // Needs to allocate additional devices.
- if m.allocatedDevices[resource] == nil {
- m.allocatedDevices[resource] = sets.NewString()
- }
-
// Gets Devices in use.
devicesInUse := m.allocatedDevices[resource]
// Gets Available devices.
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 9034498c..354dee50 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -1080,3 +1080,63 @@ func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]p
}
return res
}
+
+func TestDevicesToAllocateConflictWithUpdateAllocatedDevices(t *testing.T) {
+ podToAllocate := "podToAllocate"
+ containerToAllocate := "containerToAllocate"
+ podToRemove := "podToRemove"
+ containerToRemove := "containerToRemove"
+ deviceID := "deviceID"
+ resourceName := "domain1.com/resource"
+
+ socket := filepath.Join(os.TempDir(), esocketName())
+ devs := []*pluginapi.Device{
+ {ID: deviceID, Health: pluginapi.Healthy},
+ }
+ p, e := esetup(t, devs, socket, resourceName, func(n string, d []pluginapi.Device) {})
+
+ waitUpdateAllocatedDevicesChan := make(chan struct{})
+ waitSetGetPreferredAllocChan := make(chan struct{})
+
+ p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
+ waitSetGetPreferredAllocChan <- struct{}{}
+ <-waitUpdateAllocatedDevicesChan
+ return &pluginapi.PreferredAllocationResponse{
+ ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
+ {
+ DeviceIDs: []string{deviceID},
+ },
+ },
+ }, nil
+ })
+
+ testManager := &ManagerImpl{
+ endpoints: make(map[string]endpointInfo),
+ healthyDevices: make(map[string]sets.String),
+ unhealthyDevices: make(map[string]sets.String),
+ allocatedDevices: make(map[string]sets.String),
+ podDevices: newPodDevices(),
+ activePods: func() []*v1.Pod { return []*v1.Pod{} },
+ sourcesReady: &sourcesReadyStub{},
+ topologyAffinityStore: topologymanager.NewFakeManager(),
+ }
+
+ testManager.endpoints[resourceName] = endpointInfo{
+ e: e,
+ opts: &pluginapi.DevicePluginOptions{
+ GetPreferredAllocationAvailable: true,
+ },
+ }
+ testManager.healthyDevices[resourceName] = sets.NewString(deviceID)
+ testManager.podDevices.insert(podToRemove, containerToRemove, resourceName, nil, nil)
+
+ go func() {
+ <-waitSetGetPreferredAllocChan
+ testManager.UpdateAllocatedDevices()
+ waitUpdateAllocatedDevicesChan <- struct{}{}
+ }()
+
+ set, err := testManager.devicesToAllocate(podToAllocate, containerToAllocate, resourceName, 1, sets.NewString())
+ assert.NoError(t, err)
+ assert.Equal(t, set, sets.NewString(deviceID))
+}
--
2.33.0

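The hunk above exists because devicesToAllocate drops the manager lock while it calls the device plugin for a preferred allocation; if the device manager recalculates allocatedDevices concurrently in that window (the new comment mentions a concurrent ListResource), the now-empty entry is pruned and the later Insert writes into a nil map and panics. Below is a self-contained sketch of that race and of the re-initialization the patch adds, written against plain maps and sync.Mutex rather than the kubelet types; all names are illustrative.

package main

import (
	"fmt"
	"sync"
)

// allocator mirrors the shape of allocatedDevices (resource -> set of device
// IDs) with plain maps; it is not the kubelet's ManagerImpl.
type allocator struct {
	mu        sync.Mutex
	allocated map[string]map[string]struct{}
}

// cleanup plays the role of UpdateAllocatedDevices: drop empty entries.
func (a *allocator) cleanup() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for res, devs := range a.allocated {
		if len(devs) == 0 {
			delete(a.allocated, res) // the entry disappears entirely
		}
	}
}

// allocate plays the role of devicesToAllocate: it pre-creates the per-resource
// set, releases the lock to ask the "plugin" for a preferred allocation, then
// records the chosen device.
func (a *allocator) allocate(resource, device string, askPlugin func()) {
	a.mu.Lock()
	if a.allocated[resource] == nil {
		a.allocated[resource] = map[string]struct{}{}
	}
	a.mu.Unlock()

	askPlugin() // lock not held here; cleanup() may run and delete the empty entry

	a.mu.Lock()
	defer a.mu.Unlock()
	// This re-check is the essence of the fix: without it the insert below can
	// be an assignment into a nil map, which panics.
	if a.allocated[resource] == nil {
		a.allocated[resource] = map[string]struct{}{}
	}
	a.allocated[resource][device] = struct{}{}
}

func main() {
	a := &allocator{allocated: map[string]map[string]struct{}{}}
	// Run the "cleanup" exactly while the lock is released, as the test's
	// SetGetPreferredAllocFunc hook does with UpdateAllocatedDevices.
	a.allocate("domain1.com/resource", "dev-0", a.cleanup)
	fmt.Println("allocated:", a.allocated)
}
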
File: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch

@@ -0,0 +1,524 @@
From 8e4558ba2469a92a215f43c027a4e8e082b38fe1 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Fri, 23 Aug 2024 14:52:03 +0800
Subject: [PATCH] reduce configmap and secret watch of kubelet
Reference: https://github.com/kubernetes/kubernetes/pull/99393/commits/57a3b0abd678c66a9a04e553b6d6ae49671a4779
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: chenyw1990 <chenyawei1@huawei.com>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
pkg/kubelet/configmap/configmap_manager.go | 4 +-
pkg/kubelet/kubelet.go | 4 +-
pkg/kubelet/secret/secret_manager.go | 4 +-
.../util/manager/watch_based_manager.go | 166 +++++++++++++++---
.../util/manager/watch_based_manager_test.go | 101 +++++++++--
.../integration/kubelet/watch_manager_test.go | 4 +-
6 files changed, 245 insertions(+), 38 deletions(-)
diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go
index 2f525357..5466dd8b 100644
--- a/pkg/kubelet/configmap/configmap_manager.go
+++ b/pkg/kubelet/configmap/configmap_manager.go
@@ -135,7 +135,7 @@ func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.G
// - whenever a pod is created or updated, we start individual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
+func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts)
}
@@ -153,6 +153,6 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface) Manager {
}
gr := corev1.Resource("configmap")
return &configMapManager{
- manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, getConfigMapNames),
+ manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames),
}
}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 9cb92eda..c6f9e6e5 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -526,8 +526,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
- secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
- configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
+ secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
+ configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go
index 3aa132a5..af17711e 100644
--- a/pkg/kubelet/secret/secret_manager.go
+++ b/pkg/kubelet/secret/secret_manager.go
@@ -136,7 +136,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO
// - whenever a pod is created or updated, we start individual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
-func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
+func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
}
@@ -154,6 +154,6 @@ func NewWatchingSecretManager(kubeClient clientset.Interface) Manager {
}
gr := corev1.Resource("secret")
return &secretManager{
- manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, getSecretNames),
+ manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
}
}
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index 4dfe1037..cee515ca 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@@ -46,25 +47,108 @@ type isImmutableFunc func(runtime.Object) bool
// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
refCount int
- store cache.Store
+ store *cacheStore
+ reflector *cache.Reflector
+
hasSynced func() (bool, error)
- // lock is protecting from closing stopCh multiple times.
- lock sync.Mutex
- stopCh chan struct{}
+ // waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
+ waitGroup sync.WaitGroup
+
+ // lock protects access to lastAccessTime, stopped, and immutable,
+ // and guards against closing stopCh multiple times.
+ lock sync.Mutex
+ lastAccessTime time.Time
+ stopped bool
+ immutable bool
+ stopCh chan struct{}
}
func (i *objectCacheItem) stop() bool {
i.lock.Lock()
defer i.lock.Unlock()
- select {
- case <-i.stopCh:
- // This means that channel is already closed.
+ return i.stopThreadUnsafe()
+}
+
+func (i *objectCacheItem) stopThreadUnsafe() bool {
+ if i.stopped {
return false
- default:
- close(i.stopCh)
- return true
}
+ i.stopped = true
+ close(i.stopCh)
+ if !i.immutable {
+ i.store.unsetInitialized()
+ }
+ return true
+}
+
+func (i *objectCacheItem) setLastAccessTime(time time.Time) {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ i.lastAccessTime = time
+}
+
+func (i *objectCacheItem) setImmutable() {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ i.immutable = true
+}
+
+func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+ return i.stopThreadUnsafe()
+ }
+ return false
+}
+
+func (i *objectCacheItem) restartReflectorIfNeeded() {
+ i.lock.Lock()
+ defer i.lock.Unlock()
+ if i.immutable || !i.stopped {
+ return
+ }
+ i.stopCh = make(chan struct{})
+ i.stopped = false
+ go i.startReflector()
+}
+
+func (i *objectCacheItem) startReflector() {
+ i.waitGroup.Wait()
+ i.waitGroup.Add(1)
+ defer i.waitGroup.Done()
+ i.reflector.Run(i.stopCh)
+}
+
+// cacheStore wraps cache.Store so that Replace can mark the initialized flag
+type cacheStore struct {
+ cache.Store
+ lock sync.Mutex
+ initialized bool
+}
+
+func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ err := c.Store.Replace(list, resourceVersion)
+ if err != nil {
+ return err
+ }
+ c.initialized = true
+ return nil
+}
+
+func (c *cacheStore) hasSynced() bool {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ return c.initialized
+}
+
+func (c *cacheStore) unsetInitialized() {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ c.initialized = false
}
// objectCache is a local cache of objects propagated via
@@ -75,35 +159,53 @@ type objectCache struct {
newObject newObjectFunc
isImmutable isImmutableFunc
groupResource schema.GroupResource
+ clock clock.Clock
+ maxIdleTime time.Duration
lock sync.RWMutex
items map[objectKey]*objectCacheItem
}
+const minIdleTime = 1 * time.Minute
+
// NewObjectCache returns a new watch-based instance of Store interface.
func NewObjectCache(
listObject listObjectFunc,
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
- groupResource schema.GroupResource) Store {
- return &objectCache{
+ groupResource schema.GroupResource,
+ clock clock.Clock,
+ maxIdleTime time.Duration) Store {
+
+ if maxIdleTime < minIdleTime {
+ maxIdleTime = minIdleTime
+ }
+
+ store := &objectCache{
listObject: listObject,
watchObject: watchObject,
newObject: newObject,
isImmutable: isImmutable,
groupResource: groupResource,
+ clock: clock,
+ maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
+
+ // TODO propagate stopCh from the higher level.
+ go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
+ return store
}
-func (c *objectCache) newStore() cache.Store {
+func (c *objectCache) newStore() *cacheStore {
// TODO: We may consider created a dedicated store keeping just a single
// item, instead of using a generic store implementation for this purpose.
// However, simple benchmarks show that memory overhead in that case is
// decrease from ~600B to ~300B per object. So we are not optimizing it
// until we will see a good reason for that.
- return cache.NewStore(cache.MetaNamespaceKeyFunc)
+ store := cache.NewStore(cache.MetaNamespaceKeyFunc)
+ return &cacheStore{store, sync.Mutex{}, false}
}
func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
@@ -124,14 +226,15 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
store,
0,
)
- stopCh := make(chan struct{})
- go reflector.Run(stopCh)
- return &objectCacheItem{
+ item := &objectCacheItem{
refCount: 0,
store: store,
- hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
- stopCh: stopCh,
+ reflector: reflector,
+ hasSynced: func() (bool, error) { return store.hasSynced(), nil },
+ stopCh: make(chan struct{}),
}
+ go item.startReflector()
+ return item
}
func (c *objectCache) AddReference(namespace, name string) {
@@ -186,10 +289,11 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
+ item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
-
+ item.setLastAccessTime(c.clock.Now())
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
@@ -209,6 +313,7 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
// - doing that would require significant refactoring to reflector
// we limit ourselves to just quickly stop the reflector here.
if utilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes) && c.isImmutable(object) {
+ item.setImmutable()
if item.stop() {
klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable", namespace, name)
}
@@ -218,6 +323,17 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
return nil, fmt.Errorf("unexpected object type: %v", obj)
}
+func (c *objectCache) startRecycleIdleWatch() {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ for key, item := range c.items {
+ if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
+ klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
+ }
+ }
+}
+
// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
@@ -230,7 +346,15 @@ func NewWatchBasedManager(
newObject newObjectFunc,
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
+ resyncInterval time.Duration,
getReferencedObjects func(*v1.Pod) sets.String) Manager {
- objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource)
+
+ // If a configmap/secret is used as a volume, the volumeManager will visit the
+ // objectCacheItem every resyncInterval cycle. We only want to stop the
+ // objectCacheItems that are referenced solely by environment variables, so
+ // maxIdleTime is set to an integer multiple of resyncInterval (currently 5x).
+ maxIdleTime := resyncInterval * 5
+
+ objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
index 1f103404..17a8b01a 100644
--- a/pkg/kubelet/util/manager/watch_based_manager_test.go
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@@ -62,13 +63,15 @@ func isSecretImmutable(object runtime.Object) bool {
return false
}
-func newSecretCache(fakeClient clientset.Interface) *objectCache {
+func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache {
return &objectCache{
listObject: listSecret(fakeClient),
watchObject: watchSecret(fakeClient),
newObject: func() runtime.Object { return &v1.Secret{} },
isImmutable: isSecretImmutable,
groupResource: corev1.Resource("secret"),
+ clock: fakeClock,
+ maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
}
@@ -88,7 +91,8 @@ func TestSecretCache(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
_, err := store.Get("ns", "name")
@@ -157,7 +161,8 @@ func TestSecretCacheMultipleRegistrations(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
store.AddReference("ns", "name")
// This should trigger List and Watch actions eventually.
@@ -264,7 +269,8 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
- store := newSecretCache(fakeClient)
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
key := objectKey{namespace: "ns", name: "name"}
itemExists := func() (bool, error) {
@@ -280,12 +286,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
item.lock.Lock()
defer item.lock.Unlock()
- select {
- case <-item.stopCh:
- return false
- default:
- return true
- }
+ return !item.stopped
}
// AddReference should start reflector.
@@ -330,3 +331,83 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
})
}
}
+
+func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
+ secret := &v1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "name",
+ Namespace: "ns",
+ ResourceVersion: "200",
+ },
+ }
+
+ fakeClient := &fake.Clientset{}
+ listReactor := func(a core.Action) (bool, runtime.Object, error) {
+ result := &v1.SecretList{
+ ListMeta: metav1.ListMeta{
+ ResourceVersion: "200",
+ },
+ Items: []v1.Secret{*secret},
+ }
+
+ return true, result, nil
+ }
+
+ fakeClient.AddReactor("list", "secrets", listReactor)
+ fakeWatch := watch.NewFake()
+ fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+ key := objectKey{namespace: "ns", name: "name"}
+ itemExists := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ _, ok := store.items[key]
+ return ok, nil
+ }
+
+ reflectorRunning := func() bool {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return !item.stopped
+ }
+
+ // AddReference should start reflector.
+ store.AddReference("ns", "name")
+ if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+ t.Errorf("item wasn't added to cache")
+ }
+
+ obj, _ := store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+
+ assert.True(t, reflectorRunning())
+
+ fakeClock.Step(90 * time.Second)
+ store.startRecycleIdleWatch()
+
+ // Reflector should now be stopped because maxIdleTime was exceeded.
+ assert.False(t, reflectorRunning())
+
+ obj, _ = store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+ // Reflector should be restarted once the secret is fetched again.
+ assert.True(t, reflectorRunning())
+
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ fakeClock.Step(20 * time.Second)
+ _, _ = store.Get("ns", "name")
+ store.startRecycleIdleWatch()
+
+ // Reflector should be running when the get function is called periodically.
+ assert.True(t, reflectorRunning())
+}
diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go
index a065bc26..07c934b5 100644
--- a/test/integration/kubelet/watch_manager_test.go
+++ b/test/integration/kubelet/watch_manager_test.go
@@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@@ -61,7 +62,8 @@ func TestWatchBasedManager(t *testing.T) {
// We want all watches to be up and running to stress test it.
// So don't treat any secret as immutable here.
isImmutable := func(_ runtime.Object) bool { return false }
- store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"})
+ fakeClock := clock.NewFakeClock(time.Now())
+ store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)
// create 1000 secrets in parallel
t.Log(time.Now(), "creating 1000 secrets")
--
2.33.0

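The heart of this backport is that each cached configmap/secret now records when it was last read: a once-a-minute sweep (startRecycleIdleWatch) stops reflectors that have been idle longer than maxIdleTime (five times the kubelet resync interval), and Get restarts a stopped reflector on the next access. A reduced sketch of that lifecycle, independent of client-go and with invented names, might look like this.

package main

import (
	"fmt"
	"sync"
	"time"
)

// watchItem is a cut-down objectCacheItem: only the bookkeeping needed to stop
// an idle watch and restart it on the next access.
type watchItem struct {
	mu             sync.Mutex
	lastAccessTime time.Time
	stopped        bool
	stopCh         chan struct{}
}

func newWatchItem() *watchItem {
	w := &watchItem{stopCh: make(chan struct{})}
	go w.run(w.stopCh) // stands in for reflector.Run
	return w
}

func (w *watchItem) run(stopCh <-chan struct{}) {
	<-stopCh // a real reflector would list/watch until stopped
}

// get records the access time and restarts the watch if it was recycled.
func (w *watchItem) get(now time.Time) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lastAccessTime = now
	if w.stopped {
		w.stopped = false
		w.stopCh = make(chan struct{})
		go w.run(w.stopCh)
	}
}

// stopIfIdle is what the periodic sweep calls for every cached item.
func (w *watchItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.stopped && now.After(w.lastAccessTime.Add(maxIdleTime)) {
		w.stopped = true
		close(w.stopCh)
		return true
	}
	return false
}

func main() {
	resyncInterval := time.Minute
	maxIdleTime := 5 * resyncInterval // the same rule the patch applies

	start := time.Now()
	w := newWatchItem()
	w.get(start)
	fmt.Println("stopped after 1m idle:", w.stopIfIdle(start.Add(1*time.Minute), maxIdleTime)) // false
	fmt.Println("stopped after 6m idle:", w.stopIfIdle(start.Add(6*time.Minute), maxIdleTime)) // true
	w.get(start.Add(7 * time.Minute))                                                          // next read restarts the watch
	fmt.Println("stopped right after a read:", w.stopIfIdle(start.Add(7*time.Minute), maxIdleTime)) // false
}
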
File: 0019-backport-Don-t-prematurely-close-reflectors-in-case-of-slow-i.patch

@@ -0,0 +1,151 @@
From 033bafcda990c254a53794556ff5aa9d2fceb1f7 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Fri, 23 Aug 2024 15:04:42 +0800
Subject: [PATCH] Don't prematurely close reflectors in case of slow
initialization in watch based manager
Reference: https://github.com/kubernetes/kubernetes/pull/104604/commits/515106b7957c4da6fdbbb7e474d19bad0fa3f57f
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: wojtekt <wojtekt@google.com>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
.../util/manager/watch_based_manager.go | 11 ++-
.../util/manager/watch_based_manager_test.go | 91 +++++++++++++++++++
2 files changed, 100 insertions(+), 2 deletions(-)
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
index cee515ca..d4e7597b 100644
--- a/pkg/kubelet/util/manager/watch_based_manager.go
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -97,7 +97,11 @@ func (i *objectCacheItem) setImmutable() {
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
i.lock.Lock()
defer i.lock.Unlock()
- if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
+ // Ensure that we don't try to stop a reflector that has not initialized yet.
+ // With an overloaded kube-apiserver, if the list request is already being
+ // processed, all of that work would be lost and would have to be retried.
+ if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
return i.stopThreadUnsafe()
}
return false
@@ -289,11 +293,14 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
+ // Record the last access time regardless of whether the Get succeeds.
+ // This protects from premature (racy) reflector closure.
+ item.setLastAccessTime(c.clock.Now())
+
item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
- item.setLastAccessTime(c.clock.Now())
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
index 17a8b01a..0fc9915c 100644
--- a/pkg/kubelet/util/manager/watch_based_manager_test.go
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -411,3 +411,94 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
// Reflector should be running when the get function is called periodically.
assert.True(t, reflectorRunning())
}
+
+func TestReflectorNotStopedOnSlowInitialization(t *testing.T) {
+ secret := &v1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "name",
+ Namespace: "ns",
+ ResourceVersion: "200",
+ },
+ }
+
+ fakeClock := clock.NewFakeClock(time.Now())
+
+ fakeClient := &fake.Clientset{}
+ listReactor := func(a core.Action) (bool, runtime.Object, error) {
+ <-fakeClock.After(120 * time.Second)
+
+ result := &v1.SecretList{
+ ListMeta: metav1.ListMeta{
+ ResourceVersion: "200",
+ },
+ Items: []v1.Secret{*secret},
+ }
+
+ return true, result, nil
+ }
+
+ fakeClient.AddReactor("list", "secrets", listReactor)
+ fakeWatch := watch.NewFake()
+ fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+ store := newSecretCache(fakeClient, fakeClock, time.Minute)
+
+ key := objectKey{namespace: "ns", name: "name"}
+ itemExists := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ _, ok := store.items[key]
+ return ok, nil
+ }
+
+ reflectorRunning := func() bool {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return !item.stopped
+ }
+
+ reflectorInitialized := func() (bool, error) {
+ store.lock.Lock()
+ defer store.lock.Unlock()
+ item := store.items[key]
+
+ item.lock.Lock()
+ defer item.lock.Unlock()
+ return item.store.hasSynced(), nil
+ }
+
+ // AddReference should start reflector.
+ store.AddReference("ns", "name")
+ if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
+ t.Errorf("item wasn't added to cache")
+ }
+
+ fakeClock.Step(90 * time.Second)
+ store.startRecycleIdleWatch()
+
+ // Reflector didn't yet initialize, so it shouldn't be stopped.
+ // However, Get should still be failing.
+ assert.True(t, reflectorRunning())
+ initialized, _ := reflectorInitialized()
+ assert.False(t, initialized)
+ _, err := store.Get("ns", "name")
+ if err == nil || !strings.Contains(err.Error(), "failed to sync") {
+ t.Errorf("Expected failed to sync error, got: %v", err)
+ }
+
+ // Initialization should successfully finish.
+ fakeClock.Step(30 * time.Second)
+ if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
+ t.Errorf("reflector didn't iniailize correctly")
+ }
+
+ // recycling shouldn't stop the reflector because it was accessed within last minute.
+ store.startRecycleIdleWatch()
+ assert.True(t, reflectorRunning())
+
+ obj, _ := store.Get("ns", "name")
+ assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
+}
--
2.33.0

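This follow-up tightens the recycling from the previous patch in two ways: stopIfIdle refuses to stop a reflector whose store has not finished its initial LIST (hasSynced), and Get stamps lastAccessTime before waiting for the sync, so a slow apiserver response is not miscounted as idle time. A small sketch of the guarded check, with simplified types that are not the kubelet code, is below.

package main

import (
	"fmt"
	"time"
)

// item carries only the fields the guard needs; synced plays the role of
// cacheStore.hasSynced() in the patch.
type item struct {
	synced         bool
	stopped        bool
	lastAccessTime time.Time
}

// stopIfIdle with the extra hasSynced guard from this patch: an item that is
// still waiting on its initial LIST is never recycled, no matter how stale its
// last access time looks.
func (i *item) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	if !i.stopped && i.synced && now.After(i.lastAccessTime.Add(maxIdleTime)) {
		i.stopped = true
		return true
	}
	return false
}

func main() {
	start := time.Now()
	maxIdle := time.Minute

	slow := &item{synced: false, lastAccessTime: start} // initial LIST still running
	fmt.Println("unsynced item recycled:", slow.stopIfIdle(start.Add(90*time.Second), maxIdle)) // false

	idle := &item{synced: true, lastAccessTime: start}
	fmt.Println("synced idle item recycled:", idle.stopIfIdle(start.Add(90*time.Second), maxIdle)) // true
}
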
File: 0020-backport-Fix-cpu-share-issues-on-systems-with-large-amounts-o.patch

@@ -0,0 +1,104 @@
From 051b66b82d8cc76eca0da38e44ae0e7dd391ba09 Mon Sep 17 00:00:00 2001
From: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Date: Tue, 27 Aug 2024 16:04:40 +0800
Subject: [PATCH] Fix cpu share issues on systems with large amounts of cpu
On systems where the calculated cpu shares result in a value above the
maximum value in Linux, containers getting that value are unable to start.
This occurs on systems with 300+ cpu cores where containers are given
such a value.
This issue was already fixed for the pod and QoS control groups by the similar
cm.MilliCPUToShares, which also has tests verifying the behavior. Since
this code already has a dependency on kubelet/cm, let's reuse that code
instead.
Reference: https://github.com/kubernetes/kubernetes/pull/106570/commits/de0ece541c7fd885a32f1562342fe85535fc11f5
Signed-off-by: zhaoxiaohu <zhaoxiaohu@kuaishou.com>
Signed-off-by: Odin Ugedal <odin@uged.al>
Signed-off-by: yuwang <yuwang@kuaishou.com>
---
pkg/kubelet/kuberuntime/helpers_linux.go | 17 -----------------
pkg/kubelet/kuberuntime/helpers_unsupported.go | 5 -----
.../kuberuntime/kuberuntime_container_linux.go | 7 ++++---
3 files changed, 4 insertions(+), 25 deletions(-)
diff --git a/pkg/kubelet/kuberuntime/helpers_linux.go b/pkg/kubelet/kuberuntime/helpers_linux.go
index 204bc4e9..4257a014 100644
--- a/pkg/kubelet/kuberuntime/helpers_linux.go
+++ b/pkg/kubelet/kuberuntime/helpers_linux.go
@@ -19,9 +19,6 @@ limitations under the License.
package kuberuntime
const (
- // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
- minShares = 2
- sharesPerCPU = 1024
milliCPUToCPU = 1000
// 100000 is equivalent to 100ms
@@ -29,20 +26,6 @@ const (
minQuotaPeriod = 1000
)
-// milliCPUToShares converts milliCPU to CPU shares
-func milliCPUToShares(milliCPU int64) int64 {
- if milliCPU == 0 {
- // Return 2 here to really match kernel default for zero milliCPU.
- return minShares
- }
- // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
- shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
- if shares < minShares {
- return minShares
- }
- return shares
-}
-
// milliCPUToQuota converts milliCPU to CFS quota and period values
func milliCPUToQuota(milliCPU int64, period int64) (quota int64) {
// CFS quota is measured in two values:
diff --git a/pkg/kubelet/kuberuntime/helpers_unsupported.go b/pkg/kubelet/kuberuntime/helpers_unsupported.go
index cc1e88a5..8f6da8f4 100644
--- a/pkg/kubelet/kuberuntime/helpers_unsupported.go
+++ b/pkg/kubelet/kuberuntime/helpers_unsupported.go
@@ -17,8 +17,3 @@ limitations under the License.
*/
package kuberuntime
-
-// milliCPUToShares converts milliCPU to CPU shares
-func milliCPUToShares(milliCPU int64) int64 {
- return 0
-}
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go b/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
index d7c22c86..25fb68ad 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container_linux.go
@@ -28,6 +28,7 @@ import (
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
kubefeatures "k8s.io/kubernetes/pkg/features"
+ "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/qos"
)
@@ -69,11 +70,11 @@ func (m *kubeGenericRuntimeManager) generateLinuxContainerConfig(container *v1.C
// API server does this for new containers, but we repeat this logic in Kubelet
// for containers running on existing Kubernetes clusters.
if cpuRequest.IsZero() && !cpuLimit.IsZero() {
- cpuShares = milliCPUToShares(cpuLimit.MilliValue())
+ cpuShares = int64(cm.MilliCPUToShares(cpuLimit.MilliValue()))
} else {
- // if cpuRequest.Amount is nil, then milliCPUToShares will return the minimal number
+ // if cpuRequest.Amount is nil, then MilliCPUToShares will return the minimal number
// of CPU shares.
- cpuShares = milliCPUToShares(cpuRequest.MilliValue())
+ cpuShares = int64(cm.MilliCPUToShares(cpuRequest.MilliValue()))
}
lc.Resources.CpuShares = cpuShares
if memoryLimit != 0 {
--
2.33.0

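The removed local helper converted milliCPU to cpu.shares with only a lower bound, so a limit of 300 CPUs yielded 300 × 1024 = 307200 shares, above what the kernel accepts, and such containers failed to start. cm.MilliCPUToShares clamps at both ends; the sketch below restates that conversion with the commonly used bounds (MinShares = 2, MaxShares = 262144, i.e. 256 CPUs × 1024) — treat the exact constant values here as assumptions rather than a quotation of pkg/kubelet/cm.

package main

import "fmt"

// Bounds for the Linux CFS cpu.shares knob; restated here as assumptions, not
// copied from the kubelet source.
const (
	minShares     = 2
	maxShares     = 262144 // 256 CPUs * 1024
	sharesPerCPU  = 1024
	milliCPUToCPU = 1000
)

// milliCPUToShares converts a milliCPU request to cpu.shares, clamped to the
// range the kernel accepts. The upper clamp is what the old kuberuntime-local
// helper was missing.
func milliCPUToShares(milliCPU int64) uint64 {
	if milliCPU == 0 {
		// Match the kernel default for "no request".
		return minShares
	}
	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	if shares > maxShares {
		return maxShares
	}
	return uint64(shares)
}

func main() {
	fmt.Println(milliCPUToShares(500))    // 512
	fmt.Println(milliCPUToShares(300000)) // 300 CPUs -> clamped to 262144
}
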
File: 0021-gitRepo-volume-directory-must-be-max-1-level-deep.patch

@@ -0,0 +1,56 @@
From c7846fd24c16266a3bfd86315171f5b4d5f0c9c9 Mon Sep 17 00:00:00 2001
From: Imre Rad <imrer@google.com>
Date: Thu, 25 Apr 2024 14:21:51 +0000
Subject: [PATCH] gitRepo volume: directory must be max 1 level deep
More details on Hackerone #2266560
---
pkg/volume/git_repo/git_repo.go | 6 ++++++
pkg/volume/git_repo/git_repo_test.go | 14 ++++++++++++++
2 files changed, 20 insertions(+)
diff --git a/pkg/volume/git_repo/git_repo.go b/pkg/volume/git_repo/git_repo.go
index 995018d9007..b3827b92ad0 100644
--- a/pkg/volume/git_repo/git_repo.go
+++ b/pkg/volume/git_repo/git_repo.go
@@ -261,6 +261,12 @@ func validateVolume(src *v1.GitRepoVolumeSource) error {
if err := validateNonFlagArgument(src.Directory, "directory"); err != nil {
return err
}
+ if (src.Revision != "") && (src.Directory != "") {
+ cleanedDir := filepath.Clean(src.Directory)
+ if strings.Contains(cleanedDir, "/") || (strings.Contains(cleanedDir, "\\")) {
+ return fmt.Errorf("%q is not a valid directory, it must not contain a directory separator", src.Directory)
+ }
+ }
return nil
}
diff --git a/pkg/volume/git_repo/git_repo_test.go b/pkg/volume/git_repo/git_repo_test.go
index 5b1461be892..650f765cc48 100644
--- a/pkg/volume/git_repo/git_repo_test.go
+++ b/pkg/volume/git_repo/git_repo_test.go
@@ -267,6 +267,20 @@ func TestPlugin(t *testing.T) {
},
isExpectedFailure: true,
},
+ {
+ name: "invalid-revision-directory-combo",
+ vol: &v1.Volume{
+ Name: "vol1",
+ VolumeSource: v1.VolumeSource{
+ GitRepo: &v1.GitRepoVolumeSource{
+ Repository: gitURL,
+ Revision: "main",
+ Directory: "foo/bar",
+ },
+ },
+ },
+ isExpectedFailure: true,
+ },
}
for _, scenario := range scenarios {
--
2.34.1

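This is the patch that actually addresses CVE-2024-10220: when a gitRepo volume specifies both a revision and a target directory, the directory (after filepath.Clean) may no longer contain a path separator, which limits where the kubelet's follow-up git commands can be pointed (see the Hackerone report referenced in the commit message). A standalone sketch of the same validation rule, with an invented helper name and a few accepted and rejected inputs, follows.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// validateDirectory applies the same rule as the patch's validateVolume hunk:
// when both revision and directory are set, the cleaned directory may not
// contain a path separator, i.e. it must be at most one level deep.
func validateDirectory(revision, directory string) error {
	if revision != "" && directory != "" {
		cleaned := filepath.Clean(directory)
		if strings.Contains(cleaned, "/") || strings.Contains(cleaned, `\`) {
			return fmt.Errorf("%q is not a valid directory, it must not contain a directory separator", directory)
		}
	}
	return nil
}

func main() {
	for _, dir := range []string{"repo", "./repo", "foo/bar", "../escape"} {
		// "repo" and "./repo" pass; "foo/bar" and "../escape" are rejected.
		fmt.Printf("%-12q -> %v\n", dir, validateDirectory("main", dir))
	}
}
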
File: kubernetes.spec

@@ -3,7 +3,7 @@
Name: kubernetes
Version: 1.20.2
-Release: 21
+Release: 26
Summary: Container cluster management
License: ASL 2.0
URL: https://k8s.io/kubernetes
@@ -40,6 +40,11 @@ Patch6012: 0013-Validate-etcd-paths.patch
Patch6013: 0014-fix-node-address-validation.patch
Patch6014: 0015-Add-ephemeralcontainer-to-imagepolicy-securityaccoun.patch
Patch6015: 0016-Add-envFrom-to-serviceaccount-admission-plugin.patch
+Patch6016: 0017-backport-Fix-kubelet-panic-when-allocate-resource-for-pod.patch
+Patch6017: 0018-backport-reduce-configmap-and-secret-watch-of-kubelet.patch
+Patch6018: 0019-backport-Don-t-prematurely-close-reflectors-in-case-of-slow-i.patch
+Patch6019: 0020-backport-Fix-cpu-share-issues-on-systems-with-large-amounts-o.patch
+Patch6020: 0021-gitRepo-volume-directory-must-be-max-1-level-deep.patch
%description
Container cluster management.
@@ -271,6 +276,30 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \
%systemd_postun kubelet kube-proxy
%changelog
+* Thu Dec 05 2024 liuxu <liuxu156@huawei.com> - 1.20.2-26
+- Type:bugfix
+- CVE:NA
+- SUG:NA
+- DESC:fix CVE-2024-10220
+* Tue Aug 27 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-25
+- Type:bugfix
+- CVE:NA
+- SUG:NA
+- DESC:Fix cpu share issues on systems with large amounts of cpu(when cpucore>256)
+* Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-24
+- DESC:Don't prematurely close reflectors in case of slow initialization in watch based manager
+* Fri Aug 23 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-23
+- DESC:reduce configmap and secret watch of kubelet
+* Thu Aug 22 2024 zhaoxiaohu <zhaoxiaohu@kuaishou.com> - 1.20.2-22
+- Type:bugfix
+- CVE:NA
+- SUG:NA
+- DESC:fix kubelet panic when allocate resource for pod. #119561
* Mon Apr 29 2024 liuxu <liuxu156@huawei.com> - 1.20.2-21
- Type:bugfix
- CVE:NA