mirror of https://github.com/milvus-io/milvus.git
enhance: [2.4] release the record in delete codec and add some log for compaction (#34506)
/kind improvement - pr: #34454 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/33822/head^2
parent
7788decb37
commit
737bd7c734
|
@ -124,6 +124,7 @@ func (t *mixCompactionTask) merge(
|
||||||
syncBatchCount int // binlog batch count
|
syncBatchCount int // binlog batch count
|
||||||
remainingRowCount int64 // the number of remaining entities
|
remainingRowCount int64 // the number of remaining entities
|
||||||
expiredRowCount int64 // the number of expired entities
|
expiredRowCount int64 // the number of expired entities
|
||||||
|
deletedRowCount int64 = 0
|
||||||
unflushedRowCount int64 = 0
|
unflushedRowCount int64 = 0
|
||||||
|
|
||||||
// All binlog meta of a segment
|
// All binlog meta of a segment
|
||||||
|
@ -177,6 +178,7 @@ func (t *mixCompactionTask) merge(
|
||||||
}
|
}
|
||||||
v := iter.Value()
|
v := iter.Value()
|
||||||
if isValueDeleted(v) {
|
if isValueDeleted(v) {
|
||||||
|
deletedRowCount++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,6 +259,7 @@ func (t *mixCompactionTask) merge(
|
||||||
|
|
||||||
log.Info("compact merge end",
|
log.Info("compact merge end",
|
||||||
zap.Int64("remaining row count", remainingRowCount),
|
zap.Int64("remaining row count", remainingRowCount),
|
||||||
|
zap.Int64("deleted row count", deletedRowCount),
|
||||||
zap.Int64("expired entities", expiredRowCount),
|
zap.Int64("expired entities", expiredRowCount),
|
||||||
zap.Int("binlog batch count", syncBatchCount),
|
zap.Int("binlog batch count", syncBatchCount),
|
||||||
zap.Duration("download binlogs elapse", downloadTimeCost),
|
zap.Duration("download binlogs elapse", downloadTimeCost),
|
||||||
|
|
|
@ -892,7 +892,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
||||||
|
|
||||||
deleteLog := &DeleteLog{}
|
deleteLog := &DeleteLog{}
|
||||||
|
|
||||||
for rr.Next() {
|
handleRecord := func() error {
|
||||||
rec := rr.Record()
|
rec := rr.Record()
|
||||||
defer rec.Release()
|
defer rec.Release()
|
||||||
column := rec.Column(0)
|
column := rec.Column(0)
|
||||||
|
@ -905,6 +905,14 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
||||||
|
|
||||||
result.Append(deleteLog.Pk, deleteLog.Ts)
|
result.Append(deleteLog.Pk, deleteLog.Ts)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for rr.Next() {
|
||||||
|
err := handleRecord()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue