milvus/internal/querycoordv2/checkers/controller.go

106 lines
2.5 KiB
Go

package checkers
import (
"context"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"go.uber.org/zap"
)
// checkRoundTaskNumLimit is the maximum number of tasks collected in a single
// check round. Once the checkers have produced this many tasks, the round
// stops running further checkers and drops any tasks beyond the limit.
var checkRoundTaskNumLimit = 128
// CheckerController owns an ordered list of checkers, runs them periodically
// in a background goroutine, and hands the tasks they produce to the
// scheduler.
type CheckerController struct {
// stopCh is closed by Stop to terminate the loop launched by Start.
stopCh chan struct{}
// meta, dist and targetMgr are the handles passed to the channel and
// segment checkers when the controller is constructed.
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
// NOTE(review): broker and nodeMgr are neither populated nor read by the
// code visible in this file; confirm whether they are still needed.
broker *meta.CoordinatorBroker
nodeMgr *session.NodeManager
// balancer is the balance policy given to the checkers at construction.
balancer balance.Balance
// scheduler receives the tasks gathered in each check round.
scheduler task.Scheduler
// checkers are run in slice order; earlier entries have higher priority.
checkers []Checker
}
// NewCheckerController builds a CheckerController that runs the channel,
// segment and balance checkers, in that order.
//
// Each checker is given an ID equal to its 1-based position in that order.
// The returned controller is idle until Start is called.
func NewCheckerController(
	meta *meta.Meta,
	dist *meta.DistributionManager,
	targetMgr *meta.TargetManager,
	balancer balance.Balance,
	scheduler task.Scheduler) *CheckerController {
	// CheckerController runs checkers in slice order;
	// an earlier checker has higher priority.
	checkers := []Checker{
		NewChannelChecker(meta, dist, targetMgr, balancer),
		NewSegmentChecker(meta, dist, targetMgr, balancer),
		NewBalanceChecker(balancer),
	}
	for i, checker := range checkers {
		checker.SetID(int64(i + 1))
	}

	return &CheckerController{
		stopCh:    make(chan struct{}),
		meta:      meta,
		dist:      dist,
		targetMgr: targetMgr,
		// Keep the balancer on the controller as well; previously the
		// field was declared but left nil.
		balancer:  balancer,
		scheduler: scheduler,
		checkers:  checkers,
	}
}
// Start launches the background check loop and returns immediately.
//
// The loop runs one check round on every tick of
// Params.QueryCoordCfg.CheckInterval, and exits when ctx is canceled or
// when Stop closes the controller's stop channel.
func (controller *CheckerController) Start(ctx context.Context) {
	loop := func() {
		interval := Params.QueryCoordCfg.CheckInterval
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				controller.check(ctx)

			case <-controller.stopCh:
				log.Info("CheckerController stopped")
				return

			case <-ctx.Done():
				log.Info("CheckerController stopped due to context canceled")
				return
			}
		}
	}

	go loop()
}
// Stop signals the loop launched by Start to exit by closing the stop
// channel.
//
// Calling Stop again after it has returned is a no-op rather than a
// "close of closed channel" panic. Concurrent calls to Stop are not
// synchronized with each other.
func (controller *CheckerController) Stop() {
	select {
	case <-controller.stopCh:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(controller.stopCh)
	}
}
// check runs a single check round.
//
// Checkers are invoked in priority order and their tasks are accumulated.
// As soon as the accumulated count reaches checkRoundTaskNumLimit, the
// remaining checkers are skipped for this round and any tasks beyond the
// limit are dropped. The surviving tasks are then added to the scheduler in
// the order they were produced.
func (controller *CheckerController) check(ctx context.Context) {
	var tasks []task.Task

	for i, checker := range controller.checkers {
		tasks = append(tasks, checker.Check(ctx)...)
		if len(tasks) < checkRoundTaskNumLimit {
			continue
		}

		// NewCheckerController assigns checker IDs as slice position + 1,
		// so log that ID rather than the raw 0-based index.
		log.With(zap.Int("checkerID", i+1)).
			Info("checkers have spawn too many tasks, won't run subsequent checkers, and truncate the spawned tasks",
				zap.Int("taskNum", len(tasks)),
				zap.Int("taskNumLimit", checkRoundTaskNumLimit))
		tasks = tasks[:checkRoundTaskNumLimit]
		break
	}

	for _, t := range tasks {
		controller.scheduler.Add(t)
	}
}