enhance: [Cherry-pick] Release level zero segments when channel unsub (#31486) (#31509)

Cherry-pick from master
pr: #31486
Related to #27349
See also #30816

Level zero is not allowed to balance among delegators, they shall always
serve current delegator. This PR releases all level zero segments after
channel is unsubscribed and preventing level zero segment blocking
graceful stop.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31542/head^2
congqixia 2024-03-25 10:21:08 +08:00 committed by GitHub
parent 1e0bf5acd2
commit 6e0baa47e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 2 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -367,6 +368,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
node.tSafeManager.Remove(ctx, req.GetChannelName())
node.manager.Collection.Unref(req.GetCollectionID(), 1)

View File

@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
@ -465,6 +466,18 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
// prepate
suite.TestWatchDmChannelsInt64()
l0Segment := segments.NewMockSegment(suite.T())
l0Segment.EXPECT().ID().Return(10000)
l0Segment.EXPECT().Collection().Return(suite.collectionID)
l0Segment.EXPECT().Partition().Return(common.AllPartitionsID)
l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
l0Segment.EXPECT().Indexes().Return(nil)
l0Segment.EXPECT().Shard().Return(suite.vchannel)
l0Segment.EXPECT().Release().Return()
suite.node.manager.Segment.Put(segments.SegmentTypeSealed, l0Segment)
// data
req := &querypb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{
@ -478,8 +491,11 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
}
status, err := suite.node.UnsubDmChannel(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
suite.NoError(merr.CheckRPCCall(status, err))
suite.Len(suite.node.manager.Segment.GetBy(
segments.WithChannel(suite.vchannel),
segments.WithLevel(datapb.SegmentLevel_L0)), 0)
}
func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {