diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a4b23d89a..ea5ca83c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ Please mark all change in change log and use the issue from GitHub ## Task -# Milvus 0.7.1 (TBD) +# Milvus 0.7.1 (2020-03-29) ## Bug - \#1301 Data in WAL may be accidentally inserted into a new table with the same name. @@ -32,6 +32,7 @@ Please mark all change in change log and use the issue from GitHub - \#1735 Fix search out of memory with ivf_flat - \#1747 Expected error status if search with partition_tag not existed - \#1756 Fix memory exhausted during searching +- \#1781 Fix search hang with SQ8H ## Feature - \#261 Integrate ANNOY into Milvus @@ -174,7 +175,7 @@ Please mark all change in change log and use the issue from GitHub # Milvus 0.6.0 (2019-12-07) ## Bug -- \#228 memory usage increased slowly during searching vectors +- \#228 Memory usage increased slowly during searching vectors - \#246 Exclude src/external folder from code coverage for jenkin ci - \#248 Reside src/external in thirdparty - \#316 Some files not merged after vectors added @@ -201,7 +202,7 @@ Please mark all change in change log and use the issue from GitHub - \#523 Erase file data from cache once the file is marked as deleted - \#527 faiss benchmark not compatible with faiss 1.6.0 - \#530 BuildIndex stop when do build index and search simultaneously -- \#532 assigin value to `table_name` from confest shell +- \#532 Assigin value to `table_name` from confest shell - \#533 NSG build failed with MetricType Inner Product - \#543 client raise exception in shards when search results is empty - \#545 Avoid dead circle of build index thread when error occurs diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 34de23399a..53ebe63b4e 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -90,7 +90,7 @@ if (MILVUS_VERSION_MAJOR STREQUAL "" OR MILVUS_VERSION_MINOR STREQUAL "" OR MILVUS_VERSION_PATCH STREQUAL "") message(WARNING "Failed to determine Milvus version from git branch name") - set(MILVUS_VERSION "0.7.0") + set(MILVUS_VERSION "0.7.1") endif () message(STATUS "Build version = ${MILVUS_VERSION}") diff --git a/core/src/cache/Cache.h b/core/src/cache/Cache.h index 5f72fa5060..34594c4573 100644 --- a/core/src/cache/Cache.h +++ b/core/src/cache/Cache.h @@ -69,6 +69,9 @@ class Cache { void erase(const std::string& key); + bool + reserve(const int64_t size); + void print(); @@ -77,7 +80,13 @@ class Cache { private: void - free_memory(); + insert_internal(const std::string& key, const ItemObj& item); + + void + erase_internal(const std::string& key); + + void + free_memory_internal(const int64_t target_size); private: std::string header_; diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index 147c344532..607976e411 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -13,8 +13,6 @@ namespace milvus { namespace cache { constexpr double DEFAULT_THRESHOLD_PERCENT = 0.7; -constexpr double WARNING_THRESHOLD_PERCENT = 0.9; -constexpr double BIG_ITEM_THRESHOLD_PERCENT = 0.1; template Cache::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header) @@ -28,9 +26,10 @@ Cache::Cache(int64_t capacity, int64_t cache_max_count, const std::stri template void Cache::set_capacity(int64_t capacity) { + std::lock_guard lock(mutex_); if (capacity > 0) { capacity_ = capacity; - free_memory(); + free_memory_internal(capacity); } } @@ -55,56 +54,95 @@ Cache::get(const std::string& key) { if (!lru_.exists(key)) { return nullptr; } - return lru_.get(key); } template void Cache::insert(const std::string& key, const ItemObj& item) { - if (item == nullptr) { - return; - } - - size_t item_size = item->Size(); - // calculate usage - { - std::lock_guard lock(mutex_); - - // if key already exist, subtract old item size - if (lru_.exists(key)) { - const ItemObj& old_item = lru_.get(key); - usage_ -= old_item->Size(); - } - - // plus new item size - usage_ += item_size; - } - - // if usage exceed capacity, free some items - if (usage_ > capacity_ || - (item_size > (int64_t)(capacity_ * BIG_ITEM_THRESHOLD_PERCENT) && - usage_ > (int64_t)(capacity_ * WARNING_THRESHOLD_PERCENT))) { - SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity " - << (capacity_ >> 20) << "MB, start free memory"; - free_memory(); - } - - // insert new item - { - std::lock_guard lock(mutex_); - - lru_.put(key, item); - SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache"; - SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " - << (capacity_ >> 20) << "MB"; - } + std::lock_guard lock(mutex_); + insert_internal(key, item); } template void Cache::erase(const std::string& key) { std::lock_guard lock(mutex_); + erase_internal(key); +} + +template +bool +Cache::reserve(const int64_t item_size) { + std::lock_guard lock(mutex_); + if (item_size > capacity_) { + SERVER_LOG_ERROR << header_ << " item size " << (item_size >> 20) << "MB too big to insert into cache capacity" + << (capacity_ >> 20) << "MB"; + return false; + } + if (item_size > capacity_ - usage_) { + free_memory_internal(capacity_ - item_size); + } + return true; +} + +template +void +Cache::clear() { + std::lock_guard lock(mutex_); + lru_.clear(); + usage_ = 0; + SERVER_LOG_DEBUG << header_ << " Clear cache !"; +} + + +template +void +Cache::print() { + std::lock_guard lock(mutex_); + size_t cache_count = lru_.size(); + // for (auto it = lru_.begin(); it != lru_.end(); ++it) { + // SERVER_LOG_DEBUG << it->first; + // } + SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20) + << "MB, [capacity] " << (capacity_ >> 20) << "MB"; +} + +template +void +Cache::insert_internal(const std::string& key, const ItemObj& item) { + if (item == nullptr) { + return; + } + + size_t item_size = item->Size(); + + // if key already exist, subtract old item size + if (lru_.exists(key)) { + const ItemObj& old_item = lru_.get(key); + usage_ -= old_item->Size(); + } + + // plus new item size + usage_ += item_size; + + // if usage exceed capacity, free some items + if (usage_ > capacity_) { + SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity " + << (capacity_ >> 20) << "MB, start free memory"; + free_memory_internal(capacity_); + } + + // insert new item + lru_.put(key, item); + SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache"; + SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " + << (capacity_ >> 20) << "MB"; +} + +template +void +Cache::erase_internal(const std::string& key) { if (!lru_.exists(key)) { return; } @@ -122,22 +160,9 @@ Cache::erase(const std::string& key) { template void -Cache::clear() { - std::lock_guard lock(mutex_); - lru_.clear(); - usage_ = 0; - SERVER_LOG_DEBUG << header_ << " Clear cache !"; -} - -/* free memory space when CACHE occupation exceed its capacity */ -template -void -Cache::free_memory() { - // if (usage_ <= capacity_) - // return; - - int64_t threshhold = capacity_ * freemem_percent_; - int64_t delta_size = usage_ - threshhold; +Cache::free_memory_internal(const int64_t target_size) { + int64_t threshold = std::min((int64_t)(capacity_ * freemem_percent_), target_size); + int64_t delta_size = usage_ - threshold; if (delta_size <= 0) { delta_size = 1; // ensure at least one item erased } @@ -145,44 +170,22 @@ Cache::free_memory() { std::set key_array; int64_t released_size = 0; - { - std::lock_guard lock(mutex_); + auto it = lru_.rbegin(); + while (it != lru_.rend() && released_size < delta_size) { + auto& key = it->first; + auto& obj_ptr = it->second; - auto it = lru_.rbegin(); - while (it != lru_.rend() && released_size < delta_size) { - auto& key = it->first; - auto& obj_ptr = it->second; - - key_array.emplace(key); - released_size += obj_ptr->Size(); - ++it; - } + key_array.emplace(key); + released_size += obj_ptr->Size(); + ++it; } SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB"; for (auto& key : key_array) { - erase(key); + erase_internal(key); } } -template -void -Cache::print() { - size_t cache_count = 0; - { - std::lock_guard lock(mutex_); - cache_count = lru_.size(); -#if 0 - for (auto it = lru_.begin(); it != lru_.end(); ++it) { - SERVER_LOG_DEBUG << it->first; - } -#endif - } - - SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20) - << "MB, [capacity] " << (capacity_ >> 20) << "MB"; -} - } // namespace cache } // namespace milvus diff --git a/core/src/cache/CacheMgr.h b/core/src/cache/CacheMgr.h index 168c00b68e..c7ab9e26ab 100644 --- a/core/src/cache/CacheMgr.h +++ b/core/src/cache/CacheMgr.h @@ -39,6 +39,9 @@ class CacheMgr { virtual void EraseItem(const std::string& key); + virtual bool + Reserve(const int64_t size); + virtual void PrintInfo(); @@ -54,14 +57,20 @@ class CacheMgr { void SetCapacity(int64_t capacity); + void + Lock(); + + void + Unlock(); + protected: CacheMgr(); virtual ~CacheMgr(); protected: - using CachePtr = std::shared_ptr>; - CachePtr cache_; + std::shared_ptr> cache_; + std::mutex mutex_; }; } // namespace cache diff --git a/core/src/cache/CacheMgr.inl b/core/src/cache/CacheMgr.inl index f3eb6dc42e..606510c66c 100644 --- a/core/src/cache/CacheMgr.inl +++ b/core/src/cache/CacheMgr.inl @@ -27,7 +27,6 @@ CacheMgr::ItemCount() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return (uint64_t)(cache_->size()); } @@ -38,7 +37,6 @@ CacheMgr::ItemExists(const std::string& key) { SERVER_LOG_ERROR << "Cache doesn't exist"; return false; } - return cache_->exists(key); } @@ -60,7 +58,6 @@ CacheMgr::InsertItem(const std::string& key, const ItemObj& data) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->insert(key, data); server::Metrics::GetInstance().CacheAccessTotalIncrement(); } @@ -72,11 +69,20 @@ CacheMgr::EraseItem(const std::string& key) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->erase(key); server::Metrics::GetInstance().CacheAccessTotalIncrement(); } +template +bool +CacheMgr::Reserve(const int64_t size) { + if (cache_ == nullptr) { + SERVER_LOG_ERROR << "Cache doesn't exist"; + return false; + } + return cache_->reserve(size); +} + template void CacheMgr::PrintInfo() { @@ -84,7 +90,6 @@ CacheMgr::PrintInfo() { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->print(); } @@ -95,7 +100,6 @@ CacheMgr::ClearCache() { SERVER_LOG_ERROR << "Cache doesn't exist"; return; } - cache_->clear(); } @@ -106,7 +110,6 @@ CacheMgr::CacheUsage() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return cache_->usage(); } @@ -117,7 +120,6 @@ CacheMgr::CacheCapacity() const { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; } - return cache_->capacity(); } @@ -131,5 +133,17 @@ CacheMgr::SetCapacity(int64_t capacity) { cache_->set_capacity(capacity); } +template +void +CacheMgr::Lock() { + mutex_.lock(); +} + +template +void +CacheMgr::Unlock() { + mutex_.unlock(); +} + } // namespace cache } // namespace milvus diff --git a/core/src/cache/GpuCacheMgr.cpp b/core/src/cache/GpuCacheMgr.cpp index 374d141fc0..30e4ec4c5a 100644 --- a/core/src/cache/GpuCacheMgr.cpp +++ b/core/src/cache/GpuCacheMgr.cpp @@ -22,7 +22,7 @@ namespace cache { #ifdef MILVUS_GPU_VERSION std::mutex GpuCacheMgr::global_mutex_; -std::unordered_map> GpuCacheMgr::instance_; +std::unordered_map GpuCacheMgr::instance_; namespace { constexpr int64_t G_BYTE = 1024 * 1024 * 1024; @@ -65,33 +65,28 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr& } } +bool +GpuCacheMgr::Reserve(const int64_t size) { + return CacheMgr::Reserve(size); +} + GpuCacheMgrPtr GpuCacheMgr::GetInstance(int64_t gpu_id) { if (instance_.find(gpu_id) == instance_.end()) { std::lock_guard lock(global_mutex_); if (instance_.find(gpu_id) == instance_.end()) { - instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); + instance_[gpu_id] = std::make_shared(gpu_id); } } - return instance_[gpu_id].first; -} - -MutexPtr -GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) { - if (instance_.find(gpu_id) == instance_.end()) { - std::lock_guard lock(global_mutex_); - if (instance_.find(gpu_id) == instance_.end()) { - instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); - } - } - return instance_[gpu_id].second; + return instance_[gpu_id]; } void GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) { for (auto& iter : instance_) { - std::lock_guard lock(*(iter.second.second)); - iter.second.first->SetCapacity(capacity * G_BYTE); + iter.second->Lock(); + iter.second->SetCapacity(capacity * G_BYTE); + iter.second->Unlock(); } } diff --git a/core/src/cache/GpuCacheMgr.h b/core/src/cache/GpuCacheMgr.h index d96d6f3c89..17a92c1d8e 100644 --- a/core/src/cache/GpuCacheMgr.h +++ b/core/src/cache/GpuCacheMgr.h @@ -39,12 +39,12 @@ class GpuCacheMgr : public CacheMgr, public server::GpuResourceConfi void InsertItem(const std::string& key, const DataObjPtr& data); + bool + Reserve(const int64_t size); + static GpuCacheMgrPtr GetInstance(int64_t gpu_id); - static MutexPtr - GetInstanceMutex(int64_t gpu_id); - protected: void OnGpuCacheCapacityChanged(int64_t capacity) override; @@ -53,7 +53,7 @@ class GpuCacheMgr : public CacheMgr, public server::GpuResourceConfi bool gpu_enable_ = true; int64_t gpu_id_; static std::mutex global_mutex_; - static std::unordered_map> instance_; + static std::unordered_map instance_; std::string identity_; }; #endif diff --git a/core/src/config/Config.cpp b/core/src/config/Config.cpp index cddc8a1203..b8f65a1702 100644 --- a/core/src/config/Config.cpp +++ b/core/src/config/Config.cpp @@ -40,7 +40,8 @@ namespace server { constexpr int64_t GB = 1UL << 30; static const std::unordered_map milvus_config_version_map({{"0.6.0", "0.1"}, - {"0.7.0", "0.2"}}); + {"0.7.0", "0.2"}, + {"0.7.1", "0.2"}}); ///////////////////////////////////////////////////////////// Config::Config() { diff --git a/core/src/db/engine/ExecutionEngine.h b/core/src/db/engine/ExecutionEngine.h index 56f829960d..62e1806b2a 100644 --- a/core/src/db/engine/ExecutionEngine.h +++ b/core/src/db/engine/ExecutionEngine.h @@ -112,9 +112,6 @@ class ExecutionEngine { virtual Status Cache() = 0; - virtual Status - GpuCache(uint64_t gpu_id) = 0; - virtual Status Init() = 0; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 9fcd8863de..e367a35907 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -41,6 +41,7 @@ #include "metrics/Metrics.h" #include "scheduler/Utils.h" #include "utils/CommonUtil.h" +#include "utils/Error.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/Status.h" @@ -549,13 +550,16 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { try { /* Index data is copied to GPU first, then added into GPU cache. - * We MUST add a lock here to avoid more than one INDEX are copied to one GPU card at same time, - * which will potentially cause GPU out of memory. + * Add lock here to avoid multiple INDEX are copied to one GPU card at same time. + * And reserve space to avoid GPU out of memory issue. */ - std::lock_guard lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id))); ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start"; + auto gpu_cache_mgr = cache::GpuCacheMgr::GetInstance(device_id); + // gpu_cache_mgr->Lock(); + // gpu_cache_mgr->Reserve(index_->Size()); index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config()); - GpuCache(device_id); + // gpu_cache_mgr->InsertItem(location_, std::static_pointer_cast(index_)); + // gpu_cache_mgr->Unlock(); ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished"; } catch (std::exception& e) { ENGINE_LOG_ERROR << e.what(); @@ -572,10 +576,11 @@ ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { #ifdef MILVUS_GPU_VERSION // the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in if (index_) { + auto gpu_cache_mgr = milvus::cache::GpuCacheMgr::GetInstance(device_id); + gpu_cache_mgr->Lock(); gpu_num_ = device_id; - auto to_index_data = std::make_shared(index_->Size()); - cache::DataObjPtr obj = std::static_pointer_cast(to_index_data); - milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj); + gpu_cache_mgr->Reserve(index_->Size()); + gpu_cache_mgr->Unlock(); } #endif return Status::OK(); @@ -958,18 +963,11 @@ ExecutionEngineImpl::GetVectorByID(const int64_t& id, uint8_t* vector, bool hybr Status ExecutionEngineImpl::Cache() { + auto cpu_cache_mgr = milvus::cache::CpuCacheMgr::GetInstance(); + cpu_cache_mgr->Lock(); cache::DataObjPtr obj = std::static_pointer_cast(index_); - milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj); - - return Status::OK(); -} - -Status -ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { -#ifdef MILVUS_GPU_VERSION - cache::DataObjPtr obj = std::static_pointer_cast(index_); - milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj); -#endif + cpu_cache_mgr->InsertItem(location_, obj); + cpu_cache_mgr->Unlock(); return Status::OK(); } diff --git a/core/src/db/engine/ExecutionEngineImpl.h b/core/src/db/engine/ExecutionEngineImpl.h index d8ef30a6aa..35e0b9c217 100644 --- a/core/src/db/engine/ExecutionEngineImpl.h +++ b/core/src/db/engine/ExecutionEngineImpl.h @@ -86,9 +86,6 @@ class ExecutionEngineImpl : public ExecutionEngine { Status Cache() override; - Status - GpuCache(uint64_t gpu_id) override; - Status Init() override; diff --git a/core/src/server/delivery/request/BaseRequest.h b/core/src/server/delivery/request/BaseRequest.h index f3d70d7934..d00a20a6c3 100644 --- a/core/src/server/delivery/request/BaseRequest.h +++ b/core/src/server/delivery/request/BaseRequest.h @@ -211,7 +211,7 @@ class BaseRequest { TableNotExistMsg(const std::string& table_name); protected: - const std::shared_ptr& context_; + const std::shared_ptr context_; mutable std::mutex finish_mtx_; std::condition_variable finish_cond_; diff --git a/core/src/server/web_impl/README.md b/core/src/server/web_impl/README.md index 675ed3c6df..04eaaa12a9 100644 --- a/core/src/server/web_impl/README.md +++ b/core/src/server/web_impl/README.md @@ -1388,7 +1388,7 @@ $ curl -X GET "http://127.0.0.1:19121/system/version" -H "accept: application/js ##### Response ```json -{"code":0,"message":"OK","reply": "0.7.0" } +{"code":0,"message":"OK","reply": "0.7.1" } ``` ### `system/{op}` (PUT) diff --git a/core/unittest/db/test_engine.cpp b/core/unittest/db/test_engine.cpp index ac05c4d1e1..aac6fcd8b3 100644 --- a/core/unittest/db/test_engine.cpp +++ b/core/unittest/db/test_engine.cpp @@ -203,8 +203,6 @@ TEST_F(EngineTest, ENGINE_IMPL_TEST) { auto status = engine_ptr->CopyToGpu(0, false); ASSERT_TRUE(status.ok()); - status = engine_ptr->GpuCache(0); - ASSERT_TRUE(status.ok()); status = engine_ptr->CopyToGpu(0, false); ASSERT_TRUE(status.ok()); diff --git a/sdk/grpc/ClientProxy.cpp b/sdk/grpc/ClientProxy.cpp index 219f78e4ad..1e9dbba1ba 100644 --- a/sdk/grpc/ClientProxy.cpp +++ b/sdk/grpc/ClientProxy.cpp @@ -17,7 +17,7 @@ #include "grpc-gen/gen-milvus/milvus.grpc.pb.h" -#define MILVUS_SDK_VERSION "0.7.0"; +#define MILVUS_SDK_VERSION "0.7.1"; namespace milvus { diff --git a/tests/milvus_python_test/test_ping.py b/tests/milvus_python_test/test_ping.py index a2ca5575d8..2a05349354 100644 --- a/tests/milvus_python_test/test_ping.py +++ b/tests/milvus_python_test/test_ping.py @@ -1,7 +1,7 @@ import logging import pytest -__version__ = '0.7.0' +__version__ = '0.7.1' class TestPing: