mirror of https://github.com/milvus-io/milvus.git
enhance: Use batch to speed up list collections from meta kv (#37742)
issue: #36228
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/37775/head
parent 0fc0d1a888
commit 226fe900e7
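At a high level, ListCollections previously resolved partitions, fields, and functions per collection inside its loop (via appendPartitionAndFieldsInfo, visible in the removed lines further down), while the new batchAppendPartitionAndFieldsInfo loads each meta prefix once and groups the results by collection ID. The following standalone Go sketch is only an illustration of why that matters for large databases; the scan-count formulas are an inference from the diff, not measurements from the patch:

package main

import "fmt"

// metaScans roughly estimates how many snapshot prefix scans are needed to
// list n collections whose partition/field meta is stored under separate keys.
// Before: three per-collection scans (partitions, fields, functions) on top of
// the collection-meta scan. After: each meta type is scanned once and grouped
// in memory.
func metaScans(n int, batched bool) int {
    const collectionScan = 1 // one scan lists the collection meta itself
    if batched {
        return collectionScan + 3 // partitions + fields + functions, once each
    }
    return collectionScan + 3*n // three scans per collection
}

func main() {
    for _, n := range []int{1, 100, 10000} {
        fmt.Printf("collections=%-6d scans before=%-6d after=%d\n",
            n, metaScans(n, false), metaScans(n, true))
    }
}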
@@ -3,6 +3,9 @@ package rootcoord
 import (
     "context"
     "fmt"
+    "strconv"
+    "strings"
+    "time"

     "github.com/cockroachdb/errors"
     "github.com/samber/lo"
@@ -354,6 +357,28 @@ func (kc *Catalog) listPartitionsAfter210(ctx context.Context, collectionID type
     return partitions, nil
 }

+func (kc *Catalog) batchListPartitionsAfter210(ctx context.Context, ts typeutil.Timestamp) (map[int64][]*model.Partition, error) {
+    _, values, err := kc.Snapshot.LoadWithPrefix(PartitionMetaPrefix, ts)
+    if err != nil {
+        return nil, err
+    }
+
+    ret := make(map[int64][]*model.Partition)
+    for i := 0; i < len(values); i++ {
+        partitionMeta := &pb.PartitionInfo{}
+        err := proto.Unmarshal([]byte(values[i]), partitionMeta)
+        if err != nil {
+            return nil, err
+        }
+        collectionID := partitionMeta.GetCollectionId()
+        if ret[collectionID] == nil {
+            ret[collectionID] = make([]*model.Partition, 0)
+        }
+        ret[collectionID] = append(ret[collectionID], model.UnmarshalPartitionModel(partitionMeta))
+    }
+    return ret, nil
+}
+
 func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool {
     return len(collMeta.GetSchema().GetFields()) <= 0
 }
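batchListPartitionsAfter210 does a single LoadWithPrefix over PartitionMetaPrefix and buckets the decoded PartitionInfo messages by partitionMeta.GetCollectionId(). A stripped-down, self-contained sketch of that grouping step, with a simplified struct standing in for pb.PartitionInfo (this is illustrative code, not part of the patch):

package main

import "fmt"

// partitionInfo is a simplified stand-in for pb.PartitionInfo; the real
// message carries the owning collection ID, which is what the grouping keys on.
type partitionInfo struct {
    CollectionID int64
    Name         string
}

// groupPartitions buckets one prefix scan's worth of partition metadata by
// collection ID, so callers can look up all partitions of a collection
// without another KV round trip.
func groupPartitions(all []partitionInfo) map[int64][]partitionInfo {
    ret := make(map[int64][]partitionInfo)
    for _, p := range all {
        ret[p.CollectionID] = append(ret[p.CollectionID], p)
    }
    return ret
}

func main() {
    scanned := []partitionInfo{
        {CollectionID: 1, Name: "_default"},
        {CollectionID: 2, Name: "_default"},
        {CollectionID: 1, Name: "p1"},
    }
    fmt.Println(groupPartitions(scanned)) // map[1:[{1 _default} {1 p1}] 2:[{2 _default}]]
}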
@@ -376,6 +401,32 @@ func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil
     return fields, nil
 }

+func (kc *Catalog) batchListFieldsAfter210(ctx context.Context, ts typeutil.Timestamp) (map[int64][]*model.Field, error) {
+    keys, values, err := kc.Snapshot.LoadWithPrefix(FieldMetaPrefix, ts)
+    if err != nil {
+        return nil, err
+    }
+
+    ret := make(map[int64][]*model.Field)
+    for i := 0; i < len(values); i++ {
+        fieldMeta := &schemapb.FieldSchema{}
+        err := proto.Unmarshal([]byte(values[i]), fieldMeta)
+        if err != nil {
+            return nil, err
+        }
+
+        collectionID, err := strconv.ParseInt(strings.Split(keys[i], "/")[2], 10, 64)
+        if err != nil {
+            return nil, err
+        }
+        if ret[collectionID] == nil {
+            ret[collectionID] = make([]*model.Field, 0)
+        }
+        ret[collectionID] = append(ret[collectionID], model.UnmarshalFieldModel(fieldMeta))
+    }
+    return ret, nil
+}
+
 func (kc *Catalog) listFunctions(collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Function, error) {
     prefix := BuildFunctionPrefix(collectionID)
     _, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
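Field and function metadata is keyed by collection in the key path, and the batch listers recover the collection ID from the key rather than from the message. Judging from the keys used in the updated tests (e.g. rootcoord/fields/1/1), the layout appears to be <root>/<meta type>/<collectionID>/<id>, which makes the collection ID the third path segment. A standalone sketch of that parse, with a bounds check added only for this example:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// collectionIDFromKey extracts the collection ID from a meta key shaped like
// "rootcoord/fields/<collectionID>/<fieldID>" — the same
// strings.Split(key, "/")[2] parse used by the batch listers, plus a length
// guard so this standalone example fails cleanly on odd input.
func collectionIDFromKey(key string) (int64, error) {
    parts := strings.Split(key, "/")
    if len(parts) < 3 {
        return 0, fmt.Errorf("unexpected meta key layout: %q", key)
    }
    return strconv.ParseInt(parts[2], 10, 64)
}

func main() {
    id, err := collectionIDFromKey("rootcoord/fields/1/1")
    fmt.Println(id, err) // 1 <nil>

    _, err = collectionIDFromKey("key") // a placeholder like the old test mock
    fmt.Println(err)                    // unexpected meta key layout: "key"
}

This is also why the test mocks further down switch from the placeholder "key" to path-shaped keys such as rootcoord/partitions/1/1.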
@@ -394,6 +445,30 @@ func (kc *Catalog) listFunctions(collectionID typeutil.UniqueID, ts typeutil.Tim
     return functions, nil
 }

+func (kc *Catalog) batchListFunctions(ts typeutil.Timestamp) (map[int64][]*model.Function, error) {
+    keys, values, err := kc.Snapshot.LoadWithPrefix(FunctionMetaPrefix, ts)
+    if err != nil {
+        return nil, err
+    }
+    ret := make(map[int64][]*model.Function)
+    for i := 0; i < len(values); i++ {
+        functionSchema := &schemapb.FunctionSchema{}
+        err := proto.Unmarshal([]byte(values[i]), functionSchema)
+        if err != nil {
+            return nil, err
+        }
+        collectionID, err := strconv.ParseInt(strings.Split(keys[i], "/")[2], 10, 64)
+        if err != nil {
+            return nil, err
+        }
+        if ret[collectionID] == nil {
+            ret[collectionID] = make([]*model.Function, 0)
+        }
+        ret[collectionID] = append(ret[collectionID], model.UnmarshalFunctionModel(functionSchema))
+    }
+    return ret, nil
+}
+
 func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *pb.CollectionInfo,
     ts typeutil.Timestamp,
 ) (*model.Collection, error) {
@@ -423,6 +498,50 @@ func (kc *Catalog) appendPartitionAndFieldsInfo(ctx context.Context, collMeta *p
     return collection, nil
 }

+func (kc *Catalog) batchAppendPartitionAndFieldsInfo(ctx context.Context, collMeta []*pb.CollectionInfo,
+    ts typeutil.Timestamp,
+) ([]*model.Collection, error) {
+    var partitionMetaMap map[int64][]*model.Partition
+    var fieldMetaMap map[int64][]*model.Field
+    var functionMetaMap map[int64][]*model.Function
+    ret := make([]*model.Collection, 0)
+    for _, coll := range collMeta {
+        collection := model.UnmarshalCollectionModel(coll)
+        if partitionVersionAfter210(coll) || fieldVersionAfter210(coll) {
+            if len(partitionMetaMap) == 0 {
+                var err error
+                partitionMetaMap, err = kc.batchListPartitionsAfter210(ctx, ts)
+                if err != nil {
+                    return nil, err
+                }
+
+                fieldMetaMap, err = kc.batchListFieldsAfter210(ctx, ts)
+                if err != nil {
+                    return nil, err
+                }
+
+                functionMetaMap, err = kc.batchListFunctions(ts)
+                if err != nil {
+                    return nil, err
+                }
+            }
+
+            if partitionMetaMap[collection.CollectionID] != nil {
+                collection.Partitions = partitionMetaMap[collection.CollectionID]
+            }
+            if fieldMetaMap[collection.CollectionID] != nil {
+                collection.Fields = fieldMetaMap[collection.CollectionID]
+            }
+            if functionMetaMap[collection.CollectionID] != nil {
+                collection.Functions = functionMetaMap[collection.CollectionID]
+            }
+        }
+        ret = append(ret, collection)
+    }
+
+    return ret, nil
+}
+
 func (kc *Catalog) GetCollectionByID(ctx context.Context, dbID int64, ts typeutil.Timestamp, collectionID typeutil.UniqueID) (*model.Collection, error) {
     collMeta, err := kc.loadCollection(ctx, dbID, collectionID, ts)
     if err != nil {
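batchAppendPartitionAndFieldsInfo runs the three batch scans only when it meets the first collection whose partitions or fields live under separate keys (the *After210 checks), and then reuses the resulting maps for every later collection. A stripped-down sketch of that load-once pattern, with a hypothetical fetchAll standing in for one batch scan (not code from the patch):

package main

import "fmt"

var scanCount int // counts simulated KV scans, to make the load-once behaviour visible

// fetchAll stands in for one batch scan such as batchListPartitionsAfter210.
func fetchAll() map[int64][]string {
    scanCount++
    return map[int64][]string{1: {"_default"}, 2: {"_default", "p1"}}
}

func main() {
    var partitionsByColl map[int64][]string
    for _, collectionID := range []int64{1, 2, 3} {
        // Populate lazily: the scan runs only for the first collection that
        // needs it; subsequent iterations hit the in-memory map.
        if len(partitionsByColl) == 0 {
            partitionsByColl = fetchAll()
        }
        fmt.Println(collectionID, partitionsByColl[collectionID])
    }
    fmt.Println("scans:", scanCount) // scans: 1
}

In the patch this block is additionally gated on partitionVersionAfter210 / fieldVersionAfter210, so collections whose schema and partitions are stored inline never trigger the batch scans at all.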
@@ -677,23 +796,28 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
         return nil, err
     }

-    colls := make([]*model.Collection, 0, len(vals))
+    start := time.Now()
+    colls := make([]*pb.CollectionInfo, 0, len(vals))
     for _, val := range vals {
-        collMeta := pb.CollectionInfo{}
-        err := proto.Unmarshal([]byte(val), &collMeta)
+        collMeta := &pb.CollectionInfo{}
+        err := proto.Unmarshal([]byte(val), collMeta)
         if err != nil {
             log.Warn("unmarshal collection info failed", zap.Error(err))
             continue
         }
-        kc.fixDefaultDBIDConsistency(ctx, &collMeta, ts)
-        collection, err := kc.appendPartitionAndFieldsInfo(ctx, &collMeta, ts)
-        if err != nil {
-            return nil, err
-        }
-        colls = append(colls, collection)
+        kc.fixDefaultDBIDConsistency(ctx, collMeta, ts)
+        colls = append(colls, collMeta)
     }
+    log.Info("unmarshal all collection details cost", zap.Int64("db", dbID), zap.Duration("cost", time.Since(start)))
+
+    start = time.Now()
+    ret, err := kc.batchAppendPartitionAndFieldsInfo(ctx, colls, ts)
+    log.Info("append partition and fields info cost", zap.Int64("db", dbID), zap.Duration("cost", time.Since(start)))
+    if err != nil {
+        return nil, err
+    }

-    return colls, nil
+    return ret, nil
 }

 // fixDefaultDBIDConsistency fix dbID consistency for collectionInfo.
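The rewritten ListCollections also times its two phases and logs the durations. For reference, a minimal standalone version of that time.Now / time.Since / zap.Duration pattern (zap.NewExample stands in for the catalog's logger, the phase bodies are elided, and the module must depend on go.uber.org/zap):

package main

import (
    "time"

    "go.uber.org/zap"
)

func main() {
    logger := zap.NewExample()

    start := time.Now()
    // ... phase 1: unmarshal all collection meta ...
    logger.Info("unmarshal all collection details cost",
        zap.Int64("db", 1), zap.Duration("cost", time.Since(start)))

    start = time.Now()
    // ... phase 2: batch-append partitions, fields, functions ...
    logger.Info("append partition and fields info cost",
        zap.Int64("db", 1), zap.Duration("cost", time.Since(start)))
}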
@@ -197,7 +197,7 @@ func TestCatalog_ListCollections(t *testing.T) {
             func(prefix string) bool {
                 return strings.HasPrefix(prefix, PartitionMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(pm)}, nil)
+            Return([]string{"rootcoord/partitions/1/1"}, []string{string(pm)}, nil)

         fieldMeta := &schemapb.FieldSchema{}
         fm, err := proto.Marshal(fieldMeta)
@@ -207,7 +207,7 @@ func TestCatalog_ListCollections(t *testing.T) {
             func(prefix string) bool {
                 return strings.HasPrefix(prefix, FieldMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(fm)}, nil)
+            Return([]string{"rootcoord/fields/1/1"}, []string{string(fm)}, nil)

         functionMeta := &schemapb.FunctionSchema{}
         fcm, err := proto.Marshal(functionMeta)
@@ -216,7 +216,7 @@ func TestCatalog_ListCollections(t *testing.T) {
             func(prefix string) bool {
                 return strings.HasPrefix(prefix, FunctionMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(fcm)}, nil)
+            Return([]string{"rootcoord/functions/1/1"}, []string{string(fcm)}, nil)

         kc := Catalog{Snapshot: kv}
         ret, err := kc.ListCollections(ctx, testDb, ts)
@@ -247,7 +247,7 @@ func TestCatalog_ListCollections(t *testing.T) {
             func(prefix string) bool {
                 return strings.HasPrefix(prefix, PartitionMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(pm)}, nil)
+            Return([]string{"rootcoord/partitions/1/1"}, []string{string(pm)}, nil)

         fieldMeta := &schemapb.FieldSchema{}
         fm, err := proto.Marshal(fieldMeta)
@@ -257,7 +257,7 @@ func TestCatalog_ListCollections(t *testing.T) {
            func(prefix string) bool {
                 return strings.HasPrefix(prefix, FieldMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(fm)}, nil)
+            Return([]string{"rootcoord/fields/1/1"}, []string{string(fm)}, nil)

         functionMeta := &schemapb.FunctionSchema{}
         fcm, err := proto.Marshal(functionMeta)
@@ -266,7 +266,7 @@ func TestCatalog_ListCollections(t *testing.T) {
             func(prefix string) bool {
                 return strings.HasPrefix(prefix, FunctionMetaPrefix)
             }), ts).
-            Return([]string{"key"}, []string{string(fcm)}, nil)
+            Return([]string{"rootcoord/functions/1/1"}, []string{string(fcm)}, nil)

         kv.On("MultiSaveAndRemove", mock.Anything, mock.Anything, ts).Return(nil)
         kc := Catalog{Snapshot: kv}