feat: Support sparse float vector bulk insert for binlog/json/parquet (#32649)

Issue: #22837

Signed-off-by: Cai Yudong <yudong.cai@zilliz.com>
pull/32830/head
Cai Yudong 2024-05-07 18:43:30 +08:00 committed by GitHub
parent 53874ce245
commit bcdbd1966e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 221 additions and 69 deletions

View File

@ -653,7 +653,7 @@ func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error {
}
func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error {
v, ok := rows.(SparseFloatVectorFieldData)
v, ok := rows.(*SparseFloatVectorFieldData)
if !ok {
return merr.WrapErrParameterInvalid("SparseFloatVectorFieldData", rows, "Wrong rows type")
}

View File

@ -117,6 +117,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4)
s.Equal(s.iDataEmpty.Data[BFloat16VectorField].GetMemorySize(), 4)
s.Equal(s.iDataEmpty.Data[SparseFloatVectorField].GetMemorySize(), 0)
s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 8)
s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 8)
@ -134,6 +135,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataOneRow.Data[FloatVectorField].GetMemorySize(), 20)
s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12)
s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetMemorySize(), 12)
s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetMemorySize(), 28)
s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 16)
s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 16)
@ -150,6 +152,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataTwoRows.Data[FloatVectorField].GetMemorySize(), 36)
s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20)
s.Equal(s.iDataTwoRows.Data[BFloat16VectorField].GetMemorySize(), 20)
s.Equal(s.iDataTwoRows.Data[SparseFloatVectorField].GetMemorySize(), 54)
}
func (s *InsertDataSuite) TestGetRowSize() {
@ -169,6 +172,7 @@ func (s *InsertDataSuite) TestGetRowSize() {
s.Equal(s.iDataOneRow.Data[FloatVectorField].GetRowSize(0), 16)
s.Equal(s.iDataOneRow.Data[Float16VectorField].GetRowSize(0), 8)
s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetRowSize(0), 8)
s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetRowSize(0), 24)
}
func (s *InsertDataSuite) TestGetDataType() {

View File

@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -150,6 +151,10 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie
vectors := data.(*storage.BFloat16VectorFieldData).Data
err = evt.AddBFloat16VectorToPayload(vectors, int(dim))
assert.NoError(t, err)
case schemapb.DataType_SparseFloatVector:
vectors := data.(*storage.SparseFloatVectorFieldData)
err = evt.AddSparseFloatVectorToPayload(vectors)
assert.NoError(t, err)
default:
assert.True(t, false)
return nil
@ -255,6 +260,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
_, err = rand2.Read(bfloat16VecData)
assert.NoError(t, err)
insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)}
case schemapb.DataType_SparseFloatVector:
sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount)
insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{
SparseFloatArray: *sparseFloatVecData,
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
varcharData := make([]string, 0)
for i := 0; i < rowCount; i++ {
@ -479,6 +489,8 @@ func (suite *ReaderSuite) TestVector() {
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_SparseFloatVector
suite.run(schemapb.DataType_Int32)
}
func TestUtil(t *testing.T) {

View File

@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -143,6 +144,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
_, err = rand2.Read(bfloat16VecData)
assert.NoError(t, err)
insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)}
case schemapb.DataType_SparseFloatVector:
sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount)
insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{
SparseFloatArray: *sparseFloatVecData,
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
varcharData := make([]string, 0)
for i := 0; i < rowCount; i++ {
@ -235,18 +241,19 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
data := make(map[int64]interface{})
for fieldID, v := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
if dataType == schemapb.DataType_Array {
switch dataType {
case schemapb.DataType_Array:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
} else if dataType == schemapb.DataType_JSON {
case schemapb.DataType_JSON:
data[fieldID] = string(v.GetRow(i).([]byte))
} else if dataType == schemapb.DataType_BinaryVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector {
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
bytes := v.GetRow(i).([]byte)
ints := make([]int, 0, len(bytes))
for _, b := range bytes {
ints = append(ints, int(b))
}
data[fieldID] = ints
} else {
default:
data[fieldID] = v.GetRow(i)
}
}
@ -321,6 +328,8 @@ func (suite *ReaderSuite) TestVector() {
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_SparseFloatVector
suite.run(schemapb.DataType_Int32)
}
func TestUtil(t *testing.T) {

View File

@ -49,10 +49,13 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
if err != nil {
return nil, err
}
dim, err := typeutil.GetDim(vecField)
dim := int64(0)
if typeutil.IsVectorType(vecField.DataType) && !typeutil.IsSparseFloatVectorType(vecField.DataType) {
dim, err = typeutil.GetDim(vecField)
if err != nil {
return nil, err
}
}
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return nil, err
@ -323,6 +326,27 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
vec[i] = byte(num)
}
return vec, nil
case schemapb.DataType_SparseFloatVector:
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
if len(arr)%8 != 0 {
return nil, r.wrapDimError(len(arr), fieldID)
}
vec := make([]byte, len(arr))
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapTypeError(arr[i], fieldID)
}
num, err := strconv.ParseUint(value.String(), 0, 8)
if err != nil {
return nil, err
}
vec[i] = byte(num)
}
return vec, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
value, ok := obj.(string)
if !ok {

View File

@ -51,7 +51,8 @@ func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema) (*FieldReader
}
var dim int64 = 1
if typeutil.IsVectorType(field.GetDataType()) {
dataType := field.GetDataType()
if typeutil.IsVectorType(dataType) && !typeutil.IsSparseFloatVectorType(dataType) {
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -145,6 +146,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
_, err = rand2.Read(bfloat16VecData)
assert.NoError(t, err)
insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)}
case schemapb.DataType_SparseFloatVector:
sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount)
insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{
SparseFloatArray: *sparseFloatVecData,
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
varcharData := make([]string, 0)
for i := 0; i < rowCount; i++ {
@ -256,7 +262,8 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
}
for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
if dataType == schemapb.DataType_JSON {
switch dataType {
case schemapb.DataType_JSON:
jsonStrs := make([]string, 0, fieldData.RowNum())
for i := 0; i < fieldData.RowNum(); i++ {
row := fieldData.GetRow(i)
@ -267,29 +274,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else if dataType == schemapb.DataType_FloatVector {
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
chunkedRows := make([][dim]float32, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector {
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
chunkedRows := make([][dim * 2]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else if dataType == schemapb.DataType_BinaryVector {
case schemapb.DataType_BinaryVector:
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
chunkedRows := make([][dim / 8]byte, len(chunked))
for i, innerSlice := range chunked {
@ -300,7 +285,29 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
} else {
case schemapb.DataType_FloatVector:
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
chunkedRows := make([][dim]float32, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
chunkedRows := make([][dim * 2]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
reader, err := CreateReader(chunkedRows)
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
Reader: reader,
}, nil)
default:
reader, err := CreateReader(insertData.Data[fieldID].GetRows())
suite.NoError(err)
cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
@ -481,6 +488,8 @@ func (suite *ReaderSuite) TestVector() {
suite.run(schemapb.DataType_Int32)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32)
// suite.vecDataType = schemapb.DataType_SparseFloatVector
// suite.run(schemapb.DataType_Int32)
}
func TestUtil(t *testing.T) {

View File

@ -28,6 +28,7 @@ import (
"golang.org/x/exp/constraints"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -47,7 +48,7 @@ func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex
}
var dim int64 = 1
if typeutil.IsVectorType(field.GetDataType()) {
if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) {
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
@ -121,8 +122,8 @@ func (c *FieldReader) Next(count int64) (any, error) {
byteArr = append(byteArr, []byte(str))
}
return byteArr, nil
case schemapb.DataType_BinaryVector:
return ReadBinaryData(c, schemapb.DataType_BinaryVector, count)
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return ReadBinaryData(c, count)
case schemapb.DataType_FloatVector:
arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count)
if err != nil {
@ -133,10 +134,8 @@ func (c *FieldReader) Next(count int64) (any, error) {
}
vectors := lo.Flatten(arrayData.([][]float32))
return vectors, nil
case schemapb.DataType_Float16Vector:
return ReadBinaryData(c, schemapb.DataType_Float16Vector, count)
case schemapb.DataType_BFloat16Vector:
return ReadBinaryData(c, schemapb.DataType_BFloat16Vector, count)
case schemapb.DataType_SparseFloatVector:
return ReadBinaryDataForSparseFloatVector(c, count)
case schemapb.DataType_Array:
data := make([]*schemapb.ScalarField, 0, count)
elementType := c.field.GetElementType()
@ -389,18 +388,19 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) {
return data, nil
}
func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) (any, error) {
func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
dataType := pcr.field.GetDataType()
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([]byte, 0, count)
for _, chunk := range chunked.Chunks() {
dataNums := chunk.Data().Len()
rows := chunk.Data().Len()
switch chunk.DataType().ID() {
case arrow.BINARY:
binaryReader := chunk.(*array.Binary)
for i := 0; i < dataNums; i++ {
for i := 0; i < rows; i++ {
data = append(data, binaryReader.Value(i)...)
}
case arrow.LIST:
@ -412,9 +412,7 @@ func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) (
if !ok {
return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field)
}
for i := 0; i < uint8Reader.Len(); i++ {
data = append(data, uint8Reader.Value(i))
}
data = append(data, uint8Reader.Uint8Values()...)
default:
return nil, WrapTypeErr("binary", chunk.DataType().Name(), pcr.field)
}
@ -425,27 +423,80 @@ func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) (
return data, nil
}
func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool {
if len(offsets) < 1 {
return false
func ReadBinaryDataForSparseFloatVector(pcr *FieldReader, count int64) (any, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
var elemCount int = 0
switch dataType {
case schemapb.DataType_BinaryVector:
elemCount = dim / 8
case schemapb.DataType_FloatVector:
elemCount = dim
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
elemCount = dim * 2
data := make([][]byte, 0, count)
maxDim := uint32(0)
for _, chunk := range chunked.Chunks() {
rows := chunk.Data().Len()
listReader := chunk.(*array.List)
offsets := listReader.Offsets()
if !isVectorAligned(offsets, pcr.dim, schemapb.DataType_SparseFloatVector) {
return nil, merr.WrapErrImportFailed("%s not aligned", schemapb.DataType_SparseFloatVector.String())
}
uint8Reader, ok := listReader.ListValues().(*array.Uint8)
if !ok {
return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field)
}
vecData := uint8Reader.Uint8Values()
for i := 0; i < rows; i++ {
elemCount := int((offsets[i+1] - offsets[i]) / 8)
rowVec := vecData[offsets[i]:offsets[i+1]]
data = append(data, rowVec)
maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1)
if maxIdx+1 > maxDim {
maxDim = maxIdx + 1
}
}
}
return &storage.SparseFloatVectorFieldData{
SparseFloatArray: schemapb.SparseFloatArray{
Dim: int64(maxDim),
Contents: data,
},
}, nil
}
// checkVectorAlignWithDim reports whether every row of an Arrow list column
// holds exactly dim elements, i.e. each consecutive pair of list offsets is
// exactly dim apart. An empty or single-element offsets slice is trivially
// aligned (there are no rows to violate the constraint).
func checkVectorAlignWithDim(offsets []int32, dim int32) bool {
	for i := 1; i < len(offsets); i++ {
		// A gap other than dim means a mis-sized vector row.
		if offsets[i]-offsets[i-1] != dim {
			return false
		}
	}
	return true
}
// checkSparseFloatVectorAlign reports whether each row's byte length is a
// multiple of 8: every sparse entry is encoded as a 4-byte index followed by
// a 4-byte float value, so any valid row is 8-byte aligned.
func checkSparseFloatVectorAlign(offsets []int32) bool {
	var prev int32
	for idx, off := range offsets {
		if idx == 0 {
			prev = off
			continue
		}
		if (off-prev)%8 != 0 {
			return false
		}
		prev = off
	}
	return true
}
// isVectorAligned validates that the Arrow list offsets of a vector column
// are consistent with the declared dimension for the given vector data type.
// Sparse float vectors carry no fixed dim, so only the per-row 8-byte
// (index + value) alignment is checked for them. Unknown data types and an
// empty offsets slice are rejected.
func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool {
	if len(offsets) == 0 {
		return false
	}
	if dataType == schemapb.DataType_SparseFloatVector {
		return checkSparseFloatVectorAlign(offsets)
	}
	// Dense vector types: derive the per-row element count stored in the
	// Arrow list from the logical dimension.
	var rowElemCount int32
	switch dataType {
	case schemapb.DataType_BinaryVector:
		rowElemCount = int32(dim / 8) // packed bits, one byte per 8 dims
	case schemapb.DataType_FloatVector:
		rowElemCount = int32(dim) // one float32 element per dim
	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
		rowElemCount = int32(dim * 2) // two bytes per dim
	default:
		return false
	}
	return checkVectorAlignWithDim(offsets, rowElemCount)
}
func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {

View File

@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -80,7 +81,7 @@ func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array,
}
for _, field := range schema.Fields {
dim := 1
if typeutil.IsVectorType(field.GetDataType()) {
if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) {
dim2, err := typeutil.GetDim(field)
if err != nil {
return nil, nil, err
@ -213,6 +214,25 @@ func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array,
insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: dim}
builder.AppendValues(offsets, valid)
columns = append(columns, builder.NewListArray())
case schemapb.DataType_SparseFloatVector:
sparsefloatVecData := make([]byte, 0)
builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
offsets := make([]int32, 0, rows+1)
valid := make([]bool, 0, rows)
vecData := testutils.GenerateSparseFloatVectors(rows)
offsets = append(offsets, 0)
for i := 0; i < rows; i++ {
rowVecData := vecData.GetContents()[i]
sparsefloatVecData = append(sparsefloatVecData, rowVecData...)
offsets = append(offsets, offsets[i]+int32(len(rowVecData)))
valid = append(valid, true)
}
builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil)
insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{
SparseFloatArray: *vecData,
}
builder.AppendValues(offsets, valid)
columns = append(columns, builder.NewListArray())
case schemapb.DataType_BinaryVector:
if isBinary {
binVecData := make([][]byte, 0)
@ -617,6 +637,8 @@ func (s *ReaderSuite) TestVector() {
s.run(schemapb.DataType_Int32)
s.vecDataType = schemapb.DataType_BFloat16Vector
s.run(schemapb.DataType_Int32)
s.vecDataType = schemapb.DataType_SparseFloatVector
s.run(schemapb.DataType_Int32)
}
func TestUtil(t *testing.T) {

View File

@ -183,7 +183,7 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
Nullable: true,
Metadata: arrow.Metadata{},
}), nil
case schemapb.DataType_BinaryVector:
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
return arrow.ListOfField(arrow.Field{
Name: "item",
Type: &arrow.Uint8Type{},
@ -197,13 +197,6 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
Nullable: true,
Metadata: arrow.Metadata{},
}), nil
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return arrow.ListOfField(arrow.Field{
Name: "item",
Type: &arrow.Uint8Type{},
Nullable: true,
Metadata: arrow.Metadata{},
}), nil
default:
return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String())
}

View File

@ -213,6 +213,11 @@ func (s *BulkInsertSuite) TestMultiFileTypes() {
s.indexType = indexparamcheck.IndexHNSW
s.metricType = metric.L2
s.run()
// s.vecType = schemapb.DataType_SparseFloatVector
// s.indexType = indexparamcheck.IndexSparseWand
// s.metricType = metric.IP
// s.run()
}
}

View File

@ -43,6 +43,7 @@ import (
pq "github.com/milvus-io/milvus/internal/util/importutilv2/parquet"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/testutils"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/tests/integration"
)
@ -133,6 +134,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount
_, err = rand2.Read(bfloat16VecData)
assert.NoError(t, err)
insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)}
case schemapb.DataType_SparseFloatVector:
sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount)
insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{
SparseFloatArray: *sparseFloatVecData,
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
varcharData := make([]string, 0)
for i := 0; i < rowCount; i++ {
@ -265,6 +271,22 @@ func buildArrayData(dataType, elemType schemapb.DataType, dim, rows int) arrow.A
}
builder.AppendValues(offsets, valid)
return builder.NewListArray()
case schemapb.DataType_SparseFloatVector:
sparsefloatVecData := make([]byte, 0)
builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
offsets := make([]int32, 0, rows+1)
valid := make([]bool, 0, rows)
vecData := testutils.GenerateSparseFloatVectors(rows)
offsets = append(offsets, 0)
for i := 0; i < rows; i++ {
rowVecData := vecData.GetContents()[i]
sparsefloatVecData = append(sparsefloatVecData, rowVecData...)
offsets = append(offsets, offsets[i]+int32(len(rowVecData)))
valid = append(valid, true)
}
builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil)
builder.AppendValues(offsets, valid)
return builder.NewListArray()
case schemapb.DataType_JSON:
builder := array.NewStringBuilder(mem)
for i := 0; i < rows; i++ {
@ -575,7 +597,7 @@ func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.Collection
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
case schemapb.DataType_JSON:
data[fieldID] = string(v.GetRow(i).([]byte))
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
bytes := v.GetRow(i).([]byte)
ints := make([]int, 0, len(bytes))
for _, b := range bytes {