mirror of https://github.com/milvus-io/milvus.git
Add metrics for master (#5515)
* metric Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * metric Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * add MasterDDChannelTimeTick for metrics Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add metrics for all master grpc Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * register metrics Signed-off-by: yefu.chen <yefu.chen@zilliz.com> * add metrics register Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * add comments for codacy check Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * fix format issue Signed-off-by: yudong.cai <yudong.cai@zilliz.com> Co-authored-by: yefu.chen <yefu.chen@zilliz.com>pull/5518/head
parent
f3fb0f8b00
commit
d5cd561449
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
ms "github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -68,6 +69,18 @@ type DdOperation struct {
|
|||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
const (
|
||||
// MetricRequestsTotal used to count the num of total requests
|
||||
MetricRequestsTotal = "total"
|
||||
|
||||
// MetricRequestsSuccess used to count the num of successful requests
|
||||
MetricRequestsSuccess = "success"
|
||||
)
|
||||
|
||||
func metricProxyNode(v int64) string {
|
||||
return fmt.Sprintf("client_%d", v)
|
||||
}
|
||||
|
||||
// master core
|
||||
type Core struct {
|
||||
/*
|
||||
|
@ -566,6 +579,7 @@ func (c *Core) setMsgStreams() error {
|
|||
if err := ddStream.Broadcast(&msgPack); err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.MasterDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -709,7 +723,7 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyN
|
|||
if c.NewProxyClient == nil {
|
||||
c.NewProxyClient = f
|
||||
} else {
|
||||
log.Debug("NewProxyClient has alread set")
|
||||
log.Debug("NewProxyClient has already set")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1215,6 +1229,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon
|
|||
}
|
||||
|
||||
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1241,6 +1256,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
|||
}, nil
|
||||
}
|
||||
log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1248,6 +1264,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
|||
}
|
||||
|
||||
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1274,6 +1291,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
|||
}, nil
|
||||
}
|
||||
log.Debug("DropCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1281,6 +1299,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
|||
}
|
||||
|
||||
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
|
||||
metrics.MasterHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.BoolResponse{
|
||||
|
@ -1314,6 +1333,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
|
|||
}, nil
|
||||
}
|
||||
log.Debug("HasCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -1324,6 +1344,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
|
|||
}
|
||||
|
||||
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
metrics.MasterDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
|
@ -1358,6 +1379,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
|
|||
}, nil
|
||||
}
|
||||
log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1366,6 +1388,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
|
|||
}
|
||||
|
||||
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
metrics.MasterShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.ShowCollectionsResponse{
|
||||
|
@ -1401,6 +1424,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
|
|||
}, nil
|
||||
}
|
||||
log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1409,6 +1433,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
|
|||
}
|
||||
|
||||
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1435,6 +1460,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
|||
}, nil
|
||||
}
|
||||
log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1442,6 +1468,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
|||
}
|
||||
|
||||
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1468,6 +1495,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
|||
}, nil
|
||||
}
|
||||
log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1475,6 +1503,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
|||
}
|
||||
|
||||
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
|
||||
metrics.MasterHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.BoolResponse{
|
||||
|
@ -1508,6 +1537,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
|||
}, nil
|
||||
}
|
||||
log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -1518,6 +1548,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
|||
}
|
||||
|
||||
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
metrics.MasterShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID),
|
||||
zap.String("collection", in.CollectionName))
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
|
@ -1560,6 +1591,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
|
|||
log.Debug("ShowPartitions succeed", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID),
|
||||
zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames),
|
||||
zap.Int64s("partition ids", t.Rsp.PartitionIDs))
|
||||
metrics.MasterShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1568,6 +1600,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
|
|||
}
|
||||
|
||||
func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1594,6 +1627,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
|
|||
}, nil
|
||||
}
|
||||
log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1601,6 +1635,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
|
|||
}
|
||||
|
||||
func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
|
||||
metrics.MasterDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.DescribeIndexResponse{
|
||||
|
@ -1641,6 +1676,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
|
|||
idxNames = append(idxNames, i.IndexName)
|
||||
}
|
||||
log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
if len(t.Rsp.IndexDescriptions) == 0 {
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_IndexNotExist,
|
||||
|
@ -1656,6 +1692,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
|
|||
}
|
||||
|
||||
func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
metrics.MasterDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &commonpb.Status{
|
||||
|
@ -1682,6 +1719,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
|
|||
}, nil
|
||||
}
|
||||
log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1689,6 +1727,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
|
|||
}
|
||||
|
||||
func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
||||
metrics.MasterDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.DescribeSegmentResponse{
|
||||
|
@ -1725,6 +1764,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
|
|||
}, nil
|
||||
}
|
||||
log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -1733,6 +1773,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
|
|||
}
|
||||
|
||||
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
|
||||
metrics.MasterShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc()
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
return &milvuspb.ShowSegmentsResponse{
|
||||
|
@ -1769,6 +1810,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
|
|||
}, nil
|
||||
}
|
||||
log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID))
|
||||
metrics.MasterShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc()
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
|
@ -89,6 +90,9 @@ func (p *proxyNodeManager) WatchProxyNode() error {
|
|||
for _, f := range p.getSessions {
|
||||
f(sessions)
|
||||
}
|
||||
for _, s := range sessions {
|
||||
metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(s.ServerID)).Set(1)
|
||||
}
|
||||
for _, s := range sessions {
|
||||
log.Debug("Get proxy node", zap.Int64("node id", s.ServerID), zap.String("node addr", s.Address), zap.String("node name", s.ServerName))
|
||||
}
|
||||
|
@ -127,6 +131,7 @@ func (p *proxyNodeManager) WatchProxyNode() error {
|
|||
f(sess)
|
||||
}
|
||||
p.lock.Unlock()
|
||||
metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(1)
|
||||
case mvccpb.DELETE:
|
||||
sess := new(sessionutil.Session)
|
||||
err := json.Unmarshal(ev.PrevKv.Value, sess)
|
||||
|
@ -139,6 +144,7 @@ func (p *proxyNodeManager) WatchProxyNode() error {
|
|||
f(sess)
|
||||
}
|
||||
p.lock.Unlock()
|
||||
metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,10 +16,12 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -170,7 +172,12 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam
|
|||
stream.AsProducer([]string{chanName})
|
||||
t.chanStream[chanName] = stream
|
||||
}
|
||||
return stream.Broadcast(&msgPack)
|
||||
|
||||
err = stream.Broadcast(&msgPack)
|
||||
if err == nil {
|
||||
metrics.MasterInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetProxyNodeNum return the num of detected proxy node
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -21,8 +22,190 @@ var (
|
|||
)
|
||||
*/
|
||||
|
||||
var (
|
||||
// MasterProxyNodeLister used to count the num of registered proxy nodes
|
||||
MasterProxyNodeLister = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "list_of_proxy_node",
|
||||
Help: "List of proxy nodes which has register with etcd",
|
||||
}, []string{"client_id"})
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// for grpc
|
||||
|
||||
// MasterCreateCollectionCounter used to count the num of calls of CreateCollection
|
||||
MasterCreateCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "create_collection_total",
|
||||
Help: "Counter of create collection",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDropCollectionCounter used to count the num of calls of DropCollection
|
||||
MasterDropCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "drop_collection_total",
|
||||
Help: "Counter of drop collection",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterHasCollectionCounter used to count the num of calls of HasCollection
|
||||
MasterHasCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "has_collection_total",
|
||||
Help: "Counter of has collection",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDescribeCollectionCounter used to count the num of calls of DescribeCollection
|
||||
MasterDescribeCollectionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "describe_collection_total",
|
||||
Help: "Counter of describe collection",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterShowCollectionsCounter used to count the num of calls of ShowCollections
|
||||
MasterShowCollectionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "show_collections_total",
|
||||
Help: "Counter of show collections",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterCreatePartitionCounter used to count the num of calls of CreatePartition
|
||||
MasterCreatePartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "create_partition_total",
|
||||
Help: "Counter of create partition",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDropPartitionCounter used to count the num of calls of DropPartition
|
||||
MasterDropPartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "drop_partition_total",
|
||||
Help: "Counter of drop partition",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterHasPartitionCounter used to count the num of calls of HasPartition
|
||||
MasterHasPartitionCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "has_partition_total",
|
||||
Help: "Counter of has partition",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterShowPartitionsCounter used to count the num of calls of ShowPartitions
|
||||
MasterShowPartitionsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "show_partitions_total",
|
||||
Help: "Counter of show partitions",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterCreateIndexCounter used to count the num of calls of CreateIndex
|
||||
MasterCreateIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "create_index_total",
|
||||
Help: "Counter of create index",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDropIndexCounter used to count the num of calls of DropIndex
|
||||
MasterDropIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "drop_index_total",
|
||||
Help: "Counter of drop index",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDescribeIndexCounter used to count the num of calls of DescribeIndex
|
||||
MasterDescribeIndexCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "describe_index_total",
|
||||
Help: "Counter of describe index",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterDescribeSegmentCounter used to count the num of calls of DescribeSegment
|
||||
MasterDescribeSegmentCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "describe_segment_total",
|
||||
Help: "Counter of describe segment",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
// MasterShowSegmentsCounter used to count the num of calls of ShowSegments
|
||||
MasterShowSegmentsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "show_segments_total",
|
||||
Help: "Counter of show segments",
|
||||
}, []string{"client_id", "type"})
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// for time tick
|
||||
|
||||
// MasterInsertChannelTimeTick used to count the time tick num of insert channel in 24H
|
||||
MasterInsertChannelTimeTick = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "insert_channel_time_tick",
|
||||
Help: "Time tick of insert Channel in 24H",
|
||||
}, []string{"vchannel"})
|
||||
|
||||
// MasterDDChannelTimeTick used to count the time tick num of dd channel in 24H
|
||||
MasterDDChannelTimeTick = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "milvus",
|
||||
Subsystem: "master",
|
||||
Name: "dd_channel_time_tick",
|
||||
Help: "Time tick of dd Channel in 24H",
|
||||
})
|
||||
)
|
||||
|
||||
//RegisterMaster register Master metrics
|
||||
func RegisterMaster() {
|
||||
prometheus.MustRegister(MasterProxyNodeLister)
|
||||
|
||||
// for grpc
|
||||
prometheus.MustRegister(MasterCreateCollectionCounter)
|
||||
prometheus.MustRegister(MasterDropCollectionCounter)
|
||||
prometheus.MustRegister(MasterHasCollectionCounter)
|
||||
prometheus.MustRegister(MasterDescribeCollectionCounter)
|
||||
prometheus.MustRegister(MasterShowCollectionsCounter)
|
||||
prometheus.MustRegister(MasterCreatePartitionCounter)
|
||||
prometheus.MustRegister(MasterDropPartitionCounter)
|
||||
prometheus.MustRegister(MasterHasPartitionCounter)
|
||||
prometheus.MustRegister(MasterShowPartitionsCounter)
|
||||
prometheus.MustRegister(MasterCreateIndexCounter)
|
||||
prometheus.MustRegister(MasterDropIndexCounter)
|
||||
prometheus.MustRegister(MasterDescribeIndexCounter)
|
||||
prometheus.MustRegister(MasterDescribeSegmentCounter)
|
||||
prometheus.MustRegister(MasterShowSegmentsCounter)
|
||||
|
||||
// for time tick
|
||||
prometheus.MustRegister(MasterInsertChannelTimeTick)
|
||||
prometheus.MustRegister(MasterDDChannelTimeTick)
|
||||
//prometheus.MustRegister(PanicCounter)
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,14 @@ func ParseTS(ts uint64) (time.Time, uint64) {
|
|||
return physicalTime, logical
|
||||
}
|
||||
|
||||
// Mod24H parses the ts to millisecond in one day
|
||||
func Mod24H(ts uint64) uint64 {
|
||||
logical := ts & logicalBitsMask
|
||||
physical := ts >> logicalBits
|
||||
physical = physical % (uint64(24 * 60 * 60 * 1000))
|
||||
return (physical << logicalBits) | logical
|
||||
}
|
||||
|
||||
func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *etcdkv.EtcdKV {
|
||||
client, _ := clientv3.New(clientv3.Config{
|
||||
Endpoints: etcdAddr,
|
||||
|
|
Loading…
Reference in New Issue