From d7dbc3c9d8b90c690099d89cc1172780a9c60e4d Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Sun, 17 Mar 2024 13:59:04 +0800 Subject: [PATCH] fix: [sparse float vector] support the new streaming deserialize reader (#31325) issue: https://github.com/milvus-io/milvus/issues/31324 Signed-off-by: Buqian Zheng --- internal/storage/binlog_iterator_test.go | 14 ++++++++++++++ internal/storage/serde.go | 7 ++++++- internal/storage/serde_test.go | 2 ++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go index b2e545bebe..98a213d1b6 100644 --- a/internal/storage/binlog_iterator_test.go +++ b/internal/storage/binlog_iterator_test.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func generateTestData(num int) ([]*Blob, error) { @@ -46,6 +47,7 @@ func generateTestData(num int) ([]*Blob, error) { {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector}, {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector}, {FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector}, + {FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector}, }} insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: schema}) @@ -70,6 +72,7 @@ func generateTestData(num int) ([]*Blob, error) { field104 []byte field105 []byte + field106 [][]byte ) for i := 1; i <= num; i++ { @@ -108,6 +111,8 @@ func generateTestData(num int) ([]*Blob, error) { } field104 = append(field104, f104...) field105 = append(field105, f104...) + + field106 = append(field106, testutils.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})) } data := &InsertData{Data: map[FieldID]FieldData{ @@ -141,6 +146,12 @@ func generateTestData(num int) ([]*Blob, error) { Data: field105, Dim: 4, }, + 106: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 28433, + Contents: field106, + }, + }, }} blobs, err := insertCodec.Serialize(1, 1, data) @@ -159,6 +170,8 @@ func assertTestData(t *testing.T, i int, value *Value) { f104[j] = byte(i) } + f106 := testutils.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}) + assert.EqualValues(t, &Value{ int64(i), &Int64PrimaryKey{Value: int64(i)}, @@ -183,6 +196,7 @@ func assertTestData(t *testing.T, i int, value *Value) { 103: []byte{0xff}, 104: f104, 105: f104, + 106: f106, }, }, value) } diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 330dd81882..b00b0dedcb 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -361,7 +361,12 @@ func deserializeCell(col arrow.Array, dataType schemapb.DataType, i int) (interf return nil, false } return arrow.Float32Traits.CastFromBytes(arr.Value(i)), true - + case schemapb.DataType_SparseFloatVector: + arr, ok := col.(*array.Binary) + if !ok { + return nil, false + } + return arr.Value(i), true default: panic(fmt.Sprintf("unsupported type %s", dataType)) } diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index f309ae41ec..0d0bfb9ee6 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -161,6 +161,8 @@ func Test_deserializeCell(t *testing.T) { {"test float16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_Float16Vector, i: 0}, []byte("test"), true}, {"test bfloat16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_BFloat16Vector, i: 0}, []byte("test"), true}, {"test bfloat16 vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_BFloat16Vector, i: 0}, nil, false}, + {"test sparse float vector", args{col: onelinerArray(arrow.BinaryTypes.Binary, []byte("1234test")), dataType: schemapb.DataType_SparseFloatVector, i: 0}, []byte("1234test"), true}, + {"test sparse float vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_SparseFloatVector, i: 0}, nil, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {