enhance: Make import-related error message clearer (#28978)

issue: #28976

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Branch: pull/29077/head
Author: cai.zhang, 2023-12-08 10:12:38 +08:00 (committed by GitHub)
Parent: 464bc9e8f4
Commit: 2b05460ef9
12 changed files with 135 additions and 112 deletions
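
In short, this change turns the hard-coded import limits (ReadBufferSize, MaxSegmentSizeInMemory, MaxTotalSizeInMemory) into configurable paramtable items and adds the field name to several Parquet import error messages. A minimal sketch of the clearer message, assuming a hypothetical field "age" declared as Int64 in the collection but stored as utf8 in the Parquet file; only the message format comes from the diff below:

package main

import (
    "fmt"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
    // Hypothetical field name and types, used only to show the new message shape.
    fieldName := "age"
    collType := schemapb.DataType_Int64.String()
    fileType := "utf8"

    // New, clearer message: it names the offending field.
    err := merr.WrapErrImportFailed(fmt.Sprintf(
        "schema is not match of field: %s, collection field dataType: %s, file field dataType:%s",
        fieldName, collType, fileType))
    fmt.Println(err)
    // Before this patch the message omitted the field name:
    //   "field schema is not match, collection field dataType: Int64, file field dataType:utf8"
}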

View File

@@ -103,8 +103,8 @@ func NewBinlogAdapter(ctx context.Context,
     }
     // amend the segment size to avoid portential OOM risk
-    if adapter.blockSize > MaxSegmentSizeInMemory {
-        adapter.blockSize = MaxSegmentSizeInMemory
+    if adapter.blockSize > Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() {
+        adapter.blockSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()
     }
     return adapter, nil

View File

@@ -28,6 +28,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 const (
@@ -69,6 +70,7 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [
 func Test_BinlogAdapterNew(t *testing.T) {
     ctx := context.Background()
+    paramtable.Init()
     // nil schema
     adapter, err := NewBinlogAdapter(ctx, nil, 1024, 2048, nil, nil, 0, math.MaxUint64)
@@ -103,10 +105,10 @@ func Test_BinlogAdapterNew(t *testing.T) {
     assert.NoError(t, err)
     // amend blockSize, blockSize should less than MaxSegmentSizeInMemory
-    adapter, err = NewBinlogAdapter(ctx, collectionInfo, MaxSegmentSizeInMemory+1, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
+    adapter, err = NewBinlogAdapter(ctx, collectionInfo, Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()+1, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
     assert.NotNil(t, adapter)
     assert.NoError(t, err)
-    assert.Equal(t, int64(MaxSegmentSizeInMemory), adapter.blockSize)
+    assert.Equal(t, Params.DataCoordCfg.SegmentMaxSize.GetAsInt64(), adapter.blockSize)
 }
 func Test_BinlogAdapterVerify(t *testing.T) {

View File

@@ -225,7 +225,7 @@ func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) erro
     }
     adapter, err := NewBinlogAdapter(p.ctx, p.collectionInfo, p.blockSize,
-        MaxTotalSizeInMemory, p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
+        Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
     if err != nil {
         log.Warn("Binlog parser: failed to create binlog adapter", zap.Error(err))
         return merr.WrapErrImportFailed(fmt.Sprintf("failed to create binlog adapter, error: %v", err))

View File

@@ -28,10 +28,10 @@ import (
     "github.com/milvus-io/milvus/internal/allocator"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
-    "github.com/milvus-io/milvus/internal/querycoordv2/params"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
     "github.com/milvus-io/milvus/pkg/util/retry"
     "github.com/milvus-io/milvus/pkg/util/timerecord"
 )
@@ -41,20 +41,6 @@ const (
     NumpyFileExt   = ".npy"
     ParquetFileExt = ".parquet"
-    // parsers read JSON/Numpy/CSV files buffer by buffer, this limitation is to define the buffer size.
-    ReadBufferSize = 16 * 1024 * 1024 // 16MB
-    // this limitation is to avoid this OOM risk:
-    // simetimes system segment max size is a large number, a single segment fields data might cause OOM.
-    // flush the segment when its data reach this limitation, let the compaction to compact it later.
-    MaxSegmentSizeInMemory = 512 * 1024 * 1024 // 512MB
-    // this limitation is to avoid this OOM risk:
-    // if the shard number is a large number, although single segment size is small, but there are lot of in-memory segments,
-    // the total memory size might cause OOM.
-    // TODO: make it configurable.
-    MaxTotalSizeInMemory = 6 * 1024 * 1024 * 1024 // 6GB
     // progress percent value of persist state
     ProgressValueForPersist = 90
@@ -67,6 +53,8 @@ const (
     ProgressPercent = "progress_percent"
 )
+var Params *paramtable.ComponentParam = paramtable.Get()
 // ReportImportAttempts is the maximum # of attempts to retry when import fails.
 var ReportImportAttempts uint = 10
@@ -126,8 +114,8 @@ func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segme
     // average binlogSize is expected to be half of the maxBinlogSize
     // and avoid binlogSize to be a tiny value
     binlogSize := int64(float32(maxBinlogSize) * 0.5)
-    if binlogSize < ReadBufferSize {
-        binlogSize = ReadBufferSize
+    if binlogSize < Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64() {
+        binlogSize = Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64()
     }
     wrapper := &ImportWrapper{
@@ -234,11 +222,11 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
             return rowBased, merr.WrapErrImportFailed(fmt.Sprintf("the file '%s' size is zero", filePath))
         }
-        if size > params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() {
+        if size > Params.CommonCfg.ImportMaxFileSize.GetAsInt64() {
             log.Warn("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
-                zap.Int64("fileSize", size), zap.String("MaxFileSize", params.Params.CommonCfg.ImportMaxFileSize.GetValue()))
+                zap.Int64("fileSize", size), zap.String("MaxFileSize", Params.CommonCfg.ImportMaxFileSize.GetValue()))
             return rowBased, merr.WrapErrImportFailed(fmt.Sprintf("the file '%s' size exceeds the maximum size: %s bytes",
-                filePath, params.Params.CommonCfg.ImportMaxFileSize.GetValue()))
+                filePath, Params.CommonCfg.ImportMaxFileSize.GetValue()))
         }
         totalSize += size
     }

View File

@@ -37,7 +37,6 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
-    "github.com/milvus-io/milvus/internal/querycoordv2/params"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/util/merr"
@@ -190,7 +189,7 @@ func Test_ImportWrapperNew(t *testing.T) {
     ctx := context.Background()
     cm, err := f.NewPersistentStorageChunkManager(ctx)
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, nil, 1, ReadBufferSize, nil, cm, nil, nil)
+    wrapper := NewImportWrapper(ctx, nil, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, cm, nil, nil)
     assert.Nil(t, wrapper)
     schema := &schemapb.CollectionSchema{
@@ -210,7 +209,7 @@ func Test_ImportWrapperNew(t *testing.T) {
     })
     collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
     assert.NoError(t, err)
-    wrapper = NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, cm, nil, nil)
+    wrapper = NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, cm, nil, nil)
     assert.NotNil(t, wrapper)
     assignSegFunc := func(shardID int, partID int64) (int64, string, error) {
@@ -287,7 +286,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
     assert.NoError(t, err)
     t.Run("success case", func(t *testing.T) {
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
         files := make([]string, 0)
         files = append(files, filePath)
@@ -313,7 +312,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
         assert.NoError(t, err)
         importResult.State = commonpb.ImportState_ImportStarted
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
         files := make([]string, 0)
         files = append(files, filePath)
@@ -325,7 +324,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
     t.Run("file doesn't exist", func(t *testing.T) {
         files := make([]string, 0)
        files = append(files, "/dummy/dummy.json")
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         err = wrapper.Import(files, ImportOptions{OnlyValidate: true})
         assert.Error(t, err)
     })
@@ -368,7 +367,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
     files := createSampleNumpyFiles(t, cm)
     t.Run("success case", func(t *testing.T) {
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
         err = wrapper.Import(files, DefaultImportOptions())
@@ -386,7 +385,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
         files[1] = filePath
         importResult.State = commonpb.ImportState_ImportStarted
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
         err = wrapper.Import(files, DefaultImportOptions())
@@ -397,7 +396,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
     t.Run("file doesn't exist", func(t *testing.T) {
         files := make([]string, 0)
         files = append(files, "/dummy/dummy.npy")
-        wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+        wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
         err = wrapper.Import(files, DefaultImportOptions())
         assert.Error(t, err)
     })
@@ -517,7 +516,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
     }
     collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
     wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
     files := make([]string, 0)
@@ -561,7 +560,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
     collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
+    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
     t.Run("unsupported file type", func(t *testing.T) {
         files := []string{"uid.txt"}
@@ -611,7 +610,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
     t.Run("empty file list", func(t *testing.T) {
         files := []string{}
         cm.size = 0
-        wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
+        wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
         rowBased, err := wrapper.fileValidation(files)
         assert.NoError(t, err)
         assert.False(t, rowBased)
@@ -619,8 +618,8 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
     t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
         files := []string{"a/1.json"}
-        cm.size = params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
-        wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
+        cm.size = Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
+        wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
         rowBased, err := wrapper.fileValidation(files)
         assert.Error(t, err)
         assert.True(t, rowBased)
@@ -691,7 +690,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
     }
     collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+    wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
     wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
     files := []string{filePath}
@@ -738,7 +737,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
     }
     collectionInfo, err := NewCollectionInfo(createNumpySchema(), 2, []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
+    wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, importResult, reportFunc)
     wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
     wrapper.reportImportAttempts = 2
@@ -773,7 +772,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {
     collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
+    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
     // empty paths
     paths := []string{}
@@ -837,7 +836,7 @@ func Test_ImportWrapperDoBinlogImport(t *testing.T) {
     collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
+    wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), idAllocator, cm, nil, nil)
     paths := []string{
         "/tmp",
         "/tmp",
@@ -900,7 +899,7 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
     }
     collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), ReadBufferSize, nil, nil, importResult, reportFunc)
+    wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, importResult, reportFunc)
     assert.NotNil(t, wrapper)
     rowCounter := &rowCounterTest{}
@@ -943,7 +942,7 @@ func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {
     collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
     assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, nil, nil)
+    wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, nil, nil)
     assert.NotNil(t, wrapper)
     assert.Equal(t, int64(0), wrapper.progressPercent)
@@ -982,7 +981,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) {
     schema := sampleSchema()
     collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
    assert.NoError(t, err)
-    wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, importResult, reportFunc)
+    wrapper := NewImportWrapper(ctx, collectionInfo, 1, Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64(), nil, nil, importResult, reportFunc)
     assert.NotNil(t, wrapper)
     wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)

View File

@@ -130,7 +130,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
     // if rows is nil, that means read to end of file, force flush all data
     if rows == nil {
-        err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, MaxTotalSizeInMemory, true)
+        err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), true)
         log.Info("JSON row consumer finished")
         return err
     }
@@ -138,7 +138,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
     // rows is not nil, flush in necessary:
     // 1. data block size larger than v.blockSize will be flushed
     // 2. total data size exceeds MaxTotalSizeInMemory, the largest data block will be flushed
-    err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, MaxTotalSizeInMemory, false)
+    err := tryFlushBlocks(v.ctx, v.shardsData, v.collectionInfo.Schema, v.callFlushFunc, v.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), false)
     if err != nil {
         log.Warn("JSON row consumer: try flush data but failed", zap.Error(err))
         return merr.WrapErrImportFailed(fmt.Sprintf("try flush data but failed, error: %v", err))

View File

@@ -73,7 +73,7 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
     // for low dimensional vector, the bufSize is a large value, read more rows each time
     bufRowCount := parser.bufRowCount
     for {
-        if bufRowCount*sizePerRecord > ReadBufferSize {
+        if bufRowCount*sizePerRecord > Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() {
             bufRowCount--
         } else {
             break

View File

@@ -563,9 +563,9 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
     // read string one by one is not efficient, here we read strings batch by batch, each bach size is no more than 16MB
     batchRead := 1 // rows of each batch, make sure this value is equal or greater than 1
     if utf {
-        batchRead += ReadBufferSize / (utf8.UTFMax * maxLen)
+        batchRead += Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() / (utf8.UTFMax * maxLen)
     } else {
-        batchRead += ReadBufferSize / maxLen
+        batchRead += Params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt() / maxLen
     }
     log.Info("Numpy adapter: prepare to read varchar batch by batch",
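
A rough arithmetic sketch of the batch sizing above, assuming the default 16 MB read buffer introduced by this patch and a hypothetical maximum string length of 256 characters:

package main

import (
    "fmt"
    "unicode/utf8"
)

func main() {
    readBufferSize := 16 * 1024 * 1024 // datanode.bulkinsert.readBufferSize default
    maxLen := 256                      // assumed longest varchar in the numpy column

    // Mirrors the sizing rule above: UTF-encoded strings reserve utf8.UTFMax bytes per rune.
    batchRead := 1 + readBufferSize/(utf8.UTFMax*maxLen)
    fmt.Println(batchRead) // roughly 16385 rows per batch
}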

View File

@@ -439,7 +439,7 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
         }
         tr.Record("splitFieldsData")
         // when the estimated size is close to blockSize, save to binlog
-        err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, false)
+        err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), false)
         if err != nil {
             return err
         }
@@ -447,7 +447,7 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
     }
     // force flush at the end
-    return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, true)
+    return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), true)
 }
 // readData method reads numpy data section into a storage.FieldData

View File

@@ -94,7 +94,7 @@ func NewParquetParser(ctx context.Context,
         return nil, err
     }
-    fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{BatchSize: 1}, memory.DefaultAllocator)
+    fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
     if err != nil {
         log.Warn("create arrow parquet file reader failed", zap.Error(err))
         return nil, err
@@ -165,10 +165,22 @@ func (p *ParquetParser) createReaders() error {
             return merr.WrapErrImportFailed(fmt.Sprintf("there is multi field of fieldName: %s", field.GetName()))
         }
         if !verifyFieldSchema(field.GetDataType(), field.GetElementType(), fields[0]) {
+            if fields[0].Type.ID() == arrow.LIST {
+                log.Warn("field schema is not match",
+                    zap.String("fieldName", field.GetName()),
+                    zap.String("collection schema", field.GetDataType().String()),
+                    zap.String("file schema", fields[0].Type.Name()),
+                    zap.String("collection schema element type", field.GetElementType().String()),
+                    zap.String("file list element type", fields[0].Type.(*arrow.ListType).ElemField().Type.Name()))
+                return merr.WrapErrImportFailed(fmt.Sprintf("array field schema is not match of field: %s, collection field element dataType: %s, file field element dataType:%s",
+                    field.GetName(), field.GetElementType().String(), fields[0].Type.(*arrow.ListType).ElemField().Type.Name()))
+            }
             log.Warn("field schema is not match",
+                zap.String("fieldName", field.GetName()),
                 zap.String("collection schema", field.GetDataType().String()),
                 zap.String("file schema", fields[0].Type.Name()))
-            return merr.WrapErrImportFailed(fmt.Sprintf("field schema is not match, collection field dataType: %s, file field dataType:%s", field.GetDataType().String(), fields[0].Type.Name()))
+            return merr.WrapErrImportFailed(fmt.Sprintf("schema is not match of field: %s, collection field dataType: %s, file field dataType:%s",
+                field.GetName(), field.GetDataType().String(), fields[0].Type.Name()))
         }
         indices := schema.FieldIndices(field.GetName())
         if len(indices) != 1 {
@@ -315,7 +327,7 @@ func (p *ParquetParser) consume() error {
         }
         tr.Record("splitFieldsData")
         // when the estimated size is close to blockSize, save to binlog
-        err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, false)
+        err = tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), false)
         if err != nil {
             return err
         }
@@ -323,7 +335,7 @@ func (p *ParquetParser) consume() error {
     }
     // force flush at the end
-    return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, MaxTotalSizeInMemory, true)
+    return tryFlushBlocks(p.ctx, shards, p.collectionInfo.Schema, p.callFlushFunc, p.blockSize, Params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64(), true)
 }
 // readData method reads Parquet data section into a storage.FieldData
@@ -332,13 +344,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Bool:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]bool, error) {
             boolReader, ok := chunk.(*array.Boolean)
-            boolData := make([]bool, 0)
             if !ok {
                 log.Warn("the column data in parquet is not bool", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not bool of field: %s", columnReader.fieldName))
             }
+            boolData := make([]bool, boolReader.Data().Len())
             for i := 0; i < boolReader.Data().Len(); i++ {
-                boolData = append(boolData, boolReader.Value(i))
+                boolData[i] = boolReader.Value(i)
             }
             return boolData, nil
         })
@@ -353,13 +365,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Int8:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int8, error) {
             int8Reader, ok := chunk.(*array.Int8)
-            int8Data := make([]int8, 0)
             if !ok {
                 log.Warn("the column data in parquet is not int8", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int8 of field: %s", columnReader.fieldName))
             }
+            int8Data := make([]int8, int8Reader.Data().Len())
             for i := 0; i < int8Reader.Data().Len(); i++ {
-                int8Data = append(int8Data, int8Reader.Value(i))
+                int8Data[i] = int8Reader.Value(i)
             }
             return int8Data, nil
         })
@@ -374,13 +386,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Int16:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int16, error) {
             int16Reader, ok := chunk.(*array.Int16)
-            int16Data := make([]int16, 0)
             if !ok {
                 log.Warn("the column data in parquet is not int16", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int16 of field: %s", columnReader.fieldName))
             }
+            int16Data := make([]int16, int16Reader.Data().Len())
             for i := 0; i < int16Reader.Data().Len(); i++ {
-                int16Data = append(int16Data, int16Reader.Value(i))
+                int16Data[i] = int16Reader.Value(i)
             }
             return int16Data, nil
         })
@@ -395,13 +407,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Int32:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int32, error) {
             int32Reader, ok := chunk.(*array.Int32)
-            int32Data := make([]int32, 0)
             if !ok {
                 log.Warn("the column data in parquet is not int32", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int32 of field: %s", columnReader.fieldName))
             }
+            int32Data := make([]int32, int32Reader.Data().Len())
             for i := 0; i < int32Reader.Data().Len(); i++ {
-                int32Data = append(int32Data, int32Reader.Value(i))
+                int32Data[i] = int32Reader.Value(i)
             }
             return int32Data, nil
         })
@@ -416,13 +428,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Int64:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int64, error) {
             int64Reader, ok := chunk.(*array.Int64)
-            int64Data := make([]int64, 0)
             if !ok {
                 log.Warn("the column data in parquet is not int64", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int64 of field: %s", columnReader.fieldName))
             }
+            int64Data := make([]int64, int64Reader.Data().Len())
             for i := 0; i < int64Reader.Data().Len(); i++ {
-                int64Data = append(int64Data, int64Reader.Value(i))
+                int64Data[i] = int64Reader.Value(i)
             }
             return int64Data, nil
         })
@@ -437,13 +449,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Float:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]float32, error) {
             float32Reader, ok := chunk.(*array.Float32)
-            float32Data := make([]float32, 0)
             if !ok {
                 log.Warn("the column data in parquet is not float", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not float of field: %s", columnReader.fieldName))
             }
+            float32Data := make([]float32, float32Reader.Data().Len())
             for i := 0; i < float32Reader.Data().Len(); i++ {
-                float32Data = append(float32Data, float32Reader.Value(i))
+                float32Data[i] = float32Reader.Value(i)
             }
             return float32Data, nil
         })
@@ -464,13 +476,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_Double:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]float64, error) {
             float64Reader, ok := chunk.(*array.Float64)
-            float64Data := make([]float64, 0)
             if !ok {
                 log.Warn("the column data in parquet is not double", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not double of field: %s", columnReader.fieldName))
             }
+            float64Data := make([]float64, float64Reader.Data().Len())
             for i := 0; i < float64Reader.Data().Len(); i++ {
-                float64Data = append(float64Data, float64Reader.Value(i))
+                float64Data[i] = float64Reader.Value(i)
             }
             return float64Data, nil
         })
@@ -491,13 +503,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
     case schemapb.DataType_VarChar, schemapb.DataType_String:
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]string, error) {
             stringReader, ok := chunk.(*array.String)
-            stringData := make([]string, 0)
             if !ok {
                 log.Warn("the column data in parquet is not string", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not string of field: %s", columnReader.fieldName))
             }
+            stringData := make([]string, stringReader.Data().Len())
             for i := 0; i < stringReader.Data().Len(); i++ {
-                stringData = append(stringData, stringReader.Value(i))
+                stringData[i] = stringReader.Value(i)
             }
             return stringData, nil
         })
@@ -513,13 +525,13 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         // JSON field read data from string array Parquet
         data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]string, error) {
             stringReader, ok := chunk.(*array.String)
-            stringData := make([]string, 0)
             if !ok {
                 log.Warn("the column data in parquet is not json string", zap.String("fieldName", columnReader.fieldName))
                 return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not json string of field: %s", columnReader.fieldName))
             }
+            stringData := make([]string, stringReader.Data().Len())
             for i := 0; i < stringReader.Data().Len(); i++ {
-                stringData = append(stringData, stringReader.Value(i))
+                stringData[i] = stringReader.Value(i)
             }
             return stringData, nil
         })
@@ -545,7 +557,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         }, nil
     case schemapb.DataType_BinaryVector:
         data, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]uint8, error) {
-            arrayData := make([][]uint8, 0)
+            arrayData := make([][]uint8, 0, len(offsets))
            uint8Reader, ok := reader.(*array.Uint8)
            if !ok {
                 log.Warn("the column element data of array in parquet is not binary", zap.String("fieldName", columnReader.fieldName))
@@ -553,7 +565,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
             }
             for i := 1; i < len(offsets); i++ {
                 start, end := offsets[i-1], offsets[i]
-                elementData := make([]uint8, 0)
+                elementData := make([]uint8, 0, end-start)
                 for j := start; j < end; j++ {
                     elementData = append(elementData, uint8Reader.Value(int(j)))
                 }
@@ -586,7 +598,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         rowNum := 0
         if columnReader.columnReader.Field().Type.(*arrow.ListType).Elem().ID() == arrow.FLOAT32 {
             arrayData, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float32, error) {
-                arrayData := make([][]float32, 0)
+                arrayData := make([][]float32, 0, len(offsets))
                 float32Reader, ok := reader.(*array.Float32)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not float", zap.String("fieldName", columnReader.fieldName))
@@ -594,7 +606,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]float32, 0)
+                    elementData := make([]float32, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, float32Reader.Value(int(j)))
                     }
@@ -617,7 +629,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
             rowNum = len(arrayData)
         } else if columnReader.columnReader.Field().Type.(*arrow.ListType).Elem().ID() == arrow.FLOAT64 {
             arrayData, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float64, error) {
-                arrayData := make([][]float64, 0)
+                arrayData := make([][]float64, 0, len(offsets))
                 float64Reader, ok := reader.(*array.Float64)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not double", zap.String("fieldName", columnReader.fieldName))
@@ -625,7 +637,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]float64, 0)
+                    elementData := make([]float64, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, float64Reader.Value(int(j)))
                     }
@@ -671,7 +683,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         switch columnReader.elementType {
         case schemapb.DataType_Bool:
             boolArray, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]bool, error) {
-                arrayData := make([][]bool, 0)
+                arrayData := make([][]bool, 0, len(offsets))
                 boolReader, ok := reader.(*array.Boolean)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not bool", zap.String("fieldName", columnReader.fieldName))
@@ -679,7 +691,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]bool, 0)
+                    elementData := make([]bool, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, boolReader.Value(int(j)))
                     }
@@ -701,7 +713,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
             }
         case schemapb.DataType_Int8:
             int8Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
-                arrayData := make([][]int32, 0)
+                arrayData := make([][]int32, 0, len(offsets))
                 int8Reader, ok := reader.(*array.Int8)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not int8", zap.String("fieldName", columnReader.fieldName))
@@ -709,7 +721,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]int32, 0)
+                    elementData := make([]int32, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, int32(int8Reader.Value(int(j))))
                     }
@@ -731,7 +743,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
             }
         case schemapb.DataType_Int16:
             int16Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
-                arrayData := make([][]int32, 0)
+                arrayData := make([][]int32, 0, len(offsets))
                 int16Reader, ok := reader.(*array.Int16)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not int16", zap.String("fieldName", columnReader.fieldName))
@@ -739,7 +751,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]int32, 0)
+                    elementData := make([]int32, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, int32(int16Reader.Value(int(j))))
                     }
@@ -762,7 +774,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         case schemapb.DataType_Int32:
             int32Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
-                arrayData := make([][]int32, 0)
+                arrayData := make([][]int32, 0, len(offsets))
                 int32Reader, ok := reader.(*array.Int32)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not int32", zap.String("fieldName", columnReader.fieldName))
@@ -770,7 +782,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]int32, 0)
+                    elementData := make([]int32, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, int32Reader.Value(int(j)))
                     }
@@ -793,7 +805,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         case schemapb.DataType_Int64:
             int64Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int64, error) {
-                arrayData := make([][]int64, 0)
+                arrayData := make([][]int64, 0, len(offsets))
                 int64Reader, ok := reader.(*array.Int64)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not int64", zap.String("fieldName", columnReader.fieldName))
@@ -801,7 +813,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]int64, 0)
+                    elementData := make([]int64, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, int64Reader.Value(int(j)))
                     }
@@ -824,7 +836,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         case schemapb.DataType_Float:
             float32Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float32, error) {
-                arrayData := make([][]float32, 0)
+                arrayData := make([][]float32, 0, len(offsets))
                 float32Reader, ok := reader.(*array.Float32)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not float", zap.String("fieldName", columnReader.fieldName))
@@ -832,7 +844,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]float32, 0)
+                    elementData := make([]float32, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, float32Reader.Value(int(j)))
                     }
@@ -855,7 +867,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         case schemapb.DataType_Double:
             float64Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float64, error) {
-                arrayData := make([][]float64, 0)
+                arrayData := make([][]float64, 0, len(offsets))
                 float64Reader, ok := reader.(*array.Float64)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not double", zap.String("fieldName", columnReader.fieldName))
@@ -863,7 +875,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]float64, 0)
+                    elementData := make([]float64, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, float64Reader.Value(int(j)))
                     }
@@ -886,7 +898,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         case schemapb.DataType_VarChar, schemapb.DataType_String:
             stringArray, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]string, error) {
-                arrayData := make([][]string, 0)
+                arrayData := make([][]string, 0, len(offsets))
                 stringReader, ok := reader.(*array.String)
                 if !ok {
                     log.Warn("the column element data of array in parquet is not string", zap.String("fieldName", columnReader.fieldName))
@@ -894,7 +906,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
                 }
                 for i := 1; i < len(offsets); i++ {
                     start, end := offsets[i-1], offsets[i]
-                    elementData := make([]string, 0)
+                    elementData := make([]string, 0, end-start)
                     for j := start; j < end; j++ {
                         elementData = append(elementData, stringReader.Value(int(j)))
                     }
@@ -917,7 +929,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         default:
             log.Warn("unsupported element type", zap.String("element type", columnReader.elementType.String()),
                 zap.String("fieldName", columnReader.fieldName))
-            return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s of array", columnReader.elementType.String()))
+            return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s of array field: %s", columnReader.elementType.String(), columnReader.fieldName))
         }
         return &storage.ArrayFieldData{
             ElementType: columnReader.elementType,
@@ -927,6 +939,6 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
         log.Warn("Parquet parser: unsupported data type of field",
             zap.String("dataType", columnReader.dataType.String()),
             zap.String("fieldName", columnReader.fieldName))
-        return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s", columnReader.elementType.String()))
+        return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s of field: %s", columnReader.elementType.String(), columnReader.fieldName))
     }
 }

View File

@@ -284,7 +284,7 @@ func convertMilvusSchemaToArrowSchema(schema *schemapb.CollectionSchema) *arrow.
     return arrow.NewSchema(fields, nil)
 }
-func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int) arrow.Array {
+func buildArrayData(dataType, elementType schemapb.DataType, dim, rows, arrLen int) arrow.Array {
     mem := memory.NewGoAllocator()
     switch dataType {
     case schemapb.DataType_Bool:
@@ -372,11 +372,11 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int) arro
         valid := make([]bool, 0, rows)
         index := 0
         for i := 0; i < rows; i++ {
-            index += i
+            index += arrLen
             offsets = append(offsets, int32(index))
             valid = append(valid, true)
         }
-        index += rows
+        index += arrLen
         switch elementType {
         case schemapb.DataType_Bool:
             builder := array.NewListBuilder(mem, &arrow.BooleanType{})
@@ -449,23 +449,27 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows, arrLen int) arro
 func writeParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error {
     schema := convertMilvusSchemaToArrowSchema(milvusSchema)
-    columns := make([]arrow.Array, 0, len(milvusSchema.Fields))
-    for _, field := range milvusSchema.Fields {
-        dim, _ := getFieldDimension(field)
-        columnData := buildArrayData(field.DataType, field.ElementType, dim, numRows)
-        columns = append(columns, columnData)
-    }
-    recordBatch := array.NewRecord(schema, columns, int64(numRows))
     fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
     if err != nil {
         return err
     }
     defer fw.Close()
-    err = fw.Write(recordBatch)
-    if err != nil {
-        return err
+    batch := 1000
+    for i := 0; i <= numRows/batch; i++ {
+        columns := make([]arrow.Array, 0, len(milvusSchema.Fields))
+        for _, field := range milvusSchema.Fields {
+            dim, _ := getFieldDimension(field)
+            columnData := buildArrayData(field.DataType, field.ElementType, dim, batch, 10)
+            columns = append(columns, columnData)
+        }
+        recordBatch := array.NewRecord(schema, columns, int64(batch))
+        err = fw.Write(recordBatch)
+        if err != nil {
+            return err
+        }
     }
     return nil
 }

View File

@@ -2590,6 +2590,8 @@ type dataNodeConfig struct {
     // timeout for bulkinsert
     BulkInsertTimeoutSeconds ParamItem `refreshable:"true"`
+    BulkInsertReadBufferSize ParamItem `refreshable:"true"`
+    BulkInsertMaxMemorySize  ParamItem `refreshable:"true"`
     // Skip BF
     SkipBFStatsLoad ParamItem `refreshable:"true"`
@@ -2781,6 +2783,22 @@ func (p *dataNodeConfig) init(base *BaseTable) {
     }
     p.BulkInsertTimeoutSeconds.Init(base.mgr)
+    p.BulkInsertReadBufferSize = ParamItem{
+        Key:          "datanode.bulkinsert.readBufferSize",
+        Version:      "2.3.4",
+        PanicIfEmpty: false,
+        DefaultValue: "16777216",
+    }
+    p.BulkInsertReadBufferSize.Init(base.mgr)
+    p.BulkInsertMaxMemorySize = ParamItem{
+        Key:          "datanode.bulkinsert.maxMemorySize",
+        Version:      "2.3.4",
+        PanicIfEmpty: false,
+        DefaultValue: "6442450944",
+    }
+    p.BulkInsertMaxMemorySize.Init(base.mgr)
     p.ChannelWorkPoolSize = ParamItem{
         Key:     "datanode.channel.workPoolSize",
         Version: "2.3.2",
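
For reference, a minimal sketch of reading the two new knobs at runtime; the keys and defaults mirror the ParamItem definitions above, while the standalone program around them is only illustrative:

package main

import (
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
    // Initialize the global parameter table, as the updated tests do.
    paramtable.Init()
    params := paramtable.Get()

    // datanode.bulkinsert.readBufferSize, default 16777216 bytes (16 MB).
    readBuf := params.DataNodeCfg.BulkInsertReadBufferSize.GetAsInt64()
    // datanode.bulkinsert.maxMemorySize, default 6442450944 bytes (6 GB).
    maxMem := params.DataNodeCfg.BulkInsertMaxMemorySize.GetAsInt64()

    fmt.Println(readBuf, maxMem)
}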