mirror of https://github.com/milvus-io/milvus.git
Fix QueryNode panic while inserting to released segment (#25495)
Signed-off-by: yah01 <yang.cen@zilliz.com>pull/25466/head
parent
a577cac46b
commit
205a7c430a
|
@ -110,6 +110,10 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||
zap.Int64("segmentID", segmentID),
|
||||
zap.Error(err),
|
||||
)
|
||||
if errors.IsAny(err, merr.ErrSegmentNotLoaded, merr.ErrSegmentNotFound) {
|
||||
log.Warn("try to insert data into released segment, skip it", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
// panic here, insert failure
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -167,8 +167,9 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||
|
||||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
log.Warn("Query failed, failed to get collection")
|
||||
failRet.Status.Reason = segments.WrapCollectionNotFound(req.Req.CollectionID).Error()
|
||||
err := merr.WrapErrCollectionNotFound(req.Req.GetCollectionID())
|
||||
log.Warn("Query failed, failed to get collection", zap.Error(err))
|
||||
failRet.Status = merr.Status(err)
|
||||
return failRet, nil
|
||||
}
|
||||
|
||||
|
@ -196,7 +197,7 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque
|
|||
func (node *QueryNode) querySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
|
||||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
return nil, segments.ErrCollectionNotFound
|
||||
return nil, merr.WrapErrCollectionNotFound(req.Req.GetCollectionID())
|
||||
}
|
||||
|
||||
// Send task to scheduler and wait until it finished.
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||
base "github.com/milvus-io/milvus/internal/util/pipeline"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
|
@ -73,7 +72,7 @@ func (fNode *filterNode) Operate(in Msg) Msg {
|
|||
//Get collection from collection manager
|
||||
collection := fNode.manager.Collection.Get(fNode.collectionID)
|
||||
if collection == nil {
|
||||
err := segments.WrapCollectionNotFound(fNode.collectionID)
|
||||
err := merr.WrapErrCollectionNotFound(fNode.collectionID)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
|
||||
|
@ -71,7 +70,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) {
|
|||
tr := timerecord.NewTimeRecorder("add dmChannel")
|
||||
collection := m.dataManager.Collection.Get(collectionID)
|
||||
if collection == nil {
|
||||
return nil, segments.WrapCollectionNotFound(collectionID)
|
||||
return nil, merr.WrapErrCollectionNotFound(collectionID)
|
||||
}
|
||||
|
||||
if pipeline, ok := m.channel2Pipeline[channel]; ok {
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package segments
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// Manager related errors
|
||||
ErrCollectionNotFound = errors.New("CollectionNotFound")
|
||||
ErrPartitionNotFound = errors.New("PartitionNotFound")
|
||||
ErrSegmentNotFound = errors.New("SegmentNotFound")
|
||||
ErrFieldNotFound = errors.New("FieldNotFound")
|
||||
ErrSegmentReleased = errors.New("SegmentReleased")
|
||||
)
|
||||
|
||||
func WrapSegmentNotFound(segmentID int64) error {
|
||||
return fmt.Errorf("%w(%v)", ErrSegmentNotFound, segmentID)
|
||||
}
|
||||
|
||||
func WrapCollectionNotFound(collectionID int64) error {
|
||||
return fmt.Errorf("%w(%v)", ErrCollectionNotFound, collectionID)
|
||||
}
|
||||
|
||||
func WrapFieldNotFound(fieldID int64) error {
|
||||
return fmt.Errorf("%w(%v)", ErrFieldNotFound, fieldID)
|
||||
}
|
||||
|
||||
// WrapSegmentReleased wrap ErrSegmentReleased with segmentID.
|
||||
func WrapSegmentReleased(segmentID int64) error {
|
||||
return fmt.Errorf("%w(%d)", ErrSegmentReleased, segmentID)
|
||||
}
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/initcore"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -178,7 +179,7 @@ func (suite *RetrieveSuite) TestRetrieveNilSegment() {
|
|||
suite.collectionID,
|
||||
[]int64{suite.partitionID},
|
||||
[]int64{suite.sealed.ID()})
|
||||
suite.ErrorIs(err, ErrSegmentReleased)
|
||||
suite.ErrorIs(err, merr.ErrSegmentNotLoaded)
|
||||
suite.Len(res, 0)
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -364,7 +365,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return nil, WrapSegmentReleased(s.segmentID)
|
||||
return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
span := trace.SpanFromContext(ctx)
|
||||
|
@ -407,7 +408,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return nil, WrapSegmentReleased(s.segmentID)
|
||||
return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
@ -533,7 +534,7 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
offset, err := s.preInsert(len(rowIDs))
|
||||
|
@ -592,7 +593,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
var cOffset = C.int64_t(0) // depre
|
||||
|
@ -657,7 +658,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
@ -710,7 +711,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
@ -763,7 +764,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
@ -854,7 +855,7 @@ func (s *LocalSegment) LoadIndex(bytesIndex [][]byte, indexInfo *querypb.FieldIn
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
@ -902,7 +903,7 @@ func (s *LocalSegment) LoadIndexData(indexInfo *querypb.FieldIndexInfo, fieldTyp
|
|||
defer s.mut.RUnlock()
|
||||
|
||||
if s.ptr == nil {
|
||||
return WrapSegmentReleased(s.segmentID)
|
||||
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
|
||||
}
|
||||
|
||||
log := log.With(
|
||||
|
|
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -171,7 +172,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
|||
|
||||
collection := loader.manager.Collection.Get(collectionID)
|
||||
if collection == nil {
|
||||
err := WrapCollectionNotFound(collectionID)
|
||||
err := merr.WrapErrCollectionNotFound(collectionID)
|
||||
log.Warn("failed to get collection", zap.Error(err))
|
||||
clearAll()
|
||||
return nil, err
|
||||
|
@ -345,7 +346,7 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
|
|||
|
||||
collection := loader.manager.Collection.Get(collectionID)
|
||||
if collection == nil {
|
||||
err := WrapCollectionNotFound(collectionID)
|
||||
err := merr.WrapErrCollectionNotFound(collectionID)
|
||||
log.Warn("failed to get collection while loading segment", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -403,7 +404,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
|
|||
|
||||
collection := loader.manager.Collection.Get(segment.Collection())
|
||||
if collection == nil {
|
||||
err := WrapCollectionNotFound(segment.Collection())
|
||||
err := merr.WrapErrCollectionNotFound(segment.Collection())
|
||||
log.Warn("failed to get collection while loading segment", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
@ -613,7 +614,7 @@ func (loader *segmentLoader) insertIntoSegment(segment *LocalSegment,
|
|||
}
|
||||
collection := loader.manager.Collection.Get(segment.Collection())
|
||||
if collection == nil {
|
||||
err := WrapCollectionNotFound(segment.Collection())
|
||||
err := merr.WrapErrCollectionNotFound(segment.Collection())
|
||||
log.Warn("failed to get collection while inserting data into segment", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
@ -911,7 +912,7 @@ func (loader *segmentLoader) checkSegmentSize(segmentLoadInfos []*querypb.Segmen
|
|||
func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64) (schemapb.DataType, error) {
|
||||
collection := loader.manager.Collection.Get(segment.collectionID)
|
||||
if collection == nil {
|
||||
return 0, WrapCollectionNotFound(segment.Collection())
|
||||
return 0, merr.WrapErrCollectionNotFound(segment.Collection())
|
||||
}
|
||||
|
||||
for _, field := range collection.Schema().GetFields() {
|
||||
|
@ -919,7 +920,7 @@ func (loader *segmentLoader) getFieldType(segment *LocalSegment, fieldID int64)
|
|||
return field.GetDataType(), nil
|
||||
}
|
||||
}
|
||||
return 0, WrapFieldNotFound(fieldID)
|
||||
return 0, merr.WrapErrFieldNotFound(fieldID)
|
||||
}
|
||||
|
||||
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
func validate(ctx context.Context, manager *Manager, collectionID int64, partitionIDs []int64, segmentIDs []int64, segmentFilter SegmentFilter) ([]int64, []int64, error) {
|
||||
|
@ -34,7 +35,7 @@ func validate(ctx context.Context, manager *Manager, collectionID int64, partiti
|
|||
|
||||
collection := manager.Collection.Get(collectionID)
|
||||
if collection == nil {
|
||||
return nil, nil, WrapCollectionNotFound(collectionID)
|
||||
return nil, nil, merr.WrapErrCollectionNotFound(collectionID)
|
||||
}
|
||||
|
||||
//validate partition
|
||||
|
|
|
@ -740,9 +740,9 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
|
|||
|
||||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
log.Warn("failed to search segments", zap.Error(segments.ErrCollectionNotFound))
|
||||
err := segments.WrapCollectionNotFound(req.GetReq().GetCollectionID())
|
||||
failRet.Status.Reason = err.Error()
|
||||
err := merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID())
|
||||
log.Warn("failed to search segments", zap.Error(err))
|
||||
failRet.Status = merr.Status(err)
|
||||
return failRet, err
|
||||
}
|
||||
|
||||
|
|
|
@ -124,6 +124,9 @@ var (
|
|||
// task related
|
||||
ErrTaskQueueFull = newMilvusError("task queue full", 1600, false)
|
||||
|
||||
// field related
|
||||
ErrFieldNotFound = newMilvusError("field not found", 1700, false)
|
||||
|
||||
// Do NOT export this,
|
||||
// never allow programmer using this, keep only for converting unknown error to milvusError
|
||||
errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false)
|
||||
|
|
|
@ -123,6 +123,9 @@ func (s *ErrSuite) TestWrap() {
|
|||
|
||||
// task related
|
||||
s.ErrorIs(WrapErrTaskQueueFull("test_task_queue", "task queue is full"), ErrTaskQueueFull)
|
||||
|
||||
// field related
|
||||
s.ErrorIs(WrapErrFieldNotFound("meta", "failed to get field"), ErrFieldNotFound)
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestCombine() {
|
||||
|
|
|
@ -495,6 +495,15 @@ func WrapErrTaskQueueFull(msg ...string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// field related
|
||||
func WrapErrFieldNotFound[T any](field T, msg ...string) error {
|
||||
err := errors.Wrapf(ErrFieldNotFound, "field=%v", field)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func wrapWithField(err error, name string, value any) error {
|
||||
return errors.Wrapf(err, "%s=%v", name, value)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue