From a81c9cc11c35095b6dd4fead9fe30b84575bc789 Mon Sep 17 00:00:00 2001
From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com>
Date: Tue, 13 Dec 2022 15:39:21 +0800
Subject: [PATCH] Recalculate segment max size during compaction (#21077)

issue: #21077

/kind improvement

Signed-off-by: Yuchen Gao
---
 internal/datacoord/compaction_trigger.go      |  89 +++---
 internal/datacoord/compaction_trigger_test.go | 254 +++++++++++-------
 2 files changed, 219 insertions(+), 124 deletions(-)

diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 23fb765c8b..eb83791925 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -66,18 +66,22 @@ type compactionSignal struct {
 var _ trigger = (*compactionTrigger)(nil)
 
 type compactionTrigger struct {
-    handler           Handler
-    meta              *meta
-    allocator         allocator
-    signals           chan *compactionSignal
-    compactionHandler compactionPlanContext
-    globalTrigger     *time.Ticker
-    forceMu           sync.Mutex
-    quit              chan struct{}
-    wg                sync.WaitGroup
-    segRefer          *SegmentReferenceManager
-    indexCoord        types.IndexCoord
-    estimateDiskSegmentPolicy calUpperLimitPolicy
+    handler                      Handler
+    meta                         *meta
+    allocator                    allocator
+    signals                      chan *compactionSignal
+    compactionHandler            compactionPlanContext
+    globalTrigger                *time.Ticker
+    forceMu                      sync.Mutex
+    quit                         chan struct{}
+    wg                           sync.WaitGroup
+    segRefer                     *SegmentReferenceManager
+    indexCoord                   types.IndexCoord
+    estimateNonDiskSegmentPolicy calUpperLimitPolicy
+    estimateDiskSegmentPolicy    calUpperLimitPolicy
+    // A sloppy hack so we can test with different segment row counts without
+    // worrying that they get recalculated on every compaction.
+    testingOnly bool
 }
 
 func newCompactionTrigger(
@@ -89,14 +93,15 @@
     handler Handler,
 ) *compactionTrigger {
     return &compactionTrigger{
-        meta:              meta,
-        allocator:         allocator,
-        signals:           make(chan *compactionSignal, 100),
-        compactionHandler: compactionHandler,
-        segRefer:          segRefer,
-        indexCoord:        indexCoord,
-        estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
-        handler:           handler,
+        meta:                         meta,
+        allocator:                    allocator,
+        signals:                      make(chan *compactionSignal, 100),
+        compactionHandler:            compactionHandler,
+        segRefer:                     segRefer,
+        indexCoord:                   indexCoord,
+        estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+        estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+        handler:                      handler,
     }
 }
 
@@ -260,17 +265,20 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
     return t.allocator.allocID(ctx)
 }
 
-func (t *compactionTrigger) estimateDiskSegmentMaxNumOfRows(collectionID UniqueID) (int, error) {
+func (t *compactionTrigger) reCalcSegmentMaxNumOfRows(collectionID UniqueID, isDisk bool) (int, error) {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
     defer cancel()
     collMeta, err := t.handler.GetCollection(ctx, collectionID)
     if err != nil {
         return -1, fmt.Errorf("failed to get collection %d", collectionID)
     }
-
-    return t.estimateDiskSegmentPolicy(collMeta.Schema)
+    if isDisk {
+        return t.estimateDiskSegmentPolicy(collMeta.Schema)
+    }
+    return t.estimateNonDiskSegmentPolicy(collMeta.Schema)
 }
 
+// TODO: Updated segment info should be written back to etcd.
 func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
     ctx := context.Background()
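For readers outside the datacoord package: calBySchemaPolicy and calBySchemaPolicyWithDiskIndex, the two calUpperLimitPolicy values wired in above, are not defined in this patch. A minimal sketch of what such a policy might compute, assuming a typeutil.EstimateSizePerRecord helper and config fields named SegmentMaxSize/DiskSegmentMaxSize (both assumptions for illustration, not confirmed by this diff):

// Sketch only. A calUpperLimitPolicy derives a segment's max row count from
// the collection schema: configured max segment size divided by the
// estimated per-row size. The vector field's "dim" type param feeds that
// estimate, which is presumably why the test schemas below gain dim=128.
type calUpperLimitPolicy func(schema *schemapb.CollectionSchema) (int, error)

func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) {
    sizePerRecord, err := typeutil.EstimateSizePerRecord(schema) // e.g. 4*dim bytes for a float vector
    if err != nil {
        return -1, err
    }
    threshold := Params.DataCoordCfg.SegmentMaxSize * 1024 * 1024 // MB -> bytes; field name assumed
    return int(threshold) / sizePerRecord, nil
}

func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, error) {
    sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
    if err != nil {
        return -1, err
    }
    // DiskANN-indexed segments are allowed a larger footprint; field name assumed.
    threshold := Params.DataCoordCfg.DiskSegmentMaxSize * 1024 * 1024
    return int(threshold) / sizePerRecord, nil
}

Under that reading, reCalcSegmentMaxNumOfRows simply re-derives the row limit from the current schema and config on every trigger, instead of trusting whatever MaxRowNum was stamped on the segment at creation time.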
@@ -287,23 +295,44 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
         return false, err
     }
 
-    isDiskIndex := false
+    isDiskANN := false
     for _, indexInfo := range resp.IndexInfos {
         indexParamsMap := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
         if indexType, ok := indexParamsMap["index_type"]; ok {
             if indexType == indexparamcheck.IndexDISKANN {
-                diskSegmentMaxRows, err := t.estimateDiskSegmentMaxNumOfRows(collectionID)
+                // If index type is DiskANN, recalc segment max size here.
+                isDiskANN = true
+                newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
                 if err != nil {
                     return false, err
                 }
-                for _, segment := range segments {
-                    segment.MaxRowNum = int64(diskSegmentMaxRows)
+                if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
+                    log.Info("segment max rows recalculated for DiskANN collection",
+                        zap.Int64("old max rows", segments[0].GetMaxRowNum()),
+                        zap.Int64("new max rows", int64(newMaxRows)))
+                    for _, segment := range segments {
+                        segment.MaxRowNum = int64(newMaxRows)
+                    }
                 }
-                isDiskIndex = true
             }
         }
     }
-    return isDiskIndex, nil
+    // If index type is not DiskANN, recalc segment max size using default policy.
+    if !isDiskANN && !t.testingOnly {
+        newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false)
+        if err != nil {
+            return isDiskANN, err
+        }
+        if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
+            log.Info("segment max rows recalculated for non-DiskANN collection",
+                zap.Int64("old max rows", segments[0].GetMaxRowNum()),
+                zap.Int64("new max rows", int64(newMaxRows)))
+            for _, segment := range segments {
+                segment.MaxRowNum = int64(newMaxRows)
+            }
+        }
+    }
+    return isDiskANN, nil
 }
 
 func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
@@ -340,7 +369,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
 
     isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
     if err != nil {
-        log.Warn("failed to update segment max size,", zap.Error(err))
+        log.Warn("failed to update segment max size", zap.Error(err))
         continue
     }
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 51caf3af12..e752a043f2 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -181,6 +181,12 @@ func Test_compactionTrigger_force(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -277,6 +283,12 @@ func Test_compactionTrigger_force(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -347,14 +359,17 @@ func Test_compactionTrigger_force(t *testing.T) {
             indexCoord := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCoord,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCoord,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             _, err := tr.forceTriggerCompaction(tt.collectionID)
             assert.Equal(t, tt.wantErr, err != nil)
@@ -373,15 +388,17 @@ func Test_compactionTrigger_force(t *testing.T) {
                 segment.CollectionID = 1000
             }
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tt.collectionID = 1000
             _, err := tr.forceTriggerCompaction(tt.collectionID)
@@ -401,15 +418,17 @@ func Test_compactionTrigger_force(t *testing.T) {
                 segment.CollectionID = 2000
             }
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tt.collectionID = 2000
             _, err := tr.forceTriggerCompaction(tt.collectionID)
@@ -434,15 +453,17 @@ func Test_compactionTrigger_force(t *testing.T) {
                 segment.CollectionID = 3000
             }
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tt.collectionID = 3000
             _, err := tr.forceTriggerCompaction(tt.collectionID)
@@ -467,15 +488,17 @@ func Test_compactionTrigger_force(t *testing.T) {
                 segment.CollectionID = 4000
             }
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tt.collectionID = 4000
             _, err := tr.forceTriggerCompaction(tt.collectionID)
@@ -508,6 +531,7 @@ func Test_compactionTrigger_force(t *testing.T) {
                 globalTrigger:     tt.fields.globalTrigger,
                 segRefer:          tt.fields.segRefer,
                 indexCoord:        indexCood,
+                testingOnly:       true,
             }
             tt.collectionID = 10000
             _, err := tr.forceTriggerCompaction(tt.collectionID)
@@ -527,15 +551,17 @@ func Test_compactionTrigger_force(t *testing.T) {
         t.Run(tt.name+" with allocate ts error", func(t *testing.T) {
             indexCood := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         &FailsAllocator{allocIDSucceed: true},
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    &FailsAllocator{allocIDSucceed: true},
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
 
             {
@@ -587,15 +613,17 @@ func Test_compactionTrigger_force(t *testing.T) {
                 segment.CollectionID = 1111
             }
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          tt.fields.segRefer,
-                indexCoord:        indexCood,
-                estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     tt.fields.segRefer,
+                indexCoord:                   indexCood,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
 
             {
@@ -711,6 +739,12 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -782,14 +816,17 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
             indexCoord := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
-                indexCoord:        indexCoord,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
+                indexCoord:                   indexCoord,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             _, err := tr.forceTriggerCompaction(tt.args.collectionID)
             assert.Equal(t, tt.wantErr, err != nil)
@@ -898,6 +935,12 @@ func Test_compactionTrigger_noplan(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -922,14 +965,17 @@ func Test_compactionTrigger_noplan(t *testing.T) {
             indexCoord := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
-                indexCoord:        indexCoord,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
+                indexCoord:                   indexCoord,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tr.start()
             defer tr.stop()
@@ -1045,6 +1091,12 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -1077,6 +1129,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
                 globalTrigger:     tt.fields.globalTrigger,
                 segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
                 indexCoord:        indexCoord,
+                testingOnly:       true,
             }
             tr.start()
             defer tr.stop()
@@ -1207,14 +1260,17 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
             indexCoord := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
-                indexCoord:        indexCoord,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
+                indexCoord:                   indexCoord,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tr.start()
             defer tr.stop()
@@ -1342,14 +1398,17 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
             indexCoord := newMockIndexCoord()
             tr := &compactionTrigger{
-                meta:              tt.fields.meta,
-                handler:           newMockHandlerWithMeta(tt.fields.meta),
-                allocator:         tt.fields.allocator,
-                signals:           tt.fields.signals,
-                compactionHandler: tt.fields.compactionHandler,
-                globalTrigger:     tt.fields.globalTrigger,
-                segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
-                indexCoord:        indexCoord,
+                meta:                         tt.fields.meta,
+                handler:                      newMockHandlerWithMeta(tt.fields.meta),
+                allocator:                    tt.fields.allocator,
+                signals:                      tt.fields.signals,
+                compactionHandler:            tt.fields.compactionHandler,
+                globalTrigger:                tt.fields.globalTrigger,
+                segRefer:                     &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
+                indexCoord:                   indexCoord,
+                estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+                estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+                testingOnly:                  true,
             }
             tr.start()
             defer tr.stop()
@@ -1443,6 +1502,12 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
                     {
                         FieldID:  vecFieldID,
                         DataType: schemapb.DataType_FloatVector,
+                        TypeParams: []*commonpb.KeyValuePair{
+                            {
+                                Key:   "dim",
+                                Value: "128",
+                            },
+                        },
                     },
                 },
             },
@@ -1475,6 +1540,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
                 globalTrigger:     tt.fields.globalTrigger,
                 segRefer:          &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
                 indexCoord:        indexCoord,
+                testingOnly:       true,
             }
             tr.start()
             defer tr.stop()
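The DiskANN and non-DiskANN branches in updateSegmentMaxSize share the same shape: pick a policy, recompute the estimate, and rewrite MaxRowNum only when it actually changed. A hypothetical helper that collapses the shared logic, purely to make the flow explicit (the name recalcMaxRows and the free-function form are illustrative, not part of this patch):

// Hypothetical helper mirroring the branch logic added above; not in the patch.
func recalcMaxRows(segments []*SegmentInfo, hasDiskANN bool,
    diskPolicy, nonDiskPolicy calUpperLimitPolicy,
    schema *schemapb.CollectionSchema) error {
    policy := nonDiskPolicy
    if hasDiskANN {
        policy = diskPolicy
    }
    newMaxRows, err := policy(schema)
    if err != nil {
        return err
    }
    // Only touch segment meta when the estimate actually changed.
    if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
        for _, segment := range segments {
            segment.MaxRowNum = int64(newMaxRows) // in-memory only; etcd write-back is still a TODO
        }
    }
    return nil
}

Note the asymmetry the patch deliberately keeps: the non-DiskANN recalculation is skipped when testingOnly is set, so the tests above can pin arbitrary MaxRowNum values on segments without the trigger silently rewriting them, while the DiskANN path still runs so the disk-index tests exercise the recalculation.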