mirror of https://github.com/milvus-io/milvus.git
Add some prometheus metrics for proxy (#15582)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/15814/head
parent
bd7d294c6d
commit
d183748484
|
@ -21,18 +21,6 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
SuccessLabel = "success"
|
||||
FailLabel = "fail"
|
||||
TotalLabel = "total"
|
||||
|
||||
UnissuedIndexTaskLabel = "unissued"
|
||||
InProgressIndexTaskLabel = "in-progress"
|
||||
FinishedIndexTaskLabel = "finished"
|
||||
FailedIndexTaskLabel = "failed"
|
||||
RecycledIndexTaskLabel = "recycled"
|
||||
)
|
||||
|
||||
var (
|
||||
// IndexCoordIndexRequestCounter records the number of the index requests.
|
||||
IndexCoordIndexRequestCounter = prometheus.NewCounterVec(
|
||||
|
|
|
@ -31,6 +31,23 @@ import (
|
|||
|
||||
const (
|
||||
milvusNamespace = "milvus"
|
||||
|
||||
SuccessLabel = "success"
|
||||
FailLabel = "fail"
|
||||
TotalLabel = "total"
|
||||
AbandonLabel = "abandon"
|
||||
|
||||
SearchLabel = "search"
|
||||
QueryLabel = "query"
|
||||
|
||||
CacheHitLabel = "hit"
|
||||
CacheMissLabel = "miss"
|
||||
|
||||
UnissuedIndexTaskLabel = "unissued"
|
||||
InProgressIndexTaskLabel = "in-progress"
|
||||
FinishedIndexTaskLabel = "finished"
|
||||
FailedIndexTaskLabel = "failed"
|
||||
RecycledIndexTaskLabel = "recycled"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -220,372 +237,6 @@ func RegisterRootCoord() {
|
|||
//prometheus.MustRegister(PanicCounter)
|
||||
}
|
||||
|
||||
var (
|
||||
// ProxyCreateCollectionCounter counts the num of calls of CreateCollection
|
||||
ProxyCreateCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "create_collection_total",
|
||||
Help: "Counter of create collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDropCollectionCounter counts the num of calls of DropCollection
|
||||
ProxyDropCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "drop_collection_total",
|
||||
Help: "Counter of drop collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyHasCollectionCounter counts the num of calls of HasCollection
|
||||
ProxyHasCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "has_collection_total",
|
||||
Help: "Counter of has collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyLoadCollectionCounter counts the num of calls of LoadCollection
|
||||
ProxyLoadCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "load_collection_total",
|
||||
Help: "Counter of load collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyReleaseCollectionCounter counts the num of calls of ReleaseCollection
|
||||
ProxyReleaseCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "release_collection_total",
|
||||
Help: "Counter of release collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDescribeCollectionCounter counts the num of calls of DescribeCollection
|
||||
ProxyDescribeCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "describe_collection_total",
|
||||
Help: "Counter of describe collection",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetCollectionStatisticsCounter counts the num of calls of GetCollectionStatistics
|
||||
ProxyGetCollectionStatisticsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_collection_statistics_total",
|
||||
Help: "Counter of get collection statistics",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyShowCollectionsCounter counts the num of calls of ShowCollections
|
||||
ProxyShowCollectionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "show_collections_total",
|
||||
Help: "Counter of show collections",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyCreatePartitionCounter counts the num of calls of CreatePartition
|
||||
ProxyCreatePartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "create_partition_total",
|
||||
Help: "Counter of create partition",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDropPartitionCounter counts the num of calls of DropPartition
|
||||
ProxyDropPartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "drop_partition_total",
|
||||
Help: "Counter of drop partition",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyHasPartitionCounter counts the num of calls of HasPartition
|
||||
ProxyHasPartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "has_partition_total",
|
||||
Help: "Counter of has partition",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyLoadPartitionsCounter counts the num of calls of LoadPartitions
|
||||
ProxyLoadPartitionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "load_partitions_total",
|
||||
Help: "Counter of load partitions",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyReleasePartitionsCounter counts the num of calls of ReleasePartitions
|
||||
ProxyReleasePartitionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "release_partitions_total",
|
||||
Help: "Counter of release partitions",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetPartitionStatisticsCounter counts the num of calls of GetPartitionStatistics
|
||||
ProxyGetPartitionStatisticsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_partition_statistics_total",
|
||||
Help: "Counter of get partition statistics",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyShowPartitionsCounter counts the num of calls of ShowPartitions
|
||||
ProxyShowPartitionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "show_partitions_total",
|
||||
Help: "Counter of show partitions",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyCreateIndexCounter counts the num of calls of CreateIndex
|
||||
ProxyCreateIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "create_index_counter",
|
||||
Help: "Counter of create index",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDescribeIndexCounter counts the num of calls of DescribeIndex
|
||||
ProxyDescribeIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "describe_index_counter",
|
||||
Help: "Counter of describe index",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetIndexStateCounter counts the num of calls of GetIndexState
|
||||
ProxyGetIndexStateCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_index_state_counter",
|
||||
Help: "Counter of get index state",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetIndexBuildProgressCounter counts the num of calls of GetIndexBuildProgress
|
||||
ProxyGetIndexBuildProgressCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_index_build_progress_total",
|
||||
Help: "Counter of get index build progress",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDropIndexCounter counts the num of calls of DropIndex
|
||||
ProxyDropIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "drop_index_total",
|
||||
Help: "Counter of drop index",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyInsertCounter counts the num of calls of Insert
|
||||
ProxyInsertCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "insert_total",
|
||||
Help: "Counter of insert",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxySearchCounter counts the num of calls of Search
|
||||
ProxySearchCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "search_total",
|
||||
Help: "Counter of search",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyRetrieveCounter counts the num of calls of Retrieve
|
||||
ProxyRetrieveCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "retrieve_total",
|
||||
Help: "Counter of retrieve",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyFlushCounter counts the num of calls of Flush
|
||||
ProxyFlushCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "flush_total",
|
||||
Help: "Counter of flush",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyQueryCounter counts the num of calls of Query
|
||||
ProxyQueryCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "query_total",
|
||||
Help: "Counter of query",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetPersistentSegmentInfoCounter counts the num of calls of GetPersistentSegmentInfo
|
||||
ProxyGetPersistentSegmentInfoCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_persistent_segment_info_total",
|
||||
Help: "Counter of get persistent segment info",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetQuerySegmentInfoCounter counts the num of calls of GetQuerySegmentInfo
|
||||
ProxyGetQuerySegmentInfoCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_query_segment_info_total",
|
||||
Help: "Counter of get query segment info",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDummyCounter counts the num of calls of Dummy
|
||||
ProxyDummyCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "dummy_total",
|
||||
Help: "Counter of dummy",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyRegisterLinkCounter counts the num of calls of RegisterLink
|
||||
ProxyRegisterLinkCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "register_link_total",
|
||||
Help: "Counter of register link",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetComponentStatesCounter counts the num of calls of GetComponentStates
|
||||
ProxyGetComponentStatesCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_component_states_total",
|
||||
Help: "Counter of get component states",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetStatisticsChannelCounter counts the num of calls of GetStatisticsChannel
|
||||
ProxyGetStatisticsChannelCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_statistics_channel_total",
|
||||
Help: "Counter of get statistics channel",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyInvalidateCollectionMetaCacheCounter counts the num of calls of InvalidateCollectionMetaCache
|
||||
ProxyInvalidateCollectionMetaCacheCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "invalidate_collection_meta_cache_total",
|
||||
Help: "Counter of invalidate collection meta cache",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyGetDdChannelCounter counts the num of calls of GetDdChannel
|
||||
ProxyGetDdChannelCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "get_dd_channel_total",
|
||||
Help: "Counter of get dd channel",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyReleaseDQLMessageStreamCounter counts the num of calls of ReleaseDQLMessageStream
|
||||
ProxyReleaseDQLMessageStreamCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "release_dql_message_stream_total",
|
||||
Help: "Counter of release dql message stream",
|
||||
}, []string{"status"})
|
||||
|
||||
// ProxyDmlChannelTimeTick counts the time tick value of dml channels
|
||||
ProxyDmlChannelTimeTick = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "dml_channels_time_tick",
|
||||
Help: "Time tick of dml channels",
|
||||
}, []string{"pchan"})
|
||||
)
|
||||
|
||||
//RegisterProxy registers Proxy metrics
|
||||
func RegisterProxy() {
|
||||
prometheus.MustRegister(ProxyCreateCollectionCounter)
|
||||
prometheus.MustRegister(ProxyDropCollectionCounter)
|
||||
prometheus.MustRegister(ProxyHasCollectionCounter)
|
||||
prometheus.MustRegister(ProxyLoadCollectionCounter)
|
||||
prometheus.MustRegister(ProxyReleaseCollectionCounter)
|
||||
prometheus.MustRegister(ProxyDescribeCollectionCounter)
|
||||
prometheus.MustRegister(ProxyGetCollectionStatisticsCounter)
|
||||
prometheus.MustRegister(ProxyShowCollectionsCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyCreatePartitionCounter)
|
||||
prometheus.MustRegister(ProxyDropPartitionCounter)
|
||||
prometheus.MustRegister(ProxyHasPartitionCounter)
|
||||
prometheus.MustRegister(ProxyLoadPartitionsCounter)
|
||||
prometheus.MustRegister(ProxyReleasePartitionsCounter)
|
||||
prometheus.MustRegister(ProxyGetPartitionStatisticsCounter)
|
||||
prometheus.MustRegister(ProxyShowPartitionsCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyCreateIndexCounter)
|
||||
prometheus.MustRegister(ProxyDescribeIndexCounter)
|
||||
prometheus.MustRegister(ProxyGetIndexStateCounter)
|
||||
prometheus.MustRegister(ProxyGetIndexBuildProgressCounter)
|
||||
prometheus.MustRegister(ProxyDropIndexCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyInsertCounter)
|
||||
prometheus.MustRegister(ProxySearchCounter)
|
||||
prometheus.MustRegister(ProxyRetrieveCounter)
|
||||
prometheus.MustRegister(ProxyFlushCounter)
|
||||
prometheus.MustRegister(ProxyQueryCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyGetPersistentSegmentInfoCounter)
|
||||
prometheus.MustRegister(ProxyGetQuerySegmentInfoCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyDummyCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyRegisterLinkCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyGetComponentStatesCounter)
|
||||
prometheus.MustRegister(ProxyGetStatisticsChannelCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyInvalidateCollectionMetaCacheCounter)
|
||||
prometheus.MustRegister(ProxyGetDdChannelCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyReleaseDQLMessageStreamCounter)
|
||||
|
||||
prometheus.MustRegister(ProxyDmlChannelTimeTick)
|
||||
}
|
||||
|
||||
var (
|
||||
//DataCoordDataNodeList records the num of regsitered data nodes
|
||||
DataCoordDataNodeList = prometheus.NewGaugeVec(
|
||||
|
|
|
@ -0,0 +1,338 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
coarseGrainedBuckets = []float64{1, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000} // unit: ms
|
||||
fineGrainedBuckets = []float64{1, 2, 5, 8, 10, 20, 30, 40, 50, 100} // unit: ms
|
||||
)
|
||||
|
||||
var (
|
||||
// ProxyDmlChannelTimeTick counts the time tick value of dml channels
|
||||
ProxyDmlChannelTimeTick = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "dml_channels_time_tick",
|
||||
Help: "Time tick of dml channels",
|
||||
}, []string{"node_id", "pchan"})
|
||||
|
||||
// ProxySearchCount record the number of times search succeeded or failed.
|
||||
ProxySearchCount = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "search_counter",
|
||||
Help: "The number of times search succeeded or failed",
|
||||
}, []string{"node_id", "collection_id", "type", "status"})
|
||||
|
||||
// ProxyInsertCount record the number of times insert succeeded or failed.
|
||||
ProxyInsertCount = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "insert_counter",
|
||||
Help: "The number of times insert succeeded or failed",
|
||||
}, []string{"node_id", "collection_id", "status"})
|
||||
|
||||
// ProxySearchVectors record the number of vectors search successfully.
|
||||
ProxySearchVectors = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "search_vectors",
|
||||
Help: "The number of vectors search successfully",
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyInsertVectors record the number of vectors insert successfully.
|
||||
ProxyInsertVectors = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "insert_vectors",
|
||||
Help: "The number of vectors insert successfully",
|
||||
}, []string{"node_id", "collection_id"})
|
||||
|
||||
// ProxyLinkedSDKs record The number of SDK linked proxy.
|
||||
// TODO: how to know when sdk disconnect?
|
||||
ProxyLinkedSDKs = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "linked_sdk_numbers",
|
||||
Help: "The number of SDK linked proxy",
|
||||
}, []string{"node_id"})
|
||||
|
||||
// ProxySearchLatency record the latency of search successfully.
|
||||
ProxySearchLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "search_latency",
|
||||
Help: "The latency of search successfully",
|
||||
Buckets: coarseGrainedBuckets,
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxySendMessageLatency record the latency that the proxy sent the search request to the message stream.
|
||||
ProxySendMessageLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "send_search_msg_time",
|
||||
Help: "The latency that the proxy sent the search request to the message stream",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result.
|
||||
ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "wait_for_search_result_time",
|
||||
Help: "The time that the proxy waits for the search result",
|
||||
Buckets: coarseGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyReduceSearchResultLatency record the time that the proxy reduces search result.
|
||||
ProxyReduceSearchResultLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "reduce_search_result_time",
|
||||
Help: "The time that the proxy reduces search result",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyDecodeSearchResultLatency record the time that the proxy decodes the search result.
|
||||
ProxyDecodeSearchResultLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "decode_search_result_time",
|
||||
Help: "The time that the proxy decodes the search result",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyMsgStreamObjectsForPChan record the number of MsgStream objects per PChannel on each collection_id on Proxy.
|
||||
ProxyMsgStreamObjectsForPChan = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "msg_stream_obj_for_PChan",
|
||||
Help: "The number of MsgStream objects per PChannel on each collection on Proxy",
|
||||
}, []string{"node_id", "collection_id"})
|
||||
|
||||
// ProxyMsgStreamObjectsForSearch record the number of MsgStream objects for search per collection_id.
|
||||
ProxyMsgStreamObjectsForSearch = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "msg_stream_obj_for_search",
|
||||
Help: "The number of MsgStream objects for search per collection",
|
||||
}, []string{"node_id", "collection_id", "type"})
|
||||
|
||||
// ProxyInsertLatency record the latency that insert successfully.
|
||||
ProxyInsertLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "insert_latency",
|
||||
Help: "The latency that insert successfully.",
|
||||
Buckets: coarseGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id"})
|
||||
|
||||
// ProxyInsertColToRowLatency record the latency that column to row for inserting in Proxy.
|
||||
ProxyInsertColToRowLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "col_to_row_latency",
|
||||
Help: "The time that column to row for inserting in Proxy",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id"})
|
||||
|
||||
// ProxySendInsertReqLatency record the latency that Proxy send insert request to MsgStream.
|
||||
ProxySendInsertReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "send_insert_req_latency",
|
||||
Help: "The latency that Proxy send insert request to MsgStream",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "collection_id"})
|
||||
|
||||
// ProxyCacheHitCounter record the number of Proxy cache hits or miss.
|
||||
// TODO: @xiaocai2333 add more cache type
|
||||
ProxyCacheHitCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "cache_hits",
|
||||
Help: "Proxy cache hits",
|
||||
}, []string{"node_id", "cache_type", "hit_type"})
|
||||
|
||||
// ProxyUpdateCacheLatency record the time that proxy update cache when cache miss.
|
||||
ProxyUpdateCacheLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "update_cache_latency",
|
||||
Help: "The time that proxy update cache when cache miss",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id"})
|
||||
|
||||
// ProxySyncTimeTick record Proxy synchronization timestamp statistics, differentiated by Channel.
|
||||
ProxySyncTimeTick = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "sync_time_tick",
|
||||
Help: "Proxy synchronization timestamp statistics, differentiated by Channel",
|
||||
}, []string{"node_id", "channel"})
|
||||
|
||||
// ProxyApplyPrimaryKeyLatency record the latency that apply primary key.
|
||||
ProxyApplyPrimaryKeyLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "apply_pk_latency",
|
||||
Help: "The latency that apply primary key",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id"})
|
||||
|
||||
// ProxyApplyTimestampLatency record the latency that proxy apply timestamp.
|
||||
ProxyApplyTimestampLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "apply_timestamp_latency",
|
||||
Help: "The latency that proxy apply timestamp",
|
||||
Buckets: fineGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id"})
|
||||
|
||||
// ProxyDDLFunctionCall records the number of times the function of the DDL operation was executed, like `CreateCollection`.
|
||||
ProxyDDLFunctionCall = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DDL_call_counter",
|
||||
Help: "the number of times the function of the DDL operation was executed",
|
||||
}, []string{"node_id", "function", "status"})
|
||||
|
||||
// ProxyDQLFunctionCall records the number of times the function of the DQL operation was executed, like `HasCollection`.
|
||||
ProxyDQLFunctionCall = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DQL_call_counter",
|
||||
Help: "",
|
||||
}, []string{"node_id", "function", "collection_id", "status"})
|
||||
|
||||
// ProxyDMLFunctionCall records the number of times the function of the DML operation was executed, like `LoadCollection`.
|
||||
ProxyDMLFunctionCall = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DML_call_counter",
|
||||
Help: "",
|
||||
}, []string{"node_id", "function", "collection_id", "status"})
|
||||
|
||||
// ProxyDDLReqLatency records the latency that for DML request, like "CreateCollection".
|
||||
ProxyDDLReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DDL_call_latency",
|
||||
Help: "The latency that for DDL request",
|
||||
Buckets: coarseGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "function"})
|
||||
|
||||
// ProxyDMLReqLatency records the latency that for DML request.
|
||||
ProxyDMLReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DML_call_latency",
|
||||
Help: "The latency that for DML request",
|
||||
Buckets: coarseGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "function", "collection_id"})
|
||||
|
||||
// ProxyDQLReqLatency record the latency that for DQL request, like "HasCollection".
|
||||
ProxyDQLReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "DQL_call_latency",
|
||||
Help: "The latency that for DQL request",
|
||||
Buckets: coarseGrainedBuckets, // unit: ms
|
||||
}, []string{"node_id", "function", "collection_id"})
|
||||
|
||||
// ProxySearchLatencyPerNQ records the latency for searching.
|
||||
ProxySearchLatencyPerNQ = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "proxy_search_latency_count",
|
||||
Help: "The latency for searching",
|
||||
Buckets: fineGrainedBuckets,
|
||||
}, []string{"node_id", "collection_id"})
|
||||
)
|
||||
|
||||
//RegisterProxy registers Proxy metrics
|
||||
func RegisterProxy() {
|
||||
prometheus.MustRegister(ProxyDmlChannelTimeTick)
|
||||
|
||||
prometheus.MustRegister(ProxySearchCount)
|
||||
prometheus.MustRegister(ProxyInsertCount)
|
||||
prometheus.MustRegister(ProxySearchVectors)
|
||||
prometheus.MustRegister(ProxyInsertVectors)
|
||||
|
||||
prometheus.MustRegister(ProxyLinkedSDKs)
|
||||
|
||||
prometheus.MustRegister(ProxySearchLatency)
|
||||
prometheus.MustRegister(ProxySearchLatencyPerNQ)
|
||||
prometheus.MustRegister(ProxySendMessageLatency)
|
||||
prometheus.MustRegister(ProxyWaitForSearchResultLatency)
|
||||
prometheus.MustRegister(ProxyReduceSearchResultLatency)
|
||||
prometheus.MustRegister(ProxyDecodeSearchResultLatency)
|
||||
|
||||
prometheus.MustRegister(ProxyMsgStreamObjectsForPChan)
|
||||
prometheus.MustRegister(ProxyMsgStreamObjectsForSearch)
|
||||
|
||||
prometheus.MustRegister(ProxyInsertLatency)
|
||||
prometheus.MustRegister(ProxyInsertColToRowLatency)
|
||||
prometheus.MustRegister(ProxySendInsertReqLatency)
|
||||
|
||||
prometheus.MustRegister(ProxyCacheHitCounter)
|
||||
prometheus.MustRegister(ProxyUpdateCacheLatency)
|
||||
|
||||
prometheus.MustRegister(ProxySyncTimeTick)
|
||||
prometheus.MustRegister(ProxyApplyPrimaryKeyLatency)
|
||||
prometheus.MustRegister(ProxyApplyTimestampLatency)
|
||||
|
||||
prometheus.MustRegister(ProxyDDLFunctionCall)
|
||||
prometheus.MustRegister(ProxyDQLFunctionCall)
|
||||
prometheus.MustRegister(ProxyDMLFunctionCall)
|
||||
prometheus.MustRegister(ProxyDDLReqLatency)
|
||||
prometheus.MustRegister(ProxyDMLReqLatency)
|
||||
prometheus.MustRegister(ProxyDQLReqLatency)
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metrics
|
|
@ -22,9 +22,11 @@ import (
|
|||
"fmt"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
|
@ -398,6 +400,8 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) error {
|
|||
|
||||
mgr.updateCollection(collectionID, id)
|
||||
|
||||
metrics.ProxyMsgStreamObjectsForPChan.WithLabelValues(strconv.FormatInt(collectionID, 10), "PChan").Inc()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -427,6 +431,8 @@ func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) error {
|
|||
mgr.deleteVChansByVIDs(ids)
|
||||
mgr.deleteStreamByVIDs(ids)
|
||||
|
||||
metrics.ProxyMsgStreamObjectsForPChan.WithLabelValues(strconv.FormatInt(collectionID, 10), "PChan").Dec()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -476,6 +482,7 @@ func (mgr *channelsMgrImpl) getVChannels(collectionID UniqueID) ([]vChan, error)
|
|||
}
|
||||
|
||||
func (mgr *channelsMgrImpl) createDQLStream(collectionID UniqueID) error {
|
||||
metrics.ProxyMsgStreamObjectsForSearch.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), strconv.FormatInt(collectionID, 10), "query").Inc()
|
||||
return mgr.dqlChannelsMgr.createMsgStream(collectionID)
|
||||
}
|
||||
|
||||
|
@ -484,6 +491,7 @@ func (mgr *channelsMgrImpl) getDQLStream(collectionID UniqueID) (msgstream.MsgSt
|
|||
}
|
||||
|
||||
func (mgr *channelsMgrImpl) removeDQLStream(collectionID UniqueID) error {
|
||||
metrics.ProxyMsgStreamObjectsForSearch.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), strconv.FormatInt(collectionID, 10), "query").Dec()
|
||||
return mgr.dqlChannelsMgr.removeStream(collectionID)
|
||||
}
|
||||
|
||||
|
|
|
@ -18,9 +18,12 @@ package proxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
|
@ -90,11 +93,13 @@ func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
|
|||
}
|
||||
|
||||
func (ticker *channelsTimeTickerImpl) tick() error {
|
||||
applyStart := time.Now()
|
||||
now, err := ticker.tso.AllocOne()
|
||||
if err != nil {
|
||||
log.Warn("Proxy channelsTimeTickerImpl failed to get ts from tso", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
metrics.ProxyApplyTimestampLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(time.Since(applyStart).Milliseconds()))
|
||||
|
||||
stats, err := ticker.getStatisticsFunc()
|
||||
if err != nil {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -20,9 +20,14 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -105,6 +110,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string)
|
|||
collInfo, ok := m.collInfo[collectionName]
|
||||
|
||||
if !ok {
|
||||
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollection", metrics.CacheMissLabel).Inc()
|
||||
m.mu.RUnlock()
|
||||
coll, err := m.describeCollection(ctx, collectionName)
|
||||
if err != nil {
|
||||
|
@ -112,11 +118,14 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string)
|
|||
}
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
tr := timerecord.NewTimeRecorder("UpdateCache")
|
||||
m.updateCollection(coll, collectionName)
|
||||
metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
collInfo = m.collInfo[collectionName]
|
||||
return collInfo.collID, nil
|
||||
}
|
||||
defer m.mu.RUnlock()
|
||||
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollection", metrics.CacheHitLabel).Inc()
|
||||
|
||||
return collInfo.collID, nil
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
@ -292,6 +293,7 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
|
|||
|
||||
maxTs := ts
|
||||
for channel, ts := range stats {
|
||||
metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), channel).Set(float64(ts))
|
||||
channels = append(channels, channel)
|
||||
tss = append(tss, ts)
|
||||
if ts > maxTs {
|
||||
|
@ -313,9 +315,9 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
|
|||
|
||||
for idx, channel := range channels {
|
||||
ts := tss[idx]
|
||||
metrics.ProxyDmlChannelTimeTick.WithLabelValues(channel).Set(float64(ts))
|
||||
metrics.ProxyDmlChannelTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), channel).Set(float64(ts))
|
||||
}
|
||||
metrics.ProxyDmlChannelTimeTick.WithLabelValues("DefaultTimestamp").Set(float64(maxTs))
|
||||
metrics.ProxyDmlChannelTimeTick.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "DefaultTimestamp").Set(float64(maxTs))
|
||||
|
||||
status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req)
|
||||
if err != nil {
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -656,7 +657,9 @@ func (it *insertTask) checkFieldAutoIDAndHashPK() error {
|
|||
var rowIDBegin UniqueID
|
||||
var rowIDEnd UniqueID
|
||||
|
||||
tr := timerecord.NewTimeRecorder("applyPK")
|
||||
rowIDBegin, rowIDEnd, _ = it.rowIDAllocator.Alloc(rowNums)
|
||||
metrics.ProxyApplyPrimaryKeyLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan()))
|
||||
|
||||
it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums)
|
||||
for i := rowIDBegin; i < rowIDEnd; i++ {
|
||||
|
@ -759,10 +762,12 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
tr := timerecord.NewTimeRecorder("ColumnToRow")
|
||||
err = it.transferColumnBasedRequestToRowBasedData()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.ProxyInsertColToRowLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), collectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
rowNum := len(it.RowData)
|
||||
it.Timestamps = make([]uint64, rowNum)
|
||||
|
@ -1046,12 +1051,14 @@ func (it *insertTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
tr.Record("assign segment id")
|
||||
|
||||
tr.Record("sendInsertMsg")
|
||||
err = stream.Produce(pack)
|
||||
if err != nil {
|
||||
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
it.result.Status.Reason = err.Error()
|
||||
return err
|
||||
}
|
||||
metrics.ProxySendInsertReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), collectionName).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
tr.Record("send insert request to message stream")
|
||||
|
||||
return nil
|
||||
|
@ -1341,6 +1348,9 @@ type searchTask struct {
|
|||
chMgr channelsMgr
|
||||
qc types.QueryCoord
|
||||
collectionName string
|
||||
|
||||
tr *timerecord.TimeRecorder
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (st *searchTask) TraceCtx() context.Context {
|
||||
|
@ -1431,6 +1441,7 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
|
|||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
st.collectionID = collID
|
||||
|
||||
if err := validateCollectionName(st.query.CollectionName); err != nil {
|
||||
return err
|
||||
|
@ -1690,11 +1701,12 @@ func (st *searchTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
tr.Record("get used message stream")
|
||||
|
||||
err = stream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Debug("proxy", zap.String("send search request failed", err.Error()))
|
||||
}
|
||||
metrics.ProxySendMessageLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), collectionName, metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
st.tr.Record("send message done")
|
||||
log.Debug("proxy sent one searchMsg",
|
||||
zap.Int64("collectionID", st.CollectionID),
|
||||
zap.Int64("msgID", tsMsg.ID()),
|
||||
|
@ -1915,6 +1927,7 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
|
|||
|
||||
log.Debug("Proxy Search PostExecute stage1",
|
||||
zap.Any("len(filterSearchResults)", len(filterSearchResults)))
|
||||
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), st.collectionName, metrics.SearchLabel).Observe(float64(st.tr.RecordSpan().Milliseconds()))
|
||||
tr.Record("Proxy Search PostExecute stage1 done")
|
||||
if len(filterSearchResults) <= 0 {
|
||||
st.result = &milvuspb.SearchResults{
|
||||
|
@ -1926,12 +1939,12 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
|
|||
}
|
||||
return fmt.Errorf("no Available QueryNode result, filter reason %s: id %d", filterReason, st.ID())
|
||||
}
|
||||
|
||||
tr.Record("decodeResultStart")
|
||||
validSearchResults, err := decodeSearchResults(filterSearchResults)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metrics.ProxyDecodeSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), st.collectionName, metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
log.Debug("Proxy Search PostExecute stage2", zap.Any("len(validSearchResults)", len(validSearchResults)))
|
||||
if len(validSearchResults) <= 0 {
|
||||
filterReason += "empty search result\n"
|
||||
|
@ -1951,10 +1964,12 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
tr.Record("reduceResultStart")
|
||||
st.result, err = reduceSearchResultData(validSearchResults, searchResults[0].NumQueries, searchResults[0].TopK, searchResults[0].MetricType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.ProxyReduceSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), st.collectionName, metrics.SuccessLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
st.result.CollectionName = st.collectionName
|
||||
|
||||
schema, err := globalMetaCache.GetCollectionSchema(ctx, st.query.CollectionName)
|
||||
|
@ -1988,6 +2003,7 @@ type queryTask struct {
|
|||
qc types.QueryCoord
|
||||
ids *schemapb.IDs
|
||||
collectionName string
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (qt *queryTask) TraceCtx() context.Context {
|
||||
|
@ -2032,7 +2048,6 @@ func (qt *queryTask) getChannels() ([]pChan, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var channels []pChan
|
||||
channels, err = qt.chMgr.getChannels(collID)
|
||||
if err != nil {
|
||||
|
@ -2630,6 +2645,8 @@ type getCollectionStatisticsTask struct {
|
|||
ctx context.Context
|
||||
dataCoord types.DataCoord
|
||||
result *milvuspb.GetCollectionStatisticsResponse
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (g *getCollectionStatisticsTask) TraceCtx() context.Context {
|
||||
|
@ -2680,6 +2697,7 @@ func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g.collectionID = collID
|
||||
req := &datapb.GetCollectionStatisticsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_GetCollectionStatistics,
|
||||
|
@ -2717,6 +2735,8 @@ type getPartitionStatisticsTask struct {
|
|||
ctx context.Context
|
||||
dataCoord types.DataCoord
|
||||
result *milvuspb.GetPartitionStatisticsResponse
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (g *getPartitionStatisticsTask) TraceCtx() context.Context {
|
||||
|
@ -2767,6 +2787,7 @@ func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g.collectionID = collID
|
||||
partitionID, err := globalMetaCache.GetPartitionID(ctx, g.CollectionName, g.PartitionName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -3359,6 +3380,8 @@ type createIndexTask struct {
|
|||
ctx context.Context
|
||||
rootCoord types.RootCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (cit *createIndexTask) TraceCtx() context.Context {
|
||||
|
@ -3448,6 +3471,8 @@ func (cit *createIndexTask) PreExecute(ctx context.Context) error {
|
|||
return fmt.Errorf("invalid index params: %v", cit.CreateIndexRequest.ExtraParams)
|
||||
}
|
||||
|
||||
collID, _ := globalMetaCache.GetCollectionID(ctx, collName)
|
||||
cit.collectionID = collID
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -3473,6 +3498,8 @@ type describeIndexTask struct {
|
|||
ctx context.Context
|
||||
rootCoord types.RootCoord
|
||||
result *milvuspb.DescribeIndexResponse
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (dit *describeIndexTask) TraceCtx() context.Context {
|
||||
|
@ -3525,6 +3552,8 @@ func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
|
|||
dit.IndexName = Params.CommonCfg.DefaultIndexName
|
||||
}
|
||||
|
||||
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
|
||||
dit.collectionID = collID
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -3550,6 +3579,8 @@ type dropIndexTask struct {
|
|||
*milvuspb.DropIndexRequest
|
||||
rootCoord types.RootCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (dit *dropIndexTask) TraceCtx() context.Context {
|
||||
|
@ -3607,6 +3638,9 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
|||
dit.IndexName = Params.CommonCfg.DefaultIndexName
|
||||
}
|
||||
|
||||
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
|
||||
dit.collectionID = collID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -3634,6 +3668,8 @@ type getIndexBuildProgressTask struct {
|
|||
rootCoord types.RootCoord
|
||||
dataCoord types.DataCoord
|
||||
result *milvuspb.GetIndexBuildProgressResponse
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (gibpt *getIndexBuildProgressTask) TraceCtx() context.Context {
|
||||
|
@ -3690,6 +3726,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
|
|||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
gibpt.collectionID = collectionID
|
||||
|
||||
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
@ -3855,6 +3892,8 @@ type getIndexStateTask struct {
|
|||
indexCoord types.IndexCoord
|
||||
rootCoord types.RootCoord
|
||||
result *milvuspb.GetIndexStateResponse
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (gist *getIndexStateTask) TraceCtx() context.Context {
|
||||
|
@ -3911,6 +3950,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
|
|||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
gist.collectionID = collectionID
|
||||
|
||||
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
@ -4153,6 +4193,8 @@ type loadCollectionTask struct {
|
|||
ctx context.Context
|
||||
queryCoord types.QueryCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (lct *loadCollectionTask) TraceCtx() context.Context {
|
||||
|
@ -4212,6 +4254,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lct.collectionID = collID
|
||||
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lct.CollectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -4251,6 +4294,8 @@ type releaseCollectionTask struct {
|
|||
queryCoord types.QueryCoord
|
||||
result *commonpb.Status
|
||||
chMgr channelsMgr
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (rct *releaseCollectionTask) TraceCtx() context.Context {
|
||||
|
@ -4308,6 +4353,7 @@ func (rct *releaseCollectionTask) Execute(ctx context.Context) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rct.collectionID = collID
|
||||
request := &querypb.ReleaseCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_ReleaseCollection,
|
||||
|
@ -4336,6 +4382,8 @@ type loadPartitionsTask struct {
|
|||
ctx context.Context
|
||||
queryCoord types.QueryCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (lpt *loadPartitionsTask) TraceCtx() context.Context {
|
||||
|
@ -4394,6 +4442,7 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lpt.collectionID = collID
|
||||
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lpt.CollectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -4431,6 +4480,8 @@ type releasePartitionsTask struct {
|
|||
ctx context.Context
|
||||
queryCoord types.QueryCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (rpt *releasePartitionsTask) TraceCtx() context.Context {
|
||||
|
@ -4489,6 +4540,7 @@ func (rpt *releasePartitionsTask) Execute(ctx context.Context) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rpt.collectionID = collID
|
||||
for _, partitionName := range rpt.PartitionNames {
|
||||
partitionID, err := globalMetaCache.GetPartitionID(ctx, rpt.CollectionName, partitionName)
|
||||
if err != nil {
|
||||
|
@ -4527,6 +4579,8 @@ type deleteTask struct {
|
|||
chTicker channelsTimeTicker
|
||||
vChannels []vChan
|
||||
pChannels []pChan
|
||||
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (dt *deleteTask) TraceCtx() context.Context {
|
||||
|
@ -4652,6 +4706,7 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
dt.DeleteRequest.CollectionID = collID
|
||||
dt.collectionID = collID
|
||||
|
||||
// If partitionName is not empty, partitionID will be set.
|
||||
if len(dt.req.PartitionName) > 0 {
|
||||
|
|
|
@ -29,6 +29,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
|
@ -809,6 +811,7 @@ func TestSearchTask(t *testing.T) {
|
|||
query: nil,
|
||||
chMgr: nil,
|
||||
qc: nil,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// no result
|
||||
|
@ -839,6 +842,7 @@ func TestSearchTask(t *testing.T) {
|
|||
query: nil,
|
||||
chMgr: nil,
|
||||
qc: nil,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// no result
|
||||
|
@ -874,6 +878,7 @@ func TestSearchTask(t *testing.T) {
|
|||
query: nil,
|
||||
chMgr: nil,
|
||||
qc: nil,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// no result
|
||||
|
@ -1826,6 +1831,7 @@ func TestSearchTask_all(t *testing.T) {
|
|||
query: req,
|
||||
chMgr: chMgr,
|
||||
qc: qc,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// simple mock for query node
|
||||
|
@ -2172,6 +2178,7 @@ func TestSearchTaskWithInvalidRoundDecimal(t *testing.T) {
|
|||
query: req,
|
||||
chMgr: chMgr,
|
||||
qc: qc,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// simple mock for query node
|
||||
|
@ -2512,6 +2519,7 @@ func TestSearchTask_7803_reduce(t *testing.T) {
|
|||
query: req,
|
||||
chMgr: chMgr,
|
||||
qc: qc,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// simple mock for query node
|
||||
|
@ -2649,6 +2657,7 @@ func TestSearchTask_Type(t *testing.T) {
|
|||
SearchRequest: &internalpb.SearchRequest{
|
||||
Base: nil,
|
||||
},
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
assert.Equal(t, commonpb.MsgType_Search, task.Type())
|
||||
|
@ -2661,6 +2670,7 @@ func TestSearchTask_Ts(t *testing.T) {
|
|||
SearchRequest: &internalpb.SearchRequest{
|
||||
Base: nil,
|
||||
},
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
|
||||
|
@ -2705,6 +2715,7 @@ func TestSearchTask_Channels(t *testing.T) {
|
|||
CollectionName: collectionName,
|
||||
},
|
||||
chMgr: chMgr,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
|
||||
// collection not exist
|
||||
|
@ -2794,6 +2805,7 @@ func TestSearchTask_PreExecute(t *testing.T) {
|
|||
},
|
||||
chMgr: chMgr,
|
||||
qc: qc,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
|
||||
|
@ -3092,6 +3104,7 @@ func TestSearchTask_Execute(t *testing.T) {
|
|||
},
|
||||
chMgr: chMgr,
|
||||
qc: qc,
|
||||
tr: timerecord.NewTimeRecorder("search"),
|
||||
}
|
||||
assert.NoError(t, task.OnEnqueue())
|
||||
|
||||
|
|
Loading…
Reference in New Issue