diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 517c535c81..fdbab1a19e 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -110,6 +110,10 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { zap.Int64("segmentID", segmentID), zap.Error(err), ) + if errors.IsAny(err, merr.ErrSegmentNotLoaded, merr.ErrSegmentNotFound) { + log.Warn("try to insert data into released segment, skip it", zap.Error(err)) + continue + } // panic here, insert failure panic(err) } diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index b47e9e5343..5f5d79445a 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -167,8 +167,9 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque collection := node.manager.Collection.Get(req.Req.GetCollectionID()) if collection == nil { - log.Warn("Query failed, failed to get collection") - failRet.Status.Reason = segments.WrapCollectionNotFound(req.Req.CollectionID).Error() + err := merr.WrapErrCollectionNotFound(req.Req.GetCollectionID()) + log.Warn("Query failed, failed to get collection", zap.Error(err)) + failRet.Status = merr.Status(err) return failRet, nil } @@ -196,7 +197,7 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque func (node *QueryNode) querySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { collection := node.manager.Collection.Get(req.Req.GetCollectionID()) if collection == nil { - return nil, segments.ErrCollectionNotFound + return nil, merr.WrapErrCollectionNotFound(req.Req.GetCollectionID()) } // Send task to scheduler and wait until it finished. diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 04c973262a..f95c82c2cb 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -25,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/querynodev2/segments" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -73,7 +72,7 @@ func (fNode *filterNode) Operate(in Msg) Msg { //Get collection from collection manager collection := fNode.manager.Collection.Get(fNode.collectionID) if collection == nil { - err := segments.WrapCollectionNotFound(fNode.collectionID) + err := merr.WrapErrCollectionNotFound(fNode.collectionID) log.Error(err.Error()) panic(err) } diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 90e1953d41..780c446565 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/querynodev2/delegator" - "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" @@ -71,7 +70,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { tr := timerecord.NewTimeRecorder("add dmChannel") collection := m.dataManager.Collection.Get(collectionID) if collection == nil { - return nil, segments.WrapCollectionNotFound(collectionID) + return nil, merr.WrapErrCollectionNotFound(collectionID) } if pipeline, ok := m.channel2Pipeline[channel]; ok { diff --git a/internal/querynodev2/segments/errors.go b/internal/querynodev2/segments/errors.go deleted file mode 100644 index b264af6b1a..0000000000 --- a/internal/querynodev2/segments/errors.go +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "fmt" - - "github.com/cockroachdb/errors" -) - -var ( - // Manager related errors - ErrCollectionNotFound = errors.New("CollectionNotFound") - ErrPartitionNotFound = errors.New("PartitionNotFound") - ErrSegmentNotFound = errors.New("SegmentNotFound") - ErrFieldNotFound = errors.New("FieldNotFound") - ErrSegmentReleased = errors.New("SegmentReleased") -) - -func WrapSegmentNotFound(segmentID int64) error { - return fmt.Errorf("%w(%v)", ErrSegmentNotFound, segmentID) -} - -func WrapCollectionNotFound(collectionID int64) error { - return fmt.Errorf("%w(%v)", ErrCollectionNotFound, collectionID) -} - -func WrapFieldNotFound(fieldID int64) error { - return fmt.Errorf("%w(%v)", ErrFieldNotFound, fieldID) -} - -// WrapSegmentReleased wrap ErrSegmentReleased with segmentID. -func WrapSegmentReleased(segmentID int64) error { - return fmt.Errorf("%w(%d)", ErrSegmentReleased, segmentID) -} diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 2048a9574f..2a6fb33cd8 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -178,7 +179,7 @@ func (suite *RetrieveSuite) TestRetrieveNilSegment() { suite.collectionID, []int64{suite.partitionID}, []int64{suite.sealed.ID()}) - suite.ErrorIs(err, ErrSegmentReleased) + suite.ErrorIs(err, merr.ErrSegmentNotLoaded) suite.Len(res, 0) } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 5bbc419036..c17e520979 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -364,7 +365,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S defer s.mut.RUnlock() if s.ptr == nil { - return nil, WrapSegmentReleased(s.segmentID) + return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } span := trace.SpanFromContext(ctx) @@ -407,7 +408,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco defer s.mut.RUnlock() if s.ptr == nil { - return nil, WrapSegmentReleased(s.segmentID) + return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( @@ -533,7 +534,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } offset, err := s.preInsert(len(rowIDs)) @@ -592,7 +593,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } var cOffset = C.int64_t(0) // depre @@ -657,7 +658,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( @@ -710,7 +711,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( @@ -763,7 +764,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( @@ -854,7 +855,7 @@ func (s *LocalSegment) LoadIndex(bytesIndex [][]byte, indexInfo *querypb.FieldIn defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( @@ -902,7 +903,7 @@ func (s *LocalSegment) LoadIndexData(indexInfo *querypb.FieldIndexInfo, fieldTyp defer s.mut.RUnlock() if s.ptr == nil { - return WrapSegmentReleased(s.segmentID) + return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") } log := log.With( diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6e871e1a52..9f233e4fcf 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -46,6 +46,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -171,7 +172,7 @@ func (loader *segmentLoader) Load(ctx context.Context, collection := loader.manager.Collection.Get(collectionID) if collection == nil { - err := WrapCollectionNotFound(collectionID) + err := merr.WrapErrCollectionNotFound(collectionID) log.Warn("failed to get collection", zap.Error(err)) clearAll() return nil, err @@ -345,7 +346,7 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI collection := loader.manager.Collection.Get(collectionID) if collection == nil { - err := WrapCollectionNotFound(collectionID) + err := merr.WrapErrCollectionNotFound(collectionID) log.Warn("failed to get collection while loading segment", zap.Error(err)) return nil, err } @@ -403,7 +404,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, collection := loader.manager.Collection.Get(segment.Collection()) if collection == nil { - err := WrapCollectionNotFound(segment.Collection()) + err := merr.WrapErrCollectionNotFound(segment.Collection()) log.Warn("failed to get collection while loading segment", zap.Error(err)) return err } @@ -613,7 +614,7 @@ func (loader *segmentLoader) insertIntoSegment(segment *LocalSegment, } collection := loader.manager.Collection.Get(segment.Collection()) if collection == nil { - err := WrapCollectionNotFound(segment.Collection()) + err := merr.WrapErrCollectionNotFound(segment.Collection()) log.Warn("failed to get collection while inserting data into segment", zap.Error(err)) return err } @@ -911,7 +912,7 @@ func (loader *segmentLoader) checkSegmentSize(segmentLoadInfos []*querypb.Segmen func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64) (schemapb.DataType, error) { collection := loader.manager.Collection.Get(segment.collectionID) if collection == nil { - return 0, WrapCollectionNotFound(segment.Collection()) + return 0, merr.WrapErrCollectionNotFound(segment.Collection()) } for _, field := range collection.Schema().GetFields() { @@ -919,7 +920,7 @@ func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64) return field.GetDataType(), nil } } - return 0, WrapFieldNotFound(fieldID) + return 0, merr.WrapErrFieldNotFound(fieldID) } func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 { diff --git a/internal/querynodev2/segments/validate.go b/internal/querynodev2/segments/validate.go index 05d7ca4a23..2e93070d62 100644 --- a/internal/querynodev2/segments/validate.go +++ b/internal/querynodev2/segments/validate.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" ) func validate(ctx context.Context, manager *Manager, collectionID int64, partitionIDs []int64, segmentIDs []int64, segmentFilter SegmentFilter) ([]int64, []int64, error) { @@ -34,7 +35,7 @@ func validate(ctx context.Context, manager *Manager, collectionID int64, partiti collection := manager.Collection.Get(collectionID) if collection == nil { - return nil, nil, WrapCollectionNotFound(collectionID) + return nil, nil, merr.WrapErrCollectionNotFound(collectionID) } //validate partition diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 132404f2c2..98f7d1df16 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -740,9 +740,9 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe collection := node.manager.Collection.Get(req.Req.GetCollectionID()) if collection == nil { - log.Warn("failed to search segments", zap.Error(segments.ErrCollectionNotFound)) - err := segments.WrapCollectionNotFound(req.GetReq().GetCollectionID()) - failRet.Status.Reason = err.Error() + err := merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID()) + log.Warn("failed to search segments", zap.Error(err)) + failRet.Status = merr.Status(err) return failRet, err } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index c6ba9ab9ef..2b02fc5e72 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -124,6 +124,9 @@ var ( // task related ErrTaskQueueFull = newMilvusError("task queue full", 1600, false) + // field related + ErrFieldNotFound = newMilvusError("field not found", 1700, false) + // Do NOT export this, // never allow programmer using this, keep only for converting unknown error to milvusError errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 3367a0278a..4ea7e5c1a3 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -123,6 +123,9 @@ func (s *ErrSuite) TestWrap() { // task related s.ErrorIs(WrapErrTaskQueueFull("test_task_queue", "task queue is full"), ErrTaskQueueFull) + + // field related + s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound) } func (s *ErrSuite) TestCombine() { diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 5665613a06..fd7ee5d90e 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -495,6 +495,15 @@ func WrapErrTaskQueueFull(msg ...string) error { return err } +// field related +func WrapErrFieldNotFound[T any](field T, msg ...string) error { + err := errors.Wrapf(ErrFieldNotFound, "field=%v", field) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func wrapWithField(err error, name string, value any) error { return errors.Wrapf(err, "%s=%v", name, value) }