From 776709e5ffdf44d569b7f7fa8e72aacd7a3669ad Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 17 Mar 2024 20:59:04 +0800 Subject: [PATCH] fix: Fix binlog import (#31310) Fix binlog import functionality by removing the existing check and refining the size retrieval process. issue: https://github.com/milvus-io/milvus/issues/31221, https://github.com/milvus-io/milvus/issues/28521 --------- Signed-off-by: bigsheeper --- internal/datacoord/import_util.go | 16 --------------- internal/datacoord/import_util_test.go | 27 -------------------------- internal/datacoord/services.go | 3 ++- internal/datacoord/services_test.go | 23 ---------------------- internal/datanode/importv2/executor.go | 2 +- internal/datanode/importv2/util.go | 19 ++++++++++++++++-- 6 files changed, 20 insertions(+), 70 deletions(-) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 7cb6eb95b6..bbdd13e10a 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "fmt" "path" "sort" "time" @@ -424,13 +423,6 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, } insertPrefix := importFile.GetPaths()[0] - ok, err := cm.Exist(ctx, insertPrefix) - if err != nil { - return nil, err - } - if !ok { - return nil, fmt.Errorf("insert binlog prefix does not exist, path=%s", insertPrefix) - } segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false) if err != nil { return nil, err @@ -443,14 +435,6 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, return segmentImportFiles, nil } deltaPrefix := importFile.GetPaths()[1] - ok, err = cm.Exist(ctx, deltaPrefix) - if err != nil { - return nil, err - } - if !ok { - log.Warn("delta binlog prefix does not exist", zap.String("path", deltaPrefix)) - return segmentImportFiles, nil - } segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), deltaPrefix, false) if err != nil { return nil, err diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 329bb79e11..0f91e49936 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -336,7 +336,6 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) { ctx := context.Background() cm := mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil) cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil) @@ -355,32 +354,6 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) { assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009") } } - - // test failure - mockErr := errors.New("mock err") - cm = mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, mockErr) - _, err = ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.Error(t, err) - - cm = mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(false, nil) - _, err = ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.Error(t, err) - - cm = mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil) - cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) - cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(true, mockErr) - _, err = ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.Error(t, err) - - cm = mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil) - cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil) - cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(false, nil) - _, err = ListBinlogsAndGroupBySegment(ctx, cm, file) - assert.NoError(t, err) } func TestImportUtil_GetImportProgress(t *testing.T) { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 89851cf159..238adda017 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1808,9 +1808,10 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter return len(file.GetPaths()) > 0 }) if len(files) == 0 { - resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, import_prefix=%s", in.GetFiles()))) + resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles()))) return resp, nil } + log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files)) } idStart, _, err := s.allocator.allocN(int64(len(files)) + 1) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 4cf9e1a2eb..b9c8e646f7 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1369,7 +1369,6 @@ func TestImportV2(t *testing.T) { // list binlog failed cm := mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil) cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, mockErr) s.meta = &meta{chunkManager: cm} resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{ @@ -1389,28 +1388,6 @@ func TestImportV2(t *testing.T) { assert.NoError(t, err) assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed)) - // list no binlog - cm = mocks2.NewChunkManager(t) - cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil) - cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil) - s.meta = &meta{chunkManager: cm} - resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{ - Files: []*internalpb.ImportFile{ - { - Id: 1, - Paths: []string{"mock_insert_prefix"}, - }, - }, - Options: []*commonpb.KeyValuePair{ - { - Key: "backup", - Value: "true", - }, - }, - }) - assert.NoError(t, err) - assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)) - // alloc failed alloc := NewNMockAllocator(t) alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr) diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/executor.go index 6804b17fbd..24cbf1a7fd 100644 --- a/internal/datanode/importv2/executor.go +++ b/internal/datanode/importv2/executor.go @@ -181,7 +181,7 @@ func (e *executor) PreImport(task Task) { } func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error { - fileSize, err := GetFileSize(file, e.cm) + fileSize, err := GetFileSize(file, e.cm, task) if err != nil { return err } diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 1005e3197e..93a0e70314 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -203,14 +204,28 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection return 0 } -func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager) (int64, error) { +func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task) (int64, error) { + paths := file.GetPaths() + if importutilv2.IsBackup(task.GetOptions()) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + paths = make([]string, 0) + for _, prefix := range file.GetPaths() { + binlogs, _, err := cm.ListWithPrefix(ctx, prefix, true) + if err != nil { + return 0, err + } + paths = append(paths, binlogs...) + } + } + fn := func(path string) (int64, error) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() return cm.Size(ctx, path) } var totalSize int64 = 0 - for _, path := range file.GetPaths() { + for _, path := range paths { size, err := fn(path) if err != nil { return 0, err