mirror of https://github.com/milvus-io/milvus.git
Add some msgstream metrics (#20601)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
pull/20622/head
parent
6c651d25a4
commit
cb294d1441
@@ -24,7 +24,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Mutation Send Latency | The average latency and the 99th percentile of the latency of sending insertion or deletion requests by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, msg_type, pod, node_id) (rate(milvus_proxy_mutation_send_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_mutation_send_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id, msg_type) / sum(increase(milvus_proxy_mutation_send_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id, msg_type) ``` | `milvus_proxy_mutation_send_latency` | The latency of sending insertion or deletion requests. |
| Cache Hit Rate | The average cache hit rate of operations including `GetCollectionID`, `GetCollectionInfo`, and `GetCollectionSchema` per second within the past two minutes. | ``` sum(increase(milvus_proxy_cache_hit_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace", cache_state="hit"}[2m])/120) by(cache_name, pod, node_id) / sum(increase(milvus_proxy_cache_hit_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by(cache_name, pod, node_id) ``` | `milvus_proxy_cache_hit_count` | The statistics of the hit and failure rates of each cache read operation. |
| Cache Update Latency | The average latency and the 99th percentile of cache update latency by proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_cache_update_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_cache_update_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_cache_update_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_cache_update_latency` | The latency of updating cache each time. |
| Sync Time | The average, maximum, and minimum epoch time synced by each proxy in its corresponding physical channel. | ``` avg(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_proxy_sync_epoch_time` | Each physical channel's epoch time (Unix time, in milliseconds elapsed since January 1, 1970). <br/> There is a default `ChannelName` apart from the physical channels. |
| Sync Time | The average, maximum, and minimum time tick lag reported by each proxy in its corresponding physical channel. | ``` avg(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_proxy_tt_lag_ms` | The difference between the current time and each physical channel's latest time tick, in milliseconds. <br/> There is a default `ChannelName` apart from the physical channels. |
| Apply PK Latency | The average latency and the 99th percentile of primary key application latency by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_apply_pk_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_apply_pk_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_apply_pk_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_apply_pk_latency` | The latency of applying primary key. |
| Apply Timestamp Latency | The average latency and the 99th percentile of timestamp application latency by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_apply_timestamp_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_apply_timestamp_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_apply_timestamp_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_apply_timestamp_latency` | The latency of applying timestamp. |
| DQL Request Rate | The status and number of DQL requests received per second by each proxy within the past two minutes. <br/> DQL requests include `DescribeCollection`, `DescribeIndex`, `GetCollectionStatistics`, `HasCollection`, `Search`, `Query`, `ShowPartitions`, etc. This panel specifically shows the total number and the number of successful DQL requests. | ``` sum(increase(milvus_proxy_dql_req_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by(function_name, status, pod, node_id) ``` | `milvus_proxy_dql_req_count` | The number of all types of DQL requests. |
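All of the latency panels above are built from a single Prometheus histogram on the Go side: `histogram_quantile()` reads its `_bucket` series, and the average divides `increase(..._sum)` by `increase(..._count)`. The sketch below is illustrative only — the metric and label names (`example_request_latency`, `msg_type`, `node_id`) are placeholders rather than declarations copied from the Milvus source, and the `pod` label seen in the queries is typically attached at scrape time by Kubernetes service discovery rather than by the application.

```go
package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// requestLatency is a placeholder histogram; the real proxy latency metrics
// are declared in internal/metrics with the same client_golang API.
var requestLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "milvus",
		Subsystem: "proxy",
		Name:      "example_request_latency",
		Help:      "latency of an example request type (ms)",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 12), // 1 ms .. ~2 s
	},
	[]string{"msg_type", "node_id"},
)

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(requestLatency)

	// Each Observe call feeds the _bucket, _sum and _count series that the
	// histogram_quantile() and sum(increase(_sum))/sum(increase(_count))
	// expressions in the table above read.
	start := time.Now()
	time.Sleep(3 * time.Millisecond) // stand-in for real work
	requestLatency.WithLabelValues("insert", "1").
		Observe(float64(time.Since(start).Milliseconds()))

	// Prometheus scrapes the resulting series from /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9091", nil)
}
```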
@@ -45,7 +45,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Panel | Panel description | PromQL (Prometheus query language) | The Milvus metrics used | Milvus metrics description |
|---|---|---|---|---|
| Proxy Node Num | The number of proxies created. | ``` sum(milvus_rootcoord_proxy_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_proxy_num` | The number of proxies. |
| Sync Time | The average, maximum, and minimum epoch time synced by each root coord in each physical channel (PChannel). | ``` avg(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_sync_epoch_time` | Each physical channel's epoch time (Unix time, in milliseconds elapsed since January 1, 1970). |
| Sync Time | The average, maximum, and minimum time tick lag produced by each root coord in each physical channel (PChannel). | ``` avg(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_produce_tt_lag_ms` | The difference between the current time and the time tick produced on each physical channel, in milliseconds. |
| DDL Request Rate | The status and number of DDL requests per second within the past two minutes. | ``` sum(increase(milvus_rootcoord_ddl_req_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (status, function_name) ``` | `milvus_rootcoord_ddl_req_count` | The total number of DDL requests including `CreateCollection`, `DescribeCollection`, `DescribeSegments`, `HasCollection`, `ShowCollections`, `ShowPartitions`, and `ShowSegments`. |
| DDL Request Latency | The average latency and the 99th percentile of DDL request latency within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, function_name) (rate(milvus_rootcoord_ddl_req_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_rootcoord_ddl_req_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (function_name) / sum(increase(milvus_rootcoord_ddl_req_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (function_name) ``` | `milvus_rootcoord_ddl_req_latency` | The latency of all types of DDL requests. |
| Sync Timetick Latency | The average latency and the 99th percentile of the time used by root coord to sync all timestamps to PChannel within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le) (rate(milvus_rootcoord_sync_timetick_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_rootcoord_sync_timetick_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) / sum(increase(milvus_rootcoord_sync_timetick_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) ``` | `milvus_rootcoord_sync_timetick_latency` | The time used by root coord to sync all timestamps to the physical channel (PChannel). |
@@ -120,7 +120,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Collection Num | The number of collections recorded in metadata by data coord. | ``` sum(milvus_datacoord_collection_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_collection_num` | The number of collections recorded in metadata by data coord. |
| Stored Rows | The accumulated number of rows of valid and flushed data in data coord. | ``` sum(milvus_datacoord_stored_rows_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_stored_rows_num` | The accumulated number of rows of valid and flushed data in data coord. |
| Stored Rows Rate | The average number of rows flushed per second within the past two minutes. | ``` sum(increase(milvus_datacoord_stored_rows_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (pod, node_id) ``` | `milvus_datacoord_stored_rows_count` | The accumulated number of rows flushed by data coord. |
| Sync Time | The average, maximum, and minimum epoch time synced by data coord in each physical channel. | ``` avg(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_sync_epoch_time` | Each physical channel's epoch time (Unix time, in milliseconds elapsed since January 1, 1970). |
| Sync Time | The average, maximum, and minimum time tick lag of the data node time ticks consumed by data coord in each physical channel. | ``` avg(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_consumer_datanode_tt_lag_ms` | The difference between the current time and the data node time tick consumed by data coord on each physical channel, in milliseconds. |
</details>
@@ -133,7 +133,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Flush Data Size Rate | The size of each flushed message recorded per second by each data node within the past two minutes. | ``` sum(increase(milvus_datanode_flushed_data_size{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (msg_type, pod, node_id) ``` | `milvus_datanode_flushed_data_size` | The size of each flushed message. <br/> Currently, streaming messages counted by data node only include insertion and deletion messages. |
| Consumer Num | The number of consumers created on each data node. | ``` sum(milvus_datanode_consumer_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_consumer_num` | The number of consumers created on each data node. <br/> Each flowgraph corresponds to a consumer. |
| Producer Num | The number of producers created on each data node. | ``` sum(milvus_datanode_producer_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_producer_num` | The number of producers created on each data node. <br/> Each shard in a collection corresponds to a delta channel producer and a timetick channel producer. |
| Sync Time | The average, maximum, and minimum epoch time synced by each data node in all physical topics. | ``` avg(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_sync_epoch_time` | The epoch time (Unix time, in milliseconds elapsed since January 1, 1970) of each physical topic on a data node. |
| Sync Time | The average, maximum, and minimum time tick lag produced by each data node in all physical topics. | ``` avg(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_produce_tt_lag_ms` | The difference between the current time and the time tick produced on each physical topic of a data node, in milliseconds. |
| Unflushed Segment Num | The number of unflushed segments created on each data node. | ``` sum(milvus_datanode_unflushed_segment_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_unflushed_segment_num` | The number of unflushed segments created on each data node. |
| Encode Buffer Latency | The average latency and the 99th percentile of the time used to encode a buffer by each data node within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_datanode_encode_buffer_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_datanode_encode_buffer_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) / sum(increase(milvus_datanode_encode_buffer_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) ``` | `milvus_datanode_encode_buffer_latency` | The time each data node takes to encode a buffer. |
| Save Data Latency | The average latency and the 99th percentile of the time used to write a buffer into the storage layer by each data node within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_datanode_save_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_datanode_save_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) / sum(increase(milvus_datanode_save_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) ``` | `milvus_datanode_save_latency` | The time each data node takes to write a buffer into the storage layer. |
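The new `*_tt_lag_ms` panels above aggregate plain gauges: on every time tick the component overwrites the gauge with "now minus time tick" in milliseconds, and Grafana takes `avg`/`max`/`min` over the labelled series. A minimal, hypothetical sketch of that pattern (the metric name, labels, and channel string are illustrative, not the exact Milvus declarations):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// timeTickLag mimics gauges such as milvus_datanode_produce_tt_lag_ms:
// one time series per (node_id, channel_name), holding the latest lag in ms.
var timeTickLag = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "datanode",
		Name:      "example_tt_lag_ms",
		Help:      "now time minus time tick per physical channel",
	},
	[]string{"node_id", "channel_name"},
)

func main() {
	prometheus.MustRegister(timeTickLag)

	nodeID := int64(1)
	channel := "by-dev-rootcoord-dml_0" // illustrative physical channel name

	// On each time tick, record how far the tick lags behind wall-clock time.
	tickPhysical := time.Now().Add(-150 * time.Millisecond) // pretend tick time
	lagMs := time.Since(tickPhysical).Milliseconds()
	timeTickLag.WithLabelValues(fmt.Sprint(nodeID), channel).Set(float64(lagMs))
}
```

A gauge fits here because the lag is a point-in-time value that can rise and fall, unlike the monotonically increasing counters used for consumed messages and bytes.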
File diff suppressed because it is too large
@@ -550,10 +550,11 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
        log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("timetick", physical))
    }
    utcT, _ := tsoutil.ParseHybridTs(ts)
    sub := tsoutil.SubByNow(ts)
    pChannelName := funcutil.ToPhysicalChannel(ch)
    metrics.DataCoordSyncEpoch.WithLabelValues(pChannelName).Set(float64(utcT))
    metrics.DataCoordConsumeDataNodeTimeTickLag.
        WithLabelValues(fmt.Sprint(Params.DataCoordCfg.GetNodeID()), pChannelName).
        Set(float64(sub))

    s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
@@ -20,9 +20,10 @@ import (
    "context"
    "fmt"
    "reflect"
    "strconv"
    "sync/atomic"

    "github.com/milvus-io/milvus/internal/util/timerecord"

    "github.com/golang/protobuf/proto"
    "github.com/opentracing/opentracing-go"
    "go.uber.org/zap"
@@ -140,6 +141,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
        log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
        ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName)
        fgMsg.dropCollection = true

        pChan := funcutil.ToPhysicalChannel(ddn.vChannelName)
        metrics.CleanupDataNodeCollectionMetrics(Params.DataNodeCfg.GetNodeID(), ddn.collectionID, pChan)
    }

    case commonpb.MsgType_DropPartition:
@@ -171,7 +175,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
        }

        rateCol.Add(metricsinfo.InsertConsumeThroughput, float64(proto.Size(&imsg.InsertRequest)))
        metrics.DataNodeConsumeCounter.WithLabelValues(strconv.FormatInt(Params.DataNodeCfg.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(&imsg.InsertRequest)))

        metrics.DataNodeConsumeBytesCount.
            WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.InsertLabel).
            Add(float64(proto.Size(&imsg.InsertRequest)))

        metrics.DataNodeConsumeMsgCount.
            WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.InsertLabel, fmt.Sprint(ddn.collectionID)).
            Inc()

        log.Debug("DDNode receive insert messages",
            zap.Int("numRows", len(imsg.GetRowIDs())),
@@ -194,7 +205,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
            continue
        }
        rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
        metrics.DataNodeConsumeCounter.WithLabelValues(strconv.FormatInt(Params.DataNodeCfg.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(&dmsg.DeleteRequest)))

        metrics.DataNodeConsumeBytesCount.
            WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).
            Add(float64(proto.Size(&dmsg.DeleteRequest)))

        metrics.DataNodeConsumeMsgCount.
            WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel, fmt.Sprint(ddn.collectionID)).
            Inc()
        fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
    }
}
@@ -261,6 +279,8 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool {
}

func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
    tr := timerecord.NewTimeRecorder("forwardDeleteMsg")

    if len(msgs) != 0 {
        var msgPack = msgstream.MsgPack{
            Msgs: msgs,
@@ -274,6 +294,10 @@ func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, max
    if err := ddn.sendDeltaTimeTick(maxTs); err != nil {
        return err
    }

    metrics.DataNodeForwardDeleteMsgTimeTaken.
        WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).
        Observe(float64(tr.ElapseSpan().Milliseconds()))
    return nil
}
@@ -30,6 +30,7 @@ import (
    "github.com/milvus-io/milvus/internal/util/flowgraph"
    "github.com/milvus-io/milvus/internal/util/funcutil"
    "github.com/milvus-io/milvus/internal/util/tsoutil"
    "github.com/milvus-io/milvus/internal/util/typeutil"
)

// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
@@ -78,6 +79,7 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
    log.Info("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))

    name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
    node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
    node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism,
        typeutil.DataNodeRole, Params.DataNodeCfg.GetNodeID(), dmNodeConfig.collectionID, metrics.AllLabel)
    return node, nil
}
@@ -646,9 +646,11 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
            },
        }
        msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
        pt, _ := tsoutil.ParseHybridTs(ts)
        sub := tsoutil.SubByNow(ts)
        pChan := funcutil.ToPhysicalChannel(config.vChannelName)
        metrics.DataNodeTimeSync.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), pChan).Set(float64(pt))
        metrics.DataNodeProduceTimeTickLag.
            WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), fmt.Sprint(collID), pChan).
            Set(float64(sub))
        return wTtMsgStream.Produce(&msgPack)
    })
@@ -76,13 +76,16 @@
        Help: "count of all stored rows ever",
    }, []string{})

    DataCoordSyncEpoch = prometheus.NewGaugeVec(
    DataCoordConsumeDataNodeTimeTickLag = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataCoordRole,
            Name: "sync_epoch_time",
            Help: "synchronized unix epoch per physical channel",
        }, []string{channelNameLabelName})
            Name: "consume_datanode_tt_lag_ms",
            Help: "now time minus tt per physical channel",
        }, []string{
            nodeIDLabelName,
            channelNameLabelName,
        })

    DataCoordStoredBinlogSize = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
@@ -146,6 +149,6 @@ func RegisterDataCoord(registry *prometheus.Registry) {
    registry.MustRegister(DataCoordNumCollections)
    registry.MustRegister(DataCoordNumStoredRows)
    registry.MustRegister(DataCoordNumStoredRowsCounter)
    registry.MustRegister(DataCoordSyncEpoch)
    registry.MustRegister(DataCoordConsumeDataNodeTimeTickLag)
    registry.MustRegister(DataCoordStoredBinlogSize)
}
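A gauge registered this way can be checked in a unit test with `prometheus/testutil`. The test below is a hedged sketch, not part of this commit; it assumes the `metrics` package shown above and simply sets and reads back one labelled child the way `handleTimetickMessage` does after computing the lag with `tsoutil.SubByNow`.

```go
package datacoord

import (
	"testing"

	"github.com/milvus-io/milvus/internal/metrics"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Hypothetical sketch: verify that the lag gauge holds what the handler set.
func TestConsumeDataNodeTimeTickLagGauge(t *testing.T) {
	const nodeID, pchannel = "1", "by-dev-rootcoord-dml_0" // illustrative values

	// Stand-in for what handleTimetickMessage does with the computed lag.
	metrics.DataCoordConsumeDataNodeTimeTickLag.
		WithLabelValues(nodeID, pchannel).
		Set(42)

	child := metrics.DataCoordConsumeDataNodeTimeTickLag.WithLabelValues(nodeID, pchannel)
	if got := testutil.ToFloat64(child); got != 42 {
		t.Fatalf("expected lag gauge to be 42, got %v", got)
	}
}
```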
@@ -17,6 +17,8 @@
package metrics

import (
    "fmt"

    "github.com/milvus-io/milvus/internal/util/typeutil"
    "github.com/prometheus/client_golang/prometheus"
)
@@ -74,17 +76,42 @@
        nodeIDLabelName,
    })

    DataNodeTimeSync = prometheus.NewGaugeVec(
    DataNodeConsumeTimeTickLag = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataNodeRole,
            Name: "sync_epoch_time",
            Help: "synchronized unix epoch per physical channel",
            Name: "consume_tt_lag_ms",
            Help: "now time minus tt per physical channel",
        }, []string{
            nodeIDLabelName,
            msgTypeLabelName,
            collectionIDLabelName,
        })

    DataNodeProduceTimeTickLag = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataNodeRole,
            Name: "produce_tt_lag_ms",
            Help: "now time minus tt pts per physical channel",
        }, []string{
            nodeIDLabelName,
            collectionIDLabelName,
            channelNameLabelName,
        })

    DataNodeConsumeMsgCount = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataNodeRole,
            Name: "consume_msg_count",
            Help: "count of consumed msg",
        }, []string{
            nodeIDLabelName,
            msgTypeLabelName,
            collectionIDLabelName,
        })

    DataNodeNumUnflushedSegments = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
@@ -163,14 +190,23 @@
        statusLabelName,
    })

    // DataNodeConsumeCounter counts the bytes DataNode consumed from message storage.
    DataNodeConsumeCounter = prometheus.NewCounterVec(
    // DataNodeConsumeBytesCount counts the bytes DataNode consumed from message storage.
    DataNodeConsumeBytesCount = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataNodeRole,
            Name: "consume_counter",
            Help: "",
        }, []string{nodeIDLabelName, msgTypeLabelName})

    DataNodeForwardDeleteMsgTimeTaken = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.DataNodeRole,
            Name: "forward_delete_msg_time_taken_ms",
            Help: "forward delete message time taken",
            Buckets: buckets, // unit: ms
        }, []string{nodeIDLabelName})
)

//RegisterDataNode registers DataNode metrics
@@ -180,7 +216,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
    registry.MustRegister(DataNodeFlushedSize)
    registry.MustRegister(DataNodeNumConsumers)
    registry.MustRegister(DataNodeNumProducers)
    registry.MustRegister(DataNodeTimeSync)
    registry.MustRegister(DataNodeConsumeTimeTickLag)
    registry.MustRegister(DataNodeNumUnflushedSegments)
    registry.MustRegister(DataNodeEncodeBufferLatency)
    registry.MustRegister(DataNodeSave2StorageLatency)
@@ -188,5 +224,36 @@
    registry.MustRegister(DataNodeAutoFlushBufferCount)
    registry.MustRegister(DataNodeCompactionLatency)
    registry.MustRegister(DataNodeFlushReqCounter)
    registry.MustRegister(DataNodeConsumeCounter)
    registry.MustRegister(DataNodeConsumeMsgCount)
    registry.MustRegister(DataNodeProduceTimeTickLag)
    registry.MustRegister(DataNodeConsumeBytesCount)
    registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
}

func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
    DataNodeConsumeTimeTickLag.
        Delete(
            prometheus.Labels{
                nodeIDLabelName: fmt.Sprint(nodeID),
                msgTypeLabelName: AllLabel,
                collectionIDLabelName: fmt.Sprint(collectionID),
            })

    DataNodeProduceTimeTickLag.
        Delete(
            prometheus.Labels{
                nodeIDLabelName: fmt.Sprint(nodeID),
                collectionIDLabelName: fmt.Sprint(collectionID),
                channelNameLabelName: channel,
            })

    for _, label := range []string{AllLabel, DeleteLabel, InsertLabel} {
        DataNodeConsumeMsgCount.
            Delete(
                prometheus.Labels{
                    nodeIDLabelName: fmt.Sprint(nodeID),
                    msgTypeLabelName: label,
                    collectionIDLabelName: fmt.Sprint(collectionID),
                })
    }
}
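`CleanupDataNodeCollectionMetrics` exists because labelled children of a `*Vec` live until they are explicitly deleted; without it, a released collection would keep exporting stale `consume_msg_count` and `produce_tt_lag_ms` series. A hedged sketch of how that cleanup could be exercised with `prometheus/testutil` (this test is not part of the commit, and the node/collection IDs and channel name are illustrative):

```go
package metrics_test

import (
	"testing"

	"github.com/milvus-io/milvus/internal/metrics"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Hypothetical sketch: per-collection children disappear after cleanup.
func TestCleanupDataNodeCollectionMetrics(t *testing.T) {
	// Populate children the way the datanode flowgraph would for collection 100.
	metrics.DataNodeConsumeMsgCount.
		WithLabelValues("1", metrics.InsertLabel, "100").Inc()
	metrics.DataNodeProduceTimeTickLag.
		WithLabelValues("1", "100", "by-dev-rootcoord-dml_0").Set(5)

	// Releasing the collection's flowgraph triggers this cleanup.
	metrics.CleanupDataNodeCollectionMetrics(1, 100, "by-dev-rootcoord-dml_0")

	if n := testutil.CollectAndCount(metrics.DataNodeProduceTimeTickLag); n != 0 {
		t.Fatalf("expected no produce_tt_lag_ms series after cleanup, got %d", n)
	}
}
```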
@@ -34,13 +34,14 @@ const (
    FailLabel = "fail"
    TotalLabel = "total"

    InsertLabel = "insert"
    DeleteLabel = "delete"
    SearchLabel = "search"
    QueryLabel = "query"

    InsertLabel = "insert"
    DeleteLabel = "delete"
    SearchLabel = "search"
    QueryLabel = "query"
    CacheHitLabel = "hit"
    CacheMissLabel = "miss"
    TimetickLabel = "timetick"
    AllLabel = "all"

    UnissuedIndexTaskLabel = "unissued"
    InProgressIndexTaskLabel = "in-progress"
@@ -150,13 +150,13 @@
            Buckets: buckets, // unit: ms
        }, []string{nodeIDLabelName})

    // ProxySyncTimeTick record Proxy synchronization timestamp statistics, differentiated by Channel.
    ProxySyncTimeTick = prometheus.NewGaugeVec(
    // ProxySyncTimeTickLag record Proxy synchronization timestamp statistics, differentiated by Channel.
    ProxySyncTimeTickLag = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.ProxyRole,
            Name: "sync_epoch_time",
            Help: "synchronized unix epoch per physical channel and default channel",
            Name: "tt_lag_ms",
            Help: "now time minus tt per physical channel",
        }, []string{nodeIDLabelName, channelNameLabelName})

    // ProxyApplyPrimaryKeyLatency record the latency that apply primary key.
@@ -247,7 +247,7 @@ func RegisterProxy(registry *prometheus.Registry) {
    registry.MustRegister(ProxyCacheStatsCounter)
    registry.MustRegister(ProxyUpdateCacheLatency)

    registry.MustRegister(ProxySyncTimeTick)
    registry.MustRegister(ProxySyncTimeTickLag)
    registry.MustRegister(ProxyApplyPrimaryKeyLatency)
    registry.MustRegister(ProxyApplyTimestampLatency)
@@ -17,6 +17,8 @@
package metrics

import (
    "fmt"

    "github.com/milvus-io/milvus/internal/util/typeutil"
    "github.com/prometheus/client_golang/prometheus"
)
@@ -32,6 +34,30 @@
        nodeIDLabelName,
    })

    QueryNodeConsumeTimeTickLag = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.QueryNodeRole,
            Name: "consume_tt_lag_ms",
            Help: "now time minus tt per physical channel",
        }, []string{
            nodeIDLabelName,
            msgTypeLabelName,
            collectionIDLabelName,
        })

    QueryNodeConsumerMsgCount = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.QueryNodeRole,
            Name: "consume_msg_count",
            Help: "count of consumed msg",
        }, []string{
            nodeIDLabelName,
            msgTypeLabelName,
            collectionIDLabelName,
        })

    QueryNodeNumPartitions = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
@@ -340,4 +366,27 @@ func RegisterQueryNode(registry *prometheus.Registry) {
    registry.MustRegister(QueryNodeNumEntities)
    registry.MustRegister(QueryNodeConsumeCounter)
    registry.MustRegister(QueryNodeExecuteCounter)
    registry.MustRegister(QueryNodeConsumerMsgCount)
    registry.MustRegister(QueryNodeConsumeTimeTickLag)
}

func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
    for _, label := range []string{DeleteLabel, InsertLabel} {
        QueryNodeConsumerMsgCount.
            Delete(
                prometheus.Labels{
                    nodeIDLabelName: fmt.Sprint(nodeID),
                    msgTypeLabelName: label,
                    collectionIDLabelName: fmt.Sprint(collectionID),
                })

        QueryNodeConsumeTimeTickLag.
            Delete(
                prometheus.Labels{
                    nodeIDLabelName: fmt.Sprint(nodeID),
                    msgTypeLabelName: label,
                    collectionIDLabelName: fmt.Sprint(collectionID),
                })
    }
}
@@ -23,8 +23,8 @@ var (
        prometheus.GaugeOpts{
            Namespace: milvusNamespace,
            Subsystem: typeutil.RootCoordRole,
            Name: "sync_epoch_time",
            Help: "synchronized unix epoch per physical channel",
            Name: "produce_tt_lag_ms",
            Help: "now time minus tt per physical channel",
        }, []string{channelNameLabelName})

    RootCoordDDLReqCounter = prometheus.NewCounterVec(
@@ -290,8 +290,6 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
    maxTs := ts
    for channel, ts := range stats {
        physicalTs, _ := tsoutil.ParseHybridTs(ts)
        metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), channel).Set(float64(physicalTs))
        channels = append(channels, channel)
        tss = append(tss, ts)
        if ts > maxTs {
@@ -310,8 +308,8 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
        Timestamps: tss,
        DefaultTimestamp: maxTs,
    }
    maxPhysicalTs, _ := tsoutil.ParseHybridTs(maxTs)
    metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "default").Set(float64(maxPhysicalTs))
    sub := tsoutil.SubByNow(maxTs)
    metrics.ProxySyncTimeTickLag.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "default").Set(float64(sub))
    status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req)
    if err != nil {
        log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))
@@ -22,6 +22,8 @@ import (
    "fmt"
    "time"

    "github.com/milvus-io/milvus/internal/util/typeutil"

    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/log"
@@ -70,7 +72,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
        flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
    }

    dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
    dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.InsertLabel)
    if err != nil {
        return nil, err
    }
@@ -136,7 +138,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
        flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
    }

    dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
    dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.DeleteLabel)
    if err != nil {
        return nil, err
    }
@@ -185,7 +187,8 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
}

// newDmInputNode returns a new inputNode
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, vchannel Channel) (*flowgraph.InputNode, error) {
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, vchannel Channel, dataType string) (*flowgraph.InputNode, error) {
    insertStream, err := factory.NewTtMsgStream(ctx)
    if err != nil {
        return nil, err
@@ -196,7 +199,8 @@ func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstre
    maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
    maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
    name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, vchannel)
    node := flowgraph.NewInputNode(insertStream, name, maxQueueLength, maxParallelism)
    node := flowgraph.NewInputNode(insertStream, name, maxQueueLength, maxParallelism, typeutil.QueryNodeRole,
        Params.QueryNodeCfg.GetNodeID(), collectionID, dataType)
    return node, nil
}
@@ -266,4 +270,6 @@ func (q *queryNodeFlowGraph) close() {
        zap.Int64("collectionID", q.collectionID),
        zap.String("vchannel", q.vchannel),
    )

    metrics.CleanupQueryNodeCollectionMetrics(Params.QueryNodeCfg.GetNodeID(), q.collectionID)
}
@@ -318,9 +318,9 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
        return err
    }

    physicalTs, _ := tsoutil.ParseHybridTs(ts)
    sub := tsoutil.SubByNow(ts)
    for _, chanName := range chanNames {
        metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(physicalTs))
        metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(sub))
    }
    return nil
}
@@ -17,9 +17,15 @@
package flowgraph

import (
    "fmt"
    "sync"

    "github.com/milvus-io/milvus/internal/util/tsoutil"

    "github.com/milvus-io/milvus/internal/util/typeutil"

    "github.com/milvus-io/milvus/internal/log"
    "github.com/milvus-io/milvus/internal/metrics"
    "github.com/milvus-io/milvus/internal/mq/msgstream"
    "github.com/milvus-io/milvus/internal/util/trace"
    "github.com/opentracing/opentracing-go"
@@ -30,9 +36,13 @@
// InputNode is the entry point of flowgragh
type InputNode struct {
    BaseNode
    inStream msgstream.MsgStream
    name string
    closeOnce sync.Once
    inStream msgstream.MsgStream
    name string
    role string
    nodeID int64
    collectionID int64
    dataType string
    closeOnce sync.Once
}

// IsInputNode returns whether Node is InputNode
@@ -76,6 +86,28 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
    if msgPack == nil {
        return []Msg{}
    }

    sub := tsoutil.SubByNow(msgPack.EndTs)
    if inNode.role == typeutil.QueryNodeRole {
        metrics.QueryNodeConsumerMsgCount.
            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
            Inc()

        metrics.QueryNodeConsumeTimeTickLag.
            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
            Set(float64(sub))
    }

    if inNode.role == typeutil.DataNodeRole {
        metrics.DataNodeConsumeMsgCount.
            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
            Inc()

        metrics.DataNodeConsumeTimeTickLag.
            WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
            Set(float64(sub))
    }

    var spans []opentracing.Span
    for _, msg := range msgPack.Msgs {
        sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
@@ -101,14 +133,18 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
}

// NewInputNode composes an InputNode with provided MsgStream, name and parameters
func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32, role string, nodeID int64, collectionID int64, dataType string) *InputNode {
    baseNode := BaseNode{}
    baseNode.SetMaxQueueLength(maxQueueLength)
    baseNode.SetMaxParallelism(maxParallelism)

    return &InputNode{
        BaseNode: baseNode,
        inStream: inStream,
        name: nodeName,
        BaseNode: baseNode,
        inStream: inStream,
        name: nodeName,
        role: role,
        nodeID: nodeID,
        collectionID: collectionID,
        dataType: dataType,
    }
}
@@ -41,7 +41,7 @@ func TestInputNode(t *testing.T) {
    produceStream.Produce(&msgPack)

    nodeName := "input_node"
    inputNode := NewInputNode(msgStream, nodeName, 100, 100)
    inputNode := NewInputNode(msgStream, nodeName, 100, 100, "", 0, 0, "")
    defer inputNode.Close()

    isInputNode := inputNode.IsInputNode()
@@ -64,7 +64,7 @@ func Test_NewInputNode(t *testing.T) {
    nodeName := "input_node"
    var maxQueueLength int32
    var maxParallelism int32 = 100
    node := NewInputNode(nil, nodeName, maxQueueLength, maxParallelism)
    node := NewInputNode(nil, nodeName, maxQueueLength, maxParallelism, "", 0, 0, "")
    assert.NotNil(t, node)
    assert.Equal(t, node.name, nodeName)
    assert.Equal(t, node.maxQueueLength, maxQueueLength)
@@ -74,7 +74,7 @@ func TestNodeCtx_Start(t *testing.T) {
    produceStream.Produce(&msgPack)

    nodeName := "input_node"
    inputNode := NewInputNode(msgStream, nodeName, 100, 100)
    inputNode := NewInputNode(msgStream, nodeName, 100, 100, "", 0, 0, "")

    node := &nodeCtx{
        node: inputNode,
@@ -97,3 +97,10 @@ func AddPhysicalDurationOnTs(ts uint64, duration time.Duration) uint64 {
func NewTSOKVBase(client *clientv3.Client, tsoRoot, subPath string) *etcdkv.EtcdKV {
    return etcdkv.NewEtcdKV(client, path.Join(tsoRoot, subPath))
}

// SubByNow ts is a hybrid timestamp; it returns the number of milliseconds between the physical part of ts and now.
func SubByNow(ts uint64) int64 {
    utcT, _ := ParseHybridTs(ts)
    now := time.Now().UnixMilli()
    return now - utcT
}
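`SubByNow` works because a hybrid timestamp packs a physical Unix time in milliseconds into its high bits, with the low bits reserved for a logical counter, so `ParseHybridTs` recovers a wall-clock value that can be subtracted from `time.Now()`. The self-contained sketch below restates that arithmetic; the 18-bit logical segment is an assumption about `tsoutil`, not something shown in this diff.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed layout: ts = physical(ms) << logicalBits | logical.
// Treat the 18-bit logical segment as an assumption for this sketch,
// not a quotation of the Milvus source.
const logicalBits = 18

func composeTS(physicalMs int64, logical int64) uint64 {
	return uint64(physicalMs)<<logicalBits | uint64(logical)
}

func parseHybridTs(ts uint64) (physicalMs int64, logical int64) {
	return int64(ts >> logicalBits), int64(ts & ((1 << logicalBits) - 1))
}

// subByNow mirrors the new tsoutil.SubByNow: how many milliseconds the
// hybrid timestamp's physical part lags behind the current wall clock.
func subByNow(ts uint64) int64 {
	physical, _ := parseHybridTs(ts)
	return time.Now().UnixMilli() - physical
}

func main() {
	ts := composeTS(time.Now().Add(-250*time.Millisecond).UnixMilli(), 7)
	fmt.Println("lag in ms:", subByNow(ts)) // roughly 250
}
```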