Refine minio chunks manager (#27510)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/27543/head
Xiaofan 2023-10-13 14:15:35 +08:00 committed by GitHub
parent 82b2edc4bd
commit d83869aaeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 210 additions and 174 deletions

View File

@ -35,18 +35,6 @@
#include "log/Log.h"
#include "signal.h"
#define THROWS3ERROR(FUNCTION) \
do { \
auto& err = outcome.GetError(); \
std::stringstream err_msg; \
err_msg << "Error:" << #FUNCTION \
<< "[errcode:" << int(err.GetResponseCode()) \
<< ", exception:" << err.GetExceptionName() \
<< ", errmessage:" << err.GetMessage() << "]"; \
throw SegcoreError(S3Error, err_msg.str()); \
} while (0)
#define S3NoSuchBucket "NoSuchBucket"
namespace milvus::storage {
std::atomic<size_t> MinioChunkManager::init_count_(0);
@ -359,12 +347,6 @@ MinioChunkManager::ListWithPrefix(const std::string& filepath) {
uint64_t
MinioChunkManager::Read(const std::string& filepath, void* buf, uint64_t size) {
if (!ObjectExists(default_bucket_name_, filepath)) {
std::stringstream err_msg;
err_msg << "object('" << default_bucket_name_ << "', '" << filepath
<< "') not exists";
throw SegcoreError(ObjectNotExist, err_msg.str());
}
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
}
@ -377,28 +359,19 @@ MinioChunkManager::Write(const std::string& filepath,
bool
MinioChunkManager::BucketExists(const std::string& bucket_name) {
// auto outcome = client_->ListBuckets();
// if (!outcome.IsSuccess()) {
// THROWS3ERROR(BucketExists);
// }
// for (auto&& b : outcome.GetResult().GetBuckets()) {
// if (ConvertFromAwsString(b.GetName()) == bucket_name) {
// return true;
// }
// }
Aws::S3::Model::HeadBucketRequest request;
request.SetBucket(bucket_name.c_str());
auto outcome = client_->HeadBucket(request);
if (!outcome.IsSuccess()) {
auto error = outcome.GetError();
if (!error.GetExceptionName().empty()) {
std::stringstream err_msg;
err_msg << "Error: BucketExists: "
<< error.GetExceptionName() + " - " + error.GetMessage();
throw SegcoreError(S3Error, err_msg.str());
const auto& err = outcome.GetError();
auto error_type = err.GetErrorType();
// only throw if the error is not nosuchbucket
// if bucket not exist, HeadBucket return errorType RESOURCE_NOT_FOUND
if (error_type != Aws::S3::S3Errors::NO_SUCH_BUCKET &&
error_type != Aws::S3::S3Errors::RESOURCE_NOT_FOUND) {
ThrowS3Error("BucketExists", err, "params, bucket={}", bucket_name);
}
return false;
}
@ -411,7 +384,8 @@ MinioChunkManager::ListBuckets() {
auto outcome = client_->ListBuckets();
if (!outcome.IsSuccess()) {
THROWS3ERROR(CreateBucket);
const auto& err = outcome.GetError();
ThrowS3Error("ListBuckets", err, "params");
}
for (auto&& b : outcome.GetResult().GetBuckets()) {
buckets.emplace_back(b.GetName().c_str());
@ -426,10 +400,13 @@ MinioChunkManager::CreateBucket(const std::string& bucket_name) {
auto outcome = client_->CreateBucket(request);
if (!outcome.IsSuccess() &&
Aws::S3::S3Errors(outcome.GetError().GetErrorType()) !=
if (!outcome.IsSuccess()) {
const auto& err = outcome.GetError();
if (err.GetErrorType() !=
Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) {
THROWS3ERROR(CreateBucket);
ThrowS3Error("CreateBucket", err, "params, bucket={}", bucket_name);
}
return false;
}
return true;
}
@ -442,9 +419,11 @@ MinioChunkManager::DeleteBucket(const std::string& bucket_name) {
auto outcome = client_->DeleteBucket(request);
if (!outcome.IsSuccess()) {
auto err = outcome.GetError();
if (err.GetExceptionName() != S3NoSuchBucket) {
THROWS3ERROR(DeleteBucket);
const auto& err = outcome.GetError();
auto error_type = err.GetErrorType();
if (error_type != Aws::S3::S3Errors::NO_SUCH_BUCKET &&
error_type != Aws::S3::S3Errors::RESOURCE_NOT_FOUND) {
ThrowS3Error("DeleteBucket", err, "params, bucket={}", bucket_name);
}
return false;
}
@ -461,11 +440,13 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
auto outcome = client_->HeadObject(request);
if (!outcome.IsSuccess()) {
auto& err = outcome.GetError();
if (!err.GetExceptionName().empty()) {
std::stringstream err_msg;
err_msg << "Error: ObjectExists: " << err.GetMessage();
throw SegcoreError(S3Error, err_msg.str());
const auto& err = outcome.GetError();
if (!IsNotFound(err.GetErrorType())) {
ThrowS3Error("ObjectExists",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return false;
}
@ -481,7 +462,12 @@ MinioChunkManager::GetObjectSize(const std::string& bucket_name,
auto outcome = client_->HeadObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(GetObjectSize);
const auto& err = outcome.GetError();
ThrowS3Error("GetObjectSize",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return outcome.GetResult().GetContentLength();
}
@ -496,11 +482,15 @@ MinioChunkManager::DeleteObject(const std::string& bucket_name,
auto outcome = client_->DeleteObject(request);
if (!outcome.IsSuccess()) {
// auto err = outcome.GetError();
// std::stringstream err_msg;
// err_msg << "Error: DeleteObject:" << err.GetMessage();
// throw S3ErrorException(err_msg.str());
THROWS3ERROR(DeleteObject);
const auto& err = outcome.GetError();
if (!IsNotFound(err.GetErrorType())) {
ThrowS3Error("DeleteObject",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return false;
}
return true;
}
@ -523,7 +513,12 @@ MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
auto outcome = client_->PutObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(PutObjectBuffer);
const auto& err = outcome.GetError();
ThrowS3Error("PutObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return true;
}
@ -591,24 +586,35 @@ MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
auto outcome = client_->GetObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(GetObjectBuffer);
const auto& err = outcome.GetError();
ThrowS3Error("GetObjectBuffer",
err,
"params, bucket={}, object={}",
bucket_name,
object_name);
}
return size;
}
std::vector<std::string>
MinioChunkManager::ListObjects(const char* bucket_name, const char* prefix) {
MinioChunkManager::ListObjects(const std::string& bucket_name,
const std::string& prefix) {
std::vector<std::string> objects_vec;
Aws::S3::Model::ListObjectsRequest request;
request.WithBucket(bucket_name);
if (prefix != nullptr) {
if (prefix != "") {
request.SetPrefix(prefix);
}
auto outcome = client_->ListObjects(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(ListObjects);
const auto& err = outcome.GetError();
ThrowS3Error("ListObjects",
err,
"params, bucket={}, prefix={}",
bucket_name,
prefix);
}
auto objects = outcome.GetResult().GetContents();
for (auto& obj : objects) {

View File

@ -36,6 +36,7 @@
#include <memory>
#include <string>
#include <vector>
#include <fmt/core.h>
#include "common/EasyAssert.h"
#include "storage/ChunkManager.h"
@ -45,6 +46,27 @@ namespace milvus::storage {
enum class RemoteStorageType { S3 = 0, GOOGLE_CLOUD = 1, ALIYUN_CLOUD = 2 };
template <typename... Args>
static SegcoreError
ThrowS3Error(const std::string& func,
const Aws::S3::S3Error& err,
const std::string& fmtString,
Args&&... args) {
std::ostringstream oss;
const auto& message = fmt::format(fmtString, std::forward<Args>(args)...);
oss << "Error in " << func << "[errcode:" << int(err.GetResponseCode())
<< ", exception:" << err.GetExceptionName()
<< ", errmessage:" << err.GetMessage() << ", params:" << message << "]";
throw SegcoreError(S3Error, oss.str());
}
static bool
IsNotFound(const Aws::S3::S3Errors& s3err) {
return (s3err == Aws::S3::S3Errors::NO_SUCH_KEY ||
s3err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
}
/**
* @brief user defined aws logger, redirect aws log to segcore log
*/
@ -168,8 +190,9 @@ class MinioChunkManager : public ChunkManager {
const std::string& object_name,
void* buf,
uint64_t size);
std::vector<std::string>
ListObjects(const char* bucket_name, const char* prefix = nullptr);
ListObjects(const std::string& bucket_name, const std::string& prefix = "");
void
InitSDKAPIDefault(const std::string& log_level);

View File

@ -90,7 +90,7 @@ struct StorageConfig {
std::string storage_type = "minio";
std::string cloud_provider = "aws";
std::string iam_endpoint = "";
std::string log_level = "error";
std::string log_level = "warn";
std::string region = "";
bool useSSL = false;
bool useIAM = false;

View File

@ -62,6 +62,7 @@ set(MILVUS_TEST_FILES
if ( BUILD_DISK_ANN STREQUAL "ON" )
set(MILVUS_TEST_FILES
${MILVUS_TEST_FILES}
#need update aws-sdk-cpp, see more from https://github.com/aws/aws-sdk-cpp/issues/1757
#test_minio_chunk_manager.cpp
)
endif()

View File

@ -19,7 +19,6 @@
using namespace std;
using namespace milvus;
using namespace milvus::storage;
using namespace boost::filesystem;
class MinioChunkManagerTest : public testing::Test {
public:
@ -30,7 +29,7 @@ class MinioChunkManagerTest : public testing::Test {
virtual void
SetUp() {
configs_ = get_default_remote_storage_config();
configs_ = StorageConfig{};
chunk_manager_ = std::make_unique<MinioChunkManager>(configs_);
}
@ -39,68 +38,50 @@ class MinioChunkManagerTest : public testing::Test {
StorageConfig configs_;
};
StorageConfig
get_google_cloud_storage_config() {
auto endpoint = "storage.googleapis.com:443";
auto accessKey = "";
auto accessValue = "";
auto rootPath = "files";
auto useSSL = true;
auto useIam = true;
auto iamEndPoint = "";
auto bucketName = "gcp-zilliz-infra-test";
//StorageConfig
//get_aliyun_cloud_storage_config() {
// auto endpoint = "oss-cn-shanghai.aliyuncs.com:443";
// auto accessKey = "";
// auto accessValue = "";
// auto rootPath = "files";
// auto useSSL = true;
// auto useIam = true;
// auto iamEndPoint = "";
// auto bucketName = "vdc-infra-poc";
// auto cloudProvider = "aliyun";
// auto logLevel = "error";
// auto region = "";
//
// return StorageConfig{endpoint,
// bucketName,
// accessKey,
// accessValue,
// rootPath,
// "minio",
// cloudProvider,
// iamEndPoint,
// logLevel,
// region,
// useSSL,
// useIam};
//}
return StorageConfig{endpoint,
bucketName,
accessKey,
accessValue,
rootPath,
"minio",
iamEndPoint,
"error",
"",
useSSL,
useIam};
}
StorageConfig
get_aliyun_cloud_storage_config() {
auto endpoint = "oss-cn-shanghai.aliyuncs.com:443";
auto accessKey = "";
auto accessValue = "";
auto rootPath = "files";
auto useSSL = true;
auto useIam = true;
auto iamEndPoint = "";
auto bucketName = "vdc-infra-poc";
return StorageConfig{endpoint,
bucketName,
accessKey,
accessValue,
rootPath,
"minio",
iamEndPoint,
useSSL,
useIam};
}
class AliyunChunkManagerTest : public testing::Test {
public:
AliyunChunkManagerTest() {
}
~AliyunChunkManagerTest() {
}
virtual void
SetUp() {
chunk_manager_ = std::make_unique<MinioChunkManager>(
get_aliyun_cloud_storage_config());
}
protected:
MinioChunkManagerPtr chunk_manager_;
};
//class AliyunChunkManagerTest : public testing::Test {
// public:
// AliyunChunkManagerTest() {
// }
// ~AliyunChunkManagerTest() {
// }
//
// virtual void
// SetUp() {
// chunk_manager_ = std::make_unique<MinioChunkManager>(
// get_aliyun_cloud_storage_config());
// }
//
// protected:
// MinioChunkManagerPtr chunk_manager_;
//};
TEST_F(MinioChunkManagerTest, BucketPositive) {
string testBucketName = "test-bucket";
@ -120,12 +101,8 @@ TEST_F(MinioChunkManagerTest, BucketNegtive) {
// create already exist bucket
chunk_manager_->CreateBucket(testBucketName);
try {
chunk_manager_->CreateBucket(testBucketName);
} catch (S3ErrorException& e) {
EXPECT_TRUE(std::string(e.what()).find("BucketAlreadyOwnedByYou") !=
string::npos);
}
bool created = chunk_manager_->CreateBucket(testBucketName);
EXPECT_EQ(created, false);
chunk_manager_->DeleteBucket(testBucketName);
}
@ -147,9 +124,9 @@ TEST_F(MinioChunkManagerTest, WritePositive) {
chunk_manager_->SetBucketName(testBucketName);
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
// if (!chunk_manager_->BucketExists(testBucketName)) {
// chunk_manager_->CreateBucket(testBucketName);
// }
if (!chunk_manager_->BucketExists(testBucketName)) {
chunk_manager_->CreateBucket(testBucketName);
}
auto has_bucket = chunk_manager_->BucketExists(testBucketName);
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1";
@ -225,6 +202,30 @@ TEST_F(MinioChunkManagerTest, ReadPositive) {
chunk_manager_->DeleteBucket(testBucketName);
}
TEST_F(MinioChunkManagerTest, ReadNotExist) {
string testBucketName = configs_.bucket_name;
chunk_manager_->SetBucketName(testBucketName);
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
if (!chunk_manager_->BucketExists(testBucketName)) {
chunk_manager_->CreateBucket(testBucketName);
}
string path = "1/5/8";
uint8_t readdata[20] = {0};
EXPECT_THROW(
try {
chunk_manager_->Read(path, readdata, sizeof(readdata));
} catch (SegcoreError& e) {
EXPECT_TRUE(std::string(e.what()).find("exist") != string::npos);
throw e;
},
SegcoreError);
chunk_manager_->Remove(path);
chunk_manager_->DeleteBucket(testBucketName);
}
TEST_F(MinioChunkManagerTest, RemovePositive) {
string testBucketName = "test-remove";
chunk_manager_->SetBucketName(testBucketName);
@ -240,7 +241,12 @@ TEST_F(MinioChunkManagerTest, RemovePositive) {
bool exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, true);
chunk_manager_->Remove(path);
bool deleted = chunk_manager_->Remove(path);
EXPECT_EQ(deleted, true);
// test double deleted
deleted = chunk_manager_->Remove(path);
EXPECT_EQ(deleted, false);
exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, false);
@ -286,45 +292,45 @@ TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) {
chunk_manager_->DeleteBucket(testBucketName);
}
TEST_F(AliyunChunkManagerTest, ReadPositive) {
string testBucketName = "vdc-infra-poc";
chunk_manager_->SetBucketName(testBucketName);
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1/4/6";
chunk_manager_->Write(path, data, sizeof(data));
bool exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, true);
auto size = chunk_manager_->Size(path);
EXPECT_EQ(size, 5);
uint8_t readdata[20] = {0};
size = chunk_manager_->Read(path, readdata, 20);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
size = chunk_manager_->Read(path, readdata, 3);
EXPECT_EQ(size, 3);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x45);
uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23};
chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL));
exist = chunk_manager_->Exist(path);
EXPECT_EQ(exist, true);
size = chunk_manager_->Size(path);
EXPECT_EQ(size, 5);
size = chunk_manager_->Read(path, readdata, 20);
EXPECT_EQ(readdata[0], 0x17);
EXPECT_EQ(readdata[1], 0x32);
EXPECT_EQ(readdata[2], 0x00);
EXPECT_EQ(readdata[3], 0x34);
EXPECT_EQ(readdata[4], 0x23);
chunk_manager_->Remove(path);
}
//TEST_F(AliyunChunkManagerTest, ReadPositive) {
// string testBucketName = "vdc-infra-poc";
// chunk_manager_->SetBucketName(testBucketName);
// EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
//
// uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
// string path = "1/4/6";
// chunk_manager_->Write(path, data, sizeof(data));
// bool exist = chunk_manager_->Exist(path);
// EXPECT_EQ(exist, true);
// auto size = chunk_manager_->Size(path);
// EXPECT_EQ(size, 5);
//
// uint8_t readdata[20] = {0};
// size = chunk_manager_->Read(path, readdata, 20);
// EXPECT_EQ(readdata[0], 0x17);
// EXPECT_EQ(readdata[1], 0x32);
// EXPECT_EQ(readdata[2], 0x45);
// EXPECT_EQ(readdata[3], 0x34);
// EXPECT_EQ(readdata[4], 0x23);
//
// size = chunk_manager_->Read(path, readdata, 3);
// EXPECT_EQ(size, 3);
// EXPECT_EQ(readdata[0], 0x17);
// EXPECT_EQ(readdata[1], 0x32);
// EXPECT_EQ(readdata[2], 0x45);
//
// uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23};
// chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL));
// exist = chunk_manager_->Exist(path);
// EXPECT_EQ(exist, true);
// size = chunk_manager_->Size(path);
// EXPECT_EQ(size, 5);
// size = chunk_manager_->Read(path, readdata, 20);
// EXPECT_EQ(readdata[0], 0x17);
// EXPECT_EQ(readdata[1], 0x32);
// EXPECT_EQ(readdata[2], 0x00);
// EXPECT_EQ(readdata[3], 0x34);
// EXPECT_EQ(readdata[4], 0x23);
//
// chunk_manager_->Remove(path);
//}