enhance: Ensure the idempotency of compaction task (#33872)

/kind enhancement

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/33891/head
yihao.dai 2024-06-16 22:09:57 +08:00 committed by GitHub
parent 8537f3daeb
commit 1a9ab52f66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 7 deletions

View File

@ -70,18 +70,20 @@ func NewExecutor() *executor {
}
func (e *executor) Execute(task Compactor) {
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
log.Warn("duplicated compaction task",
zap.Int64("planID", task.GetPlanID()),
zap.String("channel", task.GetChannelName()))
return
}
e.taskCh <- task
e.toExecutingState(task)
}
func (e *executor) Slots() int64 {
return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len())
}
func (e *executor) toExecutingState(task Compactor) {
e.executing.Insert(task.GetPlanID(), task)
}
func (e *executor) toCompleteState(task Compactor) {
task.Complete()
e.executing.GetAndRemove(task.GetPlanID())

View File

@ -31,10 +31,11 @@ func TestCompactionExecutor(t *testing.T) {
t.Run("Test execute", func(t *testing.T) {
planID := int64(1)
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID).Once()
mockC.EXPECT().GetChannelName().Return("ch1").Once()
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
executor := NewExecutor()
executor.Execute(mockC)
executor.Execute(mockC)
assert.EqualValues(t, 1, len(executor.taskCh))
assert.EqualValues(t, 1, executor.executing.Len())