diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index fcd3da1093..1d854c33d8 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -58,11 +58,12 @@ func (c *compactionExecutor) toCompleteState(task compactor) { task.complete() c.executing.Delete(task.getPlanID()) } -func (c *compactionExecutor) injectDone(planID UniqueID) { + +func (c *compactionExecutor) injectDone(planID UniqueID, success bool) { c.completed.Delete(planID) task, loaded := c.completedCompactor.LoadAndDelete(planID) if loaded { - task.(compactor).injectDone() + task.(compactor).injectDone(success) } } @@ -128,7 +129,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string // remove all completed plans for vChannelName c.completed.Range(func(key interface{}, value interface{}) bool { if value.(*datapb.CompactionResult).GetChannel() == vChannelName { - c.injectDone(key.(UniqueID)) + c.injectDone(key.(UniqueID), true) log.Info("remove compaction results for dropped channel", zap.String("channel", vChannelName), zap.Int64("planID", key.(UniqueID))) diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index 2f0ff051eb..a19ad9a767 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -142,7 +142,7 @@ func (mc *mockCompactor) complete() { mc.done <- struct{}{} } -func (mc *mockCompactor) injectDone() { +func (mc *mockCompactor) injectDone(success bool) { } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 06f0c30384..948d823a60 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -60,7 +60,7 @@ type iterator = storage.Iterator type compactor interface { complete() compact() (*datapb.CompactionResult, error) - injectDone() + injectDone(success bool) stop() getPlanID() UniqueID getCollection() UniqueID @@ -126,7 +126,7 @@ func (t *compactionTask) complete() { func (t *compactionTask) stop() { t.cancel() <-t.done - t.injectDone() + t.injectDone(true) } func (t *compactionTask) getPlanID() UniqueID { @@ -759,10 +759,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { return pack, nil } -func (t *compactionTask) injectDone() { +func (t *compactionTask) injectDone(success bool) { if t.inject != nil { uninjectStart := time.Now() - t.inject.injectDone(true) + t.inject.injectDone(success) uninjectEnd := time.Now() log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart)))) } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 879663dab6..6221e64974 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -863,7 +863,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, result.Field2StatslogPaths) assert.Equal(t, 0, mockfm.injectCount()) - task.injectDone() + task.injectDone(true) time.Sleep(500 * time.Millisecond) assert.Equal(t, 1, mockfm.injectCount()) } @@ -958,7 +958,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, result.Field2StatslogPaths) assert.Equal(t, 0, mockfm.injectCount()) - task.injectDone() + task.injectDone(true) time.Sleep(500 * time.Millisecond) assert.Equal(t, 1, mockfm.injectCount()) }) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 9ba3efc3d8..798669ec2c 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -399,9 +399,10 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments ds.fg.Blockall() defer ds.fg.Unblock() if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil { + node.compactionExecutor.injectDone(req.GetPlanID(), false) return merr.Status(err), nil } - node.compactionExecutor.injectDone(req.GetPlanID()) + node.compactionExecutor.injectDone(req.GetPlanID(), true) return merr.Status(nil), nil }