mirror of https://github.com/milvus-io/milvus.git
Caiyd 1627 move rw index (#1833)
* #1627 move read/write index into codec Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * fix clang-format Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update changelog Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update unittest Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * code optimize Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * update WriteVectorIndex Signed-off-by: yudong.cai <yudong.cai@zilliz.com> * fix clang-format Signed-off-by: yudong.cai <yudong.cai@zilliz.com> pull/1861/head
parent
f82aa4f493
commit
7ccfa5b9f0
|
@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub
|
|||
- \#1849 NSG support deleted vectors searching
|
||||
|
||||
## Improvement
|
||||
- \#1627 Move read/write index APIs into codec
|
||||
- \#1784 Add Substructure and Superstructure in http module
|
||||
|
||||
## Task
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#include "DeletedDocsFormat.h"
|
||||
#include "IdBloomFilterFormat.h"
|
||||
#include "IdIndexFormat.h"
|
||||
#include "VectorIndexFormat.h"
|
||||
#include "VectorsFormat.h"
|
||||
#include "VectorsIndexFormat.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
@ -33,6 +33,9 @@ class Codec {
|
|||
virtual VectorsFormatPtr
|
||||
GetVectorsFormat() = 0;
|
||||
|
||||
virtual VectorIndexFormatPtr
|
||||
GetVectorIndexFormat() = 0;
|
||||
|
||||
virtual DeletedDocsFormatPtr
|
||||
GetDeletedDocsFormat() = 0;
|
||||
|
||||
|
@ -44,9 +47,6 @@ class Codec {
|
|||
virtual AttrsFormat
|
||||
GetAttrsFormat() = 0;
|
||||
|
||||
virtual VectorsIndexFormat
|
||||
GetVectorsIndexFormat() = 0;
|
||||
|
||||
virtual AttrsIndexFormat
|
||||
GetAttrsIndexFormat() = 0;
|
||||
|
||||
|
|
|
@ -17,17 +17,25 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "segment/VectorIndex.h"
|
||||
#include "storage/FSHandler.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
class VectorsIndexFormat {
|
||||
// public:
|
||||
// virtual VectorsIndex
|
||||
// read() = 0;
|
||||
//
|
||||
// virtual void
|
||||
// write(VectorsIndex vectors_index) = 0;
|
||||
class VectorIndexFormat {
|
||||
public:
|
||||
virtual void
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) = 0;
|
||||
|
||||
virtual void
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) = 0;
|
||||
};
|
||||
|
||||
using VectorIndexFormatPtr = std::shared_ptr<VectorIndexFormat>;
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
#include "DefaultDeletedDocsFormat.h"
|
||||
#include "DefaultIdBloomFilterFormat.h"
|
||||
#include "DefaultVectorIndexFormat.h"
|
||||
#include "DefaultVectorsFormat.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -28,6 +29,7 @@ namespace codec {
|
|||
|
||||
DefaultCodec::DefaultCodec() {
|
||||
vectors_format_ptr_ = std::make_shared<DefaultVectorsFormat>();
|
||||
vector_index_format_ptr_ = std::make_shared<DefaultVectorIndexFormat>();
|
||||
deleted_docs_format_ptr_ = std::make_shared<DefaultDeletedDocsFormat>();
|
||||
id_bloom_filter_format_ptr_ = std::make_shared<DefaultIdBloomFilterFormat>();
|
||||
}
|
||||
|
@ -37,6 +39,11 @@ DefaultCodec::GetVectorsFormat() {
|
|||
return vectors_format_ptr_;
|
||||
}
|
||||
|
||||
VectorIndexFormatPtr
|
||||
DefaultCodec::GetVectorIndexFormat() {
|
||||
return vector_index_format_ptr_;
|
||||
}
|
||||
|
||||
DeletedDocsFormatPtr
|
||||
DefaultCodec::GetDeletedDocsFormat() {
|
||||
return deleted_docs_format_ptr_;
|
||||
|
|
|
@ -29,6 +29,9 @@ class DefaultCodec : public Codec {
|
|||
VectorsFormatPtr
|
||||
GetVectorsFormat() override;
|
||||
|
||||
VectorIndexFormatPtr
|
||||
GetVectorIndexFormat() override;
|
||||
|
||||
DeletedDocsFormatPtr
|
||||
GetDeletedDocsFormat() override;
|
||||
|
||||
|
@ -37,6 +40,7 @@ class DefaultCodec : public Codec {
|
|||
|
||||
private:
|
||||
VectorsFormatPtr vectors_format_ptr_;
|
||||
VectorIndexFormatPtr vector_index_format_ptr_;
|
||||
DeletedDocsFormatPtr deleted_docs_format_ptr_;
|
||||
IdBloomFilterFormatPtr id_bloom_filter_format_ptr_;
|
||||
};
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <memory>
|
||||
|
||||
#include "codecs/default/DefaultVectorIndexFormat.h"
|
||||
#include "knowhere/common/BinarySet.h"
|
||||
#include "knowhere/index/vector_index/VecIndex.h"
|
||||
#include "knowhere/index/vector_index/VecIndexFactory.h"
|
||||
#include "segment/VectorIndex.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
// Reads one serialized vector index file and rebuilds the knowhere index from it.
//
// Layout consumed here: an int32_t legacy index type, followed by records of
//   { size_t meta_length, meta bytes, size_t bin_length, bin bytes }
// repeated until the end of the file.
//
// fs_ptr : handler whose reader_ptr_ is used to open/read/close the file
// path   : location of the index file
// Returns the loaded index, or nullptr when the file is too short, a record
// length points past the end of the file, or the index cannot be created.
knowhere::VecIndexPtr
DefaultVectorIndexFormat::read_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& path) {
    milvus::TimeRecorder recorder("read_index");
    knowhere::BinarySet load_data_list;

    recorder.RecordSection("Start");
    auto& reader = fs_ptr->reader_ptr_;
    reader->open(path);

    const size_t length = reader->length();
    int32_t current_type = 0;
    if (length < sizeof(current_type)) {
        ENGINE_LOG_ERROR << "Invalid vector index length: " << path;
        // The reader was opened above; do not leave it open on the failure path.
        reader->close();
        return nullptr;
    }

    size_t rp = 0;
    // Reads `size` bytes at offset `rp` and advances `rp`. Refuses to read past
    // the end of the file so a corrupted length field cannot run off the data.
    auto read_chunk = [&](void* dst, size_t size) -> bool {
        if (size > length - rp) {
            return false;
        }
        reader->seekg(rp);
        reader->read(dst, size);
        rp += size;
        return true;
    };

    read_chunk(&current_type, sizeof(current_type));

    ENGINE_LOG_DEBUG << "Start to read_index(" << path << ") length: " << length << " bytes";
    bool corrupted = false;
    while (rp < length) {
        // Lengths come from the file: validate them against the remaining bytes
        // BEFORE allocating, so a damaged file cannot trigger a huge allocation.
        size_t meta_length = 0;
        if (!read_chunk(&meta_length, sizeof(meta_length)) || meta_length > length - rp) {
            corrupted = true;
            break;
        }
        std::string meta(meta_length, '\0');
        read_chunk(&meta[0], meta_length);

        size_t bin_length = 0;
        if (!read_chunk(&bin_length, sizeof(bin_length)) || bin_length > length - rp) {
            corrupted = true;
            break;
        }
        std::shared_ptr<uint8_t[]> binptr(new uint8_t[bin_length]);
        read_chunk(binptr.get(), bin_length);

        load_data_list.Append(meta, binptr, bin_length);
    }
    reader->close();

    if (corrupted) {
        ENGINE_LOG_ERROR << "Corrupted vector index file: " << path;
        return nullptr;
    }

    double span = recorder.RecordSection("End");
    double rate = length * 1000000.0 / span / 1024 / 1024;
    ENGINE_LOG_DEBUG << "read_index(" << path << ") rate " << rate << "MB/s";

    knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance();
    auto index =
        vec_index_factory.CreateVecIndex(knowhere::OldIndexTypeToStr(current_type), knowhere::IndexMode::MODE_CPU);
    if (index != nullptr) {
        index->Load(load_data_list);
        // The on-disk file size is recorded as the index size.
        index->SetIndexSize(length);
    } else {
        ENGINE_LOG_ERROR << "Fail to create vector index: " << path;
    }

    return index;
}
|
||||
|
||||
void
|
||||
DefaultVectorIndexFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
if (!boost::filesystem::is_directory(dir_path)) {
|
||||
std::string err_msg = "Directory: " + dir_path + "does not exist";
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
|
||||
}
|
||||
|
||||
boost::filesystem::path target_path(dir_path);
|
||||
typedef boost::filesystem::directory_iterator d_it;
|
||||
d_it it_end;
|
||||
d_it it(target_path);
|
||||
|
||||
for (; it != it_end; ++it) {
|
||||
const auto& path = it->path();
|
||||
|
||||
// if (path.extension().string() == vector_index_extension_) {
|
||||
/* tmp solution, should be replaced when use .idx as index extension name */
|
||||
const std::string& location = path.string();
|
||||
if (location.substr(location.length() - 3) == "000") {
|
||||
knowhere::VecIndexPtr index = read_internal(fs_ptr, location);
|
||||
vector_index->SetVectorIndex(index);
|
||||
vector_index->SetName(path.stem().string());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Builds a file name from the current system-clock time: microseconds since
// the epoch multiplied by 1000, rendered as a decimal string. Because of the
// multiplication the name always ends in "000".
std::string
GenerateFileName() {
    using std::chrono::duration_cast;
    using std::chrono::microseconds;

    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto usec = duration_cast<microseconds>(since_epoch).count();
    return std::to_string(usec * 1000);
}
|
||||
|
||||
void
|
||||
DefaultVectorIndexFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
|
||||
const std::string index_file_path = dir_path + "/" + GenerateFileName();
|
||||
// const std::string index_file_path = dir_path + "/" + vector_index->GetName() + vector_index_extension_;
|
||||
|
||||
milvus::TimeRecorder recorder("write_index");
|
||||
|
||||
knowhere::VecIndexPtr index = vector_index->GetVectorIndex();
|
||||
|
||||
auto binaryset = index->Serialize(knowhere::Config());
|
||||
int32_t index_type = knowhere::StrToOldIndexType(index->index_type());
|
||||
|
||||
recorder.RecordSection("Start");
|
||||
fs_ptr->writer_ptr_->open(index_file_path);
|
||||
|
||||
fs_ptr->writer_ptr_->write(&index_type, sizeof(index_type));
|
||||
|
||||
for (auto& iter : binaryset.binary_map_) {
|
||||
auto meta = iter.first.c_str();
|
||||
size_t meta_length = iter.first.length();
|
||||
fs_ptr->writer_ptr_->write(&meta_length, sizeof(meta_length));
|
||||
fs_ptr->writer_ptr_->write((void*)meta, meta_length);
|
||||
|
||||
auto binary = iter.second;
|
||||
int64_t binary_length = binary->size;
|
||||
fs_ptr->writer_ptr_->write(&binary_length, sizeof(binary_length));
|
||||
fs_ptr->writer_ptr_->write((void*)binary->data.get(), binary_length);
|
||||
}
|
||||
fs_ptr->writer_ptr_->close();
|
||||
|
||||
double span = recorder.RecordSection("End");
|
||||
double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024;
|
||||
ENGINE_LOG_DEBUG << "write_index(" << index_file_path << ") rate " << rate << "MB/s";
|
||||
}
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
|
@ -0,0 +1,58 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
|
||||
#include "codecs/VectorIndexFormat.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
class DefaultVectorIndexFormat : public VectorIndexFormat {
|
||||
public:
|
||||
DefaultVectorIndexFormat() = default;
|
||||
|
||||
void
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) override;
|
||||
|
||||
void
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) override;
|
||||
|
||||
// No copy and move
|
||||
DefaultVectorIndexFormat(const DefaultVectorIndexFormat&) = delete;
|
||||
DefaultVectorIndexFormat(DefaultVectorIndexFormat&&) = delete;
|
||||
|
||||
DefaultVectorIndexFormat&
|
||||
operator=(const DefaultVectorIndexFormat&) = delete;
|
||||
DefaultVectorIndexFormat&
|
||||
operator=(DefaultVectorIndexFormat&&) = delete;
|
||||
|
||||
private:
|
||||
knowhere::VecIndexPtr
|
||||
read_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& path);
|
||||
|
||||
private:
|
||||
std::mutex mutex_;
|
||||
|
||||
const std::string vector_index_extension_ = "";
|
||||
};
|
||||
|
||||
} // namespace codec
|
||||
} // namespace milvus
|
|
@ -22,7 +22,6 @@
|
|||
#include "cache/GpuCacheMgr.h"
|
||||
#include "config/Config.h"
|
||||
#include "db/Utils.h"
|
||||
#include "index/archive/VecIndex.h"
|
||||
#include "knowhere/common/Config.h"
|
||||
#include "knowhere/index/vector_index/ConfAdapter.h"
|
||||
#include "knowhere/index/vector_index/ConfAdapterMgr.h"
|
||||
|
@ -40,6 +39,8 @@
|
|||
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "scheduler/Utils.h"
|
||||
#include "segment/SegmentReader.h"
|
||||
#include "segment/SegmentWriter.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Error.h"
|
||||
#include "utils/Exception.h"
|
||||
|
@ -353,7 +354,11 @@ ExecutionEngineImpl::Size() const {
|
|||
|
||||
Status
|
||||
ExecutionEngineImpl::Serialize() {
|
||||
auto status = write_index(index_, location_);
|
||||
std::string segment_dir;
|
||||
utils::GetParentPath(location_, segment_dir);
|
||||
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(segment_dir);
|
||||
segment_writer_ptr->SetVectorIndex(index_);
|
||||
segment_writer_ptr->WriteVectorIndex();
|
||||
|
||||
// here we reset index size by file size,
|
||||
// since some index type(such as SQ8) data size become smaller after serialized
|
||||
|
@ -362,10 +367,10 @@ ExecutionEngineImpl::Serialize() {
|
|||
|
||||
if (index_->Size() == 0) {
|
||||
std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory";
|
||||
status = Status(DB_ERROR, msg);
|
||||
return Status(DB_ERROR, msg);
|
||||
}
|
||||
|
||||
return status;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status
|
||||
|
@ -436,7 +441,10 @@ ExecutionEngineImpl::Load(bool to_cache) {
|
|||
ENGINE_LOG_DEBUG << "Finished loading raw data from segment " << segment_dir;
|
||||
} else {
|
||||
try {
|
||||
index_ = read_index(location_);
|
||||
segment::SegmentPtr segment_ptr;
|
||||
segment_reader_ptr->GetSegment(segment_ptr);
|
||||
auto status = segment_reader_ptr->LoadVectorIndex(segment_ptr->vector_index_ptr_);
|
||||
index_ = segment_ptr->vector_index_ptr_->GetVectorIndex();
|
||||
|
||||
if (index_ == nullptr) {
|
||||
std::string msg = "Failed to load index from " + location_;
|
||||
|
|
|
@ -1,173 +0,0 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "config/Config.h"
|
||||
#include "index/archive/VecIndex.h"
|
||||
#include "knowhere/common/Exception.h"
|
||||
#include "knowhere/index/vector_index/IndexType.h"
|
||||
#include "knowhere/index/vector_index/VecIndex.h"
|
||||
#include "knowhere/index/vector_index/VecIndexFactory.h"
|
||||
#include "storage/disk/DiskIOReader.h"
|
||||
#include "storage/disk/DiskIOWriter.h"
|
||||
#include "storage/s3/S3IOReader.h"
|
||||
#include "storage/s3/S3IOWriter.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
// Creates a CPU-mode index of the given type, loads `index_binary` into it and
// records `size` as its index size. Returns nullptr when the factory cannot
// create an index for `type`.
knowhere::VecIndexPtr
LoadVecIndex(const knowhere::IndexType& type, const knowhere::BinarySet& index_binary, int64_t size) {
    auto created = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(type, knowhere::IndexMode::MODE_CPU);
    if (created != nullptr) {
        created->Load(index_binary);
        created->SetIndexSize(size);
    }
    return created;
}
|
||||
|
||||
// Reads a serialized vector index from `location` and rebuilds it.
//
// The reader is an S3IOReader when the S3 storage option is enabled in the
// server config, otherwise a DiskIOReader. Layout consumed: an int32_t legacy
// index type, followed by records of
//   { size_t meta_length, meta bytes, size_t bin_length, bin bytes }
// repeated until the end of the file.
//
// Returns the loaded index, or nullptr when the file is too short, a record
// length points past the end of the file, or the index cannot be created.
knowhere::VecIndexPtr
read_index(const std::string& location) {
    milvus::TimeRecorder recorder("read_index");
    knowhere::BinarySet load_data_list;

    bool s3_enable = false;
    milvus::server::Config& config = milvus::server::Config::GetInstance();
    config.GetStorageConfigS3Enable(s3_enable);

    std::shared_ptr<milvus::storage::IOReader> reader_ptr;
    if (s3_enable) {
        reader_ptr = std::make_shared<milvus::storage::S3IOReader>();
    } else {
        reader_ptr = std::make_shared<milvus::storage::DiskIOReader>();
    }

    recorder.RecordSection("Start");
    reader_ptr->open(location);

    const size_t length = reader_ptr->length();
    int32_t current_type = 0;
    if (length < sizeof(current_type)) {
        STORAGE_LOG_DEBUG << "read_index(" << location << ") failed!";
        // The reader was opened above; close it on the failure path too.
        reader_ptr->close();
        return nullptr;
    }

    size_t rp = 0;
    // Reads `size` bytes at offset `rp` and advances `rp`; refuses to read
    // beyond the end of the file.
    auto read_chunk = [&](void* dst, size_t size) -> bool {
        if (size > length - rp) {
            return false;
        }
        reader_ptr->seekg(rp);
        reader_ptr->read(dst, size);
        rp += size;
        return true;
    };

    read_chunk(&current_type, sizeof(current_type));

    STORAGE_LOG_DEBUG << "Start to read_index(" << location << ") length: " << length << " bytes";
    bool corrupted = false;
    while (rp < length) {
        // Lengths come from the file: check them against the remaining bytes
        // before allocating buffers of that size.
        size_t meta_length = 0;
        if (!read_chunk(&meta_length, sizeof(meta_length)) || meta_length > length - rp) {
            corrupted = true;
            break;
        }
        std::string meta(meta_length, '\0');
        read_chunk(&meta[0], meta_length);

        size_t bin_length = 0;
        if (!read_chunk(&bin_length, sizeof(bin_length)) || bin_length > length - rp) {
            corrupted = true;
            break;
        }
        std::shared_ptr<uint8_t[]> binptr(new uint8_t[bin_length]);
        read_chunk(binptr.get(), bin_length);

        load_data_list.Append(meta, binptr, bin_length);
    }
    reader_ptr->close();

    if (corrupted) {
        STORAGE_LOG_DEBUG << "read_index(" << location << ") failed!";
        return nullptr;
    }

    double span = recorder.RecordSection("End");
    double rate = length * 1000000.0 / span / 1024 / 1024;
    STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s";

    return LoadVecIndex(knowhere::OldIndexTypeToStr(current_type), load_data_list, length);
}
|
||||
|
||||
// Serializes `index` and writes it to `location`.
//
// The writer is an S3IOWriter when the S3 storage option is enabled in the
// server config, otherwise a DiskIOWriter. Layout written: an int32_t legacy
// index type, then for every entry of the serialized binary set:
//   { size_t name length, name bytes, int64_t data length, data bytes }.
//
// Returns Status::OK() on success. Exceptions are converted to a Status:
// KNOWHERE_UNEXPECTED_ERROR for KnowhereException, KNOWHERE_NO_SPACE when the
// message mentions "No space left on device", KNOWHERE_ERROR otherwise.
milvus::Status
write_index(knowhere::VecIndexPtr index, const std::string& location) {
    try {
        milvus::TimeRecorder timer("write_index");

        auto serialized = index->Serialize(knowhere::Config());
        int32_t legacy_type = knowhere::StrToOldIndexType(index->index_type());

        bool use_s3 = false;
        milvus::server::Config::GetInstance().GetStorageConfigS3Enable(use_s3);

        std::shared_ptr<milvus::storage::IOWriter> writer;
        if (!use_s3) {
            writer = std::make_shared<milvus::storage::DiskIOWriter>();
        } else {
            writer = std::make_shared<milvus::storage::S3IOWriter>();
        }

        timer.RecordSection("Start");
        writer->open(location);

        writer->write(&legacy_type, sizeof(legacy_type));

        for (auto& entry : serialized.binary_map_) {
            // Entry name: length prefix followed by the raw bytes.
            size_t name_size = entry.first.length();
            writer->write(&name_size, sizeof(name_size));
            writer->write((void*)entry.first.c_str(), name_size);

            // Entry payload: length prefix followed by the raw bytes.
            auto payload = entry.second;
            int64_t payload_size = payload->size;
            writer->write(&payload_size, sizeof(payload_size));
            writer->write((void*)payload->data.get(), payload_size);
        }
        writer->close();

        double elapsed = timer.RecordSection("End");
        double mb_per_sec = writer->length() * 1000000.0 / elapsed / 1024 / 1024;
        STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << mb_per_sec << "MB/s";
        return milvus::Status::OK();
    } catch (knowhere::KnowhereException& ex) {
        WRAPPER_LOG_ERROR << ex.what();
        return milvus::Status(milvus::KNOWHERE_UNEXPECTED_ERROR, ex.what());
    } catch (std::exception& ex) {
        WRAPPER_LOG_ERROR << ex.what();
        const std::string reason(ex.what());
        if (reason.find("No space left on device") == reason.npos) {
            return milvus::Status(milvus::KNOWHERE_ERROR, ex.what());
        }
        WRAPPER_LOG_ERROR << "No space left on the device";
        return milvus::Status(milvus::KNOWHERE_NO_SPACE, "No space left on the device");
    }
}
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -1,37 +0,0 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "cache/DataObj.h"
|
||||
#include "knowhere/common/BinarySet.h"
|
||||
#include "knowhere/index/vector_index/VecIndex.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
extern milvus::Status
|
||||
write_index(knowhere::VecIndexPtr index, const std::string& location);
|
||||
|
||||
extern knowhere::VecIndexPtr
|
||||
read_index(const std::string& location);
|
||||
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
|
@ -21,7 +21,6 @@
|
|||
#include "knowhere/common/Typedef.h"
|
||||
#include "knowhere/index/Index.h"
|
||||
#include "knowhere/index/vector_index/IndexType.h"
|
||||
#include "segment/Types.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace knowhere {
|
||||
|
@ -88,13 +87,13 @@ class VecIndex : public Index {
|
|||
bitset_ = std::move(bitset_ptr);
|
||||
}
|
||||
|
||||
const std::vector<milvus::segment::doc_id_t>&
|
||||
const std::vector<IDType>&
|
||||
GetUids() const {
|
||||
return uids_;
|
||||
}
|
||||
|
||||
void
|
||||
SetUids(std::vector<milvus::segment::doc_id_t>& uids) {
|
||||
SetUids(std::vector<IDType>& uids) {
|
||||
uids_.clear();
|
||||
uids_.swap(uids);
|
||||
}
|
||||
|
@ -110,7 +109,7 @@ class VecIndex : public Index {
|
|||
|
||||
size_t
|
||||
UidsSize() {
|
||||
return uids_.size() * sizeof(segment::doc_id_t);
|
||||
return uids_.size() * sizeof(IDType);
|
||||
}
|
||||
|
||||
virtual int64_t
|
||||
|
@ -135,7 +134,7 @@ class VecIndex : public Index {
|
|||
IndexType index_type_ = "";
|
||||
IndexMode index_mode_ = IndexMode::MODE_CPU;
|
||||
faiss::ConcurrentBitsetPtr bitset_ = nullptr;
|
||||
std::vector<segment::doc_id_t> uids_;
|
||||
std::vector<IDType> uids_;
|
||||
int64_t index_size_ = -1;
|
||||
};
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ SegmentReader::Load() {
|
|||
try {
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_);
|
||||
// default_codec.GetVectorIndexFormat()->read(fs_ptr_, segment_ptr_->vector_index_ptr_);
|
||||
default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
return Status(DB_ERROR, e.what());
|
||||
|
@ -91,6 +92,20 @@ SegmentReader::GetSegment(SegmentPtr& segment_ptr) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// Loads the segment's vector index into `vector_index_ptr` using the default
// codec's vector index format. Any std::exception raised on the way is logged
// and reported as a DB_ERROR status; otherwise Status::OK() is returned.
Status
SegmentReader::LoadVectorIndex(segment::VectorIndexPtr& vector_index_ptr) {
    codec::DefaultCodec codec_instance;
    try {
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto index_format = codec_instance.GetVectorIndexFormat();
        index_format->read(fs_ptr_, vector_index_ptr);
        return Status::OK();
    } catch (std::exception& ex) {
        const std::string failure = std::string("Failed to load vector index: ") + ex.what();
        ENGINE_LOG_ERROR << failure;
        return Status(DB_ERROR, failure);
    }
}
|
||||
|
||||
Status
|
||||
SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
codec::DefaultCodec default_codec;
|
||||
|
|
|
@ -45,6 +45,9 @@ class SegmentReader {
|
|||
Status
|
||||
LoadUids(std::vector<doc_id_t>& uids);
|
||||
|
||||
Status
|
||||
LoadVectorIndex(segment::VectorIndexPtr& vector_index_ptr);
|
||||
|
||||
Status
|
||||
LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr);
|
||||
|
||||
|
|
|
@ -49,6 +49,12 @@ SegmentWriter::AddVectors(const std::string& name, const std::vector<uint8_t>& d
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// Stores `index` in the segment's vector index holder. Always returns
// Status::OK().
Status
SegmentWriter::SetVectorIndex(const milvus::knowhere::VecIndexPtr& index) {
    auto& index_holder = segment_ptr_->vector_index_ptr_;
    index_holder->SetVectorIndex(index);
    return Status::OK();
}
|
||||
|
||||
Status
|
||||
SegmentWriter::Serialize() {
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
@ -99,6 +105,20 @@ SegmentWriter::WriteVectors() {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// Writes the segment's vector index using the default codec's vector index
// format. Any std::exception raised on the way is logged and reported as a
// SERVER_WRITE_ERROR status; otherwise Status::OK() is returned.
Status
SegmentWriter::WriteVectorIndex() {
    codec::DefaultCodec codec_instance;
    try {
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto index_format = codec_instance.GetVectorIndexFormat();
        index_format->write(fs_ptr_, segment_ptr_->vector_index_ptr_);
        return Status::OK();
    } catch (std::exception& ex) {
        const std::string failure = std::string("Failed to write vector index: ") + ex.what();
        ENGINE_LOG_ERROR << failure;
        return Status(SERVER_WRITE_ERROR, failure);
    }
}
|
||||
|
||||
Status
|
||||
SegmentWriter::WriteBloomFilter() {
|
||||
codec::DefaultCodec default_codec;
|
||||
|
|
|
@ -35,6 +35,9 @@ class SegmentWriter {
|
|||
Status
|
||||
AddVectors(const std::string& name, const std::vector<uint8_t>& data, const std::vector<doc_id_t>& uids);
|
||||
|
||||
Status
|
||||
SetVectorIndex(const knowhere::VecIndexPtr& index);
|
||||
|
||||
Status
|
||||
WriteBloomFilter(const IdBloomFilterPtr& bloom_filter_ptr);
|
||||
|
||||
|
@ -59,6 +62,9 @@ class SegmentWriter {
|
|||
size_t
|
||||
VectorCount();
|
||||
|
||||
Status
|
||||
WriteVectorIndex();
|
||||
|
||||
private:
|
||||
Status
|
||||
WriteVectors();
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
#include "segment/DeletedDocs.h"
|
||||
#include "segment/IdBloomFilter.h"
|
||||
#include "segment/VectorIndex.h"
|
||||
#include "segment/Vectors.h"
|
||||
|
||||
namespace milvus {
|
||||
|
@ -30,6 +31,7 @@ typedef int64_t doc_id_t;
|
|||
|
||||
struct Segment {
|
||||
VectorsPtr vectors_ptr_ = std::make_shared<Vectors>();
|
||||
VectorIndexPtr vector_index_ptr_ = std::make_shared<VectorIndex>();
|
||||
DeletedDocsPtr deleted_docs_ptr_ = nullptr;
|
||||
IdBloomFilterPtr id_bloom_filter_ptr_ = nullptr;
|
||||
};
|
||||
|
|
|
@ -18,18 +18,38 @@
|
|||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "index/knowhere/knowhere/index/Index.h"
|
||||
#include <string>
|
||||
#include "knowhere/index/vector_index/VecIndex.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace segment {
|
||||
|
||||
class VectorIndex {
|
||||
public:
|
||||
explicit VectorIndex(knowhere::IndexPtr index_ptr);
|
||||
explicit VectorIndex(knowhere::VecIndexPtr index_ptr) : index_ptr_(index_ptr) {
|
||||
}
|
||||
|
||||
VectorIndex() = default;
|
||||
|
||||
knowhere::VecIndexPtr
|
||||
GetVectorIndex() const {
|
||||
return index_ptr_;
|
||||
}
|
||||
|
||||
void
|
||||
Get(knowhere::IndexPtr& index_ptr);
|
||||
SetVectorIndex(const knowhere::VecIndexPtr& index_ptr) {
|
||||
index_ptr_ = index_ptr;
|
||||
}
|
||||
|
||||
void
|
||||
SetName(const std::string& name) {
|
||||
name_ = name;
|
||||
}
|
||||
|
||||
const std::string&
|
||||
GetName() const {
|
||||
return name_;
|
||||
}
|
||||
|
||||
// No copy and move
|
||||
VectorIndex(const VectorIndex&) = delete;
|
||||
|
@ -41,7 +61,8 @@ class VectorIndex {
|
|||
operator=(VectorIndex&&) = delete;
|
||||
|
||||
private:
|
||||
knowhere::IndexPtr index_ptr_;
|
||||
knowhere::VecIndexPtr index_ptr_ = nullptr;
|
||||
std::string name_;
|
||||
};
|
||||
|
||||
using VectorIndexPtr = std::shared_ptr<VectorIndex>;
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "storage/IOReader.h"
|
||||
|
||||
|
@ -52,5 +53,7 @@ class DiskIOReader : public IOReader {
|
|||
std::fstream fs_;
|
||||
};
|
||||
|
||||
using DiskIOReaderPtr = std::shared_ptr<DiskIOReader>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "storage/IOWriter.h"
|
||||
|
||||
|
@ -50,5 +51,7 @@ class DiskIOWriter : public IOWriter {
|
|||
std::fstream fs_;
|
||||
};
|
||||
|
||||
using DiskIOWriterPtr = std::shared_ptr<DiskIOWriter>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "storage/IOReader.h"
|
||||
|
||||
|
@ -52,5 +53,7 @@ class S3IOReader : public IOReader {
|
|||
size_t pos_;
|
||||
};
|
||||
|
||||
using S3IOReaderPtr = std::shared_ptr<S3IOReader>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "storage/IOWriter.h"
|
||||
|
||||
|
@ -49,5 +50,7 @@ class S3IOWriter : public IOWriter {
|
|||
std::string buffer_;
|
||||
};
|
||||
|
||||
using S3IOWriterPtr = std::shared_ptr<S3IOWriter>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
|
|
@ -22,6 +22,7 @@ if (MILVUS_WITH_PROMETHEUS)
|
|||
test_prometheus.cpp)
|
||||
endif ()
|
||||
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control")
|
||||
add_executable(test_metrics
|
||||
${common_files}
|
||||
${test_files}
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
#include <fiu-local.h>
|
||||
#include <fiu-control.h>
|
||||
|
||||
#define private public
|
||||
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "config/Config.h"
|
||||
#include "metrics/utils.h"
|
||||
|
|
Loading…
Reference in New Issue