mirror of https://github.com/milvus-io/milvus.git
Proxy metrics integrates with other coordinators (#7400)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/7418/head
parent
93bbaef91d
commit
3721cbfea1
|
@ -41,7 +41,21 @@ func (s *Server) getSystemInfoMetrics(
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := s.cluster.GetNodes()
|
nodes := s.cluster.GetNodes()
|
||||||
|
log.Debug("datacoord.getSystemInfoMetrics",
|
||||||
|
zap.Int("data nodes num", len(nodes)))
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
|
if node == nil {
|
||||||
|
log.Warn("skip invalid data node",
|
||||||
|
zap.String("reason", "datanode is nil"))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.GetClient() == nil {
|
||||||
|
log.Warn("skip invalid data node",
|
||||||
|
zap.String("reason", "datanode client is nil"))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
metrics, err := node.GetClient().GetMetrics(ctx, req)
|
metrics, err := node.GetClient().GetMetrics(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("invalid metrics of query node was found",
|
log.Warn("invalid metrics of query node was found",
|
||||||
|
@ -92,7 +106,8 @@ func (s *Server) getSystemInfoMetrics(
|
||||||
Cluster: clusterTopology,
|
Cluster: clusterTopology,
|
||||||
Connections: metricsinfo.ConnTopology{
|
Connections: metricsinfo.ConnTopology{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
|
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
|
||||||
// TODO(dragondriver): connection info
|
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||||
|
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,37 @@ func (c *mockDataNodeClient) FlushSegments(ctx context.Context, in *datapb.Flush
|
||||||
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
|
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||||
|
// TODO(dragondriver): change the id, though it's not important in ut
|
||||||
|
nodeID := UniqueID(20210819)
|
||||||
|
|
||||||
|
nodeInfos := metricsinfo.DataNodeInfos{
|
||||||
|
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||||
|
Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
|
||||||
|
if err != nil {
|
||||||
|
return &milvuspb.GetMetricsResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||||
|
Reason: err.Error(),
|
||||||
|
},
|
||||||
|
Response: "",
|
||||||
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &milvuspb.GetMetricsResponse{
|
||||||
|
Status: &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
|
Reason: "",
|
||||||
|
},
|
||||||
|
Response: resp,
|
||||||
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *mockDataNodeClient) Stop() error {
|
func (c *mockDataNodeClient) Stop() error {
|
||||||
c.state = internalpb.StateCode_Abnormal
|
c.state = internalpb.StateCode_Abnormal
|
||||||
return nil
|
return nil
|
||||||
|
@ -315,16 +346,24 @@ func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.Seg
|
||||||
panic("not implemented") // TODO: Implement
|
panic("not implemented") // TODO: Implement
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
func (m *mockRootCoordService) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||||
// TODO(dragondriver): change the id, though it's not important in ut
|
// TODO(dragondriver): change the id, though it's not important in ut
|
||||||
nodeID := UniqueID(20210819)
|
nodeID := UniqueID(20210901)
|
||||||
|
|
||||||
nodeInfos := metricsinfo.DataNodeInfos{
|
rootCoordTopology := metricsinfo.RootCoordTopology{
|
||||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
Self: metricsinfo.RootCoordInfos{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||||
|
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Connections: metricsinfo.ConnTopology{
|
||||||
|
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
|
||||||
|
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||||
|
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
|
|
||||||
|
resp, err := metricsinfo.MarshalTopology(rootCoordTopology)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &milvuspb.GetMetricsResponse{
|
return &milvuspb.GetMetricsResponse{
|
||||||
Status: &commonpb.Status{
|
Status: &commonpb.Status{
|
||||||
|
@ -332,7 +371,7 @@ func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMe
|
||||||
Reason: err.Error(),
|
Reason: err.Error(),
|
||||||
},
|
},
|
||||||
Response: "",
|
Response: "",
|
||||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,6 +381,6 @@ func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMe
|
||||||
Reason: "",
|
Reason: "",
|
||||||
},
|
},
|
||||||
Response: resp,
|
Response: resp,
|
||||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,7 @@ func getSystemInfoMetrics(
|
||||||
clusterTopology := metricsinfo.IndexClusterTopology{
|
clusterTopology := metricsinfo.IndexClusterTopology{
|
||||||
Self: metricsinfo.IndexCoordInfos{
|
Self: metricsinfo.IndexCoordInfos{
|
||||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID),
|
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0),
|
ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0),
|
||||||
|
@ -91,8 +91,9 @@ func getSystemInfoMetrics(
|
||||||
coordTopology := metricsinfo.IndexCoordTopology{
|
coordTopology := metricsinfo.IndexCoordTopology{
|
||||||
Cluster: clusterTopology,
|
Cluster: clusterTopology,
|
||||||
Connections: metricsinfo.ConnTopology{
|
Connections: metricsinfo.ConnTopology{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID),
|
Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID),
|
||||||
// TODO(dragondriver): connection info
|
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||||
|
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +105,7 @@ func getSystemInfoMetrics(
|
||||||
Reason: err.Error(),
|
Reason: err.Error(),
|
||||||
},
|
},
|
||||||
Response: "",
|
Response: "",
|
||||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID),
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,6 +115,6 @@ func getSystemInfoMetrics(
|
||||||
Reason: "",
|
Reason: "",
|
||||||
},
|
},
|
||||||
Response: resp,
|
Response: resp,
|
||||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID),
|
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,16 +52,34 @@ func getSystemInfoMetrics(
|
||||||
}
|
}
|
||||||
|
|
||||||
queryCoordResp, queryCoordErr := node.queryCoord.GetMetrics(ctx, request)
|
queryCoordResp, queryCoordErr := node.queryCoord.GetMetrics(ctx, request)
|
||||||
skipQueryCoord := false
|
|
||||||
queryCoordRoleName := ""
|
queryCoordRoleName := ""
|
||||||
if queryCoordErr != nil || queryCoordResp == nil {
|
if queryCoordErr == nil && queryCoordResp != nil {
|
||||||
skipQueryCoord = true
|
|
||||||
} else {
|
|
||||||
queryCoordRoleName = queryCoordResp.ComponentName
|
queryCoordRoleName = queryCoordResp.ComponentName
|
||||||
identifierMap[queryCoordRoleName] = getUniqueIntGeneratorIns().get()
|
identifierMap[queryCoordRoleName] = getUniqueIntGeneratorIns().get()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !skipQueryCoord {
|
dataCoordResp, dataCoordErr := node.dataCoord.GetMetrics(ctx, request)
|
||||||
|
dataCoordRoleName := ""
|
||||||
|
if dataCoordErr == nil && dataCoordResp != nil {
|
||||||
|
dataCoordRoleName = dataCoordResp.ComponentName
|
||||||
|
identifierMap[dataCoordRoleName] = getUniqueIntGeneratorIns().get()
|
||||||
|
}
|
||||||
|
|
||||||
|
indexCoordResp, indexCoordErr := node.indexCoord.GetMetrics(ctx, request)
|
||||||
|
indexCoordRoleName := ""
|
||||||
|
if indexCoordErr == nil && indexCoordResp != nil {
|
||||||
|
indexCoordRoleName = indexCoordResp.ComponentName
|
||||||
|
identifierMap[indexCoordRoleName] = getUniqueIntGeneratorIns().get()
|
||||||
|
}
|
||||||
|
|
||||||
|
rootCoordResp, rootCoordErr := node.rootCoord.GetMetrics(ctx, request)
|
||||||
|
rootCoordRoleName := ""
|
||||||
|
if rootCoordErr == nil && rootCoordResp != nil {
|
||||||
|
rootCoordRoleName = rootCoordResp.ComponentName
|
||||||
|
identifierMap[rootCoordRoleName] = getUniqueIntGeneratorIns().get()
|
||||||
|
}
|
||||||
|
|
||||||
|
if queryCoordErr == nil && queryCoordResp != nil {
|
||||||
proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
||||||
Type: metricsinfo.Forward,
|
Type: metricsinfo.Forward,
|
||||||
|
@ -78,6 +96,42 @@ func getSystemInfoMetrics(
|
||||||
Infos: &queryCoordTopology.Cluster.Self,
|
Infos: &queryCoordTopology.Cluster.Self,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fill connection edge, a little trick here
|
||||||
|
for _, edge := range queryCoordTopology.Connections.ConnectedComponents {
|
||||||
|
switch edge.TargetType {
|
||||||
|
case typeutil.RootCoordRole:
|
||||||
|
if rootCoordErr == nil && rootCoordResp != nil {
|
||||||
|
queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[rootCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.DataCoordRole:
|
||||||
|
if dataCoordErr == nil && dataCoordResp != nil {
|
||||||
|
queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[dataCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.IndexCoordRole:
|
||||||
|
if indexCoordErr == nil && indexCoordResp != nil {
|
||||||
|
queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[indexCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.QueryCoordRole:
|
||||||
|
queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.QueryCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add query nodes to system topology graph
|
// add query nodes to system topology graph
|
||||||
for _, queryNode := range queryCoordTopology.Cluster.ConnectedNodes {
|
for _, queryNode := range queryCoordTopology.Cluster.ConnectedNodes {
|
||||||
identifier := getUniqueIntGeneratorIns().get()
|
identifier := getUniqueIntGeneratorIns().get()
|
||||||
|
@ -100,7 +154,211 @@ func getSystemInfoMetrics(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(dragondriver): integrate other coordinator
|
if dataCoordErr == nil && dataCoordResp != nil {
|
||||||
|
proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[dataCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
})
|
||||||
|
|
||||||
|
dataCoordTopology := metricsinfo.DataCoordTopology{}
|
||||||
|
err = metricsinfo.UnmarshalTopology(dataCoordResp.Response, &dataCoordTopology)
|
||||||
|
if err == nil {
|
||||||
|
// data coord in system topology graph
|
||||||
|
dataCoordTopologyNode := metricsinfo.SystemTopologyNode{
|
||||||
|
Identifier: identifierMap[dataCoordRoleName],
|
||||||
|
Connected: make([]metricsinfo.ConnectionEdge, 0),
|
||||||
|
Infos: &dataCoordTopology.Cluster.Self,
|
||||||
|
}
|
||||||
|
|
||||||
|
// fill connection edge, a little trick here
|
||||||
|
for _, edge := range dataCoordTopology.Connections.ConnectedComponents {
|
||||||
|
switch edge.TargetType {
|
||||||
|
case typeutil.RootCoordRole:
|
||||||
|
if rootCoordErr == nil && rootCoordResp != nil {
|
||||||
|
dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[rootCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.DataCoordRole:
|
||||||
|
dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[dataCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
})
|
||||||
|
case typeutil.IndexCoordRole:
|
||||||
|
if indexCoordErr == nil && indexCoordResp != nil {
|
||||||
|
dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[indexCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.QueryCoordRole:
|
||||||
|
if queryCoordErr == nil && queryCoordResp != nil {
|
||||||
|
dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.QueryCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add data nodes to system topology graph
|
||||||
|
for _, dataNode := range dataCoordTopology.Cluster.ConnectedNodes {
|
||||||
|
identifier := getUniqueIntGeneratorIns().get()
|
||||||
|
identifierMap[dataNode.Name] = identifier
|
||||||
|
dataNodeTopologyNode := metricsinfo.SystemTopologyNode{
|
||||||
|
Identifier: identifier,
|
||||||
|
Connected: nil,
|
||||||
|
Infos: &dataNode,
|
||||||
|
}
|
||||||
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataNodeTopologyNode)
|
||||||
|
dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifier,
|
||||||
|
Type: metricsinfo.CoordConnectToNode,
|
||||||
|
TargetType: typeutil.DataNodeRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// add data coord to system topology graph
|
||||||
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataCoordTopologyNode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if indexCoordErr == nil && indexCoordResp != nil {
|
||||||
|
proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[indexCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
})
|
||||||
|
|
||||||
|
indexCoordTopology := metricsinfo.IndexCoordTopology{}
|
||||||
|
err = metricsinfo.UnmarshalTopology(indexCoordResp.Response, &indexCoordTopology)
|
||||||
|
if err == nil {
|
||||||
|
// index coord in system topology graph
|
||||||
|
indexCoordTopologyNode := metricsinfo.SystemTopologyNode{
|
||||||
|
Identifier: identifierMap[indexCoordRoleName],
|
||||||
|
Connected: make([]metricsinfo.ConnectionEdge, 0),
|
||||||
|
Infos: &indexCoordTopology.Cluster.Self,
|
||||||
|
}
|
||||||
|
|
||||||
|
// fill connection edge, a little trick here
|
||||||
|
for _, edge := range indexCoordTopology.Connections.ConnectedComponents {
|
||||||
|
switch edge.TargetType {
|
||||||
|
case typeutil.RootCoordRole:
|
||||||
|
if rootCoordErr == nil && rootCoordResp != nil {
|
||||||
|
indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[rootCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.DataCoordRole:
|
||||||
|
if dataCoordErr == nil && dataCoordResp != nil {
|
||||||
|
indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[dataCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.IndexCoordRole:
|
||||||
|
indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[indexCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
})
|
||||||
|
case typeutil.QueryCoordRole:
|
||||||
|
if queryCoordErr == nil && queryCoordResp != nil {
|
||||||
|
indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.QueryCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add index nodes to system topology graph
|
||||||
|
for _, indexNode := range indexCoordTopology.Cluster.ConnectedNodes {
|
||||||
|
identifier := getUniqueIntGeneratorIns().get()
|
||||||
|
identifierMap[indexNode.Name] = identifier
|
||||||
|
indexNodeTopologyNode := metricsinfo.SystemTopologyNode{
|
||||||
|
Identifier: identifier,
|
||||||
|
Connected: nil,
|
||||||
|
Infos: &indexNode,
|
||||||
|
}
|
||||||
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, indexNodeTopologyNode)
|
||||||
|
indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifier,
|
||||||
|
Type: metricsinfo.CoordConnectToNode,
|
||||||
|
TargetType: typeutil.IndexNodeRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// add index coord to system topology graph
|
||||||
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, indexCoordTopologyNode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rootCoordErr == nil && rootCoordResp != nil {
|
||||||
|
proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[rootCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
})
|
||||||
|
|
||||||
|
rootCoordTopology := metricsinfo.RootCoordTopology{}
|
||||||
|
err = metricsinfo.UnmarshalTopology(rootCoordResp.Response, &rootCoordTopology)
|
||||||
|
if err == nil {
|
||||||
|
// root coord in system topology graph
|
||||||
|
rootCoordTopologyNode := metricsinfo.SystemTopologyNode{
|
||||||
|
Identifier: identifierMap[rootCoordRoleName],
|
||||||
|
Connected: make([]metricsinfo.ConnectionEdge, 0),
|
||||||
|
Infos: &rootCoordTopology.Self,
|
||||||
|
}
|
||||||
|
|
||||||
|
// fill connection edge, a little trick here
|
||||||
|
for _, edge := range rootCoordTopology.Connections.ConnectedComponents {
|
||||||
|
switch edge.TargetType {
|
||||||
|
case typeutil.RootCoordRole:
|
||||||
|
rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[rootCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
})
|
||||||
|
case typeutil.DataCoordRole:
|
||||||
|
if dataCoordErr == nil && dataCoordResp != nil {
|
||||||
|
rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[dataCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
case typeutil.IndexCoordRole:
|
||||||
|
rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[indexCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
})
|
||||||
|
case typeutil.QueryCoordRole:
|
||||||
|
if queryCoordErr == nil && queryCoordResp != nil {
|
||||||
|
rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{
|
||||||
|
ConnectedIdentifier: identifierMap[queryCoordRoleName],
|
||||||
|
Type: metricsinfo.Forward,
|
||||||
|
TargetType: typeutil.QueryCoordRole,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add root coord to system topology graph
|
||||||
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, rootCoordTopologyNode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add proxy to system topology graph
|
// add proxy to system topology graph
|
||||||
systemTopology.NodesInfo = append(systemTopology.NodesInfo, proxyTopologyNode)
|
systemTopology.NodesInfo = append(systemTopology.NodesInfo, proxyTopologyNode)
|
||||||
|
|
|
@ -92,7 +92,8 @@ func getSystemInfoMetrics(
|
||||||
Cluster: clusterTopology,
|
Cluster: clusterTopology,
|
||||||
Connections: metricsinfo.ConnTopology{
|
Connections: metricsinfo.ConnTopology{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
|
Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID),
|
||||||
// TODO(dragondriver): connection info
|
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||||
|
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,12 +26,20 @@ import (
|
||||||
|
|
||||||
func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||||
// TODO(dragondriver): add more metrics
|
// TODO(dragondriver): add more metrics
|
||||||
nodeInfos := metricsinfo.RootCoordInfos{
|
rootCoordTopology := metricsinfo.RootCoordTopology{
|
||||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
Self: metricsinfo.RootCoordInfos{
|
||||||
|
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||||
|
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Connections: metricsinfo.ConnTopology{
|
||||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID),
|
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID),
|
||||||
|
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||||
|
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
|
|
||||||
|
resp, err := metricsinfo.MarshalTopology(rootCoordTopology)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Failed to marshal system info metrics of root coordinator",
|
log.Warn("Failed to marshal system info metrics of root coordinator",
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
|
|
@ -63,6 +63,8 @@ type DataCoord interface {
|
||||||
GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error)
|
GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error)
|
||||||
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error)
|
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error)
|
||||||
GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error)
|
GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error)
|
||||||
|
|
||||||
|
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexNode interface {
|
type IndexNode interface {
|
||||||
|
@ -81,6 +83,8 @@ type IndexCoord interface {
|
||||||
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
|
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
|
||||||
GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error)
|
GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error)
|
||||||
GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error)
|
GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error)
|
||||||
|
|
||||||
|
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type RootCoord interface {
|
type RootCoord interface {
|
||||||
|
@ -113,6 +117,8 @@ type RootCoord interface {
|
||||||
ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
|
ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
|
||||||
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
|
ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error)
|
||||||
SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error)
|
SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error)
|
||||||
|
|
||||||
|
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RootCoordComponent is used by grpc server of RootCoord
|
// RootCoordComponent is used by grpc server of RootCoord
|
||||||
|
|
|
@ -47,14 +47,28 @@ type QueryClusterTopology struct {
|
||||||
ConnectedNodes []QueryNodeInfos `json:"connected_nodes"`
|
ConnectedNodes []QueryNodeInfos `json:"connected_nodes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ConnectionType = string
|
||||||
|
|
||||||
|
const (
|
||||||
|
CoordConnectToNode ConnectionType = "manage"
|
||||||
|
Forward ConnectionType = "forward"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConnectionTargetType = string
|
||||||
|
|
||||||
|
type ConnectionInfo struct {
|
||||||
|
TargetName string `json:"target_name"`
|
||||||
|
TargetType ConnectionTargetType `json:"target_type"`
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(dragondriver)
|
// TODO(dragondriver)
|
||||||
// necessary to show all connection edge in topology graph?
|
// necessary to show all connection edge in topology graph?
|
||||||
// for example, in system, Proxy connects to RootCoord and RootCoord also connects to Proxy,
|
// for example, in system, Proxy connects to RootCoord and RootCoord also connects to Proxy,
|
||||||
// if we do so, the connection relationship may be confusing.
|
// if we do so, the connection relationship may be confusing.
|
||||||
// ConnTopology shows how different components connect to each other.
|
// ConnTopology shows how different components connect to each other.
|
||||||
type ConnTopology struct {
|
type ConnTopology struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
ConnectedComponents []string `json:"connected_components"`
|
ConnectedComponents []ConnectionInfo `json:"connected_components"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryCoordTopology shows the whole metrics of query cluster
|
// QueryCoordTopology shows the whole metrics of query cluster
|
||||||
|
@ -93,15 +107,6 @@ type RootCoordTopology struct {
|
||||||
Connections ConnTopology `json:"connections"`
|
Connections ConnTopology `json:"connections"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
CoordConnectToNode ConnectionType = "manage"
|
|
||||||
Forward ConnectionType = "forward"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ConnectionTargetType string
|
|
||||||
|
|
||||||
type ConnectionEdge struct {
|
type ConnectionEdge struct {
|
||||||
ConnectedIdentifier int `json:"connected_identifier"`
|
ConnectedIdentifier int `json:"connected_identifier"`
|
||||||
Type ConnectionType `json:"type"`
|
Type ConnectionType `json:"type"`
|
||||||
|
|
|
@ -95,8 +95,10 @@ func TestQueryCoordTopology_Codec(t *testing.T) {
|
||||||
},
|
},
|
||||||
Connections: ConnTopology{
|
Connections: ConnTopology{
|
||||||
Name: ConstructComponentName(typeutil.QueryCoordRole, 1),
|
Name: ConstructComponentName(typeutil.QueryCoordRole, 1),
|
||||||
ConnectedComponents: []string{
|
ConnectedComponents: []ConnectionInfo{
|
||||||
ConstructComponentName(typeutil.RootCoordRole, 1),
|
{
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -176,8 +178,10 @@ func TestIndexCoordTopology_Codec(t *testing.T) {
|
||||||
},
|
},
|
||||||
Connections: ConnTopology{
|
Connections: ConnTopology{
|
||||||
Name: ConstructComponentName(typeutil.IndexCoordRole, 1),
|
Name: ConstructComponentName(typeutil.IndexCoordRole, 1),
|
||||||
ConnectedComponents: []string{
|
ConnectedComponents: []ConnectionInfo{
|
||||||
ConstructComponentName(typeutil.RootCoordRole, 1),
|
{
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -257,8 +261,10 @@ func TestDataCoordTopology_Codec(t *testing.T) {
|
||||||
},
|
},
|
||||||
Connections: ConnTopology{
|
Connections: ConnTopology{
|
||||||
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
|
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
|
||||||
ConnectedComponents: []string{
|
ConnectedComponents: []ConnectionInfo{
|
||||||
ConstructComponentName(typeutil.RootCoordRole, 1),
|
{
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -290,8 +296,10 @@ func TestRootCoordTopology_Codec(t *testing.T) {
|
||||||
},
|
},
|
||||||
Connections: ConnTopology{
|
Connections: ConnTopology{
|
||||||
Name: ConstructComponentName(typeutil.RootCoordRole, 1),
|
Name: ConstructComponentName(typeutil.RootCoordRole, 1),
|
||||||
ConnectedComponents: []string{
|
ConnectedComponents: []ConnectionInfo{
|
||||||
ConstructComponentName(typeutil.RootCoordRole, 1),
|
{
|
||||||
|
TargetType: typeutil.IndexCoordRole,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -313,11 +321,19 @@ func TestRootCoordTopology_Codec(t *testing.T) {
|
||||||
func TestConnTopology_Codec(t *testing.T) {
|
func TestConnTopology_Codec(t *testing.T) {
|
||||||
topology1 := ConnTopology{
|
topology1 := ConnTopology{
|
||||||
Name: ConstructComponentName(typeutil.ProxyRole, 1),
|
Name: ConstructComponentName(typeutil.ProxyRole, 1),
|
||||||
ConnectedComponents: []string{
|
ConnectedComponents: []ConnectionInfo{
|
||||||
ConstructComponentName(typeutil.RootCoordRole, 1),
|
{
|
||||||
ConstructComponentName(typeutil.QueryCoordRole, 1),
|
TargetType: typeutil.IndexCoordRole,
|
||||||
ConstructComponentName(typeutil.DataCoordRole, 1),
|
},
|
||||||
ConstructComponentName(typeutil.IndexCoordRole, 1),
|
{
|
||||||
|
TargetType: typeutil.DataCoordRole,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
TargetType: typeutil.QueryCoordRole,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
TargetType: typeutil.RootCoordRole,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s, err := MarshalTopology(topology1)
|
s, err := MarshalTopology(topology1)
|
||||||
|
|
Loading…
Reference in New Issue