mirror of https://github.com/milvus-io/milvus.git
parent
032ac328b2
commit
ef0e3145eb
|
@ -47,6 +47,7 @@ Please mark all change in change log and use the issue from GitHub
|
|||
- \#966 - Update NOTICE.md
|
||||
- \#1002 - Rename minio to s3 in Storage Config section
|
||||
- \#1078 - Move 'insert_buffer_size' to Cache Config section
|
||||
- \#1105 - Error message is not clear when creating IVFSQ8H index without gpu resources
|
||||
|
||||
## Task
|
||||
|
||||
|
|
|
@ -895,7 +895,7 @@ DBImpl::BackgroundBuildIndex() {
|
|||
Status status = job->GetStatus();
|
||||
ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();
|
||||
|
||||
index_failed_checker_.MarkFailedIndexFile(file_schema);
|
||||
index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
|
||||
} else {
|
||||
ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
|
||||
|
||||
|
@ -1066,13 +1066,10 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
|
|||
}
|
||||
|
||||
// failed to build index for some files, return error
|
||||
std::vector<std::string> failed_files;
|
||||
index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
|
||||
if (!failed_files.empty()) {
|
||||
std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
|
||||
((failed_files.size() == 1) ? " file" : " files");
|
||||
msg += ", please double check index parameters.";
|
||||
return Status(DB_ERROR, msg);
|
||||
std::string err_msg;
|
||||
index_failed_checker_.GetErrMsgForTable(table_id, err_msg);
|
||||
if (!err_msg.empty()) {
|
||||
return Status(DB_ERROR, err_msg);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
|
|
@ -15,9 +15,10 @@
|
|||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "db/IndexFailedChecker.h"
|
||||
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "db/IndexFailedChecker.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
@ -33,35 +34,31 @@ IndexFailedChecker::CleanFailedIndexFileOfTable(const std::string& table_id) {
|
|||
}
|
||||
|
||||
Status
|
||||
IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
|
||||
failed_files.clear();
|
||||
IndexFailedChecker::GetErrMsgForTable(const std::string& table_id, std::string& err_msg) {
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
auto iter = index_failed_files_.find(table_id);
|
||||
if (iter != index_failed_files_.end()) {
|
||||
File2RefCount& failed_map = iter->second;
|
||||
for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) {
|
||||
failed_files.push_back(it_file->first);
|
||||
}
|
||||
err_msg = iter->second.begin()->second[0];
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) {
|
||||
IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file, const std::string& err_msg) {
|
||||
std::lock_guard<std::mutex> lck(mutex_);
|
||||
|
||||
auto iter = index_failed_files_.find(file.table_id_);
|
||||
if (iter == index_failed_files_.end()) {
|
||||
File2RefCount failed_files;
|
||||
failed_files.insert(std::make_pair(file.file_id_, 1));
|
||||
File2ErrArray failed_files;
|
||||
failed_files.insert(std::make_pair(file.file_id_, std::vector<std::string>(1, err_msg)));
|
||||
index_failed_files_.insert(std::make_pair(file.table_id_, failed_files));
|
||||
} else {
|
||||
auto it_failed_files = iter->second.find(file.file_id_);
|
||||
if (it_failed_files != iter->second.end()) {
|
||||
it_failed_files->second++;
|
||||
it_failed_files->second.push_back(err_msg);
|
||||
} else {
|
||||
iter->second.insert(std::make_pair(file.file_id_, 1));
|
||||
iter->second.insert(std::make_pair(file.file_id_, std::vector<std::string>(1, err_msg)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,7 +92,7 @@ IndexFailedChecker::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files)
|
|||
if (it_failed_files != index_failed_files_.end()) {
|
||||
auto it_failed_file = it_failed_files->second.find((*it_file).file_id_);
|
||||
if (it_failed_file != it_failed_files->second.end()) {
|
||||
if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) {
|
||||
if (it_failed_file->second.size() >= INDEX_FAILED_RETRY_TIME) {
|
||||
it_file = table_files.erase(it_file);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
#include <map>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
@ -35,10 +34,10 @@ class IndexFailedChecker {
|
|||
CleanFailedIndexFileOfTable(const std::string& table_id);
|
||||
|
||||
Status
|
||||
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
|
||||
GetErrMsgForTable(const std::string& table_id, std::string& err_msg);
|
||||
|
||||
Status
|
||||
MarkFailedIndexFile(const meta::TableFileSchema& file);
|
||||
MarkFailedIndexFile(const meta::TableFileSchema& file, const std::string& err_msg);
|
||||
|
||||
Status
|
||||
MarkSucceedIndexFile(const meta::TableFileSchema& file);
|
||||
|
@ -48,7 +47,7 @@ class IndexFailedChecker {
|
|||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times)
|
||||
Table2FileErr index_failed_files_; // table id mapping to (file id mapping to failed times)
|
||||
};
|
||||
|
||||
} // namespace engine
|
||||
|
|
|
@ -55,7 +55,7 @@ class OngoingFileChecker : public meta::Meta::CleanUpFilter {
|
|||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count)
|
||||
Table2FileRef ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count)
|
||||
};
|
||||
|
||||
} // namespace engine
|
||||
|
|
|
@ -50,8 +50,10 @@ struct VectorsData {
|
|||
IDNumbers id_array_;
|
||||
};
|
||||
|
||||
using File2ErrArray = std::map<std::string, std::vector<std::string>>;
|
||||
using Table2FileErr = std::map<std::string, File2ErrArray>;
|
||||
using File2RefCount = std::map<std::string, int64_t>;
|
||||
using Table2Files = std::map<std::string, File2RefCount>;
|
||||
using Table2FileRef = std::map<std::string, File2RefCount>;
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
|
|
@ -128,30 +128,30 @@ TEST(DBMiscTest, CHECKER_TEST) {
|
|||
milvus::engine::meta::TableFileSchema schema;
|
||||
schema.table_id_ = "aaa";
|
||||
schema.file_id_ = "5000";
|
||||
checker.MarkFailedIndexFile(schema);
|
||||
checker.MarkFailedIndexFile(schema, "5000 fail");
|
||||
schema.table_id_ = "bbb";
|
||||
schema.file_id_ = "5001";
|
||||
checker.MarkFailedIndexFile(schema);
|
||||
checker.MarkFailedIndexFile(schema, "5001 fail");
|
||||
|
||||
std::vector<std::string> failed_files;
|
||||
checker.GetFailedIndexFileOfTable("aaa", failed_files);
|
||||
ASSERT_EQ(failed_files.size(), 1UL);
|
||||
std::string err_msg;
|
||||
checker.GetErrMsgForTable("aaa", err_msg);
|
||||
ASSERT_EQ(err_msg, "5000 fail");
|
||||
|
||||
schema.table_id_ = "bbb";
|
||||
schema.file_id_ = "5002";
|
||||
checker.MarkFailedIndexFile(schema);
|
||||
checker.MarkFailedIndexFile(schema);
|
||||
checker.MarkFailedIndexFile(schema, "5002 fail");
|
||||
checker.MarkFailedIndexFile(schema, "5002 fail");
|
||||
|
||||
milvus::engine::meta::TableFilesSchema table_files = {schema};
|
||||
checker.IgnoreFailedIndexFiles(table_files);
|
||||
ASSERT_TRUE(table_files.empty());
|
||||
|
||||
checker.GetFailedIndexFileOfTable("bbb", failed_files);
|
||||
ASSERT_EQ(failed_files.size(), 2UL);
|
||||
checker.GetErrMsgForTable("bbb", err_msg);
|
||||
ASSERT_EQ(err_msg, "5001 fail");
|
||||
|
||||
checker.MarkSucceedIndexFile(schema);
|
||||
checker.GetFailedIndexFileOfTable("bbb", failed_files);
|
||||
ASSERT_EQ(failed_files.size(), 1UL);
|
||||
checker.GetErrMsgForTable("bbb", err_msg);
|
||||
ASSERT_EQ(err_msg, "5001 fail");
|
||||
}
|
||||
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue