mirror of https://github.com/milvus-io/milvus.git
Merge branch 'branch-0.3.0' into 'branch-0.3.0'
MS-77 Performance issue of post-search action See merge request megasearch/vecwise_engine!92 Former-commit-id: 94a039963017cf0820a101d8f458091731e79c60pull/191/head
commit
d5b56f278c
|
@ -10,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
## Improvement
|
||||
- MS-82 - Update server startup welcome message
|
||||
- MS-83 - Update vecwise to Milvus
|
||||
- MS-77 - Performance issue of post-search action
|
||||
|
||||
## New Feature
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include "EngineFactory.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "scheduler/SearchScheduler.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <chrono>
|
||||
|
@ -71,6 +72,55 @@ void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
|
|||
}
|
||||
}
|
||||
|
||||
void CalcScore(uint64_t vector_count,
|
||||
const float *vectors_data,
|
||||
uint64_t dimension,
|
||||
const SearchContext::ResultSet &result_src,
|
||||
SearchContext::ResultSet &result_target) {
|
||||
result_target.clear();
|
||||
if(result_src.empty()){
|
||||
return;
|
||||
}
|
||||
|
||||
server::TimeRecorder rc("Calculate Score");
|
||||
int vec_index = 0;
|
||||
for(auto& result : result_src) {
|
||||
const float * vec_data = vectors_data + vec_index*dimension;
|
||||
double vec_len = 0;
|
||||
for(uint64_t i = 0; i < dimension; i++) {
|
||||
vec_len += vec_data[i]*vec_data[i];
|
||||
}
|
||||
vec_index++;
|
||||
|
||||
double max_score = 0.0;
|
||||
for(auto& pair : result) {
|
||||
if(max_score < pair.second) {
|
||||
max_score = pair.second;
|
||||
}
|
||||
}
|
||||
|
||||
//makesure socre is less than 100
|
||||
if(max_score > vec_len) {
|
||||
vec_len = max_score;
|
||||
}
|
||||
|
||||
//avoid divided by zero
|
||||
static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
|
||||
if(vec_len < TOLERANCE) {
|
||||
vec_len = TOLERANCE;
|
||||
}
|
||||
|
||||
SearchContext::Id2ScoreMap score_array;
|
||||
double vec_len_inverse = 1.0/vec_len;
|
||||
for(auto& pair : result) {
|
||||
score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
|
||||
}
|
||||
result_target.emplace_back(score_array);
|
||||
}
|
||||
|
||||
rc.Elapse("totally cost");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -301,6 +351,11 @@ Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
|
|||
if (results.empty()) {
|
||||
return Status::NotFound("Group " + table_id + ", search result not found!");
|
||||
}
|
||||
|
||||
QueryResults temp_results;
|
||||
CalcScore(nq, vectors, dim, results, temp_results);
|
||||
results.swap(temp_results);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -329,9 +384,13 @@ Status DBImpl::QueryAsync(const std::string& table_id, uint64_t k, uint64_t nq,
|
|||
|
||||
context->WaitResult();
|
||||
|
||||
//step 3: construct results
|
||||
//step 3: construct results, calculate score between 0 ~ 100
|
||||
auto& context_result = context->GetResult();
|
||||
results.swap(context_result);
|
||||
meta::TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
pMeta_->DescribeTable(table_schema);
|
||||
|
||||
CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
@ -18,10 +18,15 @@ void ClusterResult(const std::vector<long> &output_ids,
|
|||
uint64_t topk,
|
||||
SearchContext::ResultSet &result_set) {
|
||||
result_set.clear();
|
||||
result_set.reserve(nq);
|
||||
for (auto i = 0; i < nq; i++) {
|
||||
SearchContext::Id2ScoreMap id_score;
|
||||
id_score.reserve(topk);
|
||||
for (auto k = 0; k < topk; k++) {
|
||||
uint64_t index = i * topk + k;
|
||||
if(output_ids[index] < 0) {
|
||||
continue;
|
||||
}
|
||||
id_score.push_back(std::make_pair(output_ids[index], output_distence[index]));
|
||||
}
|
||||
result_set.emplace_back(id_score);
|
||||
|
@ -29,20 +34,60 @@ void ClusterResult(const std::vector<long> &output_ids,
|
|||
}
|
||||
|
||||
void MergeResult(SearchContext::Id2ScoreMap &score_src,
|
||||
SearchContext::Id2ScoreMap &score_target,
|
||||
uint64_t topk) {
|
||||
for (auto& pair_src : score_src) {
|
||||
for (auto iter = score_target.begin(); iter != score_target.end(); ++iter) {
|
||||
if(pair_src.second > iter->second) {
|
||||
score_target.insert(iter, pair_src);
|
||||
SearchContext::Id2ScoreMap &score_target,
|
||||
uint64_t topk) {
|
||||
//Note: the score_src and score_target are already arranged by score in ascending order
|
||||
if(score_src.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if(score_target.empty()) {
|
||||
score_target.swap(score_src);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t src_count = score_src.size();
|
||||
size_t target_count = score_target.size();
|
||||
SearchContext::Id2ScoreMap score_merged;
|
||||
score_merged.reserve(topk);
|
||||
size_t src_index = 0, target_index = 0;
|
||||
while(true) {
|
||||
//all score_src items are merged, if score_merged.size() still less than topk
|
||||
//move items from score_target to score_merged until score_merged.size() equal topk
|
||||
if(src_index >= src_count - 1) {
|
||||
for(size_t i = target_index; i < target_count && score_merged.size() < topk; ++i) {
|
||||
score_merged.push_back(score_target[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
//all score_target items are merged, if score_merged.size() still less than topk
|
||||
//move items from score_src to score_merged until score_merged.size() equal topk
|
||||
if(target_index >= target_count - 1) {
|
||||
for(size_t i = src_index; i < src_count && score_merged.size() < topk; ++i) {
|
||||
score_merged.push_back(score_src[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
//compare score, put smallest score to score_merged one by one
|
||||
auto& src_pair = score_src[src_index];
|
||||
auto& target_pair = score_target[target_index];
|
||||
if(src_pair.second > target_pair.second) {
|
||||
score_merged.push_back(target_pair);
|
||||
target_index++;
|
||||
} else {
|
||||
score_merged.push_back(src_pair);
|
||||
src_index++;
|
||||
}
|
||||
|
||||
//score_merged.size() already equal topk
|
||||
if(score_merged.size() >= topk) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//remove unused items
|
||||
while (score_target.size() > topk) {
|
||||
score_target.pop_back();
|
||||
}
|
||||
score_target.swap(score_merged);
|
||||
}
|
||||
|
||||
void TopkResult(SearchContext::ResultSet &result_src,
|
||||
|
@ -65,33 +110,6 @@ void TopkResult(SearchContext::ResultSet &result_src,
|
|||
}
|
||||
}
|
||||
|
||||
void CalcScore(uint64_t vector_count,
|
||||
const float *vectors_data,
|
||||
uint64_t dimension,
|
||||
const SearchContext::ResultSet &result_src,
|
||||
SearchContext::ResultSet &result_target) {
|
||||
result_target.clear();
|
||||
if(result_src.empty()){
|
||||
return;
|
||||
}
|
||||
|
||||
int vec_index = 0;
|
||||
for(auto& result : result_src) {
|
||||
const float * vec_data = vectors_data + vec_index*dimension;
|
||||
double vec_len = 0;
|
||||
for(uint64_t i = 0; i < dimension; i++) {
|
||||
vec_len += vec_data[i]*vec_data[i];
|
||||
}
|
||||
vec_index++;
|
||||
|
||||
SearchContext::Id2ScoreMap score_array;
|
||||
for(auto& pair : result) {
|
||||
score_array.push_back(std::make_pair(pair.first, (1 - pair.second/vec_len)*100.0));
|
||||
}
|
||||
result_target.emplace_back(score_array);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
bool SearchTask::DoSearch() {
|
||||
|
@ -109,8 +127,8 @@ bool SearchTask::DoSearch() {
|
|||
output_ids.resize(inner_k*context->nq());
|
||||
output_distence.resize(inner_k*context->nq());
|
||||
|
||||
//step 2: search
|
||||
try {
|
||||
//step 2: search
|
||||
index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
|
||||
output_ids.data());
|
||||
|
||||
|
@ -125,18 +143,13 @@ bool SearchTask::DoSearch() {
|
|||
TopkResult(result_set, inner_k, context->GetResult());
|
||||
rc.Record("reduce topk");
|
||||
|
||||
//step 5: calculate score between 0 ~ 100
|
||||
CalcScore(context->nq(), context->vectors(), index_engine_->Dimension(), context->GetResult(), result_set);
|
||||
context->GetResult().swap(result_set);
|
||||
rc.Record("calculate score");
|
||||
|
||||
} catch (std::exception& ex) {
|
||||
SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
|
||||
context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
|
||||
continue;
|
||||
}
|
||||
|
||||
//step 6: notify to send result to client
|
||||
//step 5: notify to send result to client
|
||||
context->IndexSearchDone(index_id_);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue