Use merr in querynodev2 (#23652)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/23740/head
smellthemoon 2023-04-26 17:06:34 +08:00 committed by GitHub
parent 0d3fcd5d16
commit f0ababb420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 103 additions and 189 deletions

View File

@ -18,6 +18,8 @@ package collector
import (
"sync"
"github.com/milvus-io/milvus/pkg/util/merr"
)
type averageData struct {
@ -65,7 +67,7 @@ func (c *averageCollector) Average(label string) (float64, error) {
average, ok := c.averages[label]
if !ok {
return 0, WrapErrAvarageLabelNotRegister(label)
return 0, merr.WrapErrAverageLabelNotRegister(label)
}
return average.Value(), nil

View File

@ -1,31 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"fmt"
"github.com/cockroachdb/errors"
)
// Sentinel errors exposed by the collector package.
var (
// ErrAvarageLabelNotRegister is the base error wrapped by
// WrapErrAvarageLabelNotRegister.
// NOTE(review): "Avarage" appears to be a misspelling of "Average"; both the
// identifier and the message text are left unchanged here.
ErrAvarageLabelNotRegister = errors.New("AvarageLabelNotRegister")
)
// WrapErrAvarageLabelNotRegister returns an error that wraps
// ErrAvarageLabelNotRegister and appends the offending label to its message.
func WrapErrAvarageLabelNotRegister(label string) error {
	const format = "%w :%s"
	return fmt.Errorf(format, ErrAvarageLabelNotRegister, label)
}

View File

@ -1,42 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynodev2
import (
"fmt"
"github.com/cockroachdb/errors"
)
// Sentinel errors declared by the querynodev2 package.
var (
// ErrNodeUnhealthy is the base error wrapped by WrapErrNodeUnhealthy.
ErrNodeUnhealthy = errors.New("NodeIsUnhealthy")
// ErrGetDelegatorFailed is the sentinel for a failed shard delegator lookup.
// NOTE(review): the message text reads "Delefator", which looks like a typo
// for "Delegator"; the literal is left unchanged here.
ErrGetDelegatorFailed = errors.New("GetShardDelefatorFailed")
// ErrInitPipelineFailed is the base error wrapped by WrapErrInitPipelineFailed.
ErrInitPipelineFailed = errors.New("InitPipelineFailed")
)
// WrapErrNodeUnhealthy builds an error that wraps ErrNodeUnhealthy and
// records the given nodeID in the message.
func WrapErrNodeUnhealthy(nodeID int64) error {
	const format = "%w id: %d"
	return fmt.Errorf(format, ErrNodeUnhealthy, nodeID)
}
// WrapErrInitPipelineFailed returns an error that wraps ErrInitPipelineFailed
// and appends the text of err. err must be non-nil: its Error method is
// called unconditionally.
func WrapErrInitPipelineFailed(err error) error {
	cause := err.Error()
	return fmt.Errorf("%w err: %s", ErrInitPipelineFailed, cause)
}
// msgQueryNodeIsUnhealthy formats the human-readable "not ready" message for
// the query node identified by nodeID.
func msgQueryNodeIsUnhealthy(nodeID int64) string {
	const format = "query node %d is not ready"
	return fmt.Sprintf(format, nodeID)
}

View File

@ -125,7 +125,8 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
}()
if !node.lifetime.Add(commonpbutil.IsHealthy) {
failRet.Status.Reason = msgQueryNodeIsUnhealthy(paramtable.GetNodeID())
err := merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
failRet.Status = merr.Status(err)
return failRet, nil
}
defer node.lifetime.Done()
@ -167,8 +168,9 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
// get delegator
sd, ok := node.delegators.Get(channel)
if !ok {
log.Warn("Query failed, failed to get query shard delegator", zap.Error(ErrGetDelegatorFailed))
failRet.Status.Reason = ErrGetDelegatorFailed.Error()
err := merr.WrapErrServiceUnavailable("failed to get query shard delegator")
log.Warn("Query failed, failed to get query shard delegator", zap.Error(err))
failRet.Status = merr.Status(err)
return failRet, nil
}
@ -330,7 +332,7 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
if !node.lifetime.Add(commonpbutil.IsHealthy) {
return nil, WrapErrNodeUnhealthy(paramtable.GetNodeID())
return nil, merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
}
defer node.lifetime.Done()
@ -353,8 +355,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
task := tasks.NewSearchTask(searchCtx, collection, node.manager, req)
if !node.scheduler.Add(task) {
log.Warn("failed to search channel", zap.Error(tasks.ErrTaskQueueFull))
return nil, tasks.ErrTaskQueueFull
err := merr.WrapErrTaskQueueFull()
log.Warn("failed to search channel", zap.Error(err))
return nil, err
}
err := task.Wait()
@ -380,8 +383,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq
// get delegator
sd, ok := node.delegators.Get(channel)
if !ok {
log.Warn("Query failed, failed to get query shard delegator", zap.Error(ErrGetDelegatorFailed))
return nil, ErrGetDelegatorFailed
err := merr.WrapErrServiceUnavailable("failed to get query shard delegator")
log.Warn("Query failed, failed to get query shard delegator", zap.Error(err))
return nil, err
}
req, err := node.optimizeSearchParams(ctx, req, sd)
if err != nil {

View File

@ -1,61 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"fmt"
"github.com/cockroachdb/errors"
)
// Sentinel errors declared by the pipeline package.
var (
// ErrMsgInvalidType is the sentinel for a message whose type is not handled.
ErrMsgInvalidType = errors.New("InvalidMessageType")
// ErrMsgNotAligned is the base error wrapped by WrapErrMsgNotAligned.
ErrMsgNotAligned = errors.New("CheckAlignedFailed")
// ErrMsgEmpty is the sentinel for a message that carries no data.
ErrMsgEmpty = errors.New("EmptyMessage")
// ErrMsgNotTarget is the base error wrapped by WrapErrMsgNotTarget.
ErrMsgNotTarget = errors.New("NotTarget")
// ErrMsgExcluded is the base error wrapped by WrapErrMsgExcluded.
ErrMsgExcluded = errors.New("SegmentExcluded")
// ErrCollectionNotFound is the sentinel for a missing collection.
ErrCollectionNotFound = errors.New("CollectionNotFound")
// ErrShardDelegatorNotFound is the base error wrapped by
// WrapErrShardDelegatorNotFound.
ErrShardDelegatorNotFound = errors.New("ShardDelegatorNotFound")
// ErrNewPipelineFailed is the base error wrapped by WrapErrNewPipelineFailed.
ErrNewPipelineFailed = errors.New("FailedCreateNewPipeline")
// ErrStartPipeline is the base error wrapped by WrapErrStartPipeline.
// NOTE(review): the message text reads "Pipine", which looks like a typo for
// "Pipeline"; the literal is left unchanged here.
ErrStartPipeline = errors.New("PipineStartFailed")
)
// WrapErrMsgNotAligned returns an error that wraps ErrMsgNotAligned and
// appends the text of the underlying alignment error.
func WrapErrMsgNotAligned(err error) error {
	const format = "%w :%s"
	return fmt.Errorf(format, ErrMsgNotAligned, err)
}
// WrapErrMsgNotTarget returns an error that wraps ErrMsgNotTarget and appends
// reason, which names why the message is not a target.
func WrapErrMsgNotTarget(reason string) error {
	// Use the same " :" separator as the sibling wrappers; without it the
	// sentinel text and the reason run together (e.g. "NotTargetCollection").
	return fmt.Errorf("%w :%s", ErrMsgNotTarget, reason)
}
// WrapErrMsgExcluded returns an error that wraps ErrMsgExcluded and records
// the ID of the excluded segment.
func WrapErrMsgExcluded(segmentID int64) error {
	const format = "%w ID:%d"
	return fmt.Errorf(format, ErrMsgExcluded, segmentID)
}
// WrapErrNewPipelineFailed returns an error that wraps ErrNewPipelineFailed
// and appends the text of the error that caused pipeline creation to fail.
func WrapErrNewPipelineFailed(err error) error {
	const format = "%w :%s"
	return fmt.Errorf(format, ErrNewPipelineFailed, err)
}
// WrapErrStartPipeline returns an error that wraps ErrStartPipeline and
// appends the supplied reason.
func WrapErrStartPipeline(reason string) error {
	const format = "%w :%s"
	return fmt.Errorf(format, ErrStartPipeline, reason)
}
// WrapErrShardDelegatorNotFound returns an error that wraps
// ErrShardDelegatorNotFound and records the channel that was looked up.
func WrapErrShardDelegatorNotFound(channel string) error {
	const format = "%w channel:%s"
	return fmt.Errorf(format, ErrShardDelegatorNotFound, channel)
}

View File

@ -30,11 +30,12 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
//filterNode filter the invalid message of pipeline
// filterNode filter the invalid message of pipeline
type filterNode struct {
*BaseNode
collectionID UniqueID
@ -102,7 +103,7 @@ func (fNode *filterNode) Operate(in Msg) Msg {
return out
}
//filtrate message with filter policy
// filtrate message with filter policy
func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error {
switch msg.Type() {
@ -126,7 +127,7 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error {
}
}
default:
return ErrMsgInvalidType
return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not")
}
return nil
}

View File

@ -16,31 +16,37 @@
package pipeline
//MsgFilter will return error if Msg was invalid
import (
"fmt"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// MsgFilter will return error if Msg was invalid
type InsertMsgFilter = func(n *filterNode, c *Collection, msg *InsertMsg) error
type DeleteMsgFilter = func(n *filterNode, c *Collection, msg *DeleteMsg) error
//Chack msg is aligned --
//len of each kind of infos in InsertMsg should match each other
// Check msg is aligned --
// len of each kind of infos in InsertMsg should match each other
func InsertNotAligned(n *filterNode, c *Collection, msg *InsertMsg) error {
err := msg.CheckAligned()
if err != nil {
return WrapErrMsgNotAligned(err)
return err
}
return nil
}
func InsertEmpty(n *filterNode, c *Collection, msg *InsertMsg) error {
if len(msg.GetTimestamps()) <= 0 {
return ErrMsgEmpty
return merr.WrapErrParameterInvalid("has msg", "the length of timestamp field is 0")
}
return nil
}
func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error {
if msg.GetCollectionID() != c.ID() {
return WrapErrMsgNotTarget("Collection")
return merr.WrapErrParameterInvalid(msg.GetCollectionID(), c.ID(), "msg not target because of collection")
}
// all growing will be in-memory to support dynamic partition load/release
@ -53,7 +59,8 @@ func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
return nil
}
if msg.EndTimestamp <= segInfo.GetDmlPosition().GetTimestamp() {
return WrapErrMsgExcluded(msg.SegmentID)
m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID())
return merr.WrapErrSegmentLack(msg.GetSegmentID(), m)
}
return nil
}
@ -61,21 +68,21 @@ func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error {
func DeleteNotAligned(n *filterNode, c *Collection, msg *DeleteMsg) error {
err := msg.CheckAligned()
if err != nil {
return WrapErrMsgNotAligned(err)
return err
}
return nil
}
func DeleteEmpty(n *filterNode, c *Collection, msg *DeleteMsg) error {
if len(msg.GetTimestamps()) <= 0 {
return ErrMsgEmpty
return merr.WrapErrParameterInvalid("has msg", "the length of timestamp field is 0")
}
return nil
}
func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error {
if msg.GetCollectionID() != c.ID() {
return WrapErrMsgNotTarget("Collection")
return merr.WrapErrParameterInvalid(msg.GetCollectionID(), c.ID(), "msg not target because of collection")
}
// all growing will be in-memory to support dynamic partition load/release

View File

@ -27,11 +27,12 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
//Manager manage pipeline in querynode
// Manager manage pipeline in querynode
type Manager interface {
Num() int
Add(collectionID UniqueID, channel string) (Pipeline, error)
@ -55,7 +56,7 @@ func (m *manager) Num() int {
return len(m.channel2Pipeline)
}
//Add pipeline for each channel of collection
// Add pipeline for each channel of collection
func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) {
m.mu.Lock()
defer m.mu.Unlock()
@ -76,12 +77,12 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) {
//get shard delegator for add growing in pipeline
delegator, ok := m.delegators.Get(channel)
if !ok {
return nil, WrapErrShardDelegatorNotFound(channel)
return nil, merr.WrapErrShardDelegatorNotFound(channel)
}
newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.tSafeManager, m.dispatcher, delegator)
if err != nil {
return nil, WrapErrNewPipelineFailed(err)
return nil, merr.WrapErrServiceUnavailable(err.Error(), "failed to create new pipeline")
}
m.channel2Pipeline[channel] = newPipeLine
@ -105,7 +106,7 @@ func (m *manager) Get(channel string) Pipeline {
return pipeline
}
//Remove pipeline from Manager by channel
// Remove pipeline from Manager by channel
func (m *manager) Remove(channels ...string) {
m.mu.Lock()
defer m.mu.Unlock()
@ -122,7 +123,7 @@ func (m *manager) Remove(channels ...string) {
metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
}
//Start pipeline by channel
// Start pipeline by channel
func (m *manager) Start(channels ...string) error {
m.mu.Lock()
defer m.mu.Unlock()
@ -130,7 +131,8 @@ func (m *manager) Start(channels ...string) error {
//check that all pipelines exist before start
for _, channel := range channels {
if _, ok := m.channel2Pipeline[channel]; !ok {
return WrapErrStartPipeline(fmt.Sprintf("pipeline with channel %s not exist", channel))
reason := fmt.Sprintf("pipeline with channel %s not exist", channel)
return merr.WrapErrServiceUnavailable(reason, "pipine start failed")
}
}
@ -140,7 +142,7 @@ func (m *manager) Start(channels ...string) error {
return nil
}
//Close all pipeline of Manager
// Close all pipeline of Manager
func (m *manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
@ -47,7 +48,7 @@ func (msg *insertNodeMsg) append(taskMsg msgstream.TsMsg) error {
msg.deleteMsgs = append(msg.deleteMsgs, deleteMsg)
collector.Rate.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&deleteMsg.DeleteRequest)))
default:
return ErrMsgInvalidType
return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not")
}
return nil
}

View File

@ -305,12 +305,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
}
err = pipeline.ConsumeMsgStream(position)
if err != nil {
err = WrapErrInitPipelineFailed(err)
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
log.Warn(err.Error(),
zap.Int64("collectionID", channel.CollectionID),
zap.String("channel", channel.ChannelName),
)
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "", err), nil
return merr.Status(err), nil
}
// start pipeline
@ -778,16 +778,14 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
// ShowConfigurations returns the configurations of queryNode matching req.Pattern
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
log.Warn("QueryNode.ShowConfigurations failed",
zap.Int64("nodeId", paramtable.GetNodeID()),
zap.String("req", req.Pattern),
zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID())))
zap.Error(err))
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
Configuations: nil,
}, nil
}
@ -814,16 +812,14 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
log.Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID())))
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
Response: "",
}, nil
}
@ -888,14 +884,12 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
zap.Int64("nodeID", paramtable.GetNodeID()),
)
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
log.Warn("QueryNode.GetMetrics failed",
zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID())))
zap.Error(err))
return &querypb.GetDataDistributionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
}, nil
}
defer node.lifetime.Done()

View File

@ -1,7 +0,0 @@
package tasks
import "github.com/cockroachdb/errors"
// Sentinel errors declared by the tasks package.
var (
// ErrTaskQueueFull is the sentinel for a full task queue.
ErrTaskQueueFull = errors.New("TaskQueueFull")
)

View File

@ -103,6 +103,15 @@ var (
ErrTopicNotFound = newMilvusError("topic not found", 1300, false)
ErrTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
// Average related
ErrAverageLabelNotRegister = newMilvusError("average label not register", 1400, false)
// shard delegator related
ErrShardDelegatorNotFound = newMilvusError("shard delegator not found", 1500, false)
// task related
ErrTaskQueueFull = newMilvusError("task queue full", 1600, false)
// Do NOT export this,
// never allow programmer using this, keep only for converting unknown error to milvusError
errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false)

View File

@ -114,6 +114,14 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrTopicNotFound("unknown", "failed to get topic"), ErrTopicNotFound)
s.ErrorIs(WrapErrTopicNotEmpty("unknown", "topic is not empty"), ErrTopicNotEmpty)
// average related
s.ErrorIs(WrapErrAverageLabelNotRegister("unknown", "average label not register"), ErrAverageLabelNotRegister)
// shard delegator related
s.ErrorIs(WrapErrShardDelegatorNotFound("unknown", "fail to get shard delegator"), ErrShardDelegatorNotFound)
// task related
s.ErrorIs(WrapErrTaskQueueFull("test_task_queue", "task queue is full"), ErrTaskQueueFull)
}
func (s *ErrSuite) TestCombine() {

View File

@ -380,6 +380,33 @@ func WrapErrTopicNotEmpty(name string, msg ...string) error {
return err
}
// Average related
// WrapErrAverageLabelNotRegister wraps ErrAverageLabelNotRegister with the
// offending label. When any msg values are given they are joined with "; "
// and added as an outer wrapping message.
func WrapErrAverageLabelNotRegister(label string, msg ...string) error {
	wrapped := errors.Wrapf(ErrAverageLabelNotRegister, "averageLabel=%s", label)
	if len(msg) == 0 {
		return wrapped
	}
	return errors.Wrap(wrapped, strings.Join(msg, "; "))
}
// shard delegator related
// WrapErrShardDelegatorNotFound wraps ErrShardDelegatorNotFound with the
// channel that was looked up. When any msg values are given they are joined
// with "; " and added as an outer wrapping message.
func WrapErrShardDelegatorNotFound(channel string, msg ...string) error {
	wrapped := errors.Wrapf(ErrShardDelegatorNotFound, "channel=%s", channel)
	if len(msg) == 0 {
		return wrapped
	}
	return errors.Wrap(wrapped, strings.Join(msg, "; "))
}
// task related
// WrapErrTaskQueueFull returns ErrTaskQueueFull as an error. When any msg
// values are given they are joined with "; " and wrapped around it.
func WrapErrTaskQueueFull(msg ...string) error {
	var base error = ErrTaskQueueFull
	if len(msg) == 0 {
		return base
	}
	return errors.Wrap(base, strings.Join(msg, "; "))
}
// wrapWithField wraps err with a "name=value" message, formatting value with
// the %v verb.
func wrapWithField(err error, name string, value any) error {
	const fieldFormat = "%s=%v"
	return errors.Wrapf(err, fieldFormat, name, value)
}