diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 5cbc59987b..3250e26a14 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -201,16 +201,11 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT dbuff.accumulateEntriesNum(dbuff.delData.RowCount) log.Info("mergeDeltalogs end", zap.Int("number of deleted pks to compact in insert logs", len(pk2ts)), - zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart)))) + zap.Duration("elapse", time.Since(mergeStart))) return pk2ts, dbuff, nil } -// nano2Milli transfers nanoseconds to milliseconds in unit -func nano2Milli(nano time.Duration) float64 { - return float64(nano) / float64(time.Millisecond) -} - func (t *compactionTask) uploadRemainLog( ctxTimeout context.Context, targetSegID UniqueID, @@ -499,16 +494,19 @@ func (t *compactionTask) merge( statPaths = append(statPaths, path) } - log.Info("merge end", zap.Int64("remaining insert numRows", numRows), - zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs), - zap.Float64("download insert log elapse in ms", nano2Milli(downloadTimeCost)), - zap.Float64("upload insert log elapse in ms", nano2Milli(uploadInsertTimeCost)), - zap.Float64("merge elapse in ms", nano2Milli(time.Since(mergeStart)))) + log.Info("compact merge end", + zap.Int64("remaining insert numRows", numRows), + zap.Int64("expired entities", expired), + zap.Int("binlog file number", numBinlogs), + zap.Duration("download insert log elapse", downloadTimeCost), + zap.Duration("upload insert log elapse", uploadInsertTimeCost), + zap.Duration("merge elapse", time.Since(mergeStart))) return insertPaths, statPaths, numRows, nil } func (t *compactionTask) compact() (*datapb.CompactionResult, error) { + log := log.With(zap.Int64("planID", t.plan.GetPlanID())) compactStart := time.Now() if ok := funcutil.CheckCtxValid(t.ctx); !ok { log.Warn("compact wrong, task context done or timeout") @@ -539,7 +537,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { } } - log.Info("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds())) + log.Info("compact start", zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds())) segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs())) for _, s := range t.plan.GetSegmentBinlogs() { segIDs = append(segIDs, s.GetSegmentID()) @@ -547,7 +545,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { _, partID, meta, err := t.getSegmentMeta(segIDs[0]) if err != nil { - log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + log.Warn("compact wrong", zap.Error(err)) return nil, err } @@ -654,10 +652,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { t.injectFlush(ti, segIDs...) <-ti.Injected() - injectEnd := time.Now() - defer func() { - log.Info("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart)))) - }() + log.Info("compact inject elapse", zap.Duration("elapse", time.Since(injectStart))) var ( // SegmentID to deltaBlobs @@ -681,7 +676,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { } // Unable to deal with all empty segments cases, so return error if binlogNum == 0 { - log.Warn("compact wrong, all segments' binlogs are empty", zap.Int64("planID", t.plan.GetPlanID())) + log.Warn("compact wrong, all segments' binlogs are empty") return nil, errIllegalCompactionPlan } @@ -700,7 +695,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { g.Go(func() error { bs, err := t.download(gCtx, []string{path}) if err != nil { - log.Warn("download deltalogs wrong", zap.String("path", path), zap.Error(err)) + log.Warn("compact download deltalogs wrong", zap.String("path", path), zap.Error(err)) return err } @@ -715,13 +710,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { } err = g.Wait() - downloadEnd := time.Now() - defer func() { - log.Info("download deltalogs elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart)))) - }() + log.Info("compact download deltalogs elapse", zap.Duration("elapse", time.Since(downloadStart))) if err != nil { - log.Warn("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + log.Warn("compact IO wrong", zap.Error(err)) return nil, err } @@ -732,17 +724,17 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { inPaths, statsPaths, numRows, err := t.merge(ctxTimeout, allPath, targetSegID, partID, meta, deltaPk2Ts) if err != nil { - log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + log.Warn("compact wrong", zap.Error(err)) return nil, err } uploadDeltaStart := time.Now() deltaInfo, err := t.uploadDeltaLog(ctxTimeout, targetSegID, partID, deltaBuf.delData, meta) if err != nil { - log.Warn("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + log.Warn("compact wrong", zap.Error(err)) return nil, err } - log.Info("upload delta log elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(uploadDeltaStart)))) + log.Info("compact upload deltalog elapse", zap.Duration("elapse", time.Since(uploadDeltaStart))) for _, fbl := range deltaInfo { for _, deltaLogInfo := range fbl.GetBinlogs() { @@ -765,8 +757,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { t.inject = ti - log.Info("compaction done", - zap.Int64("planID", t.plan.GetPlanID()), + log.Info("compact done", zap.Int64("targetSegmentID", targetSegID), zap.Int64s("compactedFrom", segIDs), zap.Int("num of binlog paths", len(inPaths)), @@ -774,8 +765,8 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { zap.Int("num of delta paths", len(deltaInfo)), ) - log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(compactStart)))) - metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.tr.ElapseSpan().Milliseconds())) + log.Info("compact overall elapse", zap.Duration("elapse", time.Since(compactStart))) + metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(t.tr.ElapseSpan().Seconds()) metrics.DataNodeCompactionLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(durInQueue.Milliseconds())) return pack, nil @@ -785,8 +776,7 @@ func (t *compactionTask) injectDone(success bool) { if t.inject != nil { uninjectStart := time.Now() t.inject.injectDone(success) - uninjectEnd := time.Now() - log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart)))) + log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Duration("elapse", time.Since(uninjectStart))) } } diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index d8ca092ade..89a9c5cb2c 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -154,7 +154,7 @@ var ( Subsystem: typeutil.DataNodeRole, Name: "compaction_latency", Help: "latency of compaction operation", - Buckets: buckets, + Buckets: []float64{0.001, 0.1, 0.5, 1, 5, 10, 20, 50, 100, 250, 500, 1000, 3600, 5000, 10000}, // unit seconds }, []string{ nodeIDLabelName, })