mirror of https://github.com/milvus-io/milvus.git
(db/snapshot): add GC event and implement crtp for base resource (#2926)
* (db/snapshot): update for row count Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): fix bug in NewSegmentOperation Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): remove dummy print Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): Add some test for row count Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update size logic Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update size logic related ut Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): rollback if operation is not done Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): clean store Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): remove some dependency Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update for store Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update Store.h Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): update store related code Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): add field element modification operation Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): change new operation name Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): fix lint error Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): Add Segment File Operation Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): crtp for BaseResource Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): add InActiveResourcesGCEvent Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): fix ut error Signed-off-by: peng.xu <peng.xu@zilliz.com> * (db/snapshot): small change Signed-off-by: peng.xu <peng.xu@zilliz.com>pull/2946/head
parent
dc93524df0
commit
6c301be1bf
|
@ -12,20 +12,23 @@
|
|||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "ReferenceProxy.h"
|
||||
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
template <typename DerivedT>
|
||||
class BaseResource : public ReferenceProxy {
|
||||
public:
|
||||
virtual std::string
|
||||
ToString() const {
|
||||
return std::string();
|
||||
std::stringstream ss;
|
||||
const DerivedT& derived = static_cast<const DerivedT&>(*this);
|
||||
ss << DerivedT::Name << ": id=" << derived.GetID() << " state=" << derived.GetState();
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
||||
using BaseResourcePtr = std::shared_ptr<BaseResource>;
|
||||
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -51,10 +51,16 @@ class ResourceGCEvent : public MetaEvent {
|
|||
/* if (!boost::filesystem::exists(res_path)) { */
|
||||
/* return Status::OK(); */
|
||||
/* } */
|
||||
if (boost::filesystem::is_directory(res_path)) {
|
||||
boost::filesystem::remove_all(res_path);
|
||||
if (res_path.empty()) {
|
||||
/* std::cout << "[GC] No remove action for " << res_->ToString() << std::endl; */
|
||||
} else if (boost::filesystem::is_directory(res_path)) {
|
||||
auto ok = boost::filesystem::remove_all(res_path);
|
||||
/* std::cout << "[GC] Remove dir " << res_->ToString() << " " << res_path << " " << ok << std::endl; */
|
||||
} else if (boost::filesystem::is_regular_file(res_path)) {
|
||||
auto ok = boost::filesystem::remove(res_path);
|
||||
/* std::cout << "[GC] Remove file " << res_->ToString() << " " << res_path << " " << ok << std::endl; */
|
||||
} else {
|
||||
boost::filesystem::remove(res_path);
|
||||
std::cout << "[GC] Remove stale " << res_path << " for " << res_->ToString() << std::endl;
|
||||
}
|
||||
|
||||
/* remove resource from meta */
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "db/snapshot/Event.h"
|
||||
#include "db/snapshot/Operations.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/Store.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
|
||||
class InActiveResourcesGCEvent : public MetaEvent, public Operations {
|
||||
public:
|
||||
using Ptr = std::shared_ptr<InActiveResourcesGCEvent>;
|
||||
|
||||
InActiveResourcesGCEvent() : Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf) {
|
||||
}
|
||||
|
||||
~InActiveResourcesGCEvent() = default;
|
||||
|
||||
Status
|
||||
Process(StorePtr store) override {
|
||||
return store->Apply(*this);
|
||||
}
|
||||
|
||||
Status
|
||||
OnExecute(StorePtr store) override {
|
||||
std::cout << "Executing InActiveResourcesGCEvent" << std::endl;
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -435,7 +435,7 @@ class RowCountField {
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class Collection : public BaseResource,
|
||||
class Collection : public BaseResource<Collection>,
|
||||
public NameField,
|
||||
public ParamsField,
|
||||
public IdField,
|
||||
|
@ -458,7 +458,7 @@ class Collection : public BaseResource,
|
|||
|
||||
using CollectionPtr = Collection::Ptr;
|
||||
|
||||
class CollectionCommit : public BaseResource,
|
||||
class CollectionCommit : public BaseResource<CollectionCommit>,
|
||||
public CollectionIdField,
|
||||
public SchemaIdField,
|
||||
public MappingsField,
|
||||
|
@ -485,7 +485,7 @@ using CollectionCommitPtr = CollectionCommit::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class Partition : public BaseResource,
|
||||
class Partition : public BaseResource<Partition>,
|
||||
public NameField,
|
||||
public CollectionIdField,
|
||||
public IdField,
|
||||
|
@ -507,7 +507,7 @@ class Partition : public BaseResource,
|
|||
|
||||
using PartitionPtr = Partition::Ptr;
|
||||
|
||||
class PartitionCommit : public BaseResource,
|
||||
class PartitionCommit : public BaseResource<PartitionCommit>,
|
||||
public CollectionIdField,
|
||||
public PartitionIdField,
|
||||
public MappingsField,
|
||||
|
@ -538,7 +538,7 @@ using PartitionCommitPtr = PartitionCommit::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class Segment : public BaseResource,
|
||||
class Segment : public BaseResource<Segment>,
|
||||
public CollectionIdField,
|
||||
public PartitionIdField,
|
||||
public NumField,
|
||||
|
@ -565,7 +565,7 @@ class Segment : public BaseResource,
|
|||
|
||||
using SegmentPtr = Segment::Ptr;
|
||||
|
||||
class SegmentCommit : public BaseResource,
|
||||
class SegmentCommit : public BaseResource<SegmentCommit>,
|
||||
public SchemaIdField,
|
||||
public PartitionIdField,
|
||||
public SegmentIdField,
|
||||
|
@ -597,7 +597,7 @@ using SegmentCommitPtr = SegmentCommit::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class SegmentFile : public BaseResource,
|
||||
class SegmentFile : public BaseResource<SegmentFile>,
|
||||
public CollectionIdField,
|
||||
public PartitionIdField,
|
||||
public SegmentIdField,
|
||||
|
@ -626,7 +626,7 @@ using SegmentFilePtr = SegmentFile::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class SchemaCommit : public BaseResource,
|
||||
class SchemaCommit : public BaseResource<SchemaCommit>,
|
||||
public CollectionIdField,
|
||||
public MappingsField,
|
||||
public IdField,
|
||||
|
@ -651,7 +651,7 @@ using SchemaCommitPtr = SchemaCommit::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class Field : public BaseResource,
|
||||
class Field : public BaseResource<Field>,
|
||||
public NameField,
|
||||
public NumField,
|
||||
public FtypeField,
|
||||
|
@ -676,7 +676,7 @@ class Field : public BaseResource,
|
|||
|
||||
using FieldPtr = Field::Ptr;
|
||||
|
||||
class FieldCommit : public BaseResource,
|
||||
class FieldCommit : public BaseResource<FieldCommit>,
|
||||
public CollectionIdField,
|
||||
public FieldIdField,
|
||||
public MappingsField,
|
||||
|
@ -702,7 +702,7 @@ using FieldCommitPtr = FieldCommit::Ptr;
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class FieldElement : public BaseResource,
|
||||
class FieldElement : public BaseResource<FieldElement>,
|
||||
public CollectionIdField,
|
||||
public FieldIdField,
|
||||
public NameField,
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
|
||||
#include "db/snapshot/Snapshots.h"
|
||||
#include "db/snapshot/CompoundOperations.h"
|
||||
#include "db/snapshot/EventExecutor.h"
|
||||
#include "db/snapshot/InActiveResourcesGCEvent.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
@ -148,6 +150,9 @@ Snapshots::LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr&
|
|||
|
||||
Status
|
||||
Snapshots::Init(StorePtr store) {
|
||||
auto event = std::make_shared<InActiveResourcesGCEvent>();
|
||||
EventExecutor::GetInstance().Submit(event);
|
||||
STATUS_CHECK(event->WaitToFinish());
|
||||
auto op = std::make_shared<GetCollectionIDsOperation>();
|
||||
STATUS_CHECK((*op)(store));
|
||||
auto& collection_ids = op->GetIDs();
|
||||
|
|
|
@ -114,9 +114,9 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
}
|
||||
|
||||
template <typename OpT>
|
||||
void
|
||||
Status
|
||||
Apply(OpT& op) {
|
||||
op.ApplyToStore(this->shared_from_this());
|
||||
return op.ApplyToStore(this->shared_from_this());
|
||||
}
|
||||
|
||||
template <typename ResourceT>
|
||||
|
|
|
@ -656,7 +656,7 @@ TEST_F(SnapshotTest, OperationTest) {
|
|||
auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(ss_id, false);
|
||||
/* snapshot::SegmentCommitsHolder::GetInstance().GetResource(prev_segment_commit->GetID()); */
|
||||
ASSERT_TRUE(collection_commit);
|
||||
ASSERT_TRUE(collection_commit->ToString().empty());
|
||||
std::cout << collection_commit->ToString() << std::endl;
|
||||
}
|
||||
|
||||
OperationContext merge_ctx;
|
||||
|
@ -1697,7 +1697,6 @@ TEST_F(SnapshotTest, CompoundTest2) {
|
|||
if (it == stale_partitions.end()) {
|
||||
continue;
|
||||
}
|
||||
/* std::cout << "stale Segment " << seg_p.first << std::endl; */
|
||||
expect_segments.erase(seg_p.first);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue