enhance: Add FlowgraphManager interface (#28852)

- Change flowgraphManager to fgManagerImpl
- Change close to stop
- change execute to controlMemWaterLevel
- Change method name of fgManager for readability
- Add mockery for fgmanager

Issue: #28853

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/28882/head
XuanYang-cn 2023-11-30 18:42:32 +08:00 committed by GitHub
parent bf633bb5d7
commit e62edb991a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 610 additions and 176 deletions

View File

@ -438,6 +438,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/datanode --output=$(PWD)/internal/datanode --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=datanode --inpackage
generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks

View File

@ -37,10 +37,11 @@ type ChannelManager struct {
mu sync.RWMutex
dn *DataNode
communicateCh chan *opState
runningFlowgraphs *flowgraphManager
opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner
abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel
fgManager FlowgraphManager
communicateCh chan *opState
opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner
abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel
releaseFunc releaseFunc
@ -52,14 +53,14 @@ type ChannelManager struct {
func NewChannelManager(dn *DataNode) *ChannelManager {
fm := newFlowgraphManager()
cm := ChannelManager{
dn: dn,
dn: dn,
fgManager: fm,
communicateCh: make(chan *opState, 100),
runningFlowgraphs: fm,
opRunners: typeutil.NewConcurrentMap[string, *opRunner](),
abnormals: typeutil.NewConcurrentMap[int64, string](),
communicateCh: make(chan *opState, 100),
opRunners: typeutil.NewConcurrentMap[string, *opRunner](),
abnormals: typeutil.NewConcurrentMap[int64, string](),
releaseFunc: fm.release,
releaseFunc: fm.RemoveFlowgraph,
closeCh: make(chan struct{}),
}
@ -84,7 +85,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
channel := info.GetVchan().GetChannelName()
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch:
if m.runningFlowgraphs.existWithOpID(channel, info.GetOpID()) {
if m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
resp.State = datapb.ChannelWatchState_WatchSuccess
return resp
}
@ -101,7 +102,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
return resp
case datapb.ChannelWatchState_ToRelease:
if !m.runningFlowgraphs.exist(channel) {
if !m.fgManager.HasFlowgraph(channel) {
resp.State = datapb.ChannelWatchState_ReleaseSuccess
return resp
}
@ -126,16 +127,13 @@ func (m *ChannelManager) Close() {
runner.Close()
return true
})
m.runningFlowgraphs.close()
close(m.closeCh)
m.closeWaiter.Wait()
})
}
func (m *ChannelManager) Start() {
m.closeWaiter.Add(2)
go m.runningFlowgraphs.start(&m.closeWaiter)
m.closeWaiter.Add(1)
go func() {
defer m.closeWaiter.Done()
log.Info("DataNode ChannelManager start")
@ -162,7 +160,7 @@ func (m *ChannelManager) handleOpState(opState *opState) {
switch opState.state {
case datapb.ChannelWatchState_WatchSuccess:
log.Info("Success to watch")
m.runningFlowgraphs.Add(opState.fg)
m.fgManager.AddFlowgraph(opState.fg)
m.finishOp(opState.opID, opState.channel)
case datapb.ChannelWatchState_WatchFailure:

View File

@ -159,7 +159,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(datapb.ChannelWatchState_ToWatch, resp.GetState())
s.manager.handleOpState(opState)
s.Equal(1, s.manager.runningFlowgraphs.getFlowGraphNum())
s.Equal(1, s.manager.fgManager.GetFlowgraphCount())
s.True(s.manager.opRunners.Contain(info.GetVchan().GetChannelName()))
s.Equal(1, s.manager.opRunners.Len())
@ -182,7 +182,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(info.GetOpID(), resp.GetOpID())
s.Equal(datapb.ChannelWatchState_ReleaseSuccess, resp.GetState())
s.Equal(0, s.manager.runningFlowgraphs.getFlowGraphNum())
s.Equal(0, s.manager.fgManager.GetFlowgraphCount())
s.False(s.manager.opRunners.Contain(info.GetVchan().GetChannelName()))
s.Equal(0, s.manager.opRunners.Len())
}

View File

@ -85,7 +85,7 @@ type DataNode struct {
cancel context.CancelFunc
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
flowgraphManager *flowgraphManager
flowgraphManager FlowgraphManager
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]
syncMgr syncmgr.SyncManager
@ -310,7 +310,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
// tryToReleaseFlowgraph tries to release a flowgraph
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
node.flowgraphManager.release(vChanName)
node.flowgraphManager.RemoveFlowgraph(vChanName)
}
// BackGroundGC runs in background to release datanode resources
@ -382,9 +382,6 @@ func (node *DataNode) Start() error {
// Start node watch node
go node.StartWatchChannels(node.ctx)
node.stopWaiter.Add(1)
go node.flowgraphManager.start(&node.stopWaiter)
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
return startErr
@ -417,7 +414,6 @@ func (node *DataNode) Stop() error {
node.stopOnce.Do(func() {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.flowgraphManager.close()
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
node.cancel()

View File

@ -206,14 +206,14 @@ func TestDataNode(t *testing.T) {
}
for _, test := range testDataSyncs {
err = node.flowgraphManager.addAndStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
assert.NoError(t, err)
vchanNameCh <- test.dmChannelName
}
assert.Eventually(t, func() bool {
for _, test := range testDataSyncs {
if node.flowgraphManager.exist(test.dmChannelName) {
if node.flowgraphManager.HasFlowgraph(test.dmChannelName) {
return false
}
}

View File

@ -171,7 +171,7 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
if err := node.flowgraphManager.AddandStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
} else {

View File

@ -99,7 +99,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)
assert.Eventually(t, func() bool {
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
if !exist {
return false
}
@ -119,7 +119,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)
assert.Eventually(t, func() bool {
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
return !exist
}, 3*time.Second, 100*time.Millisecond)
})
@ -170,7 +170,7 @@ func TestWatchChannel(t *testing.T) {
// wait for check goroutine received 2 events
<-c
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
assert.False(t, exist)
err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
@ -178,7 +178,7 @@ func TestWatchChannel(t *testing.T) {
// TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)
exist = node.flowgraphManager.exist(ch)
exist = node.flowgraphManager.HasFlowgraph(ch)
assert.False(t, exist)
})
@ -189,7 +189,7 @@ func TestWatchChannel(t *testing.T) {
node.handleWatchInfo(e, "test1", []byte{23})
exist := node.flowgraphManager.exist("test1")
exist := node.flowgraphManager.HasFlowgraph("test1")
assert.False(t, exist)
info := datapb.ChannelWatchInfo{
@ -200,7 +200,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)
node.handleWatchInfo(e, "test2", bs)
exist = node.flowgraphManager.exist("test2")
exist = node.flowgraphManager.HasFlowgraph("test2")
assert.False(t, exist)
chPut := make(chan struct{}, 1)
@ -238,7 +238,7 @@ func TestWatchChannel(t *testing.T) {
node.factory = &FailMessageStreamFactory{}
node.handleWatchInfo(e, ch, bs)
<-chPut
exist = node.flowgraphManager.exist(ch)
exist = node.flowgraphManager.HasFlowgraph(ch)
assert.True(t, exist)
})
@ -275,7 +275,7 @@ func TestWatchChannel(t *testing.T) {
node.handleWatchInfo(e, ch, bs)
<-chPut
exist := node.flowgraphManager.exist("test3")
exist := node.flowgraphManager.HasFlowgraph("test3")
assert.False(t, exist)
})

View File

@ -19,8 +19,6 @@ package datanode
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
@ -28,100 +26,41 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type flowgraphManager struct {
type FlowgraphManager interface {
AddFlowgraph(ds *dataSyncService)
AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error
RemoveFlowgraph(channel string)
ClearFlowgraphs()
GetFlowgraphService(channel string) (*dataSyncService, bool)
HasFlowgraph(channel string) bool
HasFlowgraphWithOpID(channel string, opID UniqueID) bool
GetFlowgraphCount() int
GetCollectionIDs() []int64
}
var _ FlowgraphManager = (*fgManagerImpl)(nil)
type fgManagerImpl struct {
flowgraphs *typeutil.ConcurrentMap[string, *dataSyncService]
closeCh chan struct{}
closeOnce sync.Once
}
func newFlowgraphManager() *flowgraphManager {
return &flowgraphManager{
func newFlowgraphManager() *fgManagerImpl {
return &fgManagerImpl{
flowgraphs: typeutil.NewConcurrentMap[string, *dataSyncService](),
closeCh: make(chan struct{}),
}
}
func (fm *flowgraphManager) start(waiter *sync.WaitGroup) {
defer waiter.Done()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-fm.closeCh:
return
case <-ticker.C:
fm.execute(hardware.GetMemoryCount())
}
}
}
func (fm *flowgraphManager) close() {
fm.dropAll()
fm.closeOnce.Do(func() {
close(fm.closeCh)
})
}
func (fm *flowgraphManager) execute(totalMemory uint64) {
if !Params.DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
return
}
// TODO change to buffer manager
/*
var total int64
channels := make([]struct {
channel string
bufferSize int64
}, 0)
fm.flowgraphs.Range(func(key string, value *dataSyncService) bool {
size := value.channel.getTotalMemorySize()
channels = append(channels, struct {
channel string
bufferSize int64
}{key, size})
total += size
return true
})
if len(channels) == 0 {
return
}
toMB := func(mem float64) float64 {
return mem / 1024 / 1024
}
memoryWatermark := float64(totalMemory) * Params.DataNodeCfg.MemoryWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(5, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", toMB(float64(total))),
zap.Float64("current_memory_watermark", toMB(memoryWatermark)),
zap.Any("channel_memory_usages", channels))
return
}
sort.Slice(channels, func(i, j int) bool {
return channels[i].bufferSize > channels[j].bufferSize
})
if fg, ok := fm.flowgraphs.Get(channels[0].channel); ok { // sync the first channel with the largest memory usage
fg.channel.setIsHighMemory(true)
log.Info("notify flowgraph to sync",
zap.String("channel", channels[0].channel), zap.Int64("bufferSize", channels[0].bufferSize))
}*/
}
func (fm *flowgraphManager) Add(ds *dataSyncService) {
func (fm *fgManagerImpl) AddFlowgraph(ds *dataSyncService) {
fm.flowgraphs.Insert(ds.vchannelName, ds)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
}
func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
func (fm *fgManagerImpl) AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
log := log.With(zap.String("channel", vchan.GetChannelName()))
if fm.flowgraphs.Contain(vchan.GetChannelName()) {
log.Warn("try to add an existed DataSyncService")
@ -143,36 +82,17 @@ func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *data
return nil
}
func (fm *flowgraphManager) release(vchanName string) {
if fg, loaded := fm.flowgraphs.Get(vchanName); loaded {
func (fm *fgManagerImpl) RemoveFlowgraph(channel string) {
if fg, loaded := fm.flowgraphs.Get(channel); loaded {
fg.close()
fm.flowgraphs.Remove(vchanName)
fm.flowgraphs.Remove(channel)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
rateCol.removeFlowGraphChannel(vchanName)
rateCol.removeFlowGraphChannel(channel)
}
}
func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) {
return fm.flowgraphs.Get(vchan)
}
func (fm *flowgraphManager) exist(vchan string) bool {
_, exist := fm.getFlowgraphService(vchan)
return exist
}
func (fm *flowgraphManager) existWithOpID(vchan string, opID UniqueID) bool {
ds, exist := fm.getFlowgraphService(vchan)
return exist && ds.opID == opID
}
// getFlowGraphNum returns number of flow graphs.
func (fm *flowgraphManager) getFlowGraphNum() int {
return fm.flowgraphs.Len()
}
func (fm *flowgraphManager) dropAll() {
func (fm *fgManagerImpl) ClearFlowgraphs() {
log.Info("start drop all flowgraph resources in DataNode")
fm.flowgraphs.Range(func(key string, value *dataSyncService) bool {
value.GracefullyClose()
@ -183,7 +103,26 @@ func (fm *flowgraphManager) dropAll() {
})
}
func (fm *flowgraphManager) collections() []int64 {
func (fm *fgManagerImpl) GetFlowgraphService(channel string) (*dataSyncService, bool) {
return fm.flowgraphs.Get(channel)
}
func (fm *fgManagerImpl) HasFlowgraph(channel string) bool {
_, exist := fm.flowgraphs.Get(channel)
return exist
}
func (fm *fgManagerImpl) HasFlowgraphWithOpID(channel string, opID UniqueID) bool {
ds, exist := fm.flowgraphs.Get(channel)
return exist && ds.opID == opID
}
// GetFlowgraphCount returns number of flow graphs.
func (fm *fgManagerImpl) GetFlowgraphCount() int {
return fm.flowgraphs.Len()
}
func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
collectionSet := typeutil.UniqueSet{}
fm.flowgraphs.Range(func(key string, value *dataSyncService) bool {
collectionSet.Insert(value.metacache.Collection())

View File

@ -72,7 +72,7 @@ func TestFlowGraphManager(t *testing.T) {
fm := newFlowgraphManager()
defer func() {
fm.dropAll()
fm.ClearFlowgraphs()
}()
t.Run("Test addAndStart", func(t *testing.T) {
@ -81,13 +81,13 @@ func TestFlowGraphManager(t *testing.T) {
CollectionID: 1,
ChannelName: vchanName,
}
require.False(t, fm.exist(vchanName))
require.False(t, fm.HasFlowgraph(vchanName))
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
assert.True(t, fm.HasFlowgraph(vchanName))
fm.dropAll()
fm.ClearFlowgraphs()
})
t.Run("Test Release", func(t *testing.T) {
@ -96,20 +96,20 @@ func TestFlowGraphManager(t *testing.T) {
CollectionID: 1,
ChannelName: vchanName,
}
require.False(t, fm.exist(vchanName))
require.False(t, fm.HasFlowgraph(vchanName))
err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler())
err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
assert.True(t, fm.HasFlowgraph(vchanName))
fm.release(vchanName)
fm.RemoveFlowgraph(vchanName)
assert.False(t, fm.exist(vchanName))
fm.dropAll()
assert.False(t, fm.HasFlowgraph(vchanName))
fm.ClearFlowgraphs()
})
t.Run("Test getFlowgraphService", func(t *testing.T) {
fg, ok := fm.getFlowgraphService("channel-not-exist")
fg, ok := fm.GetFlowgraphService("channel-not-exist")
assert.False(t, ok)
assert.Nil(t, fg)
})

View File

@ -56,11 +56,11 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: minFGChannel,
MinFlowGraphTt: minFGTt,
NumFlowGraph: node.flowgraphManager.getFlowGraphNum(),
NumFlowGraph: node.flowgraphManager.GetFlowgraphCount(),
},
Effect: metricsinfo.NodeEffect{
NodeID: node.GetSession().ServerID,
CollectionIDs: node.flowgraphManager.collections(),
CollectionIDs: node.flowgraphManager.GetCollectionIDs(),
},
}, nil
}

View File

@ -0,0 +1,500 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datanode
import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
// MockFlowgraphManager is an autogenerated mock type for the FlowgraphManager type
type MockFlowgraphManager struct {
mock.Mock
}
type MockFlowgraphManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockFlowgraphManager) EXPECT() *MockFlowgraphManager_Expecter {
return &MockFlowgraphManager_Expecter{mock: &_m.Mock}
}
// AddFlowgraph provides a mock function with given fields: ds
func (_m *MockFlowgraphManager) AddFlowgraph(ds *dataSyncService) {
_m.Called(ds)
}
// MockFlowgraphManager_AddFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddFlowgraph'
type MockFlowgraphManager_AddFlowgraph_Call struct {
*mock.Call
}
// AddFlowgraph is a helper method to define mock.On call
// - ds *dataSyncService
func (_e *MockFlowgraphManager_Expecter) AddFlowgraph(ds interface{}) *MockFlowgraphManager_AddFlowgraph_Call {
return &MockFlowgraphManager_AddFlowgraph_Call{Call: _e.mock.On("AddFlowgraph", ds)}
}
func (_c *MockFlowgraphManager_AddFlowgraph_Call) Run(run func(ds *dataSyncService)) *MockFlowgraphManager_AddFlowgraph_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*dataSyncService))
})
return _c
}
func (_c *MockFlowgraphManager_AddFlowgraph_Call) Return() *MockFlowgraphManager_AddFlowgraph_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_AddFlowgraph_Call) RunAndReturn(run func(*dataSyncService)) *MockFlowgraphManager_AddFlowgraph_Call {
_c.Call.Return(run)
return _c
}
// AddandStartWithEtcdTickler provides a mock function with given fields: dn, vchan, schema, tickler
func (_m *MockFlowgraphManager) AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
ret := _m.Called(dn, vchan, schema, tickler)
var r0 error
if rf, ok := ret.Get(0).(func(*DataNode, *datapb.VchannelInfo, *schemapb.CollectionSchema, *etcdTickler) error); ok {
r0 = rf(dn, vchan, schema, tickler)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockFlowgraphManager_AddandStartWithEtcdTickler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddandStartWithEtcdTickler'
type MockFlowgraphManager_AddandStartWithEtcdTickler_Call struct {
*mock.Call
}
// AddandStartWithEtcdTickler is a helper method to define mock.On call
// - dn *DataNode
// - vchan *datapb.VchannelInfo
// - schema *schemapb.CollectionSchema
// - tickler *etcdTickler
func (_e *MockFlowgraphManager_Expecter) AddandStartWithEtcdTickler(dn interface{}, vchan interface{}, schema interface{}, tickler interface{}) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call {
return &MockFlowgraphManager_AddandStartWithEtcdTickler_Call{Call: _e.mock.On("AddandStartWithEtcdTickler", dn, vchan, schema, tickler)}
}
func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) Run(run func(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler)) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*DataNode), args[1].(*datapb.VchannelInfo), args[2].(*schemapb.CollectionSchema), args[3].(*etcdTickler))
})
return _c
}
func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) Return(_a0 error) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) RunAndReturn(run func(*DataNode, *datapb.VchannelInfo, *schemapb.CollectionSchema, *etcdTickler) error) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call {
_c.Call.Return(run)
return _c
}
// ClearFlowgraphs provides a mock function with given fields:
func (_m *MockFlowgraphManager) ClearFlowgraphs() {
_m.Called()
}
// MockFlowgraphManager_ClearFlowgraphs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearFlowgraphs'
type MockFlowgraphManager_ClearFlowgraphs_Call struct {
*mock.Call
}
// ClearFlowgraphs is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) ClearFlowgraphs() *MockFlowgraphManager_ClearFlowgraphs_Call {
return &MockFlowgraphManager_ClearFlowgraphs_Call{Call: _e.mock.On("ClearFlowgraphs")}
}
func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) Run(run func()) *MockFlowgraphManager_ClearFlowgraphs_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) Return() *MockFlowgraphManager_ClearFlowgraphs_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) RunAndReturn(run func()) *MockFlowgraphManager_ClearFlowgraphs_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionIDs provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
ret := _m.Called()
var r0 []int64
if rf, ok := ret.Get(0).(func() []int64); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
return r0
}
// MockFlowgraphManager_GetCollectionIDs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionIDs'
type MockFlowgraphManager_GetCollectionIDs_Call struct {
*mock.Call
}
// GetCollectionIDs is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetCollectionIDs() *MockFlowgraphManager_GetCollectionIDs_Call {
return &MockFlowgraphManager_GetCollectionIDs_Call{Call: _e.mock.On("GetCollectionIDs")}
}
func (_c *MockFlowgraphManager_GetCollectionIDs_Call) Run(run func()) *MockFlowgraphManager_GetCollectionIDs_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetCollectionIDs_Call) Return(_a0 []int64) *MockFlowgraphManager_GetCollectionIDs_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []int64) *MockFlowgraphManager_GetCollectionIDs_Call {
_c.Call.Return(run)
return _c
}
// GetFlowgraphCount provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetFlowgraphCount() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockFlowgraphManager_GetFlowgraphCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlowgraphCount'
type MockFlowgraphManager_GetFlowgraphCount_Call struct {
*mock.Call
}
// GetFlowgraphCount is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetFlowgraphCount() *MockFlowgraphManager_GetFlowgraphCount_Call {
return &MockFlowgraphManager_GetFlowgraphCount_Call{Call: _e.mock.On("GetFlowgraphCount")}
}
func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) Run(run func()) *MockFlowgraphManager_GetFlowgraphCount_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) Return(_a0 int) *MockFlowgraphManager_GetFlowgraphCount_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() int) *MockFlowgraphManager_GetFlowgraphCount_Call {
_c.Call.Return(run)
return _c
}
// GetFlowgraphService provides a mock function with given fields: channel
func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*dataSyncService, bool) {
ret := _m.Called(channel)
var r0 *dataSyncService
var r1 bool
if rf, ok := ret.Get(0).(func(string) (*dataSyncService, bool)); ok {
return rf(channel)
}
if rf, ok := ret.Get(0).(func(string) *dataSyncService); ok {
r0 = rf(channel)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*dataSyncService)
}
}
if rf, ok := ret.Get(1).(func(string) bool); ok {
r1 = rf(channel)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockFlowgraphManager_GetFlowgraphService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlowgraphService'
type MockFlowgraphManager_GetFlowgraphService_Call struct {
*mock.Call
}
// GetFlowgraphService is a helper method to define mock.On call
// - channel string
func (_e *MockFlowgraphManager_Expecter) GetFlowgraphService(channel interface{}) *MockFlowgraphManager_GetFlowgraphService_Call {
return &MockFlowgraphManager_GetFlowgraphService_Call{Call: _e.mock.On("GetFlowgraphService", channel)}
}
func (_c *MockFlowgraphManager_GetFlowgraphService_Call) Run(run func(channel string)) *MockFlowgraphManager_GetFlowgraphService_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockFlowgraphManager_GetFlowgraphService_Call) Return(_a0 *dataSyncService, _a1 bool) *MockFlowgraphManager_GetFlowgraphService_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(string) (*dataSyncService, bool)) *MockFlowgraphManager_GetFlowgraphService_Call {
_c.Call.Return(run)
return _c
}
// HasFlowgraph provides a mock function with given fields: channel
func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool {
ret := _m.Called(channel)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockFlowgraphManager_HasFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasFlowgraph'
type MockFlowgraphManager_HasFlowgraph_Call struct {
*mock.Call
}
// HasFlowgraph is a helper method to define mock.On call
// - channel string
func (_e *MockFlowgraphManager_Expecter) HasFlowgraph(channel interface{}) *MockFlowgraphManager_HasFlowgraph_Call {
return &MockFlowgraphManager_HasFlowgraph_Call{Call: _e.mock.On("HasFlowgraph", channel)}
}
func (_c *MockFlowgraphManager_HasFlowgraph_Call) Run(run func(channel string)) *MockFlowgraphManager_HasFlowgraph_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockFlowgraphManager_HasFlowgraph_Call) Return(_a0 bool) *MockFlowgraphManager_HasFlowgraph_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string) bool) *MockFlowgraphManager_HasFlowgraph_Call {
_c.Call.Return(run)
return _c
}
// HasFlowgraphWithOpID provides a mock function with given fields: channel, opID
func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool {
ret := _m.Called(channel, opID)
var r0 bool
if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
r0 = rf(channel, opID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockFlowgraphManager_HasFlowgraphWithOpID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasFlowgraphWithOpID'
type MockFlowgraphManager_HasFlowgraphWithOpID_Call struct {
*mock.Call
}
// HasFlowgraphWithOpID is a helper method to define mock.On call
// - channel string
// - opID int64
func (_e *MockFlowgraphManager_Expecter) HasFlowgraphWithOpID(channel interface{}, opID interface{}) *MockFlowgraphManager_HasFlowgraphWithOpID_Call {
return &MockFlowgraphManager_HasFlowgraphWithOpID_Call{Call: _e.mock.On("HasFlowgraphWithOpID", channel, opID)}
}
func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) Run(run func(channel string, opID int64)) *MockFlowgraphManager_HasFlowgraphWithOpID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(int64))
})
return _c
}
func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) Return(_a0 bool) *MockFlowgraphManager_HasFlowgraphWithOpID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) RunAndReturn(run func(string, int64) bool) *MockFlowgraphManager_HasFlowgraphWithOpID_Call {
_c.Call.Return(run)
return _c
}
// RemoveFlowgraph provides a mock function with given fields: channel
func (_m *MockFlowgraphManager) RemoveFlowgraph(channel string) {
_m.Called(channel)
}
// MockFlowgraphManager_RemoveFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveFlowgraph'
type MockFlowgraphManager_RemoveFlowgraph_Call struct {
*mock.Call
}
// RemoveFlowgraph is a helper method to define mock.On call
// - channel string
func (_e *MockFlowgraphManager_Expecter) RemoveFlowgraph(channel interface{}) *MockFlowgraphManager_RemoveFlowgraph_Call {
return &MockFlowgraphManager_RemoveFlowgraph_Call{Call: _e.mock.On("RemoveFlowgraph", channel)}
}
func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) Run(run func(channel string)) *MockFlowgraphManager_RemoveFlowgraph_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) Return() *MockFlowgraphManager_RemoveFlowgraph_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) RunAndReturn(run func(string)) *MockFlowgraphManager_RemoveFlowgraph_Call {
_c.Call.Return(run)
return _c
}
// Start provides a mock function with given fields:
func (_m *MockFlowgraphManager) Start() {
_m.Called()
}
// MockFlowgraphManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockFlowgraphManager_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) Start() *MockFlowgraphManager_Start_Call {
return &MockFlowgraphManager_Start_Call{Call: _e.mock.On("Start")}
}
func (_c *MockFlowgraphManager_Start_Call) Run(run func()) *MockFlowgraphManager_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_Start_Call) Return() *MockFlowgraphManager_Start_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_Start_Call) RunAndReturn(run func()) *MockFlowgraphManager_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockFlowgraphManager) Stop() {
_m.Called()
}
// MockFlowgraphManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockFlowgraphManager_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) Stop() *MockFlowgraphManager_Stop_Call {
return &MockFlowgraphManager_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockFlowgraphManager_Stop_Call) Run(run func()) *MockFlowgraphManager_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_Stop_Call) Return() *MockFlowgraphManager_Stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_Stop_Call) RunAndReturn(run func()) *MockFlowgraphManager_Stop_Call {
_c.Call.Return(run)
return _c
}
// controlMemWaterLevel provides a mock function with given fields: totalMemory
func (_m *MockFlowgraphManager) controlMemWaterLevel(totalMemory uint64) {
_m.Called(totalMemory)
}
// MockFlowgraphManager_controlMemWaterLevel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'controlMemWaterLevel'
type MockFlowgraphManager_controlMemWaterLevel_Call struct {
*mock.Call
}
// controlMemWaterLevel is a helper method to define mock.On call
// - totalMemory uint64
func (_e *MockFlowgraphManager_Expecter) controlMemWaterLevel(totalMemory interface{}) *MockFlowgraphManager_controlMemWaterLevel_Call {
return &MockFlowgraphManager_controlMemWaterLevel_Call{Call: _e.mock.On("controlMemWaterLevel", totalMemory)}
}
func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Run(run func(totalMemory uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(uint64))
})
return _c
}
func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Return() *MockFlowgraphManager_controlMemWaterLevel_Call {
_c.Call.Return()
return _c
}
func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) RunAndReturn(run func(uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call {
_c.Call.Return(run)
return _c
}
// NewMockFlowgraphManager creates a new instance of MockFlowgraphManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockFlowgraphManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockFlowgraphManager {
mock := &MockFlowgraphManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -240,7 +240,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return merr.Status(err), nil
}
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel())
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannel())
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channelName", req.GetChannel()))
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil
@ -325,7 +325,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil
}
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannelName())
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
if !ok {
node.compactionExecutor.clearTasksByChannel(req.GetChannelName())
err := merr.WrapErrChannelNotFound(req.GetChannelName())
@ -509,7 +509,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
// Retry in case the channel hasn't been watched yet.
err := retry.Do(ctx, func() error {
var ok bool
ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName())
ds, ok = node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
if !ok {
return errors.New("channel not found")
}

View File

@ -89,7 +89,7 @@ func (s *DataNodeServicesSuite) SetupTest() {
err := s.node.Init()
s.Require().NoError(err)
alloc := &allocator.MockAllocator{}
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Start().Return(nil).Maybe()
alloc.EXPECT().Close().Maybe()
alloc.EXPECT().GetIDAlloactor().Return(&allocator2.IDAllocator{}).Maybe()
@ -234,10 +234,10 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
FlushedSegmentIds: []int64{},
}
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vchan, schema, genTestTickler())
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchan, schema, genTestTickler())
s.Require().NoError(err)
fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName)
fgservice, ok := s.node.flowgraphManager.GetFlowgraphService(dmChannelName)
s.Require().True(ok)
metaCache := metacache.NewMockMetaCache(s.T())
@ -422,14 +422,14 @@ func (s *DataNodeServicesSuite) TestImport() {
}()
chName1 := "fake-by-dev-rootcoord-dml-testimport-1"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -437,9 +437,9 @@ func (s *DataNodeServicesSuite) TestImport() {
}, nil, genTestTickler())
s.Require().Nil(err)
_, ok := s.node.flowgraphManager.getFlowgraphService(chName1)
_, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)
s.Require().True(ok)
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
_, ok = s.node.flowgraphManager.GetFlowgraphService(chName2)
s.Require().True(ok)
req := &datapb.ImportTaskRequest{
@ -485,14 +485,14 @@ func (s *DataNodeServicesSuite) TestImport() {
}()
chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 999, // wrong collection ID.
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -500,9 +500,9 @@ func (s *DataNodeServicesSuite) TestImport() {
}, nil, genTestTickler())
s.Require().Nil(err)
_, ok := s.node.flowgraphManager.getFlowgraphService(chName1)
_, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)
s.Require().True(ok)
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
_, ok = s.node.flowgraphManager.GetFlowgraphService(chName2)
s.Require().True(ok)
s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil)
@ -612,14 +612,14 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1"
chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, schema, genTestTickler())
s.Require().NoError(err)
err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
@ -627,9 +627,9 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
}, schema, genTestTickler())
s.Require().NoError(err)
_, ok := s.node.flowgraphManager.getFlowgraphService(chName1)
_, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)
s.Assert().True(ok)
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
_, ok = s.node.flowgraphManager.GetFlowgraphService(chName2)
s.Assert().True(ok)
resp, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
@ -673,14 +673,14 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
}
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: chanName,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{100, 200, 300},
}, schema, genTestTickler())
s.Require().NoError(err)
fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName)
fg, ok := s.node.flowgraphManager.GetFlowgraphService(chanName)
s.Assert().True(ok)
fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 100, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)