enhance: Release level zero segments when channel unsub (#31486)

Related to #27349
See also #30816

Level zero is not allowed to balance among delegators, they shall always
serve current delegator. This PR releases all level zero segments after
channel is unsubscribed and preventing level zero segment blocking
graceful stop.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31513/head
congqixia 2024-03-22 10:27:17 +08:00 committed by GitHub
parent 5c5f53d11b
commit d90e01532f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 2 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -367,6 +368,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
node.tSafeManager.Remove(ctx, req.GetChannelName())
node.manager.Collection.Unref(req.GetCollectionID(), 1)

View File

@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
@ -465,6 +466,18 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
// prepate
suite.TestWatchDmChannelsInt64()
l0Segment := segments.NewMockSegment(suite.T())
l0Segment.EXPECT().ID().Return(10000)
l0Segment.EXPECT().Collection().Return(suite.collectionID)
l0Segment.EXPECT().Partition().Return(common.AllPartitionsID)
l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
l0Segment.EXPECT().Indexes().Return(nil)
l0Segment.EXPECT().Shard().Return(suite.vchannel)
l0Segment.EXPECT().Release().Return()
suite.node.manager.Segment.Put(segments.SegmentTypeSealed, l0Segment)
// data
req := &querypb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{
@ -478,8 +491,11 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
}
status, err := suite.node.UnsubDmChannel(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
suite.NoError(merr.CheckRPCCall(status, err))
suite.Len(suite.node.manager.Segment.GetBy(
segments.WithChannel(suite.vchannel),
segments.WithLevel(datapb.SegmentLevel_L0)), 0)
}
func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {