mirror of https://github.com/milvus-io/milvus.git
Caiyd 1881 bad alloc (#1892)
* #1881 update storage APIs, use int64_t for size Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update changelog Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/1884/head
parent
cc3b75b6b7
commit
66e38d20cd
|
@ -7,6 +7,7 @@ Please mark all change in change log and use the issue from GitHub
|
|||
## Bug
|
||||
- \#1762 Server is not forbidden to create new partition which tag is `_default`
|
||||
- \#1873 Fix index file serialize to incorrect path
|
||||
- \#1881 Fix Annoy index search fail
|
||||
|
||||
## Feature
|
||||
- \#261 Integrate ANNOY into Milvus
|
||||
|
|
|
@ -36,9 +36,12 @@ DefaultVectorIndexFormat::read_internal(const storage::FSHandlerPtr& fs_ptr, con
|
|||
knowhere::BinarySet load_data_list;
|
||||
|
||||
recorder.RecordSection("Start");
|
||||
fs_ptr->reader_ptr_->open(path);
|
||||
if (!fs_ptr->reader_ptr_->open(path)) {
|
||||
ENGINE_LOG_ERROR << "Fail to open vector index: " << path;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
size_t length = fs_ptr->reader_ptr_->length();
|
||||
int64_t length = fs_ptr->reader_ptr_->length();
|
||||
if (length <= 0) {
|
||||
ENGINE_LOG_ERROR << "Invalid vector index length: " << path;
|
||||
return nullptr;
|
||||
|
@ -128,7 +131,10 @@ DefaultVectorIndexFormat::write(const storage::FSHandlerPtr& fs_ptr, const std::
|
|||
int32_t index_type = knowhere::StrToOldIndexType(index->index_type());
|
||||
|
||||
recorder.RecordSection("Start");
|
||||
fs_ptr->writer_ptr_->open(location);
|
||||
if (!fs_ptr->writer_ptr_->open(location)) {
|
||||
ENGINE_LOG_ERROR << "Fail to open vector index: " << location;
|
||||
return;
|
||||
}
|
||||
|
||||
fs_ptr->writer_ptr_->write(&index_type, sizeof(index_type));
|
||||
|
||||
|
|
|
@ -19,16 +19,16 @@ namespace storage {
|
|||
|
||||
class IOReader {
|
||||
public:
|
||||
virtual void
|
||||
virtual bool
|
||||
open(const std::string& name) = 0;
|
||||
|
||||
virtual void
|
||||
read(void* ptr, size_t size) = 0;
|
||||
read(void* ptr, int64_t size) = 0;
|
||||
|
||||
virtual void
|
||||
seekg(size_t pos) = 0;
|
||||
seekg(int64_t pos) = 0;
|
||||
|
||||
virtual size_t
|
||||
virtual int64_t
|
||||
length() = 0;
|
||||
|
||||
virtual void
|
||||
|
|
|
@ -19,13 +19,13 @@ namespace storage {
|
|||
|
||||
class IOWriter {
|
||||
public:
|
||||
virtual void
|
||||
virtual bool
|
||||
open(const std::string& name) = 0;
|
||||
|
||||
virtual void
|
||||
write(void* ptr, size_t size) = 0;
|
||||
write(void* ptr, int64_t size) = 0;
|
||||
|
||||
virtual size_t
|
||||
virtual int64_t
|
||||
length() = 0;
|
||||
|
||||
virtual void
|
||||
|
|
|
@ -14,26 +14,27 @@
|
|||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
void
|
||||
bool
|
||||
DiskIOReader::open(const std::string& name) {
|
||||
name_ = name;
|
||||
fs_ = std::fstream(name_, std::ios::in | std::ios::binary);
|
||||
return fs_.good();
|
||||
}
|
||||
|
||||
void
|
||||
DiskIOReader::read(void* ptr, size_t size) {
|
||||
DiskIOReader::read(void* ptr, int64_t size) {
|
||||
fs_.read(reinterpret_cast<char*>(ptr), size);
|
||||
}
|
||||
|
||||
void
|
||||
DiskIOReader::seekg(size_t pos) {
|
||||
DiskIOReader::seekg(int64_t pos) {
|
||||
fs_.seekg(pos);
|
||||
}
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
DiskIOReader::length() {
|
||||
fs_.seekg(0, fs_.end);
|
||||
size_t len = fs_.tellg();
|
||||
int64_t len = fs_.tellg();
|
||||
fs_.seekg(0, fs_.beg);
|
||||
return len;
|
||||
}
|
||||
|
|
|
@ -33,16 +33,16 @@ class DiskIOReader : public IOReader {
|
|||
DiskIOReader&
|
||||
operator=(DiskIOReader&&) = delete;
|
||||
|
||||
void
|
||||
bool
|
||||
open(const std::string& name) override;
|
||||
|
||||
void
|
||||
read(void* ptr, size_t size) override;
|
||||
read(void* ptr, int64_t size) override;
|
||||
|
||||
void
|
||||
seekg(size_t pos) override;
|
||||
seekg(int64_t pos) override;
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
length() override;
|
||||
|
||||
void
|
||||
|
|
|
@ -14,20 +14,21 @@
|
|||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
void
|
||||
bool
|
||||
DiskIOWriter::open(const std::string& name) {
|
||||
name_ = name;
|
||||
len_ = 0;
|
||||
fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
|
||||
return fs_.good();
|
||||
}
|
||||
|
||||
void
|
||||
DiskIOWriter::write(void* ptr, size_t size) {
|
||||
DiskIOWriter::write(void* ptr, int64_t size) {
|
||||
fs_.write(reinterpret_cast<char*>(ptr), size);
|
||||
len_ += size;
|
||||
}
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
DiskIOWriter::length() {
|
||||
return len_;
|
||||
}
|
||||
|
|
|
@ -33,13 +33,13 @@ class DiskIOWriter : public IOWriter {
|
|||
DiskIOWriter&
|
||||
operator=(DiskIOWriter&&) = delete;
|
||||
|
||||
void
|
||||
bool
|
||||
open(const std::string& name) override;
|
||||
|
||||
void
|
||||
write(void* ptr, size_t size) override;
|
||||
write(void* ptr, int64_t size) override;
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
length() override;
|
||||
|
||||
void
|
||||
|
@ -47,7 +47,7 @@ class DiskIOWriter : public IOWriter {
|
|||
|
||||
public:
|
||||
std::string name_;
|
||||
size_t len_;
|
||||
int64_t len_;
|
||||
std::fstream fs_;
|
||||
};
|
||||
|
||||
|
|
|
@ -15,24 +15,24 @@
|
|||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
void
|
||||
bool
|
||||
S3IOReader::open(const std::string& name) {
|
||||
name_ = name;
|
||||
pos_ = 0;
|
||||
S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_);
|
||||
return (S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_).ok());
|
||||
}
|
||||
|
||||
void
|
||||
S3IOReader::read(void* ptr, size_t size) {
|
||||
S3IOReader::read(void* ptr, int64_t size) {
|
||||
memcpy(ptr, buffer_.data() + pos_, size);
|
||||
}
|
||||
|
||||
void
|
||||
S3IOReader::seekg(size_t pos) {
|
||||
S3IOReader::seekg(int64_t pos) {
|
||||
pos_ = pos;
|
||||
}
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
S3IOReader::length() {
|
||||
return buffer_.length();
|
||||
}
|
||||
|
|
|
@ -32,16 +32,16 @@ class S3IOReader : public IOReader {
|
|||
S3IOReader&
|
||||
operator=(S3IOReader&&) = delete;
|
||||
|
||||
void
|
||||
bool
|
||||
open(const std::string& name) override;
|
||||
|
||||
void
|
||||
read(void* ptr, size_t size) override;
|
||||
read(void* ptr, int64_t size) override;
|
||||
|
||||
void
|
||||
seekg(size_t pos) override;
|
||||
seekg(int64_t pos) override;
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
length() override;
|
||||
|
||||
void
|
||||
|
@ -50,7 +50,7 @@ class S3IOReader : public IOReader {
|
|||
public:
|
||||
std::string name_;
|
||||
std::string buffer_;
|
||||
size_t pos_;
|
||||
int64_t pos_;
|
||||
};
|
||||
|
||||
using S3IOReaderPtr = std::shared_ptr<S3IOReader>;
|
||||
|
|
|
@ -15,20 +15,21 @@
|
|||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
void
|
||||
bool
|
||||
S3IOWriter::open(const std::string& name) {
|
||||
name_ = name;
|
||||
len_ = 0;
|
||||
buffer_ = "";
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
S3IOWriter::write(void* ptr, size_t size) {
|
||||
S3IOWriter::write(void* ptr, int64_t size) {
|
||||
buffer_ += std::string(reinterpret_cast<char*>(ptr), size);
|
||||
len_ += size;
|
||||
}
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
S3IOWriter::length() {
|
||||
return len_;
|
||||
}
|
||||
|
|
|
@ -32,13 +32,13 @@ class S3IOWriter : public IOWriter {
|
|||
S3IOWriter&
|
||||
operator=(S3IOWriter&&) = delete;
|
||||
|
||||
void
|
||||
bool
|
||||
open(const std::string& name) override;
|
||||
|
||||
void
|
||||
write(void* ptr, size_t size) override;
|
||||
write(void* ptr, int64_t size) override;
|
||||
|
||||
size_t
|
||||
int64_t
|
||||
length() override;
|
||||
|
||||
void
|
||||
|
@ -46,7 +46,7 @@ class S3IOWriter : public IOWriter {
|
|||
|
||||
public:
|
||||
std::string name_;
|
||||
size_t len_;
|
||||
int64_t len_;
|
||||
std::string buffer_;
|
||||
};
|
||||
|
||||
|
|
|
@ -208,4 +208,4 @@ add_subdirectory(metrics)
|
|||
add_subdirectory(scheduler)
|
||||
add_subdirectory(server)
|
||||
add_subdirectory(thirdparty)
|
||||
#add_subdirectory(storage)
|
||||
add_subdirectory(storage)
|
||||
|
|
|
@ -12,7 +12,8 @@
|
|||
#-------------------------------------------------------------------------------
|
||||
|
||||
set(test_files
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_s3_client.cpp
|
||||
# ${CMAKE_CURRENT_SOURCE_DIR}/test_s3_client.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/test_disk.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "easyloggingpp/easylogging++.h"
|
||||
#include "storage/disk/DiskIOReader.h"
|
||||
#include "storage/disk/DiskIOWriter.h"
|
||||
#include "storage/utils.h"
|
||||
|
||||
INITIALIZE_EASYLOGGINGPP
|
||||
|
||||
TEST_F(StorageTest, DISK_RW_TEST) {
|
||||
const std::string index_name = "/tmp/test_index";
|
||||
const std::string content = "abcdefg";
|
||||
|
||||
{
|
||||
milvus::storage::DiskIOWriter writer;
|
||||
ASSERT_TRUE(writer.open(index_name));
|
||||
size_t len = content.length();
|
||||
writer.write(&len, sizeof(len));
|
||||
writer.write((void*)(content.data()), len);
|
||||
ASSERT_TRUE(len + sizeof(len) == writer.length());
|
||||
writer.close();
|
||||
}
|
||||
|
||||
{
|
||||
milvus::storage::DiskIOReader reader;
|
||||
ASSERT_FALSE(reader.open("/tmp/notexist"));
|
||||
ASSERT_TRUE(reader.open(index_name));
|
||||
int64_t length = reader.length();
|
||||
int64_t rp = 0;
|
||||
reader.seekg(rp);
|
||||
std::string content_out;
|
||||
while (rp < length) {
|
||||
size_t len;
|
||||
reader.read(&len, sizeof(len));
|
||||
rp += sizeof(len);
|
||||
reader.seekg(rp);
|
||||
|
||||
auto data = new char[len];
|
||||
reader.read(data, len);
|
||||
rp += len;
|
||||
reader.seekg(rp);
|
||||
|
||||
content_out += std::string(data, len);
|
||||
|
||||
delete[] data;
|
||||
}
|
||||
|
||||
ASSERT_TRUE(content == content_out);
|
||||
reader.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue