From 3c32ba240784db5e8cda145d53e140b9261eafb4 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Mon, 11 Dec 2023 17:52:37 +0800 Subject: [PATCH] enhance: pack datacoord Cluster and SessionManager with interface and mock them (#28869) relate: https://github.com/milvus-io/milvus/issues/28861 https://github.com/milvus-io/milvus/issues/28854 --------- Signed-off-by: aoiasd --- Makefile | 2 + internal/datacoord/cluster.go | 57 +- internal/datacoord/cluster_test.go | 64 +- internal/datacoord/compaction.go | 4 +- internal/datacoord/compaction_test.go | 26 +- internal/datacoord/mock_cluster.go | 463 ++++++++++++++ internal/datacoord/mock_session_manager.go | 662 +++++++++++++++++++++ internal/datacoord/server.go | 12 +- internal/datacoord/server_test.go | 16 +- internal/datacoord/services.go | 66 +- internal/datacoord/session_manager.go | 136 +++-- internal/datacoord/session_manager_test.go | 4 +- 12 files changed, 1341 insertions(+), 171 deletions(-) create mode 100644 internal/datacoord/mock_cluster.go create mode 100644 internal/datacoord/mock_session_manager.go diff --git a/Makefile b/Makefile index e8eca6876c..72099e78c8 100644 --- a/Makefile +++ b/Makefile @@ -428,6 +428,8 @@ generate-mockery-datacoord: getdeps $(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=TriggerManager --dir=internal/datacoord --filename=mock_trigger_manager.go --output=internal/datacoord --structname=MockTriggerManager --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=Cluster --dir=internal/datacoord --filename=mock_cluster.go --output=internal/datacoord --structname=MockCluster --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=SessionManager --dir=internal/datacoord --filename=mock_session_manager.go --output=internal/datacoord --structname=MockSessionManager --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=CompactionMeta --dir=internal/datacoord --filename=mock_compaction_meta.go --output=internal/datacoord --structname=MockCompactionMeta --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Scheduler --dir=internal/datacoord --filename=mock_scheduler.go --output=internal/datacoord --structname=MockScheduler --with-expecter --inpackage diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 68469890d0..9a370a012f 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -27,18 +27,34 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) // Cluster provides interfaces to interact with datanode cluster -type Cluster struct { - sessionManager *SessionManager +type Cluster interface { + Startup(ctx context.Context, nodes []*NodeInfo) error + Register(node *NodeInfo) error + UnRegister(node *NodeInfo) error + Watch(ctx context.Context, ch string, collectionID UniqueID) error + Flush(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error + FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error + Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) + AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) + GetSessions() []*Session + Close() +} + +var _ Cluster = (*ClusterImpl)(nil) + +type ClusterImpl struct { + sessionManager SessionManager channelManager *ChannelManager } -// NewCluster creates a new cluster -func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager) *Cluster { - c := &Cluster{ +// NewClusterImpl creates a new cluster +func NewClusterImpl(sessionManager SessionManager, channelManager *ChannelManager) *ClusterImpl { + c := &ClusterImpl{ sessionManager: sessionManager, channelManager: channelManager, } @@ -47,7 +63,7 @@ func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager) } // Startup inits the cluster with the given data nodes. -func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error { +func (c *ClusterImpl) Startup(ctx context.Context, nodes []*NodeInfo) error { for _, node := range nodes { c.sessionManager.AddSession(node) } @@ -59,27 +75,25 @@ func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error { } // Register registers a new node in cluster -func (c *Cluster) Register(node *NodeInfo) error { +func (c *ClusterImpl) Register(node *NodeInfo) error { c.sessionManager.AddSession(node) return c.channelManager.AddNode(node.NodeID) } // UnRegister removes a node from cluster -func (c *Cluster) UnRegister(node *NodeInfo) error { +func (c *ClusterImpl) UnRegister(node *NodeInfo) error { c.sessionManager.DeleteSession(node) return c.channelManager.DeleteNode(node.NodeID) } // Watch tries to add a channel in datanode cluster -func (c *Cluster) Watch(ctx context.Context, ch string, collectionID UniqueID) error { +func (c *ClusterImpl) Watch(ctx context.Context, ch string, collectionID UniqueID) error { return c.channelManager.Watch(ctx, &channelMeta{Name: ch, CollectionID: collectionID}) } // Flush sends flush requests to dataNodes specified // which also according to channels where segments are assigned to. -func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string, - segments []*datapb.SegmentInfo, -) error { +func (c *ClusterImpl) Flush(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { if !c.channelManager.Match(nodeID, channel) { log.Warn("node is not matched with channel", zap.String("channel", channel), @@ -109,7 +123,7 @@ func (c *Cluster) Flush(ctx context.Context, nodeID int64, channel string, return nil } -func (c *Cluster) FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error { +func (c *ClusterImpl) FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error { if len(channels) == 0 { return nil } @@ -133,17 +147,28 @@ func (c *Cluster) FlushChannels(ctx context.Context, nodeID int64, flushTs Times } // Import sends import requests to DataNodes whose ID==nodeID. -func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) { +func (c *ClusterImpl) Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) { c.sessionManager.Import(ctx, nodeID, it) } +func (c *ClusterImpl) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + // Look for the DataNode that watches the channel. + ok, nodeID := c.channelManager.getNodeIDByChannelName(req.GetChannelName()) + if !ok { + err := merr.WrapErrChannelNotFound(req.GetChannelName(), "no DataNode watches this channel") + log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()), zap.Error(err)) + return nil, err + } + return c.sessionManager.AddImportSegment(ctx, nodeID, req) +} + // GetSessions returns all sessions -func (c *Cluster) GetSessions() []*Session { +func (c *ClusterImpl) GetSessions() []*Session { return c.sessionManager.GetSessions() } // Close releases resources opened in Cluster -func (c *Cluster) Close() { +func (c *ClusterImpl) Close() { c.sessionManager.Close() c.channelManager.Close() } diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index ecaa7f30a4..56042cd1d3 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -87,10 +87,10 @@ func (suite *ClusterSuite) TestCreate() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" info := &NodeInfo{ @@ -123,10 +123,10 @@ func (suite *ClusterSuite) TestCreate() { err = kv.Save(Params.CommonCfg.DataCoordWatchSubPath.GetValue()+"/1/channel1", string(info1Data)) suite.NoError(err) - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}}) @@ -156,10 +156,10 @@ func (suite *ClusterSuite) TestCreate() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) addr := "localhost:8080" info := &NodeInfo{ @@ -177,10 +177,10 @@ func (suite *ClusterSuite) TestCreate() { cluster.Close() - sessionManager2 := NewSessionManager() + sessionManager2 := NewSessionManagerImpl() channelManager2, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - clusterReload := NewCluster(sessionManager2, channelManager2) + clusterReload := NewClusterImpl(sessionManager2, channelManager2) defer clusterReload.Close() addr = "localhost:8081" @@ -219,10 +219,10 @@ func (suite *ClusterSuite) TestRegister() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" err = cluster.Startup(ctx, nil) @@ -246,7 +246,7 @@ func (suite *ClusterSuite) TestRegister() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) err = channelManager.Watch(context.TODO(), &channelMeta{ @@ -254,7 +254,7 @@ func (suite *ClusterSuite) TestRegister() { CollectionID: 0, }) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" err = cluster.Startup(ctx, nil) @@ -281,10 +281,10 @@ func (suite *ClusterSuite) TestRegister() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) addr := "localhost:8080" err = cluster.Startup(ctx, nil) suite.NoError(err) @@ -296,10 +296,10 @@ func (suite *ClusterSuite) TestRegister() { suite.NoError(err) cluster.Close() - sessionManager2 := NewSessionManager() + sessionManager2 := NewSessionManagerImpl() channelManager2, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - restartCluster := NewCluster(sessionManager2, channelManager2) + restartCluster := NewClusterImpl(sessionManager2, channelManager2) defer restartCluster.Close() channels := channelManager2.GetAssignedChannels() suite.Empty(channels) @@ -317,10 +317,10 @@ func (suite *ClusterSuite) TestUnregister() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" info := &NodeInfo{ @@ -344,10 +344,10 @@ func (suite *ClusterSuite) TestUnregister() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() nodeInfo1 := &NodeInfo{ @@ -384,10 +384,10 @@ func (suite *ClusterSuite) TestUnregister() { mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return newMockDataNodeClient(1, nil) } - sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator)) + sessionManager := NewSessionManagerImpl(withSessionCreator(mockSessionCreator)) channelManager, err := NewChannelManager(kv, newMockHandler()) suite.NoError(err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() nodeInfo := &NodeInfo{ @@ -431,10 +431,10 @@ func TestWatchIfNeeded(t *testing.T) { mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return newMockDataNodeClient(1, nil) } - sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator)) + sessionManager := NewSessionManagerImpl(withSessionCreator(mockSessionCreator)) channelManager, err := NewChannelManager(kv, newMockHandler()) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" @@ -457,10 +457,10 @@ func TestWatchIfNeeded(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() err = cluster.Watch(ctx, "ch1", 1) @@ -481,12 +481,12 @@ func TestConsistentHashPolicy(t *testing.T) { kv.Close() }() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() chash := consistent.New() factory := NewConsistentHashChannelPolicyFactory(chash) channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(factory)) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() hash := consistent.New() @@ -565,10 +565,10 @@ func TestCluster_Flush(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" info := &NodeInfo{ @@ -612,10 +612,10 @@ func TestCluster_Import(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) defer cancel() - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) defer cluster.Close() addr := "localhost:8080" info := &NodeInfo{ diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index dac1ed4527..9f497343e5 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -112,15 +112,15 @@ type compactionPlanHandler struct { meta CompactionMeta allocator allocator chManager *ChannelManager - sessions *SessionManager scheduler Scheduler + sessions SessionManager stopCh chan struct{} stopOnce sync.Once stopWg sync.WaitGroup } -func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta CompactionMeta, allocator allocator, +func newCompactionPlanHandler(sessions SessionManager, cm *ChannelManager, meta CompactionMeta, allocator allocator, ) *compactionPlanHandler { return &compactionPlanHandler{ plans: make(map[int64]*compactionTask), diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 5a028d1164..1601cfadd6 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -83,7 +83,7 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { func (s *CompactionPlanHandlerSuite) TestCheckResult() { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil) - session := &SessionManager{ + session := &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -241,7 +241,7 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { type fields struct { plans map[int64]*compactionTask - sessions *SessionManager + sessions SessionManager chManager *ChannelManager allocatorFactory func() allocator } @@ -260,7 +260,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { "test exec compaction", fields{ plans: map[int64]*compactionTask{}, - sessions: &SessionManager{ + sessions: &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -348,7 +348,7 @@ func Test_compactionPlanHandler_execWithParallels(t *testing.T) { defer paramtable.Get().Reset(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key) c := &compactionPlanHandler{ plans: map[int64]*compactionTask{}, - sessions: &SessionManager{ + sessions: &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -461,7 +461,7 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) { Type: datapb.CompactionType_MergeCompaction, } - sessions := &SessionManager{ + sessions := &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -628,7 +628,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { Type: datapb.CompactionType_MergeCompaction, } - sessions := &SessionManager{ + sessions := &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -722,7 +722,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { Type: datapb.CompactionType_MergeCompaction, } - sessions := &SessionManager{ + sessions := &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -792,7 +792,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) { func Test_compactionPlanHandler_getCompaction(t *testing.T) { type fields struct { plans map[int64]*compactionTask - sessions *SessionManager + sessions SessionManager } type args struct { planID int64 @@ -837,7 +837,7 @@ func Test_compactionPlanHandler_getCompaction(t *testing.T) { func Test_compactionPlanHandler_updateCompaction(t *testing.T) { type fields struct { plans map[int64]*compactionTask - sessions *SessionManager + sessions SessionManager meta *meta } type args struct { @@ -923,7 +923,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { }, }, }, - sessions: &SessionManager{ + sessions: &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -987,7 +987,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) { func Test_newCompactionPlanHandler(t *testing.T) { type args struct { - sessions *SessionManager + sessions SessionManager cm *ChannelManager meta *meta allocator allocator @@ -1000,14 +1000,14 @@ func Test_newCompactionPlanHandler(t *testing.T) { { "test new handler", args{ - &SessionManager{}, + &SessionManagerImpl{}, &ChannelManager{}, &meta{}, newMockAllocator(), }, &compactionPlanHandler{ plans: map[int64]*compactionTask{}, - sessions: &SessionManager{}, + sessions: &SessionManagerImpl{}, chManager: &ChannelManager{}, meta: &meta{}, allocator: newMockAllocator(), diff --git a/internal/datacoord/mock_cluster.go b/internal/datacoord/mock_cluster.go new file mode 100644 index 0000000000..095c2a6a1b --- /dev/null +++ b/internal/datacoord/mock_cluster.go @@ -0,0 +1,463 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package datacoord + +import ( + context "context" + + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" +) + +// MockCluster is an autogenerated mock type for the Cluster type +type MockCluster struct { + mock.Mock +} + +type MockCluster_Expecter struct { + mock *mock.Mock +} + +func (_m *MockCluster) EXPECT() *MockCluster_Expecter { + return &MockCluster_Expecter{mock: &_m.Mock} +} + +// AddImportSegment provides a mock function with given fields: ctx, req +func (_m *MockCluster) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + ret := _m.Called(ctx, req) + + var r0 *datapb.AddImportSegmentResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)); ok { + return rf(ctx, req) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AddImportSegmentRequest) *datapb.AddImportSegmentResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.AddImportSegmentResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.AddImportSegmentRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCluster_AddImportSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddImportSegment' +type MockCluster_AddImportSegment_Call struct { + *mock.Call +} + +// AddImportSegment is a helper method to define mock.On call +// - ctx context.Context +// - req *datapb.AddImportSegmentRequest +func (_e *MockCluster_Expecter) AddImportSegment(ctx interface{}, req interface{}) *MockCluster_AddImportSegment_Call { + return &MockCluster_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment", ctx, req)} +} + +func (_c *MockCluster_AddImportSegment_Call) Run(run func(ctx context.Context, req *datapb.AddImportSegmentRequest)) *MockCluster_AddImportSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datapb.AddImportSegmentRequest)) + }) + return _c +} + +func (_c *MockCluster_AddImportSegment_Call) Return(_a0 *datapb.AddImportSegmentResponse, _a1 error) *MockCluster_AddImportSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCluster_AddImportSegment_Call) RunAndReturn(run func(context.Context, *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)) *MockCluster_AddImportSegment_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockCluster) Close() { + _m.Called() +} + +// MockCluster_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockCluster_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockCluster_Expecter) Close() *MockCluster_Close_Call { + return &MockCluster_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockCluster_Close_Call) Run(run func()) *MockCluster_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCluster_Close_Call) Return() *MockCluster_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCluster_Close_Call) RunAndReturn(run func()) *MockCluster_Close_Call { + _c.Call.Return(run) + return _c +} + +// Flush provides a mock function with given fields: ctx, nodeID, channel, segments +func (_m *MockCluster) Flush(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error { + ret := _m.Called(ctx, nodeID, channel, segments) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string, []*datapb.SegmentInfo) error); ok { + r0 = rf(ctx, nodeID, channel, segments) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_Flush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Flush' +type MockCluster_Flush_Call struct { + *mock.Call +} + +// Flush is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - channel string +// - segments []*datapb.SegmentInfo +func (_e *MockCluster_Expecter) Flush(ctx interface{}, nodeID interface{}, channel interface{}, segments interface{}) *MockCluster_Flush_Call { + return &MockCluster_Flush_Call{Call: _e.mock.On("Flush", ctx, nodeID, channel, segments)} +} + +func (_c *MockCluster_Flush_Call) Run(run func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo)) *MockCluster_Flush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(string), args[3].([]*datapb.SegmentInfo)) + }) + return _c +} + +func (_c *MockCluster_Flush_Call) Return(_a0 error) *MockCluster_Flush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_Flush_Call) RunAndReturn(run func(context.Context, int64, string, []*datapb.SegmentInfo) error) *MockCluster_Flush_Call { + _c.Call.Return(run) + return _c +} + +// FlushChannels provides a mock function with given fields: ctx, nodeID, flushTs, channels +func (_m *MockCluster) FlushChannels(ctx context.Context, nodeID int64, flushTs uint64, channels []string) error { + ret := _m.Called(ctx, nodeID, flushTs, channels) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, uint64, []string) error); ok { + r0 = rf(ctx, nodeID, flushTs, channels) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_FlushChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushChannels' +type MockCluster_FlushChannels_Call struct { + *mock.Call +} + +// FlushChannels is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - flushTs uint64 +// - channels []string +func (_e *MockCluster_Expecter) FlushChannels(ctx interface{}, nodeID interface{}, flushTs interface{}, channels interface{}) *MockCluster_FlushChannels_Call { + return &MockCluster_FlushChannels_Call{Call: _e.mock.On("FlushChannels", ctx, nodeID, flushTs, channels)} +} + +func (_c *MockCluster_FlushChannels_Call) Run(run func(ctx context.Context, nodeID int64, flushTs uint64, channels []string)) *MockCluster_FlushChannels_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(uint64), args[3].([]string)) + }) + return _c +} + +func (_c *MockCluster_FlushChannels_Call) Return(_a0 error) *MockCluster_FlushChannels_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_FlushChannels_Call) RunAndReturn(run func(context.Context, int64, uint64, []string) error) *MockCluster_FlushChannels_Call { + _c.Call.Return(run) + return _c +} + +// GetSessions provides a mock function with given fields: +func (_m *MockCluster) GetSessions() []*Session { + ret := _m.Called() + + var r0 []*Session + if rf, ok := ret.Get(0).(func() []*Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*Session) + } + } + + return r0 +} + +// MockCluster_GetSessions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessions' +type MockCluster_GetSessions_Call struct { + *mock.Call +} + +// GetSessions is a helper method to define mock.On call +func (_e *MockCluster_Expecter) GetSessions() *MockCluster_GetSessions_Call { + return &MockCluster_GetSessions_Call{Call: _e.mock.On("GetSessions")} +} + +func (_c *MockCluster_GetSessions_Call) Run(run func()) *MockCluster_GetSessions_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCluster_GetSessions_Call) Return(_a0 []*Session) *MockCluster_GetSessions_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_GetSessions_Call) RunAndReturn(run func() []*Session) *MockCluster_GetSessions_Call { + _c.Call.Return(run) + return _c +} + +// Import provides a mock function with given fields: ctx, nodeID, it +func (_m *MockCluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest) { + _m.Called(ctx, nodeID, it) +} + +// MockCluster_Import_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Import' +type MockCluster_Import_Call struct { + *mock.Call +} + +// Import is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - it *datapb.ImportTaskRequest +func (_e *MockCluster_Expecter) Import(ctx interface{}, nodeID interface{}, it interface{}) *MockCluster_Import_Call { + return &MockCluster_Import_Call{Call: _e.mock.On("Import", ctx, nodeID, it)} +} + +func (_c *MockCluster_Import_Call) Run(run func(ctx context.Context, nodeID int64, it *datapb.ImportTaskRequest)) *MockCluster_Import_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.ImportTaskRequest)) + }) + return _c +} + +func (_c *MockCluster_Import_Call) Return() *MockCluster_Import_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCluster_Import_Call) RunAndReturn(run func(context.Context, int64, *datapb.ImportTaskRequest)) *MockCluster_Import_Call { + _c.Call.Return(run) + return _c +} + +// Register provides a mock function with given fields: node +func (_m *MockCluster) Register(node *NodeInfo) error { + ret := _m.Called(node) + + var r0 error + if rf, ok := ret.Get(0).(func(*NodeInfo) error); ok { + r0 = rf(node) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register' +type MockCluster_Register_Call struct { + *mock.Call +} + +// Register is a helper method to define mock.On call +// - node *NodeInfo +func (_e *MockCluster_Expecter) Register(node interface{}) *MockCluster_Register_Call { + return &MockCluster_Register_Call{Call: _e.mock.On("Register", node)} +} + +func (_c *MockCluster_Register_Call) Run(run func(node *NodeInfo)) *MockCluster_Register_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*NodeInfo)) + }) + return _c +} + +func (_c *MockCluster_Register_Call) Return(_a0 error) *MockCluster_Register_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_Register_Call) RunAndReturn(run func(*NodeInfo) error) *MockCluster_Register_Call { + _c.Call.Return(run) + return _c +} + +// Startup provides a mock function with given fields: ctx, nodes +func (_m *MockCluster) Startup(ctx context.Context, nodes []*NodeInfo) error { + ret := _m.Called(ctx, nodes) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*NodeInfo) error); ok { + r0 = rf(ctx, nodes) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_Startup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Startup' +type MockCluster_Startup_Call struct { + *mock.Call +} + +// Startup is a helper method to define mock.On call +// - ctx context.Context +// - nodes []*NodeInfo +func (_e *MockCluster_Expecter) Startup(ctx interface{}, nodes interface{}) *MockCluster_Startup_Call { + return &MockCluster_Startup_Call{Call: _e.mock.On("Startup", ctx, nodes)} +} + +func (_c *MockCluster_Startup_Call) Run(run func(ctx context.Context, nodes []*NodeInfo)) *MockCluster_Startup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]*NodeInfo)) + }) + return _c +} + +func (_c *MockCluster_Startup_Call) Return(_a0 error) *MockCluster_Startup_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_Startup_Call) RunAndReturn(run func(context.Context, []*NodeInfo) error) *MockCluster_Startup_Call { + _c.Call.Return(run) + return _c +} + +// UnRegister provides a mock function with given fields: node +func (_m *MockCluster) UnRegister(node *NodeInfo) error { + ret := _m.Called(node) + + var r0 error + if rf, ok := ret.Get(0).(func(*NodeInfo) error); ok { + r0 = rf(node) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_UnRegister_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnRegister' +type MockCluster_UnRegister_Call struct { + *mock.Call +} + +// UnRegister is a helper method to define mock.On call +// - node *NodeInfo +func (_e *MockCluster_Expecter) UnRegister(node interface{}) *MockCluster_UnRegister_Call { + return &MockCluster_UnRegister_Call{Call: _e.mock.On("UnRegister", node)} +} + +func (_c *MockCluster_UnRegister_Call) Run(run func(node *NodeInfo)) *MockCluster_UnRegister_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*NodeInfo)) + }) + return _c +} + +func (_c *MockCluster_UnRegister_Call) Return(_a0 error) *MockCluster_UnRegister_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_UnRegister_Call) RunAndReturn(run func(*NodeInfo) error) *MockCluster_UnRegister_Call { + _c.Call.Return(run) + return _c +} + +// Watch provides a mock function with given fields: ctx, ch, collectionID +func (_m *MockCluster) Watch(ctx context.Context, ch string, collectionID int64) error { + ret := _m.Called(ctx, ch, collectionID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok { + r0 = rf(ctx, ch, collectionID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCluster_Watch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Watch' +type MockCluster_Watch_Call struct { + *mock.Call +} + +// Watch is a helper method to define mock.On call +// - ctx context.Context +// - ch string +// - collectionID int64 +func (_e *MockCluster_Expecter) Watch(ctx interface{}, ch interface{}, collectionID interface{}) *MockCluster_Watch_Call { + return &MockCluster_Watch_Call{Call: _e.mock.On("Watch", ctx, ch, collectionID)} +} + +func (_c *MockCluster_Watch_Call) Run(run func(ctx context.Context, ch string, collectionID int64)) *MockCluster_Watch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int64)) + }) + return _c +} + +func (_c *MockCluster_Watch_Call) Return(_a0 error) *MockCluster_Watch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCluster_Watch_Call) RunAndReturn(run func(context.Context, string, int64) error) *MockCluster_Watch_Call { + _c.Call.Return(run) + return _c +} + +// NewMockCluster creates a new instance of MockCluster. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockCluster(t interface { + mock.TestingT + Cleanup(func()) +}) *MockCluster { + mock := &MockCluster{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go new file mode 100644 index 0000000000..35355b76ea --- /dev/null +++ b/internal/datacoord/mock_session_manager.go @@ -0,0 +1,662 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package datacoord + +import ( + context "context" + + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" +) + +// MockSessionManager is an autogenerated mock type for the SessionManager type +type MockSessionManager struct { + mock.Mock +} + +type MockSessionManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockSessionManager) EXPECT() *MockSessionManager_Expecter { + return &MockSessionManager_Expecter{mock: &_m.Mock} +} + +// AddImportSegment provides a mock function with given fields: ctx, nodeID, req +func (_m *MockSessionManager) AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + ret := _m.Called(ctx, nodeID, req) + + var r0 *datapb.AddImportSegmentResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)); ok { + return rf(ctx, nodeID, req) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.AddImportSegmentRequest) *datapb.AddImportSegmentResponse); ok { + r0 = rf(ctx, nodeID, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.AddImportSegmentResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, *datapb.AddImportSegmentRequest) error); ok { + r1 = rf(ctx, nodeID, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockSessionManager_AddImportSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddImportSegment' +type MockSessionManager_AddImportSegment_Call struct { + *mock.Call +} + +// AddImportSegment is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - req *datapb.AddImportSegmentRequest +func (_e *MockSessionManager_Expecter) AddImportSegment(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_AddImportSegment_Call { + return &MockSessionManager_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment", ctx, nodeID, req)} +} + +func (_c *MockSessionManager_AddImportSegment_Call) Run(run func(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest)) *MockSessionManager_AddImportSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.AddImportSegmentRequest)) + }) + return _c +} + +func (_c *MockSessionManager_AddImportSegment_Call) Return(_a0 *datapb.AddImportSegmentResponse, _a1 error) *MockSessionManager_AddImportSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockSessionManager_AddImportSegment_Call) RunAndReturn(run func(context.Context, int64, *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)) *MockSessionManager_AddImportSegment_Call { + _c.Call.Return(run) + return _c +} + +// AddSession provides a mock function with given fields: node +func (_m *MockSessionManager) AddSession(node *NodeInfo) { + _m.Called(node) +} + +// MockSessionManager_AddSession_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddSession' +type MockSessionManager_AddSession_Call struct { + *mock.Call +} + +// AddSession is a helper method to define mock.On call +// - node *NodeInfo +func (_e *MockSessionManager_Expecter) AddSession(node interface{}) *MockSessionManager_AddSession_Call { + return &MockSessionManager_AddSession_Call{Call: _e.mock.On("AddSession", node)} +} + +func (_c *MockSessionManager_AddSession_Call) Run(run func(node *NodeInfo)) *MockSessionManager_AddSession_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*NodeInfo)) + }) + return _c +} + +func (_c *MockSessionManager_AddSession_Call) Return() *MockSessionManager_AddSession_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSessionManager_AddSession_Call) RunAndReturn(run func(*NodeInfo)) *MockSessionManager_AddSession_Call { + _c.Call.Return(run) + return _c +} + +// CheckChannelOperationProgress provides a mock function with given fields: ctx, nodeID, info +func (_m *MockSessionManager) CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) { + ret := _m.Called(ctx, nodeID, info) + + var r0 *datapb.ChannelOperationProgressResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)); ok { + return rf(ctx, nodeID, info) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.ChannelWatchInfo) *datapb.ChannelOperationProgressResponse); ok { + r0 = rf(ctx, nodeID, info) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.ChannelOperationProgressResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, *datapb.ChannelWatchInfo) error); ok { + r1 = rf(ctx, nodeID, info) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockSessionManager_CheckChannelOperationProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckChannelOperationProgress' +type MockSessionManager_CheckChannelOperationProgress_Call struct { + *mock.Call +} + +// CheckChannelOperationProgress is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - info *datapb.ChannelWatchInfo +func (_e *MockSessionManager_Expecter) CheckChannelOperationProgress(ctx interface{}, nodeID interface{}, info interface{}) *MockSessionManager_CheckChannelOperationProgress_Call { + return &MockSessionManager_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", ctx, nodeID, info)} +} + +func (_c *MockSessionManager_CheckChannelOperationProgress_Call) Run(run func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo)) *MockSessionManager_CheckChannelOperationProgress_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.ChannelWatchInfo)) + }) + return _c +} + +func (_c *MockSessionManager_CheckChannelOperationProgress_Call) Return(_a0 *datapb.ChannelOperationProgressResponse, _a1 error) *MockSessionManager_CheckChannelOperationProgress_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockSessionManager_CheckChannelOperationProgress_Call) RunAndReturn(run func(context.Context, int64, *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)) *MockSessionManager_CheckChannelOperationProgress_Call { + _c.Call.Return(run) + return _c +} + +// CheckHealth provides a mock function with given fields: ctx +func (_m *MockSessionManager) CheckHealth(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockSessionManager_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockSessionManager_Expecter) CheckHealth(ctx interface{}) *MockSessionManager_CheckHealth_Call { + return &MockSessionManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)} +} + +func (_c *MockSessionManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockSessionManager_CheckHealth_Call) Return(_a0 error) *MockSessionManager_CheckHealth_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockSessionManager_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockSessionManager) Close() { + _m.Called() +} + +// MockSessionManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockSessionManager_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockSessionManager_Expecter) Close() *MockSessionManager_Close_Call { + return &MockSessionManager_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockSessionManager_Close_Call) Run(run func()) *MockSessionManager_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSessionManager_Close_Call) Return() *MockSessionManager_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSessionManager_Close_Call) RunAndReturn(run func()) *MockSessionManager_Close_Call { + _c.Call.Return(run) + return _c +} + +// Compaction provides a mock function with given fields: nodeID, plan +func (_m *MockSessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) error { + ret := _m.Called(nodeID, plan) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, *datapb.CompactionPlan) error); ok { + r0 = rf(nodeID, plan) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_Compaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Compaction' +type MockSessionManager_Compaction_Call struct { + *mock.Call +} + +// Compaction is a helper method to define mock.On call +// - nodeID int64 +// - plan *datapb.CompactionPlan +func (_e *MockSessionManager_Expecter) Compaction(nodeID interface{}, plan interface{}) *MockSessionManager_Compaction_Call { + return &MockSessionManager_Compaction_Call{Call: _e.mock.On("Compaction", nodeID, plan)} +} + +func (_c *MockSessionManager_Compaction_Call) Run(run func(nodeID int64, plan *datapb.CompactionPlan)) *MockSessionManager_Compaction_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(*datapb.CompactionPlan)) + }) + return _c +} + +func (_c *MockSessionManager_Compaction_Call) Return(_a0 error) *MockSessionManager_Compaction_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_Compaction_Call) RunAndReturn(run func(int64, *datapb.CompactionPlan) error) *MockSessionManager_Compaction_Call { + _c.Call.Return(run) + return _c +} + +// DeleteSession provides a mock function with given fields: node +func (_m *MockSessionManager) DeleteSession(node *NodeInfo) { + _m.Called(node) +} + +// MockSessionManager_DeleteSession_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteSession' +type MockSessionManager_DeleteSession_Call struct { + *mock.Call +} + +// DeleteSession is a helper method to define mock.On call +// - node *NodeInfo +func (_e *MockSessionManager_Expecter) DeleteSession(node interface{}) *MockSessionManager_DeleteSession_Call { + return &MockSessionManager_DeleteSession_Call{Call: _e.mock.On("DeleteSession", node)} +} + +func (_c *MockSessionManager_DeleteSession_Call) Run(run func(node *NodeInfo)) *MockSessionManager_DeleteSession_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*NodeInfo)) + }) + return _c +} + +func (_c *MockSessionManager_DeleteSession_Call) Return() *MockSessionManager_DeleteSession_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSessionManager_DeleteSession_Call) RunAndReturn(run func(*NodeInfo)) *MockSessionManager_DeleteSession_Call { + _c.Call.Return(run) + return _c +} + +// Flush provides a mock function with given fields: ctx, nodeID, req +func (_m *MockSessionManager) Flush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { + _m.Called(ctx, nodeID, req) +} + +// MockSessionManager_Flush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Flush' +type MockSessionManager_Flush_Call struct { + *mock.Call +} + +// Flush is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - req *datapb.FlushSegmentsRequest +func (_e *MockSessionManager_Expecter) Flush(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_Flush_Call { + return &MockSessionManager_Flush_Call{Call: _e.mock.On("Flush", ctx, nodeID, req)} +} + +func (_c *MockSessionManager_Flush_Call) Run(run func(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest)) *MockSessionManager_Flush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.FlushSegmentsRequest)) + }) + return _c +} + +func (_c *MockSessionManager_Flush_Call) Return() *MockSessionManager_Flush_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSessionManager_Flush_Call) RunAndReturn(run func(context.Context, int64, *datapb.FlushSegmentsRequest)) *MockSessionManager_Flush_Call { + _c.Call.Return(run) + return _c +} + +// FlushChannels provides a mock function with given fields: ctx, nodeID, req +func (_m *MockSessionManager) FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error { + ret := _m.Called(ctx, nodeID, req) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.FlushChannelsRequest) error); ok { + r0 = rf(ctx, nodeID, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_FlushChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushChannels' +type MockSessionManager_FlushChannels_Call struct { + *mock.Call +} + +// FlushChannels is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - req *datapb.FlushChannelsRequest +func (_e *MockSessionManager_Expecter) FlushChannels(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_FlushChannels_Call { + return &MockSessionManager_FlushChannels_Call{Call: _e.mock.On("FlushChannels", ctx, nodeID, req)} +} + +func (_c *MockSessionManager_FlushChannels_Call) Run(run func(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest)) *MockSessionManager_FlushChannels_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.FlushChannelsRequest)) + }) + return _c +} + +func (_c *MockSessionManager_FlushChannels_Call) Return(_a0 error) *MockSessionManager_FlushChannels_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_FlushChannels_Call) RunAndReturn(run func(context.Context, int64, *datapb.FlushChannelsRequest) error) *MockSessionManager_FlushChannels_Call { + _c.Call.Return(run) + return _c +} + +// GetCompactionPlansResults provides a mock function with given fields: +func (_m *MockSessionManager) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { + ret := _m.Called() + + var r0 map[int64]*datapb.CompactionPlanResult + if rf, ok := ret.Get(0).(func() map[int64]*datapb.CompactionPlanResult); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.CompactionPlanResult) + } + } + + return r0 +} + +// MockSessionManager_GetCompactionPlansResults_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCompactionPlansResults' +type MockSessionManager_GetCompactionPlansResults_Call struct { + *mock.Call +} + +// GetCompactionPlansResults is a helper method to define mock.On call +func (_e *MockSessionManager_Expecter) GetCompactionPlansResults() *MockSessionManager_GetCompactionPlansResults_Call { + return &MockSessionManager_GetCompactionPlansResults_Call{Call: _e.mock.On("GetCompactionPlansResults")} +} + +func (_c *MockSessionManager_GetCompactionPlansResults_Call) Run(run func()) *MockSessionManager_GetCompactionPlansResults_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSessionManager_GetCompactionPlansResults_Call) Return(_a0 map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_GetCompactionPlansResults_Call) RunAndReturn(run func() map[int64]*datapb.CompactionPlanResult) *MockSessionManager_GetCompactionPlansResults_Call { + _c.Call.Return(run) + return _c +} + +// GetSessionIDs provides a mock function with given fields: +func (_m *MockSessionManager) GetSessionIDs() []int64 { + ret := _m.Called() + + var r0 []int64 + if rf, ok := ret.Get(0).(func() []int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockSessionManager_GetSessionIDs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessionIDs' +type MockSessionManager_GetSessionIDs_Call struct { + *mock.Call +} + +// GetSessionIDs is a helper method to define mock.On call +func (_e *MockSessionManager_Expecter) GetSessionIDs() *MockSessionManager_GetSessionIDs_Call { + return &MockSessionManager_GetSessionIDs_Call{Call: _e.mock.On("GetSessionIDs")} +} + +func (_c *MockSessionManager_GetSessionIDs_Call) Run(run func()) *MockSessionManager_GetSessionIDs_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSessionManager_GetSessionIDs_Call) Return(_a0 []int64) *MockSessionManager_GetSessionIDs_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_GetSessionIDs_Call) RunAndReturn(run func() []int64) *MockSessionManager_GetSessionIDs_Call { + _c.Call.Return(run) + return _c +} + +// GetSessions provides a mock function with given fields: +func (_m *MockSessionManager) GetSessions() []*Session { + ret := _m.Called() + + var r0 []*Session + if rf, ok := ret.Get(0).(func() []*Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*Session) + } + } + + return r0 +} + +// MockSessionManager_GetSessions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSessions' +type MockSessionManager_GetSessions_Call struct { + *mock.Call +} + +// GetSessions is a helper method to define mock.On call +func (_e *MockSessionManager_Expecter) GetSessions() *MockSessionManager_GetSessions_Call { + return &MockSessionManager_GetSessions_Call{Call: _e.mock.On("GetSessions")} +} + +func (_c *MockSessionManager_GetSessions_Call) Run(run func()) *MockSessionManager_GetSessions_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSessionManager_GetSessions_Call) Return(_a0 []*Session) *MockSessionManager_GetSessions_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_GetSessions_Call) RunAndReturn(run func() []*Session) *MockSessionManager_GetSessions_Call { + _c.Call.Return(run) + return _c +} + +// Import provides a mock function with given fields: ctx, nodeID, itr +func (_m *MockSessionManager) Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) { + _m.Called(ctx, nodeID, itr) +} + +// MockSessionManager_Import_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Import' +type MockSessionManager_Import_Call struct { + *mock.Call +} + +// Import is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - itr *datapb.ImportTaskRequest +func (_e *MockSessionManager_Expecter) Import(ctx interface{}, nodeID interface{}, itr interface{}) *MockSessionManager_Import_Call { + return &MockSessionManager_Import_Call{Call: _e.mock.On("Import", ctx, nodeID, itr)} +} + +func (_c *MockSessionManager_Import_Call) Run(run func(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)) *MockSessionManager_Import_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.ImportTaskRequest)) + }) + return _c +} + +func (_c *MockSessionManager_Import_Call) Return() *MockSessionManager_Import_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSessionManager_Import_Call) RunAndReturn(run func(context.Context, int64, *datapb.ImportTaskRequest)) *MockSessionManager_Import_Call { + _c.Call.Return(run) + return _c +} + +// NotifyChannelOperation provides a mock function with given fields: ctx, nodeID, req +func (_m *MockSessionManager) NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error { + ret := _m.Called(ctx, nodeID, req) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.ChannelOperationsRequest) error); ok { + r0 = rf(ctx, nodeID, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_NotifyChannelOperation_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyChannelOperation' +type MockSessionManager_NotifyChannelOperation_Call struct { + *mock.Call +} + +// NotifyChannelOperation is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +// - req *datapb.ChannelOperationsRequest +func (_e *MockSessionManager_Expecter) NotifyChannelOperation(ctx interface{}, nodeID interface{}, req interface{}) *MockSessionManager_NotifyChannelOperation_Call { + return &MockSessionManager_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", ctx, nodeID, req)} +} + +func (_c *MockSessionManager_NotifyChannelOperation_Call) Run(run func(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest)) *MockSessionManager_NotifyChannelOperation_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.ChannelOperationsRequest)) + }) + return _c +} + +func (_c *MockSessionManager_NotifyChannelOperation_Call) Return(_a0 error) *MockSessionManager_NotifyChannelOperation_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_NotifyChannelOperation_Call) RunAndReturn(run func(context.Context, int64, *datapb.ChannelOperationsRequest) error) *MockSessionManager_NotifyChannelOperation_Call { + _c.Call.Return(run) + return _c +} + +// SyncSegments provides a mock function with given fields: nodeID, req +func (_m *MockSessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error { + ret := _m.Called(nodeID, req) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, *datapb.SyncSegmentsRequest) error); ok { + r0 = rf(nodeID, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSessionManager_SyncSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncSegments' +type MockSessionManager_SyncSegments_Call struct { + *mock.Call +} + +// SyncSegments is a helper method to define mock.On call +// - nodeID int64 +// - req *datapb.SyncSegmentsRequest +func (_e *MockSessionManager_Expecter) SyncSegments(nodeID interface{}, req interface{}) *MockSessionManager_SyncSegments_Call { + return &MockSessionManager_SyncSegments_Call{Call: _e.mock.On("SyncSegments", nodeID, req)} +} + +func (_c *MockSessionManager_SyncSegments_Call) Run(run func(nodeID int64, req *datapb.SyncSegmentsRequest)) *MockSessionManager_SyncSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(*datapb.SyncSegmentsRequest)) + }) + return _c +} + +func (_c *MockSessionManager_SyncSegments_Call) Return(_a0 error) *MockSessionManager_SyncSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSessionManager_SyncSegments_Call) RunAndReturn(run func(int64, *datapb.SyncSegmentsRequest) error) *MockSessionManager_SyncSegments_Call { + _c.Call.Return(run) + return _c +} + +// NewMockSessionManager creates a new instance of MockSessionManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockSessionManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSessionManager { + mock := &MockSessionManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7522d30bd0..1212275545 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -114,8 +114,8 @@ type Server struct { meta *meta segmentManager Manager allocator allocator - cluster *Cluster - sessionManager *SessionManager + cluster Cluster + sessionManager SessionManager channelManager *ChannelManager rootCoordClient types.RootCoordClient garbageCollector *garbageCollector @@ -186,7 +186,7 @@ func WithServerHelper(helper ServerHelper) Option { } // WithCluster returns an `Option` setting Cluster with provided parameter -func WithCluster(cluster *Cluster) Option { +func WithCluster(cluster Cluster) Option { return func(svr *Server) { svr.cluster = cluster } @@ -425,8 +425,8 @@ func (s *Server) initCluster() error { if err != nil { return err } - s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator)) - s.cluster = NewCluster(s.sessionManager, s.channelManager) + s.sessionManager = NewSessionManagerImpl(withSessionCreator(s.dataNodeCreator)) + s.cluster = NewClusterImpl(s.sessionManager, s.channelManager) return nil } @@ -704,7 +704,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("timetick", physical)) } // ignore report from a different node - if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) { + if !s.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) { log.Warn("node is not matched with channel", zap.String("channel", ch), zap.Int64("nodeID", ttMsg.GetBase().GetSourceID())) return nil } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9222f7cd31..c076c5c351 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -337,7 +337,7 @@ func TestFlush(t *testing.T) { svr := newTestServer(t, nil) closeTestServer(t, svr) svr.stateCode.Store(commonpb.StateCode_Healthy) - sm := NewSessionManager() + sm := NewSessionManagerImpl() datanodeClient := mocks.NewMockDataNodeClient(t) datanodeClient.EXPECT().FlushChannels(mock.Anything, mock.Anything).Return(nil, @@ -354,7 +354,7 @@ func TestFlush(t *testing.T) { }}} svr.sessionManager = sm - svr.cluster.sessionManager = sm + svr.cluster = NewClusterImpl(sm, svr.channelManager) err := svr.channelManager.AddNode(1) assert.NoError(t, err) @@ -3235,11 +3235,11 @@ func TestOptions(t *testing.T) { t.Run("WithCluster", func(t *testing.T) { defer kv.RemoveWithPrefix("") - sessionManager := NewSessionManager() + sessionManager := NewSessionManagerImpl() channelManager, err := NewChannelManager(kv, newMockHandler()) assert.NoError(t, err) - cluster := NewCluster(sessionManager, channelManager) + cluster := NewClusterImpl(sessionManager, channelManager) assert.NoError(t, err) opt := WithCluster(cluster) assert.NotNil(t, opt) @@ -3292,8 +3292,8 @@ func TestHandleSessionEvent(t *testing.T) { channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(&mockPolicyFactory{})) assert.NoError(t, err) - sessionManager := NewSessionManager() - cluster := NewCluster(sessionManager, channelManager) + sessionManager := NewSessionManagerImpl() + cluster := NewClusterImpl(sessionManager, channelManager) assert.NoError(t, err) err = cluster.Startup(ctx, nil) @@ -4374,7 +4374,7 @@ func Test_CheckHealth(t *testing.T) { id: 1, state: commonpb.StateCode_Healthy, } - sm := NewSessionManager() + sm := NewSessionManagerImpl() sm.sessions = struct { sync.RWMutex data map[int64]*Session @@ -4400,7 +4400,7 @@ func Test_CheckHealth(t *testing.T) { id: 1, state: commonpb.StateCode_Abnormal, } - sm := NewSessionManager() + sm := NewSessionManagerImpl() sm.sessions = struct { sync.RWMutex data map[int64]*Session diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index e87c36c0cd..b0518551bb 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -21,13 +21,11 @@ import ( "fmt" "math/rand" "strconv" - "sync" "github.com/cockroachdb/errors" "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -1348,7 +1346,7 @@ func (s *Server) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*da }, nil } - nodes := s.sessionManager.getLiveNodeIDs() + nodes := s.sessionManager.GetSessionIDs() if len(nodes) == 0 { log.Warn("import failed as all DataNodes are offline") resp.Status = merr.Status(merr.WrapErrNodeLackAny("no live DataNode")) @@ -1431,7 +1429,7 @@ func (s *Server) handleRPCTimetickMessage(ctx context.Context, ttMsg *msgpb.Data ts := ttMsg.GetTimestamp() // ignore to handle RPC Timetick message since it's no longer the leader - if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) { + if !s.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) { log.Warn("node is not matched with channel", zap.String("channelName", ch), zap.Int64("nodeID", ttMsg.GetBase().GetSourceID()), @@ -1502,22 +1500,7 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return merr.Status(err), nil } - // Look for the DataNode that watches the channel. - ok, nodeID := s.channelManager.getNodeIDByChannelName(req.GetChannelName()) - if !ok { - err := merr.WrapErrChannelNotFound(req.GetChannelName(), "no DataNode watches this channel") - log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()), zap.Error(err)) - return merr.Status(err), nil - } - // Call DataNode to add the new segment to its own flow graph. - cli, err := s.sessionManager.getClient(ctx, nodeID) - if err != nil { - log.Error("failed to get DataNode client for SaveImportSegment", - zap.Int64("DataNode ID", nodeID), - zap.Error(err)) - return merr.Status(err), nil - } - resp, err := cli.AddImportSegment(ctx, + resp, err := s.cluster.AddImportSegment(ctx, &datapb.AddImportSegmentRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithTimeStamp(req.GetBase().GetTimestamp()), @@ -1530,11 +1513,10 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe RowNum: req.GetRowNum(), StatsLog: req.GetSaveBinlogPathReq().GetField2StatslogPaths(), }) - if err := VerifyResponse(resp.GetStatus(), err); err != nil { - log.Error("failed to add segment", zap.Int64("DataNode ID", nodeID), zap.Error(err)) + if err != nil { return merr.Status(err), nil } - log.Info("succeed to add segment", zap.Int64("DataNode ID", nodeID), zap.Any("add segment req", req)) + // Fill in start position message ID. req.SaveBinlogPathReq.StartPositions[0].StartPosition.MsgID = resp.GetChannelPos() @@ -1622,42 +1604,12 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque }, nil } - mu := &sync.Mutex{} - group, ctx := errgroup.WithContext(ctx) - nodes := s.sessionManager.getLiveNodeIDs() - errReasons := make([]string, 0, len(nodes)) - - for _, nodeID := range nodes { - nodeID := nodeID - group.Go(func() error { - cli, err := s.sessionManager.getClient(ctx, nodeID) - if err != nil { - mu.Lock() - defer mu.Unlock() - errReasons = append(errReasons, fmt.Sprintf("failed to get DataNode %d: %v", nodeID, err)) - return err - } - - sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - return err - } - err = merr.AnalyzeState("DataNode", nodeID, sta) - if err != nil { - mu.Lock() - defer mu.Unlock() - errReasons = append(errReasons, err.Error()) - } - return nil - }) + err := s.sessionManager.CheckHealth(ctx) + if err != nil { + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: []string{err.Error()}}, nil } - err := group.Wait() - if err != nil || len(errReasons) != 0 { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errReasons}, nil - } - - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: errReasons}, nil + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 3c307cce29..689ea59b95 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -23,8 +23,10 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" @@ -44,8 +46,29 @@ const ( importTimeout = 3 * time.Hour ) -// SessionManager provides the grpc interfaces of cluster -type SessionManager struct { +type SessionManager interface { + AddSession(node *NodeInfo) + DeleteSession(node *NodeInfo) + GetSessionIDs() []int64 + GetSessions() []*Session + + Flush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) + FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error + Compaction(nodeID int64, plan *datapb.CompactionPlan) error + SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error + Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) + GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult + NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error + CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) + AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) + CheckHealth(ctx context.Context) error + Close() +} + +var _ SessionManager = (*SessionManagerImpl)(nil) + +// SessionManagerImpl provides the grpc interfaces of cluster +type SessionManagerImpl struct { sessions struct { sync.RWMutex data map[int64]*Session @@ -53,11 +76,11 @@ type SessionManager struct { sessionCreator dataNodeCreatorFunc } -// SessionOpt provides a way to set params in SessionManager -type SessionOpt func(c *SessionManager) +// SessionOpt provides a way to set params in SessionManagerImpl +type SessionOpt func(c *SessionManagerImpl) func withSessionCreator(creator dataNodeCreatorFunc) SessionOpt { - return func(c *SessionManager) { c.sessionCreator = creator } + return func(c *SessionManagerImpl) { c.sessionCreator = creator } } func defaultSessionCreator() dataNodeCreatorFunc { @@ -66,9 +89,9 @@ func defaultSessionCreator() dataNodeCreatorFunc { } } -// NewSessionManager creates a new SessionManager -func NewSessionManager(options ...SessionOpt) *SessionManager { - m := &SessionManager{ +// NewSessionManagerImpl creates a new SessionManagerImpl +func NewSessionManagerImpl(options ...SessionOpt) *SessionManagerImpl { + m := &SessionManagerImpl{ sessions: struct { sync.RWMutex data map[int64]*Session @@ -82,7 +105,7 @@ func NewSessionManager(options ...SessionOpt) *SessionManager { } // AddSession creates a new session -func (c *SessionManager) AddSession(node *NodeInfo) { +func (c *SessionManagerImpl) AddSession(node *NodeInfo) { c.sessions.Lock() defer c.sessions.Unlock() @@ -92,7 +115,7 @@ func (c *SessionManager) AddSession(node *NodeInfo) { } // DeleteSession removes the node session -func (c *SessionManager) DeleteSession(node *NodeInfo) { +func (c *SessionManagerImpl) DeleteSession(node *NodeInfo) { c.sessions.Lock() defer c.sessions.Unlock() @@ -103,8 +126,8 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) { metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data))) } -// getLiveNodeIDs returns IDs of all live DataNodes. -func (c *SessionManager) getLiveNodeIDs() []int64 { +// GetSessionIDs returns IDs of all live DataNodes. +func (c *SessionManagerImpl) GetSessionIDs() []int64 { c.sessions.RLock() defer c.sessions.RUnlock() @@ -116,7 +139,7 @@ func (c *SessionManager) getLiveNodeIDs() []int64 { } // GetSessions gets all node sessions -func (c *SessionManager) GetSessions() []*Session { +func (c *SessionManagerImpl) GetSessions() []*Session { c.sessions.RLock() defer c.sessions.RUnlock() @@ -127,12 +150,24 @@ func (c *SessionManager) GetSessions() []*Session { return ret } +func (c *SessionManagerImpl) getClient(ctx context.Context, nodeID int64) (types.DataNodeClient, error) { + c.sessions.RLock() + session, ok := c.sessions.data[nodeID] + c.sessions.RUnlock() + + if !ok { + return nil, fmt.Errorf("can not find session of node %d", nodeID) + } + + return session.GetOrCreateClient(ctx) +} + // Flush is a grpc interface. It will send req to nodeID asynchronously -func (c *SessionManager) Flush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { +func (c *SessionManagerImpl) Flush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { go c.execFlush(ctx, nodeID, req) } -func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { +func (c *SessionManagerImpl) execFlush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest) { cli, err := c.getClient(ctx, nodeID) if err != nil { log.Warn("failed to get dataNode client", zap.Int64("dataNode ID", nodeID), zap.Error(err)) @@ -150,7 +185,7 @@ func (c *SessionManager) execFlush(ctx context.Context, nodeID int64, req *datap } // Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously. -func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) error { +func (c *SessionManagerImpl) Compaction(nodeID int64, plan *datapb.CompactionPlan) error { ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)) defer cancel() cli, err := c.getClient(ctx, nodeID) @@ -170,7 +205,7 @@ func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) e } // SyncSegments is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously. -func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error { +func (c *SessionManagerImpl) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error { log := log.With( zap.Int64("nodeID", nodeID), zap.Int64("planID", req.GetPlanID()), @@ -205,12 +240,12 @@ func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequ } // Import is a grpc interface. It will send request to DataNode with provided `nodeID` asynchronously. -func (c *SessionManager) Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) { +func (c *SessionManagerImpl) Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) { go c.execImport(ctx, nodeID, itr) } // execImport gets the corresponding DataNode with its ID and calls its Import method. -func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) { +func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest) { cli, err := c.getClient(ctx, nodeID) if err != nil { log.Warn("failed to get client for import", zap.Int64("nodeID", nodeID), zap.Error(err)) @@ -227,7 +262,7 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr)) } -func (c *SessionManager) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { +func (c *SessionManagerImpl) GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult { wg := sync.WaitGroup{} ctx := context.Background() @@ -273,7 +308,7 @@ func (c *SessionManager) GetCompactionPlansResults() map[int64]*datapb.Compactio return rst } -func (c *SessionManager) FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error { +func (c *SessionManagerImpl) FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error { log := log.Ctx(ctx).With(zap.Int64("nodeID", nodeID), zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs())), zap.Strings("channels", req.GetChannels())) @@ -283,18 +318,18 @@ func (c *SessionManager) FlushChannels(ctx context.Context, nodeID int64, req *d return err } - log.Info("SessionManager.FlushChannels start") + log.Info("SessionManagerImpl.FlushChannels start") resp, err := cli.FlushChannels(ctx, req) err = VerifyResponse(resp, err) if err != nil { - log.Warn("SessionManager.FlushChannels failed", zap.Error(err)) + log.Warn("SessionManagerImpl.FlushChannels failed", zap.Error(err)) return err } - log.Info("SessionManager.FlushChannels successfully") + log.Info("SessionManagerImpl.FlushChannels successfully") return nil } -func (c *SessionManager) NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error { +func (c *SessionManagerImpl) NotifyChannelOperation(ctx context.Context, nodeID int64, req *datapb.ChannelOperationsRequest) error { log := log.Ctx(ctx).With(zap.Int64("nodeID", nodeID)) cli, err := c.getClient(ctx, nodeID) if err != nil { @@ -311,7 +346,7 @@ func (c *SessionManager) NotifyChannelOperation(ctx context.Context, nodeID int6 return nil } -func (c *SessionManager) CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) { +func (c *SessionManagerImpl) CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) { log := log.With( zap.Int64("nodeID", nodeID), zap.String("channel", info.GetVchan().GetChannelName()), @@ -334,20 +369,51 @@ func (c *SessionManager) CheckChannelOperationProgress(ctx context.Context, node return resp, nil } -func (c *SessionManager) getClient(ctx context.Context, nodeID int64) (types.DataNodeClient, error) { - c.sessions.RLock() - session, ok := c.sessions.data[nodeID] - c.sessions.RUnlock() - - if !ok { - return nil, fmt.Errorf("can not find session of node %d", nodeID) +func (c *SessionManagerImpl) AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + // Call DataNode to add the new segment to its own flow graph. + cli, err := c.getClient(ctx, nodeID) + if err != nil { + log.Error("failed to get DataNode client for SaveImportSegment", + zap.Int64("DataNode ID", nodeID), + zap.Error(err)) + return nil, err } - return session.GetOrCreateClient(ctx) + resp, err := cli.AddImportSegment(ctx, req) + if err := VerifyResponse(resp.GetStatus(), err); err != nil { + log.Error("failed to add segment", zap.Int64("nodeID", nodeID), zap.Error(err)) + return nil, err + } + log.Info("succeed to add segment", zap.Int64("nodeID", nodeID), zap.Any("add segment req", req)) + return resp, err +} + +func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error { + group, ctx := errgroup.WithContext(ctx) + + ids := c.GetSessionIDs() + for _, nodeID := range ids { + nodeID := nodeID + group.Go(func() error { + cli, err := c.getClient(ctx, nodeID) + if err != nil { + return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err) + } + + sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return err + } + err = merr.AnalyzeState("DataNode", nodeID, sta) + return err + }) + } + + return group.Wait() } // Close release sessions -func (c *SessionManager) Close() { +func (c *SessionManagerImpl) Close() { c.sessions.Lock() defer c.sessions.Unlock() diff --git a/internal/datacoord/session_manager_test.go b/internal/datacoord/session_manager_test.go index 0229eec359..9b2bedd790 100644 --- a/internal/datacoord/session_manager_test.go +++ b/internal/datacoord/session_manager_test.go @@ -23,13 +23,13 @@ type SessionManagerSuite struct { dn *mocks.MockDataNodeClient - m *SessionManager + m SessionManager } func (s *SessionManagerSuite) SetupTest() { s.dn = mocks.NewMockDataNodeClient(s.T()) - s.m = NewSessionManager(withSessionCreator(func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + s.m = NewSessionManagerImpl(withSessionCreator(func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return s.dn, nil }))