diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index eb1dcc3330..9450a65969 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -936,15 +936,16 @@ GetSnapshotIDsOperation::GetIDs() const { return ids_; } -GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation() - : BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound) { +GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation(const RangeContext& context) + : BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound), updated_time_range_(context) { } Status GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) { std::vector<CollectionCommitPtr> ccs; - STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>(ccs, {meta::F_ID, meta::F_COLLECTON_ID})); - /* STATUS_CHECK(store->GetActiveResources<CollectionCommit>(ccs)); */ + STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>( + ccs, {meta::F_ID, meta::F_COLLECTON_ID}, updated_time_range_.upper_bound_, updated_time_range_.low_bound_)); + for (auto& cc : ccs) { auto cid = cc->GetCollectionId(); auto it = cid_ccid_.find(cid); @@ -953,6 +954,7 @@ GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) { } else { cid_ccid_[cid] = std::max(it->second, cc->GetID()); } + latest_update_ = std::max(latest_update_, cc->GetUpdatedTime()); } return Status::OK(); } diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index 212c43fe1e..8092b6f5c0 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -11,6 +11,7 @@ #pragma once +#include <limits> #include <map> #include <set> #include <string> @@ -331,15 +332,22 @@ class GetAllActiveSnapshotIDsOperation : public Operations { public: using BaseT = Operations; - GetAllActiveSnapshotIDsOperation(); + explicit GetAllActiveSnapshotIDsOperation(const RangeContext& context); Status DoExecute(StorePtr) override; const std::map<ID_TYPE, ID_TYPE>& GetIDs() const; + TS_TYPE + GetLatestUpdatedTime() const { + return latest_update_; + } + protected: std::map<ID_TYPE, ID_TYPE> cid_ccid_; + RangeContext updated_time_range_; + TS_TYPE latest_update_ = std::numeric_limits<TS_TYPE>::min(); }; class DropCollectionOperation : public CompoundBaseOperation<DropCollectionOperation> { diff --git a/core/src/db/snapshot/Context.h b/core/src/db/snapshot/Context.h index 20fe10eac3..c5973bede6 100644 --- a/core/src/db/snapshot/Context.h +++ b/core/src/db/snapshot/Context.h @@ -12,6 +12,7 @@ #pragma once #include <iostream> +#include <limits> #include <map> #include <string> #include <unordered_map> @@ -25,6 +26,11 @@ namespace milvus { namespace engine { namespace snapshot { +struct RangeContext { + TS_TYPE upper_bound_ = std::numeric_limits<TS_TYPE>::max(); + TS_TYPE low_bound_ = std::numeric_limits<TS_TYPE>::min(); +}; + struct PartitionContext { std::string name; ID_TYPE id = 0; diff --git a/core/src/db/snapshot/Snapshots.cpp b/core/src/db/snapshot/Snapshots.cpp index c3e4d4b327..a8be084621 100644 --- a/core/src/db/snapshot/Snapshots.cpp +++ b/core/src/db/snapshot/Snapshots.cpp @@ -18,6 +18,7 @@ #include "db/snapshot/OperationExecutor.h" #include "db/snapshot/SnapshotPolicyFactory.h" #include "utils/CommonUtil.h" +#include "utils/TimeRecorder.h" #include "utils/TimerContext.h" #include "value/config/ServerConfig.h" @@ -25,7 +26,7 @@ namespace milvus::engine::snapshot { -static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 100 * 1000; +static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 120 * 1000; static constexpr int DEFAULT_WRITER_TIMER_INTERVAL_US = 2000 * 1000; Status @@ -245,9 +246,11 @@ Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) con } void -Snapshots::OnReaderTimer(const boost::system::error_code& ec) { +Snapshots::OnReaderTimer(const boost::system::error_code& ec, TimerContext* timer) { std::chrono::time_point<std::chrono::system_clock> start = std::chrono::system_clock::now(); - auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>(); + RangeContext ctx; + ctx.low_bound_ = latest_updated_; + auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>(ctx); auto status = (*op)(store_); if (!status.ok()) { LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer::GetAllActiveSnapshotIDsOperation failed: " << status.message(); @@ -260,11 +263,15 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) { } return; } + + latest_updated_ = std::max(op->GetLatestUpdatedTime(), latest_updated_.load()); + auto ids = op->GetIDs(); ScopedSnapshotT ss; std::set<ID_TYPE> alive_cids; std::set<ID_TYPE> this_invalid_cids; bool diff_found = false; + for (auto& [cid, ccid] : ids) { status = LoadSnapshot(store_, ss, cid, ccid); if (status.code() == SS_NOT_ACTIVE_ERROR) { @@ -347,13 +354,20 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) { auto exe_time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - start) .count(); - if (exe_time > DEFAULT_READER_TIMER_INTERVAL_US) { - LOG_ENGINE_WARNING_ << "OnReaderTimer takes too much time: " << exe_time << " us"; + reader_time_acc_(exe_time); + if (boost::accumulators::count(reader_time_acc_) >= 100) { + auto acc = reader_time_acc_; + reader_time_acc_ = {}; + auto mean_val = boost::accumulators::mean(acc); + auto min_val = boost::accumulators::min(acc); + auto max_val = boost::accumulators::max(acc); + LOG_SERVER_INFO_ << "OnReaderTimer Stastics [US]: MEAN=" << mean_val << ", MIN=" << min_val + << ", MAX=" << max_val; } } void -Snapshots::OnWriterTimer(const boost::system::error_code& ec) { +Snapshots::OnWriterTimer(const boost::system::error_code& ec, TimerContext* timer) { // Single mode if (!config.cluster.enable()) { std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_); @@ -392,7 +406,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) { ctx.interval_us = low_limit; } LOG_SERVER_INFO_ << "OnReaderTimer INTERVAL: " << ctx.interval_us << " US"; - ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1); + ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1, std::placeholders::_2); mgr->AddTimer(ctx); } else { TimerContext::Context ctx; @@ -410,7 +424,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) { } LOG_SERVER_INFO_ << "OnWriterTimer INTERVAL: " << ctx.interval_us << " US"; - ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1); + ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1, std::placeholders::_2); mgr->AddTimer(ctx); } return Status::OK(); diff --git a/core/src/db/snapshot/Snapshots.h b/core/src/db/snapshot/Snapshots.h index b5b08a7024..f117be5cd1 100644 --- a/core/src/db/snapshot/Snapshots.h +++ b/core/src/db/snapshot/Snapshots.h @@ -11,7 +11,15 @@ #pragma once +#include <algorithm> #include <atomic> +#include <boost/accumulators/accumulators.hpp> +#include <boost/accumulators/statistics/count.hpp> +#include <boost/accumulators/statistics/max.hpp> +#include <boost/accumulators/statistics/mean.hpp> +#include <boost/accumulators/statistics/min.hpp> +#include <boost/accumulators/statistics/stats.hpp> +#include <limits> #include <map> #include <memory> #include <mutex> @@ -88,9 +96,9 @@ class Snapshots { DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn); void - OnReaderTimer(const boost::system::error_code&); + OnReaderTimer(const boost::system::error_code&, TimerContext*); void - OnWriterTimer(const boost::system::error_code&); + OnWriterTimer(const boost::system::error_code&, TimerContext*); Status LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr& holder); @@ -105,6 +113,14 @@ class Snapshots { std::map<ID_TYPE, SnapshotHolderPtr> inactive_holders_; std::set<ID_TYPE> invalid_ssid_; StorePtr store_; + + std::atomic<TS_TYPE> latest_updated_ = std::numeric_limits<TS_TYPE>::min(); + boost::accumulators::accumulator_set< + double, boost::accumulators::stats<boost::accumulators::tag::count, boost::accumulators::tag::mean, + boost::accumulators::tag::max, boost::accumulators::tag::min> + /* boost::accumulators::features<boost::accumulators::tag::count> */ + > + reader_time_acc_; }; } // namespace milvus::engine::snapshot diff --git a/core/src/db/snapshot/Store.h b/core/src/db/snapshot/Store.h index 94f35ecafe..bf30b1eca0 100644 --- a/core/src/db/snapshot/Store.h +++ b/core/src/db/snapshot/Store.h @@ -14,6 +14,8 @@ #include "codecs/Codec.h" #include "db/Utils.h" #include "db/meta/MetaFactory.h" +#include "db/meta/condition/MetaFilter.h" +#include "db/meta/condition/MetaRelation.h" #include "db/snapshot/ResourceContext.h" #include "db/snapshot/ResourceHelper.h" #include "db/snapshot/ResourceTypes.h" @@ -31,6 +33,7 @@ #include <functional> #include <iomanip> #include <iostream> +#include <limits> #include <map> #include <memory> #include <set> @@ -181,9 +184,16 @@ class Store : public std::enable_shared_from_this<Store> { template <typename ResourceT> Status GetActiveResourcesByAttrs(std::vector<typename ResourceT::Ptr>& return_vs, - const std::vector<std::string>& target_attrs) { + const std::vector<std::string>& target_attrs, int64_t upper_bound, int64_t low_bound) { std::vector<State> filter_states = {State::ACTIVE}; - return adapter_->SelectByAttrs<ResourceT>(StateField::Name, filter_states, target_attrs, return_vs); + auto relation = meta::ONE_(meta::Range_<ResourceT, StateField>(meta::Range::EQ, State::ACTIVE)); + if (upper_bound < std::numeric_limits<TS_TYPE>::max()) { + relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::LTE, upper_bound)); + } + if (low_bound > std::numeric_limits<TS_TYPE>::min()) { + relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::GTE, low_bound)); + } + return adapter_->Query<ResourceT>(relation, return_vs); } template <typename ResourceT> diff --git a/core/src/utils/TimerContext.h b/core/src/utils/TimerContext.h index 0396c4ef61..37adcc7ebe 100644 --- a/core/src/utils/TimerContext.h +++ b/core/src/utils/TimerContext.h @@ -24,7 +24,7 @@ namespace milvus { struct TimerContext { - using HandlerT = std::function<void(const boost::system::error_code&)>; + using HandlerT = std::function<void(const boost::system::error_code&, TimerContext* ctx)>; struct Context { /* Context(int interval_us, HandlerT& handler, ThreadPoolPtr pool = nullptr) */ /* : interval_(interval_us), handler_(handler), timer_(io, interval_), pool_(pool) { */ @@ -58,7 +58,7 @@ struct TimerContext { inline void TimerContext::Reschedule(const boost::system::error_code& ec) { try { - pool_->enqueue(handler_, ec); + pool_->enqueue(handler_, ec, this); } catch (std::exception& ex) { LOG_SERVER_ERROR_ << "Fail to enqueue handler: " << std::string(ex.what()); }