Delay the cancellation of ctx when stopping the node (#28247)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/28181/head
SimFG 2023-11-08 03:20:17 +08:00 committed by GitHub
parent 64f6dbabdc
commit e3b7fdac61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4 additions and 2 deletions

View File

@ -396,10 +396,11 @@ func (node *DataNode) ReadyToFlush() error {
// Stop will release DataNode resources and shutdown datanode // Stop will release DataNode resources and shutdown datanode
func (node *DataNode) Stop() error { func (node *DataNode) Stop() error {
node.stopOnce.Do(func() { node.stopOnce.Do(func() {
node.cancel()
// https://github.com/milvus-io/milvus/issues/12282 // https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal) node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.flowgraphManager.close() node.flowgraphManager.close()
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
node.cancel()
node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool { node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool {
m.Close() m.Close()

View File

@ -453,13 +453,14 @@ func (node *QueryNode) Stop() error {
node.UpdateStateCode(commonpb.StateCode_Abnormal) node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.lifetime.Wait() node.lifetime.Wait()
node.cancel()
if node.scheduler != nil { if node.scheduler != nil {
node.scheduler.Stop() node.scheduler.Stop()
} }
if node.pipelineManager != nil { if node.pipelineManager != nil {
node.pipelineManager.Close() node.pipelineManager.Close()
} }
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the pipeline
node.cancel()
if node.session != nil { if node.session != nil {
node.session.Stop() node.session.Stop()
} }