fix: panic calculating data size on writing binlogs (#37575)

See: #37568

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/37581/head
Ted Xu 2024-11-11 14:08:27 +08:00 committed by GitHub
parent fca946dee1
commit 31d0c84f67
2 changed files with 40 additions and 46 deletions

View File

@@ -30,7 +30,6 @@ import (
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -59,7 +58,7 @@ type RecordWriter interface {
}
type (
Serializer[T any] func([]T) (Record, uint64, error)
Serializer[T any] func([]T) (Record, error)
Deserializer[T any] func(Record, []T) error
)
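
For illustration, a minimal compile-able sketch of the new `Serializer` contract: the callback no longer reports a byte count, it only returns the record. The `Record` stub, `intRecord`, and `toRecord` below are hypothetical, not from the patch.

```go
package main

import "fmt"

// Record is stubbed for illustration; the real interface lives in this package.
type Record interface{ NumRows() int }

// Serializer mirrors the new signature from the hunk above: no size returned.
type Serializer[T any] func([]T) (Record, error)

type intRecord struct{ n int }

func (r intRecord) NumRows() int { return r.n }

// toRecord is a hypothetical serializer matching the new contract.
func toRecord(values []int) (Record, error) {
	return intRecord{n: len(values)}, nil
}

func main() {
	var s Serializer[int] = toRecord
	r, _ := s([]int{1, 2, 3})
	fmt.Println(r.NumRows()) // 3
}
```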
@@ -614,7 +613,13 @@ func (r *selectiveRecord) Slice(start, end int) Record {
}
func calculateArraySize(a arrow.Array) int {
if a == nil || a.Data() == nil || a.Data().Buffers() == nil {
return 0
}
return lo.SumBy[*memory.Buffer, int](a.Data().Buffers(), func(b *memory.Buffer) int {
if b == nil {
return 0
}
return b.Len()
})
}
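
For context on the panic this hunk fixes, a self-contained sketch of the same guards, assuming the arrow-go v12 packages this file already imports and using a plain loop in place of `lo.SumBy`. For an int64 array with no nulls, `Data().Buffers()` can contain a nil validity buffer, which is the case the added `b == nil` check covers.

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// arraySize mirrors the guarded calculateArraySize above, written as a loop.
func arraySize(a arrow.Array) int {
	if a == nil || a.Data() == nil || a.Data().Buffers() == nil {
		return 0
	}
	size := 0
	for _, b := range a.Data().Buffers() {
		if b == nil { // e.g. an absent validity bitmap
			continue
		}
		size += b.Len()
	}
	return size
}

func main() {
	b := array.NewInt64Builder(memory.DefaultAllocator)
	defer b.Release()
	b.AppendValues([]int64{1, 2, 3}, nil)
	arr := b.NewInt64Array()
	defer arr.Release()
	fmt.Println(arraySize(arr)) // sums only the non-nil buffers
}
```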
@@ -791,9 +796,8 @@ type SerializeWriter[T any] struct {
batchSize int
mu sync.Mutex
buffer []T
pos int
writtenMemorySize atomic.Uint64
buffer []T
pos int
}
func (sw *SerializeWriter[T]) Flush() error {
@@ -803,7 +807,7 @@ func (sw *SerializeWriter[T]) Flush() error {
return nil
}
buf := sw.buffer[:sw.pos]
r, size, err := sw.serializer(buf)
r, err := sw.serializer(buf)
if err != nil {
return err
}
@@ -812,7 +816,6 @@ func (sw *SerializeWriter[T]) Flush() error {
return err
}
sw.pos = 0
sw.writtenMemorySize.Add(size)
return nil
}
@@ -831,6 +834,8 @@ func (sw *SerializeWriter[T]) Write(value T) error {
}
func (sw *SerializeWriter[T]) WriteRecord(r Record) error {
sw.mu.Lock()
defer sw.mu.Unlock()
if len(sw.buffer) != 0 {
return errors.New("serialize buffer is not empty")
}
@@ -839,16 +844,13 @@ func (sw *SerializeWriter[T]) WriteRecord(r Record) error {
return err
}
size := 0
for fid := range r.Schema() {
size += calculateArraySize(r.Column(fid))
}
sw.writtenMemorySize.Add(uint64(size))
return nil
}
func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
return sw.writtenMemorySize.Load()
sw.mu.Lock()
defer sw.mu.Unlock()
return sw.rw.GetWrittenUncompressed()
}
func (sw *SerializeWriter[T]) Close() error {
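
A minimal sketch of the resulting size-accounting flow, using only what the hunks above show: the record writer, not the serializer, tracks uncompressed bytes, and `WrittenMemorySize` simply forwards to `GetWrittenUncompressed()`. The `fakeWriter` type below is hypothetical, for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified RecordWriter: after this patch the writer itself reports how
// many uncompressed bytes it has written (method name taken from the diff).
type RecordWriter interface {
	GetWrittenUncompressed() uint64
}

// fakeWriter is a hypothetical stand-in for a real record writer.
type fakeWriter struct{ written uint64 }

func (w *fakeWriter) GetWrittenUncompressed() uint64 { return w.written }

// SerializeWriter forwards size queries instead of keeping its own counter.
type SerializeWriter[T any] struct {
	mu sync.Mutex
	rw RecordWriter
}

func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	return sw.rw.GetWrittenUncompressed()
}

func main() {
	sw := &SerializeWriter[int]{rw: &fakeWriter{written: 256}}
	fmt.Println(sw.WrittenMemorySize()) // 256
}
```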
@@ -868,14 +870,14 @@ func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T],
}
type simpleArrowRecord struct {
Record
r arrow.Record
schema map[FieldID]schemapb.DataType
field2Col map[FieldID]int
}
var _ Record = (*simpleArrowRecord)(nil)
func (sr *simpleArrowRecord) Schema() map[FieldID]schemapb.DataType {
return sr.schema
}
@@ -900,6 +902,11 @@ func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
return sr.r.Schema()
}
func (sr *simpleArrowRecord) Slice(start, end int) Record {
s := sr.r.NewSlice(int64(start), int64(end))
return newSimpleArrowRecord(s, sr.schema, sr.field2Col)
}
func newSimpleArrowRecord(r arrow.Record, schema map[FieldID]schemapb.DataType, field2Col map[FieldID]int) *simpleArrowRecord {
return &simpleArrowRecord{
r: r,

View File

@@ -353,7 +353,7 @@ func NewBinlogStreamWriters(collectionID, partitionID, segmentID UniqueID,
return bws
}
func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, uint64, error) {
func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, error) {
builders := make(map[FieldID]array.Builder, len(fieldSchema))
types := make(map[FieldID]schemapb.DataType, len(fieldSchema))
for _, f := range fieldSchema {
@@ -362,7 +362,6 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, u
types[f.FieldID] = f.DataType
}
var memorySize uint64
for _, vv := range v {
m := vv.Value.(map[FieldID]any)
@@ -373,7 +372,7 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, u
}
ok = typeEntry.serialize(builders[fid], e)
if !ok {
return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", types[fid]))
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", types[fid]))
}
}
}
@@ -383,8 +382,6 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, u
i := 0
for fid, builder := range builders {
arrays[i] = builder.NewArray()
memorySize += uint64(calculateArraySize(arrays[i]))
builder.Release()
fields[i] = arrow.Field{
Name: strconv.Itoa(int(fid)),
@@ -394,7 +391,7 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, u
field2Col[fid] = i
i++
}
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), types, field2Col), memorySize, nil
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), types, field2Col), nil
}
func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
@@ -410,7 +407,7 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se
rws[fid] = rw
}
compositeRecordWriter := NewCompositeRecordWriter(rws)
return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, uint64, error) {
return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize), nil
}
@@ -421,9 +418,8 @@ type DeltalogStreamWriter struct {
segmentID UniqueID
fieldSchema *schemapb.FieldSchema
memorySize int // To be updated on the fly
buf bytes.Buffer
rw *singleFieldRecordWriter
buf bytes.Buffer
rw *singleFieldRecordWriter
}
func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
@@ -459,7 +455,7 @@ func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
return &Blob{
Value: b.Bytes(),
RowNum: int64(dsw.rw.numRows),
MemorySize: int64(dsw.memorySize),
MemorySize: int64(dsw.rw.writtenUncompressed),
}, nil
}
@@ -471,7 +467,7 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
// Write descriptor
de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
de.PayloadDataType = dsw.fieldSchema.DataType
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(int(dsw.rw.writtenUncompressed)))
if err := de.Write(w); err != nil {
return err
}
@@ -513,19 +509,16 @@ func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int
}
rws[0] = rw
compositeRecordWriter := NewCompositeRecordWriter(rws)
return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)
var memorySize uint64
for _, vv := range v {
strVal, err := json.Marshal(vv)
if err != nil {
return nil, memorySize, err
return nil, err
}
builder.AppendValueFromString(string(strVal))
eventWriter.memorySize += len(strVal)
memorySize += uint64(len(strVal))
}
arr := []arrow.Array{builder.NewArray()}
field := []arrow.Field{{
@@ -539,7 +532,7 @@ func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int
schema := map[FieldID]schemapb.DataType{
0: schemapb.DataType_String,
}
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), nil
}, batchSize), nil
}
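
For reference, a stand-alone sketch of the deltalog serializer body above under the new signature: each delete log is JSON-marshalled into a single string column and no byte count is returned. The `deleteLog` type and the `op` field name are simplified placeholders, and the raw `arrow.Record` stands in for the package's `Record` wrapper.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// deleteLog is a simplified stand-in for the package's DeleteLog type.
type deleteLog struct {
	Pk int64  `json:"pk"`
	Ts uint64 `json:"ts"`
}

// serializeDeleteLogs mirrors the callback above: one JSON string per log,
// no size accumulation in the serializer.
func serializeDeleteLogs(v []deleteLog) (arrow.Record, error) {
	builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)
	defer builder.Release()
	for _, vv := range v {
		strVal, err := json.Marshal(vv)
		if err != nil {
			return nil, err
		}
		builder.AppendValueFromString(string(strVal))
	}
	arr := builder.NewArray()
	fields := []arrow.Field{{Name: "op", Type: arrow.BinaryTypes.String}}
	schema := arrow.NewSchema(fields, nil)
	return array.NewRecord(schema, []arrow.Array{arr}, int64(len(v))), nil
}

func main() {
	rec, err := serializeDeleteLogs([]deleteLog{{Pk: 1, Ts: 100}})
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.NumRows()) // 1
}
```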
@@ -657,9 +650,8 @@ type MultiFieldDeltalogStreamWriter struct {
segmentID UniqueID
pkType schemapb.DataType
memorySize int // To be updated on the fly
buf bytes.Buffer
rw *multiFieldRecordWriter
buf bytes.Buffer
rw *multiFieldRecordWriter
}
func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
@@ -705,7 +697,7 @@ func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error) {
return &Blob{
Value: b.Bytes(),
RowNum: int64(dsw.rw.numRows),
MemorySize: int64(dsw.memorySize),
MemorySize: int64(dsw.rw.writtenUncompressed),
}, nil
}
@@ -717,7 +709,7 @@ func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) err
// Write descriptor
de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
de.PayloadDataType = schemapb.DataType_Int64
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(int(dsw.rw.writtenUncompressed)))
de.descriptorEventData.AddExtra(version, MultiField)
if err := de.Write(w); err != nil {
return err
@@ -744,7 +736,7 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
if err != nil {
return nil, err
}
return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, uint64, error) {
return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, error) {
fields := []arrow.Field{
{
Name: "pk",
@@ -761,7 +753,6 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
defer builder.Release()
var memorySize uint64
pkType := schemapb.DataType(v[0].PkType)
switch pkType {
case schemapb.DataType_Int64:
@@ -769,24 +760,20 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
for _, vv := range v {
pk := vv.Pk.GetValue().(int64)
pb.Append(pk)
memorySize += 8
}
case schemapb.DataType_VarChar:
pb := builder.Field(0).(*array.StringBuilder)
for _, vv := range v {
pk := vv.Pk.GetValue().(string)
pb.Append(pk)
memorySize += uint64(binary.Size(pk))
}
default:
return nil, 0, fmt.Errorf("unexpected pk type %v", v[0].PkType)
return nil, fmt.Errorf("unexpected pk type %v", v[0].PkType)
}
for _, vv := range v {
builder.Field(1).(*array.Int64Builder).Append(int64(vv.Ts))
memorySize += 8
}
eventWriter.memorySize += int(memorySize)
arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()}
@@ -798,7 +785,7 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
common.RowIDField: pkType,
common.TimeStampField: schemapb.DataType_Int64,
}
return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), memorySize, nil
return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), nil
}, batchSize), nil
}
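
Similarly, a simplified sketch of the multi-field delete-log path: pk and timestamp become two int64 columns and, as in the other writers, the serializer no longer accumulates a byte count. Field names and types here are illustrative only (the real path also supports VarChar primary keys).

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

// buildDeleteRecord sketches the int64-pk branch above: two columns, no
// manual memorySize bookkeeping anywhere in the serializer.
func buildDeleteRecord(pks []int64, tss []uint64) arrow.Record {
	fields := []arrow.Field{
		{Name: "pk", Type: arrow.PrimitiveTypes.Int64},
		{Name: "ts", Type: arrow.PrimitiveTypes.Int64},
	}
	schema := arrow.NewSchema(fields, nil)
	builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer builder.Release()
	pb := builder.Field(0).(*array.Int64Builder)
	tb := builder.Field(1).(*array.Int64Builder)
	for i := range pks {
		pb.Append(pks[i])
		tb.Append(int64(tss[i]))
	}
	return builder.NewRecord()
}

func main() {
	rec := buildDeleteRecord([]int64{1, 2}, []uint64{10, 20})
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 2
}
```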