enhance: add db name in replica description (#38673)

issue: #36621
pr: #38672

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/39133/head
jaime 2025-01-09 19:43:04 +08:00 committed by GitHub
parent f70262c980
commit 0693634f62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 315 additions and 163 deletions

View File

@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
}
func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {

View File

@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
return false
}
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
@ -141,15 +138,13 @@ func (t *l0CompactionTask) processExecuting() bool {
return false
}
updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
@ -159,9 +154,7 @@ func (t *l0CompactionTask) processExecuting() bool {
}
func (t *l0CompactionTask) processMetaSaved() bool {
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
err := t.updateAndSaveTaskMeta(updateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
}
func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {

View File

@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)

View File

@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool {
func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}
ts := time.Now().Unix()
failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@ -137,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@ -154,7 +149,7 @@ func (t *mixCompactionTask) processExecuting() bool {
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool {
t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")
task := t.GetTaskProto()
log.Info("mixCompactionTask processCompleted done",
zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
return true
}
@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error {
}
func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if task state is completed, cleaned, failed, timeout, then do append end time and save
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {

View File

@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

View File

@ -137,23 +137,17 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
v := jsonReq.Get(metricsinfo.MetricRequestParamINKey)
if !v.Exists() {
// default to get all segments from dataanode
// default to get all segments from datanode
return s.getDataNodeSegmentsJSON(ctx, req)
}
in := v.String()
if in == "dn" {
// TODO: support filter by collection id
if in == metricsinfo.MetricsRequestParamsInDN {
return s.getDataNodeSegmentsJSON(ctx, req)
}
if in == "dc" {
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}
if in == metricsinfo.MetricsRequestParamsInDC {
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
segments := s.meta.getSegmentsMetrics(collectionID)
for _, seg := range segments {
isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID)
@ -163,7 +157,7 @@ func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRe
bs, err := json.Marshal(segments)
if err != nil {
log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
log.Ctx(ctx).Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
return "", nil
}
return string(bs), nil

View File

@ -1164,11 +1164,7 @@ func (s *Server) registerMetricsRequest() {
s.metricsRequest.RegisterMetricsRequest(metricsinfo.IndexKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
v := jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return s.meta.indexMeta.GetIndexJSON(collectionID), nil
})
log.Ctx(s.ctx).Info("register metrics actions finished")

View File

@ -292,12 +292,14 @@ func (node *DataNode) registerMetricsRequest() {
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetSegmentsJSON(), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return node.flowgraphManager.GetSegmentsJSON(collectionID), nil
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetChannelsJSON(), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return node.flowgraphManager.GetChannelsJSON(collectionID), nil
})
log.Ctx(node.ctx).Info("register metrics actions finished")
}

View File

@ -43,8 +43,8 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64
GetChannelsJSON() string
GetSegmentsJSON() string
GetChannelsJSON(collectionID int64) string
GetSegmentsJSON(collectionID int64) string
Close()
}
@ -121,9 +121,12 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
}
// GetChannelsJSON returns all channels in json format.
func (fm *fgManagerImpl) GetChannelsJSON() string {
func (fm *fgManagerImpl) GetChannelsJSON(collectionID int64) string {
var channels []*metricsinfo.Channel
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
channels = append(channels, &metricsinfo.Channel{
Name: ch,
@ -143,9 +146,13 @@ func (fm *fgManagerImpl) GetChannelsJSON() string {
return string(ret)
}
func (fm *fgManagerImpl) GetSegmentsJSON() string {
func (fm *fgManagerImpl) GetSegmentsJSON(collectionID int64) string {
var segments []*metricsinfo.Segment
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
if collectionID > 0 && ds.metacache.Collection() != collectionID {
return true
}
meta := ds.metacache
for _, segment := range meta.GetSegmentsBy() {
segments = append(segments, &metricsinfo.Segment{

View File

@ -193,8 +193,14 @@ func TestGetChannelsJSON(t *testing.T) {
assert.NoError(t, err)
expectedJSON := string(expectedBytes)
jsonResult := fm.GetChannelsJSON()
jsonResult := fm.GetChannelsJSON(0)
assert.JSONEq(t, expectedJSON, jsonResult)
jsonResult = fm.GetChannelsJSON(10)
var ret []*metricsinfo.Channel
err = json.Unmarshal([]byte(jsonResult), &ret)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
}
func TestGetSegmentJSON(t *testing.T) {
@ -228,7 +234,12 @@ func TestGetSegmentJSON(t *testing.T) {
expectedJSON := string(expectedBytes)
ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
jsonResult := fm.GetSegmentsJSON()
fmt.Println(jsonResult)
jsonResult := fm.GetSegmentsJSON(0)
assert.JSONEq(t, expectedJSON, jsonResult)
jsonResult = fm.GetSegmentsJSON(10)
var ret []*metricsinfo.Segment
err = json.Unmarshal([]byte(jsonResult), &ret)
assert.NoError(t, err)
assert.Equal(t, 0, len(ret))
}

View File

@ -114,17 +114,17 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra
return _c
}
// GetChannelsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetChannelsJSON() string {
ret := _m.Called()
// GetChannelsJSON provides a mock function with given fields: collectionID
func (_m *MockFlowgraphManager) GetChannelsJSON(collectionID int64) string {
ret := _m.Called(collectionID)
if len(ret) == 0 {
panic("no return value specified for GetChannelsJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(int64) string); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Get(0).(string)
}
@ -138,13 +138,14 @@ type MockFlowgraphManager_GetChannelsJSON_Call struct {
}
// GetChannelsJSON is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call {
return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")}
// - collectionID int64
func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON(collectionID interface{}) *MockFlowgraphManager_GetChannelsJSON_Call {
return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON", collectionID)}
}
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call {
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func(collectionID int64)) *MockFlowgraphManager_GetChannelsJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(int64))
})
return _c
}
@ -154,7 +155,7 @@ func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlo
return _c
}
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call {
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func(int64) string) *MockFlowgraphManager_GetChannelsJSON_Call {
_c.Call.Return(run)
return _c
}
@ -309,17 +310,17 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
return _c
}
// GetSegmentsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
ret := _m.Called()
// GetSegmentsJSON provides a mock function with given fields: collectionID
func (_m *MockFlowgraphManager) GetSegmentsJSON(collectionID int64) string {
ret := _m.Called(collectionID)
if len(ret) == 0 {
panic("no return value specified for GetSegmentsJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(int64) string); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Get(0).(string)
}
@ -333,13 +334,14 @@ type MockFlowgraphManager_GetSegmentsJSON_Call struct {
}
// GetSegmentsJSON is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call {
return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")}
// - collectionID int64
func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON(collectionID interface{}) *MockFlowgraphManager_GetSegmentsJSON_Call {
return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON", collectionID)}
}
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call {
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func(collectionID int64)) *MockFlowgraphManager_GetSegmentsJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(int64))
})
return _c
}
@ -349,7 +351,7 @@ func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlo
return _c
}
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call {
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func(int64) string) *MockFlowgraphManager_GetSegmentsJSON_Call {
_c.Call.Return(run)
return _c
}

View File

@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
IndexSize: segIndex.SerializeSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
FinishedUTCTime: segIndex.FinishedTime,
}
}
@ -75,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
SerializeSize: segIdx.IndexSize,
WriteHandoff: segIdx.WriteHandoff,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
FinishedTime: segIdx.FinishedUTCTime,
}
}
@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
IndexSize: segIndex.IndexSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.CurrentIndexVersion,
FinishedUTCTime: segIndex.FinishedUTCTime,
}
}

View File

@ -80,6 +80,7 @@ message SegmentIndex {
bool write_handoff = 15;
int32 current_index_version = 16;
int64 index_store_version = 17;
uint64 finished_time = 18;
}
message RegisterNodeRequest {

View File

@ -668,6 +668,7 @@ message CollectionLoadInfo {
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
int64 dbID= 9;
}
message PartitionLoadInfo {

View File

@ -148,10 +148,14 @@ func getSlowQuery(node *Proxy) gin.HandlerFunc {
// buildReqParams fetch all parameters from query parameter of URL, add them into a map data structure.
// put key and value from query parameter into map, concatenate values with separator if values size is greater than 1
func buildReqParams(c *gin.Context, metricsType string) map[string]interface{} {
func buildReqParams(c *gin.Context, metricsType string, customParams ...*commonpb.KeyValuePair) map[string]interface{} {
ret := make(map[string]interface{})
ret[metricsinfo.MetricTypeKey] = metricsType
for _, kv := range customParams {
ret[kv.Key] = kv.Value
}
queryParams := c.Request.URL.Query()
for key, values := range queryParams {
if len(values) > 1 {
@ -163,7 +167,7 @@ func buildReqParams(c *gin.Context, metricsType string) map[string]interface{} {
return ret
}
func getQueryComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc {
func getQueryComponentMetrics(node *Proxy, metricsType string, customParams ...*commonpb.KeyValuePair) gin.HandlerFunc {
return func(c *gin.Context) {
params := buildReqParams(c, metricsType)
req, err := metricsinfo.ConstructGetMetricsRequest(params)
@ -185,7 +189,7 @@ func getQueryComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc {
}
}
func getDataComponentMetrics(node *Proxy, metricsType string) gin.HandlerFunc {
func getDataComponentMetrics(node *Proxy, metricsType string, customParams ...*commonpb.KeyValuePair) gin.HandlerFunc {
return func(c *gin.Context) {
params := buildReqParams(c, metricsType)
req, err := metricsinfo.ConstructGetMetricsRequest(params)

View File

@ -6753,10 +6753,10 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) {
router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.ReplicaKey))
router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.ResourceGroupKey))
router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.AllTaskKey))
router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey))
router.GET(http.QCSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInQC))
// QueryNode requests that are forwarded from querycoord
router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey))
router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInQN))
router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.ChannelKey))
// DataCoord requests that are forwarded from proxy
@ -6765,11 +6765,11 @@ func (node *Proxy) RegisterRestRouter(router gin.IRouter) {
router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTaskKey))
router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTaskKey))
router.GET(http.IndexListPath, getDataComponentMetrics(node, metricsinfo.IndexKey))
router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey))
router.GET(http.DCSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInDC))
// Datanode requests that are forwarded from datacoord
router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTaskKey))
router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey))
router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.SegmentKey, metricsinfo.RequestParamsInDN))
router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.ChannelKey))
// Database requests

View File

@ -301,17 +301,13 @@ func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRe
}
in := v.String()
if in == "qn" {
if in == metricsinfo.MetricsRequestParamsInQN {
// TODO: support filter by collection id
return s.getSegmentsFromQueryNode(ctx, req)
}
if in == "qc" {
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}
if in == metricsinfo.MetricsRequestParamsInQC {
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
filteredSegments := s.dist.SegmentDistManager.GetSegmentDist(collectionID)
bs, err := json.Marshal(filteredSegments)
if err != nil {

View File

@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error {
}
}
// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}
// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
// API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
// Then we can implement dynamic replica changed in different resource group independently.
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error {
}
}
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}
// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
if err != nil {
msg := "failed to spawn replica for collection"
@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,

View File

@ -300,15 +300,24 @@ func (m *ChannelDistManager) updateCollectionIndex() {
}
}
func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel {
func (m *ChannelDistManager) GetChannelDist(collectionID int64) []*metricsinfo.DmChannel {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
var channels []*metricsinfo.DmChannel
for _, nodeChannels := range m.channels {
for _, channel := range nodeChannels.channels {
channels = append(channels, newDmChannelMetricsFrom(channel))
var ret []*metricsinfo.DmChannel
if collectionID > 0 {
if channels, ok := m.collectionIndex[collectionID]; ok {
for _, channel := range channels {
ret = append(ret, newDmChannelMetricsFrom(channel))
}
}
return channels
return ret
}
for _, channels := range m.collectionIndex {
for _, channel := range channels {
ret = append(ret, newDmChannelMetricsFrom(channel))
}
}
return ret
}

View File

@ -207,7 +207,7 @@ func TestGetChannelDistJSON(t *testing.T) {
manager.Update(1, channel1)
manager.Update(2, channel2)
channels := manager.GetChannelDist()
channels := manager.GetChannelDist(0)
assert.Equal(t, 2, len(channels))
checkResult := func(channel *metricsinfo.DmChannel) {

View File

@ -285,7 +285,13 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
// we should save it's CollectionLoadInfo to meta store
for _, partition := range m.GetAllPartitions(ctx) {
// In old version, collection would NOT be stored if the partition existed.
if _, ok := m.collections[partition.GetCollectionID()]; !ok {
if !m.Exist(ctx, partition.GetCollectionID()) {
collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}
col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: partition.GetCollectionID(),
@ -293,10 +299,11 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
Status: partition.GetStatus(),
FieldIndexID: partition.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
DbID: collectionInfo.GetDbId(),
},
LoadPercentage: 100,
}
err := m.PutCollection(ctx, col)
err = m.PutCollection(ctx, col)
if err != nil {
return err
}

View File

@ -42,10 +42,10 @@ func NewDistributionManager() *DistributionManager {
// It includes segments, DM channels, and leader views.
// If there are no segments, channels, or leader views, it returns an empty string.
// In case of an error during JSON marshaling, it returns the error.
func (dm *DistributionManager) GetDistributionJSON() string {
segments := dm.GetSegmentDist(0)
channels := dm.GetChannelDist()
leaderView := dm.GetLeaderView()
func (dm *DistributionManager) GetDistributionJSON(collectionID int64) string {
segments := dm.GetSegmentDist(collectionID)
channels := dm.GetChannelDist(collectionID)
leaderView := dm.GetLeaderView(collectionID)
dist := &metricsinfo.QueryCoordDist{
Segments: segments,

View File

@ -81,7 +81,7 @@ func TestGetDistributionJSON(t *testing.T) {
manager.LeaderViewManager.Update(2, leaderView2)
// Call GetDistributionJSON
jsonOutput := manager.GetDistributionJSON()
jsonOutput := manager.GetDistributionJSON(0)
// Verify JSON output
var dist metricsinfo.QueryCoordDist
@ -91,4 +91,13 @@ func TestGetDistributionJSON(t *testing.T) {
assert.Len(t, dist.Segments, 2)
assert.Len(t, dist.DMChannels, 2)
assert.Len(t, dist.LeaderViews, 2)
jsonOutput = manager.GetDistributionJSON(1000)
var dist2 metricsinfo.QueryCoordDist
err = json.Unmarshal([]byte(jsonOutput), &dist2)
assert.NoError(t, err)
assert.Len(t, dist2.Segments, 0)
assert.Len(t, dist2.DMChannels, 0)
assert.Len(t, dist2.LeaderViews, 0)
}

View File

@ -320,13 +320,26 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView
// GetLeaderView returns a slice of LeaderView objects, each representing the state of a leader node.
// It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice.
// The method locks the views map for reading to ensure thread safety.
func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView {
func (mgr *LeaderViewManager) GetLeaderView(collectionID int64) []*metricsinfo.LeaderView {
mgr.rwmutex.RLock()
defer mgr.rwmutex.RUnlock()
var leaderViews []*metricsinfo.LeaderView
for _, nodeViews := range mgr.views {
for _, lv := range nodeViews.views {
var filteredViews []*LeaderView
if collectionID > 0 {
if lv, ok := nodeViews.collectionViews[collectionID]; ok {
filteredViews = lv
} else {
// if collectionID is not found, return empty leader views
return leaderViews
}
} else {
// if collectionID is not set, return all leader views
filteredViews = nodeViews.views
}
for _, lv := range filteredViews {
errString := ""
if lv.UnServiceableError != nil {
errString = lv.UnServiceableError.Error()

View File

@ -23,13 +23,11 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -359,7 +357,7 @@ func TestGetLeaderView(t *testing.T) {
manager.Update(2, leaderView2)
// Call GetLeaderView
leaderViews := manager.GetLeaderView()
leaderViews := manager.GetLeaderView(0)
jsonOutput, err := json.Marshal(leaderViews)
assert.NoError(t, err)
@ -368,7 +366,6 @@ func TestGetLeaderView(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, result, 2)
log.Info("====", zap.Any("result", result))
checkResult := func(lv *metricsinfo.LeaderView) {
if lv.LeaderID == 1 {
assert.Equal(t, int64(100), lv.CollectionID)
@ -394,4 +391,10 @@ func TestGetLeaderView(t *testing.T) {
for _, lv := range result {
checkResult(lv)
}
leaderViews = manager.GetLeaderView(1)
assert.Len(t, leaderViews, 0)
leaderViews = manager.GetLeaderView(100)
assert.Len(t, leaderViews, 1)
}

View File

@ -638,17 +638,17 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run
return _c
}
// GetTargetJSON provides a mock function with given fields: ctx, scope
func (_m *MockTargetManager) GetTargetJSON(ctx context.Context, scope int32) string {
ret := _m.Called(ctx, scope)
// GetTargetJSON provides a mock function with given fields: ctx, scope, collectionID
func (_m *MockTargetManager) GetTargetJSON(ctx context.Context, scope int32, collectionID int64) string {
ret := _m.Called(ctx, scope, collectionID)
if len(ret) == 0 {
panic("no return value specified for GetTargetJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func(context.Context, int32) string); ok {
r0 = rf(ctx, scope)
if rf, ok := ret.Get(0).(func(context.Context, int32, int64) string); ok {
r0 = rf(ctx, scope, collectionID)
} else {
r0 = ret.Get(0).(string)
}
@ -664,13 +664,14 @@ type MockTargetManager_GetTargetJSON_Call struct {
// GetTargetJSON is a helper method to define mock.On call
// - ctx context.Context
// - scope int32
func (_e *MockTargetManager_Expecter) GetTargetJSON(ctx interface{}, scope interface{}) *MockTargetManager_GetTargetJSON_Call {
return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", ctx, scope)}
// - collectionID int64
func (_e *MockTargetManager_Expecter) GetTargetJSON(ctx interface{}, scope interface{}, collectionID interface{}) *MockTargetManager_GetTargetJSON_Call {
return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", ctx, scope, collectionID)}
}
func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(ctx context.Context, scope int32)) *MockTargetManager_GetTargetJSON_Call {
func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(ctx context.Context, scope int32, collectionID int64)) *MockTargetManager_GetTargetJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int32))
run(args[0].(context.Context), args[1].(int32), args[2].(int64))
})
return _c
}
@ -680,7 +681,7 @@ func (_c *MockTargetManager_GetTargetJSON_Call) Return(_a0 string) *MockTargetMa
return _c
}
func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(context.Context, int32) string) *MockTargetManager_GetTargetJSON_Call {
func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(context.Context, int32, int64) string) *MockTargetManager_GetTargetJSON_Call {
_c.Call.Return(run)
return _c
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -506,7 +507,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -515,9 +516,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelTowRWNodes[k] = v.GetRwNodes()
}
collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
dbID := util.InvalidDBID
if collectionInfo == nil {
log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))
} else {
dbID = collectionInfo.GetDbID()
}
return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
DatabaseID: dbID,
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),

View File

@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
err = replicaManager.put(ctx, replica2)
assert.NoError(t, err)
jsonOutput := replicaManager.GetReplicasJSON(ctx)
meta := &Meta{
CollectionManager: NewCollectionManager(catalog),
}
err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
DbID: int64(1),
},
})
assert.NoError(t, err)
err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 200,
},
})
assert.NoError(t, err)
jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
@ -556,10 +575,12 @@ func TestGetReplicasJSON(t *testing.T) {
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
assert.Equal(t, int64(1), replica.DatabaseID)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
assert.Equal(t, int64(0), replica.DatabaseID)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}

View File

@ -254,9 +254,10 @@ func (m *SegmentDistManager) GetSegmentDist(collectionID int64) []*metricsinfo.S
var segments []*metricsinfo.Segment
for _, nodeSeg := range m.segments {
for _, segment := range nodeSeg.segments {
if collectionID == 0 || segment.GetCollectionID() == collectionID {
segments = append(segments, newSegmentMetricsFrom(segment))
if collectionID > 0 && segment.GetCollectionID() != collectionID {
continue
}
segments = append(segments, newSegmentMetricsFrom(segment))
}
}

View File

@ -207,8 +207,13 @@ func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
return t.collectionTargetMap[collectionID]
}
func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget {
return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget {
func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget {
var ret []*metricsinfo.QueryCoordTarget
for k, v := range t.collectionTargetMap {
if collectionID > 0 && collectionID != k {
continue
}
segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
return metrics.NewSegmentFrom(s)
})
@ -217,10 +222,12 @@ func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget
return metrics.NewDMChannelFrom(ch.VchannelInfo)
})
return &metricsinfo.QueryCoordTarget{
ret = append(ret, &metricsinfo.QueryCoordTarget{
CollectionID: k,
Segments: segments,
DMChannels: dmChannels,
}
})
}
return ret
}

View File

@ -70,7 +70,7 @@ type TargetManagerInterface interface {
SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog)
Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool
GetTargetJSON(ctx context.Context, scope TargetScope) string
GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string
GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error)
IsCurrentTargetReady(ctx context.Context, collectionID int64) bool
}
@ -638,7 +638,7 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s
return false
}
func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) string {
func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
@ -647,7 +647,7 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope)
return ""
}
v, err := json.Marshal(ret.toQueryCoordCollectionTargets())
v, err := json.Marshal(ret.toQueryCoordCollectionTargets(collectionID))
if err != nil {
log.Warn("failed to marshal target", zap.Error(err))
return ""

View File

@ -669,7 +669,7 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() {
suite.NoError(suite.mgr.UpdateCollectionNextTarget(ctx, collectionID))
suite.True(suite.mgr.UpdateCollectionCurrentTarget(ctx, collectionID))
jsonStr := suite.mgr.GetTargetJSON(ctx, CurrentTarget)
jsonStr := suite.mgr.GetTargetJSON(ctx, CurrentTarget, 0)
assert.NotEmpty(suite.T(), jsonStr)
var currentTarget []*metricsinfo.QueryCoordTarget
@ -679,6 +679,14 @@ func (suite *TargetManagerSuite) TestGetTargetJSON() {
assert.Equal(suite.T(), collectionID, currentTarget[0].CollectionID)
assert.Len(suite.T(), currentTarget[0].DMChannels, 2)
assert.Len(suite.T(), currentTarget[0].Segments, 2)
jsonStr = suite.mgr.GetTargetJSON(ctx, CurrentTarget, 1)
assert.NotEmpty(suite.T(), jsonStr)
var currentTarget2 []*metricsinfo.QueryCoordTarget
err = json.Unmarshal([]byte(jsonStr), &currentTarget)
suite.NoError(err)
assert.Len(suite.T(), currentTarget2, 0)
}
func BenchmarkTargetManager(b *testing.B) {

View File

@ -202,7 +202,8 @@ func (s *Server) registerMetricsRequest() {
}
QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.dist.GetDistributionJSON(), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return s.dist.GetDistributionJSON(collectionID), nil
}
QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
@ -211,11 +212,13 @@ func (s *Server) registerMetricsRequest() {
if v.Exists() {
scope = meta.TargetScope(v.Int())
}
return s.targetMgr.GetTargetJSON(ctx, scope), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return s.targetMgr.GetTargetJSON(ctx, scope, collectionID), nil
}
QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(ctx), nil
return s.meta.GetReplicasJSON(ctx, s.meta), nil
}
QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {

View File

@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
if err != nil {
return nil, err
}
// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
if err != nil {

View File

@ -185,8 +185,8 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr
}
// getChannelJSON returns the JSON string of channels
func getChannelJSON(node *QueryNode) string {
stats := node.pipelineManager.GetChannelStats()
func getChannelJSON(node *QueryNode, collectionID int64) string {
stats := node.pipelineManager.GetChannelStats(collectionID)
ret, err := json.Marshal(stats)
if err != nil {
log.Warn("failed to marshal channels", zap.Error(err))
@ -196,10 +196,14 @@ func getChannelJSON(node *QueryNode) string {
}
// getSegmentJSON returns the JSON string of segments
func getSegmentJSON(node *QueryNode) string {
func getSegmentJSON(node *QueryNode, collectionID int64) string {
allSegments := node.manager.Segment.GetBy()
var ms []*metricsinfo.Segment
for _, s := range allSegments {
if collectionID > 0 && s.Collection() != collectionID {
continue
}
indexes := make([]*metricsinfo.IndexedField, 0, len(s.Indexes()))
for _, index := range s.Indexes() {
indexes = append(indexes, &metricsinfo.IndexedField{
@ -208,6 +212,7 @@ func getSegmentJSON(node *QueryNode) string {
IndexSize: index.IndexInfo.IndexSize,
BuildID: index.IndexInfo.BuildID,
IsLoaded: index.IsLoaded,
HasRawData: s.HasRawData(index.IndexInfo.FieldID),
})
}

View File

@ -59,7 +59,7 @@ func TestGetPipelineJSON(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, pipelineManager.Num())
stats := pipelineManager.GetChannelStats()
stats := pipelineManager.GetChannelStats(0)
expectedStats := []*metricsinfo.Channel{
{
Name: ch,
@ -71,7 +71,7 @@ func TestGetPipelineJSON(t *testing.T) {
}
assert.Equal(t, expectedStats, stats)
JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager})
JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager}, 0)
assert.NotEmpty(t, JSONStr)
var actualStats []*metricsinfo.Channel
@ -86,6 +86,7 @@ func TestGetSegmentJSON(t *testing.T) {
segment.EXPECT().Collection().Return(int64(1001))
segment.EXPECT().Partition().Return(int64(2001))
segment.EXPECT().MemSize().Return(int64(1024))
segment.EXPECT().HasRawData(mock.Anything).Return(true)
segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{
{
IndexInfo: &querypb.FieldIndexInfo{
@ -106,7 +107,7 @@ func TestGetSegmentJSON(t *testing.T) {
mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment})
node.manager = &segments.Manager{Segment: mockedSegmentManager}
jsonStr := getSegmentJSON(node)
jsonStr := getSegmentJSON(node, 0)
assert.NotEmpty(t, jsonStr)
var segments []*metricsinfo.Segment

View File

@ -42,7 +42,7 @@ type Manager interface {
Remove(channels ...string)
Start(channels ...string) error
Close()
GetChannelStats() []*metricsinfo.Channel
GetChannelStats(collectionID int64) []*metricsinfo.Channel
}
type manager struct {
@ -157,12 +157,15 @@ func (m *manager) Close() {
}
}
func (m *manager) GetChannelStats() []*metricsinfo.Channel {
func (m *manager) GetChannelStats(collectionID int64) []*metricsinfo.Channel {
m.mu.RLock()
defer m.mu.RUnlock()
ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline))
for ch, p := range m.channel2Pipeline {
if collectionID > 0 && p.GetCollectionID() != collectionID {
continue
}
delegator, ok := m.delegators.Get(ch)
if ok {
tt := delegator.GetTSafe()

View File

@ -284,12 +284,14 @@ func (node *QueryNode) registerMetricsRequest() {
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SegmentKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getSegmentJSON(node), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return getSegmentJSON(node, collectionID), nil
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.ChannelKey,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getChannelJSON(node), nil
collectionID := metricsinfo.GetCollectionIDFromRequest(jsonReq)
return getChannelJSON(node, collectionID), nil
})
log.Ctx(node.ctx).Info("register metrics actions finished")
}

View File

@ -86,17 +86,21 @@ const (
MetricRequestParamTargetScopeKey = "target_scope"
MetricRequestParamINKey = "in"
MetricRequestParamCollectionIDKey = "collection_id"
MetricRequestParamINKey = "in"
MetricsRequestParamsInDC = "dc"
MetricsRequestParamsInQC = "qc"
MetricsRequestParamsInDN = "dn"
MetricsRequestParamsInQN = "qn"
)
var MetricRequestParamINValue = map[string]struct{}{
"dc": {},
"qc": {},
"dn": {},
"qn": {},
}
var (
RequestParamsInDC = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInDC}
RequestParamsInQC = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInQC}
RequestParamsInDN = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInDN}
RequestParamsInQN = &commonpb.KeyValuePair{Key: MetricRequestParamINKey, Value: MetricsRequestParamsInQN}
)
type MetricsRequestAction func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error)
@ -172,6 +176,14 @@ func ParseMetricRequestType(jsonRet gjson.Result) (string, error) {
return "", fmt.Errorf("%s or %s not found in request", MetricTypeKey, MetricRequestTypeKey)
}
func GetCollectionIDFromRequest(jsonReq gjson.Result) int64 {
v := jsonReq.Get(MetricRequestParamCollectionIDKey)
if !v.Exists() {
return 0
}
return v.Int()
}
// ConstructRequestByMetricType constructs a request according to the metric type
func ConstructRequestByMetricType(metricType string) (*milvuspb.GetMetricsRequest, error) {
m := make(map[string]interface{})

View File

@ -147,6 +147,7 @@ type IndexedField struct {
BuildID int64 `json:"build_id,omitempty,string"`
IndexSize int64 `json:"index_size,omitempty,string"`
IsLoaded bool `json:"is_loaded,omitempty,string"`
HasRawData bool `json:"has_raw_data,omitempty"`
}
type QueryCoordTarget struct {
@ -195,6 +196,7 @@ type ResourceGroup struct {
type Replica struct {
ID int64 `json:"ID,omitempty,string"`
CollectionID int64 `json:"collectionID,omitempty,string"`
DatabaseID int64 `json:"database_id,omitempty,string"`
RWNodes []int64 `json:"rw_nodes,omitempty"`
ResourceGroup string `json:"resource_group,omitempty"`
RONodes []int64 `json:"ro_nodes,omitempty"`
@ -382,8 +384,8 @@ type ImportTask struct {
}
type CompactionTask struct {
PlanID int64 `json:"plan_id,omitempty"`
CollectionID int64 `json:"collection_id,omitempty"`
PlanID int64 `json:"plan_id,omitempty,string"`
CollectionID int64 `json:"collection_id,omitempty,string"`
Type string `json:"type,omitempty"`
State string `json:"state,omitempty"`
FailReason string `json:"fail_reason,omitempty"`
@ -447,7 +449,7 @@ type Collection struct {
ConsistencyLevel string `json:"consistency_level,omitempty"`
Aliases []string `json:"aliases,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
DBName string `json:"db_name,omitempty,string"`
DBName string `json:"db_name,omitempty"`
NumPartitions int `json:"num_partitions,omitempty,string"`
VirtualChannelNames []string `json:"virtual_channel_names,omitempty"`
PhysicalChannelNames []string `json:"physical_channel_names,omitempty"`
@ -458,7 +460,7 @@ type Collection struct {
type Database struct {
DBName string `json:"db_name,omitempty"`
DBID int64 `json:"dbID,omitempty"`
DBID int64 `json:"dbID,omitempty,string"`
CreatedTimestamp string `json:"created_timestamp,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
}