diff --git a/core/src/codecs/BlockFormat.cpp b/core/src/codecs/BlockFormat.cpp index 7df5a77d33..30d96f9c97 100644 --- a/core/src/codecs/BlockFormat.cpp +++ b/core/src/codecs/BlockFormat.cpp @@ -17,12 +17,13 @@ #include "codecs/BlockFormat.h" -#include #include #include #include #include +#include +#include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -32,10 +33,13 @@ namespace codec { void BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, engine::BinaryDataPtr& raw) { + CHECK_MAGIC_VALID(fs_ptr, file_path); + CHECK_SUM_VALID(fs_ptr, file_path); if (!fs_ptr->reader_ptr_->Open(file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); size_t num_bytes; fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); @@ -48,6 +52,8 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p void BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int64_t offset, int64_t num_bytes, engine::BinaryDataPtr& raw) { + CHECK_MAGIC_VALID(fs_ptr, file_path); + CHECK_SUM_VALID(fs_ptr, file_path); if (offset < 0 || num_bytes <= 0) { THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid input to read: " + file_path); } @@ -56,10 +62,12 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); + size_t total_num_bytes; fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t)); - offset += sizeof(size_t); // Beginning of file is num_bytes + offset += MAGIC_SIZE + HEADER_SIZE + sizeof(size_t); // Beginning of file is num_bytes if (offset + num_bytes > total_num_bytes) { THROW_ERROR(SERVER_INVALID_ARGUMENT, "Invalid argument to read: " + file_path); } @@ -74,6 +82,8 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p void BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const ReadRanges& read_ranges, engine::BinaryDataPtr& raw) { + CHECK_MAGIC_VALID(fs_ptr, file_path); + CHECK_SUM_VALID(fs_ptr, file_path); if (read_ranges.empty()) { return; } @@ -82,6 +92,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open file: " + file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); size_t total_num_bytes; fs_ptr->reader_ptr_->Read(&total_num_bytes, sizeof(size_t)); @@ -97,7 +108,7 @@ BlockFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_p raw->data_.resize(total_bytes); int64_t poz = 0; for (auto& range : read_ranges) { - int64_t offset = range.offset_ + sizeof(size_t); + int64_t offset = MAGIC_SIZE + HEADER_SIZE + sizeof(size_t) + range.offset_; fs_ptr->reader_ptr_->Seekg(offset); fs_ptr->reader_ptr_->Read(raw->data_.data() + poz, range.num_bytes_); poz += range.num_bytes_; @@ -111,15 +122,23 @@ BlockFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_ if (raw == nullptr) { return; } + // TODO: add extra info + std::unordered_map maps; + WRITE_MAGIC(fs_ptr, file_path) + WRITE_HEADER(fs_ptr, file_path, maps); - if (!fs_ptr->writer_ptr_->Open(file_path)) { + if (!fs_ptr->writer_ptr_->InOpen(file_path)) { THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open file: " + file_path); } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); + size_t num_bytes = raw->data_.size(); fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t)); fs_ptr->writer_ptr_->Write((void*)(raw->data_.data()), num_bytes); fs_ptr->writer_ptr_->Close(); + + WRITE_SUM(fs_ptr, file_path); } } // namespace codec diff --git a/core/src/codecs/DeletedDocsFormat.cpp b/core/src/codecs/DeletedDocsFormat.cpp index 8152a69028..47e9f63eaa 100644 --- a/core/src/codecs/DeletedDocsFormat.cpp +++ b/core/src/codecs/DeletedDocsFormat.cpp @@ -17,15 +17,16 @@ #include "codecs/DeletedDocsFormat.h" -#include #include #include #include #include +#include #include +#include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" @@ -45,10 +46,13 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& segment::DeletedDocsPtr& deleted_docs) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; + CHECK_MAGIC_VALID(fs_ptr, full_file_path); + CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open deleted docs file: " + full_file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); size_t num_bytes; fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); @@ -95,14 +99,20 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& if (!deleted_docs_list.empty()) { delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end()); } + // TODO: add extra info + std::unordered_map maps; + WRITE_MAGIC(fs_ptr, temp_path) + WRITE_HEADER(fs_ptr, temp_path, maps); - if (!fs_ptr->writer_ptr_->Open(temp_path)) { + if (!fs_ptr->writer_ptr_->InOpen(temp_path)) { THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to write file: " + temp_path); } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); fs_ptr->writer_ptr_->Write(&new_num_bytes, sizeof(size_t)); fs_ptr->writer_ptr_->Write(delete_ids.data(), new_num_bytes); fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, temp_path); // Move temp file to delete file std::experimental::filesystem::rename(temp_path, full_file_path); @@ -111,10 +121,13 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& void DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, size_t& size) { const std::string full_file_path = file_path + DELETED_DOCS_POSTFIX; + CHECK_MAGIC_VALID(fs_ptr, full_file_path); + CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->writer_ptr_->Open(full_file_path)) { THROW_ERROR(SERVER_CANNOT_CREATE_FILE, "Fail to open deleted docs file: " + full_file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); size_t num_bytes; fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); diff --git a/core/src/codecs/StructuredIndexFormat.cpp b/core/src/codecs/StructuredIndexFormat.cpp index 4c5d248cf1..39c56c1455 100644 --- a/core/src/codecs/StructuredIndexFormat.cpp +++ b/core/src/codecs/StructuredIndexFormat.cpp @@ -17,16 +17,16 @@ #include "codecs/StructuredIndexFormat.h" -#include #include #include #include #include -#include +#include #include "db/Types.h" #include "knowhere/index/structured_index/StructuredIndexSort.h" +#include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -85,20 +85,22 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s knowhere::BinarySet load_data_list; std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; + CHECK_MAGIC_VALID(fs_ptr, full_file_path); + CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); } - int64_t length = fs_ptr->reader_ptr_->Length(); + int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE; if (length <= 0) { THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid structured index length: " + full_file_path); } - size_t rp = 0; - fs_ptr->reader_ptr_->Seekg(0); + size_t rp = MAGIC_SIZE + HEADER_SIZE; + fs_ptr->reader_ptr_->Seekg(rp); int32_t data_type = 0; fs_ptr->reader_ptr_->Read(&data_type, sizeof(data_type)); - rp += sizeof(data_type); + rp += sizeof(data_type) + MAGIC_SIZE + HEADER_SIZE; fs_ptr->reader_ptr_->Seekg(rp); LOG_ENGINE_DEBUG_ << "Start to read_index(" << full_file_path << ") length: " << length << " bytes"; @@ -144,11 +146,17 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const milvus::TimeRecorder recorder("StructuredIndexFormat::Write"); std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX; + // TODO: add extra info + std::unordered_map maps; + WRITE_MAGIC(fs_ptr, full_file_path) + WRITE_HEADER(fs_ptr, full_file_path, maps); + auto binaryset = index->Serialize(knowhere::Config()); - if (!fs_ptr->writer_ptr_->Open(full_file_path)) { + if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open structured index: " + full_file_path); } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); fs_ptr->writer_ptr_->Write(&data_type, sizeof(data_type)); for (auto& iter : binaryset.binary_map_) { @@ -164,6 +172,7 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const } fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, full_file_path); double span = recorder.RecordSection("End"); double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; diff --git a/core/src/codecs/VectorCompressFormat.cpp b/core/src/codecs/VectorCompressFormat.cpp index 1e2ed28a4b..e6bc86ee56 100644 --- a/core/src/codecs/VectorCompressFormat.cpp +++ b/core/src/codecs/VectorCompressFormat.cpp @@ -17,9 +17,11 @@ #include #include +#include #include "codecs/VectorCompressFormat.h" #include "knowhere/common/BinarySet.h" +#include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -41,11 +43,13 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin milvus::TimeRecorder recorder("VectorCompressFormat::Read"); const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; + CHECK_MAGIC_VALID(fs_ptr, full_file_path); + CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress file: " + full_file_path); } - int64_t length = fs_ptr->reader_ptr_->Length(); + int64_t length = fs_ptr->reader_ptr_->Length() - MAGIC_SIZE - HEADER_SIZE - SUM_SIZE; if (length <= 0) { THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector compress length: " + full_file_path); } @@ -53,7 +57,7 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin compress->data = std::shared_ptr(new uint8_t[length]); compress->size = length; - fs_ptr->reader_ptr_->Seekg(0); + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); fs_ptr->reader_ptr_->Read(compress->data.get(), length); fs_ptr->reader_ptr_->Close(); @@ -68,12 +72,18 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri milvus::TimeRecorder recorder("VectorCompressFormat::Write"); const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX; - if (!fs_ptr->writer_ptr_->Open(full_file_path)) { + // TODO: add extra info + std::unordered_map maps; + WRITE_MAGIC(fs_ptr, full_file_path) + WRITE_HEADER(fs_ptr, full_file_path, maps); + if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector compress: " + full_file_path); } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); fs_ptr->writer_ptr_->Write(compress->data.get(), compress->size); fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, full_file_path); double span = recorder.RecordSection("End"); double rate = compress->size * 1000000.0 / span / 1024 / 1024; diff --git a/core/src/codecs/VectorIndexFormat.cpp b/core/src/codecs/VectorIndexFormat.cpp index bef0200f84..05d7fd2472 100644 --- a/core/src/codecs/VectorIndexFormat.cpp +++ b/core/src/codecs/VectorIndexFormat.cpp @@ -17,12 +17,14 @@ #include #include +#include #include "codecs/Codec.h" #include "codecs/VectorIndexFormat.h" #include "knowhere/common/BinarySet.h" #include "knowhere/index/vector_index/VecIndex.h" #include "knowhere/index/vector_index/VecIndexFactory.h" +#include "storage/ExtraFileInfo.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -42,11 +44,14 @@ void VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, knowhere::BinaryPtr& data) { milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw"); + CHECK_MAGIC_VALID(fs_ptr, file_path); + CHECK_SUM_VALID(fs_ptr, file_path); if (!fs_ptr->reader_ptr_->Open(file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open raw file: " + file_path); } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE); size_t num_bytes; fs_ptr->reader_ptr_->Read(&num_bytes, sizeof(size_t)); @@ -55,7 +60,7 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin data->data = std::shared_ptr(new uint8_t[num_bytes]); // Beginning of file is num_bytes - fs_ptr->reader_ptr_->Seekg(sizeof(size_t)); + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE + HEADER_SIZE + sizeof(size_t)); fs_ptr->reader_ptr_->Read(data->data.get(), num_bytes); fs_ptr->reader_ptr_->Close(); @@ -70,17 +75,19 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex"); std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX; + CHECK_MAGIC_VALID(fs_ptr, full_file_path); + CHECK_SUM_VALID(fs_ptr, full_file_path); if (!fs_ptr->reader_ptr_->Open(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); } - int64_t length = fs_ptr->reader_ptr_->Length(); + int64_t length = fs_ptr->reader_ptr_->Length() - SUM_SIZE; if (length <= 0) { THROW_ERROR(SERVER_UNEXPECTED_ERROR, "Invalid vector index length: " + full_file_path); } - int64_t rp = 0; - fs_ptr->reader_ptr_->Seekg(0); + int64_t rp = MAGIC_SIZE + HEADER_SIZE; + fs_ptr->reader_ptr_->Seekg(rp); LOG_ENGINE_DEBUG_ << "Start to ReadIndex(" << full_file_path << ") length: " << length << " bytes"; while (rp < length) { @@ -172,12 +179,17 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st milvus::TimeRecorder recorder("SVectorIndexFormat::WriteIndex"); std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX; + // TODO: add extra info + std::unordered_map maps; + WRITE_MAGIC(fs_ptr, full_file_path) + WRITE_HEADER(fs_ptr, full_file_path, maps); auto binaryset = index->Serialize(knowhere::Config()); - if (!fs_ptr->writer_ptr_->Open(full_file_path)) { + if (!fs_ptr->writer_ptr_->InOpen(full_file_path)) { THROW_ERROR(SERVER_CANNOT_OPEN_FILE, "Fail to open vector index: " + full_file_path); } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE + HEADER_SIZE); for (auto& iter : binaryset.binary_map_) { auto meta = iter.first.c_str(); size_t meta_length = iter.first.length(); @@ -191,6 +203,8 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st } fs_ptr->writer_ptr_->Close(); + WRITE_SUM(fs_ptr, full_file_path); + double span = recorder.RecordSection("End"); double rate = fs_ptr->writer_ptr_->Length() * 1000000.0 / span / 1024 / 1024; LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s"; diff --git a/core/src/db/CMakeLists.txt b/core/src/db/CMakeLists.txt index 26c488f94f..7cf79cfd0e 100644 --- a/core/src/db/CMakeLists.txt +++ b/core/src/db/CMakeLists.txt @@ -136,8 +136,8 @@ target_link_libraries( milvus_engine PUBLIC knowhere segment cache - storage codecs + storage tracing ${THIRD_PARTY_LIBS} ${ENGINE_LIBS} diff --git a/core/src/storage/CMakeLists.txt b/core/src/storage/CMakeLists.txt index debbd3e8ba..c360327ba3 100644 --- a/core/src/storage/CMakeLists.txt +++ b/core/src/storage/CMakeLists.txt @@ -10,14 +10,58 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. #------------------------------------------------------------------------------- -aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES ) -aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES ) +set( STORAGE_SRCS + ExtraFileInfo.cpp + ExtraFileInfo.h + FSHandler.h + IOReader.h + IOWriter.h + Operation.h + disk/DiskIOReader.h + disk/DiskIOWriter.h + disk/DiskOperation.h + disk/DiskIOReader.cpp + disk/DiskIOWriter.cpp + disk/DiskOperation.cpp +# s3/S3ClientMock.h +# s3/S3ClientWrapper.h +# s3/S3ClientWrapper.cpp +# s3/S3IOReader.h +# s3/S3IOReader.cpp +# s3/S3IOWriter.h +# s3/S3IOWriter.cpp + ) +#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage STORAGE_MAIN_FILES ) +#aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/disk STORAGE_DISK_FILES ) # aux_source_directory( ${MILVUS_ENGINE_SRC}/storage/s3 STORAGE_S3_FILES ) -set( STORAGE_FILES ${STORAGE_MAIN_FILES} - ${STORAGE_DISK_FILES} +#set( STORAGE_FILES ${STORAGE_MAIN_FILES} +# ${STORAGE_DISK_FILES} # ${STORAGE_S3_FILES} - ) +# ) add_library( storage STATIC ) -target_sources( storage PRIVATE ${STORAGE_FILES} ) -target_link_libraries( storage PRIVATE log) +target_sources( storage PRIVATE ${STORAGE_SRCS} ) add_dependencies( storage fiu ) + +target_link_libraries( storage + log + crc32c + fiu + libboost_filesystem.a + libstdc++fs.a + ) + + +if ( BUILD_UNIT_TEST ) + add_executable( ExtraFileInfoTest ) + target_sources( ExtraFileInfoTest PRIVATE ExtraFileInfoTest.cpp) + target_link_libraries( ExtraFileInfoTest + storage + gtest + gtest_main + gmock + gmock_main ) + + add_test ( NAME ExtraFileInfoTest + COMMAND $ + ) +endif() diff --git a/core/src/storage/ExtraFileInfo.cpp b/core/src/storage/ExtraFileInfo.cpp new file mode 100644 index 0000000000..7abe6ef997 --- /dev/null +++ b/core/src/storage/ExtraFileInfo.cpp @@ -0,0 +1,177 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include +#include +#include + +#include "crc32c/crc32c.h" +#include "storage/ExtraFileInfo.h" + +namespace milvus { +namespace storage { + +bool +CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) { + if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + char* ch = static_cast(malloc(MAGIC_SIZE)); + fs_ptr->reader_ptr_->Read(ch, MAGIC_SIZE); + bool result = !strcmp(ch, MAGIC); + + fs_ptr->reader_ptr_->Close(); + return result; +} + +void +WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) { + if (!fs_ptr->writer_ptr_->Open(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + fs_ptr->writer_ptr_->Write((void*)MAGIC, MAGIC_SIZE); + fs_ptr->writer_ptr_->Close(); +} + +std::unordered_map +ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) { + if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + fs_ptr->reader_ptr_->Seekg(MAGIC_SIZE); + char* ch = static_cast(malloc(HEADER_SIZE)); + fs_ptr->reader_ptr_->Read(ch, HEADER_SIZE); + + std::string data(ch); + + auto result = std::unordered_map(); + + std::regex semicolon(";"); + std::vector maps(std::sregex_token_iterator(data.begin(), data.end(), semicolon, -1), + std::sregex_token_iterator()); + std::regex equal("="); + for (auto& item : maps) { + std::vector pair(std::sregex_token_iterator(item.begin(), item.end(), equal, -1), + std::sregex_token_iterator()); + result.insert(std::make_pair(pair[0], pair[1])); + } + fs_ptr->reader_ptr_->Close(); + + return result; +} + +std::string +ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key) { + auto kv = ReadHeaderValues(fs_ptr, file_path); + return kv.at(key); +} + +std::uint8_t +CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written) { + if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + int size = fs_ptr->reader_ptr_->Length(); + if (written) { + size -= SUM_SIZE; + } + char* ch = static_cast(malloc(size)); + fs_ptr->reader_ptr_->Read(ch, size); + std::uint8_t result = crc32c::Crc32c(ch, size); + fs_ptr->reader_ptr_->Close(); + + return result; +} + +void +WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int result, bool written) { + if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + + if (written) { + fs_ptr->writer_ptr_->Seekp(-SUM_SIZE, std::ios_base::end); + } else { + fs_ptr->writer_ptr_->Seekp(0, std::ios_base::end); + } + + std::string sum = std::to_string(result); + sum.resize(SUM_SIZE, '\0'); + fs_ptr->writer_ptr_->Write(sum.data(), SUM_SIZE); + fs_ptr->writer_ptr_->Close(); +} + +bool +CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path) { + int result = CalculateSum(fs_ptr, file_path, true); + if (!fs_ptr->reader_ptr_->Open(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + fs_ptr->reader_ptr_->Seekg(-SUM_SIZE, std::ios_base::end); + char* record = static_cast(malloc(SUM_SIZE)); + fs_ptr->reader_ptr_->Read(record, SUM_SIZE); + + fs_ptr->reader_ptr_->Close(); + + auto sum = (uint8_t)atoi(record); + return sum == result; +} + +bool +WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key, + const std::string& value) { + auto record = ReadHeaderValues(fs_ptr, file_path); + record.insert(std::make_pair(key, value)); + WriteHeaderValues(fs_ptr, file_path, record); + return true; +} + +bool +WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, + const std::unordered_map& maps) { + if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + LOG_ENGINE_ERROR_ << err_msg; + throw Exception(SERVER_WRITE_ERROR, err_msg); + } + fs_ptr->writer_ptr_->Seekp(MAGIC_SIZE); + + std::string kv; + for (auto& map : maps) { + kv.append(map.first + "=" + map.second + ";"); + } + if (kv.size() > HEADER_SIZE) { + throw "Exceeded the limit of header data size"; + } + + fs_ptr->writer_ptr_->Write(kv.data(), HEADER_SIZE); + fs_ptr->writer_ptr_->Close(); + + return true; +} +} // namespace storage + +} // namespace milvus diff --git a/core/src/storage/ExtraFileInfo.h b/core/src/storage/ExtraFileInfo.h new file mode 100644 index 0000000000..425d4e0cb7 --- /dev/null +++ b/core/src/storage/ExtraFileInfo.h @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef EXTRA_FILE_INFO_H +#define EXTRA_FILE_INFO_H + +#include +#include +#include +#include + +#include +#include +#include + +#include "storage/FSHandler.h" + +#define MAGIC "Milvus" +#define MAGIC_SIZE 6 +#define SINGLE_KV_DATA_SIZE 64 +#define HEADER_SIZE 4096 +#define SUM_SIZE 16 + +namespace milvus { +namespace storage { + +#define CHECK_MAGIC_VALID(PTR, FILE_PATH) \ + if (!CheckMagic(PTR, FILE_PATH)) { \ + throw Exception(SERVER_FILE_MAGIC_BYTES_ERROR, "wrong magic bytes"); \ + } + +#define CHECK_SUM_VALID(PTR, FILE_PATH) \ + if (!CheckSum(PTR, FILE_PATH)) { \ + throw Exception(SERVER_FILE_SUM_BYTES_ERROR, "wrong sum bytes,file may be changed"); \ + } + +#define WRITE_MAGIC(PTR, FILE_PATH) \ + try { \ + WriteMagic(PTR, FILE_PATH); \ + } catch (...) { \ + throw "write magic failed"; \ + } +#define WRITE_HEADER(PTR, FILE_PATH, KV) \ + try { \ + WriteHeaderValues(PTR, FILE_PATH, KV); \ + } catch (...) { \ + throw "write sum failed"; \ + } + +#define WRITE_SUM(PTR, FILE_PATH) \ + try { \ + int result = CalculateSum(PTR, FILE_PATH); \ + WriteSum(PTR, FILE_PATH, result); \ + } catch (...) { \ + throw "write sum failed"; \ + } + +void +WriteMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path); + +bool +CheckMagic(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path); + +bool +CheckSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path); + +void +WriteSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, int result, bool written = false); + +std::uint8_t +CalculateSum(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, bool written = false); + +std::string +ReadHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key); + +std::unordered_map +ReadHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path); + +bool +WriteHeaderValue(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, const std::string& key, + const std::string& value); + +bool +WriteHeaderValues(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, + const std::unordered_map& maps); + +} // namespace storage +} // namespace milvus +#endif // end of EXTRA_FILE_INFO_H diff --git a/core/src/storage/ExtraFileInfoTest.cpp b/core/src/storage/ExtraFileInfoTest.cpp new file mode 100644 index 0000000000..5fa0119a43 --- /dev/null +++ b/core/src/storage/ExtraFileInfoTest.cpp @@ -0,0 +1,74 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include + +#include "easyloggingpp/easylogging++.h" +#include "gtest/gtest.h" + +#include "ExtraFileInfo.h" +#include "crc32c/crc32c.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" + +INITIALIZE_EASYLOGGINGPP + +namespace milvus { +namespace storage { + +/* ExtraFileInfoTest */ +class ExtraFileInfoTest : public testing::Test { + protected: +}; + +TEST_F(ExtraFileInfoTest, WriteFileTest) { + std::string raw = "helloworldhelloworld"; + + std::string directory = "/tmp"; + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + const storage::FSHandlerPtr fs_ptr = std::make_shared(reader_ptr, writer_ptr, operation_ptr); + std::string file_path = "/tmp/test.txt"; + + auto record = std::unordered_map(); + record.insert(std::make_pair("test", "test")); + WriteMagic(fs_ptr, file_path); + WriteHeaderValues(fs_ptr, file_path, record); + + if (!fs_ptr->writer_ptr_->InOpen(file_path.c_str())) { + std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); + } + fs_ptr->writer_ptr_->Seekp(0, std::ios_base::end); + + size_t num_bytes = raw.size(); + fs_ptr->writer_ptr_->Write(&num_bytes, sizeof(size_t)); + fs_ptr->writer_ptr_->Write((void*)(raw.data()), num_bytes); + fs_ptr->writer_ptr_->Close(); + + int result_sum = CalculateSum(fs_ptr, file_path); + WriteSum(fs_ptr, file_path, result_sum); + + ASSERT_TRUE(CheckSum(fs_ptr, file_path)); + ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "test"), "test"); + + ASSERT_TRUE(WriteHeaderValue(fs_ptr, file_path, "github", "github")); + ASSERT_EQ(ReadHeaderValue(fs_ptr, file_path, "github"), "github"); + result_sum = CalculateSum(fs_ptr, file_path); + WriteSum(fs_ptr, file_path, result_sum, true); + ASSERT_TRUE(CheckMagic(fs_ptr, file_path)); + ASSERT_TRUE(CheckSum(fs_ptr, file_path)); +} +} // namespace storage + +} // namespace milvus diff --git a/core/src/storage/FSHandler.h b/core/src/storage/FSHandler.h index 8b0f175bf9..9a93c6fea8 100644 --- a/core/src/storage/FSHandler.h +++ b/core/src/storage/FSHandler.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include "storage/IOReader.h" diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index 81a4da52e7..1e32fe0ede 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -28,6 +28,9 @@ class IOReader { virtual void Seekg(int64_t pos) = 0; + virtual void + Seekg(int64_t pos, std::ios_base::seekdir seekdir) = 0; + virtual int64_t Length() = 0; diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index 102bd98a01..d6f09c8d4f 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -22,9 +22,18 @@ class IOWriter { virtual bool Open(const std::string& name) = 0; + virtual bool + InOpen(const std::string& name) = 0; + virtual void Write(void* ptr, int64_t size) = 0; + virtual void + Seekp(int64_t pos) = 0; + + virtual void + Seekp(int64_t pos, std::ios_base::seekdir seekdir) = 0; + virtual int64_t Length() = 0; diff --git a/core/src/storage/disk/DiskIOReader.cpp b/core/src/storage/disk/DiskIOReader.cpp index 938378c143..2c121eae76 100644 --- a/core/src/storage/disk/DiskIOReader.cpp +++ b/core/src/storage/disk/DiskIOReader.cpp @@ -30,6 +30,10 @@ void DiskIOReader::Seekg(int64_t pos) { fs_.seekg(pos); } +void +DiskIOReader::Seekg(int64_t pos, std::ios_base::seekdir seekdir) { + fs_.seekg(pos, seekdir); +} int64_t DiskIOReader::Length() { diff --git a/core/src/storage/disk/DiskIOReader.h b/core/src/storage/disk/DiskIOReader.h index b8fe0105cc..e9d3f55cc8 100644 --- a/core/src/storage/disk/DiskIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -42,6 +42,9 @@ class DiskIOReader : public IOReader { void Seekg(int64_t pos) override; + void + Seekg(int64_t pos, std::ios_base::seekdir seekdir) override; + int64_t Length() override; diff --git a/core/src/storage/disk/DiskIOWriter.cpp b/core/src/storage/disk/DiskIOWriter.cpp index 2031a20981..3e7970b6c3 100644 --- a/core/src/storage/disk/DiskIOWriter.cpp +++ b/core/src/storage/disk/DiskIOWriter.cpp @@ -21,6 +21,13 @@ DiskIOWriter::Open(const std::string& name) { fs_ = std::fstream(name_, std::ios::out | std::ios::binary); return fs_.good(); } +bool +DiskIOWriter::InOpen(const std::string& name) { + name_ = name; + len_ = 0; + fs_ = std::fstream(name_, std::ios::out | std::ios::binary | std::ios::in); + return fs_.good(); +} void DiskIOWriter::Write(void* ptr, int64_t size) { @@ -37,6 +44,14 @@ void DiskIOWriter::Close() { fs_.close(); } +void +DiskIOWriter::Seekp(int64_t pos) { + fs_.seekp(pos); +} +void +DiskIOWriter::Seekp(int64_t pos, std::ios_base::seekdir seekdir) { + fs_.seekp(pos, seekdir); +} } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOWriter.h b/core/src/storage/disk/DiskIOWriter.h index ed19b9e267..a82d802223 100644 --- a/core/src/storage/disk/DiskIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -36,9 +36,18 @@ class DiskIOWriter : public IOWriter { bool Open(const std::string& name) override; + bool + InOpen(const std::string& name) override; + void Write(void* ptr, int64_t size) override; + void + Seekp(int64_t pos) override; + + void + Seekp(int64_t pos, std::ios_base::seekdir seekdir) override; + int64_t Length() override; diff --git a/core/src/utils/Error.h b/core/src/utils/Error.h index 8d07d977c4..a82b4570db 100644 --- a/core/src/utils/Error.h +++ b/core/src/utils/Error.h @@ -72,7 +72,9 @@ constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10); constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11); constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12); constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13); -constexpr ErrorCode SERVER_CANNOT_READ_FILE = ToServerErrorCode(14); +constexpr ErrorCode SERVER_FILE_MAGIC_BYTES_ERROR = ToServerErrorCode(14); +constexpr ErrorCode SERVER_FILE_SUM_BYTES_ERROR = ToServerErrorCode(15); +constexpr ErrorCode SERVER_CANNOT_READ_FILE = ToServerErrorCode(16); constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100); constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101); diff --git a/core/thirdparty/CMakeLists.txt b/core/thirdparty/CMakeLists.txt index b7acb6e341..58e26ebcf0 100644 --- a/core/thirdparty/CMakeLists.txt +++ b/core/thirdparty/CMakeLists.txt @@ -47,6 +47,9 @@ if ( MILVUS_WITH_YAMLCPP ) add_subdirectory( yaml-cpp ) endif() +# ****************************** Thirdparty crc32c *************************************** +add_subdirectory( crc32c ) + # ****************************** Thirdparty opentracing *************************************** if ( MILVUS_WITH_OPENTRACING ) add_subdirectory( opentracing ) diff --git a/core/thirdparty/crc32c/CMakeLists.txt b/core/thirdparty/crc32c/CMakeLists.txt new file mode 100644 index 0000000000..912a707608 --- /dev/null +++ b/core/thirdparty/crc32c/CMakeLists.txt @@ -0,0 +1,39 @@ +#------------------------------------------------------------------------------- +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. +#------------------------------------------------------------------------------- + +if ( DEFINED ENV{MILVUS_CRC32C_URL} ) + set( CRC32C_SOURCE_URL "$ENV{MILVUS_CRC32C_URL}" ) +endif() +set( CRC32C_SOURCE_URL + "https://github.com/google/crc32c/archive/${CRC32C_VERSION}.zip") +message( STATUS "Building crc32c-master from source" ) +FetchContent_Declare( + crc32c + URL ${CRC32C_SOURCE_URL} + URL_MD5 "d24bd57ca2d9fb051aa589818c0e2c10" + SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/crc32c-src + BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/crc32c-build +) + +FetchContent_GetProperties( crc32c ) +if ( NOT crc32c_POPULATED ) + + FetchContent_Populate( crc32c ) + set (CRC32C_BUILD_TESTS CACHE BOOL OFF FORCE) + set (CRC32C_BUILD_BENCHMARKS CACHE BOOL OFF FORCE) + set (CRC32C_USE_GLOG CACHE BOOL OFF FORCE) + add_subdirectory( ${crc32c_SOURCE_DIR} + ${crc32c_BINARY_DIR} + EXCLUDE_FROM_ALL ) + +endif() \ No newline at end of file diff --git a/core/thirdparty/versions.txt b/core/thirdparty/versions.txt index b7a3a56469..0558a7cdb4 100644 --- a/core/thirdparty/versions.txt +++ b/core/thirdparty/versions.txt @@ -12,5 +12,6 @@ OPENTRACING_VERSION=v1.5.1 FIU_VERSION=1.00 OATPP_VERSION=1.1.0 AWS_VERSION=1.7.250 +CRC32C_VERSION=1.1.1 # vim: set filetype=sh: