fix: correct memory size estimation on arrays (#40377)

See: #40342
pr: #40312

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2025-03-06 17:42:04 +08:00 committed by GitHub
parent 89bc94595d
commit f83567c0d0
3 changed files with 125 additions and 31 deletions
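
Why the correction is needed: array fields are written as protobuf-encoded ScalarField messages, and protobuf packs small integers into varints (see https://protobuf.dev/reference/go/size), so the byte count taken from the serialized column can undershoot the memory the same data occupies once deserialized. A minimal standalone sketch, not part of this commit, illustrating the gap; it assumes the milvus-proto schemapb package (already imported by this change) and google.golang.org/protobuf/proto are available:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"google.golang.org/protobuf/proto"
)

func main() {
	// 1024 small int32 values: each one packs into a single varint byte.
	data := make([]int32, 1024)
	for i := range data {
		data[i] = int32(i % 100)
	}
	field := &schemapb.ScalarField{
		Data: &schemapb.ScalarField_IntData{
			IntData: &schemapb.IntArray{Data: data},
		},
	}
	serialized := proto.Size(field) // roughly 1 byte per element plus tag/length framing
	inMemory := len(data) * 4       // 4 bytes per int32 once deserialized
	fmt.Printf("serialized=%dB in-memory=%dB gap=%.1fx\n",
		serialized, inMemory, float64(inMemory)/float64(serialized))
}

With values under 128 the gap is close to 4x, which is the factor the writer multiplies back in for Int32 array elements in the diff below.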


@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"math"
"strconv"
"sync"
"github.com/apache/arrow/go/v12/arrow"
@@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type Record interface {
@@ -675,12 +677,14 @@ var _ RecordWriter = (*CompositeRecordWriter)(nil)
type CompositeRecordWriter struct {
writers map[FieldID]RecordWriter
writtenUncompressed uint64
}
func (crw *CompositeRecordWriter) GetWrittenUncompressed() uint64 {
return crw.writtenUncompressed
s := uint64(0)
for _, w := range crw.writers {
s += w.GetWrittenUncompressed()
}
return s
}
func (crw *CompositeRecordWriter) Write(r Record) error {
@@ -688,12 +692,6 @@ func (crw *CompositeRecordWriter) Write(r Record) error {
return fmt.Errorf("schema length mismatch %d, expected %d", len(r.Schema()), len(crw.writers))
}
var bytes uint64
for fid := range r.Schema() {
arr := r.Column(fid)
bytes += uint64(calculateArraySize(arr))
}
crw.writtenUncompressed += bytes
for fieldId, w := range crw.writers {
sr := newSelectiveRecord(r, fieldId)
if err := w.Write(sr); err != nil {
@@ -735,8 +733,9 @@ type singleFieldRecordWriter struct {
schema *arrow.Schema
writerProps *parquet.WriterProperties
numRows int
writtenUncompressed uint64
numRows int
writtenUncompressed uint64
memoryExpansionRatio int
}
func (sfw *singleFieldRecordWriter) Write(r Record) error {
@@ -750,21 +749,45 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
}
func (sfw *singleFieldRecordWriter) GetWrittenUncompressed() uint64 {
return sfw.writtenUncompressed
return sfw.writtenUncompressed * uint64(sfw.memoryExpansionRatio)
}
func (sfw *singleFieldRecordWriter) Close() {
sfw.fw.Close()
}
func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
func newSingleFieldRecordWriter(field *schemapb.FieldSchema, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
// calculate memory expansion ratio
// arrays are serialized by protobuf, where int values may be compacted, see https://protobuf.dev/reference/go/size
// to correct the estimate back to the actual in-memory size, we multiply by the memory expansion ratio accordingly.
determineMemoryExpansionRatio := func(field *schemapb.FieldSchema) int {
if field.DataType == schemapb.DataType_Array {
switch field.GetElementType() {
case schemapb.DataType_Int16:
return 2
case schemapb.DataType_Int32:
return 4
case schemapb.DataType_Int64:
return 8
}
}
return 1
}
dim, _ := typeutil.GetDim(field)
w := &singleFieldRecordWriter{
fieldId: fieldId,
schema: arrow.NewSchema([]arrow.Field{field}, nil),
fieldId: field.FieldID,
schema: arrow.NewSchema([]arrow.Field{
{
Name: strconv.Itoa(int(field.FieldID)),
Type: serdeMap[field.DataType].arrowType(int(dim)),
Nullable: true, // No nullable check here.
},
}, nil),
writerProps: parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
parquet.WithCompression(compress.Codecs.Zstd),
parquet.WithCompressionLevel(3)),
memoryExpansionRatio: determineMemoryExpansionRatio(field),
}
for _, o := range opts {
o(w)
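
Taken together, the two serde.go changes compose: each single-field writer scales its serialized byte count by the element-width ratio, and the composite writer now sums its children instead of re-measuring the raw Arrow columns. A simplified, hedged sketch with toy types and made-up numbers (not the committed code) of that interaction:

package main

import "fmt"

type fieldWriter struct {
	writtenUncompressed  uint64 // bytes counted from the serialized (varint-packed) column
	memoryExpansionRatio uint64 // 2/4/8 for int array element types, 1 otherwise
}

func (w fieldWriter) GetWrittenUncompressed() uint64 {
	return w.writtenUncompressed * w.memoryExpansionRatio
}

type compositeWriter struct {
	writers map[int64]fieldWriter
}

// Summing the children lets the per-field correction flow into the total.
func (c compositeWriter) GetWrittenUncompressed() uint64 {
	var s uint64
	for _, w := range c.writers {
		s += w.GetWrittenUncompressed()
	}
	return s
}

func main() {
	c := compositeWriter{writers: map[int64]fieldWriter{
		100: {writtenUncompressed: 1050, memoryExpansionRatio: 4}, // int32 array field
		101: {writtenUncompressed: 900, memoryExpansionRatio: 1},  // e.g. a varchar field
	}}
	// 1050*4 + 900*1 = 5100 bytes reported, versus 1950 without the ratio correction.
	fmt.Println("estimated uncompressed size:", c.GetWrittenUncompressed())
}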


@@ -272,13 +272,7 @@ func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
return bsw.rw, nil
}
fid := bsw.fieldSchema.FieldID
dim, _ := typeutil.GetDim(bsw.fieldSchema)
rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
Name: strconv.Itoa(int(fid)),
Type: serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)),
Nullable: true, // No nullable check here.
}, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
rw, err := newSingleFieldRecordWriter(bsw.fieldSchema, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
if err != nil {
return nil, err
}
@@ -426,12 +420,7 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
if dsw.rw != nil {
return dsw.rw, nil
}
dim, _ := typeutil.GetDim(dsw.fieldSchema)
rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
Name: dsw.fieldSchema.Name,
Type: serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
Nullable: false,
}, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
rw, err := newSingleFieldRecordWriter(dsw.fieldSchema, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
if err != nil {
return nil, err
}


@@ -75,9 +75,13 @@ func TestBinlogStreamWriter(t *testing.T) {
t.Run("test write", func(t *testing.T) {
size := 3
field := arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}
field := &schemapb.FieldSchema{
FieldID: 1,
DataType: schemapb.DataType_Bool,
}
var w bytes.Buffer
rw, err := newSingleFieldRecordWriter(1, field, &w)
rw, err := newSingleFieldRecordWriter(field, &w)
assert.NoError(t, err)
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
@@ -86,7 +90,7 @@ func TestBinlogStreamWriter(t *testing.T) {
defer arr.Release()
ar := array.NewRecord(
arrow.NewSchema(
[]arrow.Field{field},
[]arrow.Field{{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}},
nil,
),
[]arrow.Array{arr},
@@ -189,6 +193,84 @@ func TestBinlogSerializeWriter(t *testing.T) {
})
}
func TestSize(t *testing.T) {
t.Run("test array of int", func(t *testing.T) {
size := 100
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{
FieldID: 18,
Name: "array",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_Int32,
},
}}
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
assert.NoError(t, err)
for i := 0; i < size; i++ {
e := int32(i)
d := []int32{e, e, e, e, e, e, e, e}
value := &Value{
Value: map[FieldID]any{
18: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: d},
},
},
},
}
err := writer.Write(value)
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
memSize := writer.WrittenMemorySize()
assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the raw data size
t.Log("written memory size", memSize)
})
t.Run("test array of varchar", func(t *testing.T) {
size := 100
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{
FieldID: 18,
Name: "array",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_String,
},
}}
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
assert.NoError(t, err)
for i := 0; i < size; i++ {
e := fmt.Sprintf("%4d", i)
d := []string{e, e, e, e, e, e, e, e}
value := &Value{
Value: map[FieldID]any{
18: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{Data: d},
},
},
},
}
err := writer.Write(value)
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
memSize := writer.WrittenMemorySize()
assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the raw data size
t.Log("written memory size", memSize)
})
}
func BenchmarkSerializeWriter(b *testing.B) {
const (
dim = 128