From bcdbd1966e58dea60dc84cf1fb20a74edaa440e5 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 7 May 2024 18:43:30 +0800 Subject: [PATCH] feat: Support sparse float vector bulk insert for binlog/json/parquet (#32649) Issue: #22837 Signed-off-by: Cai Yudong --- internal/storage/insert_data.go | 2 +- internal/storage/insert_data_test.go | 4 + .../util/importutilv2/binlog/reader_test.go | 12 +++ .../util/importutilv2/json/reader_test.go | 17 ++- internal/util/importutilv2/json/row_parser.go | 30 +++++- .../util/importutilv2/numpy/field_reader.go | 3 +- .../util/importutilv2/numpy/reader_test.go | 59 +++++----- .../util/importutilv2/parquet/field_reader.go | 101 +++++++++++++----- .../util/importutilv2/parquet/reader_test.go | 24 ++++- internal/util/importutilv2/parquet/util.go | 9 +- tests/integration/import/import_test.go | 5 + tests/integration/import/util_test.go | 24 ++++- 12 files changed, 221 insertions(+), 69 deletions(-) diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index a2b8bf9b1a..3722965279 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -653,7 +653,7 @@ func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error { } func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error { - v, ok := rows.(SparseFloatVectorFieldData) + v, ok := rows.(*SparseFloatVectorFieldData) if !ok { return merr.WrapErrParameterInvalid("SparseFloatVectorFieldData", rows, "Wrong rows type") } diff --git a/internal/storage/insert_data_test.go b/internal/storage/insert_data_test.go index c097755e01..cdd85331fb 100644 --- a/internal/storage/insert_data_test.go +++ b/internal/storage/insert_data_test.go @@ -117,6 +117,7 @@ func (s *InsertDataSuite) TestMemorySize() { s.Equal(s.iDataEmpty.Data[FloatVectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4) s.Equal(s.iDataEmpty.Data[BFloat16VectorField].GetMemorySize(), 4) + s.Equal(s.iDataEmpty.Data[SparseFloatVectorField].GetMemorySize(), 0) s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 8) s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 8) @@ -134,6 +135,7 @@ func (s *InsertDataSuite) TestMemorySize() { s.Equal(s.iDataOneRow.Data[FloatVectorField].GetMemorySize(), 20) s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12) s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetMemorySize(), 12) + s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetMemorySize(), 28) s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 16) s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 16) @@ -150,6 +152,7 @@ func (s *InsertDataSuite) TestMemorySize() { s.Equal(s.iDataTwoRows.Data[FloatVectorField].GetMemorySize(), 36) s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20) s.Equal(s.iDataTwoRows.Data[BFloat16VectorField].GetMemorySize(), 20) + s.Equal(s.iDataTwoRows.Data[SparseFloatVectorField].GetMemorySize(), 54) } func (s *InsertDataSuite) TestGetRowSize() { @@ -169,6 +172,7 @@ func (s *InsertDataSuite) TestGetRowSize() { s.Equal(s.iDataOneRow.Data[FloatVectorField].GetRowSize(0), 16) s.Equal(s.iDataOneRow.Data[Float16VectorField].GetRowSize(0), 8) s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetRowSize(0), 8) + s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetRowSize(0), 24) } func (s *InsertDataSuite) TestGetDataType() { diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go index 4a1ace6b7e..63e8dd42d6 100644 --- a/internal/util/importutilv2/binlog/reader_test.go +++ b/internal/util/importutilv2/binlog/reader_test.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -150,6 +151,10 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie vectors := data.(*storage.BFloat16VectorFieldData).Data err = evt.AddBFloat16VectorToPayload(vectors, int(dim)) assert.NoError(t, err) + case schemapb.DataType_SparseFloatVector: + vectors := data.(*storage.SparseFloatVectorFieldData) + err = evt.AddSparseFloatVectorToPayload(vectors) + assert.NoError(t, err) default: assert.True(t, false) return nil @@ -255,6 +260,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount _, err = rand2.Read(bfloat16VecData) assert.NoError(t, err) insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *sparseFloatVecData, + } case schemapb.DataType_String, schemapb.DataType_VarChar: varcharData := make([]string, 0) for i := 0; i < rowCount; i++ { @@ -479,6 +489,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32) suite.vecDataType = schemapb.DataType_BFloat16Vector suite.run(schemapb.DataType_Int32) + suite.vecDataType = schemapb.DataType_SparseFloatVector + suite.run(schemapb.DataType_Int32) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index 0b7cd8c70f..494f6eb5aa 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -143,6 +144,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount _, err = rand2.Read(bfloat16VecData) assert.NoError(t, err) insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *sparseFloatVecData, + } case schemapb.DataType_String, schemapb.DataType_VarChar: varcharData := make([]string, 0) for i := 0; i < rowCount; i++ { @@ -235,18 +241,19 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { data := make(map[int64]interface{}) for fieldID, v := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType() - if dataType == schemapb.DataType_Array { + switch dataType { + case schemapb.DataType_Array: data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() - } else if dataType == schemapb.DataType_JSON { + case schemapb.DataType_JSON: data[fieldID] = string(v.GetRow(i).([]byte)) - } else if dataType == schemapb.DataType_BinaryVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector { + case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: bytes := v.GetRow(i).([]byte) ints := make([]int, 0, len(bytes)) for _, b := range bytes { ints = append(ints, int(b)) } data[fieldID] = ints - } else { + default: data[fieldID] = v.GetRow(i) } } @@ -321,6 +328,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32) suite.vecDataType = schemapb.DataType_BFloat16Vector suite.run(schemapb.DataType_Int32) + suite.vecDataType = schemapb.DataType_SparseFloatVector + suite.run(schemapb.DataType_Int32) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 8a1cfdd993..ec61751ad9 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -49,9 +49,12 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) { if err != nil { return nil, err } - dim, err := typeutil.GetDim(vecField) - if err != nil { - return nil, err + dim := int64(0) + if typeutil.IsVectorType(vecField.DataType) && !typeutil.IsSparseFloatVectorType(vecField.DataType) { + dim, err = typeutil.GetDim(vecField) + if err != nil { + return nil, err + } } pkField, err := typeutil.GetPrimaryFieldSchema(schema) if err != nil { @@ -323,6 +326,27 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { vec[i] = byte(num) } return vec, nil + case schemapb.DataType_SparseFloatVector: + arr, ok := obj.([]interface{}) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + if len(arr)%8 != 0 { + return nil, r.wrapDimError(len(arr), fieldID) + } + vec := make([]byte, len(arr)) + for i := 0; i < len(arr); i++ { + value, ok := arr[i].(json.Number) + if !ok { + return nil, r.wrapTypeError(arr[i], fieldID) + } + num, err := strconv.ParseUint(value.String(), 0, 8) + if err != nil { + return nil, err + } + vec[i] = byte(num) + } + return vec, nil case schemapb.DataType_String, schemapb.DataType_VarChar: value, ok := obj.(string) if !ok { diff --git a/internal/util/importutilv2/numpy/field_reader.go b/internal/util/importutilv2/numpy/field_reader.go index 252c381cba..23f9d313e3 100644 --- a/internal/util/importutilv2/numpy/field_reader.go +++ b/internal/util/importutilv2/numpy/field_reader.go @@ -51,7 +51,8 @@ func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema) (*FieldReader } var dim int64 = 1 - if typeutil.IsVectorType(field.GetDataType()) { + dataType := field.GetDataType() + if typeutil.IsVectorType(dataType) && !typeutil.IsSparseFloatVectorType(dataType) { dim, err = typeutil.GetDim(field) if err != nil { return nil, err diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index b990b0f578..ad9b12d18b 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -145,6 +146,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount _, err = rand2.Read(bfloat16VecData) assert.NoError(t, err) insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *sparseFloatVecData, + } case schemapb.DataType_String, schemapb.DataType_VarChar: varcharData := make([]string, 0) for i := 0; i < rowCount; i++ { @@ -256,7 +262,8 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } for fieldID, fieldData := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType() - if dataType == schemapb.DataType_JSON { + switch dataType { + case schemapb.DataType_JSON: jsonStrs := make([]string, 0, fieldData.RowNum()) for i := 0; i < fieldData.RowNum(); i++ { row := fieldData.GetRow(i) @@ -267,29 +274,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ Reader: reader, }, nil) - } else if dataType == schemapb.DataType_FloatVector { - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim) - chunkedRows := make([][dim]float32, len(chunked)) - for i, innerSlice := range chunked { - copy(chunkedRows[i][:], innerSlice[:]) - } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector { - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2) - chunkedRows := make([][dim * 2]byte, len(chunked)) - for i, innerSlice := range chunked { - copy(chunkedRows[i][:], innerSlice[:]) - } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else if dataType == schemapb.DataType_BinaryVector { + case schemapb.DataType_BinaryVector: chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8) chunkedRows := make([][dim / 8]byte, len(chunked)) for i, innerSlice := range chunked { @@ -300,7 +285,29 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ Reader: reader, }, nil) - } else { + case schemapb.DataType_FloatVector: + chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim) + chunkedRows := make([][dim]float32, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) + } + reader, err := CreateReader(chunkedRows) + suite.NoError(err) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ + Reader: reader, + }, nil) + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2) + chunkedRows := make([][dim * 2]byte, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) + } + reader, err := CreateReader(chunkedRows) + suite.NoError(err) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ + Reader: reader, + }, nil) + default: reader, err := CreateReader(insertData.Data[fieldID].GetRows()) suite.NoError(err) cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ @@ -481,6 +488,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32) suite.vecDataType = schemapb.DataType_BFloat16Vector suite.run(schemapb.DataType_Int32) + // suite.vecDataType = schemapb.DataType_SparseFloatVector + // suite.run(schemapb.DataType_Int32) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index a9fab4d998..0707b1f9db 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -28,6 +28,7 @@ import ( "golang.org/x/exp/constraints" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -47,7 +48,7 @@ func NewFieldReader(ctx context.Context, reader *pqarrow.FileReader, columnIndex } var dim int64 = 1 - if typeutil.IsVectorType(field.GetDataType()) { + if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) { dim, err = typeutil.GetDim(field) if err != nil { return nil, err @@ -121,8 +122,8 @@ func (c *FieldReader) Next(count int64) (any, error) { byteArr = append(byteArr, []byte(str)) } return byteArr, nil - case schemapb.DataType_BinaryVector: - return ReadBinaryData(c, schemapb.DataType_BinaryVector, count) + case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + return ReadBinaryData(c, count) case schemapb.DataType_FloatVector: arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count) if err != nil { @@ -133,10 +134,8 @@ func (c *FieldReader) Next(count int64) (any, error) { } vectors := lo.Flatten(arrayData.([][]float32)) return vectors, nil - case schemapb.DataType_Float16Vector: - return ReadBinaryData(c, schemapb.DataType_Float16Vector, count) - case schemapb.DataType_BFloat16Vector: - return ReadBinaryData(c, schemapb.DataType_BFloat16Vector, count) + case schemapb.DataType_SparseFloatVector: + return ReadBinaryDataForSparseFloatVector(c, count) case schemapb.DataType_Array: data := make([]*schemapb.ScalarField, 0, count) elementType := c.field.GetElementType() @@ -389,18 +388,19 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) { return data, nil } -func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) (any, error) { +func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { + dataType := pcr.field.GetDataType() chunked, err := pcr.columnReader.NextBatch(count) if err != nil { return nil, err } data := make([]byte, 0, count) for _, chunk := range chunked.Chunks() { - dataNums := chunk.Data().Len() + rows := chunk.Data().Len() switch chunk.DataType().ID() { case arrow.BINARY: binaryReader := chunk.(*array.Binary) - for i := 0; i < dataNums; i++ { + for i := 0; i < rows; i++ { data = append(data, binaryReader.Value(i)...) } case arrow.LIST: @@ -412,9 +412,7 @@ func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) ( if !ok { return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field) } - for i := 0; i < uint8Reader.Len(); i++ { - data = append(data, uint8Reader.Value(i)) - } + data = append(data, uint8Reader.Uint8Values()...) default: return nil, WrapTypeErr("binary", chunk.DataType().Name(), pcr.field) } @@ -425,27 +423,80 @@ func ReadBinaryData(pcr *FieldReader, dataType schemapb.DataType, count int64) ( return data, nil } -func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool { - if len(offsets) < 1 { - return false +func ReadBinaryDataForSparseFloatVector(pcr *FieldReader, count int64) (any, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, err } - var elemCount int = 0 - switch dataType { - case schemapb.DataType_BinaryVector: - elemCount = dim / 8 - case schemapb.DataType_FloatVector: - elemCount = dim - case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - elemCount = dim * 2 + data := make([][]byte, 0, count) + maxDim := uint32(0) + for _, chunk := range chunked.Chunks() { + rows := chunk.Data().Len() + listReader := chunk.(*array.List) + offsets := listReader.Offsets() + if !isVectorAligned(offsets, pcr.dim, schemapb.DataType_SparseFloatVector) { + return nil, merr.WrapErrImportFailed("%s not aligned", schemapb.DataType_SparseFloatVector.String()) + } + uint8Reader, ok := listReader.ListValues().(*array.Uint8) + if !ok { + return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field) + } + vecData := uint8Reader.Uint8Values() + for i := 0; i < rows; i++ { + elemCount := int((offsets[i+1] - offsets[i]) / 8) + rowVec := vecData[offsets[i]:offsets[i+1]] + data = append(data, rowVec) + maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1) + if maxIdx+1 > maxDim { + maxDim = maxIdx + 1 + } + } } + return &storage.SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: int64(maxDim), + Contents: data, + }, + }, nil +} + +func checkVectorAlignWithDim(offsets []int32, dim int32) bool { for i := 1; i < len(offsets); i++ { - if offsets[i]-offsets[i-1] != int32(elemCount) { + if offsets[i]-offsets[i-1] != dim { return false } } return true } +func checkSparseFloatVectorAlign(offsets []int32) bool { + // index: 4 bytes, value: 4 bytes + for i := 1; i < len(offsets); i++ { + if (offsets[i]-offsets[i-1])%8 != 0 { + return false + } + } + return true +} + +func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool { + if len(offsets) < 1 { + return false + } + switch dataType { + case schemapb.DataType_BinaryVector: + return checkVectorAlignWithDim(offsets, int32(dim/8)) + case schemapb.DataType_FloatVector: + return checkVectorAlignWithDim(offsets, int32(dim)) + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + return checkVectorAlignWithDim(offsets, int32(dim*2)) + case schemapb.DataType_SparseFloatVector: + return checkSparseFloatVectorAlign(offsets) + default: + return false + } +} + func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 193e7af5ea..3f359c2243 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -80,7 +81,7 @@ func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array, } for _, field := range schema.Fields { dim := 1 - if typeutil.IsVectorType(field.GetDataType()) { + if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) { dim2, err := typeutil.GetDim(field) if err != nil { return nil, nil, err @@ -213,6 +214,25 @@ func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array, insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: dim} builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) + case schemapb.DataType_SparseFloatVector: + sparsefloatVecData := make([]byte, 0) + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + offsets := make([]int32, 0, rows+1) + valid := make([]bool, 0, rows) + vecData := testutils.GenerateSparseFloatVectors(rows) + offsets = append(offsets, 0) + for i := 0; i < rows; i++ { + rowVecData := vecData.GetContents()[i] + sparsefloatVecData = append(sparsefloatVecData, rowVecData...) + offsets = append(offsets, offsets[i]+int32(len(rowVecData))) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *vecData, + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) case schemapb.DataType_BinaryVector: if isBinary { binVecData := make([][]byte, 0) @@ -617,6 +637,8 @@ func (s *ReaderSuite) TestVector() { s.run(schemapb.DataType_Int32) s.vecDataType = schemapb.DataType_BFloat16Vector s.run(schemapb.DataType_Int32) + s.vecDataType = schemapb.DataType_SparseFloatVector + s.run(schemapb.DataType_Int32) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index 9e3faf8777..4164ff4f6e 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -183,7 +183,7 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da Nullable: true, Metadata: arrow.Metadata{}, }), nil - case schemapb.DataType_BinaryVector: + case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: return arrow.ListOfField(arrow.Field{ Name: "item", Type: &arrow.Uint8Type{}, @@ -197,13 +197,6 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da Nullable: true, Metadata: arrow.Metadata{}, }), nil - case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - return arrow.ListOfField(arrow.Field{ - Name: "item", - Type: &arrow.Uint8Type{}, - Nullable: true, - Metadata: arrow.Metadata{}, - }), nil default: return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String()) } diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go index 0a9ce68c26..7d103cfde2 100644 --- a/tests/integration/import/import_test.go +++ b/tests/integration/import/import_test.go @@ -213,6 +213,11 @@ func (s *BulkInsertSuite) TestMultiFileTypes() { s.indexType = indexparamcheck.IndexHNSW s.metricType = metric.L2 s.run() + + // s.vecType = schemapb.DataType_SparseFloatVector + // s.indexType = indexparamcheck.IndexSparseWand + // s.metricType = metric.IP + // s.run() } } diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go index f1ba43091f..61e7ba5ada 100644 --- a/tests/integration/import/util_test.go +++ b/tests/integration/import/util_test.go @@ -43,6 +43,7 @@ import ( pq "github.com/milvus-io/milvus/internal/util/importutilv2/parquet" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/tests/integration" ) @@ -133,6 +134,11 @@ func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount _, err = rand2.Read(bfloat16VecData) assert.NoError(t, err) insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *sparseFloatVecData, + } case schemapb.DataType_String, schemapb.DataType_VarChar: varcharData := make([]string, 0) for i := 0; i < rowCount; i++ { @@ -265,6 +271,22 @@ func buildArrayData(dataType, elemType schemapb.DataType, dim, rows int) arrow.A } builder.AppendValues(offsets, valid) return builder.NewListArray() + case schemapb.DataType_SparseFloatVector: + sparsefloatVecData := make([]byte, 0) + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + offsets := make([]int32, 0, rows+1) + valid := make([]bool, 0, rows) + vecData := testutils.GenerateSparseFloatVectors(rows) + offsets = append(offsets, 0) + for i := 0; i < rows; i++ { + rowVecData := vecData.GetContents()[i] + sparsefloatVecData = append(sparsefloatVecData, rowVecData...) + offsets = append(offsets, offsets[i]+int32(len(rowVecData))) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil) + builder.AppendValues(offsets, valid) + return builder.NewListArray() case schemapb.DataType_JSON: builder := array.NewStringBuilder(mem) for i := 0; i < rows; i++ { @@ -575,7 +597,7 @@ func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.Collection data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() case schemapb.DataType_JSON: data[fieldID] = string(v.GetRow(i).([]byte)) - case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: bytes := v.GetRow(i).([]byte) ints := make([]int, 0, len(bytes)) for _, b := range bytes {