Stop blocking until all channels & segments released (#25328)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/25356/head
yah01 2023-07-06 10:04:25 +08:00 committed by GitHub
parent 71d99d4ec2
commit 4c0e36d28c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 3 deletions

View File

@ -374,11 +374,23 @@ func (node *QueryNode) Stop() error {
timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second))
outer:
for node.manager == nil || node.manager.Segment.Empty() {
for (node.manager == nil || node.manager.Segment.Empty()) &&
(node.pipelineManager == nil || node.pipelineManager.Num() == 0) {
select {
case <-timeoutCh:
sealedSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
var (
sealedSegments = []segments.Segment{}
growingSegments = []segments.Segment{}
channelNum = 0
)
if node.manager != nil {
sealedSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
growingSegments = node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
}
if node.pipelineManager != nil {
channelNum = node.pipelineManager.Num()
}
log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()),
zap.Int64s("sealedSegments", lo.Map[segments.Segment, int64](sealedSegments, func(s segments.Segment, i int) int64 {
return s.ID()
@ -386,6 +398,7 @@ func (node *QueryNode) Stop() error {
zap.Int64s("growingSegments", lo.Map[segments.Segment, int64](growingSegments, func(t segments.Segment, i int) int64 {
return t.ID()
})),
zap.Int("channelNum", channelNum),
)
break outer