From 3352030a8490defabbf6c9e7928594de674ad6ec Mon Sep 17 00:00:00 2001 From: congqixia Date: Sat, 14 Sep 2024 15:55:08 +0800 Subject: [PATCH] enhance: Graceful stop flowgraph manager when stopping datanode (#36229) Flowgraph manager is not stopped durong datanode stopping procedure which may lead to unexpect flowgraph behavior during/after datanode stop progress. --------- Signed-off-by: Congqi Xia --- internal/datanode/data_node.go | 5 +++++ internal/datanode/services_test.go | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ba9527ef2a..b374c113ae 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -362,6 +362,11 @@ func (node *DataNode) Stop() error { node.channelManager.Close() } + if node.flowgraphManager != nil { + node.flowgraphManager.ClearFlowgraphs() + node.flowgraphManager.Close() + } + if node.writeBufferManager != nil { node.writeBufferManager.Stop() } diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 9d67db20cd..c776b67599 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -685,6 +685,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -783,6 +785,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -869,6 +873,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -966,6 +972,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -1046,6 +1054,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{ @@ -1104,6 +1114,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T()) mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything). Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true) + mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe() + mockFlowgraphManager.EXPECT().Close().Return().Maybe() s.node.flowgraphManager = mockFlowgraphManager ctx := context.Background() req := &datapb.SyncSegmentsRequest{