fix: Add version to the proxy cache to resolve concurrency issues (#38067)

issue: #36989

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/37378/merge
cai.zhang 2024-12-04 18:06:39 +08:00 committed by GitHub
parent a65d395ecd
commit 73aa95f596
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 288 additions and 88 deletions

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

8
go.sum
View File

@ -630,12 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69 h1:Qt0Bv2Fum3EX3OlkuQYHJINBzeU4oEuHy2lXSfB/gZw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241120015424-93892e628c69/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6 h1:TrGZtojfj84Rdd1XAaGULCWZqO3rJMiGS8vxFXHT7G4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129024423-3911e6ebd8a6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056 h1:o2uJgfwTOg8bu/E9n6TvmFT2XPrPm1v0XFhc6XXcFoE=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241129033252-5d0b09587056/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910 h1:cFRrdFZwhFHv33pue1z8beYSvrXDYFSFsCuvXGX3DHE=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241202112430-822be0295910/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -129,21 +129,21 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), msgType == commonpb.MsgType_DropCollection)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
// All the request from query use collectionID
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, 0, false)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
@ -154,31 +154,27 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
log.Warn("invalidate collection meta cache failed. partitionName is empty")
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
// no need to deprecate shard cache because shard won't change when create or drop partition
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
// drop all the alias as well
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false)
}
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_DropDatabase:
globalMetaCache.RemoveDatabase(ctx, request.GetDbName())
default:
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID, request.GetBase().GetTimestamp(), false)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
}
}

View File

@ -75,7 +75,7 @@ type Cache interface {
InvalidateShardLeaderCache(collections []int64)
ListShardLocation() map[int64]nodeInfo
RemoveCollection(ctx context.Context, database, collectionName string)
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string
// GetCredentialInfo operate credential cache
GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
@ -340,6 +340,8 @@ type MetaCache struct {
IDCount int64
IDIndex int64
IDLock sync.RWMutex
collectionCacheVersion map[UniqueID]uint64 // collectionID -> cacheVersion
}
// globalMetaCache is singleton instance of Cache
@ -368,15 +370,16 @@ func InitMetaCache(ctx context.Context, rootCoord types.RootCoordClient, queryCo
// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoordClient, queryCoord types.QueryCoordClient, shardMgr shardClientMgr) (*MetaCache, error) {
return &MetaCache{
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
rootCoord: rootCoord,
queryCoord: queryCoord,
dbInfo: map[string]*databaseInfo{},
collInfo: map[string]map[string]*collectionInfo{},
collLeader: map[string]map[string]*shardLeaders{},
credMap: map[string]*internalpb.CredentialInfo{},
shardMgr: shardMgr,
privilegeInfos: map[string]struct{}{},
userToRoles: map[string]map[string]struct{}{},
collectionCacheVersion: make(map[UniqueID]uint64),
}, nil
}
@ -445,19 +448,36 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
if database == "" {
log.Warn("database is empty, use default database name", zap.String("collectionName", collectionName), zap.Stack("stack"))
}
m.mu.Lock()
defer m.mu.Unlock()
_, dbOk := m.collInfo[database]
if !dbOk {
m.collInfo[database] = make(map[string]*collectionInfo)
}
isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...)
if err != nil {
return nil, err
}
schemaInfo := newSchemaInfoWithLoadFields(collection.Schema, loadFields)
m.mu.Lock()
defer m.mu.Unlock()
curVersion := m.collectionCacheVersion[collection.GetCollectionID()]
// Compatibility logic: if the rootcoord version is lower(requestTime = 0), update the cache directly.
if collection.GetRequestTime() < curVersion && collection.GetRequestTime() != 0 {
log.Debug("describe collection timestamp less than version, don't update cache",
zap.String("collectionName", collectionName),
zap.Uint64("version", collection.GetRequestTime()), zap.Uint64("cache version", curVersion))
return &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField),
createdTimestamp: collection.CreatedTimestamp,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
}, nil
}
_, dbOk := m.collInfo[database]
if !dbOk {
m.collInfo[database] = make(map[string]*collectionInfo)
}
m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID,
schema: schemaInfo,
@ -470,9 +490,14 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID),
zap.Strings("partition", partitions.PartitionNames),
zap.Strings("partition", partitions.PartitionNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", collection.GetRequestTime()),
)
return m.collInfo[database][collectionName], nil
m.collectionCacheVersion[collection.GetCollectionID()] = collection.GetRequestTime()
collInfo := m.collInfo[database][collectionName]
return collInfo, nil
}
func buildSfKeyByName(database, collectionName string) string {
@ -822,19 +847,30 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa
log.Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName))
}
func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string {
func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID, version uint64, removeVersion bool) []string {
m.mu.Lock()
defer m.mu.Unlock()
curVersion := m.collectionCacheVersion[collectionID]
var collNames []string
for database, db := range m.collInfo {
for k, v := range db {
if v.collID == collectionID {
delete(m.collInfo[database], k)
collNames = append(collNames, k)
if version == 0 || curVersion <= version {
delete(m.collInfo[database], k)
collNames = append(collNames, k)
}
}
}
}
log.Debug("remove collection by id", zap.Int64("id", collectionID), zap.Strings("collection", collNames))
if removeVersion {
delete(m.collectionCacheVersion, collectionID)
} else if version != 0 {
m.collectionCacheVersion[collectionID] = version
}
log.Debug("remove collection by id", zap.Int64("id", collectionID),
zap.Strings("collection", collNames), zap.Uint64("currentVersion", curVersion),
zap.Uint64("version", version), zap.Bool("removeVersion", removeVersion))
return collNames
}

View File

@ -140,7 +140,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
AutoID: true,
Name: "collection1",
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}
if in.CollectionName == "collection2" || in.CollectionID == 2 {
@ -151,7 +152,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
AutoID: true,
Name: "collection2",
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}
if in.CollectionName == "errorCollection" {
@ -161,7 +163,8 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
Schema: &schemapb.CollectionSchema{
AutoID: true,
},
DbName: dbName,
DbName: dbName,
RequestTime: 100,
}, nil
}
@ -791,14 +794,14 @@ func TestMetaCache_RemoveCollection(t *testing.T) {
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 2)
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false)
// no collectionInfo of collection2, should access RootCoord
_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
assert.NoError(t, err)
// shouldn't access RootCoord again
assert.Equal(t, rootCoord.GetAccessCount(), 3)
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1))
globalMetaCache.RemoveCollectionsByID(ctx, UniqueID(1), 100, false)
// no collectionInfo of collection2, should access RootCoord
_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
assert.NoError(t, err)
@ -1259,3 +1262,60 @@ func TestSchemaInfo_GetLoadFieldIDs(t *testing.T) {
})
}
}
func TestMetaCache_Parallel(t *testing.T) {
ctx := context.Background()
rootCoord := mocks.NewMockRootCoordClient(t)
queryCoord := mocks.NewMockQueryCoordClient(t)
queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{}, nil).Maybe()
rootCoord.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(&milvuspb.ShowPartitionsResponse{
Status: merr.Success(),
}, nil).Maybe()
mgr := newShardClientMgr()
cache, err := NewMetaCache(rootCoord, queryCoord, mgr)
assert.NoError(t, err)
cacheVersion := uint64(100)
// clean cache
cache.RemoveCollectionsByID(ctx, 111, cacheVersion+2, false)
// update cache, but version is smaller
rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Name: "collection1",
},
CollectionID: 111,
DbName: dbName,
RequestTime: cacheVersion,
}, nil
}).Once()
collInfo, err := cache.update(ctx, dbName, "collection1", 111)
assert.NoError(t, err)
assert.Equal(t, "collection1", collInfo.schema.Name)
assert.Equal(t, int64(111), collInfo.collID)
_, ok := cache.collInfo[dbName]["collection1"]
assert.False(t, ok)
rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, option ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) {
cacheVersion++
return &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Schema: &schemapb.CollectionSchema{
Name: "collection1",
},
CollectionID: 111,
DbName: dbName,
RequestTime: cacheVersion + 5,
}, nil
}).Once()
collInfo, err = cache.update(ctx, dbName, "collection1", 111)
assert.NoError(t, err)
assert.Equal(t, "collection1", collInfo.schema.Name)
assert.Equal(t, int64(111), collInfo.collID)
_, ok = cache.collInfo[dbName]["collection1"]
assert.True(t, ok)
}

View File

@ -1109,17 +1109,17 @@ func (_c *MockCache_RemoveCollection_Call) RunAndReturn(run func(context.Context
return _c
}
// RemoveCollectionsByID provides a mock function with given fields: ctx, collectionID
func (_m *MockCache) RemoveCollectionsByID(ctx context.Context, collectionID int64) []string {
ret := _m.Called(ctx, collectionID)
// RemoveCollectionsByID provides a mock function with given fields: ctx, collectionID, version, removeVersion
func (_m *MockCache) RemoveCollectionsByID(ctx context.Context, collectionID int64, version uint64, removeVersion bool) []string {
ret := _m.Called(ctx, collectionID, version, removeVersion)
if len(ret) == 0 {
panic("no return value specified for RemoveCollectionsByID")
}
var r0 []string
if rf, ok := ret.Get(0).(func(context.Context, int64) []string); ok {
r0 = rf(ctx, collectionID)
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64, bool) []string); ok {
r0 = rf(ctx, collectionID, version, removeVersion)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
@ -1137,13 +1137,15 @@ type MockCache_RemoveCollectionsByID_Call struct {
// RemoveCollectionsByID is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
func (_e *MockCache_Expecter) RemoveCollectionsByID(ctx interface{}, collectionID interface{}) *MockCache_RemoveCollectionsByID_Call {
return &MockCache_RemoveCollectionsByID_Call{Call: _e.mock.On("RemoveCollectionsByID", ctx, collectionID)}
// - version uint64
// - removeVersion bool
func (_e *MockCache_Expecter) RemoveCollectionsByID(ctx interface{}, collectionID interface{}, version interface{}, removeVersion interface{}) *MockCache_RemoveCollectionsByID_Call {
return &MockCache_RemoveCollectionsByID_Call{Call: _e.mock.On("RemoveCollectionsByID", ctx, collectionID, version, removeVersion)}
}
func (_c *MockCache_RemoveCollectionsByID_Call) Run(run func(ctx context.Context, collectionID int64)) *MockCache_RemoveCollectionsByID_Call {
func (_c *MockCache_RemoveCollectionsByID_Call) Run(run func(ctx context.Context, collectionID int64, version uint64, removeVersion bool)) *MockCache_RemoveCollectionsByID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
run(args[0].(context.Context), args[1].(int64), args[2].(uint64), args[3].(bool))
})
return _c
}
@ -1153,7 +1155,7 @@ func (_c *MockCache_RemoveCollectionsByID_Call) Return(_a0 []string) *MockCache_
return _c
}
func (_c *MockCache_RemoveCollectionsByID_Call) RunAndReturn(run func(context.Context, int64) []string) *MockCache_RemoveCollectionsByID_Call {
func (_c *MockCache_RemoveCollectionsByID_Call) RunAndReturn(run func(context.Context, int64, uint64, bool) []string) *MockCache_RemoveCollectionsByID_Call {
_c.Call.Return(run)
return _c
}

View File

@ -37,7 +37,8 @@ func (t *alterAliasTask) Prepare(ctx context.Context) error {
}
func (t *alterAliasTask) Execute(ctx context.Context) error {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil {
collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil {
return err
}
// alter alias is atomic enough.

View File

@ -18,12 +18,15 @@ package rootcoord
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
)
func Test_alterAliasTask_Prepare(t *testing.T) {
@ -42,7 +45,9 @@ func Test_alterAliasTask_Prepare(t *testing.T) {
func Test_alterAliasTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
core := newTestCore(withInvalidProxyManager())
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta))
task := &alterAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterAliasRequest{
@ -55,7 +60,11 @@ func Test_alterAliasTask_Execute(t *testing.T) {
})
t.Run("failed to alter alias", func(t *testing.T) {
core := newTestCore(withValidProxyManager(), withInvalidMeta())
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().AlterAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("failed to alter alias"))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
task := &alterAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterAliasRequest{

View File

@ -89,7 +89,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: append(aliases, a.Req.GetCollectionName()),
collectionID: InvalidCollectionID,
collectionID: oldColl.CollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)},
})

View File

@ -611,7 +611,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
dbName: t.Req.GetDbName(),
collectionNames: []string{t.Req.GetCollectionName()},
collectionID: InvalidCollectionID,
collectionID: collID,
ts: ts,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)},
}, &nullStep{})

View File

@ -51,6 +51,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
return err
}
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
t.Rsp.RequestTime = t.ts
return nil
}

View File

@ -37,8 +37,9 @@ func (t *dropAliasTask) Prepare(ctx context.Context) error {
}
func (t *dropAliasTask) Execute(ctx context.Context) error {
collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetAlias())
// drop alias is atomic enough.
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil {
return err
}
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())

View File

@ -18,6 +18,7 @@ package rootcoord
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -49,8 +50,10 @@ func Test_dropAliasTask_Prepare(t *testing.T) {
func Test_dropAliasTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
core := newTestCore(withInvalidProxyManager())
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
alias := funcutil.GenRandomStr()
core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta))
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.DropAliasRequest{
@ -63,7 +66,11 @@ func Test_dropAliasTask_Execute(t *testing.T) {
})
t.Run("failed to drop alias", func(t *testing.T) {
core := newTestCore(withValidProxyManager(), withInvalidMeta())
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("failed to alter alias"))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
alias := funcutil.GenRandomStr()
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),
@ -77,15 +84,12 @@ func Test_dropAliasTask_Execute(t *testing.T) {
})
t.Run("normal case", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("DropAlias",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
alias := funcutil.GenRandomStr()
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),

View File

@ -56,6 +56,10 @@ type IMetaTable interface {
AddCollection(ctx context.Context, coll *model.Collection) error
ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error
RemoveCollection(ctx context.Context, collectionID UniqueID, ts Timestamp) error
// GetCollectionID retrieves the corresponding collectionID based on the collectionName.
// If the collection does not exist, it will return InvalidCollectionID.
// Please use the function with caution.
GetCollectionID(ctx context.Context, dbName string, collectionName string) UniqueID
GetCollectionByName(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error)
GetCollectionByID(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error)
GetCollectionByIDWithMaxTs(ctx context.Context, collectionID UniqueID) (*model.Collection, error)
@ -610,6 +614,36 @@ func (mt *MetaTable) GetCollectionByName(ctx context.Context, dbName string, col
return mt.getCollectionByNameInternal(ctx, dbName, collectionName, ts)
}
// GetCollectionID retrieves the corresponding collectionID based on the collectionName.
// If the collection does not exist, it will return InvalidCollectionID.
// Please use the function with caution.
func (mt *MetaTable) GetCollectionID(ctx context.Context, dbName string, collectionName string) UniqueID {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty", zap.String("collectionName", collectionName))
dbName = util.DefaultDBName
}
_, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
if err != nil {
return InvalidCollectionID
}
collectionID, ok := mt.aliases.get(dbName, collectionName)
if ok {
return collectionID
}
collectionID, ok = mt.names.get(dbName, collectionName)
if ok {
return collectionID
}
return InvalidCollectionID
}
func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
// backward compatibility for rolling upgrade
if dbName == "" {

View File

@ -1245,6 +1245,54 @@ func (_c *IMetaTable_GetCollectionByName_Call) RunAndReturn(run func(context.Con
return _c
}
// GetCollectionID provides a mock function with given fields: ctx, dbName, collectionName
func (_m *IMetaTable) GetCollectionID(ctx context.Context, dbName string, collectionName string) int64 {
ret := _m.Called(ctx, dbName, collectionName)
if len(ret) == 0 {
panic("no return value specified for GetCollectionID")
}
var r0 int64
if rf, ok := ret.Get(0).(func(context.Context, string, string) int64); ok {
r0 = rf(ctx, dbName, collectionName)
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// IMetaTable_GetCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionID'
type IMetaTable_GetCollectionID_Call struct {
*mock.Call
}
// GetCollectionID is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - collectionName string
func (_e *IMetaTable_Expecter) GetCollectionID(ctx interface{}, dbName interface{}, collectionName interface{}) *IMetaTable_GetCollectionID_Call {
return &IMetaTable_GetCollectionID_Call{Call: _e.mock.On("GetCollectionID", ctx, dbName, collectionName)}
}
func (_c *IMetaTable_GetCollectionID_Call) Run(run func(ctx context.Context, dbName string, collectionName string)) *IMetaTable_GetCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *IMetaTable_GetCollectionID_Call) Return(_a0 int64) *IMetaTable_GetCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_GetCollectionID_Call) RunAndReturn(run func(context.Context, string, string) int64) *IMetaTable_GetCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionVirtualChannels provides a mock function with given fields: ctx, colID
func (_m *IMetaTable) GetCollectionVirtualChannels(ctx context.Context, colID int64) []string {
ret := _m.Called(ctx, colID)

View File

@ -37,7 +37,8 @@ func (t *renameCollectionTask) Prepare(ctx context.Context) error {
}
func (t *renameCollectionTask) Execute(ctx context.Context) error {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil {
collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetOldName())
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil {
return err
}
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())

View File

@ -18,13 +18,15 @@ package rootcoord
import (
"context"
"fmt"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
)
func Test_renameCollectionTask_Prepare(t *testing.T) {
@ -55,7 +57,9 @@ func Test_renameCollectionTask_Prepare(t *testing.T) {
func Test_renameCollectionTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
core := newTestCore(withInvalidProxyManager())
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta))
task := &renameCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.RenameCollectionRequest{
@ -69,12 +73,11 @@ func Test_renameCollectionTask_Execute(t *testing.T) {
})
t.Run("failed to rename collection", func(t *testing.T) {
meta := newMockMetaTable()
meta.RenameCollectionFunc = func(ctx context.Context, oldName string, newName string, ts Timestamp) error {
return errors.New("fail")
}
core := newTestCore(withValidProxyManager(), withMeta(meta))
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().RenameCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("failed to alter alias"))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
task := &renameCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.RenameCollectionRequest{

View File

@ -59,3 +59,7 @@ var _ hook.Extension = (*DefaultExtension)(nil)
func (d DefaultExtension) Report(info any) int {
return 0
}
func (d DefaultExtension) ReportRefused(ctx context.Context, req interface{}, resp interface{}, err error, fullMethod string) error {
return nil
}

View File

@ -607,6 +607,10 @@ func (r *ReportChanExtension) Report(info any) int {
return 1
}
func (r *ReportChanExtension) ReportRefused(ctx context.Context, req interface{}, resp interface{}, err error, fullMethod string) error {
return nil
}
func (r *ReportChanExtension) GetReportChan() <-chan any {
return r.reportChan
}