enhance: Optimize bulk insert unittest (#33224)

Issue: #22837

Signed-off-by: Cai Yudong <yudong.cai@zilliz.com>
pull/33351/head
Cai Yudong 2024-05-24 10:23:41 +08:00 committed by GitHub
parent 7730b910b9
commit 4004e4c545
5 changed files with 169 additions and 230 deletions


@@ -84,7 +84,9 @@ func (i *InsertData) GetRowNum() int {
 	var rowNum int
 	for _, data := range i.Data {
 		rowNum = data.RowNum()
-		break
+		if rowNum > 0 {
+			break
+		}
 	}
 	return rowNum
 }
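
The point of this hunk: Go randomizes map iteration order, so breaking unconditionally after the first field could sample a field whose RowNum() is 0 (for instance an auto-ID field that has not been populated yet) and report zero rows for otherwise non-empty data. A minimal, self-contained sketch of the failure mode (field IDs and counts are made up, and a plain map[int64]int stands in for the real map[FieldID]FieldData):

package main

import "fmt"

func main() {
	rowCounts := map[int64]int{100: 0, 101: 1000} // field 100 is empty

	var old int
	for _, n := range rowCounts {
		old = n
		break // old logic: yields 0 or 1000, depending on iteration order
	}

	var fixed int
	for _, n := range rowCounts {
		fixed = n
		if fixed > 0 {
			break // new logic: skip empty fields, always 1000 here
		}
	}
	fmt.Println(old, fixed)
}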


@@ -24,7 +24,6 @@ import (
 	"strings"
 	"testing"

-	"github.com/samber/lo"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"golang.org/x/exp/slices"
@@ -98,64 +97,16 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 			},
 		},
 	}
 	insertData, err := testutil.CreateInsertData(schema, suite.numRows)
 	suite.NoError(err)
-	rows := make([]map[string]any, 0, suite.numRows)
-	fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
-		return field.GetFieldID()
-	})
-	for i := 0; i < insertData.GetRowNum(); i++ {
-		data := make(map[int64]interface{})
-		for fieldID, v := range insertData.Data {
-			field := fieldIDToField[fieldID]
-			dataType := field.GetDataType()
-			elemType := field.GetElementType()
-			switch dataType {
-			case schemapb.DataType_Array:
-				switch elemType {
-				case schemapb.DataType_Bool:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData()
-				case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
-				case schemapb.DataType_Int64:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData()
-				case schemapb.DataType_Float:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData()
-				case schemapb.DataType_Double:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData()
-				case schemapb.DataType_String:
-					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData()
-				}
-			case schemapb.DataType_JSON:
-				data[fieldID] = string(v.GetRow(i).([]byte))
-			case schemapb.DataType_BinaryVector:
-				bytes := v.GetRow(i).([]byte)
-				ints := make([]int, 0, len(bytes))
-				for _, b := range bytes {
-					ints = append(ints, int(b))
-				}
-				data[fieldID] = ints
-			case schemapb.DataType_Float16Vector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.Float16BytesToFloat32Vector(bytes)
-			case schemapb.DataType_BFloat16Vector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.BFloat16BytesToFloat32Vector(bytes)
-			case schemapb.DataType_SparseFloatVector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.SparseFloatBytesToMap(bytes)
-			default:
-				data[fieldID] = v.GetRow(i)
-			}
-		}
-		row := lo.MapKeys(data, func(_ any, fieldID int64) string {
-			return fieldIDToField[fieldID].GetName()
-		})
-		rows = append(rows, row)
-	}
+	rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
+	suite.NoError(err)
 	jsonBytes, err := json.Marshal(rows)
 	suite.NoError(err)
 	type mockReader struct {
 		io.Reader
 		io.Closer


@@ -128,60 +128,54 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
 		io.ReaderAt
 		io.Seeker
 	}
+	var data interface{}
 	for fieldID, fieldData := range insertData.Data {
 		dataType := fieldIDToField[fieldID].GetDataType()
+		rowNum := fieldData.RowNum()
 		switch dataType {
 		case schemapb.DataType_JSON:
-			jsonStrs := make([]string, 0, fieldData.RowNum())
-			for i := 0; i < fieldData.RowNum(); i++ {
+			jsonStrs := make([]string, 0, rowNum)
+			for i := 0; i < rowNum; i++ {
 				row := fieldData.GetRow(i)
 				jsonStrs = append(jsonStrs, string(row.([]byte)))
 			}
-			reader, err := CreateReader(jsonStrs)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = jsonStrs
 		case schemapb.DataType_BinaryVector:
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
-			chunkedRows := make([][dim / 8]byte, len(chunked))
+			rows := fieldData.GetRows().([]byte)
+			const rowBytes = dim / 8
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
 			for i, innerSlice := range chunked {
 				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = chunkedRows
 		case schemapb.DataType_FloatVector:
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
+			rows := fieldData.GetRows().([]float32)
+			chunked := lo.Chunk(rows, dim)
 			chunkedRows := make([][dim]float32, len(chunked))
 			for i, innerSlice := range chunked {
 				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = chunkedRows
 		case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
-			chunkedRows := make([][dim * 2]byte, len(chunked))
+			rows := fieldData.GetRows().([]byte)
+			const rowBytes = dim * 2
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
 			for i, innerSlice := range chunked {
 				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = chunkedRows
 		default:
-			reader, err := CreateReader(insertData.Data[fieldID].GetRows())
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = fieldData.GetRows()
 		}
+		reader, err := CreateReader(data)
+		suite.NoError(err)
+		cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
+			Reader: reader,
+		}, nil)
 	}
 	reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)
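
The chunking pattern above recurs in every vector case, so here is a standalone sketch of it, assuming only github.com/samber/lo: a flat buffer holding n fixed-width rows is split with lo.Chunk and copied into fixed-size arrays (the dim value is illustrative):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

const dim = 4

func main() {
	flat := []float32{0, 1, 2, 3, 10, 11, 12, 13} // two dim=4 vectors, back to back
	chunked := lo.Chunk(flat, dim)                // [][]float32, each inner slice of length dim
	chunkedRows := make([][dim]float32, len(chunked))
	for i, innerSlice := range chunked {
		copy(chunkedRows[i][:], innerSlice) // fixed-size rows, like the tests build above
	}
	fmt.Println(chunkedRows) // [[0 1 2 3] [10 11 12 13]]
}
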
@@ -268,59 +262,54 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
 		io.ReaderAt
 		io.Seeker
 	}
+	var data interface{}
 	for fieldID, fieldData := range insertData.Data {
 		dataType := fieldIDToField[fieldID].GetDataType()
-		if dataType == schemapb.DataType_JSON {
-			jsonStrs := make([]string, 0, fieldData.RowNum())
-			for i := 0; i < fieldData.RowNum(); i++ {
+		rowNum := fieldData.RowNum()
+		switch dataType {
+		case schemapb.DataType_JSON:
+			jsonStrs := make([]string, 0, rowNum)
+			for i := 0; i < rowNum; i++ {
 				row := fieldData.GetRow(i)
 				jsonStrs = append(jsonStrs, string(row.([]byte)))
 			}
-			reader, err := CreateReader(jsonStrs)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
-		} else if dataType == schemapb.DataType_FloatVector {
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
+			data = jsonStrs
+		case schemapb.DataType_BinaryVector:
+			rows := fieldData.GetRows().([]byte)
+			const rowBytes = dim / 8
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
+			}
+			data = chunkedRows
+		case schemapb.DataType_FloatVector:
+			rows := fieldData.GetRows().([]float32)
+			chunked := lo.Chunk(rows, dim)
 			chunkedRows := make([][dim]float32, len(chunked))
 			for i, innerSlice := range chunked {
 				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
-		} else if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector {
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
-			chunkedRows := make([][dim * 2]byte, len(chunked))
+			data = chunkedRows
+		case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
+			rows := fieldData.GetRows().([]byte)
+			const rowBytes = dim * 2
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
 			for i, innerSlice := range chunked {
 				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
-		} else if dataType == schemapb.DataType_BinaryVector {
-			chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
-			chunkedRows := make([][dim / 8]byte, len(chunked))
-			for i, innerSlice := range chunked {
-				copy(chunkedRows[i][:], innerSlice[:])
-			}
-			reader, err := CreateReader(chunkedRows)
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
-		} else {
-			reader, err := CreateReader(insertData.Data[fieldID].GetRows())
-			suite.NoError(err)
-			cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
-				Reader: reader,
-			}, nil)
+			data = chunkedRows
+		default:
+			data = fieldData.GetRows()
 		}
+		reader, err := CreateReader(data)
+		suite.NoError(err)
+		cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
+			Reader: reader,
+		}, nil)
 	}
 	reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)


@@ -484,3 +484,66 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
 	}
 	return columns, nil
 }
+
+func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]map[string]any, error) {
+	fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
+		return field.GetFieldID()
+	})
+	rowNum := insertData.GetRowNum()
+	rows := make([]map[string]any, 0, rowNum)
+	for i := 0; i < rowNum; i++ {
+		data := make(map[int64]interface{})
+		for fieldID, v := range insertData.Data {
+			field := fieldIDToField[fieldID]
+			dataType := field.GetDataType()
+			elemType := field.GetElementType()
+			if field.GetAutoID() {
+				continue
+			}
+			switch dataType {
+			case schemapb.DataType_Array:
+				switch elemType {
+				case schemapb.DataType_Bool:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData()
+				case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
+				case schemapb.DataType_Int64:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData()
+				case schemapb.DataType_Float:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData()
+				case schemapb.DataType_Double:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData()
+				case schemapb.DataType_String:
+					data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData()
+				}
+			case schemapb.DataType_JSON:
+				data[fieldID] = string(v.GetRow(i).([]byte))
+			case schemapb.DataType_BinaryVector:
+				bytes := v.GetRow(i).([]byte)
+				ints := make([]int, 0, len(bytes))
+				for _, b := range bytes {
+					ints = append(ints, int(b))
+				}
+				data[fieldID] = ints
+			case schemapb.DataType_Float16Vector:
+				bytes := v.GetRow(i).([]byte)
+				data[fieldID] = typeutil.Float16BytesToFloat32Vector(bytes)
+			case schemapb.DataType_BFloat16Vector:
+				bytes := v.GetRow(i).([]byte)
+				data[fieldID] = typeutil.BFloat16BytesToFloat32Vector(bytes)
+			case schemapb.DataType_SparseFloatVector:
+				bytes := v.GetRow(i).([]byte)
+				data[fieldID] = typeutil.SparseFloatBytesToMap(bytes)
+			default:
+				data[fieldID] = v.GetRow(i)
+			}
+		}
+		row := lo.MapKeys(data, func(_ any, fieldID int64) string {
+			return fieldIDToField[fieldID].GetName()
+		})
+		rows = append(rows, row)
+	}
+	return rows, nil
+}
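
Every refactored call site uses this helper the same way; a short usage sketch (the schema variable and row count are placeholders, not new API):

// Hedged sketch mirroring the call sites in this change:
insertData, err := testutil.CreateInsertData(schema, 100)
if err != nil {
	return err
}
rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
if err != nil {
	return err
}
// Per the switch above: binary vectors come back as []int, float16/bfloat16
// as []float32, and sparse vectors as maps, so the rows marshal straight to JSON.
jsonBytes, err := json.Marshal(rows)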


@@ -39,7 +39,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/testutil"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
-	"github.com/milvus-io/milvus/pkg/util/typeutil"
 	"github.com/milvus-io/milvus/tests/integration"
 )
@@ -110,87 +109,60 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche
 		path := fmt.Sprintf("%s/%s.npy", cm.RootPath(), field.GetName())
 		fieldID := field.GetFieldID()
+		fieldData := insertData.Data[fieldID]
 		dType := field.GetDataType()
 		switch dType {
-		case schemapb.DataType_Bool:
-			data = insertData.Data[fieldID].(*storage.BoolFieldData).Data
-		case schemapb.DataType_Int8:
-			data = insertData.Data[fieldID].(*storage.Int8FieldData).Data
-		case schemapb.DataType_Int16:
-			data = insertData.Data[fieldID].(*storage.Int16FieldData).Data
-		case schemapb.DataType_Int32:
-			data = insertData.Data[fieldID].(*storage.Int32FieldData).Data
-		case schemapb.DataType_Int64:
-			data = insertData.Data[fieldID].(*storage.Int64FieldData).Data
-		case schemapb.DataType_Float:
-			data = insertData.Data[fieldID].(*storage.FloatFieldData).Data
-		case schemapb.DataType_Double:
-			data = insertData.Data[fieldID].(*storage.DoubleFieldData).Data
-		case schemapb.DataType_String, schemapb.DataType_VarChar:
-			data = insertData.Data[fieldID].(*storage.StringFieldData).Data
 		case schemapb.DataType_BinaryVector:
-			vecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim))
+			rows := fieldData.GetRows().([]byte)
+			if dim != fieldData.(*storage.BinaryVectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BinaryVectorFieldData).Dim))
 			}
 			const rowBytes = dim / 8
-			rows := len(vecData) / rowBytes
-			binVecData := make([][rowBytes]byte, 0, rows)
-			for i := 0; i < rows; i++ {
-				rowVec := [rowBytes]byte{}
-				copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
-				binVecData = append(binVecData, rowVec)
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			data = binVecData
+			data = chunkedRows
 		case schemapb.DataType_FloatVector:
-			vecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim))
+			rows := fieldData.GetRows().([]float32)
+			if dim != fieldData.(*storage.FloatVectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.FloatVectorFieldData).Dim))
 			}
-			rows := len(vecData) / dim
-			floatVecData := make([][dim]float32, 0, rows)
-			for i := 0; i < rows; i++ {
-				rowVec := [dim]float32{}
-				copy(rowVec[:], vecData[i*dim:(i+1)*dim])
-				floatVecData = append(floatVecData, rowVec)
+			chunked := lo.Chunk(rows, dim)
+			chunkedRows := make([][dim]float32, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			data = floatVecData
+			data = chunkedRows
 		case schemapb.DataType_Float16Vector:
-			vecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim))
+			rows := insertData.Data[fieldID].GetRows().([]byte)
+			if dim != fieldData.(*storage.Float16VectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.Float16VectorFieldData).Dim))
 			}
 			const rowBytes = dim * 2
-			rows := len(vecData) / rowBytes
-			float16VecData := make([][rowBytes]byte, 0, rows)
-			for i := 0; i < rows; i++ {
-				rowVec := [rowBytes]byte{}
-				copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
-				float16VecData = append(float16VecData, rowVec)
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			data = float16VecData
+			data = chunkedRows
 		case schemapb.DataType_BFloat16Vector:
-			vecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim))
+			rows := insertData.Data[fieldID].GetRows().([]byte)
+			if dim != fieldData.(*storage.BFloat16VectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BFloat16VectorFieldData).Dim))
 			}
 			const rowBytes = dim * 2
-			rows := len(vecData) / rowBytes
-			bfloat16VecData := make([][rowBytes]byte, 0, rows)
-			for i := 0; i < rows; i++ {
-				rowVec := [rowBytes]byte{}
-				copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
-				bfloat16VecData = append(bfloat16VecData, rowVec)
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			data = bfloat16VecData
+			data = chunkedRows
-		case schemapb.DataType_SparseFloatVector:
-			data = insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents()
-		case schemapb.DataType_JSON:
-			data = insertData.Data[fieldID].(*storage.JSONFieldData).Data
-		case schemapb.DataType_Array:
-			data = insertData.Data[fieldID].(*storage.ArrayFieldData).Data
 		default:
-			panic(fmt.Sprintf("unsupported data type: %s", dType.String()))
+			data = insertData.Data[fieldID].GetRows()
 		}
 		err := writeFn(path, data)
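
As an aside on the rowBytes constants used throughout this hunk, here is a tiny self-contained sketch of the per-row size arithmetic (the dim value is illustrative):

package main

import "fmt"

const dim = 16

func main() {
	const binaryRowBytes = dim / 8 // binary vectors pack 8 dimensions per byte
	const fp16RowBytes = dim * 2   // float16/bfloat16 store 2 bytes per dimension
	fmt.Println(binaryRowBytes, fp16RowBytes) // 2 32
}
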
@@ -207,47 +179,9 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche
 func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.CollectionSchema, count int) {
 	insertData, err := testutil.CreateInsertData(schema, count)
 	assert.NoError(t, err)
-	rows := make([]map[string]any, 0, count)
-	fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
-		return field.GetFieldID()
-	})
-	for i := 0; i < count; i++ {
-		data := make(map[int64]interface{})
-		for fieldID, v := range insertData.Data {
-			dataType := fieldIDToField[fieldID].GetDataType()
-			if fieldIDToField[fieldID].GetAutoID() {
-				continue
-			}
-			switch dataType {
-			case schemapb.DataType_Array:
-				data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
-			case schemapb.DataType_JSON:
-				data[fieldID] = string(v.GetRow(i).([]byte))
-			case schemapb.DataType_BinaryVector:
-				bytes := v.GetRow(i).([]byte)
-				ints := make([]int, 0, len(bytes))
-				for _, b := range bytes {
-					ints = append(ints, int(b))
-				}
-				data[fieldID] = ints
-			case schemapb.DataType_Float16Vector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.Float16BytesToFloat32Vector(bytes)
-			case schemapb.DataType_BFloat16Vector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.BFloat16BytesToFloat32Vector(bytes)
-			case schemapb.DataType_SparseFloatVector:
-				bytes := v.GetRow(i).([]byte)
-				data[fieldID] = typeutil.SparseFloatBytesToMap(bytes)
-			default:
-				data[fieldID] = v.GetRow(i)
-			}
-		}
-		row := lo.MapKeys(data, func(_ any, fieldID int64) string {
-			return fieldIDToField[fieldID].GetName()
-		})
-		rows = append(rows, row)
-	}
+	rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
+	assert.NoError(t, err)
 	jsonBytes, err := json.Marshal(rows)
 	assert.NoError(t, err)