mirror of https://github.com/milvus-io/milvus.git
Fix failed to load index due to lost binary (#26135)
Signed-off-by: yah01 <yang.cen@zilliz.com>pull/26146/head
parent
4e1b65d38f
commit
07f08daf1a
|
@ -37,6 +37,7 @@
|
||||||
#include "storage/FieldData.h"
|
#include "storage/FieldData.h"
|
||||||
#include "storage/MemFileManagerImpl.h"
|
#include "storage/MemFileManagerImpl.h"
|
||||||
#include "storage/ThreadPools.h"
|
#include "storage/ThreadPools.h"
|
||||||
|
#include "storage/Util.h"
|
||||||
|
|
||||||
namespace milvus::index {
|
namespace milvus::index {
|
||||||
|
|
||||||
|
@ -104,32 +105,78 @@ VectorMemIndex::Load(const Config& config) {
|
||||||
AssertInfo(index_files.has_value(),
|
AssertInfo(index_files.has_value(),
|
||||||
"index file paths is empty when load index");
|
"index file paths is empty when load index");
|
||||||
|
|
||||||
|
LOG_SEGCORE_INFO_ << "load index files: " << index_files.value().size();
|
||||||
|
|
||||||
auto parallel_degree =
|
auto parallel_degree =
|
||||||
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
|
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
|
||||||
|
std::map<std::string, storage::FieldDataPtr> index_datas{};
|
||||||
|
|
||||||
std::map<std::string, storage::FieldDataChannelPtr> channels;
|
// try to read slice meta first
|
||||||
for (const auto& file : index_files.value()) {
|
std::string slice_meta_filepath;
|
||||||
auto key = file.substr(file.find_last_of('/') + 1);
|
for (auto& file : index_files.value()) {
|
||||||
LOG_SEGCORE_INFO_ << "loading index file " << key;
|
auto file_name = file.substr(file.find_last_of('/') + 1);
|
||||||
if (channels.find(key) == channels.end()) {
|
if (file_name == INDEX_FILE_SLICE_META) {
|
||||||
channels.emplace(std::move(key),
|
slice_meta_filepath = file;
|
||||||
std::make_shared<storage::FieldDataChannel>(
|
break;
|
||||||
parallel_degree * 2));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
|
if (slice_meta_filepath
|
||||||
auto future = pool.Submit(
|
.empty()) { // no slice meta, we could simply load all these files
|
||||||
[&] { file_manager_->LoadFileStream(index_files.value(), channels); });
|
index_datas = file_manager_->LoadIndexToMemory(index_files.value());
|
||||||
|
AssembleIndexDatas(index_datas);
|
||||||
|
} else { // load with the slice meta info, then we can load batch by batch
|
||||||
|
std::string index_file_prefix = slice_meta_filepath.substr(
|
||||||
|
0, slice_meta_filepath.find_last_of('/') + 1);
|
||||||
|
std::vector<std::string> batch{};
|
||||||
|
batch.reserve(parallel_degree);
|
||||||
|
|
||||||
LOG_SEGCORE_INFO_ << "assemble index data...";
|
auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
|
||||||
std::unordered_map<std::string, storage::FieldDataPtr> result;
|
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
|
||||||
AssembleIndexDatas(channels, result);
|
Config meta_data = Config::parse(
|
||||||
LOG_SEGCORE_INFO_ << "assemble index data done";
|
std::string(static_cast<const char*>(raw_slice_meta->Data()),
|
||||||
|
raw_slice_meta->Size()));
|
||||||
|
|
||||||
|
for (auto& item : meta_data[META]) {
|
||||||
|
std::string prefix = item[NAME];
|
||||||
|
int slice_num = item[SLICE_NUM];
|
||||||
|
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
|
||||||
|
|
||||||
|
auto new_field_data =
|
||||||
|
milvus::storage::CreateFieldData(DataType::INT8, 1, total_len);
|
||||||
|
auto HandleBatch = [&](int index) {
|
||||||
|
auto batch_data = file_manager_->LoadIndexToMemory(batch);
|
||||||
|
for (int j = index - batch.size() + 1; j <= index; j++) {
|
||||||
|
std::string file_name = GenSlicedFileName(prefix, j);
|
||||||
|
AssertInfo(batch_data.find(file_name) != batch_data.end(),
|
||||||
|
"lost index slice data");
|
||||||
|
auto data = batch_data[file_name];
|
||||||
|
new_field_data->FillFieldData(data->Data(), data->Size());
|
||||||
|
}
|
||||||
|
batch.clear();
|
||||||
|
};
|
||||||
|
|
||||||
|
for (auto i = 0; i < slice_num; ++i) {
|
||||||
|
std::string file_name = GenSlicedFileName(prefix, i);
|
||||||
|
batch.push_back(index_file_prefix + file_name);
|
||||||
|
if (batch.size() >= parallel_degree) {
|
||||||
|
HandleBatch(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (batch.size() > 0) {
|
||||||
|
HandleBatch(slice_num - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
AssertInfo(
|
||||||
|
new_field_data->IsFull(),
|
||||||
|
"index len is inconsistent after disassemble and assemble");
|
||||||
|
index_datas[prefix] = new_field_data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LOG_SEGCORE_INFO_ << "construct binary set...";
|
LOG_SEGCORE_INFO_ << "construct binary set...";
|
||||||
BinarySet binary_set;
|
BinarySet binary_set;
|
||||||
for (auto& [key, data] : result) {
|
for (auto& [key, data] : index_datas) {
|
||||||
LOG_SEGCORE_INFO_ << "add index data to binary set: " << key;
|
LOG_SEGCORE_INFO_ << "add index data to binary set: " << key;
|
||||||
auto size = data->Size();
|
auto size = data->Size();
|
||||||
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
|
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
|
||||||
|
|
|
@ -119,41 +119,6 @@ MemFileManagerImpl::LoadIndexToMemory(
|
||||||
return file_to_index_data;
|
return file_to_index_data;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
MemFileManagerImpl::LoadFileStream(
|
|
||||||
const std::vector<std::string>& remote_files,
|
|
||||||
std::map<std::string, storage::FieldDataChannelPtr>& channels) {
|
|
||||||
auto parallel_degree =
|
|
||||||
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
|
|
||||||
|
|
||||||
std::vector<std::string> batch_files;
|
|
||||||
auto LoadBatchIndexFiles = [&]() {
|
|
||||||
auto index_datas = GetObjectData(rcm_.get(), batch_files);
|
|
||||||
for (auto i = 0; i < index_datas.size(); i++) {
|
|
||||||
auto file_name =
|
|
||||||
batch_files[i].substr(batch_files[i].find_last_of('/') + 1);
|
|
||||||
auto& channel = channels[file_name];
|
|
||||||
channel->push(index_datas[i]);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for (auto& file : remote_files) {
|
|
||||||
if (batch_files.size() >= parallel_degree) {
|
|
||||||
LoadBatchIndexFiles();
|
|
||||||
batch_files.clear();
|
|
||||||
}
|
|
||||||
batch_files.emplace_back(file);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (batch_files.size() > 0) {
|
|
||||||
LoadBatchIndexFiles();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto& [_, channel] : channels) {
|
|
||||||
channel->close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<FieldDataPtr>
|
std::vector<FieldDataPtr>
|
||||||
MemFileManagerImpl::CacheRawDataToMemory(
|
MemFileManagerImpl::CacheRawDataToMemory(
|
||||||
std::vector<std::string> remote_files) {
|
std::vector<std::string> remote_files) {
|
||||||
|
|
|
@ -55,11 +55,6 @@ class MemFileManagerImpl : public FileManagerImpl {
|
||||||
std::map<std::string, storage::FieldDataPtr>
|
std::map<std::string, storage::FieldDataPtr>
|
||||||
LoadIndexToMemory(const std::vector<std::string>& remote_files);
|
LoadIndexToMemory(const std::vector<std::string>& remote_files);
|
||||||
|
|
||||||
void
|
|
||||||
LoadFileStream(
|
|
||||||
const std::vector<std::string>& remote_files,
|
|
||||||
std::map<std::string, storage::FieldDataChannelPtr>& channels);
|
|
||||||
|
|
||||||
std::vector<FieldDataPtr>
|
std::vector<FieldDataPtr>
|
||||||
CacheRawDataToMemory(std::vector<std::string> remote_files);
|
CacheRawDataToMemory(std::vector<std::string> remote_files);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue