diff --git a/CHANGELOG.md b/CHANGELOG.md index d579f4db9c..b5a0087550 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub - \#1708 NSG search crashed - \#1724 Remove unused unittests - \#1734 Opentracing for combined search request +- \#1735 Fix search out of memory with ivf_flat ## Feature - \#1603 BinaryFlat add 2 Metric: Substructure and Superstructure diff --git a/core/src/cache/Cache.h b/core/src/cache/Cache.h index ef79ea948f..5f72fa5060 100644 --- a/core/src/cache/Cache.h +++ b/core/src/cache/Cache.h @@ -26,7 +26,7 @@ template class Cache { public: // mem_capacity, units:GB - Cache(int64_t capacity_gb, uint64_t cache_max_count); + Cache(int64_t capacity_gb, int64_t cache_max_count, const std::string& header = ""); ~Cache() = default; int64_t @@ -80,6 +80,7 @@ class Cache { free_memory(); private: + std::string header_; int64_t usage_; int64_t capacity_; double freemem_percent_; diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index e1bc643399..2ad25d69af 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -12,12 +12,16 @@ namespace milvus { namespace cache { -constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85; +constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.7; +constexpr double WARNING_THRESHHOLD_PERCENT = 0.9; template -Cache::Cache(int64_t capacity, uint64_t cache_max_count) - : usage_(0), capacity_(capacity), freemem_percent_(DEFAULT_THRESHHOLD_PERCENT), lru_(cache_max_count) { - // AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity) +Cache::Cache(int64_t capacity, int64_t cache_max_count, const std::string& header) + : usage_(0), + capacity_(capacity), + header_(header), + freemem_percent_(DEFAULT_THRESHHOLD_PERCENT), + lru_(cache_max_count) { } template @@ -61,12 +65,7 @@ Cache::insert(const std::string& key, const ItemObj& item) { return; } - // if(item->size() > capacity_) { - // SERVER_LOG_ERROR << "Item 
size " << item->size() - // << " is too large to insert into cache, capacity " << capacity_; - // return; - // } - + size_t item_size = item->Size(); // calculate usage { std::lock_guard lock(mutex_); @@ -78,13 +77,13 @@ Cache::insert(const std::string& key, const ItemObj& item) { } // plus new item size - usage_ += item->Size(); + usage_ += item_size; } // if usage exceed capacity, free some items - if (usage_ > capacity_) { - SERVER_LOG_DEBUG << "Current usage " << usage_ << " exceeds cache capacity " << capacity_ - << ", start free memory"; + if (usage_ > (int64_t)(capacity_ * WARNING_THRESHHOLD_PERCENT)) { + SERVER_LOG_DEBUG << header_ << " Current usage " << (usage_ >> 20) << "MB is too high for capacity " + << (capacity_ >> 20) << "MB, start free memory"; free_memory(); } @@ -93,8 +92,9 @@ Cache::insert(const std::string& key, const ItemObj& item) { std::lock_guard lock(mutex_); lru_.put(key, item); - SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_ - << " bytes," << " capacity: " << capacity_ << " bytes"; + SERVER_LOG_DEBUG << header_ << " Insert " << key << " size: " << (item_size >> 20) << "MB into cache"; + SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " + << (capacity_ >> 20) << "MB"; } } @@ -106,13 +106,15 @@ Cache::erase(const std::string& key) { return; } - const ItemObj& old_item = lru_.get(key); - usage_ -= old_item->Size(); - - SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_ - << " bytes," << " capacity: " << capacity_ << " bytes"; + const ItemObj& item = lru_.get(key); + size_t item_size = item->Size(); lru_.erase(key); + + usage_ -= item_size; + SERVER_LOG_DEBUG << header_ << " Erase " << key << " size: " << (item_size >> 20) << "MB from cache"; + SERVER_LOG_DEBUG << header_ << " Count: " << lru_.size() << ", Usage: " << (usage_ >> 20) << "MB, Capacity: " + << 
(capacity_ >> 20) << "MB"; } template @@ -121,15 +123,15 @@ Cache::clear() { std::lock_guard lock(mutex_); lru_.clear(); usage_ = 0; - SERVER_LOG_DEBUG << "Clear cache !"; + SERVER_LOG_DEBUG << header_ << " Clear cache !"; } /* free memory space when CACHE occupation exceed its capacity */ template void Cache::free_memory() { - if (usage_ <= capacity_) - return; + // if (usage_ <= capacity_) + // return; int64_t threshhold = capacity_ * freemem_percent_; int64_t delta_size = usage_ - threshhold; @@ -154,13 +156,11 @@ Cache::free_memory() { } } - SERVER_LOG_DEBUG << "to be released memory size: " << released_size; + SERVER_LOG_DEBUG << header_ << " To be released memory size: " << (released_size >> 20) << "MB"; for (auto& key : key_array) { erase(key); } - - print(); } template @@ -177,7 +177,8 @@ Cache::print() { #endif } - SERVER_LOG_DEBUG << "[Cache] [item count]: " << cache_count << " [capacity] " << capacity_ << "(bytes) [usage] " << usage_ << "(bytes)"; + SERVER_LOG_DEBUG << header_ << " [item count]: " << cache_count << ", [usage] " << (usage_ >> 20) + << "MB, [capacity] " << (capacity_ >> 20) << "MB"; } } // namespace cache diff --git a/core/src/cache/CpuCacheMgr.cpp b/core/src/cache/CpuCacheMgr.cpp index 7e2233d427..ff8649315a 100644 --- a/core/src/cache/CpuCacheMgr.cpp +++ b/core/src/cache/CpuCacheMgr.cpp @@ -32,7 +32,7 @@ CpuCacheMgr::CpuCacheMgr() { int64_t cpu_cache_cap; config.GetCacheConfigCpuCacheCapacity(cpu_cache_cap); int64_t cap = cpu_cache_cap * unit; - cache_ = std::make_shared>(cap, 1UL << 32); + cache_ = std::make_shared>(cap, 1UL << 32, "[CACHE CPU]"); float cpu_cache_threshold; config.GetCacheConfigCpuCacheThreshold(cpu_cache_threshold); diff --git a/core/src/cache/GpuCacheMgr.cpp b/core/src/cache/GpuCacheMgr.cpp index c3b3b0b8df..374d141fc0 100644 --- a/core/src/cache/GpuCacheMgr.cpp +++ b/core/src/cache/GpuCacheMgr.cpp @@ -21,21 +21,22 @@ namespace milvus { namespace cache { #ifdef MILVUS_GPU_VERSION -std::mutex GpuCacheMgr::mutex_; 
-std::unordered_map GpuCacheMgr::instance_; +std::mutex GpuCacheMgr::global_mutex_; +std::unordered_map> GpuCacheMgr::instance_; namespace { constexpr int64_t G_BYTE = 1024 * 1024 * 1024; } -GpuCacheMgr::GpuCacheMgr() { +GpuCacheMgr::GpuCacheMgr(int64_t gpu_id) : gpu_id_(gpu_id) { // All config values have been checked in Config::ValidateConfig() server::Config& config = server::Config::GetInstance(); int64_t gpu_cache_cap; config.GetGpuResourceConfigCacheCapacity(gpu_cache_cap); int64_t cap = gpu_cache_cap * G_BYTE; - cache_ = std::make_shared>(cap, 1UL << 32); + std::string header = "[CACHE GPU" + std::to_string(gpu_id) + "]"; + cache_ = std::make_shared>(cap, 1UL << 32, header); float gpu_mem_threshold; config.GetGpuResourceConfigCacheThreshold(gpu_mem_threshold); @@ -51,20 +52,6 @@ GpuCacheMgr::~GpuCacheMgr() { config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_); } -GpuCacheMgr* -GpuCacheMgr::GetInstance(uint64_t gpu_id) { - if (instance_.find(gpu_id) == instance_.end()) { - std::lock_guard lock(mutex_); - if (instance_.find(gpu_id) == instance_.end()) { - instance_.insert(std::pair(gpu_id, std::make_shared())); - } - return instance_[gpu_id].get(); - } else { - std::lock_guard lock(mutex_); - return instance_[gpu_id].get(); - } -} - DataObjPtr GpuCacheMgr::GetIndex(const std::string& key) { DataObjPtr obj = GetItem(key); @@ -78,10 +65,33 @@ GpuCacheMgr::InsertItem(const std::string& key, const milvus::cache::DataObjPtr& } } +GpuCacheMgrPtr +GpuCacheMgr::GetInstance(int64_t gpu_id) { + if (instance_.find(gpu_id) == instance_.end()) { + std::lock_guard lock(global_mutex_); + if (instance_.find(gpu_id) == instance_.end()) { + instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); + } + } + return instance_[gpu_id].first; +} + +MutexPtr +GpuCacheMgr::GetInstanceMutex(int64_t gpu_id) { + if (instance_.find(gpu_id) == instance_.end()) { + std::lock_guard lock(global_mutex_); + if 
(instance_.find(gpu_id) == instance_.end()) { + instance_[gpu_id] = std::make_pair(std::make_shared(gpu_id), std::make_shared()); + } + } + return instance_[gpu_id].second; +} + void GpuCacheMgr::OnGpuCacheCapacityChanged(int64_t capacity) { for (auto& iter : instance_) { - iter.second->SetCapacity(capacity * G_BYTE); + std::lock_guard lock(*(iter.second.second)); + iter.second.first->SetCapacity(capacity * G_BYTE); } } diff --git a/core/src/cache/GpuCacheMgr.h b/core/src/cache/GpuCacheMgr.h index 6afad88438..d96d6f3c89 100644 --- a/core/src/cache/GpuCacheMgr.h +++ b/core/src/cache/GpuCacheMgr.h @@ -10,8 +10,10 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include +#include #include #include +#include #include "cache/CacheMgr.h" #include "cache/DataObj.h" @@ -23,31 +25,36 @@ namespace cache { #ifdef MILVUS_GPU_VERSION class GpuCacheMgr; using GpuCacheMgrPtr = std::shared_ptr; +using MutexPtr = std::shared_ptr; class GpuCacheMgr : public CacheMgr, public server::GpuResourceConfigHandler { public: - GpuCacheMgr(); + explicit GpuCacheMgr(int64_t gpu_id); ~GpuCacheMgr(); - static GpuCacheMgr* - GetInstance(uint64_t gpu_id); - DataObjPtr GetIndex(const std::string& key); void InsertItem(const std::string& key, const DataObjPtr& data); + static GpuCacheMgrPtr + GetInstance(int64_t gpu_id); + + static MutexPtr + GetInstanceMutex(int64_t gpu_id); + protected: void OnGpuCacheCapacityChanged(int64_t capacity) override; private: bool gpu_enable_ = true; + int64_t gpu_id_; + static std::mutex global_mutex_; + static std::unordered_map> instance_; std::string identity_; - static std::mutex mutex_; - static std::unordered_map instance_; }; #endif diff --git a/core/src/config/Config.h b/core/src/config/Config.h index 1bd99a6b9d..4c4ba82479 100644 --- a/core/src/config/Config.h +++ b/core/src/config/Config.h @@ -86,7 +86,7 @@ static const char* CONFIG_CACHE = "cache_config"; static const char* 
CONFIG_CACHE_CPU_CACHE_CAPACITY = "cpu_cache_capacity"; static const char* CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT = "4"; static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD = "cpu_cache_threshold"; -static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.85"; +static const char* CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT = "0.7"; static const char* CONFIG_CACHE_INSERT_BUFFER_SIZE = "insert_buffer_size"; static const char* CONFIG_CACHE_INSERT_BUFFER_SIZE_DEFAULT = "1"; static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; @@ -123,7 +123,7 @@ static const char* CONFIG_GPU_RESOURCE_ENABLE_DEFAULT = "false"; static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY = "cache_capacity"; static const char* CONFIG_GPU_RESOURCE_CACHE_CAPACITY_DEFAULT = "1"; static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD = "cache_threshold"; -static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.85"; +static const char* CONFIG_GPU_RESOURCE_CACHE_THRESHOLD_DEFAULT = "0.7"; static const char* CONFIG_GPU_RESOURCE_DELIMITER = ","; static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES = "search_resources"; static const char* CONFIG_GPU_RESOURCE_SEARCH_RESOURCES_DEFAULT = "gpu0"; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 0a77aa18a5..dbbee791b6 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -544,9 +544,15 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { } try { + /* Index data is copied to GPU first, then added into GPU cache. + * We MUST add a lock here to avoid more than one INDEX being copied to one GPU card at the same time, + * which will potentially cause GPU out of memory. 
+ */ + std::lock_guard lock(*(cache::GpuCacheMgr::GetInstanceMutex(device_id))); + ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " start"; index_ = knowhere::cloner::CopyCpuToGpu(index_, device_id, knowhere::Config()); - ENGINE_LOG_DEBUG << "CPU to GPU" << device_id; GpuCache(device_id); + ENGINE_LOG_DEBUG << "CPU to GPU" << device_id << " finished"; } catch (std::exception& e) { ENGINE_LOG_ERROR << e.what(); return Status(DB_ERROR, e.what()); diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 50d1990ba6..9394cc55cd 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -148,7 +148,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { hybrid = true; } stat = index_engine_->CopyToGpu(device_id, hybrid); - type_str = "CPU2GPU:" + std::to_string(device_id); + type_str = "CPU2GPU" + std::to_string(device_id); } else if (type == LoadType::GPU2CPU) { stat = index_engine_->CopyToCpu(); type_str = "GPU2CPU";