From de318c2e7c910eb3bc284f93ec7c2e68a71cf59f Mon Sep 17 00:00:00 2001
From: groot
Date: Tue, 10 Jan 2023 16:53:39 +0800
Subject: [PATCH] Report bulkinsert progress (#21612)

Signed-off-by: groot
---
 internal/datanode/data_node.go                |  1 +
 internal/rootcoord/import_manager.go          | 14 +++++-
 internal/util/importutil/binlog_parser.go     | 27 +++++++---
 .../util/importutil/binlog_parser_test.go     | 21 ++++----
 internal/util/importutil/import_util.go       | 20 ++++++++
 internal/util/importutil/import_util_test.go  | 21 ++++++++
 internal/util/importutil/import_wrapper.go    | 41 +++++++++++++--
 .../util/importutil/import_wrapper_test.go    | 17 +++++++
 internal/util/importutil/json_parser.go       | 50 +++++++++++++------
 internal/util/importutil/json_parser_test.go  | 47 +++++++++--------
 internal/util/importutil/numpy_parser.go      | 49 +++++++++++-------
 internal/util/importutil/numpy_parser_test.go | 17 ++++---
 12 files changed, 242 insertions(+), 83 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index d93701e33e..0367f2cb29 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -986,6 +986,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		AutoIds:  make([]int64, 0),
 		RowCount: 0,
 	}
+	importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.ProgressPercent, Value: "0"})

 	// Spawn a new context to ignore cancellation from parental context.
 	newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout)
diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go
index d3fe0d0c87..511bda0dd8 100644
--- a/internal/rootcoord/import_manager.go
+++ b/internal/rootcoord/import_manager.go
@@ -607,8 +607,9 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
 			if kv.GetKey() == importutil.FailedReason {
 				toPersistImportTaskInfo.State.ErrorMessage = kv.GetValue()
 				break
-			} else if kv.GetKey() == importutil.PersistTimeCost {
-				toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv)
+			} else if kv.GetKey() == importutil.PersistTimeCost ||
+				kv.GetKey() == importutil.ProgressPercent {
+				importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, kv.GetKey(), kv.GetValue())
 			}
 		}
 		log.Info("importManager update task info", zap.Any("toPersistImportTaskInfo", toPersistImportTaskInfo))
@@ -667,6 +668,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
 			// Meta persist should be done before memory objs change.
 			toPersistImportTaskInfo := cloneImportTaskInfo(t)
 			toPersistImportTaskInfo.State.StateCode = targetState
+			if targetState == commonpb.ImportState_ImportCompleted {
+				importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
+			}
 			tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
 			// Update task in task store.
 			if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
@@ -684,6 +688,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
 			// Meta persist should be done before memory objs change.
 			toPersistImportTaskInfo := cloneImportTaskInfo(v)
 			toPersistImportTaskInfo.State.StateCode = targetState
+			if targetState == commonpb.ImportState_ImportCompleted {
+				importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
+			}
 			tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
 			// Update task in task store.
 			if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
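With this patch, a bulk-insert task carries its progress as a progress_percent key/value pair on the task info: the DataNode seeds it with "0" when the task is accepted, the parsers move it through 1-90 while data is read and persisted (see the importutil hunks below), and RootCoord pins it to "100" once the task reaches ImportCompleted. A minimal sketch of how a caller could read the value back from a task's KV list; the getProgress helper is hypothetical and not part of this patch:

	// getProgress is an illustrative helper, not part of the patch.
	// It scans a task's Infos list for the progress_percent pair.
	func getProgress(infos []*commonpb.KeyValuePair) (int64, bool) {
		for _, kv := range infos {
			if kv.GetKey() == "progress_percent" { // importutil.ProgressPercent
				percent, err := strconv.ParseInt(kv.GetValue(), 10, 64)
				return percent, err == nil
			}
		}
		return 0, false
	}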
@@ -704,6 +711,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
 		} else {
 			toPersistImportTaskInfo := cloneImportTaskInfo(ti)
 			toPersistImportTaskInfo.State.StateCode = targetState
+			if targetState == commonpb.ImportState_ImportCompleted {
+				importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
+			}
 			tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
 			// Update task in task store.
 			if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
diff --git a/internal/util/importutil/binlog_parser.go b/internal/util/importutil/binlog_parser.go
index 7cb34b48d7..7148d1a8d8 100644
--- a/internal/util/importutil/binlog_parser.go
+++ b/internal/util/importutil/binlog_parser.go
@@ -31,12 +31,13 @@ import (
 )

 type BinlogParser struct {
-	ctx              context.Context            // for canceling parse process
-	collectionSchema *schemapb.CollectionSchema // collection schema
-	shardNum         int32                      // sharding number of the collection
-	blockSize        int64                      // maximum size of a read block(unit:byte)
-	chunkManager     storage.ChunkManager       // storage interfaces to browse/read the files
-	callFlushFunc    ImportFlushFunc            // call back function to flush segment
+	ctx                context.Context            // for canceling parse process
+	collectionSchema   *schemapb.CollectionSchema // collection schema
+	shardNum           int32                      // sharding number of the collection
+	blockSize          int64                      // maximum size of a read block(unit:byte)
+	chunkManager       storage.ChunkManager       // storage interfaces to browse/read the files
+	callFlushFunc      ImportFlushFunc            // callback function to flush segment
+	updateProgressFunc func(percent int64)        // update working progress percent value

 	// a timestamp to define the start time point of restore, data before this time point will be ignored
 	// set this value to 0 to import all the data
@@ -57,6 +58,7 @@ func NewBinlogParser(ctx context.Context,
 	blockSize int64,
 	chunkManager storage.ChunkManager,
 	flushFunc ImportFlushFunc,
+	updateProgressFunc func(percent int64),
 	tsStartPoint uint64,
 	tsEndPoint uint64) (*BinlogParser, error) {
 	if collectionSchema == nil {
@@ -248,12 +250,23 @@ func (p *BinlogParser) Parse(filePaths []string) error {
 		return err
 	}

-	for _, segmentHolder := range segmentHolders {
+	updateProgress := func(readBatch int) {
+		if p.updateProgressFunc != nil && len(segmentHolders) != 0 {
+			percent := (readBatch * ProgressValueForPersist) / len(segmentHolders)
+			log.Debug("Binlog parser: working progress", zap.Int("readBatch", readBatch),
+				zap.Int("totalBatchCount", len(segmentHolders)), zap.Int("percent", percent))
+			p.updateProgressFunc(int64(percent))
+		}
+	}
+
+	for i, segmentHolder := range segmentHolders {
 		err = p.parseSegmentFiles(segmentHolder)
 		if err != nil {
 			return err
 		}
+		updateProgress(i + 1)
+
 		// trigger gc after each segment is finished
 		triggerGC()
 	}
diff --git a/internal/util/importutil/binlog_parser_test.go b/internal/util/importutil/binlog_parser_test.go
index 7ad375e766..48a8bcc1a8 100644
--- a/internal/util/importutil/binlog_parser_test.go
+++ b/internal/util/importutil/binlog_parser_test.go
@@ -32,17 +32,17 @@ func Test_NewBinlogParser(t *testing.T) {
 	ctx := context.Background()

 	// nil schema
-	parser, err := NewBinlogParser(ctx, nil, 2, 1024, nil, nil, 0, math.MaxUint64)
+	parser, err := NewBinlogParser(ctx, nil, 2, 1024, nil, nil, nil, 0, math.MaxUint64)
 	assert.Nil(t, parser)
 	assert.NotNil(t, err)

 	// nil chunkmanager
-	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, nil, nil, 0, math.MaxUint64)
+	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, nil, nil, nil, 0, math.MaxUint64)
 	assert.Nil(t, parser)
 	assert.NotNil(t, err)

 	// nil flushfunc
-	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, nil, 0, math.MaxUint64)
+	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, nil, nil, 0, math.MaxUint64)
 	assert.Nil(t, parser)
 	assert.NotNil(t, err)

@@ -50,12 +50,12 @@ func Test_NewBinlogParser(t *testing.T) {
 	flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
 		return nil
 	}
-	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
+	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
 	assert.NotNil(t, parser)
 	assert.Nil(t, err)

 	// tsStartPoint larger than tsEndPoint
-	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 2, 1)
+	parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 2, 1)
 	assert.Nil(t, parser)
 	assert.NotNil(t, err)
 }
@@ -127,7 +127,7 @@ func Test_BinlogParserConstructHolders(t *testing.T) {
 		"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105",
 	}

-	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
+	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
 	assert.NotNil(t, parser)
 	assert.Nil(t, err)

@@ -187,7 +187,7 @@ func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
 		listResult: make(map[string][]string),
 	}

-	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
+	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
 	assert.NotNil(t, parser)
 	assert.Nil(t, err)

@@ -233,7 +233,7 @@ func Test_BinlogParserParseFilesFailed(t *testing.T) {
 		return nil
 	}

-	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
+	parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
 	assert.NotNil(t, parser)
 	assert.Nil(t, err)

@@ -268,7 +268,10 @@ func Test_BinlogParserParse(t *testing.T) {
 		},
 	}

-	parser, err := NewBinlogParser(ctx, schema, 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
+	updateProgress := func(percent int64) {
+		assert.Greater(t, percent, int64(0))
+	}
+	parser, err := NewBinlogParser(ctx, schema, 2, 1024, chunkManager, flushFunc, updateProgress, 0, math.MaxUint64)
 	assert.NotNil(t, parser)
 	assert.Nil(t, err)
diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go
index a4aaf34710..82bb22650e 100644
--- a/internal/util/importutil/import_util.go
+++ b/internal/util/importutil/import_util.go
@@ -30,6 +30,7 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"

+	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
@@ -539,3 +540,22 @@ func pkToShard(pk interface{}, shardNum uint32) (uint32, error) {

 	return shard, nil
 }
+
+func UpdateKVInfo(infos *[]*commonpb.KeyValuePair, k string, v string) error {
+	if infos == nil {
+		return errors.New("import util: kv array pointer is nil")
+	}
+
+	found := false
+	for _, kv := range *infos {
+		if kv.GetKey() == k {
+			kv.Value = v
+			found = true
+		}
+	}
+	if !found {
+		*infos = append(*infos, &commonpb.KeyValuePair{Key: k, Value: v})
+	}
+
+	return nil
+}
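UpdateKVInfo is an upsert: it overwrites the value when the key is already present and appends a new pair otherwise, which is why updateTaskInfo above can apply repeated progress reports without growing the Infos list. An illustrative call sequence (the values are made up):

	infos := []*commonpb.KeyValuePair{{Key: "progress_percent", Value: "0"}}
	_ = UpdateKVInfo(&infos, "progress_percent", "45") // overwrites "0"
	_ = UpdateKVInfo(&infos, "persist_cost", "1.5")    // appends a new pair

The pointer-to-slice parameter is what lets the append case remain visible to the caller.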
diff --git a/internal/util/importutil/import_util_test.go b/internal/util/importutil/import_util_test.go
index c20cc51b36..57ebbd15b2 100644
--- a/internal/util/importutil/import_util_test.go
+++ b/internal/util/importutil/import_util_test.go
@@ -640,3 +640,24 @@ func Test_PkToShard(t *testing.T) {
 	hash, _ = typeutil.Hash32Int64(pk)
 	assert.Equal(t, hash%shardNum, shard)
 }
+
+func Test_UpdateKVInfo(t *testing.T) {
+	err := UpdateKVInfo(nil, "a", "1")
+	assert.Error(t, err)
+
+	infos := make([]*commonpb.KeyValuePair, 0)
+
+	err = UpdateKVInfo(&infos, "a", "1")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(infos))
+	assert.Equal(t, "1", infos[0].Value)
+
+	err = UpdateKVInfo(&infos, "a", "2")
+	assert.NoError(t, err)
+	assert.Equal(t, "2", infos[0].Value)
+
+	err = UpdateKVInfo(&infos, "b", "5")
+	assert.NoError(t, err)
+	assert.Equal(t, 2, len(infos))
+	assert.Equal(t, "5", infos[1].Value)
+}
diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go
index 79d4667eb2..e3ea14925a 100644
--- a/internal/util/importutil/import_wrapper.go
+++ b/internal/util/importutil/import_wrapper.go
@@ -58,12 +58,16 @@ const (
 	// the total memory size might cause OOM.
 	MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB

+	// progress percent value of persist state
+	ProgressValueForPersist = 90
+
 	// keywords of import task information
 	FailedReason    = "failed_reason"
 	Files           = "files"
 	CollectionName  = "collection"
 	PartitionName   = "partition"
 	PersistTimeCost = "persist_cost"
+	ProgressPercent = "progress_percent"
 )

 // ReportImportAttempts is the maximum # of attempts to retry when import fails.
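Note the split between ProgressValueForPersist (90) and the final 100: the parsers scale their work onto 0-90, reportPersisted pins the value at 90 once all segments are flushed, and only RootCoord's ImportCompleted transition writes 100. A worked illustration of the integer arithmetic the parsers share (the numbers are made up):

	// With total = 4 units of work and ProgressValueForPersist = 90:
	//   done=1 -> 22, done=2 -> 45, done=3 -> 67, done=4 -> 90
	// The value never reaches 100 during parsing, and integer division
	// means it only moves in coarse steps.
	percent := (done * ProgressValueForPersist) / total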
@@ -102,6 +106,7 @@ type ImportWrapper struct {
 	reportImportAttempts uint // attempts count if the report function gets an error

 	workingSegments map[int]*WorkingSegment // a map shard id to working segments
+	progressPercent int64                   // working progress percent
 }

 func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.CollectionSchema, shardNum int32, segmentSize int64,
@@ -291,7 +296,8 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
 			printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
 			return p.flushFunc(fields, shardID)
 		}
-		parser, err := NewNumpyParser(p.ctx, p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, p.chunkManager, flushFunc)
+		parser, err := NewNumpyParser(p.ctx, p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize,
+			p.chunkManager, flushFunc, p.updateProgressPercent)
 		if err != nil {
 			return err
 		}
@@ -326,6 +332,9 @@ func (p *ImportWrapper) reportPersisted(reportAttempts uint, tr *timerecord.Time
 	// report file process state
 	p.importResult.State = commonpb.ImportState_ImportPersisted

+	progressValue := strconv.Itoa(ProgressValueForPersist)
+	UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)
+
 	log.Info("import wrapper: report import result", zap.Any("importResult", p.importResult))
 	// the persist state report is valuable, retry more times in case this task fails only because of a network error
 	reportErr := retry.Do(p.ctx, func() error {
@@ -390,7 +399,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
 		return p.flushFunc(fields, shardID)
 	}
 	parser, err := NewBinlogParser(p.ctx, p.collectionSchema, p.shardNum, SingleBlockSize, p.chunkManager, flushFunc,
-		tsStartPoint, tsEndPoint)
+		p.updateProgressPercent, tsStartPoint, tsEndPoint)
 	if err != nil {
 		return err
 	}
@@ -415,9 +424,14 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
 	}
 	defer file.Close()

+	size, err := p.chunkManager.Size(p.ctx, filePath)
+	if err != nil {
+		return err
+	}
+
 	// parse file
 	reader := bufio.NewReader(file)
-	parser := NewJSONParser(p.ctx, p.collectionSchema)
+	parser := NewJSONParser(p.ctx, p.collectionSchema, p.updateProgressPercent)

 	// if only validate, we input an empty flushFunc so that the consumer does nothing but validation
 	var flushFunc ImportFlushFunc
@@ -438,7 +452,7 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
 		return err
 	}

-	err = parser.ParseRows(reader, consumer)
+	err = parser.ParseRows(&IOReader{r: reader, fileSize: size}, consumer)
 	if err != nil {
 		return err
 	}
@@ -516,6 +530,17 @@ func (p *ImportWrapper) flushFunc(fields map[storage.FieldID]storage.FieldData,
 	segment.rowCount += int64(rowNum)
 	segment.memSize += memSize

+	// report the working progress percent value to rootcoord
+	// if the report fails, ignore the error; the percent value might be stale but the task can still succeed
+	progressValue := strconv.Itoa(int(p.progressPercent))
+	UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)
+	reportErr := retry.Do(p.ctx, func() error {
+		return p.reportFunc(p.importResult)
+	}, retry.Attempts(p.reportImportAttempts))
+	if reportErr != nil {
+		log.Warn("import wrapper: fail to report working progress percent value to RootCoord", zap.Error(reportErr))
+	}
+
 	return nil
 }
@@ -555,3 +580,11 @@ func (p *ImportWrapper) closeAllWorkingSegments() error {

 	return nil
 }
+
+func (p *ImportWrapper) updateProgressPercent(percent int64) {
+	// ignore illegal percent value
+	if percent < 0 || percent > 100 {
+		return
+	}
+	p.progressPercent = percent
+}
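Two details above are easy to miss: the progress report in flushFunc is best-effort (a failed report only logs a warning, since a stale percent should not fail an otherwise healthy import), and updateProgressPercent silently drops out-of-range values rather than clamping them. Illustrative calls, assuming w is an *ImportWrapper; this is the same behavior Test_ImportWrapperUpdateProgressPercent below verifies:

	w.updateProgressPercent(42)  // accepted: progressPercent becomes 42
	w.updateProgressPercent(-1)  // ignored: below the valid [0, 100] range
	w.updateProgressPercent(200) // ignored: above the valid range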
diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go
index a3c6217200..0a8d673100 100644
--- a/internal/util/importutil/import_wrapper_test.go
+++ b/internal/util/importutil/import_wrapper_test.go
@@ -969,3 +969,20 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
 	err = wrapper.reportPersisted(2, tr)
 	assert.Error(t, err)
 }
+
+func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {
+	ctx := context.Background()
+
+	wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, nil, nil, nil, nil)
+	assert.NotNil(t, wrapper)
+	assert.Equal(t, int64(0), wrapper.progressPercent)
+
+	wrapper.updateProgressPercent(5)
+	assert.Equal(t, int64(5), wrapper.progressPercent)
+
+	wrapper.updateProgressPercent(200)
+	assert.Equal(t, int64(5), wrapper.progressPercent)
+
+	wrapper.updateProgressPercent(100)
+	assert.Equal(t, int64(100), wrapper.progressPercent)
+}
diff --git a/internal/util/importutil/json_parser.go b/internal/util/importutil/json_parser.go
index 1e76865317..1501b1a0cd 100644
--- a/internal/util/importutil/json_parser.go
+++ b/internal/util/importutil/json_parser.go
@@ -37,15 +37,21 @@ const (
 	RowRootNode = "rows"
 )

+type IOReader struct {
+	r        io.Reader
+	fileSize int64
+}
+
 type JSONParser struct {
-	ctx          context.Context  // for canceling parse process
-	bufRowCount  int              // max rows in a buffer
-	fields       map[string]int64 // fields need to be parsed
-	name2FieldID map[string]storage.FieldID
+	ctx                context.Context  // for canceling parse process
+	bufRowCount        int              // max rows in a buffer
+	fields             map[string]int64 // fields need to be parsed
+	name2FieldID       map[string]storage.FieldID
+	updateProgressFunc func(percent int64) // update working progress percent value
 }

 // NewJSONParser helper function to create a JSONParser
-func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema) *JSONParser {
+func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema, updateProgressFunc func(percent int64)) *JSONParser {
 	fields := make(map[string]int64)
 	name2FieldID := make(map[string]storage.FieldID)
 	for i := 0; i < len(collectionSchema.Fields); i++ {
@@ -64,10 +70,11 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch
 	}

 	parser := &JSONParser{
-		ctx:          ctx,
-		bufRowCount:  1024,
-		fields:       fields,
-		name2FieldID: name2FieldID,
+		ctx:                ctx,
+		bufRowCount:        1024,
+		fields:             fields,
+		name2FieldID:       name2FieldID,
+		updateProgressFunc: updateProgressFunc,
 	}

 	adjustBufSize(parser, collectionSchema)
@@ -132,19 +139,31 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}
 	return row, nil
 }

-func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
-	if handler == nil {
+func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {
+	if handler == nil || reader == nil {
 		log.Error("JSON parse handler is nil")
 		return errors.New("JSON parse handler is nil")
 	}

-	dec := json.NewDecoder(r)
+	dec := json.NewDecoder(reader.r)
+
+	oldPercent := int64(0)
+	updateProgress := func() {
+		if p.updateProgressFunc != nil && reader.fileSize > 0 {
+			percent := (dec.InputOffset() * ProgressValueForPersist) / reader.fileSize
+			if percent > oldPercent { // avoid flooding the log
+				log.Debug("JSON parser: working progress", zap.Int64("offset", dec.InputOffset()),
+					zap.Int64("fileSize", reader.fileSize), zap.Int64("percent", percent))
+			}
+			oldPercent = percent
+			p.updateProgressFunc(percent)
+		}
+	}

 	// treat number value as a string instead of a float64.
 	// by default, json lib treats all number values as float64, but if an int64 value
 	// has more than 15 digits, the value would be incorrect after converting from float64
 	dec.UseNumber()
-
 	t, err := dec.Token()
 	if err != nil {
 		log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
@@ -166,7 +185,6 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
 	}
 	key := t.(string)
 	keyLower := strings.ToLower(key)
-
 	// the root key should be RowRootNode
 	if keyLower != RowRootNode {
 		log.Error("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
@@ -199,6 +217,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
 				return err
 			}

+			updateProgress()
+
 			buf = append(buf, row)
 			if len(buf) >= p.bufRowCount {
 				isEmpty = false
@@ -249,6 +269,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
 		return errors.New("row count is 0")
 	}

+	updateProgress()
+
 	// send nil to notify the handler all have done
 	return handler.Handle(nil)
 }
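ParseRows measures progress from the decoder itself: json.Decoder.InputOffset reports the byte offset at the end of the most recently returned token, and IOReader carries the total file size (fetched via chunkManager.Size in parseRowBasedJSON) as the denominator, so the ratio tracks bytes consumed rather than rows handled. A standalone sketch of the same idea, independent of the Milvus types; reportProgress is a hypothetical callback:

	// Standalone illustration: byte-offset progress for a streaming JSON
	// decode, scaled to the 90-percent ceiling this patch uses.
	func decodeWithProgress(r io.Reader, fileSize int64, reportProgress func(int64)) error {
		dec := json.NewDecoder(r)
		for dec.More() { // assumes a top-level stream/array of values
			var row map[string]interface{}
			if err := dec.Decode(&row); err != nil {
				return err
			}
			if fileSize > 0 {
				reportProgress(dec.InputOffset() * 90 / fileSize) // 90 == ProgressValueForPersist
			}
		}
		return nil
	}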
diff --git a/internal/util/importutil/json_parser_test.go b/internal/util/importutil/json_parser_test.go
index 1c6f91caeb..54e3609fd1 100644
--- a/internal/util/importutil/json_parser_test.go
+++ b/internal/util/importutil/json_parser_test.go
@@ -55,7 +55,7 @@ func Test_AdjustBufSize(t *testing.T) {

 	// small row
 	schema := sampleSchema()
-	parser := NewJSONParser(ctx, schema)
+	parser := NewJSONParser(ctx, schema, nil)
 	assert.NotNil(t, parser)
 	assert.Greater(t, parser.bufRowCount, 0)

 	// huge row
 	schema.Fields[9].TypeParams = []*commonpb.KeyValuePair{
 		{Key: "dim", Value: "32768"},
 	}
-	parser = NewJSONParser(ctx, schema)
+	parser = NewJSONParser(ctx, schema, nil)
 	assert.NotNil(t, parser)
 	assert.Greater(t, parser.bufRowCount, 0)
@@ -74,7 +74,7 @@ func Test_AdjustBufSize(t *testing.T) {
 		AutoID: true,
 		Fields: []*schemapb.FieldSchema{},
 	}
-	parser = NewJSONParser(ctx, schema)
+	parser = NewJSONParser(ctx, schema, nil)
 	assert.NotNil(t, parser)
 	assert.Greater(t, parser.bufRowCount, 0)
 }
@@ -84,7 +84,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
 	defer cancel()

 	schema := sampleSchema()
-	parser := NewJSONParser(ctx, schema)
+	parser := NewJSONParser(ctx, schema, nil)
 	assert.NotNil(t, parser)

 	// prepare test data
@@ -121,7 +121,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
 	t.Run("parse success", func(t *testing.T) {
 		// set bufRowCount = 4, means call handle() after reading 4 rows
 		parser.bufRowCount = 4
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(strContent))}, consumer)
 		assert.Nil(t, err)
 		assert.Equal(t, len(content.Rows), len(consumer.rows))
 		for i := 0; i < len(consumer.rows); i++ {
@@ -188,7 +188,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {

 	t.Run("error cases", func(t *testing.T) {
 		// handler is nil
-		err = parser.ParseRows(reader, nil)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, nil)
 		assert.NotNil(t, err)

 		// not a row-based format
 		reader = strings.NewReader(`{
 			"dummy":[]
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
 		assert.NotNil(t, err)

 		// rows is not a list
 		reader = strings.NewReader(`{
 			"rows":
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(5)}, consumer)
 		assert.NotNil(t, err)

 		// typo
 		reader = strings.NewReader(`{
 			"rows": [}
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(6)}, consumer)
 		assert.NotNil(t, err)

 		// rows is not a list
 		reader = strings.NewReader(`{
 			"rows": {}
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(8)}, consumer)
 		assert.NotNil(t, err)

 		// rows is not a list of list
 		reader = strings.NewReader(`{
 			"rows": [[]]
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
 		assert.NotNil(t, err)

 		// not valid json format
 		reader = strings.NewReader(`[]`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
 		assert.NotNil(t, err)

 		// empty content
 		reader = strings.NewReader(`{}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
 		assert.NotNil(t, err)

 		// empty content
 		reader = strings.NewReader(``)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, consumer)
 		assert.NotNil(t, err)

 		// redundant field
 		reader = strings.NewReader(`{
 			"rows":[
 				{"dummy": 1, "FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
 			]
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)

 		// field missed
 		reader = strings.NewReader(`{
 			"rows":[
 				{"FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
 			]
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)

 		// handle() error
 		consumer.handleErr = errors.New("error")
 		reader = strings.NewReader(content)
 		parser.bufRowCount = 2
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)

 		reader = strings.NewReader(content)
 		parser.bufRowCount = 5
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)

 		// row count is 0
 		reader = strings.NewReader(`{
 			"rows":[]
 		}`)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)

 		// canceled
 		consumer.handleErr = nil
 		cancel()
 		reader = strings.NewReader(content)
-		err = parser.ParseRows(reader, consumer)
+		err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
 		assert.NotNil(t, err)
 	})
 }
@@ -307,7 +307,10 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) {
 	defer cancel()

 	schema := strKeySchema()
-	parser := NewJSONParser(ctx, schema)
+	updateProgress := func(percent int64) {
+		assert.Greater(t, percent, int64(0))
+	}
+	parser := NewJSONParser(ctx, schema, updateProgress)
 	assert.NotNil(t, parser)

 	// prepare test data
@@ -337,7 +340,7 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) {
 		handleCount: 0,
 	}

-	err = parser.ParseRows(reader, consumer)
+	err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(binContent))}, consumer)
 	assert.Nil(t, err)
 	assert.Equal(t, len(content.Rows), len(consumer.rows))
 	for i := 0; i < len(consumer.rows); i++ {
diff --git a/internal/util/importutil/numpy_parser.go b/internal/util/importutil/numpy_parser.go
index cc8ed552bc..e138956be5 100644
--- a/internal/util/importutil/numpy_parser.go
+++ b/internal/util/importutil/numpy_parser.go
@@ -53,14 +53,15 @@ func closeReaders(columnReaders []*NumpyColumnReader) {
 }

 type NumpyParser struct {
-	ctx              context.Context            // for canceling parse process
-	collectionSchema *schemapb.CollectionSchema // collection schema
-	rowIDAllocator   *allocator.IDAllocator     // autoid allocator
-	shardNum         int32                      // sharding number of the collection
-	blockSize        int64                      // maximum size of a read block(unit:byte)
-	chunkManager     storage.ChunkManager       // storage interfaces to browse/read the files
-	autoIDRange      []int64                    // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25
-	callFlushFunc    ImportFlushFunc            // call back function to flush segment
+	ctx                context.Context            // for canceling parse process
+	collectionSchema   *schemapb.CollectionSchema // collection schema
+	rowIDAllocator     *allocator.IDAllocator     // autoid allocator
+	shardNum           int32                      // sharding number of the collection
+	blockSize          int64                      // maximum size of a read block(unit:byte)
+	chunkManager       storage.ChunkManager       // storage interfaces to browse/read the files
+	autoIDRange        []int64                    // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25
+	callFlushFunc      ImportFlushFunc            // callback function to flush segment
+	updateProgressFunc func(percent int64)        // update working progress percent value
 }

 // NewNumpyParser is helper function to create a NumpyParser
@@ -70,7 +71,8 @@ func NewNumpyParser(ctx context.Context,
 	shardNum int32,
 	blockSize int64,
 	chunkManager storage.ChunkManager,
-	flushFunc ImportFlushFunc) (*NumpyParser, error) {
+	flushFunc ImportFlushFunc,
+	updateProgressFunc func(percent int64)) (*NumpyParser, error) {
 	if collectionSchema == nil {
 		log.Error("Numpy parser: collection schema is nil")
 		return nil, errors.New("collection schema is nil")
 	}
@@ -92,14 +94,15 @@ func NewNumpyParser(ctx context.Context,
 	}

 	parser := &NumpyParser{
-		ctx:              ctx,
-		collectionSchema: collectionSchema,
-		rowIDAllocator:   idAlloc,
-		shardNum:         shardNum,
-		blockSize:        blockSize,
-		chunkManager:     chunkManager,
-		autoIDRange:      make([]int64, 0),
-		callFlushFunc:    flushFunc,
+		ctx:                ctx,
+		collectionSchema:   collectionSchema,
+		rowIDAllocator:     idAlloc,
+		shardNum:           shardNum,
+		blockSize:          blockSize,
+		chunkManager:       chunkManager,
+		autoIDRange:        make([]int64, 0),
+		callFlushFunc:      flushFunc,
+		updateProgressFunc: updateProgressFunc,
 	}

 	return parser, nil
@@ -362,12 +365,21 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
 		return err
 	}

+	updateProgress := func(readRowCount int) {
+		if p.updateProgressFunc != nil && len(columnReaders) != 0 && columnReaders[0].rowCount > 0 {
+			percent := (readRowCount * ProgressValueForPersist) / columnReaders[0].rowCount
+			log.Debug("Numpy parser: working progress", zap.Int("readRowCount", readRowCount),
+				zap.Int("totalRowCount", columnReaders[0].rowCount), zap.Int("percent", percent))
+			p.updateProgressFunc(int64(percent))
+		}
+	}
+
 	// prepare shards
 	shards := make([]map[storage.FieldID]storage.FieldData, 0, p.shardNum)
 	for i := 0; i < int(p.shardNum); i++ {
 		segmentData := initSegmentData(p.collectionSchema)
 		if segmentData == nil {
-			log.Error("import wrapper: failed to initialize FieldData list")
+			log.Error("Numpy parser: failed to initialize FieldData list")
 			return fmt.Errorf("failed to initialize FieldData list")
 		}
 		shards = append(shards, segmentData)
@@ -399,6 +411,7 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
 		if readRowCount == 0 {
 			break
 		}
+		updateProgress(readRowCount)
 		tr.Record("readData")
 		// split data to shards
 		err = p.splitFieldsData(segmentData, shards)
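For numpy imports the denominator is a row count rather than a byte size: every column file of a collection holds the same number of rows, so consume scales the rows pulled so far against columnReaders[0].rowCount. A condensed sketch of that bookkeeping with illustrative names (readBatch and report are not the patch's functions):

	// Illustration: row-count progress for a column-oriented read loop.
	totalRows, readRows := 1000, 0
	for readRows < totalRows {
		readRows += readBatch()                  // hypothetical: reads up to one block of rows
		report(int64(readRows * 90 / totalRows)) // 90 == ProgressValueForPersist
	}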
diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go
index 6e81a4871f..49ce4f19fb 100644
--- a/internal/util/importutil/numpy_parser_test.go
+++ b/internal/util/importutil/numpy_parser_test.go
@@ -52,7 +52,7 @@ func createNumpyParser(t *testing.T) *NumpyParser {
 		return nil
 	}

-	parser, err := NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc)
+	parser, err := NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc, nil)
 	assert.NoError(t, err)
 	assert.NotNil(t, parser)
 	return parser
@@ -71,30 +71,30 @@ func findSchema(schema *schemapb.CollectionSchema, dt schemapb.DataType) *schema
 func Test_NewNumpyParser(t *testing.T) {
 	ctx := context.Background()

-	parser, err := NewNumpyParser(ctx, nil, nil, 2, 100, nil, nil)
+	parser, err := NewNumpyParser(ctx, nil, nil, 2, 100, nil, nil, nil)
 	assert.Error(t, err)
 	assert.Nil(t, parser)

 	schema := sampleSchema()
-	parser, err = NewNumpyParser(ctx, schema, nil, 2, 100, nil, nil)
+	parser, err = NewNumpyParser(ctx, schema, nil, 2, 100, nil, nil, nil)
 	assert.Error(t, err)
 	assert.Nil(t, parser)

 	idAllocator := newIDAllocator(ctx, t, nil)
-	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, nil, nil)
+	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, nil, nil, nil)
 	assert.Error(t, err)
 	assert.Nil(t, parser)

 	cm := createLocalChunkManager(t)
-	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, nil)
+	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, nil, nil)
 	assert.Error(t, err)
 	assert.Nil(t, parser)

 	flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
 		return nil
 	}
-	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc)
+	parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc, nil)
 	assert.NoError(t, err)
 	assert.NotNil(t, parser)
 }
@@ -878,7 +878,10 @@ func Test_NumpyParserParse_perf(t *testing.T) {
 	}

 	idAllocator := newIDAllocator(ctx, t, nil)
-	parser, err := NewNumpyParser(ctx, perfSchema(dim), idAllocator, shardNum, 16*1024*1024, cm, callFlushFunc)
+	updateProgress := func(percent int64) {
+		assert.Greater(t, percent, int64(0))
+	}
+	parser, err := NewNumpyParser(ctx, perfSchema(dim), idAllocator, shardNum, 16*1024*1024, cm, callFlushFunc, updateProgress)
 	assert.NoError(t, err)
 	assert.NotNil(t, parser)
 	parser.collectionSchema = perfSchema(dim)