diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index f0e4a427de..167fc03aca 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -70,18 +70,20 @@ func NewExecutor() *executor { } func (e *executor) Execute(task Compactor) { + _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) + if ok { + log.Warn("duplicated compaction task", + zap.Int64("planID", task.GetPlanID()), + zap.String("channel", task.GetChannelName())) + return + } e.taskCh <- task - e.toExecutingState(task) } func (e *executor) Slots() int64 { return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) } -func (e *executor) toExecutingState(task Compactor) { - e.executing.Insert(task.GetPlanID(), task) -} - func (e *executor) toCompleteState(task Compactor) { task.Complete() e.executing.GetAndRemove(task.GetPlanID()) diff --git a/internal/datanode/compaction/executor_test.go b/internal/datanode/compaction/executor_test.go index 164852c8e8..81b64556da 100644 --- a/internal/datanode/compaction/executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -31,10 +31,11 @@ func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { planID := int64(1) mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(planID).Once() - mockC.EXPECT().GetChannelName().Return("ch1").Once() + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") executor := NewExecutor() executor.Execute(mockC) + executor.Execute(mockC) assert.EqualValues(t, 1, len(executor.taskCh)) assert.EqualValues(t, 1, executor.executing.Len())