Apply for msgstream from pool when creating collection (#7377)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/7388/head
Cai Yudong 2021-08-31 18:35:58 +08:00 committed by GitHub
parent 08a31010e9
commit 3b9609692b
17 changed files with 271 additions and 130 deletions

@@ -14,6 +14,7 @@ msgChannel:
   chanNamePrefix:
     rootCoordTimeTick: "rootcoord-timetick"
     rootCoordStatistics: "rootcoord-statistics"
+    rootCoordDml: "rootcoord-dml"
     search: "search"
     searchResult: "searchResult"
    proxyTimeTick: "proxyTimeTick"

@@ -10,6 +10,7 @@
 # or implied. See the License for the specific language governing permissions and limitations under the License.
 
 rootcoord:
+  dmlChannelNum: 64
   maxPartitionNum: 4096
   minSegmentSizeToEnableIndex: 1024
   timeout: 3600 # time out, 5 seconds

@@ -19,7 +19,6 @@ import (
     "github.com/milvus-io/milvus/internal/msgstream"
     "github.com/milvus-io/milvus/internal/proto/commonpb"
     "github.com/milvus-io/milvus/internal/proto/datapb"
-    "github.com/milvus-io/milvus/internal/rootcoord"
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/internal/util/flowgraph"
@@ -148,11 +147,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
         return nil
     }
 
-    pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
     var dmStreamNode Node = newDmInputNode(
         dsService.ctx,
         dsService.msFactory,
-        pchan,
+        vchanInfo.CollectionID,
+        vchanInfo.GetChannelName(),
         vchanInfo.GetSeekPosition(),
     )
     var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)

@@ -92,15 +92,22 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             }
         case commonpb.MsgType_Insert:
             log.Debug("DDNode with insert messages")
+            imsg := msg.(*msgstream.InsertMsg)
             if msg.EndTs() < FilterThreshold {
                 log.Info("Filtering Insert Messages",
                     zap.Uint64("Message endts", msg.EndTs()),
                     zap.Uint64("FilterThreshold", FilterThreshold),
                 )
-                if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) {
+                if ddn.filterFlushedSegmentInsertMessages(imsg) {
                     continue
                 }
             }
+            if imsg.CollectionID != ddn.collectionID {
+                //log.Debug("filter invalid InsertMsg, collection mis-match",
+                //    zap.Int64("msg collID", imsg.CollectionID),
+                //    zap.Int64("ddn collID", ddn.collectionID))
+                continue
+            }
             iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
         }
     }

@@ -13,19 +13,24 @@ package datanode
 
 import (
     "context"
+    "strconv"
 
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/msgstream"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
+    "github.com/milvus-io/milvus/internal/rootcoord"
     "github.com/milvus-io/milvus/internal/util/flowgraph"
 )
 
-func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
+func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
     maxQueueLength := Params.FlowGraphMaxQueueLength
     maxParallelism := Params.FlowGraphMaxParallelism
-    consumeSubName := Params.MsgChannelSubName
+
+    // subName should be unique, since pchannelName is shared among several collections
+    consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
 
     insertStream, _ := factory.NewTtMsgStream(ctx)
+    pchannelName := rootcoord.ToPhysicalChannel(chanName)
     insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
 
     log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)
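Since several collections now share one physical DML channel, the subscription name has to carry the collection ID so each DataNode flow graph gets its own consumer group. A small self-contained sketch of that naming idea (the prefix and collection IDs below are made-up examples, not values from this patch):

```go
package main

import (
	"fmt"
	"strconv"
)

// buildSubName mirrors the idea above: append the collection ID to the node's
// subscription prefix so each collection gets a distinct consumer group on a
// shared physical channel. Illustrative helper, not the Milvus API.
func buildSubName(subPrefix string, collID int64) string {
	return subPrefix + "-" + strconv.FormatInt(collID, 10)
}

func main() {
	// Two collections consuming the same physical channel must not share a
	// subscription name, otherwise they would split one message stream.
	for _, collID := range []int64{101, 102} {
		fmt.Println(buildSubName("datanode-sub", collID))
	}
	// Output:
	// datanode-sub-101
	// datanode-sub-102
}
```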

@@ -184,9 +184,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
                 iMsg.startPositions[0], iMsg.endPositions[0])
             if err != nil {
-                log.Error("add segment wrong", zap.Error(err))
+                log.Error("add segment wrong",
+                    zap.Int64("segID", currentSegID),
+                    zap.Int64("collID", collID),
+                    zap.Int64("partID", partitionID),
+                    zap.String("chanName", msg.GetChannelID()),
+                    zap.Error(err))
             }
         }
         segNum := uniqueSeg[currentSegID]
@@ -199,7 +203,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         err := ibNode.replica.updateStatistics(id, num)
         if err != nil {
-            log.Error("update Segment Row number wrong", zap.Error(err))
+            log.Error("update Segment Row number wrong", zap.Int64("segID", id), zap.Error(err))
         }
     }

@@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
     // check if the collection from message is target collection
     if msg.CollectionID != fdmNode.collectionID {
-        log.Debug("filter invalid insert message, collection is not the target collection",
-            zap.Any("collectionID", msg.CollectionID),
-            zap.Any("partitionID", msg.PartitionID))
+        //log.Debug("filter invalid insert message, collection is not the target collection",
+        //    zap.Any("collectionID", msg.CollectionID),
+        //    zap.Any("partitionID", msg.PartitionID))
         return nil
     }

@@ -125,6 +125,7 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
     log.Debug("query node flow graph consumes from pChannel",
         zap.Any("collectionID", q.collectionID),
         zap.Any("channel", channel),
+        zap.Any("subName", subName),
     )
     return nil
 }

@@ -15,86 +15,79 @@ import (
     "fmt"
     "sync"
 
+    "go.uber.org/atomic"
+    "go.uber.org/zap"
+
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/msgstream"
-    "go.uber.org/zap"
 )
 
 type dmlChannels struct {
-    core *Core
-    lock sync.RWMutex
-    dml  map[string]msgstream.MsgStream
+    core       *Core
+    namePrefix string
+    capacity   int64
+    refcnt     sync.Map
+    idx        *atomic.Int64
+    pool       sync.Map
 }
 
-func newDMLChannels(c *Core) *dmlChannels {
-    return &dmlChannels{
-        core: c,
-        lock: sync.RWMutex{},
-        dml:  make(map[string]msgstream.MsgStream),
+func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels {
+    d := &dmlChannels{
+        core:       c,
+        namePrefix: chanNamePrefix,
+        capacity:   chanNum,
+        refcnt:     sync.Map{},
+        idx:        atomic.NewInt64(0),
+        pool:       sync.Map{},
     }
-}
 
-// GetNumChannels get current dml channel count
-func (d *dmlChannels) GetNumChannels() int {
-    d.lock.RLock()
-    defer d.lock.RUnlock()
-    return len(d.dml)
+    var i int64
+    for i = 0; i < chanNum; i++ {
+        name := fmt.Sprintf("%s_%d", d.namePrefix, i)
+        ms, err := c.msFactory.NewMsgStream(c.ctx)
+        if err != nil {
+            log.Error("add msgstream failed", zap.String("name", name), zap.Error(err))
+            panic("add msgstream failed")
+        }
+        ms.AsProducer([]string{name})
+        d.pool.Store(name, &ms)
+    }
+    log.Debug("init dml channels", zap.Int64("num", chanNum))
+    return d
+}
+
+func (d *dmlChannels) GetDmlMsgStreamName() string {
+    cnt := d.idx.Load()
+    name := fmt.Sprintf("%s_%d", d.namePrefix, cnt)
+    d.idx.Store((cnt + 1) % d.capacity)
+    return name
 }
 
 // ListChannels lists all dml channel names
 func (d *dmlChannels) ListChannels() []string {
-    d.lock.RLock()
-    defer d.lock.RUnlock()
-
-    ret := make([]string, 0, len(d.dml))
-    for n := range d.dml {
-        ret = append(ret, n)
-    }
-    return ret
+    chanNames := make([]string, 0)
+    d.refcnt.Range(
+        func(k, v interface{}) bool {
+            chanNames = append(chanNames, k.(string))
+            return true
+        })
+    return chanNames
 }
 
-// Produce produces msg pack into specified channel
-func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error {
-    d.lock.Lock()
-    defer d.lock.Unlock()
-
-    ds, ok := d.dml[name]
-    if !ok {
-        return fmt.Errorf("channel %s not exist", name)
-    }
-    if err := ds.Produce(pack); err != nil {
-        return err
-    }
-    return nil
+// GetNumChannels get current dml channel count
+func (d *dmlChannels) GetNumChannels() int {
+    return len(d.ListChannels())
 }
 
 // Broadcast broadcasts msg pack into specified channel
-func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error {
-    d.lock.Lock()
-    defer d.lock.Unlock()
-
-    ds, ok := d.dml[name]
-    if !ok {
-        return fmt.Errorf("channel %s not exist", name)
-    }
-    if err := ds.Broadcast(pack); err != nil {
-        return err
-    }
-    return nil
-}
-
-// BroadcastAll invoke broadcast with provided msg pack in all channels specified
-func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error {
-    d.lock.Lock()
-    defer d.lock.Unlock()
-
-    for _, ch := range channels {
-        ds, ok := d.dml[ch]
-        if !ok {
-            return fmt.Errorf("channel %s not exist", ch)
-        }
-        if err := ds.Broadcast(pack); err != nil {
+func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error {
+    for _, chanName := range chanNames {
+        // only in-use chanName exist in refcnt
+        if _, ok := d.refcnt.Load(chanName); !ok {
+            return fmt.Errorf("channel %s not exist", chanName)
+        }
+        v, _ := d.pool.Load(chanName)
+        if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
             return err
         }
     }
@@ -103,33 +96,35 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error {
 
 // AddProducerChannels add named channels as producer
 func (d *dmlChannels) AddProducerChannels(names ...string) {
-    d.lock.Lock()
-    defer d.lock.Unlock()
-
     for _, name := range names {
-        log.Debug("add dml channel", zap.String("channel name", name))
-        _, ok := d.dml[name]
-        if !ok {
-            ms, err := d.core.msFactory.NewMsgStream(d.core.ctx)
-            if err != nil {
-                log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err))
-                continue
+        if _, ok := d.pool.Load(name); !ok {
+            log.Error("invalid channel name", zap.String("chanName", name))
+            panic("invalid channel name")
+        } else {
+            var cnt int64
+            if _, ok := d.refcnt.Load(name); !ok {
+                cnt = 1
+            } else {
+                v, _ := d.refcnt.Load(name)
+                cnt = v.(int64) + 1
             }
-            ms.AsProducer([]string{name})
-            d.dml[name] = ms
+            d.refcnt.Store(name, cnt)
+            log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt))
         }
     }
 }
 
 // RemoveProducerChannels removes specified channels
 func (d *dmlChannels) RemoveProducerChannels(names ...string) {
-    d.lock.Lock()
-    defer d.lock.Unlock()
-
     for _, name := range names {
-        if ds, ok := d.dml[name]; ok {
-            ds.Close()
-            delete(d.dml, name)
+        v, ok := d.refcnt.Load(name)
+        if ok {
+            cnt := v.(int64)
+            if cnt > 1 {
+                d.refcnt.Store(name, cnt-1)
+            } else {
+                d.refcnt.Delete(name)
+            }
         }
     }
 }
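To summarize the new design in one place: dmlChannels now owns a fixed pool of pre-created msgstreams, hands out pooled channel names round-robin, and reference-counts each name while collections use it. A minimal stand-alone sketch of that pattern, not the rootcoord implementation (the chanPool type and its methods are illustrative, and the msgstreams themselves are omitted):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// chanPool illustrates the pooling pattern: a fixed set of channel names
// created up front, handed out round-robin, and tracked with reference
// counts while collections are assigned to them.
type chanPool struct {
	prefix   string
	capacity int64
	idx      int64
	mu       sync.Mutex
	refcnt   map[string]int64
}

func newChanPool(prefix string, capacity int64) *chanPool {
	return &chanPool{prefix: prefix, capacity: capacity, refcnt: map[string]int64{}}
}

// next returns the next pooled channel name in round-robin order.
func (p *chanPool) next() string {
	i := atomic.AddInt64(&p.idx, 1) - 1
	return fmt.Sprintf("%s_%d", p.prefix, i%p.capacity)
}

// acquire and release keep a per-name reference count, so a physical channel
// stays "in use" until every collection assigned to it has been dropped.
func (p *chanPool) acquire(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.refcnt[name]++
}

func (p *chanPool) release(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.refcnt[name] <= 1 {
		delete(p.refcnt, name)
		return
	}
	p.refcnt[name]--
}

func main() {
	pool := newChanPool("rootcoord-dml", 2)
	a, b, c := pool.next(), pool.next(), pool.next() // a and c map to the same name
	pool.acquire(a)
	pool.acquire(b)
	pool.acquire(c)
	fmt.Println(a, b, c, "in use:", len(pool.refcnt)) // rootcoord-dml_0 rootcoord-dml_1 rootcoord-dml_0 in use: 2
	pool.release(a)
	fmt.Println("in use after one release:", len(pool.refcnt)) // still 2: that name's refcount only dropped to 1
}
```

The key difference from the old code is that producers are never created or closed on demand; a channel simply drops out of the in-use set once its last collection releases it.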

@@ -0,0 +1,78 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package rootcoord
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    "github.com/milvus-io/milvus/internal/msgstream"
+    "github.com/milvus-io/milvus/internal/util/funcutil"
+    "github.com/stretchr/testify/assert"
+)
+
+func TestDmlChannels(t *testing.T) {
+    const (
+        dmlChanPrefix      = "rootcoord-dml"
+        totalDmlChannelNum = 2
+    )
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    factory := msgstream.NewPmsFactory()
+    Params.Init()
+
+    m := map[string]interface{}{
+        "pulsarAddress":  Params.PulsarAddress,
+        "receiveBufSize": 1024,
+        "pulsarBufSize":  1024}
+    err := factory.SetParams(m)
+    assert.Nil(t, err)
+
+    core, err := NewCore(ctx, factory)
+    assert.Nil(t, err)
+
+    dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum)
+    chanNames := dml.ListChannels()
+    assert.Equal(t, 0, len(chanNames))
+
+    randStr := funcutil.RandomString(8)
+    assert.Panics(t, func() { dml.AddProducerChannels(randStr) })
+
+    err = dml.Broadcast([]string{randStr}, nil)
+    assert.NotNil(t, err)
+    assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr))
+
+    // dml_xxx_0 => {chanName0, chanName2}
+    // dml_xxx_1 => {chanName1}
+    chanName0 := dml.GetDmlMsgStreamName()
+    dml.AddProducerChannels(chanName0)
+    assert.Equal(t, 1, dml.GetNumChannels())
+
+    chanName1 := dml.GetDmlMsgStreamName()
+    dml.AddProducerChannels(chanName1)
+    assert.Equal(t, 2, dml.GetNumChannels())
+
+    chanName2 := dml.GetDmlMsgStreamName()
+    dml.AddProducerChannels(chanName2)
+    assert.Equal(t, 2, dml.GetNumChannels())
+
+    dml.RemoveProducerChannels(chanName0)
+    assert.Equal(t, 2, dml.GetNumChannels())
+
+    dml.RemoveProducerChannels(chanName1)
+    assert.Equal(t, 1, dml.GetNumChannels())
+
+    dml.RemoveProducerChannels(chanName0)
+    assert.Equal(t, 0, dml.GetNumChannels())
+}

@@ -174,6 +174,7 @@ func (mt *metaTable) reloadFromKV() error {
         mt.indexID2Meta[meta.IndexID] = meta
     }
 
+    log.Debug("reload meta table from KV successfully")
     return nil
 }

@@ -39,7 +39,9 @@ type ParamTable struct {
     MsgChannelSubName string
     TimeTickChannel   string
     StatisticsChannel string
+    DmlChannelName    string
+    DmlChannelNum     int64
 
     MaxPartitionNum      int64
     DefaultPartitionName string
     DefaultIndexName     string
@@ -71,7 +73,9 @@ func (p *ParamTable) Init() {
     p.initMsgChannelSubName()
     p.initTimeTickChannel()
     p.initStatisticsChannelName()
+    p.initDmlChannelName()
+    p.initDmlChannelNum()
 
     p.initMaxPartitionNum()
     p.initMinSegmentSizeToEnableIndex()
     p.initDefaultPartitionName()
@@ -161,6 +165,18 @@ func (p *ParamTable) initStatisticsChannelName() {
     p.StatisticsChannel = channel
 }
 
+func (p *ParamTable) initDmlChannelName() {
+    channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml")
+    if err != nil {
+        panic(err)
+    }
+    p.DmlChannelName = channel
+}
+
+func (p *ParamTable) initDmlChannelNum() {
+    p.DmlChannelNum = p.ParseInt64("rootcoord.dmlChannelNum")
+}
+
 func (p *ParamTable) initMaxPartitionNum() {
     p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum")
 }

@@ -502,7 +502,7 @@ func (c *Core) setMsgStreams() error {
             CreateCollectionRequest: *req,
         }
         msgPack.Msgs = append(msgPack.Msgs, msg)
-        return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+        return c.dmlChannels.Broadcast(channelNames, &msgPack)
     }
 
     c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error {
@@ -518,7 +518,7 @@ func (c *Core) setMsgStreams() error {
             DropCollectionRequest: *req,
         }
         msgPack.Msgs = append(msgPack.Msgs, msg)
-        return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+        return c.dmlChannels.Broadcast(channelNames, &msgPack)
     }
 
     c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error {
@@ -534,7 +534,7 @@ func (c *Core) setMsgStreams() error {
             CreatePartitionRequest: *req,
         }
         msgPack.Msgs = append(msgPack.Msgs, msg)
-        return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+        return c.dmlChannels.Broadcast(channelNames, &msgPack)
     }
 
     c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error {
@@ -550,7 +550,7 @@ func (c *Core) setMsgStreams() error {
             DropPartitionRequest: *req,
         }
         msgPack.Msgs = append(msgPack.Msgs, msg)
-        return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+        return c.dmlChannels.Broadcast(channelNames, &msgPack)
     }
 
     return nil
@@ -966,9 +966,12 @@ func (c *Core) Init() error {
             return
         }
 
-        c.dmlChannels = newDMLChannels(c)
+        c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum)
+
+        // recover physical channels for all collections
         pc := c.MetaTable.ListCollectionPhysicalChannels()
         c.dmlChannels.AddProducerChannels(pc...)
+        log.Debug("recover all physical channels", zap.Any("chanNames", pc))
 
         c.chanTimeTick = newTimeTickSync(c)
         c.chanTimeTick.AddProxy(c.session)

@@ -412,7 +412,7 @@ func TestRootCoord(t *testing.T) {
         assert.Nil(t, err)
         assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
-        assert.Equal(t, 2, len(core.dmlChannels.dml))
+        assert.Equal(t, 2, core.dmlChannels.GetNumChannels())
 
         pChan := core.MetaTable.ListCollectionPhysicalChannels()
         dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName)
@@ -1433,7 +1433,10 @@ func TestRootCoord(t *testing.T) {
         assert.Nil(t, err)
         time.Sleep(100 * time.Millisecond)
 
-        core.dmlChannels.AddProducerChannels("c0", "c1", "c2")
+        cn0 := core.dmlChannels.GetDmlMsgStreamName()
+        cn1 := core.dmlChannels.GetDmlMsgStreamName()
+        cn2 := core.dmlChannels.GetDmlMsgStreamName()
+        core.dmlChannels.AddProducerChannels(cn0, cn1, cn2)
 
         msg0 := &internalpb.ChannelTimeTickMsg{
             Base: &commonpb.MsgBase{

@@ -136,7 +136,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
     vchanNames := make([]string, t.Req.ShardsNum)
     chanNames := make([]string, t.Req.ShardsNum)
     for i := int32(0); i < t.Req.ShardsNum; i++ {
-        vchanNames[i] = fmt.Sprintf("%s_%d_%d_v", t.Req.CollectionName, collID, i)
+        vchanNames[i] = fmt.Sprintf("%s_%dv", t.core.dmlChannels.GetDmlMsgStreamName(), collID)
         chanNames[i] = ToPhysicalChannel(vchanNames[i])
     }
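With this change a collection's virtual channels are derived from pooled DML channel names rather than from the collection name. The sketch below shows how such a virtual channel maps back to its physical channel, assuming ToPhysicalChannel simply trims the collection-specific suffix after the last underscore; the helper and the sample IDs are illustrative, not the rootcoord implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// toPhysicalChannel is an illustrative stand-in for rootcoord.ToPhysicalChannel,
// under the assumption that it derives the physical channel by dropping the
// virtual channel's trailing "_<collectionID>v" suffix.
func toPhysicalChannel(vchan string) string {
	if i := strings.LastIndex(vchan, "_"); i >= 0 {
		return vchan[:i]
	}
	return vchan
}

func main() {
	// Hypothetical example: collection ID 111 with two shards assigned to the
	// pooled channels rootcoord-dml_3 and rootcoord-dml_4.
	for _, poolName := range []string{"rootcoord-dml_3", "rootcoord-dml_4"} {
		vchan := fmt.Sprintf("%s_%dv", poolName, 111)
		fmt.Println(vchan, "->", toPhysicalChannel(vchan))
	}
	// rootcoord-dml_3_111v -> rootcoord-dml_3
	// rootcoord-dml_4_111v -> rootcoord-dml_4
}
```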
@@ -286,10 +286,8 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
     t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
     t.core.SendTimeTick(ts, reason)
 
-    for _, chanName := range collMeta.PhysicalChannelNames {
-        // send tt into deleted channels to tell data_node to clear flowgragh
-        t.core.chanTimeTick.SendChannelTimeTick(chanName, ts)
-    }
+    // send tt into deleted channels to tell data_node to clear flowgragh
+    t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts)
 
     // remove dml channel after send dd msg
     t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)

@@ -22,16 +22,12 @@ import (
     "github.com/milvus-io/milvus/internal/proto/commonpb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/util/sessionutil"
+    "github.com/milvus-io/milvus/internal/util/timerecord"
     "github.com/milvus-io/milvus/internal/util/tsoutil"
     "github.com/milvus-io/milvus/internal/util/typeutil"
     "go.uber.org/zap"
 )
 
-type ddlTimetickInfo struct {
-    ddlMinTs typeutil.Timestamp
-    ddlTsSet map[typeutil.Timestamp]struct{}
-}
-
 type timetickSync struct {
     core *Core
     lock sync.Mutex
@@ -191,7 +187,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
     t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in)
     //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID),
-    //    zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
+    //    zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
 
     t.sendToChannel()
     return nil
@@ -227,32 +223,51 @@ func (t *timetickSync) StartWatch() {
         case <-t.core.ctx.Done():
             log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err()))
             return
-        case ptt, ok := <-t.sendChan:
+        case proxyTimetick, ok := <-t.sendChan:
             if !ok {
                 log.Debug("timetickSync sendChan closed")
                 return
             }
 
             // reduce each channel to get min timestamp
-            mtt := ptt[t.core.session.ServerID]
-            for _, chanName := range mtt.in.ChannelNames {
-                mints := mtt.getTimetick(chanName)
-                for _, tt := range ptt {
-                    ts := tt.getTimetick(chanName)
-                    if ts < mints {
-                        mints = ts
-                    }
-                }
-                if err := t.SendChannelTimeTick(chanName, mints); err != nil {
-                    log.Debug("SendChannelTimeTick fail", zap.Error(err))
-                }
-            }
+            local := proxyTimetick[t.core.session.ServerID]
+            if len(local.in.ChannelNames) == 0 {
+                continue
+            }
+
+            hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames))
+            tr := timerecord.NewTimeRecorder(hdr)
+            wg := sync.WaitGroup{}
+            for _, chanName := range local.in.ChannelNames {
+                wg.Add(1)
+                go func(chanName string) {
+                    mints := local.getTimetick(chanName)
+                    for _, tt := range proxyTimetick {
+                        ts := tt.getTimetick(chanName)
+                        if ts < mints {
+                            mints = ts
+                        }
+                    }
+                    if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil {
+                        log.Debug("SendTimeTickToChannel fail", zap.Error(err))
+                    }
+                    wg.Done()
+                }(chanName)
+            }
+            wg.Wait()
+            span := tr.ElapseSpan()
+            // rootcoord send tt msg to all channels every 200ms by default
+            if span.Milliseconds() > 200 {
+                log.Warn("rootcoord send tt to all channels too slowly",
+                    zap.Int("chanNum", len(local.in.ChannelNames)),
+                    zap.Int64("span", span.Milliseconds()))
+            }
         }
     }
 }
 
-// SendChannelTimeTick send each channel's min timetick to msg stream
-func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error {
+// SendTimeTickToChannel send each channel's min timetick to msg stream
+func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error {
     msgPack := msgstream.MsgPack{}
     baseMsg := msgstream.BaseMsg{
         BeginTimestamp: ts,
@@ -273,11 +288,14 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error {
     }
     msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
 
-    err := t.core.dmlChannels.Broadcast(chanName, &msgPack)
-    if err == nil {
+    if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil {
+        return err
+    }
+
+    for _, chanName := range chanNames {
         metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts)))
     }
-    return err
+    return nil
 }
 
 // GetProxyNum return the num of detected proxy node
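The StartWatch change fans the per-channel timetick sends out to goroutines, waits for all of them, and warns when the whole batch exceeds the default 200 ms tick interval. A stand-alone sketch of that fan-out-and-measure pattern (the channel names and the fake sendTick latency are placeholders, not Milvus code):

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// sendTick stands in for the real per-channel broadcast; it just sleeps to
// simulate network latency.
func sendTick(chanName string) {
	time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
}

func main() {
	channels := []string{"rootcoord-dml_0", "rootcoord-dml_1", "rootcoord-dml_2"}

	start := time.Now()
	var wg sync.WaitGroup
	for _, name := range channels {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			sendTick(name)
		}(name)
	}
	wg.Wait()

	// Ticks are produced every 200ms by default, so the whole fan-out should
	// finish well within that budget; otherwise log a warning.
	span := time.Since(start)
	if span > 200*time.Millisecond {
		fmt.Printf("slow timetick fan-out: %d channels took %v\n", len(channels), span)
	} else {
		fmt.Printf("sent ticks to %d channels in %v\n", len(channels), span)
	}
}
```

Because the sends run concurrently, the batch latency is roughly the slowest single channel rather than the sum over all channels, which matters once 64 pooled channels are in play.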

@@ -33,19 +33,30 @@ func NewTimeRecorder(header string) *TimeRecorder {
     }
 }
 
-// Record calculates the time span from previous Record call
-func (tr *TimeRecorder) Record(msg string) time.Duration {
+func (tr *TimeRecorder) RecordSpan() time.Duration {
     curr := time.Now()
     span := curr.Sub(tr.last)
     tr.last = curr
+    return span
+}
+
+func (tr *TimeRecorder) ElapseSpan() time.Duration {
+    curr := time.Now()
+    span := curr.Sub(tr.start)
+    tr.last = curr
+    return span
+}
+
+// Record calculates the time span from previous Record call
+func (tr *TimeRecorder) Record(msg string) time.Duration {
+    span := tr.RecordSpan()
     tr.printTimeRecord(msg, span)
     return span
 }
 
 // Elapse calculates the time span from the beginning of this TimeRecorder
 func (tr *TimeRecorder) Elapse(msg string) time.Duration {
-    curr := time.Now()
-    span := curr.Sub(tr.start)
+    span := tr.ElapseSpan()
     tr.printTimeRecord(msg, span)
     return span
 }