mirror of https://github.com/milvus-io/milvus.git
fix: [Cherry-pick] msgstream memory leak caused by config event don't deregister (#29268)
pr: https://github.com/milvus-io/milvus/pull/29266 relate: https://github.com/milvus-io/milvus/issues/28620 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/28625/head
parent
2d33c7fe41
commit
af54ce9e20
|
@ -42,7 +42,10 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
var _ MsgStream = (*mqMsgStream)(nil)
|
||||
var (
|
||||
_ MsgStream = (*mqMsgStream)(nil)
|
||||
streamCount atomic.Int64
|
||||
)
|
||||
|
||||
type mqMsgStream struct {
|
||||
ctx context.Context
|
||||
|
@ -63,6 +66,7 @@ type mqMsgStream struct {
|
|||
closed int32
|
||||
onceChan sync.Once
|
||||
enableProduce atomic.Value
|
||||
configEvent config.EventHandler
|
||||
}
|
||||
|
||||
// NewMqMsgStream is used to generate a new mqMsgStream object
|
||||
|
@ -98,7 +102,7 @@ func NewMqMsgStream(ctx context.Context,
|
|||
}
|
||||
ctxLog := log.Ctx(ctx)
|
||||
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
|
||||
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, config.NewHandler("enable send tt msg", func(event *config.Event) {
|
||||
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCount.Add(1)), func(event *config.Event) {
|
||||
value, err := strconv.ParseBool(event.Value)
|
||||
if err != nil {
|
||||
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
|
||||
|
@ -106,7 +110,8 @@ func NewMqMsgStream(ctx context.Context,
|
|||
}
|
||||
stream.enableProduce.Store(value)
|
||||
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
|
||||
}))
|
||||
})
|
||||
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
|
||||
ctxLog.Info("Msg Stream state", zap.Bool("can_produce", stream.isEnabledProduce()))
|
||||
|
||||
return stream, nil
|
||||
|
@ -229,6 +234,7 @@ func (ms *mqMsgStream) Close() {
|
|||
|
||||
ms.client.Close()
|
||||
close(ms.receiveBuf)
|
||||
paramtable.Get().Unwatch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, ms.configEvent)
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
|
||||
|
|
|
@ -152,6 +152,10 @@ func (p *ComponentParam) Watch(key string, watcher config.EventHandler) {
|
|||
p.baseTable.mgr.Dispatcher.Register(key, watcher)
|
||||
}
|
||||
|
||||
func (p *ComponentParam) Unwatch(key string, watcher config.EventHandler) {
|
||||
p.baseTable.mgr.Dispatcher.Unregister(key, watcher)
|
||||
}
|
||||
|
||||
func (p *ComponentParam) WatchKeyPrefix(keyPrefix string, watcher config.EventHandler) {
|
||||
p.baseTable.mgr.Dispatcher.RegisterForKeyPrefix(keyPrefix, watcher)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue