mirror of https://github.com/milvus-io/milvus.git
fix: Import data from parquet file in streaming way (#29514)
issue: #29292
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
branch: pull/29539/head
parent
839a72129e
commit
c45f8a2946
|
@ -1024,7 +1024,7 @@ func splitFieldsData(collectionInfo *CollectionInfo, fieldsData BlockData, shard
|
|||
rowIDField, ok := fieldsData[common.RowIDField]
|
||||
if !ok {
|
||||
rowIDField = &storage.Int64FieldData{
|
||||
Data: make([]int64, 0),
|
||||
Data: make([]int64, 0, rowCount),
|
||||
}
|
||||
fieldsData[common.RowIDField] = rowIDField
|
||||
}
|
||||
|
|
|
@ -301,6 +301,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer parser.Close()
|
||||
|
||||
err = parser.Parse()
|
||||
if err != nil {
|
||||
|
|
|
@ -208,7 +208,7 @@ func ReadBoolArrayData(pcr *ParquetColumnReader, count int64) ([][]bool, error)
|
|||
offsets := listReader.Offsets()
|
||||
for i := 1; i < len(offsets); i++ {
|
||||
start, end := offsets[i-1], offsets[i]
|
||||
elementData := make([]bool, 0)
|
||||
elementData := make([]bool, 0, end-start)
|
||||
for j := start; j < end; j++ {
|
||||
elementData = append(elementData, boolReader.Value(int(j)))
|
||||
}
|
||||
|
@ -306,7 +306,7 @@ func ReadStringArrayData(pcr *ParquetColumnReader, count int64) ([][]string, err
|
|||
offsets := listReader.Offsets()
|
||||
for i := 1; i < len(offsets); i++ {
|
||||
start, end := offsets[i-1], offsets[i]
|
||||
elementData := make([]string, 0)
|
||||
elementData := make([]string, 0, end-start)
|
||||
for j := start; j < end; j++ {
|
||||
elementData = append(elementData, stringReader.Value(int(j)))
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"github.com/apache/arrow/go/v12/parquet"
|
||||
"github.com/apache/arrow/go/v12/parquet/file"
|
||||
"github.com/apache/arrow/go/v12/parquet/pqarrow"
|
||||
"go.uber.org/zap"
|
||||
|
@ -87,11 +88,15 @@ func NewParquetParser(ctx context.Context,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
reader, err := file.NewParquetReader(cmReader)
|
||||
reader, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
|
||||
BufferSize: 32 * 1024 * 1024,
|
||||
BufferedStreamEnabled: true,
|
||||
}))
|
||||
if err != nil {
|
||||
log.Warn("create parquet reader failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
log.Info("create file reader done!", zap.Int("row group num", reader.NumRowGroups()), zap.Int64("num rows", reader.NumRows()))
|
||||
|
||||
fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
|
||||
if err != nil {
|
||||
|
@ -544,7 +549,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
|
|||
Dim: columnReader.dimension,
|
||||
}, nil
|
||||
case schemapb.DataType_Array:
|
||||
data := make([]*schemapb.ScalarField, 0)
|
||||
data := make([]*schemapb.ScalarField, 0, rowCount)
|
||||
switch columnReader.elementType {
|
||||
case schemapb.DataType_Bool:
|
||||
boolArray, err := ReadBoolArrayData(columnReader, rowCount)
|
||||
|
|
|
@ -474,7 +474,7 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBi
|
|||
|
||||
func writeParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error {
|
||||
schema := convertMilvusSchemaToArrowSchema(milvusSchema)
|
||||
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
|
||||
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(1000)), pqarrow.DefaultWriterProps())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue