mirror of https://github.com/milvus-io/milvus.git
fix: Fix some left comments (#29027)
See also: #28814 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/29175/head
parent
ad866d2889
commit
2c62bcbbf6
|
@ -134,7 +134,8 @@ func newCompactionPlanHandler(sessions SessionManager, cm *ChannelManager, meta
|
|||
|
||||
func (c *compactionPlanHandler) checkResult() {
|
||||
// deal results
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), interval)
|
||||
defer cancel()
|
||||
ts, err := c.allocator.allocTimestamp(ctx)
|
||||
if err != nil {
|
||||
|
@ -282,7 +283,7 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) {
|
|||
seg.Deltalogs = info.GetDeltalogs()
|
||||
}
|
||||
}
|
||||
log.Info("Compaction handler refresed mix compaction plan")
|
||||
log.Info("Compaction handler refreshed mix compaction plan")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func (v *LevelZeroSegmentsView) Equal(others []*SegmentView) bool {
|
|||
}
|
||||
|
||||
// Trigger triggers all qualified LevelZeroSegments according to views
|
||||
func (v *LevelZeroSegmentsView) Trigger() CompactionView {
|
||||
func (v *LevelZeroSegmentsView) Trigger() (CompactionView, string) {
|
||||
// Only choose segments with position less than the earliest growing segment position
|
||||
validSegments := lo.Filter(v.segments, func(view *SegmentView, _ int) bool {
|
||||
return view.dmlPos.GetTimestamp() < v.earliestGrowingSegmentPos.GetTimestamp()
|
||||
|
@ -79,6 +79,7 @@ func (v *LevelZeroSegmentsView) Trigger() CompactionView {
|
|||
|
||||
curDeltaSize float64
|
||||
curDeltaCount int
|
||||
reason string
|
||||
)
|
||||
|
||||
for _, segView := range validSegments {
|
||||
|
@ -86,13 +87,21 @@ func (v *LevelZeroSegmentsView) Trigger() CompactionView {
|
|||
curDeltaCount += segView.DeltalogCount
|
||||
}
|
||||
|
||||
if curDeltaSize > minDeltaSize {
|
||||
reason = "level zero segments size reaches compaction limit"
|
||||
}
|
||||
|
||||
if curDeltaCount > minDeltaCount {
|
||||
reason = "level zero segments number reaches compaction limit"
|
||||
}
|
||||
|
||||
if curDeltaSize < minDeltaSize && curDeltaCount < minDeltaCount {
|
||||
return nil
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
return &LevelZeroSegmentsView{
|
||||
label: v.label,
|
||||
segments: validSegments,
|
||||
earliestGrowingSegmentPos: v.earliestGrowingSegmentPos,
|
||||
}
|
||||
}, reason
|
||||
}
|
||||
|
|
|
@ -140,7 +140,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
|
|||
}
|
||||
log.Info("LevelZeroSegmentsView", zap.String("view", s.v.String()))
|
||||
|
||||
gotView := s.v.Trigger()
|
||||
gotView, reason := s.v.Trigger()
|
||||
if len(test.expectedSegs) == 0 {
|
||||
s.Nil(gotView)
|
||||
} else {
|
||||
|
@ -152,6 +152,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
|
|||
return v.ID
|
||||
})
|
||||
s.ElementsMatch(gotSegIDs, test.expectedSegs)
|
||||
log.Info("trigger reason", zap.Any("trigger reason", reason))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -54,13 +54,12 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT
|
|||
for _, view := range views {
|
||||
switch eventType {
|
||||
case TriggerTypeLevelZeroView:
|
||||
log.Info("Start to trigger a level zero compaction")
|
||||
outView := view.Trigger()
|
||||
log.Debug("Start to trigger a level zero compaction")
|
||||
outView, reason := view.Trigger()
|
||||
if outView == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("Finish trigger out view, build level zero compaction plan", zap.String("out view", outView.String()))
|
||||
plan := m.BuildLevelZeroCompactionPlan(outView)
|
||||
if plan == nil {
|
||||
continue
|
||||
|
@ -79,7 +78,11 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT
|
|||
// TODO, remove handler, use scheduler
|
||||
// m.scheduler.Submit(plan)
|
||||
m.handler.execCompactionPlan(signal, plan)
|
||||
log.Info("Finish to trigger a LevelZeroCompaction plan", zap.String("output view", outView.String()))
|
||||
log.Info("Finish to trigger a LevelZeroCompaction plan",
|
||||
zap.Int64("planID", plan.GetPlanID()),
|
||||
zap.String("type", plan.GetType().String()),
|
||||
zap.String("reason", reason),
|
||||
zap.String("output view", outView.String()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ type CompactionView interface {
|
|||
GetSegmentsView() []*SegmentView
|
||||
Append(segments ...*SegmentView)
|
||||
String() string
|
||||
Trigger() CompactionView
|
||||
Trigger() (CompactionView, string)
|
||||
}
|
||||
|
||||
type FullViews struct {
|
||||
|
|
Loading…
Reference in New Issue