Report bulkinsert progress (#21612)

Signed-off-by: groot <yihua.mo@zilliz.com>
pull/21592/head
groot 2023-01-10 16:53:39 +08:00 committed by GitHub
parent 120c876122
commit de318c2e7c
12 changed files with 242 additions and 83 deletions

View File

@ -986,6 +986,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
AutoIds: make([]int64, 0),
RowCount: 0,
}
importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.ProgressPercent, Value: "0"})
// Spawn a new context to ignore cancellation from the parent context.
newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout)

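For context, here is a minimal, self-contained sketch of what this one-line change accomplishes: the DataNode seeds the import result with a zero progress entry before any file is parsed, so the task can report 0% immediately. The types below are simplified stand-ins for the real commonpb/rootcoordpb structs, not the actual Milvus definitions.

```go
package main

import "fmt"

// kvPair is a simplified stand-in for commonpb.KeyValuePair.
type kvPair struct{ Key, Value string }

// importResult is a simplified stand-in for the result object reported to RootCoord.
type importResult struct {
	AutoIds  []int64
	RowCount int64
	Infos    []*kvPair
}

const progressPercent = "progress_percent" // mirrors importutil.ProgressPercent

func main() {
	res := &importResult{AutoIds: make([]int64, 0), RowCount: 0}
	// Seed the progress entry with "0" so the task reports 0% before any parsing starts.
	res.Infos = append(res.Infos, &kvPair{Key: progressPercent, Value: "0"})
	fmt.Printf("%s=%s\n", res.Infos[0].Key, res.Infos[0].Value) // progress_percent=0
}
```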
View File

@ -607,8 +607,9 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
if kv.GetKey() == importutil.FailedReason {
toPersistImportTaskInfo.State.ErrorMessage = kv.GetValue()
break
} else if kv.GetKey() == importutil.PersistTimeCost {
toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv)
} else if kv.GetKey() == importutil.PersistTimeCost ||
kv.GetKey() == importutil.ProgressPercent {
importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, kv.GetKey(), kv.GetValue())
}
}
log.Info("importManager update task info", zap.Any("toPersistImportTaskInfo", toPersistImportTaskInfo))
@ -667,6 +668,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
// Meta persist should be done before memory objs change.
toPersistImportTaskInfo := cloneImportTaskInfo(t)
toPersistImportTaskInfo.State.StateCode = targetState
if targetState == commonpb.ImportState_ImportCompleted {
importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
}
tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
// Update task in task store.
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
@ -684,6 +688,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
// Meta persist should be done before memory objs change.
toPersistImportTaskInfo := cloneImportTaskInfo(v)
toPersistImportTaskInfo.State.StateCode = targetState
if targetState == commonpb.ImportState_ImportCompleted {
importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
}
tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
// Update task in task store.
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
@ -704,6 +711,9 @@ func (m *importManager) setImportTaskStateAndReason(taskID int64, targetState co
} else {
toPersistImportTaskInfo := cloneImportTaskInfo(ti)
toPersistImportTaskInfo.State.StateCode = targetState
if targetState == commonpb.ImportState_ImportCompleted {
importutil.UpdateKVInfo(&toPersistImportTaskInfo.Infos, importutil.ProgressPercent, "100")
}
tryUpdateErrMsg(errReason, toPersistImportTaskInfo)
// Update task in task store.
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {

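The three near-identical hunks above all do the same thing: once a task transitions to ImportCompleted, the progress entry is forced to 100. Below is a minimal sketch of that upsert behavior, using a simplified stand-in for the proto key-value type and for importutil.UpdateKVInfo.

```go
package main

import "fmt"

// kvPair is a simplified stand-in for commonpb.KeyValuePair.
type kvPair struct{ Key, Value string }

// upsertKV mirrors the behavior of importutil.UpdateKVInfo:
// update the value in place if the key exists, otherwise append a new pair.
func upsertKV(infos *[]*kvPair, k, v string) {
	for _, kv := range *infos {
		if kv.Key == k {
			kv.Value = v
			return
		}
	}
	*infos = append(*infos, &kvPair{Key: k, Value: v})
}

func main() {
	// During the persist stage the DataNode never reports more than 90.
	infos := []*kvPair{{Key: "progress_percent", Value: "90"}}

	completed := true // stands in for targetState == commonpb.ImportState_ImportCompleted
	if completed {
		upsertKV(&infos, "progress_percent", "100")
	}
	fmt.Println(infos[0].Value) // 100
}
```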
View File

@ -31,12 +31,13 @@ import (
)
type BinlogParser struct {
ctx context.Context // for canceling parse process
collectionSchema *schemapb.CollectionSchema // collection schema
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
callFlushFunc ImportFlushFunc // call back function to flush segment
ctx context.Context // for canceling parse process
collectionSchema *schemapb.CollectionSchema // collection schema
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
callFlushFunc ImportFlushFunc // call back function to flush segment
updateProgressFunc func(percent int64) // update working progress percent value
// a timestamp that defines the start point of the restore; data before this time point will be ignored
// if this value is set to 0, all the data will be imported
@ -57,6 +58,7 @@ func NewBinlogParser(ctx context.Context,
blockSize int64,
chunkManager storage.ChunkManager,
flushFunc ImportFlushFunc,
updateProgressFunc func(percent int64),
tsStartPoint uint64,
tsEndPoint uint64) (*BinlogParser, error) {
if collectionSchema == nil {
@ -248,12 +250,23 @@ func (p *BinlogParser) Parse(filePaths []string) error {
return err
}
for _, segmentHolder := range segmentHolders {
updateProgress := func(readBatch int) {
if p.updateProgressFunc != nil && len(segmentHolders) != 0 {
percent := (readBatch * ProgressValueForPersist) / len(segmentHolders)
log.Debug("Binlog parser: working progress", zap.Int("readBatch", readBatch),
zap.Int("totalBatchCount", len(segmentHolders)), zap.Int("percent", percent))
p.updateProgressFunc(int64(percent))
}
}
for i, segmentHolder := range segmentHolders {
err = p.parseSegmentFiles(segmentHolder)
if err != nil {
return err
}
updateProgress(i + 1)
// trigger gc after each segment is finished
triggerGC()
}

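As a quick sanity check of the arithmetic above: with ProgressValueForPersist = 90, the reported percent after each segment is (readBatch * 90) / len(segmentHolders), so parsing progress tops out at 90 and the remaining 10 points are reserved for the persisted/completed states. A minimal sketch with hypothetical segment counts:

```go
package main

import "fmt"

const progressValueForPersist = 90 // mirrors importutil.ProgressValueForPersist

func main() {
	totalSegments := 4
	for readBatch := 1; readBatch <= totalSegments; readBatch++ {
		// Integer division, exactly as in BinlogParser.Parse.
		percent := (readBatch * progressValueForPersist) / totalSegments
		fmt.Printf("segment %d/%d -> %d%%\n", readBatch, totalSegments, percent)
	}
	// segment 1/4 -> 22%
	// segment 2/4 -> 45%
	// segment 3/4 -> 67%
	// segment 4/4 -> 90%
}
```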
View File

@ -32,17 +32,17 @@ func Test_NewBinlogParser(t *testing.T) {
ctx := context.Background()
// nil schema
parser, err := NewBinlogParser(ctx, nil, 2, 1024, nil, nil, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, nil, 2, 1024, nil, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
// nil chunkmanager
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, nil, nil, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, nil, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
// nil flushfunc
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, nil, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, nil, nil, 0, math.MaxUint64)
assert.Nil(t, parser)
assert.NotNil(t, err)
@ -50,12 +50,12 @@ func Test_NewBinlogParser(t *testing.T) {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
// tsStartPoint larger than tsEndPoint
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 2, 1)
parser, err = NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 2, 1)
assert.Nil(t, parser)
assert.NotNil(t, err)
}
@ -127,7 +127,7 @@ func Test_BinlogParserConstructHolders(t *testing.T) {
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009/434574382554415105",
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
@ -187,7 +187,7 @@ func Test_BinlogParserConstructHoldersFailed(t *testing.T) {
listResult: make(map[string][]string),
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, chunkManager, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
@ -233,7 +233,7 @@ func Test_BinlogParserParseFilesFailed(t *testing.T) {
return nil
}
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
parser, err := NewBinlogParser(ctx, sampleSchema(), 2, 1024, &MockChunkManager{}, flushFunc, nil, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)
@ -268,7 +268,10 @@ func Test_BinlogParserParse(t *testing.T) {
},
}
parser, err := NewBinlogParser(ctx, schema, 2, 1024, chunkManager, flushFunc, 0, math.MaxUint64)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
parser, err := NewBinlogParser(ctx, schema, 2, 1024, chunkManager, flushFunc, updateProgress, 0, math.MaxUint64)
assert.NotNil(t, parser)
assert.Nil(t, err)

View File

@ -30,6 +30,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
@ -539,3 +540,22 @@ func pkToShard(pk interface{}, shardNum uint32) (uint32, error) {
return shard, nil
}
func UpdateKVInfo(infos *[]*commonpb.KeyValuePair, k string, v string) error {
if infos == nil {
return errors.New("Import util: kv array pointer is nil")
}
found := false
for _, kv := range *infos {
if kv.GetKey() == k {
kv.Value = v
found = true
}
}
if !found {
*infos = append(*infos, &commonpb.KeyValuePair{Key: k, Value: v})
}
return nil
}

View File

@ -640,3 +640,24 @@ func Test_PkToShard(t *testing.T) {
hash, _ = typeutil.Hash32Int64(pk)
assert.Equal(t, hash%shardNum, shard)
}
func Test_UpdateKVInfo(t *testing.T) {
err := UpdateKVInfo(nil, "a", "1")
assert.Error(t, err)
infos := make([]*commonpb.KeyValuePair, 0)
err = UpdateKVInfo(&infos, "a", "1")
assert.NoError(t, err)
assert.Equal(t, 1, len(infos))
assert.Equal(t, "1", infos[0].Value)
err = UpdateKVInfo(&infos, "a", "2")
assert.NoError(t, err)
assert.Equal(t, "2", infos[0].Value)
err = UpdateKVInfo(&infos, "b", "5")
assert.NoError(t, err)
assert.Equal(t, 2, len(infos))
assert.Equal(t, "5", infos[1].Value)
}

View File

@ -58,12 +58,16 @@ const (
// the total memory size might cause OOM.
MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB
// progress percent value of persist state
ProgressValueForPersist = 90
// keywords of import task information
FailedReason = "failed_reason"
Files = "files"
CollectionName = "collection"
PartitionName = "partition"
PersistTimeCost = "persist_cost"
ProgressPercent = "progress_percent"
)
// ReportImportAttempts is the maximum # of attempts to retry when import fails.
@ -102,6 +106,7 @@ type ImportWrapper struct {
reportImportAttempts uint // attempts count if report function get error
workingSegments map[int]*WorkingSegment // a map shard id to working segments
progressPercent int64 // working progress percent
}
func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.CollectionSchema, shardNum int32, segmentSize int64,
@ -291,7 +296,8 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
return p.flushFunc(fields, shardID)
}
parser, err := NewNumpyParser(p.ctx, p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, p.chunkManager, flushFunc)
parser, err := NewNumpyParser(p.ctx, p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize,
p.chunkManager, flushFunc, p.updateProgressPercent)
if err != nil {
return err
}
@ -326,6 +332,9 @@ func (p *ImportWrapper) reportPersisted(reportAttempts uint, tr *timerecord.Time
// report file process state
p.importResult.State = commonpb.ImportState_ImportPersisted
progressValue := strconv.Itoa(ProgressValueForPersist)
UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)
log.Info("import wrapper: report import result", zap.Any("importResult", p.importResult))
// the persisted-state report is important, so retry more times to avoid failing the task only because of a transient network error
reportErr := retry.Do(p.ctx, func() error {
@ -390,7 +399,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
return p.flushFunc(fields, shardID)
}
parser, err := NewBinlogParser(p.ctx, p.collectionSchema, p.shardNum, SingleBlockSize, p.chunkManager, flushFunc,
tsStartPoint, tsEndPoint)
p.updateProgressPercent, tsStartPoint, tsEndPoint)
if err != nil {
return err
}
@ -415,9 +424,14 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
}
defer file.Close()
size, err := p.chunkManager.Size(p.ctx, filePath)
if err != nil {
return err
}
// parse file
reader := bufio.NewReader(file)
parser := NewJSONParser(p.ctx, p.collectionSchema)
parser := NewJSONParser(p.ctx, p.collectionSchema, p.updateProgressPercent)
// if only validating, pass an empty flushFunc so that the consumer does nothing but validation.
var flushFunc ImportFlushFunc
@ -438,7 +452,7 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
return err
}
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: size}, consumer)
if err != nil {
return err
}
@ -516,6 +530,17 @@ func (p *ImportWrapper) flushFunc(fields map[storage.FieldID]storage.FieldData,
segment.rowCount += int64(rowNum)
segment.memSize += memSize
// report the working progress percent value to RootCoord
// if the report fails, ignore the error; the percent value might be stale but the task can still succeed
progressValue := strconv.Itoa(int(p.progressPercent))
UpdateKVInfo(&p.importResult.Infos, ProgressPercent, progressValue)
reportErr := retry.Do(p.ctx, func() error {
return p.reportFunc(p.importResult)
}, retry.Attempts(p.reportImportAttempts))
if reportErr != nil {
log.Warn("import wrapper: fail to report working progress percent value to RootCoord", zap.Error(reportErr))
}
return nil
}
@ -555,3 +580,11 @@ func (p *ImportWrapper) closeAllWorkingSegments() error {
return nil
}
func (p *ImportWrapper) updateProgressPercent(percent int64) {
// ignore illegal percent value
if percent < 0 || percent > 100 {
return
}
p.progressPercent = percent
}

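Putting the pieces of this file together, the progress value follows a simple lifecycle: it starts at 0, is pushed up to at most ProgressValueForPersist (90) while blocks are flushed, is pinned at 90 when the persisted state is reported, and is finally set to 100 by RootCoord on completion. A minimal sketch of the guard performed by updateProgressPercent (out-of-range values are ignored rather than clamped):

```go
package main

import "fmt"

// updateProgressPercent mirrors ImportWrapper.updateProgressPercent:
// illegal values are simply ignored.
func updateProgressPercent(current *int64, percent int64) {
	if percent < 0 || percent > 100 {
		return
	}
	*current = percent
}

func main() {
	var progress int64
	updateProgressPercent(&progress, 5)
	updateProgressPercent(&progress, 200) // illegal, ignored
	updateProgressPercent(&progress, 90)  // value reported when persist is done
	fmt.Println(progress)                 // 90
}
```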
View File

@ -969,3 +969,20 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
err = wrapper.reportPersisted(2, tr)
assert.Error(t, err)
}
func Test_ImportWrapperUpdateProgressPercent(t *testing.T) {
ctx := context.Background()
wrapper := NewImportWrapper(ctx, sampleSchema(), 2, 1, nil, nil, nil, nil)
assert.NotNil(t, wrapper)
assert.Equal(t, int64(0), wrapper.progressPercent)
wrapper.updateProgressPercent(5)
assert.Equal(t, int64(5), wrapper.progressPercent)
wrapper.updateProgressPercent(200)
assert.Equal(t, int64(5), wrapper.progressPercent)
wrapper.updateProgressPercent(100)
assert.Equal(t, int64(100), wrapper.progressPercent)
}

View File

@ -37,15 +37,21 @@ const (
RowRootNode = "rows"
)
type IOReader struct {
r io.Reader
fileSize int64
}
type JSONParser struct {
ctx context.Context // for canceling parse process
bufRowCount int // max rows in a buffer
fields map[string]int64 // fields need to be parsed
name2FieldID map[string]storage.FieldID
ctx context.Context // for canceling parse process
bufRowCount int // max rows in a buffer
fields map[string]int64 // fields need to be parsed
name2FieldID map[string]storage.FieldID
updateProgressFunc func(percent int64) // update working progress percent value
}
// NewJSONParser is a helper function to create a JSONParser
func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema) *JSONParser {
func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSchema, updateProgressFunc func(percent int64)) *JSONParser {
fields := make(map[string]int64)
name2FieldID := make(map[string]storage.FieldID)
for i := 0; i < len(collectionSchema.Fields); i++ {
@ -64,10 +70,11 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch
}
parser := &JSONParser{
ctx: ctx,
bufRowCount: 1024,
fields: fields,
name2FieldID: name2FieldID,
ctx: ctx,
bufRowCount: 1024,
fields: fields,
name2FieldID: name2FieldID,
updateProgressFunc: updateProgressFunc,
}
adjustBufSize(parser, collectionSchema)
@ -132,19 +139,31 @@ func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}
return row, nil
}
func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
if handler == nil {
func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {
if handler == nil || reader == nil {
log.Error("JSON parse handler is nil")
return errors.New("JSON parse handler is nil")
}
dec := json.NewDecoder(r)
dec := json.NewDecoder(reader.r)
oldPercent := int64(0)
updateProgress := func() {
if p.updateProgressFunc != nil && reader.fileSize > 0 {
percent := (dec.InputOffset() * ProgressValueForPersist) / reader.fileSize
if percent > oldPercent { // avoid flooding the log
log.Debug("JSON parser: working progress", zap.Int64("offset", dec.InputOffset()),
zap.Int64("fileSize", reader.fileSize), zap.Int64("percent", percent))
}
oldPercent = percent
p.updateProgressFunc(percent)
}
}
// treat number values as strings instead of float64.
// by default, the json lib treats all number values as float64, but if an int64 value
// has more than 15 digits, the value becomes incorrect after converting from float64
dec.UseNumber()
t, err := dec.Token()
if err != nil {
log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
@ -166,7 +185,6 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if keyLower != RowRootNode {
log.Error("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
@ -199,6 +217,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
return err
}
updateProgress()
buf = append(buf, row)
if len(buf) >= p.bufRowCount {
isEmpty = false
@ -249,6 +269,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
return errors.New("row count is 0")
}
updateProgress()
// send nil to notify the handler that all rows have been handled
return handler.Handle(nil)
}

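The JSON path cannot count segments or rows up front, so it derives progress from bytes consumed: json.Decoder.InputOffset (available since Go 1.14) divided by the file size carried in IOReader, scaled to the 0..90 range. A minimal, self-contained sketch of that calculation, using an inline JSON string in place of a real import file:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

const progressValueForPersist = 90 // mirrors importutil.ProgressValueForPersist

func main() {
	data := `{"rows": [{"id": 1}, {"id": 2}, {"id": 3}]}`
	fileSize := int64(len(data))

	dec := json.NewDecoder(strings.NewReader(data))
	dec.UseNumber() // keep large integers exact, as the real parser does

	for {
		tok, err := dec.Token()
		if err != nil {
			break // io.EOF ends the token stream
		}
		// Progress is the fraction of bytes consumed so far, scaled so that
		// parsing tops out at 90; the remaining 10 points belong to later states.
		percent := (dec.InputOffset() * progressValueForPersist) / fileSize
		fmt.Printf("token %v -> %d%%\n", tok, percent)
	}
}
```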
View File

@ -55,7 +55,7 @@ func Test_AdjustBufSize(t *testing.T) {
// small row
schema := sampleSchema()
parser := NewJSONParser(ctx, schema)
parser := NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
assert.Greater(t, parser.bufRowCount, 0)
@ -63,7 +63,7 @@ func Test_AdjustBufSize(t *testing.T) {
schema.Fields[9].TypeParams = []*commonpb.KeyValuePair{
{Key: "dim", Value: "32768"},
}
parser = NewJSONParser(ctx, schema)
parser = NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
assert.Greater(t, parser.bufRowCount, 0)
@ -74,7 +74,7 @@ func Test_AdjustBufSize(t *testing.T) {
AutoID: true,
Fields: []*schemapb.FieldSchema{},
}
parser = NewJSONParser(ctx, schema)
parser = NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
assert.Greater(t, parser.bufRowCount, 0)
}
@ -84,7 +84,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
defer cancel()
schema := sampleSchema()
parser := NewJSONParser(ctx, schema)
parser := NewJSONParser(ctx, schema, nil)
assert.NotNil(t, parser)
// prepare test data
@ -121,7 +121,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
t.Run("parse success", func(t *testing.T) {
// set bufRowCount = 4, means call handle() after reading 4 rows
parser.bufRowCount = 4
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(strContent))}, consumer)
assert.Nil(t, err)
assert.Equal(t, len(content.Rows), len(consumer.rows))
for i := 0; i < len(consumer.rows); i++ {
@ -188,7 +188,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
t.Run("error cases", func(t *testing.T) {
// handler is nil
err = parser.ParseRows(reader, nil)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, nil)
assert.NotNil(t, err)
// not a row-based format
@ -196,7 +196,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
"dummy":[]
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
assert.NotNil(t, err)
// rows is not a list
@ -204,7 +204,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
"rows":
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(5)}, consumer)
assert.NotNil(t, err)
// typo
@ -212,7 +212,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
"rows": [}
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(6)}, consumer)
assert.NotNil(t, err)
// rows is not a list
@ -220,7 +220,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
"rows": {}
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(8)}, consumer)
assert.NotNil(t, err)
// rows is not a list of list
@ -228,25 +228,25 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
"rows": [[]]
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
assert.NotNil(t, err)
// not valid json format
reader = strings.NewReader(`[]`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(`{}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(``)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, consumer)
assert.NotNil(t, err)
// redundant field
@ -255,7 +255,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
{"dummy": 1, "FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
// field missed
@ -264,7 +264,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
{"FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
// handle() error
@ -278,26 +278,26 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
consumer.handleErr = errors.New("error")
reader = strings.NewReader(content)
parser.bufRowCount = 2
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
reader = strings.NewReader(content)
parser.bufRowCount = 5
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
// row count is 0
reader = strings.NewReader(`{
"rows":[]
}`)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
// canceled
consumer.handleErr = nil
cancel()
reader = strings.NewReader(content)
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(100)}, consumer)
assert.NotNil(t, err)
})
}
@ -307,7 +307,10 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) {
defer cancel()
schema := strKeySchema()
parser := NewJSONParser(ctx, schema)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
parser := NewJSONParser(ctx, schema, updateProgress)
assert.NotNil(t, parser)
// prepare test data
@ -337,7 +340,7 @@ func Test_JSONParserParseRows_StrPK(t *testing.T) {
handleCount: 0,
}
err = parser.ParseRows(reader, consumer)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(binContent))}, consumer)
assert.Nil(t, err)
assert.Equal(t, len(content.Rows), len(consumer.rows))
for i := 0; i < len(consumer.rows); i++ {

View File

@ -53,14 +53,15 @@ func closeReaders(columnReaders []*NumpyColumnReader) {
}
type NumpyParser struct {
ctx context.Context // for canceling parse process
collectionSchema *schemapb.CollectionSchema // collection schema
rowIDAllocator *allocator.IDAllocator // autoid allocator
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
autoIDRange []int64 // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25
callFlushFunc ImportFlushFunc // call back function to flush segment
ctx context.Context // for canceling parse process
collectionSchema *schemapb.CollectionSchema // collection schema
rowIDAllocator *allocator.IDAllocator // autoid allocator
shardNum int32 // sharding number of the collection
blockSize int64 // maximum size of a read block(unit:byte)
chunkManager storage.ChunkManager // storage interfaces to browse/read the files
autoIDRange []int64 // auto-generated id range, for example: [1, 10, 20, 25] means id from 1 to 10 and 20 to 25
callFlushFunc ImportFlushFunc // call back function to flush segment
updateProgressFunc func(percent int64) // update working progress percent value
}
// NewNumpyParser is a helper function to create a NumpyParser
@ -70,7 +71,8 @@ func NewNumpyParser(ctx context.Context,
shardNum int32,
blockSize int64,
chunkManager storage.ChunkManager,
flushFunc ImportFlushFunc) (*NumpyParser, error) {
flushFunc ImportFlushFunc,
updateProgressFunc func(percent int64)) (*NumpyParser, error) {
if collectionSchema == nil {
log.Error("Numper parser: collection schema is nil")
return nil, errors.New("collection schema is nil")
@ -92,14 +94,15 @@ func NewNumpyParser(ctx context.Context,
}
parser := &NumpyParser{
ctx: ctx,
collectionSchema: collectionSchema,
rowIDAllocator: idAlloc,
shardNum: shardNum,
blockSize: blockSize,
chunkManager: chunkManager,
autoIDRange: make([]int64, 0),
callFlushFunc: flushFunc,
ctx: ctx,
collectionSchema: collectionSchema,
rowIDAllocator: idAlloc,
shardNum: shardNum,
blockSize: blockSize,
chunkManager: chunkManager,
autoIDRange: make([]int64, 0),
callFlushFunc: flushFunc,
updateProgressFunc: updateProgressFunc,
}
return parser, nil
@ -362,12 +365,21 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
return err
}
updateProgress := func(readRowCount int) {
if p.updateProgressFunc != nil && len(columnReaders) != 0 && columnReaders[0].rowCount > 0 {
percent := (readRowCount * ProgressValueForPersist) / columnReaders[0].rowCount
log.Debug("Numper parser: working progress", zap.Int("readRowCount", readRowCount),
zap.Int("totalRowCount", columnReaders[0].rowCount), zap.Int("percent", percent))
p.updateProgressFunc(int64(percent))
}
}
// prepare shards
shards := make([]map[storage.FieldID]storage.FieldData, 0, p.shardNum)
for i := 0; i < int(p.shardNum); i++ {
segmentData := initSegmentData(p.collectionSchema)
if segmentData == nil {
log.Error("import wrapper: failed to initialize FieldData list")
log.Error("Numper parser: failed to initialize FieldData list")
return fmt.Errorf("failed to initialize FieldData list")
}
shards = append(shards, segmentData)
@ -399,6 +411,7 @@ func (p *NumpyParser) consume(columnReaders []*NumpyColumnReader) error {
if readRowCount == 0 {
break
}
updateProgress(readRowCount)
tr.Record("readData")
// split data to shards
err = p.splitFieldsData(segmentData, shards)

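The numpy path computes the same 0..90 scaled value, but from row counts: the row count handed to updateProgress divided by the total row count of the first column reader (all columns of a numpy import are expected to have the same length). A minimal sketch of the guard and the formula, with hypothetical row numbers; unlike the real closure, which simply skips the report when the total is unknown, this version returns -1 as a sentinel.

```go
package main

import "fmt"

const progressValueForPersist = 90 // mirrors importutil.ProgressValueForPersist

// progressPercent is a simplified version of the updateProgress closure in
// NumpyParser.consume; it returns -1 when the total row count is unknown.
func progressPercent(readRows, totalRows int) int {
	if totalRows <= 0 {
		return -1
	}
	return (readRows * progressValueForPersist) / totalRows
}

func main() {
	// Hypothetical numbers: 2500 of 10000 rows handed to updateProgress so far.
	fmt.Println(progressPercent(2500, 10000))  // 22
	fmt.Println(progressPercent(10000, 10000)) // 90
	fmt.Println(progressPercent(100, 0))       // -1, nothing would be reported
}
```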
View File

@ -52,7 +52,7 @@ func createNumpyParser(t *testing.T) *NumpyParser {
return nil
}
parser, err := NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc)
parser, err := NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc, nil)
assert.NoError(t, err)
assert.NotNil(t, parser)
return parser
@ -71,30 +71,30 @@ func findSchema(schema *schemapb.CollectionSchema, dt schemapb.DataType) *schema
func Test_NewNumpyParser(t *testing.T) {
ctx := context.Background()
parser, err := NewNumpyParser(ctx, nil, nil, 2, 100, nil, nil)
parser, err := NewNumpyParser(ctx, nil, nil, 2, 100, nil, nil, nil)
assert.Error(t, err)
assert.Nil(t, parser)
schema := sampleSchema()
parser, err = NewNumpyParser(ctx, schema, nil, 2, 100, nil, nil)
parser, err = NewNumpyParser(ctx, schema, nil, 2, 100, nil, nil, nil)
assert.Error(t, err)
assert.Nil(t, parser)
idAllocator := newIDAllocator(ctx, t, nil)
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, nil, nil)
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, nil, nil, nil)
assert.Error(t, err)
assert.Nil(t, parser)
cm := createLocalChunkManager(t)
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, nil)
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, nil, nil)
assert.Error(t, err)
assert.Nil(t, parser)
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc)
parser, err = NewNumpyParser(ctx, schema, idAllocator, 2, 100, cm, flushFunc, nil)
assert.NoError(t, err)
assert.NotNil(t, parser)
}
@ -878,7 +878,10 @@ func Test_NumpyParserParse_perf(t *testing.T) {
}
idAllocator := newIDAllocator(ctx, t, nil)
parser, err := NewNumpyParser(ctx, perfSchema(dim), idAllocator, shardNum, 16*1024*1024, cm, callFlushFunc)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
parser, err := NewNumpyParser(ctx, perfSchema(dim), idAllocator, shardNum, 16*1024*1024, cm, callFlushFunc, updateProgress)
assert.NoError(t, err)
assert.NotNil(t, parser)
parser.collectionSchema = perfSchema(dim)