mirror of https://github.com/milvus-io/milvus.git
(db/snapshot) Inactivate gc event (#2960)
* Add API to Get inactivate res in Store Signed-off-by: yhz <413554850@qq.com> * Update select logic in MetaAdapter Signed-off-by: yhz <413554850@qq.com> * Add event test case Signed-off-by: yhz <413554850@qq.com> * Add deactive collection test in SSEventTest.TestInActiveResGcEvent Signed-off-by: yhz <413554850@qq.com> * Add CollectinCommit in event test Signed-off-by: yhz <413554850@qq.com> * Add more res in event test Signed-off-by: yhz <413554850@qq.com> * Set url to mock in ssdb test Signed-off-by: yhz <413554850@qq.com>pull/2978/head
parent
b548b38222
commit
c878700474
|
@ -251,7 +251,8 @@ ValidateDbURI(const std::string& uri) {
|
|||
if (std::regex_match(uri, pieces_match, uriRegex)) {
|
||||
std::string dialect = pieces_match[1].str();
|
||||
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
|
||||
if (dialect.find("mysql") == std::string::npos && dialect.find("sqlite") == std::string::npos) {
|
||||
if (dialect.find("mysql") == std::string::npos && dialect.find("sqlite") == std::string::npos &&
|
||||
dialect.find("mock") == std::string::npos) {
|
||||
LOG_SERVER_ERROR_ << "Invalid dialect in URI: dialect = " << dialect;
|
||||
okay = false;
|
||||
}
|
||||
|
|
|
@ -11,12 +11,15 @@
|
|||
|
||||
#include "db/SSDBImpl.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "config/Config.h"
|
||||
#include "db/IDGenerator.h"
|
||||
#include "db/SnapshotVisitor.h"
|
||||
#include "db/merge/MergeManagerFactory.h"
|
||||
#include "db/merge/SSMergeTask.h"
|
||||
#include "db/snapshot/CompoundOperations.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/IterateHandler.h"
|
||||
#include "db/snapshot/OperationExecutor.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/ResourceTypes.h"
|
||||
#include "db/snapshot/Snapshots.h"
|
||||
|
@ -83,6 +86,22 @@ SSDBImpl::Start() {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// TODO(yhz): Get storage url
|
||||
auto& config = server::Config::GetInstance();
|
||||
std::string path;
|
||||
STATUS_CHECK(config.GetStorageConfigPath(path));
|
||||
|
||||
std::string url;
|
||||
STATUS_CHECK(config.GetGeneralConfigMetaURI(url));
|
||||
|
||||
// snapshot
|
||||
auto store = snapshot::Store::Build(url, path);
|
||||
snapshot::OperationExecutor::Init(store);
|
||||
snapshot::OperationExecutor::GetInstance().Start();
|
||||
snapshot::EventExecutor::Init(store);
|
||||
snapshot::EventExecutor::GetInstance().Start();
|
||||
snapshot::Snapshots::GetInstance().Init(store);
|
||||
|
||||
// LOG_ENGINE_TRACE_ << "DB service start";
|
||||
initialized_.store(true, std::memory_order_release);
|
||||
|
||||
|
@ -176,6 +195,9 @@ SSDBImpl::Stop() {
|
|||
bg_metric_thread_.join();
|
||||
}
|
||||
|
||||
snapshot::EventExecutor::GetInstance().Stop();
|
||||
snapshot::OperationExecutor::GetInstance().Stop();
|
||||
|
||||
// LOG_ENGINE_TRACE_ << "DB service stop";
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ class MetaAdapter {
|
|||
// TODO move select logic to here
|
||||
auto session = CreateSession();
|
||||
std::vector<typename T::Ptr> resources;
|
||||
auto status = session->Select<T, snapshot::ID_TYPE>(snapshot::IdField::Name, id, {}, resources);
|
||||
auto status = session->Select<T, snapshot::ID_TYPE>(snapshot::IdField::Name, {id}, {}, resources);
|
||||
if (status.ok() && !resources.empty()) {
|
||||
// TODO: may need to check num of resources
|
||||
resource = resources.at(0);
|
||||
|
@ -51,18 +51,17 @@ class MetaAdapter {
|
|||
|
||||
template <typename ResourceT, typename U>
|
||||
Status
|
||||
SelectBy(const std::string& field, const U& value, std::vector<typename ResourceT::Ptr>& resources) {
|
||||
SelectBy(const std::string& field, const std::vector<U>& values, std::vector<typename ResourceT::Ptr>& resources) {
|
||||
auto session = CreateSession();
|
||||
return session->Select<ResourceT, U>(field, value, {}, resources);
|
||||
return session->Select<ResourceT, U>(field, values, {}, resources);
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename U>
|
||||
Status
|
||||
SelectResourceIDs(std::vector<int64_t>& ids, const std::string& filter_field, const U& filter_value) {
|
||||
SelectResourceIDs(std::vector<int64_t>& ids, const std::string& filter_field, const std::vector<U>& filter_values) {
|
||||
std::vector<typename ResourceT::Ptr> resources;
|
||||
auto session = CreateSession();
|
||||
std::vector<std::string> target_attrs = {F_ID};
|
||||
auto status = session->Select<ResourceT, U>(filter_field, filter_value, target_attrs, resources);
|
||||
auto status = session->Select<ResourceT, U>(filter_field, filter_values, {F_ID}, resources);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ class MetaSession {
|
|||
public:
|
||||
template <typename ResourceT, typename U>
|
||||
Status
|
||||
Select(const std::string& field, const U& value, const std::vector<std::string>& target_attrs,
|
||||
Select(const std::string& field, const std::vector<U>& value, const std::vector<std::string>& target_attrs,
|
||||
std::vector<typename ResourceT::Ptr>& resources);
|
||||
|
||||
template <typename ResourceT>
|
||||
|
@ -92,15 +92,19 @@ class MetaSession {
|
|||
|
||||
template <typename T, typename U>
|
||||
Status
|
||||
MetaSession::Select(const std::string& field, const U& value, const std::vector<std::string>& target_attrs,
|
||||
std::vector<typename T::Ptr>& resources) {
|
||||
MetaSession::Select(const std::string& field, const std::vector<U>& values,
|
||||
const std::vector<std::string>& target_attrs, std::vector<typename T::Ptr>& resources) {
|
||||
MetaQueryContext context;
|
||||
context.table_ = T::Name;
|
||||
|
||||
if (!field.empty()) {
|
||||
std::string field_value;
|
||||
ResourceFieldToSqlStr(value, field_value);
|
||||
context.filter_attrs_ = {{field, field_value}};
|
||||
std::vector<std::string> field_values;
|
||||
for (auto& v : values) {
|
||||
std::string field_value;
|
||||
ResourceFieldToSqlStr(v, field_value);
|
||||
field_values.push_back(field_value);
|
||||
}
|
||||
context.filter_attrs_ = {{field, field_values}};
|
||||
}
|
||||
|
||||
if (!target_attrs.empty()) {
|
||||
|
@ -139,18 +143,16 @@ MetaSession::Select(const std::string& field, const U& value, const std::vector<
|
|||
iter = raw.find(F_STATE);
|
||||
if (iter != raw.end()) {
|
||||
auto status_int = std::stol(iter->second);
|
||||
sf_p->ResetStatus();
|
||||
switch (static_cast<snapshot::State>(status_int)) {
|
||||
case snapshot::PENDING: {
|
||||
sf_p->ResetStatus();
|
||||
break;
|
||||
}
|
||||
case snapshot::ACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Activate();
|
||||
break;
|
||||
}
|
||||
case snapshot::DEACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Deactivate();
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ struct MetaQueryContext {
|
|||
std::string table_;
|
||||
bool all_required_ = true;
|
||||
std::vector<std::string> query_fields_;
|
||||
std::unordered_map<std::string, std::string> filter_attrs_;
|
||||
std::unordered_map<std::string, std::vector<std::string>> filter_attrs_;
|
||||
};
|
||||
|
||||
struct MetaApplyContext {
|
||||
|
|
|
@ -32,9 +32,18 @@ MetaHelper::MetaQueryContextToSql(const MetaQueryContext& context, std::string&
|
|||
|
||||
std::vector<std::string> filter_conditions;
|
||||
for (auto& attr : context.filter_attrs_) {
|
||||
filter_conditions.emplace_back(attr.first + "=" + attr.second);
|
||||
|
||||
std::string filter_str;
|
||||
if (attr.second.size() < 1) {
|
||||
return Status(SERVER_UNEXPECTED_ERROR, "Invalid filter attrs. ");
|
||||
} else if (attr.second.size() == 1) {
|
||||
filter_conditions.emplace_back(attr.first + "=" + attr.second[0]);
|
||||
} else {
|
||||
std::string in_condition;
|
||||
StringHelpFunctions::MergeStringWithDelimeter(attr.second, ",", in_condition);
|
||||
in_condition = attr.first + " IN (" + in_condition + ")";
|
||||
filter_conditions.emplace_back(in_condition);
|
||||
}
|
||||
|
||||
StringHelpFunctions::MergeStringWithDelimeter(filter_conditions, " AND ", filter_str);
|
||||
sql += " WHERE " + filter_str;
|
||||
}
|
||||
|
|
|
@ -48,6 +48,16 @@ MockMetaEngine::QueryNoLock(const MetaQueryContext& context, AttrsMapList& attrs
|
|||
}
|
||||
};
|
||||
|
||||
auto term = [](const std::string& attr, const std::vector<std::string>& attrs) -> bool {
|
||||
for (auto& t : attrs) {
|
||||
if (attr == t) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto& candidate_raws = resources_[context.table_];
|
||||
|
||||
bool selected = true;
|
||||
|
@ -55,7 +65,12 @@ MockMetaEngine::QueryNoLock(const MetaQueryContext& context, AttrsMapList& attrs
|
|||
for (auto& raw : candidate_raws) {
|
||||
for (auto& filter_attr : context.filter_attrs_) {
|
||||
auto iter = raw.find(filter_attr.first);
|
||||
if (iter == raw.end() || iter->second != filter_attr.second) {
|
||||
if (iter == raw.end()) {
|
||||
selected = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!term(iter->second, filter_attr.second)) {
|
||||
selected = false;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "db/snapshot/Event.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/Operations.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/Store.h"
|
||||
|
@ -42,8 +43,53 @@ class InActiveResourcesGCEvent : public MetaEvent, public Operations {
|
|||
Status
|
||||
OnExecute(StorePtr store) override {
|
||||
std::cout << "Executing InActiveResourcesGCEvent" << std::endl;
|
||||
|
||||
STATUS_CHECK(ClearInActiveResources<Collection>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<CollectionCommit>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<Partition>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<PartitionCommit>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<Segment>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<SegmentCommit>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<SegmentFile>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<SchemaCommit>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<Field>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<FieldCommit>(store));
|
||||
STATUS_CHECK(ClearInActiveResources<FieldElement>(store));
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename ResourceT>
|
||||
Status
|
||||
ClearInActiveResources(StorePtr store) {
|
||||
std::vector<typename ResourceT::Ptr> resources;
|
||||
STATUS_CHECK(store->GetInActiveResources<ResourceT>(resources));
|
||||
|
||||
for (auto& res : resources) {
|
||||
std::string res_path = GetResPath<ResourceT>(dir_root_, res);
|
||||
if (res_path.empty()) {
|
||||
/* std::cout << "[GC] No remove action for " << res_->ToString() << std::endl; */
|
||||
} else if (boost::filesystem::is_directory(res_path)) {
|
||||
auto ok = boost::filesystem::remove_all(res_path);
|
||||
/* std::cout << "[GC] Remove dir " << res_->ToString() << " " << res_path << " " << ok << std::endl; */
|
||||
} else if (boost::filesystem::is_regular_file(res_path)) {
|
||||
auto ok = boost::filesystem::remove(res_path);
|
||||
/* std::cout << "[GC] Remove file " << res_->ToString() << " " << res_path << " " << ok << std::endl; */
|
||||
} else {
|
||||
std::cout << "[GC] Remove stale " << res_path << " for " << res->ToString() << std::endl;
|
||||
}
|
||||
|
||||
/* remove resource from meta */
|
||||
auto hd_op = std::make_shared<HardDeleteOperation<ResourceT>>(res->GetID());
|
||||
STATUS_CHECK((*hd_op)(store));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
std::string dir_root_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
@ -147,7 +147,7 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
GetCollection(const std::string& name, CollectionPtr& return_v) {
|
||||
// TODO: Get active collection
|
||||
std::vector<CollectionPtr> resources;
|
||||
auto status = adapter_->SelectBy<Collection>(NameField::Name, name, resources);
|
||||
auto status = adapter_->SelectBy<Collection, std::string>(NameField::Name, {name}, resources);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
@ -162,6 +162,13 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
return Status(SS_NOT_FOUND_ERROR, "DB resource not found");
|
||||
}
|
||||
|
||||
template <typename ResourceT>
|
||||
Status
|
||||
GetInActiveResources(std::vector<typename ResourceT::Ptr>& return_vs) {
|
||||
std::vector<State> filter_states = {State::PENDING, State::DEACTIVE};
|
||||
return adapter_->SelectBy<ResourceT>(StateField::Name, filter_states, return_vs);
|
||||
}
|
||||
|
||||
template <typename ResourceT>
|
||||
Status
|
||||
RemoveResource(ID_TYPE id) {
|
||||
|
@ -176,7 +183,7 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
AllActiveCollectionIds(bool reversed = true) const {
|
||||
IDS_TYPE ids;
|
||||
IDS_TYPE selected_ids;
|
||||
adapter_->SelectResourceIDs<Collection, std::string>(selected_ids, "", "");
|
||||
adapter_->SelectResourceIDs<Collection, std::string>(selected_ids, "", {""});
|
||||
|
||||
if (!reversed) {
|
||||
ids = selected_ids;
|
||||
|
@ -192,7 +199,7 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
IDS_TYPE
|
||||
AllActiveCollectionCommitIds(ID_TYPE collection_id, bool reversed = true) const {
|
||||
IDS_TYPE ids, selected_ids;
|
||||
adapter_->SelectResourceIDs<CollectionCommit, int64_t>(selected_ids, meta::F_COLLECTON_ID, collection_id);
|
||||
adapter_->SelectResourceIDs<CollectionCommit, int64_t>(selected_ids, meta::F_COLLECTON_ID, {collection_id});
|
||||
|
||||
if (!reversed) {
|
||||
ids = selected_ids;
|
||||
|
|
|
@ -19,6 +19,7 @@ set(test_files
|
|||
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_meta.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_job.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_task.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp
|
||||
)
|
||||
|
||||
add_executable(test_ssdb
|
||||
|
@ -60,6 +61,6 @@ target_link_libraries(test_ssdb
|
|||
metrics
|
||||
stdc++
|
||||
${unittest_libs}
|
||||
oatpp)
|
||||
)
|
||||
|
||||
install(TARGETS test_ssdb DESTINATION unittest)
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "db/snapshot/InActiveResourcesGCEvent.h"
|
||||
#include "ssdb/utils.h"
|
||||
|
||||
using CollectionCommit = milvus::engine::snapshot::CollectionCommit;
|
||||
using CollectionCommitPtr = milvus::engine::snapshot::CollectionCommitPtr;
|
||||
using PartitionCommit = milvus::engine::snapshot::PartitionCommit;
|
||||
using PartitionCommitPtr = milvus::engine::snapshot::PartitionCommitPtr;
|
||||
using SegmentCommit = milvus::engine::snapshot::SegmentCommit;
|
||||
using SegmentCommitPtr = milvus::engine::snapshot::SegmentCommitPtr;
|
||||
using SchemaCommit = milvus::engine::snapshot::SchemaCommit;
|
||||
using FieldCommit = milvus::engine::snapshot::FieldCommit;
|
||||
|
||||
using FType = milvus::engine::FieldType;
|
||||
using FEType = milvus::engine::FieldElementType;
|
||||
|
||||
using InActiveResourcesGCEvent = milvus::engine::snapshot::InActiveResourcesGCEvent;
|
||||
|
||||
TEST_F(SSEventTest, TestInActiveResGcEvent) {
|
||||
CollectionPtr collection;
|
||||
auto status = store_->CreateResource(Collection("test_gc_c1"), collection);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
CollectionPtr inactive_collection;
|
||||
auto c = Collection("test_gc_c2");
|
||||
c.Deactivate();
|
||||
status = store_->CreateResource(std::move(c), inactive_collection);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
CollectionPtr active_collection;
|
||||
auto c_2 = Collection("test_gc_c3");
|
||||
c_2.Activate();
|
||||
status = store_->CreateResource(std::move(c_2), active_collection);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
PartitionPtr partition;
|
||||
status = store_->CreateResource<Partition>(Partition("test_gc_c1_p1", collection->GetID()), partition);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
PartitionPtr inactive_partition;
|
||||
auto p = Partition("test_gc_c1_p2", collection->GetID());
|
||||
p.Deactivate();
|
||||
status = store_->CreateResource<Partition>(std::move(p), inactive_partition);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
PartitionCommitPtr partition_commit;
|
||||
status = store_->CreateResource<PartitionCommit>(PartitionCommit(collection->GetID(), partition->GetID()),
|
||||
partition_commit);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
CollectionCommitPtr collection_commit;
|
||||
status = store_->CreateResource<CollectionCommit>(CollectionCommit(0, 0), collection_commit);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
SegmentPtr s;
|
||||
status = store_->CreateResource<Segment>(Segment(collection->GetID(), partition->GetCollectionId()), s);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
Field::Ptr field;
|
||||
status = store_->CreateResource<Field>(Field("f_0", 0, FType::INT64), field);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
FieldElementPtr field_element;
|
||||
status = store_->CreateResource<FieldElement>(
|
||||
FieldElement(collection->GetID(), field->GetID(), "fe_0", FEType::FET_INDEX), field_element);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
FieldCommit::Ptr field_commit;
|
||||
status = store_->CreateResource<FieldCommit>(FieldCommit(collection->GetID(), field->GetID()), field_commit);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
SchemaCommit::Ptr schema;
|
||||
status = store_->CreateResource<SchemaCommit>(SchemaCommit(collection->GetID(), {}), schema);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
SegmentFilePtr seg_file;
|
||||
status = store_->CreateResource<SegmentFile>(
|
||||
SegmentFile(collection->GetID(), partition->GetID(), s->GetID(), field_element->GetID()), seg_file);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
SegmentCommitPtr sc;
|
||||
status = store_->CreateResource<SegmentCommit>(SegmentCommit(schema->GetID(), partition->GetID(), s->GetID()), sc);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
CollectionCommitPtr inactive_collection_commit;
|
||||
auto cc = CollectionCommit(collection->GetID(), schema->GetID());
|
||||
cc.Deactivate();
|
||||
status = store_->CreateResource<CollectionCommit>(std::move(cc), inactive_collection_commit);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
// TODO(yhz): Check if disk file has been deleted
|
||||
|
||||
auto event = std::make_shared<InActiveResourcesGCEvent>();
|
||||
status = event->Process(store_);
|
||||
// milvus::engine::snapshot::EventExecutor::GetInstance().Submit(event);
|
||||
// status = event->WaitToFinish();
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
std::vector<FieldElementPtr> field_elements;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<FieldElement>(field_elements).ok());
|
||||
ASSERT_TRUE(field_elements.empty());
|
||||
|
||||
std::vector<Field::Ptr> fields;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<Field>(fields).ok());
|
||||
ASSERT_TRUE(fields.empty());
|
||||
|
||||
std::vector<FieldCommit::Ptr> field_commits;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<FieldCommit>(field_commits).ok());
|
||||
ASSERT_TRUE(field_commits.empty());
|
||||
|
||||
std::vector<SegmentFilePtr> seg_files;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<SegmentFile>(seg_files).ok());
|
||||
ASSERT_TRUE(seg_files.empty());
|
||||
|
||||
std::vector<SegmentCommitPtr> seg_commits;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<SegmentCommit>(seg_commits).ok());
|
||||
ASSERT_TRUE(seg_commits.empty());
|
||||
|
||||
std::vector<SegmentPtr> segs;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<Segment>(segs).ok());
|
||||
ASSERT_TRUE(segs.empty());
|
||||
|
||||
std::vector<SchemaCommit::Ptr> schemas;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<SchemaCommit>(schemas).ok());
|
||||
ASSERT_TRUE(schemas.empty());
|
||||
|
||||
std::vector<PartitionCommitPtr> partition_commits;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<PartitionCommit>(partition_commits).ok());
|
||||
ASSERT_TRUE(partition_commits.empty());
|
||||
|
||||
std::vector<PartitionPtr> partitions;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<Partition>(partitions).ok());
|
||||
ASSERT_TRUE(partitions.empty());
|
||||
|
||||
std::vector<CollectionPtr> collections;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<Collection>(collections).ok());
|
||||
ASSERT_TRUE(collections.empty());
|
||||
|
||||
std::vector<CollectionCommitPtr> collection_commits;
|
||||
ASSERT_TRUE(store_->GetInActiveResources<CollectionCommit>(collection_commits).ok());
|
||||
ASSERT_TRUE(collection_commits.empty());
|
||||
}
|
|
@ -143,25 +143,26 @@ TEST_F(SSMetaTest, SelectTest) {
|
|||
ASSERT_GT(collection2->GetID(), collection->GetID());
|
||||
|
||||
std::vector<Collection::Ptr> return_collections;
|
||||
status = meta_->SelectBy<Collection, ID_TYPE>(milvus::engine::meta::F_ID, collection2->GetID(), return_collections);
|
||||
status = meta_->SelectBy<Collection, ID_TYPE>(milvus::engine::meta::F_ID,
|
||||
{collection2->GetID()}, return_collections);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(return_collections.size(), 1);
|
||||
ASSERT_EQ(return_collections.at(0)->GetID(), collection2->GetID());
|
||||
ASSERT_EQ(return_collections.at(0)->GetName(), collection2->GetName());
|
||||
return_collections.clear();
|
||||
|
||||
status = meta_->SelectBy<Collection, State>(milvus::engine::meta::F_STATE, State::ACTIVE, return_collections);
|
||||
status = meta_->SelectBy<Collection, State>(milvus::engine::meta::F_STATE, {State::ACTIVE}, return_collections);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(return_collections.size(), 2);
|
||||
|
||||
std::vector<ID_TYPE> ids;
|
||||
status = meta_->SelectResourceIDs<Collection, std::string>(ids, "", "");
|
||||
status = meta_->SelectResourceIDs<Collection, std::string>(ids, "", {""});
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(ids.size(), 2);
|
||||
|
||||
ids.clear();
|
||||
status = meta_->SelectResourceIDs<Collection, std::string>(ids, milvus::engine::meta::F_NAME,
|
||||
collection->GetName());
|
||||
{collection->GetName()});
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(ids.size(), 1);
|
||||
ASSERT_EQ(ids.at(0), collection->GetID());
|
||||
|
|
|
@ -51,7 +51,7 @@ static const char* CONFIG_STR =
|
|||
"\n"
|
||||
"general:\n"
|
||||
" timezone: UTC+8\n"
|
||||
" meta_uri: sqlite://:@:/\n"
|
||||
" meta_uri: mock://:@:/\n"
|
||||
"\n"
|
||||
"network:\n"
|
||||
" bind.address: 0.0.0.0\n"
|
||||
|
@ -141,7 +141,11 @@ void
|
|||
BaseTest::SnapshotStart(bool mock_store) {
|
||||
/* auto uri = "mysql://root:12345678@127.0.0.1:3307/milvus"; */
|
||||
auto uri = "mock://:@:/";
|
||||
auto store = Store::Build(uri, "/tmp/milvus_ss/db");
|
||||
auto& config = milvus::server::Config::GetInstance();
|
||||
config.SetGeneralConfigMetaURI(uri);
|
||||
std::string path = "/tmp/milvus_ss/db";
|
||||
config.SetStorageConfigPath(path);
|
||||
auto store = Store::Build(uri, path);
|
||||
|
||||
milvus::engine::snapshot::OperationExecutor::Init(store);
|
||||
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
|
||||
|
@ -249,6 +253,9 @@ SSSegmentTest::TearDown() {
|
|||
void
|
||||
SSMetaTest::SetUp() {
|
||||
auto engine = std::make_shared<milvus::engine::meta::MockMetaEngine>();
|
||||
// milvus::engine::DBMetaOptions options;
|
||||
// options.backend_uri_ = "mysql://root:12345678@127.0.0.1:3307/milvus";
|
||||
// auto engine = std::make_shared<milvus::engine::meta::MySqlEngine>(options);
|
||||
meta_ = std::make_shared<milvus::engine::meta::MetaAdapter>(engine);
|
||||
meta_->TruncateAll();
|
||||
}
|
||||
|
@ -297,6 +304,17 @@ SSSchedulerTest::TearDown() {
|
|||
BaseTest::TearDown();
|
||||
}
|
||||
|
||||
void
|
||||
SSEventTest::SetUp() {
|
||||
auto uri = "mock://:@:/";
|
||||
store_ = Store::Build(uri, "/tmp/milvus_ss/db");
|
||||
store_->DoReset();
|
||||
}
|
||||
|
||||
void
|
||||
SSEventTest::TearDown() {
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
int
|
||||
main(int argc, char **argv) {
|
||||
|
|
|
@ -82,6 +82,7 @@ using SSDBImpl = milvus::engine::SSDBImpl;
|
|||
using Status = milvus::Status;
|
||||
using Store = milvus::engine::snapshot::Store;
|
||||
|
||||
using StorePtr = milvus::engine::snapshot::Store::Ptr;
|
||||
using MetaAdapterPtr = milvus::engine::meta::MetaAdapterPtr;
|
||||
|
||||
inline int
|
||||
|
@ -353,3 +354,14 @@ class SSSchedulerTest : public BaseTest {
|
|||
void
|
||||
TearDown() override;
|
||||
};
|
||||
|
||||
class SSEventTest : public BaseTest {
|
||||
protected:
|
||||
StorePtr store_;
|
||||
|
||||
protected:
|
||||
void
|
||||
SetUp() override;
|
||||
void
|
||||
TearDown() override;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue