mirror of https://github.com/milvus-io/milvus.git
Merge branch 'integrate_knowhere' into 'branch-0.3.1-xiaojun'
MS-286 fix.. See merge request megasearch/milvus!281 Former-commit-id: ab2892036ea2e84f8036bf0918a42c4917d77f64pull/191/head
commit
469b639469
|
@ -198,18 +198,25 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
|
|||
ids.push_back(std::stoul(id, &sz));
|
||||
}
|
||||
|
||||
meta::TableFilesSchema files_array;
|
||||
auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
|
||||
meta::DatePartionedTableFilesSchema files_array;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
if(files_array.empty()) {
|
||||
meta::TableFilesSchema file_id_array;
|
||||
for (auto &day_files : files_array) {
|
||||
for (auto &file : day_files.second) {
|
||||
file_id_array.push_back(file);
|
||||
}
|
||||
}
|
||||
|
||||
if(file_id_array.empty()) {
|
||||
return Status::Error("Invalid file id");
|
||||
}
|
||||
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
|
||||
status = QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
|
||||
status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
|
||||
return status;
|
||||
}
|
||||
|
|
|
@ -544,6 +544,79 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
files.clear();
|
||||
MetricCollector metric;
|
||||
|
||||
try {
|
||||
auto select_columns = columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::size_,
|
||||
&TableFileSchema::date_,
|
||||
&TableFileSchema::engine_type_);
|
||||
|
||||
auto match_tableid = c(&TableFileSchema::table_id_) == table_id;
|
||||
auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW;
|
||||
auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX;
|
||||
auto is_index = c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX;
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
decltype(ConnectorPtr->select(select_columns)) result;
|
||||
if (partition.empty() && ids.empty()) {
|
||||
auto filter = where(match_tableid and (is_raw or is_toindex or is_index));
|
||||
result = ConnectorPtr->select(select_columns, filter);
|
||||
}
|
||||
else if (partition.empty() && !ids.empty()) {
|
||||
auto match_fileid = in(&TableFileSchema::id_, ids);
|
||||
auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index));
|
||||
result = ConnectorPtr->select(select_columns, filter);
|
||||
}
|
||||
else if (!partition.empty() && ids.empty()) {
|
||||
auto match_date = in(&TableFileSchema::date_, partition);
|
||||
auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index));
|
||||
result = ConnectorPtr->select(select_columns, filter);
|
||||
}
|
||||
else if (!partition.empty() && !ids.empty()) {
|
||||
auto match_fileid = in(&TableFileSchema::id_, ids);
|
||||
auto match_date = in(&TableFileSchema::date_, partition);
|
||||
auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index));
|
||||
result = ConnectorPtr->select(select_columns, filter);
|
||||
}
|
||||
|
||||
TableFileSchema table_file;
|
||||
for (auto &file : result) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.file_type_ = std::get<3>(file);
|
||||
table_file.size_ = std::get<4>(file);
|
||||
table_file.date_ = std::get<5>(file);
|
||||
table_file.engine_type_ = std::get<6>(file);
|
||||
table_file.dimension_ = table_schema.dimension_;
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
files[table_file.date_].push_back(table_file);
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when iterate index files", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
files.clear();
|
||||
|
|
|
@ -62,6 +62,11 @@ class DBMetaImpl : public Meta {
|
|||
Status
|
||||
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status
|
||||
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
|
|
|
@ -65,6 +65,9 @@ class Meta {
|
|||
virtual Status
|
||||
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToSearch(const std::string &table_id, const std::vector<size_t> &ids, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
|
||||
|
||||
|
|
|
@ -965,6 +965,117 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
|
||||
MetricCollector metric;
|
||||
|
||||
StoreQueryResult res;
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status::Error("Failed to connect to database server");
|
||||
}
|
||||
|
||||
Query filesToSearchQuery = connectionPtr->query();
|
||||
filesToSearchQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " <<
|
||||
"FROM TableFiles " <<
|
||||
"WHERE table_id = " << quote << table_id;
|
||||
|
||||
if (!partition.empty()) {
|
||||
std::stringstream partitionListSS;
|
||||
for (auto &date : partition) {
|
||||
partitionListSS << std::to_string(date) << ", ";
|
||||
}
|
||||
std::string partitionListStr = partitionListSS.str();
|
||||
|
||||
partitionListStr = partitionListStr.substr(0, partitionListStr.size() - 2); //remove the last ", "
|
||||
filesToSearchQuery << " AND " << "date IN (" << partitionListStr << ")";
|
||||
}
|
||||
|
||||
if (!ids.empty()) {
|
||||
std::stringstream idSS;
|
||||
for (auto &id : ids) {
|
||||
idSS << "id = " << std::to_string(id) << " OR ";
|
||||
}
|
||||
std::string idStr = idSS.str();
|
||||
idStr = idStr.substr(0, idStr.size() - 4); //remove the last " OR "
|
||||
|
||||
filesToSearchQuery << " AND " << "(" << idStr << ")";
|
||||
|
||||
}
|
||||
// End
|
||||
filesToSearchQuery << " AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::INDEX) << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesToSearch: " << filesToSearchQuery.str();
|
||||
|
||||
res = filesToSearchQuery.store();
|
||||
} //Scoped Connection
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
auto status = DescribeTable(table_schema);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
TableFileSchema table_file;
|
||||
for (auto &resRow : res) {
|
||||
|
||||
table_file.id_ = resRow["id"]; //implicit conversion
|
||||
|
||||
std::string table_id_str;
|
||||
resRow["table_id"].to_string(table_id_str);
|
||||
table_file.table_id_ = table_id_str;
|
||||
|
||||
table_file.engine_type_ = resRow["engine_type"];
|
||||
|
||||
std::string file_id;
|
||||
resRow["file_id"].to_string(file_id);
|
||||
table_file.file_id_ = file_id;
|
||||
|
||||
table_file.file_type_ = resRow["file_type"];
|
||||
|
||||
table_file.size_ = resRow["size"];
|
||||
|
||||
table_file.date_ = resRow["date"];
|
||||
|
||||
table_file.dimension_ = table_schema.dimension_;
|
||||
|
||||
utils::GetTableFilePath(options_, table_file);
|
||||
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
|
||||
files[table_file.date_].push_back(table_file);
|
||||
}
|
||||
} catch (const BadQuery &er) {
|
||||
// Handle any query errors
|
||||
ENGINE_LOG_ERROR << "QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
|
||||
return Status::DBTransactionError("QUERY ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
|
||||
} catch (const Exception &er) {
|
||||
// Catch-all for any other MySQL++ exceptions
|
||||
ENGINE_LOG_ERROR << "GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH" << ": " << er.what();
|
||||
return Status::DBTransactionError("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", er.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
|
||||
|
|
|
@ -53,6 +53,11 @@ class MySQLMetaImpl : public Meta {
|
|||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status FilesToSearch(const std::string &table_id,
|
||||
const std::vector<size_t> &ids,
|
||||
const DatesT &partition,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
Status FilesToMerge(const std::string &table_id,
|
||||
DatePartionedTableFilesSchema &files) override;
|
||||
|
||||
|
|
|
@ -215,14 +215,13 @@ TEST_F(DBTest, SEARCH_TEST) {
|
|||
ASSERT_STATS(stat);
|
||||
}
|
||||
|
||||
// TODO: FIX HERE
|
||||
//{//search by specify index file
|
||||
// engine::meta::DatesT dates;
|
||||
// std::vector<std::string> file_ids = {"1", "2", "3", "4"};
|
||||
// engine::QueryResults results;
|
||||
// stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results);
|
||||
// ASSERT_STATS(stat);
|
||||
//}
|
||||
{//search by specify index file
|
||||
engine::meta::DatesT dates;
|
||||
std::vector<std::string> file_ids = {"4", "5", "6"};
|
||||
engine::QueryResults results;
|
||||
stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results);
|
||||
ASSERT_STATS(stat);
|
||||
}
|
||||
|
||||
// TODO(linxj): add groundTruth assert
|
||||
};
|
||||
|
|
|
@ -269,4 +269,15 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
|||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
std::vector<size_t> ids;
|
||||
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
ids.push_back(size_t(9999999999));
|
||||
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),0);
|
||||
}
|
||||
|
|
|
@ -328,6 +328,17 @@ TEST_F(MySQLTest, TABLE_FILES_TEST) {
|
|||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
std::vector<size_t> ids;
|
||||
status = impl.FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),
|
||||
to_index_files_cnt+raw_files_cnt+index_files_cnt);
|
||||
|
||||
ids.push_back(size_t(9999999999));
|
||||
status = impl.FilesToSearch(table_id, ids, dates, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[table_file.date_].size(),0);
|
||||
|
||||
status = impl.DropAll();
|
||||
ASSERT_TRUE(status.ok());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue