Merge branch 'branch-0.4.0' into MS-119_log_file_question

Former-commit-id: 765c98b91bf45617613568feed360b8dbc3ecc47
pull/191/head
Heisenberg 2019-09-09 20:28:33 +08:00
commit d94945a999
10 changed files with 181 additions and 89 deletions

View File

@ -109,6 +109,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-527 - Update scheduler_test and enable it
- MS-528 - Hide some config used future
- MS-530 - Add unittest for SearchTask->Load
- MS-531 - Disable next version code
- MS-533 - Update resource_test to cover dump function
## New Feature
- MS-343 - Implement ResourceMgr

View File

@ -26,7 +26,7 @@ class NSGInterfaceTest : public DataGen, public TestWithParam<::std::tuple<Confi
void SetUp() override {
//Init_with_default();
FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024*1024*200, 1024*1024*600, 2);
Generate(256, 1000000, 1);
Generate(256, 10000, 1);
index_ = std::make_shared<NSG>();
std::tie(train_cfg, search_cfg) = GetParam();
}
@ -44,7 +44,7 @@ class NSGInterfaceTest : public DataGen, public TestWithParam<::std::tuple<Confi
INSTANTIATE_TEST_CASE_P(NSGparameters, NSGInterfaceTest,
Values(std::make_tuple(
// search length > out_degree
Config::object{{"nlist", 16384}, {"nprobe", 50}, {"knng", 100}, {"metric_type", "L2"},
Config::object{{"nlist", 128}, {"nprobe", 50}, {"knng", 100}, {"metric_type", "L2"},
{"search_length", 60}, {"out_degree", 70}, {"candidate_pool_size", 500}},
Config::object{{"k", 20}, {"search_length", 30}}))
);

View File

@ -1,22 +0,0 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ExecutionEngine.h"
#include <easylogging++.h>
namespace zilliz {
namespace milvus {
namespace engine {
Status ExecutionEngine::AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
long n = (long)vector_ids.size();
return AddWithIds(n, vectors.data(), vector_ids.data());
}
} // namespace engine
} // namespace milvus
} // namespace zilliz

View File

@ -31,8 +31,6 @@ enum class MetricType {
class ExecutionEngine {
public:
virtual Status AddWithIdArray(const std::vector<float>& vectors, const std::vector<long>& vector_ids);
virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;
virtual size_t Count() const = 0;

View File

@ -138,10 +138,8 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
} else {
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}
@ -166,10 +164,8 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
try {
index_ = index_->CopyToGpu(device_id);
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}
@ -195,10 +191,8 @@ Status ExecutionEngineImpl::CopyToCpu() {
try {
index_ = index_->CopyToCpu();
ENGINE_LOG_DEBUG << "GPU to CPU";
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}
@ -233,10 +227,8 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
double physical_size = server::CommonUtil::GetFileSize(location);
server::CollectExecutionEngineMetrics metrics(physical_size);
to_merge = read_index(location);
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
} catch (std::exception &e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}

View File

@ -49,7 +49,7 @@ Status MemTableFile::Add(const VectorSource::Ptr &source, IDNumbers& vector_ids)
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << err_msg;
return Status(DB_ERROR, err_msg);
return Status(DB_ERROR, "not able to create table file");
}
size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;

View File

@ -145,51 +145,52 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
auto self = event->resource_.lock();
auto task = load_completed_event->task_table_item_->task;
// if this resource is disk, assign it to smallest cost resource
if (self->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx = 0;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->TotalTasks() == 0) {
min_cost_idx = i;
break;
}
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
+ transport_costs[i];
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
if (self->name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr_.lock()->GetResource(next_res_name);
load_completed_event->task_table_item_->Move();
next_res->task_table().Put(task);
}
// support next version
// auto self = event->resource_.lock();
// auto task = load_completed_event->task_table_item_->task;
//
// // if this resource is disk, assign it to smallest cost resource
// if (self->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr_.lock()->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto &res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
//
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
// + transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// if (self->name() == task->path().Last()) {
// self->WakeupLoader();
// } else {
// auto next_res_name = task->path().Next();
// auto next_res = res_mgr_.lock()->GetResource(next_res_name);
// load_completed_event->task_table_item_->Move();
// next_res->task_table().Put(task);
// }
break;
}
case TaskLabelType::BROADCAST: {

View File

@ -57,8 +57,8 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
total_speed += speed;
}
std::random_device rd;
std::mt19937 mt(rd());
unsigned seed1 = std::chrono::system_clock::now().time_since_epoch().count();
std::mt19937 mt(seed1);
std::uniform_int_distribution<int> dist(0, total_speed);
uint64_t index = 0;
int64_t rd_speed = dist(mt);

View File

@ -0,0 +1,123 @@
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "server/ServerConfig.h"
#include <vector>
using namespace zilliz::milvus;
TEST(EngineTest, FACTORY_TEST) {
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::INVALID,
engine::MetricType::IP,
1024
);
ASSERT_TRUE(engine_ptr == nullptr);
}
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IDMAP,
engine::MetricType::IP,
1024
);
ASSERT_TRUE(engine_ptr != nullptr);
}
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IVFFLAT,
engine::MetricType::IP,
1024
);
ASSERT_TRUE(engine_ptr != nullptr);
}
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::FAISS_IVFSQ8,
engine::MetricType::IP,
1024
);
ASSERT_TRUE(engine_ptr != nullptr);
}
{
auto engine_ptr = engine::EngineFactory::Build(
512,
"/tmp/milvus_index_1",
engine::EngineType::NSG_MIX,
engine::MetricType::IP,
1024
);
ASSERT_TRUE(engine_ptr != nullptr);
}
}
TEST(EngineTest, ENGINE_IMPL_TEST) {
uint16_t dimension = 64;
std::string file_path = "/tmp/milvus_index_1";
auto engine_ptr = engine::EngineFactory::Build(
dimension,
file_path,
engine::EngineType::FAISS_IVFFLAT,
engine::MetricType::IP,
1024
);
std::vector<float> data;
std::vector<long> ids;
const int row_count = 10000;
data.reserve(row_count*dimension);
ids.reserve(row_count);
for(long i = 0; i < row_count; i++) {
ids.push_back(i);
for(uint16_t k = 0; k < dimension; k++) {
data.push_back(i*dimension + k);
}
}
auto status = engine_ptr->AddWithIds((long)ids.size(), data.data(), ids.data());
ASSERT_TRUE(status.ok());
ASSERT_EQ(engine_ptr->Dimension(), dimension);
ASSERT_EQ(engine_ptr->Count(), ids.size());
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
status = engine_ptr->CopyToGpu(0);
//ASSERT_TRUE(status.ok());
auto new_engine = engine_ptr->Clone();
ASSERT_EQ(new_engine->Dimension(), dimension);
ASSERT_EQ(new_engine->Count(), ids.size());
status = new_engine->CopyToCpu();
//ASSERT_TRUE(status.ok());
auto engine_build = new_engine->BuildIndex("/tmp/milvus_index_2", engine::EngineType::FAISS_IVFSQ8);
//ASSERT_TRUE(status.ok());
}

View File

@ -86,9 +86,7 @@ TEST_F(ResourceBaseTest, dump) {
ASSERT_FALSE(only_executor_->Dump().empty());
ASSERT_FALSE(both_enable_->Dump().empty());
ASSERT_FALSE(both_disable_->Dump().empty());
std::stringstream ss;
ss << only_loader_ << only_executor_ << both_enable_ << both_disable_;
ASSERT_FALSE(ss.str().empty());
std::cout << *only_loader_ << *only_executor_ << *both_enable_ << *both_disable_;
}
/************ ResourceAdvanceTest ************/