mirror of https://github.com/milvus-io/milvus.git
Fix CompactionLatency metrics (#26747)

- Refine compactor logs

See also: #26743

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

pull/26780/head
parent 9004601817
commit 8d54509e54
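The metric side of this commit pins the latency unit to seconds at both ends: the histogram buckets are declared in seconds (last hunk below) and the observed value becomes t.tr.ElapseSpan().Seconds() instead of a millisecond count. A minimal standalone sketch of that convention, using a plain client_golang histogram rather than Milvus's actual HistogramVec registration; the metric name and the 750 ms sample are made up for illustration:

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // Buckets declared in seconds, mirroring the values added in this commit.
    latency := prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "compaction_latency_demo_seconds",
        Help:    "latency of compaction operation",
        Buckets: []float64{0.001, 0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000},
    })

    elapsed := 750 * time.Millisecond // pretend a compaction took 750 ms

    // Feeding milliseconds into second-unit buckets would file this sample as
    // if it took 750 s; observing Seconds() keeps the units consistent.
    fmt.Println("millisecond count:", float64(elapsed.Milliseconds())) // 750
    fmt.Println("seconds observed: ", elapsed.Seconds())               // 0.75

    latency.Observe(elapsed.Seconds())
}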
@@ -201,16 +201,11 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
     dbuff.accumulateEntriesNum(dbuff.delData.RowCount)
     log.Info("mergeDeltalogs end",
         zap.Int("number of deleted pks to compact in insert logs", len(pk2ts)),
-        zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
+        zap.Duration("elapse", time.Since(mergeStart)))
 
     return pk2ts, dbuff, nil
 }
 
-// nano2Milli transfers nanoseconds to milliseconds in unit
-func nano2Milli(nano time.Duration) float64 {
-    return float64(nano) / float64(time.Millisecond)
-}
-
 func (t *compactionTask) uploadRemainLog(
     ctxTimeout context.Context,
     targetSegID UniqueID,
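Alongside the metric change, the hand-rolled nano2Milli helper removed in the hunk above gives way to zap.Duration, which keeps the elapsed time typed and lets the encoder render it with a unit. A tiny standalone sketch of the two styles; the sleep and development logger are only for demonstration:

package main

import (
    "time"

    "go.uber.org/zap"
)

func main() {
    logger, _ := zap.NewDevelopment()
    defer logger.Sync()

    start := time.Now()
    time.Sleep(25 * time.Millisecond)

    // Old style: convert the duration to a bare float and lose the unit.
    logger.Info("mergeDeltalogs end (old style)",
        zap.Float64("elapse in ms", float64(time.Since(start))/float64(time.Millisecond)))

    // New style: zap.Duration keeps the value as a time.Duration, so the
    // encoder prints something like "25.1ms" and no helper is needed.
    logger.Info("mergeDeltalogs end (new style)",
        zap.Duration("elapse", time.Since(start)))
}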
@@ -499,16 +494,19 @@ func (t *compactionTask) merge(
         statPaths = append(statPaths, path)
     }
 
-    log.Info("merge end", zap.Int64("remaining insert numRows", numRows),
-        zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs),
-        zap.Float64("download insert log elapse in ms", nano2Milli(downloadTimeCost)),
-        zap.Float64("upload insert log elapse in ms", nano2Milli(uploadInsertTimeCost)),
-        zap.Float64("merge elapse in ms", nano2Milli(time.Since(mergeStart))))
+    log.Info("compact merge end",
+        zap.Int64("remaining insert numRows", numRows),
+        zap.Int64("expired entities", expired),
+        zap.Int("binlog file number", numBinlogs),
+        zap.Duration("download insert log elapse", downloadTimeCost),
+        zap.Duration("upload insert log elapse", uploadInsertTimeCost),
+        zap.Duration("merge elapse", time.Since(mergeStart)))
 
     return insertPaths, statPaths, numRows, nil
 }
 
 func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
+    log := log.With(zap.Int64("planID", t.plan.GetPlanID()))
     compactStart := time.Now()
     if ok := funcutil.CheckCtxValid(t.ctx); !ok {
         log.Warn("compact wrong, task context done or timeout")
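The remaining log changes in compact() follow from the log := log.With(zap.Int64("planID", ...)) line added above: a child logger carries planID on every statement, so the field is dropped from each individual call. A standalone sketch of the pattern with plain zap (Milvus wraps zap in its own log package; the plan ID and error here are made up):

package main

import (
    "errors"

    "go.uber.org/zap"
)

func main() {
    base, _ := zap.NewDevelopment()
    defer base.Sync()

    // Derive a child logger that always carries planID; 12345 is illustrative.
    log := base.With(zap.Int64("planID", 12345))

    // Each call on the child logger includes planID automatically, which is
    // why the diff removes the explicit zap.Int64("planID", ...) fields.
    log.Info("compact start", zap.Int32("timeout in seconds", 180))
    log.Warn("compact wrong", zap.Error(errors.New("example failure")))
}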
@@ -539,7 +537,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
         }
     }
 
-    log.Info("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
+    log.Info("compact start", zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
     segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
     for _, s := range t.plan.GetSegmentBinlogs() {
         segIDs = append(segIDs, s.GetSegmentID())
@@ -547,7 +545,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 
     _, partID, meta, err := t.getSegmentMeta(segIDs[0])
     if err != nil {
-        log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
+        log.Warn("compact wrong", zap.Error(err))
         return nil, err
     }
 
@@ -654,10 +652,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 
     t.injectFlush(ti, segIDs...)
     <-ti.Injected()
-    injectEnd := time.Now()
-    defer func() {
-        log.Info("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart))))
-    }()
+    log.Info("compact inject elapse", zap.Duration("elapse", time.Since(injectStart)))
 
     var (
         // SegmentID to deltaBlobs
@@ -681,7 +676,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
     }
     // Unable to deal with all empty segments cases, so return error
     if binlogNum == 0 {
-        log.Warn("compact wrong, all segments' binlogs are empty", zap.Int64("planID", t.plan.GetPlanID()))
+        log.Warn("compact wrong, all segments' binlogs are empty")
         return nil, errIllegalCompactionPlan
     }
 
@@ -700,7 +695,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
         g.Go(func() error {
             bs, err := t.download(gCtx, []string{path})
             if err != nil {
-                log.Warn("download deltalogs wrong", zap.String("path", path), zap.Error(err))
+                log.Warn("compact download deltalogs wrong", zap.String("path", path), zap.Error(err))
                 return err
             }
 
@@ -715,13 +710,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
     }
 
     err = g.Wait()
-    downloadEnd := time.Now()
-    defer func() {
-        log.Info("download deltalogs elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
-    }()
+    log.Info("compact download deltalogs elapse", zap.Duration("elapse", time.Since(downloadStart)))
 
     if err != nil {
-        log.Warn("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
+        log.Warn("compact IO wrong", zap.Error(err))
         return nil, err
     }
 
@@ -732,17 +724,17 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 
     inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPath, targetSegID, partID, meta, deltaPk2Ts)
     if err != nil {
-        log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
+        log.Warn("compact wrong", zap.Error(err))
         return nil, err
     }
 
     uploadDeltaStart := time.Now()
     deltaInfo, err := t.uploadDeltaLog(ctxTimeout, targetSegID, partID, deltaBuf.delData, meta)
     if err != nil {
-        log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
+        log.Warn("compact wrong", zap.Error(err))
         return nil, err
     }
-    log.Info("upload delta log elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(uploadDeltaStart))))
+    log.Info("compact upload deltalog elapse", zap.Duration("elapse", time.Since(uploadDeltaStart)))
 
     for _, fbl := range deltaInfo {
         for _, deltaLogInfo := range fbl.GetBinlogs() {
@@ -765,8 +757,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
 
     t.inject = ti
 
-    log.Info("compaction done",
-        zap.Int64("planID", t.plan.GetPlanID()),
+    log.Info("compact done",
         zap.Int64("targetSegmentID", targetSegID),
         zap.Int64s("compactedFrom", segIDs),
         zap.Int("num of binlog paths", len(inPaths)),
@@ -774,8 +765,8 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
         zap.Int("num of delta paths", len(deltaInfo)),
     )
 
-    log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(compactStart))))
-    metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.tr.ElapseSpan().Milliseconds()))
+    log.Info("compact overall elapse", zap.Duration("elapse", time.Since(compactStart)))
+    metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(t.tr.ElapseSpan().Seconds())
     metrics.DataNodeCompactionLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(durInQueue.Milliseconds()))
 
     return pack, nil
@@ -785,8 +776,7 @@ func (t *compactionTask) injectDone(success bool) {
     if t.inject != nil {
         uninjectStart := time.Now()
         t.inject.injectDone(success)
-        uninjectEnd := time.Now()
-        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
+        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Duration("elapse", time.Since(uninjectStart)))
     }
 }
 
@@ -154,7 +154,7 @@ var (
             Subsystem: typeutil.DataNodeRole,
             Name:      "compaction_latency",
             Help:      "latency of compaction operation",
-            Buckets:   buckets,
+            Buckets:   []float64{0.001, 0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000}, // unit seconds
         }, []string{
             nodeIDLabelName,
         })