Merge pull request #607 from yhmo/ongoing

#596 Frequent insert operations cost too much disk space
pull/652/head
Jin Hai 2019-12-02 15:06:19 +08:00 committed by GitHub
commit 51e8be0130
34 changed files with 652 additions and 297 deletions
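
The root cause of #596: merged-away raw files were only hard-deleted after a multi-minute TTL (a full day in cluster mode), so frequent inserts piled up TO_DELETE files on disk, and cleanup could not run sooner because it had no way to tell whether a file was still in use. This change shortens the TTL to seconds (an hour in cluster mode) and introduces a ref-counted OngoingFileChecker that the query, merge, and index-build paths mark before touching files; the meta cleanup consults it through a new Meta::CleanUpFilter interface and skips any file still held. A minimal, self-contained sketch of that guard, with simplified types (the real class, OngoingFileChecker, appears in full further down):

// Sketch of the ref-count guard this PR introduces (class name and types
// simplified for illustration; see OngoingFileChecker below for the real one).
#include <map>
#include <mutex>
#include <string>

class OngoingFileGuard {
 public:
    void
    Mark(const std::string& file_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        ++refcount_[file_id];  // the same file may be held by a query, a merge and an index build at once
    }

    void
    Unmark(const std::string& file_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = refcount_.find(file_id);
        if (it != refcount_.end() && --(it->second) <= 0) {
            refcount_.erase(it);  // refcount 0: file becomes eligible for TTL cleanup again
        }
    }

    bool
    IsIgnored(const std::string& file_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = refcount_.find(file_id);
        return it != refcount_.end() && it->second > 0;  // true = cleanup must skip this file
    }

 private:
    std::mutex mutex_;
    std::map<std::string, int64_t> refcount_;
};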

View File

@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#552 - Server down during building index_type: IVF_PQ using GPU-edition
- \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error
- \#579 - Build index hang in GPU version when gpu_resources disabled
- \#596 - Frequent insert operations cost too much disk space
- \#599 - Build index log is incorrect
- \#602 - Optimizer specify wrong gpu_id
- \#606 - No log generated during building index with CPU

View File

@ -176,6 +176,11 @@ Cache<ItemObj>::print() {
{
std::lock_guard<std::mutex> lock(mutex_);
cache_count = lru_.size();
#if 0
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
SERVER_LOG_DEBUG << it->first;
}
#endif
}
SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count;

View File

@ -52,8 +52,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shut down!");
void
@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
WaitMergeFileFinish();
// step 4: wait and build index
status = CleanFailedIndexFileOfTable(table_id);
status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
status = BuildTableIndexRecursively(table_id, index);
return status;
@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
TimeRecorder rc("");
// step 1: get files to search
// step 1: construct search job
auto status = ongoing_files_checker_.MarkOngoingFiles(files);
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
for (auto& file : files) {
@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
job->AddIndexFile(file_ptr);
}
// step 2: put search task to scheduler
// step 2: put search job to scheduler and wait for result
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
status = ongoing_files_checker_.UnmarkOngoingFiles(files);
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
@ -693,7 +695,6 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
auto file_schema = file;
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
index_size = index->Size();
if (index_size >= file_schema.index_file_size_) {
@ -703,20 +704,27 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
// step 3: serialize to disk
try {
index->Serialize();
status = index->Serialize();
if (!status.ok()) {
ENGINE_LOG_ERROR << status.message();
}
} catch (std::exception& ex) {
// typical error: out of disk space or permission denied
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
// if we failed to serialize the merged file to disk
// typical error: out of disk space, out of memory or permission denied
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
<< ", possible out of disk space" << std::endl;
ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_
<< ", possible out of disk space or memory";
return Status(DB_ERROR, msg);
return status;
}
// step 4: update table files state
@ -751,13 +759,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
for (auto& kv : raw_files) {
auto files = kv.second;
meta::TableFilesSchema& files = kv.second;
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_TRACE << "File count is smaller than merge trigger number, skip merge action";
continue;
}
status = ongoing_files_checker_.MarkOngoingFiles(files);
MergeFiles(table_id, kv.first, kv.second);
status = ongoing_files_checker_.UnmarkOngoingFiles(files);
if (shutting_down_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
@ -788,16 +798,12 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_->Archive();
{
uint64_t ttl = 10 * meta::SECOND; // default: file data will be erased from cache after a few seconds
meta_ptr_->CleanUpCacheWithTTL(ttl);
}
{
uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after a few minutes
uint64_t ttl = 10 * meta::SECOND; // default: file will be hard-deleted a few seconds after being soft-deleted
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
ttl = meta::D_SEC;
ttl = meta::H_SEC;
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_);
}
// ENGINE_LOG_TRACE << " Background compaction thread exit";
@ -833,14 +839,15 @@ DBImpl::StartBuildIndexTask(bool force) {
void
DBImpl::BackgroundBuildIndex() {
// ENGINE_LOG_TRACE << "Background build index thread start";
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status = IgnoreFailedIndexFiles(to_index_files);
Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {
ENGINE_LOG_DEBUG << "Background build index thread begin";
status = ongoing_files_checker_.MarkOngoingFiles(to_index_files);
// step 2: put build index task to scheduler
std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map;
for (auto& file : to_index_files) {
@ -851,6 +858,7 @@ DBImpl::BackgroundBuildIndex() {
job2file_map.push_back(std::make_pair(job, file_ptr));
}
// step 3: wait build index finished and mark failed files
for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
scheduler::BuildIndexJobPtr job = iter->first;
meta::TableFileSchema& file_schema = *(iter->second.get());
@ -859,17 +867,17 @@ DBImpl::BackgroundBuildIndex() {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();
MarkFailedIndexFile(file_schema);
index_failed_checker_.MarkFailedIndexFile(file_schema);
} else {
MarkSucceedIndexFile(file_schema);
ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
index_failed_checker_.MarkSucceedIndexFile(file_schema);
}
status = ongoing_files_checker_.UnmarkOngoingFile(file_schema);
}
ENGINE_LOG_DEBUG << "Background build index thread finished";
}
// ENGINE_LOG_TRACE << "Background build index thread exit";
}
Status
@ -894,6 +902,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;
meta::DatePartionedTableFilesSchema date_files;
auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files);
if (!status.ok()) {
@ -934,7 +944,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da
if (dates.empty()) {
status = mem_mgr_->EraseMemVector(table_id); // not allow insert
status = meta_ptr_->DropTable(table_id); // soft delete table
CleanFailedIndexFileOfTable(table_id);
index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
// scheduler will determine when to delete table files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
@ -1014,7 +1024,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
GetFilesToBuildIndex(table_id, file_types, table_files);
times++;
IgnoreFailedIndexFiles(table_files);
index_failed_checker_.IgnoreFailedIndexFiles(table_files);
}
// build index for partition
@ -1029,7 +1039,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
// failed to build index for some files, return error
std::vector<std::string> failed_files;
GetFailedIndexFileOfTable(table_id, failed_files);
index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
if (!failed_files.empty()) {
std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
((failed_files.size() == 1) ? " file" : " files");
@ -1043,7 +1053,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
CleanFailedIndexFileOfTable(table_id);
index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
auto status = meta_ptr_->DropTableIndex(table_id);
if (!status.ok()) {
return status;
@ -1086,86 +1096,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c
return Status::OK();
}
Status
DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
index_failed_files_.erase(table_id); // rebuild failed index files for this table
return Status::OK();
}
Status
DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
failed_files.clear();
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(table_id);
if (iter != index_failed_files_.end()) {
FileID2FailedTimes& failed_map = iter->second;
for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) {
failed_files.push_back(it_file->first);
}
}
return Status::OK();
}
Status
DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter == index_failed_files_.end()) {
FileID2FailedTimes failed_files;
failed_files.insert(std::make_pair(file.file_id_, 1));
index_failed_files_.insert(std::make_pair(file.table_id_, failed_files));
} else {
auto it_failed_files = iter->second.find(file.file_id_);
if (it_failed_files != iter->second.end()) {
it_failed_files->second++;
} else {
iter->second.insert(std::make_pair(file.file_id_, 1));
}
}
return Status::OK();
}
Status
DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter != index_failed_files_.end()) {
iter->second.erase(file.file_id_);
}
return Status::OK();
}
Status
DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
// there could be failed files belonging to different tables.
// some files may have failed several times; no need to build index for these files again.
// this avoids an endless retry loop in the build index operation
for (auto it_file = table_files.begin(); it_file != table_files.end();) {
auto it_failed_files = index_failed_files_.find((*it_file).table_id_);
if (it_failed_files != index_failed_files_.end()) {
auto it_failed_file = it_failed_files->second.find((*it_file).file_id_);
if (it_failed_file != it_failed_files->second.end()) {
if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) {
it_file = table_files.erase(it_file);
continue;
}
}
}
++it_file;
}
return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -18,8 +18,10 @@
#pragma once
#include "DB.h"
#include "Types.h"
#include "src/db/insert/MemManager.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "utils/ThreadPool.h"
#include <atomic>
@ -178,21 +180,6 @@ class DBImpl : public DB {
Status
GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count);
Status
CleanFailedIndexFileOfTable(const std::string& table_id);
Status
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
Status
MarkFailedIndexFile(const meta::TableFileSchema& file);
Status
MarkSucceedIndexFile(const meta::TableFileSchema& file);
Status
IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files);
private:
const DBOptions options_;
@ -214,11 +201,10 @@ class DBImpl : public DB {
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
std::mutex index_failed_mutex_;
using FileID2FailedTimes = std::map<std::string, uint64_t>;
using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>;
Table2FailedFiles index_failed_files_; // file id mapping to failed times
}; // DBImpl
IndexFailedChecker index_failed_checker_;
OngoingFileChecker ongoing_files_checker_;
}; // DBImpl
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "db/IndexFailedChecker.h"
#include <utility>
namespace milvus {
namespace engine {
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;
Status
IndexFailedChecker::CleanFailedIndexFileOfTable(const std::string& table_id) {
std::lock_guard<std::mutex> lck(mutex_);
index_failed_files_.erase(table_id); // rebuild failed index files for this table
return Status::OK();
}
Status
IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
failed_files.clear();
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(table_id);
if (iter != index_failed_files_.end()) {
File2RefCount& failed_map = iter->second;
for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) {
failed_files.push_back(it_file->first);
}
}
return Status::OK();
}
Status
IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter == index_failed_files_.end()) {
File2RefCount failed_files;
failed_files.insert(std::make_pair(file.file_id_, 1));
index_failed_files_.insert(std::make_pair(file.table_id_, failed_files));
} else {
auto it_failed_files = iter->second.find(file.file_id_);
if (it_failed_files != iter->second.end()) {
it_failed_files->second++;
} else {
iter->second.insert(std::make_pair(file.file_id_, 1));
}
}
return Status::OK();
}
Status
IndexFailedChecker::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter != index_failed_files_.end()) {
iter->second.erase(file.file_id_);
if (iter->second.empty()) {
index_failed_files_.erase(file.table_id_);
}
}
return Status::OK();
}
Status
IndexFailedChecker::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
// there could be failed files belonging to different tables.
// some files may have failed several times; no need to build index for these files again.
// this avoids an endless retry loop in the build index operation
for (auto it_file = table_files.begin(); it_file != table_files.end();) {
auto it_failed_files = index_failed_files_.find((*it_file).table_id_);
if (it_failed_files != index_failed_files_.end()) {
auto it_failed_file = it_failed_files->second.find((*it_file).file_id_);
if (it_failed_file != it_failed_files->second.end()) {
if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) {
it_file = table_files.erase(it_file);
continue;
}
}
}
++it_file;
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
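
Since INDEX_FAILED_RETRY_TIME is 1, a file whose index build has already failed once is dropped from later to-index lists until a success (or a table-level clean) clears its record, which is what prevents an endless rebuild loop. A short usage sketch (ids are arbitrary; this mirrors the CHECKER_TEST case added at the end of this PR):

#include "db/IndexFailedChecker.h"

void
IndexFailedCheckerExample() {
    milvus::engine::IndexFailedChecker checker;
    milvus::engine::meta::TableFileSchema schema;
    schema.table_id_ = "tbl";  // arbitrary ids for illustration
    schema.file_id_ = "1";

    checker.MarkFailedIndexFile(schema);        // first failure already reaches the retry limit

    milvus::engine::meta::TableFilesSchema to_index = {schema};
    checker.IgnoreFailedIndexFiles(to_index);   // erases the failed file from the list
    // to_index is now empty, so BackgroundBuildIndex will not retry it

    checker.MarkSucceedIndexFile(schema);       // a later success clears the failure record
}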

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "db/Types.h"
#include "meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class IndexFailedChecker {
public:
Status
CleanFailedIndexFileOfTable(const std::string& table_id);
Status
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
Status
MarkFailedIndexFile(const meta::TableFileSchema& file);
Status
MarkSucceedIndexFile(const meta::TableFileSchema& file);
Status
IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files);
private:
std::mutex mutex_;
Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times)
};
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "db/OngoingFileChecker.h"
#include "utils/Log.h"
#include <utility>
namespace milvus {
namespace engine {
Status
OngoingFileChecker::MarkOngoingFile(const meta::TableFileSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::MarkOngoingFiles(const meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
MarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFile(const meta::TableFileSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
UnmarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
bool
OngoingFileChecker::IsIgnored(const meta::TableFileSchema& schema) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = ongoing_files_.find(schema.table_id_);
if (iter == ongoing_files_.end()) {
return false;
} else {
auto it_file = iter->second.find(schema.file_id_);
if (it_file == iter->second.end()) {
return false;
} else {
return (it_file->second > 0);
}
}
}
Status
OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_file) {
if (table_file.table_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid table files");
}
auto iter = ongoing_files_.find(table_file.table_id_);
if (iter == ongoing_files_.end()) {
File2RefCount files_refcount;
files_refcount.insert(std::make_pair(table_file.file_id_, 1));
ongoing_files_.insert(std::make_pair(table_file.table_id_, files_refcount));
} else {
auto it_file = iter->second.find(table_file.file_id_);
if (it_file == iter->second.end()) {
iter->second[table_file.file_id_] = 1;
} else {
it_file->second++;
}
}
ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_
<< " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_];
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file) {
if (table_file.table_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid table files");
}
auto iter = ongoing_files_.find(table_file.table_id_);
if (iter != ongoing_files_.end()) {
auto it_file = iter->second.find(table_file.file_id_);
if (it_file != iter->second.end()) {
it_file->second--;
ENGINE_LOG_DEBUG << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second;
if (it_file->second <= 0) {
iter->second.erase(table_file.file_id_);
if (iter->second.empty()) {
ongoing_files_.erase(table_file.table_id_);
}
}
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
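
Marks are reference-counted: a file held by several concurrent operations stays protected until every holder unmarks it, and IsIgnored() only returns false once the count reaches zero. A short usage sketch (ids are arbitrary; the assertions follow from the implementation above):

#include <cassert>

#include "db/OngoingFileChecker.h"

void
OngoingFileCheckerExample() {
    milvus::engine::OngoingFileChecker checker;
    milvus::engine::meta::TableFileSchema schema;
    schema.table_id_ = "tbl";  // arbitrary ids for illustration
    schema.file_id_ = "1";

    checker.MarkOngoingFile(schema);     // e.g. a query takes the file, refcount = 1
    checker.MarkOngoingFile(schema);     // e.g. a merge takes it too, refcount = 2
    checker.UnmarkOngoingFile(schema);   // query finished, refcount = 1
    assert(checker.IsIgnored(schema));   // still in use, cleanup must skip it
    checker.UnmarkOngoingFile(schema);   // merge finished, refcount = 0
    assert(!checker.IsIgnored(schema));  // now eligible for TTL cleanup
}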

View File

@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "db/Types.h"
#include "meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <set>
#include <string>
namespace milvus {
namespace engine {
class OngoingFileChecker : public meta::Meta::CleanUpFilter {
public:
Status
MarkOngoingFile(const meta::TableFileSchema& table_file);
Status
MarkOngoingFiles(const meta::TableFilesSchema& table_files);
Status
UnmarkOngoingFile(const meta::TableFileSchema& table_file);
Status
UnmarkOngoingFiles(const meta::TableFilesSchema& table_files);
bool
IsIgnored(const meta::TableFileSchema& schema) override;
private:
Status
MarkOngoingFileNoLock(const meta::TableFileSchema& table_file);
Status
UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file);
private:
std::mutex mutex_;
Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count)
};
} // namespace engine
} // namespace milvus

View File

@ -21,6 +21,9 @@
#include <faiss/Index.h>
#include <stdint.h>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -40,5 +43,8 @@ struct TableIndex {
int32_t metric_type_ = (int)MetricType::L2;
};
using File2RefCount = std::map<std::string, int64_t>;
using Table2Files = std::map<std::string, File2RefCount>;
} // namespace engine
} // namespace milvus

View File

@ -271,6 +271,12 @@ ExecutionEngineImpl::Serialize() {
// here we reset index size by file size,
// since some index types (such as SQ8) have a smaller data size after serialization
index_->set_size(PhysicalSize());
ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size();
if (index_->Size() == 0) {
std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory";
status = Status(DB_ERROR, msg);
}
return status;
}
@ -465,7 +471,9 @@ ExecutionEngineImpl::Merge(const std::string& location) {
if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
if (!status.ok()) {
ENGINE_LOG_ERROR << "Merge: Add Error";
ENGINE_LOG_ERROR << "Failed to merge: " << location << " to: " << location_;
} else {
ENGINE_LOG_DEBUG << "Finish merge index file: " << location;
}
return status;
} else {
@ -503,6 +511,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
throw Exception(DB_ERROR, status.message());
}
ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << to_index->Size();
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
}

View File

@ -35,6 +35,13 @@ static const char* META_TABLES = "Tables";
static const char* META_TABLEFILES = "TableFiles";
class Meta {
public:
class CleanUpFilter {
public:
virtual bool
IsIgnored(const TableFileSchema& schema) = 0;
};
public:
virtual ~Meta() = default;
@ -121,10 +128,7 @@ class Meta {
CleanUpShadowFiles() = 0;
virtual Status
CleanUpCacheWithTTL(uint64_t seconds) = 0;
virtual Status
CleanUpFilesWithTTL(uint64_t seconds) = 0;
CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0;
virtual Status
DropAll() = 0;
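
CleanUpFilter is the seam between the meta layer and DBImpl: CleanUpFilesWithTTL asks the filter about each expired file before touching it, and the nullptr default keeps existing call sites unchanged. Any implementation can veto a deletion; a hypothetical example (OngoingFileChecker is the real implementation in this change):

#include <set>
#include <string>
#include <utility>

#include "meta/Meta.h"

// Hypothetical filter that pins a fixed set of file ids (illustration only;
// OngoingFileChecker is the filter actually passed in by DBImpl).
class PinnedFilesFilter : public milvus::engine::meta::Meta::CleanUpFilter {
 public:
    explicit PinnedFilesFilter(std::set<std::string> pinned) : pinned_(std::move(pinned)) {
    }

    bool
    IsIgnored(const milvus::engine::meta::TableFileSchema& schema) override {
        return pinned_.count(schema.file_id_) > 0;  // true = cleanup must skip this file
    }

 private:
    std::set<std::string> pinned_;
};

// usage: meta_ptr->CleanUpFilesWithTTL(ttl, &filter);  // filter asked once per expired file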

View File

@ -1783,49 +1783,7 @@ MySQLMetaImpl::CleanUpShadowFiles() {
}
Status
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
try {
server::MetricCollector metric;
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
if (connectionPtr == nullptr) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(TableFileSchema::TO_DELETE) << ","
<< std::to_string(TableFileSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(table_file.table_id_);
resRow["file_id"].to_string(table_file.file_id_);
table_file.date_ = resRow["date"];
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
}
} catch (std::exception& e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
}
return Status::OK();
}
Status
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;
@ -1840,33 +1798,52 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES
<< " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE)
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT id, table_id, file_id, file_type, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
int64_t clean_files = 0;
for (auto& resRow : res) {
table_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(table_file.table_id_);
resRow["file_id"].to_string(table_file.file_id_);
table_file.date_ = resRow["date"];
table_file.file_type_ = resRow["file_type"];
utils::DeleteTableFilePath(options_, table_file);
// check if the file can be deleted
if (filter && filter->IsIgnored(table_file)) {
ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
<< " currently is in use, not able to delete now";
continue; // ignore this file, don't delete it
}
ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
// erase file data from cache
// because GetTableFilePath won't be able to generate the file path after the file is deleted
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
idsToDelete.emplace_back(std::to_string(table_file.id_));
table_ids.insert(table_file.table_id_);
if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
// delete file from disk storage
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Remove file id:" << table_file.id_ << " location:" << table_file.location_;
idsToDelete.emplace_back(std::to_string(table_file.id_));
table_ids.insert(table_file.table_id_);
clean_files++;
}
}
// delete file from meta
if (!idsToDelete.empty()) {
std::stringstream idsToDeleteSS;
for (auto& id : idsToDelete) {
@ -1875,18 +1852,17 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
if (!cleanUpFilesWithTTLQuery.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL",
cleanUpFilesWithTTLQuery.error());
if (!query.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error());
}
}
if (res.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds";
if (clean_files > 0) {
ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
}
} // Scoped Connection
} catch (std::exception& e) {
@ -1904,14 +1880,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id"
<< " FROM " << META_TABLES
<< " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT id, table_id"
<< " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
int64_t remove_tables = 0;
if (!res.empty()) {
@ -1927,13 +1902,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
}
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
if (!cleanUpFilesWithTTLQuery.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL",
cleanUpFilesWithTTLQuery.error());
if (!query.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error());
}
}
@ -1958,14 +1932,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
}
for (auto& table_id : table_ids) {
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT file_id"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote
<< table_id << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT file_id"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
if (res.empty()) {
utils::DeleteTablePath(options_, table_id);

View File

@ -120,10 +120,7 @@ class MySQLMetaImpl : public Meta {
CleanUpShadowFiles() override;
Status
CleanUpCacheWithTTL(uint64_t seconds) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
Status
DropAll() override;

View File

@ -1294,51 +1294,7 @@ SqliteMetaImpl::CleanUpShadowFiles() {
}
Status
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
try {
server::MetricCollector metric;
// multi-threaded calls to sqlite update may get an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
std::vector<int> file_types = {
(int)TableFileSchema::TO_DELETE,
(int)TableFileSchema::BACKUP,
};
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::date_),
where(
in(&TableFileSchema::file_type_, file_types)
and
c(&TableFileSchema::updated_time_)
< now - seconds * US_PS));
for (auto& file : files) {
TableFileSchema table_file;
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
}
} catch (std::exception& e) {
return HandleException("Encounter exception when clean cache", e.what());
}
return Status::OK();
}
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;
@ -1346,33 +1302,60 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
try {
server::MetricCollector metric;
std::vector<int> file_types = {
(int)TableFileSchema::TO_DELETE,
(int)TableFileSchema::BACKUP,
};
// multi-threaded calls to sqlite update may get an exception ('bad logic', etc.), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// collect files to be deleted
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
&TableFileSchema::file_type_,
&TableFileSchema::date_),
where(
c(&TableFileSchema::file_type_) ==
(int)TableFileSchema::TO_DELETE
in(&TableFileSchema::file_type_, file_types)
and
c(&TableFileSchema::updated_time_)
< now - seconds * US_PS));
int64_t clean_files = 0;
auto commited = ConnectorPtr->transaction([&]() mutable {
TableFileSchema table_file;
for (auto& file : files) {
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
table_file.file_type_ = std::get<3>(file);
table_file.date_ = std::get<4>(file);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
// check if the file can be deleted
if (filter && filter->IsIgnored(table_file)) {
ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
<< " currently is in use, not able to delete now";
continue; // ignore this file, don't delete it
}
table_ids.insert(table_file.table_id_);
// erase from cache; must do this before the file is deleted,
// because GetTableFilePath won't be able to generate the file path afterwards
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) {
// delete file from meta
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
// delete file from disk storage
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Remove file id:" << table_file.file_id_ << " location:" << table_file.location_;
table_ids.insert(table_file.table_id_);
clean_files++;
}
}
return true;
});
@ -1381,8 +1364,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed");
}
if (files.size() > 0) {
ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds";
if (clean_files > 0) {
ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds";
}
} catch (std::exception& e) {
return HandleException("Encounter exception when clean table files", e.what());

View File

@ -120,10 +120,7 @@ class SqliteMetaImpl : public Meta {
CleanUpShadowFiles() override;
Status
CleanUpCacheWithTTL(uint64_t seconds) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
Status
DropAll() override;

View File

@ -168,21 +168,28 @@ XBuildIndexTask::Execute() {
// step 5: save index file
try {
index->Serialize();
status = index->Serialize();
if (!status.ok()) {
ENGINE_LOG_ERROR << status.message();
}
} catch (std::exception& ex) {
// typical error: out of disk space or permission denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
// if failed to serialize index file to disk
// typical error: out of disk space, out of memory or permission denied
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " as to_delete";
ENGINE_LOG_ERROR << "Failed to persist index file: " << table_file.location_
<< ", possible out of disk space";
<< ", possible out of disk space or memory";
build_index_job->BuildIndexDone(to_index_id_);
build_index_job->GetStatus() = Status(DB_ERROR, msg);
build_index_job->GetStatus() = status;
to_index_engine_ = nullptr;
return;
}
@ -196,7 +203,11 @@ XBuildIndexTask::Execute() {
origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP;
engine::meta::TableFilesSchema update_files = {table_file, origin_file};
status = meta_ptr->UpdateTableFiles(update_files);
if (status.ok()) { // make sure index file is successfully serialized to disk
status = meta_ptr->UpdateTableFiles(update_files);
}
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
<< " bytes"

View File

@ -17,6 +17,8 @@
#include "server/grpc_impl/request/CmdRequest.h"
#include "scheduler/SchedInst.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <memory>
@ -35,6 +37,9 @@ CmdRequest::Create(const std::string& cmd, std::string& result) {
Status
CmdRequest::OnExecute() {
std::string hdr = "CmdRequest(cmd=" + cmd_ + ")";
TimeRecorder rc(hdr);
if (cmd_ == "version") {
result_ = MILVUS_VERSION;
} else if (cmd_ == "tasktable") {

View File

@ -39,7 +39,8 @@ CountTableRequest::Create(const std::string& table_name, int64_t& row_count) {
Status
CountTableRequest::OnExecute() {
try {
TimeRecorder rc("CountTableRequest");
std::string hdr = "CountTableRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -44,7 +44,8 @@ CreateIndexRequest::Create(const ::milvus::grpc::IndexParam* index_param) {
Status
CreateIndexRequest::OnExecute() {
try {
TimeRecorder rc("CreateIndexRequest");
std::string hdr = "CreateIndexRequest(table=" + index_param_->table_name() + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
std::string table_name_ = index_param_->table_name();

View File

@ -22,6 +22,7 @@
#include "utils/ValidationUtil.h"
#include <memory>
#include <string>
namespace milvus {
namespace server {
@ -42,7 +43,10 @@ CreatePartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_p
Status
CreatePartitionRequest::OnExecute() {
TimeRecorder rc("CreatePartitionRequest");
std::string hdr = "CreatePartitionRequest(table=" + partition_param_->table_name() +
", partition_name=" + partition_param_->partition_name() +
", partition_tag=" + partition_param_->tag() + ")";
TimeRecorder rc(hdr);
try {
// step 1: check arguments

View File

@ -22,6 +22,7 @@
#include "utils/ValidationUtil.h"
#include <memory>
#include <string>
namespace milvus {
namespace server {
@ -42,7 +43,9 @@ CreateTableRequest::Create(const ::milvus::grpc::TableSchema* schema) {
Status
CreateTableRequest::OnExecute() {
TimeRecorder rc("CreateTableRequest");
std::string hdr = "CreateTableRequest(table=" + schema_->table_name() +
", dimension=" + std::to_string(schema_->dimension()) + ")";
TimeRecorder rc(hdr);
try {
// step 1: check arguments

View File

@ -39,7 +39,8 @@ DescribeIndexRequest::Create(const std::string& table_name, ::milvus::grpc::Inde
Status
DescribeIndexRequest::OnExecute() {
try {
TimeRecorder rc("DescribeIndexRequest");
std::string hdr = "DescribeIndexRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -38,7 +38,8 @@ DescribeTableRequest::Create(const std::string& table_name, ::milvus::grpc::Tabl
Status
DescribeTableRequest::OnExecute() {
TimeRecorder rc("DescribeTableRequest");
std::string hdr = "DescribeTableRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
try {
// step 1: check arguments

View File

@ -39,7 +39,8 @@ DropIndexRequest::Create(const std::string& table_name) {
Status
DropIndexRequest::OnExecute() {
try {
TimeRecorder rc("DropIndexRequest");
std::string hdr = "DropIndexRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -39,6 +39,11 @@ DropPartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_par
Status
DropPartitionRequest::OnExecute() {
std::string hdr = "DropPartitionRequest(table=" + partition_param_->table_name() +
", partition_name=" + partition_param_->partition_name() +
", partition_tag=" + partition_param_->tag() + ")";
TimeRecorder rc(hdr);
std::string table_name = partition_param_->table_name();
std::string partition_name = partition_param_->partition_name();
std::string partition_tag = partition_param_->tag();

View File

@ -40,7 +40,8 @@ DropTableRequest::Create(const std::string& table_name) {
Status
DropTableRequest::OnExecute() {
try {
TimeRecorder rc("DropTableRequest");
std::string hdr = "DropTableRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -39,7 +39,8 @@ HasTableRequest::Create(const std::string& table_name, bool& has_table) {
Status
HasTableRequest::OnExecute() {
try {
TimeRecorder rc("HasTableRequest");
std::string hdr = "HasTableRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -45,7 +45,10 @@ InsertRequest::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus:
Status
InsertRequest::OnExecute() {
try {
TimeRecorder rc("InsertRequest");
std::string hdr = "InsertRequest(table=" + insert_param_->table_name() +
", n=" + std::to_string(insert_param_->row_record_array_size()) +
", partition_tag=" + insert_param_->partition_tag() + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(insert_param_->table_name());

View File

@ -39,7 +39,8 @@ PreloadTableRequest::Create(const std::string& table_name) {
Status
PreloadTableRequest::OnExecute() {
try {
TimeRecorder rc("PreloadTableRequest");
std::string hdr = "PreloadTableRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);

View File

@ -51,7 +51,9 @@ SearchRequest::OnExecute() {
int64_t top_k = search_param_->topk();
int64_t nprobe = search_param_->nprobe();
std::string hdr = "SearchRequest(k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")";
std::string hdr = "SearchRequest(table=" + search_param_->table_name() +
", nq=" + std::to_string(search_param_->query_record_array_size()) +
", k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")";
TimeRecorder rc(hdr);
// step 1: check table name

View File

@ -40,6 +40,9 @@ ShowPartitionsRequest::Create(const std::string& table_name, ::milvus::grpc::Par
Status
ShowPartitionsRequest::OnExecute() {
std::string hdr = "ShowPartitionsRequest(table=" + table_name_ + ")";
TimeRecorder rc(hdr);
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;

View File

@ -38,6 +38,8 @@ ShowTablesRequest::Create(::milvus::grpc::TableNameList* table_name_list) {
Status
ShowTablesRequest::OnExecute() {
TimeRecorder rc("ShowTablesRequest");
std::vector<engine::meta::TableSchema> schema_array;
auto status = DBWrapper::DB()->AllTables(schema_array);
if (!status.ok()) {

View File

@ -229,7 +229,7 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) {
void
CommonUtil::EraseFromCache(const std::string& item_key) {
if (item_key.empty()) {
// SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
return;
}

View File

@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Options.h"
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
@ -119,3 +121,61 @@ TEST(DBMiscTest, UTILS_TEST) {
status = milvus::engine::utils::DeleteTableFilePath(options, file);
ASSERT_TRUE(status.ok());
}
TEST(DBMiscTest, CHECKER_TEST) {
{
milvus::engine::IndexFailedChecker checker;
milvus::engine::meta::TableFileSchema schema;
schema.table_id_ = "aaa";
schema.file_id_ = "5000";
checker.MarkFailedIndexFile(schema);
schema.table_id_ = "bbb";
schema.file_id_ = "5001";
checker.MarkFailedIndexFile(schema);
std::vector<std::string> failed_files;
checker.GetFailedIndexFileOfTable("aaa", failed_files);
ASSERT_EQ(failed_files.size(), 1UL);
schema.table_id_ = "bbb";
schema.file_id_ = "5002";
checker.MarkFailedIndexFile(schema);
checker.MarkFailedIndexFile(schema);
milvus::engine::meta::TableFilesSchema table_files = {schema};
checker.IgnoreFailedIndexFiles(table_files);
ASSERT_TRUE(table_files.empty());
checker.GetFailedIndexFileOfTable("bbb", failed_files);
ASSERT_EQ(failed_files.size(), 2UL);
checker.MarkSucceedIndexFile(schema);
checker.GetFailedIndexFileOfTable("bbb", failed_files);
ASSERT_EQ(failed_files.size(), 1UL);
}
{
milvus::engine::OngoingFileChecker checker;
milvus::engine::meta::TableFileSchema schema;
schema.table_id_ = "aaa";
schema.file_id_ = "5000";
checker.MarkOngoingFile(schema);
ASSERT_TRUE(checker.IsIgnored(schema));
schema.table_id_ = "bbb";
schema.file_id_ = "5001";
milvus::engine::meta::TableFilesSchema table_files = {schema};
checker.MarkOngoingFiles(table_files);
ASSERT_TRUE(checker.IsIgnored(schema));
checker.UnmarkOngoingFile(schema);
ASSERT_FALSE(checker.IsIgnored(schema));
schema.table_id_ = "aaa";
schema.file_id_ = "5000";
checker.UnmarkOngoingFile(schema);
ASSERT_FALSE(checker.IsIgnored(schema));
}
}