mirror of https://github.com/milvus-io/milvus.git
* #1686 API search_in_files cannot work correctly when vectors is stored in certain non-default partition Signed-off-by: yhmo <yihua.mo@zilliz.com> * #1686 API search_in_files cannot work correctly when vectors is stored in certain non-default partition Signed-off-by: groot <yihua.mo@zilliz.com>pull/1712/head
parent
5950deddfc
commit
fe2595fa38
|
@ -11,6 +11,7 @@ Please mark all change in change log and use the issue from GitHub
|
|||
- \#1648 The cache cannot be used all when the vector type is binary
|
||||
- \#1651 Check validity of dimension when collection metric type is binary one
|
||||
- \#1663 PQ index parameter 'm' validation
|
||||
- \#1686 API search_in_files cannot work correctly when vectors are stored in a certain non-default partition
|
||||
|
||||
## Feature
|
||||
- \#1603 BinaryFlat add 2 Metric: Substructure and Superstructure
|
||||
|
|
|
@ -29,7 +29,9 @@ class Env;
|
|||
class DB {
|
||||
public:
|
||||
DB() = default;
|
||||
|
||||
DB(const DB&) = delete;
|
||||
|
||||
DB&
|
||||
operator=(const DB&) = delete;
|
||||
|
||||
|
@ -37,6 +39,7 @@ class DB {
|
|||
|
||||
virtual Status
|
||||
Start() = 0;
|
||||
|
||||
virtual Status
|
||||
Stop() = 0;
|
||||
|
||||
|
@ -121,9 +124,9 @@ class DB {
|
|||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) = 0;
|
||||
|
||||
virtual Status
|
||||
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const std::vector<std::string>& file_ids, uint64_t k, const milvus::json& extra_params,
|
||||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) = 0;
|
||||
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids, uint64_t k,
|
||||
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
|
||||
ResultDistances& result_distances) = 0;
|
||||
|
||||
virtual Status
|
||||
Size(uint64_t& result) = 0;
|
||||
|
|
|
@ -339,9 +339,8 @@ DBImpl::PreloadTable(const std::string& table_id) {
|
|||
}
|
||||
|
||||
// step 1: get all table files from parent table
|
||||
std::vector<size_t> ids;
|
||||
meta::TableFilesSchema files_array;
|
||||
auto status = GetFilesToSearch(table_id, ids, files_array);
|
||||
auto status = GetFilesToSearch(table_id, files_array);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
@ -350,7 +349,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
|
|||
std::vector<meta::TableSchema> partition_array;
|
||||
status = meta_ptr_->ShowPartitions(table_id, partition_array);
|
||||
for (auto& schema : partition_array) {
|
||||
status = GetFilesToSearch(schema.table_id_, ids, files_array);
|
||||
status = GetFilesToSearch(schema.table_id_, files_array);
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
|
@ -1109,13 +1108,12 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
|||
}
|
||||
|
||||
Status status;
|
||||
std::vector<size_t> ids;
|
||||
meta::TableFilesSchema files_array;
|
||||
|
||||
if (partition_tags.empty()) {
|
||||
// no partition tag specified, means search in whole table
|
||||
// get all table files from parent table
|
||||
status = GetFilesToSearch(table_id, ids, files_array);
|
||||
status = GetFilesToSearch(table_id, files_array);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
@ -1123,7 +1121,7 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
|||
std::vector<meta::TableSchema> partition_array;
|
||||
status = meta_ptr_->ShowPartitions(table_id, partition_array);
|
||||
for (auto& schema : partition_array) {
|
||||
status = GetFilesToSearch(schema.table_id_, ids, files_array);
|
||||
status = GetFilesToSearch(schema.table_id_, files_array);
|
||||
}
|
||||
|
||||
if (files_array.empty()) {
|
||||
|
@ -1135,7 +1133,7 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
|||
GetPartitionsByTags(table_id, partition_tags, partition_name_array);
|
||||
|
||||
for (auto& partition_name : partition_name_array) {
|
||||
status = GetFilesToSearch(partition_name, ids, files_array);
|
||||
status = GetFilesToSearch(partition_name, files_array);
|
||||
}
|
||||
|
||||
if (files_array.empty()) {
|
||||
|
@ -1144,7 +1142,7 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
|||
}
|
||||
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
|
||||
status = QueryAsync(query_ctx, table_id, files_array, k, extra_params, vectors, result_ids, result_distances);
|
||||
status = QueryAsync(query_ctx, files_array, k, extra_params, vectors, result_ids, result_distances);
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
|
||||
|
||||
query_ctx->GetTraceContext()->GetSpan()->Finish();
|
||||
|
@ -1153,9 +1151,9 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
|||
}
|
||||
|
||||
Status
|
||||
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const std::vector<std::string>& file_ids, uint64_t k, const milvus::json& extra_params,
|
||||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
|
||||
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
|
||||
uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
|
||||
ResultDistances& result_distances) {
|
||||
auto query_ctx = context->Child("Query by file id");
|
||||
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
|
@ -1165,25 +1163,23 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std
|
|||
// get specified files
|
||||
std::vector<size_t> ids;
|
||||
for (auto& id : file_ids) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = table_id;
|
||||
std::string::size_type sz;
|
||||
ids.push_back(std::stoul(id, &sz));
|
||||
}
|
||||
|
||||
meta::TableFilesSchema files_array;
|
||||
auto status = GetFilesToSearch(table_id, ids, files_array);
|
||||
meta::TableFilesSchema search_files;
|
||||
auto status = meta_ptr_->FilesByID(ids, search_files);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
fiu_do_on("DBImpl.QueryByFileID.empty_files_array", files_array.clear());
|
||||
if (files_array.empty()) {
|
||||
fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
|
||||
if (search_files.empty()) {
|
||||
return Status(DB_ERROR, "Invalid file id");
|
||||
}
|
||||
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
|
||||
status = QueryAsync(query_ctx, table_id, files_array, k, extra_params, vectors, result_ids, result_distances);
|
||||
status = QueryAsync(query_ctx, search_files, k, extra_params, vectors, result_ids, result_distances);
|
||||
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
|
||||
|
||||
query_ctx->GetTraceContext()->GetSpan()->Finish();
|
||||
|
@ -1204,9 +1200,9 @@ DBImpl::Size(uint64_t& result) {
|
|||
// internal methods
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
Status
|
||||
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const meta::TableFilesSchema& files, uint64_t k, const milvus::json& extra_params,
|
||||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
|
||||
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::TableFilesSchema& files, uint64_t k,
|
||||
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
|
||||
ResultDistances& result_distances) {
|
||||
auto query_async_ctx = context->Child("Query Async");
|
||||
|
||||
server::CollectQueryMetrics metrics(vectors.vector_count_);
|
||||
|
@ -1610,12 +1606,11 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>
|
|||
}
|
||||
|
||||
Status
|
||||
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids,
|
||||
meta::TableFilesSchema& files) {
|
||||
DBImpl::GetFilesToSearch(const std::string& table_id, meta::TableFilesSchema& files) {
|
||||
ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;
|
||||
|
||||
meta::TableFilesSchema search_files;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, file_ids, search_files);
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, search_files);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
|
|
@ -42,12 +42,15 @@ class Meta;
|
|||
class DBImpl : public DB, public server::CacheConfigHandler, public server::EngineConfigHandler {
|
||||
public:
|
||||
explicit DBImpl(const DBOptions& options);
|
||||
|
||||
~DBImpl();
|
||||
|
||||
Status
|
||||
Start() override;
|
||||
|
||||
Status
|
||||
Stop() override;
|
||||
|
||||
Status
|
||||
DropAll() override;
|
||||
|
||||
|
@ -141,9 +144,9 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) override;
|
||||
|
||||
Status
|
||||
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const std::vector<std::string>& file_ids, uint64_t k, const milvus::json& extra_params,
|
||||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) override;
|
||||
QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids, uint64_t k,
|
||||
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
|
||||
ResultDistances& result_distances) override;
|
||||
|
||||
Status
|
||||
Size(uint64_t& result) override;
|
||||
|
@ -157,9 +160,9 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
|
||||
private:
|
||||
Status
|
||||
QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const meta::TableFilesSchema& files, uint64_t k, const milvus::json& extra_params,
|
||||
const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances);
|
||||
QueryAsync(const std::shared_ptr<server::Context>& context, const meta::TableFilesSchema& files, uint64_t k,
|
||||
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
|
||||
ResultDistances& result_distances);
|
||||
|
||||
Status
|
||||
GetVectorByIdHelper(const std::string& table_id, IDNumber vector_id, VectorsData& vector,
|
||||
|
@ -167,8 +170,10 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
|
||||
void
|
||||
BackgroundTimerTask();
|
||||
|
||||
void
|
||||
WaitMergeFileFinish();
|
||||
|
||||
void
|
||||
WaitBuildIndexFinish();
|
||||
|
||||
|
@ -180,13 +185,16 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
|
||||
Status
|
||||
MergeFiles(const std::string& table_id, const meta::TableFilesSchema& files);
|
||||
|
||||
Status
|
||||
BackgroundMergeFiles(const std::string& table_id);
|
||||
|
||||
void
|
||||
BackgroundMerge(std::set<std::string> table_ids);
|
||||
|
||||
void
|
||||
StartBuildIndexTask(bool force = false);
|
||||
|
||||
void
|
||||
BackgroundBuildIndex();
|
||||
|
||||
|
@ -204,7 +212,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
|
|||
meta::TableFilesSchema& files);
|
||||
|
||||
Status
|
||||
GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, meta::TableFilesSchema& files);
|
||||
GetFilesToSearch(const std::string& table_id, meta::TableFilesSchema& files);
|
||||
|
||||
Status
|
||||
GetPartitionByTag(const std::string& table_id, const std::string& partition_tag, std::string& partition_name);
|
||||
|
|
|
@ -116,7 +116,7 @@ class Meta {
|
|||
GetPartitionName(const std::string& table_name, const std::string& tag, std::string& partition_name) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) = 0;
|
||||
FilesToSearch(const std::string& table_id, TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesToMerge(const std::string& table_id, TableFilesSchema& files) = 0;
|
||||
|
@ -125,7 +125,10 @@ class Meta {
|
|||
FilesToIndex(TableFilesSchema&) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& table_files) = 0;
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status
|
||||
FilesByID(const std::vector<size_t>& ids, TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status
|
||||
Size(uint64_t& result) = 0;
|
||||
|
|
|
@ -1544,7 +1544,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
|
|||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
|
||||
MySQLMetaImpl::FilesToSearch(const std::string& table_id, TableFilesSchema& files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
|
@ -1565,16 +1565,6 @@ MySQLMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size
|
|||
<< "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
|
||||
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id;
|
||||
|
||||
if (!ids.empty()) {
|
||||
std::stringstream idSS;
|
||||
for (auto& id : ids) {
|
||||
idSS << "id = " << std::to_string(id) << " OR ";
|
||||
}
|
||||
std::string idStr = idSS.str();
|
||||
idStr = idStr.substr(0, idStr.size() - 4); // remove the last " OR "
|
||||
|
||||
filesToSearchQuery << " AND (" << idStr << ")";
|
||||
}
|
||||
// End
|
||||
filesToSearchQuery << " AND"
|
||||
<< " (file_type = " << std::to_string(TableFileSchema::RAW)
|
||||
|
@ -1782,8 +1772,7 @@ MySQLMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
|||
}
|
||||
|
||||
Status
|
||||
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
TableFilesSchema& table_files) {
|
||||
MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& files) {
|
||||
if (file_types.empty()) {
|
||||
return Status(DB_ERROR, "file types array is empty");
|
||||
}
|
||||
|
@ -1791,7 +1780,7 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
Status ret = Status::OK();
|
||||
|
||||
try {
|
||||
table_files.clear();
|
||||
files.clear();
|
||||
|
||||
mysqlpp::StoreQueryResult res;
|
||||
{
|
||||
|
@ -1857,7 +1846,7 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
ret = status;
|
||||
}
|
||||
|
||||
table_files.emplace_back(file_schema);
|
||||
files.emplace_back(file_schema);
|
||||
|
||||
int32_t file_type = resRow["file_type"];
|
||||
switch (file_type) {
|
||||
|
@ -1924,6 +1913,104 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
return ret;
|
||||
}
|
||||
|
||||
// Collect the searchable table files (file_type RAW, TO_INDEX or INDEX) whose `id` column value
// appears in `ids`. No table_id filter is applied, so the matching files may belong to different
// tables or partitions.
//
// @param ids    values of the `id` column to look up; an empty list yields an empty result and OK
// @param files  output; cleared first, then filled with one entry per matching row, each completed
//               with the dimension / index settings of the table it belongs to
// @return - Status(DB_ERROR, ...) when no MySQL connection could be obtained
//         - the DescribeTable() status when the table owning a file cannot be described
//         - the last failing utils::GetTableFilePath() status (the file is still appended)
//         - the HandleException() result when an exception is caught
//         - a default-constructed Status otherwise
Status
MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, TableFilesSchema& files) {
    files.clear();

    if (ids.empty()) {
        return Status::OK();
    }

    try {
        server::MetricCollector metric;
        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);

            bool is_null_connection = (connectionPtr == nullptr);
            fiu_do_on("MySQLMetaImpl.FilesByID.null_connection", is_null_connection = true);
            fiu_do_on("MySQLMetaImpl.FilesByID.throw_exception", throw std::exception(););
            if (is_null_connection) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query files_by_id_query = connectionPtr->query();
            files_by_id_query
                << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
                << " FROM " << META_TABLEFILES << " WHERE id IN (";

            // ids is known to be non-empty here, so the IN list always has at least one element.
            const char* separator = "";
            for (const auto& id : ids) {
                files_by_id_query << separator << std::to_string(id);
                separator = ", ";
            }

            files_by_id_query << ") AND"
                              << " (file_type = " << std::to_string(TableFileSchema::RAW)
                              << " OR file_type = " << std::to_string(TableFileSchema::TO_INDEX)
                              << " OR file_type = " << std::to_string(TableFileSchema::INDEX) << ");";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::FilesByID: " << files_by_id_query.str();

            res = files_by_id_query.store();
        }  // Scoped Connection

        // Schemas of the tables owning the returned files, so each table is described only once.
        std::map<std::string, meta::TableSchema> tables;
        Status ret;
        for (auto& resRow : res) {
            TableFileSchema table_file;
            table_file.id_ = resRow["id"];  // implicit conversion
            resRow["table_id"].to_string(table_file.table_id_);
            resRow["segment_id"].to_string(table_file.segment_id_);
            table_file.engine_type_ = resRow["engine_type"];
            resRow["file_id"].to_string(table_file.file_id_);
            table_file.file_type_ = resRow["file_type"];
            table_file.file_size_ = resRow["file_size"];
            table_file.row_count_ = resRow["row_count"];
            table_file.date_ = resRow["date"];

            auto table_iter = tables.find(table_file.table_id_);
            if (table_iter == tables.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                table_iter = tables.insert(std::make_pair(table_file.table_id_, table_schema)).first;
            }

            // A path failure is remembered and reported, but the file is still returned.
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
            }

            // Complete the file entry with the settings of its owning table.
            const TableSchema& owner_schema = table_iter->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.index_params_ = owner_schema.index_params_;
            table_file.metric_type_ = owner_schema.metric_type_;

            files.emplace_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search in file id list";
        } else {
            ENGINE_LOG_DEBUG << "Collect " << files.size() << " files by id";
        }

        return ret;
    } catch (const std::exception& e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES BY ID", e.what());
    }
}
|
||||
|
||||
// TODO(myh): Support swap to cloud storage
|
||||
Status
|
||||
MySQLMetaImpl::Archive() {
|
||||
|
|
|
@ -105,7 +105,7 @@ class MySQLMetaImpl : public Meta {
|
|||
GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) override;
|
||||
|
||||
Status
|
||||
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) override;
|
||||
FilesToSearch(const std::string& table_id, TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
FilesToMerge(const std::string& table_id, TableFilesSchema& files) override;
|
||||
|
@ -114,8 +114,10 @@ class MySQLMetaImpl : public Meta {
|
|||
FilesToIndex(TableFilesSchema&) override;
|
||||
|
||||
Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
TableFilesSchema& table_files) override;
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
FilesByID(const std::vector<size_t>& ids, TableFilesSchema& table_files) override;
|
||||
|
||||
Status
|
||||
Archive() override;
|
||||
|
|
|
@ -968,11 +968,11 @@ SqliteMetaImpl::GetPartitionName(const std::string& table_id, const std::string&
|
|||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) {
|
||||
SqliteMetaImpl::FilesToSearch(const std::string& table_id, TableFilesSchema& files) {
|
||||
files.clear();
|
||||
server::MetricCollector metric;
|
||||
|
||||
try {
|
||||
server::MetricCollector metric;
|
||||
fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());
|
||||
|
||||
auto select_columns =
|
||||
|
@ -995,14 +995,8 @@ SqliteMetaImpl::FilesToSearch(const std::string& table_id, const std::vector<siz
|
|||
|
||||
// perform query
|
||||
decltype(ConnectorPtr->select(select_columns)) selected;
|
||||
if (ids.empty()) {
|
||||
auto filter = where(match_tableid and match_type);
|
||||
selected = ConnectorPtr->select(select_columns, filter);
|
||||
} else {
|
||||
auto match_fileid = in(&TableFileSchema::id_, ids);
|
||||
auto filter = where(match_tableid and match_fileid and match_type);
|
||||
selected = ConnectorPtr->select(select_columns, filter);
|
||||
}
|
||||
auto filter = where(match_tableid and match_type);
|
||||
selected = ConnectorPtr->select(select_columns, filter);
|
||||
|
||||
Status ret;
|
||||
for (auto& file : selected) {
|
||||
|
@ -1172,8 +1166,7 @@ SqliteMetaImpl::FilesToIndex(TableFilesSchema& files) {
|
|||
}
|
||||
|
||||
Status
|
||||
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
TableFilesSchema& table_files) {
|
||||
SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& files) {
|
||||
if (file_types.empty()) {
|
||||
return Status(DB_ERROR, "file types array is empty");
|
||||
}
|
||||
|
@ -1190,7 +1183,7 @@ SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
try {
|
||||
fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
|
||||
|
||||
table_files.clear();
|
||||
files.clear();
|
||||
auto selected = ConnectorPtr->select(
|
||||
columns(&TableFileSchema::id_, &TableFileSchema::segment_id_, &TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_, &TableFileSchema::file_size_, &TableFileSchema::row_count_,
|
||||
|
@ -1241,7 +1234,7 @@ SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
ret = status;
|
||||
}
|
||||
|
||||
table_files.emplace_back(file_schema);
|
||||
files.emplace_back(file_schema);
|
||||
}
|
||||
|
||||
std::string msg = "Get table files by type.";
|
||||
|
@ -1277,6 +1270,87 @@ SqliteMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
|
|||
return ret;
|
||||
}
|
||||
|
||||
// Collect the searchable table files (file_type RAW, TO_INDEX or INDEX) whose id_ appears in
// `ids`. No table_id filter is applied, so the matching files may belong to different tables or
// partitions.
//
// @param ids    TableFileSchema::id_ values to look up; an empty list yields an empty result and OK
// @param files  output; cleared first, then filled with one entry per matching row, each completed
//               with the dimension / index settings of the table it belongs to
// @return - the DescribeTable() status when the table owning a file cannot be described
//         - the last failing utils::GetTableFilePath() status (the file is still appended)
//         - the HandleException() result when an exception is caught
//         - a default-constructed Status otherwise
Status
SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, TableFilesSchema& files) {
    files.clear();

    if (ids.empty()) {
        return Status::OK();
    }

    try {
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());

        // The column order here fixes the tuple indices used with std::get<> below.
        auto select_columns =
            columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::segment_id_,
                    &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::file_size_,
                    &TableFileSchema::row_count_, &TableFileSchema::date_, &TableFileSchema::engine_type_);

        std::vector<int> file_types = {static_cast<int>(TableFileSchema::RAW),
                                       static_cast<int>(TableFileSchema::TO_INDEX),
                                       static_cast<int>(TableFileSchema::INDEX)};
        auto match_type = in(&TableFileSchema::file_type_, file_types);
        auto match_fileid = in(&TableFileSchema::id_, ids);

        // perform query
        auto filter = where(match_fileid and match_type);
        auto selected = ConnectorPtr->select(select_columns, filter);

        // Schemas of the tables owning the returned files, so each table is described only once.
        std::map<std::string, meta::TableSchema> tables;
        Status ret;
        for (auto& file : selected) {
            TableFileSchema table_file;
            table_file.id_ = std::get<0>(file);
            table_file.table_id_ = std::get<1>(file);
            table_file.segment_id_ = std::get<2>(file);
            table_file.file_id_ = std::get<3>(file);
            table_file.file_type_ = std::get<4>(file);
            table_file.file_size_ = std::get<5>(file);
            table_file.row_count_ = std::get<6>(file);
            table_file.date_ = std::get<7>(file);
            table_file.engine_type_ = std::get<8>(file);

            auto table_iter = tables.find(table_file.table_id_);
            if (table_iter == tables.end()) {
                TableSchema table_schema;
                table_schema.table_id_ = table_file.table_id_;
                auto status = DescribeTable(table_schema);
                if (!status.ok()) {
                    return status;
                }
                table_iter = tables.insert(std::make_pair(table_file.table_id_, table_schema)).first;
            }

            // A path failure is remembered and reported, but the file is still returned.
            auto status = utils::GetTableFilePath(options_, table_file);
            if (!status.ok()) {
                ret = status;
            }

            // Complete the file entry with the settings of its owning table.
            const TableSchema& owner_schema = table_iter->second;
            table_file.dimension_ = owner_schema.dimension_;
            table_file.index_file_size_ = owner_schema.index_file_size_;
            table_file.index_params_ = owner_schema.index_params_;
            table_file.metric_type_ = owner_schema.metric_type_;

            files.emplace_back(table_file);
        }

        if (files.empty()) {
            ENGINE_LOG_ERROR << "No file to search in file id list";
        } else {
            ENGINE_LOG_DEBUG << "Collect " << files.size() << " files by id";
        }

        return ret;
    } catch (const std::exception& e) {
        return HandleException("Encounter exception when finding table files by id", e.what());
    }
}
|
||||
|
||||
// TODO(myh): Support swap to cloud storage
|
||||
Status
|
||||
SqliteMetaImpl::Archive() {
|
||||
|
|
|
@ -104,7 +104,7 @@ class SqliteMetaImpl : public Meta {
|
|||
GetPartitionName(const std::string& table_id, const std::string& tag, std::string& partition_name) override;
|
||||
|
||||
Status
|
||||
FilesToSearch(const std::string& table_id, const std::vector<size_t>& ids, TableFilesSchema& files) override;
|
||||
FilesToSearch(const std::string& table_id, TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
FilesToMerge(const std::string& table_id, TableFilesSchema& files) override;
|
||||
|
@ -113,8 +113,10 @@ class SqliteMetaImpl : public Meta {
|
|||
FilesToIndex(TableFilesSchema&) override;
|
||||
|
||||
Status
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types,
|
||||
TableFilesSchema& table_files) override;
|
||||
FilesByType(const std::string& table_id, const std::vector<int>& file_types, TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
FilesByID(const std::vector<size_t>& ids, TableFilesSchema& files) override;
|
||||
|
||||
Status
|
||||
Size(uint64_t& result) override;
|
||||
|
|
|
@ -154,7 +154,7 @@ SearchRequest::OnExecute() {
|
|||
status = DBWrapper::DB()->Query(context_, table_name_, partition_list_, (size_t)topk_, extra_params_,
|
||||
vectors_data_, result_ids, result_distances);
|
||||
} else {
|
||||
status = DBWrapper::DB()->QueryByFileID(context_, table_name_, file_id_list_, (size_t)topk_, extra_params_,
|
||||
status = DBWrapper::DB()->QueryByFileID(context_, file_id_list_, (size_t)topk_, extra_params_,
|
||||
vectors_data_, result_ids, result_distances);
|
||||
}
|
||||
|
||||
|
|
|
@ -371,24 +371,18 @@ TEST_F(DBTest, SEARCH_TEST) {
|
|||
}
|
||||
milvus::engine::ResultIds result_ids;
|
||||
milvus::engine::ResultDistances result_distances;
|
||||
stat = db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k,
|
||||
stat = db_->QueryByFileID(dummy_context_, file_ids, k,
|
||||
json_params,
|
||||
xq,
|
||||
result_ids,
|
||||
result_distances);
|
||||
ASSERT_TRUE(stat.ok());
|
||||
|
||||
FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
|
||||
stat =
|
||||
db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k, json_params, xq, result_ids, result_distances);
|
||||
ASSERT_FALSE(stat.ok());
|
||||
fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");
|
||||
|
||||
FIU_ENABLE_FIU("DBImpl.QueryByFileID.empty_files_array");
|
||||
stat =
|
||||
db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k, json_params, xq, result_ids, result_distances);
|
||||
ASSERT_FALSE(stat.ok());
|
||||
fiu_disable("DBImpl.QueryByFileID.empty_files_array");
|
||||
// FIU_ENABLE_FIU("DBImpl.QueryByFileID.empty_files_array");
|
||||
// stat =
|
||||
// db_->QueryByFileID(dummy_context_, file_ids, k, json_params, xq, result_ids, result_distances);
|
||||
// ASSERT_FALSE(stat.ok());
|
||||
// fiu_disable("DBImpl.QueryByFileID.empty_files_array");
|
||||
}
|
||||
|
||||
// TODO(zhiru): PQ build takes forever
|
||||
|
@ -434,7 +428,7 @@ TEST_F(DBTest, SEARCH_TEST) {
|
|||
}
|
||||
result_ids.clear();
|
||||
result_dists.clear();
|
||||
stat = db_->QueryByFileID(dummy_context_, TABLE_NAME, file_ids, k, json_params, xq, result_ids, result_dists);
|
||||
stat = db_->QueryByFileID(dummy_context_, file_ids, k, json_params, xq, result_ids, result_dists);
|
||||
ASSERT_TRUE(stat.ok());
|
||||
}
|
||||
#endif
|
||||
|
@ -579,7 +573,6 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
|
|||
ASSERT_FALSE(stat.ok());
|
||||
std::vector<std::string> file_ids;
|
||||
stat = db_->QueryByFileID(dummy_context_,
|
||||
table_info.table_id_,
|
||||
file_ids,
|
||||
1,
|
||||
json_params,
|
||||
|
|
|
@ -260,13 +260,12 @@ TEST_F(MetaTest, FALID_TEST) {
|
|||
fiu_disable("SqliteMetaImpl.GetPartitionName.throw_exception");
|
||||
}
|
||||
{
|
||||
std::vector<size_t> ids;
|
||||
milvus::engine::meta::TableFilesSchema table_files;
|
||||
status = impl_->FilesToSearch("notexist", ids, table_files);
|
||||
status = impl_->FilesToSearch("notexist", table_files);
|
||||
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
|
||||
|
||||
FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesToSearch(table_id, table_files);
|
||||
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
|
||||
fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");
|
||||
}
|
||||
|
@ -622,13 +621,21 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
|
|||
ASSERT_EQ(files.size(), to_index_files_cnt);
|
||||
|
||||
table_files.clear();
|
||||
std::vector<size_t> ids;
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesToSearch(table_id, table_files);
|
||||
ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
|
||||
|
||||
std::vector<size_t> ids;
|
||||
for (auto& file : table_files) {
|
||||
ids.push_back(file.id_);
|
||||
}
|
||||
size_t cnt = table_files.size();
|
||||
table_files.clear();
|
||||
ids.push_back(size_t(9999999999));
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesByID(ids, table_files);
|
||||
ASSERT_EQ(table_files.size(), cnt);
|
||||
|
||||
table_files.clear();
|
||||
ids = {9999999999UL};
|
||||
status = impl_->FilesByID(ids, table_files);
|
||||
ASSERT_EQ(table_files.size(), 0);
|
||||
|
||||
table_files.clear();
|
||||
|
|
|
@ -249,6 +249,13 @@ TEST_F(MySqlMetaTest, TABLE_FILE_TEST) {
|
|||
status = impl_->GetTableFiles(table_file.table_id_, ids, files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
|
||||
table_file.table_id_ = table.table_id_;
|
||||
table_file.file_type_ = milvus::engine::meta::TableFileSchema::RAW;
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
ids = {table_file.id_};
|
||||
status = impl_->FilesByID(ids, files);
|
||||
ASSERT_EQ(files.size(), 1UL);
|
||||
|
||||
table_file.table_id_ = table.table_id_;
|
||||
table_file.file_type_ = milvus::engine::meta::TableFileSchema::TO_DELETE;
|
||||
status = impl_->CreateTableFile(table_file);
|
||||
|
@ -641,26 +648,25 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
|
|||
fiu_disable("MySQLMetaImpl.FilesToIndex.throw_exception");
|
||||
|
||||
table_files.clear();
|
||||
std::vector<size_t> ids;
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesToSearch(table_id, table_files);
|
||||
ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
|
||||
|
||||
table_files.clear();
|
||||
ids.push_back(size_t(9999999999));
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
std::vector<size_t> ids = {9999999999UL};
|
||||
status = impl_->FilesByID(ids, table_files);
|
||||
ASSERT_EQ(table_files.size(), 0);
|
||||
|
||||
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.null_connection");
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesToSearch(table_id, table_files);
|
||||
ASSERT_FALSE(status.ok());
|
||||
fiu_disable("MySQLMetaImpl.FilesToSearch.null_connection");
|
||||
|
||||
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.throw_exception");
|
||||
status = impl_->FilesToSearch(table_id, ids, table_files);
|
||||
status = impl_->FilesToSearch(table_id, table_files);
|
||||
ASSERT_FALSE(status.ok());
|
||||
fiu_disable("MySQLMetaImpl.FilesToSearch.throw_exception");
|
||||
|
||||
status = impl_->FilesToSearch("notexist", ids, table_files);
|
||||
status = impl_->FilesToSearch("notexist", table_files);
|
||||
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
|
||||
|
||||
table_files.clear();
|
||||
|
|
Loading…
Reference in New Issue