Create informer per resources to avoid huge memory consumption

Create informer per resources to avoid huge memory consumption

Fixes #7323

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
pull/7326/head
Wenkai Yin(尹文开) 2024-01-17 16:54:44 +08:00
parent 6a4c661735
commit 956248e8a6
3 changed files with 35 additions and 64 deletions

View File

@ -35,8 +35,8 @@ type DynamicFactory interface {
// ClientForGroupVersionResource returns a Dynamic client for the given group/version
// and resource for the given namespace.
ClientForGroupVersionResource(gv schema.GroupVersion, resource metav1.APIResource, namespace string) (Dynamic, error)
// DynamicSharedInformerFactoryForNamespace returns a DynamicSharedInformerFactory for the given namespace.
DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory
// DynamicSharedInformerFactory returns a DynamicSharedInformerFactory.
DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory
}
// dynamicFactory implements DynamicFactory.
@ -55,8 +55,8 @@ func (f *dynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersion, r
}, nil
}
func (f *dynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory {
return dynamicinformer.NewFilteredDynamicSharedInformerFactory(f.dynamicClient, time.Minute, namespace, nil)
func (f *dynamicFactory) DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory {
return dynamicinformer.NewDynamicSharedInformerFactory(f.dynamicClient, time.Minute)
}
// Creator creates an object.

View File

@ -45,7 +45,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
crclient "sigs.k8s.io/controller-runtime/pkg/client"
@ -309,8 +308,6 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
resourceTerminatingTimeout: kr.resourceTerminatingTimeout,
resourceTimeout: kr.resourceTimeout,
resourceClients: make(map[resourceClientKey]client.Dynamic),
dynamicInformerFactories: make(map[string]*informerFactoryWithContext),
resourceInformers: make(map[resourceClientKey]informers.GenericInformer),
restoredItems: req.RestoredItems,
renamedPVs: make(map[string]string),
pvRenamer: kr.pvRenamer,
@ -362,8 +359,7 @@ type restoreContext struct {
resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration
resourceClients map[resourceClientKey]client.Dynamic
dynamicInformerFactories map[string]*informerFactoryWithContext
resourceInformers map[resourceClientKey]informers.GenericInformer
dynamicInformerFactory *informerFactoryWithContext
restoredItems map[itemKey]restoredItemStatus
renamedPVs map[string]string
pvRenamer func(string) (string, error)
@ -447,11 +443,16 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
// Need to stop all informers if enabled
if !ctx.disableInformerCache {
context, cancel := signal.NotifyContext(go_context.Background(), os.Interrupt)
ctx.dynamicInformerFactory = &informerFactoryWithContext{
factory: ctx.dynamicFactory.DynamicSharedInformerFactory(),
context: context,
cancel: cancel,
}
defer func() {
// Call the cancel func to close the channel for each started informer
for _, factory := range ctx.dynamicInformerFactories {
factory.cancel()
}
ctx.dynamicInformerFactory.cancel()
// After upgrading to client-go 0.27 or newer, also call Shutdown for each informer factory
}()
}
@ -579,28 +580,24 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) {
// initialize informer caches for selected resources if enabled
if !ctx.disableInformerCache {
// CRD informer will have already been initialized if any CRDs were created,
// but already-initialized informers aren't re-initialized because getGenericInformer
// looks for an existing one first.
factoriesToStart := make(map[string]*informerFactoryWithContext)
for _, informerResource := range selectedResourceCollection {
gr := schema.ParseGroupResource(informerResource.resource)
if informerResource.totalItems == 0 {
continue
}
version := ""
for _, items := range informerResource.selectedItemsByNamespace {
// don't use ns key since it represents original ns, not mapped ns
if len(items) == 0 {
continue
}
// use the first item in the list to initialize the informer. The rest of the list
// should share the same gvr and namespace
_, factory := ctx.getGenericInformerInternal(gr, items[0].version, items[0].targetNamespace)
if factory != nil {
factoriesToStart[items[0].targetNamespace] = factory
}
version = items[0].version
break
}
gvr := schema.ParseGroupResource(informerResource.resource).WithVersion(version)
ctx.dynamicInformerFactory.factory.ForResource(gvr)
}
for _, factoryWithContext := range factoriesToStart {
factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done())
}
ctx.dynamicInformerFactory.factory.Start(ctx.dynamicInformerFactory.context.Done())
ctx.log.Info("waiting informer cache sync ...")
ctx.dynamicInformerFactory.factory.WaitForCacheSync(ctx.dynamicInformerFactory.context.Done())
}
// reset processedItems and totalItems before processing full resource list
@ -1061,42 +1058,15 @@ func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource,
return client, nil
}
// if new informer is created, non-nil factory is returned
func (ctx *restoreContext) getGenericInformerInternal(groupResource schema.GroupResource, version, namespace string) (informers.GenericInformer, *informerFactoryWithContext) {
var returnFactory *informerFactoryWithContext
key := getResourceClientKey(groupResource, version, namespace)
factoryWithContext, ok := ctx.dynamicInformerFactories[key.namespace]
if !ok {
factory := ctx.dynamicFactory.DynamicSharedInformerFactoryForNamespace(namespace)
informerContext, informerCancel := signal.NotifyContext(go_context.Background(), os.Interrupt)
factoryWithContext = &informerFactoryWithContext{
factory: factory,
context: informerContext,
cancel: informerCancel,
}
ctx.dynamicInformerFactories[key.namespace] = factoryWithContext
}
informer, ok := ctx.resourceInformers[key]
if !ok {
ctx.log.Infof("[debug] Creating factory for %s in namespace %s", key.resource, key.namespace)
informer = factoryWithContext.factory.ForResource(key.resource)
factoryWithContext.factory.Start(factoryWithContext.context.Done())
ctx.resourceInformers[key] = informer
returnFactory = factoryWithContext
}
return informer, returnFactory
}
func (ctx *restoreContext) getGenericInformer(groupResource schema.GroupResource, version, namespace string) informers.GenericInformer {
informer, factoryWithContext := ctx.getGenericInformerInternal(groupResource, version, namespace)
if factoryWithContext != nil {
factoryWithContext.factory.WaitForCacheSync(factoryWithContext.context.Done())
}
return informer
}
func (ctx *restoreContext) getResourceLister(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) cache.GenericNamespaceLister {
informer := ctx.getGenericInformer(groupResource, obj.GroupVersionKind().Version, namespace)
informer := ctx.dynamicInformerFactory.factory.ForResource(groupResource.WithVersion(obj.GroupVersionKind().Version))
// if the restore contains CRDs or the RIA returns new resources, need to make sure the corresponding informers are synced
if !informer.Informer().HasSynced() {
ctx.dynamicInformerFactory.factory.Start(ctx.dynamicInformerFactory.context.Done())
ctx.log.Infof("waiting informer cache sync for %s, %s/%s ...", groupResource, namespace, obj.GetName())
ctx.dynamicInformerFactory.factory.WaitForCacheSync(ctx.dynamicInformerFactory.context.Done())
}
if namespace == "" {
return informer.Lister()
} else {
@ -1123,6 +1093,7 @@ func (ctx *restoreContext) getResource(groupResource schema.GroupResource, obj *
ctx.log.WithError(errors.WithStack(fmt.Errorf("expected *unstructured.Unstructured but got %T", u))).Error("unable to understand entry returned from client")
return nil, fmt.Errorf("expected *unstructured.Unstructured but got %T", u)
}
ctx.log.Debugf("get %s, %s/%s from informer cache", groupResource, namespace, name)
return u, nil
}

View File

@ -38,8 +38,8 @@ func (df *FakeDynamicFactory) ClientForGroupVersionResource(gv schema.GroupVersi
return args.Get(0).(client.Dynamic), args.Error(1)
}
func (df *FakeDynamicFactory) DynamicSharedInformerFactoryForNamespace(namespace string) dynamicinformer.DynamicSharedInformerFactory {
args := df.Called(namespace)
func (df *FakeDynamicFactory) DynamicSharedInformerFactory() dynamicinformer.DynamicSharedInformerFactory {
args := df.Called()
return args.Get(0).(dynamicinformer.DynamicSharedInformerFactory)
}