Replace session.ServerID with config.NodeID (#21669)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/21671/head
bigsheeper 2023-01-13 14:21:41 +08:00 committed by GitHub
parent 269ab2f61d
commit a119d4bdf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 42 additions and 61 deletions

View File

@ -830,10 +830,12 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics returns DataCoord metrics info
// it may include SystemMetrics, Topology metrics, etc.
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.DataCoordCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
if s.isClosed() {
log.Warn("DataCoord.GetMetrics failed",
zap.Int64("nodeID", s.session.ServerID),
zap.String("req", req.Request),
zap.Error(errDataCoordIsUnhealthy(Params.DataCoordCfg.GetNodeID())))
return &milvuspb.GetMetricsResponse{
@ -849,8 +851,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("DataCoord.GetMetrics failed to parse metric type",
zap.Int64("nodeID", s.session.ServerID),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -866,7 +866,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
if metricType == metricsinfo.SystemInfoMetrics {
metrics, err := s.getSystemInfoMetrics(ctx, req)
if err != nil {
log.Warn("DataCoord GetMetrics failed", zap.Int64("nodeID", Params.DataCoordCfg.GetNodeID()), zap.Error(err))
log.Warn("DataCoord GetMetrics failed", zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -876,8 +876,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}
log.RatedDebug(60, "DataCoord.GetMetrics",
zap.Int64("nodeID", s.session.ServerID),
zap.String("req", req.Request),
zap.String("metricType", metricType),
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
@ -886,8 +884,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}
log.RatedWarn(60.0, "DataCoord.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", s.session.ServerID),
zap.String("req", req.Request),
zap.String("metricType", metricType))
return &milvuspb.GetMetricsResponse{

View File

@ -758,10 +758,12 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
// GetMetrics return datanode metrics
func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.DataNodeCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
if !node.isHealthy() {
log.Warn("DataNode.GetMetrics failed",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
zap.Error(errDataNodeIsUnhealthy(node.session.ServerID)))
return &milvuspb.GetMetricsResponse{
@ -775,8 +777,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("DataNode.GetMetrics failed to parse metric type",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -790,7 +790,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
if metricType == metricsinfo.SystemInfoMetrics {
systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req)
if err != nil {
log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", node.session.ServerID), zap.Error(err))
log.Warn("DataNode GetMetrics failed", zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -803,8 +803,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}
log.RatedWarn(60, "DataNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{

View File

@ -971,7 +971,10 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
// GetMetrics gets the metrics info of IndexCoord.
func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.RatedInfo(60, "IndexCoord.GetMetrics", zap.Int64("nodeID", i.serverID), zap.String("req", req.Request))
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.IndexCoordCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
log.RatedInfo(60, "IndexCoord.GetMetrics")
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@ -988,8 +991,6 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Error("IndexCoord.GetMetrics failed to parse metric type",
zap.Int64("nodeID", i.session.ServerID),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -1009,8 +1010,6 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
}
log.RatedDebug(60, "IndexCoord.GetMetrics",
zap.Int64("nodeID", i.session.ServerID),
zap.String("req", req.Request),
zap.String("metricType", metricType),
zap.String("metrics", metrics.Response), // TODO(dragondriver): necessary? may be very large
zap.Error(err),
@ -1022,8 +1021,6 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
}
log.RatedWarn(60, "IndexCoord.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", i.session.ServerID),
zap.String("req", req.Request),
zap.String("metricType", metricType))
return &milvuspb.GetMetricsResponse{

View File

@ -227,10 +227,12 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
// GetMetrics gets the metrics info of IndexNode.
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.IndexNodeCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
if !commonpbutil.IsHealthyOrStopping(i.stateCode) {
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed",
zap.Int64("nodeID", i.GetNodeID()),
zap.String("req", req.Request),
log.Warn("IndexNode.GetMetrics failed",
zap.Error(errIndexNodeIsUnhealthy(Params.IndexNodeCfg.GetNodeID())))
return &milvuspb.GetMetricsResponse{
@ -244,9 +246,7 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Ctx(ctx).Warn("IndexNode.GetMetrics failed to parse metric type",
zap.Int64("nodeID", i.GetNodeID()),
zap.String("req", req.Request),
log.Warn("IndexNode.GetMetrics failed to parse metric type",
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -261,18 +261,14 @@ func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequ
if metricType == metricsinfo.SystemInfoMetrics {
metrics, err := getSystemInfoMetrics(ctx, req, i)
log.Ctx(ctx).RatedDebug(60, "IndexNode.GetMetrics",
zap.Int64("nodeID", i.GetNodeID()),
zap.String("req", req.Request),
log.RatedDebug(60, "IndexNode.GetMetrics",
zap.String("metric_type", metricType),
zap.Error(err))
return metrics, nil
}
log.Ctx(ctx).RatedWarn(60, "IndexNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", i.GetNodeID()),
zap.String("req", req.Request),
log.RatedWarn(60, "IndexNode.GetMetrics failed, request metric type is not implemented yet",
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{

View File

@ -3798,13 +3798,11 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
defer sp.Finish()
log := log.Ctx(ctx).With(
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request))
zap.Int64("nodeID", Params.ProxyCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
if !node.checkHealthy() {
log.Warn("Proxy.GetProxyMetrics failed",
zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errProxyIsUnhealthy(Params.ProxyCfg.GetNodeID())))
return &milvuspb.GetMetricsResponse{
@ -3818,8 +3816,6 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -3841,8 +3837,6 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
proxyMetrics, err := getProxyMetrics(ctx, req, node)
if err != nil {
log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{

View File

@ -582,8 +582,11 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
}
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.RatedDebug(60, "get metrics request received",
zap.String("metricType", req.GetRequest()))
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.QueryCoordCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
log.RatedDebug(60, "get metrics request received")
if s.status.Load() != commonpb.StateCode_Healthy {
msg := "failed to get metrics"

View File

@ -1228,10 +1228,12 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.GetRequest()))
if !node.isHealthyOrStopping() {
log.Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
return &milvuspb.GetMetricsResponse{
@ -1247,9 +1249,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed to parse metric type",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
log.Warn("QueryNode.GetMetrics failed to parse metric type",
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -1263,9 +1263,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
if metricType == metricsinfo.SystemInfoMetrics {
queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node)
if err != nil {
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
log.Warn("QueryNode.GetMetrics failed",
zap.String("metricType", metricType),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
@ -1278,9 +1276,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
return queryNodeMetrics, nil
}
log.Ctx(ctx).RatedDebug(60, "QueryNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("nodeID", node.session.ServerID),
zap.String("req", req.Request),
log.RatedDebug(60, "QueryNode.GetMetrics failed, request metric type is not implemented yet",
zap.String("metricType", metricType))
return &milvuspb.GetMetricsResponse{

View File

@ -1448,6 +1448,10 @@ func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfi
// GetMetrics get metrics
func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("nodeID", Params.RootCoordCfg.GetNodeID()),
zap.String("req", in.GetRequest()))
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
@ -1457,8 +1461,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
metricType, err := metricsinfo.ParseMetricType(in.Request)
if err != nil {
log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole),
zap.Int64("nodeID", c.session.ServerID), zap.String("req", in.Request), zap.Error(err))
log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole), zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ParseMetricType failed: "+err.Error()),
Response: "",
@ -1474,7 +1477,6 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
if err != nil {
log.Warn("GetSystemInfoMetrics failed",
zap.String("role", typeutil.RootCoordRole),
zap.String("metricType", metricType),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, fmt.Sprintf("getSystemInfoMetrics failed: %s", err.Error())),
@ -1486,8 +1488,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
return metrics, err
}
log.RatedWarn(60, "GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole),
zap.String("metricType", metricType))
log.RatedWarn(60, "GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole))
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric),