From 3af720ddbcb25479523cf502818d338e2d681737 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Fri, 9 Nov 2018 11:01:13 -0700 Subject: [PATCH] Allow indexers to be added after an informer start Both SharedIndexInformer and threadSafeMap were changed to allow AddIndexers to be called after a start or items are in the cache. While a new Indexer is being added handling deltas is blocked in the informer. When a new Indexer is added to a cache with existing items all indices are recalculated. One point to note is that adding a new indexer on a started informer will case all indexes to be rebuilt, but it will not trigger an updateNotification. This is done because it is impractical to assume any existing ResourceEventHandler would have knowledge of a yet to be added index. Any ResourceEventHandler that would need to consume this new index should be added after the new Indexer is added. --- .../client-go/tools/cache/shared_informer.go | 3 +- .../tools/cache/thread_safe_store.go | 14 ++- .../tools/cache/thread_safe_store_test.go | 115 ++++++++++++++++++ 3 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 staging/src/k8s.io/client-go/tools/cache/thread_safe_store_test.go diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index e91fc9e955..859099d077 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -260,7 +260,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { defer s.startedLock.Unlock() if s.started { - return fmt.Errorf("informer has already started") + s.blockDeltas.Lock() + defer s.blockDeltas.Unlock() } return s.indexer.AddIndexers(indexers) diff --git a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go index 1c201efb62..a9512b4825 100644 --- a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go +++ b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go @@ -125,6 +125,11 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st c.items = items // rebuild any index + c.rebuildIndices() +} + +// rebuildIndices rebuilds all indices for the current set c.items. Assumes that c.lock is held by caller +func (c *threadSafeMap) rebuildIndices() { c.indices = Indices{} for key, item := range c.items { c.updateIndices(nil, item, key) @@ -222,10 +227,6 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { c.lock.Lock() defer c.lock.Unlock() - if len(c.items) > 0 { - return fmt.Errorf("cannot add indexers to running index") - } - oldKeys := sets.StringKeySet(c.indexers) newKeys := sets.StringKeySet(newIndexers) @@ -236,6 +237,11 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { for k, v := range newIndexers { c.indexers[k] = v } + + if len(c.items) > 0 { + c.rebuildIndices() + } + return nil } diff --git a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store_test.go b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store_test.go new file mode 100644 index 0000000000..35f7dd6f78 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/sets" + "testing" +) + +func TestAddIndexerAfterAdd(t *testing.T) { + store := NewThreadSafeStore(Indexers{}, Indices{}) + + // Add first indexer + err := store.AddIndexers(Indexers{ + "first": func(obj interface{}) ([]string, error) { + value := obj.(string) + return []string{ + value, + }, nil + }, + }) + if err != nil { + t.Errorf("failed to add first indexer") + } + + // Add some data to index + store.Add("keya", "value") + store.Add("keyb", "value") + + // Assert + indexKeys, _ := store.IndexKeys("first", "value") + expected := sets.NewString("keya", "keyb") + actual := sets.NewString(indexKeys...) + if !actual.Equal(expected) { + t.Errorf("expected %v does not match actual %v", expected, actual) + } + + // Add same indexer, which should fail + err = store.AddIndexers(Indexers{ + "first": func(interface{}) ([]string, error) { + return nil, nil + }, + }) + if err == nil { + t.Errorf("Add same index should have failed") + } + + // Add new indexer + err = store.AddIndexers(Indexers{ + "second": func(obj interface{}) ([]string, error) { + v := obj.(string) + return []string{ + v +"2", + }, nil + }, + }) + if err != nil { + t.Errorf("failed to add second indexer") + } + + // Assert indexers was added + if _, ok := store.GetIndexers()["first"]; !ok { + t.Errorf("missing indexer first") + } + if _, ok := store.GetIndexers()["second"]; !ok { + t.Errorf("missing indexer second") + } + + // Assert existing data is re-indexed + indexKeys, _ = store.IndexKeys("first", "value") + expected = sets.NewString("keya", "keyb") + actual = sets.NewString(indexKeys...) + if !actual.Equal(expected) { + t.Errorf("expected %v does not match actual %v", expected, actual) + } + indexKeys, _ = store.IndexKeys("second", "value2") + expected = sets.NewString("keya", "keyb") + actual = sets.NewString(indexKeys...) + if !actual.Equal(expected) { + t.Errorf("expected %v does not match actual %v", expected, actual) + } + + // Add more data + store.Add("keyc", "value") + store.Add("keyd", "value") + + // Assert new data is indexed + indexKeys, _ = store.IndexKeys("first", "value") + expected = sets.NewString("keya", "keyb", "keyc", "keyd") + actual = sets.NewString(indexKeys...) + if !actual.Equal(expected) { + t.Errorf("expected %v does not match actual %v", expected, actual) + } + indexKeys, _ = store.IndexKeys("second", "value2") + expected = sets.NewString("keya", "keyb", "keyc", "keyd") + actual = sets.NewString(indexKeys...) + if !actual.Equal(expected) { + t.Errorf("expected %v does not match actual %v", expected, actual) + } +} +