From 1b67cecd6595322f7eae0304be5bddc6c2343e87 Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Mon, 27 May 2024 00:47:40 +0800 Subject: [PATCH] enhance: add sparse float vector support to restful v2 (#33231) issue: #29419 also re-enabled an e2e test using restful api, which was previously disabled due to https://github.com/milvus-io/milvus/issues/32214. In restful api, the accepted json formats of sparse float vector are: * `{"indices": [1, 100, 1000], "values": [0.1, 0.2, 0.3]}` * `{"1": 0.1, "100": 0.2, "1000": 0.3}` For the accepted index and value ranges, see https://milvus.io/docs/sparse_vector.md#FAQ Signed-off-by: Buqian Zheng --- .../proxy/httpserver/handler_v2.go | 5 +- .../proxy/httpserver/handler_v2_test.go | 39 +++++- .../distributed/proxy/httpserver/utils.go | 51 +++++++ .../proxy/httpserver/utils_test.go | 130 +++++++++++------- .../proxy/httpserver/wrap_request.go | 35 +++++ .../proxy/httpserver/wrap_request_test.go | 95 +++++++++++++ pkg/util/typeutil/schema.go | 99 ++++++++----- pkg/util/typeutil/schema_test.go | 41 +++++- .../testcases/test_vector_operations.py | 35 ++--- 9 files changed, 422 insertions(+), 108 deletions(-) diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index ba97bc5fa4..dc457e9b47 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -848,7 +848,10 @@ func generatePlaceholderGroup(ctx context.Context, body string, collSchema *sche if vectorField == nil { return nil, errors.New("cannot find a vector field named: " + fieldName) } - dim, _ := getDim(vectorField) + dim := int64(0) + if !typeutil.IsSparseFloatVectorType(vectorField.DataType) { + dim, _ = getDim(vectorField) + } phv, err := convertVectors2Placeholder(body, vectorField.DataType, dim) if err != nil { return nil, err diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go
b/internal/distributed/proxy/httpserver/handler_v2_test.go index 13cc65cee5..ab6f71315e 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -471,11 +471,11 @@ func TestDatabaseWrapper(t *testing.T) { func TestCreateCollection(t *testing.T) { postTestCases := []requestBodyTestCase{} mp := mocks.NewMockProxy(t) - mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(11) + mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(12) mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Twice() - mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Once() + mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Twice() testEngine := initHTTPServerV2(mp, false) path := versionalV2(CollectionCategory, CreateAction) // quickly create collection @@ -564,6 +564,18 @@ func TestCreateCollection(t *testing.T) { ] }}`), }) + // dim should not be specified for SparseFloatVector field + postTestCases = append(postTestCases, requestBodyTestCase{ + path: path, + requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": true, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": false, "elementTypeParams": {}}, + {"fieldName": "partition_field", "dataType": "VarChar", "isPartitionKey": true, "elementTypeParams": {"max_length": 256}}, + {"fieldName": "book_intro", "dataType": "SparseFloatVector", "elementTypeParams": {}} + ] + }, "params": {"partitionsNum": "32"}}`), + }) postTestCases 
= append(postTestCases, requestBodyTestCase{ path: path, requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": { @@ -612,6 +624,18 @@ func TestCreateCollection(t *testing.T) { errMsg: "", errCode: 65535, }) + postTestCases = append(postTestCases, requestBodyTestCase{ + path: path, + requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": { + "fields": [ + {"fieldName": "book_id", "dataType": "Int64", "isPrimary": true, "elementTypeParams": {}}, + {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}}, + {"fieldName": "book_intro", "dataType": "SparseFloatVector", "elementTypeParams": {"dim": 2}} + ] + }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]}`), + errMsg: "", + errCode: 65535, + }) for _, testcase := range postTestCases { t.Run("post"+testcase.path, func(t *testing.T) { @@ -1240,16 +1264,19 @@ func TestSearchV2(t *testing.T) { float16VectorField.Name = "float16Vector" bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector) bfloat16VectorField.Name = "bfloat16Vector" + sparseFloatVectorField := generateVectorFieldSchema(schemapb.DataType_SparseFloatVector) + sparseFloatVectorField.Name = "sparseFloatVector" collSchema.Fields = append(collSchema.Fields, &binaryVectorField) collSchema.Fields = append(collSchema.Fields, &float16VectorField) collSchema.Fields = append(collSchema.Fields, &bfloat16VectorField) + collSchema.Fields = append(collSchema.Fields, &sparseFloatVectorField) mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ CollectionName: DefaultCollectionName, Schema: collSchema, ShardsNum: ShardNumDefault, Status: &StatusSuccess, - }, nil).Times(9) - mp.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{Status: commonSuccessStatus, Results: &schemapb.SearchResultData{TopK: int64(0)}}, nil).Twice() + }, nil).Times(10) + 
mp.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{Status: commonSuccessStatus, Results: &schemapb.SearchResultData{TopK: int64(0)}}, nil).Times(3) testEngine := initHTTPServerV2(mp, false) queryTestCases := []requestBodyTestCase{} queryTestCases = append(queryTestCases, requestBodyTestCase{ @@ -1377,6 +1404,10 @@ func TestSearchV2(t *testing.T) { errMsg: "can only accept json format request, error: dimension: 2, bytesLen: 4, but length of []byte: 3: invalid parameter[expected=BFloat16Vector][actual=\x01\x02\x03]", errCode: 1801, }) + queryTestCases = append(queryTestCases, requestBodyTestCase{ + path: SearchAction, + requestBody: []byte(`{"collectionName": "book", "data": [{"1": 0.1}], "annsField": "sparseFloatVector", "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"]}`), + }) for _, testcase := range queryTestCases { t.Run("search", func(t *testing.T) { diff --git a/internal/distributed/proxy/httpserver/utils.go b/internal/distributed/proxy/httpserver/utils.go index c57e98d4eb..a42c165cf5 100644 --- a/internal/distributed/proxy/httpserver/utils.go +++ b/internal/distributed/proxy/httpserver/utils.go @@ -248,6 +248,15 @@ func checkAndSetData(body string, collSchema *schemapb.CollectionSchema) (error, return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray } reallyData[fieldName] = vectorArray + case schemapb.DataType_SparseFloatVector: + if dataString == "" { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], "", "missing vector field: "+fieldName), reallyDataArray + } + sparseVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(dataString)) + if err != nil { + return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray + } + reallyData[fieldName] = sparseVec case schemapb.DataType_Float16Vector: if dataString == "" { return 
merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], "", "missing vector field: "+fieldName), reallyDataArray @@ -638,6 +647,9 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) data = make([][]byte, 0, rowsLen) dim, _ := getDim(field) nameDims[field.Name] = dim + case schemapb.DataType_SparseFloatVector: + data = make([][]byte, 0, rowsLen) + nameDims[field.Name] = int64(0) default: return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name) } @@ -704,6 +716,13 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte)) case schemapb.DataType_BFloat16Vector: nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte)) + case schemapb.DataType_SparseFloatVector: + content := candi.v.Interface().([]byte) + rowSparseDim := typeutil.SparseFloatRowDim(content) + if rowSparseDim > nameDims[field.Name] { + nameDims[field.Name] = rowSparseDim + } + nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), content) default: return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name) } @@ -895,6 +914,18 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema) }, }, } + case schemapb.DataType_SparseFloatVector: + colData.Field = &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: nameDims[name], + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: nameDims[name], + Contents: column.([][]byte), + }, + }, + }, + } default: return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", colData.Type, name) } @@ -963,6 +994,19 @@ func serializeByteVectors(vectorStr string, dataType schemapb.DataType, dimensio return values, 
nil } +func serializeSparseFloatVectors(vectors []gjson.Result, dataType schemapb.DataType) ([][]byte, error) { + values := make([][]byte, 0) + for _, vector := range vectors { + vectorBytes := []byte(vector.String()) + sparseVector, err := typeutil.CreateSparseFloatRowFromJSON(vectorBytes) + if err != nil { + return nil, merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(dataType)], vector.String(), err.Error()) + } + values = append(values, sparseVector) + } + return values, nil +} + func convertVectors2Placeholder(body string, dataType schemapb.DataType, dimension int64) (*commonpb.PlaceholderValue, error) { var valueType commonpb.PlaceholderType var values [][]byte @@ -980,6 +1024,9 @@ func convertVectors2Placeholder(body string, dataType schemapb.DataType, dimensi case schemapb.DataType_BFloat16Vector: valueType = commonpb.PlaceholderType_BFloat16Vector values, err = serializeByteVectors(gjson.Get(body, HTTPRequestData).Raw, dataType, dimension, dimension*2) + case schemapb.DataType_SparseFloatVector: + valueType = commonpb.PlaceholderType_SparseFloatVector + values, err = serializeSparseFloatVectors(gjson.Get(body, HTTPRequestData).Array(), dataType) } if err != nil { return nil, err @@ -1070,6 +1117,8 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap rowsNum = int64(len(fieldDataList[0].GetVectors().GetFloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim() case schemapb.DataType_BFloat16Vector: rowsNum = int64(len(fieldDataList[0].GetVectors().GetBfloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim() + case schemapb.DataType_SparseFloatVector: + rowsNum = int64(len(fieldDataList[0].GetVectors().GetSparseFloatVector().Contents)) default: return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", fieldDataList[0].Type, fieldDataList[0].FieldName) } @@ -1125,6 +1174,8 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap 
row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetFloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)] case schemapb.DataType_BFloat16Vector: row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetBfloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)] + case schemapb.DataType_SparseFloatVector: + row[fieldDataList[j].FieldName] = typeutil.SparseFloatBytesToMap(fieldDataList[j].GetVectors().GetSparseFloatVector().Contents[i]) case schemapb.DataType_Array: row[fieldDataList[j].FieldName] = fieldDataList[j].GetScalars().GetArrayData().Data[i] case schemapb.DataType_JSON: diff --git a/internal/distributed/proxy/httpserver/utils_test.go b/internal/distributed/proxy/httpserver/utils_test.go index 945783c335..f860bb37fb 100644 --- a/internal/distributed/proxy/httpserver/utils_test.go +++ b/internal/distributed/proxy/httpserver/utils_test.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -178,21 +179,45 @@ func generateVectorFieldData(vectorType schemapb.DataType) schemapb.FieldData { }, IsDynamic: false, } - } - return schemapb.FieldData{ - Type: schemapb.DataType_FloatVector, - FieldName: FieldBookIntro, - Field: &schemapb.FieldData_Vectors{ - Vectors: &schemapb.VectorField{ - Dim: 2, - Data: &schemapb.VectorField_FloatVector{ - FloatVector: &schemapb.FloatArray{ - Data: []float32{0.1, 0.11, 0.2, 0.22, 0.3, 0.33}, + case schemapb.DataType_FloatVector: + return schemapb.FieldData{ + Type: schemapb.DataType_FloatVector, + FieldName: FieldBookIntro, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 2, + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: []float32{0.1, 0.11, 
0.2, 0.22, 0.3, 0.33}, + }, }, }, }, - }, - IsDynamic: false, + IsDynamic: false, + } + case schemapb.DataType_SparseFloatVector: + contents := make([][]byte, 0, 3) + contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{1, 2, 3}, []float32{0.1, 0.11, 0.2})) + contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{100, 200, 300}, []float32{10.1, 20.11, 30.2})) + contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{1000, 2000, 3000}, []float32{5000.1, 7000.11, 9000.2})) + return schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldName: FieldBookIntro, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: int64(3001), + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: int64(3001), + Contents: contents, + }, + }, + }, + }, + IsDynamic: false, + } + default: + panic("unsupported vector type") } } @@ -1005,7 +1030,7 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data switch firstFieldType { case schemapb.DataType_None: - break + return fieldDatas case schemapb.DataType_Bool: return []*schemapb.FieldData{&fieldData1} case schemapb.DataType_Int8: @@ -1038,6 +1063,9 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data return []*schemapb.FieldData{&fieldData10} case schemapb.DataType_JSON: return []*schemapb.FieldData{&fieldData9} + case schemapb.DataType_SparseFloatVector: + vectorField := generateVectorFieldData(firstFieldType) + return []*schemapb.FieldData{&vectorField} default: return []*schemapb.FieldData{ { @@ -1046,8 +1074,6 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data }, } } - - return fieldDatas } func newSearchResult(results []map[string]interface{}) []map[string]interface{} { @@ -1225,26 +1251,30 @@ func TestVector(t *testing.T) { binaryVector := "vector-binary" float16Vector := "vector-float16" bfloat16Vector := 
"vector-bfloat16" + sparseFloatVector := "vector-sparse-float" row1 := map[string]interface{}{ - FieldBookID: int64(1), - floatVector: []float32{0.1, 0.11}, - binaryVector: []byte{1}, - float16Vector: []byte{1, 1, 11, 11}, - bfloat16Vector: []byte{1, 1, 11, 11}, + FieldBookID: int64(1), + floatVector: []float32{0.1, 0.11}, + binaryVector: []byte{1}, + float16Vector: []byte{1, 1, 11, 11}, + bfloat16Vector: []byte{1, 1, 11, 11}, + sparseFloatVector: map[uint32]float32{0: 0.1, 1: 0.11}, } row2 := map[string]interface{}{ - FieldBookID: int64(2), - floatVector: []float32{0.2, 0.22}, - binaryVector: []byte{2}, - float16Vector: []byte{2, 2, 22, 22}, - bfloat16Vector: []byte{2, 2, 22, 22}, + FieldBookID: int64(2), + floatVector: []float32{0.2, 0.22}, + binaryVector: []byte{2}, + float16Vector: []byte{2, 2, 22, 22}, + bfloat16Vector: []byte{2, 2, 22, 22}, + sparseFloatVector: map[uint32]float32{1000: 0.3, 200: 0.44}, } row3 := map[string]interface{}{ - FieldBookID: int64(3), - floatVector: []float32{0.3, 0.33}, - binaryVector: []byte{3}, - float16Vector: []byte{3, 3, 33, 33}, - bfloat16Vector: []byte{3, 3, 33, 33}, + FieldBookID: int64(3), + floatVector: []float32{0.3, 0.33}, + binaryVector: []byte{3}, + float16Vector: []byte{3, 3, 33, 33}, + bfloat16Vector: []byte{3, 3, 33, 33}, + sparseFloatVector: map[uint32]float32{987621: 32190.31, 32189: 0.0001}, } body, _ := wrapRequestBody([]map[string]interface{}{row1, row2, row3}) primaryField := generatePrimaryField(schemapb.DataType_Int64) @@ -1256,12 +1286,14 @@ func TestVector(t *testing.T) { float16VectorField.Name = float16Vector bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector) bfloat16VectorField.Name = bfloat16Vector + sparseFloatVectorField := generateVectorFieldSchema(schemapb.DataType_SparseFloatVector) + sparseFloatVectorField.Name = sparseFloatVector collectionSchema := &schemapb.CollectionSchema{ Name: DefaultCollectionName, Description: "", AutoID: false, Fields: 
[]*schemapb.FieldSchema{ - &primaryField, &floatVectorField, &binaryVectorField, &float16VectorField, &bfloat16VectorField, + &primaryField, &floatVectorField, &binaryVectorField, &float16VectorField, &bfloat16VectorField, &sparseFloatVectorField, }, EnableDynamicField: true, } @@ -1271,27 +1303,29 @@ func TestVector(t *testing.T) { assert.Equal(t, 1, len(row[binaryVector].([]byte))) assert.Equal(t, 4, len(row[float16Vector].([]byte))) assert.Equal(t, 4, len(row[bfloat16Vector].([]byte))) + // all test sparse rows have 2 elements, each should be of 8 bytes + assert.Equal(t, 16, len(row[sparseFloatVector].([]byte))) } data, err := anyToColumns(rows, collectionSchema) assert.Equal(t, nil, err) assert.Equal(t, len(collectionSchema.Fields)+1, len(data)) - row1[bfloat16Vector] = []int64{99999999, -99999999} - body, _ = wrapRequestBody([]map[string]interface{}{row1}) - err, _ = checkAndSetData(string(body), collectionSchema) - assert.Error(t, err) - row1[float16Vector] = []int64{99999999, -99999999} - body, _ = wrapRequestBody([]map[string]interface{}{row1}) - err, _ = checkAndSetData(string(body), collectionSchema) - assert.Error(t, err) - row1[binaryVector] = []int64{99999999, -99999999} - body, _ = wrapRequestBody([]map[string]interface{}{row1}) - err, _ = checkAndSetData(string(body), collectionSchema) - assert.Error(t, err) - row1[floatVector] = []float64{math.MaxFloat64, 0} - body, _ = wrapRequestBody([]map[string]interface{}{row1}) - err, _ = checkAndSetData(string(body), collectionSchema) - assert.Error(t, err) + assertError := func(field string, value interface{}) { + row := make(map[string]interface{}) + for k, v := range row1 { + row[k] = v + } + row[field] = value + body, _ = wrapRequestBody([]map[string]interface{}{row}) + err, _ = checkAndSetData(string(body), collectionSchema) + assert.Error(t, err) + } + + assertError(bfloat16Vector, []int64{99999999, -99999999}) + assertError(float16Vector, []int64{99999999, -99999999}) + assertError(binaryVector, 
[]int64{99999999, -99999999}) + assertError(floatVector, []float64{math.MaxFloat64, 0}) + assertError(sparseFloatVector, map[uint32]float32{0: -0.1, 1: 0.11, 2: 0.12}) } func TestBuildQueryResps(t *testing.T) { @@ -1305,7 +1339,7 @@ func TestBuildQueryResps(t *testing.T) { } dataTypes := []schemapb.DataType{ - schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, + schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector, schemapb.DataType_Bool, schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32, schemapb.DataType_Float, schemapb.DataType_Double, schemapb.DataType_String, schemapb.DataType_VarChar, diff --git a/internal/distributed/proxy/httpserver/wrap_request.go b/internal/distributed/proxy/httpserver/wrap_request.go index a8f5eec8b9..79d2f0dfa8 100644 --- a/internal/distributed/proxy/httpserver/wrap_request.go +++ b/internal/distributed/proxy/httpserver/wrap_request.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // We wrap original protobuf structure for 2 reasons: @@ -212,6 +213,40 @@ func (f *FieldData) AsSchemapb() (*schemapb.FieldData, error) { }, }, } + case schemapb.DataType_SparseFloatVector: + var wrappedData []map[string]interface{} + err := json.Unmarshal(raw, &wrappedData) + if err != nil { + return nil, newFieldDataError(f.FieldName, err) + } + if len(wrappedData) < 1 { + return nil, errors.New("at least one row for insert") + } + data := make([][]byte, len(wrappedData)) + dim := int64(0) + for _, row := range wrappedData { + rowData, err := typeutil.CreateSparseFloatRowFromMap(row) + if err != nil { + return nil, newFieldDataError(f.FieldName, err) + 
} + data = append(data, rowData) + rowDim := typeutil.SparseFloatRowDim(rowData) + if rowDim > dim { + dim = rowDim + } + } + + ret.Field = &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: dim, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: dim, + Contents: data, + }, + }, + }, + } default: return nil, errors.New("unsupported data type") } diff --git a/internal/distributed/proxy/httpserver/wrap_request_test.go b/internal/distributed/proxy/httpserver/wrap_request_test.go index defddf831a..4d673fb6bd 100644 --- a/internal/distributed/proxy/httpserver/wrap_request_test.go +++ b/internal/distributed/proxy/httpserver/wrap_request_test.go @@ -219,6 +219,101 @@ func TestFieldData_AsSchemapb(t *testing.T) { _, err := fieldData.AsSchemapb() assert.Error(t, err) }) + + t.Run("sparsefloatvector_ok_1", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"1": 0.1, "2": 0.2}, + {"3": 0.1, "5": 0.2}, + {"4": 0.1, "6": 0.2} + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.NoError(t, err) + }) + + t.Run("sparsefloatvector_ok_2", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"indices": [1, 2], "values": [0.1, 0.2]}, + {"indices": [3, 5], "values": [0.1, 0.2]}, + {"indices": [4, 6], "values": [0.1, 0.2]} + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.NoError(t, err) + }) + + t.Run("sparsefloatvector_ok_3", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"indices": [1, 2], "values": [0.1, 0.2]}, + {"3": 0.1, "5": 0.2}, + {"indices": [4, 6], "values": [0.1, 0.2]} + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := 
fieldData.AsSchemapb() + assert.NoError(t, err) + }) + + t.Run("sparsefloatvector_empty_err", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.Error(t, err) + }) + + t.Run("sparsefloatvector_invalid_json_err", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"3": 0.1, : 0.2} + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.Error(t, err) + }) + + t.Run("sparsefloatvector_invalid_row_1_err", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"indices": [1, 2], "values": [-0.1, 0.2]}, + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.Error(t, err) + }) + + t.Run("sparsefloatvector_invalid_row_2_err", func(t *testing.T) { + fieldData := FieldData{ + Type: schemapb.DataType_SparseFloatVector, + Field: []byte(`[ + {"indices": [1, -2], "values": [0.1, 0.2]}, + ]`), + } + raw, _ := json.Marshal(fieldData) + json.Unmarshal(raw, &fieldData) + _, err := fieldData.AsSchemapb() + assert.Error(t, err) + }) } func Test_vector2Bytes(t *testing.T) { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 5a696be743..fd29f632f7 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -1550,40 +1550,75 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) { return nil, fmt.Errorf("empty JSON input") } + getValue := func(key interface{}) (float32, error) { + var val float64 + switch v := key.(type) { + case int: + val = float64(v) + case float64: + val = v + case json.Number: + if num, err := strconv.ParseFloat(v.String(), 64); err == nil { + val = num + } else 
{ + return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v)) + } + default: + return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(key)) + } + if VerifyFloat(val) != nil { + return 0, fmt.Errorf("invalid value in JSON: %v", val) + } + if val > math.MaxFloat32 { + return 0, fmt.Errorf("value too large in JSON: %v", val) + } + return float32(val), nil + } + + getIndex := func(key interface{}) (uint32, error) { + var idx int64 + switch v := key.(type) { + case int: + idx = int64(v) + case float64: + // check if the float64 is actually an integer + if v != float64(int64(v)) { + return 0, fmt.Errorf("invalid index in JSON: %v", v) + } + idx = int64(v) + case json.Number: + if num, err := strconv.ParseInt(v.String(), 0, 64); err == nil { + idx = num + } else { + return 0, err + } + default: + return 0, fmt.Errorf("invalid index type in JSON: %s", reflect.TypeOf(key)) + } + if idx >= math.MaxUint32 { + return 0, fmt.Errorf("index too large in JSON: %v", idx) + } + return uint32(idx), nil + } + jsonIndices, ok1 := input["indices"].([]interface{}) jsonValues, ok2 := input["values"].([]interface{}) if ok1 && ok2 { // try format1 for _, idx := range jsonIndices { - if i1, s1 := idx.(int); s1 { - indices = append(indices, uint32(i1)) - } else if i2, s2 := idx.(float64); s2 && i2 == float64(int(i2)) { - indices = append(indices, uint32(i2)) - } else if i3, s3 := idx.(json.Number); s3 { - if num, err := strconv.ParseUint(i3.String(), 0, 32); err == nil { - indices = append(indices, uint32(num)) - } else { - return nil, err - } - } else { - return nil, fmt.Errorf("invalid indicies type: %v(%s)", idx, reflect.TypeOf(idx)) + index, err := getIndex(idx) + if err != nil { + return nil, err } + indices = append(indices, index) } for _, val := range jsonValues { - if v1, s1 := val.(int); s1 { - values = append(values, float32(v1)) - } else if v2, s2 := val.(float64); s2 { - values = append(values, float32(v2)) - } else if v3, s3 := val.(json.Number); 
s3 { - if num, err := strconv.ParseFloat(v3.String(), 32); err == nil { - values = append(values, float32(num)) - } else { - return nil, err - } - } else { - return nil, fmt.Errorf("invalid values type: %v(%s)", val, reflect.TypeOf(val)) + value, err := getValue(val) + if err != nil { + return nil, err } + values = append(values, value) } } else if !ok1 && !ok2 { // try format2 @@ -1593,21 +1628,13 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) { return nil, err } - var val float64 - val, ok := v.(float64) - if !ok { - num, ok := v.(json.Number) - if !ok { - return nil, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v)) - } - val, err = strconv.ParseFloat(num.String(), 32) - if err != nil { - return nil, err - } + val, err := getValue(v) + if err != nil { + return nil, err } indices = append(indices, uint32(idx)) - values = append(values, float32(val)) + values = append(values, val) } } else { return nil, fmt.Errorf("invalid JSON input") diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index f487336b94..6e6a6ec698 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -18,6 +18,7 @@ package typeutil import ( "encoding/binary" + "fmt" "math" "reflect" "testing" @@ -2140,7 +2141,7 @@ func TestParseJsonSparseFloatRow(t *testing.T) { assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res) }) - t.Run("valid row 3", func(t *testing.T) { + t.Run("valid row 4", func(t *testing.T) { row := map[string]interface{}{"indices": []interface{}{math.MaxInt32 + 1}, "values": []interface{}{1.0}} res, err := CreateSparseFloatRowFromMap(row) assert.NoError(t, err) @@ -2177,6 +2178,30 @@ func TestParseJsonSparseFloatRow(t *testing.T) { assert.Error(t, err) }) + t.Run("invalid row 6", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{-1}, "values": []interface{}{0.2}} + _, err := CreateSparseFloatRowFromMap(row) + 
assert.Error(t, err) + }) + + t.Run("invalid row 7", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{math.MaxUint32}, "values": []interface{}{1.0}} + _, err := CreateSparseFloatRowFromMap(row) + assert.Error(t, err) + }) + + t.Run("invalid row 8", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{math.MaxUint32 + 10}, "values": []interface{}{1.0}} + _, err := CreateSparseFloatRowFromMap(row) + assert.Error(t, err) + }) + + t.Run("invalid row 9", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{10}, "values": []interface{}{float64(math.MaxFloat32) * 2}} + _, err := CreateSparseFloatRowFromMap(row) + assert.Error(t, err) + }) + t.Run("valid dict row 1", func(t *testing.T) { row := map[string]interface{}{"1": 1.0, "3": 2.0, "5": 3.0} res, err := CreateSparseFloatRowFromMap(row) @@ -2228,7 +2253,19 @@ func TestParseJsonSparseFloatRow(t *testing.T) { }) t.Run("invalid dict row 7", func(t *testing.T) { - row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0} + row := map[string]interface{}{fmt.Sprint(math.MaxUint32): 1.0, "3": 2.0, "5": 3.0} + _, err := CreateSparseFloatRowFromMap(row) + assert.Error(t, err) + }) + + t.Run("invalid dict row 8", func(t *testing.T) { + row := map[string]interface{}{fmt.Sprint(math.MaxUint32 + 10): 1.0, "3": 2.0, "5": 3.0} + _, err := CreateSparseFloatRowFromMap(row) + assert.Error(t, err) + }) + + t.Run("invalid dict row 8", func(t *testing.T) { + row := map[string]interface{}{fmt.Sprint(math.MaxUint32 + 10): 1.0, "3": 2.0, "5": float64(math.MaxFloat32) * 2} _, err := CreateSparseFloatRowFromMap(row) assert.Error(t, err) }) diff --git a/tests/restful_client_v2/testcases/test_vector_operations.py b/tests/restful_client_v2/testcases/test_vector_operations.py index d2b74552ff..5878946ff6 100644 --- a/tests/restful_client_v2/testcases/test_vector_operations.py +++ b/tests/restful_client_v2/testcases/test_vector_operations.py @@ -830,15 +830,15 @@ 
class TestSearchVector(TestBase): assert len(rsp['data']) == 100 - @pytest.mark.parametrize("insert_round", [1]) - @pytest.mark.parametrize("auto_id", [True]) - @pytest.mark.parametrize("is_partition_key", [True]) + @pytest.mark.parametrize("insert_round", [1, 10]) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("is_partition_key", [True, False]) @pytest.mark.parametrize("enable_dynamic_schema", [True]) @pytest.mark.parametrize("nb", [3000]) @pytest.mark.parametrize("dim", [128]) - @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/32214") + @pytest.mark.parametrize("groupingField", ['user_id', None]) def test_search_vector_with_sparse_float_vector_datatype(self, nb, dim, insert_round, auto_id, - is_partition_key, enable_dynamic_schema): + is_partition_key, enable_dynamic_schema, groupingField): """ Insert a vector with a simple payload """ @@ -860,7 +860,7 @@ class TestSearchVector(TestBase): }, "indexParams": [ {"fieldName": "sparse_float_vector", "indexName": "sparse_float_vector", "metricType": "IP", - "indexConfig": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}} + "params": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}} ] } rsp = self.collection_client.collection_create(payload) @@ -871,20 +871,21 @@ class TestSearchVector(TestBase): # insert data for i in range(insert_round): data = [] - for i in range(nb): + for j in range(nb): + idx = i * nb + j if auto_id: tmp = { - "user_id": i%100, - "word_count": i, - "book_describe": f"book_{i}", + "user_id": idx%100, + "word_count": j, + "book_describe": f"book_{idx}", "sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim), } else: tmp = { - "book_id": i, - "user_id": i%100, - "word_count": i, - "book_describe": f"book_{i}", + "book_id": idx, + "user_id": idx%100, + "word_count": j, + "book_describe": f"book_{idx}", "sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim), } if 
enable_dynamic_schema: @@ -902,7 +903,6 @@ class TestSearchVector(TestBase): "collectionName": name, "data": [gen_vector(datatype="SparseFloatVector", dim=dim)], "filter": "word_count > 100", - "groupingField": "user_id", "outputFields": ["*"], "searchParams": { "metricType": "IP", @@ -910,11 +910,12 @@ class TestSearchVector(TestBase): "drop_ratio_search": "0.2", } }, - "limit": 100, + "limit": 500, } + if groupingField: + payload["groupingField"] = groupingField rsp = self.vector_client.vector_search(payload) assert rsp['code'] == 200 - assert len(rsp['data']) == 100