enhance: remove compaction parallelism control (#39081)

See #39080

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/39147/head^2
Ted Xu 2025-01-10 13:23:00 +08:00 committed by GitHub
parent e5eb1159e2
commit 4355b485e5
4 changed files with 148 additions and 144 deletions


@@ -567,7 +567,7 @@ dataCoord:
taskPrioritizer: default
taskQueueCapacity: 100000 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
maxParallelTaskNum: -1 # Deprecated, see datanode.slot.slotCap
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
mix:
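
With maxParallelTaskNum deprecated, the effective ceiling on concurrent compactions comes from the datanodes themselves: each node advertises a slot capacity (datanode.slot.slotCap) and a task is admitted only where enough free slots remain. A standalone sketch of the resulting cluster-wide bound (illustrative names, not Milvus code):

package main

import "fmt"

// clusterCapacity sums per-node free slots; once the coordinator-side
// task cap is gone, this pool is the only admission limit left.
func clusterCapacity(freeSlots map[int64]int64) int64 {
	var total int64
	for _, s := range freeSlots {
		total += s
	}
	return total
}

func main() {
	// Hypothetical slot reports from two datanodes.
	fmt.Println(clusterCapacity(map[int64]int64{100: 16, 101: 23})) // 39
}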


@@ -78,6 +78,72 @@ type compactionInfo struct {
mergeInfos map[int64]*milvuspb.CompactionMergeInfo
}
type NodeAssigner interface {
assign(t CompactionTask) bool
}
type SlotBasedNodeAssigner struct {
cluster Cluster
slots map[int64]int64
}
var _ NodeAssigner = (*SlotBasedNodeAssigner)(nil)
func newSlotBasedNodeAssigner(cluster Cluster) *SlotBasedNodeAssigner {
return &SlotBasedNodeAssigner{
cluster: cluster,
}
}
func (sna *SlotBasedNodeAssigner) assign(t CompactionTask) bool {
if sna.slots == nil {
sna.slots = sna.cluster.QuerySlots()
}
logger := log.With(
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("type", t.GetTaskProto().GetType().String()),
zap.String("vchannel", t.GetTaskProto().GetChannel()))
nodeID, useSlot := sna.pickAnyNode(t)
if nodeID == NullNodeID {
logger.RatedWarn(10, "cannot find datanode for compaction task",
zap.Int64("required", t.GetSlotUsage()), zap.Any("available", sna.slots))
return false
}
err := t.SetNodeID(nodeID)
if err != nil {
logger.Warn("assignNodeID failed", zap.Error(err))
return false
}
// update the input nodeSlots
sna.slots[nodeID] = sna.slots[nodeID] - useSlot
logger.Debug("assignNodeID success", zap.Any("nodeID", nodeID))
return true
}
func (sna *SlotBasedNodeAssigner) pickAnyNode(task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
useSlot = task.GetSlotUsage()
if useSlot <= 0 {
log.Warn("task slot should not be 0",
zap.Int64("planID", task.GetTaskProto().GetPlanID()),
zap.String("type", task.GetTaskProto().GetType().String()))
return NullNodeID, useSlot
}
for id, slots := range sna.slots {
if slots >= useSlot && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
return nodeID, useSlot
}
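
For reference, pickAnyNode above is a greedy rule: among nodes with enough free slots for the task, take the one with the most headroom. A self-contained sketch of that selection and the follow-up slot debit (illustrative names, not part of the patch):

package main

import "fmt"

const nullNodeID int64 = -1 // stand-in for the patch's NullNodeID

// pick returns the node with the most free slots that still fits need;
// a non-positive need is rejected, mirroring the slot-usage guard above.
func pick(free map[int64]int64, need int64) int64 {
	if need <= 0 {
		return nullNodeID
	}
	best, bestSlots := nullNodeID, int64(-1)
	for id, slots := range free {
		if slots >= need && slots > bestSlots {
			best, bestSlots = id, slots
		}
	}
	return best
}

func main() {
	free := map[int64]int64{100: 16, 101: 23}
	id := pick(free, 8) // 101, the node with the most headroom
	free[id] -= 8       // assign debits the slots it consumed
	fmt.Println(id, free)
}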
type compactionPlanHandler struct {
queueTasks CompactionQueue
@@ -202,15 +268,21 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
}
}
func (c *compactionPlanHandler) schedule() []CompactionTask {
func (c *compactionPlanHandler) checkSchedule() {
assigner := newSlotBasedNodeAssigner(c.cluster)
err := c.checkCompaction(assigner)
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
c.schedule(assigner)
}
func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask {
selected := make([]CompactionTask, 0)
if c.queueTasks.Len() == 0 {
return selected
}
var (
parallelism = Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
slots map[int64]int64
)
l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
@@ -219,11 +291,6 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
clusterLabelExcludes := typeutil.NewSet[string]()
c.executingGuard.RLock()
if len(c.executingTasks) >= parallelism {
c.executingGuard.RUnlock()
return selected
}
for _, t := range c.executingTasks {
switch t.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
@@ -253,8 +320,7 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
// The schedule loop will stop if either:
// 1. no more tasks to schedule (the task queue is empty)
// 2. the parallelism of running tasks is reached
// 3. no available slots
// 2. no available slots
for {
t, err := c.queueTasks.Dequeue()
if err != nil {
@@ -291,24 +357,15 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
}
if t.NeedReAssignNodeID() {
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
log.RatedWarn(10, "not enough slots for compaction task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if ok := assigner.assign(t); !ok {
selected = selected[:len(selected)-1]
excluded = append(excluded, t)
break // 3. no available slots
break // 2. no available slots
}
}
c.executingGuard.Lock()
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
if len(c.executingTasks) >= parallelism {
c.executingGuard.Unlock()
break // 2. the parallelism of running tasks is reached
}
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
@@ -318,9 +375,8 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
func (c *compactionPlanHandler) start() {
c.loadMeta()
c.stopWg.Add(3)
c.stopWg.Add(2)
go c.loopSchedule()
go c.loopCheck()
go c.loopClean()
}
@@ -397,29 +453,7 @@ func (c *compactionPlanHandler) loopSchedule() {
return
case <-scheduleTicker.C:
c.schedule()
}
}
}
func (c *compactionPlanHandler) loopCheck() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
log.Info("compactionPlanHandler start loop check", zap.Any("check result interval", interval))
defer c.stopWg.Done()
checkResultTicker := time.NewTicker(interval)
defer checkResultTicker.Stop()
for {
select {
case <-c.stopCh:
log.Info("compactionPlanHandler quit loop check")
return
case <-checkResultTicker.C:
err := c.checkCompaction()
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
c.checkSchedule()
}
}
}
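
With loopSchedule removed, a single ticker now drives checking, cleanup, and scheduling through checkSchedule. A minimal standalone sketch of that merged loop (names illustrative):

package main

import (
	"fmt"
	"time"
)

// runCheckLoop mirrors the merged loop: one ticker, one stop channel,
// one entry point that checks results, cleans failures, then schedules.
func runCheckLoop(stop <-chan struct{}, checkSchedule func()) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			checkSchedule()
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runCheckLoop(stop, func() { fmt.Println("check + clean + schedule") })
	time.Sleep(35 * time.Millisecond)
	close(stop)
}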
@@ -647,50 +681,20 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
return task, nil
}
func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
if len(slots) == 0 {
return NullNodeID
}
log := log.Ctx(context.TODO())
nodeID, useSlot := pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
return NullNodeID
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
return NullNodeID
}
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
return nodeID
}
// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or timeout are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
func (c *compactionPlanHandler) checkCompaction(assigner NodeAssigner) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
// Assign node id if needed
var slots map[int64]int64
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.NeedReAssignNodeID() {
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
if ok := assigner.assign(t); !ok {
break
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
@@ -756,26 +760,6 @@ func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.Unlock()
}
func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
useSlot = task.GetSlotUsage()
if useSlot <= 0 {
log.Ctx(context.TODO()).Warn("task slot should not be 0", zap.Int64("planID", task.GetTaskProto().GetPlanID()), zap.String("type", task.GetTaskProto().GetType().String()))
return NullNodeID, useSlot
}
for id, slots := range nodeSlots {
if slots >= useSlot && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
return nodeID, useSlot
}
// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
return c.queueTasks.Len() >= c.queueTasks.capacity


@@ -67,7 +67,9 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
s.SetupTest()
s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
s.handler.schedule(assigner)
s.Empty(s.handler.executingTasks)
}
@@ -228,7 +230,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
s.handler.submitTask(t)
}
gotTasks := s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
gotTasks := s.handler.schedule(assigner)
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetTaskProto().GetPlanID()
}))
@@ -301,8 +304,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
t.SetPlan(test.plans[i])
s.handler.submitTask(t)
}
gotTasks := s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
gotTasks := s.handler.schedule(assigner)
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetTaskProto().GetPlanID()
}))
@@ -437,8 +440,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
for _, t := range test.tasks {
s.handler.submitTask(t)
}
gotTasks := s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
gotTasks := s.handler.schedule(assigner)
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetTaskProto().GetPlanID()
}))
@@ -448,39 +451,40 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.SetupTest()
nodeSlots := map[int64]int64{
assigner := newSlotBasedNodeAssigner(s.cluster)
assigner.slots = map[int64]int64{
100: 16,
101: 23,
}
task1 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
}, nil, s.mockMeta, nil)
task1.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
node, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
ok := assigner.assign(task1)
s.Equal(true, ok)
s.Equal(int64(101), task1.GetTaskProto().GetNodeID())
task2 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
}, nil, s.mockMeta, nil)
task2.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
node, useSlot = pickAnyNode(nodeSlots, task2)
s.Equal(int64(100), node)
nodeSlots[node] = nodeSlots[node] - useSlot
ok = assigner.assign(task2)
s.Equal(true, ok)
s.Equal(int64(100), task2.GetTaskProto().GetNodeID())
task3 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
}, nil, s.mockMeta, nil)
task3.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
node, useSlot = pickAnyNode(nodeSlots, task3)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
ok = assigner.assign(task3)
s.Equal(true, ok)
s.Equal(int64(101), task3.GetTaskProto().GetNodeID())
node, useSlot = pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
s.Equal(int64(NullNodeID), node)
ok = assigner.assign(&mixCompactionTask{})
s.Equal(false, ok)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
@@ -491,11 +495,12 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
}
task1 := newMixCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
}, nil, s.mockMeta, nil)
task1.slotUsage = 0
nodeID, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(NullNodeID), nodeID)
s.Equal(int64(0), useSlot)
assigner := newSlotBasedNodeAssigner(s.cluster)
assigner.slots = nodeSlots
ok := assigner.assign(task1)
s.Equal(false, ok)
}
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
@@ -509,23 +514,26 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
task1 := newClusteringCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, nil, nil, nil, nil)
}, nil, s.mockMeta, nil, nil, nil)
task1.slotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
task2 := newClusteringCompactionTask(&datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, nil, nil, nil, nil)
}, nil, s.mockMeta, nil, nil, nil)
task2.slotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
executingTasks[1] = task1
executingTasks[2] = task2
s.handler.executingTasks = executingTasks
node, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot
node, useSlot = pickAnyNode(nodeSlots, task2)
s.Equal(int64(NullNodeID), node)
assigner := newSlotBasedNodeAssigner(s.cluster)
assigner.slots = nodeSlots
ok := assigner.assign(task1)
s.Equal(true, ok)
s.Equal(int64(101), task1.GetTaskProto().GetNodeID())
ok = assigner.assign(task2)
s.Equal(false, ok)
}
func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
@@ -623,7 +631,8 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
s.handler.submitTask(t)
}
s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
s.handler.schedule(assigner)
info := s.handler.getCompactionInfo(context.TODO(), 1)
s.Equal(1, info.completedCnt)
@@ -804,9 +813,10 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
s.handler.submitTask(t)
}
s.handler.schedule()
assigner := newSlotBasedNodeAssigner(s.cluster)
s.handler.schedule(assigner)
// time.Sleep(2 * time.Second)
s.handler.checkCompaction()
s.handler.checkCompaction(assigner)
t := s.handler.getCompactionTask(1)
s.NotNil(t)
@@ -938,8 +948,10 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.submitTask(task)
s.handler.schedule()
err := s.handler.checkCompaction()
assigner := newSlotBasedNodeAssigner(s.cluster)
s.handler.schedule(assigner)
err := s.handler.checkCompaction(assigner)
s.NoError(err)
}
@@ -982,7 +994,9 @@ func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
err := s.handler.checkCompaction()
assigner := newSlotBasedNodeAssigner(s.cluster)
err := s.handler.checkCompaction(assigner)
s.NoError(err)
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
@@ -1013,8 +1027,9 @@ func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
assigner := newSlotBasedNodeAssigner(s.cluster)
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.handler.checkCompaction(assigner)
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
@@ -1061,7 +1076,9 @@ func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
assigner := newSlotBasedNodeAssigner(s.cluster)
s.handler.checkCompaction(assigner)
s.Equal(0, len(task.GetTaskProto().GetResultSegments()))
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
@@ -1104,8 +1121,10 @@ func (s *CompactionPlanHandlerSuite) TestKeepClean() {
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
assigner := newSlotBasedNodeAssigner(s.cluster)
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.handler.checkCompaction(assigner)
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()


@@ -3630,7 +3630,8 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl
p.CompactionMaxParallelTasks = ParamItem{
Key: "dataCoord.compaction.maxParallelTaskNum",
Version: "2.2.12",
DefaultValue: "10",
DefaultValue: "-1",
Doc: "Deprecated, see datanode.slot.slotCap",
Export: true,
}
p.CompactionMaxParallelTasks.Init(base.mgr)
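
The default flips from 10 to -1 so deployments that never set the key stop enforcing a coordinator-side cap; the scheduler itself no longer consults the value. If a reader of the deprecated key were still needed, one common sentinel treatment looks like this (a hedged sketch, not code from the patch):

package main

import "fmt"

// parallelismLimit interprets the deprecated setting: values <= 0 mean
// "no coordinator-side cap"; a positive value would be a legacy cap.
func parallelismLimit(v int) (limit int, enforced bool) {
	if v <= 0 {
		return 0, false
	}
	return v, true
}

func main() {
	if _, on := parallelismLimit(-1); !on {
		fmt.Println("no cap enforced; datanode.slot.slotCap governs load")
	}
}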