From 3ffa6d530252fb75734021828c98949b47d9ff30 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 10 Mar 2023 10:19:53 +0800 Subject: [PATCH] Mark cache deprecated instead of removing it (#22674) Signed-off-by: Congqi Xia --- internal/proxy/meta_cache.go | 45 +++++++++++++------------------ internal/proxy/meta_cache_test.go | 2 +- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index b23a818c28..914c07639b 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -99,9 +99,18 @@ func (info *collectionInfo) isCollectionCached() bool { return info != nil && info.collID != UniqueID(0) && info.schema != nil } +func (info *collectionInfo) deprecateLeaderCache() { + info.leaderMutex.RLock() + defer info.leaderMutex.RUnlock() + if info.shardLeaders != nil { + info.shardLeaders.deprecated.Store(true) + } +} + // shardLeaders wraps shard leader mapping for iteration. type shardLeaders struct { - idx *atomic.Int64 + idx *atomic.Int64 + deprecated *atomic.Bool shardLeaders map[string][]nodeInfo } @@ -648,7 +657,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam shardLeaders = info.shardLeaders info.leaderMutex.RUnlock() - if shardLeaders != nil { + if shardLeaders != nil && !shardLeaders.deprecated.Load() { iterator := shardLeaders.GetReader() return iterator.Shuffle(), nil } @@ -700,6 +709,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam oldShards := info.shardLeaders info.shardLeaders = &shardLeaders{ shardLeaders: shards, + deprecated: atomic.NewBool(false), idx: atomic.NewInt64(0), } iterator := info.shardLeaders.GetReader() @@ -736,20 +746,13 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m // ClearShards clear the shard leader cache of a collection func (m *MetaCache) ClearShards(collectionName string) { log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName)) - m.mu.Lock() + m.mu.RLock() info, ok := m.collInfo[collectionName] - m.mu.Unlock() - var shardLeaders *shardLeaders + m.mu.RUnlock() if ok { - info.leaderMutex.Lock() - m.collInfo[collectionName].shardLeaders = nil - shardLeaders = info.shardLeaders - info.leaderMutex.Unlock() - } - // delete refcnt in shardClientMgr - if ok && shardLeaders != nil { - _ = m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil) + info.deprecateLeaderCache() } + } func (m *MetaCache) expireShardLeaderCache(ctx context.Context) { @@ -763,23 +766,13 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) { log.Info("stop periodically update meta cache") return case <-ticker.C: - m.mu.Lock() + m.mu.RLock() log.Info("expire all shard leader cache", zap.Strings("collections", lo.Keys(m.collInfo))) for _, info := range m.collInfo { - info.leaderMutex.Lock() - shardLeaders := info.shardLeaders - info.shardLeaders = nil - info.leaderMutex.Unlock() - if shardLeaders != nil { - err := m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil) - if err != nil { - // unreachable logic path - log.Warn("failed to update shard leaders reference", zap.Error(err)) - } - } + info.deprecateLeaderCache() } - m.mu.Unlock() + m.mu.RUnlock() } } }() diff --git a/internal/proxy/meta_cache_test.go b/internal/proxy/meta_cache_test.go index e837398c25..72f68d23db 100644 --- a/internal/proxy/meta_cache_test.go +++ b/internal/proxy/meta_cache_test.go @@ -853,7 +853,7 @@ func TestMetaCache_ExpireShardLeaderCache(t *testing.T) { assert.Eventually(t, func() bool { nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1") assert.NoError(t, err) - return assert.Len(t, nodeInfos["channel-1"], 3) + return len(nodeInfos["channel-1"]) == 3 }, 3*time.Second, 1*time.Second) queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{