mirror of https://github.com/milvus-io/milvus.git
reduce useless ObjectExists (#28156)
Replace ListBlobs() with GetProperties(); unify the std::string& / char* parameter style; make the Azure requestTimeoutMs configurable. Signed-off-by: PowderLi <min.li@zilliz.com> pull/28057/head
parent
af1c2044b9
commit
7bb0fa9c70
|
@ -17,18 +17,33 @@
|
|||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "log/Log.h"
|
||||
#include "storage/AzureChunkManager.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
std::atomic<size_t> AzureChunkManager::init_count_(0);
|
||||
std::mutex AzureChunkManager::client_mutex_;
|
||||
|
||||
void
|
||||
AzureLogger(std::string const& level, std::string const& msg) {
|
||||
LOG_SEGCORE_INFO_ << "[AZURE LOG] " << msg;
|
||||
}
|
||||
|
||||
// Creates a chunk manager for the bucket and root path named in
// `storage_config` and builds the underlying azure::AzureBlobChunkManager
// client from the credentials, address, request timeout and IAM flag.
AzureChunkManager::AzureChunkManager(const StorageConfig& storage_config)
    : default_bucket_name_(storage_config.bucket_name),
      path_prefix_(storage_config.root_path) {
    // Client construction is serialized on the class-wide mutex.
    std::scoped_lock guard{client_mutex_};
    // Every constructed instance bumps the counter; only the first one would
    // install the SDK log listener (the call itself is currently disabled).
    const bool is_first_instance = (init_count_.fetch_add(1) == 0);
    if (is_first_instance) {
        // azure::AzureBlobChunkManager::InitLog(storage_config.log_level, AzureLogger);
    }
    client_ = std::make_shared<azure::AzureBlobChunkManager>(
        storage_config.access_key_id,
        storage_config.access_key_value,
        storage_config.address,
        storage_config.requestTimeoutMs,
        storage_config.useIAM);
}
|
||||
|
||||
|
@ -52,18 +67,19 @@ AzureChunkManager::Remove(const std::string& filepath) {
|
|||
|
||||
std::vector<std::string>
|
||||
AzureChunkManager::ListWithPrefix(const std::string& filepath) {
|
||||
return ListObjects(default_bucket_name_.c_str(), filepath.c_str());
|
||||
return ListObjects(default_bucket_name_, filepath);
|
||||
}
|
||||
|
||||
uint64_t
|
||||
AzureChunkManager::Read(const std::string& filepath, void* buf, uint64_t size) {
|
||||
if (!ObjectExists(default_bucket_name_, filepath)) {
|
||||
try {
|
||||
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
|
||||
} catch (const std::exception& e) {
|
||||
std::stringstream err_msg;
|
||||
err_msg << "object('" << default_bucket_name_ << "', '" << filepath
|
||||
<< "') not exists";
|
||||
err_msg << "read object('" << default_bucket_name_ << "', '" << filepath
|
||||
<< "' fail: " << e.what();
|
||||
throw SegcoreError(ObjectNotExist, err_msg.str());
|
||||
}
|
||||
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -75,59 +91,94 @@ AzureChunkManager::Write(const std::string& filepath,
|
|||
|
||||
bool
|
||||
AzureChunkManager::BucketExists(const std::string& bucket_name) {
|
||||
return client_->BucketExists(bucket_name);
|
||||
bool res;
|
||||
try {
|
||||
res = client_->BucketExists(bucket_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("BucketExists", err, "params, bucket={}", bucket_name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
AzureChunkManager::ListBuckets() {
|
||||
return client_->ListBuckets();
|
||||
std::vector<std::string> res;
|
||||
try {
|
||||
res = client_->ListBuckets();
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("ListBuckets", err, "params");
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
bool
|
||||
AzureChunkManager::CreateBucket(const std::string& bucket_name) {
|
||||
bool res;
|
||||
try {
|
||||
client_->CreateBucket(bucket_name);
|
||||
} catch (std::exception& e) {
|
||||
throw SegcoreError(BucketInvalid, e.what());
|
||||
res = client_->CreateBucket(bucket_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("CreateBucket", err, "params, bucket={}", bucket_name);
|
||||
}
|
||||
return true;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool
|
||||
AzureChunkManager::DeleteBucket(const std::string& bucket_name) {
|
||||
bool res;
|
||||
try {
|
||||
client_->DeleteBucket(bucket_name);
|
||||
} catch (std::exception& e) {
|
||||
throw SegcoreError(BucketInvalid, e.what());
|
||||
res = client_->DeleteBucket(bucket_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("DeleteBucket", err, "params, bucket={}", bucket_name);
|
||||
}
|
||||
return true;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool
|
||||
AzureChunkManager::ObjectExists(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
return client_->ObjectExists(bucket_name, object_name);
|
||||
bool res;
|
||||
try {
|
||||
res = client_->ObjectExists(bucket_name, object_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("ObjectExists",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
int64_t
|
||||
uint64_t
|
||||
AzureChunkManager::GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
uint64_t res;
|
||||
try {
|
||||
return client_->GetObjectSize(bucket_name, object_name);
|
||||
} catch (std::exception& e) {
|
||||
throw SegcoreError(ObjectNotExist, e.what());
|
||||
res = client_->GetObjectSize(bucket_name, object_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("GetObjectSize",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
bool
|
||||
AzureChunkManager::DeleteObject(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
bool res;
|
||||
try {
|
||||
client_->DeleteObject(bucket_name, object_name);
|
||||
} catch (std::exception& e) {
|
||||
throw SegcoreError(ObjectNotExist, e.what());
|
||||
res = client_->DeleteObject(bucket_name, object_name);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("DeleteObject",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
return true;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -135,7 +186,17 @@ AzureChunkManager::PutObjectBuffer(const std::string& bucket_name,
|
|||
const std::string& object_name,
|
||||
void* buf,
|
||||
uint64_t size) {
|
||||
return client_->PutObjectBuffer(bucket_name, object_name, buf, size);
|
||||
bool res;
|
||||
try {
|
||||
res = client_->PutObjectBuffer(bucket_name, object_name, buf, size);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("PutObjectBuffer",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
|
@ -143,12 +204,33 @@ AzureChunkManager::GetObjectBuffer(const std::string& bucket_name,
|
|||
const std::string& object_name,
|
||||
void* buf,
|
||||
uint64_t size) {
|
||||
return client_->GetObjectBuffer(bucket_name, object_name, buf, size);
|
||||
uint64_t res;
|
||||
try {
|
||||
res = client_->GetObjectBuffer(bucket_name, object_name, buf, size);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("GetObjectBuffer",
|
||||
err,
|
||||
"params, bucket={}, object={}",
|
||||
bucket_name,
|
||||
object_name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
AzureChunkManager::ListObjects(const char* bucket_name, const char* prefix) {
|
||||
return client_->ListObjects(bucket_name, prefix);
|
||||
AzureChunkManager::ListObjects(const std::string& bucket_name,
|
||||
const std::string& prefix) {
|
||||
std::vector<std::string> res;
|
||||
try {
|
||||
res = client_->ListObjects(bucket_name, prefix);
|
||||
} catch (std::exception& err) {
|
||||
ThrowAzureError("ListObjects",
|
||||
err,
|
||||
"params, bucket={}, prefix={}",
|
||||
bucket_name,
|
||||
prefix);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
|
|
@ -27,6 +27,20 @@
|
|||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
template <typename... Args>
|
||||
|
||||
static SegcoreError
|
||||
ThrowAzureError(const std::string& func,
|
||||
const std::exception& err,
|
||||
const std::string& fmtString,
|
||||
Args&&... args) {
|
||||
std::ostringstream oss;
|
||||
const auto& message = fmt::format(fmtString, std::forward<Args>(args)...);
|
||||
oss << "Error in " << func << "[exception:" << err.what()
|
||||
<< ", params:" << message << "]";
|
||||
throw SegcoreError(S3Error, oss.str());
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief This AzureChunkManager is responsible for read and write file in blob.
|
||||
*/
|
||||
|
@ -113,7 +127,7 @@ class AzureChunkManager : public ChunkManager {
|
|||
bool
|
||||
ObjectExists(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
int64_t
|
||||
uint64_t
|
||||
GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
bool
|
||||
|
@ -130,9 +144,12 @@ class AzureChunkManager : public ChunkManager {
|
|||
void* buf,
|
||||
uint64_t size);
|
||||
// Lists object names in `bucket_name` for the given `prefix`.
// The default prefix is an empty string: a std::string must never be
// default-constructed from nullptr (undefined behaviour).
std::vector<std::string>
ListObjects(const std::string& bucket_name,
            const std::string& prefix = "");
|
||||
|
||||
private:
|
||||
static std::atomic<size_t> init_count_;
|
||||
static std::mutex client_mutex_;
|
||||
std::shared_ptr<azure::AzureBlobChunkManager> client_;
|
||||
std::string default_bucket_name_;
|
||||
std::string path_prefix_;
|
||||
|
|
|
@ -458,7 +458,7 @@ MinioChunkManager::ObjectExists(const std::string& bucket_name,
|
|||
return true;
|
||||
}
|
||||
|
||||
int64_t
|
||||
uint64_t
|
||||
MinioChunkManager::GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
Aws::S3::Model::HeadObjectRequest request;
|
||||
|
|
|
@ -174,7 +174,7 @@ class MinioChunkManager : public ChunkManager {
|
|||
bool
|
||||
ObjectExists(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
int64_t
|
||||
uint64_t
|
||||
GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
bool
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <azure/core/diagnostics/logger.hpp>
|
||||
#include <azure/identity/workload_identity_credential.hpp>
|
||||
#include "AzureBlobChunkManager.h"
|
||||
|
||||
|
@ -48,11 +49,36 @@ GetConnectionString(const std::string& access_key_id,
|
|||
";AccountKey=" + access_key_value + ";EndpointSuffix=" + address;
|
||||
}
|
||||
|
||||
void
|
||||
AzureBlobChunkManager::InitLog(
|
||||
std::string level_str,
|
||||
std::function<void(std::string const& level, std::string const& message)>
|
||||
listener) {
|
||||
// SetListener accepts std::function<>, which can be either lambda or a function pointer.
|
||||
Azure::Core::Diagnostics::Logger::SetListener(
|
||||
[&](auto lvl, auto msg) { listener("info", msg); });
|
||||
Azure::Core::Diagnostics::Logger::Level level =
|
||||
Azure::Core::Diagnostics::Logger::Level::Verbose;
|
||||
if (level_str == "fatal" || level_str == "error") {
|
||||
level = Azure::Core::Diagnostics::Logger::Level::Error;
|
||||
} else if (level_str == "warn") {
|
||||
level = Azure::Core::Diagnostics::Logger::Level::Warning;
|
||||
} else if (level_str == "info") {
|
||||
level = Azure::Core::Diagnostics::Logger::Level::Informational;
|
||||
} else if (level_str == "debug" || level_str == "trace") {
|
||||
level = Azure::Core::Diagnostics::Logger::Level::Verbose;
|
||||
}
|
||||
// See above for the level descriptions.
|
||||
Azure::Core::Diagnostics::Logger::SetLevel(level);
|
||||
}
|
||||
|
||||
AzureBlobChunkManager::AzureBlobChunkManager(
|
||||
const std::string& access_key_id,
|
||||
const std::string& access_key_value,
|
||||
const std::string& address,
|
||||
int64_t requestTimeoutMs,
|
||||
bool useIAM) {
|
||||
requestTimeoutMs_ = requestTimeoutMs;
|
||||
if (useIAM) {
|
||||
auto workloadIdentityCredential =
|
||||
std::make_shared<Azure::Identity::WorkloadIdentityCredential>(
|
||||
|
@ -73,23 +99,38 @@ AzureBlobChunkManager::~AzureBlobChunkManager() {
|
|||
|
||||
bool
|
||||
AzureBlobChunkManager::BucketExists(const std::string& bucket_name) {
|
||||
std::vector<std::string> buckets;
|
||||
for (auto containerPage = client_->ListBlobContainers();
|
||||
containerPage.HasPage();
|
||||
containerPage.MoveToNextPage()) {
|
||||
for (auto& container : containerPage.BlobContainers) {
|
||||
if (container.Name == bucket_name) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context = context.WithDeadline(
|
||||
std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
client_->GetBlobContainerClient(bucket_name)
|
||||
.GetProperties(
|
||||
Azure::Storage::Blobs::GetBlobContainerPropertiesOptions(),
|
||||
context);
|
||||
return true;
|
||||
} catch (const Azure::Storage::StorageException& e) {
|
||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound &&
|
||||
e.ErrorCode == "ContainerNotFound") {
|
||||
return false;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
AzureBlobChunkManager::ListBuckets() {
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
std::vector<std::string> buckets;
|
||||
for (auto containerPage = client_->ListBlobContainers();
|
||||
for (auto containerPage = client_->ListBlobContainers(
|
||||
Azure::Storage::Blobs::ListBlobContainersOptions(), context);
|
||||
containerPage.HasPage();
|
||||
containerPage.MoveToNextPage()) {
|
||||
for (auto& container : containerPage.BlobContainers) {
|
||||
|
@ -99,57 +140,86 @@ AzureBlobChunkManager::ListBuckets() {
|
|||
return buckets;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
AzureBlobChunkManager::CreateBucket(const std::string& bucket_name) {
|
||||
client_->GetBlobContainerClient(bucket_name).Create();
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
return client_->GetBlobContainerClient(bucket_name)
|
||||
.CreateIfNotExists(Azure::Storage::Blobs::CreateBlobContainerOptions(),
|
||||
context)
|
||||
.Value.Created;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
AzureBlobChunkManager::DeleteBucket(const std::string& bucket_name) {
|
||||
client_->GetBlobContainerClient(bucket_name).Delete();
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
return client_->GetBlobContainerClient(bucket_name)
|
||||
.DeleteIfExists(Azure::Storage::Blobs::DeleteBlobContainerOptions(),
|
||||
context)
|
||||
.Value.Deleted;
|
||||
}
|
||||
|
||||
bool
|
||||
AzureBlobChunkManager::ObjectExists(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
for (auto blobPage =
|
||||
client_->GetBlobContainerClient(bucket_name).ListBlobs();
|
||||
blobPage.HasPage();
|
||||
blobPage.MoveToNextPage()) {
|
||||
for (auto& blob : blobPage.Blobs) {
|
||||
if (blob.Name == object_name) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context = context.WithDeadline(
|
||||
std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
client_->GetBlobContainerClient(bucket_name)
|
||||
.GetBlockBlobClient(object_name)
|
||||
.GetProperties(Azure::Storage::Blobs::GetBlobPropertiesOptions(),
|
||||
context);
|
||||
return true;
|
||||
} catch (const Azure::Storage::StorageException& e) {
|
||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
|
||||
return false;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t
|
||||
AzureBlobChunkManager::GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
for (auto blobPage =
|
||||
client_->GetBlobContainerClient(bucket_name).ListBlobs();
|
||||
blobPage.HasPage();
|
||||
blobPage.MoveToNextPage()) {
|
||||
for (auto& blob : blobPage.Blobs) {
|
||||
if (blob.Name == object_name) {
|
||||
return blob.BlobSize;
|
||||
}
|
||||
}
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
std::stringstream err_msg;
|
||||
err_msg << "object('" << bucket_name << "', '" << object_name
|
||||
<< "') not exists";
|
||||
throw std::runtime_error(err_msg.str());
|
||||
return client_->GetBlobContainerClient(bucket_name)
|
||||
.GetBlockBlobClient(object_name)
|
||||
.GetProperties(Azure::Storage::Blobs::GetBlobPropertiesOptions(),
|
||||
context)
|
||||
.Value.BlobSize;
|
||||
}
|
||||
|
||||
void
|
||||
bool
|
||||
AzureBlobChunkManager::DeleteObject(const std::string& bucket_name,
|
||||
const std::string& object_name) {
|
||||
client_->GetBlobContainerClient(bucket_name)
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
return client_->GetBlobContainerClient(bucket_name)
|
||||
.GetBlockBlobClient(object_name)
|
||||
.Delete();
|
||||
.DeleteIfExists(Azure::Storage::Blobs::DeleteBlobOptions(), context)
|
||||
.Value.Deleted;
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -159,9 +229,18 @@ AzureBlobChunkManager::PutObjectBuffer(const std::string& bucket_name,
|
|||
uint64_t size) {
|
||||
std::vector<unsigned char> str(static_cast<char*>(buf),
|
||||
static_cast<char*>(buf) + size);
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
client_->GetBlobContainerClient(bucket_name)
|
||||
.GetBlockBlobClient(object_name)
|
||||
.UploadFrom(str.data(), str.size());
|
||||
.UploadFrom(str.data(),
|
||||
str.size(),
|
||||
Azure::Storage::Blobs::UploadBlockBlobFromOptions(),
|
||||
context);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -174,71 +253,47 @@ AzureBlobChunkManager::GetObjectBuffer(const std::string& bucket_name,
|
|||
downloadOptions.Range = Azure::Core::Http::HttpRange();
|
||||
downloadOptions.Range.Value().Offset = 0;
|
||||
downloadOptions.Range.Value().Length = size;
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
auto downloadResponse = client_->GetBlobContainerClient(bucket_name)
|
||||
.GetBlockBlobClient(object_name)
|
||||
.Download(downloadOptions);
|
||||
std::vector<unsigned char> str =
|
||||
downloadResponse.Value.BodyStream->ReadToEnd();
|
||||
memcpy(static_cast<char*>(buf), &str[0], str.size() * sizeof(str[0]));
|
||||
return str.size();
|
||||
.Download(downloadOptions, context);
|
||||
auto bodyStream = downloadResponse.Value.BodyStream.get();
|
||||
uint64_t totalBytesRead = 0;
|
||||
uint64_t bytesRead = 0;
|
||||
do {
|
||||
bytesRead = bodyStream->Read(
|
||||
static_cast<uint8_t*>(buf) + totalBytesRead, size - totalBytesRead);
|
||||
totalBytesRead += bytesRead;
|
||||
} while (bytesRead != 0 && totalBytesRead < size);
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
AzureBlobChunkManager::ListObjects(const char* bucket_name,
|
||||
const char* prefix) {
|
||||
AzureBlobChunkManager::ListObjects(const std::string& bucket_name,
|
||||
const std::string& prefix) {
|
||||
std::vector<std::string> objects_vec;
|
||||
for (auto blobPage =
|
||||
client_->GetBlobContainerClient(bucket_name).ListBlobs();
|
||||
Azure::Storage::Blobs::ListBlobsOptions listOptions;
|
||||
listOptions.Prefix = prefix;
|
||||
Azure::Core::Context context;
|
||||
if (requestTimeoutMs_ > 0) {
|
||||
context =
|
||||
context.WithDeadline(std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(requestTimeoutMs_));
|
||||
}
|
||||
for (auto blobPage = client_->GetBlobContainerClient(bucket_name)
|
||||
.ListBlobs(listOptions, context);
|
||||
blobPage.HasPage();
|
||||
blobPage.MoveToNextPage()) {
|
||||
for (auto& blob : blobPage.Blobs) {
|
||||
if (blob.Name.rfind(prefix, 0) == 0) {
|
||||
objects_vec.emplace_back(blob.Name);
|
||||
}
|
||||
objects_vec.emplace_back(blob.Name);
|
||||
}
|
||||
}
|
||||
return objects_vec;
|
||||
}
|
||||
|
||||
} // namespace azure
|
||||
|
||||
int
|
||||
main() {
|
||||
const char* containerName = "default";
|
||||
const char* blobName = "sample-blob";
|
||||
using namespace azure;
|
||||
AzureBlobChunkManager chunkManager = AzureBlobChunkManager("", "", "");
|
||||
std::vector<std::string> buckets = chunkManager.ListBuckets();
|
||||
for (const auto& bucket : buckets) {
|
||||
std::cout << bucket << std::endl;
|
||||
}
|
||||
std::vector<std::string> objects =
|
||||
chunkManager.ListObjects(containerName, blobName);
|
||||
for (const auto& object : objects) {
|
||||
std::cout << object << std::endl;
|
||||
}
|
||||
std::cout << chunkManager.GetObjectSize(containerName, blobName)
|
||||
<< std::endl;
|
||||
std::cout << chunkManager.ObjectExists(containerName, blobName)
|
||||
<< std::endl;
|
||||
std::cout << chunkManager.ObjectExists(containerName, "blobName")
|
||||
<< std::endl;
|
||||
std::cout << chunkManager.BucketExists(containerName) << std::endl;
|
||||
char buffer[1024 * 1024];
|
||||
chunkManager.GetObjectBuffer(containerName, blobName, buffer, 1024 * 1024);
|
||||
std::cout << buffer << std::endl;
|
||||
|
||||
char msg[12];
|
||||
memcpy(msg, "Azure hello!", 12);
|
||||
if (!chunkManager.ObjectExists(containerName, "blobName")) {
|
||||
chunkManager.PutObjectBuffer(containerName, "blobName", msg, 12);
|
||||
}
|
||||
char buffer0[1024 * 1024];
|
||||
chunkManager.GetObjectBuffer(
|
||||
containerName, "blobName", buffer0, 1024 * 1024);
|
||||
std::cout << buffer0 << std::endl;
|
||||
chunkManager.DeleteObject(containerName, "blobName");
|
||||
chunkManager.CreateBucket("sample-container1");
|
||||
chunkManager.DeleteBucket("sample-container1");
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
|
@ -21,7 +21,7 @@
|
|||
#include <stdlib.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "azure/storage/common/storage_exception.hpp"
|
||||
#include <azure/storage/common/storage_exception.hpp>
|
||||
|
||||
namespace azure {
|
||||
/**
|
||||
|
@ -29,9 +29,14 @@ namespace azure {
|
|||
*/
|
||||
class AzureBlobChunkManager {
|
||||
public:
|
||||
static void
|
||||
InitLog(std::string level_string,
|
||||
std::function<void(std::string const& level,
|
||||
std::string const& message)> listener);
|
||||
explicit AzureBlobChunkManager(const std::string& access_key_id,
|
||||
const std::string& access_key_value,
|
||||
const std::string& address,
|
||||
int64_t requestTimeoutMs = 0,
|
||||
bool useIAM = false);
|
||||
|
||||
AzureBlobChunkManager(const AzureBlobChunkManager&);
|
||||
|
@ -43,9 +48,9 @@ class AzureBlobChunkManager {
|
|||
|
||||
bool
|
||||
BucketExists(const std::string& bucket_name);
|
||||
void
|
||||
bool
|
||||
CreateBucket(const std::string& bucket_name);
|
||||
void
|
||||
bool
|
||||
DeleteBucket(const std::string& bucket_name);
|
||||
std::vector<std::string>
|
||||
ListBuckets();
|
||||
|
@ -55,7 +60,7 @@ class AzureBlobChunkManager {
|
|||
int64_t
|
||||
GetObjectSize(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
void
|
||||
bool
|
||||
DeleteObject(const std::string& bucket_name,
|
||||
const std::string& object_name);
|
||||
bool
|
||||
|
@ -69,10 +74,12 @@ class AzureBlobChunkManager {
|
|||
void* buf,
|
||||
uint64_t size);
|
||||
// Lists blob names in container `bucket_name` for the given `prefix`.
// The default prefix is an empty string: a std::string must never be
// default-constructed from nullptr (undefined behaviour).
std::vector<std::string>
ListObjects(const std::string& bucket_name,
            const std::string& prefix = "");
|
||||
|
||||
private:
|
||||
std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> client_;
|
||||
int64_t requestTimeoutMs_;
|
||||
};
|
||||
|
||||
} // namespace azure
|
||||
|
|
|
@ -23,7 +23,10 @@ find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
|
|||
find_package(azure-identity-cpp CONFIG REQUIRED)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter -Wno-return-type -Wno-pedantic")
|
||||
add_library(blob-chunk-manager SHARED AzureBlobChunkManager.cpp)
|
||||
target_link_libraries(blob-chunk-manager PRIVATE Azure::azure-identity Azure::azure-storage-blobs)
|
||||
target_link_libraries(blob-chunk-manager PUBLIC Azure::azure-identity Azure::azure-storage-blobs)
|
||||
|
||||
install(TARGETS blob-chunk-manager DESTINATION "${CMAKE_INSTALL_LIBDIR}")
|
||||
|
||||
if ( BUILD_UNIT_TEST STREQUAL "ON" )
|
||||
add_subdirectory(test)
|
||||
endif ()
|
|
@ -0,0 +1,7 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
project(azure-blob-test)

# Executable built from test_azure_blob_chunk_manager.cpp and linked against
# the blob-chunk-manager library plus the Azure identity and blob SDK targets.
add_executable(azure-blob-test test_azure_blob_chunk_manager.cpp)
target_link_libraries(azure-blob-test
    PRIVATE
        blob-chunk-manager
        Azure::azure-identity
        Azure::azure-storage-blobs)
|
|
@ -0,0 +1,59 @@
|
|||
#include "../AzureBlobChunkManager.h"
|
||||
|
||||
using namespace azure;
|
||||
|
||||
// Writes one "level: <level>, message: <message>" line to stdout and
// flushes it.
void
print(std::string const& level, std::string const& message) {
    std::cout << "level: " << level;
    std::cout << ", message: " << message;
    std::cout << std::endl;
}
|
||||
|
||||
int
|
||||
main() {
|
||||
const char* containerName = "default";
|
||||
const char* blobName = "sample-blob";
|
||||
// AzureBlobChunkManager::InitLog("info", print);
|
||||
AzureBlobChunkManager chunkManager = AzureBlobChunkManager("", "", "");
|
||||
std::vector<std::string> buckets = chunkManager.ListBuckets();
|
||||
std::cout << "list buckets." << std::endl;
|
||||
for (const auto& bucket : buckets) {
|
||||
std::cout << bucket << std::endl;
|
||||
}
|
||||
if (!chunkManager.BucketExists(containerName)) {
|
||||
std::cout << "create a bucket named: " << containerName << std::endl;
|
||||
chunkManager.CreateBucket(containerName);
|
||||
} else {
|
||||
std::vector<std::string> objects =
|
||||
chunkManager.ListObjects(containerName, blobName);
|
||||
for (const auto& object : objects) {
|
||||
std::cout << object << std::endl;
|
||||
}
|
||||
}
|
||||
std::cout << chunkManager.BucketExists(containerName) << std::endl;
|
||||
|
||||
if (!chunkManager.ObjectExists(containerName, blobName)) {
|
||||
char msg[12];
|
||||
memcpy(msg, "Azure hello!", 12);
|
||||
chunkManager.PutObjectBuffer(containerName, blobName, msg, 12);
|
||||
}
|
||||
std::cout << chunkManager.GetObjectSize(containerName, blobName)
|
||||
<< std::endl;
|
||||
std::cout << chunkManager.ObjectExists(containerName, blobName)
|
||||
<< std::endl;
|
||||
std::cout << chunkManager.ObjectExists(containerName, "blobName")
|
||||
<< std::endl;
|
||||
char buffer[1024 * 1024];
|
||||
chunkManager.GetObjectBuffer(containerName, blobName, buffer, 1024 * 1024);
|
||||
std::cout << buffer << std::endl;
|
||||
chunkManager.DeleteObject(containerName, blobName);
|
||||
try {
|
||||
chunkManager.GetObjectBuffer(
|
||||
containerName, "blobName", buffer, 1024 * 1024);
|
||||
} catch (const std::exception& e) {
|
||||
std::cout << "object('" << containerName << "', 'blobName') not exists"
|
||||
<< e.what() << std::endl;
|
||||
}
|
||||
std::cout << "create a bucket duplicated "
|
||||
<< chunkManager.CreateBucket(containerName) << std::endl;
|
||||
chunkManager.DeleteBucket(containerName);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
|
@ -210,7 +210,7 @@ TEST_F(AzureChunkManagerTest, ReadPositive) {
|
|||
try {
|
||||
chunk_manager_->Read(path, readdata, sizeof(dataWithNULL));
|
||||
} catch (SegcoreError& e) {
|
||||
EXPECT_TRUE(string(e.what()).find("exists") != string::npos);
|
||||
EXPECT_TRUE(string(e.what()).find("exist") != string::npos);
|
||||
}
|
||||
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
|
|
|
@ -1,13 +1,38 @@
|
|||
# Configures the azure-blob-storage CMake project in the current directory.
#   -p <dir>  install prefix passed as CMAKE_INSTALL_PREFIX
#   -s <dir>  source directory handed to cmake
#   -h        print usage and exit
INSTALL_PREFIX=""
SOURCE_DIR=""

while getopts "p:s:h" arg; do
  case $arg in
  p)
    INSTALL_PREFIX=$OPTARG
    ;;
  s)
    SOURCE_DIR=$OPTARG
    ;;
  h) # help
    echo "
parameter:
-p: install prefix
-s: source directory
-h: help

usage:
./azure_build.sh -p \${INSTALL_PREFIX} -s \${SOURCE_DIR} [-h]
"
    exit 0
    ;;
  ?)
    echo "ERROR! unknown argument"
    exit 1
    ;;
  esac
done

# Both paths are needed to build the cmake command line; fail early instead
# of invoking cmake with empty arguments.
if [[ -z "${INSTALL_PREFIX}" || -z "${SOURCE_DIR}" ]]; then
  echo "ERROR! both -p (install prefix) and -s (source directory) are required"
  exit 1
fi

ARCHITECTURE=$(uname -m)
if [[ ${ARCHITECTURE} == "aarch64" ]]; then
  export VCPKG_FORCE_SYSTEM_BINARIES="arm"
fi

# Keep the command in an array so paths containing spaces stay one argument.
AZURE_CMAKE_CMD=(cmake -DBUILD_UNIT_TEST=on
  "-DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX}"
  "${SOURCE_DIR}")
echo "${AZURE_CMAKE_CMD[@]}"
"${AZURE_CMAKE_CMD[@]}"
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ if [ -z "$BUILD_WITHOUT_AZURE" ]; then
|
|||
mkdir -p ${AZURE_BUILD_DIR}
|
||||
fi
|
||||
pushd ${AZURE_BUILD_DIR}
|
||||
env bash ${ROOT_DIR}/scripts/azure_build.sh ${ROOT_DIR}
|
||||
env bash ${ROOT_DIR}/scripts/azure_build.sh -p ${INSTALL_PREFIX} -s ${ROOT_DIR}/internal/core/src/storage/azure-blob-storage
|
||||
if [ ! -e libblob-chunk-manager* ]; then
|
||||
cat vcpkg-bootstrap.log
|
||||
exit 1
|
||||
|
|
Loading…
Reference in New Issue