Add deltaChannels in rootcoord core (#11097)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/11198/head
Cai Yudong 2021-11-03 21:04:14 +08:00 committed by GitHub
parent 93d0616457
commit a2ad88237d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 94 additions and 2 deletions

View File

@ -152,6 +152,7 @@ msgChannel:
rootCoordTimeTick: "rootcoord-timetick"
rootCoordStatistics: "rootcoord-statistics"
rootCoordDml: "rootcoord-dml"
rootCoordDelta: "rootcoord-delta"
search: "search"
searchResult: "searchResult"
proxyTimeTick: "proxyTimeTick"

View File

@ -40,6 +40,7 @@ type ParamTable struct {
TimeTickChannel string
StatisticsChannel string
DmlChannelName string
DeltaChannelName string
DmlChannelNum int64
MaxPartitionNum int64
@ -81,6 +82,7 @@ func (p *ParamTable) Init() {
p.initTimeTickChannel()
p.initStatisticsChannelName()
p.initDmlChannelName()
p.initDeltaChannelName()
p.initDmlChannelNum()
p.initMaxPartitionNum()
@ -178,6 +180,15 @@ func (p *ParamTable) initDmlChannelName() {
p.DmlChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initDeltaChannelName() {
config, err := p.Load("msgChannel.chanNamePrefix.rootCoordDelta")
if err != nil {
config = "rootcoord-delta"
}
s := []string{p.ClusterChannelPrefix, config}
p.DeltaChannelName = strings.Join(s, "-")
}
func (p *ParamTable) initDmlChannelNum() {
p.DmlChannelNum = p.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256)
}

View File

@ -45,6 +45,9 @@ func TestParamTable(t *testing.T) {
assert.Equal(t, Params.DmlChannelName, "by-dev-rootcoord-dml")
t.Logf("dml channel = %s", Params.DmlChannelName)
assert.Equal(t, Params.DeltaChannelName, "by-dev-rootcoord-delta")
t.Logf("delta channel = %s", Params.DeltaChannelName)
assert.NotEqual(t, Params.MaxPartitionNum, 0)
t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum)

View File

@ -124,9 +124,12 @@ type Core struct {
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error
CallReleasePartitionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error
//dml channels
// dml channels used for insert
dmlChannels *dmlChannels
// delta channels used for delete
deltaChannels *dmlChannels
//Proxy manager
proxyManager *proxyManager
@ -925,13 +928,28 @@ func (c *Core) Init() error {
return
}
// initialize dml channels used for insert
c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
c.deltaChannels = newDmlChannels(c, Params.DeltaChannelName, Params.DmlChannelNum)
// recover physical channels for all collections
chanMap := c.MetaTable.ListCollectionPhysicalChannels()
for collID, chanNames := range chanMap {
c.dmlChannels.AddProducerChannels(chanNames...)
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("chanNames", chanNames))
log.Debug("recover physical channels", zap.Int64("collID", collID), zap.Any("physical channels", chanNames))
// TODO: convert physical channel name to delta channel name
for _, chanName := range chanNames {
deltaChanName, err := ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
log.Error("failed to convert dml channel name to delta channel name", zap.String("chanName", chanName))
return
}
c.deltaChannels.AddProducerChannels(deltaChanName)
log.Debug("recover delta channels", zap.Int64("collID", collID), zap.String("deltaChanName", deltaChanName))
}
}
c.chanTimeTick = newTimeTickSync(c)

View File

@ -530,6 +530,7 @@ func TestRootCoord(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
Params.DmlChannelName = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.DeltaChannelName = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
err = core.Register()
assert.Nil(t, err)
@ -1732,6 +1733,11 @@ func TestRootCoord(t *testing.T) {
cn2 := core.dmlChannels.GetDmlMsgStreamName()
core.dmlChannels.AddProducerChannels(cn0, cn1, cn2)
dn0 := core.deltaChannels.GetDmlMsgStreamName()
dn1 := core.deltaChannels.GetDmlMsgStreamName()
dn2 := core.deltaChannels.GetDmlMsgStreamName()
core.deltaChannels.AddProducerChannels(dn0, dn1, dn2)
msg0 := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,

View File

@ -135,9 +135,16 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
vchanNames := make([]string, t.Req.ShardsNum)
chanNames := make([]string, t.Req.ShardsNum)
deltaChanNames := make([]string, t.Req.ShardsNum)
for i := int32(0); i < t.Req.ShardsNum; i++ {
vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.dmlChannels.GetDmlMsgStreamName(), collID, i)
chanNames[i] = ToPhysicalChannel(vchanNames[i])
deltaChanNames[i] = t.core.deltaChannels.GetDmlMsgStreamName()
deltaChanName, err1 := ConvertChannelName(chanNames[i], Params.DmlChannelName, Params.DeltaChannelName)
if err1 != nil || deltaChanName != deltaChanNames[i] {
return fmt.Errorf("dmlChanName %s and deltaChanName %s mis-match", chanNames[i], deltaChanNames[i])
}
}
collInfo := etcdpb.CollectionInfo{
@ -201,6 +208,9 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
// add dml channel before send dd msg
t.core.dmlChannels.AddProducerChannels(chanNames...)
// also add delta channels
t.core.deltaChannels.AddProducerChannels(deltaChanNames...)
ids, err := t.core.SendDdCreateCollectionReq(ctx, &ddCollReq, chanNames)
if err != nil {
return fmt.Errorf("send dd create collection req failed, error = %w", err)
@ -214,6 +224,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
err = t.core.MetaTable.AddCollection(&collInfo, ts, idxInfo, ddOpStr)
if err != nil {
t.core.dmlChannels.RemoveProducerChannels(chanNames...)
t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...)
// it's ok just to leave create collection message sent, datanode and querynode does't process CreateCollection logic
return fmt.Errorf("meta table add collection failed,error = %w", err)
}
@ -309,6 +320,16 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
// remove dml channel after send dd msg
t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)
// remove delta channels
deltaChanNames := make([]string, len(collMeta.PhysicalChannelNames))
for i, chanName := range collMeta.PhysicalChannelNames {
deltaChanNames[i], err = ConvertChannelName(chanName, Params.DmlChannelName, Params.DeltaChannelName)
if err != nil {
return err
}
}
t.core.deltaChannels.RemoveProducerChannels(deltaChanNames...)
return nil
}

View File

@ -136,3 +136,19 @@ func ToPhysicalChannel(vchannel string) string {
}
return vchannel[:idx]
}
func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (string, error) {
chanNameLen := len(chanName)
tokenFromLen := len(tokenFrom)
if chanNameLen < tokenFromLen {
return "", fmt.Errorf("cannot find token '%s' in '%s'", tokenFrom, chanName)
}
var i int
for i = 0; i < (chanNameLen - tokenFromLen); i++ {
if chanName[i:i+tokenFromLen] == tokenFrom {
return chanName[0:i] + tokenTo + chanName[i+tokenFromLen:], nil
}
}
return "", fmt.Errorf("cannot find token '%s' in '%s'", tokenFrom, chanName)
}

View File

@ -135,3 +135,19 @@ func Test_DecodeMsgPositions(t *testing.T) {
err = DecodeMsgPositions("null", &mpOut)
assert.Nil(t, err)
}
func Test_ConvertChannelName(t *testing.T) {
const (
chanName = "by-dev_rootcoord-dml_123v0"
deltaChanName = "by-dev_rootcoord-delta_123v0"
tFrom = "rootcoord-dml"
tTo = "rootcoord-delta"
)
_, err := ConvertChannelName("by-dev", tFrom, tTo)
assert.NotNil(t, err)
_, err = ConvertChannelName("by-dev_rootcoord-delta_123v0", tFrom, tTo)
assert.NotNil(t, err)
str, err := ConvertChannelName(chanName, tFrom, tTo)
assert.Nil(t, err)
assert.Equal(t, deltaChanName, str)
}