From 737bd7c73469de46a5a022fb3385fde557e67133 Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 9 Jul 2024 15:40:17 +0800 Subject: [PATCH] enhance: [2.4] release the record in delete codec and add some log for compaction (#34506) /kind improvement - pr: #34454 Signed-off-by: SimFG --- internal/datanode/compaction/mix_compactor.go | 3 +++ internal/storage/data_codec.go | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 8a4ff77f40..8144ed8e07 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -124,6 +124,7 @@ func (t *mixCompactionTask) merge( syncBatchCount int // binlog batch count remainingRowCount int64 // the number of remaining entities expiredRowCount int64 // the number of expired entities + deletedRowCount int64 = 0 unflushedRowCount int64 = 0 // All binlog meta of a segment @@ -177,6 +178,7 @@ func (t *mixCompactionTask) merge( } v := iter.Value() if isValueDeleted(v) { + deletedRowCount++ continue } @@ -257,6 +259,7 @@ func (t *mixCompactionTask) merge( log.Info("compact merge end", zap.Int64("remaining row count", remainingRowCount), + zap.Int64("deleted row count", deletedRowCount), zap.Int64("expired entities", expiredRowCount), zap.Int("binlog batch count", syncBatchCount), zap.Duration("download binlogs elapse", downloadTimeCost), diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 9ca53eff12..1932371525 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -892,7 +892,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID deleteLog := &DeleteLog{} - for rr.Next() { + handleRecord := func() error { rec := rr.Record() defer rec.Release() column := rec.Column(0) @@ -905,6 +905,14 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID result.Append(deleteLog.Pk, deleteLog.Ts) } + return nil + } + + for rr.Next() { + err := handleRecord() + if err != nil { + return err + } } return nil }