feat: Add import reader for json (#29252)

This PR implements a new JSON reader for import.

issue: https://github.com/milvus-io/milvus/issues/28521
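
For reference, the reader accepts both the legacy object format with a root "rows" key and the new top-level array format (field names below are illustrative):

{"rows": [{"id": 1, "vector": [0.1, 0.2]}, {"id": 2, "vector": [0.3, 0.4]}]}

[{"id": 1, "vector": [0.1, 0.2]}, {"id": 2, "vector": [0.3, 0.4]}]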

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-01-05 18:12:48 +08:00 committed by GitHub
parent 2f441dc8e2
commit 23183ffb0f
5 changed files with 892 additions and 6 deletions


@@ -78,13 +78,12 @@ func (i *InsertData) GetRowNum() int {
if i.Data == nil || len(i.Data) == 0 {
return 0
}
- data, ok := i.Data[common.RowIDField]
- if !ok {
- return 0
+ var rowNum int
+ for _, data := range i.Data {
+ rowNum = data.RowNum()
+ break
}
- return data.RowNum()
+ return rowNum
}
func (i *InsertData) GetMemorySize() int {

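To make the behavioral change concrete: a minimal standalone sketch, with plain Go maps standing in for the milvus storage types (that imported data lacks the RowID field at read time is an assumption of mine):

package main

import "fmt"

func main() {
	const rowIDField = int64(0) // common.RowIDField analogue
	// Imported data carries only user fields (100, 101); the RowID field
	// is presumably assigned later in the pipeline, so it is absent here.
	data := map[int64][]int64{100: {1, 2, 3}, 101: {4, 5, 6}}

	if rows, ok := data[rowIDField]; ok { // old logic: keyed on RowIDField
		fmt.Println(len(rows))
	} else {
		fmt.Println(0) // old logic returned 0 for imported data
	}

	for _, rows := range data { // new logic: any present field knows the count
		fmt.Println(len(rows)) // 3
		break
	}
}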

@@ -0,0 +1,143 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package json
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
RowRootNode = "rows"
)
type Row = map[storage.FieldID]any
type reader struct {
dec *json.Decoder
schema *schemapb.CollectionSchema
bufferSize int
isOldFormat bool
parser RowParser
}
func NewReader(r io.Reader, schema *schemapb.CollectionSchema, bufferSize int) (*reader, error) {
reader := &reader{
dec: json.NewDecoder(r),
schema: schema,
bufferSize: bufferSize,
}
var err error
reader.parser, err = NewRowParser(schema)
if err != nil {
return nil, err
}
err = reader.Init()
if err != nil {
return nil, err
}
return reader, nil
}
func (j *reader) Init() error {
// Treat number values as strings instead of float64.
// By default, the json library decodes every number as float64,
// but an int64 with more than 15 significant digits
// loses precision when converted through float64.
j.dec.UseNumber()
t, err := j.dec.Token()
if err != nil {
return merr.WrapErrImportFailed(fmt.Sprintf("failed to decode JSON, error: %v", err))
}
if t != json.Delim('{') && t != json.Delim('[') {
return merr.WrapErrImportFailed("invalid JSON format, the content should start with '{' or '['")
}
j.isOldFormat = t == json.Delim('{')
return nil
}
func (j *reader) Read() (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(j.schema)
if err != nil {
return nil, err
}
if !j.dec.More() {
// no more rows; a nil InsertData signals the end of the stream
return nil, nil
}
if j.isOldFormat {
// read the key
t, err := j.dec.Token()
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode the JSON file, error: %v", err))
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if keyLower != RowRootNode {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid JSON format, the root key should be '%s', but got '%s'", RowRootNode, key))
}
// the rows list should start with '['
t, err = j.dec.Token()
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode the JSON file, error: %v", err))
}
if t != json.Delim('[') {
return nil, merr.WrapErrImportFailed("invalid JSON format, rows list should begin with '['")
}
}
for j.dec.More() {
var value any
if err = j.dec.Decode(&value); err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse row, error: %v", err))
}
row, err := j.parser.Parse(value)
if err != nil {
return nil, err
}
err = insertData.Append(row)
if err != nil {
return nil, err
}
// stop after one batch; the next Read call resumes from here
if insertData.GetMemorySize() >= j.bufferSize {
break
}
}
if !j.dec.More() {
t, err := j.dec.Token()
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode JSON, error: %v", err))
}
if t != json.Delim(']') {
return nil, merr.WrapErrImportFailed("invalid JSON format, rows list should end with ']'")
}
}
return insertData, nil
}
func (j *reader) Close() {}
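
A minimal end-to-end sketch of the reader, assuming the package's import path (the diff does not show file paths) and mirroring the paramtable setup from the test suite below; the large pk value shows why Init calls UseNumber:

package main

import (
	"fmt"
	"math"
	"strings"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	importjson "github.com/milvus-io/milvus/internal/util/importutilv2/json" // assumed path
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Get().Init(paramtable.NewBaseTable())
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
			{FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}},
		},
	}
	// The pk below has 19 digits: decoding it through float64 would corrupt
	// it, which is exactly what UseNumber in Init prevents.
	data := `[{"pk": 9223372036854775807, "vec": [0.1, 0.2]}]`
	reader, err := importjson.NewReader(strings.NewReader(data), schema, math.MaxInt)
	if err != nil {
		panic(err)
	}
	for {
		insertData, err := reader.Read()
		if err != nil {
			panic(err)
		}
		if insertData == nil {
			break // stream exhausted
		}
		fmt.Println(insertData.GetRowNum()) // 1
	}
}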


@@ -0,0 +1,278 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package json
import (
rand2 "crypto/rand"
"encoding/json"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ReaderSuite struct {
suite.Suite
numRows int
pkDataType schemapb.DataType
vecDataType schemapb.DataType
}
func (suite *ReaderSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
}
func (suite *ReaderSuite) SetupTest() {
// default suite params
suite.numRows = 100
suite.pkDataType = schemapb.DataType_Int64
suite.vecDataType = schemapb.DataType_FloatVector
}
func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData {
insertData, err := storage.NewInsertData(schema)
assert.NoError(t, err)
for _, field := range schema.GetFields() {
switch field.GetDataType() {
case schemapb.DataType_Bool:
boolData := make([]bool, 0)
for i := 0; i < rowCount; i++ {
boolData = append(boolData, i%3 != 0)
}
insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData}
case schemapb.DataType_Float:
floatData := make([]float32, 0)
for i := 0; i < rowCount; i++ {
floatData = append(floatData, float32(i/2))
}
insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData}
case schemapb.DataType_Double:
doubleData := make([]float64, 0)
for i := 0; i < rowCount; i++ {
doubleData = append(doubleData, float64(i/5))
}
insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData}
case schemapb.DataType_Int8:
int8Data := make([]int8, 0)
for i := 0; i < rowCount; i++ {
int8Data = append(int8Data, int8(i%256))
}
insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data}
case schemapb.DataType_Int16:
int16Data := make([]int16, 0)
for i := 0; i < rowCount; i++ {
int16Data = append(int16Data, int16(i%65536))
}
insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data}
case schemapb.DataType_Int32:
int32Data := make([]int32, 0)
for i := 0; i < rowCount; i++ {
int32Data = append(int32Data, int32(i%1000))
}
insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data}
case schemapb.DataType_Int64:
int64Data := make([]int64, 0)
for i := 0; i < rowCount; i++ {
int64Data = append(int64Data, int64(i))
}
insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data}
case schemapb.DataType_BinaryVector:
dim, err := typeutil.GetDim(field)
assert.NoError(t, err)
binVecData := make([]byte, 0)
total := rowCount * int(dim) / 8
for i := 0; i < total; i++ {
binVecData = append(binVecData, byte(i%256))
}
insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)}
case schemapb.DataType_FloatVector:
dim, err := typeutil.GetDim(field)
assert.NoError(t, err)
floatVecData := make([]float32, 0)
total := rowCount * int(dim)
for i := 0; i < total; i++ {
floatVecData = append(floatVecData, rand.Float32())
}
insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)}
case schemapb.DataType_Float16Vector:
dim, err := typeutil.GetDim(field)
assert.NoError(t, err)
total := int64(rowCount) * dim * 2
float16VecData := make([]byte, total)
_, err = rand2.Read(float16VecData)
assert.NoError(t, err)
insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)}
case schemapb.DataType_String, schemapb.DataType_VarChar:
varcharData := make([]string, 0)
for i := 0; i < rowCount; i++ {
varcharData = append(varcharData, strconv.Itoa(i))
}
insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData}
case schemapb.DataType_JSON:
jsonData := make([][]byte, 0)
for i := 0; i < rowCount; i++ {
jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i)))
}
insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData}
case schemapb.DataType_Array:
arrayData := make([]*schemapb.ScalarField, 0)
for i := 0; i < rowCount; i++ {
arrayData = append(arrayData, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{int32(i), int32(i + 1), int32(i + 2)},
},
},
})
}
insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
default:
panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String()))
}
}
return insertData
}
func (suite *ReaderSuite) run(dt schemapb.DataType) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
DataType: suite.pkDataType,
},
{
FieldID: 101,
Name: "vec",
DataType: suite.vecDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
{
FieldID: 102,
Name: dt.String(),
DataType: dt,
ElementType: schemapb.DataType_Int32,
},
},
}
insertData := createInsertData(suite.T(), schema, suite.numRows)
rows := make([]map[string]any, 0, suite.numRows)
fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
for i := 0; i < insertData.GetRowNum(); i++ {
data := make(map[int64]interface{})
for fieldID, v := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
if dataType == schemapb.DataType_Array {
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
} else if dataType == schemapb.DataType_JSON {
data[fieldID] = string(v.GetRow(i).([]byte))
} else if dataType == schemapb.DataType_BinaryVector || dataType == schemapb.DataType_Float16Vector {
bytes := v.GetRow(i).([]byte)
ints := make([]int, 0, len(bytes))
for _, b := range bytes {
ints = append(ints, int(b))
}
data[fieldID] = ints
} else {
data[fieldID] = v.GetRow(i)
}
}
row := lo.MapKeys(data, func(_ any, fieldID int64) string {
return fieldIDToField[fieldID].GetName()
})
rows = append(rows, row)
}
jsonBytes, err := json.Marshal(rows)
suite.NoError(err)
r := strings.NewReader(string(jsonBytes))
reader, err := NewReader(r, schema, math.MaxInt)
suite.NoError(err)
checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
expectInsertData := insertData
for fieldID, data := range actualInsertData.Data {
suite.Equal(expectRows, data.RowNum())
fieldDataType := typeutil.GetField(schema, fieldID).GetDataType()
for i := 0; i < expectRows; i++ {
expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin)
actual := data.GetRow(i)
if fieldDataType == schemapb.DataType_Array {
suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
} else {
suite.Equal(expect, actual)
}
}
}
}
res, err := reader.Read()
suite.NoError(err)
checkFn(res, 0, suite.numRows)
}
func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Bool)
suite.run(schemapb.DataType_Int8)
suite.run(schemapb.DataType_Int16)
suite.run(schemapb.DataType_Int32)
suite.run(schemapb.DataType_Int64)
suite.run(schemapb.DataType_Float)
suite.run(schemapb.DataType_Double)
suite.run(schemapb.DataType_VarChar)
suite.run(schemapb.DataType_Array)
suite.run(schemapb.DataType_JSON)
}
func (suite *ReaderSuite) TestStringPK() {
suite.pkDataType = schemapb.DataType_VarChar
suite.run(schemapb.DataType_Int32)
}
func (suite *ReaderSuite) TestBinaryAndFloat16Vector() {
suite.vecDataType = schemapb.DataType_BinaryVector
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_Float16Vector
suite.run(schemapb.DataType_Int32)
}
func TestUtil(t *testing.T) {
suite.Run(t, new(ReaderSuite))
}


@@ -0,0 +1,456 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package json
import (
"encoding/json"
"fmt"
"strconv"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type RowParser interface {
Parse(raw any) (Row, error)
}
type rowParser struct {
dim int
id2Field map[int64]*schemapb.FieldSchema
name2FieldID map[string]int64
pkField *schemapb.FieldSchema
dynamicField *schemapb.FieldSchema
}
func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
id2Field := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
vecField, err := typeutil.GetVectorFieldSchema(schema)
if err != nil {
return nil, err
}
dim, err := typeutil.GetDim(vecField)
if err != nil {
return nil, err
}
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return nil, err
}
name2FieldID := lo.SliceToMap(schema.GetFields(),
func(field *schemapb.FieldSchema) (string, int64) {
return field.GetName(), field.GetFieldID()
})
if pkField.GetAutoID() {
delete(name2FieldID, pkField.GetName())
}
dynamicField := typeutil.GetDynamicField(schema)
if dynamicField != nil {
delete(name2FieldID, dynamicField.GetName())
}
return &rowParser{
dim: int(dim),
id2Field: id2Field,
name2FieldID: name2FieldID,
pkField: pkField,
dynamicField: dynamicField,
}, nil
}
func (r *rowParser) wrapTypeError(v any, fieldID int64) error {
field := r.id2Field[fieldID]
return merr.WrapErrImportFailed(fmt.Sprintf("expected type '%s' for field '%s', got type '%T' with value '%v'",
field.GetDataType().String(), field.GetName(), v, v))
}
func (r *rowParser) wrapDimError(actualDim int, fieldID int64) error {
field := r.id2Field[fieldID]
return merr.WrapErrImportFailed(fmt.Sprintf("expected dim '%d' for field '%s' with type '%s', got dim '%d'",
r.dim, field.GetName(), field.GetDataType().String(), actualDim))
}
func (r *rowParser) wrapArrayValueTypeError(v any, eleType schemapb.DataType) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected element type '%s' in array field, got type '%T' with value '%v'",
eleType.String(), v, v))
}
func (r *rowParser) Parse(raw any) (Row, error) {
stringMap, ok := raw.(map[string]any)
if !ok {
return nil, merr.WrapErrImportFailed("invalid JSON format, each row should be a key-value map")
}
if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName()))
}
dynamicValues := make(map[string]any)
row := make(Row)
for key, value := range stringMap {
if fieldID, ok := r.name2FieldID[key]; ok {
data, err := r.parseEntity(fieldID, value)
if err != nil {
return nil, err
}
row[fieldID] = data
} else if r.dynamicField != nil {
if key == r.dynamicField.GetName() {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("dynamic field is enabled, explicit specification of '%s' is not allowed", key))
}
// dynamic field is enabled; stash this unrecognized key-value pair in dynamicValues
dynamicValues[key] = value
} else {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is not defined in schema", key))
}
}
for fieldName, fieldID := range r.name2FieldID {
if _, ok = row[fieldID]; !ok {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("value of field '%s' is missing", fieldName))
}
}
if r.dynamicField == nil {
return row, nil
}
// fold the leftover pairs into the dynamic field (if one exists)
err := r.combineDynamicRow(dynamicValues, row)
if err != nil {
return nil, err
}
return row, nil
}
func (r *rowParser) combineDynamicRow(dynamicValues map[string]any, row Row) error {
// Combine the dynamic field value
// invalid inputs:
// case 1: {"id": 1, "vector": [], "$meta": {"x": 8}} ==>> "$meta" is not allowed
// valid inputs:
// case 2: {"id": 1, "vector": [], "x": 8} ==>> {"id": 1, "vector": [], "$meta": "{\"x\": 8}"}
// case 3: {"id": 1, "vector": []}
dynamicFieldID := r.dynamicField.GetFieldID()
if len(dynamicValues) > 0 {
// case 2
data, err := r.parseEntity(dynamicFieldID, dynamicValues)
if err != nil {
return err
}
row[dynamicFieldID] = data
} else {
// case 3
row[dynamicFieldID] = "{}"
}
return nil
}
func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
switch r.id2Field[fieldID].GetDataType() {
case schemapb.DataType_Bool:
b, ok := obj.(bool)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
return b, nil
case schemapb.DataType_Int8:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseInt(value.String(), 0, 8)
if err != nil {
return nil, err
}
return int8(num), nil
case schemapb.DataType_Int16:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseInt(value.String(), 0, 16)
if err != nil {
return nil, err
}
return int16(num), nil
case schemapb.DataType_Int32:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseInt(value.String(), 0, 32)
if err != nil {
return nil, err
}
return int32(num), nil
case schemapb.DataType_Int64:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseInt(value.String(), 0, 64)
if err != nil {
return nil, err
}
return num, nil
case schemapb.DataType_Float:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseFloat(value.String(), 32)
if err != nil {
return nil, err
}
return float32(num), nil
case schemapb.DataType_Double:
value, ok := obj.(json.Number)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
num, err := strconv.ParseFloat(value.String(), 64)
if err != nil {
return nil, err
}
return num, nil
case schemapb.DataType_BinaryVector:
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
if len(arr)*8 != r.dim {
return nil, r.wrapDimError(len(arr)*8, fieldID)
}
vec := make([]byte, len(arr))
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapTypeError(arr[i], fieldID)
}
num, err := strconv.ParseUint(value.String(), 0, 8)
if err != nil {
return nil, err
}
vec[i] = byte(num)
}
return vec, nil
case schemapb.DataType_FloatVector:
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
if len(arr) != r.dim {
return nil, r.wrapDimError(len(arr), fieldID)
}
vec := make([]float32, len(arr))
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapTypeError(arr[i], fieldID)
}
num, err := strconv.ParseFloat(value.String(), 32)
if err != nil {
return nil, err
}
vec[i] = float32(num)
}
return vec, nil
case schemapb.DataType_Float16Vector:
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
if len(arr)/2 != r.dim {
return nil, r.wrapDimError(len(arr)/2, fieldID)
}
vec := make([]byte, len(arr))
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapTypeError(arr[i], fieldID)
}
num, err := strconv.ParseUint(value.String(), 0, 8)
if err != nil {
return nil, err
}
vec[i] = byte(num)
}
return vec, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
value, ok := obj.(string)
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
return value, nil
case schemapb.DataType_JSON:
// JSON fields accept two kinds of input: string and map[string]interface{}.
// Users can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}}.
if value, ok := obj.(string); ok {
var dummy interface{}
err := json.Unmarshal([]byte(value), &dummy)
if err != nil {
return nil, err
}
return []byte(value), nil
} else if mp, ok := obj.(map[string]interface{}); ok {
bs, err := json.Marshal(mp)
if err != nil {
return nil, err
}
return bs, nil
} else {
return nil, r.wrapTypeError(obj, fieldID)
}
case schemapb.DataType_Array:
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
scalarFieldData, err := r.arrayToFieldData(arr, r.id2Field[fieldID].GetElementType())
if err != nil {
return nil, err
}
return scalarFieldData, nil
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse JSON failed, unsupported data type: %s",
r.id2Field[fieldID].GetDataType().String()))
}
}
func (r *rowParser) arrayToFieldData(arr []interface{}, eleType schemapb.DataType) (*schemapb.ScalarField, error) {
switch eleType {
case schemapb.DataType_Bool:
values := make([]bool, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(bool)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
values = append(values, value)
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: values,
},
},
}, nil
case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
values := make([]int32, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
num, err := strconv.ParseInt(value.String(), 0, 32)
if err != nil {
return nil, err
}
values = append(values, int32(num))
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: values,
},
},
}, nil
case schemapb.DataType_Int64:
values := make([]int64, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
num, err := strconv.ParseInt(value.String(), 0, 64)
if err != nil {
return nil, err
}
values = append(values, num)
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: values,
},
},
}, nil
case schemapb.DataType_Float:
values := make([]float32, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
num, err := strconv.ParseFloat(value.String(), 32)
if err != nil {
return nil, err
}
values = append(values, float32(num))
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: values,
},
},
}, nil
case schemapb.DataType_Double:
values := make([]float64, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
num, err := strconv.ParseFloat(value.String(), 64)
if err != nil {
return nil, err
}
values = append(values, num)
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: values,
},
},
}, nil
case schemapb.DataType_VarChar, schemapb.DataType_String:
values := make([]string, 0)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(string)
if !ok {
return nil, r.wrapArrayValueTypeError(arr, eleType)
}
values = append(values, value)
}
return &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: values,
},
},
}, nil
default:
return nil, errors.Newf("unsupported array data type '%s'", eleType.String())
}
}
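
To illustrate combineDynamicRow's cases: a standalone sketch of the folding (field names and IDs here are hypothetical; the real parser stores the bytes returned by parseEntity, while this sketch uses a string for brevity):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	schemaFields := map[string]int64{"id": 100, "vector": 101}
	const dynamicFieldID = int64(102) // e.g. the "$meta" dynamic field

	input := map[string]any{"id": 1, "vector": []float64{0.1, 0.2}, "x": 8}
	row := make(map[int64]any)
	extras := make(map[string]any)
	for key, value := range input {
		if fieldID, ok := schemaFields[key]; ok {
			row[fieldID] = value // schema-defined field
		} else {
			extras[key] = value // unrecognized key: destined for the dynamic field
		}
	}
	if len(extras) > 0 { // case 2: fold extras into the dynamic field
		bs, err := json.Marshal(extras)
		if err != nil {
			panic(err)
		}
		row[dynamicFieldID] = string(bs)
	} else { // case 3: no extras, store an empty object
		row[dynamicFieldID] = "{}"
	}
	fmt.Println(row[dynamicFieldID]) // {"x":8}
}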


@@ -816,6 +816,16 @@ func GetPartitionKeyFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.Fi
return nil, errors.New("partition key field is not found")
}
+ // GetDynamicField returns the dynamic field if it exists.
+ func GetDynamicField(schema *schemapb.CollectionSchema) *schemapb.FieldSchema {
+ for _, fieldSchema := range schema.GetFields() {
+ if fieldSchema.GetIsDynamic() {
+ return fieldSchema
+ }
+ }
+ return nil
+ }
// HasPartitionKey check if a collection schema has PartitionKey field
func HasPartitionKey(schema *schemapb.CollectionSchema) bool {
for _, fieldSchema := range schema.Fields {
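
A small usage sketch for the new helper (the schema literal is illustrative):

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func main() {
	schema := &schemapb.CollectionSchema{
		EnableDynamicField: true,
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
			{FieldID: 102, Name: "$meta", DataType: schemapb.DataType_JSON, IsDynamic: true},
		},
	}
	if field := typeutil.GetDynamicField(schema); field != nil {
		fmt.Println(field.GetName()) // $meta
	}
}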