fix: main dispatcher not terminate cause messsage stream leak. (#40061) (#40351)

Main dispatcher will leak when we remove solo dispatcher in the end.
relate: https://github.com/milvus-io/milvus/issues/40046
pr: https://github.com/milvus-io/milvus/pull/40061

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/40393/head
aoiasd 2025-03-05 20:14:02 +08:00 committed by GitHub
parent 345ca01634
commit ebf3416245
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 10 deletions

View File

@ -82,3 +82,40 @@ func TestClient_Concurrency(t *testing.T) {
n := c.managers.Len()
assert.Equal(t, expected, n)
}
func TestClientMainDispatcherLeak(t *testing.T) {
client := NewClient(newMockFactory(), typeutil.ProxyRole, 1)
assert.NotNil(t, client)
pchannel := "mock_vchannel_0"
vchannel1 := fmt.Sprintf("%s_abc_v0", pchannel) //"mock_vchannel_0_abc_v0"
vchannel2 := fmt.Sprintf("%s_abc_v1", pchannel) //"mock_vchannel_0_abc_v0"
_, err := client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
client.Deregister(vchannel2)
client.Deregister(vchannel1)
assert.NotPanics(
t, func() {
_, err = client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
},
)
client.Deregister(vchannel1)
client.Deregister(vchannel2)
assert.NotPanics(
t, func() {
_, err = client.Register(context.Background(), NewStreamConfig(vchannel1, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
_, err = client.Register(context.Background(), NewStreamConfig(vchannel2, nil, common.SubscriptionPositionUnknown))
assert.NoError(t, err)
},
)
}

View File

@ -126,16 +126,6 @@ func (c *dispatcherManager) Remove(vchannel string) {
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
c.mu.Lock()
defer c.mu.Unlock()
if c.mainDispatcher != nil {
c.mainDispatcher.Handle(pause)
c.mainDispatcher.CloseTarget(vchannel)
if c.mainDispatcher.TargetNum() == 0 && len(c.soloDispatchers) == 0 {
c.mainDispatcher.Handle(terminate)
c.mainDispatcher = nil
} else {
c.mainDispatcher.Handle(resume)
}
}
if _, ok := c.soloDispatchers[vchannel]; ok {
c.soloDispatchers[vchannel].Handle(terminate)
c.soloDispatchers[vchannel].CloseTarget(vchannel)
@ -144,6 +134,18 @@ func (c *dispatcherManager) Remove(vchannel string) {
log.Info("remove soloDispatcher done")
}
c.lagTargets.GetAndRemove(vchannel)
if c.mainDispatcher != nil {
c.mainDispatcher.Handle(pause)
c.mainDispatcher.CloseTarget(vchannel)
if c.mainDispatcher.TargetNum() == 0 && len(c.soloDispatchers) == 0 {
c.mainDispatcher.Handle(terminate)
c.mainDispatcher = nil
log.Info("remove mainDispatcher done")
} else {
c.mainDispatcher.Handle(resume)
}
}
}
func (c *dispatcherManager) NumTarget() int {