snapshot integration (#2716)

* code opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add some APIs for SSDBImpl

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Cai Yudong 2020-07-02 21:09:49 +08:00 committed by GitHub
parent a5f6a7735c
commit f7c1d0ad0d
6 changed files with 267 additions and 100 deletions
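The APIs added to SSDBImpl in this commit (Start/Stop plus the snapshot-based collection and partition calls declared in db/SSDBImpl.h below) would be driven roughly as in the following sketch. It is illustrative only: the option values, the "demo" collection name, and the assumption that Start() launches the new background threads are not part of this diff.

// Hypothetical driver code; not part of the commit.
#include <string>
#include <vector>
#include "db/SSDBImpl.h"

int main() {
    milvus::engine::DBOptions options;
    options.wal_enable_ = false;                      // keep the sketch WAL-free

    milvus::engine::SSDBImpl db(options);
    db.Start();                                       // presumably spawns the background flush/metric/WAL threads

    milvus::engine::snapshot::CreateCollectionContext context;
    // context.collection must carry a populated schema before this call (setup omitted here)
    auto status = db.CreateCollection(context);

    std::vector<std::string> partition_names;
    db.ShowPartitions("demo", partition_names);       // lists partitions of an existing collection

    db.Stop();                                        // joins the background threads
    return status.ok() ? 0 : 1;
}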

db/DB.h

@ -11,7 +11,6 @@
#pragma once
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
@ -28,8 +27,6 @@
namespace milvus {
namespace engine {
class Env;
class DB {
public:
DB() = default;

db/DBImpl.h

@ -12,7 +12,6 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <memory>
@ -27,6 +26,7 @@
#include "config/handler/EngineConfigHandler.h"
#include "db/DB.h"
#include "db/IndexFailedChecker.h"
#include "db/SimpleWaitNotify.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "db/merge/MergeManager.h"
@ -325,47 +325,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
std::thread bg_metric_thread_;
std::thread bg_index_thread_;
struct SimpleWaitNotify {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Wait_Until(const std::chrono::system_clock::time_point& tm_pint) {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait_until(lck, tm_pint);
}
notified_ = false;
}
void
Wait_For(const std::chrono::system_clock::duration& tm_dur) {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait_for(lck, tm_dur);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
SimpleWaitNotify swn_wal_;
SimpleWaitNotify swn_flush_;
SimpleWaitNotify swn_metric_;

db/SSDBImpl.cpp

@ -10,10 +10,16 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/SSDBImpl.h"
#include "cache/CpuCacheMgr.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
#include "metrics/Metrics.h"
#include "metrics/SystemInfo.h"
#include "utils/Exception.h"
#include "wal/WalDefinations.h"
#include <fiu-local.h>
#include <limits>
#include <utility>
@ -21,6 +27,8 @@ namespace milvus {
namespace engine {
namespace {
constexpr int64_t BACKGROUND_METRIC_INTERVAL = 1;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
} // namespace
@ -45,9 +53,9 @@ SSDBImpl::~SSDBImpl() {
Stop();
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
Status
SSDBImpl::Start() {
if (initialized_.load(std::memory_order_acquire)) {
@ -77,15 +85,11 @@ SSDBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
CHECK_INITIALIZED;
auto ctx = context;
if (options_.wal_enable_) {
ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());
}
auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
auto status = op->Push();
return status;
return op->Push();
}
Status
@ -94,41 +98,32 @@ SSDBImpl::DescribeCollection(const std::string& collection_name, snapshot::Colle
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
collection = ss->GetCollection();
auto& fields = ss->GetResources<snapshot::Field>();
for (auto& kv : fields) {
fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName());
}
return status;
return Status::OK();
}
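The refactoring in this file repeatedly replaces the explicit "if (!status.ok()) return status;" pattern with STATUS_CHECK. The macro's definition is not shown in this diff; a plausible expansion, given only as a sketch, is:

// Hypothetical early-return helper; the real STATUS_CHECK is defined elsewhere in the code base.
#define STATUS_CHECK(func)           \
    do {                             \
        Status _status = (func);     \
        if (!_status.ok()) {         \
            return _status;          \
        }                            \
    } while (false)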
Status
SSDBImpl::DropCollection(const std::string& name) {
CHECK_INITIALIZED;
// a date range could be used to delete only part of the collection's files, but that is not supported yet
LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << name;
snapshot::ScopedSnapshotT ss;
auto& snapshots = snapshot::Snapshots::GetInstance();
auto status = snapshots.GetSnapshot(ss, name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshots.GetSnapshot(ss, name));
if (options_.wal_enable_) {
// SS TODO
/* wal_mgr_->DropCollection(ss->GetCollectionId()); */
}
status = snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
return status;
return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}
Status
@ -154,13 +149,10 @@ Status
SSDBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
CHECK_INITIALIZED;
uint64_t lsn = 0;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
snapshot::LSN_TYPE lsn = 0;
if (options_.wal_enable_) {
// SS TODO
/* lsn = wal_mgr_->CreatePartition(collection_id, partition_tag); */
@ -175,13 +167,8 @@ SSDBImpl::CreatePartition(const std::string& collection_name, const std::string&
snapshot::PartitionContext p_ctx;
p_ctx.name = partition_name;
snapshot::PartitionPtr partition;
status = op->CommitNewPartition(p_ctx, partition);
if (!status.ok()) {
return status;
}
status = op->Push();
return status;
STATUS_CHECK(op->CommitNewPartition(p_ctx, partition));
return op->Push();
}
Status
@ -189,10 +176,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// SS TODO: Is the step below needed? If so, how should it be implemented?
/* mem_mgr_->EraseMemVector(partition_name); */
@ -200,9 +184,7 @@ SSDBImpl::DropPartition(const std::string& collection_name, const std::string& p
snapshot::PartitionContext context;
context.name = partition_name;
auto op = std::make_shared<snapshot::DropPartitionOperation>(context, ss);
status = op->Push();
return status;
return op->Push();
}
Status
@ -210,13 +192,10 @@ SSDBImpl::ShowPartitions(const std::string& collection_name, std::vector<std::st
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
partition_names = std::move(ss->GetPartitionNames());
return status;
return Status::OK();
}
Status
@ -225,10 +204,7 @@ SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, con
CHECK_INITIALIZED;
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
return status;
}
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
auto handler = std::make_shared<LoadVectorFieldHandler>(context, ss);
handler->Iterate();
@ -236,5 +212,148 @@ SSDBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, con
return handler->GetStatus();
}
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
void
SSDBImpl::InternalFlush(const std::string& collection_id) {
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
record.collection_id = collection_id;
ExecWalRecord(record);
}
void
SSDBImpl::BackgroundFlushThread() {
SetThreadName("flush_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
break;
}
InternalFlush();
if (options_.auto_flush_interval_ > 0) {
swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
} else {
swn_flush_.Wait();
}
}
}
void
SSDBImpl::StartMetricTask() {
server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);
if (cache_total > 0) {
double cache_usage_double = cache_usage;
server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
} else {
server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
}
server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
/* SS TODO */
// uint64_t size;
// Size(size);
// server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
server::Metrics::GetInstance().CPUUsagePercentSet();
server::Metrics::GetInstance().RAMUsagePercentSet();
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
server::Metrics::GetInstance().CPUCoreUsagePercentSet();
server::Metrics::GetInstance().GPUTemperature();
server::Metrics::GetInstance().CPUTemperature();
server::Metrics::GetInstance().PushToGateway();
}
void
SSDBImpl::BackgroundMetricThread() {
SetThreadName("metric_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
break;
}
swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
StartMetricTask();
meta::FilesHolder::PrintInfo();
}
}
Status
SSDBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return Status::OK();
}
void
SSDBImpl::BackgroundWalThread() {
SetThreadName("wal_thread");
server::SystemInfo::GetInstance().Init();
std::chrono::system_clock::time_point next_auto_flush_time;
auto get_next_auto_flush_time = [&]() {
return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
};
if (options_.auto_flush_interval_ > 0) {
next_auto_flush_time = get_next_auto_flush_time();
}
InternalFlush();
while (true) {
if (options_.auto_flush_interval_ > 0) {
if (std::chrono::system_clock::now() >= next_auto_flush_time) {
InternalFlush();
next_auto_flush_time = get_next_auto_flush_time();
}
}
wal::MXLogRecord record;
auto error_code = wal_mgr_->GetNextRecord(record);
if (error_code != WAL_SUCCESS) {
LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
break;
}
if (record.type != wal::MXLogType::None) {
ExecWalRecord(record);
if (record.type == wal::MXLogType::Flush) {
// notify the pending flush request so it can return
flush_req_swn_.Notify();
// if the user flushed all collections manually, reset the auto-flush timer as well
if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
next_auto_flush_time = get_next_auto_flush_time();
}
}
} else {
if (!initialized_.load(std::memory_order_acquire)) {
InternalFlush();
flush_req_swn_.Notify();
// SS TODO
// WaitMergeFileFinish();
// WaitBuildIndexFinish();
LOG_ENGINE_DEBUG_ << "WAL background thread exit";
break;
}
if (options_.auto_flush_interval_ > 0) {
swn_wal_.Wait_Until(next_auto_flush_time);
} else {
swn_wal_.Wait();
}
}
}
}
} // namespace engine
} // namespace milvus

db/SSDBImpl.h

@ -18,6 +18,7 @@
#include <vector>
#include "db/Options.h"
#include "db/SimpleWaitNotify.h"
#include "db/SnapshotHandlers.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/ResourceTypes.h"
@ -32,6 +33,14 @@ class SSDBImpl {
public:
explicit SSDBImpl(const DBOptions& options);
~SSDBImpl();
Status
Start();
Status
Stop();
Status
CreateCollection(const snapshot::CreateCollectionContext& context);
@ -61,18 +70,39 @@ class SSDBImpl {
Status
ShowPartitions(const std::string& collection_name, std::vector<std::string>& partition_names);
~SSDBImpl();
private:
void
InternalFlush(const std::string& collection_id = "");
void
BackgroundFlushThread();
void
StartMetricTask();
void
BackgroundMetricThread();
Status
Start();
ExecWalRecord(const wal::MXLogRecord& record);
Status
Stop();
void
BackgroundWalThread();
private:
DBOptions options_;
std::atomic<bool> initialized_;
std::shared_ptr<wal::WalManager> wal_mgr_;
std::shared_ptr<std::thread> bg_wal_thread_;
std::shared_ptr<std::thread> bg_flush_thread_;
SimpleWaitNotify swn_wal_;
SimpleWaitNotify swn_flush_;
SimpleWaitNotify swn_metric_;
SimpleWaitNotify flush_req_swn_;
}; // SSDBImpl
} // namespace engine

db/SimpleWaitNotify.h

@ -0,0 +1,62 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <chrono>
#include <condition_variable>
#include <mutex>
namespace milvus {
namespace engine {
struct SimpleWaitNotify {
bool notified_ = false;
std::mutex mutex_;
std::condition_variable cv_;
void
Wait() {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait(lck);
}
notified_ = false;
}
void
Wait_Until(const std::chrono::system_clock::time_point& tm_point) {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait_until(lck, tm_point);
}
notified_ = false;
}
void
Wait_For(const std::chrono::system_clock::duration& tm_dur) {
std::unique_lock<std::mutex> lck(mutex_);
if (!notified_) {
cv_.wait_for(lck, tm_dur);
}
notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
notified_ = true;
lck.unlock();
cv_.notify_one();
}
};
} // namespace engine
} // namespace milvus
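As a standalone illustration (hypothetical example, not taken from the code base), the helper can be exercised the same way the SSDBImpl background threads use swn_flush_: one thread waits with a timeout and another wakes it early through Notify(). Because notified_ is a plain flag, a Notify() that arrives before the wait still takes effect, though several notifications may coalesce into a single wake-up.

// Hypothetical usage sketch for SimpleWaitNotify.
#include <chrono>
#include <iostream>
#include <thread>
#include "db/SimpleWaitNotify.h"

int main() {
    milvus::engine::SimpleWaitNotify swn;

    std::thread worker([&swn] {
        swn.Wait_For(std::chrono::seconds(5));   // sleeps up to 5 s, but returns early once notified
        std::cout << "worker woke up" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    swn.Notify();                                // wakes the worker well before the timeout
    worker.join();
    return 0;
}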

db/snapshot/ResourceTypes.h

@ -23,8 +23,8 @@ using ID_TYPE = int64_t;
using NUM_TYPE = int64_t;
using FTYPE_TYPE = int64_t;
using TS_TYPE = int64_t;
using LSN_TYPE = uint64_t;
using SIZE_TYPE = uint64_t;
using LSN_TYPE = int64_t;
using SIZE_TYPE = int64_t;
using MappingT = std::set<ID_TYPE>;
enum FieldType { VECTOR, INT32 };