mirror of https://github.com/milvus-io/milvus.git
feat: record the duration waiting in the proxy queue (#34744)
fix: https://github.com/milvus-io/milvus/issues/34743 --------- Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/34718/head
parent
a4aed9b0b5
commit
a2ac84bd64
|
@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockTask struct {
|
type mockTask struct {
|
||||||
|
baseTask
|
||||||
*TaskCondition
|
*TaskCondition
|
||||||
id UniqueID
|
id UniqueID
|
||||||
name string
|
name string
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
@ -118,14 +119,26 @@ type task interface {
|
||||||
WaitToFinish() error
|
WaitToFinish() error
|
||||||
Notify(err error)
|
Notify(err error)
|
||||||
CanSkipAllocTimestamp() bool
|
CanSkipAllocTimestamp() bool
|
||||||
|
SetOnEnqueueTime()
|
||||||
|
GetDurationInQueue() time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type baseTask struct{}
|
type baseTask struct {
|
||||||
|
onEnqueueTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
func (bt *baseTask) CanSkipAllocTimestamp() bool {
|
func (bt *baseTask) CanSkipAllocTimestamp() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bt *baseTask) SetOnEnqueueTime() {
|
||||||
|
bt.onEnqueueTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bt *baseTask) GetDurationInQueue() time.Duration {
|
||||||
|
return time.Since(bt.onEnqueueTime)
|
||||||
|
}
|
||||||
|
|
||||||
type dmlTask interface {
|
type dmlTask interface {
|
||||||
task
|
task
|
||||||
setChannels() error
|
setChannels() error
|
||||||
|
@ -440,12 +453,12 @@ func (t *dropCollectionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DropCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *dropCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *dropCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DropCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -505,13 +518,15 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *hasCollectionTask) OnEnqueue() error {
|
func (t *hasCollectionTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_HasCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *hasCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *hasCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_HasCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -571,13 +586,15 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *describeCollectionTask) OnEnqueue() error {
|
func (t *describeCollectionTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DescribeCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *describeCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *describeCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DescribeCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if t.CollectionID != 0 && len(t.CollectionName) == 0 {
|
if t.CollectionID != 0 && len(t.CollectionName) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -712,12 +729,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (t *showCollectionsTask) OnEnqueue() error {
|
func (t *showCollectionsTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
t.Base.MsgType = commonpb.MsgType_ShowCollections
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *showCollectionsTask) PreExecute(ctx context.Context) error {
|
func (t *showCollectionsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_ShowCollections
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
if t.GetType() == milvuspb.ShowType_InMemory {
|
if t.GetType() == milvuspb.ShowType_InMemory {
|
||||||
for _, collectionName := range t.CollectionNames {
|
for _, collectionName := range t.CollectionNames {
|
||||||
if err := validateCollectionName(collectionName); err != nil {
|
if err := validateCollectionName(collectionName); err != nil {
|
||||||
|
@ -868,6 +885,8 @@ func (t *alterCollectionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_AlterCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -916,8 +935,6 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_AlterCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName)
|
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1049,12 +1066,12 @@ func (t *createPartitionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_CreatePartition
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *createPartitionTask) PreExecute(ctx context.Context) error {
|
func (t *createPartitionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_CreatePartition
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||||
|
|
||||||
|
@ -1132,12 +1149,12 @@ func (t *dropPartitionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DropPartition
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *dropPartitionTask) PreExecute(ctx context.Context) error {
|
func (t *dropPartitionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DropPartition
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||||
|
|
||||||
|
@ -1237,13 +1254,15 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *hasPartitionTask) OnEnqueue() error {
|
func (t *hasPartitionTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_HasPartition
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *hasPartitionTask) PreExecute(ctx context.Context) error {
|
func (t *hasPartitionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_HasPartition
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName, partitionTag := t.CollectionName, t.PartitionName
|
collName, partitionTag := t.CollectionName, t.PartitionName
|
||||||
|
|
||||||
|
@ -1309,13 +1328,15 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *showPartitionsTask) OnEnqueue() error {
|
func (t *showPartitionsTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_ShowPartitions
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *showPartitionsTask) PreExecute(ctx context.Context) error {
|
func (t *showPartitionsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_ShowPartitions
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(t.CollectionName); err != nil {
|
if err := validateCollectionName(t.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1460,12 +1481,12 @@ func (t *flushTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_Flush
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *flushTask) PreExecute(ctx context.Context) error {
|
func (t *flushTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_Flush
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1563,14 +1584,14 @@ func (t *loadCollectionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_LoadCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *loadCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *loadCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
log.Ctx(ctx).Debug("loadCollectionTask PreExecute",
|
log.Ctx(ctx).Debug("loadCollectionTask PreExecute",
|
||||||
zap.String("role", typeutil.ProxyRole))
|
zap.String("role", typeutil.ProxyRole))
|
||||||
t.Base.MsgType = commonpb.MsgType_LoadCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName := t.CollectionName
|
collName := t.CollectionName
|
||||||
|
|
||||||
|
@ -1715,12 +1736,12 @@ func (t *releaseCollectionTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
func (t *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_ReleaseCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName := t.CollectionName
|
collName := t.CollectionName
|
||||||
|
|
||||||
|
@ -1809,12 +1830,12 @@ func (t *loadPartitionsTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_LoadPartitions
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *loadPartitionsTask) PreExecute(ctx context.Context) error {
|
func (t *loadPartitionsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_LoadPartitions
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName := t.CollectionName
|
collName := t.CollectionName
|
||||||
|
|
||||||
|
@ -1959,12 +1980,12 @@ func (t *releasePartitionsTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
func (t *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_ReleasePartitions
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName := t.CollectionName
|
collName := t.CollectionName
|
||||||
|
|
||||||
|
@ -2064,12 +2085,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error {
|
func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_CreateResourceGroup
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2129,12 +2150,12 @@ func (t *UpdateResourceGroupsTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error {
|
func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2197,12 +2218,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error {
|
func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DropResourceGroup
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2259,13 +2280,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DescribeResourceGroupTask) OnEnqueue() error {
|
func (t *DescribeResourceGroupTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error {
|
func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2386,12 +2409,12 @@ func (t *TransferNodeTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_TransferNode
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransferNodeTask) PreExecute(ctx context.Context) error {
|
func (t *TransferNodeTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_TransferNode
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2451,12 +2474,12 @@ func (t *TransferReplicaTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_TransferReplica
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TransferReplicaTask) PreExecute(ctx context.Context) error {
|
func (t *TransferReplicaTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_TransferReplica
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -2522,13 +2545,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *ListResourceGroupsTask) OnEnqueue() error {
|
func (t *ListResourceGroupsTask) OnEnqueue() error {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error {
|
func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_ListResourceGroups
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,13 +82,13 @@ func (t *CreateAliasTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_CreateAlias
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreExecute defines the tion before task execution
|
// PreExecute defines the tion before task execution
|
||||||
func (t *CreateAliasTask) PreExecute(ctx context.Context) error {
|
func (t *CreateAliasTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_CreateAlias
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collAlias := t.Alias
|
collAlias := t.Alias
|
||||||
// collection alias uses the same format as collection name
|
// collection alias uses the same format as collection name
|
||||||
|
@ -165,12 +165,12 @@ func (t *DropAliasTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DropAlias
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DropAliasTask) PreExecute(ctx context.Context) error {
|
func (t *DropAliasTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_DropAlias
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
collAlias := t.Alias
|
collAlias := t.Alias
|
||||||
if err := ValidateCollectionAlias(collAlias); err != nil {
|
if err := ValidateCollectionAlias(collAlias); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -234,12 +234,12 @@ func (t *AlterAliasTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_AlterAlias
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *AlterAliasTask) PreExecute(ctx context.Context) error {
|
func (t *AlterAliasTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_AlterAlias
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collAlias := t.Alias
|
collAlias := t.Alias
|
||||||
// collection alias uses the same format as collection name
|
// collection alias uses the same format as collection name
|
||||||
|
@ -310,12 +310,12 @@ func (a *DescribeAliasTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (a *DescribeAliasTask) OnEnqueue() error {
|
func (a *DescribeAliasTask) OnEnqueue() error {
|
||||||
a.Base = commonpbutil.NewMsgBase()
|
a.Base = commonpbutil.NewMsgBase()
|
||||||
|
a.Base.MsgType = commonpb.MsgType_DescribeAlias
|
||||||
|
a.Base.SourceID = a.nodeID
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *DescribeAliasTask) PreExecute(ctx context.Context) error {
|
func (a *DescribeAliasTask) PreExecute(ctx context.Context) error {
|
||||||
a.Base.MsgType = commonpb.MsgType_DescribeAlias
|
|
||||||
a.Base.SourceID = a.nodeID
|
|
||||||
// collection alias uses the same format as collection name
|
// collection alias uses the same format as collection name
|
||||||
if err := ValidateCollectionAlias(a.GetAlias()); err != nil {
|
if err := ValidateCollectionAlias(a.GetAlias()); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -378,12 +378,12 @@ func (a *ListAliasesTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (a *ListAliasesTask) OnEnqueue() error {
|
func (a *ListAliasesTask) OnEnqueue() error {
|
||||||
a.Base = commonpbutil.NewMsgBase()
|
a.Base = commonpbutil.NewMsgBase()
|
||||||
|
a.Base.MsgType = commonpb.MsgType_ListAliases
|
||||||
|
a.Base.SourceID = a.nodeID
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ListAliasesTask) PreExecute(ctx context.Context) error {
|
func (a *ListAliasesTask) PreExecute(ctx context.Context) error {
|
||||||
a.Base.MsgType = commonpb.MsgType_ListAliases
|
|
||||||
a.Base.SourceID = a.nodeID
|
|
||||||
|
|
||||||
if len(a.GetCollectionName()) > 0 {
|
if len(a.GetCollectionName()) > 0 {
|
||||||
if err := validateCollectionName(a.GetCollectionName()); err != nil {
|
if err := validateCollectionName(a.GetCollectionName()); err != nil {
|
||||||
|
|
|
@ -266,12 +266,12 @@ func (t *alterDatabaseTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_AlterDatabase
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *alterDatabaseTask) PreExecute(ctx context.Context) error {
|
func (t *alterDatabaseTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_AlterDatabase
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -345,12 +345,12 @@ func (t *describeDatabaseTask) OnEnqueue() error {
|
||||||
if t.Base == nil {
|
if t.Base == nil {
|
||||||
t.Base = commonpbutil.NewMsgBase()
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.Base.MsgType = commonpb.MsgType_DescribeDatabase
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *describeDatabaseTask) PreExecute(ctx context.Context) error {
|
func (t *describeDatabaseTask) PreExecute(ctx context.Context) error {
|
||||||
t.Base.MsgType = commonpb.MsgType_AlterCollection
|
|
||||||
t.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,11 @@ func (dt *deleteTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dt *deleteTask) OnEnqueue() error {
|
func (dt *deleteTask) OnEnqueue() error {
|
||||||
|
if dt.req.Base == nil {
|
||||||
|
dt.req.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
dt.req.Base.MsgType = commonpb.MsgType_Delete
|
||||||
|
dt.req.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,6 +111,8 @@ func (cit *createIndexTask) OnEnqueue() error {
|
||||||
if cit.req.Base == nil {
|
if cit.req.Base == nil {
|
||||||
cit.req.Base = commonpbutil.NewMsgBase()
|
cit.req.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
cit.req.Base.MsgType = commonpb.MsgType_CreateIndex
|
||||||
|
cit.req.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,8 +440,6 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cit *createIndexTask) PreExecute(ctx context.Context) error {
|
func (cit *createIndexTask) PreExecute(ctx context.Context) error {
|
||||||
cit.req.Base.MsgType = commonpb.MsgType_CreateIndex
|
|
||||||
cit.req.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName := cit.req.GetCollectionName()
|
collName := cit.req.GetCollectionName()
|
||||||
|
|
||||||
|
@ -548,12 +548,12 @@ func (t *alterIndexTask) OnEnqueue() error {
|
||||||
if t.req.Base == nil {
|
if t.req.Base == nil {
|
||||||
t.req.Base = commonpbutil.NewMsgBase()
|
t.req.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
t.req.Base.MsgType = commonpb.MsgType_AlterIndex
|
||||||
|
t.req.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *alterIndexTask) PreExecute(ctx context.Context) error {
|
func (t *alterIndexTask) PreExecute(ctx context.Context) error {
|
||||||
t.req.Base.MsgType = commonpb.MsgType_AlterIndex
|
|
||||||
t.req.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
for _, param := range t.req.GetExtraParams() {
|
for _, param := range t.req.GetExtraParams() {
|
||||||
if !indexparams.IsConfigableIndexParam(param.GetKey()) {
|
if !indexparams.IsConfigableIndexParam(param.GetKey()) {
|
||||||
|
@ -660,12 +660,12 @@ func (dit *describeIndexTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (dit *describeIndexTask) OnEnqueue() error {
|
func (dit *describeIndexTask) OnEnqueue() error {
|
||||||
dit.Base = commonpbutil.NewMsgBase()
|
dit.Base = commonpbutil.NewMsgBase()
|
||||||
|
dit.Base.MsgType = commonpb.MsgType_DescribeIndex
|
||||||
|
dit.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
|
func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
|
||||||
dit.Base.MsgType = commonpb.MsgType_DescribeIndex
|
|
||||||
dit.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(dit.CollectionName); err != nil {
|
if err := validateCollectionName(dit.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -785,12 +785,12 @@ func (dit *getIndexStatisticsTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (dit *getIndexStatisticsTask) OnEnqueue() error {
|
func (dit *getIndexStatisticsTask) OnEnqueue() error {
|
||||||
dit.Base = commonpbutil.NewMsgBase()
|
dit.Base = commonpbutil.NewMsgBase()
|
||||||
|
dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics
|
||||||
|
dit.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error {
|
func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error {
|
||||||
dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics
|
|
||||||
dit.Base.SourceID = dit.nodeID
|
|
||||||
|
|
||||||
if err := validateCollectionName(dit.CollectionName); err != nil {
|
if err := validateCollectionName(dit.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -903,12 +903,12 @@ func (dit *dropIndexTask) OnEnqueue() error {
|
||||||
if dit.Base == nil {
|
if dit.Base == nil {
|
||||||
dit.Base = commonpbutil.NewMsgBase()
|
dit.Base = commonpbutil.NewMsgBase()
|
||||||
}
|
}
|
||||||
|
dit.Base.MsgType = commonpb.MsgType_DropIndex
|
||||||
|
dit.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
||||||
dit.Base.MsgType = commonpb.MsgType_DropIndex
|
|
||||||
dit.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collName, fieldName := dit.CollectionName, dit.FieldName
|
collName, fieldName := dit.CollectionName, dit.FieldName
|
||||||
|
|
||||||
|
@ -1014,12 +1014,12 @@ func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (gibpt *getIndexBuildProgressTask) OnEnqueue() error {
|
func (gibpt *getIndexBuildProgressTask) OnEnqueue() error {
|
||||||
gibpt.Base = commonpbutil.NewMsgBase()
|
gibpt.Base = commonpbutil.NewMsgBase()
|
||||||
|
gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress
|
||||||
|
gibpt.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
||||||
gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress
|
|
||||||
gibpt.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(gibpt.CollectionName); err != nil {
|
if err := validateCollectionName(gibpt.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1104,12 +1104,12 @@ func (gist *getIndexStateTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (gist *getIndexStateTask) OnEnqueue() error {
|
func (gist *getIndexStateTask) OnEnqueue() error {
|
||||||
gist.Base = commonpbutil.NewMsgBase()
|
gist.Base = commonpbutil.NewMsgBase()
|
||||||
|
gist.Base.MsgType = commonpb.MsgType_GetIndexState
|
||||||
|
gist.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gist *getIndexStateTask) PreExecute(ctx context.Context) error {
|
func (gist *getIndexStateTask) PreExecute(ctx context.Context) error {
|
||||||
gist.Base.MsgType = commonpb.MsgType_GetIndexState
|
|
||||||
gist.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
if err := validateCollectionName(gist.CollectionName); err != nil {
|
if err := validateCollectionName(gist.CollectionName); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||||
|
@ -91,6 +92,11 @@ func (it *insertTask) getChannels() []pChan {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *insertTask) OnEnqueue() error {
|
func (it *insertTask) OnEnqueue() error {
|
||||||
|
if it.insertMsg.Base == nil {
|
||||||
|
it.insertMsg.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
it.insertMsg.Base.MsgType = commonpb.MsgType_Insert
|
||||||
|
it.insertMsg.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
@ -43,6 +44,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type queryTask struct {
|
type queryTask struct {
|
||||||
|
baseTask
|
||||||
Condition
|
Condition
|
||||||
*internalpb.RetrieveRequest
|
*internalpb.RetrieveRequest
|
||||||
|
|
||||||
|
@ -710,6 +712,10 @@ func (t *queryTask) SetTs(ts Timestamp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *queryTask) OnEnqueue() error {
|
func (t *queryTask) OnEnqueue() error {
|
||||||
|
if t.Base == nil {
|
||||||
|
t.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
t.Base.MsgType = commonpb.MsgType_Retrieve
|
t.Base.MsgType = commonpb.MsgType_Retrieve
|
||||||
|
t.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package proxy
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
@ -190,6 +192,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
|
||||||
t.SetTs(ts)
|
t.SetTs(ts)
|
||||||
t.SetID(id)
|
t.SetID(id)
|
||||||
|
|
||||||
|
t.SetOnEnqueueTime()
|
||||||
return queue.addUnissuedTask(t)
|
return queue.addUnissuedTask(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,6 +457,11 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
||||||
}()
|
}()
|
||||||
span.AddEvent("scheduler process PreExecute")
|
span.AddEvent("scheduler process PreExecute")
|
||||||
|
|
||||||
|
waitDuration := t.GetDurationInQueue()
|
||||||
|
metrics.ProxyReqInQueueLatency.
|
||||||
|
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.Type().String()).
|
||||||
|
Observe(float64(waitDuration.Milliseconds()))
|
||||||
|
|
||||||
err := t.PreExecute(ctx)
|
err := t.PreExecute(ctx)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -46,6 +46,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type searchTask struct {
|
type searchTask struct {
|
||||||
|
baseTask
|
||||||
Condition
|
Condition
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
*internalpb.SearchRequest
|
*internalpb.SearchRequest
|
||||||
|
|
|
@ -94,6 +94,9 @@ func (g *getStatisticsTask) OnEnqueue() error {
|
||||||
g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{
|
g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{
|
||||||
Base: commonpbutil.NewMsgBase(),
|
Base: commonpbutil.NewMsgBase(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||||
|
g.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,10 +110,6 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
|
||||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute")
|
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute")
|
||||||
defer sp.End()
|
defer sp.End()
|
||||||
|
|
||||||
// TODO: Maybe we should create a new MsgType: GetStatistics?
|
|
||||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
|
||||||
g.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
|
|
||||||
collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName)
|
collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName)
|
||||||
if err != nil { // err is not nil if collection not exists
|
if err != nil { // err is not nil if collection not exists
|
||||||
return err
|
return err
|
||||||
|
@ -634,12 +633,12 @@ func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (g *getCollectionStatisticsTask) OnEnqueue() error {
|
func (g *getCollectionStatisticsTask) OnEnqueue() error {
|
||||||
g.Base = commonpbutil.NewMsgBase()
|
g.Base = commonpbutil.NewMsgBase()
|
||||||
|
g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics
|
||||||
|
g.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error {
|
func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error {
|
||||||
g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics
|
|
||||||
g.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,12 +716,12 @@ func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) {
|
||||||
|
|
||||||
func (g *getPartitionStatisticsTask) OnEnqueue() error {
|
func (g *getPartitionStatisticsTask) OnEnqueue() error {
|
||||||
g.Base = commonpbutil.NewMsgBase()
|
g.Base = commonpbutil.NewMsgBase()
|
||||||
|
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
||||||
|
g.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error {
|
func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error {
|
||||||
g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics
|
|
||||||
g.Base.SourceID = paramtable.GetNodeID()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1022,6 +1022,7 @@ func TestHasCollectionTask(t *testing.T) {
|
||||||
rootCoord: rc,
|
rootCoord: rc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_HasCollection, task.Type())
|
assert.Equal(t, commonpb.MsgType_HasCollection, task.Type())
|
||||||
|
@ -1084,6 +1085,7 @@ func TestDescribeCollectionTask(t *testing.T) {
|
||||||
rootCoord: rc,
|
rootCoord: rc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_DescribeCollection, task.Type())
|
assert.Equal(t, commonpb.MsgType_DescribeCollection, task.Type())
|
||||||
|
@ -1332,6 +1334,7 @@ func TestCreatePartitionTask(t *testing.T) {
|
||||||
rootCoord: rc,
|
rootCoord: rc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_CreatePartition, task.Type())
|
assert.Equal(t, commonpb.MsgType_CreatePartition, task.Type())
|
||||||
|
@ -1407,6 +1410,7 @@ func TestDropPartitionTask(t *testing.T) {
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_DropPartition, task.Type())
|
assert.Equal(t, commonpb.MsgType_DropPartition, task.Type())
|
||||||
|
@ -1524,6 +1528,7 @@ func TestHasPartitionTask(t *testing.T) {
|
||||||
rootCoord: rc,
|
rootCoord: rc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_HasPartition, task.Type())
|
assert.Equal(t, commonpb.MsgType_HasPartition, task.Type())
|
||||||
|
@ -1571,6 +1576,7 @@ func TestShowPartitionsTask(t *testing.T) {
|
||||||
rootCoord: rc,
|
rootCoord: rc,
|
||||||
result: nil,
|
result: nil,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_ShowPartitions, task.Type())
|
assert.Equal(t, commonpb.MsgType_ShowPartitions, task.Type())
|
||||||
|
@ -2693,6 +2699,7 @@ func TestCreateResourceGroupTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type())
|
assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type())
|
||||||
|
@ -2732,6 +2739,7 @@ func TestDropResourceGroupTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type())
|
assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type())
|
||||||
|
@ -2773,6 +2781,7 @@ func TestTransferNodeTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_TransferNode, task.Type())
|
assert.Equal(t, commonpb.MsgType_TransferNode, task.Type())
|
||||||
|
@ -2815,6 +2824,7 @@ func TestTransferReplicaTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type())
|
assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type())
|
||||||
|
@ -2854,6 +2864,7 @@ func TestListResourceGroupsTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type())
|
assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type())
|
||||||
|
@ -2906,6 +2917,7 @@ func TestDescribeResourceGroupTask(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
||||||
|
@ -2952,6 +2964,7 @@ func TestDescribeResourceGroupTaskFailed(t *testing.T) {
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
queryCoord: qc,
|
queryCoord: qc,
|
||||||
}
|
}
|
||||||
|
task.OnEnqueue()
|
||||||
task.PreExecute(ctx)
|
task.PreExecute(ctx)
|
||||||
|
|
||||||
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type())
|
||||||
|
|
|
@ -134,6 +134,11 @@ func (it *upsertTask) getChannels() []pChan {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *upsertTask) OnEnqueue() error {
|
func (it *upsertTask) OnEnqueue() error {
|
||||||
|
if it.req.Base == nil {
|
||||||
|
it.req.Base = commonpbutil.NewMsgBase()
|
||||||
|
}
|
||||||
|
it.req.Base.MsgType = commonpb.MsgType_Upsert
|
||||||
|
it.req.Base.SourceID = paramtable.GetNodeID()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -334,6 +334,16 @@ var (
|
||||||
Name: "slow_query_count",
|
Name: "slow_query_count",
|
||||||
Help: "count of slow query executed",
|
Help: "count of slow query executed",
|
||||||
}, []string{nodeIDLabelName, msgTypeLabelName})
|
}, []string{nodeIDLabelName, msgTypeLabelName})
|
||||||
|
|
||||||
|
// ProxyReqInQueueLatency records the latency that requests wait in the queue, like "CreateCollection".
|
||||||
|
ProxyReqInQueueLatency = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: milvusNamespace,
|
||||||
|
Subsystem: typeutil.ProxyRole,
|
||||||
|
Name: "req_in_queue_latency",
|
||||||
|
Help: "latency which request waits in the queue",
|
||||||
|
Buckets: buckets, // unit: ms
|
||||||
|
}, []string{nodeIDLabelName, functionLabelName})
|
||||||
)
|
)
|
||||||
|
|
||||||
// RegisterProxy registers Proxy metrics
|
// RegisterProxy registers Proxy metrics
|
||||||
|
@ -383,6 +393,7 @@ func RegisterProxy(registry *prometheus.Registry) {
|
||||||
|
|
||||||
registry.MustRegister(ProxySlowQueryCount)
|
registry.MustRegister(ProxySlowQueryCount)
|
||||||
registry.MustRegister(ProxyReportValue)
|
registry.MustRegister(ProxyReportValue)
|
||||||
|
registry.MustRegister(ProxyReqInQueueLatency)
|
||||||
}
|
}
|
||||||
|
|
||||||
func CleanupProxyDBMetrics(nodeID int64, dbName string) {
|
func CleanupProxyDBMetrics(nodeID int64, dbName string) {
|
||||||
|
|
Loading…
Reference in New Issue