diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index ae13f43d52..3e18d128ed 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -52,8 +52,10 @@ var Registry *metrics.MilvusRegistry func init() { Registry = metrics.NewMilvusRegistry() - metrics.RegisterEtcdMetrics(Registry.GoRegistry) - metrics.RegisterMq(Registry.GoRegistry) + metrics.Register(Registry.GoRegistry) + metrics.RegisterMetaMetrics(Registry.GoRegistry) + metrics.RegisterMsgStreamMetrics(Registry.GoRegistry) + metrics.RegisterStorageMetrics(Registry.GoRegistry) } func stopRocksmq() { diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index a0774368f7..3bc05774f2 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -165,6 +166,7 @@ func (c *ChannelStore) Reload() error { c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel) log.Info("channel store reload channel", zap.Int64("nodeID", nodeID), zap.String("channel", channel.Name)) + metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(len(c.channelsInfo[nodeID].Channels))) } log.Info("channel store reload done", zap.Duration("duration", record.ElapseSpan())) return nil @@ -274,6 +276,7 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error { default: return errUnknownOpType } + metrics.DataCoordDmlChannelNum.WithLabelValues(strconv.FormatInt(op.NodeID, 10)).Set(float64(len(c.channelsInfo[op.NodeID].Channels))) } return nil } diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 9e9018a5a2..c67c8ec207 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" ) // TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple @@ -250,6 +251,8 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu nodeID := c.plans[planID].dataNodeID c.releaseQueue(nodeID) + + metrics.DataCoordCompactedSegmentSize.WithLabelValues().Observe(float64(getCompactedSegmentSize(result))) return nil } diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 1ab51f2289..8b890ed367 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -541,6 +541,7 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error { log.Info("finish index task success", zap.Int64("buildID", taskInfo.BuildID), zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason())) m.updateIndexTasksMetrics() + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(taskInfo.IndexFileKeys))) return nil } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 43d6ab3c6c..4f1d82cc7c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -118,6 +118,24 @@ func (m *meta) reloadFromKV() error { metrics.DataCoordNumSegments.WithLabelValues(segment.State.String()).Inc() if segment.State == commonpb.SegmentState_Flushed 
{ numStoredRows += segment.NumOfRows + + insertFileNum := 0 + for _, fieldBinlog := range segment.GetBinlogs() { + insertFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.InsertFileLabel).Observe(float64(insertFileNum)) + + statFileNum := 0 + for _, fieldBinlog := range segment.GetStatslogs() { + statFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.StatFileLabel).Observe(float64(statFileNum)) + + deleteFileNum := 0 + for _, fieldBinlog := range segment.GetDeltalogs() { + deleteFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.DeleteFileLabel).Observe(float64(deleteFileNum)) } } metrics.DataCoordNumStoredRows.WithLabelValues().Set(float64(numStoredRows)) @@ -149,6 +167,7 @@ func (m *meta) reloadFromKV() error { } for _, segIdx := range segmentIndexes { m.updateSegmentIndex(segIdx) + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(segIdx.IndexFileKeys))) } log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) return nil diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 5bfc1d118b..7d1c31e0c6 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -253,11 +253,15 @@ func (s *Server) Register() error { } } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Inc() + log.Info("DataCoord Register Finished") + s.session.LivenessCheck(s.serverLoopCtx, func() { logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec() // manually send signal to starter goroutine if s.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { @@ -898,6 +902,25 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { return err } s.buildIndexCh <- segmentID + + insertFileNum := 0 + for _, fieldBinlog := range segment.GetBinlogs() { + insertFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.InsertFileLabel).Observe(float64(insertFileNum)) + + statFileNum := 0 + for _, fieldBinlog := range segment.GetStatslogs() { + statFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.StatFileLabel).Observe(float64(statFileNum)) + + deleteFileNum := 0 + for _, fieldBinlog := range segment.GetDeltalogs() { + deleteFileNum += len(fieldBinlog.GetBinlogs()) + } + metrics.FlushedSegmentFileNum.WithLabelValues(metrics.DeleteFileLabel).Observe(float64(deleteFileNum)) + log.Info("flush segment complete", zap.Int64("id", segmentID)) return nil } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 35a45b7785..965e55bc0c 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -161,6 +162,32 @@ func getCollectionTTL(properties map[string]string) (time.Duration, error) { return
Params.CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second), nil } +func getCompactedSegmentSize(s *datapb.CompactionResult) int64 { + var segmentSize int64 + + if s != nil { + for _, binlogs := range s.GetInsertLogs() { + for _, l := range binlogs.GetBinlogs() { + segmentSize += l.GetLogSize() + } + } + + for _, deltaLogs := range s.GetDeltalogs() { + for _, l := range deltaLogs.GetBinlogs() { + segmentSize += l.GetLogSize() + } + } + + for _, statsLogs := range s.GetDeltalogs() { + for _, l := range statsLogs.GetBinlogs() { + segmentSize += l.GetLogSize() + } + } + } + + return segmentSize +} + // getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled. // if not set, returns global auto compaction config. func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) { diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 14b311c259..75795bac03 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -851,7 +851,7 @@ func (c *ChannelMeta) setCurInsertBuffer(segmentID UniqueID, buf *BufferData) { seg, ok := c.segments[segmentID] if ok { - seg.curInsertBuf = buf + seg.setInsertBuffer(buf) return } log.Warn("cannot find segment when setCurInsertBuffer", zap.Int64("segmentID", segmentID)) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 53baab27fc..ec742dab1d 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -495,6 +495,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { return nil, errContext } + durInQueue := t.tr.RecordSpan() ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) defer cancelAll() @@ -755,6 +756,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(compactStart)))) metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(t.tr.ElapseSpan().Milliseconds())) + metrics.DataNodeCompactionLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(durInQueue.Milliseconds())) return pack, nil } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 6221e64974..b1b56f9ea5 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) var compactTestDir = "/tmp/milvus_test/compact" @@ -673,6 +674,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { cancel: cancel, done: make(chan struct{}, 1), Channel: &ChannelMeta{}, + tr: timerecord.NewTimeRecorder("test"), } plan := &datapb.CompactionPlan{ diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c897bdb047..06e0a66e82 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -49,6 +49,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/commonpbutil" 
"github.com/milvus-io/milvus/pkg/util/logutil" @@ -62,12 +63,6 @@ const ( // RPCConnectionTimeout is used to set the timeout for rpc request RPCConnectionTimeout = 30 * time.Second - // MetricRequestsTotal is used to count the num of total requests - MetricRequestsTotal = "total" - - // MetricRequestsSuccess is used to count the num of successful requests - MetricRequestsSuccess = "success" - // ConnectEtcdMaxRetryTime is used to limit the max retry time for connection etcd ConnectEtcdMaxRetryTime = 100 @@ -193,12 +188,15 @@ func (node *DataNode) SetDataCoord(ds types.DataCoord) error { func (node *DataNode) Register() error { node.session.Register() + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Inc() + log.Info("DataNode Register Finished") // Start liveness check node.session.LivenessCheck(node.ctx, func() { log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Dec() // manually send signal to starter goroutine if node.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 3c416a6d46..4fa16d4af0 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -519,7 +519,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc() if task.auto { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc() - metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() + metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc() } } return segmentsToSync diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go index 99810eec47..e5bddcc745 100644 --- a/internal/datanode/segment.go +++ b/internal/datanode/segment.go @@ -17,6 +17,8 @@ package datanode import ( + "fmt" + "strconv" "sync" "sync/atomic" @@ -29,6 +31,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // Segment contains the latest segment infos from channel. @@ -176,11 +180,35 @@ func (s *Segment) isPKExist(pk primaryKey) bool { return false } +// setInsertBuffer set curInsertBuf. +func (s *Segment) setInsertBuffer(buf *BufferData) { + s.curInsertBuf = buf + + if buf != nil && buf.buffer != nil { + dataSize := 0 + for _, data := range buf.buffer.Data { + dataSize += data.GetMemorySize() + } + metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + strconv.FormatInt(s.collectionID, 10)).Add(float64(dataSize)) + } +} + // rollInsertBuffer moves curInsertBuf to historyInsertBuf, and then sets curInsertBuf to nil. 
func (s *Segment) rollInsertBuffer() { if s.curInsertBuf == nil { return } + + if s.curInsertBuf.buffer != nil { + dataSize := 0 + for _, data := range s.curInsertBuf.buffer.Data { + dataSize += data.GetMemorySize() + } + metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + strconv.FormatInt(s.collectionID, 10)).Sub(float64(dataSize)) + } + s.curInsertBuf.buffer = nil // free buffer memory, only keep meta infos in historyInsertBuf s.historyInsertBuf = append(s.historyInsertBuf, s.curInsertBuf) s.curInsertBuf = nil diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 798669ec2c..edd311ca51 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -91,7 +91,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*milvuspb.Compone func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { metrics.DataNodeFlushReqCounter.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), - MetricRequestsTotal).Inc() + metrics.TotalLabel).Inc() if !node.isHealthy() { err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) @@ -158,7 +158,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen metrics.DataNodeFlushReqCounter.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), - MetricRequestsSuccess).Inc() + metrics.SuccessLabel).Inc() return merr.Status(nil), nil } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index fe61c4376b..961ac4d881 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -28,6 +28,7 @@ import "C" import ( "context" + "fmt" "math/rand" "os" "path" @@ -50,6 +51,7 @@ import ( "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/lifetime" @@ -122,12 +124,14 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) *IndexNode { func (i *IndexNode) Register() error { i.session.Register() + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.IndexNodeRole).Inc() //start liveness check i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.IndexNodeRole).Dec() // manually send signal to starter goroutine if i.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index 26541a4fa2..3f4560612d 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -73,6 +73,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest cancel: taskCancel, state: commonpb.IndexState_InProgress}); oldInfo != nil { log.Ctx(ctx).Warn("duplicated index build task", zap.String("ClusterID", req.ClusterID), zap.Int64("BuildID", req.BuildID)) + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: 
commonpb.ErrorCode_BuildIndexError, Reason: "duplicated index build task", @@ -83,6 +84,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest log.Ctx(ctx).Error("create chunk manager failed", zap.String("Bucket", req.StorageConfig.BucketName), zap.String("AccessKey", req.StorageConfig.AccessKeyID), zap.String("ClusterID", req.ClusterID), zap.Int64("IndexBuildID", req.BuildID)) + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_BuildIndexError, Reason: "create chunk manager failed", @@ -112,6 +114,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } + metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc() log.Ctx(ctx).Info("IndexNode successfully scheduled", zap.Int64("IndexBuildID", req.BuildID), zap.String("ClusterID", req.ClusterID), zap.String("indexName", req.IndexName)) return ret, nil } diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 2c23529c96..a0a5e300c1 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -99,6 +99,7 @@ type indexBuildTask struct { newIndexParams map[string]string serializedSize uint64 tr *timerecord.TimeRecorder + queueDur time.Duration statistic indexpb.JobInfo node *IndexNode } @@ -139,6 +140,8 @@ func (it *indexBuildTask) GetState() commonpb.IndexState { // OnEnqueue enqueues indexing tasks. func (it *indexBuildTask) OnEnqueue(ctx context.Context) error { + it.queueDur = 0 + it.tr.RecordSpan() it.statistic.StartTime = time.Now().UnixMicro() it.statistic.PodID = it.node.GetNodeID() log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segID", it.segmentID)) @@ -146,6 +149,7 @@ func (it *indexBuildTask) OnEnqueue(ctx context.Context) error { } func (it *indexBuildTask) Prepare(ctx context.Context) error { + it.queueDur = it.tr.RecordSpan() log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID)) typeParams := make(map[string]string) diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index eb5588cb3b..f849f542ab 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -19,6 +19,7 @@ package indexnode import ( "container/list" "context" + "fmt" "runtime/debug" "sync" @@ -28,6 +29,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // TaskQueue is a queue used to store tasks. 
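The indexBuildTask changes above stamp the task when it is enqueued (OnEnqueue calls tr.RecordSpan()) and read the elapsed span back in Prepare as queueDur; the scheduler hunk below then reports that duration as in-queue latency. Below is a minimal sketch of the same pattern, assuming plain prometheus/client_golang and time.Since in place of Milvus's metrics and timerecord helpers; the histogram name and task type are illustrative only, not Milvus's actual API.

package indexsketch

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// taskQueueLatency is an illustrative stand-in for metrics.IndexNodeIndexTaskLatencyInQueue.
var taskQueueLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "index_task_latency_in_queue_ms",
		Help:    "time a build task waits in queue before the scheduler picks it up (ms)",
		Buckets: prometheus.ExponentialBuckets(1, 2, 12),
	},
	[]string{"node_id"},
)

func init() { prometheus.MustRegister(taskQueueLatency) }

type buildTask struct {
	enqueuedAt time.Time     // stamped on enqueue, mirrors tr.RecordSpan() in the patch
	queueDur   time.Duration // filled in when processing starts, mirrors it.queueDur
}

// OnEnqueue records when the task entered the queue.
func (t *buildTask) OnEnqueue() { t.enqueuedAt = time.Now() }

// Prepare is the first step that runs on a worker; the gap since enqueue is the queue latency.
func (t *buildTask) Prepare(nodeID string) {
	t.queueDur = time.Since(t.enqueuedAt)
	taskQueueLatency.WithLabelValues(nodeID).Observe(float64(t.queueDur.Milliseconds()))
}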
@@ -229,6 +232,10 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) { } } t.SetState(commonpb.IndexState_Finished, "") + if indexBuildTask, ok := t.(*indexBuildTask); ok { + metrics.IndexNodeBuildIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(indexBuildTask.tr.ElapseSpan().Milliseconds())) + metrics.IndexNodeIndexTaskLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(indexBuildTask.queueDur.Milliseconds())) + } } func (sched *TaskScheduler) indexBuildLoop() { diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 61159f9c4c..506d72c9a0 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) const ( @@ -76,7 +77,7 @@ func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by key := prefix for { - resp, err := kv.client.Get(ctx, key, opts...) + resp, err := kv.getEtcdMeta(ctx, key, opts...) if err != nil { return err } @@ -104,7 +105,7 @@ func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, nil, err @@ -125,7 +126,7 @@ func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, nil, err @@ -146,7 +147,7 @@ func (kv *etcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, nil, nil, err @@ -169,7 +170,7 @@ func (kv *etcdKV) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key) + resp, err := kv.getEtcdMeta(ctx, key) if err != nil { return "", err } @@ -186,7 +187,7 @@ func (kv *etcdKV) LoadBytes(key string) ([]byte, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key) + resp, err := kv.getEtcdMeta(ctx, key) if err != nil { return []byte{}, err } @@ -207,7 +208,7 @@ func (kv *etcdKV) MultiLoad(keys []string) ([]string, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) 
if err != nil { return []string{}, err } @@ -242,7 +243,7 @@ func (kv *etcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { return [][]byte{}, err } @@ -273,7 +274,7 @@ func (kv *etcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, nil, 0, err @@ -295,7 +296,7 @@ func (kv *etcdKV) Save(key, value string) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() CheckValueSizeAndWarn(key, value) - _, err := kv.client.Put(ctx, key, value) + _, err := kv.putEtcdMeta(ctx, key, value) CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key)) return err } @@ -307,7 +308,7 @@ func (kv *etcdKV) SaveBytes(key string, value []byte) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() CheckValueSizeAndWarn(key, value) - _, err := kv.client.Put(ctx, key, string(value)) + _, err := kv.putEtcdMeta(ctx, key, string(value)) CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key)) return err } @@ -319,7 +320,7 @@ func (kv *etcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.Lease ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() CheckValueSizeAndWarn(key, value) - _, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id)) + _, err := kv.putEtcdMeta(ctx, key, string(value), clientv3.WithLease(id)) CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key)) return err } @@ -338,7 +339,7 @@ func (kv *etcdKV) MultiSave(kvs map[string]string) error { defer cancel() CheckTnxStringValueSizeAndWarn(kvs) - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiSave error", zap.Any("kvs", kvs), zap.Int("len", len(kvs)), zap.Error(err)) } @@ -360,7 +361,7 @@ func (kv *etcdKV) MultiSaveBytes(kvs map[string][]byte) error { defer cancel() CheckTnxBytesValueSizeAndWarn(kvs) - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) 
if err != nil { log.Warn("Etcd MultiSaveBytes err", zap.Any("kvs", kvs), zap.Int("len", len(kvs)), zap.Error(err)) } @@ -375,7 +376,7 @@ func (kv *etcdKV) RemoveWithPrefix(prefix string) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Delete(ctx, key, clientv3.WithPrefix()) + _, err := kv.removeEtcdMeta(ctx, key, clientv3.WithPrefix()) CheckElapseAndWarn(start, "Slow etcd operation remove with prefix", zap.String("prefix", prefix)) return err } @@ -387,7 +388,7 @@ func (kv *etcdKV) Remove(key string) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Delete(ctx, key) + _, err := kv.removeEtcdMeta(ctx, key) CheckElapseAndWarn(start, "Slow etcd operation remove", zap.String("key", key)) return err } @@ -403,7 +404,7 @@ func (kv *etcdKV) MultiRemove(keys []string) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiRemove error", zap.Strings("keys", keys), zap.Int("len", len(keys)), zap.Error(err)) } @@ -428,7 +429,7 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiSaveAndRemove error", zap.Any("saves", saves), @@ -458,7 +459,7 @@ func (kv *etcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []st ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiSaveBytesAndRemove error", zap.Any("saves", saves), @@ -509,7 +510,7 @@ func (kv *etcdKV) MultiRemoveWithPrefix(keys []string) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiRemoveWithPrefix error", zap.Strings("keys", keys), zap.Int("len", len(keys)), zap.Error(err)) } @@ -534,7 +535,7 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiSaveAndRemoveWithPrefix error", zap.Any("saves", saves), @@ -564,7 +565,7 @@ func (kv *etcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) if err != nil { log.Warn("Etcd MultiSaveBytesAndRemoveWithPrefix error", zap.Any("saves", saves), @@ -583,12 +584,9 @@ func (kv *etcdKV) CompareVersionAndSwap(key string, source int64, target string) start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Version(path.Join(kv.rootPath, key)), - "=", - source)). 
- Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, + clientv3.Compare(clientv3.Version(path.Join(kv.rootPath, key)), "=", source)), + clientv3.OpPut(path.Join(kv.rootPath, key), target)) if err != nil { return false, err } @@ -602,12 +600,9 @@ func (kv *etcdKV) CompareVersionAndSwapBytes(key string, source int64, target [] start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - resp, err := kv.client.Txn(ctx).If( - clientv3.Compare( - clientv3.Version(path.Join(kv.rootPath, key)), - "=", - source)). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, + clientv3.Compare(clientv3.Version(path.Join(kv.rootPath, key)), "=", source)), + clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)) if err != nil { return false, err } @@ -618,7 +613,6 @@ func (kv *etcdKV) CompareVersionAndSwapBytes(key string, source int64, target [] // CheckElapseAndWarn checks the elapsed time and warns if it is too long. func CheckElapseAndWarn(start time.Time, message string, fields ...zap.Field) bool { elapsed := time.Since(start) - metrics.EtcdRequestLatency.Observe(float64(elapsed)) if elapsed.Milliseconds() > 2000 { log.Warn(message, append([]zap.Field{zap.String("time spent", elapsed.String())}, fields...)...) return true @@ -628,7 +622,6 @@ func CheckElapseAndWarn(start time.Time, message string, fields ...zap.Field) bo func CheckValueSizeAndWarn(key string, value interface{}) bool { size := binary.Size(value) - metrics.EtcdPutKvSize.Observe(float64(size)) if size > 102400 { log.Warn("value size large than 100kb", zap.String("key", key), zap.Int("value_size(kb)", size/1024)) return true @@ -654,3 +647,105 @@ func CheckTnxStringValueSizeAndWarn(kvs map[string]string) bool { return CheckTnxBytesValueSizeAndWarn(newKvs) } + +func (kv *etcdKV) getEtcdMeta(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + ctx1, cancel := context.WithTimeout(ctx, RequestTimeout) + defer cancel() + + start := timerecord.NewTimeRecorder("getEtcdMeta") + resp, err := kv.client.Get(ctx1, key, opts...) + elapsed := start.ElapseSpan() + metrics.MetaOpCounter.WithLabelValues(metrics.MetaGetLabel, metrics.TotalLabel).Inc() + + // cal meta kv size + if err == nil && resp != nil { + totalSize := 0 + for _, v := range resp.Kvs { + totalSize += binary.Size(v) + } + metrics.MetaKvSize.WithLabelValues(metrics.MetaGetLabel).Observe(float64(totalSize)) + metrics.MetaRequestLatency.WithLabelValues(metrics.MetaGetLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MetaOpCounter.WithLabelValues(metrics.MetaGetLabel, metrics.SuccessLabel).Inc() + } else { + metrics.MetaOpCounter.WithLabelValues(metrics.MetaGetLabel, metrics.FailLabel).Inc() + } + return resp, err +} + +func (kv *etcdKV) putEtcdMeta(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + ctx1, cancel := context.WithTimeout(ctx, RequestTimeout) + defer cancel() + + start := timerecord.NewTimeRecorder("putEtcdMeta") + resp, err := kv.client.Put(ctx1, key, val, opts...) 
+ elapsed := start.ElapseSpan() + metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.TotalLabel).Inc() + if err == nil { + metrics.MetaKvSize.WithLabelValues(metrics.MetaPutLabel).Observe(float64(len(val))) + metrics.MetaRequestLatency.WithLabelValues(metrics.MetaPutLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc() + } else { + metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.FailLabel).Inc() + } + + return resp, err +} + +func (kv *etcdKV) removeEtcdMeta(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + ctx1, cancel := context.WithTimeout(ctx, RequestTimeout) + defer cancel() + + start := timerecord.NewTimeRecorder("removeEtcdMeta") + resp, err := kv.client.Delete(ctx1, key, opts...) + elapsed := start.ElapseSpan() + metrics.MetaOpCounter.WithLabelValues(metrics.MetaRemoveLabel, metrics.TotalLabel).Inc() + + if err == nil { + metrics.MetaRequestLatency.WithLabelValues(metrics.MetaRemoveLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MetaOpCounter.WithLabelValues(metrics.MetaRemoveLabel, metrics.SuccessLabel).Inc() + } else { + metrics.MetaOpCounter.WithLabelValues(metrics.MetaRemoveLabel, metrics.FailLabel).Inc() + } + + return resp, err +} + +func (kv *etcdKV) getTxnWithCmp(ctx context.Context, cmp ...clientv3.Cmp) clientv3.Txn { + return kv.client.Txn(ctx).If(cmp...) +} + +func (kv *etcdKV) executeTxn(txn clientv3.Txn, ops ...clientv3.Op) (*clientv3.TxnResponse, error) { + start := timerecord.NewTimeRecorder("executeTxn") + + resp, err := txn.Then(ops...).Commit() + elapsed := start.ElapseSpan() + metrics.MetaOpCounter.WithLabelValues(metrics.MetaTxnLabel, metrics.TotalLabel).Inc() + + if err == nil && resp.Succeeded { + // cal put meta kv size + totalPutSize := 0 + for _, op := range ops { + if op.IsPut() { + totalPutSize += binary.Size(op.ValueBytes()) + } + } + metrics.MetaKvSize.WithLabelValues(metrics.MetaPutLabel).Observe(float64(totalPutSize)) + + // cal get meta kv size + totalGetSize := 0 + for _, rp := range resp.Responses { + if rp.GetResponseRange() != nil { + for _, v := range rp.GetResponseRange().Kvs { + totalGetSize += binary.Size(v) + } + } + } + metrics.MetaKvSize.WithLabelValues(metrics.MetaGetLabel).Observe(float64(totalGetSize)) + metrics.MetaRequestLatency.WithLabelValues(metrics.MetaTxnLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MetaOpCounter.WithLabelValues(metrics.MetaTxnLabel, metrics.SuccessLabel).Inc() + } else { + metrics.MetaOpCounter.WithLabelValues(metrics.MetaTxnLabel, metrics.FailLabel).Inc() + } + + return resp, err +} diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_client.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_client.go index 1a7e513133..117500ad7a 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_client.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_client.go @@ -25,7 +25,9 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) // nmqClient implements mqwrapper.Client. 
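The etcd_kv.go helpers above and the rocksmq wrapper hunks below share one instrumentation shape: bump a "total" counter before the call, a "fail" counter on error, and otherwise observe latency and bump "success". Below is a condensed sketch of that wrapper, assuming prometheus/client_golang and the etcd clientv3 API; metaOpCounter and metaLatency are illustrative stand-ins for Milvus's MetaOpCounter and MetaRequestLatency, not the project's real collectors.

package metasketch

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// Illustrative stand-ins for Milvus's metrics.MetaOpCounter and metrics.MetaRequestLatency.
var (
	metaOpCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "meta_op_total", Help: "meta ops by type and status"},
		[]string{"op", "status"},
	)
	metaLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "meta_request_latency_ms", Help: "meta request latency in ms"},
		[]string{"op"},
	)
)

func init() { prometheus.MustRegister(metaOpCounter, metaLatency) }

// getMeta wraps an etcd Get the same way getEtcdMeta does: count the attempt,
// count failure on error, otherwise observe latency and count success.
func getMeta(ctx context.Context, cli *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	metaOpCounter.WithLabelValues("get", "total").Inc()
	start := time.Now()
	resp, err := cli.Get(ctx, key, opts...)
	if err != nil {
		metaOpCounter.WithLabelValues("get", "fail").Inc()
		return resp, err
	}
	metaLatency.WithLabelValues("get").Observe(float64(time.Since(start).Milliseconds()))
	metaOpCounter.WithLabelValues("get", "success").Inc()
	return resp, nil
}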
@@ -53,18 +55,30 @@ func NewClient(opts client.Options) (*rmqClient, error) { // CreateProducer creates a producer for rocksmq client func (rc *rmqClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) { + start := timerecord.NewTimeRecorder("create producer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc() + rmqOpts := client.ProducerOptions{Topic: options.Topic} pp, err := rc.client.CreateProducer(rmqOpts) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, err } rp := rmqProducer{p: pp} + + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() return &rp, nil } // Subscribe subscribes a consumer in rmq client func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { + start := timerecord.NewTimeRecorder("create consumer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() + if options.BufSize == 0 { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() err := errors.New("subscription bufSize of rmq should never be zero") log.Warn("unexpected subscription consumer options", zap.Error(err)) return nil, err @@ -78,11 +92,15 @@ func (rc *rmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con SubscriptionInitialPosition: options.SubscriptionInitialPosition, }) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, err } rConsumer := &Consumer{c: cli, closeCh: make(chan struct{})} + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc() return rConsumer, nil } diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go index 9227456995..408fe3810f 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go @@ -15,7 +15,9 @@ import ( "context" "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/client" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) var _ mqwrapper.Producer = (*rmqProducer)(nil) @@ -32,9 +34,20 @@ func (rp *rmqProducer) Topic() string { // Send send the producer messages to rocksmq func (rp *rmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + start := timerecord.NewTimeRecorder("send msg to stream") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() + pm := &client.ProducerMessage{Payload: message.Payload, Properties: message.Properties} id, err := rp.p.Send(pm) - return &rmqID{messageID: id}, err + if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() + return &rmqID{messageID: id}, err + } + + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds())) + 
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() + return &rmqID{messageID: id}, nil } // Close does nothing currently diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 140dfd732c..5e8f9b5ef2 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "math/rand" - "strconv" "sync" "time" @@ -230,8 +229,9 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, database, collectionNam collInfo, ok = db[collectionName] } + method := "GeCollectionID" if !ok || !collInfo.isCollectionCached() { - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() coll, err := m.describeCollection(ctx, database, collectionName, 0) @@ -240,13 +240,14 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, database, collectionNam } m.mu.Lock() defer m.mu.Unlock() + m.updateCollection(coll, database, collectionName) - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) collInfo = m.collInfo[database][collectionName] return collInfo.collID, nil } defer m.mu.RUnlock() - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() return collInfo.collID, nil } @@ -264,8 +265,9 @@ func (m *MetaCache) GetDatabaseAndCollectionName(ctx context.Context, collection } } + method := "GeCollectionName" if collInfo == nil || !collInfo.isCollectionCached() { - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GeCollectionName", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() coll, err := m.describeCollection(ctx, "", "", collectionID) @@ -276,11 +278,11 @@ func (m *MetaCache) GetDatabaseAndCollectionName(ctx context.Context, collection defer m.mu.Unlock() m.updateCollection(coll, coll.GetDbName(), coll.Schema.Name) - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return coll.GetDbName(), coll.Schema.Name, nil } defer m.mu.RUnlock() - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GeCollectionName", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() return collInfo.database, collInfo.schema.Name, nil } @@ -298,9 +300,10 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database, collectionN } m.mu.RUnlock() + method := "GetCollectionInfo" if !ok || !collInfo.isCollectionCached() { tr := 
timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() coll, err := m.describeCollection(ctx, database, collectionName, 0) if err != nil { return nil, err @@ -309,10 +312,10 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, database, collectionN m.updateCollection(coll, database, collectionName) collInfo = m.collInfo[database][collectionName] m.mu.Unlock() - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) } - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() return collInfo, nil } @@ -326,8 +329,9 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectio collInfo, ok = db[collectionName] } + method := "GetCollectionSchema" if !ok || !collInfo.isCollectionCached() { - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() coll, err := m.describeCollection(ctx, database, collectionName, 0) @@ -339,16 +343,17 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectio } m.mu.Lock() defer m.mu.Unlock() + m.updateCollection(coll, database, collectionName) collInfo = m.collInfo[database][collectionName] - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("Reload collection from root coordinator ", zap.String("collection name", collectionName), zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds())) return collInfo.schema, nil } defer m.mu.RUnlock() - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() return collInfo.schema, nil } @@ -383,6 +388,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, database, collectionName return nil, err } + method := "GetPartitions" m.mu.RLock() var collInfo *collectionInfo @@ -399,7 +405,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, database, collectionName if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 { tr := timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() m.mu.RUnlock() 
partitions, err := m.showPartitions(ctx, database, collectionName) @@ -414,7 +420,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, database, collectionName if err != nil { return nil, err } - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.String("collectionName", collectionName)) ret := make(map[string]typeutil.UniqueID) partInfo := m.collInfo[database][collectionName].partInfo @@ -424,8 +430,9 @@ func (m *MetaCache) GetPartitions(ctx context.Context, database, collectionName return ret, nil } + defer m.mu.RUnlock() - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() ret := make(map[string]typeutil.UniqueID) partInfo := collInfo.partInfo @@ -459,9 +466,10 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, database, collectionNa partInfo, ok = collInfo.partInfo[partitionName] m.mu.RUnlock() + method := "GetPartitionInfo" if !ok { tr := timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() partitions, err := m.showPartitions(ctx, database, collectionName) if err != nil { return nil, err @@ -473,14 +481,14 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, database, collectionNa if err != nil { return nil, err } - metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.String("collectionName", collectionName)) partInfo, ok = m.collInfo[database][collectionName].partInfo[partitionName] if !ok { return nil, merr.WrapErrPartitionNotFound(partitionName) } } - metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() return &partitionInfo{ partitionID: partInfo.partitionID, createdTimestamp: partInfo.createdTimestamp, @@ -692,6 +700,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col return nil, err } + method := "GetShards" if withCache { var shardLeaders *shardLeaders info.leaderMutex.RLock() @@ -699,10 +708,12 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col info.leaderMutex.RUnlock() if shardLeaders != nil && !shardLeaders.deprecated.Load() { + metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() iterator := shardLeaders.GetReader() return iterator.Shuffle(), nil } + 
metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheMissLabel).Inc() log.Info("no shard cache for collection, try to get shard leaders from QueryCoord", zap.String("collectionName", collectionName)) } @@ -718,6 +729,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col var resp *querypb.GetShardLeadersResponse childCtx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() + tr := timerecord.NewTimeRecorder("UpdateShardCache") err = retry.Do(childCtx, func() error { resp, err = m.queryCoord.GetShardLeaders(ctx, req) if err != nil { @@ -765,6 +777,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col // and create new client for new leaders _ = m.shardMgr.UpdateShardLeaders(oldLeaders, ret) + metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return ret, nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 504bbb7202..e11ddba589 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -137,11 +137,14 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) { // Register registers proxy at etcd func (node *Proxy) Register() error { node.session.Register() + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.ProxyRole).Inc() + log.Info("Proxy Register Finished") node.session.LivenessCheck(node.ctx, func() { log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.ProxyRole).Dec() if node.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8254920369..42ccc37d8d 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -139,12 +139,13 @@ func (s *Server) Register() error { return err } } - + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() s.session.LivenessCheck(s.ctx, func() { log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Dec() // manually send signal to starter goroutine if s.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index a4a3aa6ab1..90e1953d41 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -67,6 +68,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { zap.Int64("collectionID", collectionID), zap.String("channel", channel), ) + tr := timerecord.NewTimeRecorder("add dmChannel") collection := m.dataManager.Collection.Get(collectionID) if 
collection == nil { return nil, segments.WrapCollectionNotFound(collectionID) @@ -90,6 +92,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { m.channel2Pipeline[channel] = newPipeLine metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() + metrics.QueryNodeWatchDmlChannelLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds())) return newPipeLine, nil } diff --git a/internal/querynodev2/segments/search.go b/internal/querynodev2/segments/search.go index 6b27836296..c41eac5d29 100644 --- a/internal/querynodev2/segments/search.go +++ b/internal/querynodev2/segments/search.go @@ -71,8 +71,11 @@ func searchSegments(ctx context.Context, manager *Manager, segType SegmentType, errs[i] = err resultCh <- searchResult // update metrics + elapsed := tr.ElapseSpan().Milliseconds() metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), - metrics.SearchLabel, searchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.SearchLabel, searchLabel).Observe(float64(elapsed)) + metrics.QueryNodeSegmentSearchLatencyPerVector.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), + metrics.SearchLabel, searchLabel).Observe(float64(elapsed) / float64(searchReq.getNumOfQuery())) }(segID, i) } wg.Wait() diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 98744f9120..adba41f1b9 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -58,6 +58,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/gc" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -155,11 +156,13 @@ func (node *QueryNode) initSession() error { func (node *QueryNode) Register() error { node.session.Register() // start liveness check + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Inc() node.session.LivenessCheck(node.ctx, func() { log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID())) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Dec() // manually send signal to starter goroutine if node.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/rootcoord/alter_alias_task_test.go b/internal/rootcoord/alter_alias_task_test.go index e98772ae14..c4c714c072 100644 --- a/internal/rootcoord/alter_alias_task_test.go +++ b/internal/rootcoord/alter_alias_task_test.go @@ -45,7 +45,7 @@ func Test_alterAliasTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { core := newTestCore(withInvalidProxyManager()) task := &alterAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}, Alias: "test", @@ -58,7 +58,7 @@ func Test_alterAliasTask_Execute(t *testing.T) { t.Run("failed to alter alias", func(t *testing.T) { core := newTestCore(withValidProxyManager(), withInvalidMeta()) task := 
&alterAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}, Alias: "test", diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index f245d83564..4dd9d97a24 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -67,7 +67,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { t.Run("failed to create alias", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &alterCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "cn", @@ -95,7 +95,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta)) task := &alterCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "cn", @@ -129,7 +129,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "cn", @@ -163,7 +163,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker)) task := &alterCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.AlterCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, CollectionName: "cn", diff --git a/internal/rootcoord/create_alias_task_test.go b/internal/rootcoord/create_alias_task_test.go index 0810841795..7bea4d775d 100644 --- a/internal/rootcoord/create_alias_task_test.go +++ b/internal/rootcoord/create_alias_task_test.go @@ -44,7 +44,7 @@ func Test_createAliasTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { core := newTestCore(withInvalidProxyManager()) task := &createAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, Alias: "test", @@ -57,7 +57,7 @@ func Test_createAliasTask_Execute(t *testing.T) { t.Run("failed to create alias", func(t *testing.T) { core := newTestCore(withInvalidMeta(), withValidProxyManager()) task := &createAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, Alias: "test", diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index e9a9ecc3e1..808ffbecec 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -524,7 +524,7 @@ func Test_createCollectionTask_Prepare(t *testing.T) { core := newTestCore(withInvalidIDAllocator(), withMeta(meta)) task := createCollectionTask{ - baseTask: baseTask{core: core}, 
+ baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, CollectionName: collectionName, @@ -558,7 +558,7 @@ func Test_createCollectionTask_Prepare(t *testing.T) { assert.NoError(t, err) task := createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, CollectionName: collectionName, @@ -595,7 +595,7 @@ func Test_createCollectionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) task := &createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, CollectionName: collectionName, @@ -642,7 +642,7 @@ func Test_createCollectionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) task := &createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, CollectionName: collectionName, @@ -662,7 +662,7 @@ func Test_createCollectionTask_Execute(t *testing.T) { pchans := ticker.getDmlChannelNames(shardNum) core := newTestCore(withTtSynchronizer(ticker)) task := &createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), channels: collectionChannels{ physicalChannels: pchans, virtualChannels: []string{funcutil.GenRandomStr(), funcutil.GenRandomStr()}, @@ -734,7 +734,7 @@ func Test_createCollectionTask_Execute(t *testing.T) { assert.NoError(t, err) task := createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, DbName: "mock-db", @@ -829,7 +829,7 @@ func Test_createCollectionTask_Execute(t *testing.T) { assert.NoError(t, err) task := createCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreateCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, CollectionName: collectionName, diff --git a/internal/rootcoord/create_partition_task_test.go b/internal/rootcoord/create_partition_task_test.go index cbca526c1e..8d4d315d86 100644 --- a/internal/rootcoord/create_partition_task_test.go +++ b/internal/rootcoord/create_partition_task_test.go @@ -43,7 +43,7 @@ func Test_createPartitionTask_Prepare(t *testing.T) { t.Run("failed to get collection meta", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreatePartitionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreatePartition}}, } err := task.Prepare(context.Background()) @@ -64,7 +64,7 @@ func Test_createPartitionTask_Prepare(t *testing.T) { core := newTestCore(withMeta(meta)) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.CreatePartitionRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreatePartition}}, } err := task.Prepare(context.Background()) @@ -109,7 
+109,7 @@ func Test_createPartitionTask_Execute(t *testing.T) { coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{}} core := newTestCore(withInvalidIDAllocator()) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), collMeta: coll, Req: &milvuspb.CreatePartitionRequest{CollectionName: collectionName, PartitionName: partitionName}, } @@ -123,7 +123,7 @@ func Test_createPartitionTask_Execute(t *testing.T) { coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{}} core := newTestCore(withValidIDAllocator(), withInvalidProxyManager()) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), collMeta: coll, Req: &milvuspb.CreatePartitionRequest{CollectionName: collectionName, PartitionName: partitionName}, } @@ -137,7 +137,7 @@ func Test_createPartitionTask_Execute(t *testing.T) { coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{}} core := newTestCore(withValidIDAllocator(), withValidProxyManager(), withInvalidMeta()) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), collMeta: coll, Req: &milvuspb.CreatePartitionRequest{CollectionName: collectionName, PartitionName: partitionName}, } @@ -162,7 +162,7 @@ func Test_createPartitionTask_Execute(t *testing.T) { } core := newTestCore(withValidIDAllocator(), withValidProxyManager(), withMeta(meta), withBroker(b)) task := &createPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), collMeta: coll, Req: &milvuspb.CreatePartitionRequest{CollectionName: collectionName, PartitionName: partitionName}, } diff --git a/internal/rootcoord/describe_collection_task_test.go b/internal/rootcoord/describe_collection_task_test.go index 4269db918b..6359199722 100644 --- a/internal/rootcoord/describe_collection_task_test.go +++ b/internal/rootcoord/describe_collection_task_test.go @@ -60,10 +60,7 @@ func Test_describeCollectionTask_Execute(t *testing.T) { t.Run("failed to get collection by name", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &describeCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection, @@ -79,10 +76,7 @@ func Test_describeCollectionTask_Execute(t *testing.T) { t.Run("failed to get collection by id", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &describeCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection, @@ -114,10 +108,7 @@ func Test_describeCollectionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta)) task := &describeCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection, diff --git a/internal/rootcoord/drop_alias_task_test.go b/internal/rootcoord/drop_alias_task_test.go index 03a06c7c42..f8796e5bc0 100644 --- a/internal/rootcoord/drop_alias_task_test.go +++ 
b/internal/rootcoord/drop_alias_task_test.go @@ -52,7 +52,7 @@ func Test_dropAliasTask_Execute(t *testing.T) { core := newTestCore(withInvalidProxyManager()) alias := funcutil.GenRandomStr() task := &dropAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, @@ -67,7 +67,7 @@ func Test_dropAliasTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withInvalidMeta()) alias := funcutil.GenRandomStr() task := &dropAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, @@ -90,7 +90,7 @@ func Test_dropAliasTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withMeta(meta)) alias := funcutil.GenRandomStr() task := &dropAliasTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropAliasRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, diff --git a/internal/rootcoord/drop_collection_task_test.go b/internal/rootcoord/drop_collection_task_test.go index 64b2d2604e..1e5162c4c2 100644 --- a/internal/rootcoord/drop_collection_task_test.go +++ b/internal/rootcoord/drop_collection_task_test.go @@ -55,7 +55,7 @@ func Test_dropCollectionTask_Prepare(t *testing.T) { core := newTestCore(withMeta(meta)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, CollectionName: collectionName, @@ -76,7 +76,7 @@ func Test_dropCollectionTask_Prepare(t *testing.T) { core := newTestCore(withMeta(meta)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, CollectionName: collectionName, @@ -104,7 +104,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) { }) core := newTestCore(withMeta(meta)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, CollectionName: collectionName, @@ -134,7 +134,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) { core := newTestCore(withInvalidProxyManager(), withMeta(meta)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, CollectionName: collectionName, @@ -167,7 +167,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withMeta(meta)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, CollectionName: collectionName, @@ -257,7 +257,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) { withTtSynchronizer(ticker)) task := &dropCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropCollectionRequest{ Base: &commonpb.MsgBase{MsgType: 
commonpb.MsgType_DropCollection}, CollectionName: collectionName, diff --git a/internal/rootcoord/drop_partition_task_test.go b/internal/rootcoord/drop_partition_task_test.go index 597a671ac6..b25074b764 100644 --- a/internal/rootcoord/drop_partition_task_test.go +++ b/internal/rootcoord/drop_partition_task_test.go @@ -56,7 +56,7 @@ func Test_dropPartitionTask_Prepare(t *testing.T) { t.Run("failed to get collection meta", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &dropPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition}, }, @@ -81,7 +81,7 @@ func Test_dropPartitionTask_Prepare(t *testing.T) { core := newTestCore(withMeta(meta)) task := &dropPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition}, CollectionName: collectionName, @@ -116,7 +116,7 @@ func Test_dropPartitionTask_Execute(t *testing.T) { coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}} core := newTestCore(withInvalidProxyManager()) task := &dropPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition}, CollectionName: collectionName, @@ -134,7 +134,7 @@ func Test_dropPartitionTask_Execute(t *testing.T) { coll := &model.Collection{Name: collectionName, Partitions: []*model.Partition{{PartitionName: partitionName}}} core := newTestCore(withValidProxyManager(), withInvalidMeta()) task := &dropPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition}, CollectionName: collectionName, @@ -204,7 +204,7 @@ func Test_dropPartitionTask_Execute(t *testing.T) { withBroker(broker)) task := &dropPartitionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.DropPartitionRequest{ Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropPartition}, CollectionName: collectionName, diff --git a/internal/rootcoord/has_collection_task_test.go b/internal/rootcoord/has_collection_task_test.go index e4f48fba08..c17cc0db43 100644 --- a/internal/rootcoord/has_collection_task_test.go +++ b/internal/rootcoord/has_collection_task_test.go @@ -59,10 +59,7 @@ func Test_hasCollectionTask_Execute(t *testing.T) { t.Run("failed", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &hasCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.HasCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_HasCollection, @@ -87,10 +84,7 @@ func Test_hasCollectionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta)) task := &hasCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.HasCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_HasCollection, diff --git a/internal/rootcoord/has_partition_task_test.go b/internal/rootcoord/has_partition_task_test.go index a057074b26..e6049224ba 100644 --- 
a/internal/rootcoord/has_partition_task_test.go +++ b/internal/rootcoord/has_partition_task_test.go @@ -59,10 +59,7 @@ func Test_hasPartitionTask_Execute(t *testing.T) { t.Run("fail to get collection", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &hasPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.HasPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_HasPartition, @@ -94,10 +91,7 @@ func Test_hasPartitionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta)) task := &hasPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.HasPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_HasCollection, @@ -133,10 +127,7 @@ func Test_hasPartitionTask_Execute(t *testing.T) { core := newTestCore(withMeta(meta)) task := &hasPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.HasPartitionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_HasCollection, diff --git a/internal/rootcoord/rename_collection_task_test.go b/internal/rootcoord/rename_collection_task_test.go index 8a0f47f8c9..caac2a03a2 100644 --- a/internal/rootcoord/rename_collection_task_test.go +++ b/internal/rootcoord/rename_collection_task_test.go @@ -59,7 +59,7 @@ func Test_renameCollectionTask_Execute(t *testing.T) { t.Run("failed to expire cache", func(t *testing.T) { core := newTestCore(withInvalidProxyManager()) task := &renameCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.RenameCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_RenameCollection, @@ -78,7 +78,7 @@ func Test_renameCollectionTask_Execute(t *testing.T) { core := newTestCore(withValidProxyManager(), withMeta(meta)) task := &renameCollectionTask{ - baseTask: baseTask{core: core}, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.RenameCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_RenameCollection, diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 1df5e7ca00..00aa4c8f38 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -280,12 +280,14 @@ func (c *Core) Register() error { return err } } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc() log.Info("RootCoord Register Finished") c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) if err := c.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Dec() // manually send signal to starter goroutine if c.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { @@ -900,12 +902,8 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti zap.String("role", typeutil.RootCoordRole)) t := &createCollectionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -932,6 +930,7 @@ func (c *Core) CreateCollection(ctx 
context.Context, in *milvuspb.CreateCollecti metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCollections.Inc() + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateCollection").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create collection", zap.String("role", typeutil.RootCoordRole), @@ -955,12 +954,8 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe zap.String("name", in.GetCollectionName())) t := &dropCollectionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -985,6 +980,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCollections.Dec() + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropCollection").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), @@ -1033,6 +1029,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasCollection").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to has collection", zap.Bool("exist", t.Rsp.GetValue())) @@ -1127,6 +1124,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DescribeCollection").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to describe collection", zap.Int64("collection_id", t.Rsp.GetCollectionID())) @@ -1189,6 +1187,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowCollections").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to show collections", zap.Int("num of collections", len(t.Rsp.GetCollectionNames()))) // maybe very large, print number instead. 
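// Taken together, the in-queue latency changes in this patch form one pattern: newBaseTask
// (internal/rootcoord/task.go) starts a TimeRecorder when the task is built, the scheduler
// (internal/rootcoord/scheduler.go) calls SetInQueueDuration when it dequeues the task, and the
// DDL handlers above observe t.queueDur into RootCoordDDLReqLatencyInQueue. Below is a minimal,
// self-contained sketch of that pattern, for illustration only; it assumes nothing beyond the
// standard library and the Prometheus Go client, and names such as ddlQueueLatency and demoTask
// are hypothetical stand-ins rather than part of this change.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// ddlQueueLatency plays the role of RootCoordDDLReqLatencyInQueue.
var ddlQueueLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "ddl_req_latency_in_queue",
	Help:    "latency of each DDL operation in queue",
	Buckets: prometheus.ExponentialBuckets(1, 2, 18), // same bucket layout as pkg/metrics
}, []string{"function_name"})

// demoTask mirrors baseTask: it remembers when it was created so the scheduler
// can record how long it waited in the queue before being executed.
type demoTask struct {
	enqueuedAt time.Time
	queueDur   time.Duration
}

func newDemoTask() *demoTask { return &demoTask{enqueuedAt: time.Now()} }

// SetInQueueDuration corresponds to baseTask.SetInQueueDuration: the scheduler
// calls it right before running Prepare/Execute.
func (t *demoTask) SetInQueueDuration() { t.queueDur = time.Since(t.enqueuedAt) }

func main() {
	prometheus.MustRegister(ddlQueueLatency)

	t := newDemoTask()               // the gRPC handler builds the task (newBaseTask)
	time.Sleep(5 * time.Millisecond) // the task sits in the scheduler queue
	t.SetInQueueDuration()           // the scheduler dequeues it and records the wait

	// The handler then observes the wait, as CreateCollection and friends do above.
	ddlQueueLatency.WithLabelValues("CreateCollection").Observe(float64(t.queueDur.Milliseconds()))
	fmt.Println("waited in queue for", t.queueDur)
}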
@@ -1208,12 +1207,8 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection zap.String("name", in.GetCollectionName())) t := &alterCollectionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1240,6 +1235,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCollections.Dec() + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterCollection").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to alter collection", zap.String("role", typeutil.RootCoordRole), @@ -1263,12 +1259,8 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition zap.String("partition", in.GetPartitionName())) t := &createPartitionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1296,6 +1288,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreatePartition").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create partition", zap.String("role", typeutil.RootCoordRole), @@ -1320,12 +1313,8 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ zap.String("partition", in.GetPartitionName())) t := &dropPartitionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1352,6 +1341,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropPartition").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop partition", zap.String("role", typeutil.RootCoordRole), @@ -1404,6 +1394,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("HasPartition").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to has partition", zap.Bool("exist", t.Rsp.GetValue())) @@ -1454,6 +1445,7 @@ func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitio metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds())) + 
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("ShowPartitions").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to show partitions", zap.Strings("partitions", t.Rsp.GetPartitionNames())) @@ -1656,12 +1648,8 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) zap.String("collection", in.GetCollectionName())) t := &createAliasTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1689,6 +1677,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to create alias", zap.String("role", typeutil.RootCoordRole), @@ -1712,12 +1701,8 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c zap.String("alias", in.GetAlias())) t := &dropAliasTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1743,6 +1728,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to drop alias", zap.String("role", typeutil.RootCoordRole), @@ -1766,12 +1752,8 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( zap.String("collection", in.GetCollectionName())) t := &alterAliasTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: in, + baseTask: newBaseTask(ctx, c), + Req: in, } if err := c.scheduler.AddTask(t); err != nil { @@ -1799,6 +1781,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds())) log.Info("done to alter alias", zap.String("role", typeutil.RootCoordRole), @@ -2713,12 +2696,8 @@ func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollect metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("RenameCollection") t := &renameCollectionTask{ - baseTask: baseTask{ - ctx: ctx, - core: c, - done: make(chan error, 1), - }, - Req: req, + baseTask: newBaseTask(ctx, c), + Req: req, } if err := c.scheduler.AddTask(t); err != nil { diff --git a/internal/rootcoord/scheduler.go b/internal/rootcoord/scheduler.go index 9372b52c83..1b3f8ab139 100644 --- a/internal/rootcoord/scheduler.go +++ b/internal/rootcoord/scheduler.go @@ -80,6 +80,7 @@ func (s *scheduler) Stop() { func (s *scheduler) execute(task task) { defer s.setMinDdlTs(task.GetTs()) // we should 
update ts, whatever task succeeds or not. + task.SetInQueueDuration() if err := task.Prepare(task.GetCtx()); err != nil { task.NotifyDone(err) return diff --git a/internal/rootcoord/scheduler_test.go b/internal/rootcoord/scheduler_test.go index 384004aa86..40de36978c 100644 --- a/internal/rootcoord/scheduler_test.go +++ b/internal/rootcoord/scheduler_test.go @@ -38,10 +38,7 @@ type mockFailTask struct { func newMockFailTask() *mockFailTask { task := &mockFailTask{ - baseTask: baseTask{ - ctx: context.Background(), - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), nil), } task.SetCtx(context.Background()) return task @@ -73,10 +70,7 @@ type mockNormalTask struct { func newMockNormalTask() *mockNormalTask { task := &mockNormalTask{ - baseTask: baseTask{ - ctx: context.Background(), - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), nil), } task.SetCtx(context.Background()) return task diff --git a/internal/rootcoord/show_collection_task_test.go b/internal/rootcoord/show_collection_task_test.go index 31f2230fe7..9cf72af2f9 100644 --- a/internal/rootcoord/show_collection_task_test.go +++ b/internal/rootcoord/show_collection_task_test.go @@ -56,10 +56,7 @@ func Test_showCollectionTask_Execute(t *testing.T) { t.Run("failed to list collections", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &showCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.ShowCollectionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowCollections, @@ -85,10 +82,7 @@ func Test_showCollectionTask_Execute(t *testing.T) { } core := newTestCore(withMeta(meta)) task := &showCollectionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.ShowCollectionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowCollections, diff --git a/internal/rootcoord/show_partition_task_test.go b/internal/rootcoord/show_partition_task_test.go index 576aeb5f2d..de606dd8a8 100644 --- a/internal/rootcoord/show_partition_task_test.go +++ b/internal/rootcoord/show_partition_task_test.go @@ -57,10 +57,7 @@ func Test_showPartitionTask_Execute(t *testing.T) { t.Run("failed to list collections by name", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &showPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, @@ -77,10 +74,7 @@ func Test_showPartitionTask_Execute(t *testing.T) { t.Run("failed to list collections by id", func(t *testing.T) { core := newTestCore(withInvalidMeta()) task := &showPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, @@ -114,10 +108,7 @@ func Test_showPartitionTask_Execute(t *testing.T) { } core := newTestCore(withMeta(meta)) task := &showPartitionTask{ - baseTask: baseTask{ - core: core, - done: make(chan error, 1), - }, + baseTask: newBaseTask(context.Background(), core), Req: &milvuspb.ShowPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ShowPartitions, diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go 
index d2bee5e7c8..bbabf12bb6 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -18,6 +18,9 @@ package rootcoord import ( "context" + "time" + + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type task interface { @@ -31,6 +34,7 @@ type task interface { Execute(ctx context.Context) error WaitToFinish() error NotifyDone(err error) + SetInQueueDuration() } type baseTask struct { @@ -39,12 +43,16 @@ type baseTask struct { done chan error ts Timestamp id UniqueID + + tr *timerecord.TimeRecorder + queueDur time.Duration } func newBaseTask(ctx context.Context, core *Core) baseTask { b := baseTask{ core: core, done: make(chan error, 1), + tr: timerecord.NewTimeRecorderWithTrace(ctx, "new task"), } b.SetCtx(ctx) return b @@ -89,3 +97,7 @@ func (b *baseTask) WaitToFinish() error { func (b *baseTask) NotifyDone(err error) { b.done <- err } + +func (b *baseTask) SetInQueueDuration() { + b.queueDur = b.tr.ElapseSpan() +} diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 81dd24b7de..95c3b6231d 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -29,8 +29,10 @@ import ( "github.com/milvus-io/milvus/internal/storage/aliyun" "github.com/milvus-io/milvus/internal/storage/gcp" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/timerecord" minio "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" @@ -180,7 +182,7 @@ func (mcm *MinioChunkManager) Path(ctx context.Context, filePath string) (string // Reader returns the path of minio data if exists. func (mcm *MinioChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) { - reader, err := mcm.Client.GetObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) + reader, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) if err != nil { log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err @@ -189,7 +191,7 @@ func (mcm *MinioChunkManager) Reader(ctx context.Context, filePath string) (File } func (mcm *MinioChunkManager) Size(ctx context.Context, filePath string) (int64, error) { - objectInfo, err := mcm.Client.StatObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) + objectInfo, err := mcm.statMinioObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) if err != nil { log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return 0, err @@ -200,13 +202,14 @@ func (mcm *MinioChunkManager) Size(ctx context.Context, filePath string) (int64, // Write writes the data to minio storage. 
func (mcm *MinioChunkManager) Write(ctx context.Context, filePath string, content []byte) error { - _, err := mcm.Client.PutObject(ctx, mcm.bucketName, filePath, bytes.NewReader(content), int64(len(content)), minio.PutObjectOptions{}) + _, err := mcm.putMinioObject(ctx, mcm.bucketName, filePath, bytes.NewReader(content), int64(len(content)), minio.PutObjectOptions{}) if err != nil { log.Warn("failed to put object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return err } + metrics.PersistentDataKvSize.WithLabelValues(metrics.DataPutLabel).Observe(float64(len(content))) return nil } @@ -225,7 +228,7 @@ func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, kvs map[string][]b // Exist checks whether chunk is saved to minio storage. func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool, error) { - _, err := mcm.Client.StatObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) + _, err := mcm.statMinioObject(ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) if err != nil { errResponse := minio.ToErrorResponse(err) if errResponse.Code == "NoSuchKey" { @@ -239,7 +242,7 @@ func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool, // Read reads the minio storage data if exists. func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { - object, err := mcm.Client.GetObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) + object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) if err != nil { log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err @@ -277,6 +280,7 @@ func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err } + metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size)) return data, nil } @@ -324,7 +328,7 @@ func (mcm *MinioChunkManager) ReadAt(ctx context.Context, filePath string, off i return nil, err } - object, err := mcm.Client.GetObject(ctx, mcm.bucketName, filePath, opts) + object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, opts) if err != nil { log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err @@ -340,12 +344,13 @@ func (mcm *MinioChunkManager) ReadAt(ctx context.Context, filePath string, off i log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return nil, err } + metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(length)) return data, nil } // Remove deletes an object with @key. func (mcm *MinioChunkManager) Remove(ctx context.Context, filePath string) error { - err := mcm.Client.RemoveObject(ctx, mcm.bucketName, filePath, minio.RemoveObjectOptions{}) + err := mcm.removeMinioObject(ctx, mcm.bucketName, filePath, minio.RemoveObjectOptions{}) if err != nil { log.Warn("failed to remove object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return err @@ -367,7 +372,7 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. 
func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { - objects := mcm.Client.ListObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) + objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) i := 0 maxGoroutine := 10 removeKeys := make([]string, 0, len(objects)) @@ -382,7 +387,7 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin for j := 0; j < maxGoroutine && i < len(removeKeys); j++ { key := removeKeys[i] runningGroup.Go(func() error { - err := mcm.Client.RemoveObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{}) + err := mcm.removeMinioObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{}) if err != nil { log.Warn("failed to remove object", zap.String("path", key), zap.Error(err)) return err @@ -422,7 +427,7 @@ func (mcm *MinioChunkManager) ListWithPrefix(ctx context.Context, prefix string, // TODO add concurrent call if performance matters // only return current level per call - objects := mcm.Client.ListObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: pre, Recursive: false}) + objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: pre, Recursive: false}) for object := range objects { if object.Err != nil { @@ -463,3 +468,79 @@ func Read(r io.Reader, size int64) ([]byte, error) { } } } + +func (mcm *MinioChunkManager) getMinioObject(ctx context.Context, bucketName, objectName string, + opts minio.GetObjectOptions) (*minio.Object, error) { + start := timerecord.NewTimeRecorder("getMinioObject") + + reader, err := mcm.Client.GetObject(ctx, bucketName, objectName, opts) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.TotalLabel).Inc() + if err == nil && reader != nil { + metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.SuccessLabel).Inc() + } else { + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataGetLabel, metrics.FailLabel).Inc() + } + + return reader, err +} + +func (mcm *MinioChunkManager) putMinioObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, + opts minio.PutObjectOptions) (minio.UploadInfo, error) { + start := timerecord.NewTimeRecorder("putMinioObject") + + info, err := mcm.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, opts) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.TotalLabel).Inc() + if err == nil { + metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataPutLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.SuccessLabel).Inc() + } else { + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataPutLabel, metrics.FailLabel).Inc() + } + + return info, err +} + +func (mcm *MinioChunkManager) statMinioObject(ctx context.Context, bucketName, objectName string, + opts minio.StatObjectOptions) (minio.ObjectInfo, error) { + start := timerecord.NewTimeRecorder("statMinioObject") + + info, err := mcm.Client.StatObject(ctx, bucketName, objectName, opts) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.TotalLabel).Inc() + if err == nil { + 
metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataStatLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.SuccessLabel).Inc() + } else { + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataStatLabel, metrics.FailLabel).Inc() + } + + return info, err +} + +func (mcm *MinioChunkManager) listMinioObjects(ctx context.Context, bucketName string, + opts minio.ListObjectsOptions) <-chan minio.ObjectInfo { + start := timerecord.NewTimeRecorder("listMinioObjects") + + res := mcm.Client.ListObjects(ctx, bucketName, opts) + metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataListLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.TotalLabel).Inc() + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataListLabel, metrics.SuccessLabel).Inc() + + return res +} + +func (mcm *MinioChunkManager) removeMinioObject(ctx context.Context, bucketName, objectName string, + opts minio.RemoveObjectOptions) error { + start := timerecord.NewTimeRecorder("removeMinioObject") + + err := mcm.Client.RemoveObject(ctx, bucketName, objectName, opts) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.TotalLabel).Inc() + if err == nil { + metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataRemoveLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.SuccessLabel).Inc() + } else { + metrics.PersistentDataOpCounter.WithLabelValues(metrics.DataRemoveLabel, metrics.FailLabel).Inc() + } + + return err +} diff --git a/internal/util/flowgraph/message_test.go b/internal/util/flowgraph/message_test.go index 3ff8659e6f..adb35ac1a5 100644 --- a/internal/util/flowgraph/message_test.go +++ b/internal/util/flowgraph/message_test.go @@ -80,6 +80,10 @@ func (bm *MockMsg) SetPosition(position *MsgPosition) { } +func (bm *MockMsg) Size() int { + return 0 +} + func Test_GenerateMsgStreamMsg(t *testing.T) { messages := make([]msgstream.TsMsg, 1) messages[0] = &MockMsg{ diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 4496048c46..d6e0f43274 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -32,6 +32,12 @@ const ( CompactOutputLabel = "output" compactIOLabelName = "IO" compactTypeLabelName = "compactType" + + InsertFileLabel = "insert_file" + DeleteFileLabel = "delete_file" + StatFileLabel = "stat_file" + IndexFileLabel = "index_file" + segmentFileTypeLabelName = "segment_file_type" ) var ( @@ -112,6 +118,33 @@ var ( segmentIDLabelName, }) + DataCoordDmlChannelNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "watched_dml_chanel_num", + Help: "the num of dml channel watched by datanode", + }, []string{ + nodeIDLabelName, + }) + + DataCoordCompactedSegmentSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "compacted_segment_size", + Help: "the segment size of compacted segment", + Buckets: buckets, + }, []string{}) + + FlushedSegmentFileNum = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Name: "flushed_segment_file_num", + Help: "the num of files for flushed segment", + Buckets: buckets, + }, 
[]string{segmentFileTypeLabelName}) + /* hard to implement, commented now DataCoordSegmentSizeRatio = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -195,6 +228,9 @@ func RegisterDataCoord(registry *prometheus.Registry) { registry.MustRegister(DataCoordConsumeDataNodeTimeTickLag) registry.MustRegister(DataCoordStoredBinlogSize) registry.MustRegister(DataCoordSegmentBinLogFileCount) + registry.MustRegister(DataCoordDmlChannelNum) + registry.MustRegister(DataCoordCompactedSegmentSize) + registry.MustRegister(FlushedSegmentFileNum) registry.MustRegister(IndexRequestCounter) registry.MustRegister(IndexTaskNum) registry.MustRegister(IndexNodeNum) diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 0f0b4ad013..d8ca092ade 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -159,6 +159,17 @@ var ( nodeIDLabelName, }) + DataNodeCompactionLatencyInQueue = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "compaction_latency_in_queue", + Help: "latency of compaction operation in queue", + Buckets: buckets, + }, []string{ + nodeIDLabelName, + }) + // DataNodeFlushReqCounter counts the num of calls of FlushSegments DataNodeFlushReqCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -189,6 +200,17 @@ var ( Buckets: buckets, // unit: ms }, []string{nodeIDLabelName}) + DataNodeFlowGraphBufferDataSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "fg_buffer_size", + Help: "the buffered data size of flow graph", + }, []string{ + nodeIDLabelName, + collectionIDLabelName, + }) + DataNodeMsgDispatcherTtLag = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, @@ -219,6 +241,8 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeConsumeBytesCount) registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken) registry.MustRegister(DataNodeMsgDispatcherTtLag) + registry.MustRegister(DataNodeCompactionLatencyInQueue) + registry.MustRegister(DataNodeFlowGraphBufferDataSize) } func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) { diff --git a/pkg/metrics/etcd_metrics.go b/pkg/metrics/etcd_metrics.go deleted file mode 100644 index 2fde18fbce..0000000000 --- a/pkg/metrics/etcd_metrics.go +++ /dev/null @@ -1,31 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -var ( - EtcdPutKvSize = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: "etcd", - Name: "etcd_kv_size", - Help: "kv size stats", - Buckets: buckets, - }) - - EtcdRequestLatency = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: "etcd", - Name: "client_request_latency", - Help: "request latency on the client side ", - Buckets: buckets, - }) -) - -// RegisterEtcdMetrics registers etcd metrics -func RegisterEtcdMetrics(registry *prometheus.Registry) { - registry.MustRegister(EtcdPutKvSize) - registry.MustRegister(EtcdRequestLatency) -} diff --git a/pkg/metrics/indexnode_metrics.go b/pkg/metrics/indexnode_metrics.go index ab656c3b53..f85f59a116 100644 --- a/pkg/metrics/indexnode_metrics.go +++ b/pkg/metrics/indexnode_metrics.go @@ -53,7 +53,7 @@ var ( prometheus.HistogramOpts{ Namespace: milvusNamespace, Subsystem: typeutil.IndexNodeRole, - Name: "build_index_latency", + Name: 
"knowhere_build_index_latency", Help: "latency of building the index by knowhere", Buckets: buckets, }, []string{nodeIDLabelName}) @@ -75,6 +75,24 @@ var ( Help: "latency of saving the index file", Buckets: buckets, }, []string{nodeIDLabelName}) + + IndexNodeIndexTaskLatencyInQueue = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "index_task_latency_in_queue", + Help: "latency of index task in queue", + Buckets: buckets, + }, []string{nodeIDLabelName}) + + IndexNodeBuildIndexLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.IndexNodeRole, + Name: "build_index_latency", + Help: "latency of build index for segment", + Buckets: buckets, + }, []string{nodeIDLabelName}) ) // RegisterIndexNode registers IndexNode metrics @@ -85,4 +103,6 @@ func RegisterIndexNode(registry *prometheus.Registry) { registry.MustRegister(IndexNodeKnowhereBuildIndexLatency) registry.MustRegister(IndexNodeEncodeIndexFileLatency) registry.MustRegister(IndexNodeSaveIndexFileLatency) + registry.MustRegister(IndexNodeIndexTaskLatencyInQueue) + registry.MustRegister(IndexNodeBuildIndexLatency) } diff --git a/pkg/metrics/meta_metrics.go b/pkg/metrics/meta_metrics.go new file mode 100644 index 0000000000..b02e3a53d4 --- /dev/null +++ b/pkg/metrics/meta_metrics.go @@ -0,0 +1,65 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const ( + MetaGetLabel = "get" + MetaPutLabel = "put" + MetaRemoveLabel = "remove" + MetaTxnLabel = "txn" + + metaOpType = "meta_op_type" +) + +var ( + MetaKvSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: "meta", + Name: "kv_size", + Help: "kv size stats", + Buckets: buckets, + }, []string{metaOpType}) + + MetaRequestLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: "meta", + Name: "request_latency", + Help: "request latency on the client side ", + Buckets: buckets, + }, []string{metaOpType}) + + MetaOpCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: "meta", + Name: "op_count", + Help: "count of meta operation", + }, []string{metaOpType, statusLabelName}) +) + +// RegisterMetaMetrics registers meta metrics +func RegisterMetaMetrics(registry *prometheus.Registry) { + registry.MustRegister(MetaKvSize) + registry.MustRegister(MetaRequestLatency) + registry.MustRegister(MetaOpCounter) +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 7a27b85511..7d971c4e6e 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -86,4 +86,16 @@ var ( // buckets involves durations in milliseconds, // [1 2 4 8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 1.31072e+05] buckets = prometheus.ExponentialBuckets(1, 2, 18) + + NumNodes = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Name: "num_node", + Help: "number of nodes and coordinates", + }, []string{nodeIDLabelName, roleNameLabelName}) ) + +// Register serves prometheus http service +func Register(r *prometheus.Registry) { + r.MustRegister(NumNodes) +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index f60099a276..6cb8c68adf 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -30,6 +30,7 @@ func TestRegisterMetrics(t *testing.T) { RegisterProxy(r.GoRegistry) RegisterQueryNode(r.GoRegistry) RegisterQueryCoord(r.GoRegistry) - RegisterEtcdMetrics(r.GoRegistry) - RegisterMq(r.GoRegistry) + RegisterMetaMetrics(r.GoRegistry) + RegisterStorageMetrics(r.GoRegistry) + RegisterMsgStreamMetrics(r.GoRegistry) } diff --git a/pkg/metrics/mq_metrics.go b/pkg/metrics/msgstream_metrics.go similarity index 52% rename from pkg/metrics/mq_metrics.go rename to pkg/metrics/msgstream_metrics.go index b63f78333d..83d17728b5 100644 --- a/pkg/metrics/mq_metrics.go +++ b/pkg/metrics/msgstream_metrics.go @@ -16,8 +16,16 @@ package metrics -import ( - "github.com/prometheus/client_golang/prometheus" +import "github.com/prometheus/client_golang/prometheus" + +const ( + SendMsgLabel = "produce" + ReceiveMsgLabel = "consume" // not used + + CreateProducerLabel = "create_producer" + CreateConsumerLabel = "create_consumer" + + msgStreamOpType = "message_op_type" ) var ( @@ -31,8 +39,28 @@ var ( roleNameLabelName, nodeIDLabelName, }) + + MsgStreamRequestLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: "msgstream", + Name: "request_latency", + Help: "request latency on the client side ", + Buckets: buckets, + }, []string{msgStreamOpType}) + + MsgStreamOpCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: "msgstream", + Name: "op_count", + Help: "count of stream message operation", + }, 
[]string{msgStreamOpType, statusLabelName}) ) -func RegisterMq(registry *prometheus.Registry) { +// RegisterMsgStreamMetrics registers msg stream metrics +func RegisterMsgStreamMetrics(registry *prometheus.Registry) { registry.MustRegister(NumConsumers) + registry.MustRegister(MsgStreamRequestLatency) + registry.MustRegister(MsgStreamOpCounter) } diff --git a/pkg/metrics/persistent_store_metrics.go b/pkg/metrics/persistent_store_metrics.go new file mode 100644 index 0000000000..eb6688909b --- /dev/null +++ b/pkg/metrics/persistent_store_metrics.go @@ -0,0 +1,64 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +const ( + DataGetLabel = "get" + DataPutLabel = "put" + DataRemoveLabel = "remove" + DataListLabel = "list" + DataStatLabel = "stat" + + persistentDataOpType = "persistent_data_op_type" +) + +var ( + PersistentDataKvSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: "storage", + Name: "kv_size", + Help: "kv size stats", + Buckets: buckets, + }, []string{persistentDataOpType}) + + PersistentDataRequestLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: "storage", + Name: "request_latency", + Help: "request latency on the client side ", + Buckets: buckets, + }, []string{persistentDataOpType}) + + PersistentDataOpCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: "storage", + Name: "op_count", + Help: "count of persistent data operation", + }, []string{persistentDataOpType, statusLabelName}) +) + +// RegisterStorageMetrics registers storage metrics +func RegisterStorageMetrics(registry *prometheus.Registry) { + registry.MustRegister(PersistentDataKvSize) + registry.MustRegister(PersistentDataRequestLatency) + registry.MustRegister(PersistentDataOpCounter) +} diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 97527949d1..5d115a073b 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -138,7 +138,7 @@ var ( Buckets: buckets, // unit: ms }, []string{nodeIDLabelName, msgTypeLabelName}) - // ProxyCacheHitCounter record the number of Proxy cache hits or miss. + // ProxyCacheStatsCounter record the number of Proxy cache hits or miss. ProxyCacheStatsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, @@ -155,7 +155,7 @@ var ( Name: "cache_update_latency", Help: "latency that proxy update cache when cache miss", Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName}) + }, []string{nodeIDLabelName, cacheNameLabelName}) // ProxySyncTimeTickLag record Proxy synchronization timestamp statistics, differentiated by Channel. 
ProxySyncTimeTickLag = prometheus.NewGaugeVec( diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 10e9cdfd94..56d88bf35e 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -357,6 +357,30 @@ var ( nodeIDLabelName, channelNameLabelName, }) + + QueryNodeSegmentSearchLatencyPerVector = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "segment_latency_per_vector", + Help: "one vector's search latency per segment", + Buckets: buckets, + }, []string{ + nodeIDLabelName, + queryTypeLabelName, + segmentStateLabelName, + }) + + QueryNodeWatchDmlChannelLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "watch_dml_channel_latency", + Help: "latency of watch dml channel", + Buckets: buckets, + }, []string{ + nodeIDLabelName, + }) ) // RegisterQueryNode registers QueryNode metrics @@ -391,6 +415,8 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeConsumerMsgCount) registry.MustRegister(QueryNodeConsumeTimeTickLag) registry.MustRegister(QueryNodeMsgDispatcherTtLag) + registry.MustRegister(QueryNodeSegmentSearchLatencyPerVector) + registry.MustRegister(QueryNodeWatchDmlChannelLatency) } func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) { diff --git a/pkg/metrics/rootcoord_metrics.go b/pkg/metrics/rootcoord_metrics.go index 39f4358882..d29f667b95 100644 --- a/pkg/metrics/rootcoord_metrics.go +++ b/pkg/metrics/rootcoord_metrics.go @@ -168,6 +168,14 @@ var ( }, []string{ "quota_states", }) + + RootCoordDDLReqLatencyInQueue = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.RootCoordRole, + Name: "ddl_req_latency_in_queue", + Help: "latency of each DDL operations in queue", + }, []string{functionLabelName}) ) // RegisterRootCoord registers RootCoord metrics @@ -200,4 +208,5 @@ func RegisterRootCoord(registry *prometheus.Registry) { registry.MustRegister(RootCoordNumOfRoles) registry.MustRegister(RootCoordTtDelay) registry.MustRegister(RootCoordQuotaStates) + registry.MustRegister(RootCoordDDLReqLatencyInQueue) } diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 5939e3327a..69f76120d4 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -1091,6 +1091,10 @@ func (t *MarshalFailTsMsg) Unmarshal(_ MarshalType) (TsMsg, error) { return nil, errors.New("mocked error") } +func (t *MarshalFailTsMsg) Size() int { + return 0 +} + var _ mqwrapper.Producer = (*mockSendFailProducer)(nil) type mockSendFailProducer struct { diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index 5dfcb58312..4c657e689f 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -9,8 +9,10 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) var Producer *kafka.Producer @@ -144,22 +146,37 @@ func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.Subscrip } func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) 
(mqwrapper.Producer, error) { + start := timerecord.NewTimeRecorder("create producer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc() + pp, err := kc.getKafkaProducer() if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, err } + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() + deliveryChan := make(chan kafka.Event, 128) producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: options.Topic} return producer, nil } func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { + start := timerecord.NewTimeRecorder("create consumer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() + config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition) consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName, options.SubscriptionInitialPosition) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, err } + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc() return consumer, nil } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go index 55397afc42..0e6c818d88 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -11,7 +11,9 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type kafkaProducer struct { @@ -26,6 +28,9 @@ func (kp *kafkaProducer) Topic() string { } func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + start := timerecord.NewTimeRecorder("send msg to stream") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() + headers := make([]kafka.Header, 0, len(message.Properties)) for key, value := range message.Properties { header := kafka.Header{Key: key, Value: []byte(value)} @@ -38,20 +43,27 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe }, kp.deliveryChan) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() return nil, err } e, ok := <-kp.deliveryChan if !ok { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic)) return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed")) } m := e.(*kafka.Message) if m.TopicPartition.Error != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() return nil, m.TopicPartition.Error } + elapsed := start.ElapseSpan() + 
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() + return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil } diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go index cecae9d60e..666e64203b 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go @@ -25,7 +25,9 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) // nmqClient implements mqwrapper.Client. @@ -55,9 +57,13 @@ func NewClient(url string, options ...nats.Option) (*nmqClient, error) { // CreateProducer creates a producer for natsmq client func (nc *nmqClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) { + start := timerecord.NewTimeRecorder("create producer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc() + // TODO: inject jetstream options. js, err := nc.conn.JetStream() if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, errors.Wrap(err, "failed to create jetstream context") } // TODO: (1) investigate on performance of multiple streams vs multiple topics. @@ -67,23 +73,34 @@ func (nc *nmqClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrappe Subjects: []string{options.Topic}, }) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, errors.Wrap(err, "failed to add/connect to jetstream for producer") } rp := nmqProducer{js: js, topic: options.Topic} + + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() return &rp, nil } func (nc *nmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { + start := timerecord.NewTimeRecorder("create consumer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() + if options.Topic == "" { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, fmt.Errorf("invalid consumer config: empty topic") } if options.SubscriptionName == "" { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, fmt.Errorf("invalid consumer config: empty subscription name") } // TODO: inject jetstream options. js, err := nc.conn.JetStream() if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, errors.Wrap(err, "failed to create jetstream context") } // TODO: do we allow passing in an existing natsChan from options? 
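Note on the pattern above: the kafka, nmq, and pulsar wrappers all repeat the same three-step instrumentation around producer/consumer creation and Send: increment MsgStreamOpCounter with TotalLabel before the call, increment it with FailLabel on every error return, and on success observe the elapsed milliseconds in MsgStreamRequestLatency and increment SuccessLabel. The sketch below only condenses that pattern for readability; the helper name instrumentMsgStreamOp and the package name are hypothetical and not part of this patch, which deliberately inlines the calls so each early-return error path is counted at its own site.

// Hypothetical helper (not in this patch) summarizing the inlined instrumentation pattern.
package mqmetricsutil

import (
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
)

// instrumentMsgStreamOp counts every attempt, counts the outcome, and records
// client-side latency in milliseconds only for successful operations.
// label is one of the msgstream op labels introduced above, e.g.
// metrics.CreateProducerLabel, metrics.CreateConsumerLabel, or metrics.SendMsgLabel.
func instrumentMsgStreamOp(label string, op func() error) error {
	start := timerecord.NewTimeRecorder(label)
	metrics.MsgStreamOpCounter.WithLabelValues(label, metrics.TotalLabel).Inc()

	if err := op(); err != nil {
		metrics.MsgStreamOpCounter.WithLabelValues(label, metrics.FailLabel).Inc()
		return err
	}

	elapsed := start.ElapseSpan()
	metrics.MsgStreamRequestLatency.WithLabelValues(label).Observe(float64(elapsed.Milliseconds()))
	metrics.MsgStreamOpCounter.WithLabelValues(label, metrics.SuccessLabel).Inc()
	return nil
}

Called as instrumentMsgStreamOp(metrics.CreateConsumerLabel, func() error { ... }), this would emit the same series the inlined version produces; the patch keeps the calls inline so failures can be labeled before each distinct return.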
@@ -95,6 +112,7 @@ func (nc *nmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con Subjects: []string{options.Topic}, }) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, errors.Wrap(err, "failed to add/connect to jetstream for consumer") } closeChan := make(chan struct{}) @@ -109,9 +127,13 @@ func (nc *nmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con sub, err = js.ChanSubscribe(options.Topic, natsChan, nats.DeliverAll()) } if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, errors.Wrap(err, fmt.Sprintf("failed to get consumer info, subscribe position: %d", position)) } + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc() return &Consumer{ js: js, sub: sub, diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go index 1df8f3d577..e44e67e55a 100644 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go +++ b/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go @@ -20,7 +20,9 @@ import ( "context" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/nats-io/nats.go" "go.uber.org/zap" ) @@ -40,6 +42,9 @@ func (np *nmqProducer) Topic() string { // Send send the producer messages to natsmq func (np *nmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + start := timerecord.NewTimeRecorder("send msg to stream") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() + // Encode message msg := &nats.Msg{ Subject: np.topic, @@ -53,9 +58,14 @@ func (np *nmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMess // publish to nats-server pa, err := np.js.PublishMsg(msg) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() log.Warn("failed to publish message by nmq", zap.String("topic", np.topic), zap.Error(err), zap.Int("payload_size", len(message.Payload))) return nil, err } + + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() return &nmqID{messageID: pa.Sequence}, err } diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index a9222724de..b56a809f40 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -29,8 +29,10 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type pulsarClient struct { @@ -64,8 +66,12 @@ func NewClient(tenant string, namespace string, opts pulsar.ClientOptions) (*pul // CreateProducer create a pulsar producer from options func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, 
error) { + start := timerecord.NewTimeRecorder("create producer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc() + fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, err } opts := pulsar.ProducerOptions{Topic: fullTopicName} @@ -80,20 +86,29 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra pp, err := pc.client.CreateProducer(opts) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, err } if pp == nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() return nil, errors.New("pulsar is not ready, producer is nil") } + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() producer := &pulsarProducer{p: pp} return producer, nil } // Subscribe creates a pulsar consumer instance and subscribe a topic func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { + start := timerecord.NewTimeRecorder("create consumer") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() + receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize) fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() return nil, err } consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ @@ -104,6 +119,7 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper. MessageChannel: receiveChannel, }) if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() if strings.Contains(err.Error(), "ConsumerBusy") { return nil, retry.Unrecoverable(err) } @@ -116,6 +132,9 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper. 
pConsumer.AtLatest = true } + elapsed := start.ElapseSpan() + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc() return pConsumer, nil } diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_producer.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_producer.go index b9fcbfd8c6..0773d91483 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_producer.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_producer.go @@ -21,7 +21,9 @@ import ( "github.com/apache/pulsar-client-go/pulsar" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) // implementation assertion @@ -37,9 +39,19 @@ func (pp *pulsarProducer) Topic() string { } func (pp *pulsarProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + start := timerecord.NewTimeRecorder("send msg to stream") + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() + ppm := &pulsar.ProducerMessage{Payload: message.Payload, Properties: message.Properties} pmID, err := pp.p.Send(ctx, ppm) - return &pulsarID{messageID: pmID}, err + if err != nil { + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() + return &pulsarID{messageID: pmID}, err + } + + metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(start.ElapseSpan().Milliseconds())) + metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() + return &pulsarID{messageID: pmID}, nil } func (pp *pulsarProducer) Close() { diff --git a/pkg/mq/msgstream/msg.go b/pkg/mq/msgstream/msg.go index c367d6d66e..afccaa0927 100644 --- a/pkg/mq/msgstream/msg.go +++ b/pkg/mq/msgstream/msg.go @@ -52,6 +52,7 @@ type TsMsg interface { Unmarshal(MarshalType) (TsMsg, error) Position() *MsgPosition SetPosition(*MsgPosition) + Size() int } // BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream @@ -289,6 +290,10 @@ func (it *InsertMsg) IndexMsg(index int) *InsertMsg { } } +func (it *InsertMsg) Size() int { + return proto.Size(&it.InsertRequest) +} + /////////////////////////////////////////Delete////////////////////////////////////////// // DeleteMsg is a message pack that contains delete request @@ -388,6 +393,10 @@ func (dt *DeleteMsg) CheckAligned() error { return nil } +func (dt *DeleteMsg) Size() int { + return proto.Size(&dt.DeleteRequest) +} + // ///////////////////////////////////////Upsert////////////////////////////////////////// type UpsertMsg struct { InsertMsg *InsertMsg @@ -454,6 +463,10 @@ func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) { return timeTick, nil } +func (tst *TimeTickMsg) Size() int { + return proto.Size(&tst.TimeTickMsg) +} + /////////////////////////////////////////CreateCollection////////////////////////////////////////// // CreateCollectionMsg is a message pack that contains create collection request @@ -514,6 +527,10 @@ func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { return createCollectionMsg, nil } +func (cc *CreateCollectionMsg) Size() int { + return proto.Size(&cc.CreateCollectionRequest) +} + /////////////////////////////////////////DropCollection////////////////////////////////////////// // DropCollectionMsg is a message pack that 
contains drop collection request @@ -574,6 +591,10 @@ func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { return dropCollectionMsg, nil } +func (dc *DropCollectionMsg) Size() int { + return proto.Size(&dc.DropCollectionRequest) +} + /////////////////////////////////////////CreatePartition////////////////////////////////////////// // CreatePartitionMsg is a message pack that contains create partition request @@ -634,6 +655,10 @@ func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { return createPartitionMsg, nil } +func (cp *CreatePartitionMsg) Size() int { + return proto.Size(&cp.CreatePartitionRequest) +} + /////////////////////////////////////////DropPartition////////////////////////////////////////// // DropPartitionMsg is a message pack that contains drop partition request @@ -694,6 +719,10 @@ func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { return dropPartitionMsg, nil } +func (dp *DropPartitionMsg) Size() int { + return proto.Size(&dp.DropPartitionRequest) +} + /////////////////////////////////////////DataNodeTtMsg////////////////////////////////////////// // DataNodeTtMsg is a message pack that contains datanode time tick @@ -750,3 +779,7 @@ func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error) { DataNodeTtMsg: msg, }, nil } + +func (m *DataNodeTtMsg) Size() int { + return proto.Size(&m.DataNodeTtMsg) +}
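Every concrete TsMsg now satisfies the new Size() interface method by delegating to proto.Size on its embedded request, so the reported value is the marshaled protobuf size rather than the in-memory footprint (the MarshalFailTsMsg test mock simply returns 0 to keep compiling). A minimal usage sketch follows; the helper name totalTsMsgSize and its package are hypothetical and only illustrate how generic msgstream code can total the byte size of a pack of messages without type-switching on the concrete message types, for example to feed a payload-size metric.

// Minimal usage sketch; totalTsMsgSize is a hypothetical helper, not part of this patch.
package mqmetricsutil

import "github.com/milvus-io/milvus/pkg/mq/msgstream"

// totalTsMsgSize sums the sizes reported by each message's Size() method,
// i.e. the proto-encoded size of the underlying request.
func totalTsMsgSize(msgs []msgstream.TsMsg) int {
	total := 0
	for _, msg := range msgs {
		total += msg.Size()
	}
	return total
}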