diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index 2ce4e5de02..ee741eca99 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -14,6 +14,7 @@ msgChannel: chanNamePrefix: rootCoordTimeTick: "rootcoord-timetick" rootCoordStatistics: "rootcoord-statistics" + rootCoordDml: "rootcoord-dml" search: "search" searchResult: "searchResult" proxyTimeTick: "proxyTimeTick" diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 011828c64a..6ab388c98f 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -20,7 +20,6 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -160,11 +159,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.saveBinlog = saveBinlog - pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName()) var dmStreamNode Node = newDmInputNode( dsService.ctx, dsService.msFactory, - pchan, + vchanInfo.CollectionID, + vchanInfo.GetChannelName(), vchanInfo.GetSeekPosition(), ) var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 49161c7fd0..52df9ead7a 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -92,7 +92,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { zap.Uint64("Message endts", msg.EndTs()), zap.Uint64("FilterThreshold", FilterThreshold), ) - if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) { + if ddn.filterFlushedSegmentInsertMessages(imsg) { continue } } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index ad7d4b8103..0258db958e 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -13,19 +13,24 @@ package datanode import ( "context" + "strconv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - consumeSubName := Params.MsgChannelSubName + + // subName should be unique, since pchannelName is shared among several collections + consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10) insertStream, _ := factory.NewTtMsgStream(ctx) + pchannelName := rootcoord.ToPhysicalChannel(chanName) insertStream.AsConsumer([]string{pchannelName}, consumeSubName) log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName) diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 25f598ddb5..3935b15b30 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -84,5 +84,5 @@ func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error { func TestNewDmInputNode(t *testing.T) { ctx := context.Background() - newDmInputNode(ctx, &mockMsgStreamFactory{}, "abc_adc", new(internalpb.MsgPosition)) + newDmInputNode(ctx, &mockMsgStreamFactory{}, 0, "abc_adc", new(internalpb.MsgPosition)) } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 9ae116b859..6466303c1d 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -355,7 +355,12 @@ func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream err = ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(), startPos, endPos) if err != nil { - log.Warn("add segment wrong", zap.Int64("Seg ID", currentSegID), zap.Error(err)) + log.Error("add segment wrong", + zap.Int64("segID", currentSegID), + zap.Int64("collID", collID), + zap.Int64("partID", partitionID), + zap.String("chanName", msg.GetChannelID()), + zap.Error(err)) return } } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 9886628758..ba4cd25c67 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // check if the collection from message is target collection if msg.CollectionID != fdmNode.collectionID { - log.Debug("filter invalid insert message, collection is not the target collection", - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) + //log.Debug("filter invalid insert message, collection is not the target collection", + // zap.Any("collectionID", msg.CollectionID), + // zap.Any("partitionID", msg.PartitionID)) return nil } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 1592238d3c..f8d2a84b9f 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -125,6 +125,7 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS log.Debug("query node flow graph consumes from pChannel", zap.Any("collectionID", q.collectionID), zap.Any("channel", channel), + zap.Any("subName", subName), ) return nil } diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index e376ffcba7..d76457e5d3 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -15,87 +15,81 @@ import ( "fmt" "sync" + "go.uber.org/atomic" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "go.uber.org/zap" ) type dmlChannels struct { - core *Core - lock sync.RWMutex - dml map[string]msgstream.MsgStream + core *Core + namePrefix string + capacity int64 + refcnt sync.Map + idx *atomic.Int64 + pool sync.Map } -func newDMLChannels(c *Core) *dmlChannels { - return &dmlChannels{ - core: c, - lock: sync.RWMutex{}, - dml: make(map[string]msgstream.MsgStream), +func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels { + d := &dmlChannels{ + core: c, + namePrefix: chanNamePrefix, + capacity: chanNum, + refcnt: sync.Map{}, + idx: atomic.NewInt64(0), + pool: sync.Map{}, } + + var i int64 + for i = 0; i < chanNum; i++ { + name := fmt.Sprintf("%s_%d", d.namePrefix, i) + ms, err := c.msFactory.NewMsgStream(c.ctx) + if err != nil { + log.Error("add msgstream failed", zap.String("name", name), zap.Error(err)) + panic("add msgstream failed") + } + ms.AsProducer([]string{name}) + d.pool.Store(name, &ms) + } + log.Debug("init dml channels", zap.Int64("num", chanNum)) + return d } -// GetNumChannels get current dml channel count -func (d *dmlChannels) GetNumChannels() int { - d.lock.RLock() - defer d.lock.RUnlock() - return len(d.dml) +func (d *dmlChannels) GetDmlMsgStreamName() string { + cnt := d.idx.Load() + name := fmt.Sprintf("%s_%d", d.namePrefix, cnt) + d.idx.Store((cnt + 1) % d.capacity) + return name } // ListChannels lists all dml channel names func (d *dmlChannels) ListChannels() []string { - d.lock.RLock() - defer d.lock.RUnlock() - - ret := make([]string, 0, len(d.dml)) - for n := range d.dml { - ret = append(ret, n) - } - return ret - + chanNames := make([]string, 0) + d.refcnt.Range( + func(k, v interface{}) bool { + chanNames = append(chanNames, k.(string)) + return true + }) + return chanNames } -// Produce produces msg pack into specified channel -func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error { - d.lock.Lock() - defer d.lock.Unlock() - - ds, ok := d.dml[name] - if !ok { - return fmt.Errorf("channel %s not exist", name) - } - if err := ds.Produce(pack); err != nil { - return err - } - return nil +// GetNumChannels get current dml channel count +func (d *dmlChannels) GetNumChannels() int { + return len(d.ListChannels()) } // Broadcast broadcasts msg pack into specified channel -func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error { - d.lock.Lock() - defer d.lock.Unlock() - - ds, ok := d.dml[name] - if !ok { - return fmt.Errorf("channel %s not exist", name) - } - if err := ds.Broadcast(pack); err != nil { - return err - } - return nil -} - -// BroadcastAll invoke broadcast with provided msg pack in all channels specified -func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error { - d.lock.Lock() - defer d.lock.Unlock() - - for _, ch := range channels { - ds, ok := d.dml[ch] - if !ok { - return fmt.Errorf("channel %s not exist", ch) - } - if err := ds.Broadcast(pack); err != nil { - return err +func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error { + for _, chanName := range chanNames { + // only in-use chanName exist in refcnt + if _, ok := d.refcnt.Load(chanName); ok { + v, _ := d.pool.Load(chanName) + if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil { + return err + } + } else { + return fmt.Errorf("channel %s not exist", chanName) } } return nil @@ -103,33 +97,34 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) e // AddProducerChannels add named channels as producer func (d *dmlChannels) AddProducerChannels(names ...string) { - d.lock.Lock() - defer d.lock.Unlock() - for _, name := range names { - log.Debug("add dml channel", zap.String("channel name", name)) - _, ok := d.dml[name] - if !ok { - ms, err := d.core.msFactory.NewMsgStream(d.core.ctx) - if err != nil { - log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err)) - continue + if _, ok := d.pool.Load(name); ok { + var cnt int64 + if _, ok := d.refcnt.Load(name); !ok { + cnt = 1 + } else { + v, _ := d.refcnt.Load(name) + cnt = v.(int64) + 1 } - ms.AsProducer([]string{name}) - d.dml[name] = ms + d.refcnt.Store(name, cnt) + log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt)) + } else { + log.Error("invalid channel name", zap.String("chanName", name)) + panic("invalid channel name: " + name) } } } // RemoveProducerChannels removes specified channels func (d *dmlChannels) RemoveProducerChannels(names ...string) { - d.lock.Lock() - defer d.lock.Unlock() - for _, name := range names { - if ds, ok := d.dml[name]; ok { - ds.Close() - delete(d.dml, name) + if v, ok := d.refcnt.Load(name); ok { + cnt := v.(int64) + if cnt > 1 { + d.refcnt.Store(name, cnt-1) + } else { + d.refcnt.Delete(name) + } } } } diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go new file mode 100644 index 0000000000..81bbcb52fa --- /dev/null +++ b/internal/rootcoord/dml_channels_test.go @@ -0,0 +1,78 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package rootcoord + +import ( + "context" + "fmt" + "testing" + + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/stretchr/testify/assert" +) + +func TestDmlChannels(t *testing.T) { + const ( + dmlChanPrefix = "rootcoord-dml" + totalDmlChannelNum = 2 + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + factory := msgstream.NewPmsFactory() + Params.Init() + + m := map[string]interface{}{ + "pulsarAddress": Params.PulsarAddress, + "receiveBufSize": 1024, + "pulsarBufSize": 1024} + err := factory.SetParams(m) + assert.Nil(t, err) + + core, err := NewCore(ctx, factory) + assert.Nil(t, err) + + dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum) + chanNames := dml.ListChannels() + assert.Equal(t, 0, len(chanNames)) + + randStr := funcutil.RandomString(8) + assert.Panics(t, func() { dml.AddProducerChannels(randStr) }) + + err = dml.Broadcast([]string{randStr}, nil) + assert.NotNil(t, err) + assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr)) + + // dml_xxx_0 => {chanName0, chanName2} + // dml_xxx_1 => {chanName1} + chanName0 := dml.GetDmlMsgStreamName() + dml.AddProducerChannels(chanName0) + assert.Equal(t, 1, dml.GetNumChannels()) + + chanName1 := dml.GetDmlMsgStreamName() + dml.AddProducerChannels(chanName1) + assert.Equal(t, 2, dml.GetNumChannels()) + + chanName2 := dml.GetDmlMsgStreamName() + dml.AddProducerChannels(chanName2) + assert.Equal(t, 2, dml.GetNumChannels()) + + dml.RemoveProducerChannels(chanName0) + assert.Equal(t, 2, dml.GetNumChannels()) + + dml.RemoveProducerChannels(chanName1) + assert.Equal(t, 1, dml.GetNumChannels()) + + dml.RemoveProducerChannels(chanName0) + assert.Equal(t, 0, dml.GetNumChannels()) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 4fc40d3f3e..ad44993a71 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -174,6 +174,7 @@ func (mt *metaTable) reloadFromKV() error { mt.indexID2Meta[meta.IndexID] = meta } + log.Debug("reload meta table from KV successfully") return nil } diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index b2736aa8a0..3ba696b8b1 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -73,7 +73,9 @@ func (p *ParamTable) Init() { p.initMsgChannelSubName() p.initTimeTickChannel() p.initStatisticsChannelName() + p.initDmlChannelName() + p.initDmlChannelNum() p.initMaxPartitionNum() p.initMinSegmentSizeToEnableIndex() p.initDefaultPartitionName() @@ -151,6 +153,18 @@ func (p *ParamTable) initStatisticsChannelName() { p.StatisticsChannel = channel } +func (p *ParamTable) initDmlChannelName() { + channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml") + if err != nil { + panic(err) + } + p.DmlChannelName = channel +} + +func (p *ParamTable) initDmlChannelNum() { + p.DmlChannelNum = p.ParseInt64("rootcoord.dmlChannelNum") +} + func (p *ParamTable) initMaxPartitionNum() { p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum") } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 922a90ff9e..517c5da7a1 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -514,7 +514,7 @@ func (c *Core) setMsgStreams() error { CreateCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.BroadcastAll(channelNames, &msgPack) + return c.dmlChannels.Broadcast(channelNames, &msgPack) } c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error { @@ -530,7 +530,7 @@ func (c *Core) setMsgStreams() error { DropCollectionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.BroadcastAll(channelNames, &msgPack) + return c.dmlChannels.Broadcast(channelNames, &msgPack) } c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error { @@ -546,7 +546,7 @@ func (c *Core) setMsgStreams() error { CreatePartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.BroadcastAll(channelNames, &msgPack) + return c.dmlChannels.Broadcast(channelNames, &msgPack) } c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error { @@ -562,7 +562,7 @@ func (c *Core) setMsgStreams() error { DropPartitionRequest: *req, } msgPack.Msgs = append(msgPack.Msgs, msg) - return c.dmlChannels.BroadcastAll(channelNames, &msgPack) + return c.dmlChannels.Broadcast(channelNames, &msgPack) } return nil @@ -941,9 +941,12 @@ func (c *Core) Init() error { return } - c.dmlChannels = newDMLChannels(c) + c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum) + + // recover physical channels for all collections pc := c.MetaTable.ListCollectionPhysicalChannels() c.dmlChannels.AddProducerChannels(pc...) + log.Debug("recover all physical channels", zap.Any("chanNames", pc)) c.chanTimeTick = newTimeTickSync(c) c.chanTimeTick.AddProxy(c.session) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index ad70687574..47175cba84 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -320,7 +320,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32 vchanNames := make([]string, t.ShardsNum) chanNames := make([]string, t.ShardsNum) for i := int32(0); i < t.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%d_%d_v%d", collName, collID, i, i) + vchanNames[i] = fmt.Sprintf("%s_%dv%d", core.dmlChannels.GetDmlMsgStreamName(), collID, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -1721,7 +1721,10 @@ func TestRootCoord(t *testing.T) { assert.Nil(t, err) time.Sleep(100 * time.Millisecond) - core.dmlChannels.AddProducerChannels("c0", "c1", "c2") + cn0 := core.dmlChannels.GetDmlMsgStreamName() + cn1 := core.dmlChannels.GetDmlMsgStreamName() + cn2 := core.dmlChannels.GetDmlMsgStreamName() + core.dmlChannels.AddProducerChannels(cn0, cn1, cn2) msg0 := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index eabfb549d6..9aaa6cb177 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -133,7 +133,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { vchanNames := make([]string, t.Req.ShardsNum) chanNames := make([]string, t.Req.ShardsNum) for i := int32(0); i < t.Req.ShardsNum; i++ { - vchanNames[i] = fmt.Sprintf("%s_%d_%d_v%d", t.Req.CollectionName, collID, i, i) + vchanNames[i] = fmt.Sprintf("%s_%dv%d", t.core.dmlChannels.GetDmlMsgStreamName(), collID, i) chanNames[i] = ToPhysicalChannel(vchanNames[i]) } @@ -284,10 +284,8 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) t.core.SendTimeTick(ts, reason) - for _, chanName := range collMeta.PhysicalChannelNames { - // send tt into deleted channels to tell data_node to clear flowgragh - t.core.chanTimeTick.SendChannelTimeTick(chanName, ts) - } + // send tt into deleted channels to tell data_node to clear flowgragh + t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts) // remove dml channel after send dd msg t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...) diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 8083737cff..bc021775f7 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -22,16 +22,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) -type ddlTimetickInfo struct { - ddlMinTs typeutil.Timestamp - ddlTsSet map[typeutil.Timestamp]struct{} -} - type timetickSync struct { core *Core lock sync.Mutex @@ -191,7 +187,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in) //log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID), - // zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) + // zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason)) t.sendToChannel() return nil @@ -226,34 +222,53 @@ func (t *timetickSync) StartWatch(wg *sync.WaitGroup) { for { select { case <-t.core.ctx.Done(): - log.Debug("root coord context done", zap.Error(t.core.ctx.Err())) + log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err())) return - case ptt, ok := <-t.sendChan: + case proxyTimetick, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") return } // reduce each channel to get min timestamp - mtt := ptt[t.core.session.ServerID] - for _, chanName := range mtt.in.ChannelNames { - mints := mtt.getTimetick(chanName) - for _, tt := range ptt { - ts := tt.getTimetick(chanName) - if ts < mints { - mints = ts + local := proxyTimetick[t.core.session.ServerID] + if len(local.in.ChannelNames) == 0 { + continue + } + + hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames)) + tr := timerecord.NewTimeRecorder(hdr) + wg := sync.WaitGroup{} + for _, chanName := range local.in.ChannelNames { + wg.Add(1) + go func(chanName string) { + mints := local.getTimetick(chanName) + for _, tt := range proxyTimetick { + ts := tt.getTimetick(chanName) + if ts < mints { + mints = ts + } } - } - if err := t.SendChannelTimeTick(chanName, mints); err != nil { - log.Debug("SendChannelTimeTick fail", zap.Error(err)) - } + if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil { + log.Debug("SendTimeTickToChannel fail", zap.Error(err)) + } + wg.Done() + }(chanName) + } + wg.Wait() + span := tr.ElapseSpan() + // rootcoord send tt msg to all channels every 200ms by default + if span.Milliseconds() > 200 { + log.Warn("rootcoord send tt to all channels too slowly", + zap.Int("chanNum", len(local.in.ChannelNames)), + zap.Int64("span", span.Milliseconds())) } } } } -// SendChannelTimeTick send each channel's min timetick to msg stream -func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error { +// SendTimeTickToChannel send each channel's min timetick to msg stream +func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error { msgPack := msgstream.MsgPack{} baseMsg := msgstream.BaseMsg{ BeginTimestamp: ts, @@ -274,11 +289,14 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam } msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - err := t.core.dmlChannels.Broadcast(chanName, &msgPack) - if err == nil { + if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil { + return err + } + + for _, chanName := range chanNames { metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } - return err + return nil } // GetProxyNum return the num of detected proxy node diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go index e22d21ae3b..69253b322c 100644 --- a/internal/util/timerecord/time_recorder.go +++ b/internal/util/timerecord/time_recorder.go @@ -33,19 +33,30 @@ func NewTimeRecorder(header string) *TimeRecorder { } } -// Record calculates the time span from previous Record call -func (tr *TimeRecorder) Record(msg string) time.Duration { +func (tr *TimeRecorder) RecordSpan() time.Duration { curr := time.Now() span := curr.Sub(tr.last) tr.last = curr + return span +} + +func (tr *TimeRecorder) ElapseSpan() time.Duration { + curr := time.Now() + span := curr.Sub(tr.start) + tr.last = curr + return span +} + +// Record calculates the time span from previous Record call +func (tr *TimeRecorder) Record(msg string) time.Duration { + span := tr.RecordSpan() tr.printTimeRecord(msg, span) return span } // Elapse calculates the time span from the beginning of this TimeRecorder func (tr *TimeRecorder) Elapse(msg string) time.Duration { - curr := time.Now() - span := curr.Sub(tr.start) + span := tr.ElapseSpan() tr.printTimeRecord(msg, span) return span }