mirror of https://github.com/milvus-io/milvus.git
fix: Fix clustering compaction task leak (#36800)
issue: #36686 bug reason: - The clustering compaction tasks on the datanode were never cleaned up. - The clustering compaction task contains a mapping from clustering key to buffer, this caused a large memory leak. fix: - clean the tasks on datanode by datacoord when clustering compaction finished. - reset the mapping that from clustering key to buffer on datanode when clustering finished. Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/36819/head
parent
b474374ea5
commit
04c306e63f
|
@ -293,6 +293,11 @@ func (t *clusteringCompactionTask) processExecuting() error {
|
|||
}
|
||||
|
||||
func (t *clusteringCompactionTask) processMetaSaved() error {
|
||||
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||
PlanID: t.GetPlanID(),
|
||||
}); err != nil {
|
||||
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
|
||||
}
|
||||
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
|
||||
}
|
||||
|
||||
|
@ -474,6 +479,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
|
|||
|
||||
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
|
||||
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
|
||||
|
||||
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||
PlanID: t.GetPlanID(),
|
||||
}); err != nil {
|
||||
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
|
||||
}
|
||||
|
||||
// revert segments meta
|
||||
var operators []UpdateOperator
|
||||
// revert level of input segments
|
||||
|
|
|
@ -107,6 +107,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
|
|||
},
|
||||
})
|
||||
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
task := s.generateBasicTask(false)
|
||||
|
||||
|
@ -372,6 +373,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
|
|||
},
|
||||
},
|
||||
}, nil).Once()
|
||||
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
s.Equal(false, task.Process())
|
||||
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
|
||||
})
|
||||
|
@ -405,6 +407,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
|
|||
},
|
||||
},
|
||||
}, nil).Once()
|
||||
// DropCompactionPlan fail
|
||||
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(merr.WrapErrNodeNotFound(1)).Once()
|
||||
s.Equal(false, task.Process())
|
||||
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
|
||||
})
|
||||
|
@ -440,6 +444,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
|
|||
},
|
||||
},
|
||||
}, nil).Once()
|
||||
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
s.Equal(true, task.Process())
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
|
||||
|
|
|
@ -297,6 +297,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
|
|||
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).
|
||||
Observe(float64(t.tr.ElapseSpan().Milliseconds()))
|
||||
log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load()))
|
||||
// clear the buffer cache
|
||||
t.keyToBufferFunc = nil
|
||||
|
||||
return planResult, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue