diff --git a/internal/core/src/index/Exception.h b/internal/core/src/index/Exception.h new file mode 100644 index 0000000000..dde9a9e24b --- /dev/null +++ b/internal/core/src/index/Exception.h @@ -0,0 +1,34 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
namespace milvus::index {

// Error type for failures of POSIX (unistd.h) calls such as read().
// It carries only a human-readable message and can be caught either as
// UnistdException or as std::runtime_error.
class UnistdException : public std::runtime_error {
 public:
    // msg: description of the failure, returned later by what().
    explicit UnistdException(const std::string& msg)
        : std::runtime_error(msg) {
    }

    ~UnistdException() override = default;
};

}  // namespace milvus::index
b/internal/core/src/index/Utils.cpp index 249cb85088..87f5c7fc0e 100644 --- a/internal/core/src/index/Utils.cpp +++ b/internal/core/src/index/Utils.cpp @@ -26,6 +26,7 @@ #include <iostream> #include "index/Utils.h" +#include "index/Exception.h" #include "index/Meta.h" #include <google/protobuf/text_format.h> #include <unistd.h> @@ -278,4 +279,21 @@ AssembleIndexDatas( } } +void +ReadDataFromFD(int fd, void* buf, size_t size, size_t chunk_size) { + lseek(fd, 0, SEEK_SET); + while (size != 0) { + const size_t count = (size < chunk_size) ? size : chunk_size; + const ssize_t size_read = read(fd, buf, count); + if (size_read != count) { + throw UnistdException( + "read data from fd error, returned read size is " + + std::to_string(size_read)); + } + + buf = static_cast<char*>(buf) + size_read; + size -= static_cast<std::size_t>(size_read); + } +} + } // namespace milvus::index diff --git a/internal/core/src/index/Utils.h b/internal/core/src/index/Utils.h index 528c6147e0..02c5747197 100644 --- a/internal/core/src/index/Utils.h +++ b/internal/core/src/index/Utils.h @@ -118,4 +118,8 @@ AssembleIndexDatas( std::map<std::string, storage::FieldDataChannelPtr>& index_datas, std::unordered_map<std::string, storage::FieldDataPtr>& result); +// On Linux, read() (and similar system calls) will transfer at most 0x7ffff000 (2,147,479,552) bytes once +void +ReadDataFromFD(int fd, void* buf, size_t size, size_t chunk_size = 0x7ffff000); + } // namespace milvus::index diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index c67a4f8f10..cf35a89a73 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -11,11 +11,15 @@ #include <gtest/gtest.h> #include <string.h> +#include <boost/uuid/uuid.hpp> +#include <boost/uuid/uuid_io.hpp> +#include <boost/uuid/uuid_generators.hpp> #include "common/Utils.h" #include "query/Utils.h" #include "test_utils/DataGen.h" #include "common/Types.h" +#include 
"index/Exception.h" TEST(Util, StringMatch) { using namespace milvus; @@ -133,3 +137,30 @@ TEST(Util, upper_bound) { ASSERT_EQ(5, upper_bound(timestamps, 0, data.size(), 4)); ASSERT_EQ(10, upper_bound(timestamps, 0, data.size(), 10)); } + +TEST(Util, read_from_fd) { + auto uuid = boost::uuids::random_generator()(); + auto uuid_string = boost::uuids::to_string(uuid); + auto file = std::string("/tmp/") + uuid_string; + + auto fd = open( + file.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IXUSR); + ASSERT_NE(fd, -1); + size_t data_size = 100 * 1024 * 1024; // 100M + auto index_data = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]); + auto max_loop = size_t(INT_MAX) / data_size + 1; // insert data > 2G + for (int i = 0; i < max_loop; ++i) { + auto size_write = write(fd, index_data.get(), data_size); + ASSERT_GE(size_write, 0); + } + + auto read_buf = + std::shared_ptr<uint8_t[]>(new uint8_t[data_size * max_loop]); + EXPECT_NO_THROW(milvus::index::ReadDataFromFD( + fd, read_buf.get(), data_size * max_loop)); + + // On Linux, read() (and similar system calls) will transfer at most 0x7ffff000 (2,147,479,552) bytes once + EXPECT_THROW(milvus::index::ReadDataFromFD( + fd, read_buf.get(), data_size * max_loop, INT_MAX), + milvus::index::UnistdException); +}