mirror of https://github.com/milvus-io/milvus.git
performance issues (#2438)
* performance issues Signed-off-by: yhmo <yihua.mo@zilliz.com>
* fix bugs Signed-off-by: yhmo <yihua.mo@zilliz.com>
* preload collection issue Signed-off-by: yhmo <yihua.mo@zilliz.com>
* uncomment boring log Signed-off-by: yhmo <yihua.mo@zilliz.com>
* reduce unittest time Signed-off-by: yhmo <yihua.mo@zilliz.com>
* reduce metric test time cost Signed-off-by: yhmo <yihua.mo@zilliz.com>
pull/2453/head
parent 4dc0098560
commit dcb60f3021
@ -71,7 +71,8 @@ class DB {
GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) = 0;
virtual Status
PreloadCollection(const std::string& collection_id) = 0;
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force = false) = 0;
virtual Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;

@ -108,10 +109,11 @@ class DB {
Flush() = 0;
virtual Status
Compact(const std::string& collection_id, double threshold = 0.0) = 0;
Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
double threshold = 0.0) = 0;
virtual Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) = 0;
virtual Status
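The two hunks above change the public DB interface: PreloadCollection and Compact now take the request's server::Context (so a dropped client connection can abort long-running work) plus a force flag for preload, and GetVectorsByID takes the full CollectionSchema instead of only the collection id. A minimal caller sketch under the new signatures; the db pointer and context variable are placeholders for illustration, not identifiers from this commit:

    // Sketch only: `db` and `context` are assumed to be supplied by the caller.
    Status PreloadThenCompact(DB* db, const std::shared_ptr<server::Context>& context,
                              const std::string& collection_id) {
        // force = true asks the engine to keep loading even when the cache is nearly full
        auto status = db->PreloadCollection(context, collection_id, /*force=*/true);
        if (!status.ok()) {
            return status;
        }
        return db->Compact(context, collection_id, /*threshold=*/0.0);
    }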
@ -115,6 +115,16 @@ DBImpl::Start() {
// LOG_ENGINE_TRACE_ << "DB service start";
initialized_.store(true, std::memory_order_release);
// server may be closed unexpected, these un-merge files need to be merged when server restart
// and soft-delete files need to be deleted when server restart
std::set<std::string> merge_collection_ids;
std::vector<meta::CollectionSchema> collection_schema_array;
meta_ptr_->AllCollections(collection_schema_array);
for (auto& schema : collection_schema_array) {
merge_collection_ids.insert(schema.collection_id_);
}
StartMergeTask(merge_collection_ids, true);
// wal
if (options_.wal_enable_) {
auto error_code = DB_ERROR;

@ -158,7 +168,9 @@ DBImpl::Start() {
}
// background metric thread
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
if (options_.metric_enable_) {
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
}
return Status::OK();
}

@ -196,8 +208,10 @@ DBImpl::Stop() {
}
// wait metric thread exit
swn_metric_.Notify();
bg_metric_thread_.join();
if (options_.metric_enable_) {
swn_metric_.Notify();
bg_metric_thread_.join();
}
// LOG_ENGINE_TRACE_ << "DB service stop";
return Status::OK();

@ -386,7 +400,8 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect
}
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
DBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}

@ -436,6 +451,12 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
<< " files need to be pre-loaded";
TimeRecorderAuto rc("Pre-load collection:" + collection_id);
for (auto& file : files_array) {
// client break the connection, no need to continue
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
break;
}
EngineType engine_type;
if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||

@ -467,7 +488,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
}
size += engine->Size();
if (size > available_size) {
if (!force && size > available_size) {
LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
return Status(SERVER_CACHE_FULL, "Cache is full");
}
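The preload hunks above add two behaviors: the file loop exits early once the client connection is broken, and the cache-capacity guard is skipped when force is set. A small illustration of the new guard with invented sizes:

    // Mirrors the new check `if (!force && size > available_size)`; values are placeholders.
    int64_t available_size = 4LL << 30;  // 4 GB of cache still free
    int64_t size = 5LL << 30;            // accumulated size of segments loaded so far
    bool force = false;
    bool stop = !force && size > available_size;  // true -> return SERVER_CACHE_FULL
    // with force == true, stop stays false and the remaining files keep loading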
@ -919,7 +940,6 @@ DBImpl::Flush(const std::string& collection_id) {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush(collection_id);

@ -946,7 +966,6 @@ DBImpl::Flush() {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush();

@ -958,7 +977,7 @@ DBImpl::Flush() {
}
Status
DBImpl::Compact(const std::string& collection_id, double threshold) {
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id, double threshold) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}

@ -982,7 +1001,9 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
// WaitBuildIndexFinish();
std::vector<meta::CollectionSchema> collection_array;
status = meta_ptr_->ShowPartitions(collection_id, collection_array);
collection_array.push_back(collection_schema);
const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

@ -993,7 +1014,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
meta::FilesHolder files_holder;
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files to compact: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;

@ -1006,6 +1027,12 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
// attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) {
// client break the connection, no need to continue
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
break;
}
meta::SegmentSchema file = *iter;
iter = files_to_compact.erase(iter);

@ -1023,7 +1050,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
meta::SegmentsSchema files_to_update;
if (deleted_docs_size != 0) {
compact_status = CompactFile(collection_id, threshold, file, files_to_update);
compact_status = CompactFile(file, threshold, files_to_update);
if (!compact_status.ok()) {
LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "

@ -1054,9 +1081,8 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
}
Status
DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update) {
LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;
DBImpl::CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update) {
LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << file.collection_id_;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);

@ -1068,7 +1094,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
if (status.ok()) {
auto delete_items = deleted_docs_ptr->GetDeletedDocs();
double delete_rate = (double)delete_items.size() / (double)file.row_count_;
double delete_rate = (double)delete_items.size() / (double)(delete_items.size() + file.row_count_);
if (delete_rate < threshold) {
LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for"
<< segment_dir_to_merge;
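The last hunk fixes the delete-rate estimate used to decide whether a segment is worth compacting. The old expression divided the number of soft-deleted entries by file.row_count_; assuming row_count_ counts the rows that remain after soft deletes (which the corrected denominator implies), that overstates the rate and can even exceed 1.0. The new expression divides by the original total. A worked example with invented numbers:

    // 70 rows remain in the segment, 30 entries were soft-deleted.
    size_t deleted = 30;
    int64_t remaining = 70;                                             // file.row_count_
    double old_rate = (double)deleted / (double)remaining;              // ~0.43
    double new_rate = (double)deleted / (double)(deleted + remaining);  // 0.30
    // With threshold = 0.4 the old formula would compact this segment, the new one would not.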
@ -1079,8 +1105,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
// Create new collection file
meta::SegmentSchema compacted_file;
compacted_file.collection_id_ = collection_id;
// compacted_file.date_ = date;
compacted_file.collection_id_ = file.collection_id_;
compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
auto status = meta_ptr_->CreateCollectionFile(compacted_file);

@ -1090,7 +1115,6 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
}
// Compact (merge) file to the newly created collection file
std::string new_segment_dir;
utils::GetParentPath(compacted_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

@ -1112,7 +1136,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
return status;
}
// Update compacted file state, if origin file is backup or to_index, set compected file to to_index
// Update compacted file state, if origin file is backup or to_index, set compacted file to to_index
compacted_file.file_size_ = segment_writer_ptr->Size();
compacted_file.row_count_ = segment_writer_ptr->VectorCount();
if ((file.file_type_ == (int32_t)meta::SegmentSchema::BACKUP ||

@ -1157,53 +1181,41 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
}
Status
DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
DBImpl::GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
bool has_collection;
auto status = HasCollection(collection_id, has_collection);
if (!has_collection) {
LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
return Status(DB_NOT_FOUND, "Collection does not exist");
}
if (!status.ok()) {
return status;
}
meta::FilesHolder files_holder;
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
std::vector<meta::CollectionSchema> collection_array;
auto status = meta_ptr_->ShowPartitions(collection.collection_id_, collection_array);
collection_array.push_back(collection);
status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message();
std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
}
if (files_holder.HoldFiles().empty()) {
LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
return Status(DB_NOT_FOUND, "Collection is empty");
}
cache::CpuCacheMgr::GetInstance()->PrintInfo();
status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder);
status = GetVectorsByIdHelper(id_array, vectors, files_holder);
cache::CpuCacheMgr::GetInstance()->PrintInfo();
if (vectors.empty()) {
std::string msg = "Vectors not found in collection " + collection.collection_id_;
LOG_ENGINE_DEBUG_ << msg;
}
return status;
}

@ -1280,8 +1292,8 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
}
Status
DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder) {
DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
meta::FilesHolder& files_holder) {
// attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();

@ -1298,6 +1310,9 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
IDNumbers temp_ids = id_array;
for (auto& file : files) {
if (temp_ids.empty()) {
break;  // all vectors found, no need to continue
}
// Load bloom filter
std::string segment_dir;
engine::utils::GetParentPath(file.location_, segment_dir);

@ -1380,11 +1395,6 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
vectors.emplace_back(data);
}
if (vectors.empty()) {
std::string msg = "Vectors not found in collection " + collection_id;
LOG_ENGINE_DEBUG_ << msg;
}
return Status::OK();
}

@ -1395,15 +1405,17 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
return SHUTDOWN_ERROR;
}
// serialize memory data
// std::set<std::string> sync_collection_ids;
// auto status = SyncMemData(sync_collection_ids);
// step 1: wait merge file thread finished to avoid duplicate data bug
auto status = Flush();
WaitMergeFileFinish();  // let merge file thread finish
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids, true);  // start force-merge task
WaitMergeFileFinish();  // let force-merge file thread finish
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
// step 1: check index difference
// step 2: check index difference
CollectionIndex old_index;
status = DescribeIndex(collection_id, old_index);
if (!status.ok()) {

@ -1411,7 +1423,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
return status;
}
// step 2: update index info
// step 3: update index info
CollectionIndex new_index = index;
new_index.metric_type_ = old_index.metric_type_;  // dont change metric type, it was defined by CreateCollection
if (!utils::IsSameIndex(old_index, new_index)) {

@ -1422,11 +1434,6 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
}
// step 3: wait merge file thread finished to avoid duplicate data bug
WaitMergeFileFinish();  // let merge file thread finish
StartMergeTask(true);  // start force-merge task
WaitMergeFileFinish();  // let force-merge file thread finish
// step 4: wait and build index
status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
status = WaitCollectionIndexRecursively(context, collection_id, index);

@ -1451,7 +1458,8 @@ DBImpl::DropIndex(const std::string& collection_id) {
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
auto status = DropCollectionIndexRecursively(collection_id);
StartMergeTask();  // merge small files after drop index
std::set<std::string> merge_collection_ids = {collection_id};
StartMergeTask(merge_collection_ids, true);  // merge small files after drop index
return status;
}

@ -1493,7 +1501,7 @@ DBImpl::QueryByIDs(const std::shared_ptr<server::Context>& context, const std::s
// get target vectors data
std::vector<milvus::engine::VectorsData> vectors;
status = GetVectorsByID(collection_id, id_array, vectors);
status = GetVectorsByID(collection_schema, id_array, vectors);
if (!status.ok()) {
std::string msg = "Failed to get vector data for collection: " + collection_id;
LOG_ENGINE_ERROR_ << msg;

@ -1897,6 +1905,7 @@ DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const
void
DBImpl::BackgroundIndexThread() {
SetThreadName("index_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {

@ -1965,7 +1974,7 @@ DBImpl::StartMetricTask() {
}
void
DBImpl::StartMergeTask(bool force_merge_all) {
DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all) {
// LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
// merge task has been finished?
{

@ -1982,21 +1991,9 @@ DBImpl::StartMergeTask(bool force_merge_all) {
{
std::lock_guard<std::mutex> lck(merge_result_mutex_);
if (merge_thread_results_.empty()) {
// collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
// 1. other collections may still has un-merged files
// 2. server may be closed unexpected, these un-merge files need to be merged when server restart
if (merge_collection_ids_.empty()) {
std::vector<meta::CollectionSchema> collection_schema_array;
meta_ptr_->AllCollections(collection_schema_array);
for (auto& schema : collection_schema_array) {
merge_collection_ids_.insert(schema.collection_id_);
}
}
// start merge file thread
merge_thread_results_.push_back(
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
merge_collection_ids_.clear();
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids, force_merge_all));
}
}
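With the hunk above, StartMergeTask no longer drains a shared merge_collection_ids_ member under merge_result_mutex_; each caller passes the exact set of collections to merge, and force_merge_all keeps its old meaning. A sketch of the new calling convention (collection names invented for illustration):

    std::set<std::string> merge_collection_ids = {"collection_a", "collection_b"};
    StartMergeTask(merge_collection_ids);        // regular merge of the listed collections
    StartMergeTask(merge_collection_ids, true);  // force-merge, as done on startup, DropIndex and CreateIndex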
@ -2124,7 +2121,7 @@ DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_a
}
}
meta_ptr_->Archive();
// meta_ptr_->Archive();
{
uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? options_.file_cleanup_timeout_ : 10;

@ -2163,7 +2160,7 @@ DBImpl::BackgroundBuildIndex() {
meta::FilesHolder files_holder;
meta_ptr_->FilesToIndex(files_holder);
milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles();
milvus::engine::meta::SegmentsSchema to_index_files = files_holder.HoldFiles();
Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {

@ -2383,7 +2380,7 @@ DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& c
index_req_swn_.Wait_For(std::chrono::seconds(1));
// client break the connection, no need to block, check every 1 second
if (context->IsConnectionBroken()) {
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
break;  // just break, not return, continue to update partitions files to to_index
}

@ -2490,10 +2487,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
wal_mgr_->CollectionFlushed(collection_id, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
std::set<std::string> merge_collection_ids;
for (auto& collection : target_collection_names) {
merge_collection_ids_.insert(collection);
merge_collection_ids.insert(collection);
}
StartMergeTask(merge_collection_ids);
return max_lsn;
};

@ -2505,8 +2503,8 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
merge_collection_ids_.insert(target_collection_name);
std::set<std::string> merge_collection_ids = {target_collection_name};
StartMergeTask(merge_collection_ids);
};
Status status;

@ -2663,8 +2661,6 @@ DBImpl::InternalFlush(const std::string& collection_id) {
record.type = wal::MXLogType::Flush;
record.collection_id = collection_id;
ExecWalRecord(record);
StartMergeTask();
}
void

@ -2747,6 +2743,7 @@ DBImpl::BackgroundFlushThread() {
void
DBImpl::BackgroundMetricThread() {
SetThreadName("metric_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
@ -78,7 +78,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
GetCollectionInfo(const std::string& collection_id, std::string& collection_info) override;
Status
PreloadCollection(const std::string& collection_id) override;
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force = false) override;
Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) override;

@ -119,10 +120,11 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Flush() override;
Status
Compact(const std::string& collection_id, double threshold = 0.0) override;
Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
double threshold = 0.0) override;
Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) override;
Status

@ -200,8 +202,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
ResultIds& result_ids, ResultDistances& result_distances);
Status
GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder);
GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
meta::FilesHolder& files_holder);
void
InternalFlush(const std::string& collection_id = "");

@ -228,7 +230,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
StartMetricTask();
void
StartMergeTask(bool force_merge_all = false);
StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all = false);
void
BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);

@ -243,13 +245,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
BackgroundBuildIndex();
Status
CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update);
/*
Status
SyncMemData(std::set<std::string>& sync_collection_ids);
*/
CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update);
Status
GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,

@ -355,7 +351,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
ThreadPool merge_thread_pool_;
std::mutex merge_result_mutex_;
std::list<std::future<void>> merge_thread_results_;
std::set<std::string> merge_collection_ids_;
ThreadPool index_thread_pool_;
std::mutex index_result_mutex_;
@ -75,6 +75,8 @@ struct DBOptions {
int64_t auto_flush_interval_ = 1;
int64_t file_cleanup_timeout_ = 10;
bool metric_enable_ = false;
// wal relative configurations
bool wal_enable_ = true;
bool recovery_error_ignore_ = true;
@ -138,6 +138,10 @@ class Meta {
virtual Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, FilesHolder& files_holder) = 0;
virtual Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) = 0;
virtual Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
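The new Meta::FilesByTypeEx fetches files for a whole group of collections in one call, so callers no longer loop over partitions issuing one FilesByType query each. The call pattern, restated from the DBImpl.cpp hunks above (the collection schema and meta_ptr_ member are as used there):

    // Gather the root collection plus its partitions, then query once.
    std::vector<meta::CollectionSchema> collection_array;
    auto status = meta_ptr_->ShowPartitions(collection.collection_id_, collection_array);
    collection_array.push_back(collection);  // include the root collection itself

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::FilesHolder files_holder;
    status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);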
@ -27,6 +27,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include "MetaConsts.h"

@ -1637,7 +1638,8 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id;
// End

@ -1665,16 +1667,19 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
collection_file.id_ = resRow["id"];  // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {

@ -1711,18 +1716,15 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
return status;
}
// distribute id array to batchs
const int64_t batch_size = 50;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
int64_t count = 1;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
count++;
if (count >= batch_size) {
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
count = 0;
}
}
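The batching hunk above drops the separate count variable, which started at 1 and caused the first batch to be emitted with 49 ids instead of 50; checking temp_group.size() directly keeps every full batch at exactly batch_size. A self-contained sketch of the corrected grouping (batch size shrunk to 3 for illustration; the trailing-remainder handling follows the FilesByTypeEx code added later in this commit):

    #include <string>
    #include <vector>

    std::vector<std::vector<std::string>> GroupIds(const std::vector<std::string>& ids, size_t batch_size) {
        std::vector<std::vector<std::string>> id_groups;
        std::vector<std::string> temp_group;
        for (const auto& id : ids) {
            temp_group.push_back(id);
            if (temp_group.size() >= batch_size) {  // size-based check, no separate counter
                id_groups.emplace_back(temp_group);
                temp_group.clear();
            }
        }
        if (!temp_group.empty()) {
            id_groups.emplace_back(temp_group);     // remainder batch
        }
        return id_groups;
    }
    // GroupIds({"p1", "p2", "p3", "p4", "p5"}, 3) -> {{"p1", "p2", "p3"}, {"p4", "p5"}}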
@ -1749,9 +1751,9 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement
<< "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {

@ -1776,16 +1778,19 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
collection_file.id_ = resRow["id"];  // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {

@ -1837,8 +1842,8 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
"engine_type, created_on"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
" engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type = " << std::to_string(SegmentSchema::RAW) << " ORDER BY row_count DESC;";

@ -1861,14 +1866,17 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files
resRow["segment_id"].to_string(collection_file.segment_id_);
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
collection_file.created_on_ = resRow["created_on"];
collection_file.dimension_ = collection_schema.dimension_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {

@ -1911,12 +1919,12 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) {
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
"row_count, date, created_on"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(SegmentSchema::TO_INDEX)
<< ";";
LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str();
// LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str();
res = statement.store();
}  // Scoped Connection

@ -1929,13 +1937,14 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) {
collection_file.id_ = resRow["id"];  // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
auto groupItr = groups.find(collection_file.collection_id_);
if (groupItr == groups.end()) {

@ -2003,10 +2012,10 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str();

@ -2028,13 +2037,14 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
file_schema.id_ = resRow["id"];
file_schema.collection_id_ = collection_id;
resRow["segment_id"].to_string(file_schema.segment_id_);
file_schema.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(file_schema.file_id_);
file_schema.file_type_ = resRow["file_type"];
file_schema.file_size_ = resRow["file_size"];
file_schema.row_count_ = resRow["row_count"];
file_schema.date_ = resRow["date"];
file_schema.engine_type_ = resRow["engine_type"];
file_schema.created_on_ = resRow["created_on"];
file_schema.updated_time_ = resRow["updated_time"];
file_schema.index_file_size_ = collection_schema.index_file_size_;
file_schema.index_params_ = collection_schema.index_params_;

@ -2113,6 +2123,167 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
return ret;
}
Status
MySQLMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) {
try {
server::MetricCollector metric;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
std::unordered_map<std::string, meta::CollectionSchema> map_collections;
for (auto& collection : collections) {
map_collections.insert(std::make_pair(collection.collection_id_, collection));
temp_group.push_back(collection.collection_id_);
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
// perform query batch by batch
Status ret;
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
int to_index_count = 0, index_count = 0, backup_count = 0;
for (auto group : id_groups) {
mysqlpp::StoreQueryResult res;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception(););
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
std::string types;
for (auto type : file_types) {
if (!types.empty()) {
types += ",";
}
types += std::to_string(type);
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ") AND file_type in (" << types << ");";
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str();
res = statement.store();
}  // Scoped Connection
for (auto& resRow : res) {
SegmentSchema file_schema;
file_schema.id_ = resRow["id"];  // implicit conversion
resRow["table_id"].to_string(file_schema.collection_id_);
resRow["segment_id"].to_string(file_schema.segment_id_);
resRow["file_id"].to_string(file_schema.file_id_);
file_schema.file_type_ = resRow["file_type"];
file_schema.file_size_ = resRow["file_size"];
file_schema.row_count_ = resRow["row_count"];
file_schema.date_ = resRow["date"];
file_schema.engine_type_ = resRow["engine_type"];
file_schema.created_on_ = resRow["created_on"];
file_schema.updated_time_ = resRow["updated_time"];
auto& collection_schema = map_collections[file_schema.collection_id_];
file_schema.dimension_ = collection_schema.dimension_;
file_schema.index_file_size_ = collection_schema.index_file_size_;
file_schema.index_params_ = collection_schema.index_params_;
file_schema.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, file_schema);
if (!status.ok()) {
ret = status;
continue;
}
files_holder.MarkFile(file_schema);
int32_t file_type = resRow["file_type"];
switch (file_type) {
case (int)SegmentSchema::RAW:
++raw_count;
break;
case (int)SegmentSchema::NEW:
++new_count;
break;
case (int)SegmentSchema::NEW_MERGE:
++new_merge_count;
break;
case (int)SegmentSchema::NEW_INDEX:
++new_index_count;
break;
case (int)SegmentSchema::TO_INDEX:
++to_index_count;
break;
case (int)SegmentSchema::INDEX:
++index_count;
break;
case (int)SegmentSchema::BACKUP:
++backup_count;
break;
default:
break;
}
}
}
std::string msg = "Get collection files by type.";
for (int file_type : file_types) {
switch (file_type) {
case (int)SegmentSchema::RAW:
msg = msg + " raw files:" + std::to_string(raw_count);
break;
case (int)SegmentSchema::NEW:
msg = msg + " new files:" + std::to_string(new_count);
break;
case (int)SegmentSchema::NEW_MERGE:
msg = msg + " new_merge files:" + std::to_string(new_merge_count);
break;
case (int)SegmentSchema::NEW_INDEX:
msg = msg + " new_index files:" + std::to_string(new_index_count);
break;
case (int)SegmentSchema::TO_INDEX:
msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::INDEX:
msg = msg + " index files:" + std::to_string(index_count);
break;
case (int)SegmentSchema::BACKUP:
msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:
break;
}
}
LOG_ENGINE_DEBUG_ << msg;
return ret;
} catch (std::exception& e) {
return HandleException("Failed to get files by type", e.what());
}
}
Status
MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
if (ids.empty()) {

@ -2136,7 +2307,8 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hold
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES;
std::stringstream idSS;

@ -2167,12 +2339,14 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hold
collection_file.id_ = resRow["id"];  // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
if (collections.find(collection_file.collection_id_) == collections.end()) {
CollectionSchema collection_schema;

@ -2390,7 +2564,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
res = statement.store();
}

@ -2481,7 +2655,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< " FROM " << META_TABLES << " WHERE state = " << std::to_string(CollectionSchema::TO_DELETE)
<< ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
mysqlpp::StoreQueryResult res = statement.store();

@ -2539,7 +2713,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
mysqlpp::StoreQueryResult res = statement.store();
@ -124,6 +124,10 @@ class MySQLMetaImpl : public Meta {
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
@ -23,6 +23,7 @@
|
|||
#include <memory>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "MetaConsts.h"
|
||||
#include "db/IDGenerator.h"
|
||||
|
@ -1076,7 +1077,8 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
|
|||
// perform query
|
||||
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
|
||||
|
||||
auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id;
|
||||
|
||||
|
@ -1104,6 +1106,9 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
|
|||
collection_file.row_count_ = std::get<6>(file);
|
||||
collection_file.date_ = std::get<7>(file);
|
||||
collection_file.engine_type_ = std::get<8>(file);
|
||||
collection_file.created_on_ = std::get<9>(file);
|
||||
collection_file.updated_time_ = std::get<10>(file);
|
||||
|
||||
collection_file.dimension_ = collection_schema.dimension_;
|
||||
collection_file.index_file_size_ = collection_schema.index_file_size_;
|
||||
collection_file.index_params_ = collection_schema.index_params_;
|
||||
|
@ -1145,18 +1150,15 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
|
|||
return status;
|
||||
}
|
||||
|
||||
// distribute id array to batchs
|
||||
const int64_t batch_size = 50;
|
||||
// distribute id array to batches
|
||||
const uint64_t batch_size = 50;
|
||||
std::vector<std::vector<std::string>> id_groups;
|
||||
std::vector<std::string> temp_group;
|
||||
int64_t count = 1;
|
||||
for (auto& id : partition_id_array) {
|
||||
temp_group.push_back(id);
|
||||
count++;
|
||||
if (count >= batch_size) {
|
||||
if (temp_group.size() >= batch_size) {
|
||||
id_groups.emplace_back(temp_group);
|
||||
temp_group.clear();
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1171,7 +1173,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
|
|||
auto select_columns =
|
||||
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
|
||||
|
||||
auto match_collectionid = in(&SegmentSchema::collection_id_, group);
|
||||
|
||||
|
@ -1197,6 +1200,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
|
|||
collection_file.row_count_ = std::get<6>(file);
|
||||
collection_file.date_ = std::get<7>(file);
|
||||
collection_file.engine_type_ = std::get<8>(file);
|
||||
collection_file.created_on_ = std::get<9>(file);
|
||||
collection_file.updated_time_ = std::get<10>(file);
|
||||
collection_file.dimension_ = collection_schema.dimension_;
|
||||
collection_file.index_file_size_ = collection_schema.index_file_size_;
|
||||
collection_file.index_params_ = collection_schema.index_params_;
|
||||
|
@ -1239,9 +1244,11 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
|
|||
}
|
||||
|
||||
// get files to merge
|
||||
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_);
|
||||
auto select_columns =
|
||||
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
|
||||
decltype(ConnectorPtr->select(select_columns)) selected;
|
||||
{
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
|
@ -1268,7 +1275,10 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
|
|||
collection_file.file_type_ = std::get<4>(file);
|
||||
collection_file.row_count_ = std::get<6>(file);
|
||||
collection_file.date_ = std::get<7>(file);
|
||||
collection_file.created_on_ = std::get<8>(file);
|
||||
collection_file.engine_type_ = std::get<8>(file);
|
||||
collection_file.created_on_ = std::get<9>(file);
|
||||
collection_file.updated_time_ = std::get<10>(file);
|
||||
|
||||
collection_file.dimension_ = collection_schema.dimension_;
|
||||
collection_file.index_file_size_ = collection_schema.index_file_size_;
|
||||
collection_file.index_params_ = collection_schema.index_params_;
|
||||
|
@ -1302,10 +1312,11 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
|
|||
|
||||
server::MetricCollector metric;
|
||||
|
||||
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_);
|
||||
auto select_columns =
|
||||
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
|
||||
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
|
||||
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
|
||||
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
|
||||
decltype(ConnectorPtr->select(select_columns)) selected;
|
||||
{
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
|
@ -1329,6 +1340,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
|
|||
collection_file.date_ = std::get<7>(file);
|
||||
collection_file.engine_type_ = std::get<8>(file);
|
||||
collection_file.created_on_ = std::get<9>(file);
|
||||
collection_file.updated_time_ = std::get<10>(file);
|
||||
|
||||
auto status = utils::GetCollectionFilePath(options_, collection_file);
|
||||
if (!status.ok()) {
|
||||
|
@ -1388,7 +1400,8 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
|
|||
auto select_columns =
|
||||
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
|
||||
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
|
||||
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
|
||||
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_,
|
||||
&SegmentSchema::updated_time_);
|
||||
decltype(ConnectorPtr->select(select_columns)) selected;
|
||||
{
|
||||
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
|
||||
|
@ -1413,6 +1426,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
|
|||
file_schema.date_ = std::get<6>(file);
|
||||
file_schema.engine_type_ = std::get<7>(file);
|
||||
file_schema.created_on_ = std::get<8>(file);
|
||||
file_schema.updated_time_ = std::get<9>(file);
|
||||
|
||||
file_schema.dimension_ = collection_schema.dimension_;
|
||||
file_schema.index_file_size_ = collection_schema.index_file_size_;
|
||||
|
@ -1476,6 +1490,146 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
|
|||
return ret;
|
||||
}
|
||||
|
||||
Status
SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections,
                              const std::vector<int>& file_types, FilesHolder& files_holder) {
    if (file_types.empty()) {
        return Status(DB_ERROR, "file types array is empty");
    }

    Status ret = Status::OK();

    try {
        fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());

        // distribute id array to batches
        const uint64_t batch_size = 50;
        std::vector<std::vector<std::string>> id_groups;
        std::vector<std::string> temp_group;
        std::unordered_map<std::string, meta::CollectionSchema> map_collections;
        for (auto& collection : collections) {
            map_collections.insert(std::make_pair(collection.collection_id_, collection));
            temp_group.push_back(collection.collection_id_);
            if (temp_group.size() >= batch_size) {
                id_groups.emplace_back(temp_group);
                temp_group.clear();
            }
        }

        if (!temp_group.empty()) {
            id_groups.emplace_back(temp_group);
        }

        // perform query batch by batch
        int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
        int to_index_count = 0, index_count = 0, backup_count = 0;
        for (auto group : id_groups) {
            auto select_columns =
                columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
                        &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
                        &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
                        &SegmentSchema::created_on_, &SegmentSchema::updated_time_);
            decltype(ConnectorPtr->select(select_columns)) selected;

            auto match_collectionid = in(&SegmentSchema::collection_id_, group);

            std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
                                           (int)SegmentSchema::INDEX};
            auto match_type = in(&SegmentSchema::file_type_, file_types);
            {
                // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
                std::lock_guard<std::mutex> meta_lock(meta_mutex_);
                auto filter = where(match_collectionid and match_type);
                selected = ConnectorPtr->select(select_columns, filter);
            }

            for (auto& file : selected) {
                SegmentSchema file_schema;
                file_schema.id_ = std::get<0>(file);
                file_schema.collection_id_ = std::get<1>(file);
                file_schema.segment_id_ = std::get<2>(file);
                file_schema.file_id_ = std::get<3>(file);
                file_schema.file_type_ = std::get<4>(file);
                file_schema.file_size_ = std::get<5>(file);
                file_schema.row_count_ = std::get<6>(file);
                file_schema.date_ = std::get<7>(file);
                file_schema.engine_type_ = std::get<8>(file);
                file_schema.created_on_ = std::get<9>(file);
                file_schema.updated_time_ = std::get<10>(file);

                auto& collection_schema = map_collections[file_schema.collection_id_];
                file_schema.dimension_ = collection_schema.dimension_;
                file_schema.index_file_size_ = collection_schema.index_file_size_;
                file_schema.index_params_ = collection_schema.index_params_;
                file_schema.metric_type_ = collection_schema.metric_type_;

                switch (file_schema.file_type_) {
                    case (int)SegmentSchema::RAW: ++raw_count; break;
                    case (int)SegmentSchema::NEW: ++new_count; break;
                    case (int)SegmentSchema::NEW_MERGE: ++new_merge_count; break;
                    case (int)SegmentSchema::NEW_INDEX: ++new_index_count; break;
                    case (int)SegmentSchema::TO_INDEX: ++to_index_count; break;
                    case (int)SegmentSchema::INDEX: ++index_count; break;
                    case (int)SegmentSchema::BACKUP: ++backup_count; break;
                    default: break;
                }

                auto status = utils::GetCollectionFilePath(options_, file_schema);
                if (!status.ok()) {
                    ret = status;
                }

                files_holder.MarkFile(file_schema);
            }
        }

        std::string msg = "Get collection files by type.";
        for (int file_type : file_types) {
            switch (file_type) {
                case (int)SegmentSchema::RAW: msg = msg + " raw files:" + std::to_string(raw_count); break;
                case (int)SegmentSchema::NEW: msg = msg + " new files:" + std::to_string(new_count); break;
                case (int)SegmentSchema::NEW_MERGE: msg = msg + " new_merge files:" + std::to_string(new_merge_count); break;
                case (int)SegmentSchema::NEW_INDEX: msg = msg + " new_index files:" + std::to_string(new_index_count); break;
                case (int)SegmentSchema::TO_INDEX: msg = msg + " to_index files:" + std::to_string(to_index_count); break;
                case (int)SegmentSchema::INDEX: msg = msg + " index files:" + std::to_string(index_count); break;
                case (int)SegmentSchema::BACKUP: msg = msg + " backup files:" + std::to_string(backup_count); break;
                default: break;
            }
        }
        LOG_ENGINE_DEBUG_ << msg;
    } catch (std::exception& e) {
        return HandleException("Encounter exception when check non index files", e.what());
    }

    return ret;
}
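
FilesByTypeEx collects files for many collections in one call, so it batches the collection ids into fixed-size groups (50 per group) and issues one SQLite select per group rather than one per collection. A minimal standalone sketch of that batching idea, with illustrative names that are not part of the Milvus API:

    #include <string>
    #include <vector>

    // Split ids into groups of at most batch_size, mirroring the grouping loop above.
    std::vector<std::vector<std::string>>
    SplitIntoBatches(const std::vector<std::string>& ids, size_t batch_size) {
        std::vector<std::vector<std::string>> groups;
        std::vector<std::string> current;
        for (const auto& id : ids) {
            current.push_back(id);
            if (current.size() >= batch_size) {
                groups.emplace_back(std::move(current));
                current.clear();
            }
        }
        if (!current.empty()) {
            groups.emplace_back(std::move(current));
        }
        return groups;
    }

Each group is then turned into a single in(&SegmentSchema::collection_id_, group) filter, which keeps the number of meta queries proportional to the number of groups instead of the number of collections.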

Status
SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
    if (ids.empty()) {
@ -1486,9 +1640,17 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());

        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
                                      &SegmentSchema::created_on_, &SegmentSchema::updated_time_);

        std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
                                       (int)SegmentSchema::INDEX};
@ -1518,6 +1680,8 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
            collection_file.row_count_ = std::get<6>(file);
            collection_file.date_ = std::get<7>(file);
            collection_file.engine_type_ = std::get<8>(file);
            collection_file.created_on_ = std::get<9>(file);
            collection_file.updated_time_ = std::get<10>(file);

            if (collections.find(collection_file.collection_id_) == collections.end()) {
                CollectionSchema collection_schema;
@ -1943,6 +2107,9 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;

        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);

        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
        if (selected.size() == 0) {
            EnvironmentSchema env;

@ -126,6 +126,10 @@ class SqliteMetaImpl : public Meta {
    FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
                FilesHolder& files_holder) override;

    Status
    FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
                  FilesHolder& files_holder) override;

    Status
    FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;

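The new FilesByTypeEx overload takes the already-loaded collection schemas instead of a single collection id, so a caller that needs the files of several collections can make one meta call. A hedged usage sketch, assuming code inside namespace milvus::engine::meta with a valid meta_ptr_ (these surroundings are assumptions, not part of this patch):

    std::vector<CollectionSchema> collections;
    meta_ptr_->AllCollections(collections);

    std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
                                   (int)SegmentSchema::INDEX};
    FilesHolder files_holder;
    auto status = meta_ptr_->FilesByTypeEx(collections, file_types, files_holder);
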
@ -260,8 +260,10 @@ WalManager::GetNextRecord(MXLogRecord& record) {
        }
    }

    LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
                  << record.lsn;
    if (record.type != MXLogType::None) {
        LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
                      << record.lsn;
    }
    return error_code;
}

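Guarding the per-record log behind record.type != MXLogType::None keeps GetNextRecord quiet when it is polled and finds nothing to apply; only records that will actually be processed are written to the WAL log.
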
@ -70,6 +70,13 @@ DBWrapper::StartService() {
        return s;
    }

    // metric config
    s = config.GetMetricConfigEnableMonitor(opt.metric_enable_);
    if (!s.ok()) {
        std::cerr << s.ToString() << std::endl;
        return s;
    }

    // cache config
    s = config.GetCacheConfigCacheInsertData(opt.insert_cache_immediately_);
    if (!s.ok()) {
@ -255,7 +262,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) {
        db_->AllCollections(table_schema_array);

        for (auto& schema : table_schema_array) {
            auto status = db_->PreloadCollection(schema.collection_id_);
            auto status = db_->PreloadCollection(nullptr, schema.collection_id_);
            if (!status.ok()) {
                return status;
            }
@ -264,7 +271,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) {
        std::vector<std::string> collection_names;
        StringHelpFunctions::SplitStringByDelimeter(preload_collections, ",", collection_names);
        for (auto& name : collection_names) {
            auto status = db_->PreloadCollection(name);
            auto status = db_->PreloadCollection(nullptr, name);
            if (!status.ok()) {
                return status;
            }

@ -70,7 +70,7 @@ CompactRequest::OnExecute() {
        rc.RecordSection("check validation");

        // step 2: compact the collection
        status = DBWrapper::DB()->Compact(collection_name_, compact_threshold_);
        status = DBWrapper::DB()->Compact(context_, collection_name_, compact_threshold_);
        if (!status.ok()) {
            return status;
        }

@ -83,7 +83,7 @@ GetVectorsByIDRequest::OnExecute() {
        }

        // step 2: get vector data, now only support get one id
        return DBWrapper::DB()->GetVectorsByID(collection_name_, ids_, vectors_);
        return DBWrapper::DB()->GetVectorsByID(collection_schema, ids_, vectors_);
    } catch (std::exception& ex) {
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
    }

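GetVectorsByID now receives the full collection schema rather than just the collection name, which spares the DB layer a second schema lookup per request. The handler is assumed to have filled collection_schema earlier in OnExecute; a hedged sketch of such a lookup (the exact placement in the handler is an assumption):

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_name_;
    auto status = DBWrapper::DB()->DescribeCollection(collection_schema);
    if (!status.ok()) {
        return status;
    }
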
@ -60,8 +60,9 @@ PreloadCollectionRequest::OnExecute() {
            }
        }

        // step 2: check collection existence
        status = DBWrapper::DB()->PreloadCollection(collection_name_);
        // step 2: force load collection data into cache
        // load each segment and insert into cache even cache capacity is not enough
        status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true);
        fiu_do_on("PreloadCollectionRequest.OnExecute.preload_collection_fail",
                  status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
        fiu_do_on("PreloadCollectionRequest.OnExecute.throw_std_exception", throw std::exception());

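PreloadCollection is now context-aware and takes a force flag: the request handler passes its own context_ and force = true so every segment is loaded into cache even when the cache would overflow, while DBWrapper's startup preloading (above) passes nullptr because no client request context exists at that point and relies on the default, non-forced behaviour. A hedged side-by-side sketch of the two call sites (variable names illustrative):

    // request path: caller has a server::Context and wants an unconditional load
    status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true);

    // startup path: no request context, best-effort load bounded by cache capacity
    status = DBWrapper::DB()->PreloadCollection(nullptr, collection_name);
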
@ -32,7 +32,7 @@ namespace {

static const char* COLLECTION_NAME = "test_group";
static constexpr int64_t COLLECTION_DIM = 256;
static constexpr int64_t VECTOR_COUNT = 25000;
static constexpr int64_t VECTOR_COUNT = 5000;
static constexpr int64_t INSERT_LOOP = 100;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
static constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
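
The reduced VECTOR_COUNT here, and the smaller nb values and shorter sleeps in the test hunks that follow, only cut unit-test wall-clock time; the assertions themselves are unchanged.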
@ -180,7 +180,7 @@ TEST_F(DBTest, DB_TEST) {
        milvus::engine::ResultIds result_ids;
        milvus::engine::ResultDistances result_distances;
        int k = 10;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::this_thread::sleep_for(std::chrono::seconds(1));

        INIT_TIMER;
        std::stringstream ss;

@ -214,7 +214,7 @@ TEST_F(DBTest, DB_TEST) {
                /* LOG(DEBUG) << ss.str(); */
            }
            ASSERT_TRUE(count >= prev_count);
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

@ -236,7 +236,7 @@ TEST_F(DBTest, DB_TEST) {
        stat = db_->Flush();
        ASSERT_TRUE(stat.ok());

        std::this_thread::sleep_for(std::chrono::microseconds(1));
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    search.join();
@ -455,34 +455,34 @@ TEST_F(DBTest, PRELOAD_TEST) {
    db_->CreateIndex(dummy_context_, COLLECTION_NAME, index);  // wait until build index finish

    int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_TRUE(stat.ok());
    int64_t cur_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
    ASSERT_TRUE(prev_cache_usage < cur_cache_usage);

    FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_FALSE(stat.ok());
    fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");

    // create a partition
    stat = db_->CreatePartition(COLLECTION_NAME, "part0", "0");
    ASSERT_TRUE(stat.ok());
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_TRUE(stat.ok());

    FIU_ENABLE_FIU("DBImpl.PreloadCollection.null_engine");
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_FALSE(stat.ok());
    fiu_disable("DBImpl.PreloadCollection.null_engine");

    FIU_ENABLE_FIU("DBImpl.PreloadCollection.exceed_cache");
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_FALSE(stat.ok());
    fiu_disable("DBImpl.PreloadCollection.exceed_cache");

    FIU_ENABLE_FIU("DBImpl.PreloadCollection.engine_throw_exception");
    stat = db_->PreloadCollection(COLLECTION_NAME);
    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
    ASSERT_FALSE(stat.ok());
    fiu_disable("DBImpl.PreloadCollection.engine_throw_exception");
}
@ -535,15 +535,15 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
    stat = db_->DeleteVectors(collection_info.collection_id_, ids_to_delete);
    ASSERT_FALSE(stat.ok());

    stat = db_->Compact(collection_info.collection_id_);
    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
    ASSERT_FALSE(stat.ok());

    std::vector<milvus::engine::VectorsData> vectors;
    std::vector<int64_t> id_array = {0};
    stat = db_->GetVectorsByID(collection_info.collection_id_, id_array, vectors);
    stat = db_->GetVectorsByID(collection_info, id_array, vectors);
    ASSERT_FALSE(stat.ok());

    stat = db_->PreloadCollection(collection_info.collection_id_);
    stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
    ASSERT_FALSE(stat.ok());

    uint64_t row_count = 0;
@ -612,7 +612,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
        ASSERT_EQ(xb.id_array_.size(), nb);
    }

    std::this_thread::sleep_for(std::chrono::seconds(2));
    db_->Stop();
    fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
    fiu_disable("SqliteMetaImpl.FilesToMerge.throw_exception");

@ -620,7 +619,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {

    FIU_ENABLE_FIU("DBImpl.StartMetricTask.InvalidTotalCache");
    db_->Start();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    db_->Stop();
    fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
}

@ -644,7 +642,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_2) {
    }

    FIU_ENABLE_FIU("SqliteMetaImpl.CreateCollectionFile.throw_exception");
    std::this_thread::sleep_for(std::chrono::seconds(2));
    db_->Stop();
    fiu_disable("SqliteMetaImpl.CreateCollectionFile.throw_exception");
}

@ -669,7 +666,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_3) {

    FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ThrowException");
    db_->Start();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    db_->Stop();
    fiu_disable("DBImpl.MergeFiles.Serialize_ThrowException");
}

@ -694,7 +690,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_4) {

    FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ErrorStatus");
    db_->Start();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    db_->Stop();
    fiu_disable("DBImpl.MergeFiles.Serialize_ErrorStatus");
}
@ -934,11 +929,9 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
        BuildVectors(nb, i, xb);

        db_->InsertVectors(COLLECTION_NAME, "", xb);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    db_->Flush();
    db_->Size(size);
    LOG(DEBUG) << "size=" << size;
    ASSERT_LE(size, 1 * milvus::engine::GB);

@ -981,8 +974,6 @@ TEST_F(DBTest2, DELETE_TEST) {
    fiu_disable("DBImpl.DropCollectionRecursively.failed");

    stat = db_->DropCollection(COLLECTION_NAME);

    std::this_thread::sleep_for(std::chrono::seconds(2));
    ASSERT_TRUE(stat.ok());

    db_->HasCollection(COLLECTION_NAME, has_collection);
@ -1183,7 +1174,9 @@ TEST_F(DBTest2, FLUSH_NON_EXISTING_COLLECTION) {
TEST_F(DBTest2, GET_VECTOR_NON_EXISTING_COLLECTION) {
    std::vector<milvus::engine::VectorsData> vectors;
    std::vector<int64_t> id_array = {0};
    auto status = db_->GetVectorsByID("non_existing", id_array, vectors);
    milvus::engine::meta::CollectionSchema collection_info;
    collection_info.collection_id_ = "non_existing";
    auto status = db_->GetVectorsByID(collection_info, id_array, vectors);
    ASSERT_FALSE(status.ok());
}

@ -1203,7 +1196,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {

    std::vector<milvus::engine::VectorsData> vectors;
    std::vector<int64_t> empty_array;
    stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
    ASSERT_FALSE(stat.ok());

    stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);

@ -1211,7 +1204,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {

    db_->Flush(collection_info.collection_id_);

    stat = db_->GetVectorsByID(COLLECTION_NAME, qxb.id_array_, vectors);
    stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(vectors.size(), qxb.id_array_.size());
    ASSERT_EQ(vectors[0].float_data_.size(), COLLECTION_DIM);

@ -1221,7 +1214,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
    }

    std::vector<int64_t> invalid_array = {-1, -1};
    stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
    ASSERT_TRUE(stat.ok());
    for (auto& vector : vectors) {
        ASSERT_EQ(vector.vector_count_, 0);
@ -1344,7 +1337,7 @@ TEST_F(DBTest2, SEARCH_WITH_DIFFERENT_INDEX) {
        stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
        ASSERT_TRUE(stat.ok());

        stat = db_->PreloadCollection(collection_info.collection_id_);
        stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
        ASSERT_TRUE(stat.ok());

        int topk = 10, nprobe = 10;

@ -1369,7 +1362,7 @@ result_distances);
        stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
        ASSERT_TRUE(stat.ok());

        stat = db_->PreloadCollection(collection_info.collection_id_);
        stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
        ASSERT_TRUE(stat.ok());

        for (auto id : ids_to_search) {
@ -72,7 +72,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
        milvus::engine::ResultIds result_ids;
        milvus::engine::ResultDistances result_distances;
        int k = 10;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::this_thread::sleep_for(std::chrono::seconds(1));

        INIT_TIMER;
        std::stringstream ss;

@ -106,7 +106,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
                /* LOG(DEBUG) << ss.str(); */
            }
            ASSERT_TRUE(count >= prev_count);
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

@ -128,7 +128,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
        stat = db_->Flush();
        ASSERT_TRUE(stat.ok());

        std::this_thread::sleep_for(std::chrono::microseconds(1));
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    search.join();

@ -183,7 +183,6 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
    stat = db_->InsertVectors(COLLECTION_NAME, "", xb);
    ASSERT_TRUE(stat.ok());

    // sleep(2); // wait until build index finish
    stat = db_->Flush();
    ASSERT_TRUE(stat.ok());

@ -241,10 +240,8 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
        milvus::engine::VectorsData xb;
        BuildVectors(nb, i, xb);
        db_->InsertVectors(COLLECTION_NAME, "", xb);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    // std::this_thread::sleep_for(std::chrono::seconds(1));
    stat = db_->Flush();
    ASSERT_TRUE(stat.ok());

@ -288,16 +285,12 @@ TEST_F(MySqlDBTest, DELETE_TEST) {
        milvus::engine::VectorsData xb;
        BuildVectors(nb, i, xb);
        db_->InsertVectors(COLLECTION_NAME, "", xb);
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }

    stat = db_->Flush();
    ASSERT_TRUE(stat.ok());

    stat = db_->DropCollection(COLLECTION_NAME);
    //// std::cout << "5 sec start" << std::endl;
    // std::this_thread::sleep_for(std::chrono::seconds(5));
    //// std::cout << "5 sec finish" << std::endl;
    ASSERT_TRUE(stat.ok());
    //
    db_->HasCollection(COLLECTION_NAME, has_collection);

@ -287,7 +287,7 @@ TEST_F(DeleteTest, delete_before_create_index) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 10000;
    int64_t nb = 5000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -369,7 +369,7 @@ TEST_F(DeleteTest, delete_with_index) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 10000;
    int64_t nb = 5000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -451,7 +451,7 @@ TEST_F(DeleteTest, delete_multiple_times_with_index) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 100000;
    int64_t nb = 5000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -749,7 +749,7 @@ TEST_F(CompactTest, compact_basic) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(row_count, nb - 2);

    stat = db_->Compact(collection_info.collection_id_);
    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
    ASSERT_TRUE(stat.ok());

    const int topk = 1, nprobe = 1;

@ -834,7 +834,7 @@ TEST_F(CompactTest, compact_with_index) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(row_count, nb - ids_to_delete.size());

    stat = db_->Compact(collection_info.collection_id_);
    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
    ASSERT_TRUE(stat.ok());

    stat = db_->GetCollectionRowCount(collection_info.collection_id_, row_count);

@ -864,6 +864,6 @@ TEST_F(CompactTest, compact_with_index) {
}

TEST_F(CompactTest, compact_non_existing_table) {
    auto status = db_->Compact("non_existing_table");
    auto status = db_->Compact(dummy_context_, "non_existing_table");
    ASSERT_FALSE(status.ok());
}

@ -282,7 +282,7 @@ TEST_F(DBTest, COMPACT_TEST) {
    stat = db_->Flush();
    ASSERT_TRUE(stat.ok());

    stat = db_->Compact(collection_info.collection_id_);
    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
    ASSERT_TRUE(stat.ok());

    const int topk = 1, nprobe = 1;

@ -318,20 +318,15 @@ TEST_F(MemManagerTest2, INSERT_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    auto start_time = METRICS_NOW_TIME;

    int insert_loop = 20;
    int insert_loop = 10;
    for (int i = 0; i < insert_loop; ++i) {
        int64_t nb = 40960;
        int64_t nb = 4096;
        milvus::engine::VectorsData xb;
        BuildVectors(nb, xb);
        milvus::engine::IDNumbers vector_ids;
        stat = db_->InsertVectors(GetCollectionName(), "", xb);
        ASSERT_TRUE(stat.ok());
    }
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time;
}

TEST_F(MemManagerTest2, INSERT_BINARY_TEST) {

@ -86,7 +86,7 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 100000;
    int64_t nb = 10000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -185,7 +185,7 @@ TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 10000;
    int64_t nb = 5000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -246,7 +246,7 @@ TEST_F(SearchByIdTest, WITH_DELETE_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 100000;
    int64_t nb = 10000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -315,7 +315,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 100000;
    int64_t nb = 10000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -349,7 +349,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
    milvus::engine::ResultDistances result_distances;

    std::vector<milvus::engine::VectorsData> vectors;
    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
    ASSERT_TRUE(stat.ok());

    stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,

@ -369,7 +369,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 10000;
    int64_t nb = 5000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -409,7 +409,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
    milvus::engine::ResultDistances result_distances;

    std::vector<milvus::engine::VectorsData> vectors;
    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
    ASSERT_TRUE(stat.ok());

    stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,

@ -429,7 +429,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

    int64_t nb = 100000;
    int64_t nb = 10000;
    milvus::engine::VectorsData xb;
    BuildVectors(nb, xb);

@ -469,7 +469,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
    milvus::engine::ResultDistances result_distances;

    std::vector<milvus::engine::VectorsData> vectors;
    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
    ASSERT_TRUE(stat.ok());
    for (auto& vector : vectors) {
        ASSERT_EQ(vector.vector_count_, 0);

@ -541,7 +541,7 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
    milvus::engine::ResultDistances result_distances;

    std::vector<milvus::engine::VectorsData> vectors;
    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
    ASSERT_TRUE(stat.ok());
    ASSERT_EQ(vectors.size(), ids_to_search.size());

@ -89,7 +89,7 @@ TEST_F(MetricTest, METRIC_TEST) {
        // std::vector<std::string> tags;
        // milvus::engine::ResultIds result_ids;
        // milvus::engine::ResultDistances result_distances;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::this_thread::sleep_for(std::chrono::seconds(1));

        INIT_TIMER;
        std::stringstream ss;

@ -115,11 +115,11 @@ TEST_F(MetricTest, METRIC_TEST) {
            // }
            }
            ASSERT_TRUE(count >= prev_count);
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    int loop = 10000;
    int loop = 100;

    for (auto i = 0; i < loop; ++i) {
        if (i == 40) {

@ -131,7 +131,7 @@ TEST_F(MetricTest, METRIC_TEST) {
        db_->InsertVectors(group_name, "", xb);
        ASSERT_EQ(xb.id_array_.size(), nb);
        }
        std::this_thread::sleep_for(std::chrono::microseconds(2000));
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    search.join();

@ -813,7 +813,7 @@ TEST(UtilTest, ROLLOUTHANDLER_TEST) {
TEST(UtilTest, THREADPOOL_TEST) {
    auto thread_pool_ptr = std::make_unique<milvus::ThreadPool>(3);
    auto fun = [](int i) {
        sleep(1);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    };
    for (int i = 0; i < 10; ++i) {
        thread_pool_ptr->enqueue(fun, i);