mirror of https://github.com/milvus-io/milvus.git
(db/snapshot) Add root path for snapshot store (#2945)
* Add root path for snapshot GC Signed-off-by: yhz <413554850@qq.com> * Change Fun name from GetResPath to GetRootPath Signed-off-by: yhz <413554850@qq.com>pull/2939/head
parent
0e0a731ddd
commit
d4f305aa7b
|
@ -29,7 +29,6 @@
|
|||
//#include "storage/s3/S3ClientWrapper.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
#include <map>
|
||||
|
||||
|
@ -50,15 +49,6 @@ ConstructParentFolder(const std::string& db_path, const meta::SegmentSchema& tab
|
|||
|
||||
} // namespace
|
||||
|
||||
std::string
|
||||
ConstructCollectionRootPath(const std::string& root_path) {
|
||||
if (StringHelpFunctions::EndWithSlash(root_path)) {
|
||||
return root_path + "db" + TABLES_FOLDER;
|
||||
}
|
||||
|
||||
return root_path + "/db" + TABLES_FOLDER;
|
||||
}
|
||||
|
||||
int64_t
|
||||
GetMicroSecTimeStamp() {
|
||||
auto now = std::chrono::system_clock::now();
|
||||
|
|
|
@ -30,9 +30,6 @@ namespace utils {
|
|||
int64_t
|
||||
GetMicroSecTimeStamp();
|
||||
|
||||
std::string
|
||||
ConstructCollectionRootPath(const std::string& root_path);
|
||||
|
||||
Status
|
||||
CreateCollectionPath(const DBMetaOptions& options, const std::string& collection_id);
|
||||
Status
|
||||
|
|
|
@ -35,7 +35,7 @@ class ResourceGCEvent : public MetaEvent {
|
|||
public:
|
||||
using Ptr = std::shared_ptr<ResourceGCEvent>;
|
||||
|
||||
explicit ResourceGCEvent(const std::string& root_path, class ResourceT::Ptr res) : dir_root_(root_path), res_(res) {
|
||||
explicit ResourceGCEvent(class ResourceT::Ptr res) : res_(res) {
|
||||
}
|
||||
|
||||
~ResourceGCEvent() = default;
|
||||
|
@ -47,7 +47,8 @@ class ResourceGCEvent : public MetaEvent {
|
|||
STATUS_CHECK((*sd_op)(store));
|
||||
|
||||
/* TODO: physically clean resource */
|
||||
std::string res_path = GetResPath<ResourceT>(dir_root_, res_);
|
||||
auto res_prefix = store->GetRootPath();
|
||||
std::string res_path = GetResPath<ResourceT>(res_prefix, res_);
|
||||
/* if (!boost::filesystem::exists(res_path)) { */
|
||||
/* return Status::OK(); */
|
||||
/* } */
|
||||
|
@ -72,7 +73,6 @@ class ResourceGCEvent : public MetaEvent {
|
|||
|
||||
private:
|
||||
class ResourceT::Ptr res_;
|
||||
std::string dir_root_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
#include <chrono>
|
||||
#include <sstream>
|
||||
|
||||
#include "config/Config.h"
|
||||
#include "db/Utils.h"
|
||||
#include "db/snapshot/Event.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/OperationExecutor.h"
|
||||
|
@ -249,15 +247,9 @@ Operations::PostExecute(StorePtr store) {
|
|||
template <typename ResourceT>
|
||||
void
|
||||
ApplyRollBack(std::set<std::shared_ptr<ResourceContext<ResourceT>>>& step_context_set) {
|
||||
auto& config = server::Config::GetInstance();
|
||||
std::string path;
|
||||
config.GetStorageConfigPath(path);
|
||||
auto root_path = utils::ConstructCollectionRootPath(path);
|
||||
|
||||
for (auto& step_context : step_context_set) {
|
||||
auto res = step_context->Resource();
|
||||
|
||||
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(root_path, res);
|
||||
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(res);
|
||||
EventExecutor::GetInstance().Submit(evt_ptr);
|
||||
std::cout << "Rollback " << typeid(ResourceT).name() << ": " << res->GetID() << std::endl;
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
#include "db/snapshot/Resources.h"
|
||||
#include "utils/Status.h"
|
||||
#include "utils/StringHelpFunctions.h"
|
||||
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
|
@ -35,10 +34,7 @@ template <>
|
|||
inline std::string
|
||||
GetResPath<Collection>(const std::string& root, const Collection::Ptr& res_ptr) {
|
||||
std::stringstream ss;
|
||||
ss << root;
|
||||
if (!StringHelpFunctions::EndWithSlash(root)) {
|
||||
ss << "/";
|
||||
}
|
||||
ss << root << "/";
|
||||
ss << COLLECTION_PREFIX << res_ptr->GetID();
|
||||
|
||||
return ss.str();
|
||||
|
@ -48,10 +44,7 @@ template <>
|
|||
inline std::string
|
||||
GetResPath<Partition>(const std::string& root, const Partition::Ptr& res_ptr) {
|
||||
std::stringstream ss;
|
||||
ss << root;
|
||||
if (!StringHelpFunctions::EndWithSlash(root)) {
|
||||
ss << "/";
|
||||
}
|
||||
ss << root << "/";
|
||||
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
|
||||
ss << PARTITION_PREFIX << res_ptr->GetID();
|
||||
|
||||
|
@ -62,10 +55,7 @@ template <>
|
|||
inline std::string
|
||||
GetResPath<Segment>(const std::string& root, const Segment::Ptr& res_ptr) {
|
||||
std::stringstream ss;
|
||||
ss << root;
|
||||
if (!StringHelpFunctions::EndWithSlash(root)) {
|
||||
ss << "/";
|
||||
}
|
||||
ss << root << "/";
|
||||
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
|
||||
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
|
||||
ss << SEGMENT_PREFIX << res_ptr->GetID();
|
||||
|
@ -77,10 +67,7 @@ template <>
|
|||
inline std::string
|
||||
GetResPath<SegmentFile>(const std::string& root, const SegmentFile::Ptr& res_ptr) {
|
||||
std::stringstream ss;
|
||||
ss << root;
|
||||
if (!StringHelpFunctions::EndWithSlash(root)) {
|
||||
ss << "/";
|
||||
}
|
||||
ss << root << "/";
|
||||
ss << COLLECTION_PREFIX << res_ptr->GetCollectionId() << "/";
|
||||
ss << PARTITION_PREFIX << res_ptr->GetPartitionId() << "/";
|
||||
ss << SEGMENT_PREFIX << res_ptr->GetSegmentId() << "/";
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include "config/Config.h"
|
||||
#include "db/snapshot/Event.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/Operations.h"
|
||||
|
@ -139,14 +138,9 @@ class ResourceHolder {
|
|||
|
||||
virtual void
|
||||
OnNoRefCallBack(ResourcePtr resource) {
|
||||
auto& config = server::Config::GetInstance();
|
||||
std::string path;
|
||||
config.GetStorageConfigPath(path);
|
||||
auto root_path = utils::ConstructCollectionRootPath(path);
|
||||
|
||||
resource->Deactivate();
|
||||
Release(resource->GetID());
|
||||
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(root_path, resource);
|
||||
auto evt_ptr = std::make_shared<ResourceGCEvent<ResourceT>>(resource);
|
||||
EventExecutor::GetInstance().Submit(evt_ptr);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "db/snapshot/Store.h"
|
||||
#include "db/snapshot/Utils.h"
|
||||
#include "db/snapshot/WrappedTypes.h"
|
||||
|
@ -44,7 +45,7 @@ using ScopedResourcesT =
|
|||
class Snapshot : public ReferenceProxy {
|
||||
public:
|
||||
using Ptr = std::shared_ptr<Snapshot>;
|
||||
explicit Snapshot(StorePtr, ID_TYPE);
|
||||
Snapshot(StorePtr, ID_TYPE);
|
||||
|
||||
ID_TYPE
|
||||
GetID() const {
|
||||
|
|
|
@ -49,11 +49,12 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
public:
|
||||
using Ptr = typename std::shared_ptr<Store>;
|
||||
|
||||
explicit Store(meta::MetaAdapterPtr adapter) : adapter_(adapter) {
|
||||
explicit Store(meta::MetaAdapterPtr adapter, const std::string& root_path)
|
||||
: adapter_(adapter), root_path_(root_path) {
|
||||
}
|
||||
|
||||
static Store::Ptr
|
||||
Build(const std::string& uri) {
|
||||
Build(const std::string& uri, const std::string& root_path) {
|
||||
utils::MetaUriInfo uri_info;
|
||||
LOG_ENGINE_DEBUG_ << "MetaUri: " << uri << std::endl;
|
||||
auto status = utils::ParseMetaUri(uri, uri_info);
|
||||
|
@ -69,18 +70,23 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
options.backend_uri_ = uri;
|
||||
auto engine = std::make_shared<meta::MySqlEngine>(options);
|
||||
auto adapter = std::make_shared<meta::MetaAdapter>(engine);
|
||||
return std::make_shared<Store>(adapter);
|
||||
return std::make_shared<Store>(adapter, root_path);
|
||||
} else if (strcasecmp(uri_info.dialect_.c_str(), "mock") == 0) {
|
||||
LOG_ENGINE_INFO_ << "Using Mock. Should only be used in test environment";
|
||||
auto engine = std::make_shared<meta::MockMetaEngine>();
|
||||
auto adapter = std::make_shared<meta::MetaAdapter>(engine);
|
||||
return std::make_shared<Store>(adapter);
|
||||
return std::make_shared<Store>(adapter, root_path);
|
||||
} else {
|
||||
LOG_ENGINE_ERROR_ << "Invalid dialect in URI: dialect = " << uri_info.dialect_;
|
||||
throw InvalidArgumentException("URI dialect is not mysql / sqlite / mock");
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
GetRootPath() const {
|
||||
return root_path_ + "/tables";
|
||||
}
|
||||
|
||||
template <typename OpT>
|
||||
Status
|
||||
ApplyOperation(OpT& op) {
|
||||
|
@ -364,6 +370,7 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
}
|
||||
|
||||
meta::MetaAdapterPtr adapter_;
|
||||
std::string root_path_;
|
||||
};
|
||||
|
||||
using StorePtr = Store::Ptr;
|
||||
|
|
|
@ -160,13 +160,4 @@ StringHelpFunctions::ConvertToBoolean(const std::string& str, bool& value) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
bool
|
||||
StringHelpFunctions::EndWithSlash(const std::string& path) {
|
||||
if (path.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return *path.rbegin() == '/';
|
||||
}
|
||||
|
||||
} // namespace milvus
|
||||
|
|
|
@ -69,9 +69,6 @@ class StringHelpFunctions {
|
|||
// "false", "off", "no", "0", "" ==> false
|
||||
static Status
|
||||
ConvertToBoolean(const std::string& str, bool& value);
|
||||
|
||||
static bool
|
||||
EndWithSlash(const std::string& path);
|
||||
};
|
||||
|
||||
} // namespace milvus
|
||||
|
|
|
@ -788,11 +788,6 @@ TEST(ValidationUtilTest, VALIDATE_PATH_TEST) {
|
|||
ASSERT_FALSE(milvus::server::ValidateStoragePath("/tmp//milvus").ok());
|
||||
}
|
||||
|
||||
TEST(UtilTest, ENDWITHSLASH_TEST) {
|
||||
ASSERT_TRUE(milvus::StringHelpFunctions::EndWithSlash("/tmp/milvus/"));
|
||||
ASSERT_FALSE(milvus::StringHelpFunctions::EndWithSlash("/tmp/milvus"));
|
||||
}
|
||||
|
||||
TEST(UtilTest, ROLLOUTHANDLER_TEST) {
|
||||
std::string dir1 = "/tmp/milvus_test";
|
||||
std::string dir2 = "/tmp/milvus_test/log_test";
|
||||
|
|
|
@ -141,7 +141,7 @@ void
|
|||
BaseTest::SnapshotStart(bool mock_store) {
|
||||
/* auto uri = "mysql://root:12345678@127.0.0.1:3307/milvus"; */
|
||||
auto uri = "mock://:@:/";
|
||||
auto store = Store::Build(uri);
|
||||
auto store = Store::Build(uri, "/tmp/milvus_ss/db");
|
||||
|
||||
milvus::engine::snapshot::OperationExecutor::Init(store);
|
||||
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
|
||||
|
|
Loading…
Reference in New Issue