sync resource group change (#22269)

fix load collection with rg (#22083)
	refine rg capacity behavior (#22089)
	support rg api rbac (#22097)
	Add resource group tests (#22101)
	Fix gen_vectors typo issue (#22117)
	fix transfer node (#22120)
	fix rg e2e (#22187)
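A minimal sketch of the reworked transfer-node call touched by "fix transfer node" above: TransferNodeRequest now carries an explicit NumNode count, with field names taken from this diff. It assumes the milvuspb package from the go-api module pinned in go.mod below; how the request is actually sent to QueryCoord is omitted.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
)

func main() {
	// Ask QueryCoord to move three query nodes out of the default
	// resource group into "rg1". Field names match the requests built
	// in the tests changed by this commit.
	req := &milvuspb.TransferNodeRequest{
		SourceResourceGroup: "__default_resource_group",
		TargetResourceGroup: "rg1",
		NumNode:             3,
	}
	fmt.Println(req.String())
}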

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-02-20 19:22:26 +08:00 committed by GitHub
parent faf62cfa55
commit cd2e7fa535
25 changed files with 1928 additions and 275 deletions

go.mod
View File

@ -28,7 +28,7 @@ require (
github.com/klauspost/compress v1.14.2
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230213095429-2c3e821f707d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378
github.com/minio/minio-go/v7 v7.0.17
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants/v2 v2.4.8
@ -159,14 +159,9 @@ require (
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
go.opentelemetry.io/contrib v0.20.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0 // indirect
go.opentelemetry.io/otel v1.0.1 // indirect
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel v1.0.1
go.opentelemetry.io/otel/sdk v1.0.1 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v1.0.1 // indirect
go.opentelemetry.io/proto/otlp v0.9.0 // indirect
go.uber.org/multierr v1.6.0
@ -192,9 +187,6 @@ require github.com/golang/mock v1.5.0
require (
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/spf13/cobra v1.1.3 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1 // indirect
)

go.sum
View File

@ -95,8 +95,6 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@ -273,7 +271,6 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -494,10 +491,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230131100146-10289a199a32 h1:cLW/zBbJQWNtux+Cjx4RniB70iw3T5ut8/D393YNtz0=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230131100146-10289a199a32/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230213095429-2c3e821f707d h1:ib7cdknLFq99Ma1vzEnLHe0x4YPQXj+zWj/iwPwyrkM=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230213095429-2c3e821f707d/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378 h1:ttJp/ZUB/3GGbd2mIbASSfdOiBUrkP50gn5gDgCsD0g=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230220081023-2c1f3aadc378/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
@ -579,8 +574,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
@ -715,32 +708,21 @@ github.com/zeebo/xxh3 v1.0.1/go.mod h1:8VHV24/3AZLn3b6Mlp/KuC33LWH687Wq6EnziEB+r
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.0 h1:GsV3S+OfZEOCNXdtNkBSR7kgLobAa/SO6tCxRa0GAYw=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/api/v3 v3.5.5 h1:BX4JIbQ7hl7+jL+g+2j5UAr0o1bctCm6/Ct+ArBGkf0=
go.etcd.io/etcd/api/v3 v3.5.5/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8=
go.etcd.io/etcd/client/pkg/v3 v3.5.0 h1:2aQv6F436YnN7I4VbI8PPYrBhu+SmrTaADcf8Mi/6PU=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/pkg/v3 v3.5.5 h1:9S0JUVvmrVl7wCF39iTQthdaaNIiAaQbmK75ogO6GU8=
go.etcd.io/etcd/client/pkg/v3 v3.5.5/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ=
go.etcd.io/etcd/client/v2 v2.305.0 h1:ftQ0nOOHMcbMS3KIaDQ0g5Qcd6bhaBrQT6b89DfwLTs=
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
go.etcd.io/etcd/client/v2 v2.305.5 h1:DktRP60//JJpnPC0VBymAN/7V71GHMdjDCBt4ZPXDjI=
go.etcd.io/etcd/client/v2 v2.305.5/go.mod h1:zQjKllfqfBVyVStbt4FaosoX2iYd8fV/GRy/PbowgP4=
go.etcd.io/etcd/client/v3 v3.5.0 h1:62Eh0XOro+rDwkrypAGDfgmNh5Joq+z+W9HZdlXMzek=
go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0=
go.etcd.io/etcd/client/v3 v3.5.5 h1:q++2WTJbUgpQu4B6hCuT7VkdwaTP7Qz6Daak3WzbrlI=
go.etcd.io/etcd/client/v3 v3.5.5/go.mod h1:aApjR4WGlSumpnJ2kloS75h6aHUmAyaPLjHMxpc7E7c=
go.etcd.io/etcd/pkg/v3 v3.5.0 h1:ntrg6vvKRW26JRmHTE0iNlDgYK6JX3hg/4cD62X0ixk=
go.etcd.io/etcd/pkg/v3 v3.5.0/go.mod h1:UzJGatBQ1lXChBkQF0AuAtkRQMYnHubxAEYIrC3MSsE=
go.etcd.io/etcd/pkg/v3 v3.5.5 h1:Ablg7T7OkR+AeeeU32kdVhw/AGDsitkKPl7aW73ssjU=
go.etcd.io/etcd/pkg/v3 v3.5.5/go.mod h1:6ksYFxttiUGzC2uxyqiyOEvhAiD0tuIqSZkX3TyPdaE=
go.etcd.io/etcd/raft/v3 v3.5.0 h1:kw2TmO3yFTgE+F0mdKkG7xMxkit2duBDa2Hu6D/HMlw=
go.etcd.io/etcd/raft/v3 v3.5.0/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc=
go.etcd.io/etcd/raft/v3 v3.5.5 h1:Ibz6XyZ60OYyRopu73lLM/P+qco3YtlZMOhnXNS051I=
go.etcd.io/etcd/raft/v3 v3.5.5/go.mod h1:76TA48q03g1y1VpTue92jZLr9lIHKUNcYdZOOGyx8rI=
go.etcd.io/etcd/server/v3 v3.5.0 h1:jk8D/lwGEDlQU9kZXUFMSANkE22Sg5+mW27ip8xcF9E=
go.etcd.io/etcd/server/v3 v3.5.0/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4=
go.etcd.io/etcd/server/v3 v3.5.5 h1:jNjYm/9s+f9A9r6+SC4RvNaz6AqixpOvhrFdT0PvIj0=
go.etcd.io/etcd/server/v3 v3.5.5/go.mod h1:rZ95vDw/jrvsbj9XpTqPrTAB9/kzchVdhRirySPkUBc=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@ -750,39 +732,23 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0=
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 h1:sO4WKdPAudZGKPcpZT4MJn6JaDmpyLrMPDGGyA1SttE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0 h1:Wx7nFnvCaissIUZxPkBqDz2963Z+Cl+PkYbDKzTxDqQ=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0/go.mod h1:E5NNboN0UqSAki0Atn9kVwaN7I+l25gGxDqBueo/74E=
go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.0.1 h1:4XKyXmfqJLOQ7feyV5DB6gsBFZ0ltB8vLtp6pj4JIcc=
go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU=
go.opentelemetry.io/otel/exporters/otlp v0.20.0 h1:PTNgq9MRmQqqJY0REVbZFvwkYOA85vbdQU/nVfxDyqg=
go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1 h1:ofMbch7i29qIUf7VtF+r0HRF6ac0SBaPSziSsKp7wkk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1/go.mod h1:Kv8liBeVNFkkkbilbgWRpV+wWuu+H5xdOT6HAgd30iw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1 h1:CFMFNoz+CGprjFAFy+RJFrfEe4GBia3RRm2a4fREvCA=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1/go.mod h1:xOvWoTOrQjxjW61xtOmD/WKGRYb/P4NzRo3bs65U6Rk=
go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw=
go.opentelemetry.io/otel/sdk v0.20.0 h1:JsxtGXd06J8jrnya7fdI/U/MR6yXA5DtbZy+qoHQlr8=
go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc=
go.opentelemetry.io/otel/sdk v1.0.1 h1:wXxFEWGo7XfXupPwVJvTBOaPBC9FEg0wB8hMNrKk+cA=
go.opentelemetry.io/otel/sdk v1.0.1/go.mod h1:HrdXne+BiwsOHYYkBE5ysIcv2bvdZstxzmCQhxTcZkI=
go.opentelemetry.io/otel/sdk/export/metric v0.20.0 h1:c5VRjxCXdQlx1HjzwGdQHzZaVI82b5EbBgOu2ljD92g=
go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE=
go.opentelemetry.io/otel/sdk/metric v0.20.0 h1:7ao1wpzHRVKf0OQ7GIxiQJA6X7DLX9o14gmVon7mMK8=
go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.0.1 h1:StTeIH6Q3G4r0Fiw34LTokUFESZgIDUr0qIJ7mKmAfw=
go.opentelemetry.io/otel/trace v1.0.1/go.mod h1:5g4i4fKLaX2BQpSBsxw8YYcgKpMMSW3x7ZTuYBr3sUk=
go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.9.0 h1:C0g6TWmQYvjKRnljRULLWUVJGy8Uvu0NEL/5frY2/t4=
go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSsOPU9brfSHJg=
@ -792,8 +758,6 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
@ -812,9 +776,7 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@ -856,7 +818,6 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@ -1061,7 +1022,6 @@ golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
@ -1210,7 +1170,6 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=

View File

@ -363,7 +363,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
".ObjectType\022>\n\020object_privilege\030\002 \001(\0162$."
"milvus.proto.common.ObjectPrivilege\022\031\n\021o"
"bject_name_index\030\003 \001(\005\022\032\n\022object_name_in"
"dexs\030\004 \001(\005*\223\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"dexs\030\004 \001(\005*\303\n\n\tErrorCode\022\013\n\007Success\020\000\022\023\n"
"\017UnexpectedError\020\001\022\021\n\rConnectFailed\020\002\022\024\n"
"\020PermissionDenied\020\003\022\027\n\023CollectionNotExis"
"ts\020\004\022\023\n\017IllegalArgument\020\005\022\024\n\020IllegalDime"
@ -394,97 +394,105 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ateLimit\0201\022\022\n\016NodeIDNotMatch\0202\022\024\n\020Upsert"
"AutoIDTrue\0203\022\034\n\030InsufficientMemoryToLoad"
"\0204\022\030\n\024MemoryQuotaExhausted\0205\022\026\n\022DiskQuot"
"aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\017\n\013D"
"ataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007*c\n\nInde"
"xState\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissued\020\001"
"\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Failed"
"\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024\n\020Segmen"
"tStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022"
"\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013"
"\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Placeholde"
"rType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020d\022\017\n\013Flo"
"atVector\020e*\263\016\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n"
"\020CreateCollection\020d\022\022\n\016DropCollection\020e\022"
"\021\n\rHasCollection\020f\022\026\n\022DescribeCollection"
"\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSystemConf"
"igs\020i\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseColl"
"ection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m"
"\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollection\020o\022\024\n"
"\020RenameCollection\020p\022\024\n\017CreatePartition\020\310"
"\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPartition\020\312\001"
"\022\026\n\021DescribePartition\020\313\001\022\023\n\016ShowPartitio"
"ns\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePar"
"titions\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017Describ"
"eSegment\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017Releas"
"eSegments\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023Lo"
"adBalanceSegments\020\377\001\022\025\n\020DescribeSegments"
"\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255"
"\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete"
"\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSegmentStats\020\223\003"
"\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\rGetI"
"ndexState\020\366\003\022\032\n\025GetIndexBuildProgress\020\367\003"
"\022\034\n\027GetCollectionStatistics\020\370\003\022\033\n\026GetPar"
"titionStatistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016Re"
"trieveResult\020\373\003\022\024\n\017WatchDmChannels\020\374\003\022\025\n"
"\020RemoveDmChannels\020\375\003\022\027\n\022WatchQueryChanne"
"ls\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022\035\n\030Seale"
"dSegmentsChangeInfo\020\200\004\022\027\n\022WatchDeltaChan"
"nels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n\013GetRepl"
"icas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017GetDistr"
"ibution\020\205\004\022\025\n\020SyncDistribution\020\206\004\022\020\n\013Seg"
"mentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecov"
"eryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004\022\r\n\010Time"
"Tick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoadInde"
"x\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n"
"\017AllocateSegment\020\265\t\022\026\n\021SegmentStatistics"
"\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDataNodeTt"
"\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGetCredent"
"ial\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n\020UpdateC"
"redential\020\337\013\022\026\n\021ListCredUsernames\020\340\013\022\017\n\n"
"CreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017OperateU"
"serRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nSelectUse"
"r\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020OperatePriv"
"ilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026RefreshPol"
"icyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014\022\030\n\023Crea"
"teResourceGroup\020\244\r\022\026\n\021DropResourceGroup\020"
"\245\r\022\027\n\022ListResourceGroups\020\246\r\022\032\n\025DescribeR"
"esourceGroup\020\247\r\022\021\n\014TransferNode\020\250\r\022\024\n\017Tr"
"ansferReplica\020\251\r*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\n"
"BoolExprV1\020\001*B\n\017CompactionState\022\021\n\rUndef"
"iedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCompleted\020\002"
"*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000\022\013\n\007Sess"
"ion\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually\020\003\022\016\n\nCu"
"stomized\020\004*\213\001\n\013ImportState\022\021\n\rImportPend"
"ing\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImportStarted"
"\020\002\022\023\n\017ImportPersisted\020\005\022\023\n\017ImportComplet"
"ed\020\006\022\032\n\026ImportFailedAndCleaned\020\007*2\n\nObje"
"ctType\022\016\n\nCollection\020\000\022\n\n\006Global\020\001\022\010\n\004Us"
"er\020\002*\206\005\n\017ObjectPrivilege\022\020\n\014PrivilegeAll"
"\020\000\022\035\n\031PrivilegeCreateCollection\020\001\022\033\n\027Pri"
"vilegeDropCollection\020\002\022\037\n\033PrivilegeDescr"
"ibeCollection\020\003\022\034\n\030PrivilegeShowCollecti"
"ons\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020PrivilegeRel"
"ease\020\006\022\027\n\023PrivilegeCompaction\020\007\022\023\n\017Privi"
"legeInsert\020\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026Pri"
"vilegeGetStatistics\020\n\022\030\n\024PrivilegeCreate"
"Index\020\013\022\030\n\024PrivilegeIndexDetail\020\014\022\026\n\022Pri"
"vilegeDropIndex\020\r\022\023\n\017PrivilegeSearch\020\016\022\022"
"\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQuery\020\020\022\030"
"\n\024PrivilegeLoadBalance\020\021\022\023\n\017PrivilegeImp"
"ort\020\022\022\034\n\030PrivilegeCreateOwnership\020\023\022\027\n\023P"
"rivilegeUpdateUser\020\024\022\032\n\026PrivilegeDropOwn"
"ership\020\025\022\034\n\030PrivilegeSelectOwnership\020\026\022\034"
"\n\030PrivilegeManageOwnership\020\027\022\027\n\023Privileg"
"eSelectUser\020\030*S\n\tStateCode\022\020\n\014Initializi"
"ng\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022\013\n\007Stand"
"By\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022\025\n\021LoadS"
"tateNotExist\020\000\022\024\n\020LoadStateNotLoad\020\001\022\024\n\020"
"LoadStateLoading\020\002\022\023\n\017LoadStateLoaded\020\003:"
"^\n\021privilege_ext_obj\022\037.google.protobuf.M"
"essageOptions\030\351\007 \001(\0132!.milvus.proto.comm"
"on.PrivilegeExtBU\n\016io.milvus.grpcB\013Commo"
"nProtoP\001Z1github.com/milvus-io/milvus-pr"
"oto/go-api/commonpb\240\001\001b\006proto3"
"aExhausted\0206\022\025\n\021TimeTickLongDelay\0207\022\021\n\rN"
"otReadyServe\0208\022\033\n\027NotReadyCoordActivatin"
"g\0209\022\017\n\013DataCoordNA\020d\022\022\n\rDDRequestRace\020\350\007"
"*c\n\nIndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Un"
"issued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n"
"\n\006Failed\020\004\022\t\n\005Retry\020\005*\202\001\n\014SegmentState\022\024"
"\n\020SegmentStateNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Gr"
"owing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flus"
"hing\020\005\022\013\n\007Dropped\020\006\022\r\n\tImporting\020\007*>\n\017Pl"
"aceholderType\022\010\n\004None\020\000\022\020\n\014BinaryVector\020"
"d\022\017\n\013FloatVector\020e*\263\016\n\007MsgType\022\r\n\tUndefi"
"ned\020\000\022\024\n\020CreateCollection\020d\022\022\n\016DropColle"
"ction\020e\022\021\n\rHasCollection\020f\022\026\n\022DescribeCo"
"llection\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSy"
"stemConfigs\020i\022\022\n\016LoadCollection\020j\022\025\n\021Rel"
"easeCollection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDro"
"pAlias\020m\022\016\n\nAlterAlias\020n\022\023\n\017AlterCollect"
"ion\020o\022\024\n\020RenameCollection\020p\022\024\n\017CreatePar"
"tition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPart"
"ition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n\016Show"
"Partitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021Re"
"leasePartitions\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n"
"\017DescribeSegment\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024"
"\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegments\020"
"\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Describe"
"Segments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescrib"
"eIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013"
"\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\027\n\022ResendSegment"
"Stats\020\223\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003"
"\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro"
"gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033"
"\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020"
"\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne"
"ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue"
"ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022"
"\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD"
"eltaChannels\020\201\004\022\024\n\017GetShardLeaders\020\202\004\022\020\n"
"\013GetReplicas\020\203\004\022\023\n\016UnsubDmChannel\020\204\004\022\024\n\017"
"GetDistribution\020\205\004\022\025\n\020SyncDistribution\020\206"
"\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017"
"GetRecoveryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004"
"\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\t"
"LoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestT"
"SO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n\021SegmentSt"
"atistics\020\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDa"
"taNodeTt\020\270\t\022\025\n\020CreateCredential\020\334\013\022\022\n\rGe"
"tCredential\020\335\013\022\025\n\020DeleteCredential\020\336\013\022\025\n"
"\020UpdateCredential\020\337\013\022\026\n\021ListCredUsername"
"s\020\340\013\022\017\n\nCreateRole\020\300\014\022\r\n\010DropRole\020\301\014\022\024\n\017"
"OperateUserRole\020\302\014\022\017\n\nSelectRole\020\303\014\022\017\n\nS"
"electUser\020\304\014\022\023\n\016SelectResource\020\305\014\022\025\n\020Ope"
"ratePrivilege\020\306\014\022\020\n\013SelectGrant\020\307\014\022\033\n\026Re"
"freshPolicyInfoCache\020\310\014\022\017\n\nListPolicy\020\311\014"
"\022\030\n\023CreateResourceGroup\020\244\r\022\026\n\021DropResour"
"ceGroup\020\245\r\022\027\n\022ListResourceGroups\020\246\r\022\032\n\025D"
"escribeResourceGroup\020\247\r\022\021\n\014TransferNode\020"
"\250\r\022\024\n\017TransferReplica\020\251\r*\"\n\007DslType\022\007\n\003D"
"sl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState\022"
"\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCom"
"pleted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020\000"
"\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually"
"\020\003\022\016\n\nCustomized\020\004*\213\001\n\013ImportState\022\021\n\rIm"
"portPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImpor"
"tStarted\020\002\022\023\n\017ImportPersisted\020\005\022\023\n\017Impor"
"tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007"
"*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global"
"\020\001\022\010\n\004User\020\002*\202\007\n\017ObjectPrivilege\022\020\n\014Priv"
"ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020"
"\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil"
"egeDescribeCollection\020\003\022\034\n\030PrivilegeShow"
"Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv"
"ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022"
"\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020"
"\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile"
"geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020"
"\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe"
"arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ"
"uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv"
"ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi"
"p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg"
"eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner"
"ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023"
"PrivilegeSelectUser\020\030\022 \n\034PrivilegeCreate"
"ResourceGroup\020\032\022\036\n\032PrivilegeDropResource"
"Group\020\033\022\"\n\036PrivilegeDescribeResourceGrou"
"p\020\034\022\037\n\033PrivilegeListResourceGroups\020\035\022\031\n\025"
"PrivilegeTransferNode\020\036\022\034\n\030PrivilegeTran"
"sferReplica\020\037\022\037\n\033PrivilegeGetLoadingProg"
"ress\020 \022\031\n\025PrivilegeGetLoadState\020!*S\n\tSta"
"teCode\022\020\n\014Initializing\020\000\022\013\n\007Healthy\020\001\022\014\n"
"\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c"
"\n\tLoadState\022\025\n\021LoadStateNotExist\020\000\022\024\n\020Lo"
"adStateNotLoad\020\001\022\024\n\020LoadStateLoading\020\002\022\023"
"\n\017LoadStateLoaded\020\003:^\n\021privilege_ext_obj"
"\022\037.google.protobuf.MessageOptions\030\351\007 \001(\013"
"2!.milvus.proto.common.PrivilegeExtBU\n\016i"
"o.milvus.grpcB\013CommonProtoP\001Z1github.com"
"/milvus-io/milvus-proto/go-api/commonpb\240"
"\001\001b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
&::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto,
@ -505,7 +513,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5790,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 6090,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@ -577,6 +585,8 @@ bool ErrorCode_IsValid(int value) {
case 53:
case 54:
case 55:
case 56:
case 57:
case 100:
case 1000:
return true;
@ -848,6 +858,14 @@ bool ObjectPrivilege_IsValid(int value) {
case 22:
case 23:
case 24:
case 26:
case 27:
case 28:
case 29:
case 30:
case 31:
case 32:
case 33:
return true;
default:
return false;

View File

@ -168,6 +168,8 @@ enum ErrorCode : int {
MemoryQuotaExhausted = 53,
DiskQuotaExhausted = 54,
TimeTickLongDelay = 55,
NotReadyServe = 56,
NotReadyCoordActivating = 57,
DataCoordNA = 100,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
@ -549,12 +551,20 @@ enum ObjectPrivilege : int {
PrivilegeSelectOwnership = 22,
PrivilegeManageOwnership = 23,
PrivilegeSelectUser = 24,
PrivilegeCreateResourceGroup = 26,
PrivilegeDropResourceGroup = 27,
PrivilegeDescribeResourceGroup = 28,
PrivilegeListResourceGroups = 29,
PrivilegeTransferNode = 30,
PrivilegeTransferReplica = 31,
PrivilegeGetLoadingProgress = 32,
PrivilegeGetLoadState = 33,
ObjectPrivilege_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ObjectPrivilege_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ObjectPrivilege_IsValid(int value);
constexpr ObjectPrivilege ObjectPrivilege_MIN = PrivilegeAll;
constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeSelectUser;
constexpr ObjectPrivilege ObjectPrivilege_MAX = PrivilegeGetLoadState;
constexpr int ObjectPrivilege_ARRAYSIZE = ObjectPrivilege_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ObjectPrivilege_descriptor();

View File

@ -4838,7 +4838,7 @@ func getErrResponse(err error, method string) *commonpb.Status {
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.QueryCoordCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_IllegalArgument,
Reason: err.Error(),
}
}
@ -4849,13 +4849,6 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop
}
method := "DropResourceGroup"
if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
log.Warn("DropResourceGroup failed",
zap.Error(err),
)
return getErrResponse(err, method), nil
}
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropResourceGroup")
defer sp.Finish()
tr := timerecord.NewTimeRecorder(method)

View File

@ -254,7 +254,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
ResourceGroup: "...",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
t.Run("drop resource group", func(t *testing.T) {
@ -262,7 +262,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
ResourceGroup: "...",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("transfer node", func(t *testing.T) {
@ -272,7 +272,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
NumNode: 1,
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
t.Run("transfer replica", func(t *testing.T) {
@ -283,7 +283,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
CollectionName: "collection1",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
})
}

View File

@ -132,3 +132,64 @@ func TestPrivilegeInterceptor(t *testing.T) {
})
}
func TestResourceGroupPrivilege(t *testing.T) {
ctx := context.Background()
t.Run("Resource Group Privilege", func(t *testing.T) {
Params.CommonCfg.AuthorizationEnabled = true
_, err := PrivilegeInterceptor(ctx, &milvuspb.ListResourceGroupsRequest{})
assert.NotNil(t, err)
ctx = GetContext(context.Background(), "fooo:123456")
client := &MockRootCoordClientInterface{}
queryCoord := &MockQueryCoordClientInterface{}
mgr := newShardClientMgr()
client.listPolicy = func(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
return &internalpb.ListPolicyResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PolicyInfos: []string{
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeCreateResourceGroup.String()),
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeDropResourceGroup.String()),
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeDescribeResourceGroup.String()),
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeListResourceGroups.String()),
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeTransferNode.String()),
funcutil.PolicyForPrivilege("role1", commonpb.ObjectType_Global.String(), "*", commonpb.ObjectPrivilege_PrivilegeTransferReplica.String()),
},
UserRoles: []string{
funcutil.EncodeUserRoleCache("fooo", "role1"),
},
}, nil
}
InitMetaCache(ctx, client, queryCoord, mgr)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.CreateResourceGroupRequest{
ResourceGroup: "rg",
})
assert.Nil(t, err)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.DropResourceGroupRequest{
ResourceGroup: "rg",
})
assert.Nil(t, err)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.DescribeResourceGroupRequest{
ResourceGroup: "rg",
})
assert.Nil(t, err)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.ListResourceGroupsRequest{})
assert.Nil(t, err)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.TransferNodeRequest{})
assert.Nil(t, err)
_, err = PrivilegeInterceptor(GetContext(context.Background(), "fooo:123456"), &milvuspb.TransferReplicaRequest{})
assert.Nil(t, err)
})
}

View File

@ -1359,12 +1359,13 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
lct.Base,
commonpbutil.WithMsgType(commonpb.MsgType_LoadCollection),
),
DbID: 0,
CollectionID: collID,
Schema: collSchema,
ReplicaNumber: lct.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: lct.Refresh,
DbID: 0,
CollectionID: collID,
Schema: collSchema,
ReplicaNumber: lct.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: lct.Refresh,
ResourceGroups: lct.ResourceGroups,
}
log.Info("send LoadCollectionRequest to query coordinator", zap.String("role", typeutil.ProxyRole),
zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
@ -1581,13 +1582,14 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error {
lpt.Base,
commonpbutil.WithMsgType(commonpb.MsgType_LoadPartitions),
),
DbID: 0,
CollectionID: collID,
PartitionIDs: partitionIDs,
Schema: collSchema,
ReplicaNumber: lpt.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: lpt.Refresh,
DbID: 0,
CollectionID: collID,
PartitionIDs: partitionIDs,
Schema: collSchema,
ReplicaNumber: lpt.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
Refresh: lpt.Refresh,
ResourceGroups: lpt.ResourceGroups,
}
lpt.result, err = lpt.queryCoord.LoadPartitions(ctx, request)
return err

View File

@ -112,7 +112,7 @@ func validateCollectionNameOrAlias(entity, entityType string) error {
func ValidateResourceGroupName(entity string) error {
if entity == "" {
return fmt.Errorf("resource group name %s should not be empty", entity)
return errors.New("resource group name couldn't be empty")
}
invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity)

View File

@ -167,12 +167,6 @@ func (job *LoadCollectionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}
@ -381,12 +375,6 @@ func (job *LoadPartitionJob) PreExecute() error {
return ErrCollectionLoaded
}
if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
msg := "no enough nodes to create replicas"
log.Warn(msg)
return utils.WrapError(msg, ErrNoEnoughNode)
}
return nil
}

View File

@ -343,7 +343,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrNoEnoughNode)
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
}
}
@ -605,7 +605,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, ErrNoEnoughNode)
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
}
}

View File

@ -51,6 +51,7 @@ var (
)
var DefaultResourceGroupName = "__default_resource_group"
var DefaultResourceGroupCapacity = 1000000
type ResourceGroup struct {
nodes UniqueSet
@ -67,53 +68,30 @@ func NewResourceGroup(capacity int) *ResourceGroup {
}
// assign node to resource group
func (rg *ResourceGroup) assignNode(id int64) error {
func (rg *ResourceGroup) assignNode(id int64, deltaCapacity int) error {
if rg.containsNode(id) {
return ErrNodeAlreadyAssign
}
rg.nodes.Insert(id)
rg.capacity++
rg.capacity += deltaCapacity
return nil
}
// unassign node from resource group
func (rg *ResourceGroup) unassignNode(id int64) error {
func (rg *ResourceGroup) unassignNode(id int64, deltaCapacity int) error {
if !rg.containsNode(id) {
// remove non exist node should be tolerable
return nil
}
rg.nodes.Remove(id)
rg.capacity--
rg.capacity += deltaCapacity
return nil
}
func (rg *ResourceGroup) handleNodeUp(id int64) error {
if rg.LackOfNodes() == 0 {
return ErrRGIsFull
}
if rg.containsNode(id) {
return ErrNodeAlreadyAssign
}
rg.nodes.Insert(id)
return nil
}
func (rg *ResourceGroup) handleNodeDown(id int64) error {
if !rg.containsNode(id) {
// remove non exist node should be tolerable
return nil
}
rg.nodes.Remove(id)
return nil
}
func (rg *ResourceGroup) LackOfNodes() int {
return rg.capacity - len(rg.nodes)
}
@ -140,7 +118,7 @@ type ResourceManager struct {
func NewResourceManager(store Store, nodeMgr *session.NodeManager) *ResourceManager {
groupMap := make(map[string]*ResourceGroup)
groupMap[DefaultResourceGroupName] = NewResourceGroup(0)
groupMap[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupCapacity)
return &ResourceManager{
groups: groupMap,
store: store,
@ -240,9 +218,14 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
newNodes := rm.groups[rgName].GetNodes()
newNodes = append(newNodes, node)
deltaCapacity := 1
if rgName == DefaultResourceGroupName {
// default rg capacity won't be changed
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()) + 1,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
})
if err != nil {
@ -254,7 +237,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error {
return err
}
err = rm.groups[rgName].assignNode(node)
err = rm.groups[rgName].assignNode(node, deltaCapacity)
if err != nil {
return err
}
@ -289,7 +272,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
return ErrRGNotExist
}
if rm.nodeMgr.Get(node) == nil {
if rm.nodeMgr.Get(node) == nil || !rm.groups[rgName].containsNode(node) {
// remove non exist node should be tolerable
return nil
}
@ -301,9 +284,15 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
}
}
deltaCapacity := -1
if rgName == DefaultResourceGroupName {
// default rg capacity won't be changed
deltaCapacity = 0
}
err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{
Name: rgName,
Capacity: int32(rm.groups[rgName].GetCapacity()) - 1,
Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity),
Nodes: newNodes,
})
if err != nil {
@ -316,7 +305,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error {
}
rm.checkRGNodeStatus(rgName)
err = rm.groups[rgName].unassignNode(node)
err = rm.groups[rgName].unassignNode(node, deltaCapacity)
if err != nil {
return err
}
@ -460,8 +449,8 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) {
}
// assign new node to default rg
rm.groups[DefaultResourceGroupName].assignNode(node)
log.Info("HandleNodeUp: assign node to default resource group",
rm.groups[DefaultResourceGroupName].assignNode(node, 0)
log.Info("HandleNodeUp: add node to default resource group",
zap.String("rgName", DefaultResourceGroupName),
zap.Int64("node", node),
)
@ -482,13 +471,13 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) {
zap.String("rgName", rgName),
zap.Int64("node", node),
)
return rgName, rm.groups[rgName].handleNodeDown(node)
return rgName, rm.groups[rgName].unassignNode(node, 0)
}
return "", ErrNodeNotAssignToRG
}
func (rm *ResourceManager) TransferNode(from, to string) error {
func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error {
rm.rwmutex.Lock()
defer rm.rwmutex.Unlock()
@ -496,57 +485,87 @@ func (rm *ResourceManager) TransferNode(from, to string) error {
return ErrRGNotExist
}
if len(rm.groups[from].nodes) == 0 {
return ErrRGIsEmpty
}
rm.checkRGNodeStatus(from)
rm.checkRGNodeStatus(to)
if len(rm.groups[from].nodes) < numNode {
return ErrNodeNotEnough
}
//todo: a better way to choose a node with least balance cost
node := rm.groups[from].GetNodes()[0]
if err := rm.transferNodeInStore(from, to, node); err != nil {
movedNodes, err := rm.transferNodeInStore(from, to, numNode)
if err != nil {
return err
}
err := rm.groups[from].unassignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
deltaFromCapacity := -1
if from == DefaultResourceGroupName {
deltaFromCapacity = 0
}
deltaToCapacity := 1
if to == DefaultResourceGroupName {
deltaToCapacity = 0
}
err = rm.groups[to].assignNode(node)
if err != nil {
// interrupt transfer, unreachable logic path
return err
for _, node := range movedNodes {
err := rm.groups[from].unassignNode(node, deltaFromCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
err = rm.groups[to].assignNode(node, deltaToCapacity)
if err != nil {
// interrupt transfer, unreachable logic path
return err
}
}
return nil
}
func (rm *ResourceManager) transferNodeInStore(from string, to string, node int64) error {
func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) {
availableNodes := rm.groups[from].GetNodes()
if len(availableNodes) < numNode {
return nil, ErrNodeNotEnough
}
movedNodes := make([]int64, 0, numNode)
fromNodeList := make([]int64, 0)
for nid := range rm.groups[from].nodes {
if nid != node {
fromNodeList = append(fromNodeList, nid)
toNodeList := rm.groups[to].GetNodes()
for i := 0; i < len(availableNodes); i++ {
if i < numNode {
movedNodes = append(movedNodes, availableNodes[i])
toNodeList = append(toNodeList, availableNodes[i])
} else {
fromNodeList = append(fromNodeList, availableNodes[i])
}
}
toNodeList := rm.groups[to].GetNodes()
toNodeList = append(toNodeList, node)
fromCapacity := rm.groups[from].GetCapacity()
if from != DefaultResourceGroupName {
// default rg capacity won't be changed
fromCapacity = rm.groups[from].GetCapacity() - numNode
}
fromRG := &querypb.ResourceGroup{
Name: from,
Capacity: int32(rm.groups[from].GetCapacity()) - 1,
Capacity: int32(fromCapacity),
Nodes: fromNodeList,
}
toCapacity := rm.groups[to].GetCapacity()
if from != DefaultResourceGroupName {
// default rg capacity won't be changed
toCapacity = rm.groups[to].GetCapacity() + numNode
}
toRG := &querypb.ResourceGroup{
Name: to,
Capacity: int32(rm.groups[to].GetCapacity()) + 1,
Capacity: int32(toCapacity),
Nodes: toNodeList,
}
return rm.store.SaveResourceGroup(fromRG, toRG)
return movedNodes, rm.store.SaveResourceGroup(fromRG, toRG)
}
// auto recover rg, return recover used node num
@ -570,7 +589,7 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error)
return i + 1, err
}
err = rm.groups[rgName].handleNodeUp(node)
err = rm.groups[rgName].assignNode(node, 0)
if err != nil {
// roll back, unreachable logic path
rm.assignNode(DefaultResourceGroupName, node)
@ -589,10 +608,18 @@ func (rm *ResourceManager) Recover() error {
}
for _, rg := range rgs {
rm.groups[rg.GetName()] = NewResourceGroup(0)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node)
if rg.GetName() == DefaultResourceGroupName {
rm.groups[rg.GetName()] = NewResourceGroup(DefaultResourceGroupCapacity)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node, 0)
}
} else {
rm.groups[rg.GetName()] = NewResourceGroup(0)
for _, node := range rg.GetNodes() {
rm.groups[rg.GetName()].assignNode(node, 1)
}
}
rm.checkRGNodeStatus(rg.GetName())
log.Info("Recover resource group",
zap.String("rgName", rg.GetName()),
@ -613,7 +640,7 @@ func (rm *ResourceManager) checkRGNodeStatus(rgName string) {
zap.Int64("nodeID", node),
)
rm.groups[rgName].handleNodeDown(node)
rm.groups[rgName].unassignNode(node, 0)
}
}
}
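An illustrative-only sketch of the capacity rule refined in the resource-manager hunks above: a user-created resource group's capacity tracks the delta passed with each assign/unassign, while the default group keeps its fixed DefaultResourceGroupCapacity (1000000) and is always updated with a zero delta. The types here are local stand-ins, not the real querycoord structs.

package main

import "fmt"

const defaultRG = "__default_resource_group"
const defaultRGCapacity = 1000000

type group struct {
	nodes    map[int64]struct{}
	capacity int
}

// assign mirrors the deltaCapacity behavior: +1 for normal groups, 0 for the default group.
func assign(groups map[string]*group, name string, node int64) {
	delta := 1
	if name == defaultRG {
		delta = 0 // default rg capacity won't be changed
	}
	g := groups[name]
	g.nodes[node] = struct{}{}
	g.capacity += delta
}

func main() {
	groups := map[string]*group{
		defaultRG: {nodes: map[int64]struct{}{}, capacity: defaultRGCapacity},
		"rg1":     {nodes: map[int64]struct{}{}, capacity: 0},
	}
	assign(groups, "rg1", 11)
	assign(groups, defaultRG, 12)
	fmt.Println(groups["rg1"].capacity, groups[defaultRG].capacity) // 1 1000000
}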

View File

@ -111,16 +111,37 @@ func (suite *ResourceManagerSuite) TestManipulateNode() {
err = suite.manager.AssignNode("rg1", 1)
suite.NoError(err)
err = suite.manager.AssignNode("rg2", 1)
println(err.Error())
suite.ErrorIs(err, ErrNodeAlreadyAssign)
// transfer node between rgs
err = suite.manager.TransferNode("rg1", "rg2")
err = suite.manager.TransferNode("rg1", "rg2", 1)
suite.NoError(err)
// transfer meet non exist rg
err = suite.manager.TransferNode("rgggg", "rg2")
err = suite.manager.TransferNode("rgggg", "rg2", 1)
suite.ErrorIs(err, ErrRGNotExist)
err = suite.manager.TransferNode("rg1", "rg2", 5)
suite.ErrorIs(err, ErrNodeNotEnough)
suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.manager.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.manager.AssignNode("rg1", 11)
suite.manager.AssignNode("rg1", 12)
suite.manager.AssignNode("rg1", 13)
suite.manager.AssignNode("rg1", 14)
rg1, err := suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
rg2, err := suite.manager.GetResourceGroup("rg2")
suite.NoError(err)
suite.Equal(rg1.GetCapacity(), 4)
suite.Equal(rg2.GetCapacity(), 1)
suite.manager.TransferNode("rg1", "rg2", 3)
suite.Equal(rg1.GetCapacity(), 1)
suite.Equal(rg2.GetCapacity(), 4)
}
func (suite *ResourceManagerSuite) TestHandleNodeUp() {
@ -153,14 +174,14 @@ func (suite *ResourceManagerSuite) TestHandleNodeUp() {
suite.NoError(err)
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(0, defaultRG.GetCapacity())
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
suite.manager.HandleNodeUp(101)
rg, err = suite.manager.GetResourceGroup("rg1")
suite.NoError(err)
suite.Equal(rg.GetCapacity(), 3)
suite.Equal(len(rg.GetNodes()), 2)
suite.False(suite.manager.ContainsNode("rg1", 101))
suite.Equal(1, defaultRG.GetCapacity())
suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity())
}
func (suite *ResourceManagerSuite) TestRecover() {
@ -283,6 +304,42 @@ func (suite *ResourceManagerSuite) TestAutoRecover() {
suite.Equal(lackNodes, 0)
}
func (suite *ResourceManagerSuite) TestDefaultResourceGroup() {
for i := 0; i < 10; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(int64(i), "localhost"))
}
defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 0)
suite.manager.HandleNodeUp(1)
suite.manager.HandleNodeUp(2)
suite.manager.HandleNodeUp(3)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
// shutdown node 1 and 2
suite.manager.nodeMgr.Remove(1)
suite.manager.nodeMgr.Remove(2)
defaultRG, err = suite.manager.GetResourceGroup(DefaultResourceGroupName)
suite.NoError(err)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 1)
suite.manager.HandleNodeUp(4)
suite.manager.HandleNodeUp(5)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 3)
suite.manager.HandleNodeUp(7)
suite.manager.HandleNodeUp(8)
suite.manager.HandleNodeUp(9)
suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity)
suite.Len(defaultRG.GetNodes(), 6)
}
func (suite *ResourceManagerSuite) TearDownSuite() {
suite.kv.Close()
}

View File

@ -209,13 +209,14 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int32("replicaNumber", req.GetReplicaNumber()),
zap.Strings("resourceGroups", req.GetResourceGroups()),
zap.Bool("refreshMode", req.GetRefresh()),
)
log.Info("load collection request received",
zap.Any("schema", req.Schema),
zap.Int32("replicaNumber", req.ReplicaNumber),
zap.Int64s("fieldIndexes", lo.Values(req.GetFieldIndexID())),
zap.Bool("refreshMode", req.GetRefresh()),
)
metrics.QueryCoordLoadCount.WithLabelValues(metrics.TotalLabel).Inc()
@ -310,7 +311,6 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
log.Info("received load partitions request",
zap.Any("schema", req.Schema),
zap.Int32("replicaNumber", req.ReplicaNumber),
zap.Int64s("partitions", req.GetPartitionIDs()))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.TotalLabel).Inc()
@ -357,7 +357,7 @@ func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string)
if len(resourceGroups) != 0 {
collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(collectionID)
for _, rgName := range resourceGroups {
if !collectionUsedRG.Contain(rgName) {
if len(collectionUsedRG) > 0 && !collectionUsedRG.Contain(rgName) {
return ErrLoadUseWrongRG
}
}
@ -1016,6 +1016,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
log := log.Ctx(ctx).With(
zap.String("source", req.GetSourceResourceGroup()),
zap.String("target", req.GetTargetResourceGroup()),
zap.Int32("numNode", req.GetNumNode()),
)
log.Info("transfer node between resource group request received")
@ -1034,7 +1035,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup())
err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode()))
if err != nil {
log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil
@ -1066,12 +1067,19 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil
}
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup())
if len(replicas) > 0 {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported",
len(replicas), req.GetSourceResourceGroup())), nil
}
// for now, we don't support to transfer replica of same collection to same resource group
replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup())
if len(replicas) < int(req.GetNumReplica()) {
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument,
fmt.Sprintf("found [%d] replicas of collection[%d] in source resource group[%s]",
len(replicas), req.GetCollectionID(), req.GetSourceResourceGroup())), nil
fmt.Sprintf("only found [%d] replicas in source resource group[%s]",
len(replicas), req.GetSourceResourceGroup())), nil
}
err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()])
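
A client-side sketch of the two checks added above, using pymilvus' utility.transfer_replica (its positional arguments match the wrapper added later in this change); the collection and group names are illustrative. The call is rejected if the target group already holds replicas of the collection, or if the source group has fewer replicas than requested.

from pymilvus import connections, utility

connections.connect("default", host="localhost", port="19530")
# move one replica of "my_collection" out of the default resource group into rg1
utility.transfer_replica("__default_resource_group", "rg1", "my_collection", 1)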

View File

@ -473,6 +473,7 @@ func (suite *ServiceSuite) TestTransferNode() {
resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg1",
NumNode: 1,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
@ -498,6 +499,40 @@ func (suite *ServiceSuite) TestTransferNode() {
suite.Contains(resp.Reason, meta.ErrRGNotExist.Error())
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
err = server.meta.ResourceManager.AddResourceGroup("rg3")
suite.NoError(err)
err = server.meta.ResourceManager.AddResourceGroup("rg4")
suite.NoError(err)
suite.nodeMgr.Add(session.NewNodeInfo(11, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(12, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(13, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(14, "localhost"))
suite.meta.ResourceManager.AssignNode("rg3", 11)
suite.meta.ResourceManager.AssignNode("rg3", 12)
suite.meta.ResourceManager.AssignNode("rg3", 13)
suite.meta.ResourceManager.AssignNode("rg3", 14)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
TargetResourceGroup: "rg4",
NumNode: 3,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
nodes, err = server.meta.ResourceManager.GetNodes("rg3")
suite.NoError(err)
suite.Len(nodes, 1)
nodes, err = server.meta.ResourceManager.GetNodes("rg4")
suite.NoError(err)
suite.Len(nodes, 3)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
SourceResourceGroup: "rg3",
TargetResourceGroup: "rg4",
NumNode: 3,
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)
resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{
@ -526,7 +561,7 @@ func (suite *ServiceSuite) TestTransferReplica() {
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "found [0] replicas of collection[1] in source resource group")
suite.Contains(resp.Reason, "only found [0] replicas in source resource group")
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: "rgg",
@ -556,6 +591,11 @@ func (suite *ServiceSuite) TestTransferReplica() {
ID: 222,
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(2)))
suite.server.meta.Put(meta.NewReplica(&querypb.Replica{
CollectionID: 1,
ID: 333,
ResourceGroup: meta.DefaultResourceGroupName,
}, typeutil.NewUniqueSet(3)))
suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost"))
suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost"))
@ -576,6 +616,14 @@ func (suite *ServiceSuite) TestTransferReplica() {
suite.NoError(err)
suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success)
suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 2)
resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{
SourceResourceGroup: meta.DefaultResourceGroupName,
TargetResourceGroup: "rg3",
CollectionID: 1,
NumReplica: 2,
})
suite.NoError(err)
suite.Contains(resp.Reason, "dynamically increase replica num is unsupported")
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)

View File

@ -33,7 +33,7 @@ var (
ErrGetNodesFromRG = errors.New("failed to get node from rg")
ErrNoReplicaFound = errors.New("no replica found during assign nodes")
ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes")
ErrUseWrongNumRG = errors.New("resource num can only be 0, 1 or same as replica number")
ErrUseWrongNumRG = errors.New("resource group num can only be 0, 1 or same as replica number")
)
func GetReplicaNodesInfo(replicaMgr *meta.ReplicaManager, nodeMgr *session.NodeManager, replicaID int64) []*session.NodeInfo {
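
The reworded ErrUseWrongNumRG encodes the load-time rule: pass zero resource groups, one, or exactly as many as there are replicas. A hedged sketch; it assumes pymilvus 2.2 forwards a _resource_groups keyword through Collection.load(), so treat that kwarg name as an assumption.

from pymilvus import Collection

collection = Collection("my_collection")
# one resource group per replica: accepted
collection.load(replica_number=2, _resource_groups=["rg1", "rg2"])
# three groups for two replicas would be rejected with ErrUseWrongNumRG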

View File

@ -89,6 +89,13 @@ var (
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDropOwnership.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeSelectOwnership.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeManageOwnership.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeCreateResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDropResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeDescribeResourceGroup.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeListResourceGroups.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeTransferReplica.String()),
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeTransferNode.String()),
},
commonpb.ObjectType_User.String(): {
MetaStore2API(commonpb.ObjectPrivilege_PrivilegeUpdateUser.String()),
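
These privileges become grantable through the existing RBAC API; a hedged sketch, assuming the API-side names follow the usual MetaStore2API mapping (e.g. "TransferNode", "TransferReplica") and that a connection is already established.

from pymilvus import Role

role = Role("rg_admin")
role.create()
# let the role move query nodes and replicas between resource groups
role.grant("Global", "*", "TransferNode")
role.grant("Global", "*", "TransferReplica")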

View File

@ -953,7 +953,7 @@ func (p *queryCoordConfig) initCheckNodeInReplicaInterval() {
}
func (p *queryCoordConfig) initCheckResourceGroupInterval() {
interval := p.Base.LoadWithDefault("queryCoord.checkResourceGroupInterval", "60")
interval := p.Base.LoadWithDefault("queryCoord.checkResourceGroupInterval", "10")
checkResourceGroupInterval, err := strconv.ParseInt(interval, 10, 64)
if err != nil {
panic(err)

View File

@ -254,7 +254,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, float64(300), Params.NextTargetSurviveTime.Seconds())
assert.Equal(t, float64(30), Params.UpdateNextTargetInterval.Seconds())
assert.Equal(t, float64(60), Params.CheckNodeInReplicaInterval.Seconds())
assert.Equal(t, float64(60), Params.CheckResourceGroupInterval.Seconds())
assert.Equal(t, float64(10), Params.CheckResourceGroupInterval.Seconds())
assert.Equal(t, true, Params.EnableRGAutoRecover)
})

View File

@ -14,6 +14,8 @@ from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from pymilvus import ResourceGroupInfo
class Base:
""" Initialize class object """
@ -25,6 +27,7 @@ class Base:
collection_schema_wrap = None
field_schema_wrap = None
collection_object_list = []
resource_group_list = []
def setup_class(self):
log.info("[setup_class] Start setup class...")
@ -64,6 +67,16 @@ class Base:
if collection_object.collection is not None and collection_object.name in collection_list:
collection_object.drop(check_task=ct.CheckTasks.check_nothing)
""" Clean up the rgs before disconnect """
for rg_name in self.resource_group_list:
rg = self.utility_wrap.describe_resource_group(name=rg_name, check_task=ct.CheckTasks.check_nothing)[0]
if isinstance(rg, ResourceGroupInfo):
if rg.num_available_node > 0:
self.utility_wrap.transfer_node(source=rg_name,
target=ct.default_resource_group_name,
num_node=rg.num_available_node)
self.utility_wrap.drop_resource_group(rg_name, check_task=ct.CheckTasks.check_nothing)
except Exception as e:
log.debug(str(e))
@ -262,3 +275,14 @@ class TestcaseBase(Base):
collection_w.insert(df)
assert collection_w.num_entities == nb_of_segment * (i + 1)
return collection_w
def init_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
self._connect()
utility_w = ApiUtilityWrapper()
res, check_result = utility_w.create_resource_group(name=name, using=using, timeout=timeout,
check_task=check_task,
check_items=check_items, **kwargs)
if res is None and check_result:
self.resource_group_list.append(name)
return res, check_result
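
A hypothetical test sketch showing how the helper above is meant to be used; the class name, test name and check values are illustrative only.

class TestResourceGroupSmoke(TestcaseBase):
    def test_create_and_describe_rg(self):
        rg_name = cf.gen_unique_str("rg")
        # registers rg_name in resource_group_list so teardown drops it automatically
        self.init_resource_group(rg_name)
        self.utility_wrap.describe_resource_group(name=rg_name,
                                                  check_task=ct.CheckTasks.check_rg_property,
                                                  check_items={"name": rg_name})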

View File

@ -467,4 +467,41 @@ class ApiUtilityWrapper:
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.role.list_grants], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
return res, check_result
def create_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.create_resource_group, name, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def drop_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.drop_resource_group, name, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def list_resource_groups(self, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.list_resource_groups, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def describe_resource_group(self, name, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.describe_resource_group, name, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.transfer_node, source, target, num_node, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.transfer_replica, source, target, collection_name, num_replica, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
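
Taken together the wrappers cover the whole resource group lifecycle; a sketch written against the wrapper itself, assuming a connection is already open and using "rg_demo" as an illustrative name.

ut = ApiUtilityWrapper()
ut.create_resource_group(name="rg_demo")
ut.transfer_node(source="__default_resource_group", target="rg_demo", num_node=1)
rgs, _ = ut.list_resource_groups()
info, _ = ut.describe_resource_group(name="rg_demo")
# move the node back before dropping the group
ut.transfer_node(source="rg_demo", target="__default_resource_group", num_node=1)
ut.drop_resource_group(name="rg_demo")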

View File

@ -6,7 +6,7 @@ from common import common_type as ct
from common import common_func as cf
from common.common_type import CheckTasks, Connect_Object_Name
# from common.code_mapping import ErrorCode, ErrorMessage
from pymilvus import Collection, Partition
from pymilvus import Collection, Partition, ResourceGroupInfo
from utils.api_request import Error
import check.param_check as pc
@ -83,6 +83,9 @@ class ResponseChecker:
elif self.check_task == CheckTasks.check_permission_deny:
# Collection interface response check
result = self.check_permission_deny(self.response, self.succ)
elif self.check_task == CheckTasks.check_rg_property:
# describe resource group interface response check
result = self.check_rg_property(self.response, self.func_name, self.check_items)
# Add check_items here if something new need verify
@ -198,6 +201,29 @@ class ResponseChecker:
assert partition.num_entities == check_items["num_entities"]
return True
@staticmethod
def check_rg_property(rg, func_name, check_items):
exp_func_name = "describe_resource_group"
if func_name != exp_func_name:
log.warning("The function name is {} rather than {}".format(func_name, exp_func_name))
if not isinstance(rg, ResourceGroupInfo):
raise Exception("The result to check isn't ResourceGroupInfo type object")
if len(check_items) == 0:
raise Exception("No expect values found in the check task")
if check_items.get("name", None):
assert rg.name == check_items["name"]
if check_items.get("capacity", None):
assert rg.capacity == check_items["capacity"]
if check_items.get("num_available_node", None):
assert rg.num_available_node == check_items["num_available_node"]
if check_items.get("num_loaded_replica", None):
assert dict(rg.num_loaded_replica).items() >= check_items["num_loaded_replica"].items()
if check_items.get("num_outgoing_node", None):
assert dict(rg.num_outgoing_node).items() >= check_items["num_outgoing_node"].items()
if check_items.get("num_incoming_node", None):
assert dict(rg.num_incoming_node).items() >= check_items["num_incoming_node"].items()
return True
@staticmethod
def check_search_results(search_res, func_name, check_items):
"""

View File

@ -322,10 +322,10 @@ def gen_default_binary_dataframe_data(nb=ct.default_nb, dim=ct.default_dim, star
return df, binary_raw_values
def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim):
int_values = [i for i in range(nb)]
float_values = [np.float32(i) for i in range(nb)]
string_values = [str(i) for i in range(nb)]
def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim, start=0):
int_values = [i for i in range(start, start + nb)]
float_values = [np.float32(i) for i in range(start, start + nb)]
string_values = [str(i) for i in range(start, start + nb)]
float_vec_values = gen_vectors(nb, dim)
data = [int_values, float_values, string_values, float_vec_values]
return data
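
A quick illustration of the new start offset: consecutive batches get non-overlapping primary keys, which helps when inserting several batches into one collection.

first_batch = gen_default_list_data(nb=3000)               # int pks 0..2999
second_batch = gen_default_list_data(nb=3000, start=3000)  # int pks 3000..5999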

View File

@ -37,6 +37,8 @@ default_float_vec_field_name = "float_vector"
another_float_vec_field_name = "float_vector1"
default_binary_vec_field_name = "binary_vector"
default_partition_name = "_default"
default_resource_group_name = '__default_resource_group'
default_resource_group_capacity = 1000000
default_tag = "1970_01_01"
row_count = "row_count"
default_length = 65535
@ -200,6 +202,7 @@ class CheckTasks:
check_role_property = "check_role_property"
check_permission_deny = "check_permission_deny"
check_value_equal = "check_value_equal"
check_rg_property = "check_resource_group_property"
class BulkLoadStates:

File diff suppressed because it is too large