mirror of https://github.com/milvus-io/milvus.git
				
				
				
			enhance: Graceful stop flowgraph manager when stopping datanode (#36229)
Flowgraph manager is not stopped durong datanode stopping procedure which may lead to unexpect flowgraph behavior during/after datanode stop progress. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/36291/head
							parent
							
								
									b8b4aea4f5
								
							
						
					
					
						commit
						3352030a84
					
				| 
						 | 
					@ -362,6 +362,11 @@ func (node *DataNode) Stop() error {
 | 
				
			||||||
			node.channelManager.Close()
 | 
								node.channelManager.Close()
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if node.flowgraphManager != nil {
 | 
				
			||||||
 | 
								node.flowgraphManager.ClearFlowgraphs()
 | 
				
			||||||
 | 
								node.flowgraphManager.Close()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if node.writeBufferManager != nil {
 | 
							if node.writeBufferManager != nil {
 | 
				
			||||||
			node.writeBufferManager.Stop()
 | 
								node.writeBufferManager.Stop()
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -685,6 +685,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					@ -783,6 +785,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					@ -869,6 +873,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					@ -966,6 +972,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					@ -1046,6 +1054,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					@ -1104,6 +1114,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 | 
				
			||||||
		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
							mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
 | 
				
			||||||
		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
							mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
 | 
				
			||||||
			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
								Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().ClearFlowgraphs().Return().Maybe()
 | 
				
			||||||
 | 
							mockFlowgraphManager.EXPECT().Close().Return().Maybe()
 | 
				
			||||||
		s.node.flowgraphManager = mockFlowgraphManager
 | 
							s.node.flowgraphManager = mockFlowgraphManager
 | 
				
			||||||
		ctx := context.Background()
 | 
							ctx := context.Background()
 | 
				
			||||||
		req := &datapb.SyncSegmentsRequest{
 | 
							req := &datapb.SyncSegmentsRequest{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue