Let dml tolerant physical channels generated by old version (#7644)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/7672/head
Cai Yudong 2021-09-09 18:08:02 +08:00 committed by GitHub
parent 4485b6f819
commit d5f7e358aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 16 deletions

View File

@ -29,6 +29,7 @@ type dmlChannels struct {
refcnt sync.Map
idx *atomic.Int64
pool sync.Map
dml sync.Map
}
func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels {
@ -39,6 +40,7 @@ func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels
refcnt: sync.Map{},
idx: atomic.NewInt64(0),
pool: sync.Map{},
dml: sync.Map{},
}
var i int64
@ -83,12 +85,18 @@ func (d *dmlChannels) GetNumChannels() int {
func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error {
for _, chanName := range chanNames {
// only in-use chanName exist in refcnt
if _, ok := d.refcnt.Load(chanName); !ok {
return fmt.Errorf("channel %s not exist", chanName)
}
v, _ := d.pool.Load(chanName)
if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
return err
if _, ok := d.refcnt.Load(chanName); ok {
v, _ := d.pool.Load(chanName)
if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
return err
}
} else {
log.Debug("broadcast to old version channel", zap.String("chanName", chanName))
if ds, ok := d.dml.Load(chanName); ok {
if err := (*(ds.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
return err
}
}
}
}
return nil
@ -97,10 +105,7 @@ func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) err
// AddProducerChannels add named channels as producer
func (d *dmlChannels) AddProducerChannels(names ...string) {
for _, name := range names {
if _, ok := d.pool.Load(name); !ok {
log.Error("invalid channel name", zap.String("chanName", name))
panic("invalid channel name")
} else {
if _, ok := d.pool.Load(name); ok {
var cnt int64
if _, ok := d.refcnt.Load(name); !ok {
cnt = 1
@ -110,6 +115,15 @@ func (d *dmlChannels) AddProducerChannels(names ...string) {
}
d.refcnt.Store(name, cnt)
log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt))
} else {
log.Debug("add old version channel name", zap.String("chanName", name))
ms, err := d.core.msFactory.NewMsgStream(d.core.ctx)
if err != nil {
log.Error("add msgstream failed", zap.String("name", name), zap.Error(err))
panic("add msgstream failed")
}
ms.AsProducer([]string{name})
d.dml.Store(name, &ms)
}
}
}
@ -117,14 +131,19 @@ func (d *dmlChannels) AddProducerChannels(names ...string) {
// RemoveProducerChannels removes specified channels
func (d *dmlChannels) RemoveProducerChannels(names ...string) {
for _, name := range names {
v, ok := d.refcnt.Load(name)
if ok {
if v, ok := d.refcnt.Load(name); ok {
cnt := v.(int64)
if cnt > 1 {
d.refcnt.Store(name, cnt-1)
} else {
d.refcnt.Delete(name)
}
} else {
log.Debug("remove old version channel name", zap.String("chanName", name))
if ds, ok := d.dml.Load(name); ok {
(*(ds.(*msgstream.MsgStream))).Close()
d.dml.Delete(name)
}
}
}
}

View File

@ -13,7 +13,6 @@ package rootcoord
import (
"context"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/msgstream"
@ -46,12 +45,20 @@ func TestDmlChannels(t *testing.T) {
chanNames := dml.ListChannels()
assert.Equal(t, 0, len(chanNames))
//randStr := funcutil.RandomString(8)
//assert.Panics(t, func() { dml.AddProducerChannels(randStr) })
//
//err = dml.Broadcast([]string{randStr}, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr))
randStr := funcutil.RandomString(8)
assert.Panics(t, func() { dml.AddProducerChannels(randStr) })
dml.AddProducerChannels(randStr)
err = dml.Broadcast([]string{randStr}, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr))
assert.Nil(t, err)
dml.RemoveProducerChannels(randStr)
// dml_xxx_0 => {chanName0, chanName2}
// dml_xxx_1 => {chanName1}