enhance: Break down the granularity of collection info cache expired (#29977)

issue: #29772 

1. `DropPartition` only invalidates the cache related to the partition.
2. `CreateAlias` does not invalidate the cache.

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/30389/head
cai.zhang 2024-01-30 16:45:02 +08:00 committed by GitHub
parent 168260cba3
commit f619d792c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 127 additions and 84 deletions

View File

@ -32,6 +32,7 @@ message InvalidateCollMetaCacheRequest {
string db_name = 2;
string collection_name = 3;
int64 collectionID = 4;
string partition_name = 5;
}
message InvalidateCredCacheRequest {

View File

@ -107,22 +107,49 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
zap.String("db", request.DbName),
zap.String("collectionName", request.CollectionName),
zap.Int64("collectionID", request.CollectionID),
zap.String("msgType", request.GetBase().GetMsgType().String()),
zap.String("partitionName", request.GetPartitionName()),
)
log.Info("received request to invalidate collection meta cache")
collectionName := request.CollectionName
collectionID := request.CollectionID
var aliasName []string
if globalMetaCache != nil {
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
switch request.GetBase().GetMsgType() {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
}
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName))
case commonpb.MsgType_DropPartition:
if globalMetaCache != nil {
if collectionName != "" && request.GetPartitionName() != "" {
globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName())
} else {
log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty",
zap.String("collectionName", collectionName),
zap.String("partitionName", request.GetPartitionName()))
return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil
}
}
default:
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
}
}
}
if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
// no need to handle error, since this Proxy may not create dml stream for the collection.
node.chMgr.removeDMLStream(request.GetCollectionID())

View File

@ -823,6 +823,41 @@ func (_c *MockCache_RefreshPolicyInfo_Call) RunAndReturn(run func(typeutil.Cache
return _c
}
// RemoveAlias provides a mock function with given fields: ctx, database, alias
func (_m *MockCache) RemoveAlias(ctx context.Context, database string, alias string) {
_m.Called(ctx, database, alias)
}
// MockCache_RemoveAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveAlias'
type MockCache_RemoveAlias_Call struct {
*mock.Call
}
// RemoveAlias is a helper method to define mock.On call
// - ctx context.Context
// - database string
// - alias string
func (_e *MockCache_Expecter) RemoveAlias(ctx interface{}, database interface{}, alias interface{}) *MockCache_RemoveAlias_Call {
return &MockCache_RemoveAlias_Call{Call: _e.mock.On("RemoveAlias", ctx, database, alias)}
}
func (_c *MockCache_RemoveAlias_Call) Run(run func(ctx context.Context, database string, alias string)) *MockCache_RemoveAlias_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *MockCache_RemoveAlias_Call) Return() *MockCache_RemoveAlias_Call {
_c.Call.Return()
return _c
}
func (_c *MockCache_RemoveAlias_Call) RunAndReturn(run func(context.Context, string, string)) *MockCache_RemoveAlias_Call {
_c.Call.Return(run)
return _c
}
// RemoveCollection provides a mock function with given fields: ctx, database, collectionName
func (_m *MockCache) RemoveCollection(ctx context.Context, database string, collectionName string) {
_m.Called(ctx, database, collectionName)

View File

@ -745,7 +745,7 @@ func TestProxy(t *testing.T) {
_, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgType: commonpb.MsgType_CreateAlias,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
@ -795,13 +795,13 @@ func TestProxy(t *testing.T) {
_, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgType: commonpb.MsgType_AlterAlias,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
DbName: dbName,
CollectionName: collectionName,
CollectionName: "alias",
})
nonExistingCollName := "coll_name_random_zarathustra"
@ -829,14 +829,17 @@ func TestProxy(t *testing.T) {
_, _ = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgType: commonpb.MsgType_DropAlias,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
DbName: dbName,
CollectionName: collectionName,
CollectionName: "alias",
})
_, err = globalMetaCache.GetCollectionID(ctx, dbName, "alias")
assert.Error(t, err)
})
wg.Add(1)
@ -1993,15 +1996,6 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
// release collection cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
})
wg.Add(1)
@ -2297,13 +2291,19 @@ func TestProxy(t *testing.T) {
// invalidate meta cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPartition,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
_, err = globalMetaCache.GetPartitionID(ctx, dbName, collectionName, partitionName)
assert.Error(t, err)
// drop non-exist partition -> fail
resp, err = proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{
@ -2314,6 +2314,17 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// not specify partition name
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropPartition,
},
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
wg.Add(1)
@ -2406,20 +2417,17 @@ func TestProxy(t *testing.T) {
// invalidate meta cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
},
DbName: dbName,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
// release collection load cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: nil,
CollectionName: collectionName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
_, err = globalMetaCache.GetCollectionID(ctx, dbName, collectionName)
assert.Error(t, err)
})
wg.Add(1)

View File

@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)
type alterAliasTask struct {
@ -36,7 +37,7 @@ func (t *alterAliasTask) Prepare(ctx context.Context) error {
}
func (t *alterAliasTask) Execute(ctx context.Context) error {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, t.GetTs()); err != nil {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil {
return err
}
// alter alias is atomic enough.

View File

@ -67,14 +67,6 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
ts: ts,
})
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: []string{oldColl.Name},
collectionID: oldColl.CollectionID,
ts: ts,
})
a.Req.CollectionID = oldColl.CollectionID
redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{
baseStep: baseStep{core: a.core},

View File

@ -127,7 +127,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withMeta(meta), withBroker(broker))
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
@ -161,7 +161,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return nil
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withMeta(meta), withBroker(broker))
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{

View File

@ -36,9 +36,6 @@ func (t *createAliasTask) Prepare(ctx context.Context) error {
}
func (t *createAliasTask) Execute(ctx context.Context) error {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias(), t.Req.GetCollectionName()}, InvalidCollectionID, t.GetTs()); err != nil {
return err
}
// create alias is atomic enough.
return t.core.meta.CreateAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}

View File

@ -41,19 +41,6 @@ func Test_createAliasTask_Prepare(t *testing.T) {
}
func Test_createAliasTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
core := newTestCore(withInvalidProxyManager())
task := &createAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.CreateAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias},
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to create alias", func(t *testing.T) {
core := newTestCore(withInvalidMeta(), withValidProxyManager())
task := &createAliasTask{

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
ms "github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -478,6 +479,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
collectionNames: []string{t.Req.GetCollectionName()},
collectionID: InvalidCollectionID,
ts: ts,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)},
}, &nullStep{})
undoTask.AddStep(&nullStep{}, &removeDmlChannelsStep{
baseStep: baseStep{core: t.core},

View File

@ -81,6 +81,7 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
dbName: t.Req.GetDbName(),
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
partitionName: t.Req.GetPartitionName(),
ts: t.GetTs(),
}, &nullStep{})

View File

@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)
type dropAliasTask struct {
@ -37,7 +38,7 @@ func (t *dropAliasTask) Prepare(ctx context.Context) error {
func (t *dropAliasTask) Execute(ctx context.Context) error {
// drop alias is atomic enough.
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, t.GetTs()); err != nil {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil {
return err
}
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())

View File

@ -80,7 +80,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
ts: ts,
opts: []proxyutil.ExpireCacheOpt{proxyutil.ExpireCacheWithDropFlag()},
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)},
})
redoTask.AddSyncStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
)
@ -73,7 +74,9 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
dbName: t.Req.GetDbName(),
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
partitionName: t.Req.GetPartitionName(),
ts: t.GetTs(),
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropPartition)},
})
redoTask.AddSyncStep(&changePartitionStateStep{
baseStep: baseStep{core: t.core},

View File

@ -26,20 +26,7 @@ import (
)
// ExpireMetaCache will call invalidate collection meta cache
func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, ts typeutil.Timestamp, opts ...proxyutil.ExpireCacheOpt) error {
// if collectionID is specified, invalidate all the collection meta cache with the specified collectionID and return
if collectionID != InvalidCollectionID {
req := proxypb.InvalidateCollMetaCacheRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(c.session.ServerID),
),
DbName: dbName,
CollectionID: collectionID,
}
return c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req, opts...)
}
func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, partitionName string, ts typeutil.Timestamp, opts ...proxyutil.ExpireCacheOpt) error {
// if only collNames are specified, invalidate the collection meta cache with the specified collectionName
for _, collName := range collNames {
req := proxypb.InvalidateCollMetaCacheRequest{
@ -49,6 +36,8 @@ func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []s
),
DbName: dbName,
CollectionName: collName,
CollectionID: collectionID,
PartitionName: partitionName,
}
err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req, opts...)
if err != nil {

View File

@ -30,8 +30,8 @@ func Test_expireCacheConfig_apply(t *testing.T) {
c := proxyutil.DefaultExpireCacheConfig()
req := &proxypb.InvalidateCollMetaCacheRequest{}
c.Apply(req)
assert.Nil(t, req.GetBase())
opt := proxyutil.ExpireCacheWithDropFlag()
assert.Equal(t, commonpb.MsgType_Undefined, req.GetBase().GetMsgType())
opt := proxyutil.SetMsgType(commonpb.MsgType_DropCollection)
opt(&c)
c.Apply(req)
assert.Equal(t, commonpb.MsgType_DropCollection, req.GetBase().GetMsgType())

View File

@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)
type renameCollectionTask struct {
@ -36,7 +37,7 @@ func (t *renameCollectionTask) Prepare(ctx context.Context) error {
}
func (t *renameCollectionTask) Execute(ctx context.Context) error {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, t.GetTs()); err != nil {
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_RenameCollection)); err != nil {
return err
}
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())

View File

@ -170,12 +170,13 @@ type expireCacheStep struct {
dbName string
collectionNames []string
collectionID UniqueID
partitionName string
ts Timestamp
opts []proxyutil.ExpireCacheOpt
}
func (s *expireCacheStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.ts, s.opts...)
err := s.core.ExpireMetaCache(ctx, s.dbName, s.collectionNames, s.collectionID, s.partitionName, s.ts, s.opts...)
return nil, err
}

View File

@ -40,28 +40,25 @@ import (
)
type ExpireCacheConfig struct {
withDropFlag bool
msgType commonpb.MsgType
}
func (c ExpireCacheConfig) Apply(req *proxypb.InvalidateCollMetaCacheRequest) {
if !c.withDropFlag {
return
}
if req.GetBase() == nil {
req.Base = commonpbutil.NewMsgBase()
}
req.Base.MsgType = commonpb.MsgType_DropCollection
req.Base.MsgType = c.msgType
}
func DefaultExpireCacheConfig() ExpireCacheConfig {
return ExpireCacheConfig{withDropFlag: false}
return ExpireCacheConfig{}
}
type ExpireCacheOpt func(c *ExpireCacheConfig)
func ExpireCacheWithDropFlag() ExpireCacheOpt {
func SetMsgType(msgType commonpb.MsgType) ExpireCacheOpt {
return func(c *ExpireCacheConfig) {
c.withDropFlag = true
c.msgType = msgType
}
}