fix: [2.4] Write back dbid modification for nonDB id collection (#33641) (#33694)

Cherry-pick from master
pr: #33641
See also #33608

Make `fixDefaultDBIDConsistency` also write back collection dbid
modification when nonDB id collection is found.

This fix shall prevent dropped collections of this kind show up again
after dropping and restart.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/33747/head
congqixia 2024-06-11 11:25:55 +08:00 committed by GitHub
parent 396f8608dd
commit ee22750104
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 74 additions and 59 deletions

View File

@ -231,7 +231,7 @@ func (kc *Catalog) loadCollection(ctx context.Context, dbID int64, collectionID
if err != nil {
return nil, err
}
fixDefaultDBIDConsistency(info)
kc.fixDefaultDBIDConsistency(ctx, info, ts)
return info, nil
}
return kc.loadCollectionFromDb(ctx, dbID, collectionID, ts)
@ -647,7 +647,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
log.Warn("unmarshal collection info failed", zap.Error(err))
continue
}
fixDefaultDBIDConsistency(&collMeta)
kc.fixDefaultDBIDConsistency(ctx, &collMeta, ts)
collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts)
if err != nil {
return nil, err
@ -658,6 +658,22 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
return colls, nil
}
// fixDefaultDBIDConsistency fix dbID consistency for collectionInfo.
// We have two versions of default databaseID (0 at legacy path, 1 at new path), we should keep consistent view when user use default database.
// all collections in default database should be marked with dbID 1.
// this method also update dbid in meta store when dbid is 0
// see also: https://github.com/milvus-io/milvus/issues/33608
func (kv *Catalog) fixDefaultDBIDConsistency(_ context.Context, collMeta *pb.CollectionInfo, ts typeutil.Timestamp) {
if collMeta.DbId == util.NonDBID {
coll := model.UnmarshalCollectionModel(collMeta)
cloned := coll.Clone()
cloned.DBID = util.DefaultDBID
kv.alterModifyCollection(coll, cloned, ts)
collMeta.DbId = util.DefaultDBID
}
}
func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix210, ts)
if err != nil {
@ -1211,12 +1227,3 @@ func isDefaultDB(dbID int64) bool {
}
return false
}
// fixDefaultDBIDConsistency fix dbID consistency for collectionInfo.
// We have two versions of default databaseID (0 at legacy path, 1 at new path), we should keep consistent view when user use default database.
// all collections in default database should be marked with dbID 1.
func fixDefaultDBIDConsistency(coll *pb.CollectionInfo) {
if isDefaultDB(coll.DbId) {
coll.DbId = util.DefaultDBID
}
}

View File

@ -167,6 +167,7 @@ func TestCatalog_ListCollections(t *testing.T) {
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return([]string{"key"}, []string{string(bColl)}, nil)
kv.On("MultiSaveAndRemoveWithPrefix", mock.Anything, mock.Anything, ts).Return(nil)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
@ -245,6 +246,7 @@ func TestCatalog_ListCollections(t *testing.T) {
return strings.HasPrefix(prefix, FieldMetaPrefix)
}), ts).
Return([]string{"key"}, []string{string(fm)}, nil)
kv.On("MultiSaveAndRemoveWithPrefix", mock.Anything, mock.Anything, ts).Return(nil)
kc := Catalog{Snapshot: kv}
ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
@ -259,37 +261,49 @@ func TestCatalog_ListCollections(t *testing.T) {
func TestCatalog_loadCollection(t *testing.T) {
t.Run("load failed", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "", errors.New("mock")
}
kc := Catalog{Snapshot: snapshot}
kv := mocks.NewSnapShotKV(t)
kv.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("mock"))
kc := Catalog{Snapshot: kv}
_, err := kc.loadCollection(ctx, testDb, 1, 0)
assert.Error(t, err)
})
t.Run("load, not collection info", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return "not in pb format", nil
}
kc := Catalog{Snapshot: snapshot}
kv := mocks.NewSnapShotKV(t)
kv.EXPECT().Load(mock.Anything, mock.Anything).Return("not in pb format", nil)
kc := Catalog{Snapshot: kv}
_, err := kc.loadCollection(ctx, testDb, 1, 0)
assert.Error(t, err)
})
t.Run("load, normal collection info", func(t *testing.T) {
ctx := context.Background()
snapshot := kv.NewMockSnapshotKV()
coll := &pb.CollectionInfo{ID: 1}
coll := &pb.CollectionInfo{ID: 1, DbId: util.DefaultDBID}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
kv := mocks.NewSnapShotKV(t)
kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil)
kc := Catalog{Snapshot: kv}
got, err := kc.loadCollection(ctx, util.DefaultDBID, 1, 0)
assert.NoError(t, err)
assert.Equal(t, got.GetID(), coll.GetID())
})
t.Run("load, nonDBID collection info", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
ID: 1,
DbId: util.NonDBID,
Schema: &schemapb.CollectionSchema{},
}
kc := Catalog{Snapshot: snapshot}
got, err := kc.loadCollection(ctx, 0, 1, 0)
value, err := proto.Marshal(coll)
assert.NoError(t, err)
kv := mocks.NewSnapShotKV(t)
kv.EXPECT().Load(mock.Anything, mock.Anything).Return(string(value), nil)
kv.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil)
kc := Catalog{Snapshot: kv}
got, err := kc.loadCollection(ctx, util.NonDBID, 1, 0)
assert.NoError(t, err)
assert.Equal(t, got.GetID(), coll.GetID())
})
@ -344,38 +358,28 @@ func TestCatalog_GetCollectionByID(t *testing.T) {
ss := mocks.NewSnapShotKV(t)
c := Catalog{Snapshot: ss}
ss.EXPECT().Load(mock.Anything, mock.Anything).Call.Return(
func(key string, ts typeutil.Timestamp) string {
if ts > 1000 {
collByte, err := proto.Marshal(&pb.CollectionInfo{
ID: 1,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{},
},
},
PartitionIDs: []int64{1, 2, 3},
PartitionNames: []string{"1", "2", "3"},
PartitionCreatedTimestamps: []uint64{1, 2, 3},
})
require.NoError(t, err)
return string(collByte)
}
return ""
},
func(key string, ts typeutil.Timestamp) error {
if ts > 1000 {
return nil
}
return errors.New("load error")
},
)
ss.EXPECT().Load(mock.Anything, mock.Anything).Return("", errors.New("load error")).Twice()
coll, err := c.GetCollectionByID(ctx, 0, 1, 1)
assert.Error(t, err)
assert.Nil(t, coll)
ss.EXPECT().Load(mock.Anything, mock.Anything).RunAndReturn(func(key string, ts uint64) (string, error) {
collByte, err := proto.Marshal(&pb.CollectionInfo{
ID: 1,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{},
},
},
PartitionIDs: []int64{1, 2, 3},
PartitionNames: []string{"1", "2", "3"},
PartitionCreatedTimestamps: []uint64{1, 2, 3},
})
require.NoError(t, err)
return string(collByte), nil
}).Once()
ss.EXPECT().MultiSaveAndRemoveWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil)
coll, err = c.GetCollectionByID(ctx, 0, 10000, 1)
assert.NoError(t, err)
assert.NotNil(t, coll)
@ -397,7 +401,9 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{}
coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
@ -425,7 +431,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background()
partID := typeutil.UniqueID(1)
coll := &pb.CollectionInfo{PartitionIDs: []int64{partID}}
coll := &pb.CollectionInfo{DbId: util.DefaultDBID, PartitionIDs: []int64{partID}}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
@ -444,7 +450,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background()
partition := "partition"
coll := &pb.CollectionInfo{PartitionNames: []string{partition}}
coll := &pb.CollectionInfo{DbId: util.DefaultDBID, PartitionNames: []string{partition}}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
@ -463,6 +469,7 @@ func TestCatalog_CreatePartitionV2(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
PartitionNames: []string{"partition"},
PartitionIDs: []int64{111},
PartitionCreatedTimestamps: []uint64{111111},
@ -698,7 +705,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
t.Run("partition version after 210", func(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{}
coll := &pb.CollectionInfo{DbId: util.DefaultDBID}
value, err := proto.Marshal(coll)
assert.NoError(t, err)
@ -726,6 +733,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
ctx := context.Background()
coll := &pb.CollectionInfo{
DbId: util.DefaultDBID,
PartitionIDs: []int64{101, 102},
PartitionNames: []string{"partition1", "partition2"},
PartitionCreatedTimestamps: []uint64{101, 102},