enhance: [cp25]Enable to observe write amplification (#39661) (#39743)

pr: #39661

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2025-02-17 16:00:17 +08:00 committed by GitHub
parent 56c1a8d462
commit 8067113133
4 changed files with 39 additions and 0 deletions


@@ -18,6 +18,7 @@ package compaction
import (
	"context"
	"fmt"
	"sync"
	"github.com/samber/lo"
@@ -25,6 +26,7 @@ import (
	"golang.org/x/sync/semaphore"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -177,6 +179,24 @@ func (e *executor) executeTask(task Compactor) {
	e.completed.Insert(result.GetPlanID(), result)
	e.completedCompactor.Insert(result.GetPlanID(), task)
	getLogSize := func(binlogs []*datapb.FieldBinlog) int64 {
		size := int64(0)
		for _, binlog := range binlogs {
			for _, fbinlog := range binlog.GetBinlogs() {
				size += fbinlog.GetLogSize()
			}
		}
		return size
	}
	totalSize := lo.SumBy(result.Segments, func(seg *datapb.CompactionSegment) int64 {
		return getLogSize(seg.GetInsertLogs()) +
			getLogSize(seg.GetField2StatslogPaths()) +
			getLogSize(seg.GetBm25Logs()) +
			getLogSize(seg.GetDeltalogs())
	})
	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize))
	log.Info("end to execute compaction")
}
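The block above sums the insert, statslog, BM25, and delta binlogs of every compaction output segment and adds the total to the per-collection write metric. Read together with the flush-path update below, write amplification can be taken as total bytes written (flush plus compaction) over bytes flushed from the streaming path; that reading is an assumption for illustration, not something the patch states. A minimal standalone sketch with plain accumulators standing in for the metrics (the writeStats type and its fields are hypothetical):

```go
package main

import "fmt"

// writeStats is a hypothetical per-collection accumulator mirroring the two
// write paths instrumented in this patch: flush (streaming) and compaction.
type writeStats struct {
	flushedBytes    int64 // bytes written by the flush path
	compactionBytes int64 // bytes rewritten by compaction
}

// amplification returns total bytes written divided by bytes flushed.
func (s writeStats) amplification() float64 {
	if s.flushedBytes == 0 {
		return 0
	}
	return float64(s.flushedBytes+s.compactionBytes) / float64(s.flushedBytes)
}

func main() {
	s := writeStats{flushedBytes: 1 << 30, compactionBytes: 3 << 30}
	fmt.Printf("write amplification: %.2fx\n", s.amplification()) // prints 4.00x
}
```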


@@ -179,6 +179,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
	t.flushedSize = totalSize
	metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize))
	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID)).Add(float64(t.flushedSize))
	metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows))
	metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))


@@ -58,6 +58,18 @@ var (
		segmentLevelLabelName,
	})
	DataNodeWriteBinlogSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.DataNodeRole,
			Name:      "write_data_size",
			Help:      "byte size of datanode write to object storage, including flushed size",
		}, []string{
			nodeIDLabelName,
			dataSourceLabelName,
			collectionIDLabelName,
		})
	DataNodeFlushedRows = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: milvusNamespace,
@@ -274,6 +286,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
	registry.MustRegister(DataNodeFlushReqCounter)
	registry.MustRegister(DataNodeFlushedSize)
	registry.MustRegister(DataNodeFlushedRows)
	registry.MustRegister(DataNodeWriteBinlogSize)
	// compaction related
	registry.MustRegister(DataNodeCompactionLatency)
	registry.MustRegister(DataNodeCompactionLatencyInQueue)
@@ -324,4 +337,8 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
	DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{
		collectionIDLabelName: fmt.Sprint(collectionID),
	})
	DataNodeWriteBinlogSize.Delete(prometheus.Labels{
		collectionIDLabelName: fmt.Sprint(collectionID),
	})
}
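DataNodeWriteBinlogSize is a standard client_golang gauge vector keyed by node ID, data source, and collection ID. A minimal standalone sketch of the same create/register/update flow (the namespace, label names, and label values here are illustrative placeholders, not the exact Milvus ones):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A gauge vector keyed the same way as DataNodeWriteBinlogSize:
	// node ID, data source, and collection ID.
	writeSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "datanode",
		Name:      "write_data_size",
		Help:      "bytes written to object storage, including flushed size",
	}, []string{"node_id", "data_source", "collection_id"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(writeSize)

	// Both the flush path and the compaction path add into the same family,
	// distinguished only by the data_source label.
	writeSize.WithLabelValues("1", "streaming", "100").Add(1 << 30)
	writeSize.WithLabelValues("1", "compaction", "100").Add(3 << 30)

	// Gather and print what a scrape of this registry would expose.
	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, fam := range families {
		fmt.Printf("%s: %d series\n", fam.GetName(), len(fam.GetMetric()))
	}
}
```

Registering the vector once and removing its per-collection children when a collection is dropped, as RegisterDataNode and CleanupDataNodeCollectionMetrics do above, keeps the number of exposed series bounded.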


@@ -59,6 +59,7 @@ const (
	StreamingDataSourceLabel  = "streaming"
	BulkinsertDataSourceLabel = "bulkinsert"
	CompactionDataSourceLabel = "compaction"
	Leader                    = "OnLeader"
	FromLeader                = "FromLeader"