mirror of https://github.com/milvus-io/milvus.git
fix: BinlogDeserializeReader leak in mix_compactor.go (#36270)
https://github.com/milvus-io/milvus/issues/36269
Signed-off-by: fengjun2016 <jornfeng@gmail.com>
pull/36775/head
parent
621dbc9107
commit
7c8b71e26c
|
@ -28,6 +28,7 @@ import (
|
|||
"go.opentelemetry.io/otel"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/flushcommon/io"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -141,17 +142,6 @@ func (t *mixCompactionTask) mergeSplit(
|
|||
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
|
||||
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID)
|
||||
|
||||
isValueDeleted := func(v *storage.Value) bool {
|
||||
ts, ok := delta[v.PK.GetValue()]
|
||||
// insert task and delete task has the same ts when upsert
|
||||
// here should be < instead of <=
|
||||
// to avoid the upsert data to be deleted after compact
|
||||
if ok && uint64(v.Timestamp) < ts {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
deletedRowCount := int64(0)
|
||||
expiredRowCount := int64(0)
|
||||
|
||||
|
@ -161,51 +151,10 @@ func (t *mixCompactionTask) mergeSplit(
|
|||
return nil, err
|
||||
}
|
||||
for _, paths := range binlogPaths {
|
||||
log := log.With(zap.Strings("paths", paths))
|
||||
allValues, err := t.binlogIO.Download(ctx, paths)
|
||||
err := t.dealBinlogPaths(ctx, delta, mWriter, pkField, paths, &deletedRowCount, &expiredRowCount)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
|
||||
return &storage.Blob{Key: paths[i], Value: v}
|
||||
})
|
||||
|
||||
iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for {
|
||||
err := iter.Next()
|
||||
if err != nil {
|
||||
if err == sio.EOF {
|
||||
break
|
||||
} else {
|
||||
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
v := iter.Value()
|
||||
if isValueDeleted(v) {
|
||||
deletedRowCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// Filtering expired entity
|
||||
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
|
||||
expiredRowCount++
|
||||
continue
|
||||
}
|
||||
|
||||
err = mWriter.Write(v)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to writer row", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
res, err := mWriter.Finish()
|
||||
if err != nil {
|
||||
|
@ -223,6 +172,70 @@ func (t *mixCompactionTask) mergeSplit(
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp) bool {
|
||||
ts, ok := delta[v.PK.GetValue()]
|
||||
// insert task and delete task has the same ts when upsert
|
||||
// here should be < instead of <=
|
||||
// to avoid the upsert data to be deleted after compact
|
||||
if ok && uint64(v.Timestamp) < ts {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, deletedRowCount, expiredRowCount *int64) error {
|
||||
log := log.With(zap.Strings("paths", paths))
|
||||
allValues, err := t.binlogIO.Download(ctx, paths)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
|
||||
return &storage.Blob{Key: paths[i], Value: v}
|
||||
})
|
||||
|
||||
iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
for {
|
||||
err := iter.Next()
|
||||
if err != nil {
|
||||
if err == sio.EOF {
|
||||
break
|
||||
} else {
|
||||
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
v := iter.Value()
|
||||
if isValueDeleted(v, delta) {
|
||||
oldDeletedRowCount := *deletedRowCount
|
||||
*deletedRowCount = oldDeletedRowCount + 1
|
||||
continue
|
||||
}
|
||||
|
||||
// Filtering expired entity
|
||||
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
|
||||
oldExpiredRowCount := *expiredRowCount
|
||||
*expiredRowCount = oldExpiredRowCount + 1
|
||||
continue
|
||||
}
|
||||
|
||||
err = mWriter.Write(v)
|
||||
if err != nil {
|
||||
log.Warn("compact wrong, failed to writer row", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
|
||||
durInQueue := t.tr.RecordSpan()
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("MixCompact-%d", t.GetPlanID()))
|
||||
|
|
Loading…
Reference in New Issue