diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index b70bd2988b..3e94344d5b 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -19,6 +19,7 @@ package datanode import ( "context" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -145,9 +146,15 @@ func (c *compactionExecutor) clearTasksByChannel(channel string) { } func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult { + var ( + executing []int64 + completed []int64 + completedLevelZero []int64 + ) results := make([]*datapb.CompactionPlanResult, 0) // get executing results c.executing.Range(func(planID int64, task compactor) bool { + executing = append(executing, planID) results = append(results, &datapb.CompactionPlanResult{ State: commonpb.CompactionState_Executing, PlanID: planID, @@ -157,9 +164,27 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR // get completed results c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { + completed = append(completed, planID) results = append(results, result) + + if result.GetType() == datapb.CompactionType_Level0DeleteCompaction { + completedLevelZero = append(completedLevelZero, planID) + } return true }) + // remote level zero results + lo.ForEach(completedLevelZero, func(planID int64, _ int) { + c.completed.Remove(planID) + }) + + if len(results) > 0 { + log.Info("DataNode Compaction results", + zap.Int64s("executing", executing), + zap.Int64s("completed", completed), + zap.Int64s("completed levelzero", completedLevelZero), + ) + } + return results } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 63a7d3eb2f..5029b22982 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -576,6 +576,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { State: commonpb.CompactionState_Completed, PlanID: t.getPlanID(), Segments: []*datapb.CompactionSegment{pack}, + Type: t.plan.GetType(), } return planResult, nil diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index cf85a36776..303ce1e0e6 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -199,6 +199,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel, fmt.Sprint(ddn.collectionID)). Inc() + metrics.DataNodeConsumeMsgRowsCount. + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel). + Add(float64(imsg.GetNumRows())) + log.Debug("DDNode receive insert messages", zap.Int("numRows", len(imsg.GetRowIDs())), zap.String("vChannelName", ddn.vChannelName)) @@ -225,6 +229,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { metrics.DataNodeConsumeMsgCount. WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel, fmt.Sprint(ddn.collectionID)). Inc() + + metrics.DataNodeConsumeMsgRowsCount. + WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel). + Add(float64(dmsg.GetNumRows())) fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg) } } diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 70c94490f3..fc17026a55 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -227,6 +227,7 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error State: commonpb.CompactionState_Completed, Segments: resultSegments, Channel: t.plan.GetChannel(), + Type: t.plan.GetType(), } metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 28de7669d9..accbb04c34 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -27,7 +27,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -309,13 +308,6 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac }, nil } results := node.compactionExecutor.getAllCompactionResults() - - if len(results) > 0 { - planIDs := lo.Map(results, func(result *datapb.CompactionPlanResult, i int) UniqueID { - return result.GetPlanID() - }) - log.Info("Compaction results", zap.Int64s("planIDs", planIDs)) - } return &datapb.CompactionStateResponse{ Status: merr.Success(), Results: results, diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 1567249448..2ca14da53b 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -540,6 +540,7 @@ message CompactionPlanResult { common.CompactionState state = 2; repeated CompactionSegment segments = 3; string channel = 4; + CompactionType type = 5; } message CompactionStateResponse { @@ -882,4 +883,4 @@ message GcControlRequest { common.MsgBase base = 1; GcCommand command = 2; repeated common.KeyValuePair params = 3; -} \ No newline at end of file +} diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index a46629806d..10807708d8 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -192,7 +192,7 @@ var ( prometheus.CounterOpts{ Namespace: milvusNamespace, Subsystem: typeutil.DataNodeRole, - Name: "consume_counter", + Name: "consume_bytes_count", Help: "", }, []string{nodeIDLabelName, msgTypeLabelName})