add file header and tail implementation (#3316)

* add file header and tail implementation

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* remove files and format code

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* fix bugs

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* format code

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* fix bug in getEntityById

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* remove useless lines and add zip md5 checksum

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>

* change crc32c thirdparty repository

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
pull/3341/head
chen qingxiang 2020-08-19 16:17:23 +08:00 committed by GitHub
parent 3b30504859
commit fe905a0062
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 582 additions and 30 deletions

View File

@ -17,12 +17,13 @@
#include "codecs/BlockFormat.h" #include "codecs/BlockFormat.h"
#include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <memory> #include <memory>
#include <unordered_map>
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
@ -32,10 +33,13 @@ namespace codec {
void void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) { BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) { if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes; size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
@ -48,6 +52,8 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
void void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes, BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes,
engine::BinaryDataPtr& raw) { engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (offset < 0 || num_bytes <= 0) { if (offset < 0 || num_bytes <= 0) {
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path); THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path);
} }
@ -56,10 +62,12 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t total_num_bytes; size_t total_num_bytes;
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
offset += sizeof(size_t); // Beginning of file is num_bytes offset += MAGIC_SIZE + HEADER_SIZE + sizeof(size_t); // Beginning of file is num_bytes
if (offset + num_bytes > total_num_bytes) { if (offset + num_bytes > total_num_bytes) {
THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path);
} }
@ -74,6 +82,8 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
void void
BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges, BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges,
engine::BinaryDataPtr& raw) { engine::BinaryDataPtr& raw) {
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (read_ranges.empty()) { if (read_ranges.empty()) {
return; return;
} }
@ -82,6 +92,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t total_num_bytes; size_t total_num_bytes;
fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t));
@ -97,7 +108,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p
raw->data_.resize(total_bytes); raw->data_.resize(total_bytes);
int64_t poz = 0; int64_t poz = 0;
for (auto& range : read_ranges) { for (auto& range : read_ranges) {
int64_t offset = range.offset_ + sizeof(size_t); int64_t offset = MAGIC_SIZE + HEADER_SIZE + sizeof(size_t) + range.offset_;
fs_ptr->reader_ptr_->Seekg(offset); fs_ptr->reader_ptr_->Seekg(offset);
fs_ptr->reader_ptr_->Read(raw->data_.data() + poz, range.num_bytes_); fs_ptr->reader_ptr_->Read(raw->data_.data() + poz, range.num_bytes_);
poz += range.num_bytes_; poz += range.num_bytes_;
@ -111,15 +122,23 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_
if (raw == nullptr) { if (raw == nullptr) {
return; return;
} }
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, file_path)
WRITE_HEADER(fs_ptr, file_path, maps);
if (!fs_ptr->writer_ptr_->Open(file_path)) { if (!fs_ptr->writer_ptr_->InOpen(file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path); THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path);
} }
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes = raw->data_.size(); size_t num_bytes = raw->data_.size();
fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t)); fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write((void*)(raw->data_.data()), num_bytes); fs_ptr->writer_ptr_->Write((void*)(raw->data_.data()), num_bytes);
fs_ptr->writer_ptr_->Close(); fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, file_path);
} }
} // namespace codec } // namespace codec

View File

@ -17,15 +17,16 @@
#include "codecs/DeletedDocsFormat.h" #include "codecs/DeletedDocsFormat.h"
#include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <experimental/filesystem> #include <experimental/filesystem>
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <vector> #include <vector>
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
@ -45,10 +46,13 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
segment::DeletedDocsPtr& deleted_docs) { segment::DeletedDocsPtr& deleted_docs) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) { if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes; size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
@ -95,14 +99,20 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
if (!deleted_docs_list.empty()) { if (!deleted_docs_list.empty()) {
delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end()); delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end());
} }
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, temp_path)
WRITE_HEADER(fs_ptr, temp_path, maps);
if (!fs_ptr->writer_ptr_->Open(temp_path)) { if (!fs_ptr->writer_ptr_->InOpen(temp_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path); THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path);
} }
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t)); fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes); fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes);
fs_ptr->writer_ptr_->Close(); fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, temp_path);
// Move temp file to delete file // Move temp file to delete file
std::experimental::filesystem::rename(temp_path, full_file_path); std::experimental::filesystem::rename(temp_path, full_file_path);
@ -111,10 +121,13 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
void void
DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) { DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) {
const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->writer_ptr_->Open(full_file_path)) { if (!fs_ptr->writer_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path); THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes; size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));

View File

@ -17,16 +17,16 @@
#include "codecs/StructuredIndexFormat.h" #include "codecs/StructuredIndexFormat.h"
#include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <memory> #include <memory>
#include <utility> #include <unordered_map>
#include "db/Types.h" #include "db/Types.h"
#include "knowhere/index/structured_index/StructuredIndexSort.h" #include "knowhere/index/structured_index/StructuredIndexSort.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
@ -85,20 +85,22 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
knowhere::BinarySet load_data_list; knowhere::BinarySet load_data_list;
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) { if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
} }
int64_t length = fs_ptr->reader_ptr_->Length(); int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) { if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path); THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path);
} }
size_t rp = 0; size_t rp = MAGIC_SIZE + HEADER_SIZE;
fs_ptr->reader_ptr_->Seekg(0); fs_ptr->reader_ptr_->Seekg(rp);
int32_t data_type = 0; int32_t data_type = 0;
fs_ptr->reader_ptr_->Read(&data_type, sizeof(data_type)); fs_ptr->reader_ptr_->Read(&data_type, sizeof(data_type));
rp += sizeof(data_type); rp += sizeof(data_type) + MAGIC_SIZE + HEADER_SIZE;
fs_ptr->reader_ptr_->Seekg(rp); fs_ptr->reader_ptr_->Seekg(rp);
LOG_ENGINE_DEBUG_ << "Start to read_index(" << full_file_path << ") length: " << length << " bytes"; LOG_ENGINE_DEBUG_ << "Start to read_index(" << full_file_path << ") length: " << length << " bytes";
@ -144,11 +146,17 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
milvus::TimeRecorder recorder("StructuredIndexFormat::Write"); milvus::TimeRecorder recorder("StructuredIndexFormat::Write");
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path)
WRITE_HEADER(fs_ptr, full_file_path, maps);
auto binaryset = index->Serialize(knowhere::Config()); auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->Open(full_file_path)) { if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path);
} }
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type)); fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type));
for (auto& iter : binaryset.binary_map_) { for (auto& iter : binaryset.binary_map_) {
@ -164,6 +172,7 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
} }
fs_ptr->writer_ptr_->Close(); fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End"); double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;

View File

@ -17,9 +17,11 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <memory> #include <memory>
#include <unordered_map>
#include "codecs/VectorCompressFormat.h" #include "codecs/VectorCompressFormat.h"
#include "knowhere/common/BinarySet.h" #include "knowhere/common/BinarySet.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
@ -41,11 +43,13 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
milvus::TimeRecorder recorder("VectorCompressFormat::Read"); milvus::TimeRecorder recorder("VectorCompressFormat::Read");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) { if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path);
} }
int64_t length = fs_ptr->reader_ptr_->Length(); int64_t length = fs_ptr->reader_ptr_->Length() - MAGIC_SIZE - HEADER_SIZE - SUM_SIZE;
if (length <= 0) { if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path); THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path);
} }
@ -53,7 +57,7 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
compress->data = std::shared_ptr<uint8_t[]>(new uint8_t[length]); compress->data = std::shared_ptr<uint8_t[]>(new uint8_t[length]);
compress->size = length; compress->size = length;
fs_ptr->reader_ptr_->Seekg(0); fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->reader_ptr_->Read(compress->data.get(), length); fs_ptr->reader_ptr_->Read(compress->data.get(), length);
fs_ptr->reader_ptr_->Close(); fs_ptr->reader_ptr_->Close();
@ -68,12 +72,18 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
milvus::TimeRecorder recorder("VectorCompressFormat::Write"); milvus::TimeRecorder recorder("VectorCompressFormat::Write");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->writer_ptr_->Open(full_file_path)) { // TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path)
WRITE_HEADER(fs_ptr, full_file_path, maps);
if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path);
} }
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size); fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size);
fs_ptr->writer_ptr_->Close(); fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End"); double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024; double rate = compress->size * 1000000.0 / span / 1024 / 1024;

View File

@ -17,12 +17,14 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <memory> #include <memory>
#include <unordered_map>
#include "codecs/Codec.h" #include "codecs/Codec.h"
#include "codecs/VectorIndexFormat.h" #include "codecs/VectorIndexFormat.h"
#include "knowhere/common/BinarySet.h" #include "knowhere/common/BinarySet.h"
#include "knowhere/index/vector_index/VecIndex.h" #include "knowhere/index/vector_index/VecIndex.h"
#include "knowhere/index/vector_index/VecIndexFactory.h" #include "knowhere/index/vector_index/VecIndexFactory.h"
#include "storage/ExtraFileInfo.h"
#include "utils/Exception.h" #include "utils/Exception.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
@ -42,11 +44,14 @@ void
VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& data) { knowhere::BinaryPtr& data) {
milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw"); milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw");
CHECK_MAGIC_VALID(fs_ptr, file_path);
CHECK_SUM_VALID(fs_ptr, file_path);
if (!fs_ptr->reader_ptr_->Open(file_path)) { if (!fs_ptr->reader_ptr_->Open(file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path);
} }
fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE);
size_t num_bytes; size_t num_bytes;
fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t));
@ -55,7 +60,7 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin
data->data = std::shared_ptr<uint8_t[]>(new uint8_t[num_bytes]); data->data = std::shared_ptr<uint8_t[]>(new uint8_t[num_bytes]);
// Beginning of file is num_bytes // Beginning of file is num_bytes
fs_ptr->reader_ptr_->Seekg(sizeof(size_t)); fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE + sizeof(size_t));
fs_ptr->reader_ptr_->Read(data->data.get(), num_bytes); fs_ptr->reader_ptr_->Read(data->data.get(), num_bytes);
fs_ptr->reader_ptr_->Close(); fs_ptr->reader_ptr_->Close();
@ -70,17 +75,19 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex"); milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX; std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
CHECK_MAGIC_VALID(fs_ptr, full_file_path);
CHECK_SUM_VALID(fs_ptr, full_file_path);
if (!fs_ptr->reader_ptr_->Open(full_file_path)) { if (!fs_ptr->reader_ptr_->Open(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
} }
int64_t length = fs_ptr->reader_ptr_->Length(); int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE;
if (length <= 0) { if (length <= 0) {
THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path); THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path);
} }
int64_t rp = 0; int64_t rp = MAGIC_SIZE + HEADER_SIZE;
fs_ptr->reader_ptr_->Seekg(0); fs_ptr->reader_ptr_->Seekg(rp);
LOG_ENGINE_DEBUG_ << "Start to ReadIndex(" << full_file_path << ") length: " << length << " bytes"; LOG_ENGINE_DEBUG_ << "Start to ReadIndex(" << full_file_path << ") length: " << length << " bytes";
while (rp < length) { while (rp < length) {
@ -172,12 +179,17 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex"); milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX; std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
// TODO: add extra info
std::unordered_map<std::string, std::string> maps;
WRITE_MAGIC(fs_ptr, full_file_path)
WRITE_HEADER(fs_ptr, full_file_path, maps);
auto binaryset = index->Serialize(knowhere::Config()); auto binaryset = index->Serialize(knowhere::Config());
if (!fs_ptr->writer_ptr_->Open(full_file_path)) { if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) {
THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path);
} }
fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE);
for (auto& iter : binaryset.binary_map_) { for (auto& iter : binaryset.binary_map_) {
auto meta = iter.first.c_str(); auto meta = iter.first.c_str();
size_t meta_length = iter.first.length(); size_t meta_length = iter.first.length();
@ -191,6 +203,8 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
} }
fs_ptr->writer_ptr_->Close(); fs_ptr->writer_ptr_->Close();
WRITE_SUM(fs_ptr, full_file_path);
double span = recorder.RecordSection("End"); double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s"; LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";

View File

@ -136,8 +136,8 @@ target_link_libraries( milvus_engine
PUBLIC knowhere PUBLIC knowhere
segment segment
cache cache
storage
codecs codecs
storage
tracing tracing
${THIRD_PARTY_LIBS} ${THIRD_PARTY_LIBS}
${ENGINE_LIBS} ${ENGINE_LIBS}

View File

@ -10,14 +10,58 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License. # or implied. See the License for the specific language governing permissions and limitations under the License.
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES ) set( STORAGE_SRCS
aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES ) ExtraFileInfo.cpp
ExtraFileInfo.h
FSHandler.h
IOReader.h
IOWriter.h
Operation.h
disk/DiskIOReader.h
disk/DiskIOWriter.h
disk/DiskOperation.h
disk/DiskIOReader.cpp
disk/DiskIOWriter.cpp
disk/DiskOperation.cpp
# s3/S3ClientMock.h
# s3/S3ClientWrapper.h
# s3/S3ClientWrapper.cpp
# s3/S3IOReader.h
# s3/S3IOReader.cpp
# s3/S3IOWriter.h
# s3/S3IOWriter.cpp
)
#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES )
#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES )
# aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/s3 STORAGE_S3_FILES ) # aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/s3 STORAGE_S3_FILES )
set( STORAGE_FILES ${STORAGE_MAIN_FILES} #set( STORAGE_FILES ${STORAGE_MAIN_FILES}
${STORAGE_DISK_FILES} # ${STORAGE_DISK_FILES}
# ${STORAGE_S3_FILES} # ${STORAGE_S3_FILES}
) # )
add_library( storage STATIC ) add_library( storage STATIC )
target_sources( storage PRIVATE ${STORAGE_FILES} ) target_sources( storage PRIVATE ${STORAGE_SRCS} )
target_link_libraries( storage PRIVATE log)
add_dependencies( storage fiu ) add_dependencies( storage fiu )
target_link_libraries( storage
log
crc32c
fiu
libboost_filesystem.a
libstdc++fs.a
)
if ( BUILD_UNIT_TEST )
add_executable( ExtraFileInfoTest )
target_sources( ExtraFileInfoTest PRIVATE ExtraFileInfoTest.cpp)
target_link_libraries( ExtraFileInfoTest
storage
gtest
gtest_main
gmock
gmock_main )
add_test ( NAME ExtraFileInfoTest
COMMAND $<TARGET_FILE:ExtraFileInfoTest>
)
endif()

View File

@ -0,0 +1,177 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <iostream>
#include <regex>
#include <utility>
#include <vector>
#include "crc32c/crc32c.h"
#include "storage/ExtraFileInfo.h"
namespace milvus {
namespace storage {
// Check whether the file begins with the MAGIC marker.
//
// fs_ptr    - handler whose reader is used to open and read the file
// file_path - file to inspect
//
// Returns true when the first MAGIC_SIZE bytes read from the file equal MAGIC.
// Throws Exception(SERVER_WRITE_ERROR) when the reader cannot open the file.
bool
CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
    if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
        std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    // Zero-initialised stack buffer: nothing to free, and the comparison below
    // never looks at uninitialised memory.
    char magic[MAGIC_SIZE] = {0};
    fs_ptr->reader_ptr_->Read(magic, MAGIC_SIZE);
    fs_ptr->reader_ptr_->Close();

    // The marker is stored without a NUL terminator, so compare exactly
    // MAGIC_SIZE bytes instead of treating the buffer as a C string.
    return std::memcmp(magic, MAGIC, MAGIC_SIZE) == 0;
}
// Open file_path with the writer's Open() and write the MAGIC_SIZE-byte MAGIC
// marker, then close the writer.
//
// Throws Exception(SERVER_WRITE_ERROR) when the file cannot be opened.
void
WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
    const auto& writer = fs_ptr->writer_ptr_;

    if (!writer->Open(file_path.c_str())) {
        std::string message = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << message;
        throw Exception(SERVER_WRITE_ERROR, message);
    }

    writer->Write((void*)MAGIC, MAGIC_SIZE);
    writer->Close();
}
// Read the key/value header that follows the magic marker.
//
// The header occupies HEADER_SIZE bytes starting at offset MAGIC_SIZE and holds
// text of the form "key=value;key=value;", padded with NUL bytes.
//
// fs_ptr    - handler whose reader is used to open and read the file
// file_path - file to read
//
// Returns the parsed pairs. An empty header yields an empty map. Entries without
// '=' are skipped. For a duplicated key the first occurrence is kept. The value
// is everything after the first '=' of an entry.
// Throws Exception(SERVER_WRITE_ERROR) when the reader cannot open the file.
std::unordered_map<std::string, std::string>
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
    if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
        std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    // Read straight into a std::string so the buffer is owned (no leak) and its
    // length is known even when the header contains no NUL byte.
    std::string data(HEADER_SIZE, '\0');
    fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE);
    fs_ptr->reader_ptr_->Read(&data[0], HEADER_SIZE);
    fs_ptr->reader_ptr_->Close();

    // Drop the NUL padding: the payload ends at the first NUL, if any.
    const std::string::size_type padding = data.find('\0');
    if (padding != std::string::npos) {
        data.resize(padding);
    }

    std::unordered_map<std::string, std::string> result;
    std::string::size_type begin = 0;
    while (begin < data.size()) {
        std::string::size_type end = data.find(';', begin);
        if (end == std::string::npos) {
            end = data.size();
        }
        const std::string item = data.substr(begin, end - begin);
        begin = end + 1;

        const std::string::size_type eq = item.find('=');
        if (eq == std::string::npos) {
            // Not a key=value entry (e.g. an empty token); nothing to record.
            continue;
        }
        result.insert(std::make_pair(item.substr(0, eq), item.substr(eq + 1)));
    }
    return result;
}
// Return the header value stored under `key` in file_path.
//
// The lookup uses unordered_map::at(), so a key that is not present in the
// header results in std::out_of_range.
std::string
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key) {
    return ReadHeaderValues(fs_ptr, file_path).at(key);
}
// Compute the checksum of a file's contents.
//
// fs_ptr    - handler whose reader is used to open and read the file
// file_path - file to checksum
// written   - true when the file already ends with a SUM_SIZE-byte checksum
//             field, which is then excluded from the calculation
//
// Returns the CRC32C of the covered bytes narrowed to 8 bits. The narrowing is
// part of the declared interface and of what WriteSum/CheckSum store and
// compare, so it is kept as is.
// Throws Exception(SERVER_WRITE_ERROR) when the file cannot be opened or is too
// short to contain the checksum field it is said to have.
std::uint8_t
CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written) {
    if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
        std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    // 64-bit length: an int would overflow for files larger than 2 GiB.
    int64_t size = fs_ptr->reader_ptr_->Length();
    if (written) {
        size -= SUM_SIZE;
    }
    if (size < 0) {
        // Never hand a negative size to the allocator or the reader.
        fs_ptr->reader_ptr_->Close();
        std::string err_msg = "Invalid file length, no room for checksum: " + file_path;
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    // std::string owns the buffer (no leak) and data() is valid even when empty.
    std::string content(static_cast<size_t>(size), '\0');
    if (!content.empty()) {
        fs_ptr->reader_ptr_->Read(&content[0], size);
    }
    fs_ptr->reader_ptr_->Close();

    return static_cast<std::uint8_t>(crc32c::Crc32c(content.data(), content.size()));
}
// Store `result` as the checksum field of file_path.
//
// The value is written as decimal text padded with NUL bytes to SUM_SIZE.
// With written == true the last SUM_SIZE bytes of the file are overwritten,
// otherwise the field is written at the end of the file.
//
// Throws Exception(SERVER_WRITE_ERROR) when the file cannot be opened.
void
WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int result, bool written) {
    const auto& writer = fs_ptr->writer_ptr_;

    if (!writer->InOpen(file_path.c_str())) {
        std::string message = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << message;
        throw Exception(SERVER_WRITE_ERROR, message);
    }

    // Offset relative to the end of the file: step back over an existing field,
    // or stay at the end to add a new one.
    const int from_end = written ? -SUM_SIZE : 0;
    writer->Seekp(from_end, std::ios_base::end);

    std::string field = std::to_string(result);
    field.resize(SUM_SIZE, '\0');
    writer->Write(field.data(), SUM_SIZE);

    writer->Close();
}
// Verify the checksum field stored in the last SUM_SIZE bytes of a file.
//
// fs_ptr    - handler whose reader is used to open and read the file
// file_path - file to verify
//
// Returns true when the stored value equals the checksum recomputed by
// CalculateSum(fs_ptr, file_path, true).
// Throws Exception(SERVER_WRITE_ERROR) when the file cannot be opened.
bool
CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) {
    const int expected = CalculateSum(fs_ptr, file_path, true);

    if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) {
        std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    // One extra zeroed byte keeps the text NUL-terminated for atoi even when a
    // damaged file has no NUL inside the field; a stack buffer needs no free.
    char record[SUM_SIZE + 1] = {0};
    fs_ptr->reader_ptr_->Seekg(-SUM_SIZE, std::ios_base::end);
    fs_ptr->reader_ptr_->Read(record, SUM_SIZE);
    fs_ptr->reader_ptr_->Close();

    const auto stored = (uint8_t)atoi(record);
    return stored == expected;
}
// Set a single header entry and rewrite the header of file_path.
//
// The current header is read, `key` is set to `value` (replacing any value
// already stored under that key), and the whole header is written back with
// WriteHeaderValues.
//
// Always returns true; failures are reported through the exceptions of
// ReadHeaderValues / WriteHeaderValues.
// NOTE(review): the trailing checksum is not recomputed here — confirm whether
// callers are expected to refresh it after changing the header.
bool
WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key,
                 const std::string& value) {
    auto record = ReadHeaderValues(fs_ptr, file_path);
    // operator[] overwrites; insert() would silently keep the old value of an
    // existing key while this function still reported success.
    record[key] = value;
    WriteHeaderValues(fs_ptr, file_path, record);
    return true;
}
// Write the key/value header of a file.
//
// The entries are serialised as "key=value;" and stored in the HEADER_SIZE
// bytes that start at offset MAGIC_SIZE, padded with NUL bytes.
//
// fs_ptr    - handler whose writer is used to open and write the file
// file_path - file to update (opened with InOpen)
// maps      - entries to store
//
// Always returns true on success.
// Throws a C-string literal when the serialised entries do not fit in the
// header (one byte is reserved so the stored text is always NUL-terminated).
// Throws Exception(SERVER_WRITE_ERROR) when the file cannot be opened.
bool
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
                  const std::unordered_map<std::string, std::string>& maps) {
    std::string kv;
    for (const auto& entry : maps) {
        kv.append(entry.first).append("=").append(entry.second).append(";");
    }

    // Validate before touching the file so a failure never leaves the writer open.
    if (kv.size() >= HEADER_SIZE) {
        throw "Exceeded the limit of header data size";
    }

    // Pad to the full header size: the write below always emits HEADER_SIZE
    // bytes and must not read past the end of the string's storage.
    kv.resize(HEADER_SIZE, '\0');

    if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) {
        std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
        LOG_ENGINE_ERROR_ << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE);
    fs_ptr->writer_ptr_->Write(&kv[0], HEADER_SIZE);
    fs_ptr->writer_ptr_->Close();
    return true;
}
} // namespace storage
} // namespace milvus

View File

@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef EXTRA_FILE_INFO_H
#define EXTRA_FILE_INFO_H
#include <cstdio>
#include <cstring>
#include <string>
#include <unordered_map>
#include <src/log/Log.h>
#include <src/utils/Error.h>
#include <src/utils/Exception.h>
#include "storage/FSHandler.h"
// File-layout constants used by the header/tail helpers declared below.
// MAGIC is the 6-character magic string and MAGIC_SIZE its length in bytes.
#define MAGIC "Milvus"
#define MAGIC_SIZE 6
// NOTE(review): SINGLE_KV_DATA_SIZE is not referenced by the code visible in this change;
// presumably the intended limit for one key/value entry - confirm.
#define SINGLE_KV_DATA_SIZE 64
// Size in bytes of the header block that WriteHeaderValues writes at offset MAGIC_SIZE.
#define HEADER_SIZE 4096
// Size in bytes of the checksum record read from the very end of the file.
#define SUM_SIZE 16
namespace milvus {
namespace storage {
// Throws Exception(SERVER_FILE_MAGIC_BYTES_ERROR) when CheckMagic(PTR, FILE_PATH) returns false.
// NOTE(review): expands to a bare `if` statement (no do/while wrapper), so it should not be
// used as the un-braced body of another if/else.
#define CHECK_MAGIC_VALID(PTR, FILE_PATH) \
if (!CheckMagic(PTR, FILE_PATH)) { \
throw Exception(SERVER_FILE_MAGIC_BYTES_ERROR, "wrong magic bytes"); \
}
// Throws Exception(SERVER_FILE_SUM_BYTES_ERROR) when CheckSum(PTR, FILE_PATH) returns false.
// NOTE(review): expands to a bare `if` statement (no do/while wrapper), so it should not be
// used as the un-braced body of another if/else.
#define CHECK_SUM_VALID(PTR, FILE_PATH) \
if (!CheckSum(PTR, FILE_PATH)) { \
throw Exception(SERVER_FILE_SUM_BYTES_ERROR, "wrong sum bytes,file may be changed"); \
}
// Calls WriteMagic(PTR, FILE_PATH). Any exception it raises is caught and replaced by a
// thrown string literal (type const char*), so the original error details are discarded.
#define WRITE_MAGIC(PTR, FILE_PATH) \
try { \
WriteMagic(PTR, FILE_PATH); \
} catch (...) { \
throw "write magic failed"; \
}
// Calls WriteHeaderValues(PTR, FILE_PATH, KV). Any exception it raises is caught and
// replaced by a thrown string literal (type const char*), so the original error details
// are discarded. The message now names the header step; it previously repeated the
// checksum macro's "write sum failed" text.
#define WRITE_HEADER(PTR, FILE_PATH, KV)       \
    try {                                      \
        WriteHeaderValues(PTR, FILE_PATH, KV); \
    } catch (...) {                            \
        throw "write header failed";           \
    }
// Computes the checksum with CalculateSum(PTR, FILE_PATH) and passes it to WriteSum.
// `result` is local to the try block, so the macro can be used more than once per scope.
// Any exception is caught and replaced by a thrown string literal (type const char*),
// so the original error details are discarded.
#define WRITE_SUM(PTR, FILE_PATH) \
try { \
int result = CalculateSum(PTR, FILE_PATH); \
WriteSum(PTR, FILE_PATH, result); \
} catch (...) { \
throw "write sum failed"; \
}
// NOTE(review): implementation not visible here; presumably writes MAGIC to the file - confirm.
void
WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
// Returns whether the file's magic bytes are valid; CHECK_MAGIC_VALID throws when this is false.
bool
CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
// Returns whether the file's stored checksum is valid; CHECK_SUM_VALID throws when this is false.
bool
CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
// Stores the checksum `result` in the file.
// NOTE(review): `written` presumably tells whether the file already ends with a checksum
// record that should be overwritten rather than appended - confirm against the implementation.
void
WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int result, bool written = false);
// Computes the checksum of the file and returns it as an 8-bit value.
// NOTE(review): `written` presumably excludes an existing trailing checksum record from
// the calculation - confirm against the implementation.
std::uint8_t
CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written = false);
// Returns the header value stored under `key`.
std::string
ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key);
// Returns all header entries of the file as a key -> value map.
std::unordered_map<std::string, std::string>
ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path);
// Adds `key`/`value` to the header entries read from the file and rewrites the header.
bool
WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key,
const std::string& value);
// Serializes `maps` as "key=value;" pairs and writes them as the file's header block at
// offset MAGIC_SIZE; throws when the serialized data is larger than HEADER_SIZE.
bool
WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const std::unordered_map<std::string, std::string>& maps);
} // namespace storage
} // namespace milvus
#endif // end of EXTRA_FILE_INFO_H

View File

@ -0,0 +1,74 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <cstring>
#include <unordered_map>
#include "easyloggingpp/easylogging++.h"
#include "gtest/gtest.h"
#include "ExtraFileInfo.h"
#include "crc32c/crc32c.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
INITIALIZE_EASYLOGGINGPP
namespace milvus {
namespace storage {
/* ExtraFileInfoTest */
// Fixture for the ExtraFileInfo tests; it adds no state or set-up of its own.
class ExtraFileInfoTest : public testing::Test {};
// Round-trip test for the magic / header / checksum helpers on a scratch file in /tmp:
// write magic and header, append a length-prefixed payload, store the checksum, then
// verify header lookups and checksum validation before and after a header update.
TEST_F(ExtraFileInfoTest, WriteFileTest) {
    std::string raw = "helloworldhelloworld";

    std::string directory = "/tmp";
    storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
    storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
    storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
    const storage::FSHandlerPtr fs_ptr = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
    std::string file_path = "/tmp/test.txt";

    auto record = std::unordered_map<std::string, std::string>();
    record.insert(std::make_pair("test", "test"));
    WriteMagic(fs_ptr, file_path);
    WriteHeaderValues(fs_ptr, file_path, record);

    // Previously a failed open only built an unused error string and the test carried on
    // writing to a closed stream; fail right away instead.
    ASSERT_TRUE(fs_ptr->writer_ptr_->InOpen(file_path.c_str()))
        << "Failed to open file: " << file_path << ", error: " << std::strerror(errno);
    fs_ptr->writer_ptr_->Seekp(0, std::ios_base::end);

    // Payload layout: byte count followed by the raw bytes.
    size_t num_bytes = raw.size();
    fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t));
    fs_ptr->writer_ptr_->Write(const_cast<char*>(raw.data()), num_bytes);
    fs_ptr->writer_ptr_->Close();

    int result_sum = CalculateSum(fs_ptr, file_path);
    WriteSum(fs_ptr, file_path, result_sum);

    ASSERT_TRUE(CheckSum(fs_ptr, file_path));
    ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "test"), "test");

    ASSERT_TRUE(WriteHeaderValue(fs_ptr, file_path, "github", "github"));
    ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "github"), "github");

    // NOTE(review): the file already carries a checksum record at this point, yet
    // CalculateSum is called with written == false while WriteSum gets written == true;
    // confirm this combination is intended.
    result_sum = CalculateSum(fs_ptr, file_path);
    WriteSum(fs_ptr, file_path, result_sum, true);

    ASSERT_TRUE(CheckMagic(fs_ptr, file_path));
    ASSERT_TRUE(CheckSum(fs_ptr, file_path));
}
} // namespace storage
} // namespace milvus

View File

@ -17,6 +17,7 @@
#pragma once #pragma once
#include <iostream>
#include <memory> #include <memory>
#include "storage/IOReader.h" #include "storage/IOReader.h"

View File

@ -28,6 +28,9 @@ class IOReader {
virtual void virtual void
Seekg(int64_t pos) = 0; Seekg(int64_t pos) = 0;
virtual void
Seekg(int64_t pos, std::ios_base::seekdir seekdir) = 0;
virtual int64_t virtual int64_t
Length() = 0; Length() = 0;

View File

@ -22,9 +22,18 @@ class IOWriter {
virtual bool virtual bool
Open(const std::string& name) = 0; Open(const std::string& name) = 0;
virtual bool
InOpen(const std::string& name) = 0;
virtual void virtual void
Write(void* ptr, int64_t size) = 0; Write(void* ptr, int64_t size) = 0;
virtual void
Seekp(int64_t pos) = 0;
virtual void
Seekp(int64_t pos, std::ios_base::seekdir seekdir) = 0;
virtual int64_t virtual int64_t
Length() = 0; Length() = 0;

View File

@ -30,6 +30,10 @@ void
DiskIOReader::Seekg(int64_t pos) { DiskIOReader::Seekg(int64_t pos) {
fs_.seekg(pos); fs_.seekg(pos);
} }
// Move the read position by `pos` bytes relative to the anchor `seekdir`
// (beginning, current position or end of the stream).
void
DiskIOReader::Seekg(int64_t pos, std::ios_base::seekdir seekdir) {
    const auto offset = static_cast<std::streamoff>(pos);
    fs_.seekg(offset, seekdir);
}
int64_t int64_t
DiskIOReader::Length() { DiskIOReader::Length() {

View File

@ -42,6 +42,9 @@ class DiskIOReader : public IOReader {
void void
Seekg(int64_t pos) override; Seekg(int64_t pos) override;
void
Seekg(int64_t pos, std::ios_base::seekdir seekdir) override;
int64_t int64_t
Length() override; Length() override;

View File

@ -21,6 +21,13 @@ DiskIOWriter::Open(const std::string& name) {
fs_ = std::fstream(name_, std::ios::out | std::ios::binary); fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
return fs_.good(); return fs_.good();
} }
// Open `name` for in-place binary writing and reset the written-length counter.
// The stream is opened with in | out | binary and without trunc, so the existing
// contents of the file are kept. Returns whether the stream is in a good state.
bool
DiskIOWriter::InOpen(const std::string& name) {
    const std::ios_base::openmode mode = std::ios::out | std::ios::binary | std::ios::in;
    len_ = 0;
    name_ = name;
    fs_ = std::fstream(name_, mode);
    return fs_.good();
}
void void
DiskIOWriter::Write(void* ptr, int64_t size) { DiskIOWriter::Write(void* ptr, int64_t size) {
@ -37,6 +44,14 @@ void
DiskIOWriter::Close() { DiskIOWriter::Close() {
fs_.close(); fs_.close();
} }
// Move the write position to the absolute stream position `pos`.
void
DiskIOWriter::Seekp(int64_t pos) {
    const std::streampos target(pos);
    fs_.seekp(target);
}
// Move the write position by `pos` bytes relative to the anchor `seekdir`
// (beginning, current position or end of the stream).
void
DiskIOWriter::Seekp(int64_t pos, std::ios_base::seekdir seekdir) {
    const auto offset = static_cast<std::streamoff>(pos);
    fs_.seekp(offset, seekdir);
}
} // namespace storage } // namespace storage
} // namespace milvus } // namespace milvus

View File

@ -36,9 +36,18 @@ class DiskIOWriter : public IOWriter {
bool bool
Open(const std::string& name) override; Open(const std::string& name) override;
bool
InOpen(const std::string& name) override;
void void
Write(void* ptr, int64_t size) override; Write(void* ptr, int64_t size) override;
void
Seekp(int64_t pos) override;
void
Seekp(int64_t pos, std::ios_base::seekdir seekdir) override;
int64_t int64_t
Length() override; Length() override;

View File

@ -72,7 +72,9 @@ constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10);
constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11); constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11);
constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12); constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12);
constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13); constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13);
constexpr ErrorCode SERVER_CANNOT_READ_FILE = ToServerErrorCode(14); constexpr ErrorCode SERVER_FILE_MAGIC_BYTES_ERROR = ToServerErrorCode(14);
constexpr ErrorCode SERVER_FILE_SUM_BYTES_ERROR = ToServerErrorCode(15);
constexpr ErrorCode SERVER_CANNOT_READ_FILE = ToServerErrorCode(16);
constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100); constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100);
constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101); constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101);

View File

@ -47,6 +47,9 @@ if ( MILVUS_WITH_YAMLCPP )
add_subdirectory( yaml-cpp ) add_subdirectory( yaml-cpp )
endif() endif()
# ****************************** Thirdparty crc32c ***************************************
add_subdirectory( crc32c )
# ****************************** Thirdparty opentracing *************************************** # ****************************** Thirdparty opentracing ***************************************
if ( MILVUS_WITH_OPENTRACING ) if ( MILVUS_WITH_OPENTRACING )
add_subdirectory( opentracing ) add_subdirectory( opentracing )

39
core/thirdparty/crc32c/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,39 @@
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
# Download location of the crc32c sources. The MILVUS_CRC32C_URL environment variable
# overrides it (e.g. for a mirror); otherwise the GitHub archive for CRC32C_VERSION is used.
# The default used to be set unconditionally after the if-block, which silently discarded
# the override.
if ( DEFINED ENV{MILVUS_CRC32C_URL} )
    set( CRC32C_SOURCE_URL "$ENV{MILVUS_CRC32C_URL}" )
else()
    set( CRC32C_SOURCE_URL
         "https://github.com/google/crc32c/archive/${CRC32C_VERSION}.zip" )
endif()
message( STATUS "Building crc32c-master from source" )

# Fetch the crc32c archive (verified against URL_MD5) and add it to the build.
FetchContent_Declare(
        crc32c
        URL ${CRC32C_SOURCE_URL}
        URL_MD5 "d24bd57ca2d9fb051aa589818c0e2c10"
        SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/crc32c-src
        BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/crc32c-build
)

FetchContent_GetProperties( crc32c )
if ( NOT crc32c_POPULATED )
    FetchContent_Populate( crc32c )

    # Disable the optional parts of crc32c. The cache form of set() is
    #   set(<variable> <value> CACHE <type> <docstring> [FORCE])
    # The previous calls omitted the value and passed OFF in the docstring position,
    # which only worked because the variables ended up empty.
    set( CRC32C_BUILD_TESTS OFF CACHE BOOL "Build crc32c unit tests" FORCE )
    set( CRC32C_BUILD_BENCHMARKS OFF CACHE BOOL "Build crc32c benchmarks" FORCE )
    set( CRC32C_USE_GLOG OFF CACHE BOOL "Build crc32c tests with glog" FORCE )

    add_subdirectory( ${crc32c_SOURCE_DIR}
                      ${crc32c_BINARY_DIR}
                      EXCLUDE_FROM_ALL )
endif()

View File

@ -12,5 +12,6 @@ OPENTRACING_VERSION=v1.5.1
FIU_VERSION=1.00 FIU_VERSION=1.00
OATPP_VERSION=1.1.0 OATPP_VERSION=1.1.0
AWS_VERSION=1.7.250 AWS_VERSION=1.7.250
CRC32C_VERSION=1.1.1
# vim: set filetype=sh: # vim: set filetype=sh: