Enhance newDataSyncService (#27277)

- Add flowgraph.AssembleNodes to assemble nodes in flowgraph.go
- Remove fgCtx in newDataSyncService
- Add newServiceWithEtcdTickler func, reducing the number of params to 3 (see the sketch below)
- Remove unnecessary params
  - config.maxQueueLength, config.maxParallelism
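
A minimal caller sketch, not part of this diff: `startChannelSync` is a hypothetical
wrapper and `ds.start()` is assumed from the existing flowgraph start path; the other
names come from the code below.

    func startChannelSync(node *DataNode, vchan *datapb.VchannelInfo,
        schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
        watchInfo := &datapb.ChannelWatchInfo{Schema: schema, Vchan: vchan}
        // One init context, one node, one watch info, one tickler, instead of the
        // long newDataSyncService parameter list.
        ds, err := newServiceWithEtcdTickler(context.TODO(), node, watchInfo, tickler)
        if err != nil {
            return err
        }
        // Nodes are already wired internally via fg.AssembleNodes(...).
        ds.start()
        return nil
    }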

See also: #27207

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2023-09-27 11:07:25 +08:00 committed by GitHub
parent 9fb4c27a90
commit 5c5f9aa05e
18 changed files with 398 additions and 506 deletions


@ -75,8 +75,7 @@ var Params *paramtable.ComponentParam = paramtable.Get()
// `etcdCli` is a connection of etcd
// `rootCoord` is a grpc client of root coordinator.
// `dataCoord` is a grpc client of data service.
// `NodeID` is unique to each datanode.
// `State` is current statement of this data node, indicating whether it's healthy.
// `stateCode` is current statement of this data node, indicating whether it's healthy.
//
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.


@ -197,7 +197,7 @@ func TestDataNode(t *testing.T) {
}
for _, test := range testDataSyncs {
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
err = node.flowgraphManager.addAndStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
assert.NoError(t, err)
vchanNameCh <- test.dmChannelName
}


@ -18,7 +18,6 @@ package datanode
import (
"context"
"fmt"
"sync"
"github.com/cockroachdb/errors"
@ -33,7 +32,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -45,93 +43,36 @@ import (
type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
flushCh chan flushMsg
resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message.
channel Channel // channel stores meta of channel
idAllocator allocator.Allocator // id/timestamp allocator
dispClient msgdispatcher.Client
msFactory msgstream.Factory
channel Channel // channel stores meta of channel
collectionID UniqueID // collection id of vchan for which this data sync service serves
vchannelName string
dataCoord types.DataCoordClient // DataCoord instance to interact with
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
// TODO: should be equal to paramtable.GetNodeID(), but the integration test has 1 paramtable for a minicluster, the NodeID
// varies, and will cause the SaveBinlogPaths check to fail. So we pass ServerID into dataSyncService to avoid this failure.
serverID UniqueID
fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
delBufferManager *DeltaBufferManager
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
flushCh chan flushMsg
resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message.
timetickSender *timeTickSender // reference to timeTickSender
compactor *compactionExecutor // reference to compaction executor
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
serverID int64
stopOnce sync.Once
flushListener chan *segmentFlushPack // chan to listen flush event
timetickSender *timeTickSender // reference to timeTickSender
}
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
idAllocator allocator.Allocator // id/timestamp allocator
msFactory msgstream.Factory
dispClient msgdispatcher.Client
dataCoord types.DataCoordClient // DataCoord instance to interact with
chunkManager storage.ChunkManager
func newDataSyncService(
fgCtx, initCtx context.Context,
flushCh chan flushMsg,
resendTTCh chan resendTTMsg,
channel Channel,
alloc allocator.Allocator,
dispClient msgdispatcher.Client,
factory msgstream.Factory,
vchan *datapb.VchannelInfo,
clearSignal chan<- string,
dataCoord types.DataCoordClient,
flushingSegCache *Cache,
chunkManager storage.ChunkManager,
compactor *compactionExecutor,
tickler *tickler,
serverID int64,
timetickSender *timeTickSender,
) (*dataSyncService, error) {
if channel == nil {
return nil, errors.New("Nil input")
}
// test only
flushListener chan *segmentFlushPack // chan to listen flush event
childCtx, cancel := context.WithCancel(fgCtx)
delBufferManager := &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
service := &dataSyncService{
ctx: childCtx,
cancelFn: cancel,
fg: nil,
flushCh: flushCh,
resendTTCh: resendTTCh,
channel: channel,
idAllocator: alloc,
dispClient: dispClient,
msFactory: factory,
collectionID: vchan.GetCollectionID(),
vchannelName: vchan.GetChannelName(),
dataCoord: dataCoord,
clearSignal: clearSignal,
delBufferManager: delBufferManager,
flushingSegCache: flushingSegCache,
chunkManager: chunkManager,
compactor: compactor,
serverID: serverID,
timetickSender: timetickSender,
}
if err := service.initNodes(initCtx, vchan, tickler); err != nil {
return nil, err
}
if tickler.isWatchFailed.Load() {
return nil, errors.Errorf("tickler watch failed")
}
return service, nil
}
type parallelConfig struct {
maxQueueLength int32
maxParallelism int32
stopOnce sync.Once
}
type nodeConfig struct {
@ -140,13 +81,7 @@ type nodeConfig struct {
vChannelName string
channel Channel // Channel info
allocator allocator.Allocator
serverID int64
// defaults
parallelConfig
}
func newParallelConfig() parallelConfig {
return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(), Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()}
serverID UniqueID
}
// start the flow graph in datasyncservice
@ -184,8 +119,12 @@ func (dsService *dataSyncService) close() {
dsService.clearGlobalFlushingCache()
close(dsService.flushCh)
dsService.flushManager.close()
log.Info("dataSyncService flush manager closed")
if dsService.flushManager != nil {
dsService.flushManager.close()
log.Info("dataSyncService flush manager closed")
}
dsService.cancelFn()
dsService.channel.close()
@ -198,47 +137,43 @@ func (dsService *dataSyncService) clearGlobalFlushingCache() {
dsService.flushingSegCache.Remove(segments...)
}
// initNodes inits a TimetickedFlowGraph
// initCtx are used to init only.
func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *datapb.VchannelInfo, tickler *tickler) error {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
// initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel,
flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService))
// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
// TODO: add a broker for the rpc
func getSegmentInfos(ctx context.Context, datacoord types.DataCoordClient, segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
infoResp, err := datacoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
SegmentIDs: segmentIDs,
IncludeUnHealthy: true,
})
if err := funcutil.VerifyResponse(infoResp, err); err != nil {
log.Error("Fail to get SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
log.Info("begin to init data sync service", zap.Int64("collection", vchanInfo.CollectionID),
zap.String("Chan", vchanInfo.ChannelName),
zap.Int64s("unflushed", vchanInfo.GetUnflushedSegmentIds()),
zap.Int64s("flushed", vchanInfo.GetFlushedSegmentIds()),
return infoResp.Infos, nil
}
// getChannelWithEtcdTickler updates progress into etcd when a new segment is added into channel.
func getChannelWithEtcdTickler(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *etcdTickler, unflushed, flushed []*datapb.SegmentInfo) (Channel, error) {
var (
channelName = info.GetVchan().GetChannelName()
collectionID = info.GetVchan().GetCollectionID()
recoverTs = info.GetVchan().GetSeekPosition().GetTimestamp()
)
var err error
// recover segment checkpoints
unflushedSegmentInfos, err := dsService.getSegmentInfos(initCtx, vchanInfo.GetUnflushedSegmentIds())
if err != nil {
return err
}
flushedSegmentInfos, err := dsService.getSegmentInfos(initCtx, vchanInfo.GetFlushedSegmentIds())
if err != nil {
return err
}
// init channel meta
channel := newChannel(channelName, collectionID, info.GetSchema(), node.rootCoord, node.chunkManager)
// tickler will update addSegment progress to watchInfo
tickler.watch()
defer tickler.stop()
futures := make([]*conc.Future[any], 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))
for _, us := range unflushedSegmentInfos {
if us.CollectionID != dsService.collectionID ||
us.GetInsertChannel() != vchanInfo.ChannelName {
log.Warn("Collection ID or ChannelName not match",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", us.CollectionID),
zap.String("Wanted channel Name", vchanInfo.ChannelName),
zap.String("Actual Channel Name", us.GetInsertChannel()),
)
continue
}
futures := make([]*conc.Future[any], 0, len(unflushed)+len(flushed))
for _, us := range unflushed {
log.Info("recover growing segments from checkpoints",
zap.String("vChannelName", us.GetInsertChannel()),
zap.Int64("segmentID", us.GetID()),
@ -248,7 +183,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
// avoid closure capture iteration variable
segment := us
future := getOrCreateIOPool().Submit(func() (interface{}, error) {
if err := dsService.channel.addSegment(initCtx, addSegmentReq{
if err := channel.addSegment(initCtx, addSegmentReq{
segType: datapb.SegmentType_Normal,
segID: segment.GetID(),
collID: segment.CollectionID,
@ -257,7 +192,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
statsBinLogs: segment.Statslogs,
binLogs: segment.GetBinlogs(),
endPos: segment.GetDmlPosition(),
recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(),
recoverTs: recoverTs,
}); err != nil {
return nil, err
}
@ -267,17 +202,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
futures = append(futures, future)
}
for _, fs := range flushedSegmentInfos {
if fs.CollectionID != dsService.collectionID ||
fs.GetInsertChannel() != vchanInfo.ChannelName {
log.Warn("Collection ID or ChannelName not match",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", fs.CollectionID),
zap.String("Wanted Channel Name", vchanInfo.ChannelName),
zap.String("Actual Channel Name", fs.GetInsertChannel()),
)
continue
}
for _, fs := range flushed {
log.Info("recover sealed segments form checkpoints",
zap.String("vChannelName", fs.GetInsertChannel()),
zap.Int64("segmentID", fs.GetID()),
@ -286,7 +211,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
// avoid closure capture iteration variable
segment := fs
future := getOrCreateIOPool().Submit(func() (interface{}, error) {
if err := dsService.channel.addSegment(initCtx, addSegmentReq{
if err := channel.addSegment(initCtx, addSegmentReq{
segType: datapb.SegmentType_Flushed,
segID: segment.GetID(),
collID: segment.GetCollectionID(),
@ -294,7 +219,7 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
numOfRows: segment.GetNumOfRows(),
statsBinLogs: segment.GetStatslogs(),
binLogs: segment.GetBinlogs(),
recoverTs: vchanInfo.GetSeekPosition().GetTimestamp(),
recoverTs: recoverTs,
}); err != nil {
return nil, err
}
@ -304,163 +229,149 @@ func (dsService *dataSyncService) initNodes(initCtx context.Context, vchanInfo *
futures = append(futures, future)
}
err = conc.AwaitAll(futures...)
if err != nil {
return err
if err := conc.AwaitAll(futures...); err != nil {
return nil, err
}
c := &nodeConfig{
msFactory: dsService.msFactory,
collectionID: vchanInfo.GetCollectionID(),
vChannelName: vchanInfo.GetChannelName(),
channel: dsService.channel,
allocator: dsService.idAllocator,
parallelConfig: newParallelConfig(),
serverID: dsService.serverID,
if tickler.isWatchFailed.Load() {
return nil, errors.Errorf("tickler watch failed")
}
var dmStreamNode Node
dmStreamNode, err = newDmInputNode(initCtx, dsService.dispClient, vchanInfo.GetSeekPosition(), c)
if err != nil {
return err
}
var ddNode Node
ddNode, err = newDDNode(
dsService.ctx,
dsService.collectionID,
vchanInfo.GetChannelName(),
vchanInfo.GetDroppedSegmentIds(),
flushedSegmentInfos,
unflushedSegmentInfos,
dsService.compactor)
if err != nil {
return err
}
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.collectionID,
dsService.delBufferManager,
dsService.flushCh,
dsService.resendTTCh,
dsService.flushManager,
dsService.flushingSegCache,
c,
dsService.timetickSender,
)
if err != nil {
return err
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.delBufferManager, dsService.clearSignal, c)
if err != nil {
return err
}
var ttNode Node
ttNode, err = newTTNode(c, dsService.dataCoord)
if err != nil {
return err
}
dsService.fg.AddNode(dmStreamNode)
dsService.fg.AddNode(ddNode)
dsService.fg.AddNode(insertBufferNode)
dsService.fg.AddNode(deleteNode)
dsService.fg.AddNode(ttNode)
// ddStreamNode
err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{ddNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
return err
}
// ddNode
err = dsService.fg.SetEdges(ddNode.Name(),
[]string{insertBufferNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
return err
}
// insertBufferNode
err = dsService.fg.SetEdges(insertBufferNode.Name(),
[]string{deleteNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
return err
}
// deleteNode
err = dsService.fg.SetEdges(deleteNode.Name(),
[]string{ttNode.Name()},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", deleteNode.Name()), zap.Error(err))
return err
}
// ttNode
err = dsService.fg.SetEdges(ttNode.Name(),
[]string{},
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", ttNode.Name()), zap.Error(err))
return err
}
return nil
return channel, nil
}
// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
func (dsService *dataSyncService) getSegmentInfos(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
infoResp, err := dsService.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
SegmentIDs: segmentIDs,
IncludeUnHealthy: true,
})
if err != nil {
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(infoResp.GetStatus().Reason)
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
return infoResp.Infos, nil
}
func (dsService *dataSyncService) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) {
pChannelName := funcutil.ToPhysicalChannel(channelName)
dmlStream, err := dsService.msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
defer dmlStream.Close()
subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID)
log.Debug("dataSyncService register consumer for getChannelLatestMsgID",
zap.String("pChannelName", pChannelName),
zap.String("subscription", subName),
func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, channel Channel, unflushed, flushed []*datapb.SegmentInfo) (*dataSyncService, error) {
var (
channelName = info.GetVchan().GetChannelName()
collectionID = info.GetVchan().GetCollectionID()
)
dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
id, err := dmlStream.GetLatestMsgID(pChannelName)
config := &nodeConfig{
msFactory: node.factory,
allocator: node.allocator,
collectionID: collectionID,
vChannelName: channelName,
channel: channel,
serverID: node.session.ServerID,
}
var (
flushCh = make(chan flushMsg, 100)
resendTTCh = make(chan resendTTMsg, 100)
delBufferManager = &DeltaBufferManager{
channel: channel,
delBufHeap: &PriorityQueue{},
}
)
ctx, cancel := context.WithCancel(node.ctx)
ds := &dataSyncService{
ctx: ctx,
cancelFn: cancel,
flushCh: flushCh,
resendTTCh: resendTTCh,
delBufferManager: delBufferManager,
dispClient: node.dispClient,
msFactory: node.factory,
dataCoord: node.dataCoord,
idAllocator: config.allocator,
channel: config.channel,
collectionID: config.collectionID,
vchannelName: config.vChannelName,
serverID: config.serverID,
flushingSegCache: node.segmentCache,
clearSignal: node.clearSignal,
chunkManager: node.chunkManager,
compactor: node.compactionExecutor,
timetickSender: node.timeTickSender,
fg: nil,
flushManager: nil,
}
// init flushManager
flushManager := NewRendezvousFlushManager(
node.allocator,
node.chunkManager,
channel,
flushNotifyFunc(ds, retry.Attempts(50)), dropVirtualChannelFunc(ds),
)
ds.flushManager = flushManager
// init flowgraph
fg := flowgraph.NewTimeTickedFlowGraph(node.ctx)
dmStreamNode, err := newDmInputNode(initCtx, node.dispClient, info.GetVchan().GetSeekPosition(), config)
if err != nil {
log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err))
return nil, err
}
return id.Serialize(), nil
ddNode, err := newDDNode(
node.ctx,
collectionID,
channelName,
info.GetVchan().GetDroppedSegmentIds(),
flushed,
unflushed,
node.compactionExecutor,
)
if err != nil {
return nil, err
}
insertBufferNode, err := newInsertBufferNode(
node.ctx,
flushCh,
resendTTCh,
delBufferManager,
flushManager,
node.segmentCache,
node.timeTickSender,
config,
)
if err != nil {
return nil, err
}
deleteNode, err := newDeleteNode(node.ctx, flushManager, delBufferManager, node.clearSignal, config)
if err != nil {
return nil, err
}
ttNode, err := newTTNode(config, node.dataCoord)
if err != nil {
return nil, err
}
if err := fg.AssembleNodes(dmStreamNode, ddNode, insertBufferNode, deleteNode, ttNode); err != nil {
return nil, err
}
ds.fg = fg
return ds, nil
}
// newServiceWithEtcdTickler gets a dataSyncService, but its flowgraph is not running.
// initCtx is used to init the dataSyncService only; if initCtx is canceled or times out,
// newServiceWithEtcdTickler stops and returns initCtx.Err()
func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *etcdTickler) (*dataSyncService, error) {
// recover segment checkpoints
unflushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetUnflushedSegmentIds())
if err != nil {
return nil, err
}
flushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetFlushedSegmentIds())
if err != nil {
return nil, err
}
// init channel meta
channel, err := getChannelWithEtcdTickler(initCtx, node, info, tickler, unflushedSegmentInfos, flushedSegmentInfos)
if err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, node, info, channel, unflushedSegmentInfos, flushedSegmentInfos)
}


@ -41,11 +41,9 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service"
@ -54,6 +52,12 @@ func init() {
paramtable.Init()
}
func getWatchInfo(info *testInfo) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: getVchanInfo(info),
}
}
func getVchanInfo(info *testInfo) *datapb.VchannelInfo {
var ufs []*datapb.SegmentInfo
var fs []*datapb.SegmentInfo
@ -99,7 +103,7 @@ func getVchanInfo(info *testInfo) *datapb.VchannelInfo {
type testInfo struct {
isValidCase bool
channelNil bool
inMsgFactory msgstream.Factory
inMsgFactory dependency.Factory
collID UniqueID
chanName string
@ -117,30 +121,16 @@ type testInfo struct {
description string
}
func TestDataSyncService_newDataSyncService(t *testing.T) {
func TestDataSyncService_getDataSyncService(t *testing.T) {
ctx := context.Background()
tests := []*testInfo{
{
true, false, &mockMsgStreamFactory{false, true},
0, "by-dev-rootcoord-dml-test_v0",
0, 0, "", 0,
0, 0, "", 0,
"SetParamsReturnError",
},
{
true, false, &mockMsgStreamFactory{true, true},
0, "by-dev-rootcoord-dml-test_v0",
1, "by-dev-rootcoord-dml-test_v0",
1, 0, "", 0,
1, 1, "", 0,
"CollID 0 mismach with seginfo collID 1",
},
{
true, false, &mockMsgStreamFactory{true, true},
1, "by-dev-rootcoord-dml-test_v1",
1, 0, "by-dev-rootcoord-dml-test_v2", 0,
1, 1, "by-dev-rootcoord-dml-test_v3", 0,
"chanName c1 mismach with seginfo chanName c2",
1, 0, "", 0,
"SetParamsReturnError",
},
{
true, false, &mockMsgStreamFactory{true, true},
@ -160,34 +150,16 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
df := &DataCoordFactory{}
rc := &RootCoordFactory{pkType: schemapb.DataType_Int64}
channel := newChannel("channel", test.collID, nil, rc, cm)
if test.channelNil {
channel = nil
}
dispClient := msgdispatcher.NewClient(test.inMsgFactory, typeutil.DataNodeRole, paramtable.GetNodeID())
ds, err := newDataSyncService(ctx,
node.factory = test.inMsgFactory
ds, err := newServiceWithEtcdTickler(
ctx,
make(chan flushMsg),
make(chan resendTTMsg),
channel,
allocator.NewMockAllocator(t),
dispClient,
test.inMsgFactory,
getVchanInfo(test),
make(chan string),
df,
newCache(),
cm,
newCompactionExecutor(),
node,
getWatchInfo(test),
genTestTickler(),
0,
nil,
)
if !test.isValidCase {
@ -208,34 +180,31 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
const ctxTimeInMillisecond = 10000
os.RemoveAll("/tmp/milvus")
defer os.RemoveAll("/tmp/milvus")
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
// init data node
insertChannelName := fmt.Sprintf("by-dev-rootcoord-dml-%d", rand.Int())
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
node := newIDLEDataNodeMock(context.Background(), schemapb.DataType_Int64)
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath())
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222),
func(count uint32) int64 {
return int64(22222 + count)
}, nil)
factory := dependency.NewDefaultFactory(true)
dispClient := msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())
defer os.RemoveAll("/tmp/milvus")
node.allocator = alloc
var (
insertChannelName = fmt.Sprintf("by-dev-rootcoord-dml-%d", rand.Int())
Factory = &MetaFactory{}
collMeta = Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
)
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "1")
ufs := []*datapb.SegmentInfo{{
@ -262,17 +231,18 @@ func TestDataSyncService_Start(t *testing.T) {
for _, segmentInfo := range fs {
fsIds = append(fsIds, segmentInfo.ID)
}
vchan := &datapb.VchannelInfo{
CollectionID: collMeta.ID,
ChannelName: insertChannelName,
UnflushedSegmentIds: ufsIds,
FlushedSegmentIds: fsIds,
watchInfo := &datapb.ChannelWatchInfo{
Schema: collMeta.GetSchema(),
Vchan: &datapb.VchannelInfo{
CollectionID: collMeta.ID,
ChannelName: insertChannelName,
UnflushedSegmentIds: ufsIds,
FlushedSegmentIds: fsIds,
},
}
signalCh := make(chan string, 100)
dataCoord := &DataCoordFactory{}
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
@ -288,9 +258,14 @@ func TestDataSyncService_Start(t *testing.T) {
},
}
atimeTickSender := newTimeTickSender(dataCoord, 0)
sync, err := newDataSyncService(ctx, ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
assert.Nil(t, err)
sync, err := newServiceWithEtcdTickler(
ctx,
node,
watchInfo,
genTestTickler(),
)
require.NoError(t, err)
require.NotNil(t, sync)
sync.flushListener = make(chan *segmentFlushPack)
defer close(sync.flushListener)
@ -338,7 +313,7 @@ func TestDataSyncService_Start(t *testing.T) {
// pulsar produce
assert.NoError(t, err)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream, _ := node.factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
@ -372,13 +347,12 @@ func TestDataSyncService_Close(t *testing.T) {
var (
insertChannelName = "by-dev-rootcoord-dml2"
metaFactory = &MetaFactory{}
mockRootCoord = &RootCoordFactory{pkType: schemapb.DataType_Int64}
collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
cm = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
metaFactory = &MetaFactory{}
collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
node = newIDLEDataNodeMock(context.Background(), schemapb.DataType_Int64)
)
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath())
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
@ -404,29 +378,25 @@ func TestDataSyncService_Close(t *testing.T) {
for _, segmentInfo := range fs {
fsIds = append(fsIds, segmentInfo.ID)
}
vchan := &datapb.VchannelInfo{
CollectionID: collMeta.ID,
ChannelName: insertChannelName,
UnflushedSegmentIds: ufsIds,
FlushedSegmentIds: fsIds,
watchInfo := &datapb.ChannelWatchInfo{
Schema: collMeta.GetSchema(),
Vchan: &datapb.VchannelInfo{
CollectionID: collMeta.ID,
ChannelName: insertChannelName,
UnflushedSegmentIds: ufsIds,
FlushedSegmentIds: fsIds,
},
}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222),
func(count uint32) int64 {
return int64(22222 + count)
}, nil)
node.allocator = alloc
var (
flushChan = make(chan flushMsg, 100)
resendTTChan = make(chan resendTTMsg, 100)
signalCh = make(chan string, 100)
factory = dependency.NewDefaultFactory(true)
dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())
mockDataCoord = &DataCoordFactory{}
)
mockDataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
@ -445,13 +415,17 @@ func TestDataSyncService_Close(t *testing.T) {
// No Auto flush
paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key)
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
channel.syncPolicies = []segmentSyncPolicy{
syncService, err := newServiceWithEtcdTickler(
context.Background(),
node,
watchInfo,
genTestTickler(),
)
assert.NoError(t, err)
assert.NotNil(t, syncService)
syncService.channel.(*ChannelMeta).syncPolicies = []segmentSyncPolicy{
syncMemoryTooHigh(),
}
atimeTickSender := newTimeTickSender(mockDataCoord, 0)
syncService, err := newDataSyncService(ctx, ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
assert.NoError(t, err)
syncService.flushListener = make(chan *segmentFlushPack, 10)
defer close(syncService.flushListener)
@ -524,7 +498,7 @@ func TestDataSyncService_Close(t *testing.T) {
// pulsar produce
assert.NoError(t, err)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream, _ := node.factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
@ -628,22 +602,19 @@ func TestBytesReader(t *testing.T) {
func TestGetSegmentInfos(t *testing.T) {
dataCoord := &DataCoordFactory{}
dsService := &dataSyncService{
dataCoord: dataCoord,
}
ctx := context.Background()
segmentInfos, err := dsService.getSegmentInfos(ctx, []int64{1})
segmentInfos, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.NoError(t, err)
assert.Equal(t, 1, len(segmentInfos))
dataCoord.GetSegmentInfosError = true
segmentInfos2, err := dsService.getSegmentInfos(ctx, []int64{1})
segmentInfos2, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos2)
dataCoord.GetSegmentInfosError = false
dataCoord.GetSegmentInfosNotSuccess = true
segmentInfos3, err := dsService.getSegmentInfos(ctx, []int64{1})
segmentInfos3, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
@ -658,7 +629,7 @@ func TestGetSegmentInfos(t *testing.T) {
},
}
segmentInfos, err = dsService.getSegmentInfos(ctx, []int64{5})
segmentInfos, err = getSegmentInfos(ctx, dataCoord, []int64{5})
assert.NoError(t, err)
assert.Equal(t, 1, len(segmentInfos))
assert.Equal(t, int64(100), segmentInfos[0].ID)
@ -734,19 +705,13 @@ func TestGetChannelLatestMsgID(t *testing.T) {
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
factory := dependency.NewDefaultFactory(true)
dataCoord := &DataCoordFactory{}
dsService := &dataSyncService{
dataCoord: dataCoord,
msFactory: factory,
}
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
dmlChannelName := "fake-by-dev-rootcoord-dml-channel_12345v0"
insertStream, _ := factory.NewMsgStream(ctx)
insertStream, _ := node.factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{dmlChannelName})
id, err := dsService.getChannelLatestMsgID(ctx, dmlChannelName, 0)
id, err := node.getChannelLatestMsgID(ctx, dmlChannelName, 0)
assert.NoError(t, err)
assert.NotNil(t, id)
}


@ -167,11 +167,11 @@ func parseDeleteEventKey(key string) string {
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
vChanName := watchInfo.GetVchan().GetChannelName()
key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
tickler := newEtcdTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
} else {
@ -291,7 +291,7 @@ func isEndWatchState(state datapb.ChannelWatchState) bool {
state != datapb.ChannelWatchState_Uncomplete // legacy state, equal to ToWatch
}
type tickler struct {
type etcdTickler struct {
progress *atomic.Int32
version int64
@ -305,11 +305,11 @@ type tickler struct {
isWatchFailed *atomic.Bool
}
func (t *tickler) inc() {
func (t *etcdTickler) inc() {
t.progress.Inc()
}
func (t *tickler) watch() {
func (t *etcdTickler) watch() {
if t.interval == 0 {
log.Info("zero interval, close ticler watch",
zap.String("channelName", t.watchInfo.GetVchan().GetChannelName()),
@ -363,13 +363,13 @@ func (t *tickler) watch() {
}()
}
func (t *tickler) stop() {
func (t *etcdTickler) stop() {
close(t.closeCh)
t.closeWg.Wait()
}
func newTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *tickler {
return &tickler{
func newEtcdTickler(version int64, path string, watchInfo *datapb.ChannelWatchInfo, kv kv.WatchKV, interval time.Duration) *etcdTickler {
return &etcdTickler{
progress: atomic.NewInt32(0),
path: path,
kv: kv,


@ -432,7 +432,7 @@ func TestEventTickler(t *testing.T) {
kv.RemoveWithPrefix(etcdPrefix)
defer kv.RemoveWithPrefix(etcdPrefix)
tickler := newTickler(0, path.Join(etcdPrefix, channelName), &datapb.ChannelWatchInfo{
tickler := newEtcdTickler(0, path.Join(etcdPrefix, channelName), &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: channelName,
},


@ -198,8 +198,8 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
return &deleteNode{
ctx: ctx,


@ -66,8 +66,8 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie
node := flowgraph.NewInputNode(
input,
name,
dmNodeConfig.maxQueueLength,
dmNodeConfig.maxParallelism,
Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(),
Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32(),
typeutil.DataNodeRole,
paramtable.GetNodeID(),
dmNodeConfig.collectionID,


@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
@ -36,13 +38,14 @@ type mockMsgStreamFactory struct {
NewMsgStreamNoError bool
}
var _ msgstream.Factory = &mockMsgStreamFactory{}
var (
_ msgstream.Factory = &mockMsgStreamFactory{}
_ dependency.Factory = (*mockMsgStreamFactory)(nil)
)
func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error {
if !mm.InitReturnNil {
return errors.New("Init Error")
}
return nil
func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) {}
func (mm *mockMsgStreamFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
return nil, nil
}
func (mm *mockMsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {


@ -699,12 +699,19 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.channel.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DeltaBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
fm flushManager, flushingSegCache *Cache, config *nodeConfig, timeTickManager *timeTickSender,
func newInsertBufferNode(
ctx context.Context,
flushCh <-chan flushMsg,
resendTTCh <-chan resendTTMsg,
delBufManager *DeltaBufferManager,
fm flushManager,
flushingSegCache *Cache,
timeTickManager *timeTickSender,
config *nodeConfig,
) (*insertBufferNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
return &insertBufferNode{
@ -767,7 +774,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *De
sub := tsoutil.SubByNow(ts)
pChan := funcutil.ToPhysicalChannel(config.vChannelName)
metrics.DataNodeProduceTimeTickLag.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(collID), pChan).
WithLabelValues(fmt.Sprint(config.serverID), fmt.Sprint(config.collectionID), pChan).
Set(float64(sub))
return wTtMsgStream.Produce(&msgPack)
})


@ -120,7 +120,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
}
@ -221,7 +221,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
flushChan <- flushMsg{
@ -387,6 +387,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
c := &nodeConfig{
collectionID: collMeta.GetID(),
channel: channel,
msFactory: factory,
allocator: alloc,
@ -398,7 +399,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
// Auto flush number of rows set to 2
@ -632,6 +633,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
c := &nodeConfig{
collectionID: collMeta.ID,
channel: channel,
msFactory: factory,
allocator: alloc,
@ -644,7 +646,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
// Auto flush number of rows set to 2
@ -1012,6 +1014,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
c := &nodeConfig{
collectionID: collMeta.ID,
channel: channel,
msFactory: factory,
allocator: alloc,
@ -1024,7 +1027,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
inMsg := genFlowGraphInsertMsg(insertChannelName)


@ -115,14 +115,14 @@ func (fm *flowgraphManager) execute(totalMemory uint64) {
}
}
func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *tickler) error {
func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
log := log.With(zap.String("channel", vchan.GetChannelName()))
if fm.flowgraphs.Contain(vchan.GetChannelName()) {
log.Warn("try to add an existed DataSyncService")
return nil
}
dataSyncService, err := getDataSyncService(context.TODO(), dn, &datapb.ChannelWatchInfo{
dataSyncService, err := newServiceWithEtcdTickler(context.TODO(), dn, &datapb.ChannelWatchInfo{
Schema: schema,
Vchan: vchan,
}, tickler)
@ -240,45 +240,3 @@ func (fm *flowgraphManager) collections() []int64 {
return collectionSet.Collect()
}
// getDataSyncService gets and init the dataSyncService
// initCtx is used to init the dataSyncService only, if initCtx.Canceled or initCtx.Timeout
// getDataSyncService stops and returns the initCtx.Err()
func getDataSyncService(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) {
channelName := info.GetVchan().GetChannelName()
log := log.With(zap.String("channel", channelName))
channel := newChannel(
info.GetVchan().GetChannelName(),
info.GetVchan().GetCollectionID(),
info.GetSchema(),
node.rootCoord,
node.chunkManager,
)
dataSyncService, err := newDataSyncService(
node.ctx,
initCtx,
make(chan flushMsg, 100),
make(chan resendTTMsg, 100),
channel,
node.allocator,
node.dispClient,
node.factory,
info.GetVchan(),
node.clearSignal,
node.dataCoord,
node.segmentCache,
node.chunkManager,
node.compactionExecutor,
tickler,
node.GetSession().ServerID,
node.timeTickSender,
)
if err != nil {
log.Warn("fail to create new datasyncservice", zap.Error(err))
return nil, err
}
return dataSyncService, nil
}


@ -64,7 +64,7 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan, nil, genTestTickler())
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
@ -79,7 +79,7 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan, nil, genTestTickler())
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
@ -97,7 +97,7 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan, nil, genTestTickler())
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
fg, ok := fm.getFlowgraphService(vchanName)
@ -147,7 +147,7 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan, nil, genTestTickler())
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
@ -226,7 +226,7 @@ func TestFlowGraphManager(t *testing.T) {
vchan := &datapb.VchannelInfo{
ChannelName: vchannel,
}
err = fm.addAndStart(node, vchan, nil, genTestTickler())
err = fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
fg, ok := fm.flowgraphs.Get(vchannel)
assert.True(t, ok)


@ -746,7 +746,7 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(0), // TODO msg type
commonpbutil.WithMsgID(0), // TODO msg id
commonpbutil.WithSourceID(paramtable.GetNodeID()),
commonpbutil.WithSourceID(dsService.serverID),
),
ChannelName: dsService.vchannelName,
}
@ -900,7 +900,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(0),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(dsService.serverID),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
SegmentID: pack.segmentID,
CollectionID: dsService.collectionID,


@ -92,6 +92,7 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod
ds := &DataCoordFactory{}
node.dataCoord = ds
node.timeTickSender = newTimeTickSender(node.dataCoord, 0)
return node
}
@ -312,7 +313,8 @@ func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetS
segmentInfos = append(segmentInfos, segInfo)
} else {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segmentID,
ID: segmentID,
CollectionID: 1,
})
}
}
@ -1257,6 +1259,6 @@ func genTimestamp() typeutil.Timestamp {
return tsoutil.ComposeTSByTime(gb, 0)
}
func genTestTickler() *tickler {
return newTickler(0, "", nil, nil, 0)
func genTestTickler() *etcdTickler {
return newEtcdTickler(0, "", nil, nil, 0)
}


@ -43,7 +43,9 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -642,7 +644,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
// Get the current dml channel position ID, that will be used in segments start positions and end positions.
var posID []byte
err = retry.Do(ctx, func() error {
id, innerError := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId())
id, innerError := node.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId())
posID = id
return innerError
}, retry.Attempts(30))
@ -701,6 +703,28 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
}, nil
}
func (node *DataNode) getChannelLatestMsgID(ctx context.Context, channelName string, segmentID int64) ([]byte, error) {
pChannelName := funcutil.ToPhysicalChannel(channelName)
dmlStream, err := node.factory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
defer dmlStream.Close()
subName := fmt.Sprintf("datanode-%d-%s-%d", paramtable.GetNodeID(), channelName, segmentID)
log.Debug("dataSyncService register consumer for getChannelLatestMsgID",
zap.String("pChannelName", pChannelName),
zap.String("subscription", subName),
)
dmlStream.AsConsumer(ctx, []string{pChannelName}, subName, mqwrapper.SubscriptionPositionUnknown)
id, err := dmlStream.GetLatestMsgID(pChannelName)
if err != nil {
log.Error("fail to GetLatestMsgID", zap.String("pChannelName", pChannelName), zap.Error(err))
return nil, err
}
return id.Serialize(), nil
}
func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil.AssignSegmentFunc {
return func(shardID int, partID int64) (int64, string, error) {
chNames := req.GetImportTask().GetChannelNames()


@ -190,7 +190,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
FlushedSegmentIds: []int64{},
}
err := s.node.flowgraphManager.addAndStart(s.node, vchan, nil, genTestTickler())
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vchan, nil, genTestTickler())
s.Require().NoError(err)
fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)
@ -394,14 +394,14 @@ func (s *DataNodeServicesSuite) TestImport() {
chName1 := "fake-by-dev-rootcoord-dml-testimport-1"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2"
err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -472,14 +472,14 @@ func (s *DataNodeServicesSuite) TestImport() {
s.Run("Test Import bad flow graph", func() {
chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph"
err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 999, // wrong collection ID.
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -620,14 +620,14 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1"
chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2"
err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
s.Require().NoError(err)
err = s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -670,7 +670,8 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
func (s *DataNodeServicesSuite) TestSyncSegments() {
chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1"
err := s.node.flowgraphManager.addAndStart(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: chanName,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{100, 200, 300},
@ -679,9 +680,9 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName)
s.Assert().True(ok)
s1 := Segment{segmentID: 100}
s2 := Segment{segmentID: 200}
s3 := Segment{segmentID: 300}
s1 := Segment{segmentID: 100, collectionID: 1}
s2 := Segment{segmentID: 200, collectionID: 1}
s3 := Segment{segmentID: 300, collectionID: 1}
s1.setType(datapb.SegmentType_Flushed)
s2.setType(datapb.SegmentType_Flushed)
s3.setType(datapb.SegmentType_Flushed)
@ -762,7 +763,7 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() {
FlushedSegmentIds: []int64{},
}
err := s.node.flowgraphManager.addAndStart(s.node, vChan, nil, genTestTickler())
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vChan, nil, genTestTickler())
s.Require().Nil(err)
fgService, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)
@ -831,7 +832,7 @@ func (s *DataNodeServicesSuite) TestFlushChannels() {
FlushedSegmentIds: []int64{},
}
err := s.node.flowgraphManager.addAndStart(s.node, vChan, nil, genTestTickler())
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vChan, nil, genTestTickler())
s.Require().NoError(err)
fgService, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)


@ -18,6 +18,7 @@ package flowgraph
import (
"context"
"fmt"
"sync"
"github.com/cockroachdb/errors"
@ -125,3 +126,21 @@ func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
return &flowGraph
}
func (fg *TimeTickedFlowGraph) AssembleNodes(orderedNodes ...Node) error {
for _, node := range orderedNodes {
fg.AddNode(node)
}
for i, node := range orderedNodes {
// Set edge to the next node
if i < len(orderedNodes)-1 {
err := fg.SetEdges(node.Name(), []string{orderedNodes[i+1].Name()})
if err != nil {
errMsg := fmt.Sprintf("set edges failed for flow graph, node=%s", node.Name())
return errors.New(errMsg)
}
}
}
return nil
}
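
For reference, a minimal usage sketch of AssembleNodes from the caller side (mirroring getServiceWithChannel above; the node variables are assumed to be constructed as shown earlier in this diff):

    fg := flowgraph.NewTimeTickedFlowGraph(node.ctx)
    // AssembleNodes adds every node to the graph and chains each one to the next
    // in the given order, replacing the removed per-node AddNode/SetEdges calls.
    if err := fg.AssembleNodes(dmStreamNode, ddNode, insertBufferNode, deleteNode, ttNode); err != nil {
        return nil, err
    }
    ds.fg = fg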