enhance: Add PreallocatedSegmentIDs for the compaction task (#36734)

Add a `PreAllocatedSegmentIDs` field to the compaction task so that
`ResultSegments` can represent the final segments actually produced by
the compaction, rather than doubling as a pre-allocated ID range.
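
For context, here is the before/after shape of a trigger-built task under this change. The field names come from the diff below; the surrounding literal is a simplified sketch, not code from the PR:

```go
// Before (≤v2.4.12): the pre-allocated ID range was smuggled through
// ResultSegments, so the field never held actual results.
task := &datapb.CompactionTask{
	InputSegments:  inputSegmentIDs,
	ResultSegments: []int64{startID + 1, endID}, // really a [begin, end] range
}

// After: the range gets a dedicated field, and ResultSegments is left empty
// until the compaction reports the segments it actually produced.
task = &datapb.CompactionTask{
	InputSegments:  inputSegmentIDs,
	ResultSegments: []int64{},
	PreAllocatedSegmentIDs: &datapb.IDRange{
		Begin: startID + 1,
		End:   endID,
	},
}
```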

issue: https://github.com/milvus-io/milvus/issues/36733

also related: https://github.com/milvus-io/milvus/issues/36686

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-10-13 17:59:21 +08:00 committed by GitHub
parent 383350c120
commit d230b91bd1
8 changed files with 64 additions and 45 deletions

@@ -167,25 +167,22 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
 		return nil, err
 	}
 	plan := &datapb.CompactionPlan{
-		PlanID:             t.GetPlanID(),
-		StartTime:          t.GetStartTime(),
-		TimeoutInSeconds:   t.GetTimeoutInSeconds(),
-		Type:               t.GetType(),
-		Channel:            t.GetChannel(),
-		CollectionTtl:      t.GetCollectionTtl(),
-		TotalRows:          t.GetTotalRows(),
-		Schema:             t.GetSchema(),
-		ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(),
-		MaxSegmentRows:     t.GetMaxSegmentRows(),
-		PreferSegmentRows:  t.GetPreferSegmentRows(),
-		AnalyzeResultPath:  path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
-		AnalyzeSegmentIds:  t.GetInputSegments(),
-		BeginLogID:         beginLogID,
-		PreAllocatedSegmentIDs: &datapb.IDRange{
-			Begin: t.GetResultSegments()[0],
-			End:   t.GetResultSegments()[1],
-		},
-		SlotUsage: t.GetSlotUsage(),
+		PlanID:                 t.GetPlanID(),
+		StartTime:              t.GetStartTime(),
+		TimeoutInSeconds:       t.GetTimeoutInSeconds(),
+		Type:                   t.GetType(),
+		Channel:                t.GetChannel(),
+		CollectionTtl:          t.GetCollectionTtl(),
+		TotalRows:              t.GetTotalRows(),
+		Schema:                 t.GetSchema(),
+		ClusteringKeyField:     t.GetClusteringKeyField().GetFieldID(),
+		MaxSegmentRows:         t.GetMaxSegmentRows(),
+		PreferSegmentRows:      t.GetPreferSegmentRows(),
+		AnalyzeResultPath:      path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
+		AnalyzeSegmentIds:      t.GetInputSegments(),
+		BeginLogID:             beginLogID,
+		PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
+		SlotUsage:              t.GetSlotUsage(),
 	}
 	log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

@@ -57,6 +57,13 @@ func (csm *compactionTaskMeta) reloadFromKV() error {
 		return err
 	}
 	for _, task := range compactionTasks {
+		// To maintain compatibility with versions ≤v2.4.12, which used `ResultSegments` as the pre-allocated segment IDs.
+		if task.PreAllocatedSegmentIDs == nil && len(task.GetResultSegments()) == 2 {
+			task.PreAllocatedSegmentIDs = &datapb.IDRange{
+				Begin: task.GetResultSegments()[0],
+				End:   task.GetResultSegments()[1],
+			}
+		}
 		csm.saveCompactionTaskMemory(task)
 	}
 	log.Info("DataCoord compactionTaskMeta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))

@@ -318,21 +318,18 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
 		return nil, err
 	}
 	plan := &datapb.CompactionPlan{
-		PlanID:           t.GetPlanID(),
-		StartTime:        t.GetStartTime(),
-		TimeoutInSeconds: t.GetTimeoutInSeconds(),
-		Type:             t.GetType(),
-		Channel:          t.GetChannel(),
-		CollectionTtl:    t.GetCollectionTtl(),
-		TotalRows:        t.GetTotalRows(),
-		Schema:           t.GetSchema(),
-		BeginLogID:       beginLogID,
-		PreAllocatedSegmentIDs: &datapb.IDRange{
-			Begin: t.GetResultSegments()[0],
-			End:   t.GetResultSegments()[1],
-		},
-		SlotUsage: t.GetSlotUsage(),
-		MaxSize:   t.GetMaxSize(),
+		PlanID:                 t.GetPlanID(),
+		StartTime:              t.GetStartTime(),
+		TimeoutInSeconds:       t.GetTimeoutInSeconds(),
+		Type:                   t.GetType(),
+		Channel:                t.GetChannel(),
+		CollectionTtl:          t.GetCollectionTtl(),
+		TotalRows:              t.GetTotalRows(),
+		Schema:                 t.GetSchema(),
+		BeginLogID:             beginLogID,
+		PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
+		SlotUsage:              t.GetSlotUsage(),
+		MaxSize:                t.GetMaxSize(),
 	}
 	log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
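
With the plan itself carrying the range instead of rebuilding it from `ResultSegments`, the executor side can mint output-segment IDs locally. The sketch below is not code from this PR: it assumes a half-open [Begin, End) reading of the range, which the diff does not spell out, and every name except `datapb.IDRange` and its getters is hypothetical:

```go
// segmentIDSource hands out output-segment IDs from a plan's pre-allocated
// range, avoiding per-segment round-trips to the DataCoord allocator.
type segmentIDSource struct {
	next int64 // next ID to hand out
	end  int64 // end of the range, treated as exclusive in this sketch
}

func newSegmentIDSource(r *datapb.IDRange) *segmentIDSource {
	return &segmentIDSource{next: r.GetBegin(), end: r.GetEnd()}
}

// nextID returns the next unused ID, or an error once the range is exhausted.
func (s *segmentIDSource) nextID() (int64, error) {
	if s.next >= s.end {
		return 0, fmt.Errorf("pre-allocated segment ID range exhausted at %d", s.end)
	}
	id := s.next
	s.next++
	return id, nil
}
```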

@@ -385,10 +385,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		PartitionID:    group.partitionID,
 		Channel:        group.channelName,
 		InputSegments:  inputSegmentIDs,
-		ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment
+		ResultSegments: []int64{},
 		TotalRows:      totalRows,
 		Schema:         coll.Schema,
 		MaxSize:        getExpandedSize(expectedSize),
+		PreAllocatedSegmentIDs: &datapb.IDRange{
+			Begin: startID + 1,
+			End:   endID,
+		},
 	}
 	err = t.compactionHandler.enqueueCompaction(task)
 	if err != nil {
@@ -491,10 +495,14 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
 		PartitionID:    partitionID,
 		Channel:        channel,
 		InputSegments:  inputSegmentIDs,
-		ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment
+		ResultSegments: []int64{},
 		TotalRows:      totalRows,
 		Schema:         coll.Schema,
 		MaxSize:        getExpandedSize(expectedSize),
+		PreAllocatedSegmentIDs: &datapb.IDRange{
+			Begin: startID + 1,
+			End:   endID,
+		},
 	}
 	if err := t.compactionHandler.enqueueCompaction(task); err != nil {
 		log.Warn("failed to execute compaction task",

@@ -70,7 +70,6 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er
 	}
 	alloc := newMock0Allocator(h.t)
 	t.allocator = alloc
-	t.ResultSegments = []int64{100, 200}
 	plan, err := t.BuildCompactionRequest()
 	h.spyChan <- plan
 	return err
@@ -532,7 +531,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			},
 			[]*datapb.CompactionPlan{
 				{
-					PlanID: 0,
+					PlanID: 100,
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{
 							SegmentID: 1,
@@ -580,12 +579,13 @@ func Test_compactionTrigger_force(t *testing.T) {
 						},
 					},
 					// StartTime: 0,
+					BeginLogID:             100,
 					TimeoutInSeconds:       Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
 					Type:                   datapb.CompactionType_MixCompaction,
 					Channel:                "ch1",
 					TotalRows:              200,
 					Schema:                 schema,
-					PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 100, End: 200},
+					PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200},
 					MaxSize:                1342177280,
 				},
 			},
@@ -878,6 +878,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 					IsSorted: true,
 				},
 			},
+			BeginLogID:       100,
 			StartTime:        3,
 			TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
 			Type:             datapb.CompactionType_MixCompaction,

@@ -335,12 +335,16 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
 		Schema:             collection.Schema,
 		ClusteringKeyField: view.(*ClusteringSegmentsView).clusteringKeyField,
 		InputSegments:      lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
-		ResultSegments:     []int64{start, end}, // pre-allocated result segments range
+		ResultSegments:     []int64{},
 		MaxSegmentRows:     maxSegmentRows,
 		PreferSegmentRows:  preferSegmentRows,
 		TotalRows:          totalRows,
 		AnalyzeTaskID:      taskID + 1,
 		LastStateStartTime: time.Now().Unix(),
+		PreAllocatedSegmentIDs: &datapb.IDRange{
+			Begin: start,
+			End:   end,
+		},
 	}
 	err = m.compactionHandler.enqueueCompaction(task)
 	if err != nil {
@@ -391,10 +395,14 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
 		Channel:            view.GetGroupLabel().Channel,
 		Schema:             collection.Schema,
 		InputSegments:      lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
-		ResultSegments:     []int64{startID + 1, endID},
+		ResultSegments:     []int64{},
 		TotalRows:          totalRows,
 		LastStateStartTime: time.Now().Unix(),
 		MaxSize:            getExpandedSize(expectedSize),
+		PreAllocatedSegmentIDs: &datapb.IDRange{
+			Begin: startID + 1,
+			End:   endID,
+		},
 	}
 	err = m.compactionHandler.enqueueCompaction(task)
 	if err != nil {

@@ -108,9 +108,9 @@ func newMockAllocator(t *testing.T) *allocator.MockAllocator {
 func newMock0Allocator(t *testing.T) *allocator.MockAllocator {
 	mock0Allocator := allocator.NewMockAllocator(t)
-	mock0Allocator.EXPECT().AllocID(mock.Anything).Return(0, nil).Maybe()
-	mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe()
-	mock0Allocator.EXPECT().AllocN(mock.Anything).Return(0, 0, nil).Maybe()
+	mock0Allocator.EXPECT().AllocID(mock.Anything).Return(100, nil).Maybe()
+	mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(1000, nil).Maybe()
+	mock0Allocator.EXPECT().AllocN(mock.Anything).Return(100, 200, nil).Maybe()
 	return mock0Allocator
 }

@@ -609,7 +609,7 @@ message CompactionPlan {
   repeated int64 analyze_segment_ids = 15;
   int32 state = 16;
   int64 begin_logID = 17;
-  IDRange pre_allocated_segmentIDs = 18; // only for clustering compaction
+  IDRange pre_allocated_segmentIDs = 18;
   int64 slot_usage = 19;
   int64 max_size = 20;
 }
@@ -972,6 +972,7 @@ message CompactionTask{
   int64 lastStateStartTime = 25;
   int64 max_size = 26;
   repeated int64 tmpSegments = 27;
+  IDRange pre_allocated_segmentIDs = 28;
 }
 message PartitionStatsInfo {