enhance: add sparse float vector support to restful v2 (#33231)

issue: #29419
also re-enabled an e2e test using restful api, which is previously
disabled due to https://github.com/milvus-io/milvus/issues/32214.

In restful api, the accepted json formats of sparse float vector are:

* `{"indices": [1, 100, 1000], "values": [0.1, 0.2, 0.3]}`
* {"1": 0.1, "100": 0.2, "1000": 0.3}

for accepted indice and value range, see
https://milvus.io/docs/sparse_vector.md#FAQ

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
pull/33130/head
Buqian Zheng 2024-05-27 00:47:40 +08:00 committed by GitHub
parent 36cbce4def
commit 1b67cecd65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 422 additions and 108 deletions

View File

@ -848,7 +848,10 @@ func generatePlaceholderGroup(ctx context.Context, body string, collSchema *sche
if vectorField == nil { if vectorField == nil {
return nil, errors.New("cannot find a vector field named: " + fieldName) return nil, errors.New("cannot find a vector field named: " + fieldName)
} }
dim, _ := getDim(vectorField) dim := int64(0)
if !typeutil.IsSparseFloatVectorType(vectorField.DataType) {
dim, _ = getDim(vectorField)
}
phv, err := convertVectors2Placeholder(body, vectorField.DataType, dim) phv, err := convertVectors2Placeholder(body, vectorField.DataType, dim)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -471,11 +471,11 @@ func TestDatabaseWrapper(t *testing.T) {
func TestCreateCollection(t *testing.T) { func TestCreateCollection(t *testing.T) {
postTestCases := []requestBodyTestCase{} postTestCases := []requestBodyTestCase{}
mp := mocks.NewMockProxy(t) mp := mocks.NewMockProxy(t)
mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(11) mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(12)
mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6)
mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6) mp.EXPECT().LoadCollection(mock.Anything, mock.Anything).Return(commonSuccessStatus, nil).Times(6)
mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Twice() mp.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Twice()
mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Once() mp.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(commonErrorStatus, nil).Twice()
testEngine := initHTTPServerV2(mp, false) testEngine := initHTTPServerV2(mp, false)
path := versionalV2(CollectionCategory, CreateAction) path := versionalV2(CollectionCategory, CreateAction)
// quickly create collection // quickly create collection
@ -564,6 +564,18 @@ func TestCreateCollection(t *testing.T) {
] ]
}}`), }}`),
}) })
// dim should not be specified for SparseFloatVector field
postTestCases = append(postTestCases, requestBodyTestCase{
path: path,
requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": {
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": true, "elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "isPartitionKey": false, "elementTypeParams": {}},
{"fieldName": "partition_field", "dataType": "VarChar", "isPartitionKey": true, "elementTypeParams": {"max_length": 256}},
{"fieldName": "book_intro", "dataType": "SparseFloatVector", "elementTypeParams": {}}
]
}, "params": {"partitionsNum": "32"}}`),
})
postTestCases = append(postTestCases, requestBodyTestCase{ postTestCases = append(postTestCases, requestBodyTestCase{
path: path, path: path,
requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": { requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": {
@ -612,6 +624,18 @@ func TestCreateCollection(t *testing.T) {
errMsg: "", errMsg: "",
errCode: 65535, errCode: 65535,
}) })
postTestCases = append(postTestCases, requestBodyTestCase{
path: path,
requestBody: []byte(`{"collectionName": "` + DefaultCollectionName + `", "schema": {
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": true, "elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
{"fieldName": "book_intro", "dataType": "SparseFloatVector", "elementTypeParams": {"dim": 2}}
]
}, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]}`),
errMsg: "",
errCode: 65535,
})
for _, testcase := range postTestCases { for _, testcase := range postTestCases {
t.Run("post"+testcase.path, func(t *testing.T) { t.Run("post"+testcase.path, func(t *testing.T) {
@ -1240,16 +1264,19 @@ func TestSearchV2(t *testing.T) {
float16VectorField.Name = "float16Vector" float16VectorField.Name = "float16Vector"
bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector) bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector)
bfloat16VectorField.Name = "bfloat16Vector" bfloat16VectorField.Name = "bfloat16Vector"
sparseFloatVectorField := generateVectorFieldSchema(schemapb.DataType_SparseFloatVector)
sparseFloatVectorField.Name = "sparseFloatVector"
collSchema.Fields = append(collSchema.Fields, &binaryVectorField) collSchema.Fields = append(collSchema.Fields, &binaryVectorField)
collSchema.Fields = append(collSchema.Fields, &float16VectorField) collSchema.Fields = append(collSchema.Fields, &float16VectorField)
collSchema.Fields = append(collSchema.Fields, &bfloat16VectorField) collSchema.Fields = append(collSchema.Fields, &bfloat16VectorField)
collSchema.Fields = append(collSchema.Fields, &sparseFloatVectorField)
mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
CollectionName: DefaultCollectionName, CollectionName: DefaultCollectionName,
Schema: collSchema, Schema: collSchema,
ShardsNum: ShardNumDefault, ShardsNum: ShardNumDefault,
Status: &StatusSuccess, Status: &StatusSuccess,
}, nil).Times(9) }, nil).Times(10)
mp.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{Status: commonSuccessStatus, Results: &schemapb.SearchResultData{TopK: int64(0)}}, nil).Twice() mp.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{Status: commonSuccessStatus, Results: &schemapb.SearchResultData{TopK: int64(0)}}, nil).Times(3)
testEngine := initHTTPServerV2(mp, false) testEngine := initHTTPServerV2(mp, false)
queryTestCases := []requestBodyTestCase{} queryTestCases := []requestBodyTestCase{}
queryTestCases = append(queryTestCases, requestBodyTestCase{ queryTestCases = append(queryTestCases, requestBodyTestCase{
@ -1377,6 +1404,10 @@ func TestSearchV2(t *testing.T) {
errMsg: "can only accept json format request, error: dimension: 2, bytesLen: 4, but length of []byte: 3: invalid parameter[expected=BFloat16Vector][actual=\x01\x02\x03]", errMsg: "can only accept json format request, error: dimension: 2, bytesLen: 4, but length of []byte: 3: invalid parameter[expected=BFloat16Vector][actual=\x01\x02\x03]",
errCode: 1801, errCode: 1801,
}) })
queryTestCases = append(queryTestCases, requestBodyTestCase{
path: SearchAction,
requestBody: []byte(`{"collectionName": "book", "data": [{"1": 0.1}], "annsField": "sparseFloatVector", "filter": "book_id in [2, 4, 6, 8]", "limit": 4, "outputFields": ["word_count"]}`),
})
for _, testcase := range queryTestCases { for _, testcase := range queryTestCases {
t.Run("search", func(t *testing.T) { t.Run("search", func(t *testing.T) {

View File

@ -248,6 +248,15 @@ func checkAndSetData(body string, collSchema *schemapb.CollectionSchema) (error,
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
} }
reallyData[fieldName] = vectorArray reallyData[fieldName] = vectorArray
case schemapb.DataType_SparseFloatVector:
if dataString == "" {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], "", "missing vector field: "+fieldName), reallyDataArray
}
sparseVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(dataString))
if err != nil {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], dataString, err.Error()), reallyDataArray
}
reallyData[fieldName] = sparseVec
case schemapb.DataType_Float16Vector: case schemapb.DataType_Float16Vector:
if dataString == "" { if dataString == "" {
return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], "", "missing vector field: "+fieldName), reallyDataArray return merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(fieldType)], "", "missing vector field: "+fieldName), reallyDataArray
@ -638,6 +647,9 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
data = make([][]byte, 0, rowsLen) data = make([][]byte, 0, rowsLen)
dim, _ := getDim(field) dim, _ := getDim(field)
nameDims[field.Name] = dim nameDims[field.Name] = dim
case schemapb.DataType_SparseFloatVector:
data = make([][]byte, 0, rowsLen)
nameDims[field.Name] = int64(0)
default: default:
return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name) return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name)
} }
@ -704,6 +716,13 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte)) nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte))
case schemapb.DataType_BFloat16Vector: case schemapb.DataType_BFloat16Vector:
nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte)) nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), candi.v.Interface().([]byte))
case schemapb.DataType_SparseFloatVector:
content := candi.v.Interface().([]byte)
rowSparseDim := typeutil.SparseFloatRowDim(content)
if rowSparseDim > nameDims[field.Name] {
nameDims[field.Name] = rowSparseDim
}
nameColumns[field.Name] = append(nameColumns[field.Name].([][]byte), content)
default: default:
return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name) return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", field.DataType, field.Name)
} }
@ -895,6 +914,18 @@ func anyToColumns(rows []map[string]interface{}, sch *schemapb.CollectionSchema)
}, },
}, },
} }
case schemapb.DataType_SparseFloatVector:
colData.Field = &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: nameDims[name],
Data: &schemapb.VectorField_SparseFloatVector{
SparseFloatVector: &schemapb.SparseFloatArray{
Dim: nameDims[name],
Contents: column.([][]byte),
},
},
},
}
default: default:
return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", colData.Type, name) return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", colData.Type, name)
} }
@ -963,6 +994,19 @@ func serializeByteVectors(vectorStr string, dataType schemapb.DataType, dimensio
return values, nil return values, nil
} }
func serializeSparseFloatVectors(vectors []gjson.Result, dataType schemapb.DataType) ([][]byte, error) {
values := make([][]byte, 0)
for _, vector := range vectors {
vectorBytes := []byte(vector.String())
sparseVector, err := typeutil.CreateSparseFloatRowFromJSON(vectorBytes)
if err != nil {
return nil, merr.WrapErrParameterInvalid(schemapb.DataType_name[int32(dataType)], vector.String(), err.Error())
}
values = append(values, sparseVector)
}
return values, nil
}
func convertVectors2Placeholder(body string, dataType schemapb.DataType, dimension int64) (*commonpb.PlaceholderValue, error) { func convertVectors2Placeholder(body string, dataType schemapb.DataType, dimension int64) (*commonpb.PlaceholderValue, error) {
var valueType commonpb.PlaceholderType var valueType commonpb.PlaceholderType
var values [][]byte var values [][]byte
@ -980,6 +1024,9 @@ func convertVectors2Placeholder(body string, dataType schemapb.DataType, dimensi
case schemapb.DataType_BFloat16Vector: case schemapb.DataType_BFloat16Vector:
valueType = commonpb.PlaceholderType_BFloat16Vector valueType = commonpb.PlaceholderType_BFloat16Vector
values, err = serializeByteVectors(gjson.Get(body, HTTPRequestData).Raw, dataType, dimension, dimension*2) values, err = serializeByteVectors(gjson.Get(body, HTTPRequestData).Raw, dataType, dimension, dimension*2)
case schemapb.DataType_SparseFloatVector:
valueType = commonpb.PlaceholderType_SparseFloatVector
values, err = serializeSparseFloatVectors(gjson.Get(body, HTTPRequestData).Array(), dataType)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -1070,6 +1117,8 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap
rowsNum = int64(len(fieldDataList[0].GetVectors().GetFloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim() rowsNum = int64(len(fieldDataList[0].GetVectors().GetFloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim()
case schemapb.DataType_BFloat16Vector: case schemapb.DataType_BFloat16Vector:
rowsNum = int64(len(fieldDataList[0].GetVectors().GetBfloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim() rowsNum = int64(len(fieldDataList[0].GetVectors().GetBfloat16Vector())/2) / fieldDataList[0].GetVectors().GetDim()
case schemapb.DataType_SparseFloatVector:
rowsNum = int64(len(fieldDataList[0].GetVectors().GetSparseFloatVector().Contents))
default: default:
return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", fieldDataList[0].Type, fieldDataList[0].FieldName) return nil, fmt.Errorf("the type(%v) of field(%v) is not supported, use other sdk please", fieldDataList[0].Type, fieldDataList[0].FieldName)
} }
@ -1125,6 +1174,8 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap
row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetFloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)] row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetFloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)]
case schemapb.DataType_BFloat16Vector: case schemapb.DataType_BFloat16Vector:
row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetBfloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)] row[fieldDataList[j].FieldName] = fieldDataList[j].GetVectors().GetBfloat16Vector()[i*(fieldDataList[j].GetVectors().GetDim()*2) : (i+1)*(fieldDataList[j].GetVectors().GetDim()*2)]
case schemapb.DataType_SparseFloatVector:
row[fieldDataList[j].FieldName] = typeutil.SparseFloatBytesToMap(fieldDataList[j].GetVectors().GetSparseFloatVector().Contents[i])
case schemapb.DataType_Array: case schemapb.DataType_Array:
row[fieldDataList[j].FieldName] = fieldDataList[j].GetScalars().GetArrayData().Data[i] row[fieldDataList[j].FieldName] = fieldDataList[j].GetScalars().GetArrayData().Data[i]
case schemapb.DataType_JSON: case schemapb.DataType_JSON:

View File

@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
const ( const (
@ -178,21 +179,45 @@ func generateVectorFieldData(vectorType schemapb.DataType) schemapb.FieldData {
}, },
IsDynamic: false, IsDynamic: false,
} }
} case schemapb.DataType_FloatVector:
return schemapb.FieldData{ return schemapb.FieldData{
Type: schemapb.DataType_FloatVector, Type: schemapb.DataType_FloatVector,
FieldName: FieldBookIntro, FieldName: FieldBookIntro,
Field: &schemapb.FieldData_Vectors{ Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{ Vectors: &schemapb.VectorField{
Dim: 2, Dim: 2,
Data: &schemapb.VectorField_FloatVector{ Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{ FloatVector: &schemapb.FloatArray{
Data: []float32{0.1, 0.11, 0.2, 0.22, 0.3, 0.33}, Data: []float32{0.1, 0.11, 0.2, 0.22, 0.3, 0.33},
},
}, },
}, },
}, },
}, IsDynamic: false,
IsDynamic: false, }
case schemapb.DataType_SparseFloatVector:
contents := make([][]byte, 0, 3)
contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{1, 2, 3}, []float32{0.1, 0.11, 0.2}))
contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{100, 200, 300}, []float32{10.1, 20.11, 30.2}))
contents = append(contents, typeutil.CreateSparseFloatRow([]uint32{1000, 2000, 3000}, []float32{5000.1, 7000.11, 9000.2}))
return schemapb.FieldData{
Type: schemapb.DataType_SparseFloatVector,
FieldName: FieldBookIntro,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(3001),
Data: &schemapb.VectorField_SparseFloatVector{
SparseFloatVector: &schemapb.SparseFloatArray{
Dim: int64(3001),
Contents: contents,
},
},
},
},
IsDynamic: false,
}
default:
panic("unsupported vector type")
} }
} }
@ -1005,7 +1030,7 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data
switch firstFieldType { switch firstFieldType {
case schemapb.DataType_None: case schemapb.DataType_None:
break return fieldDatas
case schemapb.DataType_Bool: case schemapb.DataType_Bool:
return []*schemapb.FieldData{&fieldData1} return []*schemapb.FieldData{&fieldData1}
case schemapb.DataType_Int8: case schemapb.DataType_Int8:
@ -1038,6 +1063,9 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data
return []*schemapb.FieldData{&fieldData10} return []*schemapb.FieldData{&fieldData10}
case schemapb.DataType_JSON: case schemapb.DataType_JSON:
return []*schemapb.FieldData{&fieldData9} return []*schemapb.FieldData{&fieldData9}
case schemapb.DataType_SparseFloatVector:
vectorField := generateVectorFieldData(firstFieldType)
return []*schemapb.FieldData{&vectorField}
default: default:
return []*schemapb.FieldData{ return []*schemapb.FieldData{
{ {
@ -1046,8 +1074,6 @@ func newFieldData(fieldDatas []*schemapb.FieldData, firstFieldType schemapb.Data
}, },
} }
} }
return fieldDatas
} }
func newSearchResult(results []map[string]interface{}) []map[string]interface{} { func newSearchResult(results []map[string]interface{}) []map[string]interface{} {
@ -1225,26 +1251,30 @@ func TestVector(t *testing.T) {
binaryVector := "vector-binary" binaryVector := "vector-binary"
float16Vector := "vector-float16" float16Vector := "vector-float16"
bfloat16Vector := "vector-bfloat16" bfloat16Vector := "vector-bfloat16"
sparseFloatVector := "vector-sparse-float"
row1 := map[string]interface{}{ row1 := map[string]interface{}{
FieldBookID: int64(1), FieldBookID: int64(1),
floatVector: []float32{0.1, 0.11}, floatVector: []float32{0.1, 0.11},
binaryVector: []byte{1}, binaryVector: []byte{1},
float16Vector: []byte{1, 1, 11, 11}, float16Vector: []byte{1, 1, 11, 11},
bfloat16Vector: []byte{1, 1, 11, 11}, bfloat16Vector: []byte{1, 1, 11, 11},
sparseFloatVector: map[uint32]float32{0: 0.1, 1: 0.11},
} }
row2 := map[string]interface{}{ row2 := map[string]interface{}{
FieldBookID: int64(2), FieldBookID: int64(2),
floatVector: []float32{0.2, 0.22}, floatVector: []float32{0.2, 0.22},
binaryVector: []byte{2}, binaryVector: []byte{2},
float16Vector: []byte{2, 2, 22, 22}, float16Vector: []byte{2, 2, 22, 22},
bfloat16Vector: []byte{2, 2, 22, 22}, bfloat16Vector: []byte{2, 2, 22, 22},
sparseFloatVector: map[uint32]float32{1000: 0.3, 200: 0.44},
} }
row3 := map[string]interface{}{ row3 := map[string]interface{}{
FieldBookID: int64(3), FieldBookID: int64(3),
floatVector: []float32{0.3, 0.33}, floatVector: []float32{0.3, 0.33},
binaryVector: []byte{3}, binaryVector: []byte{3},
float16Vector: []byte{3, 3, 33, 33}, float16Vector: []byte{3, 3, 33, 33},
bfloat16Vector: []byte{3, 3, 33, 33}, bfloat16Vector: []byte{3, 3, 33, 33},
sparseFloatVector: map[uint32]float32{987621: 32190.31, 32189: 0.0001},
} }
body, _ := wrapRequestBody([]map[string]interface{}{row1, row2, row3}) body, _ := wrapRequestBody([]map[string]interface{}{row1, row2, row3})
primaryField := generatePrimaryField(schemapb.DataType_Int64) primaryField := generatePrimaryField(schemapb.DataType_Int64)
@ -1256,12 +1286,14 @@ func TestVector(t *testing.T) {
float16VectorField.Name = float16Vector float16VectorField.Name = float16Vector
bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector) bfloat16VectorField := generateVectorFieldSchema(schemapb.DataType_BFloat16Vector)
bfloat16VectorField.Name = bfloat16Vector bfloat16VectorField.Name = bfloat16Vector
sparseFloatVectorField := generateVectorFieldSchema(schemapb.DataType_SparseFloatVector)
sparseFloatVectorField.Name = sparseFloatVector
collectionSchema := &schemapb.CollectionSchema{ collectionSchema := &schemapb.CollectionSchema{
Name: DefaultCollectionName, Name: DefaultCollectionName,
Description: "", Description: "",
AutoID: false, AutoID: false,
Fields: []*schemapb.FieldSchema{ Fields: []*schemapb.FieldSchema{
&primaryField, &floatVectorField, &binaryVectorField, &float16VectorField, &bfloat16VectorField, &primaryField, &floatVectorField, &binaryVectorField, &float16VectorField, &bfloat16VectorField, &sparseFloatVectorField,
}, },
EnableDynamicField: true, EnableDynamicField: true,
} }
@ -1271,27 +1303,29 @@ func TestVector(t *testing.T) {
assert.Equal(t, 1, len(row[binaryVector].([]byte))) assert.Equal(t, 1, len(row[binaryVector].([]byte)))
assert.Equal(t, 4, len(row[float16Vector].([]byte))) assert.Equal(t, 4, len(row[float16Vector].([]byte)))
assert.Equal(t, 4, len(row[bfloat16Vector].([]byte))) assert.Equal(t, 4, len(row[bfloat16Vector].([]byte)))
// all test sparse rows have 2 elements, each should be of 8 bytes
assert.Equal(t, 16, len(row[sparseFloatVector].([]byte)))
} }
data, err := anyToColumns(rows, collectionSchema) data, err := anyToColumns(rows, collectionSchema)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
assert.Equal(t, len(collectionSchema.Fields)+1, len(data)) assert.Equal(t, len(collectionSchema.Fields)+1, len(data))
row1[bfloat16Vector] = []int64{99999999, -99999999} assertError := func(field string, value interface{}) {
body, _ = wrapRequestBody([]map[string]interface{}{row1}) row := make(map[string]interface{})
err, _ = checkAndSetData(string(body), collectionSchema) for k, v := range row1 {
assert.Error(t, err) row[k] = v
row1[float16Vector] = []int64{99999999, -99999999} }
body, _ = wrapRequestBody([]map[string]interface{}{row1}) row[field] = value
err, _ = checkAndSetData(string(body), collectionSchema) body, _ = wrapRequestBody([]map[string]interface{}{row})
assert.Error(t, err) err, _ = checkAndSetData(string(body), collectionSchema)
row1[binaryVector] = []int64{99999999, -99999999} assert.Error(t, err)
body, _ = wrapRequestBody([]map[string]interface{}{row1}) }
err, _ = checkAndSetData(string(body), collectionSchema)
assert.Error(t, err) assertError(bfloat16Vector, []int64{99999999, -99999999})
row1[floatVector] = []float64{math.MaxFloat64, 0} assertError(float16Vector, []int64{99999999, -99999999})
body, _ = wrapRequestBody([]map[string]interface{}{row1}) assertError(binaryVector, []int64{99999999, -99999999})
err, _ = checkAndSetData(string(body), collectionSchema) assertError(floatVector, []float64{math.MaxFloat64, 0})
assert.Error(t, err) assertError(sparseFloatVector, map[uint32]float32{0: -0.1, 1: 0.11, 2: 0.12})
} }
func TestBuildQueryResps(t *testing.T) { func TestBuildQueryResps(t *testing.T) {
@ -1305,7 +1339,7 @@ func TestBuildQueryResps(t *testing.T) {
} }
dataTypes := []schemapb.DataType{ dataTypes := []schemapb.DataType{
schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector,
schemapb.DataType_Bool, schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32, schemapb.DataType_Bool, schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32,
schemapb.DataType_Float, schemapb.DataType_Double, schemapb.DataType_Float, schemapb.DataType_Double,
schemapb.DataType_String, schemapb.DataType_VarChar, schemapb.DataType_String, schemapb.DataType_VarChar,

View File

@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// We wrap original protobuf structure for 2 reasons: // We wrap original protobuf structure for 2 reasons:
@ -212,6 +213,40 @@ func (f *FieldData) AsSchemapb() (*schemapb.FieldData, error) {
}, },
}, },
} }
case schemapb.DataType_SparseFloatVector:
var wrappedData []map[string]interface{}
err := json.Unmarshal(raw, &wrappedData)
if err != nil {
return nil, newFieldDataError(f.FieldName, err)
}
if len(wrappedData) < 1 {
return nil, errors.New("at least one row for insert")
}
data := make([][]byte, len(wrappedData))
dim := int64(0)
for _, row := range wrappedData {
rowData, err := typeutil.CreateSparseFloatRowFromMap(row)
if err != nil {
return nil, newFieldDataError(f.FieldName, err)
}
data = append(data, rowData)
rowDim := typeutil.SparseFloatRowDim(rowData)
if rowDim > dim {
dim = rowDim
}
}
ret.Field = &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: dim,
Data: &schemapb.VectorField_SparseFloatVector{
SparseFloatVector: &schemapb.SparseFloatArray{
Dim: dim,
Contents: data,
},
},
},
}
default: default:
return nil, errors.New("unsupported data type") return nil, errors.New("unsupported data type")
} }

View File

@ -219,6 +219,101 @@ func TestFieldData_AsSchemapb(t *testing.T) {
_, err := fieldData.AsSchemapb() _, err := fieldData.AsSchemapb()
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("sparsefloatvector_ok_1", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"1": 0.1, "2": 0.2},
{"3": 0.1, "5": 0.2},
{"4": 0.1, "6": 0.2}
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.NoError(t, err)
})
t.Run("sparsefloatvector_ok_2", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"indices": [1, 2], "values": [0.1, 0.2]},
{"indices": [3, 5], "values": [0.1, 0.2]},
{"indices": [4, 6], "values": [0.1, 0.2]}
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.NoError(t, err)
})
t.Run("sparsefloatvector_ok_3", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"indices": [1, 2], "values": [0.1, 0.2]},
{"3": 0.1, "5": 0.2},
{"indices": [4, 6], "values": [0.1, 0.2]}
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.NoError(t, err)
})
t.Run("sparsefloatvector_empty_err", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.Error(t, err)
})
t.Run("sparsefloatvector_invalid_json_err", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"3": 0.1, : 0.2}
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.Error(t, err)
})
t.Run("sparsefloatvector_invalid_row_1_err", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"indices": [1, 2], "values": [-0.1, 0.2]},
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.Error(t, err)
})
t.Run("sparsefloatvector_invalid_row_2_err", func(t *testing.T) {
fieldData := FieldData{
Type: schemapb.DataType_SparseFloatVector,
Field: []byte(`[
{"indices": [1, -2], "values": [0.1, 0.2]},
]`),
}
raw, _ := json.Marshal(fieldData)
json.Unmarshal(raw, &fieldData)
_, err := fieldData.AsSchemapb()
assert.Error(t, err)
})
} }
func Test_vector2Bytes(t *testing.T) { func Test_vector2Bytes(t *testing.T) {

View File

@ -1550,40 +1550,75 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
return nil, fmt.Errorf("empty JSON input") return nil, fmt.Errorf("empty JSON input")
} }
getValue := func(key interface{}) (float32, error) {
var val float64
switch v := key.(type) {
case int:
val = float64(v)
case float64:
val = v
case json.Number:
if num, err := strconv.ParseFloat(v.String(), 64); err == nil {
val = num
} else {
return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v))
}
default:
return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(key))
}
if VerifyFloat(val) != nil {
return 0, fmt.Errorf("invalid value in JSON: %v", val)
}
if val > math.MaxFloat32 {
return 0, fmt.Errorf("value too large in JSON: %v", val)
}
return float32(val), nil
}
getIndex := func(key interface{}) (uint32, error) {
var idx int64
switch v := key.(type) {
case int:
idx = int64(v)
case float64:
// check if the float64 is actually an integer
if v != float64(int64(v)) {
return 0, fmt.Errorf("invalid index in JSON: %v", v)
}
idx = int64(v)
case json.Number:
if num, err := strconv.ParseInt(v.String(), 0, 64); err == nil {
idx = num
} else {
return 0, err
}
default:
return 0, fmt.Errorf("invalid index type in JSON: %s", reflect.TypeOf(key))
}
if idx >= math.MaxUint32 {
return 0, fmt.Errorf("index too large in JSON: %v", idx)
}
return uint32(idx), nil
}
jsonIndices, ok1 := input["indices"].([]interface{}) jsonIndices, ok1 := input["indices"].([]interface{})
jsonValues, ok2 := input["values"].([]interface{}) jsonValues, ok2 := input["values"].([]interface{})
if ok1 && ok2 { if ok1 && ok2 {
// try format1 // try format1
for _, idx := range jsonIndices { for _, idx := range jsonIndices {
if i1, s1 := idx.(int); s1 { index, err := getIndex(idx)
indices = append(indices, uint32(i1)) if err != nil {
} else if i2, s2 := idx.(float64); s2 && i2 == float64(int(i2)) { return nil, err
indices = append(indices, uint32(i2))
} else if i3, s3 := idx.(json.Number); s3 {
if num, err := strconv.ParseUint(i3.String(), 0, 32); err == nil {
indices = append(indices, uint32(num))
} else {
return nil, err
}
} else {
return nil, fmt.Errorf("invalid indicies type: %v(%s)", idx, reflect.TypeOf(idx))
} }
indices = append(indices, index)
} }
for _, val := range jsonValues { for _, val := range jsonValues {
if v1, s1 := val.(int); s1 { value, err := getValue(val)
values = append(values, float32(v1)) if err != nil {
} else if v2, s2 := val.(float64); s2 { return nil, err
values = append(values, float32(v2))
} else if v3, s3 := val.(json.Number); s3 {
if num, err := strconv.ParseFloat(v3.String(), 32); err == nil {
values = append(values, float32(num))
} else {
return nil, err
}
} else {
return nil, fmt.Errorf("invalid values type: %v(%s)", val, reflect.TypeOf(val))
} }
values = append(values, value)
} }
} else if !ok1 && !ok2 { } else if !ok1 && !ok2 {
// try format2 // try format2
@ -1593,21 +1628,13 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
return nil, err return nil, err
} }
var val float64 val, err := getValue(v)
val, ok := v.(float64) if err != nil {
if !ok { return nil, err
num, ok := v.(json.Number)
if !ok {
return nil, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v))
}
val, err = strconv.ParseFloat(num.String(), 32)
if err != nil {
return nil, err
}
} }
indices = append(indices, uint32(idx)) indices = append(indices, uint32(idx))
values = append(values, float32(val)) values = append(values, val)
} }
} else { } else {
return nil, fmt.Errorf("invalid JSON input") return nil, fmt.Errorf("invalid JSON input")

View File

@ -18,6 +18,7 @@ package typeutil
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"math" "math"
"reflect" "reflect"
"testing" "testing"
@ -2140,7 +2141,7 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res) assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
}) })
t.Run("valid row 3", func(t *testing.T) { t.Run("valid row 4", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{math.MaxInt32 + 1}, "values": []interface{}{1.0}} row := map[string]interface{}{"indices": []interface{}{math.MaxInt32 + 1}, "values": []interface{}{1.0}}
res, err := CreateSparseFloatRowFromMap(row) res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err) assert.NoError(t, err)
@ -2177,6 +2178,30 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("invalid row 6", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{-1}, "values": []interface{}{0.2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 7", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{math.MaxUint32}, "values": []interface{}{1.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 8", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{math.MaxUint32 + 10}, "values": []interface{}{1.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 9", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{10}, "values": []interface{}{float64(math.MaxFloat32) * 2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("valid dict row 1", func(t *testing.T) { t.Run("valid dict row 1", func(t *testing.T) {
row := map[string]interface{}{"1": 1.0, "3": 2.0, "5": 3.0} row := map[string]interface{}{"1": 1.0, "3": 2.0, "5": 3.0}
res, err := CreateSparseFloatRowFromMap(row) res, err := CreateSparseFloatRowFromMap(row)
@ -2228,7 +2253,19 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
}) })
t.Run("invalid dict row 7", func(t *testing.T) { t.Run("invalid dict row 7", func(t *testing.T) {
row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0} row := map[string]interface{}{fmt.Sprint(math.MaxUint32): 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 8", func(t *testing.T) {
row := map[string]interface{}{fmt.Sprint(math.MaxUint32 + 10): 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 8", func(t *testing.T) {
row := map[string]interface{}{fmt.Sprint(math.MaxUint32 + 10): 1.0, "3": 2.0, "5": float64(math.MaxFloat32) * 2}
_, err := CreateSparseFloatRowFromMap(row) _, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err) assert.Error(t, err)
}) })

View File

@ -830,15 +830,15 @@ class TestSearchVector(TestBase):
assert len(rsp['data']) == 100 assert len(rsp['data']) == 100
@pytest.mark.parametrize("insert_round", [1]) @pytest.mark.parametrize("insert_round", [1, 10])
@pytest.mark.parametrize("auto_id", [True]) @pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("is_partition_key", [True]) @pytest.mark.parametrize("is_partition_key", [True, False])
@pytest.mark.parametrize("enable_dynamic_schema", [True]) @pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [3000]) @pytest.mark.parametrize("nb", [3000])
@pytest.mark.parametrize("dim", [128]) @pytest.mark.parametrize("dim", [128])
@pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/32214") @pytest.mark.parametrize("groupingField", ['user_id', None])
def test_search_vector_with_sparse_float_vector_datatype(self, nb, dim, insert_round, auto_id, def test_search_vector_with_sparse_float_vector_datatype(self, nb, dim, insert_round, auto_id,
is_partition_key, enable_dynamic_schema): is_partition_key, enable_dynamic_schema, groupingField):
""" """
Insert a vector with a simple payload Insert a vector with a simple payload
""" """
@ -860,7 +860,7 @@ class TestSearchVector(TestBase):
}, },
"indexParams": [ "indexParams": [
{"fieldName": "sparse_float_vector", "indexName": "sparse_float_vector", "metricType": "IP", {"fieldName": "sparse_float_vector", "indexName": "sparse_float_vector", "metricType": "IP",
"indexConfig": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}} "params": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}}
] ]
} }
rsp = self.collection_client.collection_create(payload) rsp = self.collection_client.collection_create(payload)
@ -871,20 +871,21 @@ class TestSearchVector(TestBase):
# insert data # insert data
for i in range(insert_round): for i in range(insert_round):
data = [] data = []
for i in range(nb): for j in range(nb):
idx = i * nb + j
if auto_id: if auto_id:
tmp = { tmp = {
"user_id": i%100, "user_id": idx%100,
"word_count": i, "word_count": j,
"book_describe": f"book_{i}", "book_describe": f"book_{idx}",
"sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim), "sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim),
} }
else: else:
tmp = { tmp = {
"book_id": i, "book_id": idx,
"user_id": i%100, "user_id": idx%100,
"word_count": i, "word_count": j,
"book_describe": f"book_{i}", "book_describe": f"book_{idx}",
"sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim), "sparse_float_vector": gen_vector(datatype="SparseFloatVector", dim=dim),
} }
if enable_dynamic_schema: if enable_dynamic_schema:
@ -902,7 +903,6 @@ class TestSearchVector(TestBase):
"collectionName": name, "collectionName": name,
"data": [gen_vector(datatype="SparseFloatVector", dim=dim)], "data": [gen_vector(datatype="SparseFloatVector", dim=dim)],
"filter": "word_count > 100", "filter": "word_count > 100",
"groupingField": "user_id",
"outputFields": ["*"], "outputFields": ["*"],
"searchParams": { "searchParams": {
"metricType": "IP", "metricType": "IP",
@ -910,11 +910,12 @@ class TestSearchVector(TestBase):
"drop_ratio_search": "0.2", "drop_ratio_search": "0.2",
} }
}, },
"limit": 100, "limit": 500,
} }
if groupingField:
payload["groupingField"] = groupingField
rsp = self.vector_client.vector_search(payload) rsp = self.vector_client.vector_search(payload)
assert rsp['code'] == 200 assert rsp['code'] == 200
assert len(rsp['data']) == 100