fix: lost dbname when only passing collection id to describeCollection (#31167)

issue: #30931

Signed-off-by: chyezh <chyezh@outlook.com>
pull/31297/head
chyezh 2024-03-11 17:19:02 +08:00 committed by GitHub
parent 58d7b9f902
commit 6a9418d200
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 38 additions and 7 deletions

View File

@ -210,7 +210,12 @@ func (kc *Catalog) loadCollectionFromDefaultDb(ctx context.Context, collectionID
func (kc *Catalog) loadCollection(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { func (kc *Catalog) loadCollection(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
if isDefaultDB(dbID) { if isDefaultDB(dbID) {
return kc.loadCollectionFromDefaultDb(ctx, collectionID, ts) info, err := kc.loadCollectionFromDefaultDb(ctx, collectionID, ts)
if err != nil {
return nil, err
}
fixDefaultDBIDConsistency(info)
return info, nil
} }
return kc.loadCollectionFromDb(ctx, dbID, collectionID, ts) return kc.loadCollectionFromDb(ctx, dbID, collectionID, ts)
} }
@ -625,6 +630,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
log.Warn("unmarshal collection info failed", zap.Error(err)) log.Warn("unmarshal collection info failed", zap.Error(err))
continue continue
} }
fixDefaultDBIDConsistency(&collMeta)
collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts) collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1188,3 +1194,12 @@ func isDefaultDB(dbID int64) bool {
} }
return false return false
} }
// fixDefaultDBIDConsistency fix dbID consistency for collectionInfo.
// We have two versions of default databaseID (0 at legacy path, 1 at new path), we should keep consistent view when user use default database.
// all collections in default database should be marked with dbID 1.
func fixDefaultDBIDConsistency(coll *pb.CollectionInfo) {
if isDefaultDB(coll.DbId) {
coll.DbId = util.DefaultDBID
}
}

View File

@ -73,6 +73,7 @@ func TestCatalog_ListCollections(t *testing.T) {
coll2 := &pb.CollectionInfo{ coll2 := &pb.CollectionInfo{
ID: 2, ID: 2,
DbId: testDb,
Schema: &schemapb.CollectionSchema{ Schema: &schemapb.CollectionSchema{
Name: "c1", Name: "c1",
Fields: []*schemapb.FieldSchema{ Fields: []*schemapb.FieldSchema{
@ -172,6 +173,7 @@ func TestCatalog_ListCollections(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 1, len(ret)) assert.Equal(t, 1, len(ret))
assert.Equal(t, coll1.ID, ret[0].CollectionID) assert.Equal(t, coll1.ID, ret[0].CollectionID)
assert.Equal(t, util.DefaultDBID, ret[0].DBID)
}) })
t.Run("list collection with db", func(t *testing.T) { t.Run("list collection with db", func(t *testing.T) {
@ -208,6 +210,7 @@ func TestCatalog_ListCollections(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, ret) assert.NotNil(t, ret)
assert.Equal(t, 1, len(ret)) assert.Equal(t, 1, len(ret))
assert.Equal(t, int64(testDb), ret[0].DBID)
}) })
t.Run("list collection ok for the newest version", func(t *testing.T) { t.Run("list collection ok for the newest version", func(t *testing.T) {
@ -376,6 +379,7 @@ func TestCatalog_GetCollectionByID(t *testing.T) {
coll, err = c.GetCollectionByID(ctx, 0, 10000, 1) coll, err = c.GetCollectionByID(ctx, 0, 10000, 1)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, coll) assert.NotNil(t, coll)
assert.Equal(t, util.DefaultDBID, coll.DBID)
} }
func TestCatalog_CreatePartitionV2(t *testing.T) { func TestCatalog_CreatePartitionV2(t *testing.T) {

View File

@ -45,7 +45,10 @@ func (t *describeCollectionTask) Execute(ctx context.Context) (err error) {
return err return err
} }
aliases := t.core.meta.ListAliasesByID(coll.CollectionID) aliases := t.core.meta.ListAliasesByID(coll.CollectionID)
t.Rsp = convertModelToDesc(coll, aliases) db, err := t.core.meta.GetDatabaseByID(ctx, coll.DBID, t.GetTs())
t.Rsp.DbName = t.Req.GetDbName() if err != nil {
return err
}
t.Rsp = convertModelToDesc(coll, aliases, db.Name)
return nil return nil
} }

View File

@ -101,10 +101,15 @@ func Test_describeCollectionTask_Execute(t *testing.T) {
).Return(&model.Collection{ ).Return(&model.Collection{
CollectionID: 1, CollectionID: 1,
Name: "test coll", Name: "test coll",
DBID: 1,
}, nil) }, nil)
meta.On("ListAliasesByID", meta.On("ListAliasesByID",
mock.Anything, mock.Anything,
).Return([]string{alias1, alias2}) ).Return([]string{alias1, alias2})
meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Return(&model.Database{
ID: 1,
Name: "test db",
}, nil)
core := newTestCore(withMeta(meta)) core := newTestCore(withMeta(meta))
task := &describeCollectionTask{ task := &describeCollectionTask{
@ -120,6 +125,7 @@ func Test_describeCollectionTask_Execute(t *testing.T) {
err := task.Execute(context.Background()) err := task.Execute(context.Background())
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) assert.Equal(t, task.Rsp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
assert.Equal(t, "test db", task.Rsp.GetDbName())
assert.ElementsMatch(t, []string{alias1, alias2}, task.Rsp.GetAliases()) assert.ElementsMatch(t, []string{alias1, alias2}, task.Rsp.GetAliases())
}) })
} }

View File

@ -338,7 +338,7 @@ func (mt *MetaTable) GetDatabaseByName(ctx context.Context, dbName string, ts Ti
return mt.getDatabaseByNameInternal(ctx, dbName, ts) return mt.getDatabaseByNameInternal(ctx, dbName, ts)
} }
func (mt *MetaTable) getDatabaseByNameInternal(ctx context.Context, dbName string, ts Timestamp) (*model.Database, error) { func (mt *MetaTable) getDatabaseByNameInternal(_ context.Context, dbName string, _ Timestamp) (*model.Database, error) {
// backward compatibility for rolling upgrade // backward compatibility for rolling upgrade
if dbName == "" { if dbName == "" {
log.Warn("db name is empty") log.Warn("db name is empty")

View File

@ -1122,8 +1122,11 @@ func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeColl
return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), ts, allowUnavailable) return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), ts, allowUnavailable)
} }
func convertModelToDesc(collInfo *model.Collection, aliases []string) *milvuspb.DescribeCollectionResponse { func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName string) *milvuspb.DescribeCollectionResponse {
resp := &milvuspb.DescribeCollectionResponse{Status: merr.Success()} resp := &milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
DbName: dbName,
}
resp.Schema = &schemapb.CollectionSchema{ resp.Schema = &schemapb.CollectionSchema{
Name: collInfo.Name, Name: collInfo.Name,