enhance: index meta use independent rather than global meta lock (#30869)

issue: https://github.com/milvus-io/milvus/issues/30837

Signed-off-by: jaime <yun.zhang@zilliz.com>
Branch: pull/30963/head
jaime 2024-03-04 16:56:59 +08:00 committed by GitHub
parent 3dc5e38240
commit 4b0c3dd377
23 changed files with 3020 additions and 2800 deletions
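
The heart of the change: all index bookkeeping moves out of the global meta struct into a dedicated indexMeta type that carries its own sync.RWMutex, so index reads and writes stop contending with segment and channel-checkpoint operations on the global meta lock. Below is a minimal, self-contained sketch of the resulting layout. The type and field names follow the diff; Index and SegmentIndex are simplified stand-ins for model.Index and model.SegmentIndex, and int64 stands in for UniqueID.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for model.Index and model.SegmentIndex.
type Index struct{ IndexID int64 }

type SegmentIndex struct{ SegmentID, IndexID, BuildID int64 }

// indexMeta owns an independent RWMutex: index lookups no longer
// serialize against segment or channel operations guarded by meta's lock.
type indexMeta struct {
	sync.RWMutex
	// collID -> indexID -> index
	indexes map[int64]map[int64]*Index
	// segmentID -> indexID -> segmentIndex
	segmentIndexes map[int64]map[int64]*SegmentIndex
	// buildID -> segmentIndex
	buildID2SegmentIndex map[int64]*SegmentIndex
}

func (m *indexMeta) IsIndexExist(collID, indexID int64) bool {
	m.RLock()
	defer m.RUnlock()
	_, ok := m.indexes[collID][indexID] // reading through a missing inner map is safe in Go
	return ok
}

// meta keeps its own lock for collections, segments and channel checkpoints,
// and holds a pointer to the independently locked index state.
type meta struct {
	sync.RWMutex
	indexMeta *indexMeta
}

func main() {
	mt := &meta{indexMeta: &indexMeta{
		indexes:              map[int64]map[int64]*Index{1: {10: {IndexID: 10}}},
		segmentIndexes:       map[int64]map[int64]*SegmentIndex{},
		buildID2SegmentIndex: map[int64]*SegmentIndex{},
	}}
	// Callers go through mt.indexMeta and take only indexMeta's RLock.
	fmt.Println(mt.indexMeta.IsIndexExist(1, 10)) // true
}
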


@@ -54,8 +54,8 @@ type MockBroker_DescribeCollectionInternal_Call struct {
}
// DescribeCollectionInternal is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - ctx context.Context
// - collectionID int64
func (_e *MockBroker_Expecter) DescribeCollectionInternal(ctx interface{}, collectionID interface{}) *MockBroker_DescribeCollectionInternal_Call {
return &MockBroker_DescribeCollectionInternal_Call{Call: _e.mock.On("DescribeCollectionInternal", ctx, collectionID)}
}
@@ -107,8 +107,8 @@ type MockBroker_HasCollection_Call struct {
}
// HasCollection is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - ctx context.Context
// - collectionID int64
func (_e *MockBroker_Expecter) HasCollection(ctx interface{}, collectionID interface{}) *MockBroker_HasCollection_Call {
return &MockBroker_HasCollection_Call{Call: _e.mock.On("HasCollection", ctx, collectionID)}
}
@@ -162,7 +162,7 @@ type MockBroker_ListDatabases_Call struct {
}
// ListDatabases is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *MockBroker_Expecter) ListDatabases(ctx interface{}) *MockBroker_ListDatabases_Call {
return &MockBroker_ListDatabases_Call{Call: _e.mock.On("ListDatabases", ctx)}
}
@@ -216,8 +216,8 @@ type MockBroker_ShowCollections_Call struct {
}
// ShowCollections is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - ctx context.Context
// - dbName string
func (_e *MockBroker_Expecter) ShowCollections(ctx interface{}, dbName interface{}) *MockBroker_ShowCollections_Call {
return &MockBroker_ShowCollections_Call{Call: _e.mock.On("ShowCollections", ctx, dbName)}
}
@@ -271,8 +271,8 @@ type MockBroker_ShowPartitionsInternal_Call struct {
}
// ShowPartitionsInternal is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - ctx context.Context
// - collectionID int64
func (_e *MockBroker_Expecter) ShowPartitionsInternal(ctx interface{}, collectionID interface{}) *MockBroker_ShowPartitionsInternal_Call {
return &MockBroker_ShowPartitionsInternal_Call{Call: _e.mock.On("ShowPartitionsInternal", ctx, collectionID)}
}


@@ -318,7 +318,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
}
collectionID := segments[0].GetCollectionID()
indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "")
indexInfos := t.meta.indexMeta.GetIndexesForCollection(segments[0].GetCollectionID(), "")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -945,7 +945,8 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
if Params.DataCoordCfg.AutoUpgradeSegmentIndex.GetAsBool() {
// index version of segment lower than current version and IndexFileKeys should have value, trigger compaction
for _, index := range segment.segmentIndexes {
indexIDToSegIdxes := t.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexIDToSegIdxes {
if index.CurrentIndexVersion < t.indexEngineVersionManager.GetCurrentIndexEngineVersion() &&
len(index.IndexFileKeys) > 0 {
log.Info("index version is too old, trigger compaction",

[File diff suppressed because it is too large]


@@ -460,9 +460,9 @@ func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool {
func (gc *garbageCollector) recycleUnusedIndexes() {
log.Info("start recycleUnusedIndexes")
deletedIndexes := gc.meta.GetDeletedIndexes()
deletedIndexes := gc.meta.indexMeta.GetDeletedIndexes()
for _, index := range deletedIndexes {
if err := gc.meta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
if err := gc.meta.indexMeta.RemoveIndex(index.CollectionID, index.IndexID); err != nil {
log.Warn("remove index on collection fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("indexID", index.IndexID), zap.Error(err))
continue
@@ -471,10 +471,10 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
}
func (gc *garbageCollector) recycleUnusedSegIndexes() {
segIndexes := gc.meta.GetAllSegIndexes()
segIndexes := gc.meta.indexMeta.GetAllSegIndexes()
for _, segIdx := range segIndexes {
if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if err := gc.meta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.indexMeta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if err := gc.meta.indexMeta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segmentID", segIdx.SegmentID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))
continue
@@ -507,7 +507,7 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
continue
}
log.Info("garbageCollector will recycle index files", zap.Int64("buildID", buildID))
canRecycle, segIdx := gc.meta.CleanSegmentIndex(buildID)
canRecycle, segIdx := gc.meta.indexMeta.CleanSegmentIndex(buildID)
if !canRecycle {
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
// and delete all index files about the buildID at one time.


@@ -29,7 +29,7 @@ import (
"time"
"github.com/cockroachdb/errors"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -349,52 +349,55 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta
segments: nil,
channelCPs: nil,
chunkManager: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
indexMeta: &indexMeta{
catalog: catalog,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: "_default_idx_101",
IsDeleted: true,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: "_default_idx_101",
IsDeleted: true,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
collID + 1: {
indexID + 10: {
TenantID: "",
CollectionID: collID + 1,
FieldID: fieldID + 10,
IndexID: indexID + 10,
IndexName: "index",
IsDeleted: true,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
collID + 1: {
indexID + 10: {
TenantID: "",
CollectionID: collID + 1,
FieldID: fieldID + 10,
IndexID: indexID + 10,
IndexName: "index",
IsDeleted: true,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
buildID2SegmentIndex: nil,
},
buildID2SegmentIndex: nil,
}
}
@@ -447,89 +450,94 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
},
segID + 1: {
SegmentInfo: nil,
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
},
},
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
segID + 1: {
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
},
channelCPs: nil,
chunkManager: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
}
}
@@ -587,25 +595,6 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
@@ -616,79 +605,103 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
},
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
},
segID + 1: {
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 10,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 0,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 1,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreateTime: 10,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
}
@@ -851,25 +864,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 1024,
WriteHandoff: false,
},
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
@@ -885,25 +879,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
Timestamp: 900,
},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file3", "file4"},
IndexSize: 1024,
WriteHandoff: false,
},
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
@@ -920,7 +895,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
CompactionFrom: []int64{segID, segID + 1},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
@@ -937,7 +911,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
CompactionFrom: nil,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
@@ -954,7 +927,6 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
CompactionFrom: []int64{segID + 2, segID + 3},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
@@ -1009,59 +981,105 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 1024,
WriteHandoff: false,
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 1024,
WriteHandoff: false,
},
},
segID + 1: {
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file3", "file4"},
IndexSize: 1024,
WriteHandoff: false,
},
},
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file3", "file4"},
IndexSize: 1024,
WriteHandoff: false,
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file1", "file2"},
IndexSize: 1024,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 5000,
IndexID: indexID,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: []string{"file3", "file4"},
IndexSize: 1024,
WriteHandoff: false,
},
},
},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
},
collections: map[UniqueID]*collectionInfo{
collID: {
ID: collID,
@@ -1135,7 +1153,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segG)
segH := gc.meta.GetSegment(segID + 7)
assert.NotNil(t, segH)
err := gc.meta.AddSegmentIndex(&model.SegmentIndex{
err := gc.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: segID + 4,
CollectionID: collID,
PartitionID: partID,
@@ -1145,7 +1163,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
})
assert.NoError(t, err)
err = gc.meta.FinishTask(&indexpb.IndexTaskInfo{
err = gc.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID + 4,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2", "file3", "file4"},


@@ -128,7 +128,7 @@ func (ib *indexBuilder) Stop() {
func (ib *indexBuilder) reloadFromKV() {
segments := ib.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
for _, segIndex := range segment.segmentIndexes {
for _, segIndex := range ib.meta.indexMeta.getSegmentIndexes(segment.ID) {
if segIndex.IsDeleted {
continue
}
@@ -235,7 +235,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
delete(ib.tasks, buildID)
}
meta, exist := ib.meta.GetIndexJob(buildID)
meta, exist := ib.meta.indexMeta.GetIndexJob(buildID)
if !exist {
log.Ctx(ib.ctx).Debug("index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
deleteFunc(buildID)
@@ -245,21 +245,21 @@
switch state {
case indexTaskInit:
segment := ib.meta.GetSegment(meta.SegmentID)
if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) {
if !isSegmentHealthy(segment) || !ib.meta.indexMeta.IsIndexExist(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
if err := ib.meta.DeleteTask(buildID); err != nil {
if err := ib.meta.indexMeta.DeleteTask(buildID); err != nil {
log.Ctx(ib.ctx).Warn("IndexCoord delete index failed", zap.Int64("buildID", buildID), zap.Error(err))
return false
}
deleteFunc(buildID)
return true
}
indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
indexParams := ib.meta.indexMeta.GetIndexParams(meta.CollectionID, meta.IndexID)
indexType := getIndexType(indexParams)
if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
if err := ib.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: nil,
@@ -280,7 +280,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
return false
}
// update version and set nodeID
if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
if err := ib.meta.indexMeta.UpdateVersion(buildID, nodeID); err != nil {
log.Ctx(ib.ctx).Warn("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
return false
}
@@ -305,7 +305,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
}
typeParams := ib.meta.GetTypeParams(meta.CollectionID, meta.IndexID)
typeParams := ib.meta.indexMeta.GetTypeParams(meta.CollectionID, meta.IndexID)
var storageConfig *indexpb.StorageConfig
if Params.CommonCfg.StorageType.GetValue() == "local" {
@@ -331,7 +331,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
}
fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
fieldID := ib.meta.indexMeta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
binlogIDs := getBinLogIds(segment, fieldID)
if isDiskANNIndex(getIndexType(indexParams)) {
var err error
@@ -428,7 +428,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
log.Ctx(ib.ctx).Info("index task assigned successfully", zap.Int64("buildID", buildID),
zap.Int64("segmentID", meta.SegmentID), zap.Int64("nodeID", nodeID))
// update index meta state to InProgress
if err := ib.meta.BuildIndex(buildID); err != nil {
if err := ib.meta.indexMeta.BuildIndex(buildID); err != nil {
// need to release lock then reassign, so set task state to retry
log.Ctx(ib.ctx).Warn("index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID), zap.Error(err))
@@ -481,7 +481,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
if info.GetState() == commonpb.IndexState_Failed || info.GetState() == commonpb.IndexState_Finished {
log.Ctx(ib.ctx).Info("this task has been finished", zap.Int64("buildID", info.GetBuildID()),
zap.String("index state", info.GetState().String()))
if err := ib.meta.FinishTask(info); err != nil {
if err := ib.meta.indexMeta.FinishTask(info); err != nil {
log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", info.GetBuildID()),
zap.String("index state", info.GetState().String()), zap.Error(err))
return indexTaskInProgress
@@ -552,7 +552,7 @@ func (ib *indexBuilder) assignTask(builderClient types.IndexNodeClient, req *ind
func (ib *indexBuilder) nodeDown(nodeID UniqueID) {
defer ib.notify()
metas := ib.meta.GetMetasByNodeID(nodeID)
metas := ib.meta.indexMeta.GetMetasByNodeID(nodeID)
ib.taskMutex.Lock()
defer ib.taskMutex.Unlock()

[File diff suppressed because it is too large]


@@ -21,33 +21,98 @@ import (
"context"
"fmt"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
func (m *meta) updateCollectionIndex(index *model.Index) {
type indexMeta struct {
sync.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog
// collectionIndexes records which indexes are on the collection
// collID -> indexID -> index
indexes map[UniqueID]map[UniqueID]*model.Index
// buildID2Meta records the meta information of the segment
// buildID -> segmentIndex
buildID2SegmentIndex map[UniqueID]*model.SegmentIndex
// segmentID -> indexID -> segmentIndex
segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex
}
// NewMeta creates meta from provided `kv.TxnKV`
func newIndexMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*indexMeta, error) {
mt := &indexMeta{
ctx: ctx,
catalog: catalog,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
segmentIndexes: make(map[UniqueID]map[UniqueID]*model.SegmentIndex),
}
err := mt.reloadFromKV()
if err != nil {
return nil, err
}
return mt, nil
}
// reloadFromKV loads meta from KV storage
func (m *indexMeta) reloadFromKV() error {
record := timerecord.NewTimeRecorder("indexMeta-reloadFromKV")
// load field indexes
fieldIndexes, err := m.catalog.ListIndexes(m.ctx)
if err != nil {
log.Error("indexMeta reloadFromKV load field indexes fail", zap.Error(err))
return err
}
for _, fieldIndex := range fieldIndexes {
m.updateCollectionIndex(fieldIndex)
}
segmentIndexes, err := m.catalog.ListSegmentIndexes(m.ctx)
if err != nil {
log.Error("indexMeta reloadFromKV load segment indexes fail", zap.Error(err))
return err
}
for _, segIdx := range segmentIndexes {
m.updateSegmentIndex(segIdx)
metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(segIdx.IndexFileKeys)))
}
log.Info("indexMeta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
return nil
}
func (m *indexMeta) updateCollectionIndex(index *model.Index) {
if _, ok := m.indexes[index.CollectionID]; !ok {
m.indexes[index.CollectionID] = make(map[UniqueID]*model.Index)
}
m.indexes[index.CollectionID][index.IndexID] = index
}
func (m *meta) updateSegmentIndex(segIdx *model.SegmentIndex) {
m.segments.SetSegmentIndex(segIdx.SegmentID, segIdx)
func (m *indexMeta) updateSegmentIndex(segIdx *model.SegmentIndex) {
indexes, ok := m.segmentIndexes[segIdx.SegmentID]
if ok {
indexes[segIdx.IndexID] = segIdx
} else {
m.segmentIndexes[segIdx.SegmentID] = make(map[UniqueID]*model.SegmentIndex)
m.segmentIndexes[segIdx.SegmentID][segIdx.IndexID] = segIdx
}
m.buildID2SegmentIndex[segIdx.BuildID] = segIdx
}
func (m *meta) alterSegmentIndexes(segIdxes []*model.SegmentIndex) error {
func (m *indexMeta) alterSegmentIndexes(segIdxes []*model.SegmentIndex) error {
err := m.catalog.AlterSegmentIndexes(m.ctx, segIdxes)
if err != nil {
log.Error("failed to alter segments index in meta store", zap.Int("segment indexes num", len(segIdxes)),
@@ -60,15 +125,15 @@ func (m *meta) alterSegmentIndexes(segIdxes []*model.SegmentIndex) error {
return nil
}
func (m *meta) updateIndexMeta(index *model.Index, updateFunc func(clonedIndex *model.Index) error) error {
func (m *indexMeta) updateIndexMeta(index *model.Index, updateFunc func(clonedIndex *model.Index) error) error {
return updateFunc(model.CloneIndex(index))
}
func (m *meta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc func(clonedSegIdx *model.SegmentIndex) error) error {
func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc func(clonedSegIdx *model.SegmentIndex) error) error {
return updateFunc(model.CloneSegmentIndex(segIdx))
}
func (m *meta) updateIndexTasksMetrics() {
func (m *indexMeta) updateIndexTasksMetrics() {
taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
for _, segIdx := range m.buildID2SegmentIndex {
if segIdx.IsDeleted {
@@ -138,7 +203,7 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool
return !notEq
}
func (m *meta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
m.RLock()
defer m.RUnlock()
@@ -171,7 +236,7 @@ func (m *meta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error)
}
// HasSameReq determine whether there are same indexing tasks.
func (m *meta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID) {
func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID) {
m.RLock()
defer m.RUnlock()
@@ -194,7 +259,7 @@ func (m *meta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID) {
return false, 0
}
func (m *meta) CreateIndex(index *model.Index) error {
func (m *indexMeta) CreateIndex(index *model.Index) error {
log.Info("meta update: CreateIndex", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID), zap.String("indexName", index.IndexName))
m.Lock()
@@ -213,7 +278,7 @@ func (m *meta) CreateIndex(index *model.Index) error {
return nil
}
func (m *meta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
func (m *indexMeta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
m.Lock()
defer m.Unlock()
@@ -230,7 +295,7 @@ func (m *meta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
}
// AddSegmentIndex adds the index meta corresponding the indexBuildID to meta table.
func (m *meta) AddSegmentIndex(segIndex *model.SegmentIndex) error {
func (m *indexMeta) AddSegmentIndex(segIndex *model.SegmentIndex) error {
m.Lock()
defer m.Unlock()
@@ -254,7 +319,7 @@ func (m *meta) AddSegmentIndex(segIndex *model.SegmentIndex) error {
return nil
}
func (m *meta) GetIndexIDByName(collID int64, indexName string) map[int64]uint64 {
func (m *indexMeta) GetIndexIDByName(collID int64, indexName string) map[int64]uint64 {
m.RLock()
defer m.RUnlock()
indexID2CreateTs := make(map[int64]uint64)
@@ -272,7 +337,7 @@ func (m *meta) GetIndexIDByName(collID int64, indexName string) map[int64]uint64
return indexID2CreateTs
}
func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID) *indexpb.SegmentIndexState {
func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID) *indexpb.SegmentIndexState {
m.RLock()
defer m.RUnlock()
@@ -286,14 +351,16 @@ func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID
state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state
}
segment := m.segments.GetSegment(segmentID)
if segment == nil {
state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID)
indexes, ok := m.segmentIndexes[segmentID]
if !ok {
state.State = commonpb.IndexState_Unissued
state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID)
return state
}
if index, ok := fieldIndexes[indexID]; ok && !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
if segIdx, ok := indexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
@@ -307,7 +374,7 @@ func (m *meta) GetSegmentIndexState(collID, segmentID UniqueID, indexID UniqueID
return state
}
func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
func (m *indexMeta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
m.RLock()
defer m.RUnlock()
@@ -321,14 +388,17 @@ func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID)
state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state
}
segment := m.segments.GetSegment(segmentID)
if segment == nil {
state.FailReason = fmt.Sprintf("segment is not exist with ID: %d", segmentID)
indexes, ok := m.segmentIndexes[segmentID]
if !ok {
state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID)
state.State = commonpb.IndexState_Unissued
return state
}
for indexID, index := range fieldIndexes {
if index.FieldID == fieldID && !index.IsDeleted {
if segIdx, ok := segment.segmentIndexes[indexID]; ok {
if segIdx, ok := indexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
@@ -343,7 +413,7 @@ func (m *meta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID)
}
// GetIndexesForCollection gets all indexes info with the specified collection.
func (m *meta) GetIndexesForCollection(collID UniqueID, indexName string) []*model.Index {
func (m *indexMeta) GetIndexesForCollection(collID UniqueID, indexName string) []*model.Index {
m.RLock()
defer m.RUnlock()
@@ -359,7 +429,7 @@ func (m *meta) GetIndexesForCollection(collID UniqueID, indexName string) []*mod
return indexInfos
}
func (m *meta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
func (m *indexMeta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*model.Index {
m.RLock()
defer m.RUnlock()
@@ -376,7 +446,7 @@ func (m *meta) GetFieldIndexes(collID, fieldID UniqueID, indexName string) []*mo
}
// MarkIndexAsDeleted will mark the corresponding index as deleted, and recycleUnusedIndexFiles will recycle these tasks.
func (m *meta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
func (m *indexMeta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
log.Info("IndexCoord metaTable MarkIndexAsDeleted", zap.Int64("collectionID", collID),
zap.Int64s("indexIDs", indexIDs))
@@ -413,29 +483,73 @@ func (m *meta) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) error {
return nil
}
func (m *meta) GetSegmentIndexes(segID UniqueID) []*model.SegmentIndex {
func (m *indexMeta) IsUnIndexedSegment(collectionID UniqueID, segID UniqueID) bool {
m.RLock()
defer m.RUnlock()
segIndexInfos := make([]*model.SegmentIndex, 0)
segment := m.segments.GetSegment(segID)
if segment == nil {
return segIndexInfos
}
fieldIndex, ok := m.indexes[segment.CollectionID]
fieldIndexes, ok := m.indexes[collectionID]
if !ok {
return segIndexInfos
return false
}
for _, segIdx := range segment.segmentIndexes {
if index, ok := fieldIndex[segIdx.IndexID]; ok && !index.IsDeleted {
segIndexInfos = append(segIndexInfos, model.CloneSegmentIndex(segIdx))
// the segment should be unindexed status if the fieldIndexes is not nil
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
return true
}
for _, index := range fieldIndexes {
if _, ok := segIndexInfos[index.IndexID]; !index.IsDeleted {
if !ok {
// the segment should be unindexed status if the segment index is not found within field indexes
return true
}
}
}
return segIndexInfos
return false
}
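
The loop above is compact to the point of being easy to misread: the if statement's init clause performs the map lookup, its condition tests only !index.IsDeleted, and the nested if !ok then inspects the lookup result. An equivalent flattened form, written out here purely for readability (this is not code from the commit):

for _, index := range fieldIndexes {
	_, ok := segIndexInfos[index.IndexID]
	if !index.IsDeleted && !ok {
		// a live field index with no segment-index entry: the segment is unindexed
		return true
	}
}
return false
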
func (m *meta) GetFieldIDByIndexID(collID, indexID UniqueID) UniqueID {
func (m *indexMeta) getSegmentIndexes(segID UniqueID) map[UniqueID]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()
ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
return ret
}
for _, segIdx := range segIndexInfos {
ret[segIdx.IndexID] = model.CloneSegmentIndex(segIdx)
}
return ret
}
func (m *indexMeta) GetSegmentIndexes(collectionID UniqueID, segID UniqueID) map[UniqueID]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()
ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
return ret
}
fieldIndexes, ok := m.indexes[collectionID]
if !ok {
return ret
}
for _, segIdx := range segIndexInfos {
if index, ok := fieldIndexes[segIdx.IndexID]; ok && !index.IsDeleted {
ret[segIdx.IndexID] = model.CloneSegmentIndex(segIdx)
}
}
return ret
}
func (m *indexMeta) GetFieldIDByIndexID(collID, indexID UniqueID) UniqueID {
m.RLock()
defer m.RUnlock()
@@ -447,7 +561,7 @@ func (m *meta) GetFieldIDByIndexID(collID, indexID UniqueID) UniqueID {
return 0
}
func (m *meta) GetIndexNameByID(collID, indexID UniqueID) string {
func (m *indexMeta) GetIndexNameByID(collID, indexID UniqueID) string {
m.RLock()
defer m.RUnlock()
if fieldIndexes, ok := m.indexes[collID]; ok {
@@ -458,7 +572,7 @@ func (m *meta) GetIndexNameByID(collID, indexID UniqueID) string {
return ""
}
func (m *meta) GetIndexParams(collID, indexID UniqueID) []*commonpb.KeyValuePair {
func (m *indexMeta) GetIndexParams(collID, indexID UniqueID) []*commonpb.KeyValuePair {
m.RLock()
defer m.RUnlock()
@@ -479,7 +593,7 @@ func (m *meta) GetIndexParams(collID, indexID UniqueID) []*commonpb.KeyValuePair
return indexParams
}
func (m *meta) GetTypeParams(collID, indexID UniqueID) []*commonpb.KeyValuePair {
func (m *indexMeta) GetTypeParams(collID, indexID UniqueID) []*commonpb.KeyValuePair {
m.RLock()
defer m.RUnlock()
@@ -500,7 +614,7 @@ func (m *meta) GetTypeParams(collID, indexID UniqueID) []*commonpb.KeyValuePair
return typeParams
}
func (m *meta) GetIndexJob(buildID UniqueID) (*model.SegmentIndex, bool) {
func (m *indexMeta) GetIndexJob(buildID UniqueID) (*model.SegmentIndex, bool) {
m.RLock()
defer m.RUnlock()
@@ -512,7 +626,7 @@ func (m *meta) GetIndexJob(buildID UniqueID) (*model.SegmentIndex, bool) {
return nil, false
}
func (m *meta) IsIndexExist(collID, indexID UniqueID) bool {
func (m *indexMeta) IsIndexExist(collID, indexID UniqueID) bool {
m.RLock()
defer m.RUnlock()
@@ -528,7 +642,7 @@ func (m *meta) IsIndexExist(collID, indexID UniqueID) bool {
}
// UpdateVersion updates the version and nodeID of the index meta, whenever the task is built once, the version will be updated once.
func (m *meta) UpdateVersion(buildID UniqueID, nodeID UniqueID) error {
func (m *indexMeta) UpdateVersion(buildID UniqueID, nodeID UniqueID) error {
m.Lock()
defer m.Unlock()
@@ -547,7 +661,7 @@ func (m *meta) UpdateVersion(buildID UniqueID, nodeID UniqueID) error {
return m.updateSegIndexMeta(segIdx, updateFunc)
}
func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
func (m *indexMeta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
m.Lock()
defer m.Unlock()
@@ -578,7 +692,7 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
return nil
}
func (m *meta) DeleteTask(buildID int64) error {
func (m *indexMeta) DeleteTask(buildID int64) error {
m.Lock()
defer m.Unlock()
@@ -603,7 +717,7 @@ func (m *meta) DeleteTask(buildID int64) error {
}
// BuildIndex set the index state to be InProgress. It means IndexNode is building the index.
func (m *meta) BuildIndex(buildID UniqueID) error {
func (m *indexMeta) BuildIndex(buildID UniqueID) error {
m.Lock()
defer m.Unlock()
@@ -632,7 +746,7 @@ func (m *meta) BuildIndex(buildID UniqueID) error {
return nil
}
func (m *meta) GetAllSegIndexes() map[int64]*model.SegmentIndex {
func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()
@@ -643,7 +757,7 @@ func (m *meta) GetAllSegIndexes() map[int64]*model.SegmentIndex {
return segIndexes
}
func (m *meta) RemoveSegmentIndex(collID, partID, segID, indexID, buildID UniqueID) error {
func (m *indexMeta) RemoveSegmentIndex(collID, partID, segID, indexID, buildID UniqueID) error {
m.Lock()
defer m.Unlock()
@@ -652,13 +766,20 @@ func (m *meta) RemoveSegmentIndex(collID, partID, segID, indexID, buildID Unique
return err
}
m.segments.DropSegmentIndex(segID, indexID)
if _, ok := m.segmentIndexes[segID]; ok {
delete(m.segmentIndexes[segID], indexID)
}
if len(m.segmentIndexes[segID]) == 0 {
delete(m.segmentIndexes, segID)
}
delete(m.buildID2SegmentIndex, buildID)
m.updateIndexTasksMetrics()
return nil
}
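
Note the extra bookkeeping compared with the m.segments.DropSegmentIndex call this replaces: once the last index entry for a segment is deleted, the inner map itself is removed, so segmentIndexes does not accumulate empty per-segment entries for fully dropped segments.
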
func (m *meta) GetDeletedIndexes() []*model.Index {
func (m *indexMeta) GetDeletedIndexes() []*model.Index {
m.RLock()
defer m.RUnlock()
@@ -673,7 +794,7 @@ func (m *meta) GetDeletedIndexes() []*model.Index {
return deletedIndexes
}
func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
func (m *indexMeta) RemoveIndex(collID, indexID UniqueID) error {
m.Lock()
defer m.Unlock()
log.Info("IndexCoord meta table remove index", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
@@ -696,7 +817,7 @@ func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
return nil
}
func (m *meta) CleanSegmentIndex(buildID UniqueID) (bool, *model.SegmentIndex) {
func (m *indexMeta) CleanSegmentIndex(buildID UniqueID) (bool, *model.SegmentIndex) {
m.RLock()
defer m.RUnlock()
@@ -709,28 +830,7 @@ func (m *meta) CleanSegmentIndex(buildID UniqueID) (bool, *model.SegmentIndex) {
return true, nil
}
func (m *meta) GetHasUnindexTaskSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
segments := m.segments.GetSegments()
unindexedSegments := make(map[int64]*SegmentInfo)
for _, segment := range segments {
if !isFlush(segment) {
continue
}
if fieldIndexes, ok := m.indexes[segment.CollectionID]; ok {
for _, index := range fieldIndexes {
if _, ok := segment.segmentIndexes[index.IndexID]; !index.IsDeleted && !ok {
unindexedSegments[segment.GetID()] = segment
}
}
}
}
return lo.MapToSlice(unindexedSegments, func(_ int64, segment *SegmentInfo) *SegmentInfo { return segment })
}
func (m *meta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
m.RLock()
defer m.RUnlock()

[File diff suppressed because it is too large]


@@ -70,7 +70,7 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e
CreateTime: uint64(segment.ID),
WriteHandoff: false,
}
if err = s.meta.AddSegmentIndex(segIndex); err != nil {
if err = s.meta.indexMeta.AddSegmentIndex(segIndex); err != nil {
return err
}
s.indexBuilder.enqueue(buildID)
@@ -78,9 +78,10 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e
}
func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
indexes := s.meta.GetIndexesForCollection(segment.CollectionID, "")
indexes := s.meta.indexMeta.GetIndexesForCollection(segment.CollectionID, "")
indexIDToSegIndexes := s.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID)
for _, index := range indexes {
if _, ok := segment.segmentIndexes[index.IndexID]; !ok {
if _, ok := indexIDToSegIndexes[index.IndexID]; !ok {
if err := s.createIndexForSegment(segment, index.IndexID); err != nil {
log.Warn("create index for segment fail", zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID))
@@ -91,6 +92,20 @@ func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
return nil
}
func (s *Server) getUnIndexTaskSegments() []*SegmentInfo {
flushedSegments := s.meta.SelectSegments(func(seg *SegmentInfo) bool {
return isFlush(seg)
})
unindexedSegments := make([]*SegmentInfo, 0)
for _, segment := range flushedSegments {
if s.meta.indexMeta.IsUnIndexedSegment(segment.CollectionID, segment.GetID()) {
unindexedSegments = append(unindexedSegments, segment)
}
}
return unindexedSegments
}
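
getUnIndexTaskSegments replaces meta.GetHasUnindexTaskSegments, whose removal appears earlier in this commit. Because it reads segment meta and index meta under two separate locks, the view is no longer atomic the way it was under the single meta lock; a segment that flushes between the two reads is simply picked up on the next tick of createIndexForSegmentLoop below.
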
func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
log.Info("start create index for segment loop...")
defer s.serverLoopWg.Done()
@@ -103,7 +118,7 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
log.Warn("DataCoord context done, exit...")
return
case <-ticker.C:
segments := s.meta.GetHasUnindexTaskSegments()
segments := s.getUnIndexTaskSegments()
for _, segment := range segments {
if err := s.createIndexesForSegment(segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
@@ -171,7 +186,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()
if req.GetIndexName() == "" {
indexes := s.meta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
if len(indexes) == 0 {
fieldName, err := s.getFieldNameByID(ctx, req.GetCollectionID(), req.GetFieldID())
if err != nil {
@@ -184,7 +199,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
}
indexID, err := s.meta.CanCreateIndex(req)
indexID, err := s.meta.indexMeta.CanCreateIndex(req)
if err != nil {
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
@@ -219,8 +234,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
// Get flushed segments and create index
err = s.meta.CreateIndex(index)
err = s.meta.indexMeta.CreateIndex(index)
if err != nil {
log.Error("CreateIndex fail",
zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))
@@ -290,11 +304,12 @@ func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest)
return merr.Status(err), nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if req.GetIndexName() != "" && len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
return merr.Status(err), nil
}
for _, index := range indexes {
// update user index params
newUserIndexParams, err := UpdateParams(index, index.UserIndexParams, req.GetParams())
@@ -319,7 +334,7 @@ func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest)
index.IndexParams = newIndexParams
}
err := s.meta.AlterIndex(ctx, indexes...)
err := s.meta.indexMeta.AlterIndex(ctx, indexes...)
if err != nil {
log.Warn("failed to alter index", zap.Error(err))
return merr.Status(err), nil
@@ -344,7 +359,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
}, nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("GetIndexState fail", zap.Error(err))
@@ -366,7 +381,7 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
indexInfo := &indexpb.IndexInfo{}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
@@ -400,7 +415,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
Status: merr.Success(),
States: make([]*indexpb.SegmentIndexState, 0),
}
indexID2CreateTs := s.meta.GetIndexIDByName(req.GetCollectionID(), req.GetIndexName())
indexID2CreateTs := s.meta.indexMeta.GetIndexIDByName(req.GetCollectionID(), req.GetIndexName())
if len(indexID2CreateTs) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), zap.Error(err))
@@ -410,7 +425,7 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
}
for _, segID := range req.GetSegmentIDs() {
for indexID := range indexID2CreateTs {
state := s.meta.GetSegmentIndexState(req.GetCollectionID(), segID, indexID)
state := s.meta.indexMeta.GetSegmentIndexState(req.GetCollectionID(), segID, indexID)
ret.States = append(ret.States, state)
}
}
@@ -418,6 +433,32 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}
func (s *Server) selectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
ret := make(map[int64]*indexStats)
for _, info := range s.meta.SelectSegments(selector) {
is := &indexStats{
ID: info.GetID(),
numRows: info.GetNumOfRows(),
compactionFrom: info.GetCompactionFrom(),
indexStates: make(map[int64]*indexpb.SegmentIndexState),
state: info.GetState(),
lastExpireTime: info.GetLastExpireTime(),
}
indexIDToSegIdxes := s.meta.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID())
for indexID, segIndex := range indexIDToSegIdxes {
is.indexStates[indexID] = &indexpb.SegmentIndexState{
SegmentID: segIndex.SegmentID,
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
}
}
ret[info.GetID()] = is
}
return ret
}
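
selectSegmentIndexes is the server-side replacement for meta.SelectSegmentIndexes, removed in a later hunk of this commit: instead of snapshotting segments and their index states under one lock, the Server composes meta.SelectSegments with indexMeta.GetSegmentIndexes, taking each component's lock separately and returning plain indexStats values that require no lock to consume.
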
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 {
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
for segID, seg := range segments {
@@ -566,7 +607,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}, nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.Error(err))
@@ -592,7 +633,7 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
@@ -635,7 +676,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}, nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("DescribeIndex fail", zap.Error(err))
@@ -645,7 +686,7 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
@@ -692,7 +733,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}, nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
err := merr.WrapErrIndexNotFound(req.GetIndexName())
log.Warn("GetIndexStatistics fail",
@@ -704,7 +745,7 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
segments := s.selectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
@@ -751,7 +792,7 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
return merr.Status(err), nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
indexes := s.meta.indexMeta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
if len(indexes) == 0 {
log.Info(fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName))
return merr.Success(), nil
@@ -770,7 +811,7 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
// from being dropped at the same time when dropping_partition in version 2.1
if len(req.GetPartitionIDs()) == 0 {
// drop collection index
err := s.meta.MarkIndexAsDeleted(req.GetCollectionID(), indexIDs)
err := s.meta.indexMeta.MarkIndexAsDeleted(req.GetCollectionID(), indexIDs)
if err != nil {
log.Warn("DropIndex fail", zap.String("indexName", req.IndexName), zap.Error(err))
return merr.Status(err), nil
@@ -800,7 +841,7 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
}
for _, segID := range req.GetSegmentIDs() {
segIdxes := s.meta.GetSegmentIndexes(segID)
segIdxes := s.meta.indexMeta.GetSegmentIndexes(req.GetCollectionID(), segID)
ret.SegmentInfo[segID] = &indexpb.SegmentInfo{
CollectionID: req.GetCollectionID(),
SegmentID: segID,
@@ -813,15 +854,15 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
if segIdx.IndexState == commonpb.IndexState_Finished {
indexFilePaths := metautil.BuildSegmentIndexFilePaths(s.meta.chunkManager.RootPath(), segIdx.BuildID, segIdx.IndexVersion,
segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexFileKeys)
indexParams := s.meta.GetIndexParams(segIdx.CollectionID, segIdx.IndexID)
indexParams = append(indexParams, s.meta.GetTypeParams(segIdx.CollectionID, segIdx.IndexID)...)
indexParams := s.meta.indexMeta.GetIndexParams(segIdx.CollectionID, segIdx.IndexID)
indexParams = append(indexParams, s.meta.indexMeta.GetTypeParams(segIdx.CollectionID, segIdx.IndexID)...)
ret.SegmentInfo[segID].IndexInfos = append(ret.SegmentInfo[segID].IndexInfos,
&indexpb.IndexFilePathInfo{
SegmentID: segID,
FieldID: s.meta.GetFieldIDByIndexID(segIdx.CollectionID, segIdx.IndexID),
FieldID: s.meta.indexMeta.GetFieldIDByIndexID(segIdx.CollectionID, segIdx.IndexID),
IndexID: segIdx.IndexID,
BuildID: segIdx.BuildID,
IndexName: s.meta.GetIndexNameByID(segIdx.CollectionID, segIdx.IndexID),
IndexName: s.meta.indexMeta.GetIndexNameByID(segIdx.CollectionID, segIdx.IndexID),
IndexParams: indexParams,
IndexFilePaths: indexFilePaths,
SerializedSize: segIdx.IndexSize,

[File diff suppressed because it is too large]


@@ -34,9 +34,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
@@ -61,12 +59,7 @@ type meta struct {
channelCPs *typeutil.ConcurrentMap[string, *msgpb.MsgPosition] // vChannel -> channel checkpoint/see position
chunkManager storage.ChunkManager
// collectionIndexes records which indexes are on the collection
// collID -> indexID -> index
indexes map[UniqueID]map[UniqueID]*model.Index
// buildID2Meta records the meta information of the segment
// buildID -> segmentIndex
buildID2SegmentIndex map[UniqueID]*model.SegmentIndex
indexMeta *indexMeta
}
// A local cache of segment metric update. Must call commit() to take effect.
@@ -87,18 +80,22 @@ type collectionInfo struct {
// NewMeta creates meta from provided `kv.TxnKV`
func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManager storage.ChunkManager) (*meta, error) {
mt := &meta{
ctx: ctx,
catalog: catalog,
collections: make(map[UniqueID]*collectionInfo),
segments: NewSegmentsInfo(),
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
chunkManager: chunkManager,
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
indexMeta, err := newIndexMeta(ctx, catalog)
if err != nil {
return nil, err
}
err := mt.reloadFromKV()
mt := &meta{
ctx: ctx,
catalog: catalog,
collections: make(map[UniqueID]*collectionInfo),
segments: NewSegmentsInfo(),
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
indexMeta: indexMeta,
chunkManager: chunkManager,
}
err = mt.reloadFromKV()
if err != nil {
return nil, err
}
@ -152,25 +149,6 @@ func (m *meta) reloadFromKV() error {
pos.ChannelName = vChannel
m.channelCPs.Insert(vChannel, pos)
}
// load field indexes
fieldIndexes, err := m.catalog.ListIndexes(m.ctx)
if err != nil {
log.Error("DataCoord meta reloadFromKV load field indexes fail", zap.Error(err))
return err
}
for _, fieldIndex := range fieldIndexes {
m.updateCollectionIndex(fieldIndex)
}
segmentIndexes, err := m.catalog.ListSegmentIndexes(m.ctx)
if err != nil {
log.Error("DataCoord meta reloadFromKV load segment indexes fail", zap.Error(err))
return err
}
for _, segIdx := range segmentIndexes {
m.updateSegmentIndex(segIdx)
metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(segIdx.IndexFileKeys)))
}
log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
return nil
}
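The index loading removed from reloadFromKV is not dropped; it moves into newIndexMeta, which newMeta now calls first. A hedged sketch of the relocated path, folding the deleted lines above into the new type (updateCollectionIndex and updateSegmentIndex are the helpers named in the deleted code; their move onto indexMeta is an assumption):

// Sketch only: mirrors the deleted meta.reloadFromKV logic one-for-one.
func newIndexMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*indexMeta, error) {
	mt := &indexMeta{
		ctx:                  ctx,
		catalog:              catalog,
		indexes:              make(map[UniqueID]map[UniqueID]*model.Index),
		buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
		segmentIndexes:       make(map[UniqueID]map[UniqueID]*model.SegmentIndex),
	}
	if err := mt.reloadFromKV(); err != nil {
		return nil, err
	}
	return mt, nil
}

func (m *indexMeta) reloadFromKV() error {
	fieldIndexes, err := m.catalog.ListIndexes(m.ctx)
	if err != nil {
		log.Error("indexMeta reloadFromKV load field indexes fail", zap.Error(err))
		return err
	}
	for _, fieldIndex := range fieldIndexes {
		m.updateCollectionIndex(fieldIndex)
	}
	segmentIndexes, err := m.catalog.ListSegmentIndexes(m.ctx)
	if err != nil {
		log.Error("indexMeta reloadFromKV load segment indexes fail", zap.Error(err))
		return err
	}
	for _, segIdx := range segmentIndexes {
		m.updateSegmentIndex(segIdx)
		metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(segIdx.IndexFileKeys)))
	}
	return nil
}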
@ -1051,33 +1029,6 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
return ret
}
func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
m.RLock()
defer m.RUnlock()
ret := make(map[int64]*indexStats)
for _, info := range m.segments.segments {
if selector(info) {
s := &indexStats{
ID: info.GetID(),
numRows: info.GetNumOfRows(),
compactionFrom: info.GetCompactionFrom(),
indexStates: make(map[int64]*indexpb.SegmentIndexState),
state: info.GetState(),
lastExpireTime: info.GetLastExpireTime(),
}
for indexID, segIndex := range info.segmentIndexes {
s.indexStates[indexID] = &indexpb.SegmentIndexState{
SegmentID: segIndex.SegmentID,
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
}
}
ret[info.GetID()] = s
}
}
return ret
}
// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Debug("meta update: add allocation",

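SelectSegmentIndexes is removed from meta outright rather than rewired; with SegmentInfo no longer caching index state, an equivalent join would combine SelectSegments with indexMeta lookups. A hedged sketch (name and receiver are illustrative, and it would need the indexpb import that this file just dropped):

// Sketch: join segment selection with indexMeta's per-segment state.
func (m *meta) selectSegmentIndexStates(selector SegmentInfoSelector) map[int64]map[int64]*indexpb.SegmentIndexState {
	m.RLock()
	defer m.RUnlock()
	ret := make(map[int64]map[int64]*indexpb.SegmentIndexState)
	for _, info := range m.segments.segments {
		if !selector(info) {
			continue
		}
		states := make(map[int64]*indexpb.SegmentIndexState)
		// indexMeta takes its own RLock here; the two locks no longer contend.
		for indexID, segIdx := range m.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID()) {
			states[indexID] = &indexpb.SegmentIndexState{
				SegmentID:  segIdx.SegmentID,
				State:      segIdx.IndexState,
				FailReason: segIdx.FailReason,
			}
		}
		ret[info.GetID()] = states
	}
	return ret
}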
View File

@ -65,6 +65,8 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
suite.Run("ListSegments_fail", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return(nil, errors.New("mock"))
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil)
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil)
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
@ -75,29 +77,8 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
suite.Run("ListIndexes_fail", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{}, nil)
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, errors.New("mock"))
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
})
suite.Run("ListSegmentIndexes_fails", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{}, nil)
suite.catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{}, nil)
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil)
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, errors.New("mock"))
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil)
_, err := newMeta(ctx, suite.catalog, nil)
suite.Error(err)
@ -105,7 +86,8 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
suite.Run("ok", func() {
defer suite.resetMock()
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{}, nil)
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{}, nil)
suite.catalog.EXPECT().ListSegments(mock.Anything).Return([]*datapb.SegmentInfo{
{
ID: 1,
@ -121,25 +103,9 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
Timestamp: 1000,
},
}, nil)
suite.catalog.EXPECT().ListIndexes(mock.Anything).Return([]*model.Index{
{
CollectionID: 1,
IndexID: 1,
IndexName: "dix",
CreateTime: 1,
},
}, nil)
suite.catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return([]*model.SegmentIndex{
{
SegmentID: 1,
IndexID: 1,
},
}, nil)
meta, err := newMeta(ctx, suite.catalog, nil)
_, err := newMeta(ctx, suite.catalog, nil)
suite.NoError(err)
suite.NotNil(meta)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1)
})

View File

@ -51,7 +51,7 @@ type NMockAllocator_allocID_Call struct {
}
// allocID is a helper method to define mock.On call
// - _a0 context.Context
// - _a0 context.Context
func (_e *NMockAllocator_Expecter) allocID(_a0 interface{}) *NMockAllocator_allocID_Call {
return &NMockAllocator_allocID_Call{Call: _e.mock.On("allocID", _a0)}
}
@ -110,7 +110,7 @@ type NMockAllocator_allocN_Call struct {
}
// allocN is a helper method to define mock.On call
// - n int64
// - n int64
func (_e *NMockAllocator_Expecter) allocN(n interface{}) *NMockAllocator_allocN_Call {
return &NMockAllocator_allocN_Call{Call: _e.mock.On("allocN", n)}
}
@ -162,7 +162,7 @@ type NMockAllocator_allocTimestamp_Call struct {
}
// allocTimestamp is a helper method to define mock.On call
// - _a0 context.Context
// - _a0 context.Context
func (_e *NMockAllocator_Expecter) allocTimestamp(_a0 interface{}) *NMockAllocator_allocTimestamp_Call {
return &NMockAllocator_allocTimestamp_Call{Call: _e.mock.On("allocTimestamp", _a0)}
}

View File

@ -54,8 +54,8 @@ type MockCluster_AddImportSegment_Call struct {
}
// AddImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.AddImportSegmentRequest
// - ctx context.Context
// - req *datapb.AddImportSegmentRequest
func (_e *MockCluster_Expecter) AddImportSegment(ctx interface{}, req interface{}) *MockCluster_AddImportSegment_Call {
return &MockCluster_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment", ctx, req)}
}
@ -129,8 +129,8 @@ type MockCluster_DropImport_Call struct {
}
// DropImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.DropImportRequest
// - nodeID int64
// - in *datapb.DropImportRequest
func (_e *MockCluster_Expecter) DropImport(nodeID interface{}, in interface{}) *MockCluster_DropImport_Call {
return &MockCluster_DropImport_Call{Call: _e.mock.On("DropImport", nodeID, in)}
}
@ -172,10 +172,10 @@ type MockCluster_Flush_Call struct {
}
// Flush is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - channel string
// - segments []*datapb.SegmentInfo
// - ctx context.Context
// - nodeID int64
// - channel string
// - segments []*datapb.SegmentInfo
func (_e *MockCluster_Expecter) Flush(ctx interface{}, nodeID interface{}, channel interface{}, segments interface{}) *MockCluster_Flush_Call {
return &MockCluster_Flush_Call{Call: _e.mock.On("Flush", ctx, nodeID, channel, segments)}
}
@ -217,10 +217,10 @@ type MockCluster_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - flushTs uint64
// - channels []string
// - ctx context.Context
// - nodeID int64
// - flushTs uint64
// - channels []string
func (_e *MockCluster_Expecter) FlushChannels(ctx interface{}, nodeID interface{}, flushTs interface{}, channels interface{}) *MockCluster_FlushChannels_Call {
return &MockCluster_FlushChannels_Call{Call: _e.mock.On("FlushChannels", ctx, nodeID, flushTs, channels)}
}
@ -296,9 +296,9 @@ type MockCluster_Import_Call struct {
}
// Import is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - it *datapb.ImportTaskRequest
// - ctx context.Context
// - nodeID int64
// - it *datapb.ImportTaskRequest
func (_e *MockCluster_Expecter) Import(ctx interface{}, nodeID interface{}, it interface{}) *MockCluster_Import_Call {
return &MockCluster_Import_Call{Call: _e.mock.On("Import", ctx, nodeID, it)}
}
@ -340,8 +340,8 @@ type MockCluster_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.ImportRequest
// - nodeID int64
// - in *datapb.ImportRequest
func (_e *MockCluster_Expecter) ImportV2(nodeID interface{}, in interface{}) *MockCluster_ImportV2_Call {
return &MockCluster_ImportV2_Call{Call: _e.mock.On("ImportV2", nodeID, in)}
}
@ -383,8 +383,8 @@ type MockCluster_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.PreImportRequest
// - nodeID int64
// - in *datapb.PreImportRequest
func (_e *MockCluster_Expecter) PreImport(nodeID interface{}, in interface{}) *MockCluster_PreImport_Call {
return &MockCluster_PreImport_Call{Call: _e.mock.On("PreImport", nodeID, in)}
}
@ -438,8 +438,8 @@ type MockCluster_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.QueryImportRequest
// - nodeID int64
// - in *datapb.QueryImportRequest
func (_e *MockCluster_Expecter) QueryImport(nodeID interface{}, in interface{}) *MockCluster_QueryImport_Call {
return &MockCluster_QueryImport_Call{Call: _e.mock.On("QueryImport", nodeID, in)}
}
@ -493,8 +493,8 @@ type MockCluster_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.QueryPreImportRequest
// - nodeID int64
// - in *datapb.QueryPreImportRequest
func (_e *MockCluster_Expecter) QueryPreImport(nodeID interface{}, in interface{}) *MockCluster_QueryPreImport_Call {
return &MockCluster_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", nodeID, in)}
}
@ -536,7 +536,7 @@ type MockCluster_Register_Call struct {
}
// Register is a helper method to define mock.On call
// - node *NodeInfo
// - node *NodeInfo
func (_e *MockCluster_Expecter) Register(node interface{}) *MockCluster_Register_Call {
return &MockCluster_Register_Call{Call: _e.mock.On("Register", node)}
}
@ -578,8 +578,8 @@ type MockCluster_Startup_Call struct {
}
// Startup is a helper method to define mock.On call
// - ctx context.Context
// - nodes []*NodeInfo
// - ctx context.Context
// - nodes []*NodeInfo
func (_e *MockCluster_Expecter) Startup(ctx interface{}, nodes interface{}) *MockCluster_Startup_Call {
return &MockCluster_Startup_Call{Call: _e.mock.On("Startup", ctx, nodes)}
}
@ -621,7 +621,7 @@ type MockCluster_UnRegister_Call struct {
}
// UnRegister is a helper method to define mock.On call
// - node *NodeInfo
// - node *NodeInfo
func (_e *MockCluster_Expecter) UnRegister(node interface{}) *MockCluster_UnRegister_Call {
return &MockCluster_UnRegister_Call{Call: _e.mock.On("UnRegister", node)}
}
@ -663,9 +663,9 @@ type MockCluster_Watch_Call struct {
}
// Watch is a helper method to define mock.On call
// - ctx context.Context
// - ch string
// - collectionID int64
// - ctx context.Context
// - ch string
// - collectionID int64
func (_e *MockCluster_Expecter) Watch(ctx interface{}, ch interface{}, collectionID interface{}) *MockCluster_Watch_Call {
return &MockCluster_Watch_Call{Call: _e.mock.On("Watch", ctx, ch, collectionID)}
}

View File

@ -53,11 +53,11 @@ type MockManager_AllocImportSegment_Call struct {
}
// AllocImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - taskID int64
// - collectionID int64
// - partitionID int64
// - channelName string
// - ctx context.Context
// - taskID int64
// - collectionID int64
// - partitionID int64
// - channelName string
func (_e *MockManager_Expecter) AllocImportSegment(ctx interface{}, taskID interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}) *MockManager_AllocImportSegment_Call {
return &MockManager_AllocImportSegment_Call{Call: _e.mock.On("AllocImportSegment", ctx, taskID, collectionID, partitionID, channelName)}
}
@ -111,11 +111,11 @@ type MockManager_AllocSegment_Call struct {
}
// AllocSegment is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call {
return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)}
}
@ -148,8 +148,8 @@ type MockManager_DropSegment_Call struct {
}
// DropSegment is a helper method to define mock.On call
// - ctx context.Context
// - segmentID int64
// - ctx context.Context
// - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)}
}
@ -182,8 +182,8 @@ type MockManager_DropSegmentsOfChannel_Call struct {
}
// DropSegmentsOfChannel is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ctx context.Context
// - channel string
func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call {
return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)}
}
@ -225,8 +225,8 @@ type MockManager_ExpireAllocations_Call struct {
}
// ExpireAllocations is a helper method to define mock.On call
// - channel string
// - ts uint64
// - channel string
// - ts uint64
func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call {
return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)}
}
@ -268,9 +268,9 @@ type MockManager_FlushImportSegments_Call struct {
}
// FlushImportSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segmentIDs []int64
// - ctx context.Context
// - collectionID int64
// - segmentIDs []int64
func (_e *MockManager_Expecter) FlushImportSegments(ctx interface{}, collectionID interface{}, segmentIDs interface{}) *MockManager_FlushImportSegments_Call {
return &MockManager_FlushImportSegments_Call{Call: _e.mock.On("FlushImportSegments", ctx, collectionID, segmentIDs)}
}
@ -324,9 +324,9 @@ type MockManager_GetFlushableSegments_Call struct {
}
// GetFlushableSegments is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ts uint64
// - ctx context.Context
// - channel string
// - ts uint64
func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call {
return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)}
}
@ -380,9 +380,9 @@ type MockManager_SealAllSegments_Call struct {
}
// SealAllSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segIDs []int64
// - ctx context.Context
// - collectionID int64
// - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)}
}
@ -436,12 +436,12 @@ type MockManager_allocSegmentForImport_Call struct {
}
// allocSegmentForImport is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
// - taskID int64
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
// - taskID int64
func (_e *MockManager_Expecter) allocSegmentForImport(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}, taskID interface{}) *MockManager_allocSegmentForImport_Call {
return &MockManager_allocSegmentForImport_Call{Call: _e.mock.On("allocSegmentForImport", ctx, collectionID, partitionID, channelName, requestRows, taskID)}
}

View File

@ -56,9 +56,9 @@ type MockSessionManager_AddImportSegment_Call struct {
}
// AddImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - req *datapb.AddImportSegmentRequest
// - ctx context.Context
// - nodeID int64
// - req *datapb.AddImportSegmentRequest
func (_e *MockSessionManager_Expecter) AddImportSegment(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_AddImportSegment_Call {
return &MockSessionManager_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment", ctx, nodeID, req)}
}
@ -91,7 +91,7 @@ type MockSessionManager_AddSession_Call struct {
}
// AddSession is a helper method to define mock.On call
// - node *NodeInfo
// - node *NodeInfo
func (_e *MockSessionManager_Expecter) AddSession(node interface{}) *MockSessionManager_AddSession_Call {
return &MockSessionManager_AddSession_Call{Call: _e.mock.On("AddSession", node)}
}
@ -145,9 +145,9 @@ type MockSessionManager_CheckChannelOperationProgress_Call struct {
}
// CheckChannelOperationProgress is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - info *datapb.ChannelWatchInfo
// - ctx context.Context
// - nodeID int64
// - info *datapb.ChannelWatchInfo
func (_e *MockSessionManager_Expecter) CheckChannelOperationProgress(ctx interface{}, nodeID interface{}, info interface{}) *MockSessionManager_CheckChannelOperationProgress_Call {
return &MockSessionManager_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", ctx, nodeID, info)}
}
@ -189,7 +189,7 @@ type MockSessionManager_CheckHealth_Call struct {
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *MockSessionManager_Expecter) CheckHealth(ctx interface{}) *MockSessionManager_CheckHealth_Call {
return &MockSessionManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)}
}
@ -263,9 +263,9 @@ type MockSessionManager_Compaction_Call struct {
}
// Compaction is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - plan *datapb.CompactionPlan
// - ctx context.Context
// - nodeID int64
// - plan *datapb.CompactionPlan
func (_e *MockSessionManager_Expecter) Compaction(ctx interface{}, nodeID interface{}, plan interface{}) *MockSessionManager_Compaction_Call {
return &MockSessionManager_Compaction_Call{Call: _e.mock.On("Compaction", ctx, nodeID, plan)}
}
@ -298,7 +298,7 @@ type MockSessionManager_DeleteSession_Call struct {
}
// DeleteSession is a helper method to define mock.On call
// - node *NodeInfo
// - node *NodeInfo
func (_e *MockSessionManager_Expecter) DeleteSession(node interface{}) *MockSessionManager_DeleteSession_Call {
return &MockSessionManager_DeleteSession_Call{Call: _e.mock.On("DeleteSession", node)}
}
@ -340,8 +340,8 @@ type MockSessionManager_DropImport_Call struct {
}
// DropImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.DropImportRequest
// - nodeID int64
// - in *datapb.DropImportRequest
func (_e *MockSessionManager_Expecter) DropImport(nodeID interface{}, in interface{}) *MockSessionManager_DropImport_Call {
return &MockSessionManager_DropImport_Call{Call: _e.mock.On("DropImport", nodeID, in)}
}
@ -374,9 +374,9 @@ type MockSessionManager_Flush_Call struct {
}
// Flush is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - req *datapb.FlushSegmentsRequest
// - ctx context.Context
// - nodeID int64
// - req *datapb.FlushSegmentsRequest
func (_e *MockSessionManager_Expecter) Flush(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_Flush_Call {
return &MockSessionManager_Flush_Call{Call: _e.mock.On("Flush", ctx, nodeID, req)}
}
@ -418,9 +418,9 @@ type MockSessionManager_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - req *datapb.FlushChannelsRequest
// - ctx context.Context
// - nodeID int64
// - req *datapb.FlushChannelsRequest
func (_e *MockSessionManager_Expecter) FlushChannels(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_FlushChannels_Call {
return &MockSessionManager_FlushChannels_Call{Call: _e.mock.On("FlushChannels", ctx, nodeID, req)}
}
@ -582,9 +582,9 @@ type MockSessionManager_Import_Call struct {
}
// Import is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - itr *datapb.ImportTaskRequest
// - ctx context.Context
// - nodeID int64
// - itr *datapb.ImportTaskRequest
func (_e *MockSessionManager_Expecter) Import(ctx interface{}, nodeID interface{}, itr interface{}) *MockSessionManager_Import_Call {
return &MockSessionManager_Import_Call{Call: _e.mock.On("Import", ctx, nodeID, itr)}
}
@ -626,8 +626,8 @@ type MockSessionManager_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.ImportRequest
// - nodeID int64
// - in *datapb.ImportRequest
func (_e *MockSessionManager_Expecter) ImportV2(nodeID interface{}, in interface{}) *MockSessionManager_ImportV2_Call {
return &MockSessionManager_ImportV2_Call{Call: _e.mock.On("ImportV2", nodeID, in)}
}
@ -669,9 +669,9 @@ type MockSessionManager_NotifyChannelOperation_Call struct {
}
// NotifyChannelOperation is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - req *datapb.ChannelOperationsRequest
// - ctx context.Context
// - nodeID int64
// - req *datapb.ChannelOperationsRequest
func (_e *MockSessionManager_Expecter) NotifyChannelOperation(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_NotifyChannelOperation_Call {
return &MockSessionManager_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", ctx, nodeID, req)}
}
@ -713,8 +713,8 @@ type MockSessionManager_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.PreImportRequest
// - nodeID int64
// - in *datapb.PreImportRequest
func (_e *MockSessionManager_Expecter) PreImport(nodeID interface{}, in interface{}) *MockSessionManager_PreImport_Call {
return &MockSessionManager_PreImport_Call{Call: _e.mock.On("PreImport", nodeID, in)}
}
@ -768,8 +768,8 @@ type MockSessionManager_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.QueryImportRequest
// - nodeID int64
// - in *datapb.QueryImportRequest
func (_e *MockSessionManager_Expecter) QueryImport(nodeID interface{}, in interface{}) *MockSessionManager_QueryImport_Call {
return &MockSessionManager_QueryImport_Call{Call: _e.mock.On("QueryImport", nodeID, in)}
}
@ -823,8 +823,8 @@ type MockSessionManager_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - nodeID int64
// - in *datapb.QueryPreImportRequest
// - nodeID int64
// - in *datapb.QueryPreImportRequest
func (_e *MockSessionManager_Expecter) QueryPreImport(nodeID interface{}, in interface{}) *MockSessionManager_QueryPreImport_Call {
return &MockSessionManager_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", nodeID, in)}
}
@ -866,8 +866,8 @@ type MockSessionManager_SyncSegments_Call struct {
}
// SyncSegments is a helper method to define mock.On call
// - nodeID int64
// - req *datapb.SyncSegmentsRequest
// - nodeID int64
// - req *datapb.SyncSegmentsRequest
func (_e *MockSessionManager_Expecter) SyncSegments(nodeID interface{}, req interface{}) *MockSessionManager_SyncSegments_Call {
return &MockSessionManager_SyncSegments_Call{Call: _e.mock.On("SyncSegments", nodeID, req)}
}

View File

@ -21,13 +21,10 @@ import (
"github.com/golang/protobuf/proto"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
@ -38,11 +35,10 @@ type SegmentsInfo struct {
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
type SegmentInfo struct {
*datapb.SegmentInfo
segmentIndexes map[UniqueID]*model.SegmentIndex
currRows int64
allocations []*Allocation
lastFlushTime time.Time
isCompacting bool
currRows int64
allocations []*Allocation
lastFlushTime time.Time
isCompacting bool
// a cache to avoid calculate twice
size atomic.Int64
lastWrittenTime time.Time
@ -54,11 +50,10 @@ type SegmentInfo struct {
// the worst case scenario is to have a segment with twice size we expects
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
return &SegmentInfo{
SegmentInfo: info,
segmentIndexes: make(map[UniqueID]*model.SegmentIndex),
currRows: info.GetNumOfRows(),
allocations: make([]*Allocation, 0, 16),
lastFlushTime: time.Now().Add(-1 * flushInterval),
SegmentInfo: info,
currRows: info.GetNumOfRows(),
allocations: make([]*Allocation, 0, 16),
lastFlushTime: time.Now().Add(-1 * flushInterval),
// A growing segment from recovery can also be considered idle.
lastWrittenTime: getZeroTime(),
}
@ -104,30 +99,6 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
s.segments[segmentID] = segment
}
// SetSegmentIndex sets SegmentIndex with segmentID, perform overwrite if already exists
func (s *SegmentsInfo) SetSegmentIndex(segmentID UniqueID, segIndex *model.SegmentIndex) {
segment, ok := s.segments[segmentID]
if !ok {
log.Warn("segment missing for set segment index",
zap.Int64("segmentID", segmentID),
zap.Int64("indexID", segIndex.IndexID),
)
return
}
segment = segment.Clone()
if segment.segmentIndexes == nil {
segment.segmentIndexes = make(map[UniqueID]*model.SegmentIndex)
}
segment.segmentIndexes[segIndex.IndexID] = segIndex
s.segments[segmentID] = segment
}
func (s *SegmentsInfo) DropSegmentIndex(segmentID UniqueID, indexID UniqueID) {
if _, ok := s.segments[segmentID]; ok {
delete(s.segments[segmentID].segmentIndexes, indexID)
}
}
// SetRowCount sets rowCount info for SegmentInfo with provided segmentID
// if SegmentInfo not found, do nothing
func (s *SegmentsInfo) SetRowCount(segmentID UniqueID, rowCount int64) {
@ -213,17 +184,12 @@ func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
// Clone deep clone the segment info and return a new instance
func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
segmentIndexes := make(map[UniqueID]*model.SegmentIndex, len(s.segmentIndexes))
for indexID, segIdx := range s.segmentIndexes {
segmentIndexes[indexID] = model.CloneSegmentIndex(segIdx)
}
cloned := &SegmentInfo{
SegmentInfo: info,
segmentIndexes: segmentIndexes,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
SegmentInfo: info,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
// cannot copy size, since binlog may be changed
lastWrittenTime: s.lastWrittenTime,
}
@ -235,13 +201,8 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
// ShadowClone shadow clone the segment and return a new instance
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
segmentIndexes := make(map[UniqueID]*model.SegmentIndex, len(s.segmentIndexes))
for indexID, segIdx := range s.segmentIndexes {
segmentIndexes[indexID] = model.CloneSegmentIndex(segIdx)
}
cloned := &SegmentInfo{
SegmentInfo: s.SegmentInfo,
segmentIndexes: segmentIndexes,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,

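With segmentIndexes gone from SegmentInfo, Clone and ShadowClone stop copying per-index state alongside every segment, and index state has a single owner. A hedged before/after of the caller-side change (the "after" form assumes access to the surrounding meta, mirroring the GetSegmentIndexes(collectionID, segID) shape used earlier in this diff):

// before (removed here): index state cached on the segment and deep-cloned with it
//   for indexID, segIdx := range info.segmentIndexes { ... }
// after (assumed call shape): ask indexMeta, the single owner of that state
for indexID, segIdx := range m.indexMeta.GetSegmentIndexes(info.GetCollectionID(), info.GetID()) {
	_, _ = indexID, segIdx // read under indexMeta's independent lock
}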
View File

@ -26,7 +26,7 @@ import (
"syscall"
"time"
semver "github.com/blang/semver/v4"
"github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tikv/client-go/v2/txnkv"

View File

@ -1452,7 +1452,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
},
})
err := svr.meta.CreateIndex(&model.Index{
err := svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1476,13 +1476,13 @@ func TestGetQueryVChanPositions(t *testing.T) {
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s1))
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: 1,
BuildID: 1,
IndexID: 1,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: 1,
State: commonpb.IndexState_Finished,
})
@ -1619,7 +1619,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
err := svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1689,7 +1689,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
err := svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1775,7 +1775,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
err := svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1813,13 +1813,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(d))
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: 2,
BuildID: 1,
IndexID: 1,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: 1,
State: commonpb.IndexState_Finished,
})
@ -1841,13 +1841,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(e))
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: 3,
BuildID: 2,
IndexID: 1,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: 2,
State: commonpb.IndexState_Finished,
})
@ -1976,7 +1976,7 @@ func TestGetRecoveryInfo(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -2025,22 +2025,22 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: seg1.ID,
BuildID: seg1.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: seg1.ID,
State: commonpb.IndexState_Finished,
})
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: seg2.ID,
BuildID: seg2.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: seg2.ID,
State: commonpb.IndexState_Finished,
})
@ -2195,7 +2195,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -2203,12 +2203,12 @@ func TestGetRecoveryInfo(t *testing.T) {
IndexName: "",
})
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: segment.ID,
BuildID: segment.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: segment.ID,
State: commonpb.IndexState_Finished,
})
@ -2361,7 +2361,7 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -2375,7 +2375,7 @@ func TestGetRecoveryInfo(t *testing.T) {
UserIndexParams: nil,
})
assert.NoError(t, err)
svr.meta.segments.SetSegmentIndex(seg4.ID, &model.SegmentIndex{
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,
CollectionID: 0,
PartitionID: 0,

View File

@ -894,7 +894,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
})
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -943,22 +943,22 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: seg1.ID,
BuildID: seg1.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: seg1.ID,
State: commonpb.IndexState_Finished,
})
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: seg2.ID,
BuildID: seg2.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: seg2.ID,
State: commonpb.IndexState_Finished,
})
@ -1118,7 +1118,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segment))
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1126,12 +1126,12 @@ func TestGetRecoveryInfoV2(t *testing.T) {
IndexName: "",
})
assert.NoError(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
err = svr.meta.indexMeta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: segment.ID,
BuildID: segment.ID,
})
assert.NoError(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
err = svr.meta.indexMeta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: segment.ID,
State: commonpb.IndexState_Finished,
})
@ -1284,7 +1284,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
assert.NoError(t, err)
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
err = svr.meta.CreateIndex(&model.Index{
err = svr.meta.indexMeta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
@ -1298,7 +1298,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
UserIndexParams: nil,
})
assert.NoError(t, err)
svr.meta.segments.SetSegmentIndex(seg4.ID, &model.SegmentIndex{
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,
CollectionID: 0,
PartitionID: 0,

View File

@ -102,7 +102,7 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
hasUnindexedVecField := false
for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] {
segmentIndexState := mt.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
segmentIndexState := mt.indexMeta.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
if segmentIndexState.State != commonpb.IndexState_Finished {
hasUnindexedVecField = true
}

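GetSegmentIndexStateOnField moves onto indexMeta as well. A sketch of a plausible implementation against the struct assumed earlier; the field-to-index resolution and the IndexStateNone fallback are assumptions, not taken from the patch:

// Sketch: resolve the field's live index, then report the segment's build state.
func (m *indexMeta) GetSegmentIndexStateOnField(collID, segID, fieldID UniqueID) *indexpb.SegmentIndexState {
	m.RLock()
	defer m.RUnlock()
	state := &indexpb.SegmentIndexState{SegmentID: segID, State: commonpb.IndexState_IndexStateNone}
	for indexID, idx := range m.indexes[collID] {
		if idx.FieldID != fieldID || idx.IsDeleted {
			continue
		}
		if segIdx, ok := m.segmentIndexes[segID][indexID]; ok {
			state.State = segIdx.IndexState
			state.FailReason = segIdx.FailReason
		}
	}
	return state
}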
View File

@ -82,11 +82,33 @@ func (s *DataNodeSuite) compactAndReboot(collection string) {
CollectionName: collection,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(s.dim, "FLAT", metric.IP),
ExtraParams: integration.ConstructIndexParam(s.dim, integration.IndexHNSW, metric.IP),
})
s.Require().NoError(err)
s.Require().True(merr.Ok(createIndexStatus))
for stay, timeout := true, time.After(time.Second*10); stay; {
select {
case <-timeout:
stay = false
default:
describeIndexResp, err := s.Cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
CollectionName: collection,
FieldName: integration.FloatVecField,
IndexName: "_default",
})
s.Require().NoError(err)
for _, d := range describeIndexResp.GetIndexDescriptions() {
if d.GetFieldName() == integration.FloatVecField && d.GetState() == commonpb.IndexState_Finished {
log.Info("build index finished", zap.Any("index_desc", d))
stay = false
}
}
time.Sleep(1 * time.Second)
}
}
coll, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
CollectionName: collection,
})
@ -112,7 +134,7 @@ func (s *DataNodeSuite) compactAndReboot(collection string) {
s.Require().True(merr.Ok(stateResp.GetStatus()))
// sleep to ensure compaction tasks are submitted to DN
time.Sleep(time.Second)
time.Sleep(3 * time.Second)
planResp, err := s.Cluster.Proxy.GetCompactionStateWithPlans(ctx, &milvuspb.GetCompactionPlansRequest{
CompactionID: compactID,