From 8736372fd28e36b527aa65e82e9a07a8b7befefe Mon Sep 17 00:00:00 2001 From: groot Date: Fri, 6 May 2022 11:21:50 +0800 Subject: [PATCH] Fix bulkload bugs (#16760) Signed-off-by: groot --- .../core/src/indexbuilder/VecIndexCreator.cpp | 8 +- internal/util/importutil/import_wrapper.go | 18 ++- .../util/importutil/import_wrapper_test.go | 128 ++++++++++++++++++ internal/util/importutil/json_handler.go | 2 +- 4 files changed, 148 insertions(+), 8 deletions(-) diff --git a/internal/core/src/indexbuilder/VecIndexCreator.cpp b/internal/core/src/indexbuilder/VecIndexCreator.cpp index 12385812a1..7609d8efec 100644 --- a/internal/core/src/indexbuilder/VecIndexCreator.cpp +++ b/internal/core/src/indexbuilder/VecIndexCreator.cpp @@ -57,12 +57,8 @@ VecIndexCreator::parse_impl(const std::string& serialized_params_str, knowhere:: conf[key] = value; } - auto stoi_closure = [](const std::string& s) -> auto { - return std::stoi(s); - }; - auto stof_closure = [](const std::string& s) -> auto { - return std::stof(s); - }; + auto stoi_closure = [](const std::string& s) -> int { return std::stoi(s); }; + auto stof_closure = [](const std::string& s) -> float { return std::stof(s); }; /***************************** meta *******************************/ check_parameter(conf, knowhere::meta::DIM, stoi_closure, std::nullopt); diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go index afdd2427f9..0a937aa2f4 100644 --- a/internal/util/importutil/import_wrapper.go +++ b/internal/util/importutil/import_wrapper.go @@ -110,9 +110,21 @@ func getFileNameAndExt(filePath string) (string, string) { } func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error { + // use this map to check duplicate files + files := make(map[string]struct{}) + for i := 0; i < len(filePaths); i++ { filePath := filePaths[i] _, fileType := getFileNameAndExt(filePath) + _, ok := files[filePath] + if ok { + // only check duplicate numpy file + if fileType == 
NumpyFileExt { + return errors.New("duplicate file: " + filePath) + } + } else { + files[filePath] = struct{}{} + } // check file type if rowBased { @@ -127,6 +139,9 @@ func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error // check file size size, _ := p.chunkManager.Size(filePath) + if size == 0 { + return errors.New("the file " + filePath + " is empty") + } if size > MaxFileSize { return errors.New("the file " + filePath + " size exceeds the maximum file size: " + strconv.FormatInt(MaxFileSize, 10) + " bytes") } @@ -141,6 +156,7 @@ func (p *ImportWrapper) fileValidation(filePaths []string, rowBased bool) error func (p *ImportWrapper) Import(filePaths []string, rowBased bool, onlyValidate bool) error { err := p.fileValidation(filePaths, rowBased) if err != nil { + log.Error("import error: " + err.Error()) return err } @@ -480,7 +496,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F for name, count := range rowCounter { if count != rowCount { - return errors.New("import error: field " + name + " row count " + strconv.Itoa(count) + " is not equal to other fields " + strconv.Itoa(rowCount)) + return errors.New("import error: field " + name + " row count " + strconv.Itoa(count) + " is not equal to other fields row count " + strconv.Itoa(rowCount)) } } diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 53ca13ab05..2a8098dfee 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "golang.org/x/exp/mmap" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" @@ -25,6 +26,70 @@ const ( TempFilesPath = "/tmp/milvus_test/import/" ) +type MockChunkManager struct { + size int64 +} + +func (mc *MockChunkManager) Path(filePath string) (string, error) { + return "", nil +} + 
+func (mc *MockChunkManager) Reader(filePath string) (storage.FileReader, error) { + return nil, nil +} + +func (mc *MockChunkManager) Write(filePath string, content []byte) error { + return nil +} + +func (mc *MockChunkManager) MultiWrite(contents map[string][]byte) error { + return nil +} + +func (mc *MockChunkManager) Exist(filePath string) bool { + return true +} + +func (mc *MockChunkManager) Read(filePath string) ([]byte, error) { + return nil, nil +} + +func (mc *MockChunkManager) MultiRead(filePaths []string) ([][]byte, error) { + return nil, nil +} + +func (mc *MockChunkManager) ListWithPrefix(prefix string) ([]string, error) { + return nil, nil +} + +func (mc *MockChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) { + return nil, nil, nil +} + +func (mc *MockChunkManager) ReadAt(filePath string, off int64, length int64) ([]byte, error) { + return nil, nil +} + +func (mc *MockChunkManager) Mmap(filePath string) (*mmap.ReaderAt, error) { + return nil, nil +} + +func (mc *MockChunkManager) Size(filePath string) (int64, error) { + return mc.size, nil +} + +func (mc *MockChunkManager) Remove(filePath string) error { + return nil +} + +func (mc *MockChunkManager) MultiRemove(filePaths []string) error { + return nil +} + +func (mc *MockChunkManager) RemoveWithPrefix(prefix string) error { + return nil +} + func Test_NewImportWrapper(t *testing.T) { f := dependency.NewDefaultFactory(true) ctx := context.Background() @@ -591,3 +656,66 @@ func Test_ImportColumnBased_perf(t *testing.T) { tr.Record("parse large json files: " + filePath1 + "," + filePath2) } + +func Test_FileValidation(t *testing.T) { + ctx := context.Background() + + cm := &MockChunkManager{ + size: 1, + } + + idAllocator := newIDAllocator(ctx, t) + schema := perfSchema(128) + shardNum := 2 + segmentSize := 512 // unit: MB + + wrapper := NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil) + + // duplicate files + var files = 
[2]string{"1.npy", "1.npy"} + err := wrapper.fileValidation(files[:], false) + assert.NotNil(t, err) + err = wrapper.fileValidation(files[:], true) + assert.NotNil(t, err) + + // unsupported file type + files[0] = "1" + files[1] = "1" + err = wrapper.fileValidation(files[:], true) + assert.NotNil(t, err) + + err = wrapper.fileValidation(files[:], false) + assert.NotNil(t, err) + + // valid cases + files[0] = "1.json" + files[1] = "2.json" + err = wrapper.fileValidation(files[:], true) + assert.Nil(t, err) + + files[1] = "2.npy" + err = wrapper.fileValidation(files[:], false) + assert.Nil(t, err) + + // empty file + cm = &MockChunkManager{ + size: 0, + } + wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil) + err = wrapper.fileValidation(files[:], true) + assert.NotNil(t, err) + + err = wrapper.fileValidation(files[:], false) + assert.NotNil(t, err) + + // file size exceed limit + cm = &MockChunkManager{ + size: MaxFileSize + 1, + } + wrapper = NewImportWrapper(ctx, schema, int32(shardNum), int64(segmentSize), idAllocator, cm, nil, nil, nil) + err = wrapper.fileValidation(files[:], true) + assert.NotNil(t, err) + + err = wrapper.fileValidation(files[:], false) + assert.NotNil(t, err) +} diff --git a/internal/util/importutil/json_handler.go b/internal/util/importutil/json_handler.go index 1a02e7cb4e..3db0bf92a5 100644 --- a/internal/util/importutil/json_handler.go +++ b/internal/util/importutil/json_handler.go @@ -350,7 +350,7 @@ func (v *JSONColumnValidator) Handle(columns map[storage.FieldID][]interface{}) if rowCount == -1 { rowCount = counter } else if rowCount != counter { - return errors.New("JSON column validator: the field " + k + " row count " + strconv.Itoa(int(counter)) + " is not equal to other fields " + strconv.Itoa(int(rowCount))) + return errors.New("JSON column validator: the field " + k + " row count " + strconv.Itoa(int(counter)) + " is not equal to other fields row count" + 
strconv.Itoa(int(rowCount))) } }