fix memory leak (#4767)

* fix memory leak
(1) Minor memory leak when querying on BinaryFlat
(2) Minor memory leak when building NSG index

Signed-off-by: shengjun.li <shengjun.li@zilliz.com>

* fix mismatch new[]/delete[]

Signed-off-by: shengjun.li <shengjun.li@zilliz.com>

* rm useless folder

Signed-off-by: shengjun.li <shengjun.li@zilliz.com>

* fix memory leak in UT

Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
pull/4774/head v1.0.0
shengjun.li 2021-03-03 16:19:29 +08:00 committed by GitHub
parent c29d1d9aa3
commit 21ea92ec7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 8 additions and 522 deletions

View File

@ -7,6 +7,8 @@ Please mark all change in change log and use the issue from GitHub
- \#4739 Fix mishards probe test problem
- \#4749 Fix minor memory leak when building IVF_SQ8 on GPU
- \#4757 Fix minor memory leak when querying by IVF_SQ8H
- \#4765 Fix minor memory leak when building NSG
- \#4766 Fix minor memory leak when querying by BinaryFlat
## Feature
- \#3977 Support logging to stdout

View File

@ -38,7 +38,7 @@ DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::I
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
fiu_do_on("bloom_filter_nullptr", (free_scaling_bloom(bloom_filter) || (bloom_filter = nullptr)));
if (bloom_filter == nullptr) {
std::string err_msg =
"Failed to read bloom filter from file: " + bloom_filter_file_path + ". " + std::strerror(errno);

View File

@ -17,7 +17,6 @@
#include <vector>
#include "query/BinaryQuery.h"
#include "search/Task.h"
namespace milvus {
namespace search {

View File

@ -55,8 +55,6 @@
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
#include "search/TaskInst.h"
namespace milvus {
namespace engine {

View File

@ -126,6 +126,8 @@ NsgIndex::InitNavigationPoint() {
//
// float r1 = distance_->Compare(center, ori_data_ + navigation_point * dimension, dimension);
// assert(r1 == resset[0].distance);
delete[] center;
}
// Specify Link

View File

@ -373,6 +373,7 @@ void binary_distance_knn_hc (
memcpy(ha->val, value, thread_heap_size * sizeof(T));
memcpy(ha->ids, labels, thread_heap_size * sizeof(int64_t));
delete[] hc;
delete[] value;
delete[] labels;

View File

@ -1,223 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#if 0
#pragma once
#include <algorithm>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
#include "search/Task.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace search {
Task::Task(const std::shared_ptr<server::Context>& context, SegmentSchemaPtr& file,
milvus::query::GeneralQueryPtr general_query, std::unordered_map<std::string, engine::DataType>& attr_type,
context::HybridSearchContextPtr hybrid_search_context)
: context_(context),
file_(file),
general_query_(general_query),
attr_type_(attr_type),
hybrid_search_context_(hybrid_search_context) {
if (file_) {
// distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// similarity -- infinity value means two vectors equal, descending reduce, IP
if (file_->metric_type_ == static_cast<int>(engine::MetricType::IP) &&
file_->engine_type_ != static_cast<int>(engine::EngineType::FAISS_PQ)) {
ascending_reduce = false;
}
engine::EngineType engine_type;
if (file->file_type_ == engine::meta::SegmentSchema::FILE_TYPE::RAW ||
file->file_type_ == engine::meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
file->file_type_ == engine::meta::SegmentSchema::FILE_TYPE::BACKUP) {
engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? engine::EngineType::FAISS_BIN_IDMAP
: engine::EngineType::FAISS_IDMAP;
} else {
engine_type = (engine::EngineType)file->engine_type_;
}
milvus::json json_params;
if (!file_->index_params_.empty()) {
json_params = milvus::json::parse(file_->index_params_);
}
index_engine_ = engine::EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
(engine::MetricType)file_->metric_type_, json_params);
}
}
void
Task::Load() {
auto load_ctx = context_->Follower("XSearchTask::Load " + std::to_string(file_->id_));
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
try {
stat = index_engine_->Load();
type_str = "IDSK2CPU";
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (!stat.ok()) {
return;
}
}
void
Task::Execute() {
auto execute_ctx = context_->Follower("XSearchTask::Execute " + std::to_string(index_id_));
if (index_engine_ == nullptr) {
return;
}
TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
std::vector<int64_t> output_ids;
std::vector<float> output_distance;
// step 1: allocate memory
try {
// step 2: search
Status s;
if (general_query_ != nullptr) {
faiss::ConcurrentBitsetPtr bitset;
uint64_t nq, topk;
s = index_engine_->ExecBinaryQuery(general_query_, bitset, attr_type_, nq, topk, output_distance,
output_ids);
if (!s.ok()) {
return;
}
auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
if (spec_k == 0) {
ENGINE_LOG_WARNING << "Searching in an empty file. file location = " << file_->location_;
}
{
if (result_ids_.size() > spec_k) {
if (result_ids_.front() == -1) {
result_ids_.resize(spec_k * nq);
result_distances_.resize(spec_k * nq);
}
}
Task::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, ascending_reduce, result_ids_,
result_distances_);
}
index_engine_ = nullptr;
execute_ctx->GetTraceContext()->GetSpan()->Finish();
return;
}
if (!s.ok()) {
return;
}
} catch (std::exception& ex) {
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
// search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
}
rc.ElapseFromBegin("totally cost");
// release index in resource
index_engine_ = nullptr;
execute_ctx->GetTraceContext()->GetSpan()->Finish();
}
void
Task::MergeTopkToResultSet(const milvus::search::ResultIds& src_ids,
const milvus::search::ResultDistances& src_distances, size_t src_k, size_t nq, size_t topk,
bool ascending, milvus::search::ResultIds& tar_ids,
milvus::search::ResultDistances& tar_distances) {
if (src_ids.empty()) {
return;
}
size_t tar_k = tar_ids.size() / nq;
size_t buf_k = std::min(topk, src_k + tar_k);
ResultIds buf_ids(nq * buf_k, -1);
ResultDistances buf_distances(nq * buf_k, 0.0);
for (uint64_t i = 0; i < nq; i++) {
size_t buf_k_j = 0, src_k_j = 0, tar_k_j = 0;
size_t buf_idx, src_idx, tar_idx;
size_t buf_k_multi_i = buf_k * i;
size_t src_k_multi_i = topk * i;
size_t tar_k_multi_i = tar_k * i;
while (buf_k_j < buf_k && src_k_j < src_k && tar_k_j < tar_k) {
src_idx = src_k_multi_i + src_k_j;
tar_idx = tar_k_multi_i + tar_k_j;
buf_idx = buf_k_multi_i + buf_k_j;
if ((tar_ids[tar_idx] == -1) || // initialized value
(ascending && src_distances[src_idx] < tar_distances[tar_idx]) ||
(!ascending && src_distances[src_idx] > tar_distances[tar_idx])) {
buf_ids[buf_idx] = src_ids[src_idx];
buf_distances[buf_idx] = src_distances[src_idx];
src_k_j++;
} else {
buf_ids[buf_idx] = tar_ids[tar_idx];
buf_distances[buf_idx] = tar_distances[tar_idx];
tar_k_j++;
}
buf_k_j++;
}
if (buf_k_j < buf_k) {
if (src_k_j < src_k) {
while (buf_k_j < buf_k && src_k_j < src_k) {
buf_idx = buf_k_multi_i + buf_k_j;
src_idx = src_k_multi_i + src_k_j;
buf_ids[buf_idx] = src_ids[src_idx];
buf_distances[buf_idx] = src_distances[src_idx];
src_k_j++;
buf_k_j++;
}
} else {
while (buf_k_j < buf_k && tar_k_j < tar_k) {
buf_idx = buf_k_multi_i + buf_k_j;
tar_idx = tar_k_multi_i + tar_k_j;
buf_ids[buf_idx] = tar_ids[tar_idx];
buf_distances[buf_idx] = tar_distances[tar_idx];
tar_k_j++;
buf_k_j++;
}
}
}
}
tar_ids.swap(buf_ids);
tar_distances.swap(buf_distances);
}
} // namespace search
} // namespace milvus
#endif

View File

@ -1,93 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#if 0
#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "context/HybridSearchContext.h"
#include "db/Types.h"
#include "db/engine/ExecutionEngine.h"
#include "db/meta/MetaTypes.h"
#include "server/context/Context.h"
#include "utils/Status.h"
namespace milvus {
namespace context {
struct HybridSearchContext;
using HybridSearchContextPtr = std::shared_ptr<HybridSearchContext>;
} // namespace context
namespace search {
using SegmentSchemaPtr = engine::meta::SegmentSchemaPtr;
using Id2IndexMap = std::unordered_map<size_t, SegmentSchemaPtr>;
using ResultIds = engine::ResultIds;
using ResultDistances = engine::ResultDistances;
class Task {
public:
explicit Task(const std::shared_ptr<server::Context>& context, SegmentSchemaPtr& file,
query::GeneralQueryPtr general_query, std::unordered_map<std::string, engine::DataType>& attr_type,
context::HybridSearchContextPtr hybrid_search_context);
void
Load();
void
Execute();
public:
static void
MergeTopkToResultSet(const ResultIds& src_ids, const ResultDistances& src_distances, size_t src_k, size_t nq,
size_t topk, bool ascending, ResultIds& tar_ids, ResultDistances& tar_distances);
const std::string&
GetLocation() const;
size_t
GetIndexId() const;
public:
const std::shared_ptr<server::Context> context_;
SegmentSchemaPtr file_;
size_t index_id_ = 0;
int index_type_ = 0;
engine::ExecutionEnginePtr index_engine_ = nullptr;
// distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// similarity -- infinity value means two vectors equal, descending reduce, IP
bool ascending_reduce = true;
query::GeneralQueryPtr general_query_;
std::unordered_map<std::string, engine::DataType> attr_type_;
context::HybridSearchContextPtr hybrid_search_context_;
ResultIds result_ids_;
ResultDistances result_distances_;
};
using TaskPtr = std::shared_ptr<Task>;
} // namespace search
} // namespace milvus
#endif

View File

@ -1,116 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#if 0
#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "search/TaskInst.h"
namespace milvus {
namespace search {
void
TaskInst::Start() {
running_ = true;
load_thread_ = std::make_shared<std::thread>(&TaskInst::StartLoadTask, this);
exec_thread_ = std::make_shared<std::thread>(&TaskInst::StartExecuteTask, this);
}
void
TaskInst::Stop() {
running_ = false;
StopExecuteTask();
StopLoadTask();
}
std::queue<TaskPtr>&
TaskInst::load_queue() {
return load_queue_;
}
std::queue<TaskPtr>&
TaskInst::exec_queue() {
return exec_queue_;
}
std::condition_variable&
TaskInst::load_cv() {
return load_cv_;
}
std::condition_variable&
TaskInst::exec_cv() {
return exec_cv_;
}
void
TaskInst::StartLoadTask() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [this] { return !load_queue_.empty(); });
while (!load_queue_.empty()) {
auto task = load_queue_.front();
task->Load();
load_queue_.pop();
exec_queue_.push(task);
exec_cv_.notify_one();
}
}
}
void
TaskInst::StartExecuteTask() {
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [this] { return !exec_queue_.empty(); });
while (!exec_queue_.empty()) {
auto task = exec_queue_.front();
task->Execute();
exec_queue_.pop();
}
}
}
void
TaskInst::StopLoadTask() {
{
std::lock_guard<std::mutex> lock(load_mutex_);
load_queue_.push(nullptr);
load_cv_.notify_one();
if (load_thread_->joinable()) {
load_thread_->join();
}
load_thread_ = nullptr;
}
}
void
TaskInst::StopExecuteTask() {
{
std::lock_guard<std::mutex> lock(exec_mutex_);
exec_queue_.push(nullptr);
exec_cv_.notify_one();
if (exec_thread_->joinable()) {
exec_thread_->join();
}
exec_thread_ = nullptr;
}
}
} // namespace search
} // namespace milvus
#endif

View File

@ -1,82 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#if 0
#pragma once
#include <condition_variable>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include "Task.h"
namespace milvus {
namespace search {
class TaskInst {
public:
static TaskInst&
GetInstance() {
static TaskInst instance;
return instance;
}
void
Start();
void
Stop();
std::queue<TaskPtr>&
load_queue();
std::queue<TaskPtr>&
exec_queue();
std::condition_variable&
load_cv();
std::condition_variable&
exec_cv();
private:
TaskInst() = default;
~TaskInst() = default;
void
StartLoadTask();
void
StartExecuteTask();
void
StopLoadTask();
void
StopExecuteTask();
private:
bool running_;
std::shared_ptr<std::thread> load_thread_;
std::shared_ptr<std::thread> exec_thread_;
std::queue<TaskPtr> load_queue_;
std::queue<TaskPtr> exec_queue_;
std::condition_variable load_cv_;
std::condition_variable exec_cv_;
std::mutex exec_mutex_;
std::mutex load_mutex_;
};
} // namespace search
} // namespace milvus
#endif

View File

@ -41,8 +41,6 @@
#include "utils/SignalUtil.h"
#include "utils/TimeRecorder.h"
#include "search/TaskInst.h"
namespace milvus {
namespace server {

View File

@ -61,7 +61,7 @@ Status::operator=(Status&& s) {
void
Status::CopyFrom(const Status& s) {
delete state_;
delete[] state_;
state_ = nullptr;
if (s.state_ == nullptr) {
return;
@ -76,7 +76,7 @@ Status::CopyFrom(const Status& s) {
void
Status::MoveFrom(Status& s) {
delete state_;
delete[] state_;
state_ = s.state_;
s.state_ = nullptr;
}