mirror of https://github.com/milvus-io/milvus.git
fix: [hotfix-2.3.3]Clean the compaction plan info to avoid the object leak (#29369)
issue: https://github.com/milvus-io/milvus/issues/29296 pr: https://github.com/milvus-io/milvus/pull/29365 Signed-off-by: SimFG <bang.fu@zilliz.com> (branch: pull/29396/head)
parent
1ddb12ba92
commit
5b6a894e7c
|
@ -136,18 +136,61 @@ func (c *compactionPlanHandler) start() {
|
|||
log.Info("compaction handler quit")
|
||||
return
|
||||
case <-ticker.C:
|
||||
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ts, err := c.allocator.allocTimestamp(cctx)
|
||||
ts, err := c.GetCurrentTS()
|
||||
if err != nil {
|
||||
log.Warn("unable to alloc timestamp", zap.Error(err))
|
||||
cancel()
|
||||
log.Warn("unable to get current timestamp", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
_ = c.updateCompaction(ts)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
cleanTicker := time.NewTicker(30 * time.Minute)
|
||||
defer cleanTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.quit:
|
||||
log.Info("Compaction handler quit clean")
|
||||
return
|
||||
case <-cleanTicker.C:
|
||||
c.Clean()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) Clean() {
|
||||
current, err := c.GetCurrentTS()
|
||||
if err != nil {
|
||||
log.Warn("fail to get current ts when clean", zap.Error(err))
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for id, task := range c.plans {
|
||||
if task.state == executing || task.state == pipelining {
|
||||
continue
|
||||
}
|
||||
// after timeout + 1h, the plan will be cleaned
|
||||
if c.isTimeout(current, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()+60*60) {
|
||||
delete(c.plans, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
|
||||
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), interval)
|
||||
defer cancel()
|
||||
ts, err := c.allocator.allocTimestamp(ctx)
|
||||
if err != nil {
|
||||
log.Warn("unable to alloc timestamp", zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) stop() {
|
||||
|
@ -236,7 +279,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
|
|||
default:
|
||||
return errors.New("unknown compaction type")
|
||||
}
|
||||
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result))
|
||||
metrics.DataCoordCompactedSegmentSize.WithLabelValues().Observe(float64(getCompactedSegmentSize(result)))
|
||||
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result), cleanLogPath())
|
||||
c.executingTaskNum--
|
||||
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction ||
|
||||
c.plans[planID].plan.GetType() == datapb.CompactionType_MixCompaction {
|
||||
|
@ -246,8 +290,6 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
|
|||
|
||||
nodeID := c.plans[planID].dataNodeID
|
||||
c.releaseQueue(nodeID)
|
||||
|
||||
metrics.DataCoordCompactedSegmentSize.WithLabelValues().Observe(float64(getCompactedSegmentSize(result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -454,6 +496,23 @@ func setResult(result *datapb.CompactionResult) compactionTaskOpt {
|
|||
}
|
||||
}
|
||||
|
||||
func cleanLogPath() compactionTaskOpt {
|
||||
return func(task *compactionTask) {
|
||||
if task.plan.GetSegmentBinlogs() != nil {
|
||||
for _, binlogs := range task.plan.GetSegmentBinlogs() {
|
||||
binlogs.FieldBinlogs = nil
|
||||
binlogs.Deltalogs = nil
|
||||
binlogs.Field2StatslogPaths = nil
|
||||
}
|
||||
}
|
||||
if task.result != nil {
|
||||
task.result.InsertLogs = nil
|
||||
task.result.Deltalogs = nil
|
||||
task.result.Field2StatslogPaths = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 0.5*min(8, NumCPU/2)
|
||||
func calculateParallel() int {
|
||||
// TODO after node memory management enabled, use this config as hard limit
|
||||
|
|
|
@ -968,3 +968,42 @@ func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datap
|
|||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func TestCompactionPlanHandler_Clean(t *testing.T) {
|
||||
startTime := tsoutil.ComposeTSByTime(time.Now(), 0)
|
||||
cleanTime := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Hour), 0)
|
||||
|
||||
a := newMockAllocator()
|
||||
c := &compactionPlanHandler{
|
||||
allocator: a,
|
||||
plans: map[int64]*compactionTask{
|
||||
1: {
|
||||
state: executing,
|
||||
},
|
||||
2: {
|
||||
state: pipelining,
|
||||
},
|
||||
3: {
|
||||
state: completed,
|
||||
plan: &datapb.CompactionPlan{
|
||||
StartTime: startTime,
|
||||
},
|
||||
},
|
||||
4: {
|
||||
state: completed,
|
||||
plan: &datapb.CompactionPlan{
|
||||
StartTime: cleanTime,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
a.err = errors.New("mock error")
|
||||
c.Clean()
|
||||
assert.Len(t, c.plans, 4)
|
||||
|
||||
a.err = nil
|
||||
a.cnt = int64(startTime)
|
||||
c.Clean()
|
||||
assert.Len(t, c.plans, 3)
|
||||
}
|
||||
|
|
|
@ -92,9 +92,13 @@ var _ allocator = (*MockAllocator)(nil)
|
|||
|
||||
type MockAllocator struct {
|
||||
cnt int64
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
|
||||
if m.err != nil {
|
||||
return 0, m.err
|
||||
}
|
||||
val := atomic.AddInt64(&m.cnt, 1)
|
||||
return Timestamp(val), nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue