mirror of https://github.com/milvus-io/milvus.git
(db/snapshot): LSN logic change and design EventHandlerFactory (#2815)
* (db/snapshot): add more visitors Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): fix lint error Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): add some Iterators Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update visitors Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): add event handler factory Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): process lsn Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): fix lint error Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): small change Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): precheck duplicate collection in create collection operation Signed-off-by: peng.xu <peng.xu@zilliz.com>pull/2835/head^2
parent
141835a7c7
commit
6c5f42f9f1
|
@ -262,8 +262,6 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string&
|
|||
if (options_.wal_enable_) {
|
||||
// SS TODO
|
||||
/* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */
|
||||
} else {
|
||||
lsn = ss->GetCollection()->GetLsn();
|
||||
}
|
||||
|
||||
snapshot::OperationContext context;
|
||||
|
@ -316,8 +314,6 @@ SSDBImpl::DropIndex(const std::string& collection_name, const std::string& field
|
|||
// SS TODO: Check Index Type
|
||||
|
||||
snapshot::OperationContext context;
|
||||
// SS TODO: no lsn for drop index
|
||||
context.lsn = ss->GetCollectionCommit()->GetLsn();
|
||||
STATUS_CHECK(ss->GetFieldElement(field_name, field_element_name, context.stale_field_element));
|
||||
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
|
||||
STATUS_CHECK(op->Push());
|
||||
|
|
|
@ -521,6 +521,14 @@ Status
|
|||
CreateCollectionOperation::DoExecute(Store& store) {
|
||||
// TODO: Do some checks
|
||||
CollectionPtr collection;
|
||||
ScopedSnapshotT ss;
|
||||
Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection->GetName());
|
||||
if (ss) {
|
||||
std::stringstream emsg;
|
||||
emsg << GetRepr() << ". Duplicated collection " << c_context_.collection->GetName();
|
||||
return Status(SS_DUPLICATED_ERROR, emsg.str());
|
||||
}
|
||||
|
||||
auto status = store.CreateCollection(Collection(c_context_.collection->GetName()), collection);
|
||||
if (!status.ok()) {
|
||||
std::cerr << status.ToString() << std::endl;
|
||||
|
|
|
@ -44,9 +44,11 @@ class CompoundBaseOperation : public Operations {
|
|||
Status
|
||||
PreCheck() override {
|
||||
// TODO
|
||||
/* if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) { */
|
||||
/* return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation"); */
|
||||
/* } */
|
||||
if (GetContextLsn() == 0) {
|
||||
SetContextLsn(GetStartedSS()->GetMaxLsn());
|
||||
} else if (GetContextLsn() <= GetStartedSS()->GetMaxLsn()) {
|
||||
return Status(SS_INVALID_CONTEX_ERROR, "Invalid LSN found in operation");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -50,9 +50,9 @@ class ResourceGCEvent : public Event {
|
|||
|
||||
/* TODO: physically clean resource */
|
||||
std::string res_path = GetResPath<ResourceT>(res_);
|
||||
if (!boost::filesystem::exists(res_path)) {
|
||||
return Status::OK();
|
||||
}
|
||||
/* if (!boost::filesystem::exists(res_path)) { */
|
||||
/* return Status::OK(); */
|
||||
/* } */
|
||||
if (boost::filesystem::is_directory(res_path)) {
|
||||
boost::filesystem::remove_all(res_path);
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "db/snapshot/Event.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
|
||||
class IEventHandler {
|
||||
public:
|
||||
using Ptr = std::shared_ptr<IEventHandler>;
|
||||
static constexpr const char* EventName = "";
|
||||
virtual const char*
|
||||
GetEventName() const {
|
||||
return EventName;
|
||||
}
|
||||
};
|
||||
|
||||
class IEventHandlerRegistrar {
|
||||
public:
|
||||
using Ptr = std::shared_ptr<IEventHandlerRegistrar>;
|
||||
|
||||
virtual IEventHandler::Ptr
|
||||
GetHandler() = 0;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class HandlerFactory {
|
||||
public:
|
||||
using ThisT = HandlerFactory<T>;
|
||||
|
||||
static ThisT&
|
||||
GetInstance() {
|
||||
static ThisT factory;
|
||||
return factory;
|
||||
}
|
||||
|
||||
IEventHandler::Ptr
|
||||
GetHandler(const std::string& event_name) {
|
||||
auto it = registry_.find(event_name);
|
||||
if (it == registry_.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
return it->second->GetHandler();
|
||||
}
|
||||
|
||||
void
|
||||
Register(IEventHandlerRegistrar* registrar, const std::string& event_name) {
|
||||
auto it = registry_.find(event_name);
|
||||
if (it == registry_.end()) {
|
||||
registry_[event_name] = registrar;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<std::string, IEventHandlerRegistrar*> registry_;
|
||||
};
|
||||
|
||||
template <typename T, typename HandlerT>
|
||||
class EventHandlerRegistrar : public IEventHandlerRegistrar {
|
||||
public:
|
||||
using FactoryT = HandlerFactory<T>;
|
||||
using HandlerPtr = typename HandlerT::Ptr;
|
||||
explicit EventHandlerRegistrar(const std::string& event_name) : event_name_(event_name) {
|
||||
auto& factory = FactoryT::GetInstance();
|
||||
factory.Register(this, event_name_);
|
||||
}
|
||||
|
||||
HandlerPtr
|
||||
GetHandler() {
|
||||
return std::make_shared<HandlerT>();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::string event_name_;
|
||||
};
|
||||
|
||||
#define REGISTER_HANDLER(EXECUTOR, HANDLER) \
|
||||
namespace { \
|
||||
static milvus::engine::snapshot::EventHandlerRegistrar<EXECUTOR, HANDLER> EXECUTOR##HANDLER##_registrar( \
|
||||
HANDLER ::EventName); \
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -59,6 +59,11 @@ class Operations : public std::enable_shared_from_this<Operations> {
|
|||
return context_.lsn;
|
||||
}
|
||||
|
||||
void
|
||||
SetContextLsn(LSN_TYPE lsn) {
|
||||
context_.lsn = lsn;
|
||||
}
|
||||
|
||||
virtual Status
|
||||
CheckStale(const CheckStaleFunc& checker = nullptr) const;
|
||||
virtual Status
|
||||
|
|
|
@ -197,10 +197,6 @@ class Store {
|
|||
CreateCollection(Collection&& collection, CollectionPtr& return_v) {
|
||||
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
auto& resources = std::get<Collection::MapT>(resources_);
|
||||
if (!collection.HasAssigned() && (name_ids_.find(collection.GetName()) != name_ids_.end()) &&
|
||||
(resources[name_ids_[collection.GetName()]]->IsActive()) && !collection.IsDeactive()) {
|
||||
return Status(SS_DUPLICATED_ERROR, "Duplicated");
|
||||
}
|
||||
auto c = std::make_shared<Collection>(collection);
|
||||
auto& id = std::get<Index<Collection::MapT, MockResourcesT>::value>(ids_);
|
||||
c->SetID(++id);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include <algorithm>
|
||||
|
||||
#include "ssdb/utils.h"
|
||||
#include "db/snapshot/HandlerFactory.h"
|
||||
|
||||
TEST_F(SnapshotTest, ResourcesTest) {
|
||||
int nprobe = 16;
|
||||
|
@ -416,33 +417,32 @@ TEST_F(SnapshotTest, PartitionTest) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Open this test later
|
||||
/* TEST_F(SnapshotTest, PartitionTest2) { */
|
||||
/* std::string collection_name("c1"); */
|
||||
/* LSN_TYPE lsn = 1; */
|
||||
/* milvus::Status status; */
|
||||
TEST_F(SnapshotTest, PartitionTest2) {
|
||||
std::string collection_name("c1");
|
||||
LSN_TYPE lsn = 1;
|
||||
milvus::Status status;
|
||||
|
||||
/* auto ss = CreateCollection(collection_name, ++lsn); */
|
||||
/* ASSERT_TRUE(ss); */
|
||||
/* ASSERT_EQ(lsn, ss->GetMaxLsn()); */
|
||||
auto ss = CreateCollection(collection_name, ++lsn);
|
||||
ASSERT_TRUE(ss);
|
||||
ASSERT_EQ(lsn, ss->GetMaxLsn());
|
||||
|
||||
/* OperationContext context; */
|
||||
/* context.lsn = lsn; */
|
||||
/* auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss); */
|
||||
/* std::string partition_name("p1"); */
|
||||
/* PartitionContext p_ctx; */
|
||||
/* p_ctx.name = partition_name; */
|
||||
/* PartitionPtr partition; */
|
||||
/* status = cp_op->CommitNewPartition(p_ctx, partition); */
|
||||
/* ASSERT_TRUE(status.ok()); */
|
||||
/* ASSERT_TRUE(partition); */
|
||||
/* ASSERT_EQ(partition->GetName(), partition_name); */
|
||||
/* ASSERT_FALSE(partition->IsActive()); */
|
||||
/* ASSERT_TRUE(partition->HasAssigned()); */
|
||||
OperationContext context;
|
||||
context.lsn = lsn;
|
||||
auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss);
|
||||
std::string partition_name("p1");
|
||||
PartitionContext p_ctx;
|
||||
p_ctx.name = partition_name;
|
||||
PartitionPtr partition;
|
||||
status = cp_op->CommitNewPartition(p_ctx, partition);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_TRUE(partition);
|
||||
ASSERT_EQ(partition->GetName(), partition_name);
|
||||
ASSERT_FALSE(partition->IsActive());
|
||||
ASSERT_TRUE(partition->HasAssigned());
|
||||
|
||||
/* status = cp_op->Push(); */
|
||||
/* ASSERT_FALSE(status.ok()); */
|
||||
/* } */
|
||||
status = cp_op->Push();
|
||||
ASSERT_FALSE(status.ok());
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, IndexTest) {
|
||||
LSN_TYPE lsn = 0;
|
||||
|
@ -1550,3 +1550,47 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
ASSERT_EQ(final_segments, expect_segments);
|
||||
// TODO: Check Total Segment Files Cnt
|
||||
}
|
||||
|
||||
struct GCSchedule {
|
||||
static constexpr const char* Name = "GCSchedule";
|
||||
};
|
||||
|
||||
struct FlushSchedule {
|
||||
static constexpr const char* Name = "FlushSchedule";
|
||||
};
|
||||
|
||||
using IEventHandler = milvus::engine::snapshot::IEventHandler;
|
||||
/* struct SampleHandler : public IEventHandler { */
|
||||
/* static constexpr const char* EventName = "SampleHandler"; */
|
||||
/* const char* */
|
||||
/* GetEventName() const override { */
|
||||
/* return EventName; */
|
||||
/* } */
|
||||
/* }; */
|
||||
|
||||
REGISTER_HANDLER(GCSchedule, IEventHandler);
|
||||
/* REGISTER_HANDLER(GCSchedule, SampleHandler); */
|
||||
REGISTER_HANDLER(FlushSchedule, IEventHandler);
|
||||
/* REGISTER_HANDLER(FlushSchedule, SampleHandler); */
|
||||
|
||||
using GCScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;
|
||||
using FlushScheduleFactory = milvus::engine::snapshot::HandlerFactory<GCSchedule>;
|
||||
|
||||
TEST_F(SnapshotTest, RegistryTest) {
|
||||
{
|
||||
auto& factory = GCScheduleFactory::GetInstance();
|
||||
auto ihandler = factory.GetHandler(IEventHandler::EventName);
|
||||
ASSERT_TRUE(ihandler);
|
||||
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
|
||||
/* ASSERT_TRUE(sihandler); */
|
||||
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
|
||||
}
|
||||
{
|
||||
/* auto& factory = FlushScheduleFactory::GetInstance(); */
|
||||
/* auto ihandler = factory.GetHandler(IEventHandler::EventName); */
|
||||
/* ASSERT_TRUE(ihandler); */
|
||||
/* auto sihandler = factory.GetHandler(SampleHandler::EventName); */
|
||||
/* ASSERT_TRUE(sihandler); */
|
||||
/* ASSERT_EQ(SampleHandler::EventName, sihandler->GetEventName()); */
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue