mirror of https://github.com/milvus-io/milvus.git
Bulkinsert support pure list json (#28126)
Signed-off-by: yhmo <yihua.mo@zilliz.com>pull/28154/head
parent
4558af94d5
commit
f11e99efed
|
@ -232,38 +232,41 @@ func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {
|
|||
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
|
||||
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
|
||||
}
|
||||
if t != json.Delim('{') {
|
||||
log.Warn("JSON parser: invalid JSON format, the content should be started with'{'")
|
||||
return errors.New("invalid JSON format, the content should be started with'{'")
|
||||
if t != json.Delim('{') && t != json.Delim('[') {
|
||||
log.Warn("JSON parser: invalid JSON format, the content should be started with '{' or '['")
|
||||
return errors.New("invalid JSON format, the content should be started with '{' or '['")
|
||||
}
|
||||
|
||||
// read the first level
|
||||
isEmpty := true
|
||||
isOldFormat := (t == json.Delim('{'))
|
||||
for dec.More() {
|
||||
// read the key
|
||||
t, err := dec.Token()
|
||||
if err != nil {
|
||||
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
|
||||
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
|
||||
}
|
||||
key := t.(string)
|
||||
keyLower := strings.ToLower(key)
|
||||
// the root key should be RowRootNode
|
||||
if keyLower != RowRootNode {
|
||||
log.Warn("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
|
||||
return fmt.Errorf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key)
|
||||
}
|
||||
if isOldFormat {
|
||||
// read the key
|
||||
t, err := dec.Token()
|
||||
if err != nil {
|
||||
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
|
||||
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
|
||||
}
|
||||
key := t.(string)
|
||||
keyLower := strings.ToLower(key)
|
||||
// the root key should be RowRootNode
|
||||
if keyLower != RowRootNode {
|
||||
log.Warn("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
|
||||
return fmt.Errorf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key)
|
||||
}
|
||||
|
||||
// started by '['
|
||||
t, err = dec.Token()
|
||||
if err != nil {
|
||||
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
|
||||
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
|
||||
}
|
||||
// started by '['
|
||||
t, err = dec.Token()
|
||||
if err != nil {
|
||||
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
|
||||
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
|
||||
}
|
||||
|
||||
if t != json.Delim('[') {
|
||||
log.Warn("JSON parser: invalid JSON format, rows list should begin with '['")
|
||||
return errors.New("invalid JSON format, rows list should begin with '['")
|
||||
if t != json.Delim('[') {
|
||||
log.Warn("JSON parser: invalid JSON format, rows list should begin with '['")
|
||||
return errors.New("invalid JSON format, rows list should begin with '['")
|
||||
}
|
||||
}
|
||||
|
||||
// read buffer
|
||||
|
|
|
@ -116,21 +116,16 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
|
|||
content.Rows = append(content.Rows, row)
|
||||
}
|
||||
|
||||
binContent, err := json.Marshal(content)
|
||||
assert.NoError(t, err)
|
||||
strContent := string(binContent)
|
||||
reader := strings.NewReader(strContent)
|
||||
verifyRows := func(ioReader *IOReader) {
|
||||
consumer := &mockJSONRowConsumer{
|
||||
handleErr: nil,
|
||||
rows: make([]map[int64]interface{}, 0),
|
||||
handleCount: 0,
|
||||
}
|
||||
|
||||
consumer := &mockJSONRowConsumer{
|
||||
handleErr: nil,
|
||||
rows: make([]map[int64]interface{}, 0),
|
||||
handleCount: 0,
|
||||
}
|
||||
|
||||
t.Run("parse success", func(t *testing.T) {
|
||||
// set bufRowCount = 4, means call handle() after reading 4 rows
|
||||
parser.bufRowCount = 4
|
||||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(strContent))}, consumer)
|
||||
err = parser.ParseRows(ioReader, consumer)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(content.Rows), len(consumer.rows))
|
||||
for i := 0; i < len(consumer.rows); i++ {
|
||||
|
@ -193,6 +188,22 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
|
|||
assert.InDelta(t, contenctRow.FieldFloatVector[k], float32(fval), 10e-6)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
consumer := &mockJSONRowConsumer{
|
||||
handleErr: nil,
|
||||
rows: make([]map[int64]interface{}, 0),
|
||||
handleCount: 0,
|
||||
}
|
||||
|
||||
t.Run("parse old format success", func(t *testing.T) {
|
||||
binContent, err := json.Marshal(content)
|
||||
assert.NoError(t, err)
|
||||
strContent := string(binContent)
|
||||
reader := strings.NewReader(strContent)
|
||||
|
||||
ioReader := &IOReader{r: reader, fileSize: int64(len(strContent))}
|
||||
verifyRows(ioReader)
|
||||
|
||||
// empty content
|
||||
reader = strings.NewReader(`{}`)
|
||||
|
@ -207,8 +218,25 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("parse new format success", func(t *testing.T) {
|
||||
binContent, err := json.Marshal(content.Rows)
|
||||
assert.NoError(t, err)
|
||||
strContent := string(binContent)
|
||||
reader := strings.NewReader(strContent)
|
||||
fmt.Println(strContent)
|
||||
|
||||
ioReader := &IOReader{r: reader, fileSize: int64(len(strContent))}
|
||||
verifyRows(ioReader)
|
||||
|
||||
// empty list
|
||||
reader = strings.NewReader(`[]`)
|
||||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("error cases", func(t *testing.T) {
|
||||
// handler is nil
|
||||
reader := strings.NewReader("")
|
||||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, nil)
|
||||
assert.Error(t, err)
|
||||
|
||||
|
@ -259,11 +287,6 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
|
|||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
|
||||
assert.Error(t, err)
|
||||
|
||||
// not valid json format
|
||||
reader = strings.NewReader(`[]`)
|
||||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
|
||||
assert.Error(t, err)
|
||||
|
||||
// empty file
|
||||
reader = strings.NewReader(``)
|
||||
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, consumer)
|
||||
|
|
Loading…
Reference in New Issue