mirror of https://github.com/milvus-io/milvus.git
optimization for read node sync metadata (#4556)
* optimize read-write mode sync logic Signed-off-by: peng.xu <peng.xu@zilliz.com> update 2 Signed-off-by: XuPeng-SH <xupeng3112@163.com> optimize read node sync logic Signed-off-by: peng.xu <peng.xu@zilliz.com> * update for optimization Signed-off-by: peng.xu <peng.xu@zilliz.com> Signed-off-by: shengjun.li <shengjun.li@zilliz.com>pull/4600/head
parent
1d6668156c
commit
a11842c05d
|
@ -936,15 +936,16 @@ GetSnapshotIDsOperation::GetIDs() const {
|
|||
return ids_;
|
||||
}
|
||||
|
||||
GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation()
|
||||
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound) {
|
||||
GetAllActiveSnapshotIDsOperation::GetAllActiveSnapshotIDsOperation(const RangeContext& context)
|
||||
: BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound), updated_time_range_(context) {
|
||||
}
|
||||
|
||||
Status
|
||||
GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) {
|
||||
std::vector<CollectionCommitPtr> ccs;
|
||||
STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>(ccs, {meta::F_ID, meta::F_COLLECTON_ID}));
|
||||
/* STATUS_CHECK(store->GetActiveResources<CollectionCommit>(ccs)); */
|
||||
STATUS_CHECK(store->GetActiveResourcesByAttrs<CollectionCommit>(
|
||||
ccs, {meta::F_ID, meta::F_COLLECTON_ID}, updated_time_range_.upper_bound_, updated_time_range_.low_bound_));
|
||||
|
||||
for (auto& cc : ccs) {
|
||||
auto cid = cc->GetCollectionId();
|
||||
auto it = cid_ccid_.find(cid);
|
||||
|
@ -953,6 +954,7 @@ GetAllActiveSnapshotIDsOperation::DoExecute(StorePtr store) {
|
|||
} else {
|
||||
cid_ccid_[cid] = std::max(it->second, cc->GetID());
|
||||
}
|
||||
latest_update_ = std::max(latest_update_, cc->GetUpdatedTime());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
@ -331,15 +332,22 @@ class GetAllActiveSnapshotIDsOperation : public Operations {
|
|||
public:
|
||||
using BaseT = Operations;
|
||||
|
||||
GetAllActiveSnapshotIDsOperation();
|
||||
explicit GetAllActiveSnapshotIDsOperation(const RangeContext& context);
|
||||
|
||||
Status DoExecute(StorePtr) override;
|
||||
|
||||
const std::map<ID_TYPE, ID_TYPE>&
|
||||
GetIDs() const;
|
||||
|
||||
TS_TYPE
|
||||
GetLatestUpdatedTime() const {
|
||||
return latest_update_;
|
||||
}
|
||||
|
||||
protected:
|
||||
std::map<ID_TYPE, ID_TYPE> cid_ccid_;
|
||||
RangeContext updated_time_range_;
|
||||
TS_TYPE latest_update_ = std::numeric_limits<TS_TYPE>::min();
|
||||
};
|
||||
|
||||
class DropCollectionOperation : public CompoundBaseOperation<DropCollectionOperation> {
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
@ -25,6 +26,11 @@ namespace milvus {
|
|||
namespace engine {
|
||||
namespace snapshot {
|
||||
|
||||
struct RangeContext {
|
||||
TS_TYPE upper_bound_ = std::numeric_limits<TS_TYPE>::max();
|
||||
TS_TYPE low_bound_ = std::numeric_limits<TS_TYPE>::min();
|
||||
};
|
||||
|
||||
struct PartitionContext {
|
||||
std::string name;
|
||||
ID_TYPE id = 0;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "db/snapshot/OperationExecutor.h"
|
||||
#include "db/snapshot/SnapshotPolicyFactory.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "utils/TimerContext.h"
|
||||
#include "value/config/ServerConfig.h"
|
||||
|
||||
|
@ -25,7 +26,7 @@
|
|||
|
||||
namespace milvus::engine::snapshot {
|
||||
|
||||
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 100 * 1000;
|
||||
static constexpr int DEFAULT_READER_TIMER_INTERVAL_US = 120 * 1000;
|
||||
static constexpr int DEFAULT_WRITER_TIMER_INTERVAL_US = 2000 * 1000;
|
||||
|
||||
Status
|
||||
|
@ -245,9 +246,11 @@ Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) con
|
|||
}
|
||||
|
||||
void
|
||||
Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
|
||||
Snapshots::OnReaderTimer(const boost::system::error_code& ec, TimerContext* timer) {
|
||||
std::chrono::time_point<std::chrono::system_clock> start = std::chrono::system_clock::now();
|
||||
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>();
|
||||
RangeContext ctx;
|
||||
ctx.low_bound_ = latest_updated_;
|
||||
auto op = std::make_shared<GetAllActiveSnapshotIDsOperation>(ctx);
|
||||
auto status = (*op)(store_);
|
||||
if (!status.ok()) {
|
||||
LOG_SERVER_ERROR_ << "Snapshots::OnReaderTimer::GetAllActiveSnapshotIDsOperation failed: " << status.message();
|
||||
|
@ -260,11 +263,15 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
|
|||
}
|
||||
return;
|
||||
}
|
||||
|
||||
latest_updated_ = std::max(op->GetLatestUpdatedTime(), latest_updated_.load());
|
||||
|
||||
auto ids = op->GetIDs();
|
||||
ScopedSnapshotT ss;
|
||||
std::set<ID_TYPE> alive_cids;
|
||||
std::set<ID_TYPE> this_invalid_cids;
|
||||
bool diff_found = false;
|
||||
|
||||
for (auto& [cid, ccid] : ids) {
|
||||
status = LoadSnapshot(store_, ss, cid, ccid);
|
||||
if (status.code() == SS_NOT_ACTIVE_ERROR) {
|
||||
|
@ -347,13 +354,20 @@ Snapshots::OnReaderTimer(const boost::system::error_code& ec) {
|
|||
auto exe_time =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() - start)
|
||||
.count();
|
||||
if (exe_time > DEFAULT_READER_TIMER_INTERVAL_US) {
|
||||
LOG_ENGINE_WARNING_ << "OnReaderTimer takes too much time: " << exe_time << " us";
|
||||
reader_time_acc_(exe_time);
|
||||
if (boost::accumulators::count(reader_time_acc_) >= 100) {
|
||||
auto acc = reader_time_acc_;
|
||||
reader_time_acc_ = {};
|
||||
auto mean_val = boost::accumulators::mean(acc);
|
||||
auto min_val = boost::accumulators::min(acc);
|
||||
auto max_val = boost::accumulators::max(acc);
|
||||
LOG_SERVER_INFO_ << "OnReaderTimer Stastics [US]: MEAN=" << mean_val << ", MIN=" << min_val
|
||||
<< ", MAX=" << max_val;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Snapshots::OnWriterTimer(const boost::system::error_code& ec) {
|
||||
Snapshots::OnWriterTimer(const boost::system::error_code& ec, TimerContext* timer) {
|
||||
// Single mode
|
||||
if (!config.cluster.enable()) {
|
||||
std::unique_lock<std::shared_timed_mutex> lock(inactive_mtx_);
|
||||
|
@ -392,7 +406,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) {
|
|||
ctx.interval_us = low_limit;
|
||||
}
|
||||
LOG_SERVER_INFO_ << "OnReaderTimer INTERVAL: " << ctx.interval_us << " US";
|
||||
ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1);
|
||||
ctx.handler = std::bind(&Snapshots::OnReaderTimer, this, std::placeholders::_1, std::placeholders::_2);
|
||||
mgr->AddTimer(ctx);
|
||||
} else {
|
||||
TimerContext::Context ctx;
|
||||
|
@ -410,7 +424,7 @@ Snapshots::RegisterTimers(TimerManager* mgr) {
|
|||
}
|
||||
LOG_SERVER_INFO_ << "OnWriterTimer INTERVAL: " << ctx.interval_us << " US";
|
||||
|
||||
ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1);
|
||||
ctx.handler = std::bind(&Snapshots::OnWriterTimer, this, std::placeholders::_1, std::placeholders::_2);
|
||||
mgr->AddTimer(ctx);
|
||||
}
|
||||
return Status::OK();
|
||||
|
|
|
@ -11,7 +11,15 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <boost/accumulators/accumulators.hpp>
|
||||
#include <boost/accumulators/statistics/count.hpp>
|
||||
#include <boost/accumulators/statistics/max.hpp>
|
||||
#include <boost/accumulators/statistics/mean.hpp>
|
||||
#include <boost/accumulators/statistics/min.hpp>
|
||||
#include <boost/accumulators/statistics/stats.hpp>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
@ -88,9 +96,9 @@ class Snapshots {
|
|||
DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn);
|
||||
|
||||
void
|
||||
OnReaderTimer(const boost::system::error_code&);
|
||||
OnReaderTimer(const boost::system::error_code&, TimerContext*);
|
||||
void
|
||||
OnWriterTimer(const boost::system::error_code&);
|
||||
OnWriterTimer(const boost::system::error_code&, TimerContext*);
|
||||
|
||||
Status
|
||||
LoadNoLock(StorePtr store, ID_TYPE collection_id, SnapshotHolderPtr& holder);
|
||||
|
@ -105,6 +113,14 @@ class Snapshots {
|
|||
std::map<ID_TYPE, SnapshotHolderPtr> inactive_holders_;
|
||||
std::set<ID_TYPE> invalid_ssid_;
|
||||
StorePtr store_;
|
||||
|
||||
std::atomic<TS_TYPE> latest_updated_ = std::numeric_limits<TS_TYPE>::min();
|
||||
boost::accumulators::accumulator_set<
|
||||
double, boost::accumulators::stats<boost::accumulators::tag::count, boost::accumulators::tag::mean,
|
||||
boost::accumulators::tag::max, boost::accumulators::tag::min>
|
||||
/* boost::accumulators::features<boost::accumulators::tag::count> */
|
||||
>
|
||||
reader_time_acc_;
|
||||
};
|
||||
|
||||
} // namespace milvus::engine::snapshot
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
#include "codecs/Codec.h"
|
||||
#include "db/Utils.h"
|
||||
#include "db/meta/MetaFactory.h"
|
||||
#include "db/meta/condition/MetaFilter.h"
|
||||
#include "db/meta/condition/MetaRelation.h"
|
||||
#include "db/snapshot/ResourceContext.h"
|
||||
#include "db/snapshot/ResourceHelper.h"
|
||||
#include "db/snapshot/ResourceTypes.h"
|
||||
|
@ -31,6 +33,7 @@
|
|||
#include <functional>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
|
@ -181,9 +184,16 @@ class Store : public std::enable_shared_from_this<Store> {
|
|||
template <typename ResourceT>
|
||||
Status
|
||||
GetActiveResourcesByAttrs(std::vector<typename ResourceT::Ptr>& return_vs,
|
||||
const std::vector<std::string>& target_attrs) {
|
||||
const std::vector<std::string>& target_attrs, int64_t upper_bound, int64_t low_bound) {
|
||||
std::vector<State> filter_states = {State::ACTIVE};
|
||||
return adapter_->SelectByAttrs<ResourceT>(StateField::Name, filter_states, target_attrs, return_vs);
|
||||
auto relation = meta::ONE_(meta::Range_<ResourceT, StateField>(meta::Range::EQ, State::ACTIVE));
|
||||
if (upper_bound < std::numeric_limits<TS_TYPE>::max()) {
|
||||
relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::LTE, upper_bound));
|
||||
}
|
||||
if (low_bound > std::numeric_limits<TS_TYPE>::min()) {
|
||||
relation = meta::AND_(relation, meta::Range_<ResourceT, UpdatedOnField>(meta::Range::GTE, low_bound));
|
||||
}
|
||||
return adapter_->Query<ResourceT>(relation, return_vs);
|
||||
}
|
||||
|
||||
template <typename ResourceT>
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
namespace milvus {
|
||||
|
||||
struct TimerContext {
|
||||
using HandlerT = std::function<void(const boost::system::error_code&)>;
|
||||
using HandlerT = std::function<void(const boost::system::error_code&, TimerContext* ctx)>;
|
||||
struct Context {
|
||||
/* Context(int interval_us, HandlerT& handler, ThreadPoolPtr pool = nullptr) */
|
||||
/* : interval_(interval_us), handler_(handler), timer_(io, interval_), pool_(pool) { */
|
||||
|
@ -58,7 +58,7 @@ struct TimerContext {
|
|||
inline void
|
||||
TimerContext::Reschedule(const boost::system::error_code& ec) {
|
||||
try {
|
||||
pool_->enqueue(handler_, ec);
|
||||
pool_->enqueue(handler_, ec, this);
|
||||
} catch (std::exception& ex) {
|
||||
LOG_SERVER_ERROR_ << "Fail to enqueue handler: " << std::string(ex.what());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue