fix: memory leak in proxy meta cache (#36075)

issue: #36074

Signed-off-by: jaime <yun.zhang@zilliz.com>
jaime 2024-09-08 17:33:05 +08:00 committed by GitHub
parent 6b4ae0c65e
commit 91d23ecbe1
3 changed files with 26 additions and 320 deletions
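
For orientation before the hunks: the commit deletes the proxy-side `dbCollectionInfo` map together with the `GetCollectionNamesByID` API that populated it, narrows the locking in `RemoveDatabase`, and drops the corresponding unit tests and generated mock. A minimal sketch of the reverse-lookup cache being removed — names and types are condensed for illustration and are not the actual Milvus code:

```go
package example

import "sync"

// reverseNameCache condenses the removed pattern: each proxy keeps its own
// database -> collectionID -> collectionName map, scans it linearly to
// answer ID lookups, and swaps in a full snapshot fetched from RootCoord
// whenever a lookup misses.
type reverseNameCache struct {
	mu    sync.RWMutex
	names map[string]map[int64]string // database -> collectionID -> name
}

// lookup walks every cached database until it finds the collection ID.
func (c *reverseNameCache) lookup(id int64) (database, name string, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for db, colls := range c.names {
		if n, found := colls[id]; found {
			return db, n, true
		}
	}
	return "", "", false
}

// replaceAll installs a freshly fetched snapshot of every database and
// collection, which is what the removed updateDBInfo did after calling
// ListDatabases and then ShowCollections for each database.
func (c *reverseNameCache) replaceAll(snapshot map[string]map[int64]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.names = snapshot
}
```

Rather than bounding or evicting this per-proxy copy of the cluster's collection metadata, the fix removes it entirely.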

Changed file 1 of 3 — the proxy meta cache implementation (Cache interface and MetaCache)

@@ -59,8 +59,6 @@ type Cache interface {
GetCollectionName(ctx context.Context, database string, collectionID int64) (string, error)
// GetCollectionInfo get collection's information by name or collection id, such as schema, and etc.
GetCollectionInfo(ctx context.Context, database, collectionName string, collectionID int64) (*collectionBasicInfo, error)
// GetCollectionNamesByID get collection name and database name by collection id
GetCollectionNamesByID(ctx context.Context, collectionID []UniqueID) ([]string, []string, error)
// GetPartitionID get partition's identifier of specific collection.
GetPartitionID(ctx context.Context, database, collectionName string, partitionName string) (typeutil.UniqueID, error)
// GetPartitions get all partitions' id of specific collection.
@@ -343,19 +341,18 @@ type MetaCache struct {
rootCoord types.RootCoordClient
queryCoord types.QueryCoordClient
dbInfo map[string]*databaseInfo // database -> db_info
collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info
collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders
dbCollectionInfo map[string]map[typeutil.UniqueID]string // database -> collectionID -> collectionName
credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load
privilegeInfos map[string]struct{} // privileges cache
userToRoles map[string]map[string]struct{} // user to role cache
mu sync.RWMutex
credMut sync.RWMutex
leaderMut sync.RWMutex
shardMgr shardClientMgr
sfGlobal conc.Singleflight[*collectionInfo]
sfDB conc.Singleflight[*databaseInfo]
dbInfo map[string]*databaseInfo // database -> db_info
collInfo map[string]map[string]*collectionInfo // database -> collectionName -> collection_info
collLeader map[string]map[string]*shardLeaders // database -> collectionName -> collection_leaders
credMap map[string]*internalpb.CredentialInfo // cache for credential, lazy load
privilegeInfos map[string]struct{} // privileges cache
userToRoles map[string]map[string]struct{} // user to role cache
mu sync.RWMutex
credMut sync.RWMutex
leaderMut sync.RWMutex
shardMgr shardClientMgr
sfGlobal conc.Singleflight[*collectionInfo]
sfDB conc.Singleflight[*databaseInfo]
IDStart int64
IDCount int64
@@ -388,16 +385,15 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) {
return &MetaCache{
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
dbCollectionInfo: map[string]map[typeutil.UniqueID]string{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
}, nil
}
@@ -588,90 +584,6 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database string, coll
return collInfo.getBasicInfo(), nil
}
func (m *MetaCache) GetCollectionNamesByID(ctx context.Context, collectionIDs []UniqueID) ([]string, []string, error) {
hasUpdate := false
dbNames := make([]string, 0)
collectionNames := make([]string, 0)
for _, collectionID := range collectionIDs {
dbName, collectionName := m.innerGetCollectionByID(collectionID)
if dbName != "" {
dbNames = append(dbNames, dbName)
collectionNames = append(collectionNames, collectionName)
continue
}
if hasUpdate {
return nil, nil, errors.New("collection not found after meta cache has been updated")
}
hasUpdate = true
err := m.updateDBInfo(ctx)
if err != nil {
return nil, nil, err
}
dbName, collectionName = m.innerGetCollectionByID(collectionID)
if dbName == "" {
return nil, nil, errors.New("collection not found")
}
dbNames = append(dbNames, dbName)
collectionNames = append(collectionNames, collectionName)
}
return dbNames, collectionNames, nil
}
func (m *MetaCache) innerGetCollectionByID(collectionID int64) (string, string) {
m.mu.RLock()
defer m.mu.RUnlock()
for database, db := range m.dbCollectionInfo {
name, ok := db[collectionID]
if ok {
return database, name
}
}
return "", ""
}
func (m *MetaCache) updateDBInfo(ctx context.Context) error {
databaseResp, err := m.rootCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)),
})
if err := merr.CheckRPCCall(databaseResp, err); err != nil {
log.Warn("failed to ListDatabases", zap.Error(err))
return err
}
dbInfo := make(map[string]map[int64]string)
for _, dbName := range databaseResp.DbNames {
resp, err := m.rootCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
),
DbName: dbName,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to ShowCollections",
zap.String("dbName", dbName),
zap.Error(err))
return err
}
collections := make(map[int64]string)
for i, collection := range resp.CollectionNames {
collections[resp.CollectionIds[i]] = collection
}
dbInfo[dbName] = collections
}
m.mu.Lock()
defer m.mu.Unlock()
m.dbCollectionInfo = dbInfo
return nil
}
// GetCollectionInfo returns the collection information related to provided collection name
// If the information is not found, proxy will try to fetch information for other source (RootCoord for now)
// TODO: may cause data race of this implementation, should be refactored in future.
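
The helpers deleted above follow a refresh-once-on-miss flow: resolve each ID from the cached map, fall back to at most one full updateDBInfo rebuild, and fail if the entry is still absent afterwards. A condensed sketch of that control flow, with hypothetical get/refresh callbacks standing in for the cache scan and the RootCoord round trips:

```go
package example

import "errors"

// lookupWithSingleRefresh mirrors the removed GetCollectionNamesByID logic:
// the cache is refreshed at most once per call, and a miss after that
// refresh is treated as a hard "not found" error.
func lookupWithSingleRefresh(
	ids []int64,
	get func(id int64) (name string, ok bool),
	refresh func() error,
) ([]string, error) {
	names := make([]string, 0, len(ids))
	refreshed := false
	for _, id := range ids {
		name, ok := get(id)
		if !ok {
			if refreshed {
				return nil, errors.New("entry not found after cache refresh")
			}
			refreshed = true
			if err := refresh(); err != nil {
				return nil, err
			}
			if name, ok = get(id); !ok {
				return nil, errors.New("entry not found")
			}
		}
		names = append(names, name)
	}
	return names, nil
}
```

Because the refresh rebuilds the whole database-to-collection map, every cold lookup cost a ListDatabases call plus one ShowCollections call per database; the commit removes this path rather than optimizing it.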
@@ -1261,9 +1173,13 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.collInfo, database)
delete(m.dbInfo, database)
m.mu.Unlock()
m.leaderMut.Lock()
delete(m.collLeader, database)
m.leaderMut.Unlock()
}
func (m *MetaCache) HasDatabase(ctx context.Context, database string) bool {
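
The hunk above rewrites RemoveDatabase so that the deferred unlock becomes two explicitly scoped critical sections: mu is released before leaderMut is taken, and the shard-leader map is cleaned up under its own mutex. A minimal sketch of that locking pattern, with placeholder map types:

```go
package example

import "sync"

// cache sketches the two-mutex layout: mu guards the database/collection
// maps, leaderMu guards the shard-leader map. Field types are placeholders.
type cache struct {
	mu         sync.RWMutex
	dbInfo     map[string]struct{}
	collInfo   map[string]struct{}
	leaderMu   sync.RWMutex
	collLeader map[string]struct{}
}

// removeDatabase holds each mutex only around the map it protects and never
// holds both at once, matching the rewritten RemoveDatabase above.
func (c *cache) removeDatabase(database string) {
	c.mu.Lock()
	delete(c.dbInfo, database)
	delete(c.collInfo, database)
	c.mu.Unlock()

	c.leaderMu.Lock()
	delete(c.collLeader, database)
	c.leaderMu.Unlock()
}
```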

Changed file 2 of 3 — the meta cache unit tests

@@ -943,152 +943,6 @@ func TestMetaCache_AllocID(t *testing.T) {
})
}
func TestGlobalMetaCache_UpdateDBInfo(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
queryCoord := mocks.NewMockQueryCoordClient(t)
shardMgr := newShardClientMgr()
ctx := context.Background()
cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
t.Run("fail to list db", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Code: 500,
},
}, nil).Once()
err := cache.updateDBInfo(ctx)
assert.Error(t, err)
})
t.Run("fail to list collection", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
DbNames: []string{"db1"},
}, nil).Once()
rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Code: 500,
},
}, nil).Once()
err := cache.updateDBInfo(ctx)
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
DbNames: []string{"db1"},
}, nil).Once()
rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionNames: []string{"collection1"},
CollectionIds: []int64{1},
}, nil).Once()
err := cache.updateDBInfo(ctx)
assert.NoError(t, err)
assert.Len(t, cache.dbCollectionInfo, 1)
assert.Len(t, cache.dbCollectionInfo["db1"], 1)
assert.Equal(t, "collection1", cache.dbCollectionInfo["db1"][1])
})
}
func TestGlobalMetaCache_GetCollectionNamesByID(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
queryCoord := mocks.NewMockQueryCoordClient(t)
shardMgr := newShardClientMgr()
ctx := context.Background()
t.Run("fail to update db info", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Code: 500,
},
}, nil).Once()
cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
_, _, err = cache.GetCollectionNamesByID(ctx, []int64{1})
assert.Error(t, err)
})
t.Run("not found collection", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
DbNames: []string{"db1"},
}, nil).Once()
rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionNames: []string{"collection1"},
CollectionIds: []int64{1},
}, nil).Once()
cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
_, _, err = cache.GetCollectionNamesByID(ctx, []int64{2})
assert.Error(t, err)
})
t.Run("not found collection 2", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
DbNames: []string{"db1"},
}, nil).Once()
rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionNames: []string{"collection1"},
CollectionIds: []int64{1},
}, nil).Once()
cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
_, _, err = cache.GetCollectionNamesByID(ctx, []int64{1, 2})
assert.Error(t, err)
})
t.Run("success", func(t *testing.T) {
rootCoord.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
DbNames: []string{"db1"},
}, nil).Once()
rootCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionNames: []string{"collection1", "collection2"},
CollectionIds: []int64{1, 2},
}, nil).Once()
cache, err := NewMetaCache(rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
dbNames, collectionNames, err := cache.GetCollectionNamesByID(ctx, []int64{1, 2})
assert.NoError(t, err)
assert.Equal(t, []string{"collection1", "collection2"}, collectionNames)
assert.Equal(t, []string{"db1", "db1"}, dbNames)
})
}
func TestMetaCache_InvalidateShardLeaderCache(t *testing.T) {
paramtable.Init()
paramtable.Get().Save(Params.ProxyCfg.ShardLeaderCacheInterval.Key, "1")

Changed file 3 of 3 — the generated MockCache

@@ -275,70 +275,6 @@ func (_c *MockCache_GetCollectionName_Call) RunAndReturn(run func(context.Contex
return _c
}
// GetCollectionNamesByID provides a mock function with given fields: ctx, collectionID
func (_m *MockCache) GetCollectionNamesByID(ctx context.Context, collectionID []int64) ([]string, []string, error) {
ret := _m.Called(ctx, collectionID)
var r0 []string
var r1 []string
var r2 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) ([]string, []string, error)); ok {
return rf(ctx, collectionID)
}
if rf, ok := ret.Get(0).(func(context.Context, []int64) []string); ok {
r0 = rf(ctx, collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
if rf, ok := ret.Get(1).(func(context.Context, []int64) []string); ok {
r1 = rf(ctx, collectionID)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).([]string)
}
}
if rf, ok := ret.Get(2).(func(context.Context, []int64) error); ok {
r2 = rf(ctx, collectionID)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockCache_GetCollectionNamesByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionNamesByID'
type MockCache_GetCollectionNamesByID_Call struct {
*mock.Call
}
// GetCollectionNamesByID is a helper method to define mock.On call
// - ctx context.Context
// - collectionID []int64
func (_e *MockCache_Expecter) GetCollectionNamesByID(ctx interface{}, collectionID interface{}) *MockCache_GetCollectionNamesByID_Call {
return &MockCache_GetCollectionNamesByID_Call{Call: _e.mock.On("GetCollectionNamesByID", ctx, collectionID)}
}
func (_c *MockCache_GetCollectionNamesByID_Call) Run(run func(ctx context.Context, collectionID []int64)) *MockCache_GetCollectionNamesByID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockCache_GetCollectionNamesByID_Call) Return(_a0 []string, _a1 []string, _a2 error) *MockCache_GetCollectionNamesByID_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockCache_GetCollectionNamesByID_Call) RunAndReturn(run func(context.Context, []int64) ([]string, []string, error)) *MockCache_GetCollectionNamesByID_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionSchema provides a mock function with given fields: ctx, database, collectionName
func (_m *MockCache) GetCollectionSchema(ctx context.Context, database string, collectionName string) (*schemaInfo, error) {
ret := _m.Called(ctx, database, collectionName)
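
For readers unfamiliar with the deleted block above: it is standard mockery output on top of testify/mock. The method records the invocation with Called, then unpacks each configured return value, preferring a function-typed return when the test registered one. A stripped-down, hand-written equivalent for a hypothetical single-method interface (illustrative only, not part of the Milvus codebase):

```go
package example

import "github.com/stretchr/testify/mock"

// NameLookup is a hypothetical interface used only to show the shape of the
// generated mock code removed above.
type NameLookup interface {
	NameByID(id int64) (string, error)
}

// MockNameLookup mirrors what mockery generates: embed mock.Mock, record the
// call, then pull the stubbed return values back out.
type MockNameLookup struct {
	mock.Mock
}

func (m *MockNameLookup) NameByID(id int64) (string, error) {
	ret := m.Called(id)

	// Prefer a function-typed expectation so a test can derive the result
	// from the arguments; otherwise fall back to the stored static value.
	var r0 string
	if rf, ok := ret.Get(0).(func(int64) string); ok {
		r0 = rf(id)
	} else if ret.Get(0) != nil {
		r0 = ret.Get(0).(string)
	}
	return r0, ret.Error(1)
}
```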