[Cherry-pick] Refine json parser for import (#21332)

Signed-off-by: groot <yihua.mo@zilliz.com>
pull/21352/head
groot 2022-12-22 10:03:27 +08:00 committed by GitHub
parent a21892ac1a
commit 3d1f7a88a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 653 additions and 854 deletions

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"path"
"runtime/debug"
"strconv"
@ -121,24 +122,26 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
return segmentData
}
// parseFloat wraps strconv.ParseFloat and additionally rejects NaN and
// positive/negative infinity, which are not acceptable imported field values.
// bitsize is 32 for float32 fields and 64 for float64 fields; fieldName is
// used only to enrich error messages. On any failure the returned value is 0.
func parseFloat(s string, bitsize int, fieldName string) (float64, error) {
	value, err := strconv.ParseFloat(s, bitsize)
	if err != nil {
		return 0, fmt.Errorf("failed to parse value '%s' for field '%s', error: %w", s, fieldName, err)
	}

	// NaN and infinity are not allowed.
	// NOTE: err is nil on this path, so it must not be wrapped with %w
	// (doing so would render as "%!w(<nil>)" in the message).
	if math.IsNaN(value) || math.IsInf(value, 0) {
		return 0, fmt.Errorf("value '%s' is NaN or Infinity, not allowed for field '%s'", s, fieldName)
	}

	return value, nil
}
// initValidators constructs validator methods and data conversion methods
func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error {
if collectionSchema == nil {
return errors.New("collection schema is nil")
}
// the json decoder parses all numeric values into float64
numericValidator := func(fieldName string) func(obj interface{}) error {
return func(obj interface{}) error {
switch obj.(type) {
case json.Number:
return nil
default:
return fmt.Errorf("illegal value %v for numeric type field '%s'", obj, fieldName)
}
}
}
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
@ -150,91 +153,99 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
switch schema.DataType {
case schemapb.DataType_Bool:
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
switch obj.(type) {
case bool:
return nil
default:
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
if value, ok := obj.(bool); ok {
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
field.(*storage.BoolFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for bool type field '%s'", obj, schema.GetName())
}
}
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value := obj.(bool)
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
field.(*storage.BoolFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Float:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseFloat(string(obj.(json.Number)), 32)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for float type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := parseFloat(string(num), 32, schema.GetName())
if err != nil {
return err
}
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
field.(*storage.FloatFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for float type field '%s'", obj, schema.GetName())
}
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
field.(*storage.FloatFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Double:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseFloat(string(obj.(json.Number)), 32)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for double type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := parseFloat(string(num), 64, schema.GetName())
if err != nil {
return err
}
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
field.(*storage.DoubleFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for double type field '%s'", obj, schema.GetName())
}
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
field.(*storage.DoubleFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Int8:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 8)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int8 type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := strconv.ParseInt(string(num), 0, 8)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int8 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
field.(*storage.Int8FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName())
}
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
field.(*storage.Int8FieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Int16:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 16)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int16 type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := strconv.ParseInt(string(num), 0, 16)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int16 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
field.(*storage.Int16FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName())
}
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
field.(*storage.Int16FieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Int32:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 32)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int32 type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := strconv.ParseInt(string(num), 0, 32)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int32 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
field.(*storage.Int32FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName())
}
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
field.(*storage.Int32FieldData).NumRows[0]++
return nil
}
case schemapb.DataType_Int64:
validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName())
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int64 type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := obj.(json.Number); ok {
value, err := strconv.ParseInt(string(num), 0, 64)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for int64 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
field.(*storage.Int64FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName())
}
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
field.(*storage.Int64FieldData).NumRows[0]++
return nil
}
case schemapb.DataType_BinaryVector:
@ -244,33 +255,26 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
validators[schema.GetFieldID()].dimension = dim
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
switch vt := obj.(type) {
case []interface{}:
if len(vt)*8 != dim {
return fmt.Errorf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(vt)*8, dim, schema.GetName())
}
numValidateFunc := numericValidator(schema.GetName())
for i := 0; i < len(vt); i++ {
if e := numValidateFunc(vt[i]); e != nil {
return e
}
}
return nil
default:
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
arr, ok := obj.([]interface{})
if !ok {
return fmt.Errorf("'%v' is not an array for binary vector field '%s'", obj, schema.GetName())
}
}
// we use uint8 to represent binary vector in json file, each uint8 value represents 8 dimensions.
if len(arr)*8 != dim {
return fmt.Errorf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(arr)*8, dim, schema.GetName())
}
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
arr := obj.([]interface{})
for i := 0; i < len(arr); i++ {
value, err := strconv.ParseUint(string(arr[i].(json.Number)), 10, 8)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for binary vector type field '%s', error: %w",
obj, schema.GetName(), err)
if num, ok := arr[i].(json.Number); ok {
value, err := strconv.ParseUint(string(num), 0, 8)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for binary vector field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value))
} else {
return fmt.Errorf("illegal value '%v' for binary vector field '%s'", obj, schema.GetName())
}
field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value))
}
field.(*storage.BinaryVectorFieldData).NumRows[0]++
@ -283,52 +287,40 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
validators[schema.GetFieldID()].dimension = dim
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
switch vt := obj.(type) {
case []interface{}:
if len(vt) != dim {
return fmt.Errorf("array size %d doesn't equal to vector dimension %d of field '%s'", len(vt), dim, schema.GetName())
}
numValidateFunc := numericValidator(schema.GetName())
for i := 0; i < len(vt); i++ {
if e := numValidateFunc(vt[i]); e != nil {
return e
}
}
return nil
default:
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
arr, ok := obj.([]interface{})
if !ok {
return fmt.Errorf("'%v' is not an array for float vector field '%s'", obj, schema.GetName())
}
}
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
arr := obj.([]interface{})
for i := 0; i < len(arr); i++ {
value, err := strconv.ParseFloat(string(arr[i].(json.Number)), 32)
if err != nil {
return fmt.Errorf("failed to parse value '%v' for binary vector type field '%s', error: %w",
obj, schema.GetName(), err)
}
field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value))
if len(arr) != dim {
return fmt.Errorf("array size %d doesn't equal to vector dimension %d of field '%s'", len(arr), dim, schema.GetName())
}
for i := 0; i < len(arr); i++ {
if num, ok := arr[i].(json.Number); ok {
value, err := parseFloat(string(num), 32, schema.GetName())
if err != nil {
return err
}
field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value))
} else {
return fmt.Errorf("illegal value '%v' for float vector field '%s'", obj, schema.GetName())
}
}
field.(*storage.FloatVectorFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
validators[schema.GetFieldID()].isString = true
validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error {
switch obj.(type) {
case string:
return nil
default:
return fmt.Errorf("'%v' is not a string for varchar type field '%s'", obj, schema.GetName())
}
}
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
value := obj.(string)
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
field.(*storage.StringFieldData).NumRows[0]++
if value, ok := obj.(string); ok {
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
field.(*storage.StringFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName())
}
return nil
}
default:

View File

@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
"math"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
@ -27,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
)
// sampleSchema() return a schema contains all supported data types with an int64 primary key
func sampleSchema() *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{
Name: "schema",
@ -35,35 +37,35 @@ func sampleSchema() *schemapb.CollectionSchema {
Fields: []*schemapb.FieldSchema{
{
FieldID: 102,
Name: "field_bool",
Name: "FieldBool",
IsPrimaryKey: false,
Description: "bool",
DataType: schemapb.DataType_Bool,
},
{
FieldID: 103,
Name: "field_int8",
Name: "FieldInt8",
IsPrimaryKey: false,
Description: "int8",
DataType: schemapb.DataType_Int8,
},
{
FieldID: 104,
Name: "field_int16",
Name: "FieldInt16",
IsPrimaryKey: false,
Description: "int16",
DataType: schemapb.DataType_Int16,
},
{
FieldID: 105,
Name: "field_int32",
Name: "FieldInt32",
IsPrimaryKey: false,
Description: "int32",
DataType: schemapb.DataType_Int32,
},
{
FieldID: 106,
Name: "field_int64",
Name: "FieldInt64",
IsPrimaryKey: true,
AutoID: false,
Description: "int64",
@ -71,21 +73,21 @@ func sampleSchema() *schemapb.CollectionSchema {
},
{
FieldID: 107,
Name: "field_float",
Name: "FieldFloat",
IsPrimaryKey: false,
Description: "float",
DataType: schemapb.DataType_Float,
},
{
FieldID: 108,
Name: "field_double",
Name: "FieldDouble",
IsPrimaryKey: false,
Description: "double",
DataType: schemapb.DataType_Double,
},
{
FieldID: 109,
Name: "field_string",
Name: "FieldString",
IsPrimaryKey: false,
Description: "string",
DataType: schemapb.DataType_VarChar,
@ -95,7 +97,7 @@ func sampleSchema() *schemapb.CollectionSchema {
},
{
FieldID: 110,
Name: "field_binary_vector",
Name: "FieldBinaryVector",
IsPrimaryKey: false,
Description: "binary_vector",
DataType: schemapb.DataType_BinaryVector,
@ -105,7 +107,7 @@ func sampleSchema() *schemapb.CollectionSchema {
},
{
FieldID: 111,
Name: "field_float_vector",
Name: "FieldFloatVector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
@ -118,6 +120,24 @@ func sampleSchema() *schemapb.CollectionSchema {
return schema
}
// sampleContent/sampleRow is json structs to represent sampleSchema() for testing
// sampleRow models a single JSON row matching sampleSchema(): each struct
// field name equals a schema field name, so an encoded row can be consumed
// by the row-based JSON importer in tests.
type sampleRow struct {
FieldBool bool
FieldInt8 int8
FieldInt16 int16
FieldInt32 int32
FieldInt64 int64
FieldFloat float32
FieldDouble float64
FieldString string
FieldBinaryVector []int
FieldFloatVector []float32
}
// sampleContent is the top-level JSON document shape expected by the
// row-based importer: {"rows": [...]}.
type sampleContent struct {
Rows []sampleRow
}
// strKeySchema() return a schema with a varchar primary key
func strKeySchema() *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{
Name: "schema",
@ -126,7 +146,7 @@ func strKeySchema() *schemapb.CollectionSchema {
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "uid",
Name: "UID",
IsPrimaryKey: true,
AutoID: false,
Description: "uid",
@ -137,21 +157,21 @@ func strKeySchema() *schemapb.CollectionSchema {
},
{
FieldID: 102,
Name: "int_scalar",
Name: "FieldInt32",
IsPrimaryKey: false,
Description: "int_scalar",
DataType: schemapb.DataType_Int32,
},
{
FieldID: 103,
Name: "float_scalar",
Name: "FieldFloat",
IsPrimaryKey: false,
Description: "float_scalar",
DataType: schemapb.DataType_Float,
},
{
FieldID: 104,
Name: "string_scalar",
Name: "FieldString",
IsPrimaryKey: false,
Description: "string_scalar",
DataType: schemapb.DataType_VarChar,
@ -161,14 +181,14 @@ func strKeySchema() *schemapb.CollectionSchema {
},
{
FieldID: 105,
Name: "bool_scalar",
Name: "FieldBool",
IsPrimaryKey: false,
Description: "bool_scalar",
DataType: schemapb.DataType_Bool,
},
{
FieldID: 106,
Name: "vectors",
Name: "FieldFloatVector",
IsPrimaryKey: false,
Description: "vectors",
DataType: schemapb.DataType_FloatVector,
@ -181,6 +201,19 @@ func strKeySchema() *schemapb.CollectionSchema {
return schema
}
// strKeyContent/strKeyRow is json structs to represent strKeySchema() for testing
// strKeyRow models a single JSON row matching strKeySchema(), whose
// primary key (UID) is a varchar rather than an int64.
type strKeyRow struct {
UID string
FieldInt32 int32
FieldFloat float32
FieldString string
FieldBool bool
FieldFloatVector []float32
}
// strKeyContent is the top-level JSON document shape for strKeySchema()
// test data: {"rows": [...]}.
type strKeyContent struct {
Rows []strKeyRow
}
func jsonNumber(value string) json.Number {
return json.Number(value)
}
@ -232,6 +265,40 @@ func Test_InitSegmentData(t *testing.T) {
assert.Nil(t, data)
}
// Test_parseFloat verifies that parseFloat rejects malformed numbers,
// NaN/Infinity tokens, and out-of-range magnitudes (all returning a zero
// value plus an error), while accepting ordinary float32/float64 values.
func Test_parseFloat(t *testing.T) {
	// Inputs that must fail and yield a zero value.
	badCases := []struct {
		input   string
		bitsize int
	}{
		{"dummy", 32},    // not a number at all
		{"NaN", 32},      // parses, but NaN is disallowed
		{"Inf", 32},      // infinity is disallowed
		{"Infinity", 32}, // alternate infinity spelling
		{"3.5e+38", 32},  // exceeds float32 range
		{"1.8e+308", 64}, // exceeds float64 range
	}
	for _, c := range badCases {
		value, err := parseFloat(c.input, c.bitsize, "")
		assert.Zero(t, value)
		assert.Error(t, err)
	}

	// Ordinary values parse successfully within tolerance.
	value, err := parseFloat("3.14159", 32, "")
	assert.True(t, math.Abs(value-3.14159) < 0.000001)
	assert.Nil(t, err)

	value, err = parseFloat("2.718281828459045", 64, "")
	assert.True(t, math.Abs(value-2.718281828459045) < 0.0000000000000001)
	assert.Nil(t, err)
}
func Test_InitValidators(t *testing.T) {
validators := make(map[storage.FieldID]*Validator)
err := initValidators(nil, validators)
@ -242,6 +309,18 @@ func Test_InitValidators(t *testing.T) {
err = initValidators(schema, validators)
assert.Nil(t, err)
assert.Equal(t, len(schema.Fields), len(validators))
for _, field := range schema.Fields {
fieldID := field.GetFieldID()
assert.Equal(t, field.GetName(), validators[fieldID].fieldName)
assert.Equal(t, field.GetIsPrimaryKey(), validators[fieldID].primaryKey)
assert.Equal(t, field.GetAutoID(), validators[fieldID].autoID)
if field.GetDataType() != schemapb.DataType_VarChar && field.GetDataType() != schemapb.DataType_String {
assert.False(t, validators[fieldID].isString)
} else {
assert.True(t, validators[fieldID].isString)
}
}
name2ID := make(map[string]storage.FieldID)
for _, field := range schema.Fields {
name2ID[field.GetName()] = field.GetFieldID()
@ -250,16 +329,6 @@ func Test_InitValidators(t *testing.T) {
fields := initSegmentData(schema)
assert.NotNil(t, fields)
checkValidateFunc := func(funcName string, validVal interface{}, invalidVal interface{}) {
id := name2ID[funcName]
v, ok := validators[id]
assert.True(t, ok)
err = v.validateFunc(validVal)
assert.Nil(t, err)
err = v.validateFunc(invalidVal)
assert.NotNil(t, err)
}
checkConvertFunc := func(funcName string, validVal interface{}, invalidVal interface{}) {
id := name2ID[funcName]
v, ok := validators[id]
@ -272,83 +341,61 @@ func Test_InitValidators(t *testing.T) {
postNum := fieldData.RowNum()
assert.Equal(t, 1, postNum-preNum)
if invalidVal != nil {
err = v.convertFunc(invalidVal, fieldData)
assert.NotNil(t, err)
}
err = v.convertFunc(invalidVal, fieldData)
assert.NotNil(t, err)
}
t.Run("check validate functions", func(t *testing.T) {
var validVal interface{} = true
var invalidVal interface{} = "aa"
checkValidateFunc("field_bool", validVal, invalidVal)
validVal = jsonNumber("100")
invalidVal = "aa"
checkValidateFunc("field_int8", validVal, invalidVal)
checkValidateFunc("field_int16", validVal, invalidVal)
checkValidateFunc("field_int32", validVal, invalidVal)
checkValidateFunc("field_int64", validVal, invalidVal)
checkValidateFunc("field_float", validVal, invalidVal)
checkValidateFunc("field_double", validVal, invalidVal)
validVal = "aa"
invalidVal = 100
checkValidateFunc("field_string", validVal, invalidVal)
// the binary vector dimension is 16, should input 2 uint8 values
validVal = []interface{}{jsonNumber("100"), jsonNumber("101")}
invalidVal = "aa"
checkValidateFunc("field_binary_vector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("100")}
checkValidateFunc("field_binary_vector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("100"), jsonNumber("101"), jsonNumber("102")}
checkValidateFunc("field_binary_vector", validVal, invalidVal)
invalidVal = []interface{}{100, jsonNumber("100")}
checkValidateFunc("field_binary_vector", validVal, invalidVal)
// the float vector dimension is 4, should input 4 float values
validVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4")}
invalidVal = true
checkValidateFunc("field_float_vector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3")}
checkValidateFunc("field_float_vector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4"), jsonNumber("5")}
checkValidateFunc("field_float_vector", validVal, invalidVal)
invalidVal = []interface{}{"a", "b", "c", "d"}
checkValidateFunc("field_float_vector", validVal, invalidVal)
})
t.Run("check convert functions", func(t *testing.T) {
var validVal interface{} = true
var invalidVal interface{}
checkConvertFunc("field_bool", validVal, invalidVal)
var invalidVal interface{} = 5
checkConvertFunc("FieldBool", validVal, invalidVal)
validVal = jsonNumber("100")
invalidVal = jsonNumber("128")
checkConvertFunc("field_int8", validVal, invalidVal)
checkConvertFunc("FieldInt8", validVal, invalidVal)
invalidVal = jsonNumber("65536")
checkConvertFunc("field_int16", validVal, invalidVal)
checkConvertFunc("FieldInt16", validVal, invalidVal)
invalidVal = jsonNumber("2147483648")
checkConvertFunc("field_int32", validVal, invalidVal)
checkConvertFunc("FieldInt32", validVal, invalidVal)
invalidVal = jsonNumber("1.2")
checkConvertFunc("field_int64", validVal, invalidVal)
checkConvertFunc("FieldInt64", validVal, invalidVal)
invalidVal = jsonNumber("dummy")
checkConvertFunc("field_float", validVal, invalidVal)
checkConvertFunc("field_double", validVal, invalidVal)
checkConvertFunc("FieldFloat", validVal, invalidVal)
checkConvertFunc("FieldDouble", validVal, invalidVal)
invalidVal = "6"
checkConvertFunc("FieldInt8", validVal, invalidVal)
checkConvertFunc("FieldInt16", validVal, invalidVal)
checkConvertFunc("FieldInt32", validVal, invalidVal)
checkConvertFunc("FieldInt64", validVal, invalidVal)
checkConvertFunc("FieldFloat", validVal, invalidVal)
checkConvertFunc("FieldDouble", validVal, invalidVal)
validVal = "aa"
checkConvertFunc("field_string", validVal, nil)
checkConvertFunc("FieldString", validVal, nil)
// the binary vector dimension is 16, should input two uint8 values, each value should between 0~255
validVal = []interface{}{jsonNumber("100"), jsonNumber("101")}
invalidVal = []interface{}{jsonNumber("100"), jsonNumber("256")}
checkConvertFunc("field_binary_vector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("100"), jsonNumber("1256")}
checkConvertFunc("FieldBinaryVector", validVal, invalidVal)
invalidVal = false
checkConvertFunc("FieldBinaryVector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("100")}
checkConvertFunc("FieldBinaryVector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("100"), 0}
checkConvertFunc("FieldBinaryVector", validVal, invalidVal)
// the float vector dimension is 4, each value should be valid float number
validVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4")}
invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("dummy")}
checkConvertFunc("field_float_vector", validVal, invalidVal)
checkConvertFunc("FieldFloatVector", validVal, invalidVal)
invalidVal = false
checkConvertFunc("FieldFloatVector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("1")}
checkConvertFunc("FieldFloatVector", validVal, invalidVal)
invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), true}
checkConvertFunc("FieldFloatVector", validVal, invalidVal)
})
t.Run("init error cases", func(t *testing.T) {
@ -360,7 +407,7 @@ func Test_InitValidators(t *testing.T) {
}
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
FieldID: 111,
Name: "field_float_vector",
Name: "FieldFloatVector",
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
@ -375,7 +422,7 @@ func Test_InitValidators(t *testing.T) {
schema.Fields = make([]*schemapb.FieldSchema, 0)
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
FieldID: 110,
Name: "field_binary_vector",
Name: "FieldBinaryVector",
IsPrimaryKey: false,
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
@ -410,7 +457,7 @@ func Test_GetFileNameAndExt(t *testing.T) {
func Test_GetFieldDimension(t *testing.T) {
schema := &schemapb.FieldSchema{
FieldID: 111,
Name: "field_float_vector",
Name: "FieldFloatVector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,

View File

@ -513,26 +513,27 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er
// parse file
reader := bufio.NewReader(file)
parser := NewJSONParser(p.ctx, p.collectionSchema)
var consumer *JSONRowConsumer
if !onlyValidate {
flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
// if only validate, we input a empty flushFunc so that the consumer do nothing but only validation.
var flushFunc ImportFlushFunc
if onlyValidate {
flushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return nil
}
} else {
flushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
var filePaths = []string{filePath}
printFieldsDataInfo(fields, "import wrapper: prepare to flush binlogs", filePaths)
return p.flushFunc(fields, shardID)
}
consumer, err = NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, flushFunc)
if err != nil {
return err
}
}
validator, err := NewJSONRowValidator(p.collectionSchema, consumer)
consumer, err := NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, flushFunc)
if err != nil {
return err
}
err = parser.ParseRows(reader, validator)
err = parser.ParseRows(reader, consumer)
if err != nil {
return err
}

View File

@ -239,11 +239,11 @@ func Test_ImportWrapperRowBased(t *testing.T) {
content := []byte(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]},
{"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]},
{"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]},
{"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]}
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
]
}`)
@ -286,7 +286,7 @@ func Test_ImportWrapperRowBased(t *testing.T) {
// parse error
content = []byte(`{
"rows":[
{"field_bool": true, "field_int8": false, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
]
}`)
@ -314,70 +314,70 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string {
ctx := context.Background()
files := make([]string, 0)
filePath := path.Join(cm.RootPath(), "field_bool.npy")
filePath := path.Join(cm.RootPath(), "FieldBool.npy")
content, err := CreateNumpyData([]bool{true, false, true, true, true})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_int8.npy")
filePath = path.Join(cm.RootPath(), "FieldInt8.npy")
content, err = CreateNumpyData([]int8{10, 11, 12, 13, 14})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_int16.npy")
filePath = path.Join(cm.RootPath(), "FieldInt16.npy")
content, err = CreateNumpyData([]int16{100, 101, 102, 103, 104})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_int32.npy")
filePath = path.Join(cm.RootPath(), "FieldInt32.npy")
content, err = CreateNumpyData([]int32{1000, 1001, 1002, 1003, 1004})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_int64.npy")
filePath = path.Join(cm.RootPath(), "FieldInt64.npy")
content, err = CreateNumpyData([]int64{10000, 10001, 10002, 10003, 10004})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_float.npy")
filePath = path.Join(cm.RootPath(), "FieldFloat.npy")
content, err = CreateNumpyData([]float32{3.14, 3.15, 3.16, 3.17, 3.18})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_double.npy")
filePath = path.Join(cm.RootPath(), "FieldDouble.npy")
content, err = CreateNumpyData([]float64{5.1, 5.2, 5.3, 5.4, 5.5})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_string.npy")
filePath = path.Join(cm.RootPath(), "FieldString.npy")
content, err = CreateNumpyData([]string{"a", "bb", "ccc", "dd", "e"})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_binary_vector.npy")
filePath = path.Join(cm.RootPath(), "FieldBinaryVector.npy")
content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files = append(files, filePath)
filePath = path.Join(cm.RootPath(), "field_float_vector.npy")
filePath = path.Join(cm.RootPath(), "FieldFloatVector.npy")
content, err = CreateNumpyData([][4]float32{{1, 2, 3, 4}, {3, 4, 5, 6}, {5, 6, 7, 8}, {7, 8, 9, 10}, {9, 10, 11, 12}})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
@ -431,7 +431,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// row count of fields not equal
filePath := path.Join(cm.RootPath(), "field_int8.npy")
filePath := path.Join(cm.RootPath(), "FieldInt8.npy")
content, err := CreateNumpyData([]int8{10})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
@ -780,11 +780,11 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
content := []byte(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]},
{"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]},
{"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]},
{"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]}
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]},
{"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]},
{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]},
{"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]}
]
}`)

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"go.uber.org/zap"
@ -40,92 +39,12 @@ type JSONRowHandler interface {
// Validator is field value validator
type Validator struct {
validateFunc func(obj interface{}) error // validate data type function
convertFunc func(obj interface{}, field storage.FieldData) error // convert data function
primaryKey bool // true for primary key
autoID bool // only for primary key field
isString bool // for string field
dimension int // only for vector field
fieldName string // field name
}
// JSONRowValidator is row-based json format validator class
type JSONRowValidator struct {
downstream JSONRowHandler // downstream processor, typically is a JSONRowComsumer
validators map[storage.FieldID]*Validator // validators for each field
rowCounter int64 // how many rows have been validated
}
func NewJSONRowValidator(collectionSchema *schemapb.CollectionSchema, downstream JSONRowHandler) (*JSONRowValidator, error) {
v := &JSONRowValidator{
validators: make(map[storage.FieldID]*Validator),
downstream: downstream,
rowCounter: 0,
}
err := initValidators(collectionSchema, v.validators)
if err != nil {
log.Error("JSON row validator: failed to initialize json row-based validator", zap.Error(err))
return nil, err
}
return v, nil
}
func (v *JSONRowValidator) ValidateCount() int64 {
return v.rowCounter
}
func (v *JSONRowValidator) Handle(rows []map[storage.FieldID]interface{}) error {
if v == nil || v.validators == nil || len(v.validators) == 0 {
log.Error("JSON row validator is not initialized")
return errors.New("JSON row validator is not initialized")
}
// parse completed
if rows == nil {
log.Info("JSON row validation finished")
if v.downstream != nil && !reflect.ValueOf(v.downstream).IsNil() {
return v.downstream.Handle(rows)
}
return nil
}
for i := 0; i < len(rows); i++ {
row := rows[i]
for id, validator := range v.validators {
value, ok := row[id]
if validator.primaryKey && validator.autoID {
// primary key is auto-generated, if user provided it, return error
if ok {
log.Error("JSON row validator: primary key is auto-generated, no need to provide PK value at the row",
zap.String("fieldName", validator.fieldName), zap.Int64("rowNumber", v.rowCounter+int64(i)))
return fmt.Errorf("the primary key '%s' is auto-generated, no need to provide PK value at the row %d",
validator.fieldName, v.rowCounter+int64(i))
}
continue
}
if !ok {
log.Error("JSON row validator: field missed at the row",
zap.String("fieldName", validator.fieldName), zap.Int64("rowNumber", v.rowCounter+int64(i)))
return fmt.Errorf("the field '%s' missed at the row %d", validator.fieldName, v.rowCounter+int64(i))
}
if err := validator.validateFunc(value); err != nil {
log.Error("JSON row validator: invalid value at the row", zap.String("fieldName", validator.fieldName),
zap.Int64("rowNumber", v.rowCounter+int64(i)), zap.Any("value", value), zap.Error(err))
return fmt.Errorf("the field '%s' value at the row %d is invalid, error: %s",
validator.fieldName, v.rowCounter+int64(i), err.Error())
}
}
}
v.rowCounter += int64(len(rows))
if v.downstream != nil && !reflect.ValueOf(v.downstream).IsNil() {
return v.downstream.Handle(rows)
}
return nil
convertFunc func(obj interface{}, field storage.FieldData) error // convert data function
primaryKey bool // true for primary key
autoID bool // only for primary key field
isString bool // for string field
dimension int // only for vector field
fieldName string // field name
}
// JSONRowConsumer is row-based json format consumer class
@ -203,6 +122,10 @@ func (v *JSONRowConsumer) IDRange() []int64 {
return v.autoIDRange
}
func (v *JSONRowConsumer) RowCount() int64 {
return v.rowCounter
}
func (v *JSONRowConsumer) flush(force bool) error {
// force flush all data
if force {
@ -277,6 +200,10 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
var rowIDBegin typeutil.UniqueID
var rowIDEnd typeutil.UniqueID
if primaryValidator.autoID {
if v.rowIDAllocator == nil {
log.Error("JSON row consumer: primary keys is auto-generated but IDAllocator is nil")
return fmt.Errorf("primary keys is auto-generated but IDAllocator is nil")
}
var err error
rowIDBegin, rowIDEnd, err = v.rowIDAllocator.Alloc(uint32(len(rows)))
if err != nil {

View File

@ -19,7 +19,6 @@ package importutil
import (
"context"
"errors"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@ -59,106 +58,6 @@ func newIDAllocator(ctx context.Context, t *testing.T, allocErr error) *allocato
return idAllocator
}
func Test_NewJSONRowValidator(t *testing.T) {
validator, err := NewJSONRowValidator(nil, nil)
assert.NotNil(t, err)
assert.Nil(t, validator)
validator, err = NewJSONRowValidator(sampleSchema(), nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
}
func Test_JSONRowValidator(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := sampleSchema()
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
// 0 row case
reader := strings.NewReader(`{
"rows":[]
}`)
validator, err := NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
assert.Equal(t, int64(0), validator.ValidateCount())
// missed some fields
reader = strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
// invalid dimension
reader = strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": true, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0, 1, 66, 128, 0, 1, 66], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
// invalid value type
reader = strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": true, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
// init failed
validator.validators = nil
err = validator.Handle(nil)
assert.NotNil(t, err)
// primary key is auto-generate, but user provide pk value, return error
schema = &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "ID",
IsPrimaryKey: true,
AutoID: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 102,
Name: "Age",
IsPrimaryKey: false,
DataType: schemapb.DataType_Int64,
},
},
}
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
reader = strings.NewReader(`{
"rows":[
{"ID": 1, "Age": 2}
]
}`)
parser = NewJSONParser(ctx, schema)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
}
func Test_NewJSONRowConsumer(t *testing.T) {
// nil schema
consumer, err := NewJSONRowConsumer(nil, nil, 2, 16, nil)
@ -203,67 +102,6 @@ func Test_NewJSONRowConsumer(t *testing.T) {
assert.Nil(t, err)
}
func Test_JSONRowConsumer(t *testing.T) {
ctx := context.Background()
idAllocator := newIDAllocator(ctx, t, nil)
schema := sampleSchema()
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
reader := strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]},
{"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]},
{"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]},
{"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]}
]
}`)
var shardNum int32 = 2
var callTime int32
var totalCount int
consumeFunc := func(fields map[storage.FieldID]storage.FieldData, shard int) error {
assert.Equal(t, int(callTime), shard)
callTime++
rowCount := 0
for _, data := range fields {
if rowCount == 0 {
rowCount = data.RowNum()
} else {
assert.Equal(t, rowCount, data.RowNum())
}
}
totalCount += rowCount
return nil
}
consumer, err := NewJSONRowConsumer(schema, idAllocator, shardNum, 1, consumeFunc)
assert.NotNil(t, consumer)
assert.Nil(t, err)
validator, err := NewJSONRowValidator(schema, consumer)
assert.NotNil(t, validator)
assert.Nil(t, err)
err = parser.ParseRows(reader, validator)
assert.Nil(t, err)
assert.Equal(t, int64(5), validator.ValidateCount())
assert.Equal(t, shardNum, callTime)
assert.Equal(t, 5, totalCount)
// parse primary key error
reader = strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 0.5, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, validator)
assert.Error(t, err)
}
func Test_JSONRowConsumerFlush(t *testing.T) {
var callTime int32
var totalCount int
@ -408,6 +246,11 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
assert.Equal(t, 2, len(consumer.autoIDRange))
assert.Equal(t, int64(1), consumer.autoIDRange[0])
assert.Equal(t, int64(1+rowCount), consumer.autoIDRange[1])
// pk is auto-generated byt IDAllocator is nil
consumer.rowIDAllocator = nil
err = consumer.Handle(input)
assert.Error(t, err)
})
t.Run("handle varchar pk", func(t *testing.T) {
@ -452,133 +295,7 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
err = consumer.Handle(input)
assert.Nil(t, err)
assert.Equal(t, int64(rowCount), consumer.rowCounter)
assert.Equal(t, int64(rowCount), consumer.RowCount())
assert.Equal(t, 0, len(consumer.autoIDRange))
})
}
func Test_JSONRowConsumerStringKey(t *testing.T) {
ctx := context.Background()
idAllocator := newIDAllocator(ctx, t, nil)
schema := strKeySchema()
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
reader := strings.NewReader(`{
"rows": [{
"uid": "Dm4aWrbNzhmjwCTEnCJ9LDPO2N09sqysxgVfbH9Zmn3nBzmwsmk0eZN6x7wSAoPQ",
"int_scalar": 9070353,
"float_scalar": 0.9798043638085004,
"string_scalar": "ShQ44OX0z8kGpRPhaXmfSsdH7JHq5DsZzu0e2umS1hrWG0uONH2RIIAdOECaaXir",
"bool_scalar": true,
"vectors": [0.5040062902126952, 0.8297619818664708, 0.20248342801564806, 0.12834786423659314]
},
{
"uid": "RP50U0d2napRjXu94a8oGikWgklvVsXFurp8RR4tHGw7N0gk1b7opm59k3FCpyPb",
"int_scalar": 8505288,
"float_scalar": 0.937913432198687,
"string_scalar": "Ld4b0avxathBdNvCrtm3QsWO1pYktUVR7WgAtrtozIwrA8vpeactNhJ85CFGQnK5",
"bool_scalar": false,
"vectors": [0.528232122836893, 0.6916116750653186, 0.41443762522548705, 0.26624344144792056]
},
{
"uid": "oxhFkQitWPPw0Bjmj7UQcn4iwvS0CU7RLAC81uQFFQjWtOdiB329CPyWkfGSeYfE",
"int_scalar": 4392660,
"float_scalar": 0.32381232630490264,
"string_scalar": "EmAlB0xdQcxeBtwlZJQnLgKodiuRinynoQtg0eXrjkq24dQohzSm7Bx3zquHd3kO",
"bool_scalar": false,
"vectors": [0.7978693027281338, 0.12394906726785092, 0.42431962903815285, 0.4098707807351914]
},
{
"uid": "sxoEL4Mpk1LdsyXhbNm059UWJ3CvxURLCQczaVI5xtBD4QcVWTDFUW7dBdye6nbn",
"int_scalar": 7927425,
"float_scalar": 0.31074026464844895,
"string_scalar": "fdY2beCvs1wSws0Gb9ySD92xwfEfJpX5DQgsWoISylBAoYOcXpRaqIJoXYS4g269",
"bool_scalar": true,
"vectors": [0.3716157812069954, 0.006981281113265229, 0.9007003458552365, 0.22492634316191004]
},
{
"uid": "g33Rqq2UQSHPRHw5FvuXxf5uGEhIAetxE6UuXXCJj0hafG8WuJr1ueZftsySCqAd",
"int_scalar": 9288807,
"float_scalar": 0.4953578200336135,
"string_scalar": "6f8Iv1zQAGksj5XxMbbI5evTrYrB8fSFQ58jl0oU7Z4BpA81VsD2tlWqkhfoBNa7",
"bool_scalar": false,
"vectors": [0.5921374209648096, 0.04234832587925662, 0.7803878096531548, 0.1964045837884633]
},
{
"uid": "ACIJd7lTXkRgUNmlQk6AbnWIKEEV8Z6OS3vDcm0w9psmt9sH3z1JLg1fNVCqiX3d",
"int_scalar": 1173595,
"float_scalar": 0.9000745450802002,
"string_scalar": "gpj9YctF2ig1l1APkvRzHbVE8PZVKRbk7nvW73qS2uQbY5l7MeIeTPwRBjasbY8z",
"bool_scalar": true,
"vectors": [0.4655121736168688, 0.6195496905333787, 0.5316616196326639, 0.3417492053890768]
},
{
"uid": "f0wRVZZ9u1bEKrAjLeZj3oliEnUjBiUl6TiermeczceBmGe6M2RHONgz3qEogrd5",
"int_scalar": 3722368,
"float_scalar": 0.7212299175768438,
"string_scalar": "xydiejGUlvS49BfBuy1EuYRKt3v2oKwC6pqy7Ga4dGWn3BnQigV4XAGawixDAGHN",
"bool_scalar": false,
"vectors": [0.6173164237304075, 0.374107748459483, 0.3686321416317251, 0.585725336391797]
},
{
"uid": "uXq9q96vUqnDebcUISFkRFT27OjD89DWhok6urXIjTuLzaSWnCVTJkrJXxFctSg0",
"int_scalar": 1940731,
"float_scalar": 0.9524404085944204,
"string_scalar": "ZXSNzR5V3t62fjop7b7DHK56ByAF0INYwycKsu6OxGP4p2j0Obs6l0NUqukypGXd",
"bool_scalar": false,
"vectors": [0.07178869784465443, 0.4208459174227864, 0.5882811425075762, 0.6867753592116734]
},
{
"uid": "EXDDklLvQIfeCJN8cES3b9mdCYDQVhq2iLj8WWA3TPtZ1SZ4Jpidj7OXJidSD7Wn",
"int_scalar": 2158426,
"float_scalar": 0.23770219927963454,
"string_scalar": "9TNeKVSMqTP8Zxs90kaAcB7n6JbIcvFWInzi9JxZQgmYxD5xLYwaCoeUzRiNAxAg",
"bool_scalar": false,
"vectors": [0.5659468293534021, 0.6275816433340369, 0.3978846871291008, 0.3571179679645908]
},
{
"uid": "mlaXOgYvB88WWRpXNyWv6UqpmvIHrC6pRo03AtaPLMpVymu0L9ioO8GWa1XgGyj0",
"int_scalar": 198279,
"float_scalar": 0.020343767010139513,
"string_scalar": "AblYGRZJiMAwDbMEkungG0yKTeuya4FgyliakWWqSOJ5TvQWB9Ki2WXbnvSsYIDF",
"bool_scalar": true,
"vectors": [0.5374636140212398, 0.7655373567912009, 0.05491796821609715, 0.349384366747262]
}
]
}`)
var shardNum int32 = 2
var callTime int32
var totalCount int
consumeFunc := func(fields map[storage.FieldID]storage.FieldData, shard int) error {
assert.Equal(t, int(callTime), shard)
callTime++
rowCount := 0
for _, data := range fields {
if rowCount == 0 {
rowCount = data.RowNum()
} else {
assert.Equal(t, rowCount, data.RowNum())
}
}
totalCount += rowCount
return nil
}
consumer, err := NewJSONRowConsumer(schema, idAllocator, shardNum, 1, consumeFunc)
assert.NotNil(t, consumer)
assert.Nil(t, err)
validator, err := NewJSONRowValidator(schema, consumer)
assert.NotNil(t, validator)
assert.Nil(t, err)
err = parser.ParseRows(reader, validator)
assert.Nil(t, err)
assert.Equal(t, int64(10), validator.ValidateCount())
assert.Equal(t, shardNum, callTime)
assert.Equal(t, 10, totalCount)
}

View File

@ -25,6 +25,7 @@ import (
"strings"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -53,6 +54,15 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch
name2FieldID := make(map[string]storage.FieldID)
for i := 0; i < len(collectionSchema.Fields); i++ {
schema := collectionSchema.Fields[i]
// RowIDField and TimeStampField is internal field, no need to parse
if schema.GetFieldID() == common.RowIDField || schema.GetFieldID() == common.TimeStampField {
continue
}
// if primary key field is auto-gernerated, no need to parse
if schema.GetAutoID() {
continue
}
fields[schema.GetName()] = 0
name2FieldID[schema.GetName()] = schema.GetFieldID()
}
@ -89,6 +99,38 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche
parser.bufSize = int64(bufSize)
}
func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}, error) {
stringMap, ok := raw.(map[string]interface{})
if !ok {
log.Error("JSON parser: invalid JSON format, each row should be a key-value map")
return nil, errors.New("invalid JSON format, each row should be a key-value map")
}
row := make(map[storage.FieldID]interface{})
for k, v := range stringMap {
// if user provided redundant field, return error
fieldID, ok := p.name2FieldID[k]
if !ok {
log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k))
return nil, fmt.Errorf("the field '%s' is not defined in collection schema", k)
}
row[fieldID] = v
}
// some fields not provided?
if len(row) != len(p.name2FieldID) {
for k, v := range p.name2FieldID {
_, ok := row[v]
if !ok {
log.Error("JSON parser: a field value is missed", zap.String("fieldName", k))
return nil, fmt.Errorf("value of field '%s' is missed", k)
}
}
}
return row, nil
}
func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
if handler == nil {
log.Error("JSON parse handler is nil")
@ -151,24 +193,9 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
return fmt.Errorf("failed to parse row value, error: %w", err)
}
switch value.(type) {
case map[string]interface{}:
break
default:
log.Error("JSON parser: invalid JSON format, each row should be a key-value map")
return errors.New("invalid JSON format, each row should be a key-value map")
}
row := make(map[storage.FieldID]interface{})
stringMap := value.(map[string]interface{})
for k, v := range stringMap {
// if user provided redundant field, return error
fieldID, ok := p.name2FieldID[k]
if !ok {
log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k))
return fmt.Errorf("the field '%s' is not defined in collection schema", k)
}
row[fieldID] = v
row, err := p.verifyRow(value)
if err != nil {
return err
}
buf = append(buf, row)

View File

@ -18,15 +18,38 @@ package importutil
import (
"context"
"encoding/json"
"errors"
"math"
"strconv"
"strings"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
// mock class of JSONRowCounsumer
type mockJSONRowConsumer struct {
handleErr error
rows []map[storage.FieldID]interface{}
handleCount int
}
func (v *mockJSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
if v.handleErr != nil {
return v.handleErr
}
if rows != nil {
v.rows = append(v.rows, rows...)
}
v.handleCount++
return nil
}
func Test_AdjustBufSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -64,237 +87,302 @@ func Test_AdjustBufSize(t *testing.T) {
assert.Equal(t, int64(MinBufferSize), parser.bufSize)
}
func Test_JSONParserParserRows(t *testing.T) {
func Test_JSONParserParseRows_IntPK(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := sampleSchema()
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
parser.bufSize = 1
reader := strings.NewReader(`{
"rows":[
{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
{"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]},
{"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]},
{"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]},
{"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]}
]
}`)
// prepare test data
content := &sampleContent{
Rows: make([]sampleRow, 0),
}
for i := 0; i < 10; i++ {
row := sampleRow{
FieldBool: i%2 == 0,
FieldInt8: int8(i % math.MaxInt8),
FieldInt16: int16(100 + i),
FieldInt32: int32(1000 + i),
FieldInt64: int64(99999999999999999 + i),
FieldFloat: 3 + float32(i)/11,
FieldDouble: 1 + float64(i)/7,
FieldString: "No." + strconv.FormatInt(int64(i), 10),
FieldBinaryVector: []int{(200 + i) % math.MaxUint8, 0},
FieldFloatVector: []float32{float32(i) + 0.1, float32(i) + 0.2, float32(i) + 0.3, float32(i) + 0.4},
}
content.Rows = append(content.Rows, row)
}
// handler is nil
err := parser.ParseRows(reader, nil)
assert.NotNil(t, err)
validator, err := NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
binContent, err := json.Marshal(content)
assert.Nil(t, err)
strContent := string(binContent)
reader := strings.NewReader(strContent)
// success
err = parser.ParseRows(reader, validator)
assert.Nil(t, err)
assert.Equal(t, int64(5), validator.ValidateCount())
consumer := &mockJSONRowConsumer{
handleErr: nil,
rows: make([]map[int64]interface{}, 0),
handleCount: 0,
}
// not a row-based format
reader = strings.NewReader(`{
"dummy":[]
}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
t.Run("parse success", func(t *testing.T) {
// set bufSize = 4, means call handle() after reading 4 rows
parser.bufSize = 4
err = parser.ParseRows(reader, consumer)
assert.Nil(t, err)
assert.Equal(t, len(content.Rows), len(consumer.rows))
for i := 0; i < len(consumer.rows); i++ {
contenctRow := content.Rows[i]
parsedRow := consumer.rows[i]
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
v1, ok := parsedRow[102].(bool)
assert.True(t, ok)
assert.Equal(t, contenctRow.FieldBool, v1)
// rows is not a list
reader = strings.NewReader(`{
"rows":
}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
v2, ok := parsedRow[103].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(int64(contenctRow.FieldInt8), 10), string(v2))
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
v3, ok := parsedRow[104].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(int64(contenctRow.FieldInt16), 10), string(v3))
// typo
reader = strings.NewReader(`{
"rows": [}
}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
v4, ok := parsedRow[105].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(int64(contenctRow.FieldInt32), 10), string(v4))
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
v5, ok := parsedRow[106].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(contenctRow.FieldInt64, 10), string(v5))
// rows is not a list
reader = strings.NewReader(`{
"rows": {}
}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
v6, ok := parsedRow[107].(json.Number)
assert.True(t, ok)
f32, err := parseFloat(string(v6), 32, "")
assert.Nil(t, err)
assert.InDelta(t, contenctRow.FieldFloat, float32(f32), 10e-6)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
v7, ok := parsedRow[108].(json.Number)
assert.True(t, ok)
f64, err := parseFloat(string(v7), 64, "")
assert.Nil(t, err)
assert.InDelta(t, contenctRow.FieldDouble, f64, 10e-14)
// rows is not a list of list
reader = strings.NewReader(`{
"rows": [[]]
}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
v8, ok := parsedRow[109].(string)
assert.True(t, ok)
assert.Equal(t, contenctRow.FieldString, v8)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
v9, ok := parsedRow[110].([]interface{})
assert.True(t, ok)
assert.Equal(t, len(contenctRow.FieldBinaryVector), len(v9))
for k := 0; k < len(v9); k++ {
val, ok := v9[k].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(int64(contenctRow.FieldBinaryVector[k]), 10), string(val))
}
// not valid json format
reader = strings.NewReader(`[]`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
v10, ok := parsedRow[111].([]interface{})
assert.True(t, ok)
assert.Equal(t, len(contenctRow.FieldFloatVector), len(v10))
for k := 0; k < len(v10); k++ {
val, ok := v10[k].(json.Number)
assert.True(t, ok)
fval, err := parseFloat(string(val), 64, "")
assert.Nil(t, err)
assert.InDelta(t, contenctRow.FieldFloatVector[k], float32(fval), 10e-6)
}
}
})
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
t.Run("error cases", func(t *testing.T) {
// handler is nil
err = parser.ParseRows(reader, nil)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(`{}`)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
// not a row-based format
reader = strings.NewReader(`{
"dummy":[]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(``)
validator, err = NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
assert.Nil(t, err)
// rows is not a list
reader = strings.NewReader(`{
"rows":
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// redundant field
reader = strings.NewReader(`{
"rows":[
{"dummy": 1, "field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
// typo
reader = strings.NewReader(`{
"rows": [}
}`)
// field missed
reader = strings.NewReader(`{
"rows":[
{"field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
]
}`)
err = parser.ParseRows(reader, validator)
assert.NotNil(t, err)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// rows is not a list
reader = strings.NewReader(`{
"rows": {}
}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// rows is not a list of list
reader = strings.NewReader(`{
"rows": [[]]
}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// not valid json format
reader = strings.NewReader(`[]`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(`{}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// empty content
reader = strings.NewReader(``)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// redundant field
reader = strings.NewReader(`{
"rows":[
{"dummy": 1, "FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// field missed
reader = strings.NewReader(`{
"rows":[
{"FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
]
}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// handle() error
content := `{
"rows":[
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]},
{"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}
]
}`
consumer.handleErr = errors.New("error")
reader = strings.NewReader(content)
parser.bufSize = 2
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
reader = strings.NewReader(content)
parser.bufSize = 5
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// row count is 0
reader = strings.NewReader(`{
"rows":[]
}`)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
// canceled
consumer.handleErr = nil
cancel()
reader = strings.NewReader(content)
err = parser.ParseRows(reader, consumer)
assert.NotNil(t, err)
})
}
func Test_JSONParserParserRowsStringKey(t *testing.T) {
func Test_JSONParserParseRows_StrPK(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schema := strKeySchema()
parser := NewJSONParser(ctx, schema)
assert.NotNil(t, parser)
parser.bufSize = 1
reader := strings.NewReader(`{
"rows": [{
"uid": "Dm4aWrbNzhmjwCTEnCJ9LDPO2N09sqysxgVfbH9Zmn3nBzmwsmk0eZN6x7wSAoPQ",
"int_scalar": 9070353,
"float_scalar": 0.9798043638085004,
"string_scalar": "ShQ44OX0z8kGpRPhaXmfSsdH7JHq5DsZzu0e2umS1hrWG0uONH2RIIAdOECaaXir",
"bool_scalar": true,
"vectors": [0.5040062902126952, 0.8297619818664708, 0.20248342801564806, 0.12834786423659314]
},
{
"uid": "RP50U0d2napRjXu94a8oGikWgklvVsXFurp8RR4tHGw7N0gk1b7opm59k3FCpyPb",
"int_scalar": 8505288,
"float_scalar": 0.937913432198687,
"string_scalar": "Ld4b0avxathBdNvCrtm3QsWO1pYktUVR7WgAtrtozIwrA8vpeactNhJ85CFGQnK5",
"bool_scalar": false,
"vectors": [0.528232122836893, 0.6916116750653186, 0.41443762522548705, 0.26624344144792056]
},
{
"uid": "oxhFkQitWPPw0Bjmj7UQcn4iwvS0CU7RLAC81uQFFQjWtOdiB329CPyWkfGSeYfE",
"int_scalar": 4392660,
"float_scalar": 0.32381232630490264,
"string_scalar": "EmAlB0xdQcxeBtwlZJQnLgKodiuRinynoQtg0eXrjkq24dQohzSm7Bx3zquHd3kO",
"bool_scalar": false,
"vectors": [0.7978693027281338, 0.12394906726785092, 0.42431962903815285, 0.4098707807351914]
},
{
"uid": "sxoEL4Mpk1LdsyXhbNm059UWJ3CvxURLCQczaVI5xtBD4QcVWTDFUW7dBdye6nbn",
"int_scalar": 7927425,
"float_scalar": 0.31074026464844895,
"string_scalar": "fdY2beCvs1wSws0Gb9ySD92xwfEfJpX5DQgsWoISylBAoYOcXpRaqIJoXYS4g269",
"bool_scalar": true,
"vectors": [0.3716157812069954, 0.006981281113265229, 0.9007003458552365, 0.22492634316191004]
},
{
"uid": "g33Rqq2UQSHPRHw5FvuXxf5uGEhIAetxE6UuXXCJj0hafG8WuJr1ueZftsySCqAd",
"int_scalar": 9288807,
"float_scalar": 0.4953578200336135,
"string_scalar": "6f8Iv1zQAGksj5XxMbbI5evTrYrB8fSFQ58jl0oU7Z4BpA81VsD2tlWqkhfoBNa7",
"bool_scalar": false,
"vectors": [0.5921374209648096, 0.04234832587925662, 0.7803878096531548, 0.1964045837884633]
},
{
"uid": "ACIJd7lTXkRgUNmlQk6AbnWIKEEV8Z6OS3vDcm0w9psmt9sH3z1JLg1fNVCqiX3d",
"int_scalar": 1173595,
"float_scalar": 0.9000745450802002,
"string_scalar": "gpj9YctF2ig1l1APkvRzHbVE8PZVKRbk7nvW73qS2uQbY5l7MeIeTPwRBjasbY8z",
"bool_scalar": true,
"vectors": [0.4655121736168688, 0.6195496905333787, 0.5316616196326639, 0.3417492053890768]
},
{
"uid": "f0wRVZZ9u1bEKrAjLeZj3oliEnUjBiUl6TiermeczceBmGe6M2RHONgz3qEogrd5",
"int_scalar": 3722368,
"float_scalar": 0.7212299175768438,
"string_scalar": "xydiejGUlvS49BfBuy1EuYRKt3v2oKwC6pqy7Ga4dGWn3BnQigV4XAGawixDAGHN",
"bool_scalar": false,
"vectors": [0.6173164237304075, 0.374107748459483, 0.3686321416317251, 0.585725336391797]
},
{
"uid": "uXq9q96vUqnDebcUISFkRFT27OjD89DWhok6urXIjTuLzaSWnCVTJkrJXxFctSg0",
"int_scalar": 1940731,
"float_scalar": 0.9524404085944204,
"string_scalar": "ZXSNzR5V3t62fjop7b7DHK56ByAF0INYwycKsu6OxGP4p2j0Obs6l0NUqukypGXd",
"bool_scalar": false,
"vectors": [0.07178869784465443, 0.4208459174227864, 0.5882811425075762, 0.6867753592116734]
},
{
"uid": "EXDDklLvQIfeCJN8cES3b9mdCYDQVhq2iLj8WWA3TPtZ1SZ4Jpidj7OXJidSD7Wn",
"int_scalar": 2158426,
"float_scalar": 0.23770219927963454,
"string_scalar": "9TNeKVSMqTP8Zxs90kaAcB7n6JbIcvFWInzi9JxZQgmYxD5xLYwaCoeUzRiNAxAg",
"bool_scalar": false,
"vectors": [0.5659468293534021, 0.6275816433340369, 0.3978846871291008, 0.3571179679645908]
},
{
"uid": "mlaXOgYvB88WWRpXNyWv6UqpmvIHrC6pRo03AtaPLMpVymu0L9ioO8GWa1XgGyj0",
"int_scalar": 198279,
"float_scalar": 0.020343767010139513,
"string_scalar": "AblYGRZJiMAwDbMEkungG0yKTeuya4FgyliakWWqSOJ5TvQWB9Ki2WXbnvSsYIDF",
"bool_scalar": true,
"vectors": [0.5374636140212398, 0.7655373567912009, 0.05491796821609715, 0.349384366747262]
}
]
}`)
// prepare test data
content := &strKeyContent{
Rows: make([]strKeyRow, 0),
}
for i := 0; i < 10; i++ {
row := strKeyRow{
UID: "strID_" + strconv.FormatInt(int64(i), 10),
FieldInt32: int32(10000 + i),
FieldFloat: 1 + float32(i)/13,
FieldString: strconv.FormatInt(int64(i+1), 10) + " this string contains unicode character: 🎵",
FieldBool: i%3 == 0,
FieldFloatVector: []float32{float32(i) / 2, float32(i) / 3, float32(i) / 6, float32(i) / 9},
}
content.Rows = append(content.Rows, row)
}
validator, err := NewJSONRowValidator(schema, nil)
assert.NotNil(t, validator)
binContent, err := json.Marshal(content)
assert.Nil(t, err)
strContent := string(binContent)
reader := strings.NewReader(strContent)
err = parser.ParseRows(reader, validator)
consumer := &mockJSONRowConsumer{
handleErr: nil,
rows: make([]map[int64]interface{}, 0),
handleCount: 0,
}
err = parser.ParseRows(reader, consumer)
assert.Nil(t, err)
assert.Equal(t, int64(10), validator.ValidateCount())
assert.Equal(t, len(content.Rows), len(consumer.rows))
for i := 0; i < len(consumer.rows); i++ {
contenctRow := content.Rows[i]
parsedRow := consumer.rows[i]
v1, ok := parsedRow[101].(string)
assert.True(t, ok)
assert.Equal(t, contenctRow.UID, v1)
v2, ok := parsedRow[102].(json.Number)
assert.True(t, ok)
assert.Equal(t, strconv.FormatInt(int64(contenctRow.FieldInt32), 10), string(v2))
v3, ok := parsedRow[103].(json.Number)
assert.True(t, ok)
f32, err := parseFloat(string(v3), 32, "")
assert.Nil(t, err)
assert.InDelta(t, contenctRow.FieldFloat, float32(f32), 10e-6)
v4, ok := parsedRow[104].(string)
assert.True(t, ok)
assert.Equal(t, contenctRow.FieldString, v4)
v5, ok := parsedRow[105].(bool)
assert.True(t, ok)
assert.Equal(t, contenctRow.FieldBool, v5)
v6, ok := parsedRow[106].([]interface{})
assert.True(t, ok)
assert.Equal(t, len(contenctRow.FieldFloatVector), len(v6))
for k := 0; k < len(v6); k++ {
val, ok := v6[k].(json.Number)
assert.True(t, ok)
fval, err := parseFloat(string(val), 64, "")
assert.Nil(t, err)
assert.InDelta(t, contenctRow.FieldFloatVector[k], float32(fval), 10e-6)
}
}
}

View File

@ -55,7 +55,7 @@ func Test_NumpyParserValidate(t *testing.T) {
Fields: []*schemapb.FieldSchema{
{
FieldID: 109,
Name: "field_string",
Name: "FieldString",
IsPrimaryKey: false,
Description: "string",
DataType: schemapb.DataType_String,
@ -64,7 +64,7 @@ func Test_NumpyParserValidate(t *testing.T) {
}, flushFunc)
err = p.validate(adapter, "dummy")
assert.NotNil(t, err)
err = p.validate(adapter, "field_string")
err = p.validate(adapter, "FieldString")
assert.NotNil(t, err)
})
@ -87,7 +87,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_double")
err = parser.validate(adapter, "FieldDouble")
assert.Nil(t, err)
assert.Equal(t, len(data1), parser.columnDesc.elementCount)
@ -108,7 +108,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_double")
err = parser.validate(adapter, "FieldDouble")
assert.NotNil(t, err)
// shape mismatch
@ -125,7 +125,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_double")
err = parser.validate(adapter, "FieldDouble")
assert.NotNil(t, err)
})
@ -143,7 +143,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_binary_vector")
err = parser.validate(adapter, "FieldBinaryVector")
assert.Nil(t, err)
assert.Equal(t, len(data1)*len(data1[0]), parser.columnDesc.elementCount)
@ -175,7 +175,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_binary_vector")
err = parser.validate(adapter, "FieldBinaryVector")
assert.NotNil(t, err)
// shape[1] mismatch
@ -192,7 +192,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_binary_vector")
err = parser.validate(adapter, "FieldBinaryVector")
assert.NotNil(t, err)
// dimension mismatch
@ -200,18 +200,18 @@ func Test_NumpyParserValidate(t *testing.T) {
Fields: []*schemapb.FieldSchema{
{
FieldID: 109,
Name: "field_binary_vector",
Name: "FieldBinaryVector",
DataType: schemapb.DataType_BinaryVector,
},
},
}, flushFunc)
err = p.validate(adapter, "field_binary_vector")
err = p.validate(adapter, "FieldBinaryVector")
assert.NotNil(t, err)
})
t.Run("validate float vector", func(t *testing.T) {
filePath := TempFilesPath + "float_vector.npy"
filePath := TempFilesPath + "Float_vector.npy"
data1 := [][4]float32{{0, 0, 0, 0}, {1, 1, 1, 1}, {2, 2, 2, 2}, {3, 3, 3, 3}}
err := CreateNumpyFile(filePath, data1)
assert.Nil(t, err)
@ -224,7 +224,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_float_vector")
err = parser.validate(adapter, "FieldFloatVector")
assert.Nil(t, err)
assert.Equal(t, len(data1)*len(data1[0]), parser.columnDesc.elementCount)
@ -242,7 +242,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_float_vector")
err = parser.validate(adapter, "FieldFloatVector")
assert.NotNil(t, err)
// shape mismatch
@ -259,7 +259,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_float_vector")
err = parser.validate(adapter, "FieldFloatVector")
assert.NotNil(t, err)
// shape[1] mismatch
@ -276,7 +276,7 @@ func Test_NumpyParserValidate(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, adapter)
err = parser.validate(adapter, "field_float_vector")
err = parser.validate(adapter, "FieldFloatVector")
assert.NotNil(t, err)
// dimension mismatch
@ -284,13 +284,13 @@ func Test_NumpyParserValidate(t *testing.T) {
Fields: []*schemapb.FieldSchema{
{
FieldID: 109,
Name: "field_float_vector",
Name: "FieldFloatVector",
DataType: schemapb.DataType_FloatVector,
},
},
}, flushFunc)
err = p.validate(adapter, "field_float_vector")
err = p.validate(adapter, "FieldFloatVector")
assert.NotNil(t, err)
})
}
@ -350,7 +350,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_bool", flushFunc)
checkFunc(data, "FieldBool", flushFunc)
})
t.Run("parse scalar int8", func(t *testing.T) {
@ -365,7 +365,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_int8", flushFunc)
checkFunc(data, "FieldInt8", flushFunc)
})
t.Run("parse scalar int16", func(t *testing.T) {
@ -380,7 +380,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_int16", flushFunc)
checkFunc(data, "FieldInt16", flushFunc)
})
t.Run("parse scalar int32", func(t *testing.T) {
@ -395,7 +395,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_int32", flushFunc)
checkFunc(data, "FieldInt32", flushFunc)
})
t.Run("parse scalar int64", func(t *testing.T) {
@ -410,7 +410,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_int64", flushFunc)
checkFunc(data, "FieldInt64", flushFunc)
})
t.Run("parse scalar float", func(t *testing.T) {
@ -425,7 +425,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_float", flushFunc)
checkFunc(data, "FieldFloat", flushFunc)
})
t.Run("parse scalar double", func(t *testing.T) {
@ -440,7 +440,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_double", flushFunc)
checkFunc(data, "FieldDouble", flushFunc)
})
t.Run("parse scalar varchar", func(t *testing.T) {
@ -455,7 +455,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_string", flushFunc)
checkFunc(data, "FieldString", flushFunc)
})
t.Run("parse binary vector", func(t *testing.T) {
@ -473,7 +473,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_binary_vector", flushFunc)
checkFunc(data, "FieldBinaryVector", flushFunc)
})
t.Run("parse binary vector with float32", func(t *testing.T) {
@ -491,7 +491,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_float_vector", flushFunc)
checkFunc(data, "FieldFloatVector", flushFunc)
})
t.Run("parse binary vector with float64", func(t *testing.T) {
@ -509,7 +509,7 @@ func Test_NumpyParserParse(t *testing.T) {
return nil
}
checkFunc(data, "field_float_vector", flushFunc)
checkFunc(data, "FieldFloatVector", flushFunc)
})
}