mirror of https://github.com/milvus-io/milvus.git
fix: fix clustering compaction can't enqueue when compaction queue is full (#34445)
#30633 --------- Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/34481/head
parent
4e5f1d5f75
commit
efdaed4ac6
|
@ -75,12 +75,12 @@ func (t *clusteringCompactionTask) Process() bool {
|
|||
currentState := t.State.String()
|
||||
if currentState != lastState {
|
||||
ts := time.Now().UnixMilli()
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||
lastStateDuration := ts - t.GetLastStateStartTime()
|
||||
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
|
||||
metrics.DataCoordCompactionLatency.
|
||||
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState).
|
||||
Observe(float64(lastStateDuration))
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||
|
||||
if t.State == datapb.CompactionTaskState_completed {
|
||||
t.updateAndSaveTaskMeta(setEndTime(ts))
|
||||
|
@ -367,7 +367,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
|
|||
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
|
||||
if err != nil {
|
||||
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
|
||||
return merr.WrapErrClusteringCompactionMetaError("CleanPartitionStatsInfo", err)
|
||||
}
|
||||
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
|
||||
|
@ -469,6 +468,7 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
|
|||
PreferSegmentRows: t.GetPreferSegmentRows(),
|
||||
AnalyzeTaskID: t.GetAnalyzeTaskID(),
|
||||
AnalyzeVersion: t.GetAnalyzeVersion(),
|
||||
LastStateStartTime: t.GetLastStateStartTime(),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(taskClone)
|
||||
|
|
|
@ -171,11 +171,6 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection
|
|||
|
||||
func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) {
|
||||
for _, view := range views {
|
||||
if m.compactionHandler.isFull() {
|
||||
log.RatedInfo(10, "Skip trigger compaction for scheduler is full")
|
||||
return
|
||||
}
|
||||
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroViewChange:
|
||||
log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange")
|
||||
|
|
|
@ -48,32 +48,6 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
|
|||
s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext, s.meta)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() {
|
||||
s.mockPlanContext.EXPECT().isFull().Return(true)
|
||||
collSegs := s.meta.GetCompactableSegmentGroupByCollection()
|
||||
segments, found := collSegs[1]
|
||||
s.Require().True(found)
|
||||
|
||||
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
|
||||
return info.GetLevel() == datapb.SegmentLevel_L0
|
||||
})
|
||||
|
||||
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
|
||||
s.Require().NotEmpty(latestL0Segments)
|
||||
needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments)
|
||||
s.Require().True(needRefresh)
|
||||
s.Require().Equal(1, len(levelZeroView))
|
||||
cView, ok := levelZeroView[0].(*LevelZeroSegmentsView)
|
||||
s.True(ok)
|
||||
s.NotNil(cView)
|
||||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
// s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
|
||||
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
|
||||
}
|
||||
|
||||
func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
||||
handler := NewNMockHandler(s.T())
|
||||
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil)
|
||||
|
@ -104,7 +78,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
|
|||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
|
||||
RunAndReturn(func(task *datapb.CompactionTask) error {
|
||||
s.EqualValues(19530, task.GetTriggerID())
|
||||
|
@ -149,7 +122,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
|
|||
log.Info("view", zap.Any("cView", cView))
|
||||
|
||||
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(1, nil)
|
||||
s.mockPlanContext.EXPECT().isFull().Return(false)
|
||||
s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything).
|
||||
RunAndReturn(func(task *datapb.CompactionTask) error {
|
||||
s.EqualValues(19530, task.GetTriggerID())
|
||||
|
|
Loading…
Reference in New Issue