mirror of https://github.com/milvus-io/milvus.git
parent
024beddfe6
commit
21ba8182ee
|
@ -45,23 +45,24 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t
|
|||
p.ReplicaID,
|
||||
actions...,
|
||||
)
|
||||
log.Info("Create Segment task",
|
||||
zap.Int64("collection", p.Segment.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Segment.GetInsertChannel()),
|
||||
zap.Int64("From", p.From),
|
||||
zap.Int64("To", p.To))
|
||||
if err != nil {
|
||||
log.Warn("Create segment task from plan failed",
|
||||
log.Warn("create segment task from plan failed",
|
||||
zap.Int64("collection", p.Segment.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Segment.GetInsertChannel()),
|
||||
zap.Int64("From", p.From),
|
||||
zap.Int64("To", p.To),
|
||||
zap.Int64("from", p.From),
|
||||
zap.Int64("to", p.To),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("create segment task",
|
||||
zap.Int64("collection", p.Segment.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Segment.GetInsertChannel()),
|
||||
zap.Int64("from", p.From),
|
||||
zap.Int64("to", p.To))
|
||||
task.SetPriority(GetTaskPriorityFromWeight(p.Weight))
|
||||
ret = append(ret, task)
|
||||
}
|
||||
|
@ -81,23 +82,24 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t
|
|||
actions = append(actions, action)
|
||||
}
|
||||
task, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
|
||||
log.Info("Create Channel task",
|
||||
zap.Int64("collection", p.Channel.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Channel.GetChannelName()),
|
||||
zap.Int64("From", p.From),
|
||||
zap.Int64("To", p.To))
|
||||
if err != nil {
|
||||
log.Warn("Create channel task from plan failed",
|
||||
log.Warn("create channel task failed",
|
||||
zap.Int64("collection", p.Channel.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Channel.GetChannelName()),
|
||||
zap.Int64("From", p.From),
|
||||
zap.Int64("To", p.To),
|
||||
zap.Int64("from", p.From),
|
||||
zap.Int64("to", p.To),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("create channel task",
|
||||
zap.Int64("collection", p.Channel.GetCollectionID()),
|
||||
zap.Int64("replica", p.ReplicaID),
|
||||
zap.String("channel", p.Channel.GetChannelName()),
|
||||
zap.Int64("from", p.From),
|
||||
zap.Int64("to", p.To))
|
||||
task.SetPriority(GetTaskPriorityFromWeight(p.Weight))
|
||||
ret = append(ret, task)
|
||||
}
|
||||
|
|
|
@ -193,11 +193,11 @@ func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels
|
|||
action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName())
|
||||
task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), ch.GetCollectionID(), replicaID, action)
|
||||
if err != nil {
|
||||
log.Warn("Create channel reduce task failed",
|
||||
log.Warn("create channel reduce task failed",
|
||||
zap.Int64("collection", ch.GetCollectionID()),
|
||||
zap.Int64("replica", replicaID),
|
||||
zap.String("channel", ch.GetChannelName()),
|
||||
zap.Int64("From", ch.Node),
|
||||
zap.Int64("from", ch.Node),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
|
|
|
@ -296,11 +296,11 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
|
|||
)
|
||||
|
||||
if err != nil {
|
||||
log.Warn("Create segment reduce task failed",
|
||||
log.Warn("create segment reduce task failed",
|
||||
zap.Int64("collection", s.GetCollectionID()),
|
||||
zap.Int64("replica", replicaID),
|
||||
zap.String("channel", s.GetInsertChannel()),
|
||||
zap.Int64("From", s.Node),
|
||||
zap.Int64("from", s.Node),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
|
|
|
@ -149,12 +149,12 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
|
|||
)
|
||||
|
||||
if err != nil {
|
||||
log.Warn("Create segment task for balance failed",
|
||||
log.Warn("create segment task for balance failed",
|
||||
zap.Int64("collection", req.GetCollectionID()),
|
||||
zap.Int64("replica", replica.GetID()),
|
||||
zap.String("channel", plan.Segment.InsertChannel),
|
||||
zap.Int64("From", srcNode),
|
||||
zap.Int64("To", plan.To),
|
||||
zap.Int64("from", srcNode),
|
||||
zap.Int64("to", plan.To),
|
||||
zap.Error(err),
|
||||
)
|
||||
continue
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"github.com/cockroachdb/errors"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
|
@ -28,12 +26,6 @@ import (
|
|||
. "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrActionCanceled = errors.New("ActionCanceled")
|
||||
ErrActionRPCFailed = errors.New("ActionRPCFailed")
|
||||
ErrActionStale = errors.New("ActionStale")
|
||||
)
|
||||
|
||||
type ActionType = int32
|
||||
|
||||
const (
|
||||
|
|
|
@ -22,9 +22,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
. "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
@ -46,12 +46,6 @@ const (
|
|||
TaskPriorityHigh // for channel checker
|
||||
)
|
||||
|
||||
var (
|
||||
ErrEmptyActions = errors.New("actions could not be empty")
|
||||
ErrActionsTypeInconsistent = errors.New("actions have inconsistent type")
|
||||
ErrActionsTargetInconsistent = errors.New("actions have inconsistent target channel/segment")
|
||||
)
|
||||
|
||||
var (
|
||||
// All task priorities from low to high
|
||||
TaskPriorities = []Priority{TaskPriorityLow, TaskPriorityNormal, TaskPriorityHigh}
|
||||
|
@ -249,7 +243,7 @@ func NewSegmentTask(ctx context.Context,
|
|||
replicaID UniqueID,
|
||||
actions ...Action) (*SegmentTask, error) {
|
||||
if len(actions) == 0 {
|
||||
return nil, ErrEmptyActions
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid("non-empty actions", "no action"))
|
||||
}
|
||||
|
||||
segmentID := int64(-1)
|
||||
|
@ -257,13 +251,13 @@ func NewSegmentTask(ctx context.Context,
|
|||
for _, action := range actions {
|
||||
action, ok := action.(*SegmentAction)
|
||||
if !ok {
|
||||
return nil, ErrActionsTypeInconsistent
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid("SegmentAction", "other action", "all actions must be with the same type"))
|
||||
}
|
||||
if segmentID == -1 {
|
||||
segmentID = action.SegmentID()
|
||||
shard = action.Shard()
|
||||
} else if segmentID != action.SegmentID() {
|
||||
return nil, ErrActionsTargetInconsistent
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid(segmentID, action.SegmentID(), "all actions must operate the same segment"))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -301,19 +295,19 @@ func NewChannelTask(ctx context.Context,
|
|||
replicaID UniqueID,
|
||||
actions ...Action) (*ChannelTask, error) {
|
||||
if len(actions) == 0 {
|
||||
return nil, ErrEmptyActions
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid("non-empty actions", "no action"))
|
||||
}
|
||||
|
||||
channel := ""
|
||||
for _, action := range actions {
|
||||
channelAction, ok := action.(interface{ ChannelName() string })
|
||||
channelAction, ok := action.(*ChannelAction)
|
||||
if !ok {
|
||||
return nil, ErrActionsTypeInconsistent
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid("ChannelAction", "other action", "all actions must be with the same type"))
|
||||
}
|
||||
if channel == "" {
|
||||
channel = channelAction.ChannelName()
|
||||
} else if channel != channelAction.ChannelName() {
|
||||
return nil, ErrActionsTargetInconsistent
|
||||
return nil, errors.WithStack(merr.WrapErrParameterInvalid(channel, channelAction.ChannelName(), "all actions must operate the same segment"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
@ -1051,35 +1052,34 @@ func (suite *TaskSuite) TestChannelTaskReplace() {
|
|||
|
||||
func (suite *TaskSuite) TestCreateTaskBehavior() {
|
||||
chanelTask, err := NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0)
|
||||
suite.Error(err)
|
||||
suite.ErrorIs(err, ErrEmptyActions)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(chanelTask)
|
||||
|
||||
action := NewSegmentAction(0, 0, "", 0)
|
||||
chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action)
|
||||
suite.ErrorIs(err, ErrActionsTypeInconsistent)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(chanelTask)
|
||||
|
||||
action1 := NewChannelAction(0, 0, "fake-channel1")
|
||||
action2 := NewChannelAction(0, 0, "fake-channel2")
|
||||
chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action1, action2)
|
||||
suite.ErrorIs(err, ErrActionsTargetInconsistent)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(chanelTask)
|
||||
|
||||
segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0)
|
||||
suite.ErrorIs(err, ErrEmptyActions)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(segmentTask)
|
||||
|
||||
channelAction := NewChannelAction(0, 0, "fake-channel1")
|
||||
segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, channelAction)
|
||||
suite.ErrorIs(err, ErrActionsTypeInconsistent)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(segmentTask)
|
||||
|
||||
segmentAction1 := NewSegmentAction(0, 0, "", 0)
|
||||
segmentAction2 := NewSegmentAction(0, 0, "", 1)
|
||||
|
||||
segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, segmentAction1, segmentAction2)
|
||||
suite.ErrorIs(err, ErrActionsTargetInconsistent)
|
||||
suite.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
suite.Nil(segmentTask)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue