From 1518baacd13d7b4313c1a82994b5549a72a782d9 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 14 Dec 2022 18:21:23 +0800 Subject: [PATCH] Make sure querynode stop only once (#21203) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/querynode/query_node.go | 87 +++++++++++++++----------------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 1db5d08b2c..c728c6327e 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -30,8 +30,6 @@ import "C" import ( "context" "fmt" - "github.com/samber/lo" - uberatomic "go.uber.org/atomic" "os" "path" "runtime" @@ -42,6 +40,8 @@ import ( "time" "unsafe" + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/panjf2000/ants/v2" @@ -89,7 +89,7 @@ type QueryNode struct { wg sync.WaitGroup stateCode atomic.Value - stopFlag uberatomic.Bool + stopOnce sync.Once //call once initOnce sync.Once @@ -140,7 +140,6 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode { node.tSafeReplica = newTSafeReplica() node.scheduler = newTaskScheduler(ctx1, node.tSafeReplica) node.UpdateStateCode(commonpb.StateCode_Abnormal) - node.stopFlag.Store(false) return node } @@ -325,54 +324,52 @@ func (node *QueryNode) Start() error { // Stop mainly stop QueryNode's query service, historical loop and streaming loop. func (node *QueryNode) Stop() error { - if node.stopFlag.Load() { - return nil - } - node.stopFlag.Store(true) - - log.Warn("Query node stop..") - err := node.session.GoingStop() - if err != nil { - log.Warn("session fail to go stopping state", zap.Error(err)) - } else { - node.UpdateStateCode(commonpb.StateCode_Stopping) - noSegmentChan := node.metaReplica.getNoSegmentChan() - select { - case <-noSegmentChan: - case <-time.After(Params.QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)): - log.Warn("migrate data timed out", zap.Int64("server_id", paramtable.GetNodeID()), - zap.Int64s("sealed_segment", lo.Map(node.metaReplica.getSealedSegments(), func(t *Segment, i int) int64 { - return t.ID() - })), - zap.Int64s("growing_segment", lo.Map(node.metaReplica.getGrowingSegments(), func(t *Segment, i int) int64 { - return t.ID() - })), - ) + node.stopOnce.Do(func() { + log.Warn("Query node stop..") + err := node.session.GoingStop() + if err != nil { + log.Warn("session fail to go stopping state", zap.Error(err)) + } else { + node.UpdateStateCode(commonpb.StateCode_Stopping) + noSegmentChan := node.metaReplica.getNoSegmentChan() + select { + case <-noSegmentChan: + case <-time.After(Params.QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)): + log.Warn("migrate data timed out", zap.Int64("server_id", paramtable.GetNodeID()), + zap.Int64s("sealed_segment", lo.Map(node.metaReplica.getSealedSegments(), func(t *Segment, i int) int64 { + return t.ID() + })), + zap.Int64s("growing_segment", lo.Map(node.metaReplica.getGrowingSegments(), func(t *Segment, i int) int64 { + return t.ID() + })), + ) + } } - } - node.UpdateStateCode(commonpb.StateCode_Abnormal) - node.wg.Wait() - node.queryNodeLoopCancel() + node.UpdateStateCode(commonpb.StateCode_Abnormal) + node.wg.Wait() + node.queryNodeLoopCancel() - // close services - if node.dataSyncService != nil { - node.dataSyncService.close() - } + // close services + if node.dataSyncService != nil { + node.dataSyncService.close() + } - if node.metaReplica != nil { - node.metaReplica.freeAll() - } + if node.metaReplica != nil { + node.metaReplica.freeAll() + } - if node.ShardClusterService != nil { - node.ShardClusterService.close() - } + if node.ShardClusterService != nil { + node.ShardClusterService.close() + } - if node.queryShardService != nil { - node.queryShardService.close() - } + if node.queryShardService != nil { + node.queryShardService.close() + } + + node.session.Revoke(time.Second) + }) - node.session.Revoke(time.Second) return nil }