feat: Add checker activation (#28611)

issue: https://github.com/milvus-io/milvus/issues/28610

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/28735/head
Bingyi Sun 2023-11-24 18:08:24 +08:00 committed by GitHub
parent 089e58dfbb
commit 8514a39d1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 210 additions and 18 deletions

View File

@ -36,6 +36,7 @@ import (
// BalanceChecker checks the cluster distribution and generates balance tasks.
type BalanceChecker struct {
*checkerActivation
balance.Balance
meta *meta.Meta
nodeManager *session.NodeManager
@ -45,6 +46,7 @@ type BalanceChecker struct {
func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *session.NodeManager, scheduler task.Scheduler) *BalanceChecker {
return &BalanceChecker{
checkerActivation: newCheckerActivation(),
Balance: balancer,
meta: meta,
nodeManager: nodeMgr,
@ -53,7 +55,7 @@ func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *sessi
}
}
func (b *BalanceChecker) ID() task.Source {
func (b *BalanceChecker) ID() checkerType {
return balanceChecker
}
@ -144,6 +146,9 @@ func (b *BalanceChecker) balanceReplicas(replicaIDs []int64) ([]balance.SegmentA
}
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
if !b.IsActive() {
return nil
}
ret := make([]task.Task, 0)
replicasToBalance := b.replicasToBalance()

View File

@ -34,6 +34,7 @@ import (
// TODO(sunby): have too much similar codes with SegmentChecker
type ChannelChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
@ -47,14 +48,15 @@ func NewChannelChecker(
balancer balance.Balance,
) *ChannelChecker {
return &ChannelChecker{
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
}
}
func (c *ChannelChecker) ID() task.Source {
func (c *ChannelChecker) ID() checkerType {
return channelChecker
}
@ -70,6 +72,9 @@ func (c *ChannelChecker) readyToCheck(collectionID int64) bool {
}
func (c *ChannelChecker) Check(ctx context.Context) []task.Task {
if !c.IsActive() {
return nil
}
collectionIDs := c.meta.CollectionManager.GetAll()
tasks := make([]task.Task, 0)
for _, cid := range collectionIDs {

View File

@ -18,12 +18,38 @@ package checkers
import (
"context"
"sync/atomic"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
)
type Checker interface {
ID() task.Source
ID() checkerType
Description() string
Check(ctx context.Context) []task.Task
IsActive() bool
Activate()
Deactivate()
}
type checkerActivation struct {
active atomic.Bool
}
func (c *checkerActivation) IsActive() bool {
return c.active.Load()
}
func (c *checkerActivation) Activate() {
c.active.Store(true)
}
func (c *checkerActivation) Deactivate() {
c.active.Store(false)
}
func newCheckerActivation() *checkerActivation {
c := &checkerActivation{}
c.Activate()
return c
}

View File

@ -21,6 +21,7 @@ import (
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -56,6 +57,7 @@ var (
balanceChecker: balanceCheckerName,
indexChecker: indexCheckerName,
}
errTypeNotFound = errors.New("checker type not found")
)
func (s checkerType) String() string {
@ -190,3 +192,32 @@ func (controller *CheckerController) check(ctx context.Context, checkType checke
}
}
}
func (controller *CheckerController) Deactivate(typ checkerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Deactivate()
return nil
}
}
return errTypeNotFound
}
func (controller *CheckerController) Activate(typ checkerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Activate()
return nil
}
}
return errTypeNotFound
}
func (controller *CheckerController) IsActive(typ checkerType) (bool, error) {
for _, checker := range controller.checkers {
if checker.ID() == typ {
return checker.IsActive(), nil
}
}
return false, errTypeNotFound
}

View File

@ -0,0 +1,104 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type ControllerBaseTestSuite struct {
suite.Suite
kv kv.MetaKv
meta *meta.Meta
broker *meta.MockBroker
nodeMgr *session.NodeManager
dist *meta.DistributionManager
targetManager *meta.TargetManager
scheduler *task.MockScheduler
balancer *balance.MockBalancer
controller *CheckerController
}
func (suite *ControllerBaseTestSuite) SetupSuite() {
paramtable.Init()
}
func (suite *ControllerBaseTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.dist = meta.NewDistributionManager()
suite.broker = meta.NewMockBroker(suite.T())
suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta)
suite.balancer = balance.NewMockBalancer(suite.T())
suite.scheduler = task.NewMockScheduler(suite.T())
suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler, suite.broker)
}
func (s *ControllerBaseTestSuite) TestActivation() {
active, err := s.controller.IsActive(segmentChecker)
s.NoError(err)
s.True(active)
err = s.controller.Deactivate(segmentChecker)
s.NoError(err)
active, err = s.controller.IsActive(segmentChecker)
s.NoError(err)
s.False(active)
err = s.controller.Activate(segmentChecker)
s.NoError(err)
active, err = s.controller.IsActive(segmentChecker)
s.NoError(err)
s.True(active)
invalidTyp := -1
_, err = s.controller.IsActive(checkerType(invalidTyp))
s.Equal(errTypeNotFound, err)
}
func TestControllerBaseTestSuite(t *testing.T) {
suite.Run(t, new(ControllerBaseTestSuite))
}

View File

@ -36,6 +36,7 @@ var _ Checker = (*IndexChecker)(nil)
// IndexChecker perform segment index check.
type IndexChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
broker meta.Broker
@ -49,14 +50,15 @@ func NewIndexChecker(
nodeMgr *session.NodeManager,
) *IndexChecker {
return &IndexChecker{
meta: meta,
dist: dist,
broker: broker,
nodeMgr: nodeMgr,
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
broker: broker,
nodeMgr: nodeMgr,
}
}
func (c *IndexChecker) ID() task.Source {
func (c *IndexChecker) ID() checkerType {
return indexChecker
}
@ -65,6 +67,9 @@ func (c *IndexChecker) Description() string {
}
func (c *IndexChecker) Check(ctx context.Context) []task.Task {
if !c.IsActive() {
return nil
}
collectionIDs := c.meta.CollectionManager.GetAll()
var tasks []task.Task

View File

@ -36,6 +36,7 @@ import (
)
type SegmentChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
@ -51,15 +52,16 @@ func NewSegmentChecker(
nodeMgr *session.NodeManager,
) *SegmentChecker {
return &SegmentChecker{
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
nodeMgr: nodeMgr,
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
targetMgr: targetMgr,
balancer: balancer,
nodeMgr: nodeMgr,
}
}
func (c *SegmentChecker) ID() task.Source {
func (c *SegmentChecker) ID() checkerType {
return segmentChecker
}
@ -75,6 +77,9 @@ func (c *SegmentChecker) readyToCheck(collectionID int64) bool {
}
func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
if !c.IsActive() {
return nil
}
collectionIDs := c.meta.CollectionManager.GetAll()
results := make([]task.Task, 0)
for _, cid := range collectionIDs {

View File

@ -147,6 +147,17 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() {
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
// test activation
checker.Deactivate()
suite.False(checker.IsActive())
tasks = checker.Check(context.TODO())
suite.Len(tasks, 0)
checker.Activate()
suite.True(checker.IsActive())
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
}
func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {