fix: [sparse float vector] support the new streaming deserialize reader (#31325)

issue: https://github.com/milvus-io/milvus/issues/31324

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
pull/31255/head
Buqian Zheng 2024-03-17 13:59:04 +08:00 committed by GitHub
parent 6055a89713
commit d7dbc3c9d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 1 deletion

View File

@@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
func generateTestData(num int) ([]*Blob, error) {
@@ -46,6 +47,7 @@ func generateTestData(num int) ([]*Blob, error) {
{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector},
{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector},
{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector},
{FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector},
}}
insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: schema})
@@ -70,6 +72,7 @@ func generateTestData(num int) ([]*Blob, error) {
field104 []byte
field105 []byte
field106 [][]byte
)
for i := 1; i <= num; i++ {
@@ -108,6 +111,8 @@ func generateTestData(num int) ([]*Blob, error) {
}
field104 = append(field104, f104...)
field105 = append(field105, f104...)
field106 = append(field106, testutils.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}))
}
data := &InsertData{Data: map[FieldID]FieldData{
@@ -141,6 +146,12 @@ func generateTestData(num int) ([]*Blob, error) {
Data: field105,
Dim: 4,
},
106: &SparseFloatVectorFieldData{
SparseFloatArray: schemapb.SparseFloatArray{
Dim: 28433,
Contents: field106,
},
},
}}
blobs, err := insertCodec.Serialize(1, 1, data)
@@ -159,6 +170,8 @@ func assertTestData(t *testing.T, i int, value *Value) {
f104[j] = byte(i)
}
f106 := testutils.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})
assert.EqualValues(t, &Value{
int64(i),
&Int64PrimaryKey{Value: int64(i)},
@@ -183,6 +196,7 @@ func assertTestData(t *testing.T, i int, value *Value) {
103: []byte{0xff},
104: f104,
105: f104,
106: f106,
},
}, value)
}

View File

@@ -361,7 +361,12 @@ func deserializeCell(col arrow.Array, dataType schemapb.DataType, i int) (interf
return nil, false
}
return arrow.Float32Traits.CastFromBytes(arr.Value(i)), true
case schemapb.DataType_SparseFloatVector:
arr, ok := col.(*array.Binary)
if !ok {
return nil, false
}
return arr.Value(i), true
default:
panic(fmt.Sprintf("unsupported type %s", dataType))
}

View File

@@ -161,6 +161,8 @@ func Test_deserializeCell(t *testing.T) {
{"test float16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_Float16Vector, i: 0}, []byte("test"), true},
{"test bfloat16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_BFloat16Vector, i: 0}, []byte("test"), true},
{"test bfloat16 vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_BFloat16Vector, i: 0}, nil, false},
{"test sparse float vector", args{col: onelinerArray(arrow.BinaryTypes.Binary, []byte("1234test")), dataType: schemapb.DataType_SparseFloatVector, i: 0}, []byte("1234test"), true},
{"test sparse float vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_SparseFloatVector, i: 0}, nil, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {