From f7c1d0ad0dfafc573162632d99af99393a8ac448 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 2 Jul 2020 21:09:49 +0800 Subject: [PATCH] snapshot integration (#2716) * code opt Signed-off-by: yudong.cai * add some APIs for SSDBImpl Signed-off-by: yudong.cai --- core/src/db/DB.h | 3 - core/src/db/DBImpl.h | 43 +----- core/src/db/SSDBImpl.cpp | 217 +++++++++++++++++++++------ core/src/db/SSDBImpl.h | 38 ++++- core/src/db/SimpleWaitNotify.h | 62 ++++++++ core/src/db/snapshot/ResourceTypes.h | 4 +- 6 files changed, 267 insertions(+), 100 deletions(-) create mode 100644 core/src/db/SimpleWaitNotify.h diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 30682b5911..6da0e5b978 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -11,7 +11,6 @@ #pragma once -#include #include #include #include @@ -28,8 +27,6 @@ namespace milvus { namespace engine { -class Env; - class DB { public: DB() = default; diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 5d6fca5d84..f9a201943e 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -12,7 +12,6 @@ #pragma once #include -#include #include #include #include @@ -27,6 +26,7 @@ #include "config/handler/EngineConfigHandler.h" #include "db/DB.h" #include "db/IndexFailedChecker.h" +#include "db/SimpleWaitNotify.h" #include "db/Types.h" #include "db/insert/MemManager.h" #include "db/merge/MergeManager.h" @@ -325,47 +325,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi std::thread bg_metric_thread_; std::thread bg_index_thread_; - struct SimpleWaitNotify { - bool notified_ = false; - std::mutex mutex_; - std::condition_variable cv_; - - void - Wait() { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait(lck); - } - notified_ = false; - } - - void - Wait_Until(const std::chrono::system_clock::time_point& tm_pint) { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait_until(lck, tm_pint); - } - notified_ = false; - } - - void - Wait_For(const std::chrono::system_clock::duration& tm_dur) { - std::unique_lock lck(mutex_); - if (!notified_) { - cv_.wait_for(lck, tm_dur); - } - notified_ = false; - } - - void - Notify() { - std::unique_lock lck(mutex_); - notified_ = true; - lck.unlock(); - cv_.notify_one(); - } - }; - SimpleWaitNotify swn_wal_; SimpleWaitNotify swn_flush_; SimpleWaitNotify swn_metric_; diff --git a/core/src/db/SSDBImpl.cpp b/core/src/db/SSDBImpl.cpp index 5b2be9302b..f390eb56f4 100644 --- a/core/src/db/SSDBImpl.cpp +++ b/core/src/db/SSDBImpl.cpp @@ -10,10 +10,16 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/SSDBImpl.h" +#include "cache/CpuCacheMgr.h" #include "db/snapshot/CompoundOperations.h" +#include "db/snapshot/ResourceTypes.h" #include "db/snapshot/Snapshots.h" +#include "metrics/Metrics.h" +#include "metrics/SystemInfo.h" +#include "utils/Exception.h" #include "wal/WalDefinations.h" +#include #include #include @@ -21,6 +27,8 @@ namespace milvus { namespace engine { namespace { +constexpr int64_t BACKGROUND_METRIC_INTERVAL = 1; + static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!"); } // namespace @@ -45,9 +53,9 @@ SSDBImpl::~SSDBImpl() { Stop(); } -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// external api -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +// External APIs +//////////////////////////////////////////////////////////////////////////////// Status SSDBImpl::Start() { if (initialized_.load(std::memory_order_acquire)) { @@ -77,15 +85,11 @@ SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) { CHECK_INITIALIZED; auto ctx = context; - if (options_.wal_enable_) { ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName()); } - auto op = std::make_shared(ctx); - auto status = op->Push(); - - return status; + return op->Push(); } Status @@ -94,41 +98,32 @@ SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::Colle CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); collection = ss->GetCollection(); - auto& fields = ss->GetResources(); for (auto& kv : fields) { fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName()); } - return status; + return Status::OK(); } Status SSDBImpl::DropCollection(const std::string& name) { CHECK_INITIALIZED; - // dates partly delete files of the collection but currently we don't support LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << name; snapshot::ScopedSnapshotT ss; auto& snapshots = snapshot::Snapshots::GetInstance(); - auto status = snapshots.GetSnapshot(ss, name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshots.GetSnapshot(ss, name)); if (options_.wal_enable_) { // SS TODO /* wal_mgr_->DropCollection(ss->GetCollectionId()); */ } - status = snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); - return status; + return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits::max()); } Status @@ -154,13 +149,10 @@ Status SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) { CHECK_INITIALIZED; - uint64_t lsn = 0; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); + snapshot::LSN_TYPE lsn = 0; if (options_.wal_enable_) { // SS TODO /* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */ @@ -175,13 +167,8 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& snapshot::PartitionContext p_ctx; p_ctx.name = partition_name; snapshot::PartitionPtr partition; - status = op->CommitNewPartition(p_ctx, partition); - if (!status.ok()) { - return status; - } - - status = op->Push(); - return status; + STATUS_CHECK(op->CommitNewPartition(p_ctx, partition)); + return op->Push(); } Status @@ -189,10 +176,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); // SS TODO: Is below step needed? Or How to implement it? /* mem_mgr_->EraseMemVector(partition_name); */ @@ -200,9 +184,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p snapshot::PartitionContext context; context.name = partition_name; auto op = std::make_shared(context, ss); - status = op->Push(); - - return status; + return op->Push(); } Status @@ -210,13 +192,10 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vectorGetPartitionNames()); - return status; + return Status::OK(); } Status @@ -225,10 +204,7 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; - auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name); - if (!status.ok()) { - return status; - } + STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name)); auto handler = std::make_shared(context, ss); handler->Iterate(); @@ -236,5 +212,148 @@ SSDBImpl::PreloadCollection(const std::shared_ptr& context, con return handler->GetStatus(); } +//////////////////////////////////////////////////////////////////////////////// +// Internal APIs +//////////////////////////////////////////////////////////////////////////////// +void +SSDBImpl::InternalFlush(const std::string& collection_id) { + wal::MXLogRecord record; + record.type = wal::MXLogType::Flush; + record.collection_id = collection_id; + ExecWalRecord(record); +} + +void +SSDBImpl::BackgroundFlushThread() { + SetThreadName("flush_thread"); + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + LOG_ENGINE_DEBUG_ << "DB background flush thread exit"; + break; + } + + InternalFlush(); + if (options_.auto_flush_interval_ > 0) { + swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_)); + } else { + swn_flush_.Wait(); + } + } +} + +void +SSDBImpl::StartMetricTask() { + server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL); + int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); + int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); + fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0); + + if (cache_total > 0) { + double cache_usage_double = cache_usage; + server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total); + } else { + server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0); + } + + server::Metrics::GetInstance().GpuCacheUsageGaugeSet(); + /* SS TODO */ + // uint64_t size; + // Size(size); + // server::Metrics::GetInstance().DataFileSizeGaugeSet(size); + server::Metrics::GetInstance().CPUUsagePercentSet(); + server::Metrics::GetInstance().RAMUsagePercentSet(); + server::Metrics::GetInstance().GPUPercentGaugeSet(); + server::Metrics::GetInstance().GPUMemoryUsageGaugeSet(); + server::Metrics::GetInstance().OctetsSet(); + + server::Metrics::GetInstance().CPUCoreUsagePercentSet(); + server::Metrics::GetInstance().GPUTemperature(); + server::Metrics::GetInstance().CPUTemperature(); + server::Metrics::GetInstance().PushToGateway(); +} + +void +SSDBImpl::BackgroundMetricThread() { + SetThreadName("metric_thread"); + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + LOG_ENGINE_DEBUG_ << "DB background metric thread exit"; + break; + } + + swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL)); + StartMetricTask(); + meta::FilesHolder::PrintInfo(); + } +} + +Status +SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) { + return Status::OK(); +} + +void +SSDBImpl::BackgroundWalThread() { + SetThreadName("wal_thread"); + server::SystemInfo::GetInstance().Init(); + + std::chrono::system_clock::time_point next_auto_flush_time; + auto get_next_auto_flush_time = [&]() { + return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_); + }; + if (options_.auto_flush_interval_ > 0) { + next_auto_flush_time = get_next_auto_flush_time(); + } + + InternalFlush(); + while (true) { + if (options_.auto_flush_interval_ > 0) { + if (std::chrono::system_clock::now() >= next_auto_flush_time) { + InternalFlush(); + next_auto_flush_time = get_next_auto_flush_time(); + } + } + + wal::MXLogRecord record; + auto error_code = wal_mgr_->GetNextRecord(record); + if (error_code != WAL_SUCCESS) { + LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error"; + break; + } + + if (record.type != wal::MXLogType::None) { + ExecWalRecord(record); + if (record.type == wal::MXLogType::Flush) { + // notify flush request to return + flush_req_swn_.Notify(); + + // if user flush all manually, update auto flush also + if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) { + next_auto_flush_time = get_next_auto_flush_time(); + } + } + + } else { + if (!initialized_.load(std::memory_order_acquire)) { + InternalFlush(); + flush_req_swn_.Notify(); + // SS TODO + // WaitMergeFileFinish(); + // WaitBuildIndexFinish(); + LOG_ENGINE_DEBUG_ << "WAL background thread exit"; + break; + } + + if (options_.auto_flush_interval_ > 0) { + swn_wal_.Wait_Until(next_auto_flush_time); + } else { + swn_wal_.Wait(); + } + } + } +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/SSDBImpl.h b/core/src/db/SSDBImpl.h index 1cc53387c5..a735b4e35f 100644 --- a/core/src/db/SSDBImpl.h +++ b/core/src/db/SSDBImpl.h @@ -18,6 +18,7 @@ #include #include "db/Options.h" +#include "db/SimpleWaitNotify.h" #include "db/SnapshotHandlers.h" #include "db/snapshot/Context.h" #include "db/snapshot/ResourceTypes.h" @@ -32,6 +33,14 @@ class SSDBImpl { public: explicit SSDBImpl(const DBOptions& options); + ~SSDBImpl(); + + Status + Start(); + + Status + Stop(); + Status CreateCollection(const snapshot::CreateCollectionContext& context); @@ -61,18 +70,39 @@ class SSDBImpl { Status ShowPartitions(const std::string& collection_name, std::vector& partition_names); - ~SSDBImpl(); + private: + void + InternalFlush(const std::string& collection_id = ""); + + void + BackgroundFlushThread(); + + void + StartMetricTask(); + + void + BackgroundMetricThread(); Status - Start(); + ExecWalRecord(const wal::MXLogRecord& record); - Status - Stop(); + void + BackgroundWalThread(); private: DBOptions options_; std::atomic initialized_; + std::shared_ptr wal_mgr_; + std::shared_ptr bg_wal_thread_; + + std::shared_ptr bg_flush_thread_; + + SimpleWaitNotify swn_wal_; + SimpleWaitNotify swn_flush_; + SimpleWaitNotify swn_metric_; + + SimpleWaitNotify flush_req_swn_; }; // SSDBImpl } // namespace engine diff --git a/core/src/db/SimpleWaitNotify.h b/core/src/db/SimpleWaitNotify.h new file mode 100644 index 0000000000..a92a8ccee7 --- /dev/null +++ b/core/src/db/SimpleWaitNotify.h @@ -0,0 +1,62 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include + +namespace milvus { +namespace engine { + +struct SimpleWaitNotify { + bool notified_ = false; + std::mutex mutex_; + std::condition_variable cv_; + + void + Wait() { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait(lck); + } + notified_ = false; + } + + void + Wait_Until(const std::chrono::system_clock::time_point& tm_pint) { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait_until(lck, tm_pint); + } + notified_ = false; + } + + void + Wait_For(const std::chrono::system_clock::duration& tm_dur) { + std::unique_lock lck(mutex_); + if (!notified_) { + cv_.wait_for(lck, tm_dur); + } + notified_ = false; + } + + void + Notify() { + std::unique_lock lck(mutex_); + notified_ = true; + lck.unlock(); + cv_.notify_one(); + } +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/snapshot/ResourceTypes.h b/core/src/db/snapshot/ResourceTypes.h index 21b3ce5086..7c74fd0d18 100644 --- a/core/src/db/snapshot/ResourceTypes.h +++ b/core/src/db/snapshot/ResourceTypes.h @@ -23,8 +23,8 @@ using ID_TYPE = int64_t; using NUM_TYPE = int64_t; using FTYPE_TYPE = int64_t; using TS_TYPE = int64_t; -using LSN_TYPE = uint64_t; -using SIZE_TYPE = uint64_t; +using LSN_TYPE = int64_t; +using SIZE_TYPE = int64_t; using MappingT = std::set; enum FieldType { VECTOR, INT32 };