feat: add segment, pipeline, replica and resourcegroup api for WebUI (#37344)

issue: #36621

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/37501/head
jaime 2024-11-07 11:52:25 +08:00 committed by GitHub
parent 9b6dd23f8e
commit f348bd9441
58 changed files with 2836 additions and 253 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -53,6 +54,8 @@ type ChannelManager interface {
GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
GetChannelsByCollectionID(collectionID int64) []RWChannel
GetChannelNamesByCollectionID(collectionID int64) []string
GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo
}
// An interface sessionManager implements
@ -739,6 +742,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
return nil
}
func (m *ChannelManagerImpl) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
m.mu.RLock()
defer m.mu.RUnlock()
infos := make(map[int64]map[string]*datapb.ChannelWatchInfo)
for _, nc := range m.store.GetNodesChannels() {
for _, ch := range nc.Channels {
watchInfo := proto.Clone(ch.GetWatchInfo()).(*datapb.ChannelWatchInfo)
if _, ok := infos[nc.NodeID]; !ok {
infos[nc.NodeID] = make(map[string]*datapb.ChannelWatchInfo)
}
infos[nc.NodeID][watchInfo.Vchan.ChannelName] = watchInfo
}
}
return infos
}
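For orientation, the new DataCoord dist endpoint consumes the returned node-to-channel map roughly as in the minimal sketch below, written as if inside the datacoord package (the fmt import is assumed); each entry is a clone, so callers cannot mutate the channel store through it:

func printWatchStates(infos map[int64]map[string]*datapb.ChannelWatchInfo) {
	for nodeID, channels := range infos {
		for name, info := range channels {
			// illustrative only: dump node ID, channel name and watch state
			fmt.Println(nodeID, name, info.GetState().String())
		}
	}
}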
func inferStateByOpType(opType ChannelOpType) datapb.ChannelWatchState {
switch opType {
case Watch:

View File

@ -805,3 +805,51 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
func (s *ChannelManagerSuite) TestCheckLoop() {}
func (s *ChannelManagerSuite) TestGet() {}
func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
store := NewMockRWChannelStore(s.T())
store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{
NodeID: 1,
Channels: map[string]RWChannel{
"ch1": &channelMeta{
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: "ch1",
},
StartTs: 100,
State: datapb.ChannelWatchState_ToWatch,
OpID: 1,
},
},
},
},
{
NodeID: 2,
Channels: map[string]RWChannel{
"ch2": &channelMeta{
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: "ch2",
},
StartTs: 10,
State: datapb.ChannelWatchState_WatchSuccess,
OpID: 1,
},
},
},
},
})
cm := &ChannelManagerImpl{store: store}
infos := cm.GetChannelWatchInfos()
s.Equal(2, len(infos))
s.Equal("ch1", infos[1]["ch1"].GetVchan().ChannelName)
s.Equal("ch2", infos[2]["ch2"].GetVchan().ChannelName)
// test empty value
store.EXPECT().GetNodesChannels().Unset()
store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{})
infos = cm.GetChannelWatchInfos()
s.Equal(0, len(infos))
}

View File

@ -25,11 +25,11 @@ import (
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

View File

@ -2025,3 +2025,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
return metricMutation, nil
}
func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment {
m.RLock()
defer m.RUnlock()
segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
for _, s := range m.segments.segments {
segments = append(segments, &metricsinfo.Segment{
SegmentID: s.ID,
CollectionID: s.CollectionID,
PartitionID: s.PartitionID,
Channel: s.InsertChannel,
NumOfRows: s.NumOfRows,
State: s.State.String(),
MemSize: s.size.Load(),
Level: s.Level.String(),
IsImporting: s.IsImporting,
Compacted: s.Compacted,
IsSorted: s.IsSorted,
NodeID: paramtable.GetNodeID(),
})
}
return segments
}

View File

@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -43,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
@ -1319,3 +1321,62 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
assert.NotNil(t, c)
})
}
func TestMeta_GetSegmentsJSON(t *testing.T) {
// Create a mock meta object
m := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "channel1",
NumOfRows: 100,
State: commonpb.SegmentState_Growing,
MaxRowNum: 1000,
Compacted: false,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 200,
State: commonpb.SegmentState_Sealed,
MaxRowNum: 2000,
Compacted: true,
},
},
},
},
}
segments := m.getSegmentsMetrics()
// Check the length of the segments
assert.Equal(t, 2, len(segments))
slices.SortFunc(segments, func(i, j *metricsinfo.Segment) int { return int(i.SegmentID - j.SegmentID) })
// Check the first segment
assert.Equal(t, int64(1), segments[0].SegmentID)
assert.Equal(t, int64(1), segments[0].CollectionID)
assert.Equal(t, int64(1), segments[0].PartitionID)
assert.Equal(t, "channel1", segments[0].Channel)
assert.Equal(t, int64(100), segments[0].NumOfRows)
assert.Equal(t, "Growing", segments[0].State)
assert.False(t, segments[0].Compacted)
// Check the second segment
assert.Equal(t, int64(2), segments[1].SegmentID)
assert.Equal(t, int64(2), segments[1].CollectionID)
assert.Equal(t, int64(2), segments[1].PartitionID)
assert.Equal(t, "channel2", segments[1].Channel)
assert.Equal(t, int64(200), segments[1].NumOfRows)
assert.Equal(t, "Sealed", segments[1].State)
assert.True(t, segments[1].Compacted)
}

View File

@ -19,6 +19,7 @@ package datacoord
import (
"context"
"encoding/json"
"sync"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -27,8 +28,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -82,74 +85,90 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
return ret
}
// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode.
func (s *Server) GetSyncTaskMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (string, error) {
resp, err := s.requestDataNodeGetMetrics(ctx, req)
if err != nil {
return "", err
}
tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len())
resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool {
if value.Response != "" {
var sts []*metricsinfo.SyncTask
if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil {
log.Warn("failed to unmarshal sync task metrics")
err = err1
return false
}
tasks[key] = sts
func (s *Server) getChannelsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
channels, err := getMetrics[*metricsinfo.Channel](s, ctx, req)
// fill checkpoint timestamp
channel2Checkpoints := s.meta.GetChannelCheckpoints()
for _, channel := range channels {
if cp, ok := channel2Checkpoints[channel.Name]; ok {
channel.CheckpointTS = typeutil.TimestampToString(cp.GetTimestamp())
} else {
log.Warn("channel not found in meta cache", zap.String("channel", channel.Name))
}
return true
})
if err != nil {
return "", err
}
if len(tasks) == 0 {
return "", nil
}
bs, err := json.Marshal(tasks)
if err != nil {
return "", err
}
return (string)(bs), nil
return metricsinfo.MarshalGetMetricsValues(channels, err)
}
func (s *Server) requestDataNodeGetMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (*typeutil.ConcurrentMap[string, *milvuspb.GetMetricsResponse], error) {
nodes := s.cluster.GetSessions()
// mergeChannels merges the channel metrics from data nodes and channel watch infos from channel manager
// dnChannels: a slice of Channel metrics from data nodes
// dcChannels: a map of channel watch infos from the channel manager, keyed by node ID and channel name
func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[string]*datapb.ChannelWatchInfo) []*metricsinfo.Channel {
mergedChannels := make([]*metricsinfo.Channel, 0)
rets := typeutil.NewConcurrentMap[string, *milvuspb.GetMetricsResponse]()
wg, ctx := errgroup.WithContext(ctx)
for _, node := range nodes {
wg.Go(func() error {
cli, err := node.GetOrCreateClient(ctx)
if err != nil {
return err
// Add or update channels from data nodes
for _, dnChannel := range dnChannels {
if dcChannelMap, ok := dcChannels[dnChannel.NodeID]; ok {
if dcChannel, ok := dcChannelMap[dnChannel.Name]; ok {
dnChannel.WatchState = dcChannel.State.String()
delete(dcChannelMap, dnChannel.Name)
}
ret, err := cli.GetMetrics(ctx, req)
if err != nil {
return err
}
key := metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.NodeID())
rets.Insert(key, ret)
return nil
})
}
mergedChannels = append(mergedChannels, dnChannel)
}
err := wg.Wait()
// Add remaining channels from channel manager
for nodeID, dcChannelMap := range dcChannels {
for _, dcChannel := range dcChannelMap {
mergedChannels = append(mergedChannels, &metricsinfo.Channel{
Name: dcChannel.Vchan.ChannelName,
CollectionID: dcChannel.Vchan.CollectionID,
WatchState: dcChannel.State.String(),
NodeID: nodeID,
})
}
}
return mergedChannels
}
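To make the merge behavior concrete, a small illustrative walk-through (values made up, written as if inside the datacoord package): a channel reported by a data node has its WatchState filled in from the coordinator's watch info, and watch infos with no matching data-node report are appended as coordinator-only entries.

dn := []*metricsinfo.Channel{{Name: "ch1", NodeID: 1}}
dc := map[int64]map[string]*datapb.ChannelWatchInfo{
	1: {"ch1": {State: datapb.ChannelWatchState_WatchSuccess}},
	2: {"ch2": {
		State: datapb.ChannelWatchState_ToWatch,
		Vchan: &datapb.VchannelInfo{ChannelName: "ch2", CollectionID: 100},
	}},
}
merged := mergeChannels(dn, dc)
// merged holds ch1 from the data node with WatchState "WatchSuccess",
// plus ch2 synthesized from the leftover watch info (NodeID 2, "ToWatch").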
func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) string {
segments := s.meta.getSegmentsMetrics()
var channels []*metricsinfo.DmChannel
for nodeID, ch := range s.channelManager.GetChannelWatchInfos() {
for _, chInfo := range ch {
dmChannel := metrics.NewDMChannelFrom(chInfo.GetVchan())
dmChannel.NodeID = nodeID
dmChannel.WatchState = chInfo.State.String()
dmChannel.StartWatchTS = chInfo.GetStartTs()
channels = append(channels, dmChannel)
}
}
if len(segments) == 0 && len(channels) == 0 {
return ""
}
dist := &metricsinfo.DataCoordDist{
Segments: segments,
DMChannels: channels,
}
bs, err := json.Marshal(dist)
if err != nil {
return nil, err
log.Warn("marshal dist value failed", zap.String("err", err.Error()))
return ""
}
return rets, nil
return string(bs)
}
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req)
return metricsinfo.MarshalGetMetricsValues(ret, err)
}
func (s *Server) getSyncTaskJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
ret, err := getMetrics[*metricsinfo.SyncTask](s, ctx, req)
return metricsinfo.MarshalGetMetricsValues(ret, err)
}
// getSystemInfoMetrics composes data cluster metrics
@ -322,3 +341,44 @@ func (s *Server) getIndexNodeMetrics(ctx context.Context, req *milvuspb.GetMetri
infos.BaseComponentInfos.HasError = false
return infos, nil
}
// getMetrics retrieves and aggregates the metrics of all data nodes into a slice
func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) {
var metrics []T
var mu sync.Mutex
errorGroup, ctx := errgroup.WithContext(ctx)
nodes := s.cluster.GetSessions()
for _, node := range nodes {
errorGroup.Go(func() error {
cli, err := node.GetOrCreateClient(ctx)
if err != nil {
return err
}
resp, err := cli.GetMetrics(ctx, req)
if err != nil {
log.Warn("failed to get metric from DataNode", zap.Int64("nodeID", node.NodeID()))
return err
}
if resp.Response == "" {
return nil
}
var infos []T
err = json.Unmarshal([]byte(resp.Response), &infos)
if err != nil {
log.Warn("invalid metrics of data node was found", zap.Error(err))
return err
}
mu.Lock()
metrics = append(metrics, infos...)
mu.Unlock()
return nil
})
}
err := errorGroup.Wait()
return metrics, err
}

View File

@ -18,17 +18,22 @@ package datacoord
import (
"context"
"encoding/json"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -206,10 +211,25 @@ func TestGetSyncTaskMetrics(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
task := `[{"segment_id": 1, "batch_rows": 100, "segment_level": "L0", "ts_from": 1000, "ts_to": 2000,"delta_row_count": 10, "flush_size": 1024, "running_time": 2000000000}]`
tasks := []metricsinfo.SyncTask{
{
SegmentID: 1,
BatchRows: 100,
SegmentLevel: "L0",
TSFrom: 1000,
TSTo: 2000,
DeltaRowCount: 10,
FlushSize: 1024,
RunningTime: "2h",
},
}
tasksBytes, err := json.Marshal(tasks)
assert.NoError(t, err)
expectedJSON := string(tasksBytes)
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: task,
Response: expectedJSON,
}
mockClient := &mockMetricDataNodeClient{
@ -226,9 +246,8 @@ func TestGetSyncTaskMetrics(t *testing.T) {
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
actualJSON, err := svr.getSyncTaskJSON(ctx, req)
assert.NoError(t, err)
expectedJSON := `{"datanode1":[{"segment_id":1,"batch_rows":100,"segment_level":"L0","ts_from":1000,"ts_to":2000,"delta_row_count":10,"flush_size":1024,"running_time":2000000000}]}`
assert.Equal(t, expectedJSON, actualJSON)
})
@ -250,7 +269,7 @@ func TestGetSyncTaskMetrics(t *testing.T) {
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
actualJSON, err := svr.getSyncTaskJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
@ -278,7 +297,7 @@ func TestGetSyncTaskMetrics(t *testing.T) {
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
actualJSON, err := svr.getSyncTaskJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
@ -307,8 +326,360 @@ func TestGetSyncTaskMetrics(t *testing.T) {
svr.cluster = mockCluster
expectedJSON := ""
actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
actualJSON, err := svr.getSyncTaskJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
})
}
func TestGetSegmentsJSON(t *testing.T) {
svr := Server{}
t.Run("ReturnsCorrectJSON", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
segments := []*metricsinfo.Segment{
{
SegmentID: 1,
CollectionID: 100,
PartitionID: 10,
NumOfRows: 1000,
State: "Flushed",
},
}
segmentsBytes, err := json.Marshal(segments)
assert.NoError(t, err)
expectedJSON := string(segmentsBytes)
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: expectedJSON,
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.getSegmentsJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
})
t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return nil, errors.New("request failed")
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.getSegmentsJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: `invalid json`,
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
actualJSON, err := svr.getSegmentsJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
t.Run("ReturnsEmptyJSONWhenNoSegments", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: "",
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
expectedJSON := ""
actualJSON, err := svr.getSegmentsJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
})
}
func TestGetChannelsJSON(t *testing.T) {
svr := Server{}
t.Run("ReturnsCorrectJSON", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
channels := []*metricsinfo.Channel{
{
Name: "channel1",
CollectionID: 100,
NodeID: 1,
},
}
channelsBytes, err := json.Marshal(channels)
assert.NoError(t, err)
channelJSON := string(channelsBytes)
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: channelJSON,
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
svr.meta = &meta{channelCPs: newChannelCps()}
svr.meta.channelCPs.checkpoints["channel1"] = &msgpb.MsgPosition{Timestamp: 1000}
actualJSON, err := svr.getChannelsJSON(context.TODO(), req)
assert.NoError(t, err)
channels = []*metricsinfo.Channel{
{
Name: "channel1",
CollectionID: 100,
NodeID: 1,
CheckpointTS: typeutil.TimestampToString(1000),
},
}
channelsBytes, err = json.Marshal(channels)
assert.NoError(t, err)
expectedJSON := string(channelsBytes)
assert.Equal(t, expectedJSON, actualJSON)
})
t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return nil, errors.New("request failed")
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
svr.meta = &meta{channelCPs: newChannelCps()}
actualJSON, err := svr.getChannelsJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: `invalid json`,
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
svr.meta = &meta{channelCPs: newChannelCps()}
actualJSON, err := svr.getChannelsJSON(ctx, req)
assert.Error(t, err)
assert.Equal(t, "", actualJSON)
})
t.Run("ReturnsEmptyJSONWhenNoChannels", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
mockResp := &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: "",
}
mockClient := &mockMetricDataNodeClient{
mock: func() (*milvuspb.GetMetricsResponse, error) {
return mockResp, nil
},
}
dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return mockClient, nil
}
mockCluster := NewMockCluster(t)
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster
svr.meta = &meta{channelCPs: newChannelCps()}
expectedJSON := ""
actualJSON, err := svr.getChannelsJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
})
}
func TestGetDistJSON(t *testing.T) {
svr := Server{}
nodeID := paramtable.GetNodeID()
paramtable.SetNodeID(1)
defer paramtable.SetNodeID(nodeID)
t.Run("ReturnsCorrectJSON", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
svr.meta = &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "channel1",
Level: datapb.SegmentLevel_L1,
State: commonpb.SegmentState_Flushed,
},
},
},
},
}
cm := NewMockChannelManager(t)
cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{
1: {
"channel1": {
State: datapb.ChannelWatchState_ToWatch,
Vchan: &datapb.VchannelInfo{
ChannelName: "channel1",
},
},
},
})
svr.channelManager = cm
segments := []*metricsinfo.Segment{
{
SegmentID: 1,
State: commonpb.SegmentState_Flushed.String(),
CollectionID: 1,
PartitionID: 1,
Channel: "channel1",
Level: datapb.SegmentLevel_L1.String(),
NodeID: 1,
},
}
channels := []*metricsinfo.DmChannel{
{
ChannelName: "channel1",
NodeID: 1,
WatchState: datapb.ChannelWatchState_ToWatch.String(),
},
}
dist := &metricsinfo.DataCoordDist{
Segments: segments,
DMChannels: channels,
}
distBytes, err := json.Marshal(dist)
assert.NoError(t, err)
expectedJSON := string(distBytes)
actualJSON := svr.getDistJSON(ctx, req)
assert.Equal(t, expectedJSON, actualJSON)
})
t.Run("ReturnsEmptyJSONWhenNoDist", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
ctx := context.Background()
svr.meta = &meta{segments: &SegmentsInfo{segments: map[int64]*SegmentInfo{}}}
cm := NewMockChannelManager(t)
cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{})
svr.channelManager = cm
expectedJSON := ""
actualJSON := svr.getDistJSON(ctx, req)
assert.Equal(t, expectedJSON, actualJSON)
})
}

View File

@ -5,6 +5,7 @@ package datacoord
import (
context "context"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
)
@ -308,6 +309,53 @@ func (_c *MockChannelManager_GetChannelNamesByCollectionID_Call) RunAndReturn(ru
return _c
}
// GetChannelWatchInfos provides a mock function with given fields:
func (_m *MockChannelManager) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetChannelWatchInfos")
}
var r0 map[int64]map[string]*datapb.ChannelWatchInfo
if rf, ok := ret.Get(0).(func() map[int64]map[string]*datapb.ChannelWatchInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]map[string]*datapb.ChannelWatchInfo)
}
}
return r0
}
// MockChannelManager_GetChannelWatchInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelWatchInfos'
type MockChannelManager_GetChannelWatchInfos_Call struct {
*mock.Call
}
// GetChannelWatchInfos is a helper method to define mock.On call
func (_e *MockChannelManager_Expecter) GetChannelWatchInfos() *MockChannelManager_GetChannelWatchInfos_Call {
return &MockChannelManager_GetChannelWatchInfos_Call{Call: _e.mock.On("GetChannelWatchInfos")}
}
func (_c *MockChannelManager_GetChannelWatchInfos_Call) Run(run func()) *MockChannelManager_GetChannelWatchInfos_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockChannelManager_GetChannelWatchInfos_Call) Return(_a0 map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockChannelManager_GetChannelWatchInfos_Call) RunAndReturn(run func() map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
_c.Call.Return(run)
return _c
}
// GetChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel {
ret := _m.Called(collectionID)

View File

@ -1141,6 +1141,11 @@ func (s *Server) registerMetricsRequest() {
return s.getSystemInfoMetrics(ctx, req)
})
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataDist,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.getDistJSON(ctx, req), nil
})
s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.importMeta.TaskStatsJSON(), nil
@ -1158,8 +1163,19 @@ func (s *Server) registerMetricsRequest() {
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.GetSyncTaskMetrics(ctx, req)
return s.getSyncTaskJSON(ctx, req)
})
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.getSegmentsJSON(ctx, req)
})
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.getChannelsJSON(ctx, req)
})
log.Info("register metrics actions finished")
}

View File

@ -22,8 +22,8 @@ import (
"strconv"
"sync"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/indexpb"

View File

@ -284,10 +284,21 @@ func (node *DataNode) registerMetricsRequest() {
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.getSystemInfoMetrics(ctx, req)
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.syncMgr.TaskStatsJSON(), nil
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetSegmentsJSON(), nil
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetChannelsJSON(), nil
})
log.Info("register metrics actions finished")
}

View File

@ -87,6 +87,14 @@ func (s *SegmentInfo) Level() datapb.SegmentLevel {
return s.level
}
func (s *SegmentInfo) BufferRows() int64 {
return s.bufferRows
}
func (s *SegmentInfo) SyncingRows() int64 {
return s.syncingRows
}
func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,

View File

@ -18,6 +18,7 @@ package pipeline
import (
"context"
"encoding/json"
"fmt"
"go.uber.org/zap"
@ -25,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -40,6 +42,8 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64
GetChannelsJSON() string
GetSegmentsJSON() string
Close()
}
@ -115,6 +119,59 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
return collectionSet.Collect()
}
// GetChannelsJSON returns all channels in json format.
func (fm *fgManagerImpl) GetChannelsJSON() string {
var channels []*metricsinfo.Channel
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
channels = append(channels, &metricsinfo.Channel{
Name: ch,
WatchState: ds.fg.Status(),
LatestTimeTick: typeutil.TimestampToString(latestTimeTick),
NodeID: paramtable.GetNodeID(),
CollectionID: ds.metacache.Collection(),
})
return true
})
ret, err := json.Marshal(channels)
if err != nil {
log.Warn("failed to marshal channels", zap.Error(err))
return ""
}
return string(ret)
}
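The string produced here is what the DataCoord side later parses back with getMetrics[*metricsinfo.Channel]; a minimal round-trip sketch (illustrative, using the json, log and zap imports already present in this file, with fm standing in for a FlowgraphManager):

out := fm.GetChannelsJSON()
var chans []*metricsinfo.Channel
if out != "" {
	if err := json.Unmarshal([]byte(out), &chans); err != nil {
		log.Warn("failed to decode channel metrics", zap.Error(err))
	}
}
// chans now carries Name, WatchState, LatestTimeTick, NodeID and CollectionID per channel.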
func (fm *fgManagerImpl) GetSegmentsJSON() string {
var segments []*metricsinfo.Segment
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
meta := ds.metacache
for _, segment := range meta.GetSegmentsBy() {
segments = append(segments, &metricsinfo.Segment{
SegmentID: segment.SegmentID(),
CollectionID: meta.Collection(),
PartitionID: segment.PartitionID(),
Channel: ch,
State: segment.State().String(),
Level: segment.Level().String(),
NodeID: paramtable.GetNodeID(),
NumOfRows: segment.NumOfRows(),
FlushedRows: segment.FlushedRows(),
SyncBufferRows: segment.BufferRows(),
SyncingRows: segment.SyncingRows(),
})
}
return true
})
ret, err := json.Marshal(segments)
if err != nil {
log.Warn("failed to marshal segments", zap.Error(err))
return ""
}
return string(ret)
}
func (fm *fgManagerImpl) Close() {
fm.cancelFunc()
}

View File

@ -18,8 +18,8 @@ package pipeline
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"testing"
@ -30,15 +30,20 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestMain(t *testing.M) {
@ -98,7 +103,7 @@ func TestFlowGraphManager(t *testing.T) {
}
func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
collectionID := int64(rand.Uint32())
collectionID := int64(1)
dmChannelName := fmt.Sprintf("%s_%d", "fake-ch-", collectionID)
schema := &schemapb.CollectionSchema{
Name: fmt.Sprintf("%s_%d", "collection_", collectionID),
@ -124,3 +129,105 @@ func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
Schema: schema,
}
}
type mockTimeSender struct{}
func (m *mockTimeSender) Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) {
panic("implement me")
}
func (m *mockTimeSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
return 0
}
func newFlowGraphManager(t *testing.T) (string, FlowgraphManager) {
mockBroker := broker.NewMockBroker(t)
mockBroker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
mockBroker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
mockBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
wbm := writebuffer.NewMockBufferManager(t)
wbm.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
dispClient := msgdispatcher.NewMockClient(t)
dispClient.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(make(chan *msgstream.MsgPack), nil)
pipelineParams := &util.PipelineParams{
Ctx: context.TODO(),
Session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
Broker: mockBroker,
TimeTickSender: &mockTimeSender{},
DispClient: dispClient,
WriteBufferManager: wbm,
}
chanWatchInfo := generateChannelWatchInfo()
ds, err := NewDataSyncService(
context.TODO(),
pipelineParams,
chanWatchInfo,
util.NewTickler(),
)
assert.NoError(t, err)
fm := NewFlowgraphManager()
fm.AddFlowgraph(ds)
return ds.vchannelName, fm
}
func TestGetChannelsJSON(t *testing.T) {
paramtable.SetNodeID(1)
_, fm := newFlowGraphManager(t)
obj := []*metricsinfo.Channel{
{
Name: "fake-ch-_1",
WatchState: "Healthy",
LatestTimeTick: typeutil.TimestampToString(0),
NodeID: paramtable.GetNodeID(),
CollectionID: 1,
},
}
expectedBytes, err := json.Marshal(obj)
assert.NoError(t, err)
expectedJSON := string(expectedBytes)
jsonResult := fm.GetChannelsJSON()
assert.JSONEq(t, expectedJSON, jsonResult)
}
func TestGetSegmentJSON(t *testing.T) {
ch, fm := newFlowGraphManager(t)
ds, ok := fm.GetFlowgraphService(ch)
assert.True(t, ok)
nodeID := paramtable.GetNodeID()
paramtable.SetNodeID(1)
defer paramtable.SetNodeID(nodeID)
pkStatsFactory := func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
}
segment := &datapb.SegmentInfo{
ID: 1,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 10240,
CollectionID: 1,
}
s := metrics.NewSegmentFrom(segment)
s.NodeID = 1
s.Channel = "fake-ch-_1"
s.FlushedRows = 10240
expectedBytes, err := json.Marshal([]*metricsinfo.Segment{s})
assert.NoError(t, err)
expectedJSON := string(expectedBytes)
ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
jsonResult := fm.GetSegmentsJSON()
fmt.Println(jsonResult)
assert.JSONEq(t, expectedJSON, jsonResult)
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package pipeline
@ -114,10 +114,59 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra
return _c
}
// GetChannelsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetChannelsJSON() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetChannelsJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockFlowgraphManager_GetChannelsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelsJSON'
type MockFlowgraphManager_GetChannelsJSON_Call struct {
*mock.Call
}
// GetChannelsJSON is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call {
return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")}
}
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetChannelsJSON_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionIDs provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetCollectionIDs")
}
var r0 []int64
if rf, ok := ret.Get(0).(func() []int64); ok {
r0 = rf()
@ -161,6 +210,10 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []
func (_m *MockFlowgraphManager) GetFlowgraphCount() int {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetFlowgraphCount")
}
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
@ -202,6 +255,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i
func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) {
ret := _m.Called(channel)
if len(ret) == 0 {
panic("no return value specified for GetFlowgraphService")
}
var r0 *DataSyncService
var r1 bool
if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok {
@ -252,10 +309,59 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
return _c
}
// GetSegmentsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetSegmentsJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockFlowgraphManager_GetSegmentsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentsJSON'
type MockFlowgraphManager_GetSegmentsJSON_Call struct {
*mock.Call
}
// GetSegmentsJSON is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call {
return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")}
}
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetSegmentsJSON_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call {
_c.Call.Return(run)
return _c
}
// HasFlowgraph provides a mock function with given fields: channel
func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool {
ret := _m.Called(channel)
if len(ret) == 0 {
panic("no return value specified for HasFlowgraph")
}
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(channel)
@ -298,6 +404,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string)
func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool {
ret := _m.Called(channel, opID)
if len(ret) == 0 {
panic("no return value specified for HasFlowgraphWithOpID")
}
var r0 bool
if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
r0 = rf(channel, opID)

View File

@ -422,10 +422,11 @@ func (t *SyncTask) MarshalJSON() ([]byte, error) {
SegmentID: t.segmentID,
BatchRows: t.batchRows,
SegmentLevel: t.level.String(),
TsFrom: t.tsFrom,
TsTo: t.tsTo,
TSFrom: t.tsFrom,
TSTo: t.tsTo,
DeltaRowCount: t.deltaRowCount,
FlushSize: t.flushedSize,
RunningTime: t.execTime,
RunningTime: t.execTime.String(),
NodeID: paramtable.GetNodeID(),
})
}
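With execTime now serialized through String(), running_time becomes a human-readable duration instead of a raw nanosecond count, which is why the test fixtures below build the expected JSON from metricsinfo.SyncTask rather than a hand-written literal. A minimal, self-contained illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// 2 * time.Second used to marshal as 2000000000; with .String() the
	// running_time field now carries "2s".
	fmt.Println((2 * time.Second).String())
}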

View File

@ -35,10 +35,12 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -383,7 +385,7 @@ func (s *SyncTaskSuite) TestNextID() {
}
func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
task := &SyncTask{
t := &SyncTask{
segmentID: 12345,
batchRows: 100,
level: datapb.SegmentLevel_L0,
@ -394,18 +396,22 @@ func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
execTime: 2 * time.Second,
}
expectedJSON := `{
"segment_id": 12345,
"batch_rows": 100,
"segment_level": "L0",
"ts_from": 1000,
"ts_to": 2000,
"delta_row_count": 10,
"flush_size": 1024,
"running_time": 2000000000
}`
tm := &metricsinfo.SyncTask{
SegmentID: t.segmentID,
BatchRows: t.batchRows,
SegmentLevel: t.level.String(),
TSFrom: t.tsFrom,
TSTo: t.tsTo,
DeltaRowCount: t.deltaRowCount,
FlushSize: t.flushedSize,
RunningTime: t.execTime.String(),
NodeID: paramtable.GetNodeID(),
}
expectedBytes, err := json.Marshal(tm)
s.NoError(err)
expectedJSON := string(expectedBytes)
data, err := task.MarshalJSON()
data, err := t.MarshalJSON()
s.NoError(err)
s.JSONEq(expectedJSON, string(data))
}

View File

@ -36,6 +36,7 @@ import (
type StatsUpdater interface {
Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats)
GetLatestTimestamp(channel string) typeutil.Timestamp
}
// TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
@ -126,6 +127,17 @@ func (m *TimeTickSender) Update(channelName string, timestamp uint64, segStats [
m.statsCache[channelName].lastTs = timestamp
}
func (m *TimeTickSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
m.mu.RLock()
defer m.mu.RUnlock()
chStats, ok := m.statsCache[channel]
if !ok {
log.Warn("channel not found in TimeTickSender", zap.String("channel", channel))
return 0
}
return chStats.lastTs
}
func (m *TimeTickSender) assembleDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) {
m.mu.RLock()
defer m.mu.RUnlock()

View File

@ -65,19 +65,46 @@ const (
// for WebUI restful api root path
const (
ClusterInfoPath = "/_cluster/info"
ClusterConfigsPath = "/_cluster/configs"
ClusterClientsPath = "/_cluster/clients"
// ClusterInfoPath is the path to get cluster information.
ClusterInfoPath = "/_cluster/info"
// ClusterConfigsPath is the path to get cluster configurations.
ClusterConfigsPath = "/_cluster/configs"
// ClusterClientsPath is the path to get connected clients.
ClusterClientsPath = "/_cluster/clients"
// ClusterDependenciesPath is the path to get cluster dependencies.
ClusterDependenciesPath = "/_cluster/dependencies"
HookConfigsPath = "/_hook/configs"
QCoordSegmentsPath = "/_qcoord/segments"
QCoordChannelsPath = "/_qcoord/channels"
QCoordAllTasksPath = "/_qcoord/tasks/all"
// HookConfigsPath is the path to get hook configurations.
HookConfigsPath = "/_hook/configs"
DCoordAllTasksPath = "/_dcoord/tasks/all"
DCoordImportTasksPath = "/_dcoord/tasks/import"
DCoordCompactionTasksPath = "/_dcoord/tasks/compaction"
DCoordBuildIndexTasksPath = "/_dcoord/tasks/build_index"
// QCDistPath is the path to get QueryCoord distribution.
QCDistPath = "/_qc/dist"
// QCTargetPath is the path to get QueryCoord target.
QCTargetPath = "/_qc/target"
// QCReplicaPath is the path to get QueryCoord replica.
QCReplicaPath = "/_qc/replica"
// QCResourceGroupPath is the path to get QueryCoord resource group.
QCResourceGroupPath = "/_qc/resource_group"
// QCAllTasksPath is the path to get all tasks in QueryCoord.
QCAllTasksPath = "/_qc/tasks"
DNodeSyncTasksPath = "/_dnode/tasks/sync"
// QNSegmentsPath is the path to get segments in QueryNode.
QNSegmentsPath = "/_qn/segments"
// QNChannelsPath is the path to get channels in QueryNode.
QNChannelsPath = "/_qn/channels"
// DCDistPath is the path to get all segments and channels distribution in DataCoord.
DCDistPath = "/_dc/dist"
// DCImportTasksPath is the path to get import tasks in DataCoord.
DCImportTasksPath = "/_dc/tasks/import"
// DCCompactionTasksPath is the path to get compaction tasks in DataCoord.
DCCompactionTasksPath = "/_dc/tasks/compaction"
// DCBuildIndexTasksPath is the path to get build index tasks in DataCoord.
DCBuildIndexTasksPath = "/_dc/tasks/build_index"
// DNSyncTasksPath is the path to get sync tasks in DataNode.
DNSyncTasksPath = "/_dn/tasks/sync"
// DNSegmentsPath is the path to get segments in DataNode.
DNSegmentsPath = "/_dn/segments"
// DNChannelsPath is the path to get channels in DataNode.
DNChannelsPath = "/_dn/channels"
)
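As an illustration of how these routes are consumed, fetching the new DataCoord distribution endpoint could look like the sketch below; the address is a placeholder and should point at the proxy's HTTP management listener in your deployment:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// placeholder address; adjust to the proxy management listener
	resp, err := http.Get("http://localhost:9091/_dc/dist")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON shaped like metricsinfo.DataCoordDist
}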

View File

@ -7,9 +7,6 @@
</button> <a class="navbar-brand active" href="index.html">Home</a>
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="navbar-nav">
<li class="nav-item">
<a class="nav-link" href="nodes.html">Nodes</a>
</li>
<li class="nav-item">
<a class="nav-link" href="collections.html">Collections</a>
</li>

View File

@ -30,7 +30,29 @@
<h2>
Component Information
</h2>
<table id="components" class="table table-hover"></table>
<!-- Navigation Tabs -->
<ul class="nav nav-tabs" id="componentTabs" role="tablist">
<li class="nav-item">
<a class="nav-link active" id="base-stats-tab" data-toggle="tab" href="#base-stats" role="tab" aria-controls="base-stats" aria-selected="true">Base Stats</a>
</li>
<li class="nav-item">
<a class="nav-link" id="runtime-metrics-tab" data-toggle="tab" href="#runtime-metrics" role="tab" aria-controls="runtime-metrics" aria-selected="false">Runtime Metrics</a>
</li>
</ul>
<!-- Tab Content -->
<div class="tab-content" id="componentTabsContent">
<!-- Components Table (Base Stats) -->
<div class="tab-pane fade show active" id="base-stats" role="tabpanel" aria-labelledby="base-stats-tab">
<table id="components" class="table table-hover mt-3"></table>
</div>
<!-- Node Metrics Table (Runtime Metrics) -->
<div class="tab-pane fade" id="runtime-metrics" role="tabpanel" aria-labelledby="runtime-metrics-tab">
<table id="nodeMetrics" class="table table-hover mt-3"></table>
</div>
</div>
<h2>
Connected Clients
@ -58,6 +80,7 @@
.then(data => {
renderSysInfo(data);
renderComponentInfo(data);
renderNodesMetrics(data)
})
.catch(error => {
handleError(error);

View File

@ -1,62 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Milvus WebUI - Nodes</title>
<meta name="description" content="Milvus Management WebUI">
<link href="./static/css/bootstrap.min.css" rel="stylesheet">
<link href="./static/css/style.css" rel="stylesheet">
<script src="./static/js/jquery.min.js"></script>
<script src="./static/js/bootstrap.min.js"></script>
<script src="./static/js/bootstrap.bundle.min.js"></script>
<script src="./static/js/render.js"></script>
<script src="./static/js/common.js"></script>
<script src="./static/js/mockdata.js"></script>
</head>
<body>
<div class="container-fluid">
<div id="header"></div>
<div class="row">
<div class="col-md-2"></div>
<div class="col-md-8">
<h2>
Activity Metrics
</h2>
<table id="nodeMetrics" class="table table-hover"></table>
<h2>
Replica
</h2>
<table id="replica" class="table table-hover"></table>
<h2>
Resource Group
</h2>
<table id="rg" class="table table-hover"></table>
</div>
<div class="col-md-2"></div>
<div id="footer"></div>
</div>
</div>
<script>
$(document).ready(function(){
$('#header').load("header.html");
$('#footer').load("footer.html");
});
// load nodes information data
document.addEventListener("DOMContentLoaded", function() {
fetchData(MILVUS_URI + "/_cluster/info", sysmetrics)
.then(data => {
renderNodesMetrics(data)
})
.catch(error => {
handleError(error);
});
});
</script>
</body>
</html>

View File

@ -412,6 +412,179 @@ var mconfigs = `
}
`;
var qcTargets = `
[
{
"collection_id": 1,
"segments": [
{
"segment_id": 1,
"collection_id": 1,
"partition_id": 1,
"channel": "channel1",
"num_of_rows": 1000,
"state": "Sealed",
"is_importing": false,
"compacted": false,
"level": "L0",
"is_sorted": true,
"node_id": 1,
"is_invisible": false,
"loaded_timestamp": 1633072800,
"index": [
{
"field_id": 1,
"index_id": 1,
"build_id": 1,
"index_size": 1024,
"is_loaded": true
}
],
"resource_group": "rg1",
"loaded_insert_row_count": 1000,
"mem_size": 2048,
}
],
"dm_channels": [
{
"node_id": 1,
"version": 1,
"collection_id": 1,
"channel_name": "channel1",
"unflushed_segment_ids": [1],
"flushed_segment_ids": [2],
"dropped_segment_ids": [3],
"level_zero_segment_ids": [4],
"partition_stats_versions": {
"1": 1
}
}
]
}
]
`
var qcDist =`
{
"segments": [
{
"segment_id": 1,
"collection_id": 1,
"partition_id": 1,
"channel": "channel1",
"num_of_rows": 1000,
"state": "Sealed",
"is_importing": false,
"compacted": false,
"level": "L0",
"is_sorted": true,
"node_id": 1,
"is_invisible": false,
"loaded_timestamp": 1633072800,
"index": [
{
"field_id": 1,
"index_id": 1,
"build_id": 1,
"index_size": 1024,
"is_loaded": true
}
],
"resource_group": "rg1",
"loaded_insert_row_count": 1000,
"mem_size": 2048,
}
],
"dm_channels": [
{
"node_id": 1,
"version": 1,
"collection_id": 1,
"channel_name": "channel1",
"unflushed_segment_ids": [1],
"flushed_segment_ids": [2],
"dropped_segment_ids": [3],
"level_zero_segment_ids": [4],
"partition_stats_versions": {
"1": 1
}
}
],
"leader_views": [
{
"node_id": 1,
"collection_id": 1,
"channel_name": "channel1",
"segments": [
{
"segment_id": 1,
"partition_id": 1,
"num_of_rows": 1000,
"state": "Sealed",
"is_importing": false,
"compacted": false,
"level": "L0",
"is_sorted": true,
"node_id": 1,
"is_invisible": false,
"loaded_timestamp": 1633072800,
"index": [
{
"field_id": 1,
"index_id": 1,
"build_id": 1,
"index_size": 1024,
"is_loaded": true
}
],
"resource_group": "rg1",
"loaded_insert_row_count": 1000,
"mem_size": 2048,
}
]
}
]
}
`
var qcReplica = `
[
{
"ID": 1,
"CollectionID": 1,
"RWNodes": [1, 2],
"ResourceGroup": "rg1",
"RONodes": [3],
"ChannelToRWNodes": {
"channel1": [1, 2]
}
},
{
"ID": 2,
"CollectionID": 2,
"RWNodes": [4, 5],
"ResourceGroup": "rg2",
"RONodes": [6],
"ChannelToRWNodes": {
"channel2": [4, 5]
}
}
]
`
var qcResourceGroup = `
[
{
"Name": "rg1",
"Nodes": [1, 2]
},
{
"Name": "rg2",
"Nodes": [3, 4]
}
]
`
var qcTasks = `
[
{
@ -456,6 +629,119 @@ var qcTasks = `
]
`
var qn_segments = `
[
{
"segment_id": 1,
"collection_id": 1,
"partition_id": 1,
"channel": "channel1",
"num_of_rows": 1000,
"state": "Sealed",
"is_importing": false,
"compacted": false,
"level": "L1",
"is_sorted": true,
"node_id": 1,
"is_invisible": false,
"loaded_timestamp": 1620000000,
"index": [
{
"field_id": 1,
"index_id": 1,
"build_id": 1,
"index_size": 1024,
"is_loaded": true
}
],
"resource_group": "rg1",
"loaded_insert_row_count": 1000,
"mem_size": 2048,
},
{
"segment_id": 2,
"collection_id": 2,
"partition_id": 2,
"channel": "channel2",
"num_of_rows": 2000,
"state": "Sealed",
"is_importing": false,
"compacted": false,
"level": "L2",
"is_sorted": true,
"node_id": 2,
"is_invisible": false,
"loaded_timestamp": 1620000001,
"index": [
{
"field_id": 2,
"index_id": 2,
"build_id": 2,
"index_size": 2048,
"is_loaded": true
}
],
"resource_group": "rg2",
"loaded_insert_row_count": 2000,
"mem_size": 4096,
}
]
`
var qn_channels = `
[
{
"name": "channel1",
"watch_state": "Healthy",
"assign_state": "assigned",
"latest_time_tick": "2023-10-01 12:00:00",
"node_id": 1,
"collection_id": 1,
},
{
"name": "channel2",
"watch_state": "Healthy",
"assign_state": "assigned",
"latest_time_tick": "2023-10-01 12:05:00",
"node_id": 2,
"collection_id": 2,
}
]
`
var dc_dist = `
{
"segments": [
{
"segment_id": 1,
"collection_id": 100,
"partition_id": 10,
"channel": "channel1",
"num_of_rows": 1000,
"state": "flushed",
"is_importing": false,
"compacted": false,
"level": "L1",
"is_sorted": true,
"node_id": 1
}
],
"dm_channels": [
{
"node_id": 1,
"version": 1,
"collection_id": 100,
"channel_name": "channel1",
"unflushed_segment_ids": [1, 2, 3],
"flushed_segment_ids": [4, 5, 6],
"dropped_segment_ids": [7, 8, 9],
"watch_state": "success",
"start_watch_ts": 123456789
}
]
}
`
var dc_build_index_task = `
[
{
@ -485,30 +771,31 @@ var dc_compaction_task = `
[
{
"plan_id": 1,
"collection_id": 1001,
"collection_id": 1,
"type": "Merge",
"state": "Completed",
"start_time": 1633036800,
"end_time": 1633040400,
"total_rows": 100000,
"fail_reason": "",
"start_time": 1620000000,
"end_time": 1620003600,
"total_rows": 10000,
"input_segments": [1, 2, 3],
"result_segments": [4]
},
{
"plan_id": 2,
"collection_id": 1002,
"collection_id": 2,
"type": "Merge",
"state": "Failed",
"fail_reason": "Disk full",
"start_time": 1633123200,
"end_time": 1633126800,
"total_rows": 200000,
"start_time": 1620007200,
"end_time": 1620010800,
"total_rows": 20000,
"input_segments": [5, 6, 7],
"result_segments": [8]
"result_segments": []
}
]`
var dc_sync_task = `
var dn_sync_task = `
[
{
"segment_id": 1,
@ -518,7 +805,8 @@ var dc_sync_task = `
"ts_to": 1633040400,
"delta_row_count": 10,
"flush_size": 1024,
"running_time": 100000000
"running_time": "100000000",
"node_id": 1
},
{
"segment_id": 2,
@ -528,7 +816,8 @@ var dc_sync_task = `
"ts_to": 1633126800,
"delta_row_count": 20,
"flush_size": 2048,
"running_time": 200000000
"running_time": "200000000",
"node_id": 2
}
]
`
@ -569,4 +858,64 @@ var dc_import_task = `
"complete_time": "2023-10-01T01:00:00Z"
}
]
`
var dn_segments = `
[
{
"segment_id": 1,
"collection_id": 1,
"partition_id": 1,
"channel": "channel1",
"num_of_rows": 1000,
"state": "active",
"is_importing": false,
"compacted": false,
"level": "L1",
"is_sorted": true,
"node_id": 1,
"flushed_rows": 1000,
"sync_buffer_rows": 0,
"syncing_rows": 0
},
{
"segment_id": 2,
"collection_id": 2,
"partition_id": 2,
"channel": "channel2",
"num_of_rows": 2000,
"state": "inactive",
"is_importing": true,
"compacted": true,
"level": "L2",
"is_sorted": false,
"node_id": 2,
"flushed_rows": 2000,
"sync_buffer_rows": 100,
"syncing_rows": 50
}
]
`
var dn_channels = `
[
{
"name": "channel1",
"watch_state": "Healthy",
"assign_state": "assigned",
"latest_time_tick": "2023-10-01 12:00:00",
"node_id": 1,
"collection_id": 1,
"check_point_ts": "2023-10-01 12:00:00"
},
{
"name": "channel2",
"watch_state": "Healthy",
"assign_state": "assigned",
"latest_time_tick": "2023-10-01 12:05:00",
"node_id": 2,
"collection_id": 2,
"check_point_ts": "2023-10-01 12:05:00"
}
]
`

View File

@ -6503,26 +6503,34 @@ func DeregisterSubLabel(subLabel string) {
// RegisterRestRouter registers the router for the proxy
func (node *Proxy) RegisterRestRouter(router gin.IRouter) {
// Cluster request
// Cluster requests that are executed by the proxy
router.GET(http.ClusterInfoPath, getClusterInfo(node))
router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetAll()))
router.GET(http.ClusterClientsPath, getConnectedClients)
router.GET(http.ClusterDependenciesPath, getDependencies)
// Hook request
// Hook requests that are executed by the proxy
router.GET(http.HookConfigsPath, getConfigs(paramtable.GetHookParams().GetAll()))
// QueryCoord request
router.GET(http.QCoordSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegmentDist))
router.GET(http.QCoordChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannelDist))
router.GET(http.QCoordAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks))
// QueryCoord requests that are forwarded from proxy
router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.QueryTarget))
router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.QueryDist))
router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.QueryReplicas))
router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.QueryResourceGroups))
router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks))
// DataCoord request
router.GET(http.DCoordAllTasksPath, getDataComponentMetrics(node, metricsinfo.DataCoordAllTasks))
router.GET(http.DCoordCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks))
router.GET(http.DCoordImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks))
router.GET(http.DCoordBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks))
// QueryNode requests that are forwarded from querycoord
router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegments))
router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannels))
// Datanode request
router.GET(http.DNodeSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks))
// DataCoord requests that are forwarded from proxy
router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DataDist))
router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks))
router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks))
router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks))
// Datanode requests that are forwarded from datacoord
router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks))
router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.DataSegments))
router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.DataChannels))
}
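For context, a minimal sketch of how a WebUI client could hit one of the routes registered above. The listener address and the literal path are assumptions standing in for the configured management port and whatever http.QCDistPath resolves to:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical address and path: substitute the real management listener
	// and the path registered as http.QCDistPath in your deployment.
	resp, err := http.Get("http://127.0.0.1:9091/_qc/dist")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}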

View File

@ -1880,14 +1880,14 @@ func TestRegisterRestRouter(t *testing.T) {
path string
statusCode int
}{
{path: mhttp.QCoordSegmentsPath, statusCode: http.StatusInternalServerError},
{path: mhttp.QCoordChannelsPath, statusCode: http.StatusInternalServerError},
{path: mhttp.QCoordAllTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCoordCompactionTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCoordImportTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCoordBuildIndexTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.QCTargetPath, statusCode: http.StatusInternalServerError},
{path: mhttp.QCDistPath, statusCode: http.StatusInternalServerError},
{path: mhttp.QCAllTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DNSyncTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCCompactionTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCImportTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DCBuildIndexTasksPath, statusCode: http.StatusInternalServerError},
{path: mhttp.DNSyncTasksPath, statusCode: http.StatusInternalServerError},
}
for _, tt := range tests {

View File

@ -18,12 +18,14 @@ package querycoordv2
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -34,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -242,6 +245,52 @@ func (s *Server) balanceChannels(ctx context.Context,
return nil
}
func getMetrics[T any](ctx context.Context, s *Server, req *milvuspb.GetMetricsRequest) ([]T, error) {
var metrics []T
var mu sync.Mutex
errorGroup, ctx := errgroup.WithContext(ctx)
for _, node := range s.nodeMgr.GetAll() {
node := node
errorGroup.Go(func() error {
resp, err := s.cluster.GetMetrics(ctx, node.ID(), req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get metric from QueryNode", zap.Int64("nodeID", node.ID()))
return err
}
if resp.Response == "" {
return nil
}
infos := make([]T, 0)
err = json.Unmarshal([]byte(resp.Response), &infos)
if err != nil {
log.Warn("invalid metrics of query node was found", zap.Error(err))
return err
}
mu.Lock()
metrics = append(metrics, infos...)
mu.Unlock()
return nil
})
}
err := errorGroup.Wait()
return metrics, err
}
func (s *Server) getChannelsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
channels, err := getMetrics[*metricsinfo.Channel](ctx, s, req)
return metricsinfo.MarshalGetMetricsValues(channels, err)
}
func (s *Server) getSegmentsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
segments, err := getMetrics[*metricsinfo.Segment](ctx, s, req)
return metricsinfo.MarshalGetMetricsValues(segments, err)
}
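A self-contained sketch of the fan-out pattern getMetrics uses: each fetcher stands in for a per-node GetMetrics call, results are unmarshalled into []T and merged under a mutex, and errgroup propagates the first error. Names here are illustrative only:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// fanOut mirrors getMetrics above with the cluster RPC replaced by plain
// functions so the example runs standalone.
func fanOut[T any](ctx context.Context, fetchers []func(context.Context) (string, error)) ([]T, error) {
	var (
		mu     sync.Mutex
		merged []T
	)
	g, ctx := errgroup.WithContext(ctx)
	for _, fetch := range fetchers {
		fetch := fetch
		g.Go(func() error {
			resp, err := fetch(ctx)
			if err != nil {
				return err
			}
			if resp == "" {
				return nil // node had nothing to report
			}
			var part []T
			if err := json.Unmarshal([]byte(resp), &part); err != nil {
				return err
			}
			mu.Lock()
			merged = append(merged, part...)
			mu.Unlock()
			return nil
		})
	}
	err := g.Wait()
	return merged, err
}

func main() {
	type channel struct {
		Name string `json:"name"`
	}
	fetchers := []func(context.Context) (string, error){
		func(context.Context) (string, error) { return `[{"name":"ch1"}]`, nil },
		func(context.Context) (string, error) { return `[{"name":"ch2"}]`, nil },
	}
	out, err := fanOut[channel](context.Background(), fetchers)
	fmt.Println(out, err)
}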
// TODO(dragondriver): add more detail metrics
func (s *Server) getSystemInfoMetrics(
ctx context.Context,

View File

@ -0,0 +1,108 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoordv2
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
func TestGetChannelsFromQueryNode(t *testing.T) {
mockCluster := session.NewMockCluster(t)
nodeManager := session.NewNodeManager()
nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
server := &Server{cluster: mockCluster, nodeMgr: nodeManager}
req := &milvuspb.GetMetricsRequest{}
expectedChannels := []*metricsinfo.Channel{
{
Name: "channel1",
WatchState: "Healthy",
LatestTimeTick: "1",
NodeID: int64(1),
CollectionID: int64(100),
},
{
Name: "channel2",
WatchState: "Healthy",
LatestTimeTick: "2",
NodeID: int64(2),
CollectionID: int64(200),
},
}
resp := &milvuspb.GetMetricsResponse{
Response: func() string {
data, _ := json.Marshal(expectedChannels)
return string(data)
}(),
}
mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil)
result, err := server.getChannelsFromQueryNode(context.Background(), req)
assert.NoError(t, err)
var actualChannels []*metricsinfo.Channel
err = json.Unmarshal([]byte(result), &actualChannels)
assert.NoError(t, err)
assert.Equal(t, expectedChannels, actualChannels)
}
func TestGetSegmentsFromQueryNode(t *testing.T) {
mockCluster := session.NewMockCluster(t)
nodeManager := session.NewNodeManager()
nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
server := &Server{cluster: mockCluster, nodeMgr: nodeManager}
expectedSegments := []*metricsinfo.Segment{
{
SegmentID: 1,
PartitionID: 1,
Channel: "channel1",
ResourceGroup: "default",
MemSize: int64(1024),
LoadedInsertRowCount: 100,
},
{
SegmentID: 2,
PartitionID: 1,
Channel: "channel2",
ResourceGroup: "default",
MemSize: int64(1024),
LoadedInsertRowCount: 200,
},
}
resp := &milvuspb.GetMetricsResponse{
Response: func() string {
data, _ := json.Marshal(expectedSegments)
return string(data)
}(),
}
req := &milvuspb.GetMetricsRequest{}
mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil)
result, err := server.getSegmentsFromQueryNode(context.Background(), req)
assert.NoError(t, err)
var actualSegments []*metricsinfo.Segment
err = json.Unmarshal([]byte(result), &actualSegments)
assert.NoError(t, err)
assert.Equal(t, expectedSegments, actualSegments)
}

View File

@ -23,6 +23,8 @@ import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -130,6 +132,13 @@ func (channel *DmChannel) Clone() *DmChannel {
}
}
func newDmChannelMetricsFrom(channel *DmChannel) *metricsinfo.DmChannel {
dmChannel := metrics.NewDMChannelFrom(channel.VchannelInfo)
dmChannel.NodeID = channel.Node
dmChannel.Version = channel.Version
return dmChannel
}
type nodeChannels struct {
channels []*DmChannel
// collection id => channels
@ -290,3 +299,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
}
}
}
func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
var channels []*metricsinfo.DmChannel
for _, nodeChannels := range m.channels {
for _, channel := range nodeChannels.channels {
channels = append(channels, newDmChannelMetricsFrom(channel))
}
}
return channels
}

View File

@ -19,10 +19,12 @@ package meta
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -185,3 +187,42 @@ func (suite *ChannelDistManagerSuite) AssertCollection(channels []*DmChannel, co
func TestChannelDistManager(t *testing.T) {
suite.Run(t, new(ChannelDistManagerSuite))
}
func TestGetChannelDistJSON(t *testing.T) {
manager := NewChannelDistManager()
channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: 100,
ChannelName: "channel-1",
})
channel1.Node = 1
channel1.Version = 1
channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: 200,
ChannelName: "channel-2",
})
channel2.Node = 2
channel2.Version = 1
manager.Update(1, channel1)
manager.Update(2, channel2)
channels := manager.GetChannelDist()
assert.Equal(t, 2, len(channels))
checkResult := func(channel *metricsinfo.DmChannel) {
if channel.NodeID == 1 {
assert.Equal(t, "channel-1", channel.ChannelName)
assert.Equal(t, int64(100), channel.CollectionID)
} else if channel.NodeID == 2 {
assert.Equal(t, "channel-2", channel.ChannelName)
assert.Equal(t, int64(200), channel.CollectionID)
} else {
assert.Failf(t, "unexpected node id", "unexpected node id %d", channel.NodeID)
}
}
for _, channel := range channels {
checkResult(channel)
}
}

View File

@ -16,6 +16,15 @@
package meta
import (
"encoding/json"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
type DistributionManager struct {
*SegmentDistManager
*ChannelDistManager
@ -29,3 +38,30 @@ func NewDistributionManager() *DistributionManager {
LeaderViewManager: NewLeaderViewManager(),
}
}
// GetDistributionJSON returns a JSON representation of the current distribution state.
// It includes segments, DM channels, and leader views.
// If there are no segments, channels, or leader views, it returns an empty string.
// If JSON marshaling fails, it logs a warning and returns an empty string.
func (dm *DistributionManager) GetDistributionJSON() string {
segments := dm.GetSegmentDist()
channels := dm.GetChannelDist()
leaderView := dm.GetLeaderView()
if len(segments) == 0 && len(channels) == 0 && len(leaderView) == 0 {
return ""
}
dist := &metricsinfo.QueryCoordDist{
Segments: segments,
DMChannels: channels,
LeaderViews: leaderView,
}
v, err := json.Marshal(dist)
if err != nil {
log.Warn("failed to marshal dist", zap.Error(err))
return ""
}
return string(v)
}

View File

@ -0,0 +1,94 @@
package meta
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
func TestGetDistributionJSON(t *testing.T) {
// Initialize DistributionManager
manager := NewDistributionManager()
// Add some segments to the SegmentDistManager
segment1 := SegmentFromInfo(&datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
InsertChannel: "channel-1",
NumOfRows: 1000,
State: commonpb.SegmentState_Flushed,
})
segment1.Node = 1
segment1.Version = 1
segment2 := SegmentFromInfo(&datapb.SegmentInfo{
ID: 2,
CollectionID: 200,
PartitionID: 20,
InsertChannel: "channel-2",
NumOfRows: 2000,
State: commonpb.SegmentState_Flushed,
})
segment2.Node = 2
segment2.Version = 1
manager.SegmentDistManager.Update(1, segment1)
manager.SegmentDistManager.Update(2, segment2)
// Add some channels to the ChannelDistManager
channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: 100,
ChannelName: "channel-1",
})
channel1.Node = 1
channel1.Version = 1
channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: 200,
ChannelName: "channel-2",
})
channel2.Node = 2
channel2.Version = 1
manager.ChannelDistManager.Update(1, channel1)
manager.ChannelDistManager.Update(2, channel2)
// Add some leader views to the LeaderViewManager
leaderView1 := &LeaderView{
ID: 1,
CollectionID: 100,
Channel: "channel-1",
Version: 1,
Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}},
}
leaderView2 := &LeaderView{
ID: 2,
CollectionID: 200,
Channel: "channel-2",
Version: 1,
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
}
manager.LeaderViewManager.Update(1, leaderView1)
manager.LeaderViewManager.Update(2, leaderView2)
// Call GetDistributionJSON
jsonOutput := manager.GetDistributionJSON()
// Verify JSON output
var dist metricsinfo.QueryCoordDist
err := json.Unmarshal([]byte(jsonOutput), &dist)
assert.NoError(t, err)
assert.Len(t, dist.Segments, 2)
assert.Len(t, dist.DMChannels, 2)
assert.Len(t, dist.LeaderViews, 2)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -314,3 +315,46 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView
return v1.Version > v2.Version
})
}
// GetLeaderView returns a slice of LeaderView objects, each representing the state of a leader node.
// It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice.
// The method locks the views map for reading to ensure thread safety.
func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView {
mgr.rwmutex.RLock()
defer mgr.rwmutex.RUnlock()
var leaderViews []*metricsinfo.LeaderView
for _, nodeViews := range mgr.views {
for _, lv := range nodeViews.views {
errString := ""
if lv.UnServiceableError != nil {
errString = lv.UnServiceableError.Error()
}
leaderView := &metricsinfo.LeaderView{
LeaderID: lv.ID,
CollectionID: lv.CollectionID,
Channel: lv.Channel,
Version: lv.Version,
SealedSegments: make([]*metricsinfo.Segment, 0, len(lv.Segments)),
GrowingSegments: make([]*metricsinfo.Segment, 0, len(lv.GrowingSegments)),
TargetVersion: lv.TargetVersion,
NumOfGrowingRows: lv.NumOfGrowingRows,
UnServiceableError: errString,
}
for segID, seg := range lv.Segments {
leaderView.SealedSegments = append(leaderView.SealedSegments, &metricsinfo.Segment{
SegmentID: segID,
NodeID: seg.NodeID,
})
}
for _, seg := range lv.GrowingSegments {
leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg))
}
leaderViews = append(leaderViews, leaderView)
}
}
return leaderViews
}

View File

@ -17,13 +17,20 @@
package meta
import (
"encoding/json"
"testing"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -317,3 +324,74 @@ func (suite *LeaderViewManagerSuite) TestNotifyDelegatorChanges() {
func TestLeaderViewManager(t *testing.T) {
suite.Run(t, new(LeaderViewManagerSuite))
}
func TestGetLeaderView(t *testing.T) {
manager := NewLeaderViewManager()
leaderView1 := &LeaderView{
ID: 1,
CollectionID: 100,
Channel: "channel-1",
Version: 1,
Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}},
GrowingSegments: map[int64]*Segment{
1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 100, PartitionID: 10, InsertChannel: "channel-1", NumOfRows: 1000, State: commonpb.SegmentState_Growing}, Node: 1},
},
TargetVersion: 1,
NumOfGrowingRows: 1000,
UnServiceableError: nil,
}
leaderView2 := &LeaderView{
ID: 2,
CollectionID: 200,
Channel: "channel-2",
Version: 1,
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
GrowingSegments: map[int64]*Segment{
2: {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 200, PartitionID: 20, InsertChannel: "channel-2", NumOfRows: 2000, State: commonpb.SegmentState_Growing}, Node: 2},
},
TargetVersion: 1,
NumOfGrowingRows: 2000,
UnServiceableError: nil,
}
manager.Update(1, leaderView1)
manager.Update(2, leaderView2)
// Call GetLeaderView
leaderViews := manager.GetLeaderView()
jsonOutput, err := json.Marshal(leaderViews)
assert.NoError(t, err)
var result []*metricsinfo.LeaderView
err = json.Unmarshal(jsonOutput, &result)
assert.NoError(t, err)
assert.Len(t, result, 2)
log.Info("====", zap.Any("result", result))
checkResult := func(lv *metricsinfo.LeaderView) {
if lv.LeaderID == 1 {
assert.Equal(t, int64(100), lv.CollectionID)
assert.Equal(t, "channel-1", lv.Channel)
assert.Equal(t, int64(1), lv.Version)
assert.Len(t, lv.SealedSegments, 1)
assert.Len(t, lv.GrowingSegments, 1)
assert.Equal(t, int64(1), lv.SealedSegments[0].SegmentID)
assert.Equal(t, int64(1), lv.GrowingSegments[0].SegmentID)
} else if lv.LeaderID == 2 {
assert.Equal(t, int64(200), lv.CollectionID)
assert.Equal(t, "channel-2", lv.Channel)
assert.Equal(t, int64(1), lv.Version)
assert.Len(t, lv.SealedSegments, 1)
assert.Len(t, lv.GrowingSegments, 1)
assert.Equal(t, int64(2), lv.SealedSegments[0].SegmentID)
assert.Equal(t, int64(2), lv.GrowingSegments[0].SegmentID)
} else {
assert.Failf(t, "unexpected leader id", "unexpected leader id %d", lv.LeaderID)
}
}
for _, lv := range result {
checkResult(lv)
}
}

View File

@ -565,6 +565,52 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run
return _c
}
// GetTargetJSON provides a mock function with given fields: scope
func (_m *MockTargetManager) GetTargetJSON(scope int32) string {
ret := _m.Called(scope)
if len(ret) == 0 {
panic("no return value specified for GetTargetJSON")
}
var r0 string
if rf, ok := ret.Get(0).(func(int32) string); ok {
r0 = rf(scope)
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockTargetManager_GetTargetJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTargetJSON'
type MockTargetManager_GetTargetJSON_Call struct {
*mock.Call
}
// GetTargetJSON is a helper method to define mock.On call
// - scope int32
func (_e *MockTargetManager_Expecter) GetTargetJSON(scope interface{}) *MockTargetManager_GetTargetJSON_Call {
return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", scope)}
}
func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(scope int32)) *MockTargetManager_GetTargetJSON_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int32))
})
return _c
}
func (_c *MockTargetManager_GetTargetJSON_Call) Return(_a0 string) *MockTargetManager_GetTargetJSON_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(int32) string) *MockTargetManager_GetTargetJSON_Call {
_c.Call.Return(run)
return _c
}
// IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID
func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
ret := _m.Called(collectionID, partitionID)

View File

@ -17,6 +17,7 @@
package meta
import (
"encoding/json"
"fmt"
"sync"
@ -28,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -466,3 +468,33 @@ func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.Unique
ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...)
return ret
}
// GetReplicasJSON returns a JSON representation of all replicas managed by the ReplicaManager.
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON() string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
replicas := lo.MapToSlice(m.replicas, func(i typeutil.UniqueID, r *Replica) *metricsinfo.Replica {
channelToRWNodes := make(map[string][]int64)
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelToRWNodes[k] = v.GetRwNodes()
}
return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),
ChannelToRWNodes: channelToRWNodes,
}
})
ret, err := json.Marshal(replicas)
if err != nil {
log.Warn("failed to marshal replicas", zap.Error(err))
return ""
}
return string(ret)
}
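A small standalone illustration of the lo.MapToSlice plus json.Marshal pattern used here, assuming a simplified replica shape; map iteration order is unspecified, which is acceptable for a point-in-time metrics snapshot:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/samber/lo"
)

type replicaView struct {
	ID      int64   `json:"ID"`
	RWNodes []int64 `json:"rw_nodes"`
}

func main() {
	// map of replica ID -> read-write nodes, converted entry by entry
	byID := map[int64][]int64{1: {1, 2, 3}, 2: {4, 5, 6}}
	views := lo.MapToSlice(byID, func(id int64, nodes []int64) *replicaView {
		return &replicaView{ID: id, RWNodes: nodes}
	})
	b, err := json.Marshal(views)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b))
}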

View File

@ -17,9 +17,12 @@
package meta
import (
"encoding/json"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
@ -27,10 +30,12 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -494,3 +499,54 @@ func TestReplicaManager(t *testing.T) {
suite.Run(t, new(ReplicaManagerSuite))
suite.Run(t, new(ReplicaManagerV2Suite))
}
func TestGetReplicasJSON(t *testing.T) {
catalog := mocks.NewQueryCoordCatalog(t)
catalog.EXPECT().SaveReplica(mock.Anything).Return(nil)
idAllocator := RandomIncrementIDAllocator()
replicaManager := NewReplicaManager(idAllocator, catalog)
// Add some replicas to the ReplicaManager
replica1 := newReplica(&querypb.Replica{
ID: 1,
CollectionID: 100,
ResourceGroup: "rg1",
Nodes: []int64{1, 2, 3},
})
replica2 := newReplica(&querypb.Replica{
ID: 2,
CollectionID: 200,
ResourceGroup: "rg2",
Nodes: []int64{4, 5, 6},
})
err := replicaManager.put(replica1)
assert.NoError(t, err)
err = replicaManager.put(replica2)
assert.NoError(t, err)
jsonOutput := replicaManager.GetReplicasJSON()
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
assert.Len(t, replicas, 2)
checkResult := func(replica *metricsinfo.Replica) {
if replica.ID == 1 {
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}
}
for _, replica := range replicas {
checkResult(replica)
}
}

View File

@ -17,6 +17,7 @@
package meta
import (
"encoding/json"
"fmt"
"sync"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -918,3 +920,23 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error
}
return nil
}
func (rm *ResourceManager) GetResourceGroupsJSON() string {
rm.rwmutex.RLock()
defer rm.rwmutex.RUnlock()
rgs := lo.MapToSlice(rm.groups, func(i string, r *ResourceGroup) *metricsinfo.ResourceGroup {
return &metricsinfo.ResourceGroup{
Name: r.GetName(),
Nodes: r.GetNodes(),
Cfg: r.GetConfig(),
}
})
ret, err := json.Marshal(rgs)
if err != nil {
log.Error("failed to marshal resource groups", zap.Error(err))
return ""
}
return string(ret)
}

View File

@ -16,8 +16,10 @@
package meta
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -30,7 +32,9 @@ import (
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ResourceManagerSuite struct {
@ -619,3 +623,33 @@ func (suite *ResourceManagerSuite) TestUnassignFail() {
suite.manager.HandleNodeDown(1)
})
}
func TestGetResourceGroupsJSON(t *testing.T) {
manager := &ResourceManager{groups: make(map[string]*ResourceGroup)}
rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10))
rg1.nodes = typeutil.NewUniqueSet(1, 2)
rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20))
rg2.nodes = typeutil.NewUniqueSet(3, 4)
manager.groups["rg1"] = rg1
manager.groups["rg2"] = rg2
jsonOutput := manager.GetResourceGroupsJSON()
var resourceGroups []*metricsinfo.ResourceGroup
err := json.Unmarshal([]byte(jsonOutput), &resourceGroups)
assert.NoError(t, err)
assert.Len(t, resourceGroups, 2)
checkResult := func(rg *metricsinfo.ResourceGroup) {
if rg.Name == "rg1" {
assert.ElementsMatch(t, []int64{1, 2}, rg.Nodes)
} else if rg.Name == "rg2" {
assert.ElementsMatch(t, []int64{3, 4}, rg.Nodes)
} else {
assert.Failf(t, "unexpected resource group name", "unexpected resource group name %s", rg.Name)
}
}
for _, rg := range resourceGroups {
checkResult(rg)
}
}

View File

@ -24,6 +24,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -130,6 +132,21 @@ func SegmentFromInfo(info *datapb.SegmentInfo) *Segment {
}
}
func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment {
convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo)
convertedSegment.NodeID = segment.Node
convertedSegment.LoadedTimestamp = segment.Version
convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex {
return &metricsinfo.SegmentIndex{
IndexFieldID: e.FieldID,
IndexID: e.IndexID,
BuildID: e.BuildID,
IndexSize: e.IndexSize,
}
})
return convertedSegment
}
func (segment *Segment) Clone() *Segment {
return &Segment{
SegmentInfo: proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo),
@ -227,3 +244,17 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen
}
return ret
}
func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
var segments []*metricsinfo.Segment
for _, nodeSeg := range m.segments {
for _, segment := range nodeSeg.segments {
segments = append(segments, newSegmentMetricsFrom(segment))
}
}
return segments
}

View File

@ -19,10 +19,13 @@ package meta
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
type SegmentDistManagerSuite struct {
@ -188,3 +191,63 @@ func (suite *SegmentDistManagerSuite) AssertShard(segments []*Segment, shard str
func TestSegmentDistManager(t *testing.T) {
suite.Run(t, new(SegmentDistManagerSuite))
}
func TestGetSegmentDistJSON(t *testing.T) {
// Initialize SegmentDistManager
manager := NewSegmentDistManager()
// Add some segments to the SegmentDistManager
segment1 := SegmentFromInfo(&datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
InsertChannel: "channel-1",
NumOfRows: 1000,
State: commonpb.SegmentState_Flushed,
})
segment1.Node = 1
segment1.Version = 1
segment2 := SegmentFromInfo(&datapb.SegmentInfo{
ID: 2,
CollectionID: 200,
PartitionID: 20,
InsertChannel: "channel-2",
NumOfRows: 2000,
State: commonpb.SegmentState_Flushed,
})
segment2.Node = 2
segment2.Version = 1
manager.Update(1, segment1)
manager.Update(2, segment2)
segments := manager.GetSegmentDist()
assert.Equal(t, 2, len(segments))
checkResults := func(s *metricsinfo.Segment) {
if s.SegmentID == 1 {
assert.Equal(t, int64(100), s.CollectionID)
assert.Equal(t, int64(10), s.PartitionID)
assert.Equal(t, "channel-1", s.Channel)
assert.Equal(t, int64(1000), s.NumOfRows)
assert.Equal(t, "Flushed", s.State)
assert.Equal(t, int64(1), s.NodeID)
assert.Equal(t, int64(1), s.LoadedTimestamp)
} else if s.SegmentID == 2 {
assert.Equal(t, int64(200), s.CollectionID)
assert.Equal(t, int64(20), s.PartitionID)
assert.Equal(t, "channel-2", s.Channel)
assert.Equal(t, int64(2000), s.NumOfRows)
assert.Equal(t, "Flushed", s.State)
assert.Equal(t, int64(2), s.NodeID)
assert.Equal(t, int64(1), s.LoadedTimestamp)
} else {
assert.Failf(t, "unexpected segment id", "unexpected segment id %d", s.SegmentID)
}
}
for _, s := range segments {
checkResults(s)
}
}

View File

@ -23,6 +23,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -183,3 +185,21 @@ func (t *target) removeCollectionTarget(collectionID int64) {
func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
return t.collectionTargetMap[collectionID]
}
func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget {
return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget {
segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
return metrics.NewSegmentFrom(s)
})
dmChannels := lo.MapToSlice(v.GetAllDmChannels(), func(k string, ch *DmChannel) *metricsinfo.DmChannel {
return metrics.NewDMChannelFrom(ch.VchannelInfo)
})
return &metricsinfo.QueryCoordTarget{
CollectionID: k,
Segments: segments,
DMChannels: dmChannels,
}
})
}

View File

@ -18,6 +18,7 @@ package meta
import (
"context"
"encoding/json"
"fmt"
"runtime"
"sync"
@ -68,6 +69,7 @@ type TargetManagerInterface interface {
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(collectionID, segmentID int64) bool
GetTargetJSON(scope TargetScope) string
}
type TargetManager struct {
@ -632,3 +634,28 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
return false
}
func (mgr *TargetManager) GetTargetJSON(scope TargetScope) string {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
ret := mgr.getTarget(scope)
if ret == nil {
return ""
}
v, err := json.Marshal(ret.toQueryCoordCollectionTargets())
if err != nil {
log.Warn("failed to marshal target", zap.Error(err))
return ""
}
return string(v)
}
func (mgr *TargetManager) getTarget(scope TargetScope) *target {
if scope == CurrentTarget {
return mgr.current
}
return mgr.next
}

View File

@ -17,11 +17,13 @@
package meta
import (
"encoding/json"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -34,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -602,6 +605,64 @@ func (suite *TargetManagerSuite) TestRecover() {
suite.Len(targets, 0)
}
func (suite *TargetManagerSuite) TestGetTargetJSON() {
collectionID := int64(1003)
suite.meta.PutCollection(&Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: collectionID,
ReplicaNumber: 1,
},
})
suite.meta.PutPartition(&Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: collectionID,
PartitionID: 1,
},
})
nextTargetChannels := []*datapb.VchannelInfo{
{
CollectionID: collectionID,
ChannelName: "channel-1",
UnflushedSegmentIds: []int64{1, 2, 3, 4},
DroppedSegmentIds: []int64{11, 22, 33},
},
{
CollectionID: collectionID,
ChannelName: "channel-2",
UnflushedSegmentIds: []int64{5},
},
}
nextTargetSegments := []*datapb.SegmentInfo{
{
ID: 11,
PartitionID: 1,
InsertChannel: "channel-1",
},
{
ID: 12,
PartitionID: 1,
InsertChannel: "channel-2",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
suite.NoError(suite.mgr.UpdateCollectionNextTarget(collectionID))
suite.True(suite.mgr.UpdateCollectionCurrentTarget(collectionID))
jsonStr := suite.mgr.GetTargetJSON(CurrentTarget)
assert.NotEmpty(suite.T(), jsonStr)
var currentTarget []*metricsinfo.QueryCoordTarget
err := json.Unmarshal([]byte(jsonStr), &currentTarget)
suite.NoError(err)
assert.Len(suite.T(), currentTarget, 1)
assert.Equal(suite.T(), collectionID, currentTarget[0].CollectionID)
assert.Len(suite.T(), currentTarget[0].DMChannels, 2)
assert.Len(suite.T(), currentTarget[0].Segments, 2)
}
func TestTargetManager(t *testing.T) {
suite.Run(t, new(TargetManagerSuite))
}

View File

@ -199,8 +199,41 @@ func (s *Server) registerMetricsRequest() {
return s.taskScheduler.GetTasksJSON(), nil
}
QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.dist.GetDistributionJSON(), nil
}
QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.targetMgr.GetTargetJSON(meta.CurrentTarget), nil
}
QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(), nil
}
QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetResourceGroupsJSON(), nil
}
QuerySegmentsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.getSegmentsFromQueryNode(ctx, req)
}
QueryChannelsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.getChannelsFromQueryNode(ctx, req)
}
// register actions that requests are processed in querycoord
s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction)
// register actions that requests are processed in querynode
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentsAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelsAction)
log.Info("register metrics actions finished")
}
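For reference, the dispatcher selects one of the actions above by the metric name carried in the GetMetrics request body. A minimal sketch of building that body, assuming the envelope uses the usual "metric_type" key:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// "qc_dist" corresponds to metricsinfo.QueryDist registered above; the
	// "metric_type" key is an assumption about the request envelope.
	body, err := json.Marshal(map[string]string{"metric_type": "qc_dist"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"metric_type":"qc_dist"}
}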

View File

@ -18,14 +18,17 @@ package querynodev2
import (
"context"
"encoding/json"
"fmt"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -170,6 +173,54 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr
return ret, nil
}
// getChannelJSON returns the JSON string of channels
func getChannelJSON(node *QueryNode) string {
stats := node.pipelineManager.GetChannelStats()
ret, err := json.Marshal(stats)
if err != nil {
log.Warn("failed to marshal channels", zap.Error(err))
return ""
}
return string(ret)
}
// getSegmentJSON returns the JSON string of segments
func getSegmentJSON(node *QueryNode) string {
allSegments := node.manager.Segment.GetBy()
var ms []*metricsinfo.Segment
for _, s := range allSegments {
indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes()))
for _, index := range s.Indexes() {
indexes = append(indexes, &metricsinfo.SegmentIndex{
IndexFieldID: index.IndexInfo.FieldID,
IndexID: index.IndexInfo.IndexID,
IndexSize: index.IndexInfo.IndexSize,
BuildID: index.IndexInfo.BuildID,
IsLoaded: index.IsLoaded,
})
}
ms = append(ms, &metricsinfo.Segment{
SegmentID: s.ID(),
CollectionID: s.Collection(),
PartitionID: s.Partition(),
MemSize: s.MemSize(),
Index: indexes,
State: s.Type().String(),
ResourceGroup: s.ResourceGroup(),
LoadedInsertRowCount: s.InsertCount(),
NodeID: node.GetNodeID(),
})
}
ret, err := json.Marshal(ms)
if err != nil {
log.Warn("failed to marshal segments", zap.Error(err))
return ""
}
return string(ret)
}
// getSystemInfoMetrics returns metrics info of QueryNode
func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (string, error) {
usedMem := hardware.GetUsedMemoryCount()

View File

@ -0,0 +1,131 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynodev2
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestGetPipelineJSON(t *testing.T) {
paramtable.Init()
ch := "ch"
tSafeManager := tsafe.NewTSafeReplica()
tSafeManager.Add(context.Background(), ch, 0)
delegators := typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
d := delegator.NewMockShardDelegator(t)
delegators.Insert(ch, d)
msgDispatcher := msgdispatcher.NewMockClient(t)
collectionManager := segments.NewMockCollectionManager(t)
segmentManager := segments.NewMockSegmentManager(t)
collectionManager.EXPECT().Get(mock.Anything).Return(&segments.Collection{})
manager := &segments.Manager{
Collection: collectionManager,
Segment: segmentManager,
}
pipelineManager := pipeline.NewManager(manager, tSafeManager, msgDispatcher, delegators)
_, err := pipelineManager.Add(1, ch)
assert.NoError(t, err)
assert.Equal(t, 1, pipelineManager.Num())
stats := pipelineManager.GetChannelStats()
expectedStats := []*metricsinfo.Channel{
{
Name: ch,
WatchState: "Healthy",
LatestTimeTick: typeutil.TimestampToString(0),
NodeID: paramtable.GetNodeID(),
CollectionID: 1,
},
}
assert.Equal(t, expectedStats, stats)
JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager})
assert.NotEmpty(t, JSONStr)
var actualStats []*metricsinfo.Channel
err = json.Unmarshal([]byte(JSONStr), &actualStats)
assert.NoError(t, err)
assert.Equal(t, expectedStats, actualStats)
}
func TestGetSegmentJSON(t *testing.T) {
segment := segments.NewMockSegment(t)
segment.EXPECT().ID().Return(int64(1))
segment.EXPECT().Collection().Return(int64(1001))
segment.EXPECT().Partition().Return(int64(2001))
segment.EXPECT().MemSize().Return(int64(1024))
segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{
{
IndexInfo: &querypb.FieldIndexInfo{
FieldID: 1,
IndexID: 101,
IndexSize: 512,
BuildID: 10001,
},
IsLoaded: true,
},
})
segment.EXPECT().Type().Return(segments.SegmentTypeGrowing)
segment.EXPECT().ResourceGroup().Return("default")
segment.EXPECT().InsertCount().Return(int64(100))
node := &QueryNode{}
mockedSegmentManager := segments.NewMockSegmentManager(t)
mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment})
node.manager = &segments.Manager{Segment: mockedSegmentManager}
jsonStr := getSegmentJSON(node)
assert.NotEmpty(t, jsonStr)
var segments []*metricsinfo.Segment
err := json.Unmarshal([]byte(jsonStr), &segments)
assert.NoError(t, err)
assert.NotNil(t, segments)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[0].SegmentID)
assert.Equal(t, int64(1001), segments[0].CollectionID)
assert.Equal(t, int64(2001), segments[0].PartitionID)
assert.Equal(t, int64(1024), segments[0].MemSize)
assert.Equal(t, 1, len(segments[0].Index))
assert.Equal(t, int64(1), segments[0].Index[0].IndexFieldID)
assert.Equal(t, int64(101), segments[0].Index[0].IndexID)
assert.Equal(t, int64(512), segments[0].Index[0].IndexSize)
assert.Equal(t, int64(10001), segments[0].Index[0].BuildID)
assert.True(t, segments[0].Index[0].IsLoaded)
assert.Equal(t, "Growing", segments[0].State)
assert.Equal(t, "default", segments[0].ResourceGroup)
assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -40,6 +41,7 @@ type Manager interface {
Remove(channels ...string)
Start(channels ...string) error
Close()
GetChannelStats() []*metricsinfo.Channel
}
type manager struct {
@ -155,6 +157,27 @@ func (m *manager) Close() {
}
}
func (m *manager) GetChannelStats() []*metricsinfo.Channel {
m.mu.RLock()
defer m.mu.RUnlock()
ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline))
for ch, p := range m.channel2Pipeline {
tt, err := m.tSafeManager.Get(ch)
if err != nil {
log.Warn("get tSafe failed", zap.String("channel", ch), zap.Error(err))
}
ret = append(ret, &metricsinfo.Channel{
Name: ch,
WatchState: p.Status(),
LatestTimeTick: typeutil.TimestampToString(tt),
NodeID: paramtable.GetNodeID(),
CollectionID: p.GetCollectionID(),
})
}
return ret
}
func NewManager(dataManager *DataManager,
tSafeManager TSafeManager,
dispatcher msgdispatcher.Client,

View File

@ -26,6 +26,7 @@ import (
// pipeline used for querynode
type Pipeline interface {
base.StreamPipeline
GetCollectionID() UniqueID
}
type pipeline struct {
@ -35,6 +36,10 @@ type pipeline struct {
embeddingNode embeddingNode
}
func (p *pipeline) GetCollectionID() UniqueID {
return p.collectionID
}
func (p *pipeline) Close() {
p.StreamPipeline.Close()
}

View File

@ -283,6 +283,16 @@ func (node *QueryNode) registerMetricsRequest() {
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getSystemInfoMetrics(ctx, req, node)
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getSegmentJSON(node), nil
})
node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getChannelJSON(node), nil
})
log.Info("register metrics actions finished")
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
@ -123,11 +124,21 @@ func (fg *TimeTickedFlowGraph) Close() {
})
}
// Status returns the status of the pipeline; it returns "Healthy" if the input node
// has received any msg within the last nodeTtInterval
func (fg *TimeTickedFlowGraph) Status() string {
diff := time.Since(fg.nodeCtxManager.lastAccessTime.Load())
if diff > nodeCtxTtInterval {
return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String())
}
return "Healthy"
}
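The staleness check above reduces to comparing time.Since against the tick interval. A runnable sketch of the same idea using go.uber.org/atomic, with the two-minute interval being an arbitrary example value:

package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// status reports "Healthy" while the last observed message is newer than the
// interval, otherwise how long the input has been silent.
func status(last *atomic.Time, interval time.Duration) string {
	if diff := time.Since(last.Load()); diff > interval {
		return fmt.Sprintf("input node hasn't received any msg in the last %s", diff)
	}
	return "Healthy"
}

func main() {
	last := atomic.NewTime(time.Now().Add(-3 * time.Minute))
	fmt.Println(status(last, 2*time.Minute)) // stale
	last.Store(time.Now())
	fmt.Println(status(last, 2*time.Minute)) // Healthy
}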
// NewTimeTickedFlowGraph create timetick flowgraph
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
flowGraph := TimeTickedFlowGraph{
nodeCtx: make(map[string]*nodeCtx),
nodeCtxManager: &nodeCtxManager{},
nodeCtxManager: &nodeCtxManager{lastAccessTime: atomic.NewTime(time.Now())},
closeWg: &sync.WaitGroup{},
closeGracefully: atomic.NewBool(CloseImmediately),
}

View File

@ -21,6 +21,7 @@ import (
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
@ -59,14 +60,17 @@ type nodeCtxManager struct {
closeWg *sync.WaitGroup
closeOnce sync.Once
closeCh chan struct{} // notify nodes to exit
lastAccessTime *atomic.Time
}
// NewNodeCtxManager init with the inputNode and fg.closeWg
func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager {
return &nodeCtxManager{
inputNodeCtx: nodeCtx,
closeWg: closeWg,
closeCh: make(chan struct{}),
inputNodeCtx: nodeCtx,
closeWg: closeWg,
closeCh: make(chan struct{}),
lastAccessTime: atomic.NewTime(time.Now()),
}
}
@ -119,6 +123,10 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
continue
}
if nodeCtxManager.lastAccessTime != nil {
nodeCtxManager.lastAccessTime.Store(time.Now())
}
output = n.Operate(input)
curNode.blockMutex.RUnlock()
// the output decide whether the node should be closed.

View File

@ -105,11 +105,7 @@ func TestNodeManager_Start(t *testing.T) {
node0.inputChannel = make(chan []Msg)
nodeCtxManager := &nodeCtxManager{
inputNodeCtx: node0,
closeWg: &sync.WaitGroup{},
}
nodeCtxManager := NewNodeCtxManager(node0, &sync.WaitGroup{})
assert.NotPanics(t, func() {
nodeCtxManager.Start()
})

View File

@ -1,46 +1,34 @@
package metrics
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)
func PruneFieldIndexInfo(f *querypb.FieldIndexInfo) *querypb.FieldIndexInfo {
return &querypb.FieldIndexInfo{
FieldID: f.FieldID,
IndexID: f.IndexID,
BuildID: f.BuildID,
IndexSize: f.IndexSize,
NumRows: f.NumRows,
func NewSegmentFrom(segment *datapb.SegmentInfo) *metricsinfo.Segment {
return &metricsinfo.Segment{
SegmentID: segment.GetID(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
Channel: segment.GetInsertChannel(),
NumOfRows: segment.GetNumOfRows(),
State: segment.GetState().String(),
IsImporting: segment.GetIsImporting(),
Compacted: segment.GetCompacted(),
Level: segment.GetLevel().String(),
IsSorted: segment.GetIsSorted(),
IsInvisible: segment.GetIsInvisible(),
}
}
func PruneSegmentInfo(s *datapb.SegmentInfo) *datapb.SegmentInfo {
return &datapb.SegmentInfo{
ID: s.ID,
NumOfRows: s.NumOfRows,
State: s.State,
Compacted: s.Compacted,
Level: s.Level,
}
}
func PruneVChannelInfo(channel *datapb.VchannelInfo) *datapb.VchannelInfo {
return &datapb.VchannelInfo{
ChannelName: channel.ChannelName,
UnflushedSegments: lo.Map(channel.UnflushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo {
return PruneSegmentInfo(s)
}),
FlushedSegments: lo.Map(channel.FlushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo {
return PruneSegmentInfo(s)
}),
DroppedSegments: lo.Map(channel.DroppedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo {
return PruneSegmentInfo(s)
}),
IndexedSegments: lo.Map(channel.IndexedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo {
return PruneSegmentInfo(s)
}),
func NewDMChannelFrom(channel *datapb.VchannelInfo) *metricsinfo.DmChannel {
return &metricsinfo.DmChannel{
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetChannelName(),
UnflushedSegmentIds: channel.GetUnflushedSegmentIds(),
FlushedSegmentIds: channel.GetFlushedSegmentIds(),
DroppedSegmentIds: channel.GetDroppedSegmentIds(),
LevelZeroSegmentIds: channel.GetLevelZeroSegmentIds(),
PartitionStatsVersions: channel.GetPartitionStatsVersions(),
}
}

View File

@ -18,9 +18,11 @@ package pipeline
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -39,6 +41,7 @@ import (
type StreamPipeline interface {
Pipeline
ConsumeMsgStream(position *msgpb.MsgPosition) error
Status() string
}
type streamPipeline struct {
@ -52,6 +55,8 @@ type streamPipeline struct {
closeCh chan struct{} // notify work to exit
closeWg sync.WaitGroup
closeOnce sync.Once
lastAccessTime *atomic.Time
}
func (p *streamPipeline) work() {
@ -62,6 +67,7 @@ func (p *streamPipeline) work() {
log.Debug("stream pipeline input closed")
return
case msg := <-p.input:
p.lastAccessTime.Store(time.Now())
log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs)))
p.pipeline.inputChannel <- msg
p.pipeline.process()
@ -69,6 +75,16 @@ func (p *streamPipeline) work() {
}
}
// Status returns the status of the pipeline; it returns "Healthy" if the input node
// has received any msg within the last nodeTtInterval
func (p *streamPipeline) Status() string {
diff := time.Since(p.lastAccessTime.Load())
if diff > p.pipeline.nodeTtInterval {
return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String())
}
return "Healthy"
}
func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
var err error
if position == nil {
@ -150,10 +166,11 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time.
nodeTtInterval: nodeTtInterval,
enableTtChecker: enableTtChecker,
},
dispatcher: dispatcher,
vChannel: vChannel,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
dispatcher: dispatcher,
vChannel: vChannel,
closeCh: make(chan struct{}),
closeWg: sync.WaitGroup{},
lastAccessTime: atomic.NewTime(time.Now()),
}
return pipeline

View File

@ -45,10 +45,16 @@ const (
MetricRequestParamsSeparator = ","
// QuerySegmentDist request for segment distribution on the query node
QuerySegmentDist = "qc_segment_dist"
QuerySegments = "qn_segments"
// QueryChannelDist request for channel distribution on the query node
QueryChannelDist = "qc_channel_dist"
QueryChannels = "qn_channels"
// QueryDist request for segment/channel/leader view distribution on querycoord
QueryDist = "qc_dist"
// QueryTarget request for segment/channel target on the querycoord
QueryTarget = "qc_target"
// QueryCoordAllTasks request for get tasks on the querycoord
QueryCoordAllTasks = "qc_tasks_all"
@ -59,8 +65,8 @@ const (
// QueryResourceGroups request for get resource groups on the querycoord
QueryResourceGroups = "qc_resource_group"
// DataCoordAllTasks request for get tasks on the datacoord
DataCoordAllTasks = "dc_tasks_all"
// DataDist request for get segments on the datacoord
DataDist = "dc_segments"
// ImportTasks request for get import tasks from the datacoord
ImportTasks = "dc_import_tasks"
@ -74,6 +80,12 @@ const (
// SyncTasks request for get sync tasks from the datanode
SyncTasks = "dn_sync_tasks"
// DataSegments request for get segments from the datanode
DataSegments = "dn_segments"
// DataChannels request for get channels from the datanode
DataChannels = "dn_channels"
// MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value
MetricRequestParamVerboseKey = "verbose"
)

View File

@ -13,8 +13,8 @@ package metricsinfo
import (
"encoding/json"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -70,6 +70,105 @@ const (
MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION"
)
type DmChannel struct {
NodeID int64 `json:"node_id,omitempty"`
Version int64 `json:"version,omitempty"`
CollectionID int64 `json:"collection_id,omitempty"`
ChannelName string `json:"channel_name,omitempty"`
UnflushedSegmentIds []int64 `json:"unflushed_segment_ids,omitempty"`
FlushedSegmentIds []int64 `json:"flushed_segment_ids,omitempty"`
DroppedSegmentIds []int64 `json:"dropped_segment_ids,omitempty"`
LevelZeroSegmentIds []int64 `json:"level_zero_segment_ids,omitempty"`
PartitionStatsVersions map[int64]int64 `json:"partition_stats_versions,omitempty"`
WatchState string `json:"watch_state,omitempty"`
StartWatchTS int64 `json:"start_watch_ts,omitempty"`
}
type Segment struct {
SegmentID int64 `json:"segment_id,omitempty"`
CollectionID int64 `json:"collection_id,omitempty"`
PartitionID int64 `json:"partition_id,omitempty"`
Channel string `json:"channel,omitempty"`
NumOfRows int64 `json:"num_of_rows,omitempty"`
State string `json:"state,omitempty"`
IsImporting bool `json:"is_importing,omitempty"`
Compacted bool `json:"compacted,omitempty"`
Level string `json:"level,omitempty"`
IsSorted bool `json:"is_sorted,omitempty"`
NodeID int64 `json:"node_id,omitempty"`
// load related
IsInvisible bool `json:"is_invisible,omitempty"`
LoadedTimestamp int64 `json:"loaded_timestamp,omitempty"`
Index []*SegmentIndex `json:"index,omitempty"`
ResourceGroup string `json:"resource_group,omitempty"`
LoadedInsertRowCount int64 `json:"loaded_insert_row_count,omitempty"` // insert row count for growing segment that excludes the deleted row count in QueryNode
MemSize int64 `json:"mem_size,omitempty"` // memory size of segment in QueryNode
// flush related
FlushedRows int64 `json:"flushed_rows,omitempty"`
SyncBufferRows int64 `json:"sync_buffer_rows,omitempty"`
SyncingRows int64 `json:"syncing_rows,omitempty"`
// TODO add checkpoints
}
type SegmentIndex struct {
IndexFieldID int64 `json:"field_id,omitempty"`
IndexID int64 `json:"index_id,omitempty"`
BuildID int64 `json:"build_id,omitempty"`
IndexSize int64 `json:"index_size,omitempty"`
IsLoaded bool `json:"is_loaded,omitempty"`
}
type QueryCoordTarget struct {
CollectionID int64 `json:"collection_id,omitempty"`
Segments []*Segment `json:"segments,omitempty"`
DMChannels []*DmChannel `json:"dm_channels,omitempty"`
}
type LeaderView struct {
LeaderID int64 `json:"leader_id"`
CollectionID int64 `json:"collection_id"`
Channel string `json:"channel"`
Version int64 `json:"version"`
SealedSegments []*Segment `json:"sealed_segments"`
GrowingSegments []*Segment `json:"growing_segments"`
TargetVersion int64 `json:"target_version"`
NumOfGrowingRows int64 `json:"num_of_growing_rows"`
UnServiceableError string `json:"unserviceable_error"`
}
type QueryCoordDist struct {
Segments []*Segment `json:"segments,omitempty"`
DMChannels []*DmChannel `json:"dm_channels,omitempty"`
LeaderViews []*LeaderView `json:"leader_views,omitempty"`
}
type ResourceGroup struct {
Name string `json:"name,omitempty"`
Nodes []int64 `json:"nodes,omitempty"`
Cfg *rgpb.ResourceGroupConfig `json:"cfg,omitempty"`
}
type Replica struct {
ID int64 `json:"ID,omitempty"`
CollectionID int64 `json:"collectionID,omitempty"`
RWNodes []int64 `json:"rw_nodes,omitempty"`
ResourceGroup string `json:"resource_group,omitempty"`
RONodes []int64 `json:"ro_nodes,omitempty"`
ChannelToRWNodes map[string][]int64 `json:"channel_to_rw_nodes,omitempty"`
}
// Channel is a subscribed channel in querynode or datanode.
type Channel struct {
Name string `json:"name,omitempty"`
WatchState string `json:"watch_state,omitempty"`
LatestTimeTick string `json:"latest_time_tick,omitempty"` // a time string indicating when the latest time tick of the channel was received
NodeID int64 `json:"node_id,omitempty"`
CollectionID int64 `json:"collection_id,omitempty"`
CheckpointTS string `json:"check_point_ts,omitempty"` // a time string, format like "2006-01-02 15:04:05"
}
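Since every field above is tagged omitempty, the WebUI payload only carries what is set. A standalone sketch with a local copy of the Channel shape shows the effect:

package main

import (
	"encoding/json"
	"fmt"
)

// channel mirrors the Channel struct above so the example runs on its own.
type channel struct {
	Name           string `json:"name,omitempty"`
	WatchState     string `json:"watch_state,omitempty"`
	LatestTimeTick string `json:"latest_time_tick,omitempty"`
	NodeID         int64  `json:"node_id,omitempty"`
	CollectionID   int64  `json:"collection_id,omitempty"`
	CheckpointTS   string `json:"check_point_ts,omitempty"`
}

func main() {
	b, _ := json.Marshal(&channel{Name: "ch1", WatchState: "Healthy", NodeID: 1})
	fmt.Println(string(b)) // {"name":"ch1","watch_state":"Healthy","node_id":1}
}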
// DeployMetrics records the deploy information of nodes.
type DeployMetrics struct {
SystemVersion string `json:"system_version"`
@ -167,11 +266,12 @@ type SyncTask struct {
SegmentID int64 `json:"segment_id,omitempty"`
BatchRows int64 `json:"batch_rows,omitempty"`
SegmentLevel string `json:"segment_level,omitempty"`
TsFrom typeutil.Timestamp `json:"ts_from,omitempty"`
TsTo typeutil.Timestamp `json:"ts_to,omitempty"`
TSFrom typeutil.Timestamp `json:"ts_from,omitempty"`
TSTo typeutil.Timestamp `json:"ts_to,omitempty"`
DeltaRowCount int64 `json:"delta_row_count,omitempty"`
FlushSize int64 `json:"flush_size,omitempty"`
RunningTime time.Duration `json:"running_time,omitempty"`
RunningTime string `json:"running_time,omitempty"`
NodeID int64 `json:"node_id,omitempty"`
}
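On the RunningTime change from time.Duration to string: a Duration field marshals as raw nanoseconds (which is what the old dn_sync_task fixture showed), whereas a plain string lets the producer choose the representation. A purely illustrative demonstration:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	d := 100 * time.Millisecond
	out, _ := json.Marshal(struct {
		AsDuration time.Duration `json:"as_duration"` // marshals as int64 nanoseconds
		AsString   string        `json:"as_string"`
	}{d, d.String()})
	fmt.Println(string(out)) // {"as_duration":100000000,"as_string":"100ms"}
}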
// DataNodeInfos implements ComponentInfos
@ -181,6 +281,11 @@ type DataNodeInfos struct {
QuotaMetrics *DataNodeQuotaMetrics `json:"quota_metrics"`
}
type DataCoordDist struct {
Segments []*Segment `json:"segments,omitempty"`
DMChannels []*DmChannel `json:"dm_channels,omitempty"`
}
// DataCoordConfiguration records the configuration of DataCoord.
type DataCoordConfiguration struct {
SegmentMaxSize float64 `json:"segment_max_size"`

View File

@ -12,7 +12,12 @@
package metricsinfo
import (
"encoding/json"
"os"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
)
// FillDeployMetricsWithEnv fill deploy metrics with env.
@ -23,3 +28,20 @@ func FillDeployMetricsWithEnv(m *DeployMetrics) {
m.UsedGoVersion = os.Getenv(MilvusUsedGoVersion)
m.BuildTime = os.Getenv(MilvusBuildTimeEnvKey)
}
func MarshalGetMetricsValues[T any](metrics []T, err error) (string, error) {
if err != nil {
return "", err
}
if len(metrics) == 0 {
return "", nil
}
bs, err := json.Marshal(metrics)
if err != nil {
log.Warn("marshal metrics value failed", zap.Any("metrics", metrics), zap.String("err", err.Error()))
return "", nil
}
return string(bs), nil
}

View File

@ -45,3 +45,8 @@ func ParseTimestamp(data []byte) (time.Time, error) {
func SubTimeByWallClock(after, before time.Time) time.Duration {
return time.Duration(after.UnixNano() - before.UnixNano())
}
func TimestampToString(ts uint64) string {
ut := time.Unix(int64(ts), 0)
return ut.Format(time.DateTime)
}
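A quick check of the helper above, under the assumption that the input is plain Unix seconds; Milvus hybrid timestamps would need their physical component extracted before formatting:

package main

import (
	"fmt"
	"time"
)

func main() {
	ts := uint64(1696161600) // 2023-10-01 12:00:00 UTC as Unix seconds
	// .UTC() is added here only so the printed value is deterministic;
	// the helper itself formats in local time.
	fmt.Println(time.Unix(int64(ts), 0).UTC().Format(time.DateTime))
}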