Add more logs on failure path in load segments (#13785)

This PR adds clearer logs:
- on I/O failures in estimateSegmentsSize
- in the estimate goroutines

See also: #13250

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/13858/head
XuanYang-cn 2021-12-21 11:13:03 +08:00 committed by GitHub
parent ac1cd1ee6a
commit 2e0f3695b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 15 deletions

View File

@ -831,6 +831,10 @@ func estimateSegmentsSize(segments *querypb.LoadSegmentsRequest, kvClient kv.Dat
if err != nil {
indexSize, err = storage.GetBinlogSize(kvClient, path)
if err != nil {
log.Warn("estimate index size wrong",
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("path", path),
zap.Error(err))
return 0, err
}
}
@ -847,6 +851,10 @@ func estimateSegmentsSize(segments *querypb.LoadSegmentsRequest, kvClient kv.Dat
if err != nil {
binlogSize, err = storage.GetBinlogSize(kvClient, path.GetLogPath())
if err != nil {
log.Warn("estimate binlog size wrong",
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("binlog path", path.GetLogPath()),
zap.Error(err))
return 0, err
}
}

View File

@ -20,10 +20,10 @@ import (
"context"
"errors"
"sort"
"sync"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -101,23 +101,30 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
}
log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs")
dataSizePerReq := make([]int64, len(reqs))
estimateError := make([]error, len(reqs))
var estimateWg sync.WaitGroup
estimateReqFn := func(offset int, req *querypb.LoadSegmentsRequest) {
defer estimateWg.Done()
dataSizePerReq[offset], estimateError[offset] = cluster.estimateSegmentsSize(req)
}
// use errgroup to collect errors of goroutines
g, _ := errgroup.WithContext(ctx)
for offset, req := range reqs {
estimateWg.Add(1)
go estimateReqFn(offset, req)
r, i := req, offset
g.Go(func() error {
size, err := cluster.estimateSegmentsSize(r)
if err != nil {
log.Warn("estimate segment size error",
zap.Int64("collectionID", r.GetCollectionID()),
zap.Error(err))
return err
}
dataSizePerReq[i] = size
return nil
})
}
estimateWg.Wait()
for _, err := range estimateError {
if err != nil {
log.Debug("shuffleSegmentsToQueryNodeV2: estimate segment size error", zap.Error(err))
return err
}
if err := g.Wait(); err != nil {
log.Warn("shuffleSegmentsToQueryNodeV2: estimate segment size error", zap.Error(err))
return err
}
log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end")
for {
// online nodes map and totalMem, usedMem, memUsage of every node

View File

@ -122,6 +122,12 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
assert.Equal(t, node2ID, firstReq.DstNodeID)
assert.Equal(t, node2ID, secondReq.DstNodeID)
err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, true, nil, nil)
assert.Nil(t, err)
assert.Equal(t, node2ID, firstReq.DstNodeID)
assert.Equal(t, node2ID, secondReq.DstNodeID)
})
err = removeAllSession()