mirror of https://github.com/milvus-io/milvus.git
Use lazy mode to set dml msgstream as producer (#10993)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/11192/head
parent
2c687d372e
commit
628a3274bf
|
@ -49,7 +49,6 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels
|
||||||
log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err))
|
log.Error("Failed to add msgstream", zap.String("name", name), zap.Error(err))
|
||||||
panic("Failed to add msgstream")
|
panic("Failed to add msgstream")
|
||||||
}
|
}
|
||||||
ms.AsProducer([]string{name})
|
|
||||||
d.pool.Store(name, &ms)
|
d.pool.Store(name, &ms)
|
||||||
}
|
}
|
||||||
log.Debug("init dml channels", zap.Int64("num", chanNum))
|
log.Debug("init dml channels", zap.Int64("num", chanNum))
|
||||||
|
@ -122,9 +121,11 @@ func (d *dmlChannels) BroadcastMark(chanNames []string, pack *msgstream.MsgPack)
|
||||||
// AddProducerChannels add named channels as producer
|
// AddProducerChannels add named channels as producer
|
||||||
func (d *dmlChannels) AddProducerChannels(names ...string) {
|
func (d *dmlChannels) AddProducerChannels(names ...string) {
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if _, ok := d.pool.Load(name); ok {
|
if v, ok := d.pool.Load(name); ok {
|
||||||
var cnt int64
|
var cnt int64
|
||||||
if _, ok := d.refcnt.Load(name); !ok {
|
if _, ok := d.refcnt.Load(name); !ok {
|
||||||
|
ms := *(v.(*msgstream.MsgStream))
|
||||||
|
ms.AsProducer([]string{name})
|
||||||
cnt = 1
|
cnt = 1
|
||||||
} else {
|
} else {
|
||||||
v, _ := d.refcnt.Load(name)
|
v, _ := d.refcnt.Load(name)
|
||||||
|
@ -147,6 +148,9 @@ func (d *dmlChannels) RemoveProducerChannels(names ...string) {
|
||||||
if cnt > 1 {
|
if cnt > 1 {
|
||||||
d.refcnt.Store(name, cnt-1)
|
d.refcnt.Store(name, cnt-1)
|
||||||
} else {
|
} else {
|
||||||
|
v1, _ := d.pool.Load(name)
|
||||||
|
ms := *(v1.(*msgstream.MsgStream))
|
||||||
|
ms.Close()
|
||||||
d.refcnt.Delete(name)
|
d.refcnt.Delete(name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue