From 628a3274bfa150cecf7bdf63d3f11ae35684e3a7 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 3 Nov 2021 20:18:48 +0800 Subject: [PATCH] Use lazy mode to set dml msgstream as producer (#10993) Signed-off-by: yudong.cai --- internal/rootcoord/dml_channels.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 48e2b73a39..fc848b15de 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -49,7 +49,6 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err)) panic("Failed to add msgstream") } - ms.AsProducer([]string{name}) d.pool.Store(name, &ms) } log.Debug("init dml channels", zap.Int64("num", chanNum)) @@ -122,9 +121,11 @@ func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack) // AddProducerChannels add named channels as producer func (d *dmlChannels) AddProducerChannels(names ...string) { for _, name := range names { - if _, ok := d.pool.Load(name); ok { + if v, ok := d.pool.Load(name); ok { var cnt int64 if _, ok := d.refcnt.Load(name); !ok { + ms := *(v.(*msgstream.MsgStream)) + ms.AsProducer([]string{name}) cnt = 1 } else { v, _ := d.refcnt.Load(name) @@ -147,6 +148,9 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) { if cnt > 1 { d.refcnt.Store(name, cnt-1) } else { + v1, _ := d.pool.Load(name) + ms := *(v1.(*msgstream.MsgStream)) + ms.Close() d.refcnt.Delete(name) } }