mirror of https://github.com/milvus-io/milvus.git
Mark cache deprecated instead of removing it (#22674)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/22681/head
parent
a36fefb009
commit
3ffa6d5302
|
@ -99,9 +99,18 @@ func (info *collectionInfo) isCollectionCached() bool {
|
|||
return info != nil && info.collID != UniqueID(0) && info.schema != nil
|
||||
}
|
||||
|
||||
func (info *collectionInfo) deprecateLeaderCache() {
|
||||
info.leaderMutex.RLock()
|
||||
defer info.leaderMutex.RUnlock()
|
||||
if info.shardLeaders != nil {
|
||||
info.shardLeaders.deprecated.Store(true)
|
||||
}
|
||||
}
|
||||
|
||||
// shardLeaders wraps shard leader mapping for iteration.
|
||||
type shardLeaders struct {
|
||||
idx *atomic.Int64
|
||||
idx *atomic.Int64
|
||||
deprecated *atomic.Bool
|
||||
|
||||
shardLeaders map[string][]nodeInfo
|
||||
}
|
||||
|
@ -648,7 +657,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam
|
|||
shardLeaders = info.shardLeaders
|
||||
info.leaderMutex.RUnlock()
|
||||
|
||||
if shardLeaders != nil {
|
||||
if shardLeaders != nil && !shardLeaders.deprecated.Load() {
|
||||
iterator := shardLeaders.GetReader()
|
||||
return iterator.Shuffle(), nil
|
||||
}
|
||||
|
@ -700,6 +709,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionNam
|
|||
oldShards := info.shardLeaders
|
||||
info.shardLeaders = &shardLeaders{
|
||||
shardLeaders: shards,
|
||||
deprecated: atomic.NewBool(false),
|
||||
idx: atomic.NewInt64(0),
|
||||
}
|
||||
iterator := info.shardLeaders.GetReader()
|
||||
|
@ -736,20 +746,13 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m
|
|||
// ClearShards clear the shard leader cache of a collection
|
||||
func (m *MetaCache) ClearShards(collectionName string) {
|
||||
log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
|
||||
m.mu.Lock()
|
||||
m.mu.RLock()
|
||||
info, ok := m.collInfo[collectionName]
|
||||
m.mu.Unlock()
|
||||
var shardLeaders *shardLeaders
|
||||
m.mu.RUnlock()
|
||||
if ok {
|
||||
info.leaderMutex.Lock()
|
||||
m.collInfo[collectionName].shardLeaders = nil
|
||||
shardLeaders = info.shardLeaders
|
||||
info.leaderMutex.Unlock()
|
||||
}
|
||||
// delete refcnt in shardClientMgr
|
||||
if ok && shardLeaders != nil {
|
||||
_ = m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil)
|
||||
info.deprecateLeaderCache()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
|
||||
|
@ -763,23 +766,13 @@ func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
|
|||
log.Info("stop periodically update meta cache")
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.mu.Lock()
|
||||
m.mu.RLock()
|
||||
log.Info("expire all shard leader cache",
|
||||
zap.Strings("collections", lo.Keys(m.collInfo)))
|
||||
for _, info := range m.collInfo {
|
||||
info.leaderMutex.Lock()
|
||||
shardLeaders := info.shardLeaders
|
||||
info.shardLeaders = nil
|
||||
info.leaderMutex.Unlock()
|
||||
if shardLeaders != nil {
|
||||
err := m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil)
|
||||
if err != nil {
|
||||
// unreachable logic path
|
||||
log.Warn("failed to update shard leaders reference", zap.Error(err))
|
||||
}
|
||||
}
|
||||
info.deprecateLeaderCache()
|
||||
}
|
||||
m.mu.Unlock()
|
||||
m.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -853,7 +853,7 @@ func TestMetaCache_ExpireShardLeaderCache(t *testing.T) {
|
|||
assert.Eventually(t, func() bool {
|
||||
nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1")
|
||||
assert.NoError(t, err)
|
||||
return assert.Len(t, nodeInfos["channel-1"], 3)
|
||||
return len(nodeInfos["channel-1"]) == 3
|
||||
}, 3*time.Second, 1*time.Second)
|
||||
|
||||
queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
|
||||
|
|
Loading…
Reference in New Issue