From 79535931aba6f94bc8f603f67c8a6c969ff9c711 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 6 Jan 2023 14:31:37 +0800 Subject: [PATCH] Return error code when deny to read/write (#21561) Signed-off-by: bigsheeper --- go.mod | 2 +- go.sum | 2 + internal/core/src/pb/common.pb.cc | 187 +++++++++--------- internal/core/src/pb/common.pb.h | 4 + internal/proto/proxy.proto | 2 +- internal/proto/proxypb/proxy.pb.go | 82 ++++---- internal/proxy/error.go | 21 -- internal/proxy/error_test.go | 19 +- internal/proxy/impl.go | 4 +- internal/proxy/impl_test.go | 4 +- internal/proxy/multi_rate_limiter.go | 58 +++--- internal/proxy/multi_rate_limiter_test.go | 34 ++-- internal/proxy/rate_limit_interceptor.go | 47 +++-- internal/proxy/rate_limit_interceptor_test.go | 51 ++--- internal/rootcoord/quota_center.go | 80 +++----- internal/rootcoord/quota_center_test.go | 12 +- internal/types/types.go | 4 +- 17 files changed, 277 insertions(+), 336 deletions(-) diff --git a/go.mod b/go.mod index 3aaee3bdd1..fbf1844cce 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/klauspost/compress v1.14.4 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 + github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729 github.com/minio/minio-go/v7 v7.0.17 github.com/opentracing/opentracing-go v1.2.0 github.com/panjf2000/ants/v2 v2.4.8 diff --git a/go.sum b/go.sum index beef82f5c7..80d7a8bcdd 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 h1:GSiYfmb/CgWCdTKHzI0zl0L1xTr9/kaM6wr1O882lYc= github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729 h1:hsb1ifdNe3qlXi1YY5dWPPzWMNZmnqe5uunYPYK3gd0= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index 19b3e37e1a..4a46763a96 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( ".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$." "milvus.proto.common.ObjectPrivilege\022\031\n\021o" "bject_name_index\030\003 \001(\005\022\032\n\022object_name_in" - "dexs\030\004 \001(\005*\254\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" + "dexs\030\004 \001(\005*\223\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n" "\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n" "\020PermissionDenied\020\003\022\027\n\023CollectionNotExis" "ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime" @@ -392,94 +392,97 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "otShardLeader\020-\022\026\n\022NoReplicaAvailable\020.\022" "\023\n\017SegmentNotFound\020/\022\r\n\tForceDeny\0200\022\r\n\tR" "ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\024\n\020Upsert" - "AutoIDTrue\0203\022\017\n\013DataCoordNA\020d\022\022\n\rDDReque" - "stRace\020\350\007*c\n\nIndexState\022\022\n\016IndexStateNon" - "e\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Fin" - "ished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014Segme" - "ntState\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExis" - "t\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020" - "\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImportin" - "g\020\007*>\n\017PlaceholderType\022\010\n\004None\020\000\022\020\n\014Bina" - "ryVector\020d\022\017\n\013FloatVector\020e*\232\r\n\007MsgType\022" - "\r\n\tUndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n\016" - "DropCollection\020e\022\021\n\rHasCollection\020f\022\026\n\022D" - "escribeCollection\020g\022\023\n\017ShowCollections\020h" - "\022\024\n\020GetSystemConfigs\020i\022\022\n\016LoadCollection" - "\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013CreateAlias" - "\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017Alt" - "erCollection\020o\022\024\n\017CreatePartition\020\310\001\022\022\n\r" - "DropPartition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021D" - "escribePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001" - "\022\023\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePartitio" - "ns\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegm" - "ent\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegm" - "ents\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBal" - "anceSegments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020" - "\n\013CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\t" - "DropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n" - "\n\005Flush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006U" - "psert\020\224\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003" - "\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro" - "gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033" - "\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020" - "\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne" - "ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue" - "ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022" - "\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD" - "eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n" - "\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017" - "GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206" - "\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017" - "GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004" - "\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t" - "LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT" - "SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt" - "atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa" - "taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe" - "tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n" - "\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername" - "s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017" - "OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS" - "electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope" - "ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re" - "freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014" - "*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017" - "CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\tEx" - "ecuting\020\001\022\r\n\tCompleted\020\002*X\n\020ConsistencyL" - "evel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounded" - "\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\236\001\n\013I" - "mportState\022\021\n\rImportPending\020\000\022\020\n\014ImportF" - "ailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017ImportPers" - "isted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017ImportComp" - "leted\020\006\022\032\n\026ImportFailedAndCleaned\020\007*2\n\nO" - "bjectType\022\016\n\nCollection\020\000\022\n\n\006Global\020\001\022\010\n" - "\004User\020\002*\233\005\n\017ObjectPrivilege\022\020\n\014Privilege" - "All\020\000\022\035\n\031PrivilegeCreateCollection\020\001\022\033\n\027" - "PrivilegeDropCollection\020\002\022\037\n\033PrivilegeDe" - "scribeCollection\020\003\022\034\n\030PrivilegeShowColle" - "ctions\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Privilege" - "Release\020\006\022\027\n\023PrivilegeCompaction\020\007\022\023\n\017Pr" - "ivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026" - "PrivilegeGetStatistics\020\n\022\030\n\024PrivilegeCre" - "ateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020\014\022\026\n\022" - "PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSearch\020" - "\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQuery\020" - "\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Privilege" - "Import\020\022\022\034\n\030PrivilegeCreateOwnership\020\023\022\027" - "\n\023PrivilegeUpdateUser\020\024\022\032\n\026PrivilegeDrop" - "Ownership\020\025\022\034\n\030PrivilegeSelectOwnership\020" - "\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023Privi" - "legeSelectUser\020\030\022\023\n\017PrivilegeUpsert\020\031*S\n" - "\tStateCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020" - "\001\022\014\n\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping" - "\020\004*c\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024" - "\n\020LoadStateNotLoad\020\001\022\024\n\020LoadStateLoading" - "\020\002\022\023\n\017LoadStateLoaded\020\003:^\n\021privilege_ext" - "_obj\022\037.google.protobuf.MessageOptions\030\351\007" - " \001(\0132!.milvus.proto.common.PrivilegeExtB" - "f\n\016io.milvus.grpcB\013CommonProtoP\001Z1github" - ".com/milvus-io/milvus-proto/go-api/commo" - "npb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3" + "AutoIDTrue\0203\022\034\n\030InsufficientMemoryToLoad" + "\0204\022\030\n\024MemoryQuotaExhausted\0205\022\026\n\022DiskQuot" + "aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\017\n\013D" + "ataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007*c\n\nInde" + "xState\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissued\020\001" + "\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed" + "\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024\n\020Segmen" + "tStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022" + "\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013" + "\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Placeholde" + "rType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013Flo" + "atVector\020e*\232\r\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n" + "\020CreateCollection\020d\022\022\n\016DropCollection\020e\022" + "\021\n\rHasCollection\020f\022\026\n\022DescribeCollection" + "\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSystemConf" + "igs\020i\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseColl" + "ection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m" + "\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollection\020o\022\024\n" + "\017CreatePartition\020\310\001\022\022\n\rDropPartition\020\311\001\022" + "\021\n\014HasPartition\020\312\001\022\026\n\021DescribePartition\020" + "\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016LoadPartition" + "s\020\315\001\022\026\n\021ReleasePartitions\020\316\001\022\021\n\014ShowSegm" + "ents\020\372\001\022\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSeg" + "ments\020\374\001\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017Handof" + "fSegments\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025" + "\n\020DescribeSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022" + "\022\n\rDescribeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006I" + "nsert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022Res" + "endSegmentStats\020\223\003\022\013\n\006Upsert\020\224\003\022\013\n\006Searc" + "h\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIndexState" + "\020\366\003\022\032\n\025GetIndexBuildProgress\020\367\003\022\034\n\027GetCo" + "llectionStatistics\020\370\003\022\033\n\026GetPartitionSta" + "tistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveRes" + "ult\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDm" + "Channels\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023" + "RemoveQueryChannels\020\377\003\022\035\n\030SealedSegments" + "ChangeInfo\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\024" + "\n\017GetShardLeaders\020\202\004\022\020\n\013GetReplicas\020\203\004\022\023" + "\n\016UnsubDmChannel\020\204\004\022\024\n\017GetDistribution\020\205" + "\004\022\025\n\020SyncDistribution\020\206\004\022\020\n\013SegmentInfo\020" + "\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332" + "\004\022\024\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023" + "\n\016QueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tR" + "equestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017Allocate" + "Segment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Se" + "gmentFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020Cr" + "eateCredential\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n" + "\020DeleteCredential\020\336\013\022\025\n\020UpdateCredential" + "\020\337\013\022\026\n\021ListCredUsernames\020\340\013\022\017\n\nCreateRol" + "e\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017OperateUserRole\020\302" + "\014\022\017\n\nSelectRole\020\303\014\022\017\n\nSelectUser\020\304\014\022\023\n\016S" + "electResource\020\305\014\022\025\n\020OperatePrivilege\020\306\014\022" + "\020\n\013SelectGrant\020\307\014\022\033\n\026RefreshPolicyInfoCa" + "che\020\310\014\022\017\n\nListPolicy\020\311\014*\"\n\007DslType\022\007\n\003Ds" + "l\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState\022\021" + "\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tComp" + "leted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000\022" + "\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually\020" + "\003\022\016\n\nCustomized\020\004*\236\001\n\013ImportState\022\021\n\rImp" + "ortPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImport" + "Started\020\002\022\023\n\017ImportPersisted\020\005\022\021\n\rImport" + "Flushed\020\010\022\023\n\017ImportCompleted\020\006\022\032\n\026Import" + "FailedAndCleaned\020\007*2\n\nObjectType\022\016\n\nColl" + "ection\020\000\022\n\n\006Global\020\001\022\010\n\004User\020\002*\233\005\n\017Objec" + "tPrivilege\022\020\n\014PrivilegeAll\020\000\022\035\n\031Privileg" + "eCreateCollection\020\001\022\033\n\027PrivilegeDropColl" + "ection\020\002\022\037\n\033PrivilegeDescribeCollection\020" + "\003\022\034\n\030PrivilegeShowCollections\020\004\022\021\n\rPrivi" + "legeLoad\020\005\022\024\n\020PrivilegeRelease\020\006\022\027\n\023Priv" + "ilegeCompaction\020\007\022\023\n\017PrivilegeInsert\020\010\022\023" + "\n\017PrivilegeDelete\020\t\022\032\n\026PrivilegeGetStati" + "stics\020\n\022\030\n\024PrivilegeCreateIndex\020\013\022\030\n\024Pri" + "vilegeIndexDetail\020\014\022\026\n\022PrivilegeDropInde" + "x\020\r\022\023\n\017PrivilegeSearch\020\016\022\022\n\016PrivilegeFlu" + "sh\020\017\022\022\n\016PrivilegeQuery\020\020\022\030\n\024PrivilegeLoa" + "dBalance\020\021\022\023\n\017PrivilegeImport\020\022\022\034\n\030Privi" + "legeCreateOwnership\020\023\022\027\n\023PrivilegeUpdate" + "User\020\024\022\032\n\026PrivilegeDropOwnership\020\025\022\034\n\030Pr" + "ivilegeSelectOwnership\020\026\022\034\n\030PrivilegeMan" + "ageOwnership\020\027\022\027\n\023PrivilegeSelectUser\020\030\022" + "\023\n\017PrivilegeUpsert\020\031*S\n\tStateCode\022\020\n\014Ini" + "tializing\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022\013" + "\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022\025" + "\n\021LoadStateNotExist\020\000\022\024\n\020LoadStateNotLoa" + "d\020\001\022\024\n\020LoadStateLoading\020\002\022\023\n\017LoadStateLo" + "aded\020\003:^\n\021privilege_ext_obj\022\037.google.pro" + "tobuf.MessageOptions\030\351\007 \001(\0132!.milvus.pro" + "to.common.PrivilegeExtBf\n\016io.milvus.grpc" + "B\013CommonProtoP\001Z1github.com/milvus-io/mi" + "lvus-proto/go-api/commonpb\240\001\001\252\002\016IO.Milvu" + "s.Grpcb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { &::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto, @@ -500,7 +503,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5591, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5694, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -568,6 +571,10 @@ bool ErrorCode_IsValid(int value) { case 49: case 50: case 51: + case 52: + case 53: + case 54: + case 55: case 100: case 1000: return true; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index 9efcb8776a..6ddfd9c830 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -164,6 +164,10 @@ enum ErrorCode : int { RateLimit = 49, NodeIDNotMatch = 50, UpsertAutoIDTrue = 51, + InsufficientMemoryToLoad = 52, + MemoryQuotaExhausted = 53, + DiskQuotaExhausted = 54, + TimeTickLongDelay = 55, DataCoordNA = 100, DDRequestRace = 1000, ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), diff --git a/internal/proto/proxy.proto b/internal/proto/proxy.proto index efed2aa6ea..3c3155ef51 100644 --- a/internal/proto/proxy.proto +++ b/internal/proto/proxy.proto @@ -54,5 +54,5 @@ message SetRatesRequest { common.MsgBase base = 1; repeated internal.Rate rates = 2; repeated milvus.QuotaState states = 3; - repeated string state_reasons = 4; + repeated common.ErrorCode codes = 4; } diff --git a/internal/proto/proxypb/proxy.pb.go b/internal/proto/proxypb/proxy.pb.go index d0dd6c7422..ed00f979d3 100644 --- a/internal/proto/proxypb/proxy.pb.go +++ b/internal/proto/proxypb/proxy.pb.go @@ -255,7 +255,7 @@ type SetRatesRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"` States []milvuspb.QuotaState `protobuf:"varint,3,rep,packed,name=states,proto3,enum=milvus.proto.milvus.QuotaState" json:"states,omitempty"` - StateReasons []string `protobuf:"bytes,4,rep,name=state_reasons,json=stateReasons,proto3" json:"state_reasons,omitempty"` + Codes []commonpb.ErrorCode `protobuf:"varint,4,rep,packed,name=codes,proto3,enum=milvus.proto.common.ErrorCode" json:"codes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -307,9 +307,9 @@ func (m *SetRatesRequest) GetStates() []milvuspb.QuotaState { return nil } -func (m *SetRatesRequest) GetStateReasons() []string { +func (m *SetRatesRequest) GetCodes() []commonpb.ErrorCode { if m != nil { - return m.StateReasons + return m.Codes } return nil } @@ -327,44 +327,44 @@ func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) var fileDescriptor_700b50b08ed8dbaf = []byte{ // 625 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x51, 0x4f, 0x13, 0x4d, - 0x14, 0x65, 0x29, 0x2d, 0x70, 0xe9, 0x07, 0xc9, 0x84, 0x0f, 0x6b, 0x11, 0x6d, 0x16, 0x23, 0x0d, - 0x89, 0xad, 0x54, 0x13, 0xdf, 0x29, 0x49, 0x43, 0x0c, 0x04, 0xa7, 0xfa, 0xe2, 0x0b, 0x99, 0xdd, - 0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x67, 0x8a, 0xf6, 0xc9, 0xc4, 0x7f, 0xe4, 0x9b, 0x3f, 0xc4, - 0x1f, 0x64, 0x76, 0x66, 0xbb, 0xd0, 0xba, 0x65, 0xa3, 0xc4, 0xb7, 0x3d, 0x77, 0xce, 0x9d, 0x73, - 0xef, 0xec, 0xbd, 0x07, 0xd6, 0xa2, 0x58, 0x7e, 0x19, 0xb7, 0xa2, 0x58, 0x6a, 0x49, 0xc8, 0x90, - 0x87, 0x37, 0x23, 0x65, 0x51, 0xcb, 0x9c, 0xd4, 0xab, 0xbe, 0x1c, 0x0e, 0xa5, 0xb0, 0xb1, 0xfa, - 0x3a, 0x17, 0x1a, 0x63, 0xc1, 0xc2, 0x14, 0x57, 0xef, 0x66, 0xb8, 0x3f, 0x1c, 0x78, 0x7a, 0x2c, - 0x6e, 0x58, 0xc8, 0x03, 0xa6, 0xb1, 0x2b, 0xc3, 0xf0, 0x04, 0x35, 0xeb, 0x32, 0x7f, 0x80, 0x14, - 0xaf, 0x47, 0xa8, 0x34, 0x79, 0x05, 0x4b, 0x1e, 0x53, 0x58, 0x73, 0x1a, 0x4e, 0x73, 0xad, 0xf3, - 0xa4, 0x35, 0xa5, 0x98, 0x4a, 0x9d, 0xa8, 0xcb, 0x43, 0xa6, 0x90, 0x1a, 0x26, 0x79, 0x04, 0xcb, - 0x81, 0x77, 0x2e, 0xd8, 0x10, 0x6b, 0x8b, 0x0d, 0xa7, 0xb9, 0x4a, 0x2b, 0x81, 0x77, 0xca, 0x86, - 0x48, 0xf6, 0x60, 0xc3, 0x97, 0x61, 0x88, 0xbe, 0xe6, 0x52, 0x58, 0x42, 0xc9, 0x10, 0xd6, 0x6f, - 0xc3, 0x86, 0xe8, 0x42, 0xf5, 0x36, 0x72, 0x7c, 0x54, 0x5b, 0x6a, 0x38, 0xcd, 0x12, 0x9d, 0x8a, - 0xb9, 0x57, 0x50, 0xbf, 0x53, 0x79, 0x8c, 0xc1, 0x03, 0xab, 0xae, 0xc3, 0xca, 0x48, 0x25, 0x2f, - 0x95, 0x95, 0x9d, 0x61, 0xf7, 0x9b, 0x03, 0x5b, 0x1f, 0xa3, 0x7f, 0x2f, 0x94, 0x9c, 0x45, 0x4c, - 0xa9, 0xcf, 0x32, 0x0e, 0xd2, 0xa7, 0xc9, 0xb0, 0xfb, 0x15, 0x76, 0x28, 0x5e, 0xc4, 0xa8, 0x06, - 0x67, 0x32, 0xe4, 0xfe, 0xf8, 0x58, 0x5c, 0xc8, 0x07, 0x96, 0xb2, 0x05, 0x15, 0x19, 0x7d, 0x18, - 0x47, 0xb6, 0x90, 0x32, 0x4d, 0x11, 0xd9, 0x84, 0xb2, 0x8c, 0xde, 0xe1, 0x38, 0xad, 0xc1, 0x02, - 0xf7, 0xa7, 0x03, 0x1b, 0x7d, 0xd4, 0x94, 0x69, 0x54, 0x7f, 0xaf, 0x79, 0x00, 0xe5, 0x38, 0xb9, - 0xa1, 0xb6, 0xd8, 0x28, 0x35, 0xd7, 0x3a, 0xdb, 0xd3, 0x29, 0xd9, 0xb4, 0x26, 0x2a, 0xd4, 0x32, - 0xc9, 0x5b, 0xa8, 0x28, 0x6d, 0x72, 0x4a, 0x8d, 0x52, 0x73, 0xbd, 0xf3, 0x6c, 0x3a, 0x27, 0x05, - 0xef, 0x47, 0x52, 0xb3, 0x7e, 0xc2, 0xa3, 0x29, 0x9d, 0xec, 0xc2, 0x7f, 0xe6, 0xeb, 0x3c, 0x46, - 0xa6, 0xa4, 0x50, 0xb5, 0xa5, 0x46, 0xa9, 0xb9, 0x4a, 0xab, 0x26, 0x48, 0x6d, 0xac, 0xf3, 0x7d, - 0x19, 0xca, 0x67, 0xc9, 0xe6, 0x90, 0x10, 0x48, 0x0f, 0x75, 0x57, 0x0e, 0x23, 0x29, 0x50, 0xe8, - 0xbe, 0xbd, 0xa4, 0x95, 0xab, 0xf6, 0x3b, 0x31, 0x7d, 0x92, 0xfa, 0xf3, 0x5c, 0xfe, 0x0c, 0xd9, - 0x5d, 0x20, 0xd7, 0xb0, 0xd9, 0x43, 0x03, 0xb9, 0xd2, 0xdc, 0x57, 0xdd, 0x01, 0x13, 0x02, 0x43, - 0xd2, 0x99, 0xf3, 0x22, 0x79, 0xe4, 0x89, 0xe6, 0x6e, 0xae, 0x66, 0x5f, 0xc7, 0x5c, 0x5c, 0x52, - 0x54, 0x91, 0x14, 0x0a, 0xdd, 0x05, 0x12, 0xc3, 0xce, 0xf4, 0xb6, 0xdb, 0x6d, 0xca, 0x76, 0x7e, - 0x56, 0xdb, 0x5a, 0xcd, 0xfd, 0x06, 0x51, 0xdf, 0xce, 0xfd, 0xe9, 0x49, 0xa9, 0xa3, 0xa4, 0x4d, - 0x06, 0xd5, 0x1e, 0xea, 0xa3, 0x60, 0xd2, 0xde, 0xfe, 0xfc, 0xf6, 0x32, 0xd2, 0x1f, 0xb6, 0x75, - 0x05, 0x8f, 0xa7, 0xad, 0x00, 0x85, 0xe6, 0x2c, 0xb4, 0x2d, 0xb5, 0x0a, 0x5a, 0x9a, 0x59, 0xe8, - 0xa2, 0x76, 0x3c, 0xf8, 0xff, 0xd6, 0x09, 0xee, 0xea, 0xec, 0xe7, 0xe9, 0xe4, 0x9b, 0x46, 0x91, - 0xc6, 0x15, 0x6c, 0xe5, 0x6f, 0x3a, 0x39, 0xc8, 0x13, 0xb9, 0xd7, 0x15, 0x8a, 0xb4, 0x02, 0xd8, - 0xe8, 0xa1, 0x36, 0xf3, 0x7f, 0x82, 0x3a, 0xe6, 0xbe, 0x22, 0x2f, 0xe6, 0x0d, 0x7c, 0x4a, 0x98, - 0xdc, 0xbc, 0x57, 0xc8, 0xcb, 0xfe, 0xd0, 0x29, 0xac, 0x4c, 0x9c, 0x83, 0xec, 0xe6, 0xf5, 0x30, - 0xe3, 0x2b, 0x05, 0x55, 0x1f, 0xbe, 0xf9, 0xd4, 0xb9, 0xe4, 0x7a, 0x30, 0xf2, 0x92, 0x93, 0xb6, - 0xa5, 0xbe, 0xe4, 0x32, 0xfd, 0x6a, 0x4f, 0x86, 0xaa, 0x6d, 0xb2, 0xdb, 0x46, 0x22, 0xf2, 0xbc, - 0x8a, 0x81, 0xaf, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x36, 0x94, 0x65, 0x38, 0x43, 0x07, 0x00, + 0x14, 0x65, 0x29, 0x2d, 0x7c, 0x97, 0x06, 0x92, 0x09, 0x1f, 0xd6, 0x22, 0xd8, 0x2c, 0x46, 0x1a, + 0x12, 0x5b, 0xa9, 0x24, 0xbe, 0x53, 0x4c, 0x43, 0x0c, 0x04, 0xb7, 0xfa, 0xe2, 0x8b, 0x99, 0xdd, + 0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x66, 0x16, 0xed, 0x93, 0x89, 0xff, 0xc8, 0x37, 0xff, 0x90, + 0xff, 0xc3, 0xec, 0xce, 0x76, 0x61, 0xeb, 0x96, 0x8d, 0x12, 0xdf, 0x7a, 0x66, 0xcf, 0x9d, 0x73, + 0xee, 0xed, 0xdc, 0x03, 0xab, 0xa1, 0x14, 0x5f, 0x26, 0x9d, 0x50, 0x0a, 0x2d, 0x08, 0x19, 0xb3, + 0xe0, 0x26, 0x52, 0x06, 0x75, 0x92, 0x2f, 0xcd, 0xba, 0x27, 0xc6, 0x63, 0xc1, 0xcd, 0x59, 0x73, + 0x8d, 0x71, 0x8d, 0x92, 0xd3, 0x20, 0xc5, 0xf5, 0xbb, 0x15, 0xf6, 0x0f, 0x0b, 0x76, 0x4e, 0xf8, + 0x0d, 0x0d, 0x98, 0x4f, 0x35, 0xf6, 0x45, 0x10, 0x9c, 0xa2, 0xa6, 0x7d, 0xea, 0x8d, 0xd0, 0xc1, + 0xeb, 0x08, 0x95, 0x26, 0x2f, 0x61, 0xc9, 0xa5, 0x0a, 0x1b, 0x56, 0xcb, 0x6a, 0xaf, 0xf6, 0x9e, + 0x74, 0x72, 0x8a, 0xa9, 0xd4, 0xa9, 0xba, 0x3c, 0xa2, 0x0a, 0x9d, 0x84, 0x49, 0x1e, 0xc1, 0xb2, + 0xef, 0x7e, 0xe2, 0x74, 0x8c, 0x8d, 0xc5, 0x96, 0xd5, 0xfe, 0xcf, 0xa9, 0xf9, 0xee, 0x19, 0x1d, + 0x23, 0xd9, 0x83, 0x75, 0x4f, 0x04, 0x01, 0x7a, 0x9a, 0x09, 0x6e, 0x08, 0x95, 0x84, 0xb0, 0x76, + 0x7b, 0x9c, 0x10, 0x6d, 0xa8, 0xdf, 0x9e, 0x9c, 0x1c, 0x37, 0x96, 0x5a, 0x56, 0xbb, 0xe2, 0xe4, + 0xce, 0xec, 0x2b, 0x68, 0xde, 0x71, 0x2e, 0xd1, 0x7f, 0xa0, 0xeb, 0x26, 0xac, 0x44, 0x2a, 0x9e, + 0x54, 0x66, 0x3b, 0xc3, 0xf6, 0x37, 0x0b, 0x36, 0x3f, 0x84, 0xff, 0x5e, 0x28, 0xfe, 0x16, 0x52, + 0xa5, 0x3e, 0x0b, 0xe9, 0xa7, 0xa3, 0xc9, 0xb0, 0xfd, 0x15, 0xb6, 0x1d, 0xbc, 0x90, 0xa8, 0x46, + 0xe7, 0x22, 0x60, 0xde, 0xe4, 0x84, 0x5f, 0x88, 0x07, 0x5a, 0xd9, 0x84, 0x9a, 0x08, 0xdf, 0x4f, + 0x42, 0x63, 0xa4, 0xea, 0xa4, 0x88, 0x6c, 0x40, 0x55, 0x84, 0x6f, 0x71, 0x92, 0x7a, 0x30, 0xc0, + 0xfe, 0x69, 0xc1, 0xfa, 0x10, 0xb5, 0x43, 0x35, 0xaa, 0xbf, 0xd7, 0x3c, 0x80, 0xaa, 0x8c, 0x6f, + 0x68, 0x2c, 0xb6, 0x2a, 0xed, 0xd5, 0xde, 0x56, 0xbe, 0x24, 0x7b, 0xad, 0xb1, 0x8a, 0x63, 0x98, + 0xe4, 0x35, 0xd4, 0x94, 0x4e, 0x6a, 0x2a, 0xad, 0x4a, 0x7b, 0xad, 0xf7, 0x34, 0x5f, 0x93, 0x82, + 0x77, 0x91, 0xd0, 0x74, 0x18, 0xf3, 0x9c, 0x94, 0x4e, 0x0e, 0xa1, 0xea, 0x09, 0x1f, 0x55, 0x63, + 0x29, 0xa9, 0xdb, 0x29, 0xb4, 0xf7, 0x46, 0x4a, 0x21, 0xfb, 0xc2, 0x47, 0xc7, 0x90, 0x7b, 0xdf, + 0x97, 0xa1, 0x7a, 0x1e, 0xaf, 0x12, 0x09, 0x80, 0x0c, 0x50, 0xf7, 0xc5, 0x38, 0x14, 0x1c, 0xb9, + 0x1e, 0x9a, 0x5b, 0x3b, 0x85, 0xf2, 0xbf, 0x13, 0xd3, 0x19, 0x35, 0x9f, 0x15, 0xf2, 0x67, 0xc8, + 0xf6, 0x02, 0xb9, 0x86, 0x8d, 0x01, 0x26, 0x90, 0x29, 0xcd, 0x3c, 0xd5, 0x1f, 0x51, 0xce, 0x31, + 0x20, 0xbd, 0x39, 0x23, 0x2a, 0x22, 0x4f, 0x35, 0x77, 0x0b, 0x35, 0x87, 0x5a, 0x32, 0x7e, 0xe9, + 0xa0, 0x0a, 0x05, 0x57, 0x68, 0x2f, 0x10, 0x09, 0xdb, 0xf9, 0xf5, 0x37, 0xeb, 0x95, 0x85, 0xc0, + 0xac, 0xb6, 0xc9, 0x9e, 0xfb, 0x13, 0xa3, 0xb9, 0x55, 0x38, 0xe6, 0xd8, 0x6a, 0x14, 0xb7, 0x49, + 0xa1, 0x3e, 0x40, 0x7d, 0xec, 0x4f, 0xdb, 0xdb, 0x9f, 0xdf, 0x5e, 0x46, 0xfa, 0xc3, 0xb6, 0xae, + 0xe0, 0x71, 0x3e, 0x1b, 0x90, 0x6b, 0x46, 0x03, 0xd3, 0x52, 0xa7, 0xa4, 0xa5, 0x99, 0x0d, 0x2f, + 0x6b, 0xc7, 0x85, 0xff, 0x6f, 0xa3, 0xe1, 0xae, 0xce, 0x7e, 0x91, 0x4e, 0x71, 0x8a, 0x94, 0x69, + 0x5c, 0xc1, 0x66, 0xf1, 0xea, 0x93, 0x83, 0x22, 0x91, 0x7b, 0x63, 0xa2, 0x4c, 0xcb, 0x87, 0xf5, + 0x01, 0xea, 0xe4, 0xfd, 0x9f, 0xa2, 0x96, 0xcc, 0x53, 0xe4, 0xf9, 0xbc, 0x07, 0x9f, 0x12, 0xa6, + 0x37, 0xef, 0x95, 0xf2, 0xb2, 0x7f, 0xe8, 0x0c, 0x56, 0xa6, 0x51, 0x42, 0x76, 0x8b, 0x7a, 0x98, + 0x09, 0x9a, 0x12, 0xd7, 0x47, 0x87, 0x1f, 0x7b, 0x97, 0x4c, 0x8f, 0x22, 0x37, 0xfe, 0xd2, 0x35, + 0xd4, 0x17, 0x4c, 0xa4, 0xbf, 0xba, 0xd3, 0x47, 0xd5, 0x4d, 0xaa, 0xbb, 0x89, 0x44, 0xe8, 0xba, + 0xb5, 0x04, 0xbe, 0xfa, 0x15, 0x00, 0x00, 0xff, 0xff, 0x47, 0x77, 0x96, 0xe8, 0x54, 0x07, 0x00, 0x00, } diff --git a/internal/proxy/error.go b/internal/proxy/error.go index 5c793ebd5b..1f4af9cded 100644 --- a/internal/proxy/error.go +++ b/internal/proxy/error.go @@ -24,8 +24,6 @@ import ( "google.golang.org/grpc/status" "github.com/milvus-io/milvus-proto/go-api/schemapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/types" ) // TODO(dragondriver): add more common error type @@ -92,22 +90,3 @@ func ErrProxyNotReady() error { func ErrPartitionNotExist(partitionName string) error { return fmt.Errorf("partition is not exist: %s", partitionName) } - -var ( - ErrRateLimit = errors.New("RequestLimited") - ErrForceDeny = errors.New("RequestDenied") -) - -func wrapRateLimitError() error { - return fmt.Errorf("[%w] request is rejected by grpc RateLimiter middleware, please retry later", ErrRateLimit) -} - -func wrapForceDenyError(rt internalpb.RateType, limiter types.Limiter) error { - switch rt { - case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: - return fmt.Errorf("[%w] deny to write, reason: %s", ErrForceDeny, limiter.GetWriteStateReason()) - case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: - return fmt.Errorf("[%w] deny to read, reason: %s", ErrForceDeny, limiter.GetReadStateReason()) - } - return nil -} diff --git a/internal/proxy/error_test.go b/internal/proxy/error_test.go index a757803a80..7cf7621a7d 100644 --- a/internal/proxy/error_test.go +++ b/internal/proxy/error_test.go @@ -17,14 +17,12 @@ package proxy import ( - "errors" "testing" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" ) func Test_errInvalidNumRows(t *testing.T) { @@ -152,16 +150,3 @@ func Test_errProxyIsUnhealthy(t *testing.T) { zap.Error(errProxyIsUnhealthy(id))) } } - -func Test_ErrRateLimitAndErrForceDeny(t *testing.T) { - err := wrapRateLimitError() - assert.True(t, errors.Is(err, ErrRateLimit)) - - limiter := NewMultiRateLimiter() - err = wrapForceDenyError(internalpb.RateType_DMLInsert, limiter) - assert.True(t, errors.Is(err, ErrForceDeny)) - err = wrapForceDenyError(internalpb.RateType_DMLDelete, limiter) - assert.True(t, errors.Is(err, ErrForceDeny)) - err = wrapForceDenyError(internalpb.RateType_DQLSearch, limiter) - assert.True(t, errors.Is(err, ErrForceDeny)) -} diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 29699bfeea..446f6a68ca 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -4310,11 +4310,11 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques resp.Reason = err.Error() return resp, nil } - node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons()) + node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetCodes()) log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates())) if len(request.GetStates()) != 0 { for i := range request.GetStates() { - log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i])) + log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetCodes()[i].String())) } } resp.ErrorCode = commonpb.ErrorCode_Success diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 2496ca744d..826098fe00 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -133,8 +133,8 @@ func TestProxy_CheckHealth(t *testing.T) { assert.Equal(t, 0, len(resp.GetReasons())) states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead} - reasons := []string{"memory quota exhausted", "manually deny to read"} - node.multiRateLimiter.SetQuotaStates(states, reasons) + codes := []commonpb.ErrorCode{commonpb.ErrorCode_MemoryQuotaExhausted, commonpb.ErrorCode_ForceDeny} + node.multiRateLimiter.SetQuotaStates(states, codes) resp, err = node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) assert.Equal(t, true, resp.IsHealthy) diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 348799a145..6c2ef9cb1a 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" @@ -31,13 +32,24 @@ import ( "github.com/milvus-io/milvus/internal/util/ratelimitutil" ) +var QuotaErrorString = map[commonpb.ErrorCode]string{ + commonpb.ErrorCode_ForceDeny: "manually force deny", + commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources", + commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exhausted, please allocate more resources", + commonpb.ErrorCode_TimeTickLongDelay: "time tick long delay", +} + +func GetQuotaErrorString(errCode commonpb.ErrorCode) string { + return QuotaErrorString[errCode] +} + // MultiRateLimiter includes multilevel rate limiters, such as global rateLimiter, // collection level rateLimiter and so on. It also implements Limiter interface. type MultiRateLimiter struct { globalRateLimiter *rateLimiter // TODO: add collection level rateLimiter quotaStatesMu sync.RWMutex - quotaStates map[milvuspb.QuotaState]string + quotaStates map[milvuspb.QuotaState]commonpb.ErrorCode } // NewMultiRateLimiter returns a new MultiRateLimiter. @@ -48,18 +60,32 @@ func NewMultiRateLimiter() *MultiRateLimiter { } // Check checks if request would be limited or denied. -func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) error { +func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) commonpb.ErrorCode { if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { - return nil + return commonpb.ErrorCode_Success } limit, rate := m.globalRateLimiter.limit(rt, n) if rate == 0 { - return wrapForceDenyError(rt, m) + return m.GetErrorCode(rt) } if limit { - return wrapRateLimitError() + return commonpb.ErrorCode_RateLimit } - return nil + return commonpb.ErrorCode_Success +} + +func (m *MultiRateLimiter) GetErrorCode(rt internalpb.RateType) commonpb.ErrorCode { + switch rt { + case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: + m.quotaStatesMu.RLock() + defer m.quotaStatesMu.RUnlock() + return m.quotaStates[milvuspb.QuotaState_DenyToWrite] + case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: + m.quotaStatesMu.RLock() + defer m.quotaStatesMu.RUnlock() + return m.quotaStates[milvuspb.QuotaState_DenyToRead] + } + return commonpb.ErrorCode_Success } // GetQuotaStates returns quota states. @@ -70,30 +96,18 @@ func (m *MultiRateLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) { reasons := make([]string, 0, len(m.quotaStates)) for k, v := range m.quotaStates { states = append(states, k) - reasons = append(reasons, v) + reasons = append(reasons, GetQuotaErrorString(v)) } return states, reasons } -func (m *MultiRateLimiter) GetReadStateReason() string { - m.quotaStatesMu.RLock() - defer m.quotaStatesMu.RUnlock() - return m.quotaStates[milvuspb.QuotaState_DenyToRead] -} - -func (m *MultiRateLimiter) GetWriteStateReason() string { - m.quotaStatesMu.RLock() - defer m.quotaStatesMu.RUnlock() - return m.quotaStates[milvuspb.QuotaState_DenyToWrite] -} - // SetQuotaStates sets quota states for MultiRateLimiter. -func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, reasons []string) { +func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, codes []commonpb.ErrorCode) { m.quotaStatesMu.Lock() defer m.quotaStatesMu.Unlock() - m.quotaStates = make(map[milvuspb.QuotaState]string, len(states)) + m.quotaStates = make(map[milvuspb.QuotaState]commonpb.ErrorCode, len(states)) for i := 0; i < len(states); i++ { - m.quotaStates[states[i]] = reasons[i] + m.quotaStates[states[i]] = codes[i] } } diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index 78aea55ca9..2c1e310821 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -17,12 +17,11 @@ package proxy import ( - "errors" "fmt" "math" "testing" - "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/ratelimitutil" @@ -38,12 +37,12 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1) } for _, rt := range internalpb.RateType_value { - err := multiLimiter.Check(internalpb.RateType(rt), 1) - assert.NoError(t, err) - err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) - assert.NoError(t, err) - err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) - assert.True(t, errors.Is(err, ErrRateLimit)) + errCode := multiLimiter.Check(internalpb.RateType(rt), 1) + assert.Equal(t, commonpb.ErrorCode_Success, errCode) + errCode = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) + assert.Equal(t, commonpb.ErrorCode_Success, errCode) + errCode = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt) + assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode) } Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) @@ -53,8 +52,8 @@ func TestMultiRateLimiter(t *testing.T) { bak := Params.QuotaConfig.QuotaAndLimitsEnabled paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false") for _, rt := range internalpb.RateType_value { - err := multiLimiter.Check(internalpb.RateType(rt), 1) - assert.NoError(t, err) + errCode := multiLimiter.Check(internalpb.RateType(rt), 1) + assert.Equal(t, commonpb.ErrorCode_Success, errCode) } Params.QuotaConfig.QuotaAndLimitsEnabled = bak }) @@ -66,8 +65,8 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter := NewMultiRateLimiter() bak := Params.QuotaConfig.QuotaAndLimitsEnabled paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") - err := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024) - assert.NoError(t, err) + errCode := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024) + assert.Equal(t, commonpb.ErrorCode_Success, errCode) Params.QuotaConfig.QuotaAndLimitsEnabled = bak Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate } @@ -77,17 +76,6 @@ func TestMultiRateLimiter(t *testing.T) { run(math.MaxFloat64 / 3) run(math.MaxFloat64 / 10000) }) - - t.Run("test GetReadStateReason and GetWriteStateReason", func(t *testing.T) { - multiLimiter := NewMultiRateLimiter() - states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead} - writeReason := "memory quota exhausted" - readReason := "manually deny to read" - reasons := []string{writeReason, readReason} - multiLimiter.SetQuotaStates(states, reasons) - assert.Equal(t, writeReason, multiLimiter.GetWriteStateReason()) - assert.Equal(t, readReason, multiLimiter.GetReadStateReason()) - }) } func TestRateLimiter(t *testing.T) { diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index a84ae9eec8..0836806da0 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -18,7 +18,6 @@ package proxy import ( "context" - "errors" "fmt" "reflect" @@ -38,15 +37,9 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { if err != nil { return handler(ctx, req) } - err = limiter.Check(rt, n) - if errors.Is(err, ErrForceDeny) { - rsp := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, info.FullMethod, err) - if rsp != nil { - return rsp, nil - } - } - if errors.Is(err, ErrRateLimit) { - rsp := getFailedResponse(req, commonpb.ErrorCode_RateLimit, info.FullMethod, err) + code := limiter.Check(rt, n) + if code != commonpb.ErrorCode_Success { + rsp := getFailedResponse(req, rt, code, info.FullMethod) if rsp != nil { return rsp, nil } @@ -113,37 +106,53 @@ func failedBoolResponse(code commonpb.ErrorCode, reason string) *milvuspb.BoolRe } } +func wrapQuotaError(rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) error { + if errCode == commonpb.ErrorCode_RateLimit { + return fmt.Errorf("request is rejected by grpc RateLimiter middleware, please retry later, req: %s", fullMethod) + } + + // deny to write/read + var op string + switch rt { + case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: + op = "write" + case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery: + op = "read" + } + return fmt.Errorf("deny to %s, reason: %s, req: %s", op, GetQuotaErrorString(errCode), fullMethod) +} + // getFailedResponse returns failed response. -func getFailedResponse(req interface{}, code commonpb.ErrorCode, fullMethod string, err error) interface{} { - reason := fmt.Sprintf("%s, req: %s", err, fullMethod) +func getFailedResponse(req interface{}, rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) interface{} { + err := wrapQuotaError(rt, errCode, fullMethod) switch req.(type) { case *milvuspb.InsertRequest, *milvuspb.DeleteRequest: - return failedMutationResult(code, reason) + return failedMutationResult(errCode, err.Error()) case *milvuspb.ImportRequest: return &milvuspb.ImportResponse{ - Status: failedStatus(code, reason), + Status: failedStatus(errCode, err.Error()), } case *milvuspb.SearchRequest: return &milvuspb.SearchResults{ - Status: failedStatus(code, reason), + Status: failedStatus(errCode, err.Error()), } case *milvuspb.QueryRequest: return &milvuspb.QueryResults{ - Status: failedStatus(code, reason), + Status: failedStatus(errCode, err.Error()), } case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest, *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest, *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest, *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest, *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest: - return failedStatus(code, reason) + return failedStatus(errCode, err.Error()) case *milvuspb.FlushRequest: return &milvuspb.FlushResponse{ - Status: failedStatus(code, reason), + Status: failedStatus(errCode, err.Error()), } case *milvuspb.ManualCompactionRequest: return &milvuspb.ManualCompactionResponse{ - Status: failedStatus(code, reason), + Status: failedStatus(errCode, err.Error()), } } return nil diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 98d1dc11b0..9ac39e666e 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -18,7 +18,6 @@ package proxy import ( "context" - "fmt" "testing" "github.com/golang/protobuf/proto" @@ -34,35 +33,17 @@ type limiterMock struct { limit bool rate float64 quotaStates []milvuspb.QuotaState - quotaStateReasons []string + quotaStateReasons []commonpb.ErrorCode } -func (l *limiterMock) Check(rt internalpb.RateType, n int) error { +func (l *limiterMock) Check(rt internalpb.RateType, n int) commonpb.ErrorCode { if l.rate == 0 { - return ErrForceDeny + return commonpb.ErrorCode_ForceDeny } if l.limit { - return ErrRateLimit + return commonpb.ErrorCode_RateLimit } - return nil -} - -func (l *limiterMock) GetReadStateReason() string { - for i := range l.quotaStates { - if l.quotaStates[i] == milvuspb.QuotaState_DenyToRead { - return l.quotaStateReasons[i] - } - } - return "" -} - -func (l *limiterMock) GetWriteStateReason() string { - for i := range l.quotaStates { - if l.quotaStates[i] == milvuspb.QuotaState_DenyToWrite { - return l.quotaStateReasons[i] - } - } - return "" + return commonpb.ErrorCode_Success } func TestRateLimitInterceptor(t *testing.T) { @@ -119,23 +100,23 @@ func TestRateLimitInterceptor(t *testing.T) { }) t.Run("test getFailedResponse", func(t *testing.T) { - testGetFailedResponse := func(req interface{}) { - rsp := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + testGetFailedResponse := func(req interface{}, rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) { + rsp := getFailedResponse(req, rt, errCode, fullMethod) assert.NotNil(t, rsp) } - testGetFailedResponse(&milvuspb.DeleteRequest{}) - testGetFailedResponse(&milvuspb.ImportRequest{}) - testGetFailedResponse(&milvuspb.SearchRequest{}) - testGetFailedResponse(&milvuspb.QueryRequest{}) - testGetFailedResponse(&milvuspb.CreateCollectionRequest{}) - testGetFailedResponse(&milvuspb.FlushRequest{}) - testGetFailedResponse(&milvuspb.ManualCompactionRequest{}) + testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, commonpb.ErrorCode_ForceDeny, "delete") + testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, commonpb.ErrorCode_MemoryQuotaExhausted, "import") + testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, commonpb.ErrorCode_DiskQuotaExhausted, "search") + testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, commonpb.ErrorCode_ForceDeny, "query") + testGetFailedResponse(&milvuspb.CreateCollectionRequest{}, internalpb.RateType_DDLCollection, commonpb.ErrorCode_RateLimit, "createCollection") + testGetFailedResponse(&milvuspb.FlushRequest{}, internalpb.RateType_DDLFlush, commonpb.ErrorCode_RateLimit, "flush") + testGetFailedResponse(&milvuspb.ManualCompactionRequest{}, internalpb.RateType_DDLCompaction, commonpb.ErrorCode_RateLimit, "compaction") // test illegal - rsp := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + rsp := getFailedResponse(&milvuspb.SearchResults{}, internalpb.RateType_DQLSearch, commonpb.ErrorCode_UnexpectedError, "method") assert.Nil(t, rsp) - rsp = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err")) + rsp = getFailedResponse(nil, internalpb.RateType_DQLSearch, commonpb.ErrorCode_UnexpectedError, "method") assert.Nil(t, rsp) }) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index b6d47098fe..6e689eb5c5 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -46,31 +46,6 @@ const ( SetRatesTimeout = 10 * time.Second ) -type TriggerReason int32 - -const ( - ManuallyDenyToRead TriggerReason = 0 - ManuallyDenyToWrite TriggerReason = 1 - MemoryQuotaExhausted TriggerReason = 2 - DiskQuotaExhausted TriggerReason = 3 - TimeTickLongDelay TriggerReason = 4 -) - -var TriggerReasonString = map[TriggerReason]string{ - ManuallyDenyToRead: "manually deny to read", - ManuallyDenyToWrite: "manually deny to write", - MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources", - DiskQuotaExhausted: "disk quota exhausted, please allocate more resources", - TimeTickLongDelay: "time tick long delay", -} - -func (t TriggerReason) String() string { - if s, ok := TriggerReasonString[t]; ok { - return s - } - return "" -} - type RateAllocateStrategy int32 const ( @@ -114,7 +89,7 @@ type QuotaCenter struct { dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics currentRates map[internalpb.RateType]Limit - quotaStates map[milvuspb.QuotaState]string + quotaStates map[milvuspb.QuotaState]commonpb.ErrorCode tsoAllocator tso.Allocator rateAllocateStrategy RateAllocateStrategy @@ -130,7 +105,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da queryCoord: queryCoord, dataCoord: dataCoord, currentRates: make(map[internalpb.RateType]Limit), - quotaStates: make(map[milvuspb.QuotaState]string), + quotaStates: make(map[milvuspb.QuotaState]commonpb.ErrorCode), tsoAllocator: tsoAllocator, rateAllocateStrategy: DefaultRateAllocateStrategy, @@ -271,20 +246,20 @@ func (q *QuotaCenter) syncMetrics() error { } // forceDenyWriting sets dml rates to 0 to reject all dml requests. -func (q *QuotaCenter) forceDenyWriting(reason TriggerReason) { +func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode) { q.currentRates[internalpb.RateType_DMLInsert] = 0 q.currentRates[internalpb.RateType_DMLDelete] = 0 q.currentRates[internalpb.RateType_DMLBulkLoad] = 0 - log.Warn("QuotaCenter force to deny writing", zap.String("reason", reason.String())) - q.quotaStates[milvuspb.QuotaState_DenyToWrite] = reason.String() + log.Warn("QuotaCenter force to deny writing", zap.String("reason", errorCode.String())) + q.quotaStates[milvuspb.QuotaState_DenyToWrite] = errorCode } // forceDenyWriting sets dql rates to 0 to reject all dql requests. -func (q *QuotaCenter) forceDenyReading(reason TriggerReason) { +func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode) { q.currentRates[internalpb.RateType_DQLSearch] = 0 q.currentRates[internalpb.RateType_DQLQuery] = 0 - log.Warn("QuotaCenter force to deny reading", zap.String("reason", reason.String())) - q.quotaStates[milvuspb.QuotaState_DenyToRead] = reason.String() + log.Warn("QuotaCenter force to deny reading", zap.String("reason", errorCode.String())) + q.quotaStates[milvuspb.QuotaState_DenyToRead] = errorCode } // getRealTimeRate return real time rate in Proxy. @@ -310,7 +285,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate // calculateReadRates calculates and sets dql rates. func (q *QuotaCenter) calculateReadRates() { if Params.QuotaConfig.ForceDenyReading.GetAsBool() { - q.forceDenyReading(ManuallyDenyToRead) + q.forceDenyReading(commonpb.ErrorCode_ForceDeny) return } @@ -355,13 +330,13 @@ func (q *QuotaCenter) calculateReadRates() { // calculateWriteRates calculates and sets dml rates. func (q *QuotaCenter) calculateWriteRates() error { if Params.QuotaConfig.ForceDenyWriting.GetAsBool() { - q.forceDenyWriting(ManuallyDenyToWrite) + q.forceDenyWriting(commonpb.ErrorCode_ForceDeny) return nil } exceeded := q.ifDiskQuotaExceeded() if exceeded { - q.forceDenyWriting(DiskQuotaExhausted) // disk quota protection + q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted) // disk quota protection return nil } @@ -371,13 +346,13 @@ func (q *QuotaCenter) calculateWriteRates() error { } ttFactor := q.getTimeTickDelayFactor(ts) if ttFactor <= 0 { - q.forceDenyWriting(TimeTickLongDelay) // tt protection + q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay) // tt protection return nil } memFactor := q.getMemoryFactor() if memFactor <= 0 { - q.forceDenyWriting(MemoryQuotaExhausted) // memory protection + q.forceDenyWriting(commonpb.ErrorCode_MemoryQuotaExhausted) // memory protection return nil } @@ -430,7 +405,7 @@ func (q *QuotaCenter) resetCurrentRates() { q.currentRates[rt] = Inf // no limit } } - q.quotaStates = make(map[milvuspb.QuotaState]string) + q.quotaStates = make(map[milvuspb.QuotaState]commonpb.ErrorCode) } // getTimeTickDelayFactor gets time tick delay of DataNodes and QueryNodes, @@ -689,10 +664,10 @@ func (q *QuotaCenter) setRates() error { // TODO: support ByRateWeight } states := make([]milvuspb.QuotaState, 0, len(q.quotaStates)) - stateReasons := make([]string, 0, len(q.quotaStates)) + codes := make([]commonpb.ErrorCode, 0, len(q.quotaStates)) for k, v := range q.quotaStates { states = append(states, k) - stateReasons = append(stateReasons, v) + codes = append(codes, v) } timestamp := tsoutil.ComposeTSByTime(time.Now(), 0) req := &proxypb.SetRatesRequest{ @@ -700,26 +675,25 @@ func (q *QuotaCenter) setRates() error { commonpbutil.WithMsgID(int64(timestamp)), commonpbutil.WithTimeStamp(timestamp), ), - Rates: map2List(), - States: states, - StateReasons: stateReasons, + Rates: map2List(), + States: states, + Codes: codes, } return q.proxies.SetRates(ctx, req) } // recordMetrics records metrics of quota states. func (q *QuotaCenter) recordMetrics() { - for _, reason := range TriggerReasonString { - hit := false + record := func(errorCode commonpb.ErrorCode) { for _, v := range q.quotaStates { - if v == reason { - hit = true + if v == errorCode { + metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(1) + return } } - if hit { - metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(1) - } else { - metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(0) - } + metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(0) } + record(commonpb.ErrorCode_MemoryQuotaExhausted) + record(commonpb.ErrorCode_DiskQuotaExhausted) + record(commonpb.ErrorCode_TimeTickLongDelay) } diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 446e7b7591..83dab5a82a 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -115,10 +115,10 @@ func TestQuotaCenter(t *testing.T) { t.Run("test forceDeny", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) - quotaCenter.forceDenyReading(ManuallyDenyToRead) + quotaCenter.forceDenyReading(commonpb.ErrorCode_ForceDeny) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery]) - quotaCenter.forceDenyWriting(ManuallyDenyToWrite) + quotaCenter.forceDenyWriting(commonpb.ErrorCode_ForceDeny) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLInsert]) assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLDelete]) }) @@ -441,16 +441,16 @@ func TestQuotaCenter(t *testing.T) { t.Run("test setRates", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) quotaCenter.currentRates[internalpb.RateType_DMLInsert] = 100 - quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted] - quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead] + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = commonpb.ErrorCode_ForceDeny err = quotaCenter.setRates() assert.NoError(t, err) }) t.Run("test recordMetrics", func(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator) - quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted] - quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead] + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted + quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = commonpb.ErrorCode_ForceDeny quotaCenter.recordMetrics() }) diff --git a/internal/types/types.go b/internal/types/types.go index e6ef0162ca..3b20c97f60 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -41,9 +41,7 @@ type TimeTickProvider interface { // If Limit function return true, the request will be rejected. // Otherwise, the request will pass. Limit also returns limit of limiter. type Limiter interface { - Check(rt internalpb.RateType, n int) error - GetReadStateReason() string - GetWriteStateReason() string + Check(rt internalpb.RateType, n int) commonpb.ErrorCode } // Component is the interface all services implement