fix storage clang tidy warnings (#3285)

* update storage interface Read

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* enable storage unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* use THROW_ERROR in more places

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* code opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename read() to Read()

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update storage interface names

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* roll back to void Read()

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/3284/head^2
Cai Yudong 2020-08-18 11:17:25 +08:00 committed by GitHub
parent fae9ea330c
commit 89ca954e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 194 additions and 227 deletions

View File

@ -32,52 +32,43 @@ namespace codec {
void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
}
void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw) {
if (offset < 0 || num_bytes <= 0) {
std::string err_msg = "Invalid input to read: " + file_path;
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
}
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
size_t total_num_bytes;
fs_ptr->reader_ptr_->read(&total_num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
offset += sizeof(size_t); // Beginning of file is num_bytes
if (offset + num_bytes > total_num_bytes) {
std::string err_msg = "Invalid input to read: " + file_path;
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
}
raw = std::make_shared<engine::BinaryData>();
raw->data_.resize(num_bytes);
fs_ptr->reader_ptr_->seekg(offset);
fs_ptr->reader_ptr_->read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Read(raw->data_.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
}
void
@ -87,23 +78,18 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
return;
}
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
}
size_t total_num_bytes;
fs_ptr->reader_ptr_->read(&total_num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
int64_t total_bytes = 0;
for (auto& range : read_ranges) {
if (range.offset_ > total_num_bytes) {
std::string err_msg = "Invalid input to read: " + file_path;
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
}
total_bytes += range.num_bytes_;
}
@ -112,12 +98,11 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
int64_t poz = 0;
for (auto& range : read_ranges) {
int64_t offset = range.offset_ + sizeof(size_t);
fs_ptr->reader_ptr_->seekg(offset);
fs_ptr->reader_ptr_->read(raw->data_.data() + poz, range.num_bytes_);
fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Read(raw->data_.data() + poz, range.num_bytes_);
poz += range.num_bytes_;
}
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Close();
}
void
@ -127,16 +112,14 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_
return;
}
if (!fs_ptr->writer_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
if (!fs_ptr->writer_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
}
size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)(raw->data_.data()), num_bytes);
fs_ptr->writer_ptr_->close();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write((void*)(raw->data_.data()), num_bytes);
fs_ptr->writer_ptr_->Close();
}
} // namespace codec

View File

@ -45,21 +45,19 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
std::string err_msg = "Failed to open file: " + full_file_path; // + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
}
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
auto deleted_docs_size = num_bytes / sizeof(engine::offset_t);
std::vector<engine::offset_t> deleted_docs_list;
deleted_docs_list.resize(deleted_docs_size);
fs_ptr->reader_ptr_->read(deleted_docs_list.data(), num_bytes);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Read(deleted_docs_list.data(), num_bytes);
fs_ptr->reader_ptr_->Close();
deleted_docs = std::make_shared<segment::DeletedDocs>(deleted_docs_list);
}
@ -81,15 +79,13 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
size_t old_num_bytes;
std::vector<engine::offset_t> delete_ids;
if (exists) {
if (!fs_ptr->reader_ptr_->open(temp_path)) {
std::string err_msg = "Failed to read from file: " + temp_path; // + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
if (!fs_ptr->reader_ptr_->Open(temp_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open tmp deleted docs file: " + temp_path);
}
fs_ptr->reader_ptr_->read(&old_num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&old_num_bytes, sizeof(size_t));
delete_ids.resize(old_num_bytes / sizeof(engine::offset_t));
fs_ptr->reader_ptr_->read(delete_ids.data(), old_num_bytes);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Read(delete_ids.data(), old_num_bytes);
fs_ptr->reader_ptr_->Close();
} else {
old_num_bytes = 0;
}
@ -100,15 +96,13 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end());
}
if (!fs_ptr->writer_ptr_->open(temp_path)) {
std::string err_msg = "Failed to write from file: " + temp_path; // + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
if (!fs_ptr->writer_ptr_->Open(temp_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path);
}
fs_ptr->writer_ptr_->write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->close();
fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->Close();
// Move temp file to delete file
std::experimental::filesystem::rename(temp_path, full_file_path);
@ -117,17 +111,15 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
void
DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
if (!fs_ptr->writer_ptr_->open(full_file_path)) {
std::string err_msg = "Failed to open file: " + full_file_path; // + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path);
}
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
size = num_bytes / sizeof(engine::offset_t);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Close();
}
} // namespace codec

View File

@ -46,9 +46,7 @@ IdBloomFilterFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string
new_scaling_bloom_from_file(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
if (bloom_filter == nullptr) {
std::string err_msg = "Failed to read bloom filter from file: " + full_file_path + ". " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_UNEXPECTED_ERROR, err_msg);
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to read bloom filter from file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
}
@ -58,9 +56,7 @@ IdBloomFilterFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::strin
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::string full_file_path = file_path + BLOOM_FILTER_POSTFIX;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
std::string err_msg = "Failed to write bloom filter to file: " + full_file_path + ". " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_UNEXPECTED_ERROR, err_msg);
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to write bloom filter to file: " + full_file_path);
}
}
@ -71,9 +67,7 @@ IdBloomFilterFormat::Create(const storage::FSHandlerPtr& fs_ptr, const std::stri
scaling_bloom_t* bloom_filter =
new_scaling_bloom(BLOOM_FILTER_CAPACITY, BLOOM_FILTER_ERROR_RATE, full_file_path.c_str());
if (bloom_filter == nullptr) {
std::string err_msg = "Failed to read bloom filter from file: " + full_file_path + ". " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_UNEXPECTED_ERROR, err_msg);
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Failed to read bloom filter from file: " + full_file_path);
}
id_bloom_filter_ptr = std::make_shared<segment::IdBloomFilter>(bloom_filter);
}

View File

@ -85,51 +85,49 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
knowhere::BinarySet load_data_list;
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
LOG_ENGINE_ERROR_ << "Fail to open structured index: " << full_file_path;
return;
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->length();
int64_t length = fs_ptr->reader_ptr_->Length();
if (length <= 0) {
LOG_ENGINE_ERROR_ << "Invalid structured index length: " << full_file_path;
return;
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path);
}
size_t rp = 0;
fs_ptr->reader_ptr_->seekg(0);
fs_ptr->reader_ptr_->Seekg(0);
int32_t data_type = 0;
fs_ptr->reader_ptr_->read(&data_type, sizeof(data_type));
fs_ptr->reader_ptr_->Read(&data_type, sizeof(data_type));
rp += sizeof(data_type);
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
LOG_ENGINE_DEBUG_ << "Start to read_index(" << full_file_path << ") length: " << length << " bytes";
while (rp < length) {
size_t meta_length;
fs_ptr->reader_ptr_->read(&meta_length, sizeof(meta_length));
fs_ptr->reader_ptr_->Read(&meta_length, sizeof(meta_length));
rp += sizeof(meta_length);
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
auto meta = new char[meta_length];
fs_ptr->reader_ptr_->read(meta, meta_length);
fs_ptr->reader_ptr_->Read(meta, meta_length);
rp += meta_length;
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
size_t bin_length;
fs_ptr->reader_ptr_->read(&bin_length, sizeof(bin_length));
fs_ptr->reader_ptr_->Read(&bin_length, sizeof(bin_length));
rp += sizeof(bin_length);
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
auto bin = new uint8_t[bin_length];
fs_ptr->reader_ptr_->read(bin, bin_length);
fs_ptr->reader_ptr_->Read(bin, bin_length);
rp += bin_length;
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
std::shared_ptr<uint8_t[]> binptr(bin);
load_data_list.Append(std::string(meta, meta_length), binptr, bin_length);
delete[] meta;
}
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
@ -148,28 +146,27 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->open(full_file_path)) {
LOG_ENGINE_ERROR_ << "Fail to open structured index: " << full_file_path;
return;
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
}
fs_ptr->writer_ptr_->write(&data_type, sizeof(data_type));
fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type));
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->write((void*)meta, meta_length);
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write((void*)meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->write((void*)binary->data.get(), binary_length);
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write((void*)binary->data.get(), binary_length);
}
fs_ptr->writer_ptr_->close();
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024;
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
}

View File

@ -41,23 +41,21 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
milvus::TimeRecorder recorder("VectorCompressFormat::Read");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
LOG_ENGINE_ERROR_ << "Fail to open vector compress: " << full_file_path;
return;
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->length();
int64_t length = fs_ptr->reader_ptr_->Length();
if (length <= 0) {
LOG_ENGINE_ERROR_ << "Invalid vector compress length: " << full_file_path;
return;
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path);
}
compress->data = std::shared_ptr<uint8_t[]>(new uint8_t[length]);
compress->size = length;
fs_ptr->reader_ptr_->seekg(0);
fs_ptr->reader_ptr_->read(compress->data.get(), length);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Seekg(0);
fs_ptr->reader_ptr_->Read(compress->data.get(), length);
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
@ -70,13 +68,12 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
milvus::TimeRecorder recorder("VectorCompressFormat::Write");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->writer_ptr_->open(full_file_path)) {
LOG_ENGINE_ERROR_ << "Fail to open vector compress: " << full_file_path;
return;
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path);
}
fs_ptr->writer_ptr_->write(compress->data.get(), compress->size);
fs_ptr->writer_ptr_->close();
fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size);
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;

View File

@ -43,23 +43,21 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin
knowhere::BinaryPtr& data) {
milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw");
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open raw file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path);
}
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
data = std::make_shared<knowhere::Binary>();
data->size = num_bytes;
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[num_bytes]);
// Beginning of file is num_bytes
fs_ptr->reader_ptr_->seekg(sizeof(size_t));
fs_ptr->reader_ptr_->read(data->data.get(), num_bytes);
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Seekg(sizeof(size_t));
fs_ptr->reader_ptr_->Read(data->data.get(), num_bytes);
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = num_bytes * 1000000.0 / span / 1024 / 1024;
@ -72,48 +70,45 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
std::string err_msg = "Failed to open vector index: " + full_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
int64_t length = fs_ptr->reader_ptr_->length();
int64_t length = fs_ptr->reader_ptr_->Length();
if (length <= 0) {
LOG_ENGINE_ERROR_ << "Invalid vector index length: " << full_file_path;
return;
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path);
}
int64_t rp = 0;
fs_ptr->reader_ptr_->seekg(0);
fs_ptr->reader_ptr_->Seekg(0);
LOG_ENGINE_DEBUG_ << "Start to ReadIndex(" << full_file_path << ") length: " << length << " bytes";
while (rp < length) {
size_t meta_length;
fs_ptr->reader_ptr_->read(&meta_length, sizeof(meta_length));
fs_ptr->reader_ptr_->Read(&meta_length, sizeof(meta_length));
rp += sizeof(meta_length);
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
auto meta = new char[meta_length];
fs_ptr->reader_ptr_->read(meta, meta_length);
fs_ptr->reader_ptr_->Read(meta, meta_length);
rp += meta_length;
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
size_t bin_length;
fs_ptr->reader_ptr_->read(&bin_length, sizeof(bin_length));
fs_ptr->reader_ptr_->Read(&bin_length, sizeof(bin_length));
rp += sizeof(bin_length);
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
auto bin = new uint8_t[bin_length];
fs_ptr->reader_ptr_->read(bin, bin_length);
fs_ptr->reader_ptr_->Read(bin, bin_length);
rp += bin_length;
fs_ptr->reader_ptr_->seekg(rp);
fs_ptr->reader_ptr_->Seekg(rp);
std::shared_ptr<uint8_t[]> binptr(bin);
data.Append(std::string(meta, meta_length), binptr, bin_length);
delete[] meta;
}
fs_ptr->reader_ptr_->close();
fs_ptr->reader_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
@ -167,9 +162,7 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar
index->UpdateIndexSize();
LOG_ENGINE_DEBUG_ << "index file size " << length << " index size " << index->IndexSize();
} else {
std::string err_msg = "Fail to create vector index";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_UNEXPECTED_ERROR, err_msg);
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Fail to create vector index");
}
}
@ -181,26 +174,25 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->open(full_file_path)) {
LOG_ENGINE_ERROR_ << "Fail to open vector index: " << full_file_path;
return;
if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
}
for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str();
size_t meta_length = iter.first.length();
fs_ptr->writer_ptr_->write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->write((void*)meta, meta_length);
fs_ptr->writer_ptr_->Write(&meta_length, sizeof(meta_length));
fs_ptr->writer_ptr_->Write((void*)meta, meta_length);
auto binary = iter.second;
int64_t binary_length = binary->size;
fs_ptr->writer_ptr_->write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->write((void*)binary->data.get(), binary_length);
fs_ptr->writer_ptr_->Write(&binary_length, sizeof(binary_length));
fs_ptr->writer_ptr_->Write((void*)binary->data.get(), binary_length);
}
fs_ptr->writer_ptr_->close();
fs_ptr->writer_ptr_->Close();
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024;
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
}

View File

@ -19,4 +19,5 @@ set( STORAGE_FILES ${STORAGE_MAIN_FILES}
)
add_library( storage STATIC )
target_sources( storage PRIVATE ${STORAGE_FILES} )
target_link_libraries( storage PRIVATE log)
add_dependencies( storage fiu )

View File

@ -20,19 +20,19 @@ namespace storage {
class IOReader {
public:
virtual bool
open(const std::string& name) = 0;
Open(const std::string& name) = 0;
virtual void
read(void* ptr, int64_t size) = 0;
Read(void* ptr, int64_t size) = 0;
virtual void
seekg(int64_t pos) = 0;
Seekg(int64_t pos) = 0;
virtual int64_t
length() = 0;
Length() = 0;
virtual void
close() = 0;
Close() = 0;
};
using IOReaderPtr = std::shared_ptr<IOReader>;

View File

@ -20,16 +20,16 @@ namespace storage {
class IOWriter {
public:
virtual bool
open(const std::string& name) = 0;
Open(const std::string& name) = 0;
virtual void
write(void* ptr, int64_t size) = 0;
Write(void* ptr, int64_t size) = 0;
virtual int64_t
length() = 0;
Length() = 0;
virtual void
close() = 0;
Close() = 0;
};
using IOWriterPtr = std::shared_ptr<IOWriter>;

View File

@ -15,32 +15,38 @@ namespace milvus {
namespace storage {
bool
DiskIOReader::open(const std::string& name) {
DiskIOReader::Open(const std::string& name) {
name_ = name;
fs_ = std::fstream(name_, std::ios::in | std::ios::binary);
return fs_.good();
return fs_.is_open();
}
void
DiskIOReader::read(void* ptr, int64_t size) {
DiskIOReader::Read(void* ptr, int64_t size) {
fs_.read(reinterpret_cast<char*>(ptr), size);
}
void
DiskIOReader::seekg(int64_t pos) {
DiskIOReader::Seekg(int64_t pos) {
fs_.seekg(pos);
}
int64_t
DiskIOReader::length() {
DiskIOReader::Length() {
/* save current position */
int64_t cur = fs_.tellg();
/* move position to end of file */
fs_.seekg(0, fs_.end);
int64_t len = fs_.tellg();
fs_.seekg(0, fs_.beg);
/* restore position */
fs_.seekg(cur, fs_.beg);
return len;
}
void
DiskIOReader::close() {
DiskIOReader::Close() {
fs_.close();
}

View File

@ -34,19 +34,19 @@ class DiskIOReader : public IOReader {
operator=(DiskIOReader&&) = delete;
bool
open(const std::string& name) override;
Open(const std::string& name) override;
void
read(void* ptr, int64_t size) override;
Read(void* ptr, int64_t size) override;
void
seekg(int64_t pos) override;
Seekg(int64_t pos) override;
int64_t
length() override;
Length() override;
void
close() override;
Close() override;
public:
std::string name_;

View File

@ -15,7 +15,7 @@ namespace milvus {
namespace storage {
bool
DiskIOWriter::open(const std::string& name) {
DiskIOWriter::Open(const std::string& name) {
name_ = name;
len_ = 0;
fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
@ -23,18 +23,18 @@ DiskIOWriter::open(const std::string& name) {
}
void
DiskIOWriter::write(void* ptr, int64_t size) {
DiskIOWriter::Write(void* ptr, int64_t size) {
fs_.write(reinterpret_cast<char*>(ptr), size);
len_ += size;
}
int64_t
DiskIOWriter::length() {
DiskIOWriter::Length() {
return len_;
}
void
DiskIOWriter::close() {
DiskIOWriter::Close() {
fs_.close();
}

View File

@ -34,16 +34,16 @@ class DiskIOWriter : public IOWriter {
operator=(DiskIOWriter&&) = delete;
bool
open(const std::string& name) override;
Open(const std::string& name) override;
void
write(void* ptr, int64_t size) override;
Write(void* ptr, int64_t size) override;
int64_t
length() override;
Length() override;
void
close() override;
Close() override;
public:
std::string name_;

View File

@ -19,9 +19,9 @@
#include <fiu-local.h>
#include "log/Log.h"
#include "storage/disk/DiskOperation.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace storage {
@ -38,9 +38,7 @@ DiskOperation::CreateDirectory() {
auto ret = std::experimental::filesystem::create_directories(dir_path_);
fiu_do_on("DiskOperation.CreateDirectory.create_directory", ret = false);
if (!ret) {
std::string err_msg = "Failed to create directory: " + dir_path_;
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FOLDER, err_msg);
THROW_ERROR(SERVER_CANNOT_CREATE_FOLDER, "Failed to create directory: " + dir_path_);
}
}
}

View File

@ -16,29 +16,29 @@ namespace milvus {
namespace storage {
bool
S3IOReader::open(const std::string& name) {
S3IOReader::Open(const std::string& name) {
name_ = name;
pos_ = 0;
return (S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_).ok());
}
void
S3IOReader::read(void* ptr, int64_t size) {
S3IOReader::Read(void* ptr, int64_t size) {
memcpy(ptr, buffer_.data() + pos_, size);
}
void
S3IOReader::seekg(int64_t pos) {
S3IOReader::Seekg(int64_t pos) {
pos_ = pos;
}
int64_t
S3IOReader::length() {
S3IOReader::Length() {
return buffer_.length();
}
void
S3IOReader::close() {
S3IOReader::Close() {
}
} // namespace storage

View File

@ -33,19 +33,19 @@ class S3IOReader : public IOReader {
operator=(S3IOReader&&) = delete;
bool
open(const std::string& name) override;
Open(const std::string& name) override;
void
read(void* ptr, int64_t size) override;
Read(void* ptr, int64_t size) override;
void
seekg(int64_t pos) override;
Seekg(int64_t pos) override;
int64_t
length() override;
Length() override;
void
close() override;
Close() override;
public:
std::string name_;

View File

@ -33,16 +33,16 @@ class S3IOWriter : public IOWriter {
operator=(S3IOWriter&&) = delete;
bool
open(const std::string& name) override;
Open(const std::string& name) override;
void
write(void* ptr, int64_t size) override;
Write(void* ptr, int64_t size) override;
int64_t
length() override;
Length() override;
void
close() override;
Close() override;
public:
std::string name_;

View File

@ -72,6 +72,7 @@ constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10);
constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11);
constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12);
constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13);
constexpr ErrorCode SERVER_CANNOT_READ_FILE = ToServerErrorCode(14);
constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100);
constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101);

View File

@ -18,6 +18,10 @@
namespace milvus {
#define THROW_ERROR(err_code, err_msg) \
LOG_ENGINE_ERROR_ << err_msg; \
throw Exception(err_code, err_msg);
class Exception : public std::exception {
public:
Exception(ErrorCode code, const std::string& message) : code_(code), message_(message) {

View File

@ -83,4 +83,4 @@ add_subdirectory(db)
#add_subdirectory(metrics)
#add_subdirectory(scheduler)
#add_subdirectory(thirdparty)
#add_subdirectory(storage)
add_subdirectory(storage)

View File

@ -21,18 +21,20 @@ include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
add_executable(test_storage
${common_files}
${log_files}
${config_files}
${storage_files}
${test_files}
)
target_link_libraries(test_storage
${UNITTEST_LIBS}
stdc++
knowhere
server
milvus_engine
metrics
${unittest_libs}
config
utils
tracing
query
log
)
install(TARGETS test_storage DESTINATION unittest)

View File

@ -27,32 +27,32 @@ TEST_F(StorageTest, DISK_RW_TEST) {
{
milvus::storage::DiskIOWriter writer;
ASSERT_TRUE(writer.open(index_name));
ASSERT_TRUE(writer.Open(index_name));
size_t len = content.length();
writer.write(&len, sizeof(len));
writer.write((void*)(content.data()), len);
ASSERT_TRUE(len + sizeof(len) == writer.length());
writer.close();
writer.Write(&len, sizeof(len));
writer.Write((void*)(content.data()), len);
ASSERT_TRUE(len + sizeof(len) == writer.Length());
writer.Close();
}
{
milvus::storage::DiskIOReader reader;
ASSERT_FALSE(reader.open("/tmp/notexist"));
ASSERT_TRUE(reader.open(index_name));
int64_t length = reader.length();
ASSERT_FALSE(reader.Open("/tmp/notexist"));
ASSERT_TRUE(reader.Open(index_name));
int64_t length = reader.Length();
int64_t rp = 0;
reader.seekg(rp);
reader.Seekg(rp);
std::string content_out;
while (rp < length) {
size_t len;
reader.read(&len, sizeof(len));
reader.Read(&len, sizeof(len));
rp += sizeof(len);
reader.seekg(rp);
reader.Seekg(rp);
auto data = new char[len];
reader.read(data, len);
reader.Read(data, len);
rp += len;
reader.seekg(rp);
reader.Seekg(rp);
content_out += std::string(data, len);
@ -60,7 +60,7 @@ TEST_F(StorageTest, DISK_RW_TEST) {
}
ASSERT_TRUE(content == content_out);
reader.close();
reader.Close();
}
}

View File

@ -48,8 +48,8 @@ StorageTest::SetUp() {
config_path += CONFIG_FILE;
WriteToFile(config_path, CONFIG_STR);
milvus::server::Config& config = milvus::server::Config::GetInstance();
ASSERT_TRUE(config.LoadConfigFile(config_path).ok());
// milvus::server::Config& config = milvus::server::Config::GetInstance();
// ASSERT_TRUE(config.LoadConfigFile(config_path).ok());
}
void