Fix segment loader skip wait if all segments are loading (#25643)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-07-17 19:51:18 +08:00 committed by GitHub
parent 49655d2f13
commit c490b30db7
1 changed file with 43 additions and 33 deletions


@@ -131,17 +131,16 @@ func (loader *segmentLoader) Load(ctx context.Context,
zap.String("segmentType", segmentType.String()),
)
if len(segments) == 0 {
log.Info("no segment to load")
return nil, nil
}
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, segments...)
defer loader.unregister(infos...)
segmentNum := len(infos)
if segmentNum == 0 {
log.Info("no segment to load")
return nil, nil
}
log.Info("start loading...", zap.Int("segmentNum", segmentNum))
// continue to wait for other tasks to finish
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
// Check memory & storage limit
memUsage, diskUsage, concurrencyLevel, err := loader.requestResource(infos...)
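The early return when every requested segment is filtered out as already loading is gone, so the caller falls through and later blocks on the wait channels registered by whichever task owns the load. Below is a minimal, self-contained sketch of that wait-channel registry pattern, using hypothetical names rather than the actual Milvus types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type registry struct {
	mu      sync.Mutex
	loading map[int64]chan struct{} // segmentID -> closed when that load finishes
}

// prepare registers the IDs this caller must load itself; IDs already present
// in the map are owned by another in-flight task and are filtered out.
func (r *registry) prepare(ids ...int64) []int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	var own []int64
	for _, id := range ids {
		if _, loading := r.loading[id]; loading {
			continue // someone else is loading it; the caller waits on it later
		}
		r.loading[id] = make(chan struct{})
		own = append(own, id)
	}
	return own
}

// wait blocks until every requested segment's channel has been closed.
func (r *registry) wait(ids ...int64) {
	for _, id := range ids {
		r.mu.Lock()
		ch, ok := r.loading[id]
		r.mu.Unlock()
		if ok {
			<-ch
		}
	}
}

// done closes the channel so every waiter is released.
func (r *registry) done(id int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok := r.loading[id]; ok {
		close(ch)
		delete(r.loading, id)
	}
}

func main() {
	r := &registry{loading: map[int64]chan struct{}{}}
	own := r.prepare(1, 2) // first caller owns both segments
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate the actual load
		for _, id := range own {
			r.done(id)
		}
	}()
	// A second caller asking for the same segments owns nothing after filtering,
	// but it must still wait instead of returning early.
	fmt.Println("second caller owns:", r.prepare(1, 2))
	r.wait(1, 2)
	fmt.Println("all segments loaded")
}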
@@ -210,12 +209,10 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("load segment done", zap.Int64("segmentID", segmentID))
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
waitCh, ok := loader.loadingSegments.Get(segmentID)
if !ok {
return errors.New("segment was removed from the loading map early")
waitCh, ok := loader.loadingSegments.Get(loadInfo.GetSegmentID())
if ok {
close(waitCh)
}
close(waitCh)
return nil
}
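The completion path now looks the wait channel up by loadInfo.GetSegmentID() and closes it only when the lookup succeeds, rather than returning an error when the entry has already been removed. A small self-contained sketch of that guarded close, with hypothetical names:

package main

import "fmt"

func main() {
	waiters := map[int64]chan struct{}{
		42: make(chan struct{}),
	}

	notify := func(id int64) {
		// Guarded close: a missing map entry yields a nil channel and close(nil)
		// panics, so close only when the lookup succeeds, mirroring `if ok { close(waitCh) }`.
		if ch, ok := waiters[id]; ok {
			close(ch)
		}
	}

	notify(42) // releases anyone blocked on <-waiters[42]
	notify(7)  // unknown ID: a no-op instead of a panic or an error

	_, open := <-waiters[42]
	fmt.Println("channel 42 still open:", open) // false: receive on a closed channel returns immediately
}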
@@ -223,9 +220,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
// Start to load,
// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
log.Info("start to load segments in parallel",
zap.Int("segmentNum", segmentNum),
zap.Int("segmentNum", len(infos)),
zap.Int("concurrencyLevel", concurrencyLevel))
err = funcutil.ProcessFuncParallel(segmentNum,
err = funcutil.ProcessFuncParallel(len(infos),
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
clearAll()
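funcutil.ProcessFuncParallel fans loadSegmentFunc out over len(infos) items at the chosen concurrencyLevel. The stand-in below shows one common way to get that behavior with a counting semaphore; it is a sketch under assumed semantics (run an indexed task N times, cap in-flight goroutines, return the first error), not the funcutil implementation:

package main

import (
	"errors"
	"fmt"
	"sync"
)

func processParallel(total, concurrency int, fn func(idx int) error) error {
	sem := make(chan struct{}, concurrency) // counting semaphore caps in-flight goroutines
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < total; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before spawning
		go func(idx int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this task finishes
			if err := fn(idx); err != nil {
				once.Do(func() { firstErr = err }) // keep only the first error
			}
		}(i)
	}
	wg.Wait()
	return firstErr
}

func main() {
	err := processParallel(8, 3, func(idx int) error {
		if idx == 5 {
			return errors.New("load failed for item 5")
		}
		fmt.Println("loaded item", idx)
		return nil
	})
	fmt.Println("err:", err)
}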
@@ -290,7 +287,14 @@ func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
loader.mut.Lock()
defer loader.mut.Unlock()
for i := range segments {
loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
waitCh, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
if ok {
select {
case <-waitCh:
default: // close wait channel for failed task
close(waitCh)
}
}
}
}
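unregister now closes any wait channel that is still open, so a task that failed before reaching the close in the success path does not leave other callers blocked forever. Because the channel is only ever closed and never sent on, a select with a default branch distinguishes "already closed" from "still open"; in the diff the whole loop runs under loader.mut, which is what keeps this check-then-close race-free. A self-contained sketch of the idiom, with a hypothetical helper name:

package main

import "fmt"

// closeIfOpen closes ch unless it is already closed. It is safe only for
// channels used purely as done-signals (closed once, never sent on) and,
// as in the diff, only when callers are serialized by a lock.
func closeIfOpen(ch chan struct{}) {
	select {
	case <-ch:
		// receive succeeded: the channel is already closed, nothing to do
	default:
		// receive would block: the channel is still open, close it now
		close(ch)
	}
}

func main() {
	done := make(chan struct{})
	closeIfOpen(done) // failed-task path: close here so waiters are released
	closeIfOpen(done) // second call is a no-op instead of panicking

	_, open := <-done
	fmt.Println("still open:", open) // false
}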
@@ -302,26 +306,32 @@ func (loader *segmentLoader) requestResource(infos ...*querypb.SegmentLoadInfo)
concurrencyLevel := funcutil.Min(runtime.GOMAXPROCS(0), len(infos))
logNum := 0
for _, field := range infos[0].GetBinlogPaths() {
logNum += len(field.GetBinlogs())
}
if logNum > 0 {
// IO pool will be run out even with the new smaller level
concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
}
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
_, _, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err == nil {
break
var memUsage, diskUsage uint64
for _, info := range infos {
logNum := 0
for _, field := range info.GetBinlogPaths() {
logNum += len(field.GetBinlogs())
}
if logNum > 0 {
// IO pool will be run out even with the new smaller level
concurrencyLevel = funcutil.Min(concurrencyLevel, funcutil.Max(loader.ioPool.Free()/logNum, 1))
}
}
memUsage, diskUsage, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return 0, 0, 0, err
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
_, _, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err == nil {
break
}
}
mu, du, err := loader.checkSegmentSize(infos, concurrencyLevel)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return 0, 0, 0, err
}
memUsage += mu
diskUsage += du
}
loader.committedMemSize += memUsage
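Both versions of requestResource back the concurrency level off by halving it until checkSegmentSize stops reporting a resource shortfall, then charge the resulting usage against the loader's committed totals. The sketch below models that back-off with a made-up per-segment cost and memory budget; the check function and all numbers are assumptions, not values from the Milvus code:

package main

import "fmt"

func main() {
	const (
		segmentMem  = uint64(2 << 30) // assume 2 GiB per concurrently loaded segment
		memBudget   = uint64(6 << 30) // assume 6 GiB available
		maxParallel = 8
	)

	// check reports the memory a given concurrency level would need and whether it fits.
	check := func(level int) (uint64, bool) {
		need := uint64(level) * segmentMem
		return need, need <= memBudget
	}

	concurrencyLevel := maxParallel
	for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
		if _, ok := check(concurrencyLevel); ok {
			break // this level fits, stop backing off
		}
	}

	memUsage, ok := check(concurrencyLevel)
	if !ok {
		fmt.Println("no sufficient resource even at concurrency 1")
		return
	}
	// Mirror of `loader.committedMemSize += memUsage`: reserve what this load will use.
	committedMemSize := uint64(0)
	committedMemSize += memUsage
	fmt.Printf("concurrencyLevel=%d memUsage=%d committed=%d\n", concurrencyLevel, memUsage, committedMemSize)
}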