mirror of https://github.com/milvus-io/milvus.git
Config file validation
Former-commit-id: 1fc9b78d356da8ee05df4e9aa9189604daaf1670
pull/191/head

commit 72f3bb6414
@@ -30,6 +30,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-493 - Knowhere unittest crash
- MS-453 - GPU search error when nprobe set more than 1024
- MS-510 - unittest out of memory and crashed
- MS-119 - The problem of combining the log files

## Improvement
- MS-327 - Clean code for milvus

@@ -107,6 +108,9 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-525 - Disable parallel reduce in SearchTask
- MS-527 - Update scheduler_test and enable it
- MS-528 - Hide some config used future
- MS-530 - Add unittest for SearchTask->Load
- MS-531 - Disable next version code
- MS-533 - Update resource_test to cover dump function
- MS-523 - Config file validation

## New Feature

@@ -3,6 +3,7 @@
#include <faiss/gpu/StandardGpuResources.h>

#include "ivf.h"
#include "src/utils/BlockingQueue.h"

namespace zilliz {

@@ -16,12 +17,15 @@ struct Resource {
std::shared_ptr<faiss::gpu::StandardGpuResources> faiss_res;
int64_t id;
std::mutex mutex;
};
using ResPtr = std::shared_ptr<Resource>;
using ResWPtr = std::weak_ptr<Resource>;

class FaissGpuResourceMgr {
public:
using ResBQ = zilliz::milvus::server::BlockingQueue<ResPtr>;

struct DeviceParams {
int64_t temp_mem_size = 0;
int64_t pinned_mem_size = 0;

@@ -55,11 +59,8 @@ class FaissGpuResourceMgr {
// allocate gpu memory before search
// this func will return True if the device is idle and exists an idle resource.
bool
GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0);

void
MoveToInuse(const int64_t &device_id, const ResPtr& res);
//bool
//GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0);

void
MoveToIdle(const int64_t &device_id, const ResPtr& res);

@@ -67,33 +68,34 @@ class FaissGpuResourceMgr {
void
Dump();

protected:
void
RemoveResource(const int64_t& device_id, const ResPtr& res, std::map<int64_t, std::vector<ResPtr>>& resource_pool);

protected:
bool is_init = false;

std::mutex mutex_;
std::map<int64_t, DeviceParams> devices_params_;
std::map<int64_t, std::vector<ResPtr>> in_use_;
std::map<int64_t, std::vector<ResPtr>> idle_;
std::map<int64_t, ResBQ> idle_map;
};

class ResScope {
public:
ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id) {
FaissGpuResourceMgr::GetInstance().MoveToInuse(device_id, resource);
ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id), move(true) {
res->mutex.lock();
}

ResScope(ResPtr &res) : resource(res), device_id(-1), move(false) {
res->mutex.lock();
}

~ResScope() {
//resource->faiss_res->noTempMemory();
FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
if (move) {
FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource);
}
resource->mutex.unlock();
}

private:
ResPtr resource;
int64_t device_id;
bool move = true;
};

class GPUIndex {

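Note: a minimal, self-contained sketch of the RAII idea behind the revised ResScope above. DemoResource and DemoResScope are illustrative stand-ins, not the real knowhere types; in the real class the destructor calls FaissGpuResourceMgr::GetInstance().MoveToIdle() when the guard was built with a device id (move == true), and only unlocks the per-resource mutex otherwise.

```cpp
// Simplified stand-ins for the Resource/ResScope pattern above (hypothetical
// types; the real classes live in knowhere's GPU resource manager).
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>

struct DemoResource {            // stands in for Resource
    int64_t id = 0;
    std::mutex mutex;            // serializes use of one GPU resource
};
using DemoResPtr = std::shared_ptr<DemoResource>;

class DemoResScope {             // stands in for ResScope
 public:
    // Guard a resource taken from the idle pool: lock it and remember that
    // it has to be handed back when the scope ends.
    DemoResScope(int64_t device_id, DemoResPtr &res)
        : resource_(res), device_id_(device_id), move_(true) {
        resource_->mutex.lock();
    }
    // Guard a resource the caller already owns: lock only, never hand back.
    explicit DemoResScope(DemoResPtr &res)
        : resource_(res), device_id_(-1), move_(false) {
        resource_->mutex.lock();
    }
    ~DemoResScope() {
        if (move_) {
            std::cout << "return resource " << resource_->id
                      << " to idle pool of device " << device_id_ << "\n";
        }
        resource_->mutex.unlock();
    }

 private:
    DemoResPtr resource_;
    int64_t device_id_;
    bool move_;
};

int main() {
    auto res = std::make_shared<DemoResource>();
    res->id = 1;
    {
        DemoResScope guard(0, res);   // locked for the duration of a search
    }                                 // unlocked and "returned" here
}
```

The one-argument form appears to match the new search path below, where the resource is already held by the index and only needs to be serialized, not recycled.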
@@ -130,19 +130,17 @@ void GPUIVF::search_impl(int64_t n,
float *distances,
int64_t *labels,
const Config &cfg) {
// TODO(linxj): allocate mem
auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_res) {
ResScope rs(gpu_id_, temp_res);
if (auto device_index = std::static_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
auto nprobe = cfg.get_with_default("nprobe", size_t(1));
std::lock_guard<std::mutex> lk(mutex_);

std::lock_guard<std::mutex> lk(mutex_);
device_index->setNumProbes(nprobe);
if (auto device_index = std::static_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
auto nprobe = cfg.get_with_default("nprobe", size_t(1));
device_index->setNumProbes(nprobe);

{
// TODO(linxj): allocate mem
ResScope rs(res_);
device_index->search(n, (float *) data, k, distances, labels);
}
} else {
KNOWHERE_THROW_MSG("search can't get gpu resource");
}
}

@@ -283,119 +281,70 @@ void FaissGpuResourceMgr::InitResource() {
is_init = true;

for(auto& device : devices_params_) {
auto& resource_vec = idle_[device.first];
auto& device_id = device.first;
auto& device_param = device.second;
auto& bq = idle_map[device_id];

for (int64_t i = 0; i < device.second.resource_num; ++i) {
auto res = std::make_shared<faiss::gpu::StandardGpuResources>();
for (int64_t i = 0; i < device_param.resource_num; ++i) {
auto raw_resource = std::make_shared<faiss::gpu::StandardGpuResources>();

// TODO(linxj): enable set pinned memory
//res->noTempMemory();
auto res_wrapper = std::make_shared<Resource>(res);
AllocateTempMem(res_wrapper, device.first, 0);
auto res_wrapper = std::make_shared<Resource>(raw_resource);
AllocateTempMem(res_wrapper, device_id, 0);

resource_vec.emplace_back(res_wrapper);
bq.Put(res_wrapper);
}
}
}

ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
const int64_t &alloc_size) {
std::lock_guard<std::mutex> lk(mutex_);
InitResource();

if (!is_init) {
InitResource();
is_init = true;
}

auto search = idle_.find(device_id);
if (search != idle_.end()) {
auto res = search->second.back();
//AllocateTempMem(res, device_id, alloc_size);

search->second.pop_back();
return res;
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
auto& bq = finder->second;
auto&& resource = bq.Take();
AllocateTempMem(resource, device_id, alloc_size);
return resource;
}
return nullptr;
}

bool FaissGpuResourceMgr::GetRes(const int64_t &device_id,
ResPtr &res,
const int64_t &alloc_size) {
std::lock_guard<std::mutex> lk(mutex_);

if (!is_init) {
InitResource();
is_init = true;
}

auto search = idle_.find(device_id);
if (search != idle_.end()) {
auto &res_vec = search->second;
for (auto it = res_vec.cbegin(); it != res_vec.cend(); ++it) {
if ((*it)->id == res->id) {
//AllocateTempMem(res, device_id, alloc_size);
res_vec.erase(it);
return true;
}
}
}
// else
return false;
}

void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const ResPtr &res) {
std::lock_guard<std::mutex> lk(mutex_);
RemoveResource(device_id, res, idle_);
in_use_[device_id].push_back(res);
}
//bool FaissGpuResourceMgr::GetRes(const int64_t &device_id,
// ResPtr &res,
// const int64_t &alloc_size) {
// InitResource();
//
// std::lock_guard<std::mutex> lk(res->mutex);
// AllocateTempMem(res, device_id, alloc_size);
// return true;
//}

void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
std::lock_guard<std::mutex> lk(mutex_);
RemoveResource(device_id, res, in_use_);
auto it = idle_[device_id].begin();
idle_[device_id].insert(it, res);
}

void
FaissGpuResourceMgr::RemoveResource(const int64_t &device_id,
const ResPtr &res,
std::map<int64_t, std::vector<ResPtr>> &resource_pool) {
if (resource_pool.find(device_id) != resource_pool.end()) {
std::vector<ResPtr> &res_array = resource_pool[device_id];
res_array.erase(std::remove_if(res_array.begin(), res_array.end(),
[&](ResPtr &ptr) { return ptr->id == res->id; }),
res_array.end());
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
auto& bq = finder->second;
bq.Put(res);
}
}

void FaissGpuResourceMgr::Free() {
for (auto &item : in_use_) {
auto& res_vec = item.second;
res_vec.clear();
}
for (auto &item : idle_) {
auto& res_vec = item.second;
res_vec.clear();
for (auto &item : idle_map) {
auto& bq = item.second;
while (!bq.Empty()) {
bq.Take();
}
}
is_init = false;
}

void
FaissGpuResourceMgr::Dump() {
std::cout << "In used resource" << std::endl;
for(auto& item: in_use_) {
std::cout << "device_id: " << item.first << std::endl;
for(auto& elem : item.second) {
std::cout << "resource_id: " << elem->id << std::endl;
}
}

std::cout << "Idle resource" << std::endl;
for(auto& item: idle_) {
std::cout << "device_id: " << item.first << std::endl;
for(auto& elem : item.second) {
std::cout << "resource_id: " << elem->id << std::endl;
}
for (auto &item : idle_map) {
auto& bq = item.second;
std::cout << "device_id: " << item.first
<< ", resource count:" << bq.Size();
}
}

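Note: the hand-off the rewritten manager relies on, compressed into a runnable sketch. Resources are Put into a per-device BlockingQueue once in InitResource(), GetRes() simply Takes one (blocking until a resource is idle), and RemoveResource() Puts it back. IdlePool below is an illustrative stand-in built on std::condition_variable, not the project's BlockingQueue.

```cpp
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

// Illustrative per-device idle pool: Take() blocks until a resource is idle,
// which is the behaviour GetRes() now gets for free from BlockingQueue.
class IdlePool {
 public:
    void Put(int res_id) {
        std::lock_guard<std::mutex> lk(mutex_);
        idle_.push_back(res_id);
        cv_.notify_one();
    }
    int Take() {
        std::unique_lock<std::mutex> lk(mutex_);
        cv_.wait(lk, [this] { return !idle_.empty(); });
        int id = idle_.front();
        idle_.pop_front();
        return id;
    }
 private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> idle_;
};

int main() {
    IdlePool pool;
    pool.Put(0);  // InitResource(): one resource registered for this device

    auto worker = [&pool](int task) {
        int res = pool.Take();            // blocks until the resource is idle
        std::cout << "task " << task << " uses resource " << res << "\n";
        pool.Put(res);                    // handed back when the scope ends
    };
    std::thread t1(worker, 1), t2(worker, 2);
    t1.join();
    t2.join();
}
```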
@@ -52,7 +52,7 @@ class IVFTest
void SetUp() override {
std::tie(index_type, preprocess_cfg, train_cfg, add_cfg, search_cfg) = GetParam();
//Init_with_default();
Generate(128, 1000000/10, 10);
Generate(128, 1000000/100, 10);
index_ = IndexFactory(index_type);
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*600, 2);
}

@@ -84,7 +84,7 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"k", 10}}),
std::make_tuple("GPUIVF",
Config(),
Config::object{{"nlist", 1638}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config::object{{"nlist", 100}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFPQ",

@@ -99,7 +99,7 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFSQ",
Config(),
Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}},
Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}})
)

@@ -386,8 +386,8 @@ class GPURESTEST
int64_t elems = 0;
};

const int search_count = 100;
const int load_count = 30;
const int search_count = 18;
const int load_count = 3;

TEST_F(GPURESTEST, gpu_ivf_resource_test) {
assert(!xb.empty());

@@ -25,8 +25,8 @@ class NSGInterfaceTest : public DataGen, public TestWithParam<::std::tuple<Confi
protected:
void SetUp() override {
//Init_with_default();
FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024*1024*200, 1024*1024*300, 2);
Generate(256, 1000000, 1);
FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024*1024*200, 1024*1024*600, 2);
Generate(256, 10000, 1);
index_ = std::make_shared<NSG>();
std::tie(train_cfg, search_cfg) = GetParam();
}

@@ -44,7 +44,7 @@ class NSGInterfaceTest : public DataGen, public TestWithParam<::std::tuple<Confi
INSTANTIATE_TEST_CASE_P(NSGparameters, NSGInterfaceTest,
Values(std::make_tuple(
// search length > out_degree
Config::object{{"nlist", 16384}, {"nprobe", 50}, {"knng", 100}, {"metric_type", "L2"},
Config::object{{"nlist", 128}, {"nprobe", 50}, {"knng", 100}, {"metric_type", "L2"},
{"search_length", 60}, {"out_degree", 70}, {"candidate_pool_size", 500}},
Config::object{{"k", 20}, {"search_length", 30}}))
);

@@ -31,8 +31,6 @@ enum class MetricType {
class ExecutionEngine {
public:

virtual Status AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids);

virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;

virtual size_t Count() const = 0;

@@ -138,10 +138,8 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
} else {
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}

@@ -166,10 +164,8 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
try {
index_ = index_->CopyToGpu(device_id);
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}

@@ -195,10 +191,8 @@ Status ExecutionEngineImpl::CopyToCpu() {
try {
index_ = index_->CopyToCpu();
ENGINE_LOG_DEBUG << "GPU to CPU";
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}

@@ -233,10 +227,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
double physical_size = server::CommonUtil::GetFileSize(location);
server::CollectExecutionEngineMetrics metrics(physical_size);
to_merge = read_index(location);
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}

@@ -49,7 +49,7 @@ Status MemTableFile::Add(const VectorSource::Ptr &source, IDNumbers& vector_ids)
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << err_msg;
return Status(DB_ERROR, err_msg);
return Status(DB_ERROR, "not able to create table file");
}

size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;

@@ -145,51 +145,52 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
auto self = event->resource_.lock();
auto task = load_completed_event->task_table_item_->task;

// if this resource is disk, assign it to smallest cost resource
if (self->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}

// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
+ transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}

// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}

if (self->name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr_.lock()->GetResource(next_res_name);
load_completed_event->task_table_item_->Move();
next_res->task_table().Put(task);
}
// support next version
// auto self = event->resource_.lock();
// auto task = load_completed_event->task_table_item_->task;
//
// // if this resource is disk, assign it to smallest cost resource
// if (self->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr_.lock()->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto &res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
//
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
// + transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// if (self->name() == task->path().Last()) {
// self->WakeupLoader();
// } else {
// auto next_res_name = task->path().Next();
// auto next_res = res_mgr_.lock()->GetResource(next_res_name);
// load_completed_event->task_table_item_->Move();
// next_res->task_table().Put(task);
// }
break;
}
case TaskLabelType::BROADCAST: {

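Note: the selection rule in step 2 above (kept verbatim in the commented-out "support next version" copy) is cost(resource) = TaskAvgCost() * NumOfTaskToExec() + transport_cost, with an immediate win for any resource that has no tasks at all. A self-contained sketch with made-up numbers:

```cpp
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct ComputeRes {            // illustrative stand-in for a compute resource
    uint64_t total_tasks;      // TotalTasks()
    uint64_t avg_cost;         // TaskAvgCost()
    uint64_t tasks_to_exec;    // NumOfTaskToExec()
};

// Returns the index of the cheapest resource, mirroring step 2 above.
uint64_t PickMinCost(const std::vector<ComputeRes> &res,
                     const std::vector<uint64_t> &transport_costs) {
    uint64_t min_cost = std::numeric_limits<uint64_t>::max();
    uint64_t min_idx = 0;
    for (uint64_t i = 0; i < res.size(); ++i) {
        if (res[i].total_tasks == 0) {       // an idle resource wins immediately
            return i;
        }
        uint64_t cost = res[i].avg_cost * res[i].tasks_to_exec + transport_costs[i];
        if (cost < min_cost) {
            min_cost = cost;
            min_idx = i;
        }
    }
    return min_idx;
}

int main() {
    std::vector<ComputeRes> res = {{5, 10, 3}, {2, 8, 1}};   // made-up numbers
    std::vector<uint64_t> transport = {4, 20};
    // resource 0: 10*3 + 4 = 34, resource 1: 8*1 + 20 = 28 -> index 1 is chosen
    std::cout << "chosen resource: " << PickMinCost(res, transport) << "\n";
}
```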
@@ -57,8 +57,8 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
total_speed += speed;
}

std::random_device rd;
std::mt19937 mt(rd());
unsigned seed1 = std::chrono::system_clock::now().time_since_epoch().count();
std::mt19937 mt(seed1);
std::uniform_int_distribution<int> dist(0, total_speed);
uint64_t index = 0;
int64_t rd_speed = dist(mt);

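Note: the surrounding function sums neighbour connection speeds into total_speed and now seeds std::mt19937 from the system clock instead of std::random_device; the index and rd_speed variables suggest the draw is then mapped back to a neighbour by walking the cumulative speeds, i.e. a speed-weighted random pick. A sketch under that assumption (the speeds are made up):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <random>
#include <vector>

int main() {
    // Illustrative neighbour speeds; the real values come from the connections.
    std::vector<int64_t> speeds = {100, 300, 600};
    int64_t total_speed = 0;
    for (auto s : speeds) total_speed += s;

    // Same seeding style as the patched code: clock-based seed for mt19937.
    unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
    std::mt19937 mt(seed);
    std::uniform_int_distribution<int> dist(0, total_speed);

    int64_t draw = dist(mt);
    size_t index = 0;
    int64_t cumulative = 0;
    for (size_t i = 0; i < speeds.size(); ++i) {
        cumulative += speeds[i];
        if (draw <= cumulative) {     // faster neighbours cover a larger range
            index = i;
            break;
        }
    }
    std::cout << "picked neighbour " << index << "\n";
}
```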
@@ -96,33 +96,31 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file)
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
server::TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;

try {
if (type == LoadType::DISK2CPU) {
auto stat = index_engine_->Load();
if(!stat.ok()) {
//typical error: file not available
ENGINE_LOG_ERROR << "Failed to load index file: file not available";

for(auto& context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
}

return;
}
stat = index_engine_->Load();
} else if (type == LoadType::CPU2GPU) {
index_engine_->CopyToGpu(device_id);
stat = index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu();
stat = index_engine_->CopyToCpu();
} else {
// TODO: exception
std::string msg = "Wrong load type";
ENGINE_LOG_ERROR << msg;
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception &ex) {
//typical error: out of disk space or permition denied
std::string msg = "Failed to load index file: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
error_msg = "Failed to load index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}

if (!stat.ok()) {
if (error_msg.empty())
error_msg = std::string("Failed to load index file: file not available");
//typical error: file not available
ENGINE_LOG_ERROR << error_msg;

for (auto &context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed

@@ -241,7 +239,7 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
// if (NeedParallelReduce(nq, topk)) {
// ParallelReduce(reduce_worker, nq);
// } else {
reduce_worker(0, nq);
reduce_worker(0, nq);
// }

return Status::OK();

@@ -346,7 +344,7 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
// if (NeedParallelReduce(result_src.size(), topk)) {
// ParallelReduce(ReduceWorker, result_src.size());
// } else {
ReduceWorker(0, result_src.size());
ReduceWorker(0, result_src.size());
// }

return Status::OK();

@@ -1,6 +1,6 @@
#pragma once

#include "Log.h"
//#include "Log.h"
#include "Error.h"

namespace zilliz {

@@ -17,7 +17,7 @@ BlockingQueue<T>::Put(const T &task) {
std::string error_msg =
"blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " +
std::to_string(queue_.size());
SERVER_LOG_ERROR << error_msg;
//SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}

@@ -33,7 +33,7 @@ BlockingQueue<T>::Take() {

if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
//SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}

@@ -57,7 +57,7 @@ BlockingQueue<T>::Front() {
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
//SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
T front(queue_.front());

@@ -72,7 +72,7 @@ BlockingQueue<T>::Back() {

if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
//SERVER_LOG_ERROR << error_msg;
throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}

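Note: for readers without the template in front of them, a simplified sketch of the bounded queue whose Put/Take/Front/Back paths are touched above. It assumes Put blocks while the queue is full (only the defensive overflow check is visible in the hunks) and throws std::runtime_error where the real BlockingQueue throws ServerException:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>

template <typename T>
class SimpleBlockingQueue {
 public:
    explicit SimpleBlockingQueue(size_t capacity) : capacity_(capacity) {}

    void Put(const T &item) {
        std::unique_lock<std::mutex> lock(mutex_);
        full_.wait(lock, [this] { return queue_.size() < capacity_; });
        if (queue_.size() >= capacity_) {
            // The real code logs (now commented out) and throws ServerException.
            throw std::runtime_error("blocking queue is full");
        }
        queue_.push(item);
        empty_.notify_one();
    }

    T Take() {
        std::unique_lock<std::mutex> lock(mutex_);
        empty_.wait(lock, [this] { return !queue_.empty(); });
        if (queue_.empty()) {
            throw std::runtime_error("blocking queue empty");
        }
        T item = queue_.front();
        queue_.pop();
        full_.notify_one();
        return item;
    }

 private:
    size_t capacity_;
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable full_;
    std::condition_variable empty_;
};
```

The capacity_, queue_, and empty_ members mirror the names referenced in the hunks; everything else here is an assumption for illustration.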
@@ -9,11 +9,70 @@
#include <easylogging++.h>
#include <ctype.h>

#include <string>
#include <libgen.h>

namespace zilliz {
namespace milvus {
namespace server {

int32_t InitLog(const std::string& log_config_file) {
namespace {
static int global_idx = 0;
static int debug_idx = 0;
static int warning_idx = 0;
static int trace_idx = 0;
static int error_idx = 0;
static int fatal_idx = 0;
}

// TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename
void rolloutHandler(const char *filename, std::size_t size) {
char *dirc = strdup(filename);
char *basec = strdup(filename);
char *dir = dirname(dirc);
char *base = basename(basec);

std::string s(base);
std::stringstream ss;
std::string
list[] = {"\\", " ", "\'", "\"", "*", "\?", "{", "}", ";", "<", ">", "|", "^", "&", "$", "#", "!", "`", "~"};
std::string::size_type position;
for (auto substr : list) {
position = 0;
while ((position = s.find_first_of(substr, position)) != std::string::npos) {
s.insert(position, "\\");
position += 2;
}
}
int ret;
std::string m(std::string(dir) + "/" + s);
s = m;
if ((position = s.find("global")) != std::string::npos) {
s.append("." + std::to_string(++global_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("debug")) != std::string::npos) {
s.append("." + std::to_string(++debug_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("warning")) != std::string::npos) {
s.append("." + std::to_string(++warning_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("trace")) != std::string::npos) {
s.append("." + std::to_string(++trace_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("error")) != std::string::npos) {
s.append("." + std::to_string(++error_idx));
ret = rename(m.c_str(), s.c_str());
} else if ((position = s.find("fatal")) != std::string::npos) {
s.append("." + std::to_string(++fatal_idx));
ret = rename(m.c_str(), s.c_str());
} else {
s.append("." + std::to_string(++global_idx));
ret = rename(m.c_str(), s.c_str());
}
}

int32_t InitLog(const std::string &log_config_file) {
#if 0
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode log_config = config.GetConfig(CONFIG_LOG);

@@ -50,8 +109,10 @@ int32_t InitLog(const std::string& log_config_file) {
#else
el::Configurations conf(log_config_file);
#endif

el::Loggers::reconfigureAllLoggers(conf);

el::Loggers::addFlag(el::LoggingFlag::StrictLogFileSizeCheck);
el::Helpers::installPreRollOutCallback(rolloutHandler);
return 0;
}

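Note: with StrictLogFileSizeCheck set, easylogging++ invokes the installed pre-roll-out callback whenever a log file reaches its configured maximum size, and the handler above renames the rolled file with a per-level counter suffix (for example, a global log rolling to .1, then .2, and so on), escaping shell-special characters in the name first. A hedged configuration sketch; the callback body, file name, and size limit below are made up for illustration:

```cpp
#include <easylogging++.h>

#include <cstdio>
#include <string>

INITIALIZE_EASYLOGGINGPP

// Hypothetical callback in the spirit of rolloutHandler above: keep the
// previous contents by renaming the rolled file with a numeric suffix.
static int roll_idx = 0;
void DemoRollout(const char *filename, std::size_t /*size*/) {
    std::string rolled = std::string(filename) + "." + std::to_string(++roll_idx);
    rename(filename, rolled.c_str());
}

int main() {
    el::Configurations conf;
    conf.setGlobally(el::ConfigurationType::Filename, "/tmp/milvus-demo.log");
    conf.setGlobally(el::ConfigurationType::MaxLogFileSize, "1048576");  // 1 MB
    el::Loggers::reconfigureAllLoggers(conf);

    // Same two calls the patched InitLog makes.
    el::Loggers::addFlag(el::LoggingFlag::StrictLogFileSizeCheck);
    el::Helpers::installPreRollOutCallback(DemoRollout);

    LOG(INFO) << "logging configured; rolled files get a numeric suffix";
    return 0;
}
```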
@@ -11,9 +11,7 @@
namespace zilliz {
namespace milvus {
namespace server {

int32_t InitLog(const std::string& log_config_file);

inline std::string GetFileName(std::string filename) {
int pos = filename.find_last_of('/');
return filename.substr(pos + 1);

@@ -0,0 +1,123 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>

#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "server/ServerConfig.h"

#include <vector>

using namespace zilliz::milvus;

TEST(EngineTest, FACTORY_TEST) {
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::INVALID,
engine::MetricType::IP,
1024
);

ASSERT_TRUE(engine_ptr == nullptr);
}

{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IDMAP,
engine::MetricType::IP,
1024
);

ASSERT_TRUE(engine_ptr != nullptr);
}

{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IVFFLAT,
engine::MetricType::IP,
1024
);

ASSERT_TRUE(engine_ptr != nullptr);
}

{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IVFSQ8,
engine::MetricType::IP,
1024
);

ASSERT_TRUE(engine_ptr != nullptr);
}

{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::NSG_MIX,
engine::MetricType::IP,
1024
);

ASSERT_TRUE(engine_ptr != nullptr);
}
}

TEST(EngineTest, ENGINE_IMPL_TEST) {
uint16_t dimension = 64;
std::string file_path = "/tmp/milvus_index_1";
auto engine_ptr = engine::EngineFactory::Build(
dimension,
file_path,
engine::EngineType::FAISS_IVFFLAT,
engine::MetricType::IP,
1024
);

std::vector<float> data;
std::vector<long> ids;
const int row_count = 10000;
data.reserve(row_count*dimension);
ids.reserve(row_count);
for(long i = 0; i < row_count; i++) {
ids.push_back(i);
for(uint16_t k = 0; k < dimension; k++) {
data.push_back(i*dimension + k);
}
}

auto status = engine_ptr->AddWithIds((long)ids.size(), data.data(), ids.data());
ASSERT_TRUE(status.ok());

ASSERT_EQ(engine_ptr->Dimension(), dimension);
ASSERT_EQ(engine_ptr->Count(), ids.size());

server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");

status = engine_ptr->CopyToGpu(0);
//ASSERT_TRUE(status.ok());

auto new_engine = engine_ptr->Clone();
ASSERT_EQ(new_engine->Dimension(), dimension);
ASSERT_EQ(new_engine->Count(), ids.size());
status = new_engine->CopyToCpu();
//ASSERT_TRUE(status.ok());

auto engine_build = new_engine->BuildIndex("/tmp/milvus_index_2", engine::EngineType::FAISS_IVFSQ8);
//ASSERT_TRUE(status.ok());

}

@@ -11,7 +11,7 @@
using namespace zilliz::milvus::engine;


TEST(normal_test, DISABLED_inst_test) {
TEST(normal_test, inst_test) {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = ResMgrInst::GetInstance();

@@ -86,9 +86,7 @@ TEST_F(ResourceBaseTest, dump) {
ASSERT_FALSE(only_executor_->Dump().empty());
ASSERT_FALSE(both_enable_->Dump().empty());
ASSERT_FALSE(both_disable_->Dump().empty());
std::stringstream ss;
ss << only_loader_ << only_executor_ << both_enable_ << both_disable_;
ASSERT_FALSE(ss.str().empty());
std::cout << *only_loader_ << *only_executor_ << *both_enable_ << *both_disable_;
}

/************ ResourceAdvanceTest ************/

@@ -3,20 +3,21 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ExecutionEngine.h"

#include <easylogging++.h>
#include "scheduler/task/SearchTask.h"
#include <gtest/gtest.h>


namespace zilliz {
namespace milvus {
namespace engine {

Status ExecutionEngine::AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
long n = (long)vector_ids.size();
return AddWithIds(n, vectors.data(), vector_ids.data());

TEST(TaskTest, invalid_index) {
auto search_task = std::make_shared<XSearchTask>(nullptr);
search_task->Load(LoadType::TEST, 10);
}


} // namespace engine
} // namespace milvus
} // namespace zilliz
}
}
}