mirror of https://github.com/milvus-io/milvus.git

Apply for msgstream from pool when creating collection (#7377)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/7388/head

parent 08a31010e9
commit 3b9609692b

@@ -14,6 +14,7 @@ msgChannel:
   chanNamePrefix:
     rootCoordTimeTick: "rootcoord-timetick"
     rootCoordStatistics: "rootcoord-statistics"
+    rootCoordDml: "rootcoord-dml"
     search: "search"
     searchResult: "searchResult"
     proxyTimeTick: "proxyTimeTick"

@@ -10,6 +10,7 @@
 # or implied. See the License for the specific language governing permissions and limitations under the License.

 rootcoord:
+  dmlChannelNum: 64
   maxPartitionNum: 4096
   minSegmentSizeToEnableIndex: 1024
   timeout: 3600 # time out, 5 seconds

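Note: the two config hunks above are the knobs for the new channel pool — a shared name prefix and a fixed pool size. A minimal sketch (assuming only the "<prefix>_<index>" naming that newDmlChannels uses further down) of the physical channel names this yields:

package main

import "fmt"

func main() {
	// Values taken from the config hunks above.
	prefix := "rootcoord-dml" // msgChannel.chanNamePrefix.rootCoordDml
	num := 64                 // rootcoord.dmlChannelNum

	// newDmlChannels fills its pool with the same "%s_%d" layout:
	// rootcoord-dml_0, rootcoord-dml_1, ..., rootcoord-dml_63.
	for i := 0; i < num; i++ {
		fmt.Printf("%s_%d\n", prefix, i)
	}
}
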
@@ -19,7 +19,6 @@ import (
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"

@@ -148,11 +147,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
 		return nil
 	}

-	pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
 	var dmStreamNode Node = newDmInputNode(
 		dsService.ctx,
 		dsService.msFactory,
-		pchan,
+		vchanInfo.CollectionID,
+		vchanInfo.GetChannelName(),
 		vchanInfo.GetSeekPosition(),
 	)
 	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)

@@ -92,15 +92,22 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			}
 		case commonpb.MsgType_Insert:
 			log.Debug("DDNode with insert messages")
+			imsg := msg.(*msgstream.InsertMsg)
 			if msg.EndTs() < FilterThreshold {
 				log.Info("Filtering Insert Messages",
 					zap.Uint64("Message endts", msg.EndTs()),
 					zap.Uint64("FilterThreshold", FilterThreshold),
 				)
-				if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) {
+				if ddn.filterFlushedSegmentInsertMessages(imsg) {
 					continue
 				}
 			}
+			if imsg.CollectionID != ddn.collectionID {
+				//log.Debug("filter invalid InsertMsg, collection mis-match",
+				//	zap.Int64("msg collID", imsg.CollectionID),
+				//	zap.Int64("ddn collID", ddn.collectionID))
+				continue
+			}
 			iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
 		}
 	}

@@ -13,19 +13,24 @@ package datanode

 import (
 	"context"
+	"strconv"

 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 )

-func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
+func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
-	consumeSubName := Params.MsgChannelSubName
+	// subName should be unique, since pchannelName is shared among several collections
+	consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
 	insertStream, _ := factory.NewTtMsgStream(ctx)

+	pchannelName := rootcoord.ToPhysicalChannel(chanName)
 	insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
 	log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)

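Note: because a pooled physical channel is now shared by several collections, newDmInputNode derives a per-collection subscription name (the collection ID is appended) and maps the virtual channel back to its physical channel. The helper below is only a sketch of that mapping, inferred from the "<pchan>_<collID>v" naming used in CreateCollectionReqTask; the real implementation is rootcoord.ToPhysicalChannel and may differ in detail.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toPhysicalChannel sketches the assumed mapping: cut the virtual
// channel name at its last underscore to recover the physical channel.
func toPhysicalChannel(vchan string) string {
	if i := strings.LastIndex(vchan, "_"); i >= 0 {
		return vchan[:i]
	}
	return vchan
}

func main() {
	collID := int64(443)
	// "rootcoord-dml_7" stands in for a pooled stream name.
	vchan := fmt.Sprintf("%s_%dv", "rootcoord-dml_7", collID)
	pchan := toPhysicalChannel(vchan)
	// Appending the collection ID keeps subscriptions unique even when
	// two collections consume the same physical channel.
	subName := "datanode-sub-" + strconv.FormatInt(collID, 10)
	fmt.Println(vchan, "->", pchan, "consumed as", subName)
}
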
@@ -184,9 +184,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 			err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
 				iMsg.startPositions[0], iMsg.endPositions[0])
 			if err != nil {
-				log.Error("add segment wrong", zap.Error(err))
+				log.Error("add segment wrong",
+					zap.Int64("segID", currentSegID),
+					zap.Int64("collID", collID),
+					zap.Int64("partID", partitionID),
+					zap.String("chanName", msg.GetChannelID()),
+					zap.Error(err))
 			}
 		}

 		segNum := uniqueSeg[currentSegID]

@@ -199,7 +203,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		err := ibNode.replica.updateStatistics(id, num)
 		if err != nil {
-			log.Error("update Segment Row number wrong", zap.Error(err))
+			log.Error("update Segment Row number wrong", zap.Int64("segID", id), zap.Error(err))
 		}
 	}

@@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	// check if the collection from message is target collection
 	if msg.CollectionID != fdmNode.collectionID {
-		log.Debug("filter invalid insert message, collection is not the target collection",
-			zap.Any("collectionID", msg.CollectionID),
-			zap.Any("partitionID", msg.PartitionID))
+		//log.Debug("filter invalid insert message, collection is not the target collection",
+		//	zap.Any("collectionID", msg.CollectionID),
+		//	zap.Any("partitionID", msg.PartitionID))
 		return nil
 	}

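Note: the ddNode hunk (datanode) and the filterDmNode hunk above (querynode) implement the same consequence of channel sharing — each consumer must drop insert messages whose CollectionID belongs to another collection on the shared channel. A stand-alone sketch with a stand-in message type:

package main

import "fmt"

// insertMsg is a stand-in for msgstream.InsertMsg.
type insertMsg struct {
	CollectionID int64
	Row          string
}

// filterOwn mirrors the `imsg.CollectionID != ddn.collectionID` check:
// keep only messages that belong to this flowgraph's collection.
func filterOwn(msgs []insertMsg, collID int64) []insertMsg {
	out := make([]insertMsg, 0, len(msgs))
	for _, m := range msgs {
		if m.CollectionID != collID {
			continue // another collection sharing the channel
		}
		out = append(out, m)
	}
	return out
}

func main() {
	shared := []insertMsg{{101, "a"}, {202, "b"}, {101, "c"}}
	fmt.Println(filterOwn(shared, 101)) // [{101 a} {101 c}]
}
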
@@ -125,6 +125,7 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
 	log.Debug("query node flow graph consumes from pChannel",
 		zap.Any("collectionID", q.collectionID),
 		zap.Any("channel", channel),
+		zap.Any("subName", subName),
 	)
 	return nil
 }

@@ -15,86 +15,79 @@ import (
 	"fmt"
 	"sync"

+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
-	"go.uber.org/zap"
 )

 type dmlChannels struct {
 	core *Core
-	lock sync.RWMutex
-	dml  map[string]msgstream.MsgStream
+	namePrefix string
+	capacity   int64
+	refcnt     sync.Map
+	idx        *atomic.Int64
+	pool       sync.Map
 }

-func newDMLChannels(c *Core) *dmlChannels {
-	return &dmlChannels{
-		core: c,
-		lock: sync.RWMutex{},
-		dml:  make(map[string]msgstream.MsgStream),
+func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels {
+	d := &dmlChannels{
+		core:       c,
+		namePrefix: chanNamePrefix,
+		capacity:   chanNum,
+		refcnt:     sync.Map{},
+		idx:        atomic.NewInt64(0),
+		pool:       sync.Map{},
 	}
-}

-// GetNumChannels get current dml channel count
-func (d *dmlChannels) GetNumChannels() int {
-	d.lock.RLock()
-	defer d.lock.RUnlock()
-	return len(d.dml)
+	var i int64
+	for i = 0; i < chanNum; i++ {
+		name := fmt.Sprintf("%s_%d", d.namePrefix, i)
+		ms, err := c.msFactory.NewMsgStream(c.ctx)
+		if err != nil {
+			log.Error("add msgstream failed", zap.String("name", name), zap.Error(err))
+			panic("add msgstream failed")
+		}
+		ms.AsProducer([]string{name})
+		d.pool.Store(name, &ms)
+	}
+	log.Debug("init dml channels", zap.Int64("num", chanNum))
+	return d
+}
+
+func (d *dmlChannels) GetDmlMsgStreamName() string {
+	cnt := d.idx.Load()
+	name := fmt.Sprintf("%s_%d", d.namePrefix, cnt)
+	d.idx.Store((cnt + 1) % d.capacity)
+	return name
 }

 // ListChannels lists all dml channel names
 func (d *dmlChannels) ListChannels() []string {
-	d.lock.RLock()
-	defer d.lock.RUnlock()
-
-	ret := make([]string, 0, len(d.dml))
-	for n := range d.dml {
-		ret = append(ret, n)
-	}
-	return ret
+	chanNames := make([]string, 0)
+	d.refcnt.Range(
+		func(k, v interface{}) bool {
+			chanNames = append(chanNames, k.(string))
+			return true
+		})
+	return chanNames
 }

-// Produce produces msg pack into specified channel
-func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-
-	ds, ok := d.dml[name]
-	if !ok {
-		return fmt.Errorf("channel %s not exist", name)
-	}
-	if err := ds.Produce(pack); err != nil {
-		return err
-	}
-	return nil
+// GetNumChannels get current dml channel count
+func (d *dmlChannels) GetNumChannels() int {
+	return len(d.ListChannels())
 }

 // Broadcast broadcasts msg pack into specified channel
-func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-
-	ds, ok := d.dml[name]
-	if !ok {
-		return fmt.Errorf("channel %s not exist", name)
-	}
-	if err := ds.Broadcast(pack); err != nil {
-		return err
-	}
-	return nil
-}
-
-// BroadcastAll invoke broadcast with provided msg pack in all channels specified
-func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-
-	for _, ch := range channels {
-		ds, ok := d.dml[ch]
-		if !ok {
-			return fmt.Errorf("channel %s not exist", ch)
-		}
-		if err := ds.Broadcast(pack); err != nil {
+func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error {
+	for _, chanName := range chanNames {
+		// only in-use chanName exist in refcnt
+		if _, ok := d.refcnt.Load(chanName); !ok {
+			return fmt.Errorf("channel %s not exist", chanName)
+		}
+		v, _ := d.pool.Load(chanName)
+		if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
 			return err
 		}
 	}

@@ -103,33 +96,35 @@ func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) e

 // AddProducerChannels add named channels as producer
 func (d *dmlChannels) AddProducerChannels(names ...string) {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-
 	for _, name := range names {
-		log.Debug("add dml channel", zap.String("channel name", name))
-		_, ok := d.dml[name]
-		if !ok {
-			ms, err := d.core.msFactory.NewMsgStream(d.core.ctx)
-			if err != nil {
-				log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err))
-				continue
+		if _, ok := d.pool.Load(name); !ok {
+			log.Error("invalid channel name", zap.String("chanName", name))
+			panic("invalid channel name")
+		} else {
+			var cnt int64
+			if _, ok := d.refcnt.Load(name); !ok {
+				cnt = 1
+			} else {
+				v, _ := d.refcnt.Load(name)
+				cnt = v.(int64) + 1
 			}
-			ms.AsProducer([]string{name})
-			d.dml[name] = ms
+			d.refcnt.Store(name, cnt)
+			log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt))
 		}
 	}
 }

 // RemoveProducerChannels removes specified channels
 func (d *dmlChannels) RemoveProducerChannels(names ...string) {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-
 	for _, name := range names {
-		if ds, ok := d.dml[name]; ok {
-			ds.Close()
-			delete(d.dml, name)
+		v, ok := d.refcnt.Load(name)
+		if ok {
+			cnt := v.(int64)
+			if cnt > 1 {
+				d.refcnt.Store(name, cnt-1)
+			} else {
+				d.refcnt.Delete(name)
+			}
 		}
 	}
 }

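Note: a stripped-down sketch of the scheme the two hunks above implement — a fixed set of names handed out round-robin through an atomic index, plus a per-name reference count recording how many collections currently use each channel. The real dmlChannels additionally keeps one pre-created MsgStream per name in d.pool; that part is omitted here.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type chanPool struct {
	namePrefix string
	capacity   int64
	idx        *atomic.Int64
	refcnt     sync.Map // name -> int64 use count
}

// next returns pool names in round-robin order, like GetDmlMsgStreamName.
func (p *chanPool) next() string {
	cnt := p.idx.Load()
	p.idx.Store((cnt + 1) % p.capacity)
	return fmt.Sprintf("%s_%d", p.namePrefix, cnt)
}

// acquire bumps the refcount, like AddProducerChannels.
func (p *chanPool) acquire(name string) {
	if v, ok := p.refcnt.Load(name); ok {
		p.refcnt.Store(name, v.(int64)+1)
		return
	}
	p.refcnt.Store(name, int64(1))
}

// release drops the refcount and forgets unused names, like RemoveProducerChannels.
func (p *chanPool) release(name string) {
	if v, ok := p.refcnt.Load(name); ok {
		if cnt := v.(int64); cnt > 1 {
			p.refcnt.Store(name, cnt-1)
		} else {
			p.refcnt.Delete(name)
		}
	}
}

func main() {
	p := &chanPool{namePrefix: "rootcoord-dml", capacity: 2, idx: atomic.NewInt64(0)}
	a, b, c := p.next(), p.next(), p.next() // a and c resolve to the same name
	p.acquire(a)
	p.acquire(b)
	p.acquire(c)
	p.release(a) // the name stays alive: c still holds a reference
	fmt.Println(a, b, c)
}
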
@@ -0,0 +1,78 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package rootcoord
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/milvus-io/milvus/internal/msgstream"
+	"github.com/milvus-io/milvus/internal/util/funcutil"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDmlChannels(t *testing.T) {
+	const (
+		dmlChanPrefix      = "rootcoord-dml"
+		totalDmlChannelNum = 2
+	)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	factory := msgstream.NewPmsFactory()
+	Params.Init()
+
+	m := map[string]interface{}{
+		"pulsarAddress":  Params.PulsarAddress,
+		"receiveBufSize": 1024,
+		"pulsarBufSize":  1024}
+	err := factory.SetParams(m)
+	assert.Nil(t, err)
+
+	core, err := NewCore(ctx, factory)
+	assert.Nil(t, err)
+
+	dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum)
+	chanNames := dml.ListChannels()
+	assert.Equal(t, 0, len(chanNames))
+
+	randStr := funcutil.RandomString(8)
+	assert.Panics(t, func() { dml.AddProducerChannels(randStr) })
+
+	err = dml.Broadcast([]string{randStr}, nil)
+	assert.NotNil(t, err)
+	assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr))
+
+	// dml_xxx_0 => {chanName0, chanName2}
+	// dml_xxx_1 => {chanName1}
+	chanName0 := dml.GetDmlMsgStreamName()
+	dml.AddProducerChannels(chanName0)
+	assert.Equal(t, 1, dml.GetNumChannels())
+
+	chanName1 := dml.GetDmlMsgStreamName()
+	dml.AddProducerChannels(chanName1)
+	assert.Equal(t, 2, dml.GetNumChannels())
+
+	chanName2 := dml.GetDmlMsgStreamName()
+	dml.AddProducerChannels(chanName2)
+	assert.Equal(t, 2, dml.GetNumChannels())
+
+	dml.RemoveProducerChannels(chanName0)
+	assert.Equal(t, 2, dml.GetNumChannels())
+
+	dml.RemoveProducerChannels(chanName1)
+	assert.Equal(t, 1, dml.GetNumChannels())
+
+	dml.RemoveProducerChannels(chanName0)
+	assert.Equal(t, 0, dml.GetNumChannels())
+}

@@ -174,6 +174,7 @@ func (mt *metaTable) reloadFromKV() error {
 		mt.indexID2Meta[meta.IndexID] = meta
 	}

+	log.Debug("reload meta table from KV successfully")
 	return nil
 }

@@ -39,7 +39,9 @@ type ParamTable struct {
 	MsgChannelSubName string
 	TimeTickChannel   string
 	StatisticsChannel string
+	DmlChannelName    string

+	DmlChannelNum        int64
 	MaxPartitionNum      int64
 	DefaultPartitionName string
 	DefaultIndexName     string

@@ -71,7 +73,9 @@ func (p *ParamTable) Init() {
 	p.initMsgChannelSubName()
 	p.initTimeTickChannel()
 	p.initStatisticsChannelName()
+	p.initDmlChannelName()

+	p.initDmlChannelNum()
 	p.initMaxPartitionNum()
 	p.initMinSegmentSizeToEnableIndex()
 	p.initDefaultPartitionName()

@@ -161,6 +165,18 @@ func (p *ParamTable) initStatisticsChannelName() {
 	p.StatisticsChannel = channel
 }

+func (p *ParamTable) initDmlChannelName() {
+	channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml")
+	if err != nil {
+		panic(err)
+	}
+	p.DmlChannelName = channel
+}
+
+func (p *ParamTable) initDmlChannelNum() {
+	p.DmlChannelNum = p.ParseInt64("rootcoord.dmlChannelNum")
+}
+
 func (p *ParamTable) initMaxPartitionNum() {
 	p.MaxPartitionNum = p.ParseInt64("rootcoord.maxPartitionNum")
 }

@@ -502,7 +502,7 @@ func (c *Core) setMsgStreams() error {
 			CreateCollectionRequest: *req,
 		}
 		msgPack.Msgs = append(msgPack.Msgs, msg)
-		return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+		return c.dmlChannels.Broadcast(channelNames, &msgPack)
 	}

 	c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error {

@@ -518,7 +518,7 @@ func (c *Core) setMsgStreams() error {
 			DropCollectionRequest: *req,
 		}
 		msgPack.Msgs = append(msgPack.Msgs, msg)
-		return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+		return c.dmlChannels.Broadcast(channelNames, &msgPack)
 	}

 	c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error {

@@ -534,7 +534,7 @@ func (c *Core) setMsgStreams() error {
 			CreatePartitionRequest: *req,
 		}
 		msgPack.Msgs = append(msgPack.Msgs, msg)
-		return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+		return c.dmlChannels.Broadcast(channelNames, &msgPack)
 	}

 	c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error {

@@ -550,7 +550,7 @@ func (c *Core) setMsgStreams() error {
 			DropPartitionRequest: *req,
 		}
 		msgPack.Msgs = append(msgPack.Msgs, msg)
-		return c.dmlChannels.BroadcastAll(channelNames, &msgPack)
+		return c.dmlChannels.Broadcast(channelNames, &msgPack)
 	}

 	return nil

@@ -966,9 +966,12 @@ func (c *Core) Init() error {
 			return
 		}

-		c.dmlChannels = newDMLChannels(c)
+		c.dmlChannels = newDmlChannels(c, Params.DmlChannelName, Params.DmlChannelNum)

+		// recover physical channels for all collections
 		pc := c.MetaTable.ListCollectionPhysicalChannels()
 		c.dmlChannels.AddProducerChannels(pc...)
+		log.Debug("recover all physical channels", zap.Any("chanNames", pc))

 		c.chanTimeTick = newTimeTickSync(c)
 		c.chanTimeTick.AddProxy(c.session)

@@ -412,7 +412,7 @@ func TestRootCoord(t *testing.T) {
 		assert.Nil(t, err)
 		assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)

-		assert.Equal(t, 2, len(core.dmlChannels.dml))
+		assert.Equal(t, 2, core.dmlChannels.GetNumChannels())

 		pChan := core.MetaTable.ListCollectionPhysicalChannels()
 		dmlStream.AsConsumer([]string{pChan[0]}, Params.MsgChannelSubName)

@@ -1433,7 +1433,10 @@ func TestRootCoord(t *testing.T) {
 		assert.Nil(t, err)
 		time.Sleep(100 * time.Millisecond)

-		core.dmlChannels.AddProducerChannels("c0", "c1", "c2")
+		cn0 := core.dmlChannels.GetDmlMsgStreamName()
+		cn1 := core.dmlChannels.GetDmlMsgStreamName()
+		cn2 := core.dmlChannels.GetDmlMsgStreamName()
+		core.dmlChannels.AddProducerChannels(cn0, cn1, cn2)

 		msg0 := &internalpb.ChannelTimeTickMsg{
 			Base: &commonpb.MsgBase{

@@ -136,7 +136,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
 	vchanNames := make([]string, t.Req.ShardsNum)
 	chanNames := make([]string, t.Req.ShardsNum)
 	for i := int32(0); i < t.Req.ShardsNum; i++ {
-		vchanNames[i] = fmt.Sprintf("%s_%d_%d_v", t.Req.CollectionName, collID, i)
+		vchanNames[i] = fmt.Sprintf("%s_%dv", t.core.dmlChannels.GetDmlMsgStreamName(), collID)
 		chanNames[i] = ToPhysicalChannel(vchanNames[i])
 	}

@@ -286,10 +286,8 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
 	t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
 	t.core.SendTimeTick(ts, reason)

-	for _, chanName := range collMeta.PhysicalChannelNames {
-		// send tt into deleted channels to tell data_node to clear flowgragh
-		t.core.chanTimeTick.SendChannelTimeTick(chanName, ts)
-	}
+	// send tt into deleted channels to tell data_node to clear flowgragh
+	t.core.chanTimeTick.SendTimeTickToChannel(collMeta.PhysicalChannelNames, ts)

 	// remove dml channel after send dd msg
 	t.core.dmlChannels.RemoveProducerChannels(collMeta.PhysicalChannelNames...)

@@ -22,16 +22,12 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
+	"github.com/milvus-io/milvus/internal/util/timerecord"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"go.uber.org/zap"
 )

-type ddlTimetickInfo struct {
-	ddlMinTs typeutil.Timestamp
-	ddlTsSet map[typeutil.Timestamp]struct{}
-}
-
 type timetickSync struct {
 	core *Core
 	lock sync.Mutex

@@ -191,7 +187,7 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
 	t.proxyTimeTick[in.Base.SourceID] = newChannelTimeTickMsg(in)
 	//log.Debug("update proxyTimeTick", zap.Int64("source id", in.Base.SourceID),
-	//	zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))
+	//	zap.Any("Ts", in.Timestamps), zap.Uint64("inTs", in.DefaultTimestamp), zap.String("reason", reason))

 	t.sendToChannel()
 	return nil

@@ -227,32 +223,51 @@ func (t *timetickSync) StartWatch() {
 		case <-t.core.ctx.Done():
 			log.Debug("rootcoord context done", zap.Error(t.core.ctx.Err()))
 			return
-		case ptt, ok := <-t.sendChan:
+		case proxyTimetick, ok := <-t.sendChan:
 			if !ok {
 				log.Debug("timetickSync sendChan closed")
 				return
 			}

 			// reduce each channel to get min timestamp
-			mtt := ptt[t.core.session.ServerID]
-			for _, chanName := range mtt.in.ChannelNames {
-				mints := mtt.getTimetick(chanName)
-				for _, tt := range ptt {
-					ts := tt.getTimetick(chanName)
-					if ts < mints {
-						mints = ts
-					}
-				}
-				if err := t.SendChannelTimeTick(chanName, mints); err != nil {
-					log.Debug("SendChannelTimeTick fail", zap.Error(err))
-				}
-			}
+			local := proxyTimetick[t.core.session.ServerID]
+			if len(local.in.ChannelNames) == 0 {
+				continue
+			}
+
+			hdr := fmt.Sprintf("send ts to %d channels", len(local.in.ChannelNames))
+			tr := timerecord.NewTimeRecorder(hdr)
+			wg := sync.WaitGroup{}
+			for _, chanName := range local.in.ChannelNames {
+				wg.Add(1)
+				go func(chanName string) {
+					mints := local.getTimetick(chanName)
+					for _, tt := range proxyTimetick {
+						ts := tt.getTimetick(chanName)
+						if ts < mints {
+							mints = ts
+						}
+					}
+					if err := t.SendTimeTickToChannel([]string{chanName}, mints); err != nil {
+						log.Debug("SendTimeTickToChannel fail", zap.Error(err))
+					}
+					wg.Done()
+				}(chanName)
+			}
+			wg.Wait()
+			span := tr.ElapseSpan()
+			// rootcoord send tt msg to all channels every 200ms by default
+			if span.Milliseconds() > 200 {
+				log.Warn("rootcoord send tt to all channels too slowly",
+					zap.Int("chanNum", len(local.in.ChannelNames)),
+					zap.Int64("span", span.Milliseconds()))
+			}
 		}
 	}
 }

-// SendChannelTimeTick send each channel's min timetick to msg stream
-func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestamp) error {
+// SendTimeTickToChannel send each channel's min timetick to msg stream
+func (t *timetickSync) SendTimeTickToChannel(chanNames []string, ts typeutil.Timestamp) error {
 	msgPack := msgstream.MsgPack{}
 	baseMsg := msgstream.BaseMsg{
 		BeginTimestamp: ts,

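Note: StartWatch now fans the per-channel sends out to goroutines, joins them with a WaitGroup, and warns when the whole batch overruns the 200ms send interval. The same pattern in isolation, with sendOne standing in for SendTimeTickToChannel:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	channels := []string{"rootcoord-dml_0", "rootcoord-dml_1"}
	sendOne := func(chanName string) error {
		time.Sleep(10 * time.Millisecond) // simulated broadcast latency
		return nil
	}

	start := time.Now()
	var wg sync.WaitGroup
	for _, chanName := range channels {
		wg.Add(1)
		go func(chanName string) { // pass chanName in to avoid loop-variable capture
			defer wg.Done()
			if err := sendOne(chanName); err != nil {
				fmt.Println("send fail:", chanName, err)
			}
		}(chanName)
	}
	wg.Wait()

	// Mirrors the 200ms check above: the tick interval is 200ms, so a
	// slower batch means timeticks start to back up.
	if span := time.Since(start); span.Milliseconds() > 200 {
		fmt.Printf("send ts to %d channels too slow: %dms\n", len(channels), span.Milliseconds())
	}
}
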
@@ -273,11 +288,14 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam
 	}
 	msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)

-	err := t.core.dmlChannels.Broadcast(chanName, &msgPack)
-	if err == nil {
+	if err := t.core.dmlChannels.Broadcast(chanNames, &msgPack); err != nil {
+		return err
+	}
+
+	for _, chanName := range chanNames {
 		metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts)))
 	}
-	return err
+	return nil
 }

 // GetProxyNum return the num of detected proxy node

@@ -33,19 +33,30 @@ func NewTimeRecorder(header string) *TimeRecorder {
 	}
 }

-// Record calculates the time span from previous Record call
-func (tr *TimeRecorder) Record(msg string) time.Duration {
+func (tr *TimeRecorder) RecordSpan() time.Duration {
 	curr := time.Now()
 	span := curr.Sub(tr.last)
 	tr.last = curr
+	return span
+}
+
+func (tr *TimeRecorder) ElapseSpan() time.Duration {
+	curr := time.Now()
+	span := curr.Sub(tr.start)
+	tr.last = curr
+	return span
+}
+
+// Record calculates the time span from previous Record call
+func (tr *TimeRecorder) Record(msg string) time.Duration {
+	span := tr.RecordSpan()
 	tr.printTimeRecord(msg, span)
 	return span
 }

 // Elapse calculates the time span from the beginning of this TimeRecorder
 func (tr *TimeRecorder) Elapse(msg string) time.Duration {
-	curr := time.Now()
-	span := curr.Sub(tr.start)
+	span := tr.ElapseSpan()
 	tr.printTimeRecord(msg, span)
 	return span
 }

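Note: the TimeRecorder refactor separates measuring from printing — RecordSpan and ElapseSpan only measure (StartWatch above uses ElapseSpan), while Record and Elapse still log via printTimeRecord. A self-contained sketch, assuming the start/last fields implied by the diff and a trivial constructor:

package main

import (
	"fmt"
	"time"
)

type TimeRecorder struct {
	header string
	start  time.Time
	last   time.Time
}

func NewTimeRecorder(header string) *TimeRecorder {
	now := time.Now()
	return &TimeRecorder{header: header, start: now, last: now}
}

// RecordSpan: time since the previous Record/RecordSpan call, silent.
func (tr *TimeRecorder) RecordSpan() time.Duration {
	curr := time.Now()
	span := curr.Sub(tr.last)
	tr.last = curr
	return span
}

// ElapseSpan: time since the recorder was created, silent.
func (tr *TimeRecorder) ElapseSpan() time.Duration {
	curr := time.Now()
	span := curr.Sub(tr.start)
	tr.last = curr
	return span
}

func main() {
	tr := NewTimeRecorder("send ts to 2 channels")
	time.Sleep(20 * time.Millisecond)
	fmt.Println("step: ", tr.RecordSpan())
	time.Sleep(20 * time.Millisecond)
	fmt.Println("total:", tr.ElapseSpan()) // what StartWatch compares against 200ms
}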