milvus/internal/datacoord/compaction_trigger_v2.go

159 lines
4.6 KiB
Go

package datacoord
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
)
type CompactionTriggerType int8
const (
TriggerTypeLevelZeroViewChange CompactionTriggerType = iota + 1
TriggerTypeLevelZeroViewIDLE
TriggerTypeSegmentSizeViewChange
)
type TriggerManager interface {
Notify(UniqueID, CompactionTriggerType, []CompactionView)
}
// CompactionTriggerManager registers Triggers to TriggerType
// so that when the certain TriggerType happens, the corresponding triggers can
// trigger the correct compaction plans.
// Trigger types:
// 1. Change of Views
// - LevelZeroViewTrigger
// - SegmentSizeViewTrigger
//
// 2. SystemIDLE & schedulerIDLE
// 3. Manual Compaction
type CompactionTriggerManager struct {
scheduler Scheduler
handler compactionPlanContext // TODO replace with scheduler
allocator allocator
}
func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) *CompactionTriggerManager {
m := &CompactionTriggerManager{
allocator: alloc,
handler: handler,
}
return m
}
func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) {
log := log.With(zap.Int64("taskID", taskID))
for _, view := range views {
if m.handler.isFull() {
log.RatedInfo(1.0, "Skip trigger compaction for scheduler is full")
return
}
switch eventType {
case TriggerTypeLevelZeroViewChange:
log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange")
outView, reason := view.Trigger()
if outView != nil {
log.Info("Success to trigger a LevelZeroCompaction output view, try to sumit",
zap.String("reason", reason),
zap.String("output view", outView.String()))
m.SubmitL0ViewToScheduler(taskID, outView)
}
case TriggerTypeLevelZeroViewIDLE:
log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE")
outView, reason := view.Trigger()
if outView == nil {
log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE")
outView, reason = view.ForceTrigger()
}
if outView != nil {
log.Info("Success to trigger a LevelZeroCompaction output view, try to submit",
zap.String("reason", reason),
zap.String("output view", outView.String()))
m.SubmitL0ViewToScheduler(taskID, outView)
}
}
}
}
func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView CompactionView) {
plan := m.buildL0CompactionPlan(outView)
if plan == nil {
return
}
label := outView.GetGroupLabel()
signal := &compactionSignal{
id: taskID,
isForce: false,
isGlobal: true,
collectionID: label.CollectionID,
partitionID: label.PartitionID,
pos: outView.(*LevelZeroSegmentsView).earliestGrowingSegmentPos,
}
// TODO, remove handler, use scheduler
// m.scheduler.Submit(plan)
m.handler.execCompactionPlan(signal, plan)
log.Info("Finish to submit a LevelZeroCompaction plan",
zap.Int64("taskID", taskID),
zap.Int64("planID", plan.GetPlanID()),
zap.String("type", plan.GetType().String()),
)
}
func (m *CompactionTriggerManager) buildL0CompactionPlan(view CompactionView) *datapb.CompactionPlan {
var segmentBinlogs []*datapb.CompactionSegmentBinlogs
levelZeroSegs := lo.Map(view.GetSegmentsView(), func(segView *SegmentView, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: segView.ID,
Level: datapb.SegmentLevel_L0,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
// Deltalogs: deltalogs are filled before executing the plan
}
})
segmentBinlogs = append(segmentBinlogs, levelZeroSegs...)
plan := &datapb.CompactionPlan{
Type: datapb.CompactionType_Level0DeleteCompaction,
SegmentBinlogs: segmentBinlogs,
Channel: view.GetGroupLabel().Channel,
}
if err := fillOriginPlan(m.allocator, plan); err != nil {
return nil
}
return plan
}
// chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName
type chanPartSegments struct {
collectionID UniqueID
partitionID UniqueID
channelName string
segments []*SegmentInfo
}
func fillOriginPlan(alloc allocator, plan *datapb.CompactionPlan) error {
// TODO context
id, err := alloc.allocID(context.TODO())
if err != nil {
return err
}
plan.PlanID = id
plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32()
return nil
}