diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 9d6de7e00c..ff35477d9b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -559,9 +559,9 @@ dataCoord: # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. taskPrioritizer: default + taskQueueCapacity: 256 # compaction task queue size rpcTimeout: 10 maxParallelTaskNum: 10 - workerMaxParallelTaskNum: 2 dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc clustering: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 05193eb31d..8210306259 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -37,9 +37,12 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) +// TODO: we just warn about the long executing/queuing tasks +// need to get rid of long queuing tasks because the compaction tasks are local optimum. var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{ datapb.CompactionType_MixCompaction: 30 * time.Minute, datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute, @@ -180,8 +183,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, ) *compactionPlanHandler { + // Higher capacity will have better ordering in priority, but consumes more memory. + // TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of. + capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt() return &compactionPlanHandler{ - queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory. + queueTasks: *NewCompactionQueue(capacity, getPrioritizer()), chManager: cm, meta: meta, sessions: sessions, @@ -293,6 +299,7 @@ func (c *compactionPlanHandler) loadMeta() { state := task.GetState() if state == datapb.CompactionTaskState_completed || state == datapb.CompactionTaskState_cleaned || + state == datapb.CompactionTaskState_timeout || state == datapb.CompactionTaskState_unknown { log.Info("compactionPlanHandler loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index c0769ff02c..002b8a2635 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "fmt" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -80,8 +79,6 @@ func (t *l0CompactionTask) Process() bool { return t.processPipelining() case datapb.CompactionTaskState_executing: return t.processExecuting() - case datapb.CompactionTaskState_timeout: - return t.processTimeout() case datapb.CompactionTaskState_meta_saved: return t.processMetaSaved() case datapb.CompactionTaskState_completed: @@ -133,16 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool { return false } switch result.GetState() { - case datapb.CompactionTaskState_executing: - // will L0Compaction be timeouted? - if t.checkTimeout() { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err != nil { - log.Warn("l0CompactionTask failed to set task timeout state", zap.Error(err)) - return false - } - return t.processTimeout() - } case datapb.CompactionTaskState_completed: t.result = result if err := t.saveSegmentMeta(); err != nil { @@ -190,16 +177,6 @@ func (t *l0CompactionTask) processCompleted() bool { return true } -func (t *l0CompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return true -} - func (t *l0CompactionTask) processFailed() bool { if t.hasAssignedWorker() { err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ @@ -359,24 +336,6 @@ func (t *l0CompactionTask) hasAssignedWorker() bool { return t.GetTaskProto().GetNodeID() != 0 && t.GetTaskProto().GetNodeID() != NullNodeID } -func (t *l0CompactionTask) checkTimeout() bool { - if t.GetTaskProto().GetTimeoutInSeconds() > 0 { - start := time.Unix(t.GetTaskProto().GetStartTime(), 0) - diff := time.Since(start).Seconds() - if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) { - log.Warn("compaction timeout", - zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), - zap.Int64("planID", t.GetTaskProto().GetPlanID()), - zap.Int64("nodeID", t.GetTaskProto().GetNodeID()), - zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()), - zap.Time("startTime", start), - ) - return true - } - } - return false -} - func (t *l0CompactionTask) SetNodeID(id UniqueID) error { return t.updateAndSaveTaskMeta(setNodeID(id)) } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 116d1f2597..ede8ddd78e 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -19,7 +19,6 @@ package datacoord import ( "context" "testing" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -405,48 +404,14 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything). - Return(&datapb.CompactionPlanResult{ - PlanID: t.GetTaskProto().GetPlanID(), - State: datapb.CompactionTaskState_executing, - }, nil).Twice() - - got := t.Process() - s.False(got) - - // test timeout - t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10)) - - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false). - RunAndReturn(func(inputs []int64, compacting bool) { - s.ElementsMatch(inputs, t.GetTaskProto().GetInputSegments()) - s.False(compacting) - }).Once() - - got = t.Process() - s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) - }) - - s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() { - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() - t := s.generateTestL0Task(datapb.CompactionTaskState_executing) - t.updateAndSaveTaskMeta(setNodeID(100)) - s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything). Return(&datapb.CompactionPlanResult{ PlanID: t.GetTaskProto().GetPlanID(), State: datapb.CompactionTaskState_executing, }, nil).Once() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() - - t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10)) got := t.Process() s.False(got) - s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState()) }) s.Run("test executing with result completed", func() { @@ -545,20 +510,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState()) }) - s.Run("test timeout", func() { - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) - t.updateAndSaveTaskMeta(setNodeID(100)) - s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.Require().False(isCompacting) - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() - - got := t.Process() - s.True(got) - }) - s.Run("test metaSaved success", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index c3deaacd89..b028f7f66c 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -3,7 +3,6 @@ package datacoord import ( "context" "fmt" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -112,16 +111,6 @@ func (t *mixCompactionTask) processExecuting() bool { return false } switch result.GetState() { - case datapb.CompactionTaskState_executing: - if t.checkTimeout() { - log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()), zap.Int64("startTime", t.GetTaskProto().GetStartTime())) - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return t.processTimeout() - } case datapb.CompactionTaskState_completed: t.result = result if len(result.GetSegments()) == 0 { @@ -195,8 +184,6 @@ func (t *mixCompactionTask) Process() bool { processResult = t.processPipelining() case datapb.CompactionTaskState_executing: processResult = t.processExecuting() - case datapb.CompactionTaskState_timeout: - processResult = t.processTimeout() case datapb.CompactionTaskState_meta_saved: processResult = t.processMetaSaved() case datapb.CompactionTaskState_completed: @@ -250,16 +237,6 @@ func (t *mixCompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.taskProto.Load().(*datapb.CompactionTask).GetInputSegments(), false) } -func (t *mixCompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return true -} - func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask) for _, opt := range opts { @@ -286,16 +263,6 @@ func (t *mixCompactionTask) processFailed() bool { return true } -func (t *mixCompactionTask) checkTimeout() bool { - if t.GetTaskProto().GetTimeoutInSeconds() > 0 { - diff := time.Since(time.Unix(t.GetTaskProto().GetStartTime(), 0)).Seconds() - if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) { - return true - } - } - return false -} - func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { task := t.ShadowClone(opts...) err := t.saveTaskMeta(task) diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 00230dd658..0a60e1a416 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -2,7 +2,6 @@ package datacoord import ( "testing" - "time" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -93,44 +92,3 @@ func (s *MixCompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() { s.ErrorIs(err, merr.ErrSegmentNotFound) }) } - -func (s *MixCompactionTaskSuite) TestCompactionTimeout() { - channel := "Ch-1" - binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { - return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - Level: datapb.SegmentLevel_L1, - InsertChannel: channel, - State: commonpb.SegmentState_Flushed, - Binlogs: binLogs, - }} - }).Times(2) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything) - alloc := allocator.NewMockAllocator(s.T()) - alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) - task := newMixCompactionTask(&datapb.CompactionTask{ - PlanID: 1, - TriggerID: 19530, - CollectionID: 1, - PartitionID: 10, - Type: datapb.CompactionType_MixCompaction, - NodeID: 1, - State: datapb.CompactionTaskState_executing, - InputSegments: []int64{200, 201}, - ResultSegments: []int64{100, 200}, - TimeoutInSeconds: 1, - }, alloc, s.mockMeta, s.mockSessMgr) - plan, err := task.BuildCompactionRequest() - task.plan = plan - s.Require().NoError(err) - time.Sleep(time.Second * 2) - - s.mockSessMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{ - State: datapb.CompactionTaskState_executing, - }, nil) - end := task.processExecuting() - s.Equal(true, end) - s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().State) -} diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index ec345b4b75..b0fb0c97f0 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -742,8 +742,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() { s.handler.checkCompaction() t := s.handler.getCompactionTask(1) - // timeout - s.Nil(t) + s.NotNil(t) t = s.handler.getCompactionTask(2) // completed diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c8ffa4babd..40a3212430 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3184,10 +3184,11 @@ type dataCoordConfig struct { SegmentFlushInterval ParamItem `refreshable:"true"` // compaction - EnableCompaction ParamItem `refreshable:"false"` - EnableAutoCompaction ParamItem `refreshable:"true"` - IndexBasedCompaction ParamItem `refreshable:"true"` - CompactionTaskPrioritizer ParamItem `refreshable:"true"` + EnableCompaction ParamItem `refreshable:"false"` + EnableAutoCompaction ParamItem `refreshable:"true"` + IndexBasedCompaction ParamItem `refreshable:"true"` + CompactionTaskPrioritizer ParamItem `refreshable:"true"` + CompactionTaskQueueCapacity ParamItem `refreshable:"false"` CompactionRPCTimeout ParamItem `refreshable:"true"` CompactionMaxParallelTasks ParamItem `refreshable:"true"` @@ -3474,6 +3475,15 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl } p.CompactionTaskPrioritizer.Init(base.mgr) + p.CompactionTaskQueueCapacity = ParamItem{ + Key: "dataCoord.compaction.taskQueueCapacity", + Version: "2.5.0", + DefaultValue: "256", + Doc: `compaction task queue size`, + Export: true, + } + p.CompactionTaskQueueCapacity.Init(base.mgr) + p.CompactionRPCTimeout = ParamItem{ Key: "dataCoord.compaction.rpcTimeout", Version: "2.2.12", @@ -3490,14 +3500,6 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl } p.CompactionMaxParallelTasks.Init(base.mgr) - p.CompactionWorkerParalleTasks = ParamItem{ - Key: "dataCoord.compaction.workerMaxParallelTaskNum", - Version: "2.3.0", - DefaultValue: "2", - Export: true, - } - p.CompactionWorkerParalleTasks.Init(base.mgr) - p.MinSegmentToMerge = ParamItem{ Key: "dataCoord.compaction.min.segment", Version: "2.0.0",