enhance: Support implicit type conversion for parquet (#29046)

issue: #29019

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/29143/head
cai.zhang 2023-12-12 16:14:44 +08:00 committed by GitHub
parent e5f155612a
commit 49b8657f95
6 changed files with 829 additions and 551 deletions

View File

@ -68,9 +68,6 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
Query(const DatasetPtr& dataset);
virtual const bool
HasRawData() const override = 0;
virtual int64_t
Size() = 0;
};

View File

@ -395,25 +395,25 @@ func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (
return flushed, nil
}
func (m *importManager) isRowbased(files []string) (bool, error) {
isRowBased := false
func (m *importManager) isSingleFileTask(files []string) (bool, error) {
isSingleFile := false
for _, filePath := range files {
_, fileType := importutil.GetFileNameAndExt(filePath)
if fileType == importutil.JSONFileExt {
isRowBased = true
} else if isRowBased {
log.Error("row-based data file type must be JSON, mixed file types is not allowed", zap.Strings("files", files))
return isRowBased, fmt.Errorf("row-based data file type must be JSON or CSV, file type '%s' is not allowed", fileType)
if fileType == importutil.JSONFileExt || fileType == importutil.ParquetFileExt {
isSingleFile = true
} else if isSingleFile {
log.Error("row-based data file type must be JSON or Parquet, mixed file types is not allowed", zap.Strings("files", files))
return isSingleFile, fmt.Errorf("row-based data file type must be JSON or Parquet, file type '%s' is not allowed", fileType)
}
}
// for row-based import, only one file is allowed so that each invocation generates a single task
if isRowBased && len(files) > 1 {
log.Error("row-based import, only allow one JSON or CSV file each time", zap.Strings("files", files))
return isRowBased, fmt.Errorf("row-based import, only allow one JSON or CSV file each time")
if isSingleFile && len(files) > 1 {
log.Error("for JSON or parquet file, each task only accepts one file, not allowed to input multiple files", zap.Strings("files", files))
return isSingleFile, fmt.Errorf("for JSON or parquet file, each task only accepts one file, not allowed to input multiple files")
}
return isRowBased, nil
return isSingleFile, nil
}
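A quick sketch of the intended behavior (hypothetical call sites; the expected results mirror the unit test further below):

mgr := &importManager{}
rb, _ := mgr.isSingleFileTask([]string{"1.json"})            // true, nil: one JSON file makes one task
_, err := mgr.isSingleFileTask([]string{"1.json", "2.json"}) // error: only one file per task
_, err = mgr.isSingleFileTask([]string{"1.json", "2.npy"})   // error: mixed file types
rb, _ = mgr.isSingleFileTask([]string{"1.npy", "2.npy"})     // false, nil: column-based Numpy files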
// importJob processes the import request, generates import tasks, sends these tasks to DataCoord, and returns
@ -448,13 +448,13 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
capacity := cap(m.pendingTasks)
length := len(m.pendingTasks)
isRowBased, err := m.isRowbased(req.GetFiles())
isSingleFileTask, err := m.isSingleFileTask(req.GetFiles())
if err != nil {
return err
}
taskCount := 1
if isRowBased {
if isSingleFileTask {
taskCount = len(req.Files)
}
@ -466,7 +466,7 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
}
// convert import request to import tasks
if isRowBased {
if isSingleFileTask {
// For single-file (JSON/Parquet) importing, each file makes a task.
taskList := make([]int64, len(req.Files))
for i := 0; i < len(req.Files); i++ {

View File

@ -1079,26 +1079,26 @@ func TestImportManager_rearrangeTasks(t *testing.T) {
assert.Equal(t, int64(100), tasks[2].GetId())
}
func TestImportManager_isRowbased(t *testing.T) {
func TestImportManager_isSingleFileTask(t *testing.T) {
mgr := &importManager{}
files := []string{"1.json"}
rb, err := mgr.isRowbased(files)
rb, err := mgr.isSingleFileTask(files)
assert.NoError(t, err)
assert.True(t, rb)
files = []string{"1.json", "2.json"}
rb, err = mgr.isRowbased(files)
rb, err = mgr.isSingleFileTask(files)
assert.Error(t, err)
assert.True(t, rb)
files = []string{"1.json", "2.npy"}
rb, err = mgr.isRowbased(files)
rb, err = mgr.isSingleFileTask(files)
assert.Error(t, err)
assert.True(t, rb)
files = []string{"1.npy", "2.npy"}
rb, err = mgr.isRowbased(files)
rb, err = mgr.isSingleFileTask(files)
assert.NoError(t, err)
assert.False(t, rb)
}

View File

@ -23,10 +23,12 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"go.uber.org/zap"
"golang.org/x/exp/constraints"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ParquetColumnReader struct {
@ -40,40 +42,276 @@ type ParquetColumnReader struct {
dimension int
}
func ReadData[T any](pcr *ParquetColumnReader, count int64, getDataFunc func(chunk arrow.Array) ([]T, error)) ([]T, error) {
func ReadBoolData(pcr *ParquetColumnReader, count int64) ([]bool, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([]T, 0, count)
data := make([]bool, 0, count)
for _, chunk := range chunked.Chunks() {
chunkData, err := getDataFunc(chunk)
if err != nil {
return nil, err
dataNums := chunk.Data().Len()
chunkData := make([]bool, dataNums)
boolReader, ok := chunk.(*array.Boolean)
if !ok {
log.Warn("the column data in parquet is not bool", zap.String("fieldName", pcr.fieldName), zap.String("actual type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not bool of field: %s, but: %s", pcr.fieldName, chunk.DataType().Name()))
}
for i := 0; i < dataNums; i++ {
chunkData[i] = boolReader.Value(i)
}
data = append(data, chunkData...)
}
return data, nil
}
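NextBatch(count) returns an *arrow.Chunked whose rows may be split across several chunks, which is why every reader here loops over Chunks() and appends. A minimal, self-contained illustration of assembling and walking a chunked boolean column (a sketch assuming only arrow-go v12's arrow, array, and memory packages):

mem := memory.NewGoAllocator()
builder := array.NewBooleanBuilder(mem)
builder.AppendValues([]bool{true, false}, nil)
chunk1 := builder.NewBooleanArray() // NewBooleanArray resets the builder
builder.AppendValues([]bool{true}, nil)
chunk2 := builder.NewBooleanArray()
chunked := arrow.NewChunked(arrow.FixedWidthTypes.Boolean, []arrow.Array{chunk1, chunk2})
total := 0
for _, chunk := range chunked.Chunks() {
	total += chunk.Len() // 2 rows, then 1 row: 3 in total
}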
func ReadArrayData[T any](pcr *ParquetColumnReader, count int64, getArrayData func(offsets []int32, array arrow.Array) ([][]T, error)) ([][]T, error) {
func ReadIntegerOrFloatData[T constraints.Integer | constraints.Float](pcr *ParquetColumnReader, count int64) ([]T, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
arrayData := make([][]T, 0, count)
data := make([]T, 0, count)
for _, chunk := range chunked.Chunks() {
dataNums := chunk.Data().Len()
chunkData := make([]T, dataNums)
switch chunk.DataType().ID() {
case arrow.INT8:
int8Reader := chunk.(*array.Int8)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(int8Reader.Value(i))
}
case arrow.INT16:
int16Reader := chunk.(*array.Int16)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(int16Reader.Value(i))
}
case arrow.INT32:
int32Reader := chunk.(*array.Int32)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(int32Reader.Value(i))
}
case arrow.INT64:
int64Reader := chunk.(*array.Int64)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(int64Reader.Value(i))
}
case arrow.FLOAT32:
float32Reader := chunk.(*array.Float32)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(float32Reader.Value(i))
}
case arrow.FLOAT64:
float64Reader := chunk.(*array.Float64)
for i := 0; i < dataNums; i++ {
chunkData[i] = T(float64Reader.Value(i))
}
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data type is not integer, neither float, but: %s", chunk.DataType().Name()))
}
data = append(data, chunkData...)
}
return data, nil
}
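This generic reader is where the PR's implicit type conversion happens: any Arrow integer or float column is widened element by element into the Milvus field's Go type via the T(...) conversion. A minimal, runnable sketch of the same pattern (standard library plus golang.org/x/exp/constraints only; widen is a hypothetical name):

package main

import (
	"fmt"

	"golang.org/x/exp/constraints"
)

// widen converts a narrow numeric column into the target type T, mirroring
// the T(reader.Value(i)) conversions in ReadIntegerOrFloatData.
func widen[S, T constraints.Integer | constraints.Float](src []S) []T {
	out := make([]T, 0, len(src))
	for _, v := range src {
		out = append(out, T(v))
	}
	return out
}

func main() {
	col := []int8{1, 2, 3}
	fmt.Println(widen[int8, int64](col))   // [1 2 3]: int8 column into an Int64 field
	fmt.Println(widen[int8, float32](col)) // [1 2 3]: int8 column into a Float field
}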
func ReadStringData(pcr *ParquetColumnReader, count int64) ([]string, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([]string, 0, count)
for _, chunk := range chunked.Chunks() {
dataNums := chunk.Data().Len()
chunkData := make([]string, dataNums)
stringReader, ok := chunk.(*array.String)
if !ok {
log.Warn("the column data in parquet is not string", zap.String("fieldName", pcr.fieldName), zap.String("actual type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not string of field: %s, but: %s", pcr.fieldName, chunk.DataType().Name()))
}
for i := 0; i < dataNums; i++ {
chunkData[i] = stringReader.Value(i)
}
data = append(data, chunkData...)
}
return data, nil
}
func ReadBinaryData(pcr *ParquetColumnReader, count int64) ([]byte, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([]byte, 0, count)
for _, chunk := range chunked.Chunks() {
dataNums := chunk.Data().Len()
switch chunk.DataType().ID() {
case arrow.BINARY:
binaryReader := chunk.(*array.Binary)
for i := 0; i < dataNums; i++ {
data = append(data, binaryReader.Value(i)...)
}
case arrow.LIST:
listReader := chunk.(*array.List)
if !checkVectorIsRegular(listReader.Offsets(), pcr.dimension, true) {
log.Warn("Parquet parser: binary vector is irregular", zap.Int("dim", pcr.dimension), zap.Int32s("offsets", listReader.Offsets()))
return nil, merr.WrapErrImportFailed("binary vector is irregular")
}
uint8Reader, ok := listReader.ListValues().(*array.Uint8)
if !ok {
log.Warn("the column element data of array in parquet is not binary", zap.String("fieldName", pcr.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not binary: %s", pcr.fieldName))
}
for i := 0; i < uint8Reader.Len(); i++ {
data = append(data, uint8Reader.Value(i))
}
default:
log.Warn("the column element data of array in parquet is not binary", zap.String("fieldName", pcr.fieldName), zap.String("actual data type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not binary: %s, it's: %s", pcr.fieldName, chunk.DataType().Name()))
}
}
return data, nil
}
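For context, a BinaryVector of dimension dim packs one bit per dimension, so each row occupies dim/8 bytes, and ReadBinaryData flattens all rows into a single byte slice. A small sketch of the expected layout (hypothetical values):

const dim = 16
rows := [][]byte{{0xAB, 0xCD}, {0x01, 0x02}} // each row is dim/8 = 2 bytes
flat := make([]byte, 0, len(rows)*dim/8)
for _, row := range rows {
	flat = append(flat, row...) // the layout consumed by storage.BinaryVectorFieldData
}
// flat == []byte{0xAB, 0xCD, 0x01, 0x02}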
func checkVectorIsRegular(offsets []int32, dim int, isBinary bool) bool {
if len(offsets) < 1 {
return false
}
if isBinary {
dim = dim / 8
}
start := offsets[0]
for i := 1; i < len(offsets); i++ {
if offsets[i]-start != int32(dim) {
return false
}
start = offsets[i]
}
return true
}
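Arrow list offsets mark row boundaries, so a vector column is regular exactly when consecutive offsets always differ by the dimension (dim/8 bytes for binary vectors). A few illustrative calls:

checkVectorIsRegular([]int32{0, 4, 8, 12}, 4, false) // true:  three float vectors of dim 4
checkVectorIsRegular([]int32{0, 4, 7, 12}, 4, false) // false: the middle row has only 3 values
checkVectorIsRegular([]int32{0, 2, 4}, 16, true)     // true:  binary dim 16 means 2 bytes per row
checkVectorIsRegular(nil, 4, false)                  // false: no offsets at all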
func ReadBoolArrayData(pcr *ParquetColumnReader, count int64) ([][]bool, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([][]bool, 0, count)
for _, chunk := range chunked.Chunks() {
listReader, ok := chunk.(*array.List)
if !ok {
log.Warn("the column data in parquet is not array", zap.String("fieldName", pcr.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not array of field: %s", pcr.fieldName))
log.Warn("the column data in parquet is not list", zap.String("fieldName", pcr.fieldName), zap.String("actual type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not list of field: %s, but: %s", pcr.fieldName, chunk.DataType().Name()))
}
boolReader, ok := listReader.ListValues().(*array.Boolean)
if !ok {
log.Warn("the column data in parquet is not bool array", zap.String("fieldName", pcr.fieldName),
zap.String("actual type", listReader.ListValues().DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not bool array of field: %s but: %s list", pcr.fieldName, listReader.ListValues().DataType().Name()))
}
offsets := listReader.Offsets()
chunkData, err := getArrayData(offsets, listReader.ListValues())
if err != nil {
return nil, err
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]bool, 0)
for j := start; j < end; j++ {
elementData = append(elementData, boolReader.Value(int(j)))
}
data = append(data, elementData)
}
arrayData = append(arrayData, chunkData...)
}
return arrayData, nil
return data, nil
}
func ReadIntegerOrFloatArrayData[T constraints.Integer | constraints.Float](pcr *ParquetColumnReader, count int64) ([][]T, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([][]T, 0, count)
getDataFunc := func(offsets []int32, getValue func(int) T) {
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]T, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, getValue(int(j)))
}
data = append(data, elementData)
}
}
for _, chunk := range chunked.Chunks() {
listReader, ok := chunk.(*array.List)
if !ok {
log.Warn("the column data in parquet is not list", zap.String("fieldName", pcr.fieldName), zap.String("actual type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not list of field: %s, but: %s", pcr.fieldName, chunk.DataType().Name()))
}
offsets := listReader.Offsets()
if typeutil.IsVectorType(pcr.dataType) && !checkVectorIsRegular(offsets, pcr.dimension, pcr.dataType == schemapb.DataType_BinaryVector) {
log.Warn("Parquet parser: float vector is irregular", zap.Int("dim", pcr.dimension), zap.Int32s("offsets", listReader.Offsets()))
return nil, merr.WrapErrImportFailed("float vector is irregular")
}
valueReader := listReader.ListValues()
switch valueReader.DataType().ID() {
case arrow.INT8:
int8Reader := valueReader.(*array.Int8)
getDataFunc(offsets, func(i int) T {
return T(int8Reader.Value(i))
})
case arrow.INT16:
int16Reader := valueReader.(*array.Int16)
getDataFunc(offsets, func(i int) T {
return T(int16Reader.Value(i))
})
case arrow.INT32:
int32Reader := valueReader.(*array.Int32)
getDataFunc(offsets, func(i int) T {
return T(int32Reader.Value(i))
})
case arrow.INT64:
int64Reader := valueReader.(*array.Int64)
getDataFunc(offsets, func(i int) T {
return T(int64Reader.Value(i))
})
case arrow.FLOAT32:
float32Reader := valueReader.(*array.Float32)
getDataFunc(offsets, func(i int) T {
return T(float32Reader.Value(i))
})
case arrow.FLOAT64:
float64Reader := valueReader.(*array.Float64)
getDataFunc(offsets, func(i int) T {
return T(float64Reader.Value(i))
})
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data type is not integer array, neither float array, but: %s", valueReader.DataType().Name()))
}
}
return data, nil
}
func ReadStringArrayData(pcr *ParquetColumnReader, count int64) ([][]string, error) {
chunked, err := pcr.columnReader.NextBatch(count)
if err != nil {
return nil, err
}
data := make([][]string, 0, count)
for _, chunk := range chunked.Chunks() {
listReader, ok := chunk.(*array.List)
if !ok {
log.Warn("the column data in parquet is not list", zap.String("fieldName", pcr.fieldName), zap.String("actual type", chunk.DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not list of field: %s, but: %s", pcr.fieldName, chunk.DataType().Name()))
}
stringReader, ok := listReader.ListValues().(*array.String)
if !ok {
log.Warn("the column data in parquet is not string array", zap.String("fieldName", pcr.fieldName),
zap.String("actual type", listReader.ListValues().DataType().Name()))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not string array of field: %s but: %s list", pcr.fieldName, listReader.ListValues().DataType().Name()))
}
offsets := listReader.Offsets()
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]string, 0)
for j := start; j < end; j++ {
elementData = append(elementData, stringReader.Value(int(j)))
}
data = append(data, elementData)
}
}
return data, nil
}

View File

@ -22,7 +22,6 @@ import (
"fmt"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
@ -138,97 +137,159 @@ func (p *ParquetParser) Parse() error {
return nil
}
func (p *ParquetParser) checkFields() error {
for _, field := range p.collectionInfo.Schema.GetFields() {
if (field.GetIsPrimaryKey() && field.GetAutoID()) || field.GetIsDynamic() {
continue
}
if _, ok := p.columnMap[field.GetName()]; !ok {
log.Warn("there is no field in parquet file", zap.String("fieldName", field.GetName()))
return merr.WrapErrImportFailed(fmt.Sprintf("there is no field in parquet file of name: %s", field.GetName()))
}
}
return nil
}
func (p *ParquetParser) createReaders() error {
schema, err := p.fileReader.Schema()
if err != nil {
log.Warn("can't schema from file", zap.Error(err))
return err
}
for _, field := range p.collectionInfo.Schema.GetFields() {
dim, _ := getFieldDimension(field)
// The collection schema has already been validated, so no error will occur here.
schemaHelper, _ := typeutil.CreateSchemaHelper(p.collectionInfo.Schema)
parquetFields := schema.Fields()
for i, field := range parquetFields {
fieldSchema, err := schemaHelper.GetFieldFromName(field.Name)
if err != nil {
// TODO @cai.zhang: handle dynamic field
log.Warn("the field is not in schema, if it's a dynamic field, please reformat data by bulk_writer", zap.String("fieldName", field.Name))
return merr.WrapErrImportFailed(fmt.Sprintf("the field: %s is not in schema, if it's a dynamic field, please reformat data by bulk_writer", field.Name))
}
if _, ok := p.columnMap[field.Name]; ok {
log.Warn("there is multi field of fieldName", zap.String("fieldName", field.Name),
zap.Ints("file fields indices", schema.FieldIndices(field.Name)))
return merr.WrapErrImportFailed(fmt.Sprintf("there is multi field of fieldName: %s", field.Name))
}
if fieldSchema.GetIsPrimaryKey() && fieldSchema.GetAutoID() {
log.Warn("the field is primary key, and autoID is true, please remove it from file", zap.String("fieldName", field.Name))
return merr.WrapErrImportFailed(fmt.Sprintf("the field: %s is primary key, and autoID is true, please remove it from file", field.Name))
}
arrowType, isList := convertArrowSchemaToDataType(field, false)
dataType := fieldSchema.GetDataType()
if isList {
if !typeutil.IsVectorType(dataType) && dataType != schemapb.DataType_Array {
log.Warn("field schema is not match",
zap.String("collection schema", dataType.String()),
zap.String("file schema", field.Type.Name()))
return merr.WrapErrImportFailed(fmt.Sprintf("field schema is not match, collection field dataType: %s, file field dataType:%s", dataType.String(), field.Type.Name()))
}
if dataType == schemapb.DataType_Array {
dataType = fieldSchema.GetElementType()
}
}
if !isConvertible(arrowType, dataType, isList) {
log.Warn("field schema is not match",
zap.String("collection schema", dataType.String()),
zap.String("file schema", field.Type.Name()))
return merr.WrapErrImportFailed(fmt.Sprintf("field schema is not match, collection field dataType: %s, file field dataType:%s", dataType.String(), field.Type.Name()))
}
// Here, a scalar column has no dim parameter, and the dim of a vector column
// has already been validated, so the error is ignored here.
dim, _ := getFieldDimension(fieldSchema)
parquetColumnReader := &ParquetColumnReader{
fieldName: field.GetName(),
fieldID: field.GetFieldID(),
dataType: field.GetDataType(),
elementType: field.GetElementType(),
fieldName: fieldSchema.GetName(),
fieldID: fieldSchema.GetFieldID(),
dataType: fieldSchema.GetDataType(),
elementType: fieldSchema.GetElementType(),
dimension: dim,
}
fields, exist := schema.FieldsByName(field.GetName())
if !exist {
if !(field.GetIsPrimaryKey() && field.GetAutoID()) && !field.GetIsDynamic() {
log.Warn("there is no field in parquet file", zap.String("fieldName", field.GetName()))
return merr.WrapErrImportFailed(fmt.Sprintf("there is no field: %s in parquet file", field.GetName()))
}
} else {
if len(fields) != 1 {
log.Warn("there is multi field of fieldName", zap.String("fieldName", field.GetName()), zap.Any("file fields", fields))
return merr.WrapErrImportFailed(fmt.Sprintf("there is multi field of fieldName: %s", field.GetName()))
}
if !verifyFieldSchema(field.GetDataType(), field.GetElementType(), fields[0]) {
if fields[0].Type.ID() == arrow.LIST {
log.Warn("field schema is not match",
zap.String("fieldName", field.GetName()),
zap.String("collection schema", field.GetDataType().String()),
zap.String("file schema", fields[0].Type.Name()),
zap.String("collection schema element type", field.GetElementType().String()),
zap.String("file list element type", fields[0].Type.(*arrow.ListType).ElemField().Type.Name()))
return merr.WrapErrImportFailed(fmt.Sprintf("array field schema is not match of field: %s, collection field element dataType: %s, file field element dataType:%s",
field.GetName(), field.GetElementType().String(), fields[0].Type.(*arrow.ListType).ElemField().Type.Name()))
}
log.Warn("field schema is not match",
zap.String("fieldName", field.GetName()),
zap.String("collection schema", field.GetDataType().String()),
zap.String("file schema", fields[0].Type.Name()))
return merr.WrapErrImportFailed(fmt.Sprintf("schema is not match of field: %s, collection field dataType: %s, file field dataType:%s",
field.GetName(), field.GetDataType().String(), fields[0].Type.Name()))
}
indices := schema.FieldIndices(field.GetName())
if len(indices) != 1 {
log.Warn("field is not match", zap.String("fieldName", field.GetName()), zap.Ints("indices", indices))
return merr.WrapErrImportFailed(fmt.Sprintf("there is %d indices of fieldName: %s", len(indices), field.GetName()))
}
parquetColumnReader.columnIndex = indices[0]
columnReader, err := p.fileReader.GetColumn(p.ctx, parquetColumnReader.columnIndex)
if err != nil {
log.Warn("get column reader failed", zap.String("fieldName", field.GetName()), zap.Error(err))
return err
}
parquetColumnReader.columnReader = columnReader
p.columnMap[field.GetName()] = parquetColumnReader
parquetColumnReader.columnIndex = i
columnReader, err := p.fileReader.GetColumn(p.ctx, parquetColumnReader.columnIndex)
if err != nil {
log.Warn("get column reader failed", zap.String("fieldName", field.Name), zap.Error(err))
return err
}
parquetColumnReader.columnReader = columnReader
p.columnMap[field.Name] = parquetColumnReader
}
if err = p.checkFields(); err != nil {
return err
}
return nil
}
func verifyFieldSchema(dataType, elementType schemapb.DataType, fileField arrow.Field) bool {
switch fileField.Type.ID() {
func convertArrowSchemaToDataType(field arrow.Field, isList bool) (schemapb.DataType, bool) {
switch field.Type.ID() {
case arrow.BOOL:
return dataType == schemapb.DataType_Bool
return schemapb.DataType_Bool, false
case arrow.UINT8:
if isList {
return schemapb.DataType_BinaryVector, false
}
return schemapb.DataType_None, false
case arrow.INT8:
return dataType == schemapb.DataType_Int8
return schemapb.DataType_Int8, false
case arrow.INT16:
return dataType == schemapb.DataType_Int16
return schemapb.DataType_Int16, false
case arrow.INT32:
return dataType == schemapb.DataType_Int32
return schemapb.DataType_Int32, false
case arrow.INT64:
return dataType == schemapb.DataType_Int64
return schemapb.DataType_Int64, false
case arrow.FLOAT16:
if isList {
return schemapb.DataType_Float16Vector, false
}
return schemapb.DataType_None, false
case arrow.FLOAT32:
return dataType == schemapb.DataType_Float
return schemapb.DataType_Float, false
case arrow.FLOAT64:
return dataType == schemapb.DataType_Double
return schemapb.DataType_Double, false
case arrow.STRING:
return dataType == schemapb.DataType_VarChar || dataType == schemapb.DataType_String || dataType == schemapb.DataType_JSON
return schemapb.DataType_VarChar, false
case arrow.BINARY:
return schemapb.DataType_BinaryVector, false
case arrow.LIST:
if dataType != schemapb.DataType_Array && dataType != schemapb.DataType_FloatVector &&
dataType != schemapb.DataType_Float16Vector && dataType != schemapb.DataType_BinaryVector {
return false
}
if dataType == schemapb.DataType_Array {
return verifyFieldSchema(elementType, schemapb.DataType_None, fileField.Type.(*arrow.ListType).ElemField())
}
return true
elementType, _ := convertArrowSchemaToDataType(field.Type.(*arrow.ListType).ElemField(), true)
return elementType, true
default:
return schemapb.DataType_None, false
}
}
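To illustrate the mapping (a sketch with arrow-go v12 types; the second return value reports whether the Arrow field is a list):

dt, isList := convertArrowSchemaToDataType(arrow.Field{Type: &arrow.Float32Type{}}, false)
// dt == schemapb.DataType_Float, isList == false
dt, isList = convertArrowSchemaToDataType(arrow.Field{Type: arrow.ListOf(&arrow.Float32Type{})}, false)
// dt == schemapb.DataType_Float, isList == true: a candidate for a FloatVector
// field or a float-convertible Array field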
func isConvertible(src, dst schemapb.DataType, isList bool) bool {
switch src {
case schemapb.DataType_Bool:
return typeutil.IsBoolType(dst)
case schemapb.DataType_Int8:
return typeutil.IsArithmetic(dst)
case schemapb.DataType_Int16:
return typeutil.IsArithmetic(dst) && dst != schemapb.DataType_Int8
case schemapb.DataType_Int32:
return typeutil.IsArithmetic(dst) && dst != schemapb.DataType_Int8 && dst != schemapb.DataType_Int16
case schemapb.DataType_Int64:
return typeutil.IsFloatingType(dst) || dst == schemapb.DataType_Int64
case schemapb.DataType_Float:
if isList && dst == schemapb.DataType_FloatVector {
return true
}
return typeutil.IsFloatingType(dst)
case schemapb.DataType_Double:
if isList && dst == schemapb.DataType_FloatVector {
return true
}
return dst == schemapb.DataType_Double
case schemapb.DataType_String, schemapb.DataType_VarChar:
return typeutil.IsStringType(dst) || typeutil.IsJSONType(dst)
case schemapb.DataType_JSON:
return typeutil.IsJSONType(dst)
case schemapb.DataType_BinaryVector:
return dst == schemapb.DataType_BinaryVector
case schemapb.DataType_Float16Vector:
return dst == schemapb.DataType_Float16Vector
default:
return false
}
}
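A few examples of what this conversion matrix allows and rejects (widening is implicit, narrowing is not):

isConvertible(schemapb.DataType_Int8, schemapb.DataType_Int64, false)       // true:  widening
isConvertible(schemapb.DataType_Int32, schemapb.DataType_Int16, false)      // false: narrowing
isConvertible(schemapb.DataType_Int64, schemapb.DataType_Double, false)     // true:  int64 into double
isConvertible(schemapb.DataType_Double, schemapb.DataType_Float, false)     // false: lossy
isConvertible(schemapb.DataType_Float, schemapb.DataType_FloatVector, true) // true:  float list into vector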
// Close closes the parquet file reader
@ -342,18 +403,7 @@ func (p *ParquetParser) consume() error {
func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int64) (storage.FieldData, error) {
switch columnReader.dataType {
case schemapb.DataType_Bool:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]bool, error) {
boolReader, ok := chunk.(*array.Boolean)
if !ok {
log.Warn("the column data in parquet is not bool", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not bool of field: %s", columnReader.fieldName))
}
boolData := make([]bool, boolReader.Data().Len())
for i := 0; i < boolReader.Data().Len(); i++ {
boolData[i] = boolReader.Value(i)
}
return boolData, nil
})
data, err := ReadBoolData(columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read bool array", zap.Error(err))
return nil, err
@ -363,18 +413,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Int8:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int8, error) {
int8Reader, ok := chunk.(*array.Int8)
if !ok {
log.Warn("the column data in parquet is not int8", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int8 of field: %s", columnReader.fieldName))
}
int8Data := make([]int8, int8Reader.Data().Len())
for i := 0; i < int8Reader.Data().Len(); i++ {
int8Data[i] = int8Reader.Value(i)
}
return int8Data, nil
})
data, err := ReadIntegerOrFloatData[int8](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read int8 array", zap.Error(err))
return nil, err
@ -384,18 +423,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Int16:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int16, error) {
int16Reader, ok := chunk.(*array.Int16)
if !ok {
log.Warn("the column data in parquet is not int16", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int16 of field: %s", columnReader.fieldName))
}
int16Data := make([]int16, int16Reader.Data().Len())
for i := 0; i < int16Reader.Data().Len(); i++ {
int16Data[i] = int16Reader.Value(i)
}
return int16Data, nil
})
data, err := ReadIntegerOrFloatData[int16](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to int16 array", zap.Error(err))
return nil, err
@ -405,18 +433,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Int32:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int32, error) {
int32Reader, ok := chunk.(*array.Int32)
if !ok {
log.Warn("the column data in parquet is not int32", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int32 of field: %s", columnReader.fieldName))
}
int32Data := make([]int32, int32Reader.Data().Len())
for i := 0; i < int32Reader.Data().Len(); i++ {
int32Data[i] = int32Reader.Value(i)
}
return int32Data, nil
})
data, err := ReadIntegerOrFloatData[int32](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read int32 array", zap.Error(err))
return nil, err
@ -426,18 +443,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Int64:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]int64, error) {
int64Reader, ok := chunk.(*array.Int64)
if !ok {
log.Warn("the column data in parquet is not int64", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not int64 of field: %s", columnReader.fieldName))
}
int64Data := make([]int64, int64Reader.Data().Len())
for i := 0; i < int64Reader.Data().Len(); i++ {
int64Data[i] = int64Reader.Value(i)
}
return int64Data, nil
})
data, err := ReadIntegerOrFloatData[int64](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read int64 array", zap.Error(err))
return nil, err
@ -447,18 +453,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Float:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]float32, error) {
float32Reader, ok := chunk.(*array.Float32)
if !ok {
log.Warn("the column data in parquet is not float", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not float of field: %s", columnReader.fieldName))
}
float32Data := make([]float32, float32Reader.Data().Len())
for i := 0; i < float32Reader.Data().Len(); i++ {
float32Data[i] = float32Reader.Value(i)
}
return float32Data, nil
})
data, err := ReadIntegerOrFloatData[float32](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read float array", zap.Error(err))
return nil, err
@ -474,18 +469,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_Double:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]float64, error) {
float64Reader, ok := chunk.(*array.Float64)
if !ok {
log.Warn("the column data in parquet is not double", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not double of field: %s", columnReader.fieldName))
}
float64Data := make([]float64, float64Reader.Data().Len())
for i := 0; i < float64Reader.Data().Len(); i++ {
float64Data[i] = float64Reader.Value(i)
}
return float64Data, nil
})
data, err := ReadIntegerOrFloatData[float64](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read double array", zap.Error(err))
return nil, err
@ -501,18 +485,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: data,
}, nil
case schemapb.DataType_VarChar, schemapb.DataType_String:
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]string, error) {
stringReader, ok := chunk.(*array.String)
if !ok {
log.Warn("the column data in parquet is not string", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not string of field: %s", columnReader.fieldName))
}
stringData := make([]string, stringReader.Data().Len())
for i := 0; i < stringReader.Data().Len(); i++ {
stringData[i] = stringReader.Value(i)
}
return stringData, nil
})
data, err := ReadStringData(columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read varchar array", zap.Error(err))
return nil, err
@ -523,18 +496,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}, nil
case schemapb.DataType_JSON:
// JSON field reads its data from a Parquet string column
data, err := ReadData(columnReader, rowCount, func(chunk arrow.Array) ([]string, error) {
stringReader, ok := chunk.(*array.String)
if !ok {
log.Warn("the column data in parquet is not json string", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column data in parquet is not json string of field: %s", columnReader.fieldName))
}
stringData := make([]string, stringReader.Data().Len())
for i := 0; i < stringReader.Data().Len(); i++ {
stringData[i] = stringReader.Value(i)
}
return stringData, nil
})
data, err := ReadStringData(columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read json string array", zap.Error(err))
return nil, err
@ -556,149 +518,36 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Data: byteArr,
}, nil
case schemapb.DataType_BinaryVector:
data, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]uint8, error) {
arrayData := make([][]uint8, 0, len(offsets))
uint8Reader, ok := reader.(*array.Uint8)
if !ok {
log.Warn("the column element data of array in parquet is not binary", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not binary: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]uint8, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, uint8Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
binaryData, err := ReadBinaryData(columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read binary vector array", zap.Error(err))
return nil, err
}
binaryData := make([]byte, 0)
for _, arr := range data {
binaryData = append(binaryData, arr...)
}
if len(binaryData) != len(data)*columnReader.dimension/8 {
log.Warn("Parquet parser: binary vector is irregular", zap.Int("actual num", len(binaryData)),
zap.Int("expect num", len(data)*columnReader.dimension/8))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("binary vector is irregular, expect num = %d,"+
" actual num = %d", len(data)*columnReader.dimension/8, len(binaryData)))
}
return &storage.BinaryVectorFieldData{
Data: binaryData,
Dim: columnReader.dimension,
}, nil
case schemapb.DataType_FloatVector:
data := make([]float32, 0)
rowNum := 0
if columnReader.columnReader.Field().Type.(*arrow.ListType).Elem().ID() == arrow.FLOAT32 {
arrayData, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float32, error) {
arrayData := make([][]float32, 0, len(offsets))
float32Reader, ok := reader.(*array.Float32)
if !ok {
log.Warn("the column element data of array in parquet is not float", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not float: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]float32, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, float32Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
if err != nil {
log.Warn("Parquet parser: failed to read float vector array", zap.Error(err))
return nil, err
}
for _, arr := range arrayData {
data = append(data, arr...)
}
err = typeutil.VerifyFloats32(data)
if err != nil {
log.Warn("Parquet parser: illegal value in float vector array", zap.Error(err))
return nil, err
}
rowNum = len(arrayData)
} else if columnReader.columnReader.Field().Type.(*arrow.ListType).Elem().ID() == arrow.FLOAT64 {
arrayData, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float64, error) {
arrayData := make([][]float64, 0, len(offsets))
float64Reader, ok := reader.(*array.Float64)
if !ok {
log.Warn("the column element data of array in parquet is not double", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not double: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]float64, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, float64Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
if err != nil {
log.Warn("Parquet parser: failed to read float vector array", zap.Error(err))
return nil, err
}
for _, arr := range arrayData {
for _, f64 := range arr {
err = typeutil.VerifyFloat(f64)
if err != nil {
log.Warn("Parquet parser: illegal value in float vector array", zap.Error(err))
return nil, err
}
data = append(data, float32(f64))
}
}
rowNum = len(arrayData)
} else {
log.Warn("Parquet parser: FloatVector type is not float", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("FloatVector type is not float, is: %s",
columnReader.columnReader.Field().Type.(*arrow.ListType).Elem().ID().String()))
arrayData, err := ReadIntegerOrFloatArrayData[float32](columnReader, rowCount)
if err != nil {
log.Warn("Parquet parser: failed to read float vector array", zap.Error(err))
return nil, err
}
if len(data) != rowNum*columnReader.dimension {
log.Warn("Parquet parser: float vector is irregular", zap.Int("actual num", len(data)),
zap.Int("expect num", rowNum*columnReader.dimension))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("float vector is irregular, expect num = %d,"+
" actual num = %d", rowNum*columnReader.dimension, len(data)))
data := make([]float32, 0, len(arrayData)*columnReader.dimension)
for _, arr := range arrayData {
data = append(data, arr...)
}
return &storage.FloatVectorFieldData{
Data: data,
Dim: columnReader.dimension,
}, nil
case schemapb.DataType_Array:
data := make([]*schemapb.ScalarField, 0)
switch columnReader.elementType {
case schemapb.DataType_Bool:
boolArray, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]bool, error) {
arrayData := make([][]bool, 0, len(offsets))
boolReader, ok := reader.(*array.Boolean)
if !ok {
log.Warn("the column element data of array in parquet is not bool", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not bool: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]bool, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, boolReader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
boolArray, err := ReadBoolArrayData(columnReader, rowCount)
if err != nil {
return nil, err
}
@ -712,23 +561,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
})
}
case schemapb.DataType_Int8:
int8Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
arrayData := make([][]int32, 0, len(offsets))
int8Reader, ok := reader.(*array.Int8)
if !ok {
log.Warn("the column element data of array in parquet is not int8", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not int8: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]int32, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, int32(int8Reader.Value(int(j))))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
int8Array, err := ReadIntegerOrFloatArrayData[int32](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -742,23 +575,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
})
}
case schemapb.DataType_Int16:
int16Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
arrayData := make([][]int32, 0, len(offsets))
int16Reader, ok := reader.(*array.Int16)
if !ok {
log.Warn("the column element data of array in parquet is not int16", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not int16: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]int32, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, int32(int16Reader.Value(int(j))))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
int16Array, err := ReadIntegerOrFloatArrayData[int32](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -773,23 +590,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}
case schemapb.DataType_Int32:
int32Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int32, error) {
arrayData := make([][]int32, 0, len(offsets))
int32Reader, ok := reader.(*array.Int32)
if !ok {
log.Warn("the column element data of array in parquet is not int32", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not int32: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]int32, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, int32Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
int32Array, err := ReadIntegerOrFloatArrayData[int32](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -804,23 +605,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}
case schemapb.DataType_Int64:
int64Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]int64, error) {
arrayData := make([][]int64, 0, len(offsets))
int64Reader, ok := reader.(*array.Int64)
if !ok {
log.Warn("the column element data of array in parquet is not int64", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not int64: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]int64, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, int64Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
int64Array, err := ReadIntegerOrFloatArrayData[int64](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -835,23 +620,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}
case schemapb.DataType_Float:
float32Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float32, error) {
arrayData := make([][]float32, 0, len(offsets))
float32Reader, ok := reader.(*array.Float32)
if !ok {
log.Warn("the column element data of array in parquet is not float", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not float: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]float32, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, float32Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
float32Array, err := ReadIntegerOrFloatArrayData[float32](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -866,23 +635,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}
case schemapb.DataType_Double:
float64Array, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]float64, error) {
arrayData := make([][]float64, 0, len(offsets))
float64Reader, ok := reader.(*array.Float64)
if !ok {
log.Warn("the column element data of array in parquet is not double", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not double: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]float64, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, float64Reader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
float64Array, err := ReadIntegerOrFloatArrayData[float64](columnReader, rowCount)
if err != nil {
return nil, err
}
@ -897,23 +650,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
}
case schemapb.DataType_VarChar, schemapb.DataType_String:
stringArray, err := ReadArrayData(columnReader, rowCount, func(offsets []int32, reader arrow.Array) ([][]string, error) {
arrayData := make([][]string, 0, len(offsets))
stringReader, ok := reader.(*array.String)
if !ok {
log.Warn("the column element data of array in parquet is not string", zap.String("fieldName", columnReader.fieldName))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the column element data of array in parquet is not string: %s", columnReader.fieldName))
}
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]string, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, stringReader.Value(int(j)))
}
arrayData = append(arrayData, elementData)
}
return arrayData, nil
})
stringArray, err := ReadStringArrayData(columnReader, rowCount)
if err != nil {
return nil, err
}

View File

@ -30,10 +30,13 @@ import (
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// parquetSampleSchema() returns a schema containing all supported data types, with an int64 primary key
@ -180,7 +183,7 @@ func parquetSampleSchema() *schemapb.CollectionSchema {
ElementType: schemapb.DataType_Float,
},
{
FieldID: 118,
FieldID: 119,
Name: "FieldArrayDouble",
IsPrimaryKey: false,
Description: "int16 array",
@ -203,12 +206,22 @@ func parquetSampleSchema() *schemapb.CollectionSchema {
DataType: schemapb.DataType_JSON,
IsDynamic: true,
},
{
FieldID: 122,
Name: "FieldBinaryVector2",
IsPrimaryKey: false,
Description: "binary_vector2",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "64"},
},
},
},
}
return schema
}
func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataType {
func milvusDataTypeToArrowType(dataType schemapb.DataType, isBinary bool) arrow.DataType {
switch dataType {
case schemapb.DataType_Bool:
return &arrow.BooleanType{}
@ -238,6 +251,9 @@ func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy
Metadata: arrow.Metadata{},
})
case schemapb.DataType_BinaryVector:
if isBinary {
return &arrow.BinaryType{}
}
return arrow.ListOfField(arrow.Field{
Name: "item",
Type: &arrow.Uint8Type{},
@ -259,13 +275,12 @@ func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy
func convertMilvusSchemaToArrowSchema(schema *schemapb.CollectionSchema) *arrow.Schema {
fields := make([]arrow.Field, 0)
for _, field := range schema.GetFields() {
dim, _ := getFieldDimension(field)
if field.GetDataType() == schemapb.DataType_Array {
fields = append(fields, arrow.Field{
Name: field.GetName(),
Type: arrow.ListOfField(arrow.Field{
Name: "item",
Type: milvusDataTypeToArrowType(field.GetElementType(), dim),
Type: milvusDataTypeToArrowType(field.GetElementType(), false),
Nullable: true,
Metadata: arrow.Metadata{},
}),
@ -276,7 +291,7 @@ func convertMilvusSchemaToArrowSchema(schema *schemapb.CollectionSchema) *arrow.
}
fields = append(fields, arrow.Field{
Name: field.GetName(),
Type: milvusDataTypeToArrowType(field.GetDataType(), dim),
Type: milvusDataTypeToArrowType(field.GetDataType(), field.Name == "FieldBinaryVector2"),
Nullable: true,
Metadata: arrow.Metadata{},
})
@ -284,7 +299,7 @@ func convertMilvusSchemaToArrowSchema(schema *schemapb.CollectionSchema) *arrow.
return arrow.NewSchema(fields, nil)
}
func buildArrayData(dataType, elementType schemapb.DataType, dim, rows, arrLen int) arrow.Array {
func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBinary bool) arrow.Array {
mem := memory.NewGoAllocator()
switch dataType {
case schemapb.DataType_Bool:
@ -349,6 +364,17 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows, arrLen i
builder.AppendValues(offsets, valid)
return builder.NewListArray()
case schemapb.DataType_BinaryVector:
if isBinary {
builder := array.NewBinaryBuilder(mem, &arrow.BinaryType{})
for i := 0; i < rows; i++ {
element := make([]byte, dim/8)
for j := 0; j < dim/8; j++ {
element[j] = randomString(1)[0]
}
builder.Append(element)
}
return builder.NewBinaryArray()
}
builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
offsets := make([]int32, 0, rows)
valid := make([]bool, 0)
@ -372,11 +398,10 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows, arrLen i
valid := make([]bool, 0, rows)
index := 0
for i := 0; i < rows; i++ {
index += arrLen
index += i % 10
offsets = append(offsets, int32(index))
valid = append(valid, true)
}
index += arrLen
switch elementType {
case schemapb.DataType_Bool:
builder := array.NewListBuilder(mem, &arrow.BooleanType{})
@ -460,7 +485,69 @@ func writeParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows
columns := make([]arrow.Array, 0, len(milvusSchema.Fields))
for _, field := range milvusSchema.Fields {
dim, _ := getFieldDimension(field)
columnData := buildArrayData(field.DataType, field.ElementType, dim, batch, 10)
columnData := buildArrayData(field.DataType, field.ElementType, dim, batch, field.Name == "FieldBinaryVector2")
columns = append(columns, columnData)
}
recordBatch := array.NewRecord(schema, columns, int64(batch))
err = fw.Write(recordBatch)
if err != nil {
return err
}
}
return nil
}
func writeLessFieldParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error {
for i, field := range milvusSchema.Fields {
if field.GetName() == "FieldInt64" {
milvusSchema.Fields = append(milvusSchema.Fields[:i], milvusSchema.Fields[i+1:]...)
break
}
}
schema := convertMilvusSchemaToArrowSchema(milvusSchema)
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
if err != nil {
return err
}
defer fw.Close()
batch := 1000
for i := 0; i <= numRows/batch; i++ {
columns := make([]arrow.Array, 0, len(milvusSchema.Fields))
for _, field := range milvusSchema.Fields {
dim, _ := getFieldDimension(field)
columnData := buildArrayData(field.DataType, field.ElementType, dim, batch, field.Name == "FieldBinaryVector2")
columns = append(columns, columnData)
}
recordBatch := array.NewRecord(schema, columns, int64(batch))
err = fw.Write(recordBatch)
if err != nil {
return err
}
}
return nil
}
func writeMoreFieldParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error {
milvusSchema.Fields = append(milvusSchema.Fields, &schemapb.FieldSchema{
FieldID: 200,
Name: "FieldMore",
DataType: schemapb.DataType_Int64,
})
schema := convertMilvusSchemaToArrowSchema(milvusSchema)
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
if err != nil {
return err
}
defer fw.Close()
batch := 1000
for i := 0; i <= numRows/batch; i++ {
columns := make([]arrow.Array, 0, len(milvusSchema.Fields)+1)
for _, field := range milvusSchema.Fields {
dim, _ := getFieldDimension(field)
columnData := buildArrayData(field.DataType, field.ElementType, dim, batch, field.Name == "FieldBinaryVector2")
columns = append(columns, columnData)
}
recordBatch := array.NewRecord(schema, columns, int64(batch))
@ -482,8 +569,9 @@ func randomString(length int) string {
return string(b)
}
func TestParquetReader(t *testing.T) {
filePath := "/tmp/wp.parquet"
func TestParquetParser(t *testing.T) {
paramtable.Init()
filePath := "/tmp/parser.parquet"
ctx := context.Background()
schema := parquetSampleSchema()
idAllocator := newIDAllocator(ctx, t, nil)
@ -516,6 +604,23 @@ func TestParquetReader(t *testing.T) {
err = parquetParser.Parse()
assert.NoError(t, err)
})
}
func TestParquetReader_Error(t *testing.T) {
paramtable.Init()
filePath := "/tmp/par_err.parquet"
ctx := context.Background()
schema := parquetSampleSchema()
idAllocator := newIDAllocator(ctx, t, nil)
defer os.Remove(filePath)
writeFile := func() {
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(t, err)
err = writeParquet(wf, schema, 100)
assert.NoError(t, err)
}
writeFile()
t.Run("field not exist", func(t *testing.T) {
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
@ -561,6 +666,26 @@ func TestParquetReader(t *testing.T) {
schema = parquetSampleSchema()
})
t.Run("list data mismatch", func(t *testing.T) {
schema.Fields[11].DataType = schemapb.DataType_Bool
schema.Fields[11].ElementType = schemapb.DataType_None
cm := createLocalChunkManager(t)
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
parquetParser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 10240, cm, filePath, flushFunc, nil)
assert.NoError(t, err)
defer parquetParser.Close()
err = parquetParser.Parse()
assert.Error(t, err)
// reset schema
schema = parquetSampleSchema()
})
t.Run("data not match", func(t *testing.T) {
cm := createLocalChunkManager(t)
flushFunc := func(fields BlockData, shardID int, partID int64) error {
@ -584,7 +709,7 @@ func TestParquetReader(t *testing.T) {
})
t.Run("read not int8 field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldInt16"]
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Int8
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
@ -592,7 +717,7 @@ func TestParquetReader(t *testing.T) {
})
t.Run("read not int16 field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldInt32"]
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Int16
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
@ -600,7 +725,7 @@ func TestParquetReader(t *testing.T) {
})
t.Run("read not int32 field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldInt64"]
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Int32
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
@ -608,7 +733,7 @@ func TestParquetReader(t *testing.T) {
})
t.Run("read not int64 field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldFloat"]
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Int64
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
@ -616,7 +741,7 @@ func TestParquetReader(t *testing.T) {
})
t.Run("read not float field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldDouble"]
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Float
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
@ -648,6 +773,24 @@ func TestParquetReader(t *testing.T) {
assert.Nil(t, data)
})
t.Run("read not array field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Array
columnReader.elementType = schemapb.DataType_Int64
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
assert.Nil(t, data)
})
t.Run("read not array field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_Array
columnReader.elementType = schemapb.DataType_VarChar
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
assert.Nil(t, data)
})
t.Run("read not bool array field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldArrayString"]
columnReader.dataType = schemapb.DataType_Array
@ -728,7 +871,7 @@ func TestParquetReader(t *testing.T) {
assert.Nil(t, data)
})
t.Run("read irregular float vector", func(t *testing.T) {
t.Run("read irregular float vector field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldArrayFloat"]
columnReader.dataType = schemapb.DataType_FloatVector
data, err := parquetParser.readData(columnReader, 1024)
@@ -736,7 +879,7 @@ func TestParquetReader(t *testing.T) {
assert.Nil(t, data)
})
t.Run("read irregular float vector", func(t *testing.T) {
t.Run("read irregular float vector field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldArrayDouble"]
columnReader.dataType = schemapb.DataType_FloatVector
data, err := parquetParser.readData(columnReader, 1024)
@@ -744,6 +887,14 @@ func TestParquetReader(t *testing.T) {
assert.Nil(t, data)
})
t.Run("read not binary vector field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_BinaryVector
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
assert.Nil(t, data)
})
t.Run("read not binary vector field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldArrayBool"]
columnReader.dataType = schemapb.DataType_BinaryVector
@@ -752,6 +903,14 @@ func TestParquetReader(t *testing.T) {
assert.Nil(t, data)
})
t.Run("read irregular binary vector field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldArrayInt64"]
columnReader.dataType = schemapb.DataType_BinaryVector
data, err := parquetParser.readData(columnReader, 1024)
assert.Error(t, err)
assert.Nil(t, data)
})
t.Run("read not json field", func(t *testing.T) {
columnReader := parquetParser.columnMap["FieldBool"]
columnReader.dataType = schemapb.DataType_JSON
@@ -808,6 +967,7 @@ func TestParquetReader(t *testing.T) {
}
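// All of the "read not <type> field" subtests above share one pattern: they
// repoint a columnReader at the physically-bool "FieldBool" column, override
// its declared dataType, and expect readData to fail. A minimal sketch of the
// guard being exercised, using hypothetical helper names for illustration
// (the actual readData dispatch may differ):
//
//	func (p *ParquetParser) readData(reader *ParquetColumnReader, rowCount int64) (storage.FieldData, error) {
//		switch reader.dataType {
//		case schemapb.DataType_Bool:
//			data, err := readBoolColumn(reader, rowCount) // errors out when the arrow column is not boolean
//			if err != nil {
//				return nil, err
//			}
//			return &storage.BoolFieldData{Data: data}, nil
//		// ... one case per supported milvus type ...
//		default:
//			return nil, fmt.Errorf("unsupported data type: %s", reader.dataType.String())
//		}
//	}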
func TestNewParquetParser(t *testing.T) {
paramtable.Init()
ctx := context.Background()
t.Run("nil collectionInfo", func(t *testing.T) {
parquetParser, err := NewParquetParser(ctx, nil, nil, 10240, nil, "", nil, nil)
@@ -846,106 +1006,174 @@ func TestNewParquetParser(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, parquetParser)
})
//
//t.Run("create reader with closed file", func(t *testing.T) {
// collectionInfo, err := NewCollectionInfo(parquetSampleSchema(), 2, []int64{1})
// assert.NoError(t, err)
//
// idAllocator := newIDAllocator(ctx, t, nil)
// cm := createLocalChunkManager(t)
// flushFunc := func(fields BlockData, shardID int, partID int64) error {
// return nil
// }
//
// rf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
// assert.NoError(t, err)
// r := storage.NewLocalFile(rf)
//
// parquetParser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 10240, cm, filePath, flushFunc, nil)
// assert.Error(t, err)
// assert.Nil(t, parquetParser)
//})
t.Run("chunk manager reader fail", func(t *testing.T) {
collectionInfo, err := NewCollectionInfo(parquetSampleSchema(), 2, []int64{1})
assert.NoError(t, err)
idAllocator := newIDAllocator(ctx, t, nil)
cm := mocks.NewChunkManager(t)
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
parquetParser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 10240, cm, "", flushFunc, nil)
assert.Error(t, err)
assert.Nil(t, parquetParser)
})
}
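// For reference, the happy-path wiring these constructor tests poke at, as a
// minimal usage sketch built only from pieces that appear in this file
// (collection info, a no-op flush callback, a chunk manager, Parse and Close):
//
//	collectionInfo, _ := NewCollectionInfo(parquetSampleSchema(), 2, []int64{1})
//	flushFunc := func(fields BlockData, shardID int, partID int64) error { return nil }
//	parser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 10240, cm, filePath, flushFunc, nil)
//	if err == nil {
//		defer parser.Close()
//		err = parser.Parse() // streams column data and hands each block to flushFunc
//	}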
func Test_convertArrowSchemaToDataType(t *testing.T) {
	type testcase struct {
		arrowField arrow.Field
		dataType   schemapb.DataType
		isArray    bool
	}
	testcases := []testcase{
		{arrow.Field{Type: &arrow.BooleanType{}}, schemapb.DataType_Bool, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.BooleanType{}})}, schemapb.DataType_Bool, true},

		{arrow.Field{Type: &arrow.Int8Type{}}, schemapb.DataType_Int8, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Int8Type{}})}, schemapb.DataType_Int8, true},

		{arrow.Field{Type: &arrow.Int16Type{}}, schemapb.DataType_Int16, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Int16Type{}})}, schemapb.DataType_Int16, true},

		{arrow.Field{Type: &arrow.Int32Type{}}, schemapb.DataType_Int32, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Int32Type{}})}, schemapb.DataType_Int32, true},

		{arrow.Field{Type: &arrow.Int64Type{}}, schemapb.DataType_Int64, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Int64Type{}})}, schemapb.DataType_Int64, true},

		{arrow.Field{Type: &arrow.Float32Type{}}, schemapb.DataType_Float, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Float32Type{}})}, schemapb.DataType_Float, true},

		{arrow.Field{Type: &arrow.Float64Type{}}, schemapb.DataType_Double, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Float64Type{}})}, schemapb.DataType_Double, true},

		{arrow.Field{Type: &arrow.StringType{}}, schemapb.DataType_VarChar, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.StringType{}})}, schemapb.DataType_VarChar, true},

		{arrow.Field{Type: &arrow.BinaryType{}}, schemapb.DataType_BinaryVector, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Uint8Type{}})}, schemapb.DataType_BinaryVector, true},
		{arrow.Field{Type: &arrow.Uint8Type{}}, schemapb.DataType_None, false},

		{arrow.Field{Type: &arrow.Float16Type{}}, schemapb.DataType_None, false},
		{arrow.Field{Type: arrow.ListOfField(arrow.Field{Type: &arrow.Float16Type{}})}, schemapb.DataType_Float16Vector, true},

		{arrow.Field{Type: &arrow.DayTimeIntervalType{}}, schemapb.DataType_None, false},
	}

	for _, tt := range testcases {
		arrowType, isList := convertArrowSchemaToDataType(tt.arrowField, false)
		assert.Equal(t, tt.isArray, isList)
		assert.Equal(t, tt.dataType, arrowType)
	}
}

func Test_isConvertible(t *testing.T) {
	type testcase struct {
		arrowType schemapb.DataType
		dataType  schemapb.DataType
		isArray   bool
		expect    bool
	}
	testcases := []testcase{
		{schemapb.DataType_Bool, schemapb.DataType_Bool, false, true},
		{schemapb.DataType_Bool, schemapb.DataType_Bool, true, true},
		{schemapb.DataType_Bool, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Bool, schemapb.DataType_Int8, true, false},
		{schemapb.DataType_Bool, schemapb.DataType_String, false, false},
		{schemapb.DataType_Bool, schemapb.DataType_String, true, false},

		{schemapb.DataType_Int8, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Int8, schemapb.DataType_String, false, false},
		{schemapb.DataType_Int8, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Int8, schemapb.DataType_Int8, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_Int8, true, true},
		{schemapb.DataType_Int8, schemapb.DataType_Int16, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_Int32, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_Int64, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_Float, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Int8, schemapb.DataType_FloatVector, false, false},

		{schemapb.DataType_Int16, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Int16, schemapb.DataType_String, false, false},
		{schemapb.DataType_Int16, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Int16, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Int16, schemapb.DataType_Int16, false, true},
		{schemapb.DataType_Int16, schemapb.DataType_Int32, false, true},
		{schemapb.DataType_Int16, schemapb.DataType_Int64, false, true},
		{schemapb.DataType_Int16, schemapb.DataType_Float, false, true},
		{schemapb.DataType_Int16, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Int16, schemapb.DataType_FloatVector, false, false},

		{schemapb.DataType_Int32, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Int32, schemapb.DataType_String, false, false},
		{schemapb.DataType_Int32, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Int32, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Int32, schemapb.DataType_Int16, false, false},
		{schemapb.DataType_Int32, schemapb.DataType_Int32, false, true},
		{schemapb.DataType_Int32, schemapb.DataType_Int64, false, true},
		{schemapb.DataType_Int32, schemapb.DataType_Float, false, true},
		{schemapb.DataType_Int32, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Int32, schemapb.DataType_FloatVector, false, false},

		{schemapb.DataType_Int64, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_String, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_Int16, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_Int32, false, false},
		{schemapb.DataType_Int64, schemapb.DataType_Int64, false, true},
		{schemapb.DataType_Int64, schemapb.DataType_Float, false, true},
		{schemapb.DataType_Int64, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Int64, schemapb.DataType_FloatVector, false, false},

		{schemapb.DataType_Float, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Float, schemapb.DataType_String, false, false},
		{schemapb.DataType_Float, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Float, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Float, schemapb.DataType_Int16, false, false},
		{schemapb.DataType_Float, schemapb.DataType_Int32, false, false},
		{schemapb.DataType_Float, schemapb.DataType_Int64, false, false},
		{schemapb.DataType_Float, schemapb.DataType_Float, false, true},
		{schemapb.DataType_Float, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Float, schemapb.DataType_FloatVector, true, true},

		{schemapb.DataType_Double, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_Double, schemapb.DataType_String, false, false},
		{schemapb.DataType_Double, schemapb.DataType_JSON, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Int8, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Int16, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Int32, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Int64, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Float, false, false},
		{schemapb.DataType_Double, schemapb.DataType_Double, false, true},
		{schemapb.DataType_Double, schemapb.DataType_FloatVector, true, true},

		{schemapb.DataType_VarChar, schemapb.DataType_VarChar, false, true},
		{schemapb.DataType_VarChar, schemapb.DataType_JSON, false, true},
		{schemapb.DataType_VarChar, schemapb.DataType_Bool, false, false},
		{schemapb.DataType_VarChar, schemapb.DataType_Int64, false, false},
		{schemapb.DataType_VarChar, schemapb.DataType_Float, false, false},
		{schemapb.DataType_VarChar, schemapb.DataType_FloatVector, false, false},

		{schemapb.DataType_Float16Vector, schemapb.DataType_Float16Vector, true, true},
		{schemapb.DataType_Float16Vector, schemapb.DataType_Float16Vector, false, true},
		{schemapb.DataType_BinaryVector, schemapb.DataType_BinaryVector, true, true},
		{schemapb.DataType_BinaryVector, schemapb.DataType_BinaryVector, false, true},

		{schemapb.DataType_JSON, schemapb.DataType_JSON, false, true},
		{schemapb.DataType_JSON, schemapb.DataType_VarChar, false, false},

		{schemapb.DataType_Array, schemapb.DataType_Array, false, false},
	}

	for _, tt := range testcases {
		assert.Equal(t, tt.expect, isConvertible(tt.arrowType, tt.dataType, tt.isArray))
	}
}
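// Read as a matrix, the table above encodes the implicit conversions this
// change allows: integers widen to larger integers or to floats, Float widens
// to Double, VarChar may be parsed into JSON, and a float/double list may feed
// a FloatVector, while every narrowing or cross-kind conversion is rejected.
// A sketch reconstructed purely from these expectations (illustrative, not the
// actual implementation):
//
//	func isConvertibleSketch(src, dst schemapb.DataType, isArray bool) bool {
//		rank := func(dt schemapb.DataType) int { // numeric widening order; -1 = non-numeric
//			switch dt {
//			case schemapb.DataType_Int8:
//				return 0
//			case schemapb.DataType_Int16:
//				return 1
//			case schemapb.DataType_Int32:
//				return 2
//			case schemapb.DataType_Int64:
//				return 3
//			case schemapb.DataType_Float:
//				return 4
//			case schemapb.DataType_Double:
//				return 5
//			}
//			return -1
//		}
//		switch src {
//		case schemapb.DataType_Bool:
//			return dst == schemapb.DataType_Bool
//		case schemapb.DataType_Int8, schemapb.DataType_Int16,
//			schemapb.DataType_Int32, schemapb.DataType_Int64:
//			return rank(dst) >= rank(src) // widening only; non-numeric targets rank -1
//		case schemapb.DataType_Float, schemapb.DataType_Double:
//			if dst == schemapb.DataType_FloatVector {
//				return isArray // a list of float32/float64 can back a float vector
//			}
//			return rank(dst) >= rank(src)
//		case schemapb.DataType_VarChar:
//			return dst == schemapb.DataType_VarChar || dst == schemapb.DataType_JSON
//		case schemapb.DataType_JSON:
//			return dst == schemapb.DataType_JSON
//		case schemapb.DataType_BinaryVector:
//			return dst == schemapb.DataType_BinaryVector
//		case schemapb.DataType_Float16Vector:
//			return dst == schemapb.DataType_Float16Vector
//		}
//		return false
//	}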
func TestCalcRowCountPerBlock(t *testing.T) {
@@ -1024,3 +1252,81 @@ func TestCalcRowCountPerBlock(t *testing.T) {
assert.NoError(t, err)
})
}
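// The block sizing under test boils down to dividing the configured block
// size by the estimated bytes per record. A minimal sketch under that
// assumption (the real function may round or bound differently):
//
//	func calcRowCountPerBlockSketch(blockSize, sizePerRecord int64) (int64, error) {
//		if sizePerRecord <= 0 {
//			return 0, fmt.Errorf("invalid estimated size per record: %d", sizePerRecord)
//		}
//		if rows := blockSize / sizePerRecord; rows > 0 {
//			return rows, nil
//		}
//		return 1, nil // still make progress when one record exceeds the block size
//	}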
func TestParquetParser_LessField(t *testing.T) {
paramtable.Init()
filePath := "/tmp/less_field.parquet"
ctx := context.Background()
schema := parquetSampleSchema()
idAllocator := newIDAllocator(ctx, t, nil)
defer os.Remove(filePath)
writeFile := func() {
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(t, err)
err = writeLessFieldParquet(wf, schema, 100)
assert.NoError(t, err)
}
writeFile()
schema = parquetSampleSchema()
t.Run("read file", func(t *testing.T) {
cm := createLocalChunkManager(t)
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
// parquet schema sizePerRecord = 5296
parquetParser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 102400, cm, filePath, flushFunc, updateProgress)
assert.NoError(t, err)
defer parquetParser.Close()
err = parquetParser.Parse()
assert.Error(t, err)
})
}
func TestParquetParser_MoreField(t *testing.T) {
paramtable.Init()
filePath := "/tmp/more_field.parquet"
ctx := context.Background()
schema := parquetSampleSchema()
idAllocator := newIDAllocator(ctx, t, nil)
defer os.Remove(filePath)
writeFile := func() {
wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
assert.NoError(t, err)
err = writeMoreFieldParquet(wf, schema, 100)
assert.NoError(t, err)
}
writeFile()
schema = parquetSampleSchema()
t.Run("read file", func(t *testing.T) {
cm := createLocalChunkManager(t)
flushFunc := func(fields BlockData, shardID int, partID int64) error {
return nil
}
collectionInfo, err := NewCollectionInfo(schema, 2, []int64{1})
assert.NoError(t, err)
updateProgress := func(percent int64) {
assert.Greater(t, percent, int64(0))
}
// parquet schema sizePerRecord = 5296
parquetParser, err := NewParquetParser(ctx, collectionInfo, idAllocator, 102400, cm, filePath, flushFunc, updateProgress)
assert.NoError(t, err)
defer parquetParser.Close()
err = parquetParser.Parse()
assert.Error(t, err)
})
}
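// TestParquetParser_LessField and TestParquetParser_MoreField both expect
// Parse to fail when the parquet file's column set differs from the collection
// schema. A minimal sketch of such a check, using a hypothetical helper name
// and a plain name-set comparison:
//
//	func verifyFieldSetSketch(schemaFields map[string]*schemapb.FieldSchema, parquetColumns []string) error {
//		if len(parquetColumns) != len(schemaFields) {
//			return fmt.Errorf("column count %d does not match schema field count %d",
//				len(parquetColumns), len(schemaFields))
//		}
//		for _, name := range parquetColumns {
//			if _, ok := schemaFields[name]; !ok {
//				return fmt.Errorf("column %s is not defined in the collection schema", name)
//			}
//		}
//		return nil
//	}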