mirror of https://github.com/milvus-io/milvus.git
fix: Release loaded growing if WatchDmlChannel fail (#30735)
See also #30734 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/30744/head
parent
12acaf3e4f
commit
f3b7a8892f
|
@ -195,7 +195,7 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema
|
|||
}
|
||||
|
||||
// WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
|
||||
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
|
||||
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) {
|
||||
channel := req.GetInfos()[0]
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("collectionID", req.GetCollectionID()),
|
||||
|
@ -246,6 +246,11 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||
|
||||
node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
|
||||
node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
|
||||
defer func() {
|
||||
if !merr.Ok(status) {
|
||||
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
||||
}
|
||||
}()
|
||||
|
||||
delegator, err := delegator.NewShardDelegator(
|
||||
ctx,
|
||||
|
@ -316,6 +321,9 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||
}
|
||||
err = loadGrowingSegments(ctx, delegator, req)
|
||||
if err != nil {
|
||||
// remove legacy growing
|
||||
node.manager.Segment.RemoveBy(segments.WithChannel(channel.GetChannelName()),
|
||||
segments.WithType(segments.SegmentTypeGrowing))
|
||||
msg := "failed to load growing segments"
|
||||
log.Warn(msg, zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
|
|
|
@ -324,6 +324,7 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() {
|
|||
|
||||
// data
|
||||
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_VarChar)
|
||||
|
||||
req := &querypb.WatchDmChannelsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_WatchDmChannels,
|
||||
|
@ -375,6 +376,23 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
|||
|
||||
// data
|
||||
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
|
||||
|
||||
indexInfos := segments.GenTestIndexInfoList(suite.collectionID, schema)
|
||||
|
||||
infos := suite.genSegmentLoadInfos(schema, indexInfos)
|
||||
segmentInfos := lo.SliceToMap(infos, func(info *querypb.SegmentLoadInfo) (int64, *datapb.SegmentInfo) {
|
||||
return info.SegmentID, &datapb.SegmentInfo{
|
||||
ID: info.SegmentID,
|
||||
CollectionID: info.CollectionID,
|
||||
PartitionID: info.PartitionID,
|
||||
InsertChannel: info.InsertChannel,
|
||||
Binlogs: info.BinlogPaths,
|
||||
Statslogs: info.Statslogs,
|
||||
Deltalogs: info.Deltalogs,
|
||||
Level: info.Level,
|
||||
}
|
||||
})
|
||||
|
||||
req := &querypb.WatchDmChannelsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_WatchDmChannels,
|
||||
|
@ -397,7 +415,8 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
|||
LoadMeta: &querypb.LoadMetaInfo{
|
||||
MetricType: defaultMetricType,
|
||||
},
|
||||
IndexInfoList: segments.GenTestIndexInfoList(suite.collectionID, schema),
|
||||
SegmentInfos: segmentInfos,
|
||||
IndexInfoList: indexInfos,
|
||||
}
|
||||
|
||||
// test channel is unsubscribing
|
||||
|
@ -411,12 +430,28 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
|||
suite.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(suite.msgStream, nil)
|
||||
suite.msgStream.EXPECT().AsConsumer(mock.Anything, []string{suite.pchannel}, mock.Anything, mock.Anything).Return(nil)
|
||||
suite.msgStream.EXPECT().Close().Return()
|
||||
suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error"))
|
||||
suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||
|
||||
status, err = suite.node.WatchDmChannels(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
|
||||
// load growing failed
|
||||
badSegmentReq := typeutil.Clone(req)
|
||||
for _, info := range badSegmentReq.SegmentInfos {
|
||||
for _, fbl := range info.Binlogs {
|
||||
for _, binlog := range fbl.Binlogs {
|
||||
binlog.LogPath += "bad_suffix"
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, channel := range badSegmentReq.Infos {
|
||||
channel.UnflushedSegmentIds = lo.Keys(badSegmentReq.SegmentInfos)
|
||||
}
|
||||
status, err = suite.node.WatchDmChannels(ctx, badSegmentReq)
|
||||
err = merr.CheckRPCCall(status, err)
|
||||
suite.Error(err)
|
||||
|
||||
// empty index
|
||||
req.IndexInfoList = nil
|
||||
status, err = suite.node.WatchDmChannels(ctx, req)
|
||||
|
|
Loading…
Reference in New Issue