diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index e19d92c351..190c92cf0c 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"strconv"
 	"sync"
 
 	"github.com/apache/arrow/go/v12/arrow"
@@ -33,6 +34,7 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/pkg/v2/common"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
 type Record interface {
@@ -675,12 +677,14 @@ var _ RecordWriter = (*CompositeRecordWriter)(nil)
 
 type CompositeRecordWriter struct {
 	writers map[FieldID]RecordWriter
-
-	writtenUncompressed uint64
 }
 
 func (crw *CompositeRecordWriter) GetWrittenUncompressed() uint64 {
-	return crw.writtenUncompressed
+	s := uint64(0)
+	for _, w := range crw.writers {
+		s += w.GetWrittenUncompressed()
+	}
+	return s
 }
 
 func (crw *CompositeRecordWriter) Write(r Record) error {
@@ -688,12 +692,6 @@ func (crw *CompositeRecordWriter) Write(r Record) error {
 		return fmt.Errorf("schema length mismatch %d, expected %d", len(r.Schema()), len(crw.writers))
 	}
 
-	var bytes uint64
-	for fid := range r.Schema() {
-		arr := r.Column(fid)
-		bytes += uint64(calculateArraySize(arr))
-	}
-	crw.writtenUncompressed += bytes
 	for fieldId, w := range crw.writers {
 		sr := newSelectiveRecord(r, fieldId)
 		if err := w.Write(sr); err != nil {
@@ -735,8 +733,9 @@ type singleFieldRecordWriter struct {
 	schema      *arrow.Schema
 	writerProps *parquet.WriterProperties
 
-	numRows             int
-	writtenUncompressed uint64
+	numRows              int
+	writtenUncompressed  uint64
+	memoryExpansionRatio int
 }
 
 func (sfw *singleFieldRecordWriter) Write(r Record) error {
@@ -750,21 +749,45 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
 }
 
 func (sfw *singleFieldRecordWriter) GetWrittenUncompressed() uint64 {
-	return sfw.writtenUncompressed
+	return sfw.writtenUncompressed * uint64(sfw.memoryExpansionRatio)
 }
 
 func (sfw *singleFieldRecordWriter) Close() {
 	sfw.fw.Close()
 }
 
-func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
+func newSingleFieldRecordWriter(field *schemapb.FieldSchema, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
+	// calculate memory expansion ratio
+	// arrays are serialized by protobuf, where int values may be compacted, see https://protobuf.dev/reference/go/size
+	// to correct the actual size, we need to multiply by the memory expansion ratio accordingly.
+	determineMemoryExpansionRatio := func(field *schemapb.FieldSchema) int {
+		if field.DataType == schemapb.DataType_Array {
+			switch field.GetElementType() {
+			case schemapb.DataType_Int16:
+				return 2
+			case schemapb.DataType_Int32:
+				return 4
+			case schemapb.DataType_Int64:
+				return 8
+			}
+		}
+		return 1
+	}
+	dim, _ := typeutil.GetDim(field)
 	w := &singleFieldRecordWriter{
-		fieldId: fieldId,
-		schema:  arrow.NewSchema([]arrow.Field{field}, nil),
+		fieldId: field.FieldID,
+		schema: arrow.NewSchema([]arrow.Field{
+			{
+				Name:     strconv.Itoa(int(field.FieldID)),
+				Type:     serdeMap[field.DataType].arrowType(int(dim)),
+				Nullable: true, // No nullable check here.
+			},
+		}, nil),
 		writerProps: parquet.NewWriterProperties(
 			parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
 			parquet.WithCompression(compress.Codecs.Zstd),
 			parquet.WithCompressionLevel(3)),
+		memoryExpansionRatio: determineMemoryExpansionRatio(field),
 	}
 	for _, o := range opts {
 		o(w)
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index fb7836e336..141f5b50d1 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -272,13 +272,7 @@ func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 		return bsw.rw, nil
 	}
 
-	fid := bsw.fieldSchema.FieldID
-	dim, _ := typeutil.GetDim(bsw.fieldSchema)
-	rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
-		Name:     strconv.Itoa(int(fid)),
-		Type:     serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)),
-		Nullable: true, // No nullable check here.
-	}, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
+	rw, err := newSingleFieldRecordWriter(bsw.fieldSchema, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
 	if err != nil {
 		return nil, err
 	}
@@ -426,12 +420,7 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 	if dsw.rw != nil {
 		return dsw.rw, nil
 	}
-	dim, _ := typeutil.GetDim(dsw.fieldSchema)
-	rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
-		Name:     dsw.fieldSchema.Name,
-		Type:     serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
-		Nullable: false,
-	}, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
+	rw, err := newSingleFieldRecordWriter(dsw.fieldSchema, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index b0799405be..1d5e239401 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -75,9 +75,13 @@ func TestBinlogStreamWriter(t *testing.T) {
 	t.Run("test write", func(t *testing.T) {
 		size := 3
 
-		field := arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}
+		field := &schemapb.FieldSchema{
+			FieldID:  1,
+			DataType: schemapb.DataType_Bool,
+		}
+
 		var w bytes.Buffer
-		rw, err := newSingleFieldRecordWriter(1, field, &w)
+		rw, err := newSingleFieldRecordWriter(field, &w)
 		assert.NoError(t, err)
 
 		builder := array.NewBooleanBuilder(memory.DefaultAllocator)
@@ -86,7 +90,7 @@ func TestBinlogStreamWriter(t *testing.T) {
 		defer arr.Release()
 		ar := array.NewRecord(
 			arrow.NewSchema(
-				[]arrow.Field{field},
+				[]arrow.Field{{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}},
 				nil,
 			),
 			[]arrow.Array{arr},
@@ -189,6 +193,84 @@ func TestBinlogSerializeWriter(t *testing.T) {
 	})
 }
 
+func TestSize(t *testing.T) {
+	t.Run("test array of int", func(t *testing.T) {
+		size := 100
+		schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     18,
+				Name:        "array",
+				DataType:    schemapb.DataType_Array,
+				ElementType: schemapb.DataType_Int32,
+			},
+		}}
+
+		writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
+		writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
+		assert.NoError(t, err)
+
+		for i := 0; i < size; i++ {
+			e := int32(i)
+			d := []int32{e, e, e, e, e, e, e, e}
+			value := &Value{
+				Value: map[FieldID]any{
+					18: &schemapb.ScalarField{
+						Data: &schemapb.ScalarField_IntData{
+							IntData: &schemapb.IntArray{Data: d},
+						},
+					},
+				},
+			}
+			err := writer.Write(value)
+			assert.NoError(t, err)
+		}
+
+		err = writer.Close()
+		assert.NoError(t, err)
+		memSize := writer.WrittenMemorySize()
+		assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
+		t.Log("written memory size", memSize)
+	})
+
+	t.Run("test array of varchar", func(t *testing.T) {
+		size := 100
+		schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     18,
+				Name:        "array",
+				DataType:    schemapb.DataType_Array,
+				ElementType: schemapb.DataType_String,
+			},
+		}}
+
+		writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
+		writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
+		assert.NoError(t, err)
+
+		for i := 0; i < size; i++ {
+			e := fmt.Sprintf("%4d", i)
+			d := []string{e, e, e, e, e, e, e, e}
+			value := &Value{
+				Value: map[FieldID]any{
+					18: &schemapb.ScalarField{
+						Data: &schemapb.ScalarField_StringData{
+							StringData: &schemapb.StringArray{Data: d},
+						},
+					},
+				},
+			}
+			err := writer.Write(value)
+			assert.NoError(t, err)
+		}
+
+		err = writer.Close()
+		assert.NoError(t, err)
+		memSize := writer.WrittenMemorySize()
+		assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
+		t.Log("written memory size", memSize)
+	})
+}
+
 func BenchmarkSerializeWriter(b *testing.B) {
 	const (
 		dim = 128