fix: Fix binlog import and refine error reporting (#31241)

1. Fix binlog import with partition key.
2. Refine binlog import error reporting.
3. Avoid division by zero when retrieving import progress.

issue: https://github.com/milvus-io/milvus/issues/31221,
https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/31290/head
yihao.dai 2024-03-15 10:55:05 +08:00 committed by GitHub
parent ff4237bb90
commit 811316d2ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 99 additions and 9 deletions

View File

@ -18,6 +18,7 @@ package datacoord
import (
"context"
"fmt"
"path"
"sort"
"time"
@ -252,6 +253,9 @@ func getPendingProgress(jobID int64, imeta ImportMeta) float32 {
return len(task.GetFileStats())
})
totalFiles := len(imeta.GetJob(jobID).GetFiles())
if totalFiles == 0 {
return 1
}
return float32(preImportingFiles) / float32(totalFiles)
}
@ -260,6 +264,9 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 {
completedTasks := lo.Filter(tasks, func(task ImportTask, _ int) bool {
return task.GetState() == datapb.ImportTaskStateV2_Completed
})
if len(tasks) == 0 {
return 1
}
return float32(len(completedTasks)) / float32(len(tasks))
}
@ -277,7 +284,10 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...)
}
importedRows = meta.GetSegmentsTotalCurrentRows(segmentIDs)
importingProgress := float32(importedRows) / float32(totalRows)
var importingProgress float32 = 1
if totalRows != 0 {
importingProgress = float32(importedRows) / float32(totalRows)
}
var (
unsetIsImportingSegment int64
@ -297,7 +307,10 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
}
}
}
completedProgress := float32(unsetIsImportingSegment) / float32(totalSegment)
var completedProgress float32 = 1
if totalSegment != 0 {
completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment)
}
return importingProgress*0.8 + completedProgress*0.2
}
@ -371,7 +384,15 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
return nil, merr.WrapErrImportFailed("no insert binlogs to import")
}
segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, importFile.GetPaths()[0], false)
insertPrefix := importFile.GetPaths()[0]
ok, err := cm.Exist(ctx, insertPrefix)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("insert binlog prefix does not exist, path=%s", insertPrefix)
}
segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false)
if err != nil {
return nil, err
}
@ -382,7 +403,16 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
if len(importFile.GetPaths()) < 2 {
return segmentImportFiles, nil
}
segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), importFile.GetPaths()[1], false)
deltaPrefix := importFile.GetPaths()[1]
ok, err = cm.Exist(ctx, deltaPrefix)
if err != nil {
return nil, err
}
if !ok {
log.Warn("delta binlog prefix does not exist", zap.String("path", deltaPrefix))
return segmentImportFiles, nil
}
segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), deltaPrefix, false)
if err != nil {
return nil, err
}

View File

@ -23,6 +23,7 @@ import (
"path"
"testing"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -261,6 +262,7 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
ctx := context.Background()
cm := mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
@ -279,6 +281,32 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
}
}
// test failure
mockErr := errors.New("mock err")
cm = mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, mockErr)
_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.Error(t, err)
cm = mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(false, nil)
_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.Error(t, err)
cm = mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(true, mockErr)
_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.Error(t, err)
cm = mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(false, nil)
_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.NoError(t, err)
}
func TestImportUtil_GetImportProgress(t *testing.T) {

View File

@ -1773,9 +1773,8 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
log := log.With(zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("partitions", in.GetPartitionIDs()),
zap.Strings("channels", in.GetChannelNames()),
zap.Any("files", in.GetFiles()))
log.Info("receive import request")
zap.Strings("channels", in.GetChannelNames()))
log.Info("receive import request", zap.Any("files", in.GetFiles()))
var timeoutTs uint64 = math.MaxUint64
timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions())
@ -1800,11 +1799,18 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
for _, importFile := range in.GetFiles() {
segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile)
if err != nil {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("list binlogs and group by segment failed, err=%w", err)))
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err)))
return resp, nil
}
files = append(files, segmentPrefixes...)
}
files = lo.Filter(files, func(file *internalpb.ImportFile, _ int) bool {
return len(file.GetPaths()) > 0
})
if len(files) == 0 {
resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, import_prefix=%s", in.GetFiles())))
return resp, nil
}
}
idStart, _, err := s.allocator.allocN(int64(len(files)) + 1)
@ -1839,7 +1845,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
}
resp.JobID = fmt.Sprint(job.GetJobID())
log.Info("add import job done", zap.Int64("jobID", job.GetJobID()))
log.Info("add import job done", zap.Int64("jobID", job.GetJobID()), zap.Any("files", files))
return resp, nil
}

View File

@ -1369,6 +1369,7 @@ func TestImportV2(t *testing.T) {
// list binlog failed
cm := mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, mockErr)
s.meta = &meta{chunkManager: cm}
resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{
@ -1388,6 +1389,28 @@ func TestImportV2(t *testing.T) {
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
// list no binlog
cm = mocks2.NewChunkManager(t)
cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil)
s.meta = &meta{chunkManager: cm}
resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{
Files: []*internalpb.ImportFile{
{
Id: 1,
Paths: []string{"mock_insert_prefix"},
},
},
Options: []*commonpb.KeyValuePair{
{
Key: "backup",
Value: "true",
},
},
})
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrParameterInvalid))
// alloc failed
alloc := NewNMockAllocator(t)
alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)

View File

@ -318,6 +318,9 @@ func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future
for channelIdx, datas := range hashedData {
channel := task.GetVchannels()[channelIdx]
for partitionIdx, data := range datas {
if data.GetRowNum() == 0 {
continue
}
partitionID := task.GetPartitionIDs()[partitionIdx]
size := data.GetMemorySize()
segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size)