mirror of https://github.com/milvus-io/milvus.git

MS-654 Describe index timeout when building index

Former-commit-id: 09c30bbc9718c1fc7305c8ba0eb205c5428ccd3a
Branch: pull/191/head
parent 17e97a3b75
commit 4c945ae5bc
@@ -21,6 +21,7 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-644 - Search crashed with index-type: flat
 - MS-624 - Search vectors failed if time ranges long enough
 - MS-652 - IVFSQH quantization double free
+- MS-654 - Describe index timeout when building index
 
 ## Improvement
 - MS-552 - Add and change the easylogging library
@@ -39,27 +39,6 @@ mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%
 mysql_exc "FLUSH PRIVILEGES;"
 mysql_exc "USE ${MYSQL_DB_NAME};"
 
-MYSQL_USER_NAME=root
-MYSQL_PASSWORD=Fantast1c
-MYSQL_HOST='192.168.1.194'
-MYSQL_PORT='3306'
-
-MYSQL_DB_NAME=milvus_`date +%s%N`
-
-function mysql_exc()
-{
-    cmd=$1
-    mysql -h${MYSQL_HOST} -u${MYSQL_USER_NAME} -p${MYSQL_PASSWORD} -e "${cmd}"
-    if [ $? -ne 0 ]; then
-        echo "mysql $cmd run failed"
-    fi
-}
-
-mysql_exc "CREATE DATABASE IF NOT EXISTS ${MYSQL_DB_NAME};"
-mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%';"
-mysql_exc "FLUSH PRIVILEGES;"
-mysql_exc "USE ${MYSQL_DB_NAME};"
-
 # get baseline
 ${LCOV_CMD} -c -i -d ${DIR_GCNO} -o "${FILE_INFO_BASE}"
 if [ $? -ne 0 ]; then
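The twenty-one deleted lines above are a verbatim duplicate of the MySQL setup that already precedes this hunk (the connection variables, the mysql_exc helper, and the four setup statements); the commit simply drops the second copy. Note in passing that mysql_exc only echoes a message when a statement fails and never aborts the script, so a broken setup step surfaces later as an lcov error rather than an immediate exit.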
@@ -251,11 +251,6 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
     Status status;
     milvus::server::CollectInsertMetrics metrics(n, status);
     status = mem_mgr_->InsertVectors(table_id, n, vectors, vector_ids);
-    //    std::chrono::microseconds time_span =
-    //        std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
-    //    double average_time = double(time_span.count()) / n;
-
-    //    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
 
     return status;
 }
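The five lines deleted here were dead code: commented-out hand-rolled timing made redundant by the CollectInsertMetrics object constructed just above (presumably an RAII-style metrics collector; its definition is not part of this diff).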
@@ -359,7 +354,7 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
 
-    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;
+    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id << " date range count: " << dates.size();
 
     // get all table files from table
     meta::DatePartionedTableFilesSchema files;
@@ -377,7 +372,7 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
     }
 
     cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
-    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
+    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, results);
     cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
     return status;
 }
@@ -389,7 +384,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
 
-    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;
+    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id << " date range count: " << dates.size();
 
     // get specified files
     std::vector<size_t> ids;
@@ -418,7 +413,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
     }
 
     cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
-    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
+    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, results);
     cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
     return status;
 }
@@ -437,14 +432,13 @@ DBImpl::Size(uint64_t& result) {
 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 Status
 DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
-                   uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
+                   uint64_t nprobe, const float* vectors, QueryResults& results) {
     server::CollectQueryMetrics metrics(nq);
 
     TimeRecorder rc("");
 
     // step 1: get files to search
-    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
-                     << " date range count: " << dates.size();
+    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
     scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
     for (auto& file : files) {
         scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
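With the log line trimmed, nothing inside QueryAsync reads `dates` any more, so the parameter is dropped from the signature; date filtering already happens in Query when the file list is assembled. For readers unfamiliar with the shape of this code, below is a minimal, self-contained sketch of the fan-out/gather pattern that QueryAsync delegates to the scheduler. SearchTask and run_search are hypothetical stand-ins, not Milvus APIs; the real SearchJob and scheduler are not shown in this diff.

// Sketch: one job carries the query parameters, each index file becomes
// one task, and the caller blocks until every per-file search finishes.
#include <cstdint>
#include <future>
#include <string>
#include <vector>

struct SearchTask {
    std::string file_id;     // one index file to search
    uint64_t k, nq, nprobe;  // query parameters shared by all tasks
};

// Stand-in for the per-file search an engine would perform.
std::vector<int64_t> run_search(const SearchTask& task) {
    return std::vector<int64_t>(task.k, -1);  // placeholder top-k ids
}

int main() {
    std::vector<SearchTask> tasks = {{"file_1", 10, 1, 16}, {"file_2", 10, 1, 16}};

    // Fan out: one async task per index file (the for-loop over `files` above).
    std::vector<std::future<std::vector<int64_t>>> futures;
    futures.reserve(tasks.size());
    for (const auto& t : tasks) {
        futures.push_back(std::async(std::launch::async, run_search, t));
    }

    // Gather: wait for all partial results; the real code then reduces the
    // per-file top-k lists into one final top-k (what job->GetResult() returns).
    std::vector<std::vector<int64_t>> partial;
    for (auto& f : futures) {
        partial.push_back(f.get());
    }
    return 0;
}

The real code differs mainly in that the job is handed to a scheduler, which places each per-file task on an appropriate resource and merges the partial top-k lists before the caller reads the result.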
@@ -458,32 +452,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
         return job->GetStatus();
     }
 
-    // step 3: print time cost information
-    // double load_cost = context->LoadCost();
-    // double search_cost = context->SearchCost();
-    // double reduce_cost = context->ReduceCost();
-    // std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost);
-    // std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost);
-    // std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost);
-    // if(search_cost > 0.0 || reduce_cost > 0.0) {
-    //     double total_cost = load_cost + search_cost + reduce_cost;
-    //     double load_percent = load_cost/total_cost;
-    //     double search_percent = search_cost/total_cost;
-    //     double reduce_percent = reduce_cost/total_cost;
-    //
-    //     ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info
-    //                      << " percent: " << load_percent*100 << "%";
-    //     ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info
-    //                      << " percent: " << search_percent*100 << "%";
-    //     ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info
-    //                      << " percent: " << reduce_percent*100 << "%";
-    // } else {
-    //     ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
-    //                      << " search cost: " << search_info
-    //                      << " reduce cost: " << reduce_info;
-    // }
-
-    // step 4: construct results
+    // step 3: construct results
     results = job->GetResult();
     rc.ElapseFromBegin("Engine query totally cost");
 
@@ -695,14 +664,13 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
         return status;
     }
 
     bool has_merge = false;
     for (auto& kv : raw_files) {
         auto files = kv.second;
         if (files.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }
        has_merge = true;
-
-        MergeFiles(table_id, kv.first, kv.second);
+        MergeFiles(table_id, kv.first, kv.second);
 
         if (shutting_down_.load(std::memory_order_acquire)) {
@@ -770,127 +738,6 @@ DBImpl::StartBuildIndexTask(bool force) {
     }
 }
 
-Status
-DBImpl::BuildIndex(const meta::TableFileSchema& file) {
-    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
-                                                       (MetricType)file.metric_type_, file.nlist_);
-    if (to_index == nullptr) {
-        ENGINE_LOG_ERROR << "Invalid engine type";
-        return Status(DB_ERROR, "Invalid engine type");
-    }
-
-    try {
-        // step 1: load index
-        Status status = to_index->Load(options_.insert_cache_immediately_);
-        if (!status.ok()) {
-            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
-            return status;
-        }
-
-        // step 2: create table file
-        meta::TableFileSchema table_file;
-        table_file.table_id_ = file.table_id_;
-        table_file.date_ = file.date_;
-        table_file.file_type_ =
-            meta::TableFileSchema::NEW_INDEX;  // for multi-db-path, distribute index file averagely to each path
-        status = meta_ptr_->CreateTableFile(table_file);
-        if (!status.ok()) {
-            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
-            return status;
-        }
-
-        // step 3: build index
-        std::shared_ptr<ExecutionEngine> index;
-
-        try {
-            server::CollectBuildIndexMetrics metrics;
-            index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
-            if (index == nullptr) {
-                table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
-                status = meta_ptr_->UpdateTableFile(table_file);
-                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
-                                 << " to to_delete";
-
-                return status;
-            }
-        } catch (std::exception& ex) {
-            // typical error: out of gpu memory
-            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
-            ENGINE_LOG_ERROR << msg;
-
-            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
-            status = meta_ptr_->UpdateTableFile(table_file);
-            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
-
-            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
-                      << std::endl;
-
-            return Status(DB_ERROR, msg);
-        }
-
-        // step 4: if table has been deleted, dont save index file
-        bool has_table = false;
-        meta_ptr_->HasTable(file.table_id_, has_table);
-        if (!has_table) {
-            meta_ptr_->DeleteTableFiles(file.table_id_);
-            return Status::OK();
-        }
-
-        // step 5: save index file
-        try {
-            index->Serialize();
-        } catch (std::exception& ex) {
-            // typical error: out of disk space or permition denied
-            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
-            ENGINE_LOG_ERROR << msg;
-
-            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
-            status = meta_ptr_->UpdateTableFile(table_file);
-            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
-
-            std::cout << "ERROR: failed to persist index file: " << table_file.location_
-                      << ", possible out of disk space" << std::endl;
-
-            return Status(DB_ERROR, msg);
-        }
-
-        // step 6: update meta
-        table_file.file_type_ = meta::TableFileSchema::INDEX;
-        table_file.file_size_ = index->PhysicalSize();
-        table_file.row_count_ = index->Count();
-
-        auto origin_file = file;
-        origin_file.file_type_ = meta::TableFileSchema::BACKUP;
-
-        meta::TableFilesSchema update_files = {table_file, origin_file};
-        status = meta_ptr_->UpdateTableFiles(update_files);
-        if (status.ok()) {
-            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
-                             << " bytes"
-                             << " from file " << origin_file.file_id_;
-
-            if (options_.insert_cache_immediately_) {
-                index->Cache();
-            }
-        } else {
-            // failed to update meta, mark the new file as to_delete, don't delete old file
-            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
-            status = meta_ptr_->UpdateTableFile(origin_file);
-            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
-
-            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
-            status = meta_ptr_->UpdateTableFile(table_file);
-            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
-        }
-    } catch (std::exception& ex) {
-        std::string msg = "Build index encounter exception: " + std::string(ex.what());
-        ENGINE_LOG_ERROR << msg;
-        return Status(DB_ERROR, msg);
-    }
-
-    return Status::OK();
-}
-
 void
 DBImpl::BackgroundBuildIndex() {
     ENGINE_LOG_TRACE << "Background build index thread start";
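This hunk deletes DBImpl::BuildIndex outright: 121 lines covering a six-step flow (load the raw file, create a NEW_INDEX table file, build while marking the new file TO_DELETE on any failure, skip persisting if the table was dropped meanwhile, serialize to disk, then swap the meta entries so the new file becomes INDEX and the origin file BACKUP, reverting the origin to TO_INDEX if the meta update fails so the build can be retried). The matching declaration is removed from DBImpl.h below; the replacement build path is not visible in this diff.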
@@ -915,17 +762,6 @@ DBImpl::BackgroundBuildIndex() {
             ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
         }
     }
-    //        for (auto &file : to_index_files) {
-    //            status = BuildIndex(file);
-    //            if (!status.ok()) {
-    //                ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
-    //            }
-    //
-    //            if (shutting_down_.load(std::memory_order_acquire)) {
-    //                ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
-    //                break;
-    //            }
-    //        }
 
     ENGINE_LOG_TRACE << "Background build index thread exit";
 }
@@ -107,7 +107,7 @@ class DBImpl : public DB {
 private:
     Status
     QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
-               uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results);
+               uint64_t nprobe, const float* vectors, QueryResults& results);
 
     void
     BackgroundTimerTask();
@@ -133,9 +133,6 @@ class DBImpl : public DB {
     void
     BackgroundBuildIndex();
 
-    Status
-    BuildIndex(const meta::TableFileSchema&);
-
     Status
     MemSerialize();
 
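These two header hunks keep DBImpl.h in step with the source changes above: the dates parameter leaves the QueryAsync declaration, and the private BuildIndex declaration goes away with its definition.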
@@ -91,7 +91,8 @@ IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) {
 
         auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), device_id, &index_composition, &option);
 
-        std::shared_ptr<faiss::Index> device_index = std::shared_ptr<faiss::Index>(gpu_index);;
+        std::shared_ptr<faiss::Index> device_index = std::shared_ptr<faiss::Index>(gpu_index);
+        ;
         auto new_idx = std::make_shared<IVFSQHybrid>(device_index, device_id, res);
         return new_idx;
     } else {
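A cosmetic change only: the stray second semicolon after the device_index initializer is split onto its own line as an empty statement, most likely by a formatter pass. Behavior is unchanged; the empty statement could just as well have been deleted.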
@@ -40,7 +40,7 @@ namespace grpc {
 
 static const char* DQL_TASK_GROUP = "dql";
 static const char* DDL_DML_TASK_GROUP = "ddl_dml";
-static const char* PING_TASK_GROUP = "ping";
+static const char* INFO_TASK_GROUP = "info";
 
 constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
 
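This rename, together with the constructor changes below, is the substance of the MS-654 fix: read-only metadata tasks had been queued in the ddl_dml group alongside CreateIndexTask, so a long index build could starve a DescribeIndex request until it timed out. Moving such tasks into their own info group (and PreloadTable into dql) lets them be scheduled independently. The sketch below illustrates the presumed mechanism, assuming tasks within a group execute serially on a dedicated worker; the group name strings match the constants above, everything else is hypothetical.

// Sketch: one serial worker per task group. A slow task only delays
// tasks queued in its own group, never tasks in other groups.
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <thread>
#include <vector>

int main() {
    using Task = std::function<void()>;
    std::map<std::string, std::vector<Task>> groups;

    // A long-running build occupies the "ddl_dml" group ...
    groups["ddl_dml"].push_back([] {
        std::this_thread::sleep_for(std::chrono::seconds(2));  // stand-in for CreateIndex
        std::cout << "create index done\n";
    });
    // ... while a describe request now sits in the separate "info" group.
    groups["info"].push_back([] { std::cout << "describe index done\n"; });

    // One worker thread per group: serial within a group,
    // but no group ever waits on another.
    std::vector<std::thread> workers;
    for (auto& entry : groups) {
        auto& tasks = entry.second;
        workers.emplace_back([&tasks] {
            for (auto& t : tasks) {
                t();
            }
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}

Run as-is, "describe index done" prints immediately while "create index done" follows two seconds later; with both tasks in one group, the describe would have waited out the sleep first.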
@@ -182,7 +182,7 @@ CreateTableTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 DescribeTableTask::DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) {
+    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), schema_(schema) {
 }
 
 BaseTaskPtr
@@ -288,7 +288,7 @@ CreateIndexTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
+    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
 }
 
 BaseTaskPtr
@@ -373,7 +373,7 @@ DropTableTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList* table_name_list)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_list_(table_name_list) {
+    : GrpcBaseTask(INFO_TASK_GROUP), table_name_list_(table_name_list) {
 }
 
 BaseTaskPtr
@@ -683,7 +683,7 @@ SearchTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 CountTableTask::CountTableTask(const std::string& table_name, int64_t& row_count)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
+    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
 }
 
 BaseTaskPtr
@@ -725,7 +725,7 @@ CountTableTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 CmdTask::CmdTask(const std::string& cmd, std::string& result)
-    : GrpcBaseTask(PING_TASK_GROUP), cmd_(cmd), result_(result) {
+    : GrpcBaseTask(INFO_TASK_GROUP), cmd_(cmd), result_(result) {
 }
 
 BaseTaskPtr
@@ -816,7 +816,7 @@ DeleteByRangeTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 PreloadTableTask::PreloadTableTask(const std::string& table_name)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
+    : GrpcBaseTask(DQL_TASK_GROUP), table_name_(table_name) {
 }
 
 BaseTaskPtr
@@ -851,7 +851,7 @@ PreloadTableTask::OnExecute() {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 DescribeIndexTask::DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
-    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
+    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
 }
 
 BaseTaskPtr
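Taken together, the regrouping is: DescribeTable, HasTable, ShowTables, CountTable, Cmd, and DescribeIndex now run in the info group, PreloadTable moves to dql, and the old ping group is folded into info. DescribeIndexTask leaving ddl_dml is the specific change that resolves MS-654, since a DescribeIndex request no longer queues behind an in-flight CreateIndex.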