From 3dc2585d9ba9aeab23fb2e8b20671aa45965196a Mon Sep 17 00:00:00 2001 From: PowderLi <135960789+PowderLi@users.noreply.github.com> Date: Sun, 21 Jan 2024 14:00:56 +0800 Subject: [PATCH] enhance: support dataType: array & json (#30077) issue: #30075 master pr: #30076 deal with the array field data correctly Signed-off-by: PowderLi --- .../proxy/httpserver/handler_v1_test.go | 5 +- .../distributed/proxy/httpserver/utils.go | 129 ++++++++++- .../proxy/httpserver/utils_test.go | 211 ++++++++++++++++-- 3 files changed, 321 insertions(+), 24 deletions(-) diff --git a/internal/distributed/proxy/httpserver/handler_v1_test.go b/internal/distributed/proxy/httpserver/handler_v1_test.go index 544d145328..cfd7c65157 100644 --- a/internal/distributed/proxy/httpserver/handler_v1_test.go +++ b/internal/distributed/proxy/httpserver/handler_v1_test.go @@ -761,6 +761,7 @@ func TestInsertForDataType(t *testing.T) { "[success]kinds of data type": newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false)), "[success]use binary vector": newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, true)), "[success]with dynamic field": withDynamicField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))), + "[success]with array fields": withArrayField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))), } for name, schema := range schemas { t.Run(name, func(t *testing.T) { @@ -791,9 +792,7 @@ func TestInsertForDataType(t *testing.T) { assert.Equal(t, "{\"code\":200,\"data\":{\"insertCount\":3,\"insertIds\":[1,2,3]}}", w.Body.String()) }) } - schemas = map[string]*schemapb.CollectionSchema{ - "with unsupport field type": withUnsupportField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))), - } + schemas = map[string]*schemapb.CollectionSchema{} for name, schema := range schemas { t.Run(name, func(t *testing.T) { mp := mocks.NewMockProxy(t) diff --git a/internal/distributed/proxy/httpserver/utils.go b/internal/distributed/proxy/httpserver/utils.go index fd2ec60af0..2384ff2cee 100644 --- a/internal/distributed/proxy/httpserver/utils.go +++ b/internal/distributed/proxy/httpserver/utils.go @@ -244,6 +244,113 @@ func checkAndSetData(body string, collSchema *schemapb.CollectionSchema) (error, return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray } reallyData[fieldName] = result + case schemapb.DataType_Array: + switch field.ElementType { + case schemapb.DataType_Bool: + arr := make([]bool, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Int8: + arr := make([]int32, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Int16: + arr := make([]int32, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Int32: + arr := make([]int32, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Int64: + arr := make([]int64, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Float: + arr := make([]float32, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_Double: + arr := make([]float64, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: arr, + }, + }, + } + case schemapb.DataType_VarChar: + arr := make([]string, 0) + err := json.Unmarshal([]byte(dataString), &arr) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: arr, + }, + }, + } + } case schemapb.DataType_JSON: reallyData[fieldName] = []byte(dataString) case schemapb.DataType_Float: @@ -429,6 +536,8 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) data = make([]string, 0, rowsLen) case schemapb.DataType_VarChar: data = make([]string, 0, rowsLen) + case schemapb.DataType_Array: + data = make([]*schemapb.ScalarField, 0, rowsLen) case schemapb.DataType_JSON: data = make([][]byte, 0, rowsLen) case schemapb.DataType_FloatVector: @@ -491,6 +600,8 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) nameColumns[field.Name] = append(nameColumns[field.Name].([]string), candi.v.Interface().(string)) case schemapb.DataType_VarChar: nameColumns[field.Name] = append(nameColumns[field.Name].([]string), candi.v.Interface().(string)) + case schemapb.DataType_Array: + nameColumns[field.Name] = append(nameColumns[field.Name].([]*schemapb.ScalarField), candi.v.Interface().(*schemapb.ScalarField)) case schemapb.DataType_JSON: nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte)) case schemapb.DataType_FloatVector: @@ -610,11 +721,21 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) }, }, } + case schemapb.DataType_Array: + colData.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: column.([]*schemapb.ScalarField), + }, + }, + }, + } case schemapb.DataType_JSON: colData.Field = &schemapb.FieldData_Scalars{ Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_BytesData{ - BytesData: &schemapb.BytesArray{ + Data: &schemapb.ScalarField_JsonData{ + JsonData: &schemapb.JSONArray{ Data: column.([][]byte), }, }, @@ -747,10 +868,10 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap rowsNum = int64(len(fieldDataList[0].GetScalars().GetStringData().Data)) case schemapb.DataType_VarChar: rowsNum = int64(len(fieldDataList[0].GetScalars().GetStringData().Data)) - case schemapb.DataType_JSON: - rowsNum = int64(len(fieldDataList[0].GetScalars().GetJsonData().Data)) case schemapb.DataType_Array: rowsNum = int64(len(fieldDataList[0].GetScalars().GetArrayData().Data)) + case schemapb.DataType_JSON: + rowsNum = int64(len(fieldDataList[0].GetScalars().GetJsonData().Data)) case schemapb.DataType_BinaryVector: rowsNum = int64(len(fieldDataList[0].GetVectors().GetBinaryVector())*8) / fieldDataList[0].GetVectors().GetDim() case schemapb.DataType_FloatVector: diff --git a/internal/distributed/proxy/httpserver/utils_test.go b/internal/distributed/proxy/httpserver/utils_test.go index 81f3f84326..187e269b76 100644 --- a/internal/distributed/proxy/httpserver/utils_test.go +++ b/internal/distributed/proxy/httpserver/utils_test.go @@ -2,6 +2,7 @@ package httpserver import ( "strconv" + "strings" "testing" "github.com/gin-gonic/gin" @@ -428,7 +429,9 @@ func compareRow(m1 map[string]interface{}, m2 map[string]interface{}) bool { } for key, value := range m2 { - if (key == FieldBookIntro) || (key == "field-binary") || (key == "field-json") { + if (key == FieldBookIntro) || (key == "field-binary") || (key == "field-json") || (key == "field-array") { + continue + } else if strings.HasPrefix(key, "array-") { continue } else if value != m1[key] { return false @@ -518,21 +521,11 @@ func newCollectionSchema(coll *schemapb.CollectionSchema) *schemapb.CollectionSc } coll.Fields = append(coll.Fields, &fieldSchema9) - //fieldSchema10 := schemapb.FieldSchema{ - // Name: "$meta", - // DataType: schemapb.DataType_JSON, - // IsDynamic: true, - //} - //coll.Fields = append(coll.Fields, &fieldSchema10) - - return coll -} - -func withUnsupportField(coll *schemapb.CollectionSchema) *schemapb.CollectionSchema { fieldSchema10 := schemapb.FieldSchema{ - Name: "field-array", - DataType: schemapb.DataType_Array, - IsDynamic: false, + Name: "field-array", + DataType: schemapb.DataType_Array, + IsDynamic: false, + ElementType: schemapb.DataType_Bool, } coll.Fields = append(coll.Fields, &fieldSchema10) @@ -550,6 +543,58 @@ func withDynamicField(coll *schemapb.CollectionSchema) *schemapb.CollectionSchem return coll } +func withArrayField(coll *schemapb.CollectionSchema) *schemapb.CollectionSchema { + fieldSchema0 := schemapb.FieldSchema{ + Name: "array-bool", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Bool, + } + coll.Fields = append(coll.Fields, &fieldSchema0) + fieldSchema1 := schemapb.FieldSchema{ + Name: "array-int8", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int8, + } + coll.Fields = append(coll.Fields, &fieldSchema1) + fieldSchema2 := schemapb.FieldSchema{ + Name: "array-int16", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int16, + } + coll.Fields = append(coll.Fields, &fieldSchema2) + fieldSchema3 := schemapb.FieldSchema{ + Name: "array-int32", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int32, + } + coll.Fields = append(coll.Fields, &fieldSchema3) + fieldSchema4 := schemapb.FieldSchema{ + Name: "array-int64", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int64, + } + coll.Fields = append(coll.Fields, &fieldSchema4) + fieldSchema5 := schemapb.FieldSchema{ + Name: "array-float", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Float, + } + coll.Fields = append(coll.Fields, &fieldSchema5) + fieldSchema6 := schemapb.FieldSchema{ + Name: "array-double", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Double, + } + coll.Fields = append(coll.Fields, &fieldSchema6) + fieldSchema7 := schemapb.FieldSchema{ + Name: "array-varchar", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_VarChar, + } + coll.Fields = append(coll.Fields, &fieldSchema7) + return coll +} + func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.DataType) []*schemapb.FieldData { fieldData1 := schemapb.FieldData{ Type: schemapb.DataType_Bool, @@ -783,6 +828,15 @@ func newSearchResult(results []map[string]interface{}) []map[string]interface{} result["field-string"] = strconv.Itoa(i) result["field-binary"] = []byte{byte(i)} result["field-json"] = []byte(`{"XXX": 0}`) + result["field-array"] = []bool{true} + result["array-bool"] = []bool{true} + result["array-int8"] = []int32{0} + result["array-int16"] = []int32{0} + result["array-int32"] = []int32{0} + result["array-int64"] = []int64{0} + result["array-float"] = []float32{0} + result["array-double"] = []float64{0} + result["array-varchar"] = []string{""} result["XXX"] = float64(i) result["YYY"] = strconv.Itoa(i) results[i] = result @@ -790,10 +844,133 @@ func newSearchResult(results []map[string]interface{}) []map[string]interface{} return results } +func newCollectionSchemaWithArray(coll *schemapb.CollectionSchema) *schemapb.CollectionSchema { + fieldSchema1 := schemapb.FieldSchema{ + Name: "array-bool", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Bool, + } + coll.Fields = append(coll.Fields, &fieldSchema1) + + fieldSchema2 := schemapb.FieldSchema{ + Name: "array-int8", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int8, + } + coll.Fields = append(coll.Fields, &fieldSchema2) + + fieldSchema3 := schemapb.FieldSchema{ + Name: "array-int16", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int16, + } + coll.Fields = append(coll.Fields, &fieldSchema3) + + fieldSchema4 := schemapb.FieldSchema{ + Name: "array-int32", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int32, + } + coll.Fields = append(coll.Fields, &fieldSchema4) + + fieldSchema5 := schemapb.FieldSchema{ + Name: "array-int64", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int64, + } + coll.Fields = append(coll.Fields, &fieldSchema5) + + fieldSchema6 := schemapb.FieldSchema{ + Name: "array-float", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Float, + } + coll.Fields = append(coll.Fields, &fieldSchema6) + + fieldSchema7 := schemapb.FieldSchema{ + Name: "array-double", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Double, + } + coll.Fields = append(coll.Fields, &fieldSchema7) + + fieldSchema8 := schemapb.FieldSchema{ + Name: "array-varchar", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_String, + } + coll.Fields = append(coll.Fields, &fieldSchema8) + + return coll +} + +func newSearchResultWithArray(results []map[string]interface{}) []map[string]interface{} { + for i, result := range results { + result["array-bool"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: []bool{true}, + }, + }, + } + result["array-int8"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{0}, + }, + }, + } + result["array-int16"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{0}, + }, + }, + } + result["array-int32"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: []int32{0}, + }, + }, + } + result["array-int64"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: []int64{0}, + }, + }, + } + result["array-float"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: []float32{0}, + }, + }, + } + result["array-double"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: []float64{0}, + }, + }, + } + result["array-varchar"] = &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: []string{""}, + }, + }, + } + results[i] = result + } + return results +} + func TestAnyToColumn(t *testing.T) { - data, err := anyToColumns(newSearchResult(generateSearchResult(schemapb.DataType_Int64)), newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64, false))) + data, err := anyToColumns(newSearchResultWithArray(generateSearchResult(schemapb.DataType_Int64)), newCollectionSchemaWithArray(generateCollectionSchema(schemapb.DataType_Int64, false))) assert.Equal(t, nil, err) - assert.Equal(t, 13, len(data)) + assert.Equal(t, 12, len(data)) } func TestBuildQueryResps(t *testing.T) {