Reduce unnecessary panics in querycoord (#19925)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/20175/head
wei liu 2022-10-28 17:15:32 +08:00 committed by GitHub
parent 53289a63e6
commit 4412cfcaaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 169 additions and 49 deletions

View File

@ -20,7 +20,9 @@ import (
"context"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"go.uber.org/zap"
)
func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout time.Duration, plans []SegmentAssignPlan) []task.Task {
@ -35,7 +37,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
action := task.NewSegmentAction(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID())
actions = append(actions, action)
}
task := task.NewSegmentTask(
task, err := task.NewSegmentTask(
ctx,
timeout,
checkerID,
@ -43,6 +45,17 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
p.ReplicaID,
actions...,
)
if err != nil {
log.Warn("Create segment task from plan failed",
zap.Int64("collection", p.Segment.GetCollectionID()),
zap.Int64("replica", p.ReplicaID),
zap.String("channel", p.Segment.GetInsertChannel()),
zap.Int64("From", p.From),
zap.Int64("To", p.To),
zap.Error(err),
)
continue
}
ret = append(ret, task)
}
return ret
@ -60,7 +73,18 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t
action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName())
actions = append(actions, action)
}
task := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
task, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
if err != nil {
log.Warn("Create channel task from plan failed",
zap.Int64("collection", p.Channel.GetCollectionID()),
zap.Int64("replica", p.ReplicaID),
zap.String("channel", p.Channel.GetChannelName()),
zap.Int64("From", p.From),
zap.Int64("To", p.To),
zap.Error(err),
)
continue
}
ret = append(ret, task)
}
return ret

View File

@ -19,11 +19,13 @@ package checkers
import (
"context"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"go.uber.org/zap"
)
// TODO(sunby): have too much similar codes with SegmentChecker
@ -148,7 +150,17 @@ func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels
ret := make([]task.Task, 0, len(channels))
for _, ch := range channels {
action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName())
task := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout, c.ID(), ch.GetCollectionID(), replicaID, action)
task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout, c.ID(), ch.GetCollectionID(), replicaID, action)
if err != nil {
log.Warn("Create channel reduce task failed",
zap.Int64("collection", ch.GetCollectionID()),
zap.Int64("replica", replicaID),
zap.String("channel", ch.GetChannelName()),
zap.Int64("From", ch.Node),
zap.Error(err),
)
continue
}
ret = append(ret, task)
}
return ret

View File

@ -19,6 +19,7 @@ package checkers
import (
"context"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -27,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type SegmentChecker struct {
@ -246,14 +248,27 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
ret := make([]task.Task, 0, len(segments))
for _, s := range segments {
action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope)
ret = append(ret, task.NewSegmentTask(
task, err := task.NewSegmentTask(
ctx,
Params.QueryCoordCfg.SegmentTaskTimeout,
c.ID(),
s.GetCollectionID(),
replicaID,
action,
))
)
if err != nil {
log.Warn("Create segment reduce task failed",
zap.Int64("collection", s.GetCollectionID()),
zap.Int64("replica", replicaID),
zap.String("channel", s.GetInsertChannel()),
zap.Int64("From", s.Node),
zap.Error(err),
)
continue
}
ret = append(ret, task)
}
return ret
}

View File

@ -115,7 +115,7 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
zap.Int64("destNodeID", plan.To),
zap.Int64("segmentID", plan.Segment.GetID()),
)
task := task.NewSegmentTask(ctx,
task, err := task.NewSegmentTask(ctx,
Params.QueryCoordCfg.SegmentTaskTimeout,
req.GetBase().GetMsgID(),
req.GetCollectionID(),
@ -123,7 +123,19 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
task.NewSegmentAction(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
task.NewSegmentAction(srcNode, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID()),
)
err := s.taskScheduler.Add(task)
if err != nil {
log.Warn("Create segment task for balance failed",
zap.Int64("collection", req.GetCollectionID()),
zap.Int64("replica", replica.GetID()),
zap.String("channel", plan.Segment.InsertChannel),
zap.Int64("From", srcNode),
zap.Int64("To", plan.To),
zap.Error(err),
)
continue
}
err = s.taskScheduler.Add(task)
if err != nil {
task.Cancel()
return err

View File

@ -120,9 +120,9 @@ func (suite *MergerSuite) TestMerge() {
ctx := context.Background()
for segmentID := int64(1); segmentID <= 3; segmentID++ {
task := NewSegmentTask(ctx, timeout, 0, suite.collectionID, suite.replicaID,
task, err := NewSegmentTask(ctx, timeout, 0, suite.collectionID, suite.replicaID,
NewSegmentAction(suite.nodeID, ActionTypeGrow, "", segmentID))
suite.NoError(err)
suite.merger.Add(NewLoadSegmentsTask(task, 0, suite.requests[segmentID]))
}

View File

@ -18,6 +18,7 @@ package task
import (
"context"
"errors"
"fmt"
"time"
@ -44,6 +45,12 @@ const (
TaskPriorityHigh
)
// Sentinel errors returned by NewSegmentTask and NewChannelTask when the
// supplied actions cannot form a valid task. Callers can match them with
// errors.Is.
var (
// ErrEmptyActions is returned when a task is created with no actions.
ErrEmptyActions = errors.New("actions could not be empty")
// ErrActionsTypeInconsistent is returned when an action does not have the
// type the task expects (a *SegmentAction for a segment task, an action
// exposing ChannelName() for a channel task).
ErrActionsTypeInconsistent = errors.New("actions have inconsistent type")
// ErrActionsTargetInconsistent is returned when the actions of one task do
// not all refer to the same segment (segment task) or channel (channel task).
ErrActionsTargetInconsistent = errors.New("actions have inconsistent target channel/segment")
)
var (
// All task priorities from low to high
TaskPriorities = []Priority{TaskPriorityLow, TaskPriorityNormal, TaskPriorityHigh}
@ -233,9 +240,9 @@ func NewSegmentTask(ctx context.Context,
sourceID,
collectionID,
replicaID UniqueID,
actions ...Action) *SegmentTask {
actions ...Action) (*SegmentTask, error) {
if len(actions) == 0 {
panic("empty actions is not allowed")
return nil, ErrEmptyActions
}
segmentID := int64(-1)
@ -243,13 +250,13 @@ func NewSegmentTask(ctx context.Context,
for _, action := range actions {
action, ok := action.(*SegmentAction)
if !ok {
panic("SegmentTask can only contain SegmentActions")
return nil, ErrActionsTypeInconsistent
}
if segmentID == -1 {
segmentID = action.SegmentID()
shard = action.Shard()
} else if segmentID != action.SegmentID() {
panic("all actions must process the same segment")
return nil, ErrActionsTargetInconsistent
}
}
@ -258,7 +265,7 @@ func NewSegmentTask(ctx context.Context,
return &SegmentTask{
baseTask: base,
segmentID: segmentID,
}
}, nil
}
func (task *SegmentTask) Shard() string {
@ -285,21 +292,21 @@ func NewChannelTask(ctx context.Context,
sourceID,
collectionID,
replicaID UniqueID,
actions ...Action) *ChannelTask {
actions ...Action) (*ChannelTask, error) {
if len(actions) == 0 {
panic("empty actions is not allowed")
return nil, ErrEmptyActions
}
channel := ""
for _, action := range actions {
channelAction, ok := action.(interface{ ChannelName() string })
if !ok {
panic("ChannelTask must contain only ChannelAction")
return nil, ErrActionsTypeInconsistent
}
if channel == "" {
channel = channelAction.ChannelName()
} else if channel != channelAction.ChannelName() {
panic("all actions must process the same channel")
return nil, ErrActionsTargetInconsistent
}
}
@ -307,7 +314,7 @@ func NewChannelTask(ctx context.Context,
base.actions = actions
return &ChannelTask{
baseTask: base,
}
}, nil
}
func (task *ChannelTask) Channel() string {

View File

@ -203,7 +203,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
ChannelName: channel,
UnflushedSegmentIds: []int64{suite.growingSegments[channel]},
}))
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -211,8 +211,9 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.AssertTaskNum(0, len(suite.subChannels), len(suite.subChannels), 0)
@ -261,7 +262,7 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() {
CollectionID: suite.collection,
ChannelName: channel,
}))
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -269,8 +270,10 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() {
-1,
NewChannelAction(targetNode, ActionTypeReduce, channel),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
// Only first channel exists
@ -343,7 +346,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
PartitionID: partition,
InsertChannel: channel.ChannelName,
})
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -351,8 +354,9 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
segmentsNum := len(suite.loadSegments)
@ -426,7 +430,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
PartitionID: partition,
InsertChannel: channel.ChannelName,
})
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -434,8 +438,9 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
segmentsNum := len(suite.loadSegments)
@ -492,7 +497,7 @@ func (suite *TaskSuite) TestReleaseSegmentTask() {
},
})
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -500,8 +505,9 @@ func (suite *TaskSuite) TestReleaseSegmentTask() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.dist.SegmentDistManager.Update(targetNode, segments...)
@ -539,7 +545,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
tasks := []Task{}
for _, segment := range suite.releaseSegments {
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -547,8 +553,9 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
suite.replica,
NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
@ -630,7 +637,7 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
})
view.Segments[segment] = &querypb.SegmentDist{NodeID: sourceNode, Version: 0}
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -639,8 +646,9 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.dist.SegmentDistManager.Update(sourceNode, segments...)
@ -689,7 +697,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
CollectionID: suite.collection,
ChannelName: channel,
}))
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -697,8 +705,9 @@ func (suite *TaskSuite) TestTaskCanceled() {
-1,
NewChannelAction(targetNode, ActionTypeReduce, channel),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
// Only first channel exists
@ -778,7 +787,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
PartitionID: partition,
InsertChannel: channel.ChannelName,
})
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -786,8 +795,9 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
segmentsNum := len(suite.loadSegments)
@ -833,7 +843,7 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
targetNode := int64(3)
for _, channel := range suite.subChannels {
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -841,15 +851,16 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
task.SetPriority(TaskPriorityNormal)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
// Task with the same replica and segment,
// but without higher priority can't be added
for _, channel := range suite.subChannels {
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -857,8 +868,9 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
task.SetPriority(TaskPriorityNormal)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.ErrorIs(err, ErrConflictTaskExisted)
task.SetPriority(TaskPriorityLow)
err = suite.scheduler.Add(task)
@ -867,7 +879,7 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
// Replace the task with one with higher priority
for _, channel := range suite.subChannels {
task := NewChannelTask(
task, err := NewChannelTask(
ctx,
timeout,
0,
@ -875,21 +887,56 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
suite.replica,
NewChannelAction(targetNode, ActionTypeGrow, channel),
)
suite.NoError(err)
task.SetPriority(TaskPriorityHigh)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
channelNum := len(suite.subChannels)
suite.AssertTaskNum(0, channelNum, channelNum, 0)
}
// TestCreateTaskBehavior checks that NewChannelTask and NewSegmentTask reject
// invalid action lists by returning a nil task together with the matching
// sentinel error.
func (suite *TaskSuite) TestCreateTaskBehavior() {
	ctx := context.TODO()
	timeout := 5 * time.Second

	// Channel task without any action.
	channelTask, err := NewChannelTask(ctx, timeout, 0, 0, 0)
	suite.Error(err)
	suite.ErrorIs(err, ErrEmptyActions)
	suite.Nil(channelTask)

	// Channel task given a segment action.
	segmentActionForChannel := NewSegmentAction(0, 0, "", 0)
	channelTask, err = NewChannelTask(ctx, timeout, 0, 0, 0, segmentActionForChannel)
	suite.ErrorIs(err, ErrActionsTypeInconsistent)
	suite.Nil(channelTask)

	// Channel task whose actions name two different channels.
	firstChannelAction := NewChannelAction(0, 0, "fake-channel1")
	secondChannelAction := NewChannelAction(0, 0, "fake-channel2")
	channelTask, err = NewChannelTask(ctx, timeout, 0, 0, 0, firstChannelAction, secondChannelAction)
	suite.ErrorIs(err, ErrActionsTargetInconsistent)
	suite.Nil(channelTask)

	// Segment task without any action.
	segmentTask, err := NewSegmentTask(ctx, timeout, 0, 0, 0)
	suite.ErrorIs(err, ErrEmptyActions)
	suite.Nil(segmentTask)

	// Segment task given a channel action.
	channelActionForSegment := NewChannelAction(0, 0, "fake-channel1")
	segmentTask, err = NewSegmentTask(ctx, timeout, 0, 0, 0, channelActionForSegment)
	suite.ErrorIs(err, ErrActionsTypeInconsistent)
	suite.Nil(segmentTask)

	// Segment task whose actions name two different segments (IDs 0 and 1).
	firstSegmentAction := NewSegmentAction(0, 0, "", 0)
	secondSegmentAction := NewSegmentAction(0, 0, "", 1)
	segmentTask, err = NewSegmentTask(ctx, timeout, 0, 0, 0, firstSegmentAction, secondSegmentAction)
	suite.ErrorIs(err, ErrActionsTargetInconsistent)
	suite.Nil(segmentTask)
}
func (suite *TaskSuite) TestSegmentTaskReplace() {
ctx := context.Background()
timeout := 10 * time.Second
targetNode := int64(3)
for _, segment := range suite.loadSegments {
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -897,15 +944,16 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
task.SetPriority(TaskPriorityNormal)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
// Task with the same replica and segment,
// but without higher priority can't be added
for _, segment := range suite.loadSegments {
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -913,8 +961,9 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
task.SetPriority(TaskPriorityNormal)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.ErrorIs(err, ErrConflictTaskExisted)
task.SetPriority(TaskPriorityLow)
err = suite.scheduler.Add(task)
@ -923,7 +972,7 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
// Replace the task with one with higher priority
for _, segment := range suite.loadSegments {
task := NewSegmentTask(
task, err := NewSegmentTask(
ctx,
timeout,
0,
@ -931,8 +980,9 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, "", segment),
)
suite.NoError(err)
task.SetPriority(TaskPriorityHigh)
err := suite.scheduler.Add(task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
segmentNum := len(suite.loadSegments)