enhance: [2.5] Accelerate listing objects during binlog import (#40048)

issue: https://github.com/milvus-io/milvus/issues/40030

pr: https://github.com/milvus-io/milvus/pull/40047

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/40155/head
yihao.dai 2025-02-24 15:59:56 +08:00 committed by GitHub
parent fdad35e668
commit b6b03ff74c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 31 additions and 9 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
@ -42,7 +43,9 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -1688,7 +1691,8 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
log := log.Ctx(ctx).With(zap.Int64("collection", in.GetCollectionID()), log := log.Ctx(ctx).With(zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("partitions", in.GetPartitionIDs()), zap.Int64s("partitions", in.GetPartitionIDs()),
zap.Strings("channels", in.GetChannelNames())) zap.Strings("channels", in.GetChannelNames()))
log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions())) log.Info("receive import request", zap.Int("fileNum", len(in.GetFiles())),
zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
timeoutTs, err := importutilv2.GetTimeoutTs(in.GetOptions()) timeoutTs, err := importutilv2.GetTimeoutTs(in.GetOptions())
if err != nil { if err != nil {
@ -1700,14 +1704,28 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
isBackup := importutilv2.IsBackup(in.GetOptions()) isBackup := importutilv2.IsBackup(in.GetOptions())
if isBackup { if isBackup {
files = make([]*internalpb.ImportFile, 0) files = make([]*internalpb.ImportFile, 0)
pool := conc.NewPool[struct{}](hardware.GetCPUNum() * 2)
futures := make([]*conc.Future[struct{}], 0, len(in.GetFiles()))
mu := &sync.Mutex{}
for _, importFile := range in.GetFiles() { for _, importFile := range in.GetFiles() {
importFile := importFile
futures = append(futures, pool.Submit(func() (struct{}, error) {
segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile) segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile)
if err != nil {
return struct{}{}, err
}
mu.Lock()
defer mu.Unlock()
files = append(files, segmentPrefixes...)
return struct{}{}, nil
}))
}
err = conc.AwaitAll(futures...)
if err != nil { if err != nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err))) resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err)))
return resp, nil return resp, nil
} }
files = append(files, segmentPrefixes...)
}
files = lo.Filter(files, func(file *internalpb.ImportFile, _ int) bool { files = lo.Filter(files, func(file *internalpb.ImportFile, _ int) bool {
return len(file.GetPaths()) > 0 return len(file.GetPaths()) > 0
}) })
@ -1720,7 +1738,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files)))) paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
return resp, nil return resp, nil
} }
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files)) log.Info("list binlogs prefixes for import", zap.Int("num", len(files)), zap.Any("binlog_prefixes", files))
} }
// Check if the number of jobs exceeds the limit. // Check if the number of jobs exceeds the limit.
@ -1770,7 +1788,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
} }
resp.JobID = fmt.Sprint(job.GetJobID()) resp.JobID = fmt.Sprint(job.GetJobID())
log.Info("add import job done", zap.Int64("jobID", job.GetJobID()), zap.Any("files", files)) log.Info("add import job done",
zap.Int64("jobID", job.GetJobID()),
zap.Int("fileNum", len(files)),
zap.Any("files", files),
)
return resp, nil return resp, nil
} }