diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 8a4ff77f40..8144ed8e07 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -124,6 +124,7 @@ func (t *mixCompactionTask) merge( syncBatchCount int // binlog batch count remainingRowCount int64 // the number of remaining entities expiredRowCount int64 // the number of expired entities + deletedRowCount int64 = 0 unflushedRowCount int64 = 0 // All binlog meta of a segment @@ -177,6 +178,7 @@ func (t *mixCompactionTask) merge( } v := iter.Value() if isValueDeleted(v) { + deletedRowCount++ continue } @@ -257,6 +259,7 @@ func (t *mixCompactionTask) merge( log.Info("compact merge end", zap.Int64("remaining row count", remainingRowCount), + zap.Int64("deleted row count", deletedRowCount), zap.Int64("expired entities", expiredRowCount), zap.Int("binlog batch count", syncBatchCount), zap.Duration("download binlogs elapse", downloadTimeCost), diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 9ca53eff12..1932371525 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -892,7 +892,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID deleteLog := &DeleteLog{} - for rr.Next() { + handleRecord := func() error { rec := rr.Record() defer rec.Release() column := rec.Column(0) @@ -905,6 +905,14 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID result.Append(deleteLog.Pk, deleteLog.Ts) } + return nil + } + + for rr.Next() { + err := handleRecord() + if err != nil { + return err + } } return nil }