package datacoord

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

	"github.com/milvus-io/milvus/internal/metastore"
	"github.com/milvus-io/milvus/internal/metastore/model"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/lock"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
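
// Fixture IDs shared by the meta-related test helpers in this file.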
var (
	collID         = UniqueID(100)
	partID         = UniqueID(200)
	indexID        = UniqueID(300)
	fieldID        = UniqueID(400)
	indexName      = "_default_idx"
	segID          = UniqueID(500)
	buildID        = UniqueID(600)
	nodeID         = UniqueID(700)
	partitionKeyID = UniqueID(800)
	statsTaskID    = UniqueID(900)
)
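
// createIndexMeta builds an indexMeta with a single index on collID/fieldID and
// eleven segment index entries (segID through segID+10) covering the Unissued,
// InProgress, Finished, Failed and deleted states.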
func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
	indexBuildInfo := newSegmentIndexBuildInfo()
	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1025,
		IndexID:             indexID,
		BuildID:             buildID,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      0,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 1,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 1,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 2,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 2,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           true,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 3,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 3,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 4,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 4,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 5,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 5,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 6,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 6,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 7,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 7,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Failed,
		FailReason:          "error",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 8,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 8,
		NodeID:              nodeID + 1,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 9,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 9,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})

	indexBuildInfo.Add(&model.SegmentIndex{
		SegmentID:           segID + 10,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 10,
		NodeID:              nodeID,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
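
	// Mirror the same eleven fixtures into the segmentID -> indexID map held in segmentIndexes.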
	segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
	segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx0.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1025,
		IndexID:             indexID,
		BuildID:             buildID,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      0,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx1 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx1.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 1,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 1,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx2 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx2.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 2,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 2,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           true,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx3 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx3.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 3,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 3,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx4 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx4.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 4,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 4,
		NodeID:              nodeID,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx5 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx5.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 5,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 5,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx6 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx6.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 6,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 6,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx7 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx7.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 7,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 7,
		NodeID:              0,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_Failed,
		FailReason:          "error",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx8 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx8.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 8,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             1026,
		IndexID:             indexID,
		BuildID:             buildID + 8,
		NodeID:              nodeID + 1,
		IndexVersion:        1,
		IndexState:          commonpb.IndexState_InProgress,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx9 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx9.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 9,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 9,
		NodeID:              0,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIdx10 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx10.Insert(indexID, &model.SegmentIndex{
		SegmentID:           segID + 10,
		CollectionID:        collID,
		PartitionID:         partID,
		NumRows:             500,
		IndexID:             indexID,
		BuildID:             buildID + 10,
		NodeID:              nodeID,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Unissued,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      1111,
		IndexFileKeys:       nil,
		IndexSerializedSize: 1,
	})
	segIndexes.Insert(segID, segIdx0)
	segIndexes.Insert(segID+1, segIdx1)
	segIndexes.Insert(segID+2, segIdx2)
	segIndexes.Insert(segID+3, segIdx3)
	segIndexes.Insert(segID+4, segIdx4)
	segIndexes.Insert(segID+5, segIdx5)
	segIndexes.Insert(segID+6, segIdx6)
	segIndexes.Insert(segID+7, segIdx7)
	segIndexes.Insert(segID+8, segIdx8)
	segIndexes.Insert(segID+9, segIdx9)
	segIndexes.Insert(segID+10, segIdx10)
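
	// The returned meta defines a single HNSW/L2 index (dim = 128) on fieldID of collID.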
	return &indexMeta{
		catalog: catalog,
		keyLock: lock.NewKeyLock[UniqueID](),
		indexes: map[UniqueID]map[UniqueID]*model.Index{
			collID: {
				indexID: {
					TenantID:     "",
					CollectionID: collID,
					FieldID:      fieldID,
					IndexID:      indexID,
					IndexName:    indexName,
					IsDeleted:    false,
					CreateTime:   1,
					TypeParams: []*commonpb.KeyValuePair{
						{
							Key:   common.DimKey,
							Value: "128",
						},
					},
					IndexParams: []*commonpb.KeyValuePair{
						{
							Key:   common.IndexTypeKey,
							Value: "HNSW",
						},
						{
							Key:   common.MetricTypeKey,
							Value: "L2",
						},
					},
				},
			},
		},
		segmentIndexes:   segIndexes,
		segmentBuildInfo: indexBuildInfo,
	}
}
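
// testMetaOption mutates a meta built by createMeta; the with* helpers below swap in
// pre-built analyze, index and stats-task metas.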
type testMetaOption func(*meta)

func withAnalyzeMeta(am *analyzeMeta) testMetaOption {
	return func(mt *meta) {
		mt.analyzeMeta = am
	}
}

func withIndexMeta(im *indexMeta) testMetaOption {
	return func(mt *meta) {
		mt.indexMeta = im
	}
}

func withStatsTaskMeta(stm *statsTaskMeta) testMetaOption {
	return func(mt *meta) {
		mt.statsTaskMeta = stm
	}
}
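
// createMeta returns a meta populated with flushed segments 1000-1002 and segID through
// segID+10, then applies the given options, e.g.
//
//	createMeta(catalog, withIndexMeta(createIndexMeta(catalog)))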
func createMeta(catalog metastore.DataCoordCatalog, opts ...testMetaOption) *meta {
	mt := &meta{
		catalog:     catalog,
		collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
		segments: &SegmentsInfo{
			segments: map[UniqueID]*SegmentInfo{
				1000: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           1000,
						CollectionID: 10000,
						PartitionID:  10001,
						NumOfRows:    3000,
						State:        commonpb.SegmentState_Flushed,
						Binlogs:      []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
					},
				},
				1001: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           1001,
						CollectionID: 10000,
						PartitionID:  10001,
						NumOfRows:    3000,
						State:        commonpb.SegmentState_Flushed,
						Binlogs:      []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
					},
				},
				1002: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           1002,
						CollectionID: 10000,
						PartitionID:  10001,
						NumOfRows:    3000,
						State:        commonpb.SegmentState_Flushed,
						Binlogs:      []*datapb.FieldBinlog{{FieldID: 10002, Binlogs: []*datapb.Binlog{{LogID: 1}, {LogID: 2}, {LogID: 3}}}},
					},
				},
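				// Segments segID through segID+10 line up with the segment index fixtures in createIndexMeta.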
				segID: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1025,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 1: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 1,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 2: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 2,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 3: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 3,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      500,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 4: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 4,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 5: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 5,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 6: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 6,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 7: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 7,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 8: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 8,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      1026,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 9: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 9,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      500,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
				segID + 10: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:             segID + 10,
						CollectionID:   collID,
						PartitionID:    partID,
						InsertChannel:  "",
						NumOfRows:      500,
						State:          commonpb.SegmentState_Flushed,
						MaxRowNum:      65536,
						LastExpireTime: 10,
					},
				},
			},
		},
	}

	for _, opt := range opts {
		opt(mt)
	}
	return mt
}
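
// createIndexMetaWithSegment builds an indexMeta containing a single unissued build task
// for the given segment and an HNSW index (dim = 128) on the given field.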
func createIndexMetaWithSegment(catalog metastore.DataCoordCatalog,
	collID, partID, segID, indexID, fieldID, buildID UniqueID,
) *indexMeta {
	tasks := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	indexTask := &model.SegmentIndex{
		CollectionID: collID,
		PartitionID:  partID,
		SegmentID:    segID,
		IndexID:      indexID,
		BuildID:      buildID,
		IndexState:   commonpb.IndexState_Unissued,
		NodeID:       0,
		FailReason:   "",
		NumRows:      1025,
	}
	tasks.Insert(indexID, indexTask)
	im := &indexMeta{
		catalog: catalog,
		keyLock: lock.NewKeyLock[UniqueID](),
		indexes: map[UniqueID]map[UniqueID]*model.Index{
			collID: {indexID: {
				IndexID:   indexID,
				FieldID:   fieldID,
				IndexName: "default_index",
				IndexParams: []*commonpb.KeyValuePair{
					{Key: "index_type", Value: "HNSW"},
				},
				TypeParams: []*commonpb.KeyValuePair{
					{Key: "dim", Value: "128"},
				},
			}},
		},
		segmentIndexes:   typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
		segmentBuildInfo: newSegmentIndexBuildInfo(),
	}
	im.segmentIndexes.Insert(segID, tasks)
	im.segmentBuildInfo.Add(indexTask)
	return im
}