fix: Add more throughput in related metrics (#30038)

This PR also fixes bugs in l0 compactor where
l0 results would never be removed from datanode

See also: #30099

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/30127/head
XuanYang-cn 2024-01-19 11:34:54 +08:00 committed by GitHub
parent ddccccbcab
commit 86f48861c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 38 additions and 10 deletions

View File

@ -19,6 +19,7 @@ package datanode
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -145,9 +146,15 @@ func (c *compactionExecutor) clearTasksByChannel(channel string) {
}
func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult {
var (
executing []int64
completed []int64
completedLevelZero []int64
)
results := make([]*datapb.CompactionPlanResult, 0)
// get executing results
c.executing.Range(func(planID int64, task compactor) bool {
executing = append(executing, planID)
results = append(results, &datapb.CompactionPlanResult{
State: commonpb.CompactionState_Executing,
PlanID: planID,
@ -157,9 +164,27 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR
// get completed results
c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
completed = append(completed, planID)
results = append(results, result)
if result.GetType() == datapb.CompactionType_Level0DeleteCompaction {
completedLevelZero = append(completedLevelZero, planID)
}
return true
})
// remote level zero results
lo.ForEach(completedLevelZero, func(planID int64, _ int) {
c.completed.Remove(planID)
})
if len(results) > 0 {
log.Info("DataNode Compaction results",
zap.Int64s("executing", executing),
zap.Int64s("completed", completed),
zap.Int64s("completed levelzero", completedLevelZero),
)
}
return results
}

View File

@ -576,6 +576,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
State: commonpb.CompactionState_Completed,
PlanID: t.getPlanID(),
Segments: []*datapb.CompactionSegment{pack},
Type: t.plan.GetType(),
}
return planResult, nil

View File

@ -199,6 +199,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel, fmt.Sprint(ddn.collectionID)).
Inc()
metrics.DataNodeConsumeMsgRowsCount.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).
Add(float64(imsg.GetNumRows()))
log.Debug("DDNode receive insert messages",
zap.Int("numRows", len(imsg.GetRowIDs())),
zap.String("vChannelName", ddn.vChannelName))
@ -225,6 +229,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
metrics.DataNodeConsumeMsgCount.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel, fmt.Sprint(ddn.collectionID)).
Inc()
metrics.DataNodeConsumeMsgRowsCount.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Add(float64(dmsg.GetNumRows()))
fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
}
}

View File

@ -227,6 +227,7 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
State: commonpb.CompactionState_Completed,
Segments: resultSegments,
Channel: t.plan.GetChannel(),
Type: t.plan.GetType(),
}
metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).

View File

@ -27,7 +27,6 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -309,13 +308,6 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
}, nil
}
results := node.compactionExecutor.getAllCompactionResults()
if len(results) > 0 {
planIDs := lo.Map(results, func(result *datapb.CompactionPlanResult, i int) UniqueID {
return result.GetPlanID()
})
log.Info("Compaction results", zap.Int64s("planIDs", planIDs))
}
return &datapb.CompactionStateResponse{
Status: merr.Success(),
Results: results,

View File

@ -540,6 +540,7 @@ message CompactionPlanResult {
common.CompactionState state = 2;
repeated CompactionSegment segments = 3;
string channel = 4;
CompactionType type = 5;
}
message CompactionStateResponse {

View File

@ -192,7 +192,7 @@ var (
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "consume_counter",
Name: "consume_bytes_count",
Help: "",
}, []string{nodeIDLabelName, msgTypeLabelName})