Fix bulkinsert bug that segments are compacted after import (#28227)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
pull/28267/head
groot 2023-11-08 10:18:20 +08:00 committed by GitHub
parent cc3dd6eda4
commit 29e66ed46b
5 changed files with 39 additions and 30 deletions


@@ -499,8 +499,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
// parse files and generate segments
segmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
importWrapper := importutil.NewImportWrapper(newCtx, collectionInfo, segmentSize, node.allocator.GetIDAlloactor(),
node.chunkManager, importResult, reportFunc)
importWrapper := importutil.NewImportWrapper(newCtx, collectionInfo, segmentSize, Params.DataNodeCfg.BinLogMaxSize.GetAsInt64(),
node.allocator.GetIDAlloactor(), node.chunkManager, importResult, reportFunc)
importWrapper.SetCallbackFunctions(assignSegmentFunc(node, req),
createBinLogsFunc(node, req, colInfo.GetSchema(), ts),
saveSegmentFunc(node, req, importResult, ts))


@@ -39,8 +39,8 @@ const (
JSONFileExt = ".json"
NumpyFileExt = ".npy"
// supposed size of a single block, to control a binlog file size, the max binlog file size is no more than 2*SingleBlockSize
SingleBlockSize = 16 * 1024 * 1024 // 16MB
// parsers read JSON/Numpy/CSV files buffer by buffer; this limit defines the buffer size.
ReadBufferSize = 16 * 1024 * 1024 // 16MB
// this limitation is to avoid this OOM risk:
// sometimes the system segment max size is a large number, a single segment's fields data might cause OOM.
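
As a rough illustration of what "buffer by buffer" means here, below is a minimal, generic sketch of streaming a file through a fixed 16 MB buffer. This is not the parsers' actual code; the readByBuffer helper and the sketch package name are invented for this example.

package sketch

import (
	"io"
	"os"
)

// ReadBufferSize mirrors the constant above: 16 MB per read.
const ReadBufferSize = 16 * 1024 * 1024

// readByBuffer streams a file through a fixed-size buffer and hands each
// chunk to the caller, so memory usage stays bounded regardless of file size.
func readByBuffer(path string, handle func(chunk []byte) error) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, ReadBufferSize)
	for {
		n, err := f.Read(buf)
		if n > 0 {
			if handleErr := handle(buf[:n]); handleErr != nil {
				return handleErr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}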
@@ -92,6 +92,7 @@ type ImportWrapper struct {
cancel context.CancelFunc // for canceling parse process
collectionInfo *CollectionInfo // collection details including schema
segmentSize int64 // maximum size of a segment(unit:byte) defined by dataCoord.segment.maxSize (milvus.yml)
binlogSize int64 // average binlog size(unit:byte), the max binlog file size is no more than 2*binlogSize
rowIDAllocator *allocator.IDAllocator // autoid allocator
chunkManager storage.ChunkManager
@@ -107,7 +108,7 @@ type ImportWrapper struct {
progressPercent int64 // working progress percent
}
func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segmentSize int64,
func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segmentSize int64, maxBinlogSize int64,
idAlloc *allocator.IDAllocator, cm storage.ChunkManager, importResult *rootcoordpb.ImportResult,
reportFunc func(res *rootcoordpb.ImportResult) error,
) *ImportWrapper {
@@ -120,11 +121,19 @@ func NewImportWrapper(ctx context.Context, collectionInfo *CollectionInfo, segme
ctx, cancel := context.WithCancel(ctx)
// average binlogSize is expected to be half of the maxBinlogSize
// and to avoid binlogSize being a tiny value
binlogSize := int64(float32(maxBinlogSize) * 0.5)
if binlogSize < ReadBufferSize {
binlogSize = ReadBufferSize
}
wrapper := &ImportWrapper{
ctx: ctx,
cancel: cancel,
collectionInfo: collectionInfo,
segmentSize: segmentSize,
binlogSize: binlogSize,
rowIDAllocator: idAlloc,
chunkManager: cm,
importResult: importResult,
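
A worked example of the sizing rule above, pulled out into a standalone helper purely for illustration (the real constructor inlines this logic, and the example values are assumptions, not Milvus defaults):

// deriveBinlogSize mirrors the clamp in NewImportWrapper: target roughly half
// of the configured max binlog size, but never less than the 16 MB read buffer.
func deriveBinlogSize(maxBinlogSize int64) int64 {
	binlogSize := int64(float32(maxBinlogSize) * 0.5)
	if binlogSize < ReadBufferSize {
		binlogSize = ReadBufferSize
	}
	return binlogSize
}

// deriveBinlogSize(512 << 20) == 256 << 20 // a 512 MB max yields 256 MB binlogs
// deriveBinlogSize(8 << 20)   == 16 << 20  // a tiny max is clamped up to ReadBufferSize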
@@ -282,7 +291,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.flushFunc(fields, shardID, partitionID)
}
parser, err := NewNumpyParser(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize,
parser, err := NewNumpyParser(p.ctx, p.collectionInfo, p.rowIDAllocator, p.binlogSize,
p.chunkManager, flushFunc, p.updateProgressPercent)
if err != nil {
return err
@@ -384,7 +393,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.flushFunc(fields, shardID, partitionID)
}
parser, err := NewBinlogParser(p.ctx, p.collectionInfo, SingleBlockSize,
parser, err := NewBinlogParser(p.ctx, p.collectionInfo, p.binlogSize,
p.chunkManager, flushFunc, p.updateProgressPercent, tsStartPoint, tsEndPoint)
if err != nil {
return err
@@ -433,7 +442,7 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
}
}
consumer, err := NewJSONRowConsumer(p.ctx, p.collectionInfo, p.rowIDAllocator, SingleBlockSize, flushFunc)
consumer, err := NewJSONRowConsumer(p.ctx, p.collectionInfo, p.rowIDAllocator, p.binlogSize, flushFunc)
if err != nil {
return err
}
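
The parsers and consumers above now receive p.binlogSize instead of the fixed 16 MB SingleBlockSize. Below is a generic sketch of how such a block-size threshold is typically used; it is not the actual JSON/Numpy/Binlog parser code, and blockBuffer and its fields are invented for illustration. Rows are buffered until their estimated size reaches the threshold, so a larger binlogSize produces fewer, larger binlog files per flush.

// blockBuffer accumulates rows and flushes them once the estimated size
// reaches blockSize; illustration only, not the wrapper's real data path.
type blockBuffer struct {
	rows      []map[string]any // buffered rows (illustrative representation)
	sizeBytes int64            // rough in-memory size estimate
}

func (b *blockBuffer) add(row map[string]any, rowSize int64, blockSize int64,
	flush func(rows []map[string]any) error,
) error {
	b.rows = append(b.rows, row)
	b.sizeBytes += rowSize
	if b.sizeBytes >= blockSize {
		if err := flush(b.rows); err != nil {
			return err
		}
		b.rows = nil
		b.sizeBytes = 0
	}
	return nil
}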


@@ -185,7 +185,7 @@ func Test_ImportWrapperNew(t *testing.T) {
ctx := context.Background()
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, nil, 1, nil, cm, nil, nil)
wrapper := NewImportWrapper(ctx, nil, 1, ReadBufferSize, nil, cm, nil, nil)
assert.Nil(t, wrapper)
schema := &schemapb.CollectionSchema{
@@ -205,7 +205,7 @@ func Test_ImportWrapperNew(t *testing.T) {
})
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
wrapper = NewImportWrapper(ctx, collectionInfo, 1, nil, cm, nil, nil)
wrapper = NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, cm, nil, nil)
assert.NotNil(t, wrapper)
assignSegFunc := func(shardID int, partID int64) (int64, string, error) {
@@ -282,7 +282,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)
t.Run("success case", func(t *testing.T) {
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -308,7 +308,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
assert.NoError(t, err)
importResult.State = commonpb.ImportState_ImportStarted
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
files = append(files, filePath)
@@ -320,7 +320,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.json")
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, ImportOptions{OnlyValidate: true})
assert.Error(t, err)
})
@@ -362,7 +362,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files := createSampleNumpyFiles(t, cm)
t.Run("success case", func(t *testing.T) {
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
err = wrapper.Import(files, DefaultImportOptions())
@@ -380,7 +380,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
files[1] = filePath
importResult.State = commonpb.ImportState_ImportStarted
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
err = wrapper.Import(files, DefaultImportOptions())
@@ -391,7 +391,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
t.Run("file doesn't exist", func(t *testing.T) {
files := make([]string, 0)
files = append(files, "/dummy/dummy.npy")
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
err = wrapper.Import(files, DefaultImportOptions())
assert.Error(t, err)
})
@@ -511,7 +511,7 @@ func Test_ImportWrapperRowBased_perf(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := make([]string, 0)
@@ -555,7 +555,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
t.Run("unsupported file type", func(t *testing.T) {
files := []string{"uid.txt"}
@@ -605,7 +605,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("empty file list", func(t *testing.T) {
files := []string{}
cm.size = 0
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.NoError(t, err)
assert.False(t, rowBased)
@@ -614,7 +614,7 @@ func Test_ImportWrapperFileValidation(t *testing.T) {
t.Run("file size exceed MaxFileSize limit", func(t *testing.T) {
files := []string{"a/1.json"}
cm.size = params.Params.CommonCfg.ImportMaxFileSize.GetAsInt64() + 1
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
wrapper = NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
rowBased, err := wrapper.fileValidation(files)
assert.Error(t, err)
assert.True(t, rowBased)
@@ -685,7 +685,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files := []string{filePath}
@@ -732,7 +732,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, idAllocator, cm, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
wrapper.reportImportAttempts = 2
@@ -767,7 +767,7 @@ func Test_ImportWrapperIsBinlogImport(t *testing.T) {
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
// empty paths
paths := []string{}
@@ -831,7 +831,7 @@ func Test_ImportWrapperDoBinlogImport(t *testing.T) {
collectionInfo, err := NewCollectionInfo(schema, int32(shardNum), []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), idAllocator, cm, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(segmentSize), ReadBufferSize, idAllocator, cm, nil, nil)
paths := []string{
"/tmp",
"/tmp",
@@ -894,7 +894,7 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
}
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), nil, nil, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, int64(1024), ReadBufferSize, nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)
rowCounter := &rowCounterTest{}
@@ -937,7 +937,7 @@ func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {
collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, nil, nil, nil, nil)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, nil, nil)
assert.NotNil(t, wrapper)
assert.Equal(t, int64(0), wrapper.progressPercent)
@@ -976,7 +976,7 @@ func Test_ImportWrapperFlushFunc(t *testing.T) {
schema := sampleSchema()
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, nil, nil, importResult, reportFunc)
wrapper := NewImportWrapper(ctx, collectionInfo, 1, ReadBufferSize, nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
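
Since the tests above live in the importutil package (they already read unexported fields such as wrapper.progressPercent), a small additional test could pin down the clamping behaviour introduced in this change. The sketch below is not part of this commit; the test name is invented, and it reuses the existing sampleSchema helper:

func Test_ImportWrapperBinlogSizeClamp(t *testing.T) {
	ctx := context.Background()
	collectionInfo, err := NewCollectionInfo(sampleSchema(), 2, []int64{1})
	assert.NoError(t, err)

	// a tiny maxBinlogSize is clamped up to ReadBufferSize
	wrapper := NewImportWrapper(ctx, collectionInfo, 1, 1, nil, nil, nil, nil)
	assert.NotNil(t, wrapper)
	assert.Equal(t, int64(ReadBufferSize), wrapper.binlogSize)

	// a large maxBinlogSize yields roughly half of it
	wrapper = NewImportWrapper(ctx, collectionInfo, 1, 64*ReadBufferSize, nil, nil, nil, nil)
	assert.NotNil(t, wrapper)
	assert.Equal(t, int64(32*ReadBufferSize), wrapper.binlogSize)
}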


@@ -73,7 +73,7 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
// for low dimensional vector, the bufSize is a large value, read more rows each time
bufRowCount := parser.bufRowCount
for {
if bufRowCount*sizePerRecord > SingleBlockSize {
if bufRowCount*sizePerRecord > ReadBufferSize {
bufRowCount--
} else {
break

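The loop above decrements bufRowCount until a full buffer of rows fits within ReadBufferSize. A closed-form equivalent is sketched below, only for illustration (the real code keeps the loop, and any guards the surrounding function applies are ignored here); the example row size is an assumption.

// adjustedBufRowCount returns min(initial, ReadBufferSize/sizePerRecord),
// which is exactly where the decrement loop above stops.
func adjustedBufRowCount(initial, sizePerRecord int) int {
	if sizePerRecord <= 0 {
		return initial
	}
	limit := ReadBufferSize / sizePerRecord
	if initial > limit {
		return limit
	}
	return initial
}

// e.g. with an assumed sizePerRecord of 4 KB, limit = 16 MB / 4 KB = 4096 rows per buffer.
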

@@ -563,9 +563,9 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
// read string one by one is not efficient, here we read strings batch by batch, each batch size is no more than 16MB
batchRead := 1 // rows of each batch, make sure this value is equal or greater than 1
if utf {
batchRead += SingleBlockSize / (utf8.UTFMax * maxLen)
batchRead += ReadBufferSize / (utf8.UTFMax * maxLen)
} else {
batchRead += SingleBlockSize / maxLen
batchRead += ReadBufferSize / maxLen
}
log.Info("Numpy adapter: prepare to read varchar batch by batch",
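
A worked example of the batch sizing above, with an assumed maximum element length (the values and the sketch package are illustrative, not taken from the code):

package sketch

import "unicode/utf8"

// ReadBufferSize mirrors the 16 MB constant used above.
const ReadBufferSize = 16 * 1024 * 1024

// With UTF-encoded strings and an assumed maxLen of 128 characters, each value
// is budgeted at utf8.UTFMax * maxLen = 4 * 128 = 512 bytes, so
// batchRead = 1 + 16 MB / 512 B = 32769 rows, keeping each batch near 16 MB.
const (
	maxLen        = 128
	bytesPerValue = utf8.UTFMax * maxLen
	batchReadUTF  = 1 + ReadBufferSize/bytesPerValue
)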