milvus/internal/datacoord/create_meta_test.go

688 lines
20 KiB
Go

package datacoord
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
var (
collID = UniqueID(100)
partID = UniqueID(200)
indexID = UniqueID(300)
fieldID = UniqueID(400)
indexName = "_default_idx"
segID = UniqueID(500)
buildID = UniqueID(600)
nodeID = UniqueID(700)
partitionKeyID = UniqueID(800)
statsTaskID = UniqueID(900)
)
func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
indexBuildInfo := newSegmentIndexBuildInfo()
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 2,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: true,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 4,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 4,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 5,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 6,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 6,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 7,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 7,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "error",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 8,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 8,
NodeID: nodeID + 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 9,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 9,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
indexBuildInfo.Add(&model.SegmentIndex{
SegmentID: segID + 10,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 10,
NodeID: nodeID,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx0.Insert(indexID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1025,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx1 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx1.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx2 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx2.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 2,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: true,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx3 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx3.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx4 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx4.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 4,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 4,
NodeID: nodeID,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx5 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx5.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 5,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx6 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx6.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 6,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 6,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx7 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx7.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 7,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 7,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "error",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx8 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx8.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 8,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 8,
NodeID: nodeID + 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx9 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx9.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 9,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 9,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIdx10 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx10.Insert(indexID, &model.SegmentIndex{
SegmentID: segID + 10,
CollectionID: collID,
PartitionID: partID,
NumRows: 500,
IndexID: indexID,
BuildID: buildID + 10,
NodeID: nodeID,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 1111,
IndexFileKeys: nil,
IndexSerializedSize: 1,
})
segIndexes.Insert(segID, segIdx0)
segIndexes.Insert(segID+1, segIdx1)
segIndexes.Insert(segID+2, segIdx2)
segIndexes.Insert(segID+3, segIdx3)
segIndexes.Insert(segID+4, segIdx4)
segIndexes.Insert(segID+5, segIdx5)
segIndexes.Insert(segID+6, segIdx6)
segIndexes.Insert(segID+7, segIdx7)
segIndexes.Insert(segID+8, segIdx8)
segIndexes.Insert(segID+9, segIdx9)
segIndexes.Insert(segID+10, segIdx10)
return &indexMeta{
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 1,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
{
Key: common.MetricTypeKey,
Value: "L2",
},
},
},
},
},
segmentIndexes: segIndexes,
segmentBuildInfo: indexBuildInfo,
}
}
type testMetaOption func(*meta)
func withAnalyzeMeta(am *analyzeMeta) testMetaOption {
return func(mt *meta) {
mt.analyzeMeta = am
}
}
func withIndexMeta(im *indexMeta) testMetaOption {
return func(mt *meta) {
mt.indexMeta = im
}
}
func withStatsTaskMeta(stm *statsTaskMeta) testMetaOption {
return func(mt *meta) {
mt.statsTaskMeta = stm
}
}
func createMeta(catalog metastore.DataCoordCatalog, opts ...testMetaOption) *meta {
mt := &meta{
catalog: catalog,
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
1000: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1000,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
1001: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1001,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
1002: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1002,
CollectionID: 10000,
PartitionID: 10001,
NumOfRows: 3000,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 3,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 4,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 6: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 6,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 7: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 7,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 8: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 8,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 9: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 9,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
segID + 10: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 10,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 500,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
},
},
}
for _, opt := range opts {
opt(mt)
}
return mt
}
func createIndexMetaWithSegment(catalog metastore.DataCoordCatalog,
collID, partID, segID, indexID, fieldID, buildID UniqueID,
) *indexMeta {
tasks := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
indexTask := &model.SegmentIndex{
CollectionID: collID,
PartitionID: partID,
SegmentID: segID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Unissued,
NodeID: 0,
FailReason: "",
NumRows: 1025,
}
tasks.Insert(indexID, indexTask)
im := &indexMeta{
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {indexID: {
IndexID: indexID,
FieldID: fieldID,
IndexName: "default_index",
IndexParams: []*commonpb.KeyValuePair{
{Key: "index_type", Value: "HNSW"},
},
TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "128"},
},
}},
},
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
segmentBuildInfo: newSegmentIndexBuildInfo(),
}
im.segmentIndexes.Insert(segID, tasks)
im.segmentBuildInfo.Add(indexTask)
return im
}