mirror of https://github.com/milvus-io/milvus.git
Add elapsed-time logs in compaction (#15414)
See also: #15413. Signed-off-by: yangxuan <xuan.yang@zilliz.com> (branch: pull/15486/head)
parent
b6bf6b0cff
commit
46dea0f006
|
@ -130,7 +130,7 @@ func (t *compactionTask) getChannelName() string {
|
|||
}
|
||||
|
||||
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) {
|
||||
|
||||
mergeStart := time.Now()
|
||||
dCodec := storage.NewDeleteCodec()
|
||||
|
||||
var (
|
||||
|
@ -176,12 +176,19 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
|
|||
|
||||
dbuff.updateSize(dbuff.delData.RowCount)
|
||||
log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()),
|
||||
zap.Int("number of pks to compact in insert logs", len(pk2ts)))
|
||||
zap.Int("number of pks to compact in insert logs", len(pk2ts)),
|
||||
zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
|
||||
|
||||
return pk2ts, dbuff, nil
|
||||
}
|
||||
|
||||
// nano2Milli transfers nanoseconds to milliseconds in unit
|
||||
func nano2Milli(nano time.Duration) float64 {
|
||||
return float64(nano) / float64(time.Millisecond)
|
||||
}
|
||||
|
||||
func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
|
||||
mergeStart := time.Now()
|
||||
|
||||
var (
|
||||
dim int // dimension of vector field
|
||||
|
@ -284,11 +291,14 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
|
|||
|
||||
}
|
||||
|
||||
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), zap.Int64("expired entities", expired))
|
||||
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows),
|
||||
zap.Int64("expired entities", expired),
|
||||
zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
|
||||
return iDatas, numRows, nil
|
||||
}
|
||||
|
||||
func (t *compactionTask) compact() error {
|
||||
compactStart := time.Now()
|
||||
if ok := funcutil.CheckCtxValid(t.ctx); !ok {
|
||||
log.Error("compact wrong, task context done or timeout")
|
||||
return errContext
|
||||
|
@ -333,6 +343,7 @@ func (t *compactionTask) compact() error {
|
|||
}
|
||||
|
||||
// Inject to stop flush
|
||||
injectStart := time.Now()
|
||||
ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) {
|
||||
pack.segmentID = targetSegID
|
||||
})
|
||||
|
@ -340,6 +351,7 @@ func (t *compactionTask) compact() error {
|
|||
|
||||
t.injectFlush(ti, segIDs...)
|
||||
<-ti.Injected()
|
||||
injectEnd := time.Now()
|
||||
|
||||
var (
|
||||
iItr = make([]iterator, 0)
|
||||
|
@ -360,6 +372,7 @@ func (t *compactionTask) compact() error {
|
|||
}
|
||||
}
|
||||
|
||||
downloadStart := time.Now()
|
||||
g, gCtx := errgroup.WithContext(ctxTimeout)
|
||||
for _, s := range t.plan.GetSegmentBinlogs() {
|
||||
|
||||
|
@ -418,6 +431,7 @@ func (t *compactionTask) compact() error {
|
|||
log.Error("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
downloadEnd := time.Now()
|
||||
|
||||
mergeItr := storage.NewMergeIterator(iItr)
|
||||
|
||||
|
@ -432,11 +446,13 @@ func (t *compactionTask) compact() error {
|
|||
return err
|
||||
}
|
||||
|
||||
uploadStart := time.Now()
|
||||
cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
|
||||
if err != nil {
|
||||
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
uploadEnd := time.Now()
|
||||
|
||||
for _, fbl := range cpaths.deltaInfo {
|
||||
for _, deltaLogInfo := range fbl.GetBinlogs() {
|
||||
|
@ -456,6 +472,7 @@ func (t *compactionTask) compact() error {
|
|||
NumOfRows: numRows,
|
||||
}
|
||||
|
||||
rpcStart := time.Now()
|
||||
status, err := t.dc.CompleteCompaction(ctxTimeout, pack)
|
||||
if err != nil {
|
||||
log.Error("complete compaction rpc wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
|
@ -465,6 +482,7 @@ func (t *compactionTask) compact() error {
|
|||
log.Error("complete compaction wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.String("reason", status.GetReason()))
|
||||
return fmt.Errorf("complete comapction wrong: %s", status.GetReason())
|
||||
}
|
||||
rpcEnd := time.Now()
|
||||
|
||||
// Compaction I: update pk range.
|
||||
// Compaction II: remove the segments and add a new flushed segment with pk range.
|
||||
|
@ -476,10 +494,17 @@ func (t *compactionTask) compact() error {
|
|||
}
|
||||
|
||||
ti.injectDone(true)
|
||||
log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()),
|
||||
log.Info("compaction done",
|
||||
zap.Int64("planID", t.plan.GetPlanID()),
|
||||
zap.Int("num of binlog paths", len(cpaths.inPaths)),
|
||||
zap.Int("num of stats paths", len(cpaths.statsPaths)),
|
||||
zap.Int("num of delta paths", len(cpaths.deltaInfo)),
|
||||
zap.Any("inject elapse in ms", nano2Milli(injectEnd.Sub(injectStart))),
|
||||
zap.Any("download IO elapse in ms", nano2Milli(downloadEnd.Sub(downloadStart))),
|
||||
zap.Any("upload IO elapse in ms", nano2Milli(uploadEnd.Sub(uploadStart))),
|
||||
zap.Any("complete compaction rpc elapse in ms", nano2Milli(rpcEnd.Sub(rpcStart))),
|
||||
zap.Any("injectDone elapse in ms", nano2Milli(time.Since(rpcEnd))),
|
||||
zap.Any("total elapse in ms", nano2Milli(time.Since(compactStart))),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue