mirror of https://github.com/milvus-io/milvus.git
Put logs of compaction elapse in defer (#15485)
Before this PR, time elapse are logged after `compact()` finished normally. No elapse will be logged with any errors. This PR put elapse logs in defer once time-consuming steps are finished. Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/15493/head
parent
46dea0f006
commit
43b6c295a4
|
@ -352,6 +352,9 @@ func (t *compactionTask) compact() error {
|
|||
t.injectFlush(ti, segIDs...)
|
||||
<-ti.Injected()
|
||||
injectEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(injectEnd.Sub(injectStart))))
|
||||
}()
|
||||
|
||||
var (
|
||||
iItr = make([]iterator, 0)
|
||||
|
@ -427,11 +430,16 @@ func (t *compactionTask) compact() error {
|
|||
}
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
err = g.Wait()
|
||||
downloadEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("download elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
log.Error("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
downloadEnd := time.Now()
|
||||
|
||||
mergeItr := storage.NewMergeIterator(iItr)
|
||||
|
||||
|
@ -453,6 +461,9 @@ func (t *compactionTask) compact() error {
|
|||
return err
|
||||
}
|
||||
uploadEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
|
||||
}()
|
||||
|
||||
for _, fbl := range cpaths.deltaInfo {
|
||||
for _, deltaLogInfo := range fbl.GetBinlogs() {
|
||||
|
@ -483,6 +494,9 @@ func (t *compactionTask) compact() error {
|
|||
return fmt.Errorf("complete comapction wrong: %s", status.GetReason())
|
||||
}
|
||||
rpcEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("rpc elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(rpcEnd.Sub(rpcStart))))
|
||||
}()
|
||||
|
||||
// Compaction I: update pk range.
|
||||
// Compaction II: remove the segments and add a new flushed segment with pk range.
|
||||
|
@ -493,19 +507,21 @@ func (t *compactionTask) compact() error {
|
|||
t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows)
|
||||
}
|
||||
|
||||
uninjectStart := time.Now()
|
||||
ti.injectDone(true)
|
||||
uninjectEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
|
||||
}()
|
||||
|
||||
log.Info("compaction done",
|
||||
zap.Int64("planID", t.plan.GetPlanID()),
|
||||
zap.Int("num of binlog paths", len(cpaths.inPaths)),
|
||||
zap.Int("num of stats paths", len(cpaths.statsPaths)),
|
||||
zap.Int("num of delta paths", len(cpaths.deltaInfo)),
|
||||
zap.Any("inject elapse in ms", nano2Milli(injectEnd.Sub(injectStart))),
|
||||
zap.Any("download IO elapse in ms", nano2Milli(downloadEnd.Sub(downloadStart))),
|
||||
zap.Any("upload IO elapse in ms", nano2Milli(uploadEnd.Sub(uploadStart))),
|
||||
zap.Any("complete compaction rpc elapse in ms", nano2Milli(rpcEnd.Sub(rpcStart))),
|
||||
zap.Any("injectDone elapse in ms", nano2Milli(time.Since(rpcEnd))),
|
||||
zap.Any("total elapse in ms", nano2Milli(time.Since(compactStart))),
|
||||
)
|
||||
|
||||
log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart))))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue