Show trigger reason when deny to read/write (#21368)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/21484/head
bigsheeper 2022-12-30 18:35:32 +08:00 committed by GitHub
parent cf40bb9b5c
commit c187de7754
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 493 additions and 208 deletions

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/klauspost/compress v1.14.2
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0
github.com/minio/minio-go/v7 v7.0.17
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants/v2 v2.4.8

6
go.sum
View File

@ -488,8 +488,10 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651 h1:lXwp7St1mNKatOnl2mt6TU3QRpMTf75liXqTGmTkjis=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa h1:ok2ZT20iWlDqXWBzgVpbYev4tsOKvqUXPIJ1EUaQdEg=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 h1:GSiYfmb/CgWCdTKHzI0zl0L1xTr9/kaM6wr1O882lYc=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$."
"milvus.proto.common.ObjectPrivilege\022\031\n\021o"
"bject_name_index\030\003 \001(\005\022\032\n\022object_name_in"
"dexs\030\004 \001(\005*\226\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"dexs\030\004 \001(\005*\254\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n"
"\020PermissionDenied\020\003\022\027\n\023CollectionNotExis"
"ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime"
@ -391,94 +391,95 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"cheFailure\020+\022\025\n\021ListPolicyFailure\020,\022\022\n\016N"
"otShardLeader\020-\022\026\n\022NoReplicaAvailable\020.\022"
"\023\n\017SegmentNotFound\020/\022\r\n\tForceDeny\0200\022\r\n\tR"
"ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\017\n\013DataCo"
"ordNA\020d\022\022\n\rDDRequestRace\020\350\007*c\n\nIndexStat"
"e\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nI"
"nProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004\022\t\n"
"\005Retry\020\005*\202\001\n\014SegmentState\022\024\n\020SegmentStat"
"eNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Se"
"aled\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dro"
"pped\020\006\022\r\n\tImporting\020\007*>\n\017PlaceholderType"
"\022\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013FloatVec"
"tor\020e*\215\r\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020Crea"
"teCollection\020d\022\022\n\016DropCollection\020e\022\021\n\rHa"
"sCollection\020f\022\026\n\022DescribeCollection\020g\022\023\n"
"\017ShowCollections\020h\022\024\n\020GetSystemConfigs\020i"
"\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseCollectio"
"n\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nA"
"lterAlias\020n\022\023\n\017AlterCollection\020o\022\024\n\017Crea"
"tePartition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014Ha"
"sPartition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n"
"\016ShowPartitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022"
"\026\n\021ReleasePartitions\020\316\001\022\021\n\014ShowSegments\020"
"\372\001\022\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSegments"
"\020\374\001\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegm"
"ents\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Des"
"cribeSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDe"
"scribeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert"
"\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSe"
"gmentStats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResu"
"lt\020\365\003\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBui"
"ldProgress\020\367\003\022\034\n\027GetCollectionStatistics"
"\020\370\003\022\033\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retr"
"ieve\020\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmC"
"hannels\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022Wat"
"chQueryChannels\020\376\003\022\030\n\023RemoveQueryChannel"
"s\020\377\003\022\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022W"
"atchDeltaChannels\020\201\004\022\024\n\017GetShardLeaders\020"
"\202\004\022\020\n\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204"
"\004\022\024\n\017GetDistribution\020\205\004\022\025\n\020SyncDistribut"
"ion\020\206\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331"
"\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentSta"
"te\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261"
"\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nReq"
"uestTSO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021Segm"
"entStatistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022"
"\017\n\nDataNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022"
"\022\n\rGetCredential\020\335\013\022\025\n\020DeleteCredential\020"
"\336\013\022\025\n\020UpdateCredential\020\337\013\022\026\n\021ListCredUse"
"rnames\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301"
"\014\022\024\n\017OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014"
"\022\017\n\nSelectUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025"
"\n\020OperatePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022"
"\033\n\026RefreshPolicyInfoCache\020\310\014\022\017\n\nListPoli"
"cy\020\311\014*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020"
"\001*B\n\017CompactionState\022\021\n\rUndefiedState\020\000\022"
"\r\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consist"
"encyLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bo"
"unded\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*"
"\236\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Im"
"portFailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017Impor"
"tPersisted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017Impor"
"tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007"
"*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global"
"\020\001\022\010\n\004User\020\002*\206\005\n\017ObjectPrivilege\022\020\n\014Priv"
"ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020"
"\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil"
"egeDescribeCollection\020\003\022\034\n\030PrivilegeShow"
"Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv"
"ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022"
"\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020"
"\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile"
"geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020"
"\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe"
"arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ"
"uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv"
"ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi"
"p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg"
"eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner"
"ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023"
"PrivilegeSelectUser\020\030*S\n\tStateCode\022\020\n\014In"
"itializing\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022"
"\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022"
"\025\n\021LoadStateNotExist\020\000\022\024\n\020LoadStateNotLo"
"ad\020\001\022\024\n\020LoadStateLoading\020\002\022\023\n\017LoadStateL"
"oaded\020\003:^\n\021privilege_ext_obj\022\037.google.pr"
"otobuf.MessageOptions\030\351\007 \001(\0132!.milvus.pr"
"oto.common.PrivilegeExtBf\n\016io.milvus.grp"
"cB\013CommonProtoP\001Z1github.com/milvus-io/m"
"ilvus-proto/go-api/commonpb\240\001\001\252\002\016IO.Milv"
"us.Grpcb\006proto3"
"ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\024\n\020Upsert"
"AutoIDTrue\0203\022\017\n\013DataCoordNA\020d\022\022\n\rDDReque"
"stRace\020\350\007*c\n\nIndexState\022\022\n\016IndexStateNon"
"e\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Fin"
"ished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014Segme"
"ntState\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExis"
"t\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020"
"\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImportin"
"g\020\007*>\n\017PlaceholderType\022\010\n\004None\020\000\022\020\n\014Bina"
"ryVector\020d\022\017\n\013FloatVector\020e*\232\r\n\007MsgType\022"
"\r\n\tUndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n\016"
"DropCollection\020e\022\021\n\rHasCollection\020f\022\026\n\022D"
"escribeCollection\020g\022\023\n\017ShowCollections\020h"
"\022\024\n\020GetSystemConfigs\020i\022\022\n\016LoadCollection"
"\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013CreateAlias"
"\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017Alt"
"erCollection\020o\022\024\n\017CreatePartition\020\310\001\022\022\n\r"
"DropPartition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021D"
"escribePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001"
"\022\023\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePartitio"
"ns\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegm"
"ent\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegm"
"ents\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBal"
"anceSegments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020"
"\n\013CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\t"
"DropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n"
"\n\005Flush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006U"
"psert\020\224\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003"
"\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro"
"gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033"
"\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020"
"\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne"
"ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue"
"ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022"
"\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD"
"eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n"
"\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017"
"GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206"
"\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017"
"GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004"
"\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t"
"LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT"
"SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt"
"atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa"
"taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe"
"tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n"
"\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername"
"s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017"
"OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS"
"electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope"
"ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re"
"freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014"
"*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017"
"CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\tEx"
"ecuting\020\001\022\r\n\tCompleted\020\002*X\n\020ConsistencyL"
"evel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounded"
"\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\236\001\n\013I"
"mportState\022\021\n\rImportPending\020\000\022\020\n\014ImportF"
"ailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017ImportPers"
"isted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017ImportComp"
"leted\020\006\022\032\n\026ImportFailedAndCleaned\020\007*2\n\nO"
"bjectType\022\016\n\nCollection\020\000\022\n\n\006Global\020\001\022\010\n"
"\004User\020\002*\233\005\n\017ObjectPrivilege\022\020\n\014Privilege"
"All\020\000\022\035\n\031PrivilegeCreateCollection\020\001\022\033\n\027"
"PrivilegeDropCollection\020\002\022\037\n\033PrivilegeDe"
"scribeCollection\020\003\022\034\n\030PrivilegeShowColle"
"ctions\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Privilege"
"Release\020\006\022\027\n\023PrivilegeCompaction\020\007\022\023\n\017Pr"
"ivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026"
"PrivilegeGetStatistics\020\n\022\030\n\024PrivilegeCre"
"ateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020\014\022\026\n\022"
"PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSearch\020"
"\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQuery\020"
"\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Privilege"
"Import\020\022\022\034\n\030PrivilegeCreateOwnership\020\023\022\027"
"\n\023PrivilegeUpdateUser\020\024\022\032\n\026PrivilegeDrop"
"Ownership\020\025\022\034\n\030PrivilegeSelectOwnership\020"
"\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023Privi"
"legeSelectUser\020\030\022\023\n\017PrivilegeUpsert\020\031*S\n"
"\tStateCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020"
"\001\022\014\n\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping"
"\020\004*c\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024"
"\n\020LoadStateNotLoad\020\001\022\024\n\020LoadStateLoading"
"\020\002\022\023\n\017LoadStateLoaded\020\003:^\n\021privilege_ext"
"_obj\022\037.google.protobuf.MessageOptions\030\351\007"
" \001(\0132!.milvus.proto.common.PrivilegeExtB"
"f\n\016io.milvus.grpcB\013CommonProtoP\001Z1github"
".com/milvus-io/milvus-proto/go-api/commo"
"npb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
&::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto,
@ -499,7 +500,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5535,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5591,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@ -566,6 +567,7 @@ bool ErrorCode_IsValid(int value) {
case 48:
case 49:
case 50:
case 51:
case 100:
case 1000:
return true;
@ -667,6 +669,7 @@ bool MsgType_IsValid(int value) {
case 401:
case 402:
case 403:
case 404:
case 500:
case 501:
case 502:
@ -831,6 +834,7 @@ bool ObjectPrivilege_IsValid(int value) {
case 22:
case 23:
case 24:
case 25:
return true;
default:
return false;

View File

@ -163,6 +163,7 @@ enum ErrorCode : int {
ForceDeny = 48,
RateLimit = 49,
NodeIDNotMatch = 50,
UpsertAutoIDTrue = 51,
DataCoordNA = 100,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
@ -308,6 +309,7 @@ enum MsgType : int {
Delete = 401,
Flush = 402,
ResendSegmentStats = 403,
Upsert = 404,
Search = 500,
SearchResult = 501,
GetIndexState = 502,
@ -538,12 +540,13 @@ enum ObjectPrivilege : int {
PrivilegeSelectOwnership = 22,
PrivilegeManageOwnership = 23,
PrivilegeSelectUser = 24,
PrivilegeUpsert = 25,
ObjectPrivilege_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ObjectPrivilege_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ObjectPrivilege_IsValid(int value);
constexpr ObjectPrivilege ObjectPrivilege_MIN = PrivilegeAll;
constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeSelectUser;
constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeUpsert;
constexpr int ObjectPrivilege_ARRAYSIZE = ObjectPrivilege_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ObjectPrivilege_descriptor();

View File

@ -684,6 +684,10 @@ func (s *Server) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*
return s.proxy.Delete(ctx, request)
}
func (s *Server) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
panic("TODO: not implement")
}
func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
return s.proxy.Search(ctx, request)
}

View File

@ -145,6 +145,17 @@ var (
roleNameLabelName,
nodeIDLabelName,
})
// RootCoordQuotaStates records the quota states of cluster.
RootCoordQuotaStates = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.RootCoordRole,
Name: "quota_states",
Help: "The quota states of cluster",
}, []string{
"quota_states",
})
)
//RegisterRootCoord registers RootCoord metrics
@ -176,4 +187,5 @@ func RegisterRootCoord(registry *prometheus.Registry) {
registry.MustRegister(RootCoordNumOfRoles)
registry.MustRegister(RootCoordTtDelay)
registry.MustRegister(RootCoordQuotaStates)
}

View File

@ -53,4 +53,6 @@ message RefreshPolicyInfoCacheRequest {
message SetRatesRequest {
common.MsgBase base = 1;
repeated internal.Rate rates = 2;
repeated milvus.QuotaState states = 3;
repeated string state_reasons = 4;
}

View File

@ -252,11 +252,13 @@ func (m *RefreshPolicyInfoCacheRequest) GetOpKey() string {
}
type SetRatesRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"`
States []milvuspb.QuotaState `protobuf:"varint,3,rep,packed,name=states,proto3,enum=milvus.proto.milvus.QuotaState" json:"states,omitempty"`
StateReasons []string `protobuf:"bytes,4,rep,name=state_reasons,json=stateReasons,proto3" json:"state_reasons,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SetRatesRequest) Reset() { *m = SetRatesRequest{} }
@ -298,6 +300,20 @@ func (m *SetRatesRequest) GetRates() []*internalpb.Rate {
return nil
}
func (m *SetRatesRequest) GetStates() []milvuspb.QuotaState {
if m != nil {
return m.States
}
return nil
}
func (m *SetRatesRequest) GetStateReasons() []string {
if m != nil {
return m.StateReasons
}
return nil
}
func init() {
proto.RegisterType((*InvalidateCollMetaCacheRequest)(nil), "milvus.proto.proxy.InvalidateCollMetaCacheRequest")
proto.RegisterType((*InvalidateCredCacheRequest)(nil), "milvus.proto.proxy.InvalidateCredCacheRequest")
@ -309,43 +325,47 @@ func init() {
func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) }
var fileDescriptor_700b50b08ed8dbaf = []byte{
// 573 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xdd, 0x6e, 0xda, 0x30,
0x18, 0x6d, 0xda, 0x42, 0xbb, 0x0f, 0x54, 0x24, 0xab, 0x63, 0x2c, 0x5d, 0x27, 0x94, 0x4e, 0x2b,
0xaa, 0x34, 0x58, 0xd9, 0x9e, 0xa0, 0x54, 0x42, 0x68, 0xa2, 0xaa, 0xc2, 0x76, 0xb3, 0x9b, 0xc9,
0x49, 0xbe, 0x82, 0x51, 0x62, 0xa7, 0xb1, 0x61, 0xe3, 0x6a, 0xd2, 0xde, 0x68, 0x77, 0x7b, 0xbc,
0x29, 0x3f, 0x04, 0xc2, 0x42, 0xa3, 0xad, 0xda, 0x1d, 0xc7, 0x3e, 0xf6, 0x39, 0x27, 0xfe, 0x0e,
0x50, 0xf1, 0x03, 0xf1, 0x6d, 0xd1, 0xf6, 0x03, 0xa1, 0x04, 0x21, 0x1e, 0x73, 0xe7, 0x33, 0x19,
0xa3, 0x76, 0xb4, 0xa3, 0x57, 0x6d, 0xe1, 0x79, 0x82, 0xc7, 0x6b, 0xfa, 0x11, 0xe3, 0x0a, 0x03,
0x4e, 0xdd, 0x04, 0x57, 0xd7, 0x4f, 0x18, 0xbf, 0x34, 0x78, 0x39, 0xe0, 0x73, 0xea, 0x32, 0x87,
0x2a, 0xec, 0x09, 0xd7, 0x1d, 0xa2, 0xa2, 0x3d, 0x6a, 0x4f, 0xd0, 0xc4, 0xfb, 0x19, 0x4a, 0x45,
0xde, 0xc2, 0xbe, 0x45, 0x25, 0x36, 0xb4, 0xa6, 0xd6, 0xaa, 0x74, 0x5f, 0xb4, 0x33, 0x8a, 0x89,
0xd4, 0x50, 0x8e, 0xaf, 0xa8, 0x44, 0x33, 0x62, 0x92, 0x67, 0x70, 0xe0, 0x58, 0x5f, 0x38, 0xf5,
0xb0, 0xb1, 0xdb, 0xd4, 0x5a, 0x4f, 0xcc, 0xb2, 0x63, 0xdd, 0x50, 0x0f, 0xc9, 0x39, 0xd4, 0x6c,
0xe1, 0xba, 0x68, 0x2b, 0x26, 0x78, 0x4c, 0xd8, 0x8b, 0x08, 0x47, 0xab, 0xe5, 0x88, 0x68, 0x40,
0x75, 0xb5, 0x32, 0xb8, 0x6e, 0xec, 0x37, 0xb5, 0xd6, 0x9e, 0x99, 0x59, 0x33, 0xa6, 0xa0, 0xaf,
0x39, 0x0f, 0xd0, 0x79, 0xa4, 0x6b, 0x1d, 0x0e, 0x67, 0x32, 0xfc, 0x52, 0xa9, 0xed, 0x14, 0x1b,
0x3f, 0x34, 0xa8, 0x7f, 0xf2, 0xff, 0xbf, 0x50, 0xb8, 0xe7, 0x53, 0x29, 0xbf, 0x8a, 0xc0, 0x49,
0x3e, 0x4d, 0x8a, 0x8d, 0xef, 0x70, 0x6a, 0xe2, 0x5d, 0x80, 0x72, 0x72, 0x2b, 0x5c, 0x66, 0x2f,
0x06, 0xfc, 0x4e, 0x3c, 0xd2, 0x4a, 0x1d, 0xca, 0xc2, 0xff, 0xb8, 0xf0, 0x63, 0x23, 0x25, 0x33,
0x41, 0xe4, 0x18, 0x4a, 0xc2, 0xff, 0x80, 0x8b, 0xc4, 0x43, 0x0c, 0x8c, 0x39, 0xd4, 0x46, 0xa8,
0x4c, 0xaa, 0x50, 0xfe, 0xbb, 0xe4, 0x25, 0x94, 0x82, 0xf0, 0x86, 0xc6, 0x6e, 0x73, 0xaf, 0x55,
0xe9, 0x9e, 0x64, 0x8f, 0xa4, 0xc3, 0x1a, 0xaa, 0x98, 0x31, 0xb3, 0xfb, 0xf3, 0x00, 0x4a, 0xb7,
0xe1, 0x68, 0x13, 0x17, 0x48, 0x1f, 0x55, 0x4f, 0x78, 0xbe, 0xe0, 0xc8, 0xd5, 0x48, 0x85, 0xfb,
0xa4, 0x9d, 0xbd, 0x23, 0x01, 0x7f, 0x12, 0x13, 0xd3, 0xfa, 0xab, 0x5c, 0xfe, 0x06, 0xd9, 0xd8,
0x21, 0xf7, 0x70, 0xdc, 0xc7, 0x08, 0x32, 0xa9, 0x98, 0x2d, 0x7b, 0x13, 0xca, 0x39, 0xba, 0xa4,
0xbb, 0xc5, 0x73, 0x1e, 0x79, 0xa9, 0x79, 0x96, 0xab, 0x39, 0x52, 0x01, 0xe3, 0x63, 0x13, 0xa5,
0x2f, 0xb8, 0x44, 0x63, 0x87, 0x04, 0x70, 0x9a, 0xad, 0x63, 0x3c, 0xee, 0x69, 0x29, 0x37, 0xb5,
0xe3, 0xff, 0x82, 0x87, 0x1b, 0xac, 0x9f, 0xe4, 0x3e, 0x4b, 0x68, 0x75, 0x16, 0xc6, 0xa4, 0x50,
0xed, 0xa3, 0xba, 0x76, 0x96, 0xf1, 0x2e, 0xb6, 0xc7, 0x4b, 0x49, 0x7f, 0x19, 0x6b, 0x0a, 0xcf,
0xb3, 0x5d, 0x45, 0xae, 0x18, 0x75, 0xe3, 0x48, 0xed, 0x82, 0x48, 0x1b, 0x8d, 0x2b, 0x8a, 0x63,
0xc1, 0xd3, 0x55, 0x55, 0xd7, 0x75, 0x2e, 0xf2, 0x74, 0xf2, 0x5b, 0x5d, 0xa4, 0x31, 0x85, 0x7a,
0x7e, 0x15, 0xc9, 0x65, 0x9e, 0xc8, 0x83, 0xb5, 0x2d, 0xd2, 0x72, 0xa0, 0xd6, 0x47, 0x15, 0xcd,
0xff, 0x10, 0x55, 0xc0, 0x6c, 0x49, 0x5e, 0x6f, 0x1b, 0xf8, 0x84, 0xb0, 0xbc, 0xf9, 0xbc, 0x90,
0x97, 0xbe, 0xd0, 0x0d, 0x1c, 0x2e, 0xbb, 0x4d, 0xce, 0xf2, 0x32, 0x6c, 0x34, 0xbf, 0xc0, 0xf5,
0xd5, 0xfb, 0xcf, 0xdd, 0x31, 0x53, 0x93, 0x99, 0x15, 0xee, 0x74, 0x62, 0xea, 0x1b, 0x26, 0x92,
0x5f, 0x9d, 0xe5, 0x50, 0x75, 0xa2, 0xd3, 0x9d, 0x48, 0xc2, 0xb7, 0xac, 0x72, 0x04, 0xdf, 0xfd,
0x0e, 0x00, 0x00, 0xff, 0xff, 0x9d, 0xbe, 0x61, 0x31, 0xe4, 0x06, 0x00, 0x00,
// 625 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x51, 0x4f, 0x13, 0x4d,
0x14, 0x65, 0x29, 0x2d, 0x70, 0xe9, 0x07, 0xc9, 0x84, 0x0f, 0x6b, 0x11, 0x6d, 0x16, 0x23, 0x0d,
0x89, 0xad, 0x54, 0x13, 0xdf, 0x29, 0x49, 0x43, 0x0c, 0x04, 0xa7, 0xfa, 0xe2, 0x0b, 0x99, 0xdd,
0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x67, 0x8a, 0xf6, 0xc9, 0xc4, 0x7f, 0xe4, 0x9b, 0x3f, 0xc4,
0x1f, 0x64, 0x76, 0x66, 0xbb, 0xd0, 0xba, 0x65, 0xa3, 0xc4, 0xb7, 0x3d, 0x77, 0xce, 0x9d, 0x73,
0xef, 0xec, 0xbd, 0x07, 0xd6, 0xa2, 0x58, 0x7e, 0x19, 0xb7, 0xa2, 0x58, 0x6a, 0x49, 0xc8, 0x90,
0x87, 0x37, 0x23, 0x65, 0x51, 0xcb, 0x9c, 0xd4, 0xab, 0xbe, 0x1c, 0x0e, 0xa5, 0xb0, 0xb1, 0xfa,
0x3a, 0x17, 0x1a, 0x63, 0xc1, 0xc2, 0x14, 0x57, 0xef, 0x66, 0xb8, 0x3f, 0x1c, 0x78, 0x7a, 0x2c,
0x6e, 0x58, 0xc8, 0x03, 0xa6, 0xb1, 0x2b, 0xc3, 0xf0, 0x04, 0x35, 0xeb, 0x32, 0x7f, 0x80, 0x14,
0xaf, 0x47, 0xa8, 0x34, 0x79, 0x05, 0x4b, 0x1e, 0x53, 0x58, 0x73, 0x1a, 0x4e, 0x73, 0xad, 0xf3,
0xa4, 0x35, 0xa5, 0x98, 0x4a, 0x9d, 0xa8, 0xcb, 0x43, 0xa6, 0x90, 0x1a, 0x26, 0x79, 0x04, 0xcb,
0x81, 0x77, 0x2e, 0xd8, 0x10, 0x6b, 0x8b, 0x0d, 0xa7, 0xb9, 0x4a, 0x2b, 0x81, 0x77, 0xca, 0x86,
0x48, 0xf6, 0x60, 0xc3, 0x97, 0x61, 0x88, 0xbe, 0xe6, 0x52, 0x58, 0x42, 0xc9, 0x10, 0xd6, 0x6f,
0xc3, 0x86, 0xe8, 0x42, 0xf5, 0x36, 0x72, 0x7c, 0x54, 0x5b, 0x6a, 0x38, 0xcd, 0x12, 0x9d, 0x8a,
0xb9, 0x57, 0x50, 0xbf, 0x53, 0x79, 0x8c, 0xc1, 0x03, 0xab, 0xae, 0xc3, 0xca, 0x48, 0x25, 0x2f,
0x95, 0x95, 0x9d, 0x61, 0xf7, 0x9b, 0x03, 0x5b, 0x1f, 0xa3, 0x7f, 0x2f, 0x94, 0x9c, 0x45, 0x4c,
0xa9, 0xcf, 0x32, 0x0e, 0xd2, 0xa7, 0xc9, 0xb0, 0xfb, 0x15, 0x76, 0x28, 0x5e, 0xc4, 0xa8, 0x06,
0x67, 0x32, 0xe4, 0xfe, 0xf8, 0x58, 0x5c, 0xc8, 0x07, 0x96, 0xb2, 0x05, 0x15, 0x19, 0x7d, 0x18,
0x47, 0xb6, 0x90, 0x32, 0x4d, 0x11, 0xd9, 0x84, 0xb2, 0x8c, 0xde, 0xe1, 0x38, 0xad, 0xc1, 0x02,
0xf7, 0xa7, 0x03, 0x1b, 0x7d, 0xd4, 0x94, 0x69, 0x54, 0x7f, 0xaf, 0x79, 0x00, 0xe5, 0x38, 0xb9,
0xa1, 0xb6, 0xd8, 0x28, 0x35, 0xd7, 0x3a, 0xdb, 0xd3, 0x29, 0xd9, 0xb4, 0x26, 0x2a, 0xd4, 0x32,
0xc9, 0x5b, 0xa8, 0x28, 0x6d, 0x72, 0x4a, 0x8d, 0x52, 0x73, 0xbd, 0xf3, 0x6c, 0x3a, 0x27, 0x05,
0xef, 0x47, 0x52, 0xb3, 0x7e, 0xc2, 0xa3, 0x29, 0x9d, 0xec, 0xc2, 0x7f, 0xe6, 0xeb, 0x3c, 0x46,
0xa6, 0xa4, 0x50, 0xb5, 0xa5, 0x46, 0xa9, 0xb9, 0x4a, 0xab, 0x26, 0x48, 0x6d, 0xac, 0xf3, 0x7d,
0x19, 0xca, 0x67, 0xc9, 0xe6, 0x90, 0x10, 0x48, 0x0f, 0x75, 0x57, 0x0e, 0x23, 0x29, 0x50, 0xe8,
0xbe, 0xbd, 0xa4, 0x95, 0xab, 0xf6, 0x3b, 0x31, 0x7d, 0x92, 0xfa, 0xf3, 0x5c, 0xfe, 0x0c, 0xd9,
0x5d, 0x20, 0xd7, 0xb0, 0xd9, 0x43, 0x03, 0xb9, 0xd2, 0xdc, 0x57, 0xdd, 0x01, 0x13, 0x02, 0x43,
0xd2, 0x99, 0xf3, 0x22, 0x79, 0xe4, 0x89, 0xe6, 0x6e, 0xae, 0x66, 0x5f, 0xc7, 0x5c, 0x5c, 0x52,
0x54, 0x91, 0x14, 0x0a, 0xdd, 0x05, 0x12, 0xc3, 0xce, 0xf4, 0xb6, 0xdb, 0x6d, 0xca, 0x76, 0x7e,
0x56, 0xdb, 0x5a, 0xcd, 0xfd, 0x06, 0x51, 0xdf, 0xce, 0xfd, 0xe9, 0x49, 0xa9, 0xa3, 0xa4, 0x4d,
0x06, 0xd5, 0x1e, 0xea, 0xa3, 0x60, 0xd2, 0xde, 0xfe, 0xfc, 0xf6, 0x32, 0xd2, 0x1f, 0xb6, 0x75,
0x05, 0x8f, 0xa7, 0xad, 0x00, 0x85, 0xe6, 0x2c, 0xb4, 0x2d, 0xb5, 0x0a, 0x5a, 0x9a, 0x59, 0xe8,
0xa2, 0x76, 0x3c, 0xf8, 0xff, 0xd6, 0x09, 0xee, 0xea, 0xec, 0xe7, 0xe9, 0xe4, 0x9b, 0x46, 0x91,
0xc6, 0x15, 0x6c, 0xe5, 0x6f, 0x3a, 0x39, 0xc8, 0x13, 0xb9, 0xd7, 0x15, 0x8a, 0xb4, 0x02, 0xd8,
0xe8, 0xa1, 0x36, 0xf3, 0x7f, 0x82, 0x3a, 0xe6, 0xbe, 0x22, 0x2f, 0xe6, 0x0d, 0x7c, 0x4a, 0x98,
0xdc, 0xbc, 0x57, 0xc8, 0xcb, 0xfe, 0xd0, 0x29, 0xac, 0x4c, 0x9c, 0x83, 0xec, 0xe6, 0xf5, 0x30,
0xe3, 0x2b, 0x05, 0x55, 0x1f, 0xbe, 0xf9, 0xd4, 0xb9, 0xe4, 0x7a, 0x30, 0xf2, 0x92, 0x93, 0xb6,
0xa5, 0xbe, 0xe4, 0x32, 0xfd, 0x6a, 0x4f, 0x86, 0xaa, 0x6d, 0xb2, 0xdb, 0x46, 0x22, 0xf2, 0xbc,
0x8a, 0x81, 0xaf, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x36, 0x94, 0x65, 0x38, 0x43, 0x07, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/status"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
)
// TODO(dragondriver): add more common error type
@ -90,3 +92,22 @@ func ErrProxyNotReady() error {
func ErrPartitionNotExist(partitionName string) error {
return fmt.Errorf("partition is not exist: %s", partitionName)
}
var (
ErrRateLimit = errors.New("RequestLimited")
ErrForceDeny = errors.New("RequestDenied")
)
func wrapRateLimitError() error {
return fmt.Errorf("[%w] request is rejected by grpc RateLimiter middleware, please retry later", ErrRateLimit)
}
func wrapForceDenyError(rt internalpb.RateType, limiter types.Limiter) error {
switch rt {
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
return fmt.Errorf("[%w] deny to write, reason: %s", ErrForceDeny, limiter.GetWriteStateReason())
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
return fmt.Errorf("[%w] deny to read, reason: %s", ErrForceDeny, limiter.GetReadStateReason())
}
return nil
}

View File

@ -17,10 +17,13 @@
package proxy
import (
"errors"
"testing"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
@ -149,3 +152,16 @@ func Test_errProxyIsUnhealthy(t *testing.T) {
zap.Error(errProxyIsUnhealthy(id)))
}
}
func Test_ErrRateLimitAndErrForceDeny(t *testing.T) {
err := wrapRateLimitError()
assert.True(t, errors.Is(err, ErrRateLimit))
limiter := NewMultiRateLimiter()
err = wrapForceDenyError(internalpb.RateType_DMLInsert, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
err = wrapForceDenyError(internalpb.RateType_DMLDelete, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
err = wrapForceDenyError(internalpb.RateType_DQLSearch, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
}

View File

@ -2128,6 +2128,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
return it.result, nil
}
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
panic("TODO: not implement")
}
// Delete delete records from collection, then these records cannot be searched.
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
@ -4185,6 +4189,13 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques
resp.Reason = err.Error()
return resp, nil
}
node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons())
log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates()))
if len(request.GetStates()) != 0 {
for i := range request.GetStates() {
log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i]))
}
}
resp.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
@ -4248,16 +4259,22 @@ func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealt
err := group.Wait()
if err != nil || len(errReasons) != 0 {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IsHealthy: false,
Reasons: errReasons,
}, nil
}
states, reasons := node.multiRateLimiter.GetQuotaStates()
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
IsHealthy: true,
QuotaStates: states,
Reasons: reasons,
IsHealthy: true,
}, nil
}

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
@ -43,6 +59,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
func TestProxy_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{ServerID: 1}}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -59,6 +76,7 @@ func TestProxy_CheckHealth(t *testing.T) {
indexCoord: NewIndexCoordMock(),
session: &sessionutil.Session{ServerID: 1},
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -96,6 +114,7 @@ func TestProxy_CheckHealth(t *testing.T) {
}),
dataCoord: dataCoordMock,
indexCoord: indexCoordMock}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -103,4 +122,29 @@ func TestProxy_CheckHealth(t *testing.T) {
assert.Equal(t, false, resp.IsHealthy)
assert.Equal(t, 4, len(resp.Reasons))
})
t.Run("check quota state", func(t *testing.T) {
node := &Proxy{
rootCoord: NewRootCoordMock(),
dataCoord: NewDataCoordMock(),
queryCoord: NewQueryCoordMock(),
indexCoord: NewIndexCoordMock(),
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Equal(t, 0, len(resp.GetQuotaStates()))
assert.Equal(t, 0, len(resp.GetReasons()))
states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead}
reasons := []string{"memory quota exhausted", "manually deny to read"}
node.multiRateLimiter.SetQuotaStates(states, reasons)
resp, err = node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Equal(t, 2, len(resp.GetQuotaStates()))
assert.Equal(t, 2, len(resp.GetReasons()))
})
}

View File

@ -18,10 +18,12 @@ package proxy
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -34,6 +36,8 @@ import (
type MultiRateLimiter struct {
globalRateLimiter *rateLimiter
// TODO: add collection level rateLimiter
quotaStatesMu sync.RWMutex
quotaStates map[milvuspb.QuotaState]string
}
// NewMultiRateLimiter returns a new MultiRateLimiter.
@ -43,14 +47,54 @@ func NewMultiRateLimiter() *MultiRateLimiter {
return m
}
// Limit returns true, the request will be rejected.
// Otherwise, the request will pass. Limit also returns limit of limiter.
func (m *MultiRateLimiter) Limit(rt internalpb.RateType, n int) (bool, float64) {
// Check checks if request would be limited or denied.
func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) error {
if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
return false, 1 // no limit
return nil
}
limit, rate := m.globalRateLimiter.limit(rt, n)
if rate == 0 {
return wrapForceDenyError(rt, m)
}
if limit {
return wrapRateLimitError()
}
return nil
}
// GetQuotaStates returns quota states.
func (m *MultiRateLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) {
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
states := make([]milvuspb.QuotaState, 0, len(m.quotaStates))
reasons := make([]string, 0, len(m.quotaStates))
for k, v := range m.quotaStates {
states = append(states, k)
reasons = append(reasons, v)
}
return states, reasons
}
func (m *MultiRateLimiter) GetReadStateReason() string {
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToRead]
}
func (m *MultiRateLimiter) GetWriteStateReason() string {
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToWrite]
}
// SetQuotaStates sets quota states for MultiRateLimiter.
func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, reasons []string) {
m.quotaStatesMu.Lock()
defer m.quotaStatesMu.Unlock()
m.quotaStates = make(map[milvuspb.QuotaState]string, len(states))
for i := 0; i < len(states); i++ {
m.quotaStates[states[i]] = reasons[i]
}
// TODO: call other rate limiters
return m.globalRateLimiter.limit(rt, n)
}
// rateLimiter implements Limiter.
@ -83,7 +127,7 @@ func (rl *rateLimiter) setRates(rates []*internalpb.Rate) error {
return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String())
}
}
rl.printRates(rates)
// rl.printRates(rates)
return nil
}

View File

@ -17,10 +17,12 @@
package proxy
import (
"errors"
"fmt"
"math"
"testing"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
@ -36,12 +38,12 @@ func TestMultiRateLimiter(t *testing.T) {
multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
}
for _, rt := range internalpb.RateType_value {
ok, _ := multiLimiter.Limit(internalpb.RateType(rt), 1)
assert.False(t, ok)
ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt)
assert.False(t, ok)
ok, _ = multiLimiter.Limit(internalpb.RateType(rt), math.MaxInt)
assert.True(t, ok)
err := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.NoError(t, err)
err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.True(t, errors.Is(err, ErrRateLimit))
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
@ -51,9 +53,8 @@ func TestMultiRateLimiter(t *testing.T) {
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false")
for _, rt := range internalpb.RateType_value {
ok, r := multiLimiter.Limit(internalpb.RateType(rt), 1)
assert.False(t, ok)
assert.NotEqual(t, float64(0), r)
err := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.NoError(t, err)
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
@ -65,9 +66,8 @@ func TestMultiRateLimiter(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
ok, r := multiLimiter.Limit(internalpb.RateType_DMLInsert, 1*1024*1024)
assert.False(t, ok)
assert.NotEqual(t, float64(0), r)
err := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024)
assert.NoError(t, err)
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate
}
@ -77,6 +77,17 @@ func TestMultiRateLimiter(t *testing.T) {
run(math.MaxFloat64 / 3)
run(math.MaxFloat64 / 10000)
})
t.Run("test GetReadStateReason and GetWriteStateReason", func(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead}
writeReason := "memory quota exhausted"
readReason := "manually deny to read"
reasons := []string{writeReason, readReason}
multiLimiter.SetQuotaStates(states, reasons)
assert.Equal(t, writeReason, multiLimiter.GetWriteStateReason())
assert.Equal(t, readReason, multiLimiter.GetReadStateReason())
})
}
func TestRateLimiter(t *testing.T) {

View File

@ -18,6 +18,7 @@ package proxy
import (
"context"
"errors"
"fmt"
"reflect"
@ -34,19 +35,20 @@ import (
func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
rt, n, err := getRequestInfo(req)
if err == nil {
limit, rate := limiter.Limit(rt, n)
if rate == 0 {
res, err1 := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, fmt.Sprintf("force to deny %s.", info.FullMethod))
if err1 == nil {
return res, nil
}
if err != nil {
return handler(ctx, req)
}
err = limiter.Check(rt, n)
if errors.Is(err, ErrForceDeny) {
rsp := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, info.FullMethod, err)
if rsp != nil {
return rsp, nil
}
if limit {
res, err2 := getFailedResponse(req, commonpb.ErrorCode_RateLimit, fmt.Sprintf("%s is rejected by grpc RateLimiter middleware, please retry later.", info.FullMethod))
if err2 == nil {
return res, nil
}
}
if errors.Is(err, ErrRateLimit) {
rsp := getFailedResponse(req, commonpb.ErrorCode_RateLimit, info.FullMethod, err)
if rsp != nil {
return rsp, nil
}
}
return handler(ctx, req)
@ -112,40 +114,37 @@ func failedBoolResponse(code commonpb.ErrorCode, reason string) *milvuspb.BoolRe
}
// getFailedResponse returns failed response.
func getFailedResponse(req interface{}, code commonpb.ErrorCode, reason string) (interface{}, error) {
func getFailedResponse(req interface{}, code commonpb.ErrorCode, fullMethod string, err error) interface{} {
reason := fmt.Sprintf("%s, req: %s", err, fullMethod)
switch req.(type) {
case *milvuspb.InsertRequest, *milvuspb.DeleteRequest:
return failedMutationResult(code, reason), nil
return failedMutationResult(code, reason)
case *milvuspb.ImportRequest:
return &milvuspb.ImportResponse{
Status: failedStatus(code, reason),
}, nil
}
case *milvuspb.SearchRequest:
return &milvuspb.SearchResults{
Status: failedStatus(code, reason),
}, nil
}
case *milvuspb.QueryRequest:
return &milvuspb.QueryResults{
Status: failedStatus(code, reason),
}, nil
}
case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest,
*milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest,
*milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest,
*milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest,
*milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest:
return failedStatus(code, reason), nil
return failedStatus(code, reason)
case *milvuspb.FlushRequest:
return &milvuspb.FlushResponse{
Status: failedStatus(code, reason),
}, nil
}
case *milvuspb.ManualCompactionRequest:
return &milvuspb.ManualCompactionResponse{
Status: failedStatus(code, reason),
}, nil
// TODO: support more request
}
}
if req == nil {
return nil, fmt.Errorf("null request")
}
return nil, fmt.Errorf("unsupported request type %s", reflect.TypeOf(req).Name())
return nil
}

View File

@ -18,6 +18,7 @@ package proxy
import (
"context"
"fmt"
"testing"
"github.com/golang/protobuf/proto"
@ -30,12 +31,38 @@ import (
)
type limiterMock struct {
limit bool
rate float64
limit bool
rate float64
quotaStates []milvuspb.QuotaState
quotaStateReasons []string
}
func (l *limiterMock) Limit(_ internalpb.RateType, _ int) (bool, float64) {
return l.limit, l.rate
func (l *limiterMock) Check(rt internalpb.RateType, n int) error {
if l.rate == 0 {
return ErrForceDeny
}
if l.limit {
return ErrRateLimit
}
return nil
}
func (l *limiterMock) GetReadStateReason() string {
for i := range l.quotaStates {
if l.quotaStates[i] == milvuspb.QuotaState_DenyToRead {
return l.quotaStateReasons[i]
}
}
return ""
}
func (l *limiterMock) GetWriteStateReason() string {
for i := range l.quotaStates {
if l.quotaStates[i] == milvuspb.QuotaState_DenyToWrite {
return l.quotaStateReasons[i]
}
}
return ""
}
func TestRateLimitInterceptor(t *testing.T) {
@ -93,8 +120,8 @@ func TestRateLimitInterceptor(t *testing.T) {
t.Run("test getFailedResponse", func(t *testing.T) {
testGetFailedResponse := func(req interface{}) {
_, err := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "mock")
assert.NoError(t, err)
rsp := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
assert.NotNil(t, rsp)
}
testGetFailedResponse(&milvuspb.DeleteRequest{})
@ -106,10 +133,10 @@ func TestRateLimitInterceptor(t *testing.T) {
testGetFailedResponse(&milvuspb.ManualCompactionRequest{})
// test illegal
_, err := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "mock")
assert.Error(t, err)
_, err = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "mock")
assert.Error(t, err)
rsp := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
assert.Nil(t, rsp)
rsp = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
assert.Nil(t, rsp)
})
t.Run("test RateLimitInterceptor", func(t *testing.T) {

View File

@ -27,6 +27,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -45,15 +46,31 @@ const (
SetRatesTimeout = 10 * time.Second
)
type ForceDenyTriggerReason string
type TriggerReason int32
const (
ManualForceDeny ForceDenyTriggerReason = "ManualForceDeny"
MemoryExhausted ForceDenyTriggerReason = "MemoryExhausted"
DiskQuotaExceeded ForceDenyTriggerReason = "DiskQuotaExceeded"
TimeTickLongDelay ForceDenyTriggerReason = "TimeTickLongDelay"
ManuallyDenyToRead TriggerReason = 0
ManuallyDenyToWrite TriggerReason = 1
MemoryQuotaExhausted TriggerReason = 2
DiskQuotaExhausted TriggerReason = 3
TimeTickLongDelay TriggerReason = 4
)
var TriggerReasonString = map[TriggerReason]string{
ManuallyDenyToRead: "manually deny to read",
ManuallyDenyToWrite: "manually deny to write",
MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources",
DiskQuotaExhausted: "disk quota exhausted, please allocate more resources",
TimeTickLongDelay: "time tick long delay",
}
func (t TriggerReason) String() string {
if s, ok := TriggerReasonString[t]; ok {
return s
}
return ""
}
type RateAllocateStrategy int32
const (
@ -97,6 +114,7 @@ type QuotaCenter struct {
dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics
currentRates map[internalpb.RateType]Limit
quotaStates map[milvuspb.QuotaState]string
tsoAllocator tso.Allocator
rateAllocateStrategy RateAllocateStrategy
@ -112,6 +130,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da
queryCoord: queryCoord,
dataCoord: dataCoord,
currentRates: make(map[internalpb.RateType]Limit),
quotaStates: make(map[milvuspb.QuotaState]string),
tsoAllocator: tsoAllocator,
rateAllocateStrategy: DefaultRateAllocateStrategy,
@ -144,6 +163,7 @@ func (q *QuotaCenter) run() {
if err != nil {
log.Warn("quotaCenter setRates failed", zap.Error(err))
}
q.recordMetrics()
}
}
}
@ -251,18 +271,20 @@ func (q *QuotaCenter) syncMetrics() error {
}
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(reason ForceDenyTriggerReason) {
func (q *QuotaCenter) forceDenyWriting(reason TriggerReason) {
q.currentRates[internalpb.RateType_DMLInsert] = 0
q.currentRates[internalpb.RateType_DMLDelete] = 0
q.currentRates[internalpb.RateType_DMLBulkLoad] = 0
log.Warn("QuotaCenter force to deny writing", zap.String("reason", string(reason)))
log.Warn("QuotaCenter force to deny writing", zap.String("reason", reason.String()))
q.quotaStates[milvuspb.QuotaState_DenyToWrite] = reason.String()
}
// forceDenyWriting sets dql rates to 0 to reject all dql requests.
func (q *QuotaCenter) forceDenyReading(reason ForceDenyTriggerReason) {
func (q *QuotaCenter) forceDenyReading(reason TriggerReason) {
q.currentRates[internalpb.RateType_DQLSearch] = 0
q.currentRates[internalpb.RateType_DQLQuery] = 0
log.Warn("QuotaCenter force to deny reading", zap.String("reason", string(reason)))
log.Warn("QuotaCenter force to deny reading", zap.String("reason", reason.String()))
q.quotaStates[milvuspb.QuotaState_DenyToRead] = reason.String()
}
// getRealTimeRate return real time rate in Proxy.
@ -288,7 +310,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate
// calculateReadRates calculates and sets dql rates.
func (q *QuotaCenter) calculateReadRates() {
if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
q.forceDenyReading(ManualForceDeny)
q.forceDenyReading(ManuallyDenyToRead)
return
}
@ -333,13 +355,13 @@ func (q *QuotaCenter) calculateReadRates() {
// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
q.forceDenyWriting(ManualForceDeny)
q.forceDenyWriting(ManuallyDenyToWrite)
return nil
}
exceeded := q.ifDiskQuotaExceeded()
if exceeded {
q.forceDenyWriting(DiskQuotaExceeded) // disk quota protection
q.forceDenyWriting(DiskQuotaExhausted) // disk quota protection
return nil
}
@ -355,7 +377,7 @@ func (q *QuotaCenter) calculateWriteRates() error {
memFactor := q.getMemoryFactor()
if memFactor <= 0 {
q.forceDenyWriting(MemoryExhausted) // memory protection
q.forceDenyWriting(MemoryQuotaExhausted) // memory protection
return nil
}
@ -408,6 +430,7 @@ func (q *QuotaCenter) resetCurrentRates() {
q.currentRates[rt] = Inf // no limit
}
}
q.quotaStates = make(map[milvuspb.QuotaState]string)
}
// getTimeTickDelayFactor gets time tick delay of DataNodes and QueryNodes,
@ -665,13 +688,38 @@ func (q *QuotaCenter) setRates() error {
case ByRateWeight:
// TODO: support ByRateWeight
}
states := make([]milvuspb.QuotaState, 0, len(q.quotaStates))
stateReasons := make([]string, 0, len(q.quotaStates))
for k, v := range q.quotaStates {
states = append(states, k)
stateReasons = append(stateReasons, v)
}
timestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
req := &proxypb.SetRatesRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgID(int64(timestamp)),
commonpbutil.WithTimeStamp(timestamp),
),
Rates: map2List(),
Rates: map2List(),
States: states,
StateReasons: stateReasons,
}
return q.proxies.SetRates(ctx, req)
}
// recordMetrics records metrics of quota states.
func (q *QuotaCenter) recordMetrics() {
for _, reason := range TriggerReasonString {
hit := false
for _, v := range q.quotaStates {
if v == reason {
hit = true
}
}
if hit {
metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(1)
} else {
metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(0)
}
}
}

View File

@ -115,10 +115,10 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test forceDeny", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.forceDenyReading(ManualForceDeny)
quotaCenter.forceDenyReading(ManuallyDenyToRead)
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
quotaCenter.forceDenyWriting(ManualForceDeny)
quotaCenter.forceDenyWriting(ManuallyDenyToWrite)
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLInsert])
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLDelete])
})
@ -441,10 +441,19 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test setRates", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.currentRates[internalpb.RateType_DMLInsert] = 100
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead]
err = quotaCenter.setRates()
assert.NoError(t, err)
})
t.Run("test recordMetrics", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead]
quotaCenter.recordMetrics()
})
t.Run("test guaranteeMinRate", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
minRate := Limit(100)

View File

@ -40,7 +40,9 @@ type TimeTickProvider interface {
// If Limit function return true, the request will be rejected.
// Otherwise, the request will pass. Limit also returns limit of limiter.
type Limiter interface {
Limit(rt internalpb.RateType, n int) (bool, float64)
Check(rt internalpb.RateType, n int) error
GetReadStateReason() string
GetWriteStateReason() string
}
// Component is the interface all services implement