update shard leader cache periodically (#22595)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-03-08 15:13:51 +08:00 committed by GitHub
parent f1ec78d3c1
commit 1a1a5caafa
4 changed files with 157 additions and 6 deletions

View File

@@ -24,7 +24,9 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
@@ -66,6 +68,7 @@ type Cache interface {
GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
GetShards(ctx context.Context, withCache bool, collectionName string) (map[string][]nodeInfo, error)
ClearShards(collectionName string)
expireShardLeaderCache(ctx context.Context)
RemoveCollection(ctx context.Context, collectionName string)
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string
RemovePartition(ctx context.Context, collectionName string, partitionName string)
@@ -174,6 +177,7 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoord, queryCoord ty
}
globalMetaCache.InitPolicyInfo(resp.PolicyInfos, resp.UserRoles)
log.Info("success to init meta cache", zap.Strings("policy_infos", resp.PolicyInfos))
globalMetaCache.expireShardLeaderCache(ctx)
return nil
}
@@ -734,14 +738,51 @@ func (m *MetaCache) ClearShards(collectionName string) {
log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
m.mu.Lock()
info, ok := m.collInfo[collectionName]
if ok {
m.collInfo[collectionName].shardLeaders = nil
}
m.mu.Unlock()
// delete refcnt in shardClientMgr
if ok && info.shardLeaders != nil {
_ = m.shardMgr.UpdateShardLeaders(info.shardLeaders.shardLeaders, nil)
var shardLeaders *shardLeaders
if ok {
info.leaderMutex.Lock()
// snapshot the old leaders before clearing, so the refcnt cleanup below still sees them
shardLeaders = info.shardLeaders
m.collInfo[collectionName].shardLeaders = nil
info.leaderMutex.Unlock()
}
// delete refcnt in shardClientMgr
if ok && shardLeaders != nil {
_ = m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil)
}
}
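Note the order of operations above: the old leader list is snapshotted and cleared while holding info.leaderMutex, and the client references are released only afterwards, outside the lock. A toy sketch of the ref-count bookkeeping an UpdateShardLeaders(old, new) call like this performs (hypothetical names, not the actual shardClientMgr implementation):

```go
package main

import "fmt"

// clientRef is a toy stand-in for a ref-counted query-node client.
type clientRef struct{ refCnt int }

// updateShardLeaders mimics the bookkeeping of UpdateShardLeaders(old, new):
// add refs for the new leader set, drop refs for the old one, and evict
// clients whose count reaches zero.
func updateShardLeaders(clients map[int64]*clientRef, oldIDs, newIDs []int64) {
	for _, id := range newIDs {
		if clients[id] == nil {
			clients[id] = &clientRef{}
		}
		clients[id].refCnt++
	}
	for _, id := range oldIDs {
		if c := clients[id]; c != nil {
			c.refCnt--
			if c.refCnt <= 0 {
				delete(clients, id) // a real manager would close the idle client here
			}
		}
	}
}

func main() {
	clients := map[int64]*clientRef{}
	updateShardLeaders(clients, nil, []int64{1, 2, 3}) // cache populated
	updateShardLeaders(clients, []int64{1, 2, 3}, nil) // ClearShards path: release all
	fmt.Println(len(clients))                          // 0
}
```

Passing nil as the new leader set, as ClearShards does, releases every reference the cached list held.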
func (m *MetaCache) expireShardLeaderCache(ctx context.Context) {
go func() {
ticker := time.NewTicker(params.Params.ProxyCfg.ShardLeaderCacheInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("stop periodically update meta cache")
return
case <-ticker.C:
m.mu.Lock()
log.Info("expire all shard leader cache",
zap.Strings("collections", lo.Keys(m.collInfo)))
for _, info := range m.collInfo {
info.leaderMutex.Lock()
shardLeaders := info.shardLeaders
info.shardLeaders = nil
info.leaderMutex.Unlock()
if shardLeaders != nil {
err := m.shardMgr.UpdateShardLeaders(shardLeaders.shardLeaders, nil)
if err != nil {
// unreachable in practice: releasing references should not fail
log.Warn("failed to update shard leaders reference", zap.Error(err))
}
}
}
m.mu.Unlock()
}
}
}()
}
func (m *MetaCache) InitPolicyInfo(info []string, userRoles []string) {
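The refresher added above is a standard context-cancellable ticker loop: each tick drops the cached leader lists so the next GetShards call refetches them from QueryCoord, and the goroutine exits when the context passed to InitMetaCache is cancelled. A minimal self-contained sketch of this read-through expiry pattern, using toy names rather than the PR's types:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cache is a toy stand-in for MetaCache: one value that readers lazily
// repopulate after the background loop clears it.
type cache struct {
	mu  sync.Mutex
	val *string
}

// startExpireLoop clears the cached value on every tick until ctx is
// cancelled, mirroring how expireShardLeaderCache nils out shardLeaders.
func (c *cache) startExpireLoop(ctx context.Context, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				c.mu.Lock()
				c.val = nil // next Get refetches from the source of truth
				c.mu.Unlock()
			}
		}
	}()
}

// Get returns the cached value, refetching it when the loop has expired it.
func (c *cache) Get(fetch func() string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.val == nil {
		v := fetch()
		c.val = &v
	}
	return *c.val
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &cache{}
	c.startExpireLoop(ctx, 50*time.Millisecond)

	fmt.Println(c.Get(func() string { return "leaders@t0" })) // cached
	time.Sleep(120 * time.Millisecond)                        // let the loop expire it
	fmt.Println(c.Get(func() string { return "leaders@t1" })) // refetched
}
```

Because expiry only nils the cached value, readers pay the refetch cost lazily on their next access rather than all at once on the tick.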

View File

@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -781,3 +782,101 @@ func TestMetaCache_RemoveCollection(t *testing.T) {
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 3)
}
func TestMetaCache_ExpireShardLeaderCache(t *testing.T) {
// configure a short refresh interval before InitMetaCache starts the ticker
paramtable.Init()
paramtable.Get().Save(Params.ProxyCfg.ShardLeaderCacheInterval.Key, "1")
ctx := context.Background()
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &types.MockQueryCoord{}
shardMgr := newShardClientMgr()
err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionIDs: []UniqueID{1},
InMemoryPercentages: []int64{100},
}, nil)
queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Shards: []*querypb.ShardLeadersList{
{
ChannelName: "channel-1",
NodeIds: []int64{1, 2, 3},
NodeAddrs: []string{"localhost:9000", "localhost:9001", "localhost:9002"},
},
},
}, nil).Times(1)
nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1")
assert.NoError(t, err)
assert.Len(t, nodeInfos["channel-1"], 3)
queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Shards: []*querypb.ShardLeadersList{
{
ChannelName: "channel-1",
NodeIds: []int64{1, 2},
NodeAddrs: []string{"localhost:9000", "localhost:9001"},
},
},
}, nil).Times(1)
assert.Eventually(t, func() bool {
nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1")
if err != nil {
return false
}
return len(nodeInfos["channel-1"]) == 2
}, 3*time.Second, 1*time.Second)
queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Shards: []*querypb.ShardLeadersList{
{
ChannelName: "channel-1",
NodeIds: []int64{1, 2, 3},
NodeAddrs: []string{"localhost:9000", "localhost:9001", "localhost:9002"},
},
},
}, nil).Times(1)
assert.Eventually(t, func() bool {
nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1")
if err != nil {
return false
}
return len(nodeInfos["channel-1"]) == 3
}, 3*time.Second, 1*time.Second)
queryCoord.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(&querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Shards: []*querypb.ShardLeadersList{
{
ChannelName: "channel-1",
NodeIds: []int64{1, 2, 3},
NodeAddrs: []string{"localhost:9000", "localhost:9001", "localhost:9002"},
},
{
ChannelName: "channel-2",
NodeIds: []int64{1, 2, 3},
NodeAddrs: []string{"localhost:9000", "localhost:9001", "localhost:9002"},
},
},
}, nil).Times(1)
assert.Eventually(t, func() bool {
nodeInfos, err := globalMetaCache.GetShards(ctx, true, "collection1")
if err != nil {
return false
}
return len(nodeInfos["channel-1"]) == 3 && len(nodeInfos["channel-2"]) == 3
}, 3*time.Second, 1*time.Second)
}
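A note on the polling style in the assertions above: testify's assert.Eventually re-runs the closure until it returns true or the timeout elapses, so the closure should return a plain boolean. Calling assert helpers inside it would mark the test failed on the first unsuccessful poll even if a later poll succeeds. A tiny self-contained illustration of the contract:

```go
package demo

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestEventuallyPolling: the condition flips asynchronously; Eventually keeps
// polling it, so the closure stays side-effect free and just returns bool.
func TestEventuallyPolling(t *testing.T) {
	var ready int32
	go func() {
		time.Sleep(200 * time.Millisecond)
		atomic.StoreInt32(&ready, 1)
	}()
	assert.Eventually(t, func() bool {
		return atomic.LoadInt32(&ready) == 1
	}, 3*time.Second, 50*time.Millisecond)
}
```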

View File

@@ -845,6 +845,7 @@ type proxyConfig struct {
MaxRoleNum ParamItem `refreshable:"true"`
MaxTaskNum ParamItem `refreshable:"false"`
AccessLog AccessLogConfig
ShardLeaderCacheInterval ParamItem `refreshable:"false"`
}
func (p *proxyConfig) init(base *BaseTable) {
@@ -1046,6 +1047,14 @@ please adjust in embedded Milvus: false`,
Doc: "Max time for log file in minIO, in hours",
}
p.AccessLog.RemoteMaxTime.Init(base.mgr)
p.ShardLeaderCacheInterval = ParamItem{
Key: "proxy.shardLeaderCacheInterval",
Version: "2.2.4",
DefaultValue: "30",
Doc: "time interval to update shard leader cache, in seconds",
}
p.ShardLeaderCacheInterval.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
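For context, the default of "30" becomes a 30-second ticker period because the consumer in meta_cache.go reads the item with GetAsDuration(time.Second). A toy re-implementation of that unit scaling (an illustration, not paramtable's actual code):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// getAsDuration mimics ParamItem.GetAsDuration(unit): a bare number in the
// config is scaled by the caller-supplied unit.
func getAsDuration(raw string, unit time.Duration) time.Duration {
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0
	}
	return time.Duration(n) * unit
}

func main() {
	// proxy.shardLeaderCacheInterval: "30" => a 30s ticker period.
	fmt.Println(getAsDuration("30", time.Second)) // 30s
}
```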

View File

@@ -164,6 +164,8 @@ func TestComponentParam(t *testing.T) {
t.Logf("AccessLog.MaxBackups: %d", Params.AccessLog.MaxBackups.GetAsInt64())
t.Logf("AccessLog.MaxDays: %d", Params.AccessLog.RotatedTime.GetAsInt64())
t.Logf("ShardLeaderCacheInterval: %d", Params.ShardLeaderCacheInterval.GetAsInt64())
})
// t.Run("test proxyConfig panic", func(t *testing.T) {