mirror of https://github.com/milvus-io/milvus.git
QPS decrease (#2372)
* optimize merge strategy
  Signed-off-by: yhmo <yihua.mo@zilliz.com>
* #2365
  Signed-off-by: groot <yihua.mo@zilliz.com>
* fix typo
  Signed-off-by: groot <yihua.mo@zilliz.com>
* optimize search
  Signed-off-by: groot <yihua.mo@zilliz.com>
* code format
  Signed-off-by: groot <yihua.mo@zilliz.com>

pull/2380/head
parent b9e8acb5b2
commit 014a0e860c
db/DBImpl.cpp:

@@ -1393,9 +1393,10 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
         }
     }

-    // step 3: let merge file thread finish
-    // to avoid duplicate data bug
-    WaitMergeFileFinish();
+    // step 3: wait merge file thread finished to avoid duplicate data bug
+    WaitMergeFileFinish();  // let merge file thread finish
+    StartMergeTask(true);   // start force-merge task
+    WaitMergeFileFinish();  // let force-merge file thread finish

     // step 4: wait and build index
     status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

@@ -1897,7 +1898,7 @@ DBImpl::StartMetricTask() {
 }

 void
-DBImpl::StartMergeTask() {
+DBImpl::StartMergeTask(bool force_merge_all) {
     // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
     // merge task has been finished?
     {

@@ -1927,7 +1928,7 @@ DBImpl::StartMergeTask() {

         // start merge file thread
         merge_thread_results_.push_back(
-            merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
+            merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
         merge_collection_ids_.clear();
     }
 }

@@ -2031,14 +2032,20 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi
 }

 void
-DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
+DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
     // LOG_ENGINE_TRACE_ << " Background merge thread start";

     Status status;
     for (auto& collection_id : collection_ids) {
         const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

+        auto old_strategy = merge_mgr_ptr_->Strategy();
+        if (force_merge_all) {
+            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
+        }
+
         auto status = merge_mgr_ptr_->MergeFiles(collection_id);
+        merge_mgr_ptr_->UseStrategy(old_strategy);
         if (!status.ok()) {
             LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
                               << " reason:" << status.message();
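The new force-merge path above saves the current strategy, switches to ADAPTIVE, runs the merge, and restores the old strategy by hand. A hedged sketch of the same switch-and-restore idea expressed as an RAII guard — the stub below stands in for the real MergeManager interface and is not code from this commit:

#include <memory>

// Hypothetical stand-in for the MergeManager interface in the diff.
enum class StrategyKind { SIMPLE = 1, LAYERED = 2, ADAPTIVE = 3 };

struct MergeManagerStub {
    StrategyKind Strategy() const { return type_; }
    void UseStrategy(StrategyKind t) { type_ = t; }
    StrategyKind type_ = StrategyKind::LAYERED;
};

// RAII guard: switches the strategy for the current scope and restores it on
// exit, even if the merge throws. The diff restores manually instead.
class ScopedStrategy {
 public:
    ScopedStrategy(MergeManagerStub& mgr, StrategyKind t) : mgr_(mgr), old_(mgr.Strategy()) {
        mgr_.UseStrategy(t);
    }
    ~ScopedStrategy() { mgr_.UseStrategy(old_); }

 private:
    MergeManagerStub& mgr_;
    StrategyKind old_;
};

int main() {
    MergeManagerStub mgr;
    {
        ScopedStrategy guard(mgr, StrategyKind::ADAPTIVE);
        // the force-merge would run here with the ADAPTIVE strategy
    }
    // strategy is back to LAYERED once the scope ends
}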
db/DBImpl.h:

@@ -228,10 +228,10 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
     StartMetricTask();

     void
-    StartMergeTask();
+    StartMergeTask(bool force_merge_all = false);

     void
-    BackgroundMerge(std::set<std::string> collection_ids);
+    BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);

     Status
     MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
db/merge/MergeAdaptiveStrategy.cpp (new file):

@@ -0,0 +1,91 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#include "db/merge/MergeAdaptiveStrategy.h"
+#include "utils/Log.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace milvus {
+namespace engine {
+namespace {
+struct {
+    bool
+    operator()(meta::SegmentSchema& left, meta::SegmentSchema& right) const {
+        return left.file_size_ > right.file_size_;
+    }
+} CompareSegment;
+}  // namespace
+
+Status
+MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
+    meta::SegmentsSchema sort_files;
+    meta::SegmentsSchema& files = files_holder.HoldFiles();
+    for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
+        meta::SegmentSchema& file = *iter;
+        if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) {
+            // file does not need to be merged
+            continue;
+        }
+        sort_files.push_back(file);
+    }
+
+    // no need to merge a single file
+    if (sort_files.size() < 2) {
+        return Status::OK();
+    }
+
+    // two files, simply merge them
+    if (sort_files.size() == 2) {
+        files_groups.emplace_back(sort_files);
+        return Status::OK();
+    }
+
+    // arrange files by file size in descending order
+    std::sort(sort_files.begin(), sort_files.end(), CompareSegment);
+
+    // pick files to merge
+    int64_t index_file_size = sort_files[0].index_file_size_;
+    while (true) {
+        meta::SegmentsSchema temp_group;
+        int64_t sum_size = 0;
+        for (auto iter = sort_files.begin(); iter != sort_files.end();) {
+            meta::SegmentSchema& file = *iter;
+            if (sum_size + file.file_size_ <= index_file_size) {
+                temp_group.push_back(file);
+                sum_size += file.file_size_;
+                iter = sort_files.erase(iter);
+            } else {
+                if ((iter + 1 == sort_files.end()) && sum_size < index_file_size) {
+                    temp_group.push_back(file);
+                    sort_files.erase(iter);
+                    break;
+                } else {
+                    ++iter;
+                }
+            }
+        }
+
+        if (!temp_group.empty()) {
+            files_groups.emplace_back(temp_group);
+        }
+
+        if (sort_files.empty()) {
+            break;
+        }
+    }
+
+    return Status::OK();
+}
+
+}  // namespace engine
+}  // namespace milvus
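The regrouping above is a greedy pass: files are sorted by size descending, then each group is filled with the largest remaining files whose sizes still fit under index_file_size; if the last remaining candidate would leave the group under-filled, it is added even though the sum overflows, so no undersized group is left behind. A self-contained sketch of the same loop over plain sizes (the function name and sample sizes are illustrative, not from the repository):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Greedy grouping over file sizes, mirroring the RegroupFiles loop above:
// pack descending-sorted sizes into groups bounded by `capacity`; the last
// remaining size may be added to an under-filled group even if it overflows.
std::vector<std::vector<int64_t>>
RegroupSizes(std::vector<int64_t> sizes, int64_t capacity) {
    std::sort(sizes.begin(), sizes.end(), std::greater<int64_t>());
    std::vector<std::vector<int64_t>> groups;
    while (!sizes.empty()) {
        std::vector<int64_t> group;
        int64_t sum = 0;
        for (auto it = sizes.begin(); it != sizes.end();) {
            if (sum + *it <= capacity) {
                group.push_back(*it);
                sum += *it;
                it = sizes.erase(it);
            } else if (it + 1 == sizes.end() && sum < capacity) {
                group.push_back(*it);  // overflow the tail into the last slot
                sizes.erase(it);
                break;
            } else {
                ++it;
            }
        }
        if (!group.empty()) {
            groups.push_back(group);
        }
    }
    return groups;
}

int main() {
    // capacity 1024: yields {900, 100} (sum 1000) and {500, 400, 300} (sum 1200,
    // the 300 overflows because it was the last remaining file).
    for (const auto& g : RegroupSizes({900, 500, 400, 300, 100}, 1024)) {
        int64_t sum = 0;
        for (int64_t s : g) sum += s;
        std::cout << "group sum = " << sum << "\n";
    }
}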
db/merge/MergeAdaptiveStrategy.h (new file):

@@ -0,0 +1,29 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "db/merge/MergeStrategy.h"
+#include "utils/Status.h"
+
+namespace milvus {
+namespace engine {
+
+class MergeAdaptiveStrategy : public MergeStrategy {
+ public:
+    Status
+    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
+};  // MergeAdaptiveStrategy
+
+}  // namespace engine
+}  // namespace milvus
db/merge/MergeLayeredStrategy.cpp:

@@ -40,8 +40,7 @@ MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGr
     for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
        meta::SegmentSchema& file = *iter;
        if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) {
-            // release file that does not need to be merged
-            files_holder.UnmarkFile(file);
+            // file does not need to be merged
            continue;
        }

db/merge/MergeManager.h:

@@ -24,15 +24,34 @@
 namespace milvus {
 namespace engine {

+// 1. SIMPLE
+// merge in the old way: merge files one by one, stop merging once the file size exceeds index_file_size
+// 2. LAYERED
+// distribute files into several groups according to file size
+// first, define layers by file size: 4MB, 16MB, 64MB, 256MB, 1024MB
+// if the file size is between 0MB~4MB, put it into layer "4"
+// if the file size is between 4MB~16MB, put it into layer "16"
+// if the file size is between 16MB~64MB, put it into layer "64"
+// if the file size is between 64MB~256MB, put it into layer "256"
+// if the file size is between 256MB~1024MB, put it into layer "1024"
+// second, merge the files within each group
+// third, if a file was created more than 30 seconds ago and is still un-merged, force-merge it with upper layer files
+// 3. ADAPTIVE
+// pick files whose sizes sum close to index_file_size, and merge them
 enum class MergeStrategyType {
     SIMPLE = 1,
     LAYERED = 2,
+    ADAPTIVE = 3,
 };

 class MergeManager {
  public:
+    virtual MergeStrategyType
+    Strategy() const = 0;
+
     virtual Status
     UseStrategy(MergeStrategyType type) = 0;

     virtual Status
     MergeFiles(const std::string& collection_id) = 0;
 };  // MergeManager
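A small illustration of the LAYERED bucketing described in the comment block above — the helper below is not from the repository, and the boundary handling (a file sitting exactly on a threshold) is an assumption; the original comment leaves it ambiguous:

#include <cstdint>
#include <iostream>

// Map a file size (in MB) to the layer named in the comment above:
// 0MB~4MB -> 4, 4MB~16MB -> 16, ..., 256MB~1024MB -> 1024.
// Assumption: a size exactly on a threshold falls into the smaller layer.
int64_t LayerForSizeMB(int64_t size_mb) {
    for (int64_t layer = 4; layer <= 1024; layer *= 4) {
        if (size_mb <= layer) {
            return layer;
        }
    }
    return 1024;  // anything larger falls into the top layer
}

int main() {
    std::cout << LayerForSizeMB(3) << "\n";    // 4
    std::cout << LayerForSizeMB(100) << "\n";  // 256
    std::cout << LayerForSizeMB(700) << "\n";  // 1024
}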
db/merge/MergeManagerImpl.cpp:

@@ -10,6 +10,7 @@
 // or implied. See the License for the specific language governing permissions and limitations under the License.

 #include "db/merge/MergeManagerImpl.h"
+#include "db/merge/MergeAdaptiveStrategy.h"
 #include "db/merge/MergeLayeredStrategy.h"
 #include "db/merge/MergeSimpleStrategy.h"
 #include "db/merge/MergeStrategy.h"

@@ -21,7 +22,7 @@ namespace milvus {
 namespace engine {

 MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type)
-    : meta_ptr_(meta_ptr), options_(options) {
+    : meta_ptr_(meta_ptr), options_(options), strategy_type_(type) {
     UseStrategy(type);
 }

@@ -36,12 +37,17 @@ MergeManagerImpl::UseStrategy(MergeStrategyType type) {
            strategy_ = std::make_shared<MergeLayeredStrategy>();
            break;
        }
+        case MergeStrategyType::ADAPTIVE: {
+            strategy_ = std::make_shared<MergeAdaptiveStrategy>();
+            break;
+        }
         default: {
             std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
             LOG_ENGINE_ERROR_ << msg;
             throw Exception(DB_ERROR, msg);
         }
     }
+    strategy_type_ = type;

     return Status::OK();
 }
db/merge/MergeManagerImpl.h:

@@ -31,6 +31,11 @@ class MergeManagerImpl : public MergeManager {
  public:
     MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type);

+    MergeStrategyType
+    Strategy() const override {
+        return strategy_type_;
+    }
+
     Status
     UseStrategy(MergeStrategyType type) override;

@@ -41,6 +46,7 @@ class MergeManagerImpl : public MergeManager {
     meta::MetaPtr meta_ptr_;
     DBOptions options_;

+    MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE;
     MergeStrategyPtr strategy_;
 };  // MergeManagerImpl

db/meta/Meta.h:

@@ -13,6 +13,7 @@

 #include <cstddef>
 #include <memory>
+#include <set>
 #include <string>
 #include <vector>


@@ -124,6 +125,10 @@ class Meta {
     virtual Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0;

+    virtual Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) = 0;
+
     virtual Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0;

db/meta/MySQLMetaImpl.cpp:

@@ -1692,6 +1692,116 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
     }
 }

+Status
+MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                               FilesHolder& files_holder) {
+    try {
+        server::MetricCollector metric;
+
+        // get root collection information
+        CollectionSchema collection_schema;
+        collection_schema.collection_id_ = root_collection;
+        auto status = DescribeCollection(collection_schema);
+        if (!status.ok()) {
+            return status;
+        }
+
+        // distribute id array to batches
+        const int64_t batch_size = 50;
+        std::vector<std::vector<std::string>> id_groups;
+        std::vector<std::string> temp_group = {root_collection};
+        int64_t count = 1;
+        for (auto& id : partition_id_array) {
+            temp_group.push_back(id);
+            count++;
+            if (count >= batch_size) {
+                id_groups.emplace_back(temp_group);
+                temp_group.clear();
+                count = 0;
+            }
+        }
+
+        if (!temp_group.empty()) {
+            id_groups.emplace_back(temp_group);
+        }
+
+        // perform query batch by batch
+        int64_t files_count = 0;
+        Status ret;
+        for (auto group : id_groups) {
+            mysqlpp::StoreQueryResult res;
+            {
+                mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
+
+                bool is_null_connection = (connectionPtr == nullptr);
+                if (is_null_connection) {
+                    return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
+                }
+
+                // to ensure UpdateCollectionFiles is an atomic operation
+                std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+
+                mysqlpp::Query statement = connectionPtr->query();
+                statement
+                    << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
+                    << " FROM " << META_TABLEFILES << " WHERE table_id in (";
+                for (size_t i = 0; i < group.size(); i++) {
+                    statement << mysqlpp::quote << group[i];
+                    if (i != group.size() - 1) {
+                        statement << ",";
+                    }
+                }
+                statement << ")";
+
+                // End
+                statement << " AND"
+                          << " (file_type = " << std::to_string(SegmentSchema::RAW)
+                          << " OR file_type = " << std::to_string(SegmentSchema::TO_INDEX)
+                          << " OR file_type = " << std::to_string(SegmentSchema::INDEX) << ");";
+
+                LOG_ENGINE_DEBUG_ << "FilesToSearch: " << statement.str();
+
+                res = statement.store();
+            }  // Scoped Connection
+
+            for (auto& resRow : res) {
+                SegmentSchema collection_file;
+                collection_file.id_ = resRow["id"];  // implicit conversion
+                resRow["table_id"].to_string(collection_file.collection_id_);
+                resRow["segment_id"].to_string(collection_file.segment_id_);
+                collection_file.index_file_size_ = collection_schema.index_file_size_;
+                collection_file.engine_type_ = resRow["engine_type"];
+                collection_file.index_params_ = collection_schema.index_params_;
+                collection_file.metric_type_ = collection_schema.metric_type_;
+                resRow["file_id"].to_string(collection_file.file_id_);
+                collection_file.file_type_ = resRow["file_type"];
+                collection_file.file_size_ = resRow["file_size"];
+                collection_file.row_count_ = resRow["row_count"];
+                collection_file.date_ = resRow["date"];
+                collection_file.dimension_ = collection_schema.dimension_;
+
+                auto status = utils::GetCollectionFilePath(options_, collection_file);
+                if (!status.ok()) {
+                    ret = status;
+                    continue;
+                }
+
+                files_holder.MarkFile(collection_file);
+                files_count++;
+            }
+        }
+
+        if (files_count == 0) {
+            LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
+        } else {
+            LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
+        }
+        return ret;
+    } catch (std::exception& e) {
+        return HandleException("Failed to get files to search", e.what());
+    }
+}
+
 Status
 MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
     try {
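Both the MySQL and SQLite implementations first split the root collection id plus its partition ids into batches of at most 50 before building the IN (...) clause, so a collection with many partitions never produces an oversized statement. A standalone sketch of that batching step (the helper name and the 121-id example are illustrative; batch_size = 50 matches the diff):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Split a root collection id plus its partition ids into batches of at most
// `batch_size` ids, mirroring the grouping loop in FilesToSearchEx.
std::vector<std::vector<std::string>>
BatchIds(const std::string& root, const std::set<std::string>& partitions, size_t batch_size = 50) {
    std::vector<std::vector<std::string>> groups;
    std::vector<std::string> current = {root};
    for (const auto& id : partitions) {
        current.push_back(id);
        if (current.size() >= batch_size) {
            groups.push_back(current);
            current.clear();
        }
    }
    if (!current.empty()) {
        groups.push_back(current);  // trailing partial batch
    }
    return groups;
}

int main() {
    std::set<std::string> partitions;
    for (int i = 0; i < 120; ++i) {
        partitions.insert("partition_" + std::to_string(i));
    }
    // 121 ids total (root + 120 partitions) -> 3 batches: 50 + 50 + 21.
    for (const auto& g : BatchIds("root_collection", partitions)) {
        std::cout << "batch of " << g.size() << " ids\n";
    }
}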
db/meta/MySQLMetaImpl.h:

@@ -15,6 +15,7 @@

 #include <memory>
 #include <mutex>
+#include <set>
 #include <string>
 #include <vector>


@@ -109,6 +110,10 @@ class MySQLMetaImpl : public Meta {
     Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;

+    Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) override;
+
     Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;

db/meta/SqliteMetaImpl.cpp:

@@ -286,13 +286,14 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
     decltype(ConnectorPtr->select(select_columns)) selected;
     if (is_root) {
         selected = ConnectorPtr->select(select_columns,
-                where(c(&CollectionSchema::collection_id_) == collection_id
-                      and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
-                      and c(&CollectionSchema::owner_collection_) == ""));
+                                        where(c(&CollectionSchema::collection_id_) == collection_id
+                                              and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
+                                              and c(&CollectionSchema::owner_collection_) == ""));
     } else {
         selected = ConnectorPtr->select(select_columns,
-                where(c(&CollectionSchema::collection_id_) == collection_id
-                      and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+                                        where(c(&CollectionSchema::collection_id_) == collection_id
+                                              and c(&CollectionSchema::state_)
+                                                  != (int)CollectionSchema::TO_DELETE));
     }

     if (selected.size() == 1) {

@@ -1118,6 +1119,99 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
     }
 }

+Status
+SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
+                                const std::set<std::string>& partition_id_array,
+                                FilesHolder& files_holder) {
+    try {
+        server::MetricCollector metric;
+
+        // get root collection information
+        CollectionSchema collection_schema;
+        collection_schema.collection_id_ = root_collection;
+        auto status = DescribeCollection(collection_schema);
+        if (!status.ok()) {
+            return status;
+        }
+
+        // distribute id array to batches
+        const int64_t batch_size = 50;
+        std::vector<std::vector<std::string>> id_groups;
+        std::vector<std::string> temp_group = {root_collection};
+        int64_t count = 1;
+        for (auto& id : partition_id_array) {
+            temp_group.push_back(id);
+            count++;
+            if (count >= batch_size) {
+                id_groups.emplace_back(temp_group);
+                temp_group.clear();
+                count = 0;
+            }
+        }
+
+        if (!temp_group.empty()) {
+            id_groups.emplace_back(temp_group);
+        }
+
+        // perform query batch by batch
+        int64_t files_count = 0;
+        Status ret;
+        for (auto group : id_groups) {
+            auto select_columns =
+                columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                        &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                        &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
+
+            auto match_collectionid = in(&SegmentSchema::collection_id_, group);
+
+            std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
+                                           (int)SegmentSchema::INDEX};
+            auto match_type = in(&SegmentSchema::file_type_, file_types);
+            decltype(ConnectorPtr->select(select_columns)) selected;
+            {
+                // multi-threaded calls into sqlite update may get exceptions ('bad logic', etc.), so we add a lock here
+                std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+                auto filter = where(match_collectionid and match_type);
+                selected = ConnectorPtr->select(select_columns, filter);
+            }
+
+            for (auto& file : selected) {
+                SegmentSchema collection_file;
+                collection_file.id_ = std::get<0>(file);
+                collection_file.collection_id_ = std::get<1>(file);
+                collection_file.segment_id_ = std::get<2>(file);
+                collection_file.file_id_ = std::get<3>(file);
+                collection_file.file_type_ = std::get<4>(file);
+                collection_file.file_size_ = std::get<5>(file);
+                collection_file.row_count_ = std::get<6>(file);
+                collection_file.date_ = std::get<7>(file);
+                collection_file.engine_type_ = std::get<8>(file);
+                collection_file.dimension_ = collection_schema.dimension_;
+                collection_file.index_file_size_ = collection_schema.index_file_size_;
+                collection_file.index_params_ = collection_schema.index_params_;
+                collection_file.metric_type_ = collection_schema.metric_type_;
+
+                auto status = utils::GetCollectionFilePath(options_, collection_file);
+                if (!status.ok()) {
+                    ret = status;
+                    continue;
+                }
+
+                files_holder.MarkFile(collection_file);
+                files_count++;
+            }
+        }
+        if (files_count == 0) {
+            LOG_ENGINE_DEBUG_ << "No file to search for collection: " << root_collection;
+        } else {
+            LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << root_collection;
+        }
+        return ret;
+    } catch (std::exception& e) {
+        return HandleException("Encountered exception while iterating index files", e.what());
+    }
+}
+
 Status
 SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
     try {

@@ -1315,29 +1409,21 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
             file_schema.metric_type_ = collection_schema.metric_type_;

             switch (file_schema.file_type_) {
-                case (int)SegmentSchema::RAW:
-                    ++raw_count;
+                case (int)SegmentSchema::RAW:++raw_count;
                     break;
-                case (int)SegmentSchema::NEW:
-                    ++new_count;
+                case (int)SegmentSchema::NEW:++new_count;
                     break;
-                case (int)SegmentSchema::NEW_MERGE:
-                    ++new_merge_count;
+                case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
                     break;
-                case (int)SegmentSchema::NEW_INDEX:
-                    ++new_index_count;
+                case (int)SegmentSchema::NEW_INDEX:++new_index_count;
                     break;
-                case (int)SegmentSchema::TO_INDEX:
-                    ++to_index_count;
+                case (int)SegmentSchema::TO_INDEX:++to_index_count;
                     break;
-                case (int)SegmentSchema::INDEX:
-                    ++index_count;
+                case (int)SegmentSchema::INDEX:++index_count;
                     break;
-                case (int)SegmentSchema::BACKUP:
-                    ++backup_count;
-                    break;
-                default:
+                case (int)SegmentSchema::BACKUP:++backup_count;
                     break;
+                default:break;
             }

             auto status = utils::GetCollectionFilePath(options_, file_schema);

@@ -1351,11 +1437,9 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         std::string msg = "Get collection files by type.";
         for (int file_type : file_types) {
             switch (file_type) {
-                case (int)SegmentSchema::RAW:
-                    msg = msg + " raw files:" + std::to_string(raw_count);
+                case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
                     break;
-                case (int)SegmentSchema::NEW:
-                    msg = msg + " new files:" + std::to_string(new_count);
+                case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
                     break;
                 case (int)SegmentSchema::NEW_MERGE:
                     msg = msg + " new_merge files:" + std::to_string(new_merge_count);

@@ -1363,17 +1447,13 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
                 case (int)SegmentSchema::NEW_INDEX:
                     msg = msg + " new_index files:" + std::to_string(new_index_count);
                     break;
-                case (int)SegmentSchema::TO_INDEX:
-                    msg = msg + " to_index files:" + std::to_string(to_index_count);
+                case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
                     break;
-                case (int)SegmentSchema::INDEX:
-                    msg = msg + " index files:" + std::to_string(index_count);
+                case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
                     break;
-                case (int)SegmentSchema::BACKUP:
-                    msg = msg + " backup files:" + std::to_string(backup_count);
-                    break;
-                default:
+                case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
                     break;
+                default:break;
             }
         }
         LOG_ENGINE_DEBUG_ << msg;
db/meta/SqliteMetaImpl.h:

@@ -12,6 +12,7 @@
 #pragma once

 #include <mutex>
+#include <set>
 #include <string>
 #include <vector>


@@ -111,6 +112,10 @@ class SqliteMetaImpl : public Meta {
     Status
     FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;

+    Status
+    FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
+                    FilesHolder& files_holder) override;
+
     Status
     FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
