enhance: Bulkinsert supports null values in CSV format (#35912)

see details in this issue
https://github.com/milvus-io/milvus/issues/35911

---------

Signed-off-by: OxalisCu <2127298698@qq.com>
pull/36128/head
OxalisCu 2024-09-09 19:17:07 +08:00 committed by GitHub
parent aaa8487590
commit 3a381bc247
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 199 additions and 44 deletions

View File

@ -31,7 +31,7 @@ type reader struct {
filePath string
}
func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune) (*reader, error) {
func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int, sep rune, nullkey string) (*reader, error) {
cmReader, err := cm.Reader(ctx, path)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error()))
@ -53,7 +53,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read csv header, error: %v", err))
}
rowParser, err := NewRowParser(schema, header)
rowParser, err := NewRowParser(schema, header, nullkey)
if err != nil {
return nil, err
}

View File

@ -8,7 +8,6 @@ import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -37,7 +36,7 @@ func (suite *ReaderSuite) SetupTest() {
suite.vecDataType = schemapb.DataType_FloatVector
}
func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
@ -74,25 +73,31 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
Value: "128",
},
},
Nullable: nullable,
},
},
}
// config
// csv separator
sep := ','
// csv writer writes null values as empty strings
nullkey := ""
// generate csv data
insertData, err := testutil.CreateInsertData(schema, suite.numRows)
suite.NoError(err)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
suite.NoError(err)
// write to csv file
sep := '\t'
filePath := fmt.Sprintf("/tmp/test_%d_reader.csv", rand.Int())
defer os.Remove(filePath)
// defer os.Remove(filePath)
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(suite.T(), err)
suite.NoError(err)
writer := csv.NewWriter(wf)
writer.Comma = sep
writer.WriteAll(csvData)
err = writer.WriteAll(csvData)
suite.NoError(err)
// read from csv file
@ -102,13 +107,13 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
suite.NoError(err)
// check reader separate fields by '\t'
wrongSep := ','
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep)
wrongSep := '\t'
_, err = NewReader(ctx, cm, schema, filePath, 64*1024*1024, wrongSep, nullkey)
suite.Error(err)
suite.Contains(err.Error(), "value of field is missed: ")
// check data
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep)
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024, sep, nullkey)
suite.NoError(err)
checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
@ -129,43 +134,63 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
}
func (suite *ReaderSuite) TestReadScalarFields() {
suite.run(schemapb.DataType_Bool, schemapb.DataType_None)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None)
suite.run(schemapb.DataType_Float, schemapb.DataType_None)
suite.run(schemapb.DataType_Double, schemapb.DataType_None)
suite.run(schemapb.DataType_String, schemapb.DataType_None)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None)
suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
suite.run(schemapb.DataType_String, schemapb.DataType_None, false)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Bool)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double)
suite.run(schemapb.DataType_Array, schemapb.DataType_String)
suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)
suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
suite.run(schemapb.DataType_String, schemapb.DataType_None, true)
suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
}
func (suite *ReaderSuite) TestStringPK() {
suite.pkDataType = schemapb.DataType_VarChar
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}
func (suite *ReaderSuite) TestVector() {
suite.vecDataType = schemapb.DataType_BinaryVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_FloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_Float16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_BFloat16Vector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
suite.vecDataType = schemapb.DataType_SparseFloatVector
suite.run(schemapb.DataType_Int32, schemapb.DataType_None)
suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
}
func TestUtil(t *testing.T) {

View File

@ -17,6 +17,7 @@ type RowParser interface {
Parse(raw []string) (Row, error)
}
type rowParser struct {
nullkey string
header []string
name2Dim map[string]int
name2Field map[string]*schemapb.FieldSchema
@ -24,7 +25,7 @@ type rowParser struct {
dynamicField *schemapb.FieldSchema
}
func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser, error) {
func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
name2Field := lo.KeyBy(schema.GetFields(),
func(field *schemapb.FieldSchema) string {
return field.GetName()
@ -74,6 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string) (RowParser
}
return &rowParser{
nullkey: nullkey,
name2Dim: name2Dim,
header: header,
name2Field: name2Field,
@ -157,52 +159,80 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
}
func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) {
nullable := field.GetNullable()
switch field.GetDataType() {
case schemapb.DataType_Bool:
if nullable && obj == r.nullkey {
return nil, nil
}
b, err := strconv.ParseBool(obj)
if err != nil {
return false, r.wrapTypeError(obj, field)
}
return b, nil
case schemapb.DataType_Int8:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 8)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int8(num), nil
case schemapb.DataType_Int16:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 16)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int16(num), nil
case schemapb.DataType_Int32:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 32)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return int32(num), nil
case schemapb.DataType_Int64:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseInt(obj, 10, 64)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return num, nil
case schemapb.DataType_Float:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseFloat(obj, 32)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return float32(num), typeutil.VerifyFloats32([]float32{float32(num)})
case schemapb.DataType_Double:
if nullable && obj == r.nullkey {
return nil, nil
}
num, err := strconv.ParseFloat(obj, 64)
if err != nil {
return 0, r.wrapTypeError(obj, field)
}
return num, typeutil.VerifyFloats64([]float64{num})
case schemapb.DataType_VarChar, schemapb.DataType_String:
if nullable && obj == r.nullkey {
return nil, nil
}
return obj, nil
case schemapb.DataType_BinaryVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []byte
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
@ -213,6 +243,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec, nil
case schemapb.DataType_JSON:
if nullable && obj == r.nullkey {
return nil, nil
}
var data interface{}
err := json.Unmarshal([]byte(obj), &data)
if err != nil {
@ -220,6 +253,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return []byte(obj), nil
case schemapb.DataType_FloatVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
@ -230,6 +266,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec, typeutil.VerifyFloats32(vec)
case schemapb.DataType_Float16Vector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
@ -244,6 +283,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, typeutil.VerifyFloats16(vec2)
case schemapb.DataType_BFloat16Vector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
var vec []float32
err := json.Unmarshal([]byte(obj), &vec)
if err != nil {
@ -258,6 +300,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, typeutil.VerifyBFloats16(vec2)
case schemapb.DataType_SparseFloatVector:
if nullable && obj == r.nullkey {
return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
}
// use dec.UseNumber() to avoid float64 precision loss
var vec map[string]interface{}
dec := json.NewDecoder(strings.NewReader(obj))
@ -272,6 +317,9 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return vec2, nil
case schemapb.DataType_Array:
if nullable && obj == r.nullkey {
return nil, nil
}
var vec []interface{}
desc := json.NewDecoder(strings.NewReader(obj))
desc.UseNumber()
@ -279,6 +327,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
if err != nil {
return nil, r.wrapTypeError(obj, field)
}
// elements inside an array do not support null values
scalarFieldData, err := r.arrayToFieldData(vec, field.GetElementType())
if err != nil {
return nil, err

View File

@ -51,9 +51,11 @@ func TestNewRowParser_Invalid(t *testing.T) {
{header: []string{"id", "vector", "$meta"}, expectErr: "value of field is missed: 'str'"},
}
nullkey := ""
for i, c := range cases {
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
_, err := NewRowParser(schema, c.header)
_, err := NewRowParser(schema, c.header, nullkey)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), c.expectErr))
})
@ -98,8 +100,10 @@ func TestRowParser_Parse_Valid(t *testing.T) {
{header: []string{"id", "vector", "str", "$meta"}, row: []string{"1", "[1, 2]", "xxsddsffwq", "{\"y\": 2}"}, dyFields: map[string]any{"y": 2.0, "str": "xxsddsffwq"}},
}
nullkey := ""
for i, c := range cases {
r, err := NewRowParser(schema, c.header)
r, err := NewRowParser(schema, c.header, nullkey)
assert.NoError(t, err)
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
data, err := r.Parse(c.row)
@ -161,8 +165,10 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
{header: []string{"id", "vector", "x", "$meta"}, row: []string{"1", "[1, 2]", "8"}, expectErr: "the number of fields in the row is not equal to the header"},
}
nullkey := ""
for i, c := range cases {
r, err := NewRowParser(schema, c.header)
r, err := NewRowParser(schema, c.header, nullkey)
assert.NoError(t, err)
t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
_, err := r.Parse(c.row)
@ -171,3 +177,58 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
})
}
}
// TestRowParser_Parse_NULL verifies that a nullable scalar field parses to
// nil whenever the raw CSV cell matches the parser's configured null key.
func TestRowParser_Parse_NULL(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      1,
				Name:         "id",
				IsPrimaryKey: true,
				DataType:     schemapb.DataType_Int64,
			},
			{
				FieldID:    2,
				Name:       "vector",
				DataType:   schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
			},
			{
				FieldID:  3,
				Name:     "str",
				DataType: schemapb.DataType_String,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: common.MaxLengthKey, Value: "128"},
				},
				Nullable: true,
			},
		},
	}
	header := []string{"id", "vector", "str"}

	// Each case pairs a null key with a row whose "str" cell equals that key;
	// the parsed value for field ID 3 must then be nil.
	cases := []struct {
		nullkey  string
		row      []string
		nulldata interface{}
	}{
		{"", []string{"1", "[1, 2]", ""}, nil},
		{"NULL", []string{"1", "[1, 2]", "NULL"}, nil},
		{"\\N", []string{"1", "[1, 2]", "\\N"}, nil},
	}
	for idx, tc := range cases {
		parser, err := NewRowParser(schema, header, tc.nullkey)
		assert.NoError(t, err)
		t.Run(fmt.Sprintf("test_%d", idx), func(t *testing.T) {
			parsed, err := parser.Parse(tc.row)
			assert.NoError(t, err)
			// Row is keyed by field ID; 3 is the nullable "str" field.
			assert.EqualValues(t, tc.nulldata, parsed[3])
		})
	}
}

View File

@ -113,3 +113,12 @@ func GetCSVSep(options Options) (rune, error) {
}
return []rune(sep)[0], nil
}
// GetCSVNullKey returns the value of the "nullkey" import option. When the
// option is missing or empty it falls back to the empty string, so a CSV
// cell equal to "" is treated as null by default. The error result is always
// nil; it is kept for signature symmetry with the other option getters
// (e.g. GetCSVSep).
func GetCSVNullKey(options Options) (string, error) {
	key, err := funcutil.GetAttrByKeyFromRepeatedKV("nullkey", options)
	if err == nil && key != "" {
		return key, nil
	}
	// Option absent or empty: use the default null key.
	return "", nil
}

View File

@ -76,7 +76,11 @@ func NewReader(ctx context.Context,
if err != nil {
return nil, err
}
return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep)
nullkey, err := GetCSVNullKey(options)
if err != nil {
return nil, err
}
return csv.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize, sep, nullkey)
}
return nil, merr.WrapErrImportFailed("unexpected import file")
}

View File

@ -571,7 +571,7 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *
return rows, nil
}
func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([][]string, error) {
func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData, nullkey string) ([][]string, error) {
rowNum := insertData.GetRowNum()
csvData := make([][]string, 0, rowNum+1)
@ -595,6 +595,11 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora
if field.GetAutoID() {
continue
}
// deal with null value
if field.GetNullable() && value.GetRow(i) == nil {
data = append(data, nullkey)
continue
}
switch dataType {
case schemapb.DataType_Array:
var arr any

View File

@ -207,10 +207,12 @@ func GenerateCSVFile(t *testing.T, filePath string, schema *schemapb.CollectionS
insertData, err := testutil.CreateInsertData(schema, count)
assert.NoError(t, err)
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData)
sep := ','
nullkey := ""
csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
assert.NoError(t, err)
sep := ','
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(t, err)