mirror of https://github.com/milvus-io/milvus.git
commit
d279b37e53
|
@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
|||
- MS-56 - Add version information when server is started
|
||||
- MS-64 - Different table can have different index type
|
||||
- MS-52 - Return search score
|
||||
- MS-66 - Support time range query
|
||||
|
||||
## Task
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
******************************************************************************/
|
||||
|
||||
#include "PrometheusMetrics.h"
|
||||
#include "utils/Log.h"
|
||||
#include "SystemInfo.h"
|
||||
|
||||
|
||||
|
@ -14,18 +15,24 @@ namespace server {
|
|||
|
||||
ServerError
|
||||
PrometheusMetrics::Init() {
|
||||
ConfigNode& configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
|
||||
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true:false;
|
||||
// Following should be read from config file.
|
||||
const std::string bind_address = configNode.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT);
|
||||
const std::string uri = std::string("/metrics");
|
||||
const std::size_t num_threads = 2;
|
||||
try {
|
||||
ConfigNode &configNode = ServerConfig::GetInstance().GetConfig(CONFIG_METRIC);
|
||||
startup_ = configNode.GetValue(CONFIG_METRIC_IS_STARTUP) == "true" ? true : false;
|
||||
// Following should be read from config file.
|
||||
const std::string bind_address = configNode.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT);
|
||||
const std::string uri = std::string("/metrics");
|
||||
const std::size_t num_threads = 2;
|
||||
|
||||
// Init Exposer
|
||||
exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);
|
||||
// Init Exposer
|
||||
exposer_ptr_ = std::make_shared<prometheus::Exposer>(bind_address, uri, num_threads);
|
||||
|
||||
// Exposer Registry
|
||||
exposer_ptr_->RegisterCollectable(registry_);
|
||||
} catch (std::exception& ex) {
|
||||
SERVER_LOG_ERROR << "Failed to connect prometheus server: " << std::string(ex.what());
|
||||
return SERVER_UNEXPECTED_ERROR;
|
||||
}
|
||||
|
||||
// Exposer Registry
|
||||
exposer_ptr_->RegisterCollectable(registry_);
|
||||
return SERVER_SUCCESS;
|
||||
|
||||
}
|
||||
|
|
|
@ -75,6 +75,18 @@ namespace {
|
|||
return str;
|
||||
}
|
||||
|
||||
std::string CurrentTmDate() {
|
||||
time_t tt;
|
||||
time( &tt );
|
||||
tt = tt + 8*3600;
|
||||
tm* t= gmtime( &tt );
|
||||
|
||||
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
|
||||
+ "-" + std::to_string(t->tm_mday);
|
||||
|
||||
return str;
|
||||
}
|
||||
|
||||
std::string GetTableName() {
|
||||
static std::string s_id(CurrentTime());
|
||||
return s_id;
|
||||
|
@ -170,6 +182,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
|
|||
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + 10, record_array);
|
||||
|
||||
std::vector<Range> query_range_array;
|
||||
Range rg;
|
||||
rg.start_value = CurrentTmDate();
|
||||
rg.end_value = CurrentTmDate();
|
||||
query_range_array.emplace_back(rg);
|
||||
std::vector<TopKQueryResult> topk_query_result_array;
|
||||
Status stat = conn->SearchVector(TABLE_NAME, record_array, query_range_array, TOP_K, topk_query_result_array);
|
||||
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
|
||||
|
|
|
@ -159,6 +159,8 @@ ClientProxy::SearchVector(const std::string &table_name,
|
|||
}
|
||||
|
||||
try {
|
||||
|
||||
//step 1: convert vectors data
|
||||
std::vector<thrift::RowRecord> thrift_records;
|
||||
for(auto& record : query_record_array) {
|
||||
thrift::RowRecord thrift_record;
|
||||
|
@ -172,10 +174,21 @@ ClientProxy::SearchVector(const std::string &table_name,
|
|||
thrift_records.emplace_back(thrift_record);
|
||||
}
|
||||
|
||||
//step 2: convert range array
|
||||
std::vector<thrift::Range> thrift_ranges;
|
||||
for(auto& range : query_range_array) {
|
||||
thrift::Range thrift_range;
|
||||
thrift_range.__set_start_value(range.start_value);
|
||||
thrift_range.__set_end_value(range.end_value);
|
||||
|
||||
thrift_ranges.emplace_back(thrift_range);
|
||||
}
|
||||
|
||||
//step 3: search vectors
|
||||
std::vector<thrift::TopKQueryResult> result_array;
|
||||
ClientPtr()->interface()->SearchVector(result_array, table_name, thrift_records, thrift_ranges, topk);
|
||||
|
||||
//step 4: convert result array
|
||||
for(auto& thrift_topk_result : result_array) {
|
||||
TopKQueryResult result;
|
||||
|
||||
|
|
|
@ -75,6 +75,74 @@ namespace {
|
|||
|
||||
return map_type[type];
|
||||
}
|
||||
|
||||
ServerError
|
||||
ConvertRowRecordToFloatArray(const std::vector<thrift::RowRecord>& record_array,
|
||||
uint64_t dimension,
|
||||
std::vector<float>& float_array) {
|
||||
ServerError error_code;
|
||||
uint64_t vec_count = record_array.size();
|
||||
float_array.resize(vec_count*dimension);//allocate enough memory
|
||||
for(uint64_t i = 0; i < vec_count; i++) {
|
||||
const auto& record = record_array[i];
|
||||
if(record.vector_data.empty()) {
|
||||
error_code = SERVER_INVALID_ARGUMENT;
|
||||
SERVER_LOG_ERROR << "No vector provided in record";
|
||||
return error_code;
|
||||
}
|
||||
uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value?
|
||||
if(vec_dim != dimension) {
|
||||
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
|
||||
<< " vs. group dimension:" << dimension;
|
||||
error_code = SERVER_INVALID_VECTOR_DIMENSION;
|
||||
return error_code;
|
||||
}
|
||||
|
||||
//convert double array to float array(thrift has no float type)
|
||||
const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
|
||||
for(uint64_t d = 0; d < vec_dim; d++) {
|
||||
float_array[i*vec_dim + d] = (float)(d_p[d]);
|
||||
}
|
||||
}
|
||||
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
|
||||
static constexpr long DAY_SECONDS = 86400;
|
||||
|
||||
ServerError
|
||||
ConvertTimeRangeToDBDates(const std::vector<megasearch::thrift::Range> &range_array,
|
||||
std::vector<DB_DATE>& dates) {
|
||||
dates.clear();
|
||||
ServerError error_code;
|
||||
for(auto& range : range_array) {
|
||||
time_t tt_start, tt_end;
|
||||
tm tm_start, tm_end;
|
||||
if(!CommonUtil::TimeStrToTime(range.start_value, tt_start, tm_start)){
|
||||
error_code = SERVER_INVALID_TIME_RANGE;
|
||||
SERVER_LOG_ERROR << "Invalid time range: " << range.start_value;
|
||||
return error_code;
|
||||
}
|
||||
|
||||
if(!CommonUtil::TimeStrToTime(range.end_value, tt_end, tm_end)){
|
||||
error_code = SERVER_INVALID_TIME_RANGE;
|
||||
SERVER_LOG_ERROR << "Invalid time range: " << range.end_value;
|
||||
return error_code;
|
||||
}
|
||||
|
||||
long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS;
|
||||
for(long i = 0; i <= days; i++) {
|
||||
time_t tt_day = tt_start + DAY_SECONDS*i;
|
||||
tm tm_day;
|
||||
CommonUtil::ConvertTime(tt_day, tm_day);
|
||||
|
||||
long date = tm_day.tm_year*10000 + tm_day.tm_mon*100 + tm_day.tm_mday;//according to db logic
|
||||
dates.push_back(date);
|
||||
}
|
||||
}
|
||||
|
||||
return SERVER_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -237,37 +305,17 @@ ServerError AddVectorTask::OnExecute() {
|
|||
rc.Record("check validation");
|
||||
|
||||
//step 2: prepare float data
|
||||
uint64_t vec_count = (uint64_t)record_array_.size();
|
||||
uint64_t group_dim = table_info.dimension_;
|
||||
std::vector<float> vec_f;
|
||||
vec_f.resize(vec_count*group_dim);//allocate enough memory
|
||||
for(uint64_t i = 0; i < vec_count; i++) {
|
||||
const auto& record = record_array_[i];
|
||||
if(record.vector_data.empty()) {
|
||||
error_code_ = SERVER_INVALID_ARGUMENT;
|
||||
error_msg_ = "No vector provided in record";
|
||||
SERVER_LOG_ERROR << error_msg_;
|
||||
return error_code_;
|
||||
}
|
||||
uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value?
|
||||
if(vec_dim != group_dim) {
|
||||
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
|
||||
<< " vs. group dimension:" << group_dim;
|
||||
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
//convert double array to float array(thrift has no float type)
|
||||
const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
|
||||
for(uint64_t d = 0; d < vec_dim; d++) {
|
||||
vec_f[i*vec_dim + d] = (float)(d_p[d]);
|
||||
}
|
||||
error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f);
|
||||
if(error_code_ != SERVER_SUCCESS) {
|
||||
error_msg_ = "Invalid row record data";
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
rc.Record("prepare vectors data");
|
||||
|
||||
//step 3: insert vectors
|
||||
uint64_t vec_count = (uint64_t)record_array_.size();
|
||||
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
|
||||
rc.Record("add vectors to engine");
|
||||
if(!stat.ok()) {
|
||||
|
@ -342,44 +390,29 @@ ServerError SearchVectorTask::OnExecute() {
|
|||
return error_code_;
|
||||
}
|
||||
|
||||
//step 3: check date range, and convert to db dates
|
||||
std::vector<DB_DATE> dates;
|
||||
error_code_ = ConvertTimeRangeToDBDates(range_array_, dates);
|
||||
if(error_code_ != SERVER_SUCCESS) {
|
||||
error_msg_ = "Invalid query range";
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
rc.Record("check validation");
|
||||
|
||||
//step 3: prepare float data
|
||||
std::vector<float> vec_f;
|
||||
uint64_t record_count = (uint64_t)record_array_.size();
|
||||
vec_f.resize(record_count*table_info.dimension_);
|
||||
|
||||
for(uint64_t i = 0; i < record_array_.size(); i++) {
|
||||
const auto& record = record_array_[i];
|
||||
if (record.vector_data.empty()) {
|
||||
error_code_ = SERVER_INVALID_ARGUMENT;
|
||||
error_msg_ = "Query record has no vector";
|
||||
SERVER_LOG_ERROR << error_msg_;
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
uint64_t vec_dim = record.vector_data.size() / sizeof(double);//how many double value?
|
||||
if (vec_dim != table_info.dimension_) {
|
||||
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
|
||||
<< " vs. group dimension:" << table_info.dimension_;
|
||||
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
//convert double array to float array(thrift has no float type)
|
||||
const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
|
||||
for(uint64_t d = 0; d < vec_dim; d++) {
|
||||
vec_f[i*vec_dim + d] = (float)(d_p[d]);
|
||||
}
|
||||
error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f);
|
||||
if(error_code_ != SERVER_SUCCESS) {
|
||||
error_msg_ = "Invalid row record data";
|
||||
return error_code_;
|
||||
}
|
||||
|
||||
rc.Record("prepare vector data");
|
||||
|
||||
|
||||
//step 4: search vectors
|
||||
std::vector<DB_DATE> dates;
|
||||
engine::QueryResults results;
|
||||
uint64_t record_count = (uint64_t)record_array_.size();
|
||||
stat = DB()->Query(table_name_, (size_t)top_k_, record_count, vec_f.data(), dates, results);
|
||||
rc.Record("search vectors from engine");
|
||||
if(!stat.ok()) {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include <dirent.h>
|
||||
#include <string.h>
|
||||
#include <iostream>
|
||||
#include <time.h>
|
||||
|
||||
#include "boost/filesystem.hpp"
|
||||
|
||||
|
@ -150,15 +151,39 @@ std::string CommonUtil::GetExePath() {
|
|||
return exe_path;
|
||||
}
|
||||
|
||||
void CommonUtil::ConvertTime(int year, int month, int day, int hour, int minute, int second, time_t& t_t) {
|
||||
tm t_m;
|
||||
t_m.tm_year = year;
|
||||
t_m.tm_mon = month;
|
||||
t_m.tm_mday = day;
|
||||
t_m.tm_hour = hour;
|
||||
t_m.tm_min = minute;
|
||||
t_m.tm_sec = second;
|
||||
t_t = mktime(&t_m);
|
||||
bool CommonUtil::TimeStrToTime(const std::string& time_str,
|
||||
time_t &time_integer,
|
||||
tm &time_struct,
|
||||
const std::string& format) {
|
||||
time_integer = 0;
|
||||
memset(&time_struct, 0, sizeof(tm));
|
||||
|
||||
int ret = sscanf(time_str.c_str(),
|
||||
format.c_str(),
|
||||
&(time_struct.tm_year),
|
||||
&(time_struct.tm_mon),
|
||||
&(time_struct.tm_mday),
|
||||
&(time_struct.tm_hour),
|
||||
&(time_struct.tm_min),
|
||||
&(time_struct.tm_sec));
|
||||
if(ret <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
time_struct.tm_year -= 1900;
|
||||
time_struct.tm_mon--;
|
||||
time_integer = mktime(&time_struct);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void CommonUtil::ConvertTime(time_t time_integer, tm &time_struct) {
|
||||
tm* t_m = localtime (&time_integer);
|
||||
memcpy(&time_struct, t_m, sizeof(tm));
|
||||
}
|
||||
|
||||
void ConvertTime(tm time_struct, time_t &time_integer) {
|
||||
time_integer = mktime(&time_struct);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,7 +26,13 @@ class CommonUtil {
|
|||
|
||||
static std::string GetExePath();
|
||||
|
||||
static void ConvertTime(int year, int month, int day, int hour, int minute, int second, time_t& t_t);
|
||||
static bool TimeStrToTime(const std::string& time_str,
|
||||
time_t &time_integer,
|
||||
tm &time_struct,
|
||||
const std::string& format = "%d-%d-%d %d:%d:%d");
|
||||
|
||||
static void ConvertTime(time_t time_integer, tm &time_struct);
|
||||
static void ConvertTime(tm time_struct, time_t &time_integer);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue