From 172cd21daf644dc523c06af95fd428719ce6a693 Mon Sep 17 00:00:00 2001 From: zhiru Date: Fri, 5 Jul 2019 15:03:40 +0800 Subject: [PATCH 1/6] add mem impl Former-commit-id: 074f1ade11572923ddee2653c26ce6a143001b3c --- cpp/src/db/Constants.h | 20 ++++ cpp/src/db/MemTable.cpp | 51 ++++++++++ cpp/src/db/MemTable.h | 40 ++++++++ cpp/src/db/MemTableFile.cpp | 66 +++++++++++++ cpp/src/db/MemTableFile.h | 44 +++++++++ cpp/src/db/VectorSource.cpp | 60 +++++++++++ cpp/src/db/VectorSource.h | 41 ++++++++ cpp/unittest/db/mem_test.cpp | 187 +++++++++++++++++++++++++++++++++++ 8 files changed, 509 insertions(+) create mode 100644 cpp/src/db/Constants.h create mode 100644 cpp/src/db/MemTable.cpp create mode 100644 cpp/src/db/MemTable.h create mode 100644 cpp/src/db/MemTableFile.cpp create mode 100644 cpp/src/db/MemTableFile.h create mode 100644 cpp/src/db/VectorSource.cpp create mode 100644 cpp/src/db/VectorSource.h create mode 100644 cpp/unittest/db/mem_test.cpp diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h new file mode 100644 index 0000000000..2bb2e0a064 --- /dev/null +++ b/cpp/src/db/Constants.h @@ -0,0 +1,20 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +namespace zilliz { +namespace milvus { +namespace engine { + +const size_t K = 1024UL; +const size_t M = K*K; +const size_t MAX_TABLE_FILE_MEM = 128 * M; + +const int VECTOR_TYPE_SIZE = sizeof(float); + +} // namespace engine +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp new file mode 100644 index 0000000000..032d479999 --- /dev/null +++ b/cpp/src/db/MemTable.cpp @@ -0,0 +1,51 @@ +#include "MemTable.h" +#include "Log.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +MemTable::MemTable(const std::string& table_id, + const std::shared_ptr& meta) : + table_id_(table_id), + meta_(meta) { + +} + +Status MemTable::Add(VectorSource::Ptr& source) { + while (!source->AllAdded()) { + MemTableFile::Ptr currentMemTableFile; + if (!mem_table_file_stack_.empty()) { + currentMemTableFile = mem_table_file_stack_.top(); + } + Status status; + if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) { + MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_); + status = newMemTableFile->Add(source); + if (status.ok()) { + mem_table_file_stack_.push(newMemTableFile); + } + } + else { + status = currentMemTableFile->Add(source); + } + if (!status.ok()) { + std::string errMsg = "MemTable::Add failed: " + status.ToString(); + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + } + return Status::OK(); +} + +void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { + mem_table_file = mem_table_file_stack_.top(); +} + +size_t MemTable::GetStackSize() { + return mem_table_file_stack_.size(); +} + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h new file mode 100644 index 0000000000..b9fe4147d8 --- /dev/null +++ b/cpp/src/db/MemTable.h @@ -0,0 +1,40 @@ +#pragma once + +#include "Status.h" +#include "MemTableFile.h" +#include "VectorSource.h" + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class MemTable { + +public: + + using Ptr = std::shared_ptr; + using MemTableFileStack = std::stack; + using MetaPtr = meta::Meta::Ptr; + + MemTable(const std::string& table_id, const std::shared_ptr& meta); + + Status Add(VectorSource::Ptr& source); + + void GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file); + + size_t GetStackSize(); + +private: + const std::string table_id_; + + MemTableFileStack mem_table_file_stack_; + + MetaPtr meta_; + +}; //MemTable + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp new file mode 100644 index 0000000000..26bc0d38e9 --- /dev/null +++ b/cpp/src/db/MemTableFile.cpp @@ -0,0 +1,66 @@ +#include "MemTableFile.h" +#include "Constants.h" +#include "Log.h" + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +MemTableFile::MemTableFile(const std::string& table_id, + const std::shared_ptr& meta) : + table_id_(table_id), + meta_(meta) { + + current_mem_ = 0; + CreateTableFile(); +} + +Status MemTableFile::CreateTableFile() { + + meta::TableFileSchema table_file_schema; + table_file_schema.table_id_ = table_id_; + auto status = meta_->CreateTableFile(table_file_schema); + if (status.ok()) { + table_file_schema_ = table_file_schema; + } + else { + std::string errMsg = "MemTableFile::CreateTableFile failed: " + status.ToString(); + ENGINE_LOG_ERROR << errMsg; + } + return status; +} + +Status MemTableFile::Add(const VectorSource::Ptr& source) { + + size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + size_t memLeft = GetMemLeft(); + if (memLeft >= singleVectorMemSize) { + size_t numVectorsToAdd = std::ceil(memLeft / singleVectorMemSize); + size_t numVectorsAdded; + auto status = source->Add(table_file_schema_, numVectorsToAdd, numVectorsAdded); + if (status.ok()) { + current_mem_ += (numVectorsAdded * singleVectorMemSize); + } + return status; + } + return Status::OK(); +} + +size_t MemTableFile::GetCurrentMem() { + return current_mem_; +} + +size_t MemTableFile::GetMemLeft() { + return (MAX_TABLE_FILE_MEM - current_mem_); +} + +bool MemTableFile::isFull() { + size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + return (GetMemLeft() < singleVectorMemSize); +} + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h new file mode 100644 index 0000000000..1efe4c0bfe --- /dev/null +++ b/cpp/src/db/MemTableFile.h @@ -0,0 +1,44 @@ +#pragma once + +#include "Status.h" +#include "Meta.h" +#include "VectorSource.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class MemTableFile { + +public: + + using Ptr = std::shared_ptr; + using MetaPtr = meta::Meta::Ptr; + + MemTableFile(const std::string& table_id, const std::shared_ptr& meta); + + Status Add(const VectorSource::Ptr& source); + + size_t GetCurrentMem(); + + size_t GetMemLeft(); + + bool isFull(); + +private: + + Status CreateTableFile(); + + const std::string table_id_; + + meta::TableFileSchema table_file_schema_; + + MetaPtr meta_; + + size_t current_mem_; + +}; //MemTableFile + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp new file mode 100644 index 0000000000..dff5423c6f --- /dev/null +++ b/cpp/src/db/VectorSource.cpp @@ -0,0 +1,60 @@ +#include "VectorSource.h" +#include "ExecutionEngine.h" +#include "EngineFactory.h" +#include "Log.h" + +namespace zilliz { +namespace milvus { +namespace engine { + + +VectorSource::VectorSource(const size_t &n, + const float *vectors) : + n_(n), + vectors_(vectors), + id_generator_(new SimpleIDGenerator()) { + current_num_vectors_added = 0; +} + +Status VectorSource::Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added) { + + if (table_file_schema.dimension_ <= 0) { + std::string errMsg = "VectorSource::Add: table_file_schema dimension = " + + std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_; + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + ExecutionEnginePtr engine = EngineFactory::Build(table_file_schema.dimension_, + table_file_schema.location_, + (EngineType)table_file_schema.engine_type_); + + num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; + IDNumbers vector_ids_to_add; + id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add); + Status status = engine->AddWithIds(num_vectors_added, vectors_ + current_num_vectors_added, vector_ids_to_add.data()); + if (status.ok()) { + current_num_vectors_added += num_vectors_added; + vector_ids_.insert(vector_ids_.end(), vector_ids_to_add.begin(), vector_ids_to_add.end()); + } + else { + ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); + } + + return status; +} + +size_t VectorSource::GetNumVectorsAdded() { + return current_num_vectors_added; +} + +bool VectorSource::AllAdded() { + return (current_num_vectors_added == n_); +} + +IDNumbers VectorSource::GetVectorIds() { + return vector_ids_; +} + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h new file mode 100644 index 0000000000..170f3634cf --- /dev/null +++ b/cpp/src/db/VectorSource.h @@ -0,0 +1,41 @@ +#pragma once + +#include "Status.h" +#include "Meta.h" +#include "IDGenerator.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +class VectorSource { + +public: + + using Ptr = std::shared_ptr; + + VectorSource(const size_t& n, const float* vectors); + + Status Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added); + + size_t GetNumVectorsAdded(); + + bool AllAdded(); + + IDNumbers GetVectorIds(); + +private: + + const size_t n_; + const float* vectors_; + IDNumbers vector_ids_; + + size_t current_num_vectors_added; + + IDGenerator* id_generator_; + +}; //VectorSource + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp new file mode 100644 index 0000000000..8418b9cd2d --- /dev/null +++ b/cpp/unittest/db/mem_test.cpp @@ -0,0 +1,187 @@ +#include "gtest/gtest.h" + +#include "db/VectorSource.h" +#include "db/MemTableFile.h" +#include "db/MemTable.h" +#include "utils.h" +#include "db/Factories.h" +#include "db/Constants.h" + +using namespace zilliz::milvus; + +namespace { + + static const std::string TABLE_NAME = "test_group"; + static constexpr int64_t TABLE_DIM = 256; + static constexpr int64_t VECTOR_COUNT = 250000; + static constexpr int64_t INSERT_LOOP = 10000; + + engine::meta::TableSchema BuildTableSchema() { + engine::meta::TableSchema table_info; + table_info.dimension_ = TABLE_DIM; + table_info.table_id_ = TABLE_NAME; + table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; + return table_info; + } + + void BuildVectors(int64_t n, std::vector& vectors) { + vectors.clear(); + vectors.resize(n*TABLE_DIM); + float* data = vectors.data(); + for(int i = 0; i < n; i++) { + for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; + } + } +} + +TEST(MEM_TEST, VECTOR_SOURCE_TEST) { + + std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + + engine::meta::TableSchema table_schema = BuildTableSchema(); + auto status = impl_->CreateTable(table_schema); + ASSERT_TRUE(status.ok()); + + engine::meta::TableFileSchema table_file_schema; + table_file_schema.table_id_ = TABLE_NAME; + status = impl_->CreateTableFile(table_file_schema); + ASSERT_TRUE(status.ok()); + + int64_t n = 100; + std::vector vectors; + BuildVectors(n, vectors); + + engine::VectorSource source(n, vectors.data()); + + size_t num_vectors_added; + status = source.Add(table_file_schema, 50, num_vectors_added); + ASSERT_TRUE(status.ok()); + + ASSERT_EQ(num_vectors_added, 50); + + engine::IDNumbers vector_ids = source.GetVectorIds(); + ASSERT_EQ(vector_ids.size(), 50); + + status = source.Add(table_file_schema, 60, num_vectors_added); + ASSERT_TRUE(status.ok()); + + ASSERT_EQ(num_vectors_added, 50); + + vector_ids = source.GetVectorIds(); + ASSERT_EQ(vector_ids.size(), 100); + +// for (auto& id : vector_ids) { +// std::cout << id << std::endl; +// } + + status = impl_->DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { + + std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + + engine::meta::TableSchema table_schema = BuildTableSchema(); + auto status = impl_->CreateTable(table_schema); + ASSERT_TRUE(status.ok()); + + engine::MemTableFile memTableFile(TABLE_NAME, impl_); + + int64_t n_100 = 100; + std::vector vectors_100; + BuildVectors(n_100, vectors_100); + + engine::VectorSource::Ptr source = std::make_shared(n_100, vectors_100.data()); + + status = memTableFile.Add(source); + ASSERT_TRUE(status.ok()); + +// std::cout << memTableFile.GetCurrentMem() << " " << memTableFile.GetMemLeft() << std::endl; + + engine::IDNumbers vector_ids = source->GetVectorIds(); + ASSERT_EQ(vector_ids.size(), 100); + + size_t singleVectorMem = sizeof(float) * TABLE_DIM; + ASSERT_EQ(memTableFile.GetCurrentMem(), n_100 * singleVectorMem); + + int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; + std::vector vectors_128M; + BuildVectors(n_max, vectors_128M); + + engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); + status = memTableFile.Add(source_128M); + + vector_ids = source_128M->GetVectorIds(); + ASSERT_EQ(vector_ids.size(), n_max - n_100); + + ASSERT_TRUE(memTableFile.isFull()); + + status = impl_->DropAll(); + ASSERT_TRUE(status.ok()); +} + +TEST(MEM_TEST, MEM_TABLE_TEST) { + + std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + + engine::meta::TableSchema table_schema = BuildTableSchema(); + auto status = impl_->CreateTable(table_schema); + ASSERT_TRUE(status.ok()); + + int64_t n_100 = 100; + std::vector vectors_100; + BuildVectors(n_100, vectors_100); + + engine::VectorSource::Ptr source_100 = std::make_shared(n_100, vectors_100.data()); + + engine::MemTable memTable(TABLE_NAME, impl_); + + status = memTable.Add(source_100); + ASSERT_TRUE(status.ok()); + + engine::IDNumbers vector_ids = source_100->GetVectorIds(); + ASSERT_EQ(vector_ids.size(), 100); + + engine::MemTableFile::Ptr memTableFile; + memTable.GetCurrentMemTableFile(memTableFile); + size_t singleVectorMem = sizeof(float) * TABLE_DIM; + ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + + int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; + std::vector vectors_128M; + BuildVectors(n_max, vectors_128M); + + engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); + status = memTable.Add(source_128M); + ASSERT_TRUE(status.ok()); + + vector_ids = source_128M->GetVectorIds(); + ASSERT_EQ(vector_ids.size(), n_max); + + memTable.GetCurrentMemTableFile(memTableFile); + ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + + ASSERT_EQ(memTable.GetStackSize(), 2); + + int64_t n_1G = 1024000; + std::vector vectors_1G; + BuildVectors(n_1G, vectors_1G); + + engine::VectorSource::Ptr source_1G = std::make_shared(n_1G, vectors_1G.data()); + + status = memTable.Add(source_1G); + ASSERT_TRUE(status.ok()); + + vector_ids = source_1G->GetVectorIds(); + ASSERT_EQ(vector_ids.size(), n_1G); + + int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); + ASSERT_EQ(memTable.GetStackSize(), expectedStackSize); + + status = impl_->DropAll(); + ASSERT_TRUE(status.ok()); +} + + From 8f42ef678d577af061f522575b9aa60c844a09f6 Mon Sep 17 00:00:00 2001 From: zhiru Date: Fri, 5 Jul 2019 15:57:49 +0800 Subject: [PATCH 2/6] update Former-commit-id: b5c019432679df7fcdf3aacd0e061ee91ddf9609 --- cpp/src/db/MemTableFile.cpp | 10 ++++++++-- cpp/src/db/MemTableFile.h | 3 +++ cpp/src/db/VectorSource.cpp | 10 +++++----- cpp/src/db/VectorSource.h | 8 +++++++- cpp/unittest/db/mem_test.cpp | 8 ++++++-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp index 26bc0d38e9..58b76ab834 100644 --- a/cpp/src/db/MemTableFile.cpp +++ b/cpp/src/db/MemTableFile.cpp @@ -1,6 +1,7 @@ #include "MemTableFile.h" #include "Constants.h" #include "Log.h" +#include "EngineFactory.h" #include @@ -14,7 +15,12 @@ MemTableFile::MemTableFile(const std::string& table_id, meta_(meta) { current_mem_ = 0; - CreateTableFile(); + auto status = CreateTableFile(); + if (status.ok()) { + execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_, + table_file_schema_.location_, + (EngineType)table_file_schema_.engine_type_); + } } Status MemTableFile::CreateTableFile() { @@ -39,7 +45,7 @@ Status MemTableFile::Add(const VectorSource::Ptr& source) { if (memLeft >= singleVectorMemSize) { size_t numVectorsToAdd = std::ceil(memLeft / singleVectorMemSize); size_t numVectorsAdded; - auto status = source->Add(table_file_schema_, numVectorsToAdd, numVectorsAdded); + auto status = source->Add(execution_engine_, table_file_schema_, numVectorsToAdd, numVectorsAdded); if (status.ok()) { current_mem_ += (numVectorsAdded * singleVectorMemSize); } diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h index 1efe4c0bfe..04f30178ea 100644 --- a/cpp/src/db/MemTableFile.h +++ b/cpp/src/db/MemTableFile.h @@ -3,6 +3,7 @@ #include "Status.h" #include "Meta.h" #include "VectorSource.h" +#include "ExecutionEngine.h" namespace zilliz { namespace milvus { @@ -37,6 +38,8 @@ private: size_t current_mem_; + ExecutionEnginePtr execution_engine_; + }; //MemTableFile } // namespace engine diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index dff5423c6f..f7cef994fa 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -16,7 +16,10 @@ VectorSource::VectorSource(const size_t &n, current_num_vectors_added = 0; } -Status VectorSource::Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added) { +Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, + const meta::TableFileSchema& table_file_schema, + const size_t& num_vectors_to_add, + size_t& num_vectors_added) { if (table_file_schema.dimension_ <= 0) { std::string errMsg = "VectorSource::Add: table_file_schema dimension = " + @@ -24,14 +27,11 @@ Status VectorSource::Add(const meta::TableFileSchema& table_file_schema, const s ENGINE_LOG_ERROR << errMsg; return Status::Error(errMsg); } - ExecutionEnginePtr engine = EngineFactory::Build(table_file_schema.dimension_, - table_file_schema.location_, - (EngineType)table_file_schema.engine_type_); num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; IDNumbers vector_ids_to_add; id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add); - Status status = engine->AddWithIds(num_vectors_added, vectors_ + current_num_vectors_added, vector_ids_to_add.data()); + Status status = execution_engine->AddWithIds(num_vectors_added, vectors_ + current_num_vectors_added, vector_ids_to_add.data()); if (status.ok()) { current_num_vectors_added += num_vectors_added; vector_ids_.insert(vector_ids_.end(), vector_ids_to_add.begin(), vector_ids_to_add.end()); diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h index 170f3634cf..597eee4ad8 100644 --- a/cpp/src/db/VectorSource.h +++ b/cpp/src/db/VectorSource.h @@ -3,6 +3,7 @@ #include "Status.h" #include "Meta.h" #include "IDGenerator.h" +#include "ExecutionEngine.h" namespace zilliz { namespace milvus { @@ -16,7 +17,10 @@ public: VectorSource(const size_t& n, const float* vectors); - Status Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added); + Status Add(const ExecutionEnginePtr& execution_engine, + const meta::TableFileSchema& table_file_schema, + const size_t& num_vectors_to_add, + size_t& num_vectors_added); size_t GetNumVectorsAdded(); @@ -24,6 +28,8 @@ public: IDNumbers GetVectorIds(); +// Status Serialize(); + private: const size_t n_; diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 8418b9cd2d..111914f8a9 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -6,6 +6,7 @@ #include "utils.h" #include "db/Factories.h" #include "db/Constants.h" +#include "db/EngineFactory.h" using namespace zilliz::milvus; @@ -55,7 +56,10 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { engine::VectorSource source(n, vectors.data()); size_t num_vectors_added; - status = source.Add(table_file_schema, 50, num_vectors_added); + engine::ExecutionEnginePtr execution_engine_ = engine::EngineFactory::Build(table_file_schema.dimension_, + table_file_schema.location_, + (engine::EngineType)table_file_schema.engine_type_); + status = source.Add(execution_engine_, table_file_schema, 50, num_vectors_added); ASSERT_TRUE(status.ok()); ASSERT_EQ(num_vectors_added, 50); @@ -63,7 +67,7 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { engine::IDNumbers vector_ids = source.GetVectorIds(); ASSERT_EQ(vector_ids.size(), 50); - status = source.Add(table_file_schema, 60, num_vectors_added); + status = source.Add(execution_engine_, table_file_schema, 60, num_vectors_added); ASSERT_TRUE(status.ok()); ASSERT_EQ(num_vectors_added, 50); From 9f38b96eddf222c57bb4b1eb6b23edf7d6b16735 Mon Sep 17 00:00:00 2001 From: zhiru Date: Fri, 5 Jul 2019 16:46:15 +0800 Subject: [PATCH 3/6] Implemented add and serialize Former-commit-id: 25fbbc2185efc4b45ea8f4693fea0ba0001d267e --- cpp/src/db/MemTable.cpp | 32 +++++++++++++++++++-------- cpp/src/db/MemTable.h | 10 ++++++--- cpp/src/db/MemTableFile.cpp | 42 +++++++++++++++++++++++++++++++++--- cpp/src/db/MemTableFile.h | 8 +++++-- cpp/src/db/VectorSource.cpp | 12 +++++------ cpp/src/db/VectorSource.h | 2 -- cpp/unittest/db/mem_test.cpp | 11 +++++++--- 7 files changed, 89 insertions(+), 28 deletions(-) diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index 032d479999..86554695c8 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -6,24 +6,26 @@ namespace milvus { namespace engine { MemTable::MemTable(const std::string& table_id, - const std::shared_ptr& meta) : + const std::shared_ptr& meta, + const Options& options) : table_id_(table_id), - meta_(meta) { + meta_(meta), + options_(options) { } Status MemTable::Add(VectorSource::Ptr& source) { while (!source->AllAdded()) { MemTableFile::Ptr currentMemTableFile; - if (!mem_table_file_stack_.empty()) { - currentMemTableFile = mem_table_file_stack_.top(); + if (!mem_table_file_list_.empty()) { + currentMemTableFile = mem_table_file_list_.back(); } Status status; - if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) { - MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_); + if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) { + MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_, options_); status = newMemTableFile->Add(source); if (status.ok()) { - mem_table_file_stack_.push(newMemTableFile); + mem_table_file_list_.emplace_back(newMemTableFile); } } else { @@ -39,11 +41,23 @@ Status MemTable::Add(VectorSource::Ptr& source) { } void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { - mem_table_file = mem_table_file_stack_.top(); + mem_table_file = mem_table_file_list_.back(); } size_t MemTable::GetStackSize() { - return mem_table_file_stack_.size(); + return mem_table_file_list_.size(); +} + +Status MemTable::Serialize() { + for (auto& memTableFile : mem_table_file_list_) { + auto status = memTableFile->Serialize(); + if (!status.ok()) { + std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + } + return Status::OK(); } } // namespace engine diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index b9fe4147d8..d5c7cc9e85 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -15,10 +15,10 @@ class MemTable { public: using Ptr = std::shared_ptr; - using MemTableFileStack = std::stack; + using MemTableFileList = std::vector; using MetaPtr = meta::Meta::Ptr; - MemTable(const std::string& table_id, const std::shared_ptr& meta); + MemTable(const std::string& table_id, const std::shared_ptr& meta, const Options& options); Status Add(VectorSource::Ptr& source); @@ -26,13 +26,17 @@ public: size_t GetStackSize(); + Status Serialize(); + private: const std::string table_id_; - MemTableFileStack mem_table_file_stack_; + MemTableFileList mem_table_file_list_; MetaPtr meta_; + Options options_; + }; //MemTable } // namespace engine diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp index 58b76ab834..0ff91de00b 100644 --- a/cpp/src/db/MemTableFile.cpp +++ b/cpp/src/db/MemTableFile.cpp @@ -2,6 +2,7 @@ #include "Constants.h" #include "Log.h" #include "EngineFactory.h" +#include "metrics/Metrics.h" #include @@ -10,9 +11,11 @@ namespace milvus { namespace engine { MemTableFile::MemTableFile(const std::string& table_id, - const std::shared_ptr& meta) : + const std::shared_ptr& meta, + const Options& options) : table_id_(table_id), - meta_(meta) { + meta_(meta), + options_(options) { current_mem_ = 0; auto status = CreateTableFile(); @@ -40,6 +43,13 @@ Status MemTableFile::CreateTableFile() { Status MemTableFile::Add(const VectorSource::Ptr& source) { + if (table_file_schema_.dimension_ <= 0) { + std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " + + std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; size_t memLeft = GetMemLeft(); if (memLeft >= singleVectorMemSize) { @@ -62,11 +72,37 @@ size_t MemTableFile::GetMemLeft() { return (MAX_TABLE_FILE_MEM - current_mem_); } -bool MemTableFile::isFull() { +bool MemTableFile::IsFull() { size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; return (GetMemLeft() < singleVectorMemSize); } +Status MemTableFile::Serialize() { + + auto start_time = METRICS_NOW_TIME; + + auto size = GetCurrentMem(); + + execution_engine_->Serialize(); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + table_file_schema_.size_ = size; + + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time); + + table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ? + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + + auto status = meta_->UpdateTableFile(table_file_schema_); + + LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") + << " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M"; + + execution_engine_->Cache(); + + return status; +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h index 04f30178ea..1be0ae78ba 100644 --- a/cpp/src/db/MemTableFile.h +++ b/cpp/src/db/MemTableFile.h @@ -16,7 +16,7 @@ public: using Ptr = std::shared_ptr; using MetaPtr = meta::Meta::Ptr; - MemTableFile(const std::string& table_id, const std::shared_ptr& meta); + MemTableFile(const std::string& table_id, const std::shared_ptr& meta, const Options& options); Status Add(const VectorSource::Ptr& source); @@ -24,7 +24,9 @@ public: size_t GetMemLeft(); - bool isFull(); + bool IsFull(); + + Status Serialize(); private: @@ -36,6 +38,8 @@ private: MetaPtr meta_; + Options options_; + size_t current_mem_; ExecutionEnginePtr execution_engine_; diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index f7cef994fa..b113b9ad5e 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -2,6 +2,7 @@ #include "ExecutionEngine.h" #include "EngineFactory.h" #include "Log.h" +#include "metrics/Metrics.h" namespace zilliz { namespace milvus { @@ -21,12 +22,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, const size_t& num_vectors_to_add, size_t& num_vectors_added) { - if (table_file_schema.dimension_ <= 0) { - std::string errMsg = "VectorSource::Add: table_file_schema dimension = " + - std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_; - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); - } + auto start_time = METRICS_NOW_TIME; num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; IDNumbers vector_ids_to_add; @@ -40,6 +36,10 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), static_cast(table_file_schema.dimension_), total_time); + return status; } diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h index 597eee4ad8..dec31f39e1 100644 --- a/cpp/src/db/VectorSource.h +++ b/cpp/src/db/VectorSource.h @@ -28,8 +28,6 @@ public: IDNumbers GetVectorIds(); -// Status Serialize(); - private: const size_t n_; diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 111914f8a9..f68d1eb8e3 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -86,12 +86,13 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + auto options = engine::OptionsFactory::Build(); engine::meta::TableSchema table_schema = BuildTableSchema(); auto status = impl_->CreateTable(table_schema); ASSERT_TRUE(status.ok()); - engine::MemTableFile memTableFile(TABLE_NAME, impl_); + engine::MemTableFile memTableFile(TABLE_NAME, impl_, options); int64_t n_100 = 100; std::vector vectors_100; @@ -120,7 +121,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max - n_100); - ASSERT_TRUE(memTableFile.isFull()); + ASSERT_TRUE(memTableFile.IsFull()); status = impl_->DropAll(); ASSERT_TRUE(status.ok()); @@ -129,6 +130,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { TEST(MEM_TEST, MEM_TABLE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + auto options = engine::OptionsFactory::Build(); engine::meta::TableSchema table_schema = BuildTableSchema(); auto status = impl_->CreateTable(table_schema); @@ -140,7 +142,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_100 = std::make_shared(n_100, vectors_100.data()); - engine::MemTable memTable(TABLE_NAME, impl_); + engine::MemTable memTable(TABLE_NAME, impl_, options); status = memTable.Add(source_100); ASSERT_TRUE(status.ok()); @@ -184,6 +186,9 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); ASSERT_EQ(memTable.GetStackSize(), expectedStackSize); + status = memTable.Serialize(); + ASSERT_TRUE(status.ok()); + status = impl_->DropAll(); ASSERT_TRUE(status.ok()); } From 6a6722a71c3544856a933d432ac9197a90c7e60f Mon Sep 17 00:00:00 2001 From: zhiru Date: Sun, 7 Jul 2019 13:50:39 +0800 Subject: [PATCH 4/6] add mem manager Former-commit-id: c9d77a1d0e9df6679c90fddefee22123cfb0acac --- cpp/src/db/DBImpl.cpp | 3 +- cpp/src/db/DBImpl.h | 4 +- cpp/src/db/Factories.cpp | 11 +++ cpp/src/db/Factories.h | 5 ++ cpp/src/db/MemManager.h | 14 ++-- cpp/src/db/MemManagerAbstract.h | 25 ++++++ cpp/src/db/MemTable.cpp | 10 ++- cpp/src/db/MemTable.h | 6 +- cpp/src/db/NewMemManager.cpp | 92 +++++++++++++++++++++ cpp/src/db/NewMemManager.h | 54 +++++++++++++ cpp/src/db/VectorSource.cpp | 15 +++- cpp/unittest/db/mem_test.cpp | 137 +++++++++++++++++++++++++++++++- 12 files changed, 356 insertions(+), 20 deletions(-) create mode 100644 cpp/src/db/MemManagerAbstract.h create mode 100644 cpp/src/db/NewMemManager.cpp create mode 100644 cpp/src/db/NewMemManager.h diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 0a1e8651e1..09a7c72201 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -87,8 +87,7 @@ DBImpl::DBImpl(const Options& options) compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode); - mem_mgr_ = std::make_shared(meta_ptr_, options_); - // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_)); + mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); if (options.mode != Options::MODE::READ_ONLY) { StartTimerTasks(); } diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 9dcd174f8b..5601f1a33b 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -9,6 +9,7 @@ #include "MemManager.h" #include "Types.h" #include "utils/ThreadPool.h" +#include "MemManagerAbstract.h" #include #include @@ -33,7 +34,6 @@ class Meta; class DBImpl : public DB { public: using MetaPtr = meta::Meta::Ptr; - using MemManagerPtr = typename MemManager::Ptr; explicit DBImpl(const Options &options); @@ -123,7 +123,7 @@ class DBImpl : public DB { std::thread bg_timer_thread_; MetaPtr meta_ptr_; - MemManagerPtr mem_mgr_; + MemManagerAbstractPtr mem_mgr_; server::ThreadPool compact_thread_pool_; std::list> compact_thread_results_; diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index 4b24bd3a1c..d51727cbff 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -6,6 +6,8 @@ #include #include "Factories.h" #include "DBImpl.h" +#include "MemManager.h" +#include "NewMemManager.h" #include #include @@ -98,6 +100,15 @@ DB* DBFactory::Build(const Options& options) { return new DBImpl(options); } +MemManagerAbstractPtr MemManagerFactory::Build(const std::shared_ptr& meta, + const Options& options) { + bool useNew = true; + if (useNew) { + return std::make_shared(meta, options); + } + return std::make_shared(meta, options); +} + } // namespace engine } // namespace milvus } // namespace zilliz diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 889922b17a..567bc0a8bc 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -10,6 +10,7 @@ #include "MySQLMetaImpl.h" #include "Options.h" #include "ExecutionEngine.h" +#include "MemManagerAbstract.h" #include #include @@ -36,6 +37,10 @@ struct DBFactory { static DB* Build(const Options&); }; +struct MemManagerFactory { + static MemManagerAbstractPtr Build(const std::shared_ptr& meta, const Options& options); +}; + } // namespace engine } // namespace milvus } // namespace zilliz diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 0ce88d504d..95303889db 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -9,13 +9,13 @@ #include "IDGenerator.h" #include "Status.h" #include "Meta.h" +#include "MemManagerAbstract.h" #include #include #include #include #include -#include namespace zilliz { namespace milvus { @@ -62,7 +62,7 @@ private: -class MemManager { +class MemManager : public MemManagerAbstract { public: using MetaPtr = meta::Meta::Ptr; using MemVectorsPtr = typename MemVectors::Ptr; @@ -71,16 +71,16 @@ public: MemManager(const std::shared_ptr& meta, const Options& options) : meta_(meta), options_(options) {} - MemVectorsPtr GetMemByTable(const std::string& table_id); - Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids); + size_t n, const float* vectors, IDNumbers& vector_ids) override; - Status Serialize(std::set& table_ids); + Status Serialize(std::set& table_ids) override; - Status EraseMemVector(const std::string& table_id); + Status EraseMemVector(const std::string& table_id) override; private: + MemVectorsPtr GetMemByTable(const std::string& table_id); + Status InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids); Status ToImmutable(); diff --git a/cpp/src/db/MemManagerAbstract.h b/cpp/src/db/MemManagerAbstract.h new file mode 100644 index 0000000000..74222df1e8 --- /dev/null +++ b/cpp/src/db/MemManagerAbstract.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class MemManagerAbstract { +public: + + virtual Status InsertVectors(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids) = 0; + + virtual Status Serialize(std::set& table_ids) = 0; + + virtual Status EraseMemVector(const std::string& table_id) = 0; + +}; // MemManagerAbstract + +using MemManagerAbstractPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index 86554695c8..b282ad375a 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -44,7 +44,7 @@ void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { mem_table_file = mem_table_file_list_.back(); } -size_t MemTable::GetStackSize() { +size_t MemTable::GetTableFileCount() { return mem_table_file_list_.size(); } @@ -60,6 +60,14 @@ Status MemTable::Serialize() { return Status::OK(); } +bool MemTable::Empty() { + return mem_table_file_list_.empty(); +} + +std::string MemTable::GetTableId() { + return table_id_; +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index d5c7cc9e85..e09d6ddac1 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -24,10 +24,14 @@ public: void GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file); - size_t GetStackSize(); + size_t GetTableFileCount(); Status Serialize(); + bool Empty(); + + std::string GetTableId(); + private: const std::string table_id_; diff --git a/cpp/src/db/NewMemManager.cpp b/cpp/src/db/NewMemManager.cpp new file mode 100644 index 0000000000..19aba68eb7 --- /dev/null +++ b/cpp/src/db/NewMemManager.cpp @@ -0,0 +1,92 @@ +#include "NewMemManager.h" +#include "VectorSource.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table_id) { + auto memIt = mem_id_map_.find(table_id); + if (memIt != mem_id_map_.end()) { + return memIt->second; + } + + mem_id_map_[table_id] = std::make_shared(table_id, meta_, options_); + return mem_id_map_[table_id]; +} + +Status NewMemManager::InsertVectors(const std::string& table_id_, + size_t n_, + const float* vectors_, + IDNumbers& vector_ids_) { + + + std::unique_lock lock(mutex_); + + return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); +} + +Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, + size_t n, + const float* vectors, + IDNumbers& vector_ids) { + MemTablePtr mem = GetMemByTable(table_id); + VectorSource::Ptr source = std::make_shared(n, vectors); + + auto status = mem->Add(source); + if (status.ok()) { + vector_ids = source->GetVectorIds(); + } + return status; +} + +Status NewMemManager::ToImmutable() { + std::unique_lock lock(mutex_); + MemIdMap temp_map; + for (auto& kv: mem_id_map_) { + if(kv.second->Empty()) { + temp_map.insert(kv); + continue;//empty table, no need to serialize + } + immu_mem_list_.push_back(kv.second); + } + + mem_id_map_.swap(temp_map); + return Status::OK(); +} + +Status NewMemManager::Serialize(std::set& table_ids) { + ToImmutable(); + std::unique_lock lock(serialization_mtx_); + table_ids.clear(); + for (auto& mem : immu_mem_list_) { + mem->Serialize(); + table_ids.insert(mem->GetTableId()); + } + immu_mem_list_.clear(); + return Status::OK(); +} + +Status NewMemManager::EraseMemVector(const std::string& table_id) { + {//erase MemVector from rapid-insert cache + std::unique_lock lock(mutex_); + mem_id_map_.erase(table_id); + } + + {//erase MemVector from serialize cache + std::unique_lock lock(serialization_mtx_); + MemList temp_list; + for (auto& mem : immu_mem_list_) { + if(mem->GetTableId() != table_id) { + temp_list.push_back(mem); + } + } + immu_mem_list_.swap(temp_list); + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/NewMemManager.h b/cpp/src/db/NewMemManager.h new file mode 100644 index 0000000000..a5f5a9ca13 --- /dev/null +++ b/cpp/src/db/NewMemManager.h @@ -0,0 +1,54 @@ +#pragma once + +#include "Meta.h" +#include "MemTable.h" +#include "Status.h" +#include "MemManagerAbstract.h" + +#include +#include +#include +#include +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class NewMemManager : public MemManagerAbstract { +public: + using MetaPtr = meta::Meta::Ptr; + using Ptr = std::shared_ptr; + using MemTablePtr = typename MemTable::Ptr; + + NewMemManager(const std::shared_ptr& meta, const Options& options) + : meta_(meta), options_(options) {} + + Status InsertVectors(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids) override; + + Status Serialize(std::set& table_ids) override; + + Status EraseMemVector(const std::string& table_id) override; + +private: + MemTablePtr GetMemByTable(const std::string& table_id); + + Status InsertVectorsNoLock(const std::string& table_id, + size_t n, const float* vectors, IDNumbers& vector_ids); + Status ToImmutable(); + + using MemIdMap = std::map; + using MemList = std::vector; + MemIdMap mem_id_map_; + MemList immu_mem_list_; + MetaPtr meta_; + Options options_; + std::mutex mutex_; + std::mutex serialization_mtx_; +}; // NewMemManager + + +} // namespace engine +} // namespace milvus +} // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index b113b9ad5e..d032be51f6 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -24,13 +24,18 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, auto start_time = METRICS_NOW_TIME; - num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; + num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? + num_vectors_to_add : n_ - current_num_vectors_added; IDNumbers vector_ids_to_add; id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add); - Status status = execution_engine->AddWithIds(num_vectors_added, vectors_ + current_num_vectors_added, vector_ids_to_add.data()); + Status status = execution_engine->AddWithIds(num_vectors_added, + vectors_ + current_num_vectors_added * table_file_schema.dimension_, + vector_ids_to_add.data()); if (status.ok()) { current_num_vectors_added += num_vectors_added; - vector_ids_.insert(vector_ids_.end(), vector_ids_to_add.begin(), vector_ids_to_add.end()); + vector_ids_.insert(vector_ids_.end(), + std::make_move_iterator(vector_ids_to_add.begin()), + std::make_move_iterator(vector_ids_to_add.end())); } else { ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); @@ -38,7 +43,9 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), static_cast(table_file_schema.dimension_), total_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), + static_cast(table_file_schema.dimension_), + total_time); return status; } diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index f68d1eb8e3..915610adcc 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -7,6 +7,11 @@ #include "db/Factories.h" #include "db/Constants.h" #include "db/EngineFactory.h" +#include "metrics/Metrics.h" + +#include +#include +#include using namespace zilliz::milvus; @@ -29,6 +34,9 @@ namespace { vectors.clear(); vectors.resize(n*TABLE_DIM); float* data = vectors.data(); +// std::random_device rd; +// std::mt19937 gen(rd()); +// std::uniform_real_distribution<> dis(0.0, 1.0); for(int i = 0; i < n; i++) { for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); data[TABLE_DIM * i] += i / 2000.; @@ -169,7 +177,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { memTable.GetCurrentMemTableFile(memTableFile); ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); - ASSERT_EQ(memTable.GetStackSize(), 2); + ASSERT_EQ(memTable.GetTableFileCount(), 2); int64_t n_1G = 1024000; std::vector vectors_1G; @@ -183,8 +191,8 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { vector_ids = source_1G->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_1G); - int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); - ASSERT_EQ(memTable.GetStackSize(), expectedStackSize); + int expectedTableFileCount = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); + ASSERT_EQ(memTable.GetTableFileCount(), expectedTableFileCount); status = memTable.Serialize(); ASSERT_TRUE(status.ok()); @@ -193,4 +201,127 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { ASSERT_TRUE(status.ok()); } +TEST(MEM_TEST, MEM_MANAGER_TEST) { + + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + std::map> search_vectors; +// std::map> vectors_ids_map; + { + engine::IDNumbers vector_ids; + int64_t nb = 1024000; + std::vector xb; + BuildVectors(nb, xb); + engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + ASSERT_TRUE(status.ok()); + +// std::ofstream myfile("mem_test.txt"); +// for (int64_t i = 0; i < nb; ++i) { +// int64_t vector_id = vector_ids[i]; +// std::vector vectors; +// for (int64_t j = 0; j < TABLE_DIM; j++) { +// vectors.emplace_back(xb[i*TABLE_DIM + j]); +//// std::cout << xb[i*TABLE_DIM + j] << std::endl; +// } +// vectors_ids_map[vector_id] = vectors; +// } + + std::this_thread::sleep_for(std::chrono::seconds(3)); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dis(0, nb - 1); + + int64_t numQuery = 1000; + for (int64_t i = 0; i < numQuery; ++i) { + int64_t index = dis(gen); + std::vector search; + for (int64_t j = 0; j < TABLE_DIM; j++) { + search.push_back(xb[index * TABLE_DIM + j]); + } + search_vectors.insert(std::make_pair(vector_ids[index], search)); +// std::cout << "index: " << index << " vector_ids[index]: " << vector_ids[index] << std::endl; + } + +// for (int64_t i = 0; i < nb; i += 100000) { +// std::vector search; +// for (int64_t j = 0; j < TABLE_DIM; j++) { +// search.push_back(xb[i * TABLE_DIM + j]); +// } +// search_vectors.insert(std::make_pair(vector_ids[i], search)); +// } + + } + + int k = 10; + for(auto& pair : search_vectors) { + auto& search = pair.second; + engine::QueryResults results; + stat = db_->Query(TABLE_NAME, k, 1, search.data(), results); + for(int t = 0; t < k; t++) { +// std::cout << "ID=" << results[0][t].first << " DISTANCE=" << results[0][t].second << std::endl; + +// std::cout << vectors_ids_map[results[0][t].first].size() << std::endl; +// for (auto& data : vectors_ids_map[results[0][t].first]) { +// std::cout << data << " "; +// } +// std::cout << std::endl; + } + // std::cout << "results[0][0].first: " << results[0][0].first << " pair.first: " << pair.first << " results[0][0].second: " << results[0][0].second << std::endl; + ASSERT_EQ(results[0][0].first, pair.first); + ASSERT_LT(results[0][0].second, 0.00001); + } + + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); + +} + +TEST(MEM_TEST, INSERT_TEST) { + + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + auto start_time = METRICS_NOW_TIME; + + int insert_loop = 1000; + for (int i = 0; i < insert_loop; ++i) { + int64_t nb = 204800; + std::vector xb; + BuildVectors(nb, xb); + engine::IDNumbers vector_ids; + engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + ASSERT_TRUE(status.ok()); + } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + std::cout << "total_time(ms) : " << total_time << std::endl; + + stat = db_->DropAll(); + ASSERT_TRUE(stat.ok()); + +} From a951dd14c0e8fb330870dbe7257315b8829f5c9b Mon Sep 17 00:00:00 2001 From: zhiru Date: Mon, 8 Jul 2019 11:14:28 +0800 Subject: [PATCH 5/6] Add new mem manager Former-commit-id: abab1d1c2cf67f49a4d9dcf2304df1abed675dda --- cpp/CHANGELOG.md | 1 + cpp/conf/server_config.template | 4 +- cpp/src/db/Constants.h | 3 + cpp/src/db/MemManager.cpp | 25 ++++++ cpp/src/db/MemManager.h | 6 ++ cpp/src/db/MemManagerAbstract.h | 6 ++ cpp/src/db/MemTable.cpp | 17 +++- cpp/src/db/MemTable.h | 8 +- cpp/src/db/NewMemManager.cpp | 38 +++++++++ cpp/src/db/NewMemManager.h | 6 ++ cpp/src/db/Options.h | 1 + cpp/src/server/DBWrapper.cpp | 8 ++ cpp/src/server/ServerConfig.h | 1 + cpp/unittest/db/mem_test.cpp | 144 +++++++++++++++++++++----------- cpp/unittest/db/utils.cpp | 12 +++ cpp/unittest/db/utils.h | 5 ++ 16 files changed, 231 insertions(+), 54 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 0f4e480123..fd27d05b9c 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -18,6 +18,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl ## New Feature +- MS-180 - Add new mem manager ## Task diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 0383e00b53..f0cd6d5e52 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -2,7 +2,7 @@ server_config: address: 0.0.0.0 port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 - mode: single # milvus deployment type: single, cluster + mode: single # milvus deployment type: single, cluster, read_only db_config: db_path: @MILVUS_DB_PATH@ # milvus data storage path @@ -15,6 +15,8 @@ db_config: index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB archive_days_threshold: 30 # files older than x days will be archived, unit: day + maximum_memory: 4 # maximum memory allowed, default: 4, unit: GB, should be at least 1 GB. + # the sum of maximum_memory and cpu_cache_capacity should be less than total memory metric_config: is_startup: off # if monitoring start: on, off diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h index 2bb2e0a064..1ba02b1d55 100644 --- a/cpp/src/db/Constants.h +++ b/cpp/src/db/Constants.h @@ -11,6 +11,9 @@ namespace engine { const size_t K = 1024UL; const size_t M = K*K; +const size_t G = K*M; +const size_t T = K*G; + const size_t MAX_TABLE_FILE_MEM = 128 * M; const int VECTOR_TYPE_SIZE = sizeof(float); diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index e36b0c45ba..ba8517cdbd 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -8,6 +8,7 @@ #include "MetaConsts.h" #include "EngineFactory.h" #include "metrics/Metrics.h" +#include "Log.h" #include #include @@ -128,6 +129,10 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids) { + + LOG(DEBUG) << "MemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + MemVectorsPtr mem = GetMemByTable(table_id); if (mem == nullptr) { return Status::NotFound("Group " + table_id + " not found!"); @@ -192,6 +197,26 @@ Status MemManager::EraseMemVector(const std::string& table_id) { return Status::OK(); } +size_t MemManager::GetCurrentMutableMem() { + size_t totalMem = 0; + for (auto& kv : mem_id_map_) { + auto memVector = kv.second; + totalMem += memVector->Size(); + } + return totalMem; +} + +size_t MemManager::GetCurrentImmutableMem() { + size_t totalMem = 0; + for (auto& memVector : immu_mem_list_) { + totalMem += memVector->Size(); + } + return totalMem; +} + +size_t MemManager::GetCurrentMem() { + return GetCurrentMutableMem() + GetCurrentImmutableMem(); +} } // namespace engine } // namespace milvus diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 95303889db..e8460c7a6d 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -78,6 +78,12 @@ public: Status EraseMemVector(const std::string& table_id) override; + size_t GetCurrentMutableMem() override; + + size_t GetCurrentImmutableMem() override; + + size_t GetCurrentMem() override; + private: MemVectorsPtr GetMemByTable(const std::string& table_id); diff --git a/cpp/src/db/MemManagerAbstract.h b/cpp/src/db/MemManagerAbstract.h index 74222df1e8..58c73ba6f8 100644 --- a/cpp/src/db/MemManagerAbstract.h +++ b/cpp/src/db/MemManagerAbstract.h @@ -16,6 +16,12 @@ public: virtual Status EraseMemVector(const std::string& table_id) = 0; + virtual size_t GetCurrentMutableMem() = 0; + + virtual size_t GetCurrentImmutableMem() = 0; + + virtual size_t GetCurrentMem() = 0; + }; // MemManagerAbstract using MemManagerAbstractPtr = std::shared_ptr; diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index b282ad375a..ba3875fbb5 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -49,13 +49,15 @@ size_t MemTable::GetTableFileCount() { } Status MemTable::Serialize() { - for (auto& memTableFile : mem_table_file_list_) { - auto status = memTableFile->Serialize(); + for (auto memTableFile = mem_table_file_list_.begin(); memTableFile != mem_table_file_list_.end(); ) { + auto status = (*memTableFile)->Serialize(); if (!status.ok()) { std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); ENGINE_LOG_ERROR << errMsg; return Status::Error(errMsg); } + std::lock_guard lock(mutex_); + memTableFile = mem_table_file_list_.erase(memTableFile); } return Status::OK(); } @@ -64,10 +66,19 @@ bool MemTable::Empty() { return mem_table_file_list_.empty(); } -std::string MemTable::GetTableId() { +const std::string& MemTable::GetTableId() const { return table_id_; } +size_t MemTable::GetCurrentMem() { + std::lock_guard lock(mutex_); + size_t totalMem = 0; + for (auto& memTableFile : mem_table_file_list_) { + totalMem += memTableFile->GetCurrentMem(); + } + return totalMem; +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index e09d6ddac1..9bae932e62 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -4,7 +4,7 @@ #include "MemTableFile.h" #include "VectorSource.h" -#include +#include namespace zilliz { namespace milvus { @@ -30,7 +30,9 @@ public: bool Empty(); - std::string GetTableId(); + const std::string& GetTableId() const; + + size_t GetCurrentMem(); private: const std::string table_id_; @@ -41,6 +43,8 @@ private: Options options_; + std::mutex mutex_; + }; //MemTable } // namespace engine diff --git a/cpp/src/db/NewMemManager.cpp b/cpp/src/db/NewMemManager.cpp index 19aba68eb7..3c78f37101 100644 --- a/cpp/src/db/NewMemManager.cpp +++ b/cpp/src/db/NewMemManager.cpp @@ -1,5 +1,9 @@ #include "NewMemManager.h" #include "VectorSource.h" +#include "Log.h" +#include "Constants.h" + +#include namespace zilliz { namespace milvus { @@ -20,6 +24,9 @@ Status NewMemManager::InsertVectors(const std::string& table_id_, const float* vectors_, IDNumbers& vector_ids_) { + while (GetCurrentMem() > options_.maximum_memory) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } std::unique_lock lock(mutex_); @@ -30,6 +37,10 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, size_t n, const float* vectors, IDNumbers& vector_ids) { + + LOG(DEBUG) << "NewMemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + MemTablePtr mem = GetMemByTable(table_id); VectorSource::Ptr source = std::make_shared(n, vectors); @@ -64,6 +75,12 @@ Status NewMemManager::Serialize(std::set& table_ids) { table_ids.insert(mem->GetTableId()); } immu_mem_list_.clear(); +// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) { +// (*mem)->Serialize(); +// table_ids.insert((*mem)->GetTableId()); +// mem = immu_mem_list_.erase(mem); +// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size(); +// } return Status::OK(); } @@ -87,6 +104,27 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { return Status::OK(); } +size_t NewMemManager::GetCurrentMutableMem() { + size_t totalMem = 0; + for (auto& kv : mem_id_map_) { + auto memTable = kv.second; + totalMem += memTable->GetCurrentMem(); + } + return totalMem; +} + +size_t NewMemManager::GetCurrentImmutableMem() { + size_t totalMem = 0; + for (auto& memTable : immu_mem_list_) { + totalMem += memTable->GetCurrentMem(); + } + return totalMem; +} + +size_t NewMemManager::GetCurrentMem() { + return GetCurrentMutableMem() + GetCurrentImmutableMem(); +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/NewMemManager.h b/cpp/src/db/NewMemManager.h index a5f5a9ca13..9883480404 100644 --- a/cpp/src/db/NewMemManager.h +++ b/cpp/src/db/NewMemManager.h @@ -31,6 +31,12 @@ public: Status EraseMemVector(const std::string& table_id) override; + size_t GetCurrentMutableMem() override; + + size_t GetCurrentImmutableMem() override; + + size_t GetCurrentMem() override; + private: MemTablePtr GetMemByTable(const std::string& table_id); diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 39d0a15019..47bbb45bbc 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -61,6 +61,7 @@ struct Options { size_t index_trigger_size = ONE_GB; //unit: byte DBMetaOptions meta; int mode = MODE::SINGLE; + float maximum_memory = 4 * ONE_GB; }; // Options diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index fca15cb65a..bed4440d5e 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -23,6 +23,14 @@ DBWrapper::DBWrapper() { if(index_size > 0) {//ensure larger than zero, unit is MB opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; } + float maximum_memory = config.GetFloatValue(CONFIG_MAXMIMUM_MEMORY); + if (maximum_memory > 1.0) { + opt.maximum_memory = maximum_memory * engine::ONE_GB; + } + else { + std::cout << "ERROR: maximum_memory should be at least 1 GB" << std::endl; + kill(0, SIGUSR1); + } ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single"); diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index 0ec04eed8c..b3b95eb8b6 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -26,6 +26,7 @@ static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; +static const std::string CONFIG_MAXMIMUM_MEMORY = "maximum_memory"; static const std::string CONFIG_LOG = "log_config"; diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 915610adcc..818c3a6388 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -8,6 +8,8 @@ #include "db/Constants.h" #include "db/EngineFactory.h" #include "metrics/Metrics.h" +#include "db/MetaConsts.h" +#include "boost/filesystem.hpp" #include #include @@ -34,9 +36,6 @@ namespace { vectors.clear(); vectors.resize(n*TABLE_DIM); float* data = vectors.data(); -// std::random_device rd; -// std::mt19937 gen(rd()); -// std::uniform_real_distribution<> dis(0.0, 1.0); for(int i = 0; i < n; i++) { for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); data[TABLE_DIM * i] += i / 2000.; @@ -44,7 +43,7 @@ namespace { } } -TEST(MEM_TEST, VECTOR_SOURCE_TEST) { +TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); @@ -91,7 +90,7 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { +TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); auto options = engine::OptionsFactory::Build(); @@ -135,7 +134,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_TABLE_TEST) { +TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); auto options = engine::OptionsFactory::Build(); @@ -201,7 +200,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { ASSERT_TRUE(status.ok()); } -TEST(MEM_TEST, MEM_MANAGER_TEST) { +TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; @@ -218,7 +217,6 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); std::map> search_vectors; -// std::map> vectors_ids_map; { engine::IDNumbers vector_ids; int64_t nb = 1024000; @@ -227,24 +225,13 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { engine::Status status = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); ASSERT_TRUE(status.ok()); -// std::ofstream myfile("mem_test.txt"); -// for (int64_t i = 0; i < nb; ++i) { -// int64_t vector_id = vector_ids[i]; -// std::vector vectors; -// for (int64_t j = 0; j < TABLE_DIM; j++) { -// vectors.emplace_back(xb[i*TABLE_DIM + j]); -//// std::cout << xb[i*TABLE_DIM + j] << std::endl; -// } -// vectors_ids_map[vector_id] = vectors; -// } - std::this_thread::sleep_for(std::chrono::seconds(3)); std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dis(0, nb - 1); - int64_t numQuery = 1000; + int64_t numQuery = 20; for (int64_t i = 0; i < numQuery; ++i) { int64_t index = dis(gen); std::vector search; @@ -252,17 +239,7 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { search.push_back(xb[index * TABLE_DIM + j]); } search_vectors.insert(std::make_pair(vector_ids[index], search)); -// std::cout << "index: " << index << " vector_ids[index]: " << vector_ids[index] << std::endl; } - -// for (int64_t i = 0; i < nb; i += 100000) { -// std::vector search; -// for (int64_t j = 0; j < TABLE_DIM; j++) { -// search.push_back(xb[i * TABLE_DIM + j]); -// } -// search_vectors.insert(std::make_pair(vector_ids[i], search)); -// } - } int k = 10; @@ -270,26 +247,16 @@ TEST(MEM_TEST, MEM_MANAGER_TEST) { auto& search = pair.second; engine::QueryResults results; stat = db_->Query(TABLE_NAME, k, 1, search.data(), results); - for(int t = 0; t < k; t++) { -// std::cout << "ID=" << results[0][t].first << " DISTANCE=" << results[0][t].second << std::endl; - -// std::cout << vectors_ids_map[results[0][t].first].size() << std::endl; -// for (auto& data : vectors_ids_map[results[0][t].first]) { -// std::cout << data << " "; -// } -// std::cout << std::endl; - } - // std::cout << "results[0][0].first: " << results[0][0].first << " pair.first: " << pair.first << " results[0][0].second: " << results[0][0].second << std::endl; ASSERT_EQ(results[0][0].first, pair.first); ASSERT_LT(results[0][0].second, 0.00001); } - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + boost::filesystem::remove_all(options.meta.path); } -TEST(MEM_TEST, INSERT_TEST) { +TEST_F(NewMemManagerTest, INSERT_TEST) { auto options = engine::OptionsFactory::Build(); options.meta.path = "/tmp/milvus_test"; @@ -307,9 +274,9 @@ TEST(MEM_TEST, INSERT_TEST) { auto start_time = METRICS_NOW_TIME; - int insert_loop = 1000; + int insert_loop = 20; for (int i = 0; i < insert_loop; ++i) { - int64_t nb = 204800; + int64_t nb = 409600; std::vector xb; BuildVectors(nb, xb); engine::IDNumbers vector_ids; @@ -318,10 +285,91 @@ TEST(MEM_TEST, INSERT_TEST) { } auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - std::cout << "total_time(ms) : " << total_time << std::endl; + LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time; - stat = db_->DropAll(); - ASSERT_TRUE(stat.ok()); + delete db_; + boost::filesystem::remove_all(options.meta.path); } +TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) { + + auto options = engine::OptionsFactory::Build(); + options.meta.path = "/tmp/milvus_test"; + options.meta.backend_uri = "sqlite://:@:/"; + auto db_ = engine::DBFactory::Build(options); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + engine::meta::TableSchema table_info_get; + table_info_get.table_id_ = TABLE_NAME; + stat = db_->DescribeTable(table_info_get); + ASSERT_STATS(stat); + ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); + + engine::IDNumbers vector_ids; + engine::IDNumbers target_ids; + + int64_t nb = 409600; + std::vector xb; + BuildVectors(nb, xb); + + int64_t qb = 5; + std::vector qxb; + BuildVectors(qb, qxb); + + std::thread search([&]() { + engine::QueryResults results; + int k = 10; + std::this_thread::sleep_for(std::chrono::seconds(2)); + + INIT_TIMER; + std::stringstream ss; + uint64_t count = 0; + uint64_t prev_count = 0; + + for (auto j=0; j<10; ++j) { + ss.str(""); + db_->Size(count); + prev_count = count; + + START_TIMER; + stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); + ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + STOP_TIMER(ss.str()); + + ASSERT_STATS(stat); + for (auto k=0; k= prev_count); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + int loop = 20; + + for (auto i=0; iInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); + ASSERT_EQ(target_ids.size(), qb); + } else { + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + } + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + search.join(); + + delete db_; + boost::filesystem::remove_all(options.meta.path); + +}; + diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 70c0712549..ae05c59d3b 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -106,6 +106,18 @@ zilliz::milvus::engine::Options MySQLDBTest::GetOptions() { return options; } +void NewMemManagerTest::InitLog() { + el::Configurations defaultConf; + defaultConf.setToDefault(); + defaultConf.set(el::Level::Debug, + el::ConfigurationType::Format, "[%thread-%datetime-%level]: %msg (%fbase:%line)"); + el::Loggers::reconfigureLogger("default", defaultConf); +} + +void NewMemManagerTest::SetUp() { + InitLog(); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); if (argc > 1) { diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index 361c24b4be..d06500de5c 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -87,3 +87,8 @@ class MySQLDBTest : public ::testing::Test { protected: zilliz::milvus::engine::Options GetOptions(); }; + +class NewMemManagerTest : public ::testing::Test { + void InitLog(); + virtual void SetUp() override; +}; From ed23b7056ff47a8f798534ecd87f09adda0a50e5 Mon Sep 17 00:00:00 2001 From: zhiru Date: Mon, 8 Jul 2019 15:07:03 +0800 Subject: [PATCH 6/6] update Former-commit-id: 6edbbf6f4bca89c568c71d5e4bd0de1be84e6137 --- cpp/src/db/Constants.h | 6 +- cpp/src/db/Factories.cpp | 19 +++--- cpp/src/db/Factories.h | 9 +-- cpp/src/db/MemManager.cpp | 83 ++++++++++++++------------ cpp/src/db/MemManager.h | 43 +++++++------- cpp/src/db/MemManagerAbstract.h | 11 ++-- cpp/src/db/MemTable.cpp | 66 +++++++++++---------- cpp/src/db/MemTable.h | 13 ++-- cpp/src/db/MemTableFile.cpp | 56 +++++++++--------- cpp/src/db/MemTableFile.h | 9 +-- cpp/src/db/NewMemManager.cpp | 63 ++++++++++---------- cpp/src/db/NewMemManager.h | 23 ++++---- cpp/src/db/VectorSource.cpp | 18 +++--- cpp/src/db/VectorSource.h | 19 +++--- cpp/unittest/db/mem_test.cpp | 101 ++++++++++++++++---------------- cpp/unittest/db/utils.h | 20 +++---- 16 files changed, 284 insertions(+), 275 deletions(-) diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h index 1ba02b1d55..055b10ca9a 100644 --- a/cpp/src/db/Constants.h +++ b/cpp/src/db/Constants.h @@ -10,9 +10,9 @@ namespace milvus { namespace engine { const size_t K = 1024UL; -const size_t M = K*K; -const size_t G = K*M; -const size_t T = K*G; +const size_t M = K * K; +const size_t G = K * M; +const size_t T = K * G; const size_t MAX_TABLE_FILE_MEM = 128 * M; diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index d51727cbff..65c7484a50 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -22,6 +22,8 @@ namespace zilliz { namespace milvus { namespace engine { +#define USE_NEW_MEM_MANAGER 1 + DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) { auto p = path; if(p == "") { @@ -74,17 +76,14 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp if (dialect.find("mysql") != std::string::npos) { ENGINE_LOG_INFO << "Using MySQL"; return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); - } - else if (dialect.find("sqlite") != std::string::npos) { + } else if (dialect.find("sqlite") != std::string::npos) { ENGINE_LOG_INFO << "Using SQLite"; return std::make_shared(meta::DBMetaImpl(metaOptions)); - } - else { + } else { ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; throw InvalidArgumentException("URI dialect is not mysql / sqlite"); } - } - else { + } else { ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; throw InvalidArgumentException("Wrong URI format "); } @@ -102,11 +101,11 @@ DB* DBFactory::Build(const Options& options) { MemManagerAbstractPtr MemManagerFactory::Build(const std::shared_ptr& meta, const Options& options) { - bool useNew = true; - if (useNew) { - return std::make_shared(meta, options); - } +#ifdef USE_NEW_MEM_MANAGER + return std::make_shared(meta, options); +#else return std::make_shared(meta, options); +#endif } } // namespace engine diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 567bc0a8bc..8b6e7b100f 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -15,12 +15,13 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { struct DBMetaOptionsFactory { - static DBMetaOptions Build(const std::string& path = ""); + static DBMetaOptions Build(const std::string &path = ""); }; struct OptionsFactory { @@ -29,16 +30,16 @@ struct OptionsFactory { struct DBMetaImplFactory { static std::shared_ptr Build(); - static std::shared_ptr Build(const DBMetaOptions& metaOptions, const int& mode); + static std::shared_ptr Build(const DBMetaOptions &metaOptions, const int &mode); }; struct DBFactory { static std::shared_ptr Build(); - static DB* Build(const Options&); + static DB *Build(const Options &); }; struct MemManagerFactory { - static MemManagerAbstractPtr Build(const std::shared_ptr& meta, const Options& options); + static MemManagerAbstractPtr Build(const std::shared_ptr &meta, const Options &options); }; } // namespace engine diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index ba8517cdbd..dbf0703173 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -15,22 +15,23 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { -MemVectors::MemVectors(const std::shared_ptr& meta_ptr, - const meta::TableFileSchema& schema, const Options& options) - : meta_(meta_ptr), - options_(options), - schema_(schema), - id_generator_(new SimpleIDGenerator()), - active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) { +MemVectors::MemVectors(const std::shared_ptr &meta_ptr, + const meta::TableFileSchema &schema, const Options &options) + : meta_(meta_ptr), + options_(options), + schema_(schema), + id_generator_(new SimpleIDGenerator()), + active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType) schema_.engine_type_)) { } -Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { - if(active_engine_ == nullptr) { +Status MemVectors::Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_) { + if (active_engine_ == nullptr) { return Status::Error("index engine is null"); } @@ -39,13 +40,15 @@ Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data()); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), static_cast(schema_.dimension_), total_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), + static_cast(schema_.dimension_), + total_time); return status; } size_t MemVectors::RowCount() const { - if(active_engine_ == nullptr) { + if (active_engine_ == nullptr) { return 0; } @@ -53,15 +56,15 @@ size_t MemVectors::RowCount() const { } size_t MemVectors::Size() const { - if(active_engine_ == nullptr) { + if (active_engine_ == nullptr) { return 0; } return active_engine_->Size(); } -Status MemVectors::Serialize(std::string& table_id) { - if(active_engine_ == nullptr) { +Status MemVectors::Serialize(std::string &table_id) { + if (active_engine_ == nullptr) { return Status::Error("index engine is null"); } @@ -73,15 +76,16 @@ Status MemVectors::Serialize(std::string& table_id) { auto total_time = METRICS_MICROSECONDS(start_time, end_time); schema_.size_ = size; - server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time); + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size / total_time); schema_.file_type_ = (size >= options_.index_trigger_size) ? - meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; auto status = meta_->UpdateTableFile(schema_); LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") - << " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M"; + << " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M + << " M"; active_engine_->Cache(); @@ -99,7 +103,7 @@ MemVectors::~MemVectors() { * MemManager */ MemManager::MemVectorsPtr MemManager::GetMemByTable( - const std::string& table_id) { + const std::string &table_id) { auto memIt = mem_id_map_.find(table_id); if (memIt != mem_id_map_.end()) { return memIt->second; @@ -116,22 +120,23 @@ MemManager::MemVectorsPtr MemManager::GetMemByTable( return mem_id_map_[table_id]; } -Status MemManager::InsertVectors(const std::string& table_id_, - size_t n_, - const float* vectors_, - IDNumbers& vector_ids_) { +Status MemManager::InsertVectors(const std::string &table_id_, + size_t n_, + const float *vectors_, + IDNumbers &vector_ids_) { + + LOG(DEBUG) << "MemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + std::unique_lock lock(mutex_); return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); } -Status MemManager::InsertVectorsNoLock(const std::string& table_id, - size_t n, - const float* vectors, - IDNumbers& vector_ids) { - - LOG(DEBUG) << "MemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << - ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); +Status MemManager::InsertVectorsNoLock(const std::string &table_id, + size_t n, + const float *vectors, + IDNumbers &vector_ids) { MemVectorsPtr mem = GetMemByTable(table_id); if (mem == nullptr) { @@ -139,7 +144,7 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, } //makesure each file size less than index_trigger_size - if(mem->Size() > options_.index_trigger_size) { + if (mem->Size() > options_.index_trigger_size) { std::unique_lock lock(serialization_mtx_); immu_mem_list_.push_back(mem); mem_id_map_.erase(table_id); @@ -152,8 +157,8 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, Status MemManager::ToImmutable() { std::unique_lock lock(mutex_); MemIdMap temp_map; - for (auto& kv: mem_id_map_) { - if(kv.second->RowCount() == 0) { + for (auto &kv: mem_id_map_) { + if (kv.second->RowCount() == 0) { temp_map.insert(kv); continue;//empty vector, no need to serialize } @@ -164,12 +169,12 @@ Status MemManager::ToImmutable() { return Status::OK(); } -Status MemManager::Serialize(std::set& table_ids) { +Status MemManager::Serialize(std::set &table_ids) { ToImmutable(); std::unique_lock lock(serialization_mtx_); std::string table_id; table_ids.clear(); - for (auto& mem : immu_mem_list_) { + for (auto &mem : immu_mem_list_) { mem->Serialize(table_id); table_ids.insert(table_id); } @@ -177,7 +182,7 @@ Status MemManager::Serialize(std::set& table_ids) { return Status::OK(); } -Status MemManager::EraseMemVector(const std::string& table_id) { +Status MemManager::EraseMemVector(const std::string &table_id) { {//erase MemVector from rapid-insert cache std::unique_lock lock(mutex_); mem_id_map_.erase(table_id); @@ -186,8 +191,8 @@ Status MemManager::EraseMemVector(const std::string& table_id) { {//erase MemVector from serialize cache std::unique_lock lock(serialization_mtx_); MemList temp_list; - for (auto& mem : immu_mem_list_) { - if(mem->TableId() != table_id) { + for (auto &mem : immu_mem_list_) { + if (mem->TableId() != table_id) { temp_list.push_back(mem); } } @@ -199,7 +204,7 @@ Status MemManager::EraseMemVector(const std::string& table_id) { size_t MemManager::GetCurrentMutableMem() { size_t totalMem = 0; - for (auto& kv : mem_id_map_) { + for (auto &kv : mem_id_map_) { auto memVector = kv.second; totalMem += memVector->Size(); } @@ -208,7 +213,7 @@ size_t MemManager::GetCurrentMutableMem() { size_t MemManager::GetCurrentImmutableMem() { size_t totalMem = 0; - for (auto& memVector : immu_mem_list_) { + for (auto &memVector : immu_mem_list_) { totalMem += memVector->Size(); } return totalMem; diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index e8460c7a6d..5ad3d08b63 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -17,45 +17,46 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { namespace meta { - class Meta; +class Meta; } class MemVectors { -public: + public: using MetaPtr = meta::Meta::Ptr; using Ptr = std::shared_ptr; - explicit MemVectors(const std::shared_ptr&, - const meta::TableFileSchema&, const Options&); + explicit MemVectors(const std::shared_ptr &, + const meta::TableFileSchema &, const Options &); - Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); + Status Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_); size_t RowCount() const; size_t Size() const; - Status Serialize(std::string& table_id); + Status Serialize(std::string &table_id); ~MemVectors(); - const std::string& Location() const { return schema_.location_; } + const std::string &Location() const { return schema_.location_; } std::string TableId() const { return schema_.table_id_; } -private: + private: MemVectors() = delete; - MemVectors(const MemVectors&) = delete; - MemVectors& operator=(const MemVectors&) = delete; + MemVectors(const MemVectors &) = delete; + MemVectors &operator=(const MemVectors &) = delete; MetaPtr meta_; Options options_; meta::TableFileSchema schema_; - IDGenerator* id_generator_; + IDGenerator *id_generator_; ExecutionEnginePtr active_engine_; }; // MemVectors @@ -63,20 +64,20 @@ private: class MemManager : public MemManagerAbstract { -public: + public: using MetaPtr = meta::Meta::Ptr; using MemVectorsPtr = typename MemVectors::Ptr; using Ptr = std::shared_ptr; - MemManager(const std::shared_ptr& meta, const Options& options) + MemManager(const std::shared_ptr &meta, const Options &options) : meta_(meta), options_(options) {} - Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) override; + Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) override; - Status Serialize(std::set& table_ids) override; + Status Serialize(std::set &table_ids) override; - Status EraseMemVector(const std::string& table_id) override; + Status EraseMemVector(const std::string &table_id) override; size_t GetCurrentMutableMem() override; @@ -84,11 +85,11 @@ public: size_t GetCurrentMem() override; -private: - MemVectorsPtr GetMemByTable(const std::string& table_id); + private: + MemVectorsPtr GetMemByTable(const std::string &table_id); - Status InsertVectorsNoLock(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids); + Status InsertVectorsNoLock(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids); Status ToImmutable(); using MemIdMap = std::map; diff --git a/cpp/src/db/MemManagerAbstract.h b/cpp/src/db/MemManagerAbstract.h index 58c73ba6f8..943c454e46 100644 --- a/cpp/src/db/MemManagerAbstract.h +++ b/cpp/src/db/MemManagerAbstract.h @@ -2,19 +2,20 @@ #include + namespace zilliz { namespace milvus { namespace engine { class MemManagerAbstract { -public: + public: - virtual Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) = 0; + virtual Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) = 0; - virtual Status Serialize(std::set& table_ids) = 0; + virtual Status Serialize(std::set &table_ids) = 0; - virtual Status EraseMemVector(const std::string& table_id) = 0; + virtual Status EraseMemVector(const std::string &table_id) = 0; virtual size_t GetCurrentMutableMem() = 0; diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index ba3875fbb5..e05aa058ac 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -1,46 +1,50 @@ #include "MemTable.h" #include "Log.h" + namespace zilliz { namespace milvus { namespace engine { -MemTable::MemTable(const std::string& table_id, - const std::shared_ptr& meta, - const Options& options) : - table_id_(table_id), - meta_(meta), - options_(options) { +MemTable::MemTable(const std::string &table_id, + const std::shared_ptr &meta, + const Options &options) : + table_id_(table_id), + meta_(meta), + options_(options) { } -Status MemTable::Add(VectorSource::Ptr& source) { +Status MemTable::Add(VectorSource::Ptr &source) { + while (!source->AllAdded()) { - MemTableFile::Ptr currentMemTableFile; + + MemTableFile::Ptr current_mem_table_file; if (!mem_table_file_list_.empty()) { - currentMemTableFile = mem_table_file_list_.back(); + current_mem_table_file = mem_table_file_list_.back(); } + Status status; - if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) { - MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_, options_); - status = newMemTableFile->Add(source); + if (mem_table_file_list_.empty() || current_mem_table_file->IsFull()) { + MemTableFile::Ptr new_mem_table_file = std::make_shared(table_id_, meta_, options_); + status = new_mem_table_file->Add(source); if (status.ok()) { - mem_table_file_list_.emplace_back(newMemTableFile); + mem_table_file_list_.emplace_back(new_mem_table_file); } + } else { + status = current_mem_table_file->Add(source); } - else { - status = currentMemTableFile->Add(source); - } + if (!status.ok()) { - std::string errMsg = "MemTable::Add failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTable::Add failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } } return Status::OK(); } -void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { +void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file) { mem_table_file = mem_table_file_list_.back(); } @@ -49,15 +53,15 @@ size_t MemTable::GetTableFileCount() { } Status MemTable::Serialize() { - for (auto memTableFile = mem_table_file_list_.begin(); memTableFile != mem_table_file_list_.end(); ) { - auto status = (*memTableFile)->Serialize(); + for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) { + auto status = (*mem_table_file)->Serialize(); if (!status.ok()) { - std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTable::Serialize failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } std::lock_guard lock(mutex_); - memTableFile = mem_table_file_list_.erase(memTableFile); + mem_table_file = mem_table_file_list_.erase(mem_table_file); } return Status::OK(); } @@ -66,17 +70,17 @@ bool MemTable::Empty() { return mem_table_file_list_.empty(); } -const std::string& MemTable::GetTableId() const { +const std::string &MemTable::GetTableId() const { return table_id_; } size_t MemTable::GetCurrentMem() { std::lock_guard lock(mutex_); - size_t totalMem = 0; - for (auto& memTableFile : mem_table_file_list_) { - totalMem += memTableFile->GetCurrentMem(); + size_t total_mem = 0; + for (auto &mem_table_file : mem_table_file_list_) { + total_mem += mem_table_file->GetCurrentMem(); } - return totalMem; + return total_mem; } } // namespace engine diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index 9bae932e62..198fcc228a 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -6,23 +6,24 @@ #include + namespace zilliz { namespace milvus { namespace engine { class MemTable { -public: + public: using Ptr = std::shared_ptr; using MemTableFileList = std::vector; using MetaPtr = meta::Meta::Ptr; - MemTable(const std::string& table_id, const std::shared_ptr& meta, const Options& options); + MemTable(const std::string &table_id, const std::shared_ptr &meta, const Options &options); - Status Add(VectorSource::Ptr& source); + Status Add(VectorSource::Ptr &source); - void GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file); + void GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file); size_t GetTableFileCount(); @@ -30,11 +31,11 @@ public: bool Empty(); - const std::string& GetTableId() const; + const std::string &GetTableId() const; size_t GetCurrentMem(); -private: + private: const std::string table_id_; MemTableFileList mem_table_file_list_; diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp index 0ff91de00b..649a680cf3 100644 --- a/cpp/src/db/MemTableFile.cpp +++ b/cpp/src/db/MemTableFile.cpp @@ -6,23 +6,24 @@ #include + namespace zilliz { namespace milvus { namespace engine { -MemTableFile::MemTableFile(const std::string& table_id, - const std::shared_ptr& meta, - const Options& options) : - table_id_(table_id), - meta_(meta), - options_(options) { +MemTableFile::MemTableFile(const std::string &table_id, + const std::shared_ptr &meta, + const Options &options) : + table_id_(table_id), + meta_(meta), + options_(options) { current_mem_ = 0; auto status = CreateTableFile(); if (status.ok()) { execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_, table_file_schema_.location_, - (EngineType)table_file_schema_.engine_type_); + (EngineType) table_file_schema_.engine_type_); } } @@ -33,31 +34,30 @@ Status MemTableFile::CreateTableFile() { auto status = meta_->CreateTableFile(table_file_schema); if (status.ok()) { table_file_schema_ = table_file_schema; - } - else { - std::string errMsg = "MemTableFile::CreateTableFile failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; + } else { + std::string err_msg = "MemTableFile::CreateTableFile failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; } return status; } -Status MemTableFile::Add(const VectorSource::Ptr& source) { +Status MemTableFile::Add(const VectorSource::Ptr &source) { if (table_file_schema_.dimension_ <= 0) { - std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " + - std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " + + std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } - size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; - size_t memLeft = GetMemLeft(); - if (memLeft >= singleVectorMemSize) { - size_t numVectorsToAdd = std::ceil(memLeft / singleVectorMemSize); - size_t numVectorsAdded; - auto status = source->Add(execution_engine_, table_file_schema_, numVectorsToAdd, numVectorsAdded); + size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + size_t mem_left = GetMemLeft(); + if (mem_left >= single_vector_mem_size) { + size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size); + size_t num_vectors_added; + auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added); if (status.ok()) { - current_mem_ += (numVectorsAdded * singleVectorMemSize); + current_mem_ += (num_vectors_added * single_vector_mem_size); } return status; } @@ -73,8 +73,8 @@ size_t MemTableFile::GetMemLeft() { } bool MemTableFile::IsFull() { - size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; - return (GetMemLeft() < singleVectorMemSize); + size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + return (GetMemLeft() < single_vector_mem_size); } Status MemTableFile::Serialize() { @@ -88,15 +88,15 @@ Status MemTableFile::Serialize() { auto total_time = METRICS_MICROSECONDS(start_time, end_time); table_file_schema_.size_ = size; - server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time); + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time); table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ? - meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; auto status = meta_->UpdateTableFile(table_file_schema_); LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") - << " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M"; + << " file " << table_file_schema_.file_id_ << " of size " << (double) size / (double) M << " M"; execution_engine_->Cache(); diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h index 1be0ae78ba..4d0011b362 100644 --- a/cpp/src/db/MemTableFile.h +++ b/cpp/src/db/MemTableFile.h @@ -5,20 +5,21 @@ #include "VectorSource.h" #include "ExecutionEngine.h" + namespace zilliz { namespace milvus { namespace engine { class MemTableFile { -public: + public: using Ptr = std::shared_ptr; using MetaPtr = meta::Meta::Ptr; - MemTableFile(const std::string& table_id, const std::shared_ptr& meta, const Options& options); + MemTableFile(const std::string &table_id, const std::shared_ptr &meta, const Options &options); - Status Add(const VectorSource::Ptr& source); + Status Add(const VectorSource::Ptr &source); size_t GetCurrentMem(); @@ -28,7 +29,7 @@ public: Status Serialize(); -private: + private: Status CreateTableFile(); diff --git a/cpp/src/db/NewMemManager.cpp b/cpp/src/db/NewMemManager.cpp index 3c78f37101..b0fcc9d4ae 100644 --- a/cpp/src/db/NewMemManager.cpp +++ b/cpp/src/db/NewMemManager.cpp @@ -5,11 +5,12 @@ #include + namespace zilliz { namespace milvus { namespace engine { -NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table_id) { +NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string &table_id) { auto memIt = mem_id_map_.find(table_id); if (memIt != mem_id_map_.end()) { return memIt->second; @@ -19,27 +20,27 @@ NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table return mem_id_map_[table_id]; } -Status NewMemManager::InsertVectors(const std::string& table_id_, +Status NewMemManager::InsertVectors(const std::string &table_id_, size_t n_, - const float* vectors_, - IDNumbers& vector_ids_) { + const float *vectors_, + IDNumbers &vector_ids_) { while (GetCurrentMem() > options_.maximum_memory) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } + LOG(DEBUG) << "NewMemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + std::unique_lock lock(mutex_); return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); } -Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, +Status NewMemManager::InsertVectorsNoLock(const std::string &table_id, size_t n, - const float* vectors, - IDNumbers& vector_ids) { - - LOG(DEBUG) << "NewMemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << - ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + const float *vectors, + IDNumbers &vector_ids) { MemTablePtr mem = GetMemByTable(table_id); VectorSource::Ptr source = std::make_shared(n, vectors); @@ -54,37 +55,33 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, Status NewMemManager::ToImmutable() { std::unique_lock lock(mutex_); MemIdMap temp_map; - for (auto& kv: mem_id_map_) { - if(kv.second->Empty()) { + for (auto &kv: mem_id_map_) { + if (kv.second->Empty()) { + //empty table, no need to serialize temp_map.insert(kv); - continue;//empty table, no need to serialize + } else { + immu_mem_list_.push_back(kv.second); } - immu_mem_list_.push_back(kv.second); } mem_id_map_.swap(temp_map); return Status::OK(); } -Status NewMemManager::Serialize(std::set& table_ids) { +Status NewMemManager::Serialize(std::set &table_ids) { ToImmutable(); std::unique_lock lock(serialization_mtx_); table_ids.clear(); - for (auto& mem : immu_mem_list_) { + for (auto &mem : immu_mem_list_) { mem->Serialize(); table_ids.insert(mem->GetTableId()); } immu_mem_list_.clear(); -// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) { -// (*mem)->Serialize(); -// table_ids.insert((*mem)->GetTableId()); -// mem = immu_mem_list_.erase(mem); -// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size(); -// } + return Status::OK(); } -Status NewMemManager::EraseMemVector(const std::string& table_id) { +Status NewMemManager::EraseMemVector(const std::string &table_id) { {//erase MemVector from rapid-insert cache std::unique_lock lock(mutex_); mem_id_map_.erase(table_id); @@ -93,8 +90,8 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { {//erase MemVector from serialize cache std::unique_lock lock(serialization_mtx_); MemList temp_list; - for (auto& mem : immu_mem_list_) { - if(mem->GetTableId() != table_id) { + for (auto &mem : immu_mem_list_) { + if (mem->GetTableId() != table_id) { temp_list.push_back(mem); } } @@ -105,20 +102,20 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { } size_t NewMemManager::GetCurrentMutableMem() { - size_t totalMem = 0; - for (auto& kv : mem_id_map_) { + size_t total_mem = 0; + for (auto &kv : mem_id_map_) { auto memTable = kv.second; - totalMem += memTable->GetCurrentMem(); + total_mem += memTable->GetCurrentMem(); } - return totalMem; + return total_mem; } size_t NewMemManager::GetCurrentImmutableMem() { - size_t totalMem = 0; - for (auto& memTable : immu_mem_list_) { - totalMem += memTable->GetCurrentMem(); + size_t total_mem = 0; + for (auto &mem_table : immu_mem_list_) { + total_mem += mem_table->GetCurrentMem(); } - return totalMem; + return total_mem; } size_t NewMemManager::GetCurrentMem() { diff --git a/cpp/src/db/NewMemManager.h b/cpp/src/db/NewMemManager.h index 9883480404..5b933c94ca 100644 --- a/cpp/src/db/NewMemManager.h +++ b/cpp/src/db/NewMemManager.h @@ -11,25 +11,26 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { class NewMemManager : public MemManagerAbstract { -public: + public: using MetaPtr = meta::Meta::Ptr; using Ptr = std::shared_ptr; using MemTablePtr = typename MemTable::Ptr; - NewMemManager(const std::shared_ptr& meta, const Options& options) - : meta_(meta), options_(options) {} + NewMemManager(const std::shared_ptr &meta, const Options &options) + : meta_(meta), options_(options) {} - Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) override; + Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) override; - Status Serialize(std::set& table_ids) override; + Status Serialize(std::set &table_ids) override; - Status EraseMemVector(const std::string& table_id) override; + Status EraseMemVector(const std::string &table_id) override; size_t GetCurrentMutableMem() override; @@ -37,11 +38,11 @@ public: size_t GetCurrentMem() override; -private: - MemTablePtr GetMemByTable(const std::string& table_id); + private: + MemTablePtr GetMemByTable(const std::string &table_id); - Status InsertVectorsNoLock(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids); + Status InsertVectorsNoLock(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids); Status ToImmutable(); using MemIdMap = std::map; diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index d032be51f6..74c07ae1f6 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -4,6 +4,7 @@ #include "Log.h" #include "metrics/Metrics.h" + namespace zilliz { namespace milvus { namespace engine { @@ -11,16 +12,16 @@ namespace engine { VectorSource::VectorSource(const size_t &n, const float *vectors) : - n_(n), - vectors_(vectors), - id_generator_(new SimpleIDGenerator()) { + n_(n), + vectors_(vectors), + id_generator_(new SimpleIDGenerator()) { current_num_vectors_added = 0; } -Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, - const meta::TableFileSchema& table_file_schema, - const size_t& num_vectors_to_add, - size_t& num_vectors_added) { +Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, + const meta::TableFileSchema &table_file_schema, + const size_t &num_vectors_to_add, + size_t &num_vectors_added) { auto start_time = METRICS_NOW_TIME; @@ -36,8 +37,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, vector_ids_.insert(vector_ids_.end(), std::make_move_iterator(vector_ids_to_add.begin()), std::make_move_iterator(vector_ids_to_add.end())); - } - else { + } else { ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); } diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h index dec31f39e1..7092805a6d 100644 --- a/cpp/src/db/VectorSource.h +++ b/cpp/src/db/VectorSource.h @@ -5,22 +5,23 @@ #include "IDGenerator.h" #include "ExecutionEngine.h" + namespace zilliz { namespace milvus { namespace engine { class VectorSource { -public: + public: using Ptr = std::shared_ptr; - VectorSource(const size_t& n, const float* vectors); + VectorSource(const size_t &n, const float *vectors); - Status Add(const ExecutionEnginePtr& execution_engine, - const meta::TableFileSchema& table_file_schema, - const size_t& num_vectors_to_add, - size_t& num_vectors_added); + Status Add(const ExecutionEnginePtr &execution_engine, + const meta::TableFileSchema &table_file_schema, + const size_t &num_vectors_to_add, + size_t &num_vectors_added); size_t GetNumVectorsAdded(); @@ -28,15 +29,15 @@ public: IDNumbers GetVectorIds(); -private: + private: const size_t n_; - const float* vectors_; + const float *vectors_; IDNumbers vector_ids_; size_t current_num_vectors_added; - IDGenerator* id_generator_; + IDGenerator *id_generator_; }; //VectorSource diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 818c3a6388..5b7972ec35 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -15,33 +15,34 @@ #include #include + using namespace zilliz::milvus; namespace { - static const std::string TABLE_NAME = "test_group"; - static constexpr int64_t TABLE_DIM = 256; - static constexpr int64_t VECTOR_COUNT = 250000; - static constexpr int64_t INSERT_LOOP = 10000; +static const std::string TABLE_NAME = "test_group"; +static constexpr int64_t TABLE_DIM = 256; +static constexpr int64_t VECTOR_COUNT = 250000; +static constexpr int64_t INSERT_LOOP = 10000; - engine::meta::TableSchema BuildTableSchema() { - engine::meta::TableSchema table_info; - table_info.dimension_ = TABLE_DIM; - table_info.table_id_ = TABLE_NAME; - table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; - return table_info; - } +engine::meta::TableSchema BuildTableSchema() { + engine::meta::TableSchema table_info; + table_info.dimension_ = TABLE_DIM; + table_info.table_id_ = TABLE_NAME; + table_info.engine_type_ = (int) engine::EngineType::FAISS_IDMAP; + return table_info; +} - void BuildVectors(int64_t n, std::vector& vectors) { - vectors.clear(); - vectors.resize(n*TABLE_DIM); - float* data = vectors.data(); - for(int i = 0; i < n; i++) { - for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); - data[TABLE_DIM * i] += i / 2000.; - } +void BuildVectors(int64_t n, std::vector &vectors) { + vectors.clear(); + vectors.resize(n * TABLE_DIM); + float *data = vectors.data(); + for (int i = 0; i < n; i++) { + for (int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; } } +} TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { @@ -65,7 +66,7 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { size_t num_vectors_added; engine::ExecutionEnginePtr execution_engine_ = engine::EngineFactory::Build(table_file_schema.dimension_, table_file_schema.location_, - (engine::EngineType)table_file_schema.engine_type_); + (engine::EngineType) table_file_schema.engine_type_); status = source.Add(execution_engine_, table_file_schema, 50, num_vectors_added); ASSERT_TRUE(status.ok()); @@ -82,10 +83,6 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { vector_ids = source.GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); -// for (auto& id : vector_ids) { -// std::cout << id << std::endl; -// } - status = impl_->DropAll(); ASSERT_TRUE(status.ok()); } @@ -99,7 +96,7 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { auto status = impl_->CreateTable(table_schema); ASSERT_TRUE(status.ok()); - engine::MemTableFile memTableFile(TABLE_NAME, impl_, options); + engine::MemTableFile mem_table_file(TABLE_NAME, impl_, options); int64_t n_100 = 100; std::vector vectors_100; @@ -107,28 +104,28 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { engine::VectorSource::Ptr source = std::make_shared(n_100, vectors_100.data()); - status = memTableFile.Add(source); + status = mem_table_file.Add(source); ASSERT_TRUE(status.ok()); -// std::cout << memTableFile.GetCurrentMem() << " " << memTableFile.GetMemLeft() << std::endl; +// std::cout << mem_table_file.GetCurrentMem() << " " << mem_table_file.GetMemLeft() << std::endl; engine::IDNumbers vector_ids = source->GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); size_t singleVectorMem = sizeof(float) * TABLE_DIM; - ASSERT_EQ(memTableFile.GetCurrentMem(), n_100 * singleVectorMem); + ASSERT_EQ(mem_table_file.GetCurrentMem(), n_100 * singleVectorMem); int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; std::vector vectors_128M; BuildVectors(n_max, vectors_128M); engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); - status = memTableFile.Add(source_128M); + status = mem_table_file.Add(source_128M); vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max - n_100); - ASSERT_TRUE(memTableFile.IsFull()); + ASSERT_TRUE(mem_table_file.IsFull()); status = impl_->DropAll(); ASSERT_TRUE(status.ok()); @@ -149,34 +146,34 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_100 = std::make_shared(n_100, vectors_100.data()); - engine::MemTable memTable(TABLE_NAME, impl_, options); + engine::MemTable mem_table(TABLE_NAME, impl_, options); - status = memTable.Add(source_100); + status = mem_table.Add(source_100); ASSERT_TRUE(status.ok()); engine::IDNumbers vector_ids = source_100->GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); - engine::MemTableFile::Ptr memTableFile; - memTable.GetCurrentMemTableFile(memTableFile); + engine::MemTableFile::Ptr mem_table_file; + mem_table.GetCurrentMemTableFile(mem_table_file); size_t singleVectorMem = sizeof(float) * TABLE_DIM; - ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + ASSERT_EQ(mem_table_file->GetCurrentMem(), n_100 * singleVectorMem); int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; std::vector vectors_128M; BuildVectors(n_max, vectors_128M); engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); - status = memTable.Add(source_128M); + status = mem_table.Add(source_128M); ASSERT_TRUE(status.ok()); vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max); - memTable.GetCurrentMemTableFile(memTableFile); - ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + mem_table.GetCurrentMemTableFile(mem_table_file); + ASSERT_EQ(mem_table_file->GetCurrentMem(), n_100 * singleVectorMem); - ASSERT_EQ(memTable.GetTableFileCount(), 2); + ASSERT_EQ(mem_table.GetTableFileCount(), 2); int64_t n_1G = 1024000; std::vector vectors_1G; @@ -184,16 +181,16 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_1G = std::make_shared(n_1G, vectors_1G.data()); - status = memTable.Add(source_1G); + status = mem_table.Add(source_1G); ASSERT_TRUE(status.ok()); vector_ids = source_1G->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_1G); int expectedTableFileCount = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); - ASSERT_EQ(memTable.GetTableFileCount(), expectedTableFileCount); + ASSERT_EQ(mem_table.GetTableFileCount(), expectedTableFileCount); - status = memTable.Serialize(); + status = mem_table.Serialize(); ASSERT_TRUE(status.ok()); status = impl_->DropAll(); @@ -216,7 +213,7 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { ASSERT_STATS(stat); ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); - std::map> search_vectors; + std::map> search_vectors; { engine::IDNumbers vector_ids; int64_t nb = 1024000; @@ -231,8 +228,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { std::mt19937 gen(rd()); std::uniform_int_distribution dis(0, nb - 1); - int64_t numQuery = 20; - for (int64_t i = 0; i < numQuery; ++i) { + int64_t num_query = 20; + for (int64_t i = 0; i < num_query; ++i) { int64_t index = dis(gen); std::vector search; for (int64_t j = 0; j < TABLE_DIM; j++) { @@ -243,8 +240,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { } int k = 10; - for(auto& pair : search_vectors) { - auto& search = pair.second; + for (auto &pair : search_vectors) { + auto &search = pair.second; engine::QueryResults results; stat = db_->Query(TABLE_NAME, k, 1, search.data(), results); ASSERT_EQ(results[0][0].first, pair.first); @@ -329,18 +326,18 @@ TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) { uint64_t count = 0; uint64_t prev_count = 0; - for (auto j=0; j<10; ++j) { + for (auto j = 0; j < 10; ++j) { ss.str(""); db_->Size(count); prev_count = count; START_TIMER; stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); - ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + ss << "Search " << j << " With Size " << count / engine::meta::M << " M"; STOP_TIMER(ss.str()); ASSERT_STATS(stat); - for (auto k=0; kInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); ASSERT_EQ(target_ids.size(), qb); } else { diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index d06500de5c..9c126030c2 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -30,7 +30,7 @@ #define STOP_TIMER(name) #endif -void ASSERT_STATS(zilliz::milvus::engine::Status& stat); +void ASSERT_STATS(zilliz::milvus::engine::Status &stat); //class TestEnv : public ::testing::Environment { //public: @@ -54,8 +54,8 @@ void ASSERT_STATS(zilliz::milvus::engine::Status& stat); // ::testing::AddGlobalTestEnvironment(new TestEnv); class DBTest : public ::testing::Test { -protected: - zilliz::milvus::engine::DB* db_; + protected: + zilliz::milvus::engine::DB *db_; void InitLog(); virtual void SetUp() override; @@ -64,13 +64,13 @@ protected: }; class DBTest2 : public DBTest { -protected: + protected: virtual zilliz::milvus::engine::Options GetOptions() override; }; class MetaTest : public DBTest { -protected: + protected: std::shared_ptr impl_; virtual void SetUp() override; @@ -78,17 +78,17 @@ protected: }; class MySQLTest : public ::testing::Test { -protected: + protected: // std::shared_ptr impl_; zilliz::milvus::engine::DBMetaOptions getDBMetaOptions(); }; -class MySQLDBTest : public ::testing::Test { -protected: +class MySQLDBTest : public ::testing::Test { + protected: zilliz::milvus::engine::Options GetOptions(); }; -class NewMemManagerTest : public ::testing::Test { +class NewMemManagerTest : public ::testing::Test { void InitLog(); - virtual void SetUp() override; + void SetUp() override; };