mirror of https://github.com/milvus-io/milvus.git
fix: Massive memory cost when compacting (#40763)
Downloads binlogs batch by batch instead of downloading all of a segment's binlogs at once. See also: #40761

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent 1a6761ac69
commit 281260e48a
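The gist of the patch, as a minimal sketch. Only the `Download` method mirrors the real `io.BinlogIO` interface used in the diff below; the helper names and the `write` callback are illustrative, not from the patch:

package sketch

import "context"

// BinlogIO mirrors the Download method the compactor uses; everything else
// in this sketch is illustrative.
type BinlogIO interface {
	Download(ctx context.Context, paths []string) ([][]byte, error)
}

// Before: the whole segment is downloaded in one call, so peak memory is
// proportional to the segment size.
func processAllAtOnce(ctx context.Context, io BinlogIO, allPaths []string, write func([][]byte) error) error {
	values, err := io.Download(ctx, allPaths)
	if err != nil {
		return err
	}
	return write(values)
}

// After: paths are grouped into batches and downloaded one batch at a time,
// so peak memory is proportional to a single batch; each batch's buffers
// become collectable before the next download starts.
func processBatched(ctx context.Context, io BinlogIO, batches [][]string, write func([][]byte) error) error {
	for _, batchPaths := range batches {
		values, err := io.Download(ctx, batchPaths)
		if err != nil {
			return err
		}
		if err := write(values); err != nil {
			return err
		}
	}
	return nil
}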
@@ -169,15 +169,15 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
 }

 func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
-	deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error,
+	deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][][]string, err error,
 ) {
 	if err := binlog.DecompressCompactionBinlogs(segments); err != nil {
 		log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
 		return nil, nil, err
 	}

-	deltaPaths = make(map[typeutil.UniqueID][]string)     // segmentID to deltalog paths
-	insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths
+	deltaPaths = make(map[typeutil.UniqueID][]string)       // segmentID to deltalog paths
+	insertPaths = make(map[typeutil.UniqueID][][]string, 0) // segmentID to binlog paths
 	for _, s := range segments {
 		segId := s.GetSegmentID()
 		// Get the batch count of field binlog files from non-empty segment
@@ -199,7 +199,7 @@ func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
 		for _, f := range s.GetFieldBinlogs() {
 			batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
 		}
-		insertPaths[segId] = append(insertPaths[segId], batchPaths...)
+		insertPaths[segId] = append(insertPaths[segId], batchPaths)
 	}

 	deltaPaths[s.GetSegmentID()] = []string{}
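After this change, `insertPaths` maps a segment ID to a slice of batches, where each inner slice holds one binlog path per field for that batch; that is why `batchPaths` is now appended as a whole slice rather than flattened with `...`. For illustration (the paths and IDs below are made up), a segment with two fields and two binlog batches composes to:

// Hypothetical layout: one segment, two fields, two binlog batches.
// Each inner slice is one download unit for writeSegment.
insertPaths := map[int64][][]string{
	1001: {
		{"insert_log/1001/100/batch0", "insert_log/1001/101/batch0"}, // batch 0
		{"insert_log/1001/100/batch1", "insert_log/1001/101/batch1"}, // batch 1
	},
}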
@@ -131,7 +131,7 @@ func (t *mixCompactionTask) preCompact() error {

 func (t *mixCompactionTask) mergeSplit(
 	ctx context.Context,
-	insertPaths map[int64][]string,
+	insertPaths map[int64][][]string,
 	deltaPaths map[int64][]string,
 ) ([]*datapb.CompactionSegment, error) {
 	_ = t.tr.RecordSpan()
@@ -154,14 +154,23 @@ func (t *mixCompactionTask) mergeSplit(
 		log.Warn("failed to get pk field from schema")
 		return nil, err
 	}
-	for segId, binlogPaths := range insertPaths {
+	for segId, binlogBatches := range insertPaths {
 		deltaPaths := deltaPaths[segId]
-		del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField)
+		delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
 		if err != nil {
+			log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
 			return nil, err
 		}
-		deletedRowCount += del
-		expiredRowCount += exp
+		entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
+
+		for _, batchPaths := range binlogBatches {
+			del, exp, err := t.writeSegment(ctx, batchPaths, entityFilter, mWriter, pkField)
+			if err != nil {
+				return nil, err
+			}
+			deletedRowCount += del
+			expiredRowCount += exp
+		}
 	}
 	res, err := mWriter.Finish()
 	if err != nil {
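Note how the deltalog merge and the entity filter construction move out of `writeSegment` and up into the per-segment loop: deltalogs are downloaded and merged once per segment, and the same filter is then applied to every insert-log batch. Condensed from the hunk above, the new control flow is:

for segId, binlogBatches := range insertPaths {
	// Once per segment: merge delete records and build the filter.
	delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths[segId])
	if err != nil {
		return nil, err
	}
	entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

	// Once per batch: download only this batch's binlogs and rewrite them.
	for _, batchPaths := range binlogBatches {
		del, exp, err := t.writeSegment(ctx, batchPaths, entityFilter, mWriter, pkField)
		if err != nil {
			return nil, err
		}
		deletedRowCount += del
		expiredRowCount += exp
	}
}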
@@ -180,28 +189,21 @@ func (t *mixCompactionTask) mergeSplit(
 }

 func (t *mixCompactionTask) writeSegment(ctx context.Context,
-	binlogPaths []string,
-	deltaPaths []string,
+	batchPaths []string,
+	entityFilter *EntityFilter,
 	mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema,
 ) (deletedRowCount, expiredRowCount int64, err error) {
-	log := log.With(zap.Strings("paths", binlogPaths))
-	allValues, err := t.binlogIO.Download(ctx, binlogPaths)
+	log := log.With(zap.Strings("paths", batchPaths))
+	allValues, err := t.binlogIO.Download(ctx, batchPaths)
 	if err != nil {
 		log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
 		return
 	}

 	blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
-		return &storage.Blob{Key: binlogPaths[i], Value: v}
+		return &storage.Blob{Key: batchPaths[i], Value: v}
 	})

-	delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
-	if err != nil {
-		log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
-		return
-	}
-	entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
-
 	reader, err := storage.NewCompositeBinlogRecordReader(blobs)
 	if err != nil {
 		log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
@@ -272,7 +274,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
 		}
 	}

-	deltalogDeleteEntriesCount := len(delta)
+	deltalogDeleteEntriesCount := entityFilter.GetDeltalogDeleteCount()
 	deletedRowCount = int64(entityFilter.GetDeletedCount())
 	expiredRowCount = int64(entityFilter.GetExpiredCount())

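Since `writeSegment` no longer holds the merged `delta` map, the deltalog entry count has to be read back from the filter itself. A sketch of what the accessor presumably does (the field name is an assumption; only `GetDeltalogDeleteCount` appears in the patch):

// Assumed shape: the filter remembers how many delete entries it was built
// from, so per-batch bookkeeping no longer needs the raw delta map.
func (f *EntityFilter) GetDeltalogDeleteCount() int {
	return f.deltalogDeleteCount // hypothetical field name
}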
@@ -357,7 +357,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 	s.task.partitionID = PartitionID
 	s.task.maxRows = 1000

-	compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: lo.Keys(kvs)}, nil)
+	compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][][]string{s.segWriter.segmentID: {lo.Keys(kvs)}}, nil)
 	s.NoError(err)
 	s.Equal(1, len(compactionSegments))
 	s.EqualValues(0, compactionSegments[0].GetNumOfRows())
@@ -410,7 +410,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
 	s.task.collectionID = CollectionID
 	s.task.partitionID = PartitionID
 	s.task.maxRows = 1000
-	res, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: insertPaths}, deletePaths)
+	res, err := s.task.mergeSplit(s.task.ctx, map[int64][][]string{s.segWriter.segmentID: {insertPaths}}, deletePaths)
 	s.NoError(err)
 	s.EqualValues(test.expectedRes, len(res))
 	s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
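As the test updates show, existing call sites that passed a flat path list per segment migrate by wrapping it into a single one-element batch (IDs and paths below are hypothetical):

segmentID := int64(1001)                          // hypothetical
paths := []string{"insert_log/a", "insert_log/b"} // hypothetical binlog paths

// before: mergeSplit(ctx, map[int64][]string{segmentID: paths}, deltaPaths)
// after:  the flat list becomes a one-element batch slice
arg := map[int64][][]string{segmentID: {paths}}
_ = arg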