fix: Fix subscription leak (#37382)

Close (unsubscribe) the msg stream after completing the
`PreCreatedTopic` check to prevent backlog issue.

issue: https://github.com/milvus-io/milvus/issues/36021

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/37551/head
yihao.dai 2024-11-08 16:16:26 +08:00 committed by GitHub
parent ae227e3934
commit ebc3c82761
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 11 deletions

View File

@ -185,17 +185,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
}
if params.PreCreatedTopicEnabled.GetAsBool() {
subName := fmt.Sprintf("pre-created-topic-check-%s", name)
ms.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown)
// check if topic is existed
// kafka and rmq will err if the topic does not yet exist, pulsar will not
// allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty
// if there are any message that not belong to milvus, will skip it
err := ms.CheckTopicValid(name)
if err != nil {
log.Error("created topic is invaild", zap.String("name", name), zap.Error(err))
panic("created topic is invaild")
}
d.checkPreCreatedTopic(ctx, factory, name)
}
ms.AsProducer([]string{name})
@ -220,6 +210,29 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
return d
}
func (d *dmlChannels) checkPreCreatedTopic(ctx context.Context, factory msgstream.Factory, name string) {
tmpMs, err := factory.NewMsgStream(ctx)
if err != nil {
panic(fmt.Sprintf("failed to add msgstream, name:%s, err:%v", name, err))
}
defer tmpMs.Close()
subName := fmt.Sprintf("pre-created-topic-check-%s", name)
err = tmpMs.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown)
if err != nil {
panic(fmt.Sprintf("failed to add consumer, name:%s, err:%v", name, err))
}
// check if topic is existed
// kafka and rmq will err if the topic does not yet exist, pulsar will not
// allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty
// if there are any message that not belong to milvus, will skip it
err = tmpMs.CheckTopicValid(name)
if err != nil {
panic(fmt.Sprintf("created topic is invalid, name:%s, err:%v", name, err))
}
}
func (d *dmlChannels) getChannelNames(count int) []string {
d.mut.Lock()
defer d.mut.Unlock()