enhance: GCS native support (GCS implemented using Google Cloud Storage libraries) (#36214)

Native support for Google cloud storage using the Google Cloud Storage
libraries. Authentication is performed using GCS service account
credentials JSON.

Currently, Milvus supports Google Cloud Storage using S3-compatible APIs
via the AWS SDK. This approach has the following limitations:

1. Overhead: Translating requests between S3-compatible APIs and GCS can
introduce additional overhead.
2. Compatibility Limitations: Some features of the original S3 API may
not fully translate or work as expected with GCS.

To address these limitations, This enhancement is needed.

Related Issue: #36212
pull/36617/head
Rijin-N 2024-09-30 10:53:32 +05:30 committed by GitHub
parent 5686a9a024
commit a05a37a583
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 2005 additions and 115 deletions

1
.env
View File

@ -16,4 +16,5 @@ MINIO_ADDRESS=minio:9000
PULSAR_ADDRESS=pulsar://pulsar:6650
ETCD_ENDPOINTS=etcd:2379
AZURITE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite:10000/devstoreaccount1;"
ENABLE_GCP_NATIVE=ON

View File

@ -141,7 +141,7 @@ jobs:
- name: Start Service
shell: bash
run: |
docker compose up -d azurite
docker compose up -d azurite gcpnative
- name: UT
run: |
chmod +x build/builder.sh
@ -193,7 +193,7 @@ jobs:
- name: Start Service
shell: bash
run: |
docker compose up -d pulsar etcd minio azurite
docker compose up -d pulsar etcd minio azurite gcpnative
- name: UT
run: |
chmod +x build/builder.sh

View File

@ -306,7 +306,8 @@ ${CMAKE_EXTRA_ARGS} \
-DUSE_ASAN=${USE_ASAN} \
-DUSE_DYNAMIC_SIMD=${USE_DYNAMIC_SIMD} \
-DCPU_ARCH=${CPU_ARCH} \
-DINDEX_ENGINE=${INDEX_ENGINE} "
-DINDEX_ENGINE=${INDEX_ENGINE} \
-DENABLE_GCP_NATIVE=${ENABLE_GCP_NATIVE} "
if [ -z "$BUILD_WITHOUT_AZURE" ]; then
CMAKE_CMD=${CMAKE_CMD}"-DAZURE_BUILD_DIR=${AZURE_BUILD_DIR} \
-DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET} "

View File

@ -133,11 +133,17 @@ minio:
# aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role
useIAM: false
# Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
# Cloud Provider of Google Cloud Storage. Supports: "gcpnative".
# You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio
# You can use "gcp" for other cloud provider supports S3 API with signature v2
# You can use "aliyun" for other cloud provider uses virtual host style bucket
# You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials
# for authentication.
# When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now
cloudProvider: aws
# The JSON content contains the gcs service account credentials.
# Used only for the "gcpnative" cloud provider.
gcpCredentialJSON:
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:

View File

@ -67,6 +67,12 @@ services:
- "16686:16686" # frontent
- "14268:14268" # jaeger.thirft
gcpnative:
image: fsouza/fake-gcs-server
command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443
ports:
- "4443:4443"
networks:
default:
name: milvus_dev

View File

@ -102,6 +102,12 @@ services:
depends_on:
- zookeeper
gcpnative:
image: fsouza/fake-gcs-server
command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443
ports:
- "4443:4443"
networks:
default:
name: milvus_dev

View File

@ -31,6 +31,7 @@ services:
MINIO_ADDRESS: ${MINIO_ADDRESS}
CONAN_USER_HOME: /home/milvus
AZURE_STORAGE_CONNECTION_STRING: ${AZURITE_CONNECTION_STRING}
ENABLE_GCP_NATIVE: ${ENABLE_GCP_NATIVE}
volumes: &builder-volumes
- .:/go/src/github.com/milvus-io/milvus:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${IMAGE_ARCH}-${OS_NAME}-ccache:/ccache:delegated
@ -43,6 +44,7 @@ services:
- minio
- pulsar
- azurite
- gcpnative
# Command
command: &builder-command >
/bin/bash -c "
@ -71,6 +73,7 @@ services:
MINIO_ADDRESS: ${MINIO_ADDRESS}
CONAN_USER_HOME: /home/milvus
AZURE_STORAGE_CONNECTION_STRING: ${AZURITE_CONNECTION_STRING}
ENABLE_GCP_NATIVE: ${ENABLE_GCP_NATIVE}
volumes: &builder-volumes-gpu
- .:/go/src/github.com/milvus-io/milvus:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker-gpu}/${OS_NAME}-ccache:/ccache:delegated
@ -83,6 +86,7 @@ services:
- minio
- pulsar
- azurite
- gcpnative
# Command
command: &builder-command-gpu >
/bin/bash -c "
@ -134,6 +138,13 @@ services:
jaeger:
image: jaegertracing/all-in-one:latest
gcpnative:
image: fsouza/fake-gcs-server
command: -scheme http -public-host storage.gcs.127.0.0.1.nip.io:4443 -external-url "http://storage.gcs.127.0.0.1.nip.io:4443"
hostname: storage.gcs.127.0.0.1.nip.io
ports:
- "4443:4443"
networks:
default:
name: milvus_dev

19
go.mod
View File

@ -48,7 +48,7 @@ require (
golang.org/x/crypto v0.25.0
golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691
golang.org/x/net v0.27.0
golang.org/x/oauth2 v0.20.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.16.0
google.golang.org/grpc v1.65.0
@ -75,7 +75,12 @@ require (
)
require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.6.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
@ -111,6 +116,7 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/expr-lang/expr v1.15.7 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
@ -127,9 +133,13 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
@ -215,6 +225,8 @@ require (
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/raft/v3 v3.5.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.13.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
@ -232,8 +244,9 @@ require (
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/api v0.187.0 // indirect
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect

31
go.sum
View File

@ -18,6 +18,12 @@ cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmW
cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECHg=
cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8=
cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0=
cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14=
cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU=
cloud.google.com/go/auth v0.6.1 h1:T0Zw1XM5c1GlpN2HYr2s+m3vr1p2wy+8VN+Z1FKxW38=
cloud.google.com/go/auth v0.6.1/go.mod h1:eFHG7zDzbXHKmjJddFG/rBlcGp6t25SwRUiEQSlO4x4=
cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4=
cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@ -29,6 +35,8 @@ cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1h
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0=
cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
@ -38,6 +46,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.43.0 h1:CcxnSohZwizt4LCzQHWvBf1/kvtHUn7gk9QERXPyXFs=
cloud.google.com/go/storage v1.43.0/go.mod h1:ajvxEa7WmZS1PxvKRq4bq0tFT3vMd502JwstCcYv0Q0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
@ -242,6 +252,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
@ -338,6 +350,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@ -415,12 +428,18 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA=
github.com/googleapis/gax-go/v2 v2.12.5/go.mod h1:BUDKcWo+RaKq5SC9vVYL0wLADa3VcfswbOMMRmB9H3E=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
@ -936,9 +955,13 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0/go.mod h1:E5NNboN0UqSAki0Atn9kVwaN7I+l25gGxDqBueo/74E=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
@ -1121,6 +1144,8 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -1335,6 +1360,8 @@ google.golang.org/api v0.40.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjR
google.golang.org/api v0.41.0/go.mod h1:RkxM5lITDfTzmyKFPt+wGrCJbVfniCr2ool8kTBzRTU=
google.golang.org/api v0.43.0/go.mod h1:nQsDGjRXMo4lvh5hP0TKqF244gqhGcr/YSIykhUk/94=
google.golang.org/api v0.44.0/go.mod h1:EBOGZqzyhtvMDoxwS97ctnh0zUmYY6CxqXsc1AvkYD8=
google.golang.org/api v0.187.0 h1:Mxs7VATVC2v7CY+7Xwm4ndkX71hpElcvx0D1Ji/p1eo=
google.golang.org/api v0.187.0/go.mod h1:KIHlTc4x7N7gKKuVsdmfBXN13yEEWXWFURWY6SBp2gk=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -1391,8 +1418,12 @@ google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+n
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ=
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY=
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d h1:PksQg4dV6Sem3/HkBX+Ltq8T0ke0PKIRBNBatoDTVls=
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:s7iA721uChleev562UJO2OYB0PPT9CMFjV+Ce7VJH5M=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc=
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=

View File

@ -84,6 +84,10 @@ if(DEFINED AZURE_BUILD_DIR)
set(LINK_TARGETS ${LINK_TARGETS} azure_blob_chunk_manager)
endif()
if (ENABLE_GCP_NATIVE)
set(LINK_TARGETS ${LINK_TARGETS} gcp-native-storage)
endif()
target_link_libraries(milvus_core ${LINK_TARGETS})
install(TARGETS milvus_core DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -45,6 +45,8 @@ get_storage_config(const milvus::proto::clustering::StorageConfig& config) {
storage_config.region = config.region();
storage_config.useVirtualHost = config.use_virtual_host();
storage_config.requestTimeoutMs = config.request_timeout_ms();
storage_config.gcp_credential_json =
std::string(config.gcpcredentialjson());
return storage_config;
}

View File

@ -65,6 +65,7 @@ enum ErrorCode {
MemAllocateSizeNotMatch = 2035,
MmapError = 2036,
OutOfRange = 2037,
GcpNativeError = 2038,
KnowhereError = 2100,
// timeout or cancel related.

View File

@ -92,6 +92,7 @@ typedef struct CStorageConfig {
bool useIAM;
bool useVirtualHost;
int64_t requestTimeoutMs;
const char* gcp_credential_json;
} CStorageConfig;
typedef struct CMmapConfig {

View File

@ -103,6 +103,8 @@ get_storage_config(const milvus::proto::indexcgo::StorageConfig& config) {
storage_config.region = config.region();
storage_config.useVirtualHost = config.use_virtual_host();
storage_config.requestTimeoutMs = config.request_timeout_ms();
storage_config.gcp_credential_json =
std::string(config.gcpcredentialjson());
return storage_config;
}
@ -563,6 +565,8 @@ NewBuildIndexInfo(CBuildIndexInfo* c_build_index_info,
storage_config.region = c_storage_config.region;
storage_config.useVirtualHost = c_storage_config.useVirtualHost;
storage_config.requestTimeoutMs = c_storage_config.requestTimeoutMs;
storage_config.gcp_credential_json =
std::string(c_storage_config.gcp_credential_json);
*c_build_index_info = build_index_info.release();
auto status = CStatus();

View File

@ -15,6 +15,12 @@
# limitations under the License.
add_source_at_current_directory()
if (ENABLE_GCP_NATIVE)
add_definitions(-DENABLE_GCP_NATIVE)
add_subdirectory(gcp-native-storage)
endif()
if (DEFINED AZURE_BUILD_DIR)
add_definitions(-DAZURE_BUILD_DIR)
include_directories(azure-blob-storage)

View File

@ -101,6 +101,8 @@ struct StorageConfig {
bool useIAM = false;
bool useVirtualHost = false;
int64_t requestTimeoutMs = 3000;
bool gcp_native_without_auth = false;
std::string gcp_credential_json = "";
std::string
ToString() const {
@ -113,7 +115,9 @@ struct StorageConfig {
<< ", sslCACert=" << sslCACert.size() // only print cert length
<< ", useIAM=" << std::boolalpha << useIAM
<< ", useVirtualHost=" << std::boolalpha << useVirtualHost
<< ", requestTimeoutMs=" << requestTimeoutMs << "]";
<< ", requestTimeoutMs=" << requestTimeoutMs
<< ", gcp_native_without_auth=" << std::boolalpha
<< gcp_native_without_auth << "]";
return ss.str();
}

View File

@ -29,6 +29,9 @@
#ifdef AZURE_BUILD_DIR
#include "storage/azure/AzureChunkManager.h"
#endif
#ifdef ENABLE_GCP_NATIVE
#include "storage/gcp-native-storage/GcpNativeChunkManager.h"
#endif
#include "storage/ChunkManager.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/InsertData.h"
@ -59,6 +62,7 @@ enum class CloudProviderType : int8_t {
ALIYUN = 3,
AZURE = 4,
TENCENTCLOUD = 5,
GCPNATIVE = 6,
};
std::map<std::string, CloudProviderType> CloudProviderType_Map = {
@ -66,7 +70,8 @@ std::map<std::string, CloudProviderType> CloudProviderType_Map = {
{"gcp", CloudProviderType::GCP},
{"aliyun", CloudProviderType::ALIYUN},
{"azure", CloudProviderType::AZURE},
{"tencent", CloudProviderType::TENCENTCLOUD}};
{"tencent", CloudProviderType::TENCENTCLOUD},
{"gcpnative", CloudProviderType::GCPNATIVE}};
std::map<std::string, int> ReadAheadPolicy_Map = {
{"normal", MADV_NORMAL},
@ -713,6 +718,12 @@ CreateChunkManager(const StorageConfig& storage_config) {
case CloudProviderType::AZURE: {
return std::make_shared<AzureChunkManager>(storage_config);
}
#endif
#ifdef ENABLE_GCP_NATIVE
case CloudProviderType::GCPNATIVE: {
return std::make_shared<GcpNativeChunkManager>(
storage_config);
}
#endif
default: {
return std::make_shared<MinioChunkManager>(storage_config);

View File

@ -0,0 +1,15 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License
cmake_minimum_required (VERSION 3.12)
set(CMAKE_CXX_STANDARD 17)
add_library(gcp-native-storage OBJECT GcpNativeChunkManager.cpp GcpNativeClientManager.cpp)

View File

@ -0,0 +1,276 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <chrono>
#include <iostream>
#include <sstream>
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "log/Log.h"
#include "monitor/prometheus_client.h"
#include "storage/gcp-native-storage/GcpNativeChunkManager.h"
namespace milvus {
namespace storage {
GcpNativeChunkManager::GcpNativeChunkManager(
const StorageConfig& storage_config)
: default_bucket_name_(storage_config.bucket_name),
path_prefix_(storage_config.root_path) {
try {
client_ = std::make_unique<gcpnative::GcpNativeClientManager>(
storage_config.address,
storage_config.gcp_credential_json,
storage_config.useSSL,
storage_config.gcp_native_without_auth,
storage_config.requestTimeoutMs == 0
? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS
: storage_config.requestTimeoutMs);
} catch (std::exception& err) {
ThrowGcpNativeError("GcpNativeChunkManager",
err,
"gcp native chunk manager client creation failed, "
"error: {}, configuration: {}",
err.what(),
storage_config.ToString());
}
}
GcpNativeChunkManager::~GcpNativeChunkManager() {
}
uint64_t
GcpNativeChunkManager::Size(const std::string& filepath) {
return GetObjectSize(default_bucket_name_, filepath);
}
bool
GcpNativeChunkManager::Exist(const std::string& filepath) {
return ObjectExists(default_bucket_name_, filepath);
}
void
GcpNativeChunkManager::Remove(const std::string& filepath) {
DeleteObject(default_bucket_name_, filepath);
}
std::vector<std::string>
GcpNativeChunkManager::ListWithPrefix(const std::string& filepath) {
return ListObjects(default_bucket_name_, filepath);
}
uint64_t
GcpNativeChunkManager::Read(const std::string& filepath,
void* buf,
uint64_t size) {
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
}
void
GcpNativeChunkManager::Write(const std::string& filepath,
void* buf,
uint64_t size) {
PutObjectBuffer(default_bucket_name_, filepath, buf, size);
}
bool
GcpNativeChunkManager::BucketExists(const std::string& bucket_name) {
bool res;
try {
res = client_->BucketExists(bucket_name);
} catch (std::exception& err) {
ThrowGcpNativeError(
"BucketExists", err, "params, bucket={}", bucket_name);
}
return res;
}
std::vector<std::string>
GcpNativeChunkManager::ListBuckets() {
std::vector<std::string> res;
try {
res = client_->ListBuckets();
} catch (std::exception& err) {
ThrowGcpNativeError("ListBuckets", err, "params");
}
return res;
}
bool
GcpNativeChunkManager::CreateBucket(const std::string& bucket_name) {
bool res;
try {
res = client_->CreateBucket(bucket_name);
} catch (std::exception& err) {
ThrowGcpNativeError(
"CreateBucket", err, "params, bucket={}", bucket_name);
}
return res;
}
bool
GcpNativeChunkManager::DeleteBucket(const std::string& bucket_name) {
bool res;
try {
res = client_->DeleteBucket(bucket_name);
} catch (std::exception& err) {
ThrowGcpNativeError(
"DeleteBucket", err, "params, bucket={}", bucket_name);
}
return res;
}
bool
GcpNativeChunkManager::ObjectExists(const std::string& bucket_name,
const std::string& object_name) {
bool res;
try {
auto start = std::chrono::system_clock::now();
res = client_->ObjectExists(bucket_name, object_name);
monitor::internal_storage_request_latency_stat.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_stat_suc.Increment();
} catch (std::exception& err) {
monitor::internal_storage_op_count_stat_fail.Increment();
ThrowGcpNativeError("ObjectExists",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}
uint64_t
GcpNativeChunkManager::GetObjectSize(const std::string& bucket_name,
const std::string& object_name) {
uint64_t res;
try {
auto start = std::chrono::system_clock::now();
res = client_->GetObjectSize(bucket_name, object_name);
monitor::internal_storage_request_latency_stat.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_stat_suc.Increment();
} catch (std::exception& err) {
monitor::internal_storage_op_count_stat_fail.Increment();
ThrowGcpNativeError("GetObjectSize",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}
bool
GcpNativeChunkManager::DeleteObject(const std::string& bucket_name,
const std::string& object_name) {
bool res;
try {
auto start = std::chrono::system_clock::now();
res = client_->DeleteObject(bucket_name, object_name);
monitor::internal_storage_request_latency_remove.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_remove_suc.Increment();
} catch (std::exception& err) {
monitor::internal_storage_op_count_remove_fail.Increment();
ThrowGcpNativeError("DeleteObject",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}
bool
GcpNativeChunkManager::PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
bool res;
try {
auto start = std::chrono::system_clock::now();
res = client_->PutObjectBuffer(bucket_name, object_name, buf, size);
monitor::internal_storage_request_latency_put.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_put_suc.Increment();
monitor::internal_storage_kv_size_put.Observe(size);
} catch (std::exception& err) {
monitor::internal_storage_op_count_put_fail.Increment();
ThrowGcpNativeError("PutObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}
uint64_t
GcpNativeChunkManager::GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
uint64_t res;
try {
auto start = std::chrono::system_clock::now();
res = client_->GetObjectBuffer(bucket_name, object_name, buf, size);
monitor::internal_storage_request_latency_get.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_get_suc.Increment();
monitor::internal_storage_kv_size_get.Observe(size);
} catch (std::exception& err) {
monitor::internal_storage_op_count_get_fail.Increment();
ThrowGcpNativeError("GetObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return res;
}
std::vector<std::string>
GcpNativeChunkManager::ListObjects(const std::string& bucket_name,
const std::string& prefix) {
std::vector<std::string> res;
try {
auto start = std::chrono::system_clock::now();
res = client_->ListObjects(bucket_name, prefix);
monitor::internal_storage_request_latency_list.Observe(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count());
monitor::internal_storage_op_count_list_suc.Increment();
} catch (std::exception& err) {
monitor::internal_storage_op_count_list_fail.Increment();
ThrowGcpNativeError("ListObjects",
err,
"params, bucket={}, prefix={}",
bucket_name,
prefix);
}
return res;
}
} // namespace storage
} // namespace milvus

View File

@ -0,0 +1,151 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <iostream>
#include <stdlib.h>
#include <string>
#include <vector>
#include "storage/gcp-native-storage/GcpNativeClientManager.h"
#include "storage/ChunkManager.h"
#include "storage/Types.h"
namespace milvus {
namespace storage {
template <typename... Args>
static SegcoreError
ThrowGcpNativeError(const std::string& func,
const std::exception& err,
const std::string& fmt_string,
Args&&... args) {
std::ostringstream oss;
const auto& message = fmt::format(fmt_string, std::forward<Args>(args)...);
oss << "Error in " << func << "[exception:" << err.what()
<< ", params:" << message << "]";
throw SegcoreError(GcpNativeError, oss.str());
}
/**
* @brief This GcpNativeChunkManager is used to interact with
* Google Cloud Storage (GCS) services using GcpNativeClientManager class.
*/
class GcpNativeChunkManager : public ChunkManager {
public:
explicit GcpNativeChunkManager(const StorageConfig& storage_config);
GcpNativeChunkManager(const GcpNativeChunkManager&);
GcpNativeChunkManager&
operator=(const GcpNativeChunkManager&);
virtual ~GcpNativeChunkManager();
public:
bool
Exist(const std::string& filepath) override;
uint64_t
Size(const std::string& filepath) override;
uint64_t
Read(const std::string& filepath,
uint64_t offset,
void* buf,
uint64_t len) override {
PanicInfo(NotImplemented, GetName() + "Read with offset not implement");
}
void
Write(const std::string& filepath,
uint64_t offset,
void* buf,
uint64_t len) override {
PanicInfo(NotImplemented,
GetName() + "Write with offset not implement");
}
uint64_t
Read(const std::string& filepath, void* buf, uint64_t len) override;
void
Write(const std::string& filepath, void* buf, uint64_t len) override;
std::vector<std::string>
ListWithPrefix(const std::string& filepath) override;
void
Remove(const std::string& filepath) override;
std::string
GetName() const override {
return "GcpNativeChunkManager";
}
std::string
GetRootPath() const override {
return path_prefix_;
}
public:
inline std::string
GetBucketName() {
return default_bucket_name_;
}
inline void
SetBucketName(const std::string& bucket_name) {
default_bucket_name_ = bucket_name;
}
bool
BucketExists(const std::string& bucket_name);
bool
CreateBucket(const std::string& bucket_name);
bool
DeleteBucket(const std::string& bucket_name);
std::vector<std::string>
ListBuckets();
bool
ObjectExists(const std::string& bucket_name,
const std::string& object_name);
uint64_t
GetObjectSize(const std::string& bucket_name,
const std::string& object_name);
bool
DeleteObject(const std::string& bucket_name,
const std::string& object_name);
bool
PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size);
uint64_t
GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size);
std::vector<std::string>
ListObjects(const std::string& bucket_name,
const std::string& prefix = nullptr);
private:
std::unique_ptr<gcpnative::GcpNativeClientManager> client_;
std::string default_bucket_name_;
std::string path_prefix_;
};
} // namespace storage
} // namespace milvus

View File

@ -0,0 +1,208 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstdlib>
#include <filesystem>
#include "google/cloud/storage/retry_policy.h"
#include "GcpNativeClientManager.h"
namespace gcs = google::cloud::storage;
namespace gcpnative {
GcpNativeClientManager::GcpNativeClientManager(
const std::string& address,
const std::string& gcp_credential_json,
bool use_ssl,
bool gcp_native_without_auth,
int64_t request_timeout_ms) {
auto options = google::cloud::Options{};
if (gcp_native_without_auth) {
// Create an anonymous credential for unit testing with GCS emulation.
auto credentials =
google::cloud::storage::oauth2::CreateAnonymousCredentials();
options.set<gcs::Oauth2CredentialsOption>(std::move(credentials));
} else {
if (gcp_credential_json.empty()) {
throw std::runtime_error(
"GCP native error: GCS service account credentials are "
"missing.");
}
auto credentials =
google::cloud::MakeServiceAccountCredentials(gcp_credential_json);
options.set<google::cloud::UnifiedCredentialsOption>(credentials);
}
if (request_timeout_ms > 0) {
options.set<gcs::RetryPolicyOption>(
gcs::LimitedTimeRetryPolicy(
std::chrono::milliseconds(request_timeout_ms))
.clone());
}
if (!address.empty()) {
std::string complete_address = "http://";
if (use_ssl) {
complete_address = "https://";
}
complete_address += address;
options.set<gcs::RestEndpointOption>(complete_address);
}
client_ =
std::make_unique<google::cloud::storage::Client>(std::move(options));
}
GcpNativeClientManager::~GcpNativeClientManager() {
}
bool
GcpNativeClientManager::BucketExists(const std::string& bucket_name) {
auto metadata = client_->GetBucketMetadata(bucket_name);
if (!metadata) {
if (metadata.status().code() == google::cloud::StatusCode::kNotFound) {
return false;
} else {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
}
return true;
}
std::vector<std::string>
GcpNativeClientManager::ListBuckets() {
std::vector<std::string> buckets;
for (auto&& metadata : client_->ListBuckets()) {
if (!metadata) {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
buckets.emplace_back(metadata->name());
}
return buckets;
}
bool
GcpNativeClientManager::CreateBucket(const std::string& bucket_name) {
if (BucketExists(bucket_name)) {
return false;
}
auto metadata = client_->CreateBucket(bucket_name, gcs::BucketMetadata());
if (!metadata) {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
return true;
}
bool
GcpNativeClientManager::DeleteBucket(const std::string& bucket_name) {
auto status = client_->DeleteBucket(bucket_name);
if (!status.ok()) {
if (status.code() == google::cloud::StatusCode::kNotFound) {
return false;
} else {
throw std::runtime_error(GetGcpNativeError(std::move(status)));
}
}
return true;
}
bool
GcpNativeClientManager::ObjectExists(const std::string& bucket_name,
const std::string& object_name) {
auto metadata = client_->GetObjectMetadata(bucket_name, object_name);
if (!metadata) {
if (metadata.status().code() == google::cloud::StatusCode::kNotFound) {
return false;
} else {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
}
return true;
}
int64_t
GcpNativeClientManager::GetObjectSize(const std::string& bucket_name,
const std::string& object_name) {
auto metadata = client_->GetObjectMetadata(bucket_name, object_name);
if (!metadata) {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
return metadata->size();
}
bool
GcpNativeClientManager::DeleteObject(const std::string& bucket_name,
const std::string& object_name) {
google::cloud::Status status =
client_->DeleteObject(bucket_name, object_name);
if (!status.ok()) {
if (status.code() == google::cloud::StatusCode::kNotFound) {
return false;
} else {
throw std::runtime_error(GetGcpNativeError(std::move(status)));
}
}
return true;
}
bool
GcpNativeClientManager::PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
std::string buffer(reinterpret_cast<char*>(buf), size);
auto stream = client_->WriteObject(bucket_name, object_name);
stream << buffer;
stream.Close();
if (stream.bad()) {
throw std::runtime_error(GetGcpNativeError(stream.metadata().status()));
}
return true;
}
uint64_t
GcpNativeClientManager::GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
auto stream = client_->ReadObject(bucket_name, object_name);
if (stream.bad()) {
throw std::runtime_error(GetGcpNativeError(stream.status()));
}
stream.read(reinterpret_cast<char*>(buf), size);
auto bytes_read = stream.gcount();
stream.Close();
return bytes_read;
}
std::vector<std::string>
GcpNativeClientManager::ListObjects(const std::string& bucket_name,
const std::string& prefix) {
std::vector<std::string> objects_vec;
for (auto&& metadata :
client_->ListObjects(bucket_name, gcs::Prefix(prefix))) {
if (!metadata) {
throw std::runtime_error(
GetGcpNativeError(std::move(metadata).status()));
}
objects_vec.emplace_back(metadata->name());
}
return objects_vec;
}
} // namespace gcpnative

View File

@ -0,0 +1,87 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <string>
#include "google/cloud/storage/client.h"
#include "google/cloud/options.h"
#include "common/EasyAssert.h"
namespace gcpnative {
/**
* @brief This GcpNativeClientManager is used to interact with
* Google Cloud Storage (GCS) services.
*/
class GcpNativeClientManager {
public:
explicit GcpNativeClientManager(const std::string& address,
const std::string& gcp_credential_json,
bool use_ssl,
bool gcp_native_without_auth,
int64_t request_timeout_ms = 0);
GcpNativeClientManager(const GcpNativeClientManager&);
GcpNativeClientManager&
operator=(const GcpNativeClientManager&);
~GcpNativeClientManager();
public:
bool
BucketExists(const std::string& bucket_name);
bool
CreateBucket(const std::string& bucket_name);
bool
DeleteBucket(const std::string& bucket_name);
std::vector<std::string>
ListBuckets();
bool
ObjectExists(const std::string& bucket_name,
const std::string& object_name);
int64_t
GetObjectSize(const std::string& bucket_name,
const std::string& object_name);
bool
DeleteObject(const std::string& bucket_name,
const std::string& object_name);
bool
PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size);
uint64_t
GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size);
std::vector<std::string>
ListObjects(const std::string& bucket_name,
const std::string& prefix = nullptr);
private:
inline std::string
GetGcpNativeError(google::cloud::Status&& status) {
return GetGcpNativeError(status);
}
inline std::string
GetGcpNativeError(const google::cloud::Status& status) {
return fmt::format("Gcp native error: {} ({})",
status.message(),
static_cast<int>(status.code()));
}
private:
std::unique_ptr<google::cloud::storage::Client> client_;
};
} // namespace gcpnative

View File

@ -76,6 +76,8 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) {
storage_config.useVirtualHost = c_storage_config.useVirtualHost;
storage_config.region = c_storage_config.region;
storage_config.requestTimeoutMs = c_storage_config.requestTimeoutMs;
storage_config.gcp_credential_json =
std::string(c_storage_config.gcp_credential_json);
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
storage_config);

View File

@ -117,6 +117,14 @@ if (DEFINED AZURE_BUILD_DIR)
include_directories("${AZURE_BUILD_DIR}/vcpkg_installed/${VCPKG_TARGET_TRIPLET}/include")
endif()
if (ENABLE_GCP_NATIVE)
add_definitions(-DENABLE_GCP_NATIVE)
set(MILVUS_TEST_FILES
${MILVUS_TEST_FILES}
test_gcp_native_chunk_manager.cpp
)
endif()
if (LINUX)
message( STATUS "Building Milvus Unit Test on Linux")
option(USE_ASAN "Whether to use AddressSanitizer" OFF)

View File

@ -0,0 +1,283 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include "common/EasyAssert.h"
#include "storage/gcp-native-storage/GcpNativeChunkManager.h"
#include "storage/Util.h"
using namespace std;
using namespace milvus;
using namespace milvus::storage;
StorageConfig
get_default_storage_config() {
auto endpoint = "storage.gcs.127.0.0.1.nip.io:4443";
auto access_key = "";
auto access_value = "";
auto root_path = "files";
auto use_ssl = false;
auto ssl_ca_cert = "";
auto iam_endpoint = "";
auto bucket_name = "sample-bucket";
bool use_iam = false;
bool gcp_native_without_auth = true;
return StorageConfig{
endpoint,
bucket_name,
access_key,
access_value,
root_path,
"remote", // storage_type
"gcpnative", // cloud_provider_type
iam_endpoint,
"error", // log_level
"", // region
use_ssl,
ssl_ca_cert,
use_iam,
false, // useVirtualHost
1000, // requestTimeoutMs
gcp_native_without_auth,
"" // gcp_credential_json
};
}
class GcpNativeChunkManagerTest : public testing::Test {
public:
GcpNativeChunkManagerTest() {
}
~GcpNativeChunkManagerTest() {
}
virtual void
SetUp() {
configs_ = get_default_storage_config();
chunk_manager_ = make_unique<GcpNativeChunkManager>(configs_);
}
protected:
std::unique_ptr<GcpNativeChunkManager> chunk_manager_;
StorageConfig configs_;
};
TEST_F(GcpNativeChunkManagerTest, BasicFunctions) {
EXPECT_TRUE(chunk_manager_->GetName() == "GcpNativeChunkManager");
EXPECT_TRUE(chunk_manager_->GetRootPath() == configs_.root_path);
EXPECT_TRUE(chunk_manager_->GetBucketName() == configs_.bucket_name);
}
TEST_F(GcpNativeChunkManagerTest, BucketPositive) {
string test_bucket_name = "bucket-not-exist";
EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), false);
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), true);
vector<string> buckets = chunk_manager_->ListBuckets();
EXPECT_TRUE(std::find(buckets.begin(), buckets.end(), test_bucket_name) !=
buckets.end());
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
EXPECT_EQ(chunk_manager_->BucketExists(test_bucket_name), false);
}
TEST_F(GcpNativeChunkManagerTest, BucketNegtive) {
string test_bucket_name = configs_.bucket_name;
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), false);
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
// Create an already existing bucket.
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), false);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, ObjectExist) {
string test_bucket_name = configs_.bucket_name;
string obj_path = "1/3";
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
EXPECT_EQ(chunk_manager_->Exist(obj_path), false);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, WritePositive) {
string test_bucket_name = configs_.bucket_name;
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1";
chunk_manager_->Write(path, data, sizeof(data));
EXPECT_EQ(chunk_manager_->Exist(path), true);
EXPECT_EQ(chunk_manager_->Size(path), 5);
int datasize = 10000;
uint8_t* bigdata = new uint8_t[datasize];
srand((unsigned)time(NULL));
for (int i = 0; i < datasize; ++i) {
bigdata[i] = rand() % 256;
}
chunk_manager_->Write(path, bigdata, datasize);
EXPECT_EQ(chunk_manager_->Size(path), datasize);
delete[] bigdata;
chunk_manager_->Remove(path);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, ReadPositive) {
string test_bucket_name = configs_.bucket_name;
chunk_manager_->SetBucketName(test_bucket_name);
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1/4/6";
chunk_manager_->Write(path, data, sizeof(data));
EXPECT_EQ(chunk_manager_->Exist(path), true);
EXPECT_EQ(chunk_manager_->Size(path), sizeof(data));
uint8_t readdata[20] = {0};
EXPECT_EQ(chunk_manager_->Read(path, readdata, sizeof(data)), sizeof(data));
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
EXPECT_EQ(chunk_manager_->Read(path, readdata, 3), 3);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23};
chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL));
EXPECT_EQ(chunk_manager_->Exist(path), true);
EXPECT_EQ(chunk_manager_->Size(path), sizeof(dataWithNULL));
EXPECT_EQ(chunk_manager_->Read(path, readdata, sizeof(dataWithNULL)),
sizeof(dataWithNULL));
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x00);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
chunk_manager_->Remove(path);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, ReadNotExist) {
string test_bucket_name = configs_.bucket_name;
chunk_manager_->SetBucketName(test_bucket_name);
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
string path = "1/5/8";
uint8_t readdata[20] = {0};
EXPECT_THROW(
try {
chunk_manager_->Read(path, readdata, sizeof(readdata));
} catch (SegcoreError& e) {
EXPECT_TRUE(std::string(e.what()).find("Not Found") !=
string::npos);
throw e;
},
SegcoreError);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, RemovePositive) {
string test_bucket_name = configs_.bucket_name;
chunk_manager_->SetBucketName(test_bucket_name);
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1/7/8";
chunk_manager_->Write(path, data, sizeof(data));
EXPECT_EQ(chunk_manager_->Exist(path), true);
chunk_manager_->Remove(path);
EXPECT_EQ(chunk_manager_->Exist(path), false);
// test double deleted
chunk_manager_->Remove(path);
EXPECT_EQ(chunk_manager_->Exist(path), false);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, ListWithPrefixPositive) {
string test_bucket_name = configs_.bucket_name;
chunk_manager_->SetBucketName(test_bucket_name);
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
string path1 = "1/7/8";
string path2 = "1/7/4";
string path3 = "1/4/8";
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
chunk_manager_->Write(path1, data, sizeof(data));
chunk_manager_->Write(path2, data, sizeof(data));
chunk_manager_->Write(path3, data, sizeof(data));
vector<string> objs = chunk_manager_->ListWithPrefix("1/7");
EXPECT_EQ(objs.size(), 2);
std::sort(objs.begin(), objs.end());
EXPECT_EQ(objs[0], "1/7/4");
EXPECT_EQ(objs[1], "1/7/8");
objs = chunk_manager_->ListWithPrefix("//1/7");
EXPECT_EQ(objs.size(), 0);
objs = chunk_manager_->ListWithPrefix("1");
EXPECT_EQ(objs.size(), 3);
std::sort(objs.begin(), objs.end());
EXPECT_EQ(objs[0], "1/4/8");
EXPECT_EQ(objs[1], "1/7/4");
EXPECT_EQ(objs[2], "1/7/8");
chunk_manager_->Remove(path1);
chunk_manager_->Remove(path2);
chunk_manager_->Remove(path3);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}
TEST_F(GcpNativeChunkManagerTest, ListWithPrefixNegative) {
string test_bucket_name = configs_.bucket_name;
chunk_manager_->SetBucketName(test_bucket_name);
EXPECT_EQ(chunk_manager_->GetBucketName(), test_bucket_name);
if (!chunk_manager_->BucketExists(test_bucket_name)) {
EXPECT_EQ(chunk_manager_->CreateBucket(test_bucket_name), true);
}
string path = "1/4/8";
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
chunk_manager_->Write(path, data, sizeof(data));
vector<string> objs = chunk_manager_->ListWithPrefix("1/6");
EXPECT_EQ(objs.size(), 0);
chunk_manager_->Remove(path);
EXPECT_EQ(chunk_manager_->DeleteBucket(test_bucket_name), true);
}

View File

@ -89,6 +89,12 @@ TEST_F(RemoteChunkManagerTest, BasicFunctions) {
EXPECT_TRUE(the_chunk_manager_->GetName() == "AzureChunkManager");
#endif
#ifdef ENABLE_GCP_NATIVE
configs_.cloud_provider = "gcpnative";
the_chunk_manager_ = CreateChunkManager(configs_);
EXPECT_TRUE(the_chunk_manager_->GetName() == "GcpNativeChunkManager");
#endif
configs_.cloud_provider = "";
}

View File

@ -330,20 +330,21 @@ func createStorageConfig() *indexpb.StorageConfig {
}
} else {
storageConfig = &indexpb.StorageConfig{
Address: Params.MinioCfg.Address.GetValue(),
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(),
StorageType: Params.CommonCfg.StorageType.GetValue(),
Region: Params.MinioCfg.Region.GetValue(),
UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: Params.MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
Address: Params.MinioCfg.Address.GetValue(),
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(),
StorageType: Params.CommonCfg.StorageType.GetValue(),
Region: Params.MinioCfg.Region.GetValue(),
UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: Params.MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
GcpCredentialJSON: Params.MinioCfg.GcpCredentialJSON.GetValue(),
}
}

View File

@ -39,6 +39,7 @@ func (m *chunkMgrFactory) NewChunkManager(ctx context.Context, config *indexpb.S
storage.RequestTimeout(config.GetRequestTimeoutMs()),
storage.Region(config.GetRegion()),
storage.CreateBucket(true),
storage.GcpCredentialJSON(config.GetGcpCredentialJSON()),
)
return chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}

View File

@ -209,20 +209,21 @@ func (s *IndexNodeSuite) SetupTest() {
s.NoError(err)
s.storageConfig = &indexpb.StorageConfig{
Address: Params.MinioCfg.Address.GetValue(),
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(),
StorageType: Params.CommonCfg.StorageType.GetValue(),
Region: Params.MinioCfg.Region.GetValue(),
UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: Params.MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
Address: Params.MinioCfg.Address.GetValue(),
AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(),
SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(),
UseSSL: Params.MinioCfg.UseSSL.GetAsBool(),
SslCACert: Params.MinioCfg.SslCACert.GetValue(),
BucketName: Params.MinioCfg.BucketName.GetValue(),
RootPath: Params.MinioCfg.RootPath.GetValue(),
UseIAM: Params.MinioCfg.UseIAM.GetAsBool(),
IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(),
StorageType: Params.CommonCfg.StorageType.GetValue(),
Region: Params.MinioCfg.Region.GetValue(),
UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(),
CloudProvider: Params.MinioCfg.CloudProvider.GetValue(),
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
GcpCredentialJSON: Params.MinioCfg.GcpCredentialJSON.GetValue(),
}
var (

View File

@ -92,20 +92,21 @@ func (at *analyzeTask) Execute(ctx context.Context) error {
log.Info("Begin to build analyze task")
storageConfig := &clusteringpb.StorageConfig{
Address: at.req.GetStorageConfig().GetAddress(),
AccessKeyID: at.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: at.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: at.req.GetStorageConfig().GetUseSSL(),
BucketName: at.req.GetStorageConfig().GetBucketName(),
RootPath: at.req.GetStorageConfig().GetRootPath(),
UseIAM: at.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: at.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: at.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: at.req.GetStorageConfig().GetUseVirtualHost(),
Region: at.req.GetStorageConfig().GetRegion(),
CloudProvider: at.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: at.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: at.req.GetStorageConfig().GetSslCACert(),
Address: at.req.GetStorageConfig().GetAddress(),
AccessKeyID: at.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: at.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: at.req.GetStorageConfig().GetUseSSL(),
BucketName: at.req.GetStorageConfig().GetBucketName(),
RootPath: at.req.GetStorageConfig().GetRootPath(),
UseIAM: at.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: at.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: at.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: at.req.GetStorageConfig().GetUseVirtualHost(),
Region: at.req.GetStorageConfig().GetRegion(),
CloudProvider: at.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: at.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: at.req.GetStorageConfig().GetSslCACert(),
GcpCredentialJSON: at.req.GetStorageConfig().GetGcpCredentialJSON(),
}
numRowsMap := make(map[int64]int64)

View File

@ -248,20 +248,21 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}
storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
GcpCredentialJSON: it.req.GetStorageConfig().GetGcpCredentialJSON(),
}
optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))

View File

@ -20,6 +20,7 @@ message StorageConfig {
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
string GcpCredentialJSON = 15;
}
message InsertFiles {

View File

@ -48,6 +48,7 @@ message StorageConfig {
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
string GcpCredentialJSON = 15;
}
// Synchronously modify OptionalFieldInfo in index_coord.proto file

View File

@ -218,6 +218,7 @@ message StorageConfig {
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
string GcpCredentialJSON = 15;
}
// Synchronously modify OptionalFieldInfo in index_cgo_msg.proto file

View File

@ -31,7 +31,8 @@ func NewChunkManagerFactoryWithParam(params *paramtable.ComponentParam) *ChunkMa
UseVirtualHost(params.MinioCfg.UseVirtualHost.GetAsBool()),
Region(params.MinioCfg.Region.GetValue()),
RequestTimeout(params.MinioCfg.RequestTimeoutMs.GetAsInt64()),
CreateBucket(true))
CreateBucket(true),
GcpCredentialJSON(params.MinioCfg.GcpCredentialJSON.GetValue()))
}
func NewChunkManagerFactory(persistentStorage string, opts ...Option) *ChunkManagerFactory {

View File

@ -0,0 +1,308 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"encoding/json"
"fmt"
"io"
"cloud.google.com/go/storage"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
)
type GcpNativeObjectStorage struct {
client *storage.Client
}
func newGcpNativeObjectStorageWithConfig(ctx context.Context, c *config) (*GcpNativeObjectStorage, error) {
var client *storage.Client
var err error
var opts []option.ClientOption
var projectId string
if c.address != "" {
completeAddress := "http://"
if c.useSSL {
completeAddress = "https://"
}
completeAddress = completeAddress + c.address + "/storage/v1/"
opts = append(opts, option.WithEndpoint(completeAddress))
}
if c.gcpNativeWithoutAuth {
opts = append(opts, option.WithoutAuthentication())
} else {
creds, err := google.CredentialsFromJSON(ctx, []byte(c.gcpCredentialJSON), storage.ScopeReadWrite)
if err != nil {
return nil, err
}
projectId, err = getProjectId(c.gcpCredentialJSON)
if err != nil {
return nil, err
}
opts = append(opts, option.WithCredentials(creds))
}
client, err = storage.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
if c.bucketName == "" {
return nil, merr.WrapErrParameterInvalidMsg("invalid empty bucket name")
}
// Check bucket validity
checkBucketFn := func() error {
bucket := client.Bucket(c.bucketName)
_, err := bucket.Attrs(ctx)
if err == storage.ErrBucketNotExist && c.createBucket {
log.Info("gcs bucket does not exist, create bucket.", zap.String("bucket name", c.bucketName))
err = client.Bucket(c.bucketName).Create(ctx, projectId, nil)
if err != nil {
return err
}
return nil
}
return err
}
err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts))
if err != nil {
return nil, err
}
return &GcpNativeObjectStorage{client: client}, nil
}
func (gcs *GcpNativeObjectStorage) GetObject(ctx context.Context, bucketName, objectName string,
offset int64, size int64,
) (FileReader, error) {
bucket := gcs.client.Bucket(bucketName)
_, err := bucket.Attrs(ctx)
if err != nil {
return nil, checkObjectStorageError(objectName, err)
}
obj := bucket.Object(objectName)
_, err = obj.Attrs(ctx)
if err != nil {
return nil, checkObjectStorageError(objectName, err)
}
var reader *storage.Reader
if offset == 0 && size == 0 {
reader, err = obj.NewReader(ctx)
} else {
reader, err = obj.NewRangeReader(ctx, offset, size)
}
if err != nil {
return nil, checkObjectStorageError(objectName, err)
}
return &GcsReader{reader: reader, obj: obj}, nil
}
func (gcs *GcpNativeObjectStorage) PutObject(ctx context.Context, bucketName, objectName string,
reader io.Reader, objectSize int64,
) error {
obj := gcs.client.Bucket(bucketName).Object(objectName)
writer := obj.NewWriter(ctx)
_, err := io.Copy(writer, reader)
if err != nil {
return checkObjectStorageError(objectName, err)
}
err = writer.Close()
if err != nil {
return checkObjectStorageError(objectName, err)
}
return nil
}
func (gcs *GcpNativeObjectStorage) StatObject(ctx context.Context, bucketName,
objectName string,
) (int64, error) {
obj := gcs.client.Bucket(bucketName).Object(objectName)
attrs, err := obj.Attrs(ctx)
if err != nil {
return 0, checkObjectStorageError(objectName, err)
}
return attrs.Size, nil
}
func (gcs *GcpNativeObjectStorage) WalkWithObjects(ctx context.Context,
bucketName string, prefix string, recursive bool, walkFunc ChunkObjectWalkFunc,
) error {
query := &storage.Query{
Prefix: prefix,
}
if !recursive {
query.Delimiter = "/"
}
it := gcs.client.Bucket(bucketName).Objects(ctx, query)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return checkObjectStorageError(prefix, err)
}
if objAttrs.Prefix != "" {
continue
}
if !walkFunc(&ChunkObjectInfo{FilePath: objAttrs.Name, ModifyTime: objAttrs.Updated}) {
return nil
}
}
return nil
}
func (gcs *GcpNativeObjectStorage) RemoveObject(ctx context.Context, bucketName, prefix string) error {
bucket := gcs.client.Bucket(bucketName)
query := &storage.Query{Prefix: prefix}
it := bucket.Objects(ctx, query)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return checkObjectStorageError(prefix, err)
}
obj := bucket.Object(objAttrs.Name)
if err := obj.Delete(ctx); err != nil {
return checkObjectStorageError(objAttrs.Name, err)
}
}
return nil
}
func (gcs *GcpNativeObjectStorage) DeleteBucket(ctx context.Context, bucketName string) error {
bucket := gcs.client.Bucket(bucketName)
err := gcs.RemoveObject(ctx, bucketName, "")
if err != nil {
return err
}
err = bucket.Delete(ctx)
if err != nil {
return err
}
return nil
}
type GcsReader struct {
reader *storage.Reader
obj *storage.ObjectHandle
position int64
}
func (gcsReader *GcsReader) Read(p []byte) (n int, err error) {
n, err = gcsReader.reader.Read(p)
if err != nil {
return n, err
}
gcsReader.position = gcsReader.position + int64(n)
return n, nil
}
func (gcsReader *GcsReader) Close() error {
return gcsReader.reader.Close()
}
func (gcsReader *GcsReader) ReadAt(p []byte, off int64) (n int, err error) {
reader, err := gcsReader.obj.NewRangeReader(context.Background(), off, int64(len(p)))
if err != nil {
return 0, err
}
defer reader.Close()
return io.ReadFull(reader, p)
}
func (gcsReader *GcsReader) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = gcsReader.position + offset
case io.SeekEnd:
objectAttrs, err := gcsReader.obj.Attrs(context.Background())
if err != nil {
return 0, err
}
newOffset = objectAttrs.Size + offset
default:
return 0, merr.WrapErrIoFailedReason("invalid whence")
}
if newOffset < 0 {
return 0, merr.WrapErrIoFailedReason("negative offset")
}
// Reset the underlying reader to the new offset
newReader, err := gcsReader.obj.NewRangeReader(context.Background(), newOffset, -1)
if err != nil {
if gErr, ok := err.(*googleapi.Error); ok {
if gErr.Code == 416 {
newReader, _ = gcsReader.obj.NewRangeReader(context.Background(), 0, 0)
}
} else {
return 0, err
}
}
if gcsReader.reader != nil {
if err := gcsReader.reader.Close(); err != nil {
return 0, err
}
}
// Update the reader and the current position
gcsReader.reader = newReader
gcsReader.position = newOffset
return newOffset, nil
}
func getProjectId(gcpCredentialJSON string) (string, error) {
if gcpCredentialJSON == "" {
return "", errors.New("the JSON string is empty")
}
var data map[string]interface{}
if err := json.Unmarshal([]byte(gcpCredentialJSON), &data); err != nil {
return "", errors.New("failed to parse Google Cloud credentials as JSON")
}
propertyValue, ok := data["project_id"]
projectId := fmt.Sprintf("%v", propertyValue)
if !ok {
return "", errors.New("projectId doesn't exist")
}
return projectId, nil
}

View File

@ -0,0 +1,387 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGcpNativeObjectStorage(t *testing.T) {
ctx := context.Background()
bucketName := "test-bucket"
config := config{
address: "storage.gcs.127.0.0.1.nip.io:4443",
bucketName: bucketName,
createBucket: true,
useIAM: false,
cloudProvider: "gcpnative",
useSSL: false,
gcpNativeWithoutAuth: true,
}
t.Run("test initialize", func(t *testing.T) {
var err error
config.bucketName = ""
_, err = newGcpNativeObjectStorageWithConfig(ctx, &config)
assert.Error(t, err)
config.bucketName = bucketName
_, err = newGcpNativeObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
})
t.Run("test load", func(t *testing.T) {
testCM, err := newGcpNativeObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.DeleteBucket(ctx, config.bucketName)
prepareTests := []struct {
key string
value []byte
}{
{"abc", []byte("123")},
{"abcd", []byte("1234")},
{"key_1", []byte("111")},
{"key_2", []byte("222")},
{"key_3", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value),
int64(len(test.value)))
require.NoError(t, err)
}
loadTests := []struct {
isvalid bool
loadKey string
expectedValue []byte
description string
}{
{true, "abc", []byte("123"), "load valid key abc"},
{true, "abcd", []byte("1234"), "load valid key abcd"},
{true, "key_1", []byte("111"), "load valid key key_1"},
{true, "key_2", []byte("222"), "load valid key key_2"},
{true, "key_3", []byte("333"), "load valid key key_3"},
{false, "key_not_exist", []byte(""), "load invalid key key_not_exist"},
{false, "/", []byte(""), "load leading slash"},
}
for _, test := range loadTests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024)
assert.NoError(t, err)
contentData, err := io.ReadAll(got)
assert.NoError(t, err)
assert.Equal(t, len(contentData), len(test.expectedValue))
assert.Equal(t, test.expectedValue, contentData)
statSize, err := testCM.StatObject(ctx, config.bucketName, test.loadKey)
assert.NoError(t, err)
assert.Equal(t, statSize, int64(len(contentData)))
_, err = testCM.GetObject(ctx, config.bucketName, test.loadKey, 1, 1023)
assert.NoError(t, err)
} else {
got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024)
assert.Error(t, err)
assert.Empty(t, got)
}
})
}
loadWithPrefixTests := []struct {
isvalid bool
prefix string
expectedValue [][]byte
description string
}{
{true, "abc", [][]byte{[]byte("123"), []byte("1234")}, "load with valid prefix abc"},
{true, "key_", [][]byte{[]byte("111"), []byte("222"), []byte("333")}, "load with valid prefix key_"},
{true, "prefix", [][]byte{}, "load with valid but not exist prefix prefix"},
}
for _, test := range loadWithPrefixTests {
t.Run(test.description, func(t *testing.T) {
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName,
test.prefix, false)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
err := testCM.RemoveObject(ctx, config.bucketName, key)
assert.NoError(t, err)
}
})
}
})
t.Run("test list", func(t *testing.T) {
testCM, err := newGcpNativeObjectStorageWithConfig(ctx, &config)
assert.Equal(t, err, nil)
defer testCM.DeleteBucket(ctx, config.bucketName)
prepareTests := []struct {
valid bool
key string
value []byte
}{
{false, "abc/", []byte("123")},
{true, "abc/d/", []byte("1234")},
{false, "abc/d/e", []byte("12345")},
{true, "abc/e/d", []byte("12354")},
{true, "key_/1/1", []byte("111")},
{true, "key_/1/2/", []byte("222")},
{false, "key_/1/2/3", []byte("333")},
{true, "key_/2/3", []byte("333")},
{true, "key_/test.txt", []byte("333")},
}
for _, test := range prepareTests {
err := testCM.PutObject(ctx, config.bucketName, test.key, bytes.NewReader(test.value),
int64(len(test.value)))
require.Nil(t, err)
if !test.valid {
err := testCM.RemoveObject(ctx, config.bucketName, test.key)
require.Nil(t, err)
}
}
insertWithPrefixTests := []struct {
recursive bool
prefix string
expectedValue []string
}{
{true, "abc/", []string{"abc/e/d"}},
{true, "key_/", []string{"key_/1/1", "key_/2/3", "key_/test.txt"}},
{false, "abc/", []string{}},
{false, "key_/", []string{"key_/test.txt"}},
}
for _, test := range insertWithPrefixTests {
t.Run(fmt.Sprintf("prefix: %s, recursive: %t", test.prefix, test.recursive), func(t *testing.T) {
gotk, _, err := listAllObjectsWithPrefixAtBucket(ctx, testCM, config.bucketName,
test.prefix, test.recursive)
assert.NoError(t, err)
assert.Equal(t, len(test.expectedValue), len(gotk))
for _, key := range gotk {
assert.Contains(t, test.expectedValue, key)
}
})
}
})
}
func TestGcpNativeReadFile(t *testing.T) {
ctx := context.Background()
bucketName := "test-bucket"
c := &config{
address: "storage.gcs.127.0.0.1.nip.io:4443",
bucketName: bucketName,
createBucket: true,
useIAM: false,
cloudProvider: "gcpnative",
useSSL: false,
gcpNativeWithoutAuth: true,
}
rcm, err := NewRemoteChunkManager(ctx, c)
t.Run("Read", func(t *testing.T) {
filePath := "test-Read"
data := []byte("Test data for Read.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
buffer = make([]byte, 6)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 6, n)
assert.Equal(t, " data ", string(buffer))
buffer = make([]byte, 40)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 9, n)
assert.Equal(t, "for Read.", string(buffer[:9]))
})
t.Run("ReadAt", func(t *testing.T) {
filePath := "test-ReadAt"
data := []byte("Test data for ReadAt.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.ReadAt(buffer, 5)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "data", string(buffer))
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
buffer = make([]byte, 4)
n, err = reader.ReadAt(buffer, 20)
assert.Error(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, ".", string(buffer[:1]))
buffer = make([]byte, 4)
n, err = reader.ReadAt(buffer, 25)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek start", func(t *testing.T) {
filePath := "test-SeekStart"
data := []byte("Test data for Seek start.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
offset, err := reader.Seek(10, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(10), offset)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "for ", string(buffer))
offset, err = reader.Seek(40, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(40), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek current", func(t *testing.T) {
filePath := "test-SeekCurrent"
data := []byte("Test data for Seek current.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
offset, err := reader.Seek(10, io.SeekCurrent)
assert.NoError(t, err)
assert.Equal(t, int64(14), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Seek", string(buffer))
offset, err = reader.Seek(40, io.SeekCurrent)
assert.NoError(t, err)
assert.Equal(t, int64(58), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek end", func(t *testing.T) {
filePath := "test-SeekEnd"
data := []byte("Test data for Seek end.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
offset, err := reader.Seek(10, io.SeekEnd)
assert.NoError(t, err)
assert.Equal(t, int64(33), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
offset, err = reader.Seek(10, 3) // Invalid whence
assert.Error(t, err)
assert.Equal(t, int64(0), offset)
})
t.Run("Close", func(t *testing.T) {
filePath := "test-Close"
data := []byte("Test data for Close.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
err = reader.Close()
assert.NoError(t, err)
})
}

View File

@ -2,20 +2,22 @@ package storage
// Option for setting params used by chunk manager client.
type config struct {
address string
bucketName string
accessKeyID string
secretAccessKeyID string
useSSL bool
sslCACert string
createBucket bool
rootPath string
useIAM bool
cloudProvider string
iamEndpoint string
useVirtualHost bool
region string
requestTimeoutMs int64
address string
bucketName string
accessKeyID string
secretAccessKeyID string
useSSL bool
sslCACert string
createBucket bool
rootPath string
useIAM bool
cloudProvider string
iamEndpoint string
useVirtualHost bool
region string
requestTimeoutMs int64
gcpCredentialJSON string
gcpNativeWithoutAuth bool // used for Unit Testing
}
func newDefaultConfig() *config {
@ -108,3 +110,9 @@ func RequestTimeout(requestTimeoutMs int64) Option {
c.requestTimeoutMs = requestTimeoutMs
}
}
func GcpCredentialJSON(gcpCredentialJSON string) Option {
return func(c *config) {
c.gcpCredentialJSON = gcpCredentialJSON
}
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"io"
"net/http"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@ -29,6 +30,7 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/mmap"
"golang.org/x/sync/errgroup"
"google.golang.org/api/googleapi"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -38,11 +40,12 @@ import (
)
const (
CloudProviderGCP = "gcp"
CloudProviderAWS = "aws"
CloudProviderAliyun = "aliyun"
CloudProviderAzure = "azure"
CloudProviderTencent = "tencent"
CloudProviderGCP = "gcp"
CloudProviderGCPNative = "gcpnative"
CloudProviderAWS = "aws"
CloudProviderAliyun = "aliyun"
CloudProviderAzure = "azure"
CloudProviderTencent = "tencent"
)
// ChunkObjectWalkFunc is the callback function for walking objects.
@ -78,6 +81,8 @@ func NewRemoteChunkManager(ctx context.Context, c *config) (*RemoteChunkManager,
var err error
if c.cloudProvider == CloudProviderAzure {
client, err = newAzureObjectStorageWithConfig(ctx, c)
} else if c.cloudProvider == CloudProviderGCPNative {
client, err = newGcpNativeObjectStorageWithConfig(ctx, c)
} else {
client, err = newMinioObjectStorageWithConfig(ctx, c)
}
@ -403,6 +408,11 @@ func checkObjectStorageError(fileName string, err error) error {
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
}
return merr.WrapErrIoFailed(fileName, err)
case *googleapi.Error:
if err.Code == http.StatusNotFound {
return merr.WrapErrIoKeyNotFound(fileName, err.Error())
}
return merr.WrapErrIoFailed(fileName, err)
}
if err == io.ErrUnexpectedEOF {
return merr.WrapErrIoUnexpectEOF(fileName, err)

View File

@ -52,6 +52,7 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) {
cRegion := C.CString(config.Region)
cCloudProvider := C.CString(config.CloudProvider)
cSslCACert := C.CString(config.SslCACert)
cGcpCredentialJSON := C.CString(config.GcpCredentialJSON)
defer C.free(unsafe.Pointer(cAddress))
defer C.free(unsafe.Pointer(cBucketName))
defer C.free(unsafe.Pointer(cAccessKey))
@ -62,21 +63,23 @@ func NewBuildIndexInfo(config *indexpb.StorageConfig) (*BuildIndexInfo, error) {
defer C.free(unsafe.Pointer(cRegion))
defer C.free(unsafe.Pointer(cCloudProvider))
defer C.free(unsafe.Pointer(cSslCACert))
defer C.free(unsafe.Pointer(cGcpCredentialJSON))
storageConfig := C.CStorageConfig{
address: cAddress,
bucket_name: cBucketName,
access_key_id: cAccessKey,
access_key_value: cAccessValue,
root_path: cRootPath,
storage_type: cStorageType,
iam_endpoint: cIamEndPoint,
cloud_provider: cCloudProvider,
useSSL: C.bool(config.UseSSL),
sslCACert: cSslCACert,
useIAM: C.bool(config.UseIAM),
region: cRegion,
useVirtualHost: C.bool(config.UseVirtualHost),
requestTimeoutMs: C.int64_t(config.RequestTimeoutMs),
address: cAddress,
bucket_name: cBucketName,
access_key_id: cAccessKey,
access_key_value: cAccessValue,
root_path: cRootPath,
storage_type: cStorageType,
iam_endpoint: cIamEndPoint,
cloud_provider: cCloudProvider,
useSSL: C.bool(config.UseSSL),
sslCACert: cSslCACert,
useIAM: C.bool(config.UseIAM),
region: cRegion,
useVirtualHost: C.bool(config.UseVirtualHost),
requestTimeoutMs: C.int64_t(config.RequestTimeoutMs),
gcp_credential_json: cGcpCredentialJSON,
}
status := C.NewBuildIndexInfo(&cBuildIndexInfo, storageConfig)

View File

@ -138,6 +138,7 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
cLogLevel := C.CString(params.MinioCfg.LogLevel.GetValue())
cRegion := C.CString(params.MinioCfg.Region.GetValue())
cSslCACert := C.CString(params.MinioCfg.SslCACert.GetValue())
cGcpCredentialJSON := C.CString(params.MinioCfg.GcpCredentialJSON.GetValue())
defer C.free(unsafe.Pointer(cAddress))
defer C.free(unsafe.Pointer(cBucketName))
defer C.free(unsafe.Pointer(cAccessKey))
@ -149,22 +150,24 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
defer C.free(unsafe.Pointer(cRegion))
defer C.free(unsafe.Pointer(cCloudProvider))
defer C.free(unsafe.Pointer(cSslCACert))
defer C.free(unsafe.Pointer(cGcpCredentialJSON))
storageConfig := C.CStorageConfig{
address: cAddress,
bucket_name: cBucketName,
access_key_id: cAccessKey,
access_key_value: cAccessValue,
root_path: cRootPath,
storage_type: cStorageType,
iam_endpoint: cIamEndPoint,
cloud_provider: cCloudProvider,
useSSL: C.bool(params.MinioCfg.UseSSL.GetAsBool()),
sslCACert: cSslCACert,
useIAM: C.bool(params.MinioCfg.UseIAM.GetAsBool()),
log_level: cLogLevel,
region: cRegion,
useVirtualHost: C.bool(params.MinioCfg.UseVirtualHost.GetAsBool()),
requestTimeoutMs: C.int64_t(params.MinioCfg.RequestTimeoutMs.GetAsInt64()),
address: cAddress,
bucket_name: cBucketName,
access_key_id: cAccessKey,
access_key_value: cAccessValue,
root_path: cRootPath,
storage_type: cStorageType,
iam_endpoint: cIamEndPoint,
cloud_provider: cCloudProvider,
useSSL: C.bool(params.MinioCfg.UseSSL.GetAsBool()),
sslCACert: cSslCACert,
useIAM: C.bool(params.MinioCfg.UseIAM.GetAsBool()),
log_level: cLogLevel,
region: cRegion,
useVirtualHost: C.bool(params.MinioCfg.UseVirtualHost.GetAsBool()),
requestTimeoutMs: C.int64_t(params.MinioCfg.RequestTimeoutMs.GetAsInt64()),
gcp_credential_json: cGcpCredentialJSON,
}
status := C.InitRemoteChunkManagerSingleton(storageConfig)

View File

@ -1115,6 +1115,7 @@ type MinioConfig struct {
RootPath ParamItem `refreshable:"false"`
UseIAM ParamItem `refreshable:"false"`
CloudProvider ParamItem `refreshable:"false"`
GcpCredentialJSON ParamItem `refreshable:"false"`
IAMEndpoint ParamItem `refreshable:"false"`
LogLevel ParamItem `refreshable:"false"`
Region ParamItem `refreshable:"false"`
@ -1255,16 +1256,29 @@ aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/lates
p.CloudProvider = ParamItem{
Key: "minio.cloudProvider",
DefaultValue: DefaultMinioCloudProvider,
Version: "2.2.0",
Version: "2.4.1",
Doc: `Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
Cloud Provider of Google Cloud Storage. Supports: "gcpnative".
You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio
You can use "gcp" for other cloud provider supports S3 API with signature v2
You can use "aliyun" for other cloud provider uses virtual host style bucket
You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials
for authentication.
When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now`,
Export: true,
}
p.CloudProvider.Init(base.mgr)
p.GcpCredentialJSON = ParamItem{
Key: "minio.gcpCredentialJSON",
Version: "2.4.1",
DefaultValue: "",
Doc: `The JSON content contains the gcs service account credentials.
Used only for the "gcpnative" cloud provider.`,
Export: true,
}
p.GcpCredentialJSON.Init(base.mgr)
p.IAMEndpoint = ParamItem{
Key: "minio.iamEndpoint",
DefaultValue: DefaultMinioIAMEndpoint,

View File

@ -204,6 +204,8 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, Params.IAMEndpoint.GetValue(), "")
assert.Equal(t, Params.GcpCredentialJSON.GetValue(), "")
t.Logf("Minio BucketName = %s", Params.BucketName.GetValue())
t.Logf("Minio rootpath = %s", Params.RootPath.GetValue())

View File

@ -101,6 +101,7 @@ USE_ASAN="OFF"
USE_DYNAMIC_SIMD="ON"
USE_OPENDAL="OFF"
INDEX_ENGINE="KNOWHERE"
: "${ENABLE_GCP_NATIVE:="OFF"}"
while getopts "p:d:t:s:f:n:i:y:a:x:o:ulrcghzmebZ" arg; do
case $arg in
@ -256,7 +257,8 @@ ${CMAKE_EXTRA_ARGS} \
-DUSE_DYNAMIC_SIMD=${USE_DYNAMIC_SIMD} \
-DCPU_ARCH=${CPU_ARCH} \
-DUSE_OPENDAL=${USE_OPENDAL} \
-DINDEX_ENGINE=${INDEX_ENGINE} "
-DINDEX_ENGINE=${INDEX_ENGINE} \
-DENABLE_GCP_NATIVE=${ENABLE_GCP_NATIVE} "
if [ -z "$BUILD_WITHOUT_AZURE" ]; then
CMAKE_CMD=${CMAKE_CMD}"-DAZURE_BUILD_DIR=${AZURE_BUILD_DIR} \
-DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET} "