Support aliyun OSS for chunk manager (#22663) (#22842)

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
pull/22858/head
zhagnlu 2023-03-20 14:45:57 +08:00 committed by GitHub
parent 3202eb0d9f
commit 7c633e9b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 678 additions and 19 deletions

View File

@ -0,0 +1,184 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/core/config/AWSProfileConfigLoader.h>
#include <aws/core/platform/Environment.h>
#include <aws/core/platform/FileSystem.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/core/utils/StringUtils.h>
#include <aws/core/utils/FileSystemUtils.h>
#include <aws/core/client/SpecifiedRetryableErrorsRetryStrategy.h>
#include <aws/core/utils/UUID.h>
#include <cstdlib>
#include <fstream>
#include <string.h>
#include <climits>
#include "AliyunCredentialsProvider.h"
static const char STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG[] =
"AliyunSTSAssumeRoleWebIdentityCredentialsProvider"; // [aliyun]
static const int STS_CREDENTIAL_PROVIDER_EXPIRATION_GRACE_PERIOD =
180 *
1000; // [aliyun] supports 1800 second at most, here we use their default: 180s -> 180*1k ms
namespace Aws {
namespace Auth {
AliyunSTSAssumeRoleWebIdentityCredentialsProvider::
AliyunSTSAssumeRoleWebIdentityCredentialsProvider()
: m_initialized(false) {
// check environment variables
// not need in [aliyun]
// Aws::String tmpRegion = Aws::Environment::GetEnv("AWS_DEFAULT_REGION");
m_roleArn = Aws::Environment::GetEnv("ALIBABA_CLOUD_ROLE_ARN"); // [aliyun]
m_tokenFile =
Aws::Environment::GetEnv("ALIBABA_CLOUD_OIDC_TOKEN_FILE"); // [aliyun]
// optional, not existed in [aliyun]
m_sessionName = Aws::Environment::GetEnv("ALIBABA_CLOUD_ROLE_SESSION_NAME");
// check profile_config if either m_roleArn or m_tokenFile is not loaded from environment variable
// region source is not enforced, but we need it to construct sts endpoint, if we can't find from environment, we
// should check if it's set in config file.
if (m_roleArn.empty() ||
m_tokenFile.empty()) { // || tmpRegion.empty() not need in [aliyun]
auto profile = Aws::Config::GetCachedConfigProfile(
Aws::Auth::GetConfigProfileName());
// If either of these two were not found from environment, use whatever found for all three in config file
if (m_roleArn.empty() || m_tokenFile.empty()) {
m_roleArn = profile.GetRoleArn();
m_tokenFile = profile.GetValue("web_identity_token_file");
m_sessionName = profile.GetValue("role_session_name");
}
}
if (m_tokenFile.empty()) {
AWS_LOGSTREAM_WARN(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Token file must be specified to use STS AssumeRole "
"web identity creds provider.");
return; // No need to do further constructing
} else {
AWS_LOGSTREAM_DEBUG(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Resolved token_file from profile_config or "
"environment variable to be "
<< m_tokenFile);
}
if (m_roleArn.empty()) {
AWS_LOGSTREAM_WARN(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"RoleArn must be specified to use STS AssumeRole "
"web identity creds provider.");
return; // No need to do further constructing
} else {
AWS_LOGSTREAM_DEBUG(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Resolved role_arn from profile_config or "
"environment variable to be "
<< m_roleArn);
}
// not need in [aliyun]
// if (tmpRegion.empty())
// {
// tmpRegion = Aws::Region::US_EAST_1;
// }
// else
// {
// AWS_LOGSTREAM_DEBUG(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG, "Resolved region from profile_config or environment
// variable to be " << tmpRegion);
// }
if (m_sessionName.empty()) {
m_sessionName = Aws::Utils::UUID::RandomUUID();
} else {
AWS_LOGSTREAM_DEBUG(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Resolved session_name from profile_config or "
"environment variable to be "
<< m_sessionName);
}
Aws::Client::ClientConfiguration config;
config.scheme = Aws::Http::Scheme::HTTPS;
// not need in [aliyun]
// config.region = tmpRegion;
Aws::Vector<Aws::String> retryableErrors;
retryableErrors.push_back("IDPCommunicationError");
retryableErrors.push_back("InvalidIdentityToken");
config.retryStrategy =
Aws::MakeShared<Aws::Client::SpecifiedRetryableErrorsRetryStrategy>(
STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
retryableErrors,
3 /*maxRetries*/);
m_client = Aws::MakeUnique<Aws::Internal::AliyunSTSCredentialsClient>(
STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG, config);
m_initialized = true;
AWS_LOGSTREAM_INFO(
STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Creating STS AssumeRole with web identity creds provider.");
}
Aws::Auth::AWSCredentials
AliyunSTSAssumeRoleWebIdentityCredentialsProvider::GetAWSCredentials() {
// A valid client means required information like role arn and token file were constructed correctly.
// We can use this provider to load creds, otherwise, we can just return empty creds.
if (!m_initialized) {
return Aws::Auth::AWSCredentials();
}
RefreshIfExpired();
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
return m_credentials;
}
void
AliyunSTSAssumeRoleWebIdentityCredentialsProvider::Reload() {
AWS_LOGSTREAM_INFO(
STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Credentials have expired, attempting to renew from STS.");
Aws::IFStream tokenFile(m_tokenFile.c_str());
if (tokenFile) {
Aws::String token((std::istreambuf_iterator<char>(tokenFile)),
std::istreambuf_iterator<char>());
m_token = token;
} else {
AWS_LOGSTREAM_ERROR(STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Can't open token file: " << m_tokenFile);
return;
}
Aws::Internal::AliyunSTSCredentialsClient::
STSAssumeRoleWithWebIdentityRequest request{
m_sessionName, m_roleArn, m_token};
auto result = m_client->GetAssumeRoleWithWebIdentityCredentials(request);
AWS_LOGSTREAM_TRACE(
STS_ASSUME_ROLE_WEB_IDENTITY_LOG_TAG,
"Successfully retrieved credentials with AWS_ACCESS_KEY: "
<< result.creds.GetAWSAccessKeyId());
m_credentials = result.creds;
}
bool
AliyunSTSAssumeRoleWebIdentityCredentialsProvider::ExpiresSoon() const {
return (
(m_credentials.GetExpiration() - Aws::Utils::DateTime::Now()).count() <
STS_CREDENTIAL_PROVIDER_EXPIRATION_GRACE_PERIOD);
}
void
AliyunSTSAssumeRoleWebIdentityCredentialsProvider::RefreshIfExpired() {
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
if (!m_credentials.IsEmpty() && !ExpiresSoon()) {
return;
}
guard.UpgradeToWriterLock();
if (!m_credentials.IsExpiredOrEmpty() && !ExpiresSoon()) {
return;
}
Reload();
}
} // namespace Auth
}; // namespace Aws

View File

@ -0,0 +1,55 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#pragma once
#include <aws/core/Core_EXPORTS.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/internal/AWSHttpResourceClient.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <memory>
#include "AliyunSTSClient.h"
namespace Aws {
namespace Auth {
/**
* To support retrieving credentials of STS AssumeRole with web identity.
* Note that STS accepts request with protocol of queryxml. Calling GetAWSCredentials() will trigger (if expired)
* a query request using AWSHttpResourceClient under the hood.
*/
class AWS_CORE_API AliyunSTSAssumeRoleWebIdentityCredentialsProvider
: public AWSCredentialsProvider {
public:
AliyunSTSAssumeRoleWebIdentityCredentialsProvider();
/**
* Retrieves the credentials if found, otherwise returns empty credential set.
*/
AWSCredentials
GetAWSCredentials() override;
protected:
void
Reload() override;
private:
void
RefreshIfExpired();
Aws::String
CalculateQueryString() const;
Aws::UniquePtr<Aws::Internal::AliyunSTSCredentialsClient> m_client;
Aws::Auth::AWSCredentials m_credentials;
Aws::String m_roleArn;
Aws::String m_tokenFile;
Aws::String m_sessionName;
Aws::String m_token;
bool m_initialized;
bool
ExpiresSoon() const;
};
} // namespace Auth
} // namespace Aws

View File

@ -0,0 +1,208 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/core/internal/AWSHttpResourceClient.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpClientFactory.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/core/utils/StringUtils.h>
#include <aws/core/utils/HashingUtils.h>
#include <aws/core/platform/Environment.h>
#include <aws/core/client/AWSError.h>
#include <aws/core/client/CoreErrors.h>
#include <aws/core/utils/xml/XmlSerializer.h>
#include <limits.h>
#include <mutex>
#include <sstream>
#include <random>
#include "AliyunSTSClient.h"
namespace Aws {
namespace Http {
class HttpClient;
class HttpRequest;
enum class HttpResponseCode;
} // namespace Http
namespace Client {
Aws::String
ComputeUserAgentString();
}
namespace Internal {
static const char STS_RESOURCE_CLIENT_LOG_TAG[] =
"AliyunSTSResourceClient"; // [aliyun]
int
IntRand(const int& min, const int& max) {
static thread_local std::mt19937 generator;
std::uniform_int_distribution<int> distribution(min, max);
return distribution(generator);
}
AliyunSTSCredentialsClient::AliyunSTSCredentialsClient(
const Aws::Client::ClientConfiguration& clientConfiguration)
: AWSHttpResourceClient(clientConfiguration, STS_RESOURCE_CLIENT_LOG_TAG) {
m_aliyunOidcProviderArn =
Aws::Environment::GetEnv("ALIBABA_CLOUD_OIDC_PROVIDER_ARN");
if (m_aliyunOidcProviderArn.empty()) {
AWS_LOGSTREAM_WARN(STS_RESOURCE_CLIENT_LOG_TAG,
"oidc role arn must be specified to use STS "
"AssumeRole web identity creds provider.");
return;
}
SetErrorMarshaller(Aws::MakeUnique<Aws::Client::XmlErrorMarshaller>(
STS_RESOURCE_CLIENT_LOG_TAG));
// [aliyun]
m_endpoint = "https://sts.aliyuncs.com";
AWS_LOGSTREAM_INFO(
STS_RESOURCE_CLIENT_LOG_TAG,
"Creating STS ResourceClient with endpoint: " << m_endpoint);
}
AliyunSTSCredentialsClient::STSAssumeRoleWithWebIdentityResult
AliyunSTSCredentialsClient::GetAssumeRoleWithWebIdentityCredentials(
const STSAssumeRoleWithWebIdentityRequest& request) {
// Calculate query string
Aws::StringStream ss;
// [aliyun]
// linux curl example:
// Action=AssumeRoleWithOIDC
// Timestamp=`date -Iseconds`
// Version="2015-04-01"
// SignatureNonce=`$RANDOM`
// RoleSessionName="default_session"
// RoleArn=$ALIBABA_CLOUD_ROLE_ARN
// OIDCProviderArn=$ALIBABA_CLOUD_OIDC_PROVIDER_ARN
// OIDCToken=`cat $ALIBABA_CLOUD_OIDC_TOKEN_FILE`
// curl "https://sts.aliyuncs.com?Action=$Action&Timestamp=$time" \
// -H "Host: sts.aliyuncs.com" \
// -H "Accept-Encoding: identity" \
// -H "SignatureNonce: $SignatureNonce" \
// -d
// "RoleArn=$RoleArn&OIDCProviderArn=$OIDCProviderArn&OIDCToken=$OIDCToken&RoleSessionName=$RoleSessionName&Version=$Version"
ss << "Action=AssumeRoleWithOIDC"
<< "&Timestamp=" /*iso8601*/
<< Aws::Utils::StringUtils::URLEncode(
Aws::Utils::DateTime::Now()
.ToGmtString(Aws::Utils::DateFormat::ISO_8601)
.c_str())
<< "&Version=2015-04-01"
<< "&SignatureNonce="
<< Aws::Utils::HashingUtils::HashString(
Aws::Utils::StringUtils::to_string(IntRand(0, INT_MAX)).c_str())
<< "&RoleSessionName="
<< Aws::Utils::StringUtils::URLEncode(request.roleSessionName.c_str())
<< "&RoleArn="
<< Aws::Utils::StringUtils::URLEncode(request.roleArn.c_str())
<< "&OIDCProviderArn="
<< Aws::Utils::StringUtils::URLEncode(m_aliyunOidcProviderArn.c_str())
<< "&OIDCToken="
<< Aws::Utils::StringUtils::URLEncode(request.webIdentityToken.c_str());
std::shared_ptr<Aws::Http::HttpRequest> httpRequest(
Aws::Http::CreateHttpRequest(
m_endpoint,
Aws::Http::HttpMethod::HTTP_POST,
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
httpRequest->SetUserAgent(Aws::Client::ComputeUserAgentString());
std::shared_ptr<Aws::IOStream> body =
Aws::MakeShared<Aws::StringStream>("STS_RESOURCE_CLIENT_LOG_TAG");
*body << ss.str();
httpRequest->AddContentBody(body);
body->seekg(0, body->end);
auto streamSize = body->tellg();
body->seekg(0, body->beg);
Aws::StringStream contentLength;
contentLength << streamSize;
httpRequest->SetContentLength(contentLength.str());
httpRequest->SetContentType("application/x-www-form-urlencoded");
Aws::String credentialsStr =
GetResourceWithAWSWebServiceResult(httpRequest).GetPayload();
// Parse credentials
STSAssumeRoleWithWebIdentityResult result;
if (credentialsStr.empty()) {
AWS_LOGSTREAM_WARN(STS_RESOURCE_CLIENT_LOG_TAG,
"Get an empty credential from sts");
return result;
}
// [aliyun] output example
// <?xml version='1.0' encoding='UTF-8'?>
// <AssumeRoleWithOIDCResponse>
// <RequestId>D94AFCC3-54CA-508C-B6AF-21481E761BDB</RequestId>
// <OIDCTokenInfo>
// <Issuer>https://oidc-ack-cn-shanghai.oss-cn-shanghai.aliyuncs.com/c532c4ce5e84048a1972535df283f737d</Issuer>
// <Subject>system:serviceaccount:default:my-release</Subject>
// <ClientIds>sts.aliyuncs.com</ClientIds>
// </OIDCTokenInfo>
// <AssumedRoleUser>
// <Arn>acs:ram::1413891078881348:role/vdc-poc-milvus/default</Arn>
// <AssumedRoleId>383373758575348335:default</AssumedRoleId>
// </AssumedRoleUser>
// <Credentials>
// <SecurityToken>xxx</SecurityToken>
// <AccessKeyId>xxx</AccessKeyId>
// <AccessKeySecret>xxx</AccessKeySecret>
// <Expiration>2023-03-02T07:39:09Z</Expiration>
// </Credentials>
// </AssumeRoleWithOIDCResponse>
const Utils::Xml::XmlDocument xmlDocument =
Utils::Xml::XmlDocument::CreateFromXmlString(credentialsStr);
Utils::Xml::XmlNode rootNode = xmlDocument.GetRootElement();
Utils::Xml::XmlNode resultNode = rootNode;
if (!rootNode.IsNull() &&
(rootNode.GetName() != "AssumeRoleWithOIDCResponse")) {
resultNode =
rootNode.FirstChild("AssumeRoleWithOIDCResponse"); // [aliyun]
}
if (!resultNode.IsNull()) {
Utils::Xml::XmlNode credentialsNode =
resultNode.FirstChild("Credentials");
if (!credentialsNode.IsNull()) {
Utils::Xml::XmlNode accessKeyIdNode =
credentialsNode.FirstChild("AccessKeyId");
if (!accessKeyIdNode.IsNull()) {
result.creds.SetAWSAccessKeyId(accessKeyIdNode.GetText());
}
Utils::Xml::XmlNode secretAccessKeyNode =
credentialsNode.FirstChild("AccessKeySecret"); // [aliyun]
if (!secretAccessKeyNode.IsNull()) {
result.creds.SetAWSSecretKey(secretAccessKeyNode.GetText());
}
Utils::Xml::XmlNode sessionTokenNode =
credentialsNode.FirstChild("SecurityToken"); // [aliyun]
if (!sessionTokenNode.IsNull()) {
result.creds.SetSessionToken(sessionTokenNode.GetText());
}
Utils::Xml::XmlNode expirationNode =
credentialsNode.FirstChild("Expiration");
if (!expirationNode.IsNull()) {
result.creds.SetExpiration(
Aws::Utils::DateTime(Aws::Utils::StringUtils::Trim(
expirationNode.GetText().c_str())
.c_str(),
Aws::Utils::DateFormat::ISO_8601));
}
}
}
return result;
}
} // namespace Internal
} // namespace Aws

View File

@ -0,0 +1,70 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#pragma once
#include <aws/core/Core_EXPORTS.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/AmazonWebServiceResult.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/internal/AWSHttpResourceClient.h> // [aliyun] import original http client
#include <memory>
#include <mutex>
namespace Aws {
namespace Http {
class HttpClient;
class HttpRequest;
enum class HttpResponseCode;
} // namespace Http
namespace Internal {
/**
* To support retrieving credentials from STS.
* Note that STS accepts request with protocol of queryxml. Calling GetResource() will trigger
* a query request using AWSHttpResourceClient under the hood.
*/
class AWS_CORE_API AliyunSTSCredentialsClient : public AWSHttpResourceClient {
public:
/**
* Initializes the provider to retrieve credentials from STS when it expires.
*/
explicit AliyunSTSCredentialsClient(
const Client::ClientConfiguration& clientConfiguration);
AliyunSTSCredentialsClient&
operator=(AliyunSTSCredentialsClient& rhs) = delete;
AliyunSTSCredentialsClient(const AliyunSTSCredentialsClient& rhs) = delete;
AliyunSTSCredentialsClient&
operator=(AliyunSTSCredentialsClient&& rhs) = delete;
AliyunSTSCredentialsClient(const AliyunSTSCredentialsClient&& rhs) = delete;
// If you want to make an AssumeRoleWithWebIdentity call to sts. use these classes to pass data to and get info from
// AliyunSTSCredentialsClient client. If you want to make an AssumeRole call to sts, define the request/result
// members class/struct like this.
struct STSAssumeRoleWithWebIdentityRequest {
Aws::String roleSessionName;
Aws::String roleArn;
Aws::String webIdentityToken;
};
struct STSAssumeRoleWithWebIdentityResult {
Aws::Auth::AWSCredentials creds;
};
STSAssumeRoleWithWebIdentityResult
GetAssumeRoleWithWebIdentityCredentials(
const STSAssumeRoleWithWebIdentityRequest& request);
private:
Aws::String m_endpoint;
Aws::String m_aliyunOidcProviderArn; // [aliyun]
};
} // namespace Internal
} // namespace Aws

View File

@ -43,6 +43,8 @@ if(BUILD_DISK_ANN STREQUAL "ON")
${STORAGE_FILES}
LocalChunkManager.cpp
MinioChunkManager.cpp
AliyunSTSClient.cpp
AliyunCredentialsProvider.cpp
DiskFileManagerImpl.cpp)
endif()

View File

@ -30,6 +30,8 @@
#include <fstream>
#include "storage/AliyunSTSClient.h"
#include "storage/AliyunCredentialsProvider.h"
#include "exceptions/EasyAssert.h"
#include "log/Log.h"
@ -72,7 +74,7 @@ ConvertFromAwsString(const Aws::String& aws_str) {
}
void
MinioChunkManager::InitSDKAPI() {
MinioChunkManager::InitSDKAPI(RemoteStorageType type) {
std::scoped_lock lock{client_mutex_};
const size_t initCount = init_count_++;
if (initCount == 0) {
@ -89,20 +91,10 @@ MinioChunkManager::ShutdownSDKAPI() {
}
}
MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
: default_bucket_name_(storage_config.bucket_name) {
InitSDKAPI();
Aws::Client::ClientConfiguration config;
config.endpointOverride = ConvertToAwsString(storage_config.address);
if (storage_config.useSSL) {
config.scheme = Aws::Http::Scheme::HTTPS;
config.verifySSL = true;
} else {
config.scheme = Aws::Http::Scheme::HTTP;
config.verifySSL = false;
}
void
MinioChunkManager::BuildS3Client(
const StorageConfig& storage_config,
const Aws::Client::ClientConfiguration& config) {
if (storage_config.useIAM) {
auto provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
@ -133,6 +125,60 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
false);
}
}
void
MinioChunkManager::BuildAliyunCloudClient(
const StorageConfig& storage_config,
const Aws::Client::ClientConfiguration& config) {
if (storage_config.useIAM) {
auto aliyun_provider = Aws::MakeShared<
Aws::Auth::AliyunSTSAssumeRoleWebIdentityCredentialsProvider>(
"AliyunSTSAssumeRoleWebIdentityCredentialsProvider");
auto aliyun_credentials = aliyun_provider->GetAWSCredentials();
AssertInfo(!aliyun_credentials.GetAWSAccessKeyId().empty(),
"if use iam, access key id should not be empty");
AssertInfo(!aliyun_credentials.GetAWSSecretKey().empty(),
"if use iam, secret key should not be empty");
AssertInfo(!aliyun_credentials.GetSessionToken().empty(),
"if use iam, token should not be empty");
client_ = std::make_shared<Aws::S3::S3Client>(
aliyun_provider,
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
true);
} else {
throw std::runtime_error("aliyun cloud only support iam mode now");
}
}
MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
: default_bucket_name_(storage_config.bucket_name) {
RemoteStorageType storageType;
if (storage_config.address.find("aliyun") != std::string::npos) {
storageType = RemoteStorageType::ALIYUN_CLOUD;
} else {
storageType = RemoteStorageType::S3;
}
InitSDKAPI(storageType);
Aws::Client::ClientConfiguration config;
config.endpointOverride = ConvertToAwsString(storage_config.address);
if (storage_config.useSSL) {
config.scheme = Aws::Http::Scheme::HTTPS;
config.verifySSL = true;
} else {
config.scheme = Aws::Http::Scheme::HTTP;
config.verifySSL = false;
}
if (storageType == RemoteStorageType::S3) {
BuildS3Client(storage_config, config);
} else if (storageType == RemoteStorageType::ALIYUN_CLOUD) {
BuildAliyunCloudClient(storage_config, config);
}
// TODO ::BucketExist and CreateBucket func not work, should be fixed
// index node has already tried to create bucket when receive index task if bucket not exist

View File

@ -30,6 +30,8 @@
namespace milvus::storage {
enum class RemoteStorageType { S3 = 0, GOOGLE_CLOUD = 1, ALIYUN_CLOUD = 2 };
/**
* @brief This MinioChunkManager is responsible for read and write file in S3.
*/
@ -107,7 +109,7 @@ class MinioChunkManager : public RemoteChunkManager {
std::vector<std::string>
ListBuckets();
private:
public:
bool
ObjectExists(const std::string& bucket_name,
const std::string& object_name);
@ -130,9 +132,15 @@ class MinioChunkManager : public RemoteChunkManager {
std::vector<std::string>
ListObjects(const char* bucket_name, const char* prefix = nullptr);
void
InitSDKAPI();
InitSDKAPI(RemoteStorageType type);
void
ShutdownSDKAPI();
void
BuildS3Client(const StorageConfig& storage_config,
const Aws::Client::ClientConfiguration& config);
void
BuildAliyunCloudClient(const StorageConfig& storage_config,
const Aws::Client::ClientConfiguration& config);
private:
const Aws::SDKOptions sdk_options_;

View File

@ -30,8 +30,51 @@ class MinioChunkManagerTest : public testing::Test {
virtual void
SetUp() {
chunk_manager_ =
std::make_unique<MinioChunkManager>(get_default_storage_config());
chunk_manager_ = std::make_unique<MinioChunkManager>(get_default_storage_config());
}
protected:
MinioChunkManagerPtr chunk_manager_;
};
StorageConfig
get_google_cloud_storage_config() {
auto endpoint = "storage.googleapis.com:443";
auto accessKey = "";
auto accessValue = "";
auto rootPath = "files";
auto useSSL = true;
auto useIam = true;
auto iamEndPoint = "";
auto bucketName = "gcp-zilliz-infra-test";
return StorageConfig{endpoint, bucketName, accessKey, accessValue, rootPath, "minio", iamEndPoint, useSSL, useIam};
}
StorageConfig
get_aliyun_cloud_storage_config() {
auto endpoint = "oss-cn-shanghai.aliyuncs.com:443";
auto accessKey = "";
auto accessValue = "";
auto rootPath = "files";
auto useSSL = true;
auto useIam = true;
auto iamEndPoint = "";
auto bucketName = "vdc-infra-poc";
return StorageConfig{endpoint, bucketName, accessKey, accessValue, rootPath, "minio", iamEndPoint, useSSL, useIam};
}
class AliyunChunkManagerTest : public testing::Test {
public:
AliyunChunkManagerTest() {
}
~AliyunChunkManagerTest() {
}
virtual void
SetUp() {
chunk_manager_ = std::make_unique<MinioChunkManager>(get_aliyun_cloud_storage_config());
}
protected:
@ -220,3 +263,46 @@ TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) {
chunk_manager_->Remove(path3);
chunk_manager_->DeleteBucket(testBucketName);
}
TEST_F(AliyunChunkManagerTest, ReadPositive) {
string testBucketName = "vdc-infra-poc";
chunk_manager_->SetBucketName(testBucketName);
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1/4/6";
chunk_manager_->Write(path, data, sizeof(data));
bool exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, true);
auto size = chunk_manager_->Size(path);
EXPECT_EQ(size, 5);
uint8_t readdata[20] = {0};
size = chunk_manager_->Read(path, readdata, 20);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
size = chunk_manager_->Read(path, readdata, 3);
EXPECT_EQ(size, 3);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23};
chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL));
exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, true);
size = chunk_manager_->Size(path);
EXPECT_EQ(size, 5);
size = chunk_manager_->Read(path, readdata, 20);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x00);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
chunk_manager_->Remove(path);
}