Return error code when deny to read/write (#21561)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/21564/head
bigsheeper 2023-01-06 14:31:37 +08:00 committed by GitHub
parent aa203acfb3
commit 79535931ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 277 additions and 336 deletions

2
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/klauspost/compress v1.14.4
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729
github.com/minio/minio-go/v7 v7.0.17
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants/v2 v2.4.8

2
go.sum
View File

@ -484,6 +484,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0 h1:GSiYfmb/CgWCdTKHzI0zl0L1xTr9/kaM6wr1O882lYc=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20221226093525-ce18c3347db0/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729 h1:hsb1ifdNe3qlXi1YY5dWPPzWMNZmnqe5uunYPYK3gd0=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230105121931-9f9303dcc729/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$."
"milvus.proto.common.ObjectPrivilege\022\031\n\021o"
"bject_name_index\030\003 \001(\005\022\032\n\022object_name_in"
"dexs\030\004 \001(\005*\254\t\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"dexs\030\004 \001(\005*\223\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n"
"\020PermissionDenied\020\003\022\027\n\023CollectionNotExis"
"ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime"
@ -392,94 +392,97 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"otShardLeader\020-\022\026\n\022NoReplicaAvailable\020.\022"
"\023\n\017SegmentNotFound\020/\022\r\n\tForceDeny\0200\022\r\n\tR"
"ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\024\n\020Upsert"
"AutoIDTrue\0203\022\017\n\013DataCoordNA\020d\022\022\n\rDDReque"
"stRace\020\350\007*c\n\nIndexState\022\022\n\016IndexStateNon"
"e\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Fin"
"ished\020\003\022\n\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014Segme"
"ntState\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExis"
"t\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020"
"\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImportin"
"g\020\007*>\n\017PlaceholderType\022\010\n\004None\020\000\022\020\n\014Bina"
"ryVector\020d\022\017\n\013FloatVector\020e*\232\r\n\007MsgType\022"
"\r\n\tUndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n\016"
"DropCollection\020e\022\021\n\rHasCollection\020f\022\026\n\022D"
"escribeCollection\020g\022\023\n\017ShowCollections\020h"
"\022\024\n\020GetSystemConfigs\020i\022\022\n\016LoadCollection"
"\020j\022\025\n\021ReleaseCollection\020k\022\017\n\013CreateAlias"
"\020l\022\r\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017Alt"
"erCollection\020o\022\024\n\017CreatePartition\020\310\001\022\022\n\r"
"DropPartition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021D"
"escribePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001"
"\022\023\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePartitio"
"ns\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegm"
"ent\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegm"
"ents\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBal"
"anceSegments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020"
"\n\013CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\t"
"DropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n"
"\n\005Flush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003\022\013\n\006U"
"psert\020\224\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003"
"\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro"
"gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033"
"\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020"
"\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne"
"ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue"
"ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022"
"\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD"
"eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n"
"\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017"
"GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206"
"\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017"
"GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004"
"\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t"
"LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT"
"SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt"
"atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa"
"taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe"
"tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n"
"\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername"
"s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017"
"OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS"
"electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope"
"ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re"
"freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014"
"*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017"
"CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\tEx"
"ecuting\020\001\022\r\n\tCompleted\020\002*X\n\020ConsistencyL"
"evel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounded"
"\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\236\001\n\013I"
"mportState\022\021\n\rImportPending\020\000\022\020\n\014ImportF"
"ailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017ImportPers"
"isted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017ImportComp"
"leted\020\006\022\032\n\026ImportFailedAndCleaned\020\007*2\n\nO"
"bjectType\022\016\n\nCollection\020\000\022\n\n\006Global\020\001\022\010\n"
"\004User\020\002*\233\005\n\017ObjectPrivilege\022\020\n\014Privilege"
"All\020\000\022\035\n\031PrivilegeCreateCollection\020\001\022\033\n\027"
"PrivilegeDropCollection\020\002\022\037\n\033PrivilegeDe"
"scribeCollection\020\003\022\034\n\030PrivilegeShowColle"
"ctions\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Privilege"
"Release\020\006\022\027\n\023PrivilegeCompaction\020\007\022\023\n\017Pr"
"ivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026"
"PrivilegeGetStatistics\020\n\022\030\n\024PrivilegeCre"
"ateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020\014\022\026\n\022"
"PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSearch\020"
"\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQuery\020"
"\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Privilege"
"Import\020\022\022\034\n\030PrivilegeCreateOwnership\020\023\022\027"
"\n\023PrivilegeUpdateUser\020\024\022\032\n\026PrivilegeDrop"
"Ownership\020\025\022\034\n\030PrivilegeSelectOwnership\020"
"\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023Privi"
"legeSelectUser\020\030\022\023\n\017PrivilegeUpsert\020\031*S\n"
"\tStateCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020"
"\001\022\014\n\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping"
"\020\004*c\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024"
"\n\020LoadStateNotLoad\020\001\022\024\n\020LoadStateLoading"
"\020\002\022\023\n\017LoadStateLoaded\020\003:^\n\021privilege_ext"
"_obj\022\037.google.protobuf.MessageOptions\030\351\007"
" \001(\0132!.milvus.proto.common.PrivilegeExtB"
"f\n\016io.milvus.grpcB\013CommonProtoP\001Z1github"
".com/milvus-io/milvus-proto/go-api/commo"
"npb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3"
"AutoIDTrue\0203\022\034\n\030InsufficientMemoryToLoad"
"\0204\022\030\n\024MemoryQuotaExhausted\0205\022\026\n\022DiskQuot"
"aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\017\n\013D"
"ataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007*c\n\nInde"
"xState\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissued\020\001"
"\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed"
"\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024\n\020Segmen"
"tStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022"
"\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013"
"\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Placeholde"
"rType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013Flo"
"atVector\020e*\232\r\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n"
"\020CreateCollection\020d\022\022\n\016DropCollection\020e\022"
"\021\n\rHasCollection\020f\022\026\n\022DescribeCollection"
"\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSystemConf"
"igs\020i\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseColl"
"ection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m"
"\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollection\020o\022\024\n"
"\017CreatePartition\020\310\001\022\022\n\rDropPartition\020\311\001\022"
"\021\n\014HasPartition\020\312\001\022\026\n\021DescribePartition\020"
"\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016LoadPartition"
"s\020\315\001\022\026\n\021ReleasePartitions\020\316\001\022\021\n\014ShowSegm"
"ents\020\372\001\022\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSeg"
"ments\020\374\001\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017Handof"
"fSegments\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025"
"\n\020DescribeSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022"
"\022\n\rDescribeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006I"
"nsert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022Res"
"endSegmentStats\020\223\003\022\013\n\006Upsert\020\224\003\022\013\n\006Searc"
"h\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetIndexState"
"\020\366\003\022\032\n\025GetIndexBuildProgress\020\367\003\022\034\n\027GetCo"
"llectionStatistics\020\370\003\022\033\n\026GetPartitionSta"
"tistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveRes"
"ult\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDm"
"Channels\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023"
"RemoveQueryChannels\020\377\003\022\035\n\030SealedSegments"
"ChangeInfo\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\024"
"\n\017GetShardLeaders\020\202\004\022\020\n\013GetReplicas\020\203\004\022\023"
"\n\016UnsubDmChannel\020\204\004\022\024\n\017GetDistribution\020\205"
"\004\022\025\n\020SyncDistribution\020\206\004\022\020\n\013SegmentInfo\020"
"\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332"
"\004\022\024\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023"
"\n\016QueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tR"
"equestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017Allocate"
"Segment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Se"
"gmentFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020Cr"
"eateCredential\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n"
"\020DeleteCredential\020\336\013\022\025\n\020UpdateCredential"
"\020\337\013\022\026\n\021ListCredUsernames\020\340\013\022\017\n\nCreateRol"
"e\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017OperateUserRole\020\302"
"\014\022\017\n\nSelectRole\020\303\014\022\017\n\nSelectUser\020\304\014\022\023\n\016S"
"electResource\020\305\014\022\025\n\020OperatePrivilege\020\306\014\022"
"\020\n\013SelectGrant\020\307\014\022\033\n\026RefreshPolicyInfoCa"
"che\020\310\014\022\017\n\nListPolicy\020\311\014*\"\n\007DslType\022\007\n\003Ds"
"l\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState\022\021"
"\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tComp"
"leted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000\022"
"\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually\020"
"\003\022\016\n\nCustomized\020\004*\236\001\n\013ImportState\022\021\n\rImp"
"ortPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImport"
"Started\020\002\022\023\n\017ImportPersisted\020\005\022\021\n\rImport"
"Flushed\020\010\022\023\n\017ImportCompleted\020\006\022\032\n\026Import"
"FailedAndCleaned\020\007*2\n\nObjectType\022\016\n\nColl"
"ection\020\000\022\n\n\006Global\020\001\022\010\n\004User\020\002*\233\005\n\017Objec"
"tPrivilege\022\020\n\014PrivilegeAll\020\000\022\035\n\031Privileg"
"eCreateCollection\020\001\022\033\n\027PrivilegeDropColl"
"ection\020\002\022\037\n\033PrivilegeDescribeCollection\020"
"\003\022\034\n\030PrivilegeShowCollections\020\004\022\021\n\rPrivi"
"legeLoad\020\005\022\024\n\020PrivilegeRelease\020\006\022\027\n\023Priv"
"ilegeCompaction\020\007\022\023\n\017PrivilegeInsert\020\010\022\023"
"\n\017PrivilegeDelete\020\t\022\032\n\026PrivilegeGetStati"
"stics\020\n\022\030\n\024PrivilegeCreateIndex\020\013\022\030\n\024Pri"
"vilegeIndexDetail\020\014\022\026\n\022PrivilegeDropInde"
"x\020\r\022\023\n\017PrivilegeSearch\020\016\022\022\n\016PrivilegeFlu"
"sh\020\017\022\022\n\016PrivilegeQuery\020\020\022\030\n\024PrivilegeLoa"
"dBalance\020\021\022\023\n\017PrivilegeImport\020\022\022\034\n\030Privi"
"legeCreateOwnership\020\023\022\027\n\023PrivilegeUpdate"
"User\020\024\022\032\n\026PrivilegeDropOwnership\020\025\022\034\n\030Pr"
"ivilegeSelectOwnership\020\026\022\034\n\030PrivilegeMan"
"ageOwnership\020\027\022\027\n\023PrivilegeSelectUser\020\030\022"
"\023\n\017PrivilegeUpsert\020\031*S\n\tStateCode\022\020\n\014Ini"
"tializing\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022\013"
"\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022\025"
"\n\021LoadStateNotExist\020\000\022\024\n\020LoadStateNotLoa"
"d\020\001\022\024\n\020LoadStateLoading\020\002\022\023\n\017LoadStateLo"
"aded\020\003:^\n\021privilege_ext_obj\022\037.google.pro"
"tobuf.MessageOptions\030\351\007 \001(\0132!.milvus.pro"
"to.common.PrivilegeExtBf\n\016io.milvus.grpc"
"B\013CommonProtoP\001Z1github.com/milvus-io/mi"
"lvus-proto/go-api/commonpb\240\001\001\252\002\016IO.Milvu"
"s.Grpcb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
&::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto,
@ -500,7 +503,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5591,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5694,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@ -568,6 +571,10 @@ bool ErrorCode_IsValid(int value) {
case 49:
case 50:
case 51:
case 52:
case 53:
case 54:
case 55:
case 100:
case 1000:
return true;

View File

@ -164,6 +164,10 @@ enum ErrorCode : int {
RateLimit = 49,
NodeIDNotMatch = 50,
UpsertAutoIDTrue = 51,
InsufficientMemoryToLoad = 52,
MemoryQuotaExhausted = 53,
DiskQuotaExhausted = 54,
TimeTickLongDelay = 55,
DataCoordNA = 100,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),

View File

@ -54,5 +54,5 @@ message SetRatesRequest {
common.MsgBase base = 1;
repeated internal.Rate rates = 2;
repeated milvus.QuotaState states = 3;
repeated string state_reasons = 4;
repeated common.ErrorCode codes = 4;
}

View File

@ -255,7 +255,7 @@ type SetRatesRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Rates []*internalpb.Rate `protobuf:"bytes,2,rep,name=rates,proto3" json:"rates,omitempty"`
States []milvuspb.QuotaState `protobuf:"varint,3,rep,packed,name=states,proto3,enum=milvus.proto.milvus.QuotaState" json:"states,omitempty"`
StateReasons []string `protobuf:"bytes,4,rep,name=state_reasons,json=stateReasons,proto3" json:"state_reasons,omitempty"`
Codes []commonpb.ErrorCode `protobuf:"varint,4,rep,packed,name=codes,proto3,enum=milvus.proto.common.ErrorCode" json:"codes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -307,9 +307,9 @@ func (m *SetRatesRequest) GetStates() []milvuspb.QuotaState {
return nil
}
func (m *SetRatesRequest) GetStateReasons() []string {
func (m *SetRatesRequest) GetCodes() []commonpb.ErrorCode {
if m != nil {
return m.StateReasons
return m.Codes
}
return nil
}
@ -327,44 +327,44 @@ func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf)
var fileDescriptor_700b50b08ed8dbaf = []byte{
// 625 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x51, 0x4f, 0x13, 0x4d,
0x14, 0x65, 0x29, 0x2d, 0x70, 0xe9, 0x07, 0xc9, 0x84, 0x0f, 0x6b, 0x11, 0x6d, 0x16, 0x23, 0x0d,
0x89, 0xad, 0x54, 0x13, 0xdf, 0x29, 0x49, 0x43, 0x0c, 0x04, 0xa7, 0xfa, 0xe2, 0x0b, 0x99, 0xdd,
0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x67, 0x8a, 0xf6, 0xc9, 0xc4, 0x7f, 0xe4, 0x9b, 0x3f, 0xc4,
0x1f, 0x64, 0x76, 0x66, 0xbb, 0xd0, 0xba, 0x65, 0xa3, 0xc4, 0xb7, 0x3d, 0x77, 0xce, 0x9d, 0x73,
0xef, 0xec, 0xbd, 0x07, 0xd6, 0xa2, 0x58, 0x7e, 0x19, 0xb7, 0xa2, 0x58, 0x6a, 0x49, 0xc8, 0x90,
0x87, 0x37, 0x23, 0x65, 0x51, 0xcb, 0x9c, 0xd4, 0xab, 0xbe, 0x1c, 0x0e, 0xa5, 0xb0, 0xb1, 0xfa,
0x3a, 0x17, 0x1a, 0x63, 0xc1, 0xc2, 0x14, 0x57, 0xef, 0x66, 0xb8, 0x3f, 0x1c, 0x78, 0x7a, 0x2c,
0x6e, 0x58, 0xc8, 0x03, 0xa6, 0xb1, 0x2b, 0xc3, 0xf0, 0x04, 0x35, 0xeb, 0x32, 0x7f, 0x80, 0x14,
0xaf, 0x47, 0xa8, 0x34, 0x79, 0x05, 0x4b, 0x1e, 0x53, 0x58, 0x73, 0x1a, 0x4e, 0x73, 0xad, 0xf3,
0xa4, 0x35, 0xa5, 0x98, 0x4a, 0x9d, 0xa8, 0xcb, 0x43, 0xa6, 0x90, 0x1a, 0x26, 0x79, 0x04, 0xcb,
0x81, 0x77, 0x2e, 0xd8, 0x10, 0x6b, 0x8b, 0x0d, 0xa7, 0xb9, 0x4a, 0x2b, 0x81, 0x77, 0xca, 0x86,
0x48, 0xf6, 0x60, 0xc3, 0x97, 0x61, 0x88, 0xbe, 0xe6, 0x52, 0x58, 0x42, 0xc9, 0x10, 0xd6, 0x6f,
0xc3, 0x86, 0xe8, 0x42, 0xf5, 0x36, 0x72, 0x7c, 0x54, 0x5b, 0x6a, 0x38, 0xcd, 0x12, 0x9d, 0x8a,
0xb9, 0x57, 0x50, 0xbf, 0x53, 0x79, 0x8c, 0xc1, 0x03, 0xab, 0xae, 0xc3, 0xca, 0x48, 0x25, 0x2f,
0x95, 0x95, 0x9d, 0x61, 0xf7, 0x9b, 0x03, 0x5b, 0x1f, 0xa3, 0x7f, 0x2f, 0x94, 0x9c, 0x45, 0x4c,
0xa9, 0xcf, 0x32, 0x0e, 0xd2, 0xa7, 0xc9, 0xb0, 0xfb, 0x15, 0x76, 0x28, 0x5e, 0xc4, 0xa8, 0x06,
0x67, 0x32, 0xe4, 0xfe, 0xf8, 0x58, 0x5c, 0xc8, 0x07, 0x96, 0xb2, 0x05, 0x15, 0x19, 0x7d, 0x18,
0x47, 0xb6, 0x90, 0x32, 0x4d, 0x11, 0xd9, 0x84, 0xb2, 0x8c, 0xde, 0xe1, 0x38, 0xad, 0xc1, 0x02,
0xf7, 0xa7, 0x03, 0x1b, 0x7d, 0xd4, 0x94, 0x69, 0x54, 0x7f, 0xaf, 0x79, 0x00, 0xe5, 0x38, 0xb9,
0xa1, 0xb6, 0xd8, 0x28, 0x35, 0xd7, 0x3a, 0xdb, 0xd3, 0x29, 0xd9, 0xb4, 0x26, 0x2a, 0xd4, 0x32,
0xc9, 0x5b, 0xa8, 0x28, 0x6d, 0x72, 0x4a, 0x8d, 0x52, 0x73, 0xbd, 0xf3, 0x6c, 0x3a, 0x27, 0x05,
0xef, 0x47, 0x52, 0xb3, 0x7e, 0xc2, 0xa3, 0x29, 0x9d, 0xec, 0xc2, 0x7f, 0xe6, 0xeb, 0x3c, 0x46,
0xa6, 0xa4, 0x50, 0xb5, 0xa5, 0x46, 0xa9, 0xb9, 0x4a, 0xab, 0x26, 0x48, 0x6d, 0xac, 0xf3, 0x7d,
0x19, 0xca, 0x67, 0xc9, 0xe6, 0x90, 0x10, 0x48, 0x0f, 0x75, 0x57, 0x0e, 0x23, 0x29, 0x50, 0xe8,
0xbe, 0xbd, 0xa4, 0x95, 0xab, 0xf6, 0x3b, 0x31, 0x7d, 0x92, 0xfa, 0xf3, 0x5c, 0xfe, 0x0c, 0xd9,
0x5d, 0x20, 0xd7, 0xb0, 0xd9, 0x43, 0x03, 0xb9, 0xd2, 0xdc, 0x57, 0xdd, 0x01, 0x13, 0x02, 0x43,
0xd2, 0x99, 0xf3, 0x22, 0x79, 0xe4, 0x89, 0xe6, 0x6e, 0xae, 0x66, 0x5f, 0xc7, 0x5c, 0x5c, 0x52,
0x54, 0x91, 0x14, 0x0a, 0xdd, 0x05, 0x12, 0xc3, 0xce, 0xf4, 0xb6, 0xdb, 0x6d, 0xca, 0x76, 0x7e,
0x56, 0xdb, 0x5a, 0xcd, 0xfd, 0x06, 0x51, 0xdf, 0xce, 0xfd, 0xe9, 0x49, 0xa9, 0xa3, 0xa4, 0x4d,
0x06, 0xd5, 0x1e, 0xea, 0xa3, 0x60, 0xd2, 0xde, 0xfe, 0xfc, 0xf6, 0x32, 0xd2, 0x1f, 0xb6, 0x75,
0x05, 0x8f, 0xa7, 0xad, 0x00, 0x85, 0xe6, 0x2c, 0xb4, 0x2d, 0xb5, 0x0a, 0x5a, 0x9a, 0x59, 0xe8,
0xa2, 0x76, 0x3c, 0xf8, 0xff, 0xd6, 0x09, 0xee, 0xea, 0xec, 0xe7, 0xe9, 0xe4, 0x9b, 0x46, 0x91,
0xc6, 0x15, 0x6c, 0xe5, 0x6f, 0x3a, 0x39, 0xc8, 0x13, 0xb9, 0xd7, 0x15, 0x8a, 0xb4, 0x02, 0xd8,
0xe8, 0xa1, 0x36, 0xf3, 0x7f, 0x82, 0x3a, 0xe6, 0xbe, 0x22, 0x2f, 0xe6, 0x0d, 0x7c, 0x4a, 0x98,
0xdc, 0xbc, 0x57, 0xc8, 0xcb, 0xfe, 0xd0, 0x29, 0xac, 0x4c, 0x9c, 0x83, 0xec, 0xe6, 0xf5, 0x30,
0xe3, 0x2b, 0x05, 0x55, 0x1f, 0xbe, 0xf9, 0xd4, 0xb9, 0xe4, 0x7a, 0x30, 0xf2, 0x92, 0x93, 0xb6,
0xa5, 0xbe, 0xe4, 0x32, 0xfd, 0x6a, 0x4f, 0x86, 0xaa, 0x6d, 0xb2, 0xdb, 0x46, 0x22, 0xf2, 0xbc,
0x8a, 0x81, 0xaf, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x36, 0x94, 0x65, 0x38, 0x43, 0x07, 0x00,
0x14, 0x65, 0x29, 0x2d, 0x7c, 0x97, 0x06, 0x92, 0x09, 0x1f, 0xd6, 0x22, 0xd8, 0x2c, 0x46, 0x1a,
0x12, 0x5b, 0xa9, 0x24, 0xbe, 0x53, 0x4c, 0x43, 0x0c, 0x04, 0xb7, 0xfa, 0xe2, 0x8b, 0x99, 0xdd,
0xbd, 0xd0, 0x21, 0xdb, 0x99, 0x65, 0x66, 0x16, 0xed, 0x93, 0x89, 0xff, 0xc8, 0x37, 0xff, 0x90,
0xff, 0xc3, 0xec, 0xce, 0x76, 0x61, 0xeb, 0x96, 0x8d, 0x12, 0xdf, 0x7a, 0x66, 0xcf, 0x9d, 0x73,
0xee, 0xed, 0xdc, 0x03, 0xab, 0xa1, 0x14, 0x5f, 0x26, 0x9d, 0x50, 0x0a, 0x2d, 0x08, 0x19, 0xb3,
0xe0, 0x26, 0x52, 0x06, 0x75, 0x92, 0x2f, 0xcd, 0xba, 0x27, 0xc6, 0x63, 0xc1, 0xcd, 0x59, 0x73,
0x8d, 0x71, 0x8d, 0x92, 0xd3, 0x20, 0xc5, 0xf5, 0xbb, 0x15, 0xf6, 0x0f, 0x0b, 0x76, 0x4e, 0xf8,
0x0d, 0x0d, 0x98, 0x4f, 0x35, 0xf6, 0x45, 0x10, 0x9c, 0xa2, 0xa6, 0x7d, 0xea, 0x8d, 0xd0, 0xc1,
0xeb, 0x08, 0x95, 0x26, 0x2f, 0x61, 0xc9, 0xa5, 0x0a, 0x1b, 0x56, 0xcb, 0x6a, 0xaf, 0xf6, 0x9e,
0x74, 0x72, 0x8a, 0xa9, 0xd4, 0xa9, 0xba, 0x3c, 0xa2, 0x0a, 0x9d, 0x84, 0x49, 0x1e, 0xc1, 0xb2,
0xef, 0x7e, 0xe2, 0x74, 0x8c, 0x8d, 0xc5, 0x96, 0xd5, 0xfe, 0xcf, 0xa9, 0xf9, 0xee, 0x19, 0x1d,
0x23, 0xd9, 0x83, 0x75, 0x4f, 0x04, 0x01, 0x7a, 0x9a, 0x09, 0x6e, 0x08, 0x95, 0x84, 0xb0, 0x76,
0x7b, 0x9c, 0x10, 0x6d, 0xa8, 0xdf, 0x9e, 0x9c, 0x1c, 0x37, 0x96, 0x5a, 0x56, 0xbb, 0xe2, 0xe4,
0xce, 0xec, 0x2b, 0x68, 0xde, 0x71, 0x2e, 0xd1, 0x7f, 0xa0, 0xeb, 0x26, 0xac, 0x44, 0x2a, 0x9e,
0x54, 0x66, 0x3b, 0xc3, 0xf6, 0x37, 0x0b, 0x36, 0x3f, 0x84, 0xff, 0x5e, 0x28, 0xfe, 0x16, 0x52,
0xa5, 0x3e, 0x0b, 0xe9, 0xa7, 0xa3, 0xc9, 0xb0, 0xfd, 0x15, 0xb6, 0x1d, 0xbc, 0x90, 0xa8, 0x46,
0xe7, 0x22, 0x60, 0xde, 0xe4, 0x84, 0x5f, 0x88, 0x07, 0x5a, 0xd9, 0x84, 0x9a, 0x08, 0xdf, 0x4f,
0x42, 0x63, 0xa4, 0xea, 0xa4, 0x88, 0x6c, 0x40, 0x55, 0x84, 0x6f, 0x71, 0x92, 0x7a, 0x30, 0xc0,
0xfe, 0x69, 0xc1, 0xfa, 0x10, 0xb5, 0x43, 0x35, 0xaa, 0xbf, 0xd7, 0x3c, 0x80, 0xaa, 0x8c, 0x6f,
0x68, 0x2c, 0xb6, 0x2a, 0xed, 0xd5, 0xde, 0x56, 0xbe, 0x24, 0x7b, 0xad, 0xb1, 0x8a, 0x63, 0x98,
0xe4, 0x35, 0xd4, 0x94, 0x4e, 0x6a, 0x2a, 0xad, 0x4a, 0x7b, 0xad, 0xf7, 0x34, 0x5f, 0x93, 0x82,
0x77, 0x91, 0xd0, 0x74, 0x18, 0xf3, 0x9c, 0x94, 0x4e, 0x0e, 0xa1, 0xea, 0x09, 0x1f, 0x55, 0x63,
0x29, 0xa9, 0xdb, 0x29, 0xb4, 0xf7, 0x46, 0x4a, 0x21, 0xfb, 0xc2, 0x47, 0xc7, 0x90, 0x7b, 0xdf,
0x97, 0xa1, 0x7a, 0x1e, 0xaf, 0x12, 0x09, 0x80, 0x0c, 0x50, 0xf7, 0xc5, 0x38, 0x14, 0x1c, 0xb9,
0x1e, 0x9a, 0x5b, 0x3b, 0x85, 0xf2, 0xbf, 0x13, 0xd3, 0x19, 0x35, 0x9f, 0x15, 0xf2, 0x67, 0xc8,
0xf6, 0x02, 0xb9, 0x86, 0x8d, 0x01, 0x26, 0x90, 0x29, 0xcd, 0x3c, 0xd5, 0x1f, 0x51, 0xce, 0x31,
0x20, 0xbd, 0x39, 0x23, 0x2a, 0x22, 0x4f, 0x35, 0x77, 0x0b, 0x35, 0x87, 0x5a, 0x32, 0x7e, 0xe9,
0xa0, 0x0a, 0x05, 0x57, 0x68, 0x2f, 0x10, 0x09, 0xdb, 0xf9, 0xf5, 0x37, 0xeb, 0x95, 0x85, 0xc0,
0xac, 0xb6, 0xc9, 0x9e, 0xfb, 0x13, 0xa3, 0xb9, 0x55, 0x38, 0xe6, 0xd8, 0x6a, 0x14, 0xb7, 0x49,
0xa1, 0x3e, 0x40, 0x7d, 0xec, 0x4f, 0xdb, 0xdb, 0x9f, 0xdf, 0x5e, 0x46, 0xfa, 0xc3, 0xb6, 0xae,
0xe0, 0x71, 0x3e, 0x1b, 0x90, 0x6b, 0x46, 0x03, 0xd3, 0x52, 0xa7, 0xa4, 0xa5, 0x99, 0x0d, 0x2f,
0x6b, 0xc7, 0x85, 0xff, 0x6f, 0xa3, 0xe1, 0xae, 0xce, 0x7e, 0x91, 0x4e, 0x71, 0x8a, 0x94, 0x69,
0x5c, 0xc1, 0x66, 0xf1, 0xea, 0x93, 0x83, 0x22, 0x91, 0x7b, 0x63, 0xa2, 0x4c, 0xcb, 0x87, 0xf5,
0x01, 0xea, 0xe4, 0xfd, 0x9f, 0xa2, 0x96, 0xcc, 0x53, 0xe4, 0xf9, 0xbc, 0x07, 0x9f, 0x12, 0xa6,
0x37, 0xef, 0x95, 0xf2, 0xb2, 0x7f, 0xe8, 0x0c, 0x56, 0xa6, 0x51, 0x42, 0x76, 0x8b, 0x7a, 0x98,
0x09, 0x9a, 0x12, 0xd7, 0x47, 0x87, 0x1f, 0x7b, 0x97, 0x4c, 0x8f, 0x22, 0x37, 0xfe, 0xd2, 0x35,
0xd4, 0x17, 0x4c, 0xa4, 0xbf, 0xba, 0xd3, 0x47, 0xd5, 0x4d, 0xaa, 0xbb, 0x89, 0x44, 0xe8, 0xba,
0xb5, 0x04, 0xbe, 0xfa, 0x15, 0x00, 0x00, 0xff, 0xff, 0x47, 0x77, 0x96, 0xe8, 0x54, 0x07, 0x00,
0x00,
}

View File

@ -24,8 +24,6 @@ import (
"google.golang.org/grpc/status"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
)
// TODO(dragondriver): add more common error type
@ -92,22 +90,3 @@ func ErrProxyNotReady() error {
func ErrPartitionNotExist(partitionName string) error {
return fmt.Errorf("partition is not exist: %s", partitionName)
}
var (
ErrRateLimit = errors.New("RequestLimited")
ErrForceDeny = errors.New("RequestDenied")
)
func wrapRateLimitError() error {
return fmt.Errorf("[%w] request is rejected by grpc RateLimiter middleware, please retry later", ErrRateLimit)
}
func wrapForceDenyError(rt internalpb.RateType, limiter types.Limiter) error {
switch rt {
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
return fmt.Errorf("[%w] deny to write, reason: %s", ErrForceDeny, limiter.GetWriteStateReason())
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
return fmt.Errorf("[%w] deny to read, reason: %s", ErrForceDeny, limiter.GetReadStateReason())
}
return nil
}

View File

@ -17,14 +17,12 @@
package proxy
import (
"errors"
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func Test_errInvalidNumRows(t *testing.T) {
@ -152,16 +150,3 @@ func Test_errProxyIsUnhealthy(t *testing.T) {
zap.Error(errProxyIsUnhealthy(id)))
}
}
func Test_ErrRateLimitAndErrForceDeny(t *testing.T) {
err := wrapRateLimitError()
assert.True(t, errors.Is(err, ErrRateLimit))
limiter := NewMultiRateLimiter()
err = wrapForceDenyError(internalpb.RateType_DMLInsert, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
err = wrapForceDenyError(internalpb.RateType_DMLDelete, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
err = wrapForceDenyError(internalpb.RateType_DQLSearch, limiter)
assert.True(t, errors.Is(err, ErrForceDeny))
}

View File

@ -4310,11 +4310,11 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques
resp.Reason = err.Error()
return resp, nil
}
node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons())
node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetCodes())
log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates()))
if len(request.GetStates()) != 0 {
for i := range request.GetStates() {
log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i]))
log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetCodes()[i].String()))
}
}
resp.ErrorCode = commonpb.ErrorCode_Success

View File

@ -133,8 +133,8 @@ func TestProxy_CheckHealth(t *testing.T) {
assert.Equal(t, 0, len(resp.GetReasons()))
states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead}
reasons := []string{"memory quota exhausted", "manually deny to read"}
node.multiRateLimiter.SetQuotaStates(states, reasons)
codes := []commonpb.ErrorCode{commonpb.ErrorCode_MemoryQuotaExhausted, commonpb.ErrorCode_ForceDeny}
node.multiRateLimiter.SetQuotaStates(states, codes)
resp, err = node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)

View File

@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
@ -31,13 +32,24 @@ import (
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
)
var QuotaErrorString = map[commonpb.ErrorCode]string{
commonpb.ErrorCode_ForceDeny: "manually force deny",
commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources",
commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exhausted, please allocate more resources",
commonpb.ErrorCode_TimeTickLongDelay: "time tick long delay",
}
func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
return QuotaErrorString[errCode]
}
// MultiRateLimiter includes multilevel rate limiters, such as global rateLimiter,
// collection level rateLimiter and so on. It also implements Limiter interface.
type MultiRateLimiter struct {
globalRateLimiter *rateLimiter
// TODO: add collection level rateLimiter
quotaStatesMu sync.RWMutex
quotaStates map[milvuspb.QuotaState]string
quotaStates map[milvuspb.QuotaState]commonpb.ErrorCode
}
// NewMultiRateLimiter returns a new MultiRateLimiter.
@ -48,18 +60,32 @@ func NewMultiRateLimiter() *MultiRateLimiter {
}
// Check checks if request would be limited or denied.
func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) error {
func (m *MultiRateLimiter) Check(rt internalpb.RateType, n int) commonpb.ErrorCode {
if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
return nil
return commonpb.ErrorCode_Success
}
limit, rate := m.globalRateLimiter.limit(rt, n)
if rate == 0 {
return wrapForceDenyError(rt, m)
return m.GetErrorCode(rt)
}
if limit {
return wrapRateLimitError()
return commonpb.ErrorCode_RateLimit
}
return nil
return commonpb.ErrorCode_Success
}
func (m *MultiRateLimiter) GetErrorCode(rt internalpb.RateType) commonpb.ErrorCode {
switch rt {
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToWrite]
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToRead]
}
return commonpb.ErrorCode_Success
}
// GetQuotaStates returns quota states.
@ -70,30 +96,18 @@ func (m *MultiRateLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) {
reasons := make([]string, 0, len(m.quotaStates))
for k, v := range m.quotaStates {
states = append(states, k)
reasons = append(reasons, v)
reasons = append(reasons, GetQuotaErrorString(v))
}
return states, reasons
}
func (m *MultiRateLimiter) GetReadStateReason() string {
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToRead]
}
func (m *MultiRateLimiter) GetWriteStateReason() string {
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
return m.quotaStates[milvuspb.QuotaState_DenyToWrite]
}
// SetQuotaStates sets quota states for MultiRateLimiter.
func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, reasons []string) {
func (m *MultiRateLimiter) SetQuotaStates(states []milvuspb.QuotaState, codes []commonpb.ErrorCode) {
m.quotaStatesMu.Lock()
defer m.quotaStatesMu.Unlock()
m.quotaStates = make(map[milvuspb.QuotaState]string, len(states))
m.quotaStates = make(map[milvuspb.QuotaState]commonpb.ErrorCode, len(states))
for i := 0; i < len(states); i++ {
m.quotaStates[states[i]] = reasons[i]
m.quotaStates[states[i]] = codes[i]
}
}

View File

@ -17,12 +17,11 @@
package proxy
import (
"errors"
"fmt"
"math"
"testing"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
@ -38,12 +37,12 @@ func TestMultiRateLimiter(t *testing.T) {
multiLimiter.globalRateLimiter.limiters[internalpb.RateType(rt)] = ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)
}
for _, rt := range internalpb.RateType_value {
err := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.NoError(t, err)
err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.NoError(t, err)
err = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.True(t, errors.Is(err, ErrRateLimit))
errCode := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
@ -53,8 +52,8 @@ func TestMultiRateLimiter(t *testing.T) {
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false")
for _, rt := range internalpb.RateType_value {
err := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.NoError(t, err)
errCode := multiLimiter.Check(internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
@ -66,8 +65,8 @@ func TestMultiRateLimiter(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
err := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024)
assert.NoError(t, err)
errCode := multiLimiter.Check(internalpb.RateType_DMLInsert, 1*1024*1024)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
Params.QuotaConfig.DMLMaxInsertRate = bakInsertRate
}
@ -77,17 +76,6 @@ func TestMultiRateLimiter(t *testing.T) {
run(math.MaxFloat64 / 3)
run(math.MaxFloat64 / 10000)
})
t.Run("test GetReadStateReason and GetWriteStateReason", func(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
states := []milvuspb.QuotaState{milvuspb.QuotaState_DenyToWrite, milvuspb.QuotaState_DenyToRead}
writeReason := "memory quota exhausted"
readReason := "manually deny to read"
reasons := []string{writeReason, readReason}
multiLimiter.SetQuotaStates(states, reasons)
assert.Equal(t, writeReason, multiLimiter.GetWriteStateReason())
assert.Equal(t, readReason, multiLimiter.GetReadStateReason())
})
}
func TestRateLimiter(t *testing.T) {

View File

@ -18,7 +18,6 @@ package proxy
import (
"context"
"errors"
"fmt"
"reflect"
@ -38,15 +37,9 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor {
if err != nil {
return handler(ctx, req)
}
err = limiter.Check(rt, n)
if errors.Is(err, ErrForceDeny) {
rsp := getFailedResponse(req, commonpb.ErrorCode_ForceDeny, info.FullMethod, err)
if rsp != nil {
return rsp, nil
}
}
if errors.Is(err, ErrRateLimit) {
rsp := getFailedResponse(req, commonpb.ErrorCode_RateLimit, info.FullMethod, err)
code := limiter.Check(rt, n)
if code != commonpb.ErrorCode_Success {
rsp := getFailedResponse(req, rt, code, info.FullMethod)
if rsp != nil {
return rsp, nil
}
@ -113,37 +106,53 @@ func failedBoolResponse(code commonpb.ErrorCode, reason string) *milvuspb.BoolRe
}
}
func wrapQuotaError(rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) error {
if errCode == commonpb.ErrorCode_RateLimit {
return fmt.Errorf("request is rejected by grpc RateLimiter middleware, please retry later, req: %s", fullMethod)
}
// deny to write/read
var op string
switch rt {
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
op = "write"
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
op = "read"
}
return fmt.Errorf("deny to %s, reason: %s, req: %s", op, GetQuotaErrorString(errCode), fullMethod)
}
// getFailedResponse returns failed response.
func getFailedResponse(req interface{}, code commonpb.ErrorCode, fullMethod string, err error) interface{} {
reason := fmt.Sprintf("%s, req: %s", err, fullMethod)
func getFailedResponse(req interface{}, rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) interface{} {
err := wrapQuotaError(rt, errCode, fullMethod)
switch req.(type) {
case *milvuspb.InsertRequest, *milvuspb.DeleteRequest:
return failedMutationResult(code, reason)
return failedMutationResult(errCode, err.Error())
case *milvuspb.ImportRequest:
return &milvuspb.ImportResponse{
Status: failedStatus(code, reason),
Status: failedStatus(errCode, err.Error()),
}
case *milvuspb.SearchRequest:
return &milvuspb.SearchResults{
Status: failedStatus(code, reason),
Status: failedStatus(errCode, err.Error()),
}
case *milvuspb.QueryRequest:
return &milvuspb.QueryResults{
Status: failedStatus(code, reason),
Status: failedStatus(errCode, err.Error()),
}
case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest,
*milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest,
*milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest,
*milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest,
*milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest:
return failedStatus(code, reason)
return failedStatus(errCode, err.Error())
case *milvuspb.FlushRequest:
return &milvuspb.FlushResponse{
Status: failedStatus(code, reason),
Status: failedStatus(errCode, err.Error()),
}
case *milvuspb.ManualCompactionRequest:
return &milvuspb.ManualCompactionResponse{
Status: failedStatus(code, reason),
Status: failedStatus(errCode, err.Error()),
}
}
return nil

View File

@ -18,7 +18,6 @@ package proxy
import (
"context"
"fmt"
"testing"
"github.com/golang/protobuf/proto"
@ -34,35 +33,17 @@ type limiterMock struct {
limit bool
rate float64
quotaStates []milvuspb.QuotaState
quotaStateReasons []string
quotaStateReasons []commonpb.ErrorCode
}
func (l *limiterMock) Check(rt internalpb.RateType, n int) error {
func (l *limiterMock) Check(rt internalpb.RateType, n int) commonpb.ErrorCode {
if l.rate == 0 {
return ErrForceDeny
return commonpb.ErrorCode_ForceDeny
}
if l.limit {
return ErrRateLimit
return commonpb.ErrorCode_RateLimit
}
return nil
}
func (l *limiterMock) GetReadStateReason() string {
for i := range l.quotaStates {
if l.quotaStates[i] == milvuspb.QuotaState_DenyToRead {
return l.quotaStateReasons[i]
}
}
return ""
}
func (l *limiterMock) GetWriteStateReason() string {
for i := range l.quotaStates {
if l.quotaStates[i] == milvuspb.QuotaState_DenyToWrite {
return l.quotaStateReasons[i]
}
}
return ""
return commonpb.ErrorCode_Success
}
func TestRateLimitInterceptor(t *testing.T) {
@ -119,23 +100,23 @@ func TestRateLimitInterceptor(t *testing.T) {
})
t.Run("test getFailedResponse", func(t *testing.T) {
testGetFailedResponse := func(req interface{}) {
rsp := getFailedResponse(req, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
testGetFailedResponse := func(req interface{}, rt internalpb.RateType, errCode commonpb.ErrorCode, fullMethod string) {
rsp := getFailedResponse(req, rt, errCode, fullMethod)
assert.NotNil(t, rsp)
}
testGetFailedResponse(&milvuspb.DeleteRequest{})
testGetFailedResponse(&milvuspb.ImportRequest{})
testGetFailedResponse(&milvuspb.SearchRequest{})
testGetFailedResponse(&milvuspb.QueryRequest{})
testGetFailedResponse(&milvuspb.CreateCollectionRequest{})
testGetFailedResponse(&milvuspb.FlushRequest{})
testGetFailedResponse(&milvuspb.ManualCompactionRequest{})
testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, commonpb.ErrorCode_ForceDeny, "delete")
testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, commonpb.ErrorCode_MemoryQuotaExhausted, "import")
testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, commonpb.ErrorCode_DiskQuotaExhausted, "search")
testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, commonpb.ErrorCode_ForceDeny, "query")
testGetFailedResponse(&milvuspb.CreateCollectionRequest{}, internalpb.RateType_DDLCollection, commonpb.ErrorCode_RateLimit, "createCollection")
testGetFailedResponse(&milvuspb.FlushRequest{}, internalpb.RateType_DDLFlush, commonpb.ErrorCode_RateLimit, "flush")
testGetFailedResponse(&milvuspb.ManualCompactionRequest{}, internalpb.RateType_DDLCompaction, commonpb.ErrorCode_RateLimit, "compaction")
// test illegal
rsp := getFailedResponse(&milvuspb.SearchResults{}, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
rsp := getFailedResponse(&milvuspb.SearchResults{}, internalpb.RateType_DQLSearch, commonpb.ErrorCode_UnexpectedError, "method")
assert.Nil(t, rsp)
rsp = getFailedResponse(nil, commonpb.ErrorCode_UnexpectedError, "method", fmt.Errorf("mock err"))
rsp = getFailedResponse(nil, internalpb.RateType_DQLSearch, commonpb.ErrorCode_UnexpectedError, "method")
assert.Nil(t, rsp)
})

View File

@ -46,31 +46,6 @@ const (
SetRatesTimeout = 10 * time.Second
)
type TriggerReason int32
const (
ManuallyDenyToRead TriggerReason = 0
ManuallyDenyToWrite TriggerReason = 1
MemoryQuotaExhausted TriggerReason = 2
DiskQuotaExhausted TriggerReason = 3
TimeTickLongDelay TriggerReason = 4
)
var TriggerReasonString = map[TriggerReason]string{
ManuallyDenyToRead: "manually deny to read",
ManuallyDenyToWrite: "manually deny to write",
MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources",
DiskQuotaExhausted: "disk quota exhausted, please allocate more resources",
TimeTickLongDelay: "time tick long delay",
}
func (t TriggerReason) String() string {
if s, ok := TriggerReasonString[t]; ok {
return s
}
return ""
}
type RateAllocateStrategy int32
const (
@ -114,7 +89,7 @@ type QuotaCenter struct {
dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics
currentRates map[internalpb.RateType]Limit
quotaStates map[milvuspb.QuotaState]string
quotaStates map[milvuspb.QuotaState]commonpb.ErrorCode
tsoAllocator tso.Allocator
rateAllocateStrategy RateAllocateStrategy
@ -130,7 +105,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da
queryCoord: queryCoord,
dataCoord: dataCoord,
currentRates: make(map[internalpb.RateType]Limit),
quotaStates: make(map[milvuspb.QuotaState]string),
quotaStates: make(map[milvuspb.QuotaState]commonpb.ErrorCode),
tsoAllocator: tsoAllocator,
rateAllocateStrategy: DefaultRateAllocateStrategy,
@ -271,20 +246,20 @@ func (q *QuotaCenter) syncMetrics() error {
}
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
func (q *QuotaCenter) forceDenyWriting(reason TriggerReason) {
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode) {
q.currentRates[internalpb.RateType_DMLInsert] = 0
q.currentRates[internalpb.RateType_DMLDelete] = 0
q.currentRates[internalpb.RateType_DMLBulkLoad] = 0
log.Warn("QuotaCenter force to deny writing", zap.String("reason", reason.String()))
q.quotaStates[milvuspb.QuotaState_DenyToWrite] = reason.String()
log.Warn("QuotaCenter force to deny writing", zap.String("reason", errorCode.String()))
q.quotaStates[milvuspb.QuotaState_DenyToWrite] = errorCode
}
// forceDenyWriting sets dql rates to 0 to reject all dql requests.
func (q *QuotaCenter) forceDenyReading(reason TriggerReason) {
func (q *QuotaCenter) forceDenyReading(errorCode commonpb.ErrorCode) {
q.currentRates[internalpb.RateType_DQLSearch] = 0
q.currentRates[internalpb.RateType_DQLQuery] = 0
log.Warn("QuotaCenter force to deny reading", zap.String("reason", reason.String()))
q.quotaStates[milvuspb.QuotaState_DenyToRead] = reason.String()
log.Warn("QuotaCenter force to deny reading", zap.String("reason", errorCode.String()))
q.quotaStates[milvuspb.QuotaState_DenyToRead] = errorCode
}
// getRealTimeRate return real time rate in Proxy.
@ -310,7 +285,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate
// calculateReadRates calculates and sets dql rates.
func (q *QuotaCenter) calculateReadRates() {
if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
q.forceDenyReading(ManuallyDenyToRead)
q.forceDenyReading(commonpb.ErrorCode_ForceDeny)
return
}
@ -355,13 +330,13 @@ func (q *QuotaCenter) calculateReadRates() {
// calculateWriteRates calculates and sets dml rates.
func (q *QuotaCenter) calculateWriteRates() error {
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
q.forceDenyWriting(ManuallyDenyToWrite)
q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
return nil
}
exceeded := q.ifDiskQuotaExceeded()
if exceeded {
q.forceDenyWriting(DiskQuotaExhausted) // disk quota protection
q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted) // disk quota protection
return nil
}
@ -371,13 +346,13 @@ func (q *QuotaCenter) calculateWriteRates() error {
}
ttFactor := q.getTimeTickDelayFactor(ts)
if ttFactor <= 0 {
q.forceDenyWriting(TimeTickLongDelay) // tt protection
q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay) // tt protection
return nil
}
memFactor := q.getMemoryFactor()
if memFactor <= 0 {
q.forceDenyWriting(MemoryQuotaExhausted) // memory protection
q.forceDenyWriting(commonpb.ErrorCode_MemoryQuotaExhausted) // memory protection
return nil
}
@ -430,7 +405,7 @@ func (q *QuotaCenter) resetCurrentRates() {
q.currentRates[rt] = Inf // no limit
}
}
q.quotaStates = make(map[milvuspb.QuotaState]string)
q.quotaStates = make(map[milvuspb.QuotaState]commonpb.ErrorCode)
}
// getTimeTickDelayFactor gets time tick delay of DataNodes and QueryNodes,
@ -689,10 +664,10 @@ func (q *QuotaCenter) setRates() error {
// TODO: support ByRateWeight
}
states := make([]milvuspb.QuotaState, 0, len(q.quotaStates))
stateReasons := make([]string, 0, len(q.quotaStates))
codes := make([]commonpb.ErrorCode, 0, len(q.quotaStates))
for k, v := range q.quotaStates {
states = append(states, k)
stateReasons = append(stateReasons, v)
codes = append(codes, v)
}
timestamp := tsoutil.ComposeTSByTime(time.Now(), 0)
req := &proxypb.SetRatesRequest{
@ -702,24 +677,23 @@ func (q *QuotaCenter) setRates() error {
),
Rates: map2List(),
States: states,
StateReasons: stateReasons,
Codes: codes,
}
return q.proxies.SetRates(ctx, req)
}
// recordMetrics records metrics of quota states.
func (q *QuotaCenter) recordMetrics() {
for _, reason := range TriggerReasonString {
hit := false
record := func(errorCode commonpb.ErrorCode) {
for _, v := range q.quotaStates {
if v == reason {
hit = true
if v == errorCode {
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(1)
return
}
}
if hit {
metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(1)
} else {
metrics.RootCoordQuotaStates.WithLabelValues(reason).Set(0)
}
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String()).Set(0)
}
record(commonpb.ErrorCode_MemoryQuotaExhausted)
record(commonpb.ErrorCode_DiskQuotaExhausted)
record(commonpb.ErrorCode_TimeTickLongDelay)
}

View File

@ -115,10 +115,10 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test forceDeny", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.forceDenyReading(ManuallyDenyToRead)
quotaCenter.forceDenyReading(commonpb.ErrorCode_ForceDeny)
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
quotaCenter.forceDenyWriting(ManuallyDenyToWrite)
quotaCenter.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLInsert])
assert.Equal(t, Limit(0), quotaCenter.currentRates[internalpb.RateType_DMLDelete])
})
@ -441,16 +441,16 @@ func TestQuotaCenter(t *testing.T) {
t.Run("test setRates", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.currentRates[internalpb.RateType_DMLInsert] = 100
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = commonpb.ErrorCode_ForceDeny
err = quotaCenter.setRates()
assert.NoError(t, err)
})
t.Run("test recordMetrics", func(t *testing.T) {
quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = TriggerReasonString[MemoryQuotaExhausted]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = TriggerReasonString[ManuallyDenyToRead]
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted
quotaCenter.quotaStates[milvuspb.QuotaState_DenyToRead] = commonpb.ErrorCode_ForceDeny
quotaCenter.recordMetrics()
})

View File

@ -41,9 +41,7 @@ type TimeTickProvider interface {
// If Limit function return true, the request will be rejected.
// Otherwise, the request will pass. Limit also returns limit of limiter.
type Limiter interface {
Check(rt internalpb.RateType, n int) error
GetReadStateReason() string
GetWriteStateReason() string
Check(rt internalpb.RateType, n int) commonpb.ErrorCode
}
// Component is the interface all services implement