mirror of https://github.com/milvus-io/milvus.git
Snapshot improve (#2608)
* Format code style Signed-off-by: JinHai-CN <hai.jin@zilliz.com> * Fix bug && Format code style * Format code style * Format code style Signed-off-by: JinHai-CN <hai.jin@zilliz.com> * Update lint Signed-off-by: JinHai-CN <hai.jin@zilliz.com> * Update merge config Signed-off-by: JinHai-CN <hai.jin@zilliz.com> * Fix compiling error Signed-off-by: JinHai-CN <hai.jin@zilliz.com>pull/2639/head
parent
dd9f029a9f
commit
f1a1ee8678
|
@ -9,7 +9,6 @@ pull_request_rules:
|
|||
- "status-success=continuous-integration/jenkins/pr-merge"
|
||||
- "status-success=AMD64 Ubuntu 18.04"
|
||||
- "status-success=AMD64 CentOS 7"
|
||||
- "status-success=codecov/project"
|
||||
actions:
|
||||
merge:
|
||||
method: squash
|
||||
|
|
|
@ -16,7 +16,7 @@ FAISS_ROOT="" #FAISS root path
|
|||
FAISS_SOURCE="BUNDLED"
|
||||
WITH_PROMETHEUS="ON"
|
||||
FIU_ENABLE="OFF"
|
||||
BUILD_OPENBLAS="ON"
|
||||
# BUILD_OPENBLAS="ON" # not used any more
|
||||
|
||||
while getopts "p:d:t:f:ulrcghzmei" arg; do
|
||||
case $arg in
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include "BaseHolders.h"
|
||||
#include "Operations.h"
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
void ResourceHolder<ResourceT, Derived>::Dump(const std::string& tag) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl;
|
||||
for (auto& kv : id_map_) {
|
||||
/* std::cout << "\t" << kv.second->ToString() << std::endl; */
|
||||
std::cout << "\t" << kv.first << " ref_count " << kv.second->ref_count() << std::endl;
|
||||
}
|
||||
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl;
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
void ResourceHolder<ResourceT, Derived>::Reset() {
|
||||
id_map_.clear();
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
typename ResourceHolder<ResourceT, Derived>::ResourcePtr
|
||||
ResourceHolder<ResourceT, Derived>::DoLoad(Store& store, ID_TYPE id) {
|
||||
LoadOperationContext context;
|
||||
context.id = id;
|
||||
auto op = std::make_shared<LoadOperation<ResourceT>>(context);
|
||||
(*op)(store);
|
||||
typename ResourceT::Ptr c;
|
||||
auto status = op->GetResource(c);
|
||||
if (status.ok()) {
|
||||
Add(c);
|
||||
return c;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
typename ResourceHolder<ResourceT, Derived>::ScopedT
|
||||
ResourceHolder<ResourceT, Derived>::Load(Store& store, ID_TYPE id, bool scoped) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
auto cit = id_map_.find(id);
|
||||
if (cit != id_map_.end()) {
|
||||
return ScopedT(cit->second, scoped);
|
||||
}
|
||||
}
|
||||
auto ret = DoLoad(store, id);
|
||||
if (!ret) return ScopedT();
|
||||
return ScopedT(ret, scoped);
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
typename ResourceHolder<ResourceT, Derived>::ScopedT
|
||||
ResourceHolder<ResourceT, Derived>::GetResource(ID_TYPE id, bool scoped) {
|
||||
// TODO: Temp to use Load here. Will be removed when resource is loaded just post Compound
|
||||
// Operations.
|
||||
return Load(Store::GetInstance(), id, scoped);
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
auto cit = id_map_.find(id);
|
||||
if (cit != id_map_.end()) {
|
||||
return ScopedT(cit->second, scoped);
|
||||
}
|
||||
}
|
||||
return ScopedT();
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
void
|
||||
ResourceHolder<ResourceT, Derived>::OnNoRefCallBack(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
|
||||
HardDelete(resource->GetID());
|
||||
Release(resource->GetID());
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
bool ResourceHolder<ResourceT, Derived>::ReleaseNoLock(ID_TYPE id) {
|
||||
auto it = id_map_.find(id);
|
||||
if (it == id_map_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
id_map_.erase(it);
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
bool ResourceHolder<ResourceT, Derived>::Release(ID_TYPE id) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
return ReleaseNoLock(id);
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
bool
|
||||
ResourceHolder<ResourceT, Derived>::HardDelete(ID_TYPE id) {
|
||||
auto op = std::make_shared<HardDeleteOperation<ResourceT>>(id);
|
||||
// TODO:
|
||||
(*op)(Store::GetInstance());
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
bool ResourceHolder<ResourceT, Derived>::AddNoLock(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
|
||||
if (!resource) return false;
|
||||
if (id_map_.find(resource->GetID()) != id_map_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
id_map_[resource->GetID()] = resource;
|
||||
resource->RegisterOnNoRefCB(std::bind(&Derived::OnNoRefCallBack, this, resource));
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename Derived>
|
||||
bool ResourceHolder<ResourceT, Derived>::Add(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
return AddNoLock(resource);
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -13,21 +13,16 @@
|
|||
#include <string>
|
||||
#include "ReferenceProxy.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
class BaseResource : public ReferenceProxy {
|
||||
public:
|
||||
virtual std::string
|
||||
[[nodiscard]] virtual std::string
|
||||
ToString() const {
|
||||
return "";
|
||||
return std::string();
|
||||
}
|
||||
|
||||
virtual ~BaseResource() {
|
||||
}
|
||||
~BaseResource() override = default;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -12,12 +12,9 @@
|
|||
#include "db/snapshot/OperationExecutor.h"
|
||||
#include <iostream>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
OperationExecutor::OperationExecutor() {
|
||||
}
|
||||
OperationExecutor::OperationExecutor() = default;
|
||||
|
||||
OperationExecutor::~OperationExecutor() {
|
||||
Stop();
|
||||
|
@ -30,7 +27,7 @@ OperationExecutor::GetInstance() {
|
|||
}
|
||||
|
||||
Status
|
||||
OperationExecutor::Submit(OperationsPtr operation, bool sync) {
|
||||
OperationExecutor::Submit(const OperationsPtr& operation, bool sync) {
|
||||
if (!operation)
|
||||
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Operation");
|
||||
/* Store::GetInstance().Apply(*operation); */
|
||||
|
@ -60,7 +57,7 @@ OperationExecutor::Stop() {
|
|||
}
|
||||
|
||||
void
|
||||
OperationExecutor::Enqueue(OperationsPtr operation) {
|
||||
OperationExecutor::Enqueue(const OperationsPtr& operation) {
|
||||
/* std::cout << std::this_thread::get_id() << " Enqueue Operation " << operation->GetID() << std::endl; */
|
||||
queue_.Put(operation);
|
||||
}
|
||||
|
@ -79,6 +76,4 @@ OperationExecutor::ThreadMain() {
|
|||
}
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -18,9 +18,7 @@
|
|||
#include "Store.h"
|
||||
#include "utils/BlockingQueue.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
using ThreadPtr = std::shared_ptr<std::thread>;
|
||||
using OperationQueue = BlockingQueue<OperationsPtr>;
|
||||
|
@ -36,7 +34,7 @@ class OperationExecutor {
|
|||
GetInstance();
|
||||
|
||||
Status
|
||||
Submit(OperationsPtr operation, bool sync = true);
|
||||
Submit(const OperationsPtr& operation, bool sync = true);
|
||||
|
||||
void
|
||||
Start();
|
||||
|
@ -53,15 +51,12 @@ class OperationExecutor {
|
|||
ThreadMain();
|
||||
|
||||
void
|
||||
Enqueue(OperationsPtr operation);
|
||||
Enqueue(const OperationsPtr& operation);
|
||||
|
||||
protected:
|
||||
mutable std::mutex mtx_;
|
||||
bool running_ = false;
|
||||
std::thread thread_;
|
||||
OperationQueue queue_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -205,8 +205,8 @@ Operations::OnSnapshotDropped() {
|
|||
|
||||
Status
|
||||
Operations::OnSnapshotStale() {
|
||||
/* std::cout << GetRepr() << " Stale SS " << prev_ss_->GetID() << " RefCnt=" << prev_ss_->RefCnt() \ */
|
||||
/* << " Curr SS " << context_.prev_ss->GetID() << " RefCnt=" << context_.prev_ss->RefCnt() << std::endl; */
|
||||
/* std::cout << GetRepr() << " Stale SS " << prev_ss_->GetID() << " RefCnt=" << prev_ss_->ref_count() \ */
|
||||
/* << " Curr SS " << context_.prev_ss->GetID() << " RefCnt=" << context_.prev_ss->ref_count() << std::endl; */
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -10,23 +10,19 @@
|
|||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include "db/snapshot/ReferenceProxy.h"
|
||||
#include <assert.h>
|
||||
#include <iostream>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
void
|
||||
ReferenceProxy::Ref() {
|
||||
++refcnt_;
|
||||
++ref_count_;
|
||||
}
|
||||
|
||||
void
|
||||
ReferenceProxy::UnRef() {
|
||||
if (refcnt_ == 0)
|
||||
if (ref_count_ == 0)
|
||||
return;
|
||||
if (refcnt_.fetch_sub(1) == 1) {
|
||||
if (ref_count_.fetch_sub(1) == 1) {
|
||||
for (auto& cb : on_no_ref_cbs_) {
|
||||
cb();
|
||||
}
|
||||
|
@ -34,14 +30,8 @@ ReferenceProxy::UnRef() {
|
|||
}
|
||||
|
||||
void
|
||||
ReferenceProxy::RegisterOnNoRefCB(OnNoRefCBF cb) {
|
||||
ReferenceProxy::RegisterOnNoRefCB(const OnNoRefCBF& cb) {
|
||||
on_no_ref_cbs_.emplace_back(cb);
|
||||
}
|
||||
|
||||
ReferenceProxy::~ReferenceProxy() {
|
||||
/* OnDeRef(); */
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -16,47 +16,45 @@
|
|||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
using OnNoRefCBF = std::function<void(void)>;
|
||||
|
||||
class ReferenceProxy {
|
||||
public:
|
||||
ReferenceProxy() {
|
||||
}
|
||||
ReferenceProxy() = default;
|
||||
|
||||
// TODO: Copy constructor is used in Mock Test. Should never be used. To be removed
|
||||
ReferenceProxy(const ReferenceProxy& o) {
|
||||
refcnt_ = 0;
|
||||
ref_count_ = 0;
|
||||
}
|
||||
|
||||
void
|
||||
RegisterOnNoRefCB(OnNoRefCBF cb);
|
||||
RegisterOnNoRefCB(const OnNoRefCBF& cb);
|
||||
|
||||
virtual void
|
||||
Ref();
|
||||
|
||||
virtual void
|
||||
UnRef();
|
||||
|
||||
int
|
||||
RefCnt() const {
|
||||
return refcnt_;
|
||||
[[nodiscard]] int64_t
|
||||
ref_count() const {
|
||||
return ref_count_;
|
||||
}
|
||||
|
||||
void
|
||||
ResetCnt() {
|
||||
refcnt_ = 0;
|
||||
ref_count_ = 0;
|
||||
}
|
||||
|
||||
virtual ~ReferenceProxy();
|
||||
virtual ~ReferenceProxy() = default;
|
||||
|
||||
protected:
|
||||
std::atomic_long refcnt_ = {0};
|
||||
std::atomic<int64_t> ref_count_ = {0};
|
||||
std::vector<OnNoRefCBF> on_no_ref_cbs_;
|
||||
};
|
||||
|
||||
using ReferenceResourcePtr = std::shared_ptr<ReferenceProxy>;
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -109,7 +109,7 @@ class ResourceHolder {
|
|||
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl;
|
||||
for (auto& kv : id_map_) {
|
||||
/* std::cout << "\t" << kv.second->ToString() << std::endl; */
|
||||
std::cout << "\t" << kv.first << " RefCnt " << kv.second->RefCnt() << std::endl;
|
||||
std::cout << "\t" << kv.first << " RefCnt " << kv.second->ref_count() << std::endl;
|
||||
}
|
||||
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl;
|
||||
}
|
||||
|
|
|
@ -14,9 +14,7 @@
|
|||
#include <sstream>
|
||||
#include "db/snapshot/Store.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
Collection::Collection(const std::string& name, ID_TYPE id, LSN_TYPE lsn, State status, TS_TYPE created_on,
|
||||
TS_TYPE updated_on)
|
||||
|
@ -59,7 +57,7 @@ Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, LSN_TYPE lsn, St
|
|||
LsnField(lsn),
|
||||
StatusField(status),
|
||||
CreatedOnField(created_on),
|
||||
UpdatedOnField(updated_on_) {
|
||||
UpdatedOnField(updated_on) {
|
||||
}
|
||||
|
||||
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
|
||||
|
@ -181,6 +179,4 @@ SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field
|
|||
UpdatedOnField(updated_on) {
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "db/Utils.h"
|
||||
#include "db/snapshot/BaseResource.h"
|
||||
|
@ -25,14 +26,13 @@
|
|||
|
||||
using milvus::engine::utils::GetMicroSecTimeStamp;
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
namespace snapshot {
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
class MappingsField {
|
||||
public:
|
||||
explicit MappingsField(const MappingT& mappings = {}) : mappings_(mappings) {
|
||||
explicit MappingsField(MappingT mappings = {}) : mappings_(std::move(mappings)) {
|
||||
}
|
||||
|
||||
const MappingT&
|
||||
GetMappings() const {
|
||||
return mappings_;
|
||||
|
@ -50,15 +50,17 @@ class StatusField {
|
|||
public:
|
||||
explicit StatusField(State status = PENDING) : status_(status) {
|
||||
}
|
||||
|
||||
State
|
||||
GetStatus() const {
|
||||
return status_;
|
||||
}
|
||||
|
||||
bool
|
||||
[[nodiscard]] bool
|
||||
IsActive() const {
|
||||
return status_ == ACTIVE;
|
||||
}
|
||||
|
||||
bool
|
||||
IsDeactive() const {
|
||||
return status_ == DEACTIVE;
|
||||
|
@ -90,6 +92,7 @@ class LsnField {
|
|||
public:
|
||||
explicit LsnField(LSN_TYPE lsn = 0) : lsn_(lsn) {
|
||||
}
|
||||
|
||||
const LSN_TYPE&
|
||||
GetLsn() const {
|
||||
return lsn_;
|
||||
|
@ -108,6 +111,7 @@ class CreatedOnField {
|
|||
public:
|
||||
explicit CreatedOnField(TS_TYPE created_on = GetMicroSecTimeStamp()) : created_on_(created_on) {
|
||||
}
|
||||
|
||||
TS_TYPE
|
||||
GetCreatedTime() const {
|
||||
return created_on_;
|
||||
|
@ -121,6 +125,7 @@ class UpdatedOnField {
|
|||
public:
|
||||
explicit UpdatedOnField(TS_TYPE updated_on = GetMicroSecTimeStamp()) : updated_on_(updated_on) {
|
||||
}
|
||||
|
||||
TS_TYPE
|
||||
GetUpdatedTime() const {
|
||||
return updated_on_;
|
||||
|
@ -134,6 +139,7 @@ class IdField {
|
|||
public:
|
||||
explicit IdField(ID_TYPE id) : id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetID() const {
|
||||
return id_;
|
||||
|
@ -155,6 +161,7 @@ class CollectionIdField {
|
|||
public:
|
||||
explicit CollectionIdField(ID_TYPE id) : collection_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetCollectionId() const {
|
||||
return collection_id_;
|
||||
|
@ -168,6 +175,7 @@ class SchemaIdField {
|
|||
public:
|
||||
explicit SchemaIdField(ID_TYPE id) : schema_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetSchemaId() const {
|
||||
return schema_id_;
|
||||
|
@ -185,6 +193,7 @@ class NumField {
|
|||
public:
|
||||
explicit NumField(NUM_TYPE num) : num_(num) {
|
||||
}
|
||||
|
||||
NUM_TYPE
|
||||
GetNum() const {
|
||||
return num_;
|
||||
|
@ -202,6 +211,7 @@ class FtypeField {
|
|||
public:
|
||||
explicit FtypeField(FTYPE_TYPE type) : ftype_(type) {
|
||||
}
|
||||
|
||||
FTYPE_TYPE
|
||||
GetFtype() const {
|
||||
return ftype_;
|
||||
|
@ -215,6 +225,7 @@ class FieldIdField {
|
|||
public:
|
||||
explicit FieldIdField(ID_TYPE id) : field_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetFieldId() const {
|
||||
return field_id_;
|
||||
|
@ -228,6 +239,7 @@ class FieldElementIdField {
|
|||
public:
|
||||
explicit FieldElementIdField(ID_TYPE id) : field_element_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetFieldElementId() const {
|
||||
return field_element_id_;
|
||||
|
@ -241,6 +253,7 @@ class PartitionIdField {
|
|||
public:
|
||||
explicit PartitionIdField(ID_TYPE id) : partition_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetPartitionId() const {
|
||||
return partition_id_;
|
||||
|
@ -254,6 +267,7 @@ class SegmentIdField {
|
|||
public:
|
||||
explicit SegmentIdField(ID_TYPE id) : segment_id_(id) {
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
GetSegmentId() const {
|
||||
return segment_id_;
|
||||
|
@ -265,8 +279,9 @@ class SegmentIdField {
|
|||
|
||||
class NameField {
|
||||
public:
|
||||
explicit NameField(const std::string& name) : name_(name) {
|
||||
explicit NameField(std::string name) : name_(std::move(name)) {
|
||||
}
|
||||
|
||||
const std::string&
|
||||
GetName() const {
|
||||
return name_;
|
||||
|
@ -290,8 +305,8 @@ class Collection : public BaseResource,
|
|||
using VecT = std::vector<Ptr>;
|
||||
static constexpr const char* Name = "Collection";
|
||||
|
||||
Collection(const std::string& name, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
|
||||
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
explicit Collection(const std::string& name, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
|
||||
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
};
|
||||
|
||||
using CollectionPtr = Collection::Ptr;
|
||||
|
@ -311,9 +326,9 @@ class SchemaCommit : public BaseResource,
|
|||
using VecT = std::vector<Ptr>;
|
||||
static constexpr const char* Name = "SchemaCommit";
|
||||
|
||||
SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, LSN_TYPE lsn = 0,
|
||||
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
|
||||
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
explicit SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, LSN_TYPE lsn = 0,
|
||||
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
|
||||
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
};
|
||||
|
||||
using SchemaCommitPtr = SchemaCommit::Ptr;
|
||||
|
@ -448,7 +463,7 @@ class PartitionCommit : public BaseResource,
|
|||
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
|
||||
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
|
||||
std::string
|
||||
[[nodiscard]] std::string
|
||||
ToString() const override;
|
||||
};
|
||||
|
||||
|
@ -469,10 +484,10 @@ class Segment : public BaseResource,
|
|||
using VecT = std::vector<Ptr>;
|
||||
static constexpr const char* Name = "Segment";
|
||||
|
||||
Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
|
||||
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
explicit Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
|
||||
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
|
||||
std::string
|
||||
[[nodiscard]] std::string
|
||||
ToString() const override;
|
||||
};
|
||||
|
||||
|
@ -499,7 +514,7 @@ class SegmentCommit : public BaseResource,
|
|||
ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
|
||||
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
|
||||
|
||||
std::string
|
||||
[[nodiscard]] std::string
|
||||
ToString() const override;
|
||||
};
|
||||
|
||||
|
@ -528,6 +543,4 @@ class SegmentFile : public BaseResource,
|
|||
|
||||
using SegmentFilePtr = SegmentFile::Ptr;
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -19,13 +19,13 @@ namespace snapshot {
|
|||
|
||||
void
|
||||
Snapshot::RefAll() {
|
||||
/* std::cout << this << " RefAll SS=" << GetID() << " SS RefCnt=" << RefCnt() << std::endl; */
|
||||
/* std::cout << this << " RefAll SS=" << GetID() << " SS RefCnt=" << ref_count() << std::endl; */
|
||||
std::apply([this](auto&... resource) { ((DoRef(resource)), ...); }, resources_);
|
||||
}
|
||||
|
||||
void
|
||||
Snapshot::UnRefAll() {
|
||||
/* std::cout << this << " UnRefAll SS=" << GetID() << " SS RefCnt=" << RefCnt() << std::endl; */
|
||||
/* std::cout << this << " UnRefAll SS=" << GetID() << " SS RefCnt=" << ref_count() << std::endl; */
|
||||
std::apply([this](auto&... resource) { ((DoUnRef(resource)), ...); }, resources_);
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ class Snapshot : public ReferenceProxy {
|
|||
return GetCollectionCommit()->GetID();
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
[[nodiscard]] ID_TYPE
|
||||
GetCollectionId() const {
|
||||
auto it = GetResources<Collection>().begin();
|
||||
return it->first;
|
||||
|
@ -67,17 +67,17 @@ class Snapshot : public ReferenceProxy {
|
|||
return GetResource<SchemaCommit>(id);
|
||||
}
|
||||
|
||||
const std::string&
|
||||
[[nodiscard]] const std::string&
|
||||
GetName() const {
|
||||
return GetResources<Collection>().begin()->second->GetName();
|
||||
}
|
||||
|
||||
size_t
|
||||
[[nodiscard]] size_t
|
||||
NumberOfPartitions() const {
|
||||
return GetResources<Partition>().size();
|
||||
}
|
||||
|
||||
const LSN_TYPE&
|
||||
[[nodiscard]] const LSN_TYPE&
|
||||
GetMaxLsn() const {
|
||||
return max_lsn_;
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ class Snapshot : public ReferenceProxy {
|
|||
return GetResources<CollectionCommit>().begin()->second.Get();
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
[[nodiscard]] ID_TYPE
|
||||
GetLatestSchemaCommitId() const {
|
||||
return latest_schema_commit_id_;
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ class Snapshot : public ReferenceProxy {
|
|||
return GetResource<PartitionCommit>(it->second);
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
[[nodiscard]] std::vector<std::string>
|
||||
GetFieldNames() const {
|
||||
std::vector<std::string> names;
|
||||
for (auto& kv : field_names_map_) {
|
||||
|
@ -138,19 +138,19 @@ class Snapshot : public ReferenceProxy {
|
|||
return std::move(names);
|
||||
}
|
||||
|
||||
bool
|
||||
[[nodiscard]] bool
|
||||
HasField(const std::string& name) const {
|
||||
auto it = field_names_map_.find(name);
|
||||
return it != field_names_map_.end();
|
||||
}
|
||||
|
||||
bool
|
||||
[[nodiscard]] bool
|
||||
HasFieldElement(const std::string& field_name, const std::string& field_element_name) const {
|
||||
auto id = GetFieldElementId(field_name, field_element_name);
|
||||
return id > 0;
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
[[nodiscard]] ID_TYPE
|
||||
GetSegmentFileId(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const {
|
||||
auto field_element_id = GetFieldElementId(field_name, field_element_name);
|
||||
auto it = element_segfiles_map_.find(field_element_id);
|
||||
|
@ -164,13 +164,13 @@ class Snapshot : public ReferenceProxy {
|
|||
return its->second;
|
||||
}
|
||||
|
||||
bool
|
||||
[[nodiscard]] bool
|
||||
HasSegmentFile(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const {
|
||||
auto id = GetSegmentFileId(field_name, field_element_name, segment_id);
|
||||
return id > 0;
|
||||
}
|
||||
|
||||
ID_TYPE
|
||||
[[nodiscard]] ID_TYPE
|
||||
GetFieldElementId(const std::string& field_name, const std::string& field_element_name) const {
|
||||
auto itf = field_element_names_map_.find(field_name);
|
||||
if (itf == field_element_names_map_.end())
|
||||
|
@ -232,7 +232,7 @@ class Snapshot : public ReferenceProxy {
|
|||
}
|
||||
|
||||
template <typename ResourceT>
|
||||
const typename ResourceT::ScopedMapT&
|
||||
[[nodiscard]] const typename ResourceT::ScopedMapT&
|
||||
GetResources() const {
|
||||
return std::get<Index<typename ResourceT::ScopedMapT, ScopedResourcesT>::value>(resources_);
|
||||
}
|
||||
|
|
|
@ -211,7 +211,8 @@ void
|
|||
Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) {
|
||||
/* to_release_.push_back(ss_ptr); */
|
||||
ss_ptr->UnRef();
|
||||
std::cout << "Snapshot " << ss_ptr->GetID() << " RefCnt = " << ss_ptr->RefCnt() << " To be removed" << std::endl;
|
||||
std::cout << "Snapshot " << ss_ptr->GetID() << " ref_count = " << ss_ptr->ref_count() << " To be removed"
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
@ -78,49 +78,55 @@ TEST_F(SnapshotTest, ReferenceProxyTest) {
|
|||
};
|
||||
|
||||
auto proxy = ReferenceProxy();
|
||||
ASSERT_EQ(proxy.RefCnt(), 0);
|
||||
ASSERT_EQ(proxy.ref_count(), 0);
|
||||
|
||||
int refcnt = 3;
|
||||
for (auto i = 0; i < refcnt; ++i) {
|
||||
proxy.Ref();
|
||||
}
|
||||
ASSERT_EQ(proxy.RefCnt(), refcnt);
|
||||
ASSERT_EQ(proxy.ref_count(), refcnt);
|
||||
|
||||
proxy.RegisterOnNoRefCB(callback);
|
||||
|
||||
for (auto i = 0; i < refcnt; ++i) {
|
||||
proxy.UnRef();
|
||||
}
|
||||
ASSERT_EQ(proxy.RefCnt(), 0);
|
||||
ASSERT_EQ(proxy.ref_count(), 0);
|
||||
ASSERT_EQ(status, CALLED);
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, ScopedResourceTest) {
|
||||
auto inner = std::make_shared<Collection>("c1");
|
||||
ASSERT_EQ(inner->RefCnt(), 0);
|
||||
ASSERT_EQ(inner->ref_count(), 0);
|
||||
|
||||
{
|
||||
auto not_scoped = CollectionScopedT(inner, false);
|
||||
ASSERT_EQ(not_scoped->RefCnt(), 0);
|
||||
ASSERT_EQ(not_scoped->ref_count(), 0);
|
||||
not_scoped->Ref();
|
||||
ASSERT_EQ(not_scoped->RefCnt(), 1);
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
ASSERT_EQ(not_scoped->ref_count(), 1);
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
|
||||
auto not_scoped_2 = not_scoped;
|
||||
ASSERT_EQ(not_scoped_2->RefCnt(), 1);
|
||||
ASSERT_EQ(not_scoped->RefCnt(), 1);
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
ASSERT_EQ(not_scoped_2->ref_count(), 1);
|
||||
ASSERT_EQ(not_scoped->ref_count(), 1);
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
|
||||
not_scoped_2->Ref();
|
||||
ASSERT_EQ(not_scoped_2->ref_count(), 2);
|
||||
ASSERT_EQ(not_scoped->ref_count(), 2);
|
||||
ASSERT_EQ(inner->ref_count(), 2);
|
||||
}
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
inner->UnRef();
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
|
||||
inner->UnRef();
|
||||
ASSERT_EQ(inner->RefCnt(), 0);
|
||||
ASSERT_EQ(inner->ref_count(), 0);
|
||||
|
||||
{
|
||||
// Test scoped construct
|
||||
auto scoped = CollectionScopedT(inner);
|
||||
ASSERT_EQ(scoped->RefCnt(), 1);
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
ASSERT_EQ(scoped->ref_count(), 1);
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
|
||||
{
|
||||
// Test bool operator
|
||||
|
@ -128,45 +134,45 @@ TEST_F(SnapshotTest, ScopedResourceTest) {
|
|||
ASSERT_EQ(other_scoped, false);
|
||||
// Test operator=
|
||||
other_scoped = scoped;
|
||||
ASSERT_EQ(other_scoped->RefCnt(), 2);
|
||||
ASSERT_EQ(scoped->RefCnt(), 2);
|
||||
ASSERT_EQ(inner->RefCnt(), 2);
|
||||
ASSERT_EQ(other_scoped->ref_count(), 2);
|
||||
ASSERT_EQ(scoped->ref_count(), 2);
|
||||
ASSERT_EQ(inner->ref_count(), 2);
|
||||
}
|
||||
ASSERT_EQ(scoped->RefCnt(), 1);
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
ASSERT_EQ(scoped->ref_count(), 1);
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
|
||||
{
|
||||
// Test copy
|
||||
auto other_scoped(scoped);
|
||||
ASSERT_EQ(other_scoped->RefCnt(), 2);
|
||||
ASSERT_EQ(scoped->RefCnt(), 2);
|
||||
ASSERT_EQ(inner->RefCnt(), 2);
|
||||
ASSERT_EQ(other_scoped->ref_count(), 2);
|
||||
ASSERT_EQ(scoped->ref_count(), 2);
|
||||
ASSERT_EQ(inner->ref_count(), 2);
|
||||
}
|
||||
ASSERT_EQ(scoped->RefCnt(), 1);
|
||||
ASSERT_EQ(inner->RefCnt(), 1);
|
||||
ASSERT_EQ(scoped->ref_count(), 1);
|
||||
ASSERT_EQ(inner->ref_count(), 1);
|
||||
}
|
||||
ASSERT_EQ(inner->RefCnt(), 0);
|
||||
ASSERT_EQ(inner->ref_count(), 0);
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, ResourceHoldersTest) {
|
||||
ID_TYPE collection_id = 1;
|
||||
auto collection = CollectionsHolder::GetInstance().GetResource(collection_id, false);
|
||||
auto prev_cnt = collection->RefCnt();
|
||||
auto prev_cnt = collection->ref_count();
|
||||
{
|
||||
auto collection_2 = CollectionsHolder::GetInstance().GetResource(collection_id, false);
|
||||
ASSERT_EQ(collection->GetID(), collection_id);
|
||||
ASSERT_EQ(collection->RefCnt(), prev_cnt);
|
||||
ASSERT_EQ(collection->ref_count(), prev_cnt);
|
||||
}
|
||||
|
||||
{
|
||||
auto collection = CollectionsHolder::GetInstance().GetResource(collection_id, true);
|
||||
ASSERT_EQ(collection->GetID(), collection_id);
|
||||
ASSERT_EQ(collection->RefCnt(), 1+prev_cnt);
|
||||
auto collection_3 = CollectionsHolder::GetInstance().GetResource(collection_id, true);
|
||||
ASSERT_EQ(collection_3->GetID(), collection_id);
|
||||
ASSERT_EQ(collection_3->ref_count(), 1+prev_cnt);
|
||||
}
|
||||
|
||||
if (prev_cnt == 0) {
|
||||
auto collection = CollectionsHolder::GetInstance().GetResource(collection_id, false);
|
||||
ASSERT_TRUE(!collection);
|
||||
auto collection_4 = CollectionsHolder::GetInstance().GetResource(collection_id, false);
|
||||
ASSERT_TRUE(!collection_4);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,7 +217,7 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
|
|||
IDS_TYPE ids;
|
||||
status = Snapshots::GetInstance().GetCollectionIds(ids);
|
||||
ASSERT_EQ(ids.size(), 6);
|
||||
ASSERT_EQ(ids[5], latest_ss->GetCollectionId());
|
||||
ASSERT_EQ(ids.back(), latest_ss->GetCollectionId());
|
||||
|
||||
OperationContext sd_op_ctx;
|
||||
sd_op_ctx.collection = latest_ss->GetCollection();
|
||||
|
|
Loading…
Reference in New Issue