feat: integrate storagev2 in building index of segcore (#28768)

issue: https://github.com/milvus-io/milvus/issues/28655

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/28987/head
Bingyi Sun 2023-12-05 16:48:54 +08:00 committed by GitHub
parent ad1daebc8e
commit 36f69ea031
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
61 changed files with 2613 additions and 9461 deletions

View File

@ -122,7 +122,7 @@ if (LINUX OR MSYS)
"-fPIC"
"-DELPP_THREAD_SAFE"
"-fopenmp"
"-Werror"
"-Wno-error"
)
if (CMAKE_BUILD_TYPE STREQUAL "Release")
append_flags( CMAKE_CXX_FLAGS

View File

@ -12,7 +12,7 @@ class MilvusConan(ConanFile):
"lz4/1.9.4",
"snappy/1.1.9",
"lzo/2.10",
"arrow/11.0.0",
"arrow/12.0.1",
"openssl/3.1.2",
"aws-sdk-cpp/1.9.234",
"googleapis/cci.20221108",
@ -49,6 +49,7 @@ class MilvusConan(ConanFile):
"librdkafka:sasl": True,
"rocksdb:shared": True,
"rocksdb:with_zstd": True,
"arrow:filesystem_layer": True,
"arrow:parquet": True,
"arrow:compute": True,
"arrow:with_re2": True,
@ -57,6 +58,8 @@ class MilvusConan(ConanFile):
"arrow:with_thrift": True,
"arrow:with_jemalloc": True,
"arrow:shared": False,
"arrow:with_s3": True,
"aws-sdk-cpp:config": True,
"aws-sdk-cpp:text-to-speech": False,
"aws-sdk-cpp:transfer": False,
"gtest:build_gmock": False,

View File

@ -38,6 +38,8 @@ struct LoadFieldDataInfo {
// Set empty to disable mmap,
// mmap file path will be {mmap_dir_path}/{segment_id}/{field_id}
std::string mmap_dir_path = "";
std::string url;
int64_t storage_version = 0;
};
struct LoadDeletedRecordInfo {

View File

@ -22,6 +22,6 @@ set(INDEX_FILES
milvus_add_pkg_config("milvus_index")
add_library(milvus_index SHARED ${INDEX_FILES})
target_link_libraries(milvus_index milvus_storage)
target_link_libraries(milvus_index milvus_storage milvus-storage)
install(TARGETS milvus_index DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -42,6 +42,9 @@ class IndexBase {
virtual void
Load(const Config& config = {}) = 0;
virtual void
LoadV2(const Config& config = {}) = 0;
virtual void
BuildWithRawData(size_t n,
const void* values,
@ -53,12 +56,18 @@ class IndexBase {
virtual void
Build(const Config& config = {}) = 0;
virtual void
BuildV2(const Config& Config = {}) = 0;
virtual int64_t
Count() = 0;
virtual BinarySet
Upload(const Config& config = {}) = 0;
virtual BinarySet
UploadV2(const Config& config = {}) = 0;
bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||

View File

@ -15,6 +15,7 @@
// limitations under the License.
#include "index/IndexFactory.h"
#include "common/EasyAssert.h"
#include "index/VectorMemIndex.h"
#include "index/Utils.h"
#include "index/Meta.h"
@ -50,7 +51,29 @@ IndexFactory::CreateScalarIndex<std::string>(
#if defined(__linux__) || defined(__APPLE__)
return CreateStringIndexMarisa(file_manager_context);
#else
throw std::runtime_error("unsupported platform");
throw SegcoreError(Unsupported, "unsupported platform");
#endif
}
// Storage-v2 scalar index factory hook: every scalar element type is
// currently served by the sort-based index, independent of the requested
// index_type (which is accepted for interface symmetry with the v1 overload).
template <typename T>
ScalarIndexPtr<T>
IndexFactory::CreateScalarIndex(
    const IndexType& index_type,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space) {
    // Delegate straight to the Space-aware sort-index factory.
    return CreateScalarIndexSort<T>(file_manager_context, space);
}
// Specialization for string scalars: use the marisa-trie-backed index.
// Marisa is only built on Linux/macOS, so other platforms fail fast with
// an Unsupported error instead of producing an unusable index.
template <>
ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex(
    const IndexType& index_type,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space) {
#if defined(__linux__) || defined(__APPLE__)
    return CreateStringIndexMarisa(file_manager_context, space);
#else
    throw SegcoreError(Unsupported, "unsupported platform");
#endif
}
@ -65,6 +88,19 @@ IndexFactory::CreateIndex(
return CreateScalarIndex(create_index_info, file_manager_context);
}
// Storage-v2 entry point: route index creation by the field's data type.
// Vector fields go to the vector factory; everything else is a scalar index.
IndexBasePtr
IndexFactory::CreateIndex(
    const CreateIndexInfo& create_index_info,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space) {
    if (!datatype_is_vector(create_index_info.field_type)) {
        return CreateScalarIndex(
            create_index_info, file_manager_context, space);
    }
    return CreateVectorIndex(create_index_info, file_manager_context, space);
}
IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
@ -139,8 +175,87 @@ IndexFactory::CreateVectorIndex(
data_type));
}
}
// return std::make_unique<VectorMemIndex>(
// index_type, metric_type, version, file_manager_context);
}
// Storage-v2 scalar index dispatch: translate the field's DataType into the
// element type T of the templated CreateScalarIndex<T> overload.
// Throws SegcoreError(DataTypeInvalid) for non-scalar / unknown types.
IndexBasePtr
IndexFactory::CreateScalarIndex(const CreateIndexInfo& create_index_info,
                                const storage::FileManagerContext& file_manager,
                                std::shared_ptr<milvus_storage::Space> space) {
    const auto dtype = create_index_info.field_type;
    const auto& itype = create_index_info.index_type;
    switch (dtype) {
        // numeric scalar types
        case DataType::BOOL:
            return CreateScalarIndex<bool>(itype, file_manager, space);
        case DataType::INT8:
            return CreateScalarIndex<int8_t>(itype, file_manager, space);
        case DataType::INT16:
            return CreateScalarIndex<int16_t>(itype, file_manager, space);
        case DataType::INT32:
            return CreateScalarIndex<int32_t>(itype, file_manager, space);
        case DataType::INT64:
            return CreateScalarIndex<int64_t>(itype, file_manager, space);
        case DataType::FLOAT:
            return CreateScalarIndex<float>(itype, file_manager, space);
        case DataType::DOUBLE:
            return CreateScalarIndex<double>(itype, file_manager, space);
        // string scalar types
        case DataType::STRING:
        case DataType::VARCHAR:
            return CreateScalarIndex<std::string>(itype, file_manager, space);
        default:
            throw SegcoreError(
                DataTypeInvalid,
                fmt::format("invalid data type to build mem index: {}",
                            dtype));
    }
}
// Storage-v2 vector index factory.
// DiskANN-style indexes (per knowhere::UseDiskLoad) get a disk-backed
// implementation; all other index types get an in-memory implementation.
// Throws SegcoreError(DataTypeInvalid) when the element type is unsupported
// for the chosen branch.
IndexBasePtr
IndexFactory::CreateVectorIndex(
    const CreateIndexInfo& create_index_info,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space) {
    const auto dtype = create_index_info.field_type;
    const auto& itype = create_index_info.index_type;
    const auto& mtype = create_index_info.metric_type;
    const auto version = create_index_info.index_engine_version;

    // Disk-resident branch (DiskANN): only float vectors are supported.
    if (knowhere::UseDiskLoad(itype, version)) {
        if (dtype == DataType::VECTOR_FLOAT) {
            return std::make_unique<VectorDiskAnnIndex<float>>(
                itype, mtype, version, space, file_manager_context);
        }
        throw SegcoreError(
            DataTypeInvalid,
            fmt::format("invalid data type to build disk index: {}", dtype));
    }

    // In-memory branch.
    switch (dtype) {
        case DataType::VECTOR_FLOAT:
            return std::make_unique<VectorMemIndex<float>>(
                create_index_info, file_manager_context, space);
        case DataType::VECTOR_BINARY:
            return std::make_unique<VectorMemIndex<uint8_t>>(
                create_index_info, file_manager_context, space);
        default:
            throw SegcoreError(
                DataTypeInvalid,
                fmt::format("invalid data type to build mem index: {}",
                            dtype));
    }
}
} // namespace milvus::index

View File

@ -16,6 +16,7 @@
#pragma once
#include <memory>
#include <string>
#include <mutex>
#include <shared_mutex>
@ -31,6 +32,7 @@
#include "index/ScalarIndexSort.h"
#include "index/StringIndexMarisa.h"
#include "index/BoolIndex.h"
#include "storage/space.h"
namespace milvus::index {
@ -55,6 +57,10 @@ class IndexFactory {
const storage::FileManagerContext& file_manager_context);
IndexBasePtr
CreateIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
IndexBasePtr
CreateVectorIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context);
@ -63,14 +69,30 @@ class IndexFactory {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
IndexBasePtr
CreateVectorIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
IndexBasePtr
CreateScalarIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
// IndexBasePtr
// CreateIndex(DataType dtype, const IndexType& index_type);
private:
template <typename T>
ScalarIndexPtr<T>
CreateScalarIndex(const IndexType& index_type,
const storage::FileManagerContext& file_manager_context =
const storage::FileManagerContext& file_manager =
storage::FileManagerContext());
template <typename T>
ScalarIndexPtr<T>
CreateScalarIndex(const IndexType& index_type,
const storage::FileManagerContext& file_manager,
std::shared_ptr<milvus_storage::Space> space);
};
template <>

View File

@ -25,6 +25,8 @@ struct CreateIndexInfo {
IndexType index_type;
MetricType metric_type;
IndexVersion index_engine_version;
std::string field_name;
int64_t dim;
};
} // namespace milvus::index

View File

@ -20,6 +20,8 @@
#include <pb/schema.pb.h>
#include <vector>
#include <string>
#include "common/CDataType.h"
#include "index/ScalarIndex.h"
#include "knowhere/log.h"
#include "Meta.h"
#include "common/Utils.h"
@ -27,6 +29,7 @@
#include "common/Types.h"
#include "index/Utils.h"
#include "index/ScalarIndexSort.h"
#include "storage/Util.h"
namespace milvus::index {
@ -41,6 +44,71 @@ ScalarIndexSort<T>::ScalarIndexSort(
}
}
// Storage-v2 constructor: remembers the Space the raw data / index blobs live
// in and, when the context is valid, wires up a memory file manager bound to
// that Space for BuildV2/UploadV2/LoadV2.
template <typename T>
inline ScalarIndexSort<T>::ScalarIndexSort(
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space)
    : is_built_(false), data_(), space_(space) {
    if (file_manager_context.Valid()) {
        file_manager_ = std::make_shared<storage::MemFileManagerImpl>(
            file_manager_context, space);
        AssertInfo(file_manager_ != nullptr, "create file manager failed!");
    }
}
// Build the sort index by scanning the field's raw column out of the
// storage-v2 Space: collect all row values, sort them with their original
// offsets, and build the idx -> sorted-position lookup table.
// Throws SegcoreError(DataIsEmpty) when the column has no rows.
template <typename T>
inline void
ScalarIndexSort<T>::BuildV2(const Config& config) {
    if (is_built_) {
        return;
    }
    auto field_name = file_manager_->GetIndexMeta().field_name;
    auto res = space_->ScanData();
    if (!res.ok()) {
        PanicInfo(S3Error, "failed to create scan iterator");
    }
    auto reader = res.value();
    std::vector<storage::FieldDataPtr> field_datas;
    for (auto rec = reader->Next(); rec != nullptr; rec = reader->Next()) {
        if (!rec.ok()) {
            PanicInfo(DataFormatBroken, "failed to read data");
        }
        auto data = rec.ValueUnsafe();
        auto total_num_rows = data->num_rows();
        auto col_data = data->GetColumnByName(field_name);
        auto field_data = storage::CreateFieldData(
            DataType(GetDType<T>()), 0, total_num_rows);
        field_data->FillFieldData(col_data);
        field_datas.push_back(field_data);
    }
    // Iterate by const reference: FieldDataPtr is ref-counted and the
    // original by-value range-for copied it on every iteration.
    int64_t total_num_rows = 0;
    for (const auto& data : field_datas) {
        total_num_rows += data->get_num_rows();
    }
    if (total_num_rows == 0) {
        throw SegcoreError(DataIsEmpty,
                           "ScalarIndexSort cannot build null values!");
    }
    data_.reserve(total_num_rows);
    int64_t offset = 0;
    for (const auto& data : field_datas) {
        auto slice_num = data->get_num_rows();
        // int64_t loop index: get_num_rows() is signed, so this avoids the
        // signed/unsigned comparison of the original size_t counter.
        for (int64_t i = 0; i < slice_num; ++i) {
            auto value = reinterpret_cast<const T*>(data->RawValue(i));
            data_.emplace_back(IndexStructure(*value, offset));
            offset++;
        }
    }
    std::sort(data_.begin(), data_.end());
    // Map each original row index to its position in the sorted array.
    idx_to_offsets_.resize(total_num_rows);
    for (int64_t i = 0; i < total_num_rows; ++i) {
        idx_to_offsets_[data_[i].idx_] = i;
    }
    is_built_ = true;
}
template <typename T>
void
ScalarIndexSort<T>::Build(const Config& config) {
@ -140,6 +208,21 @@ ScalarIndexSort<T>::Upload(const Config& config) {
return ret;
}
// V2 upload: serialize the index, push the blobs through the v2 file manager,
// then return only the remote paths with their sizes (payload stays remote,
// hence the nullptr data pointers).
template <typename T>
BinarySet
ScalarIndexSort<T>::UploadV2(const Config& config) {
    auto serialized = Serialize(config);
    file_manager_->AddFileV2(serialized);

    BinarySet remote_set;
    for (const auto& [path, size] : file_manager_->GetRemotePathsToFileSize()) {
        remote_set.Append(path, nullptr, size);
    }
    return remote_set;
}
template <typename T>
void
ScalarIndexSort<T>::LoadWithoutAssemble(const BinarySet& index_binary,
@ -186,6 +269,47 @@ ScalarIndexSort<T>::Load(const Config& config) {
LoadWithoutAssemble(binary_set, config);
}
// Load a previously-uploaded sort index from the storage-v2 Space:
// discover the index blobs by prefix, read + decode each one, reassemble
// sliced payloads, and hand the result to LoadWithoutAssemble.
template <typename T>
void
ScalarIndexSort<T>::LoadV2(const Config& config) {
    auto blobs = space_->StatisticsBlobs();
    std::vector<std::string> index_files;
    auto prefix = file_manager_->GetRemoteIndexObjectPrefixV2();
    for (auto& b : blobs) {
        // rfind(prefix, 0) == 0  <=>  name starts with prefix
        if (b.name.rfind(prefix, 0) == 0) {
            index_files.push_back(b.name);
        }
    }
    std::map<std::string, storage::FieldDataPtr> index_datas{};
    for (auto& file_name : index_files) {
        auto res = space_->GetBlobByteSize(file_name);
        if (!res.ok()) {
            PanicInfo(S3Error, "unable to read index blob");
        }
        auto index_blob_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[res.value()]);
        auto status = space_->ReadBlob(file_name, index_blob_data.get());
        if (!status.ok()) {
            PanicInfo(S3Error, "unable to read index blob");
        }
        auto raw_index_blob =
            storage::DeserializeFileData(index_blob_data, res.value());
        // Keys are trimmed to the path basename BEFORE AssembleIndexDatas.
        // NOTE(review): StringIndexMarisa::LoadV2 keeps full paths until
        // after assembling — confirm which keying the assembler expects
        // (slice-meta lookup may depend on it).
        auto key = file_name.substr(file_name.find_last_of('/') + 1);
        index_datas[key] = raw_index_blob->GetFieldData();
    }
    // Re-join sliced payloads into whole index files.
    AssembleIndexDatas(index_datas);
    BinarySet binary_set;
    for (auto& [key, data] : index_datas) {
        auto size = data->Size();
        // Non-owning view: the no-op deleter leaves ownership with
        // index_datas, which outlives the load below.
        auto deleter = [&](uint8_t*) {};  // avoid repeated deconstruction
        auto buf = std::shared_ptr<uint8_t[]>(
            (uint8_t*)const_cast<void*>(data->Data()), deleter);
        binary_set.Append(key, buf, size);
    }
    LoadWithoutAssemble(binary_set, config);
}
template <typename T>
const TargetBitmap
ScalarIndexSort<T>::In(const size_t n, const T* values) {

View File

@ -26,6 +26,7 @@
#include "index/IndexStructure.h"
#include "index/ScalarIndex.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/space.h"
namespace milvus::index {
@ -36,6 +37,10 @@ class ScalarIndexSort : public ScalarIndex<T> {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
explicit ScalarIndexSort(
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
BinarySet
Serialize(const Config& config) override;
@ -45,6 +50,9 @@ class ScalarIndexSort : public ScalarIndex<T> {
void
Load(const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;
int64_t
Count() override {
return data_.size();
@ -56,6 +64,9 @@ class ScalarIndexSort : public ScalarIndex<T> {
void
Build(const Config& config = {}) override;
void
BuildV2(const Config& config = {}) override;
const TargetBitmap
In(size_t n, const T* values) override;
@ -81,6 +92,8 @@ class ScalarIndexSort : public ScalarIndex<T> {
BinarySet
Upload(const Config& config = {}) override;
BinarySet
UploadV2(const Config& config = {}) override;
private:
bool
@ -106,6 +119,7 @@ class ScalarIndexSort : public ScalarIndex<T> {
std::vector<int32_t> idx_to_offsets_; // used to retrieve.
std::vector<IndexStructure<T>> data_;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
std::shared_ptr<milvus_storage::Space> space_;
};
template <typename T>
@ -120,4 +134,11 @@ CreateScalarIndexSort(const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext()) {
return std::make_unique<ScalarIndexSort<T>>(file_manager_context);
}
// Storage-v2 convenience factory: build a sort-based scalar index bound to
// the given Space.
template <typename T>
inline ScalarIndexSortPtr<T>
CreateScalarIndexSort(const storage::FileManagerContext& file_manager_context,
                      std::shared_ptr<milvus_storage::Space> space) {
    return std::make_unique<ScalarIndexSort<T>>(file_manager_context, space);
}
} // namespace milvus::index

View File

@ -17,16 +17,20 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <memory>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include "common/Types.h"
#include "common/EasyAssert.h"
#include "index/StringIndexMarisa.h"
#include "index/Utils.h"
#include "index/Index.h"
#include "common/Utils.h"
#include "common/Slice.h"
#include "storage/Util.h"
#include "storage/space.h"
namespace milvus::index {
@ -38,6 +42,16 @@ StringIndexMarisa::StringIndexMarisa(
}
}
// Storage-v2 constructor: binds the index to a Space so raw data and index
// blobs go through the v2 storage layer.
StringIndexMarisa::StringIndexMarisa(
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space)
    : space_(space) {
    if (file_manager_context.Valid()) {
        file_manager_ = std::make_shared<storage::MemFileManagerImpl>(
            file_manager_context, space_);
        // Parity with ScalarIndexSort's Space constructor, which asserts the
        // file manager was created successfully.
        AssertInfo(file_manager_ != nullptr, "create file manager failed!");
    }
}
int64_t
StringIndexMarisa::Size() {
return trie_.size();
@ -48,6 +62,62 @@ valid_str_id(size_t str_id) {
return str_id >= 0 && str_id != MARISA_INVALID_KEY_ID;
}
// Build the marisa trie index by scanning the string column out of the
// storage-v2 Space: collect all values, build the trie keyset, then map
// every row offset to its key id.
void
StringIndexMarisa::BuildV2(const Config& config) {
    if (built_) {
        // NOTE(review): the v1 Build path presumably throws the same type;
        // kept as std::runtime_error so callers' catch clauses are unchanged.
        throw std::runtime_error("index has been built");
    }
    auto field_name = file_manager_->GetIndexMeta().field_name;
    auto res = space_->ScanData();
    if (!res.ok()) {
        PanicInfo(S3Error, "failed to create scan iterator");
    }
    auto reader = res.value();
    std::vector<storage::FieldDataPtr> field_datas;
    for (auto rec = reader->Next(); rec != nullptr; rec = reader->Next()) {
        if (!rec.ok()) {
            PanicInfo(DataFormatBroken, "failed to read data");
        }
        auto data = rec.ValueUnsafe();
        auto total_num_rows = data->num_rows();
        auto col_data = data->GetColumnByName(field_name);
        auto field_data =
            storage::CreateFieldData(DataType::STRING, 0, total_num_rows);
        field_data->FillFieldData(col_data);
        field_datas.push_back(field_data);
    }
    // Fill the keyset. Iterate by const reference: FieldDataPtr is
    // ref-counted and the original by-value range-for copied it each pass.
    int64_t total_num_rows = 0;
    marisa::Keyset keyset;
    for (const auto& data : field_datas) {
        auto slice_num = data->get_num_rows();
        // int64_t index avoids the signed/unsigned comparison the original
        // size_t counter produced (get_num_rows() is signed).
        for (int64_t i = 0; i < slice_num; ++i) {
            keyset.push_back(
                (*static_cast<const std::string*>(data->RawValue(i))).c_str());
        }
        total_num_rows += slice_num;
    }
    trie_.build(keyset);

    // Map each row offset to its trie key id.
    str_ids_.resize(total_num_rows);
    int64_t offset = 0;
    for (const auto& data : field_datas) {
        auto slice_num = data->get_num_rows();
        for (int64_t i = 0; i < slice_num; ++i) {
            auto str_id =
                lookup(*static_cast<const std::string*>(data->RawValue(i)));
            AssertInfo(valid_str_id(str_id), "invalid marisa key");
            str_ids_[offset++] = str_id;
        }
    }

    // Build the key-id -> row-offsets inverted map.
    fill_offsets();

    built_ = true;
}
void
StringIndexMarisa::Build(const Config& config) {
if (built_) {
@ -159,6 +229,20 @@ StringIndexMarisa::Upload(const Config& config) {
return ret;
}
// V2 upload: serialize the trie index, push the blobs through the v2 file
// manager, then return only the remote paths with sizes (no payload bytes).
BinarySet
StringIndexMarisa::UploadV2(const Config& config) {
    auto serialized = Serialize(config);
    file_manager_->AddFileV2(serialized);

    BinarySet remote_set;
    for (const auto& [path, size] : file_manager_->GetRemotePathsToFileSize()) {
        remote_set.Append(path, nullptr, size);
    }
    return remote_set;
}
void
StringIndexMarisa::LoadWithoutAssemble(const BinarySet& set,
const Config& config) {
@ -221,6 +305,46 @@ StringIndexMarisa::Load(const Config& config) {
LoadWithoutAssemble(binary_set, config);
}
// Load a previously-uploaded marisa trie index from the storage-v2 Space:
// discover index blobs by prefix, read + decode them, reassemble sliced
// payloads, and feed the result to LoadWithoutAssemble.
void
StringIndexMarisa::LoadV2(const Config& config) {
    auto blobs = space_->StatisticsBlobs();
    std::vector<std::string> index_files;
    auto prefix = file_manager_->GetRemoteIndexObjectPrefixV2();
    for (auto& b : blobs) {
        // rfind(prefix, 0) == 0  <=>  name starts with prefix
        if (b.name.rfind(prefix, 0) == 0) {
            index_files.push_back(b.name);
        }
    }
    std::map<std::string, storage::FieldDataPtr> index_datas{};
    for (auto& file_name : index_files) {
        auto res = space_->GetBlobByteSize(file_name);
        if (!res.ok()) {
            // NOTE(review): ScalarIndexSort::LoadV2 reports the same failure
            // as S3Error — confirm which error code is intended here.
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        auto index_blob_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[res.value()]);
        auto status = space_->ReadBlob(file_name, index_blob_data.get());
        if (!status.ok()) {
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        auto raw_index_blob =
            storage::DeserializeFileData(index_blob_data, res.value());
        // Full blob path used as key; basenames are only applied below.
        // NOTE(review): ScalarIndexSort::LoadV2 trims to basename BEFORE
        // AssembleIndexDatas — confirm the assembler accepts full-path keys
        // (e.g. for the slice-meta entry).
        index_datas[file_name] = raw_index_blob->GetFieldData();
    }
    // Re-join sliced payloads into whole index files.
    AssembleIndexDatas(index_datas);
    BinarySet binary_set;
    for (auto& [key, data] : index_datas) {
        auto size = data->Size();
        // Non-owning view: the no-op deleter leaves ownership with
        // index_datas, which outlives the load below.
        auto deleter = [&](uint8_t*) {};  // avoid repeated deconstruction
        auto buf = std::shared_ptr<uint8_t[]>(
            (uint8_t*)const_cast<void*>(data->Data()), deleter);
        auto file_name = key.substr(key.find_last_of('/') + 1);
        binary_set.Append(file_name, buf, size);
    }
    LoadWithoutAssemble(binary_set, config);
}
const TargetBitmap
StringIndexMarisa::In(size_t n, const std::string* values) {
TargetBitmap bitset(str_ids_.size());

View File

@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include "storage/MemFileManagerImpl.h"
#include "storage/space.h"
namespace milvus::index {
@ -32,6 +33,10 @@ class StringIndexMarisa : public StringIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
explicit StringIndexMarisa(
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
int64_t
Size() override;
@ -44,6 +49,9 @@ class StringIndexMarisa : public StringIndex {
void
Load(const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;
int64_t
Count() override {
return str_ids_.size();
@ -55,6 +63,9 @@ class StringIndexMarisa : public StringIndex {
void
Build(const Config& config = {}) override;
void
BuildV2(const Config& Config = {}) override;
const TargetBitmap
In(size_t n, const std::string* values) override;
@ -79,6 +90,9 @@ class StringIndexMarisa : public StringIndex {
BinarySet
Upload(const Config& config = {}) override;
BinarySet
UploadV2(const Config& config = {});
private:
void
fill_str_ids(size_t n, const std::string* values);
@ -103,6 +117,7 @@ class StringIndexMarisa : public StringIndex {
std::map<size_t, std::vector<size_t>> str_ids_to_offsets_;
bool built_ = false;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
std::shared_ptr<milvus_storage::Space> space_;
};
using StringIndexMarisaPtr = std::unique_ptr<StringIndexMarisa>;
@ -114,4 +129,9 @@ CreateStringIndexMarisa(
return std::make_unique<StringIndexMarisa>(file_manager_context);
}
// Storage-v2 convenience factory: build a marisa-trie string index bound to
// the given Space.
inline StringIndexPtr
CreateStringIndexMarisa(const storage::FileManagerContext& file_manager_context,
                        std::shared_ptr<milvus_storage::Space> space) {
    return std::make_unique<StringIndexMarisa>(file_manager_context, space);
}
} // namespace milvus::index

View File

@ -60,6 +60,35 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
GetIndexType(), version, diskann_index_pack);
}
// Storage-v2 constructor: DiskANN index whose raw data and index files are
// read through a milvus-storage Space. Cleans up any stale local index
// directory left by a previous run before creating a fresh one.
template <typename T>
VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
    const IndexType& index_type,
    const MetricType& metric_type,
    const IndexVersion& version,
    std::shared_ptr<milvus_storage::Space> space,
    const storage::FileManagerContext& file_manager_context)
    // Base class listed (and initialized) before members; the original order
    // (space_ before VectorIndex) triggered a -Wreorder warning.
    : VectorIndex(index_type, metric_type), space_(space) {
    // NOTE(review): the file manager is wired to file_manager_context.space_
    // rather than the `space` parameter stored in space_ — confirm both
    // always refer to the same Space.
    file_manager_ = std::make_shared<storage::DiskFileManagerImpl>(
        file_manager_context, file_manager_context.space_);
    AssertInfo(file_manager_ != nullptr, "create file manager failed!");
    auto local_chunk_manager =
        storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();
    // As we have guarded dup-load in QueryNode,
    // this assertion failed only if the Milvus rebooted in the same pod,
    // need to remove these files then re-load the segment
    if (local_chunk_manager->Exist(local_index_path_prefix)) {
        local_chunk_manager->RemoveDir(local_index_path_prefix);
    }
    CheckCompatible(version);
    local_chunk_manager->CreateDir(local_index_path_prefix);
    auto diskann_index_pack =
        knowhere::Pack(std::shared_ptr<knowhere::FileManager>(file_manager_));
    index_ = knowhere::IndexFactory::Instance().Create(
        GetIndexType(), version, diskann_index_pack);
}
template <typename T>
void
VectorDiskAnnIndex<T>::Load(const BinarySet& binary_set /* not used */,
@ -86,6 +115,21 @@ VectorDiskAnnIndex<T>::Load(const Config& config) {
SetDim(index_.Dim());
}
// V2 load: pull the index files from the Space onto local disk, then let
// knowhere deserialize from its disk-backed layout (the empty BinarySet is
// by design — DiskANN reads from files, not from memory).
template <typename T>
void
VectorDiskAnnIndex<T>::LoadV2(const Config& config) {
    knowhere::Json load_config = update_load_json(config);

    file_manager_->CacheIndexToDisk();

    auto stat = index_.Deserialize(knowhere::BinarySet(), load_config);
    if (stat != knowhere::Status::success) {
        PanicInfo(ErrorCode::UnexpectedError,
                  "failed to Deserialize index, " + KnowhereStatusString(stat));
    }

    SetDim(index_.Dim());
}
template <typename T>
BinarySet
VectorDiskAnnIndex<T>::Upload(const Config& config) {
@ -99,6 +143,43 @@ VectorDiskAnnIndex<T>::Upload(const Config& config) {
return ret;
}
// V2 upload is identical to V1: DiskANN already persisted its files during
// build, so uploading only reports the remote paths.
template <typename T>
BinarySet
VectorDiskAnnIndex<T>::UploadV2(const Config& config) {
    return Upload(config);
}
// Build a DiskANN index from a storage-v2 Space.
// DiskANN consumes local files, so the raw data is first spilled from the
// Space to local disk; the spill directory is removed once the build is done.
template <typename T>
void
VectorDiskAnnIndex<T>::BuildV2(const Config& config) {
    knowhere::Json build_config;
    build_config.update(config);

    auto local_data_path = file_manager_->CacheRawDataToDisk(space_);
    build_config[DISK_ANN_RAW_DATA_PATH] = local_data_path;

    auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();
    build_config[DISK_ANN_PREFIX_PATH] = local_index_path_prefix;

    if (GetIndexType() == knowhere::IndexEnum::INDEX_DISKANN) {
        auto num_threads = GetValueFromConfig<std::string>(
            build_config, DISK_ANN_BUILD_THREAD_NUM);
        // Fixed error message: original read "...THREAD_NUMis empty".
        AssertInfo(
            num_threads.has_value(),
            "param " + std::string(DISK_ANN_BUILD_THREAD_NUM) + " is empty");
        build_config[DISK_ANN_THREADS_NUM] =
            std::atoi(num_threads.value().c_str());
    }

    build_config.erase("insert_files");
    index_.Build({}, build_config);

    // The raw-data spill files are no longer needed once the index is built.
    auto local_chunk_manager =
        storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto segment_id = file_manager_->GetFieldDataMeta().segment_id;
    local_chunk_manager->RemoveDir(
        storage::GetSegmentRawDataPathPrefix(local_chunk_manager, segment_id));
}
template <typename T>
void
VectorDiskAnnIndex<T>::Build(const Config& config) {

View File

@ -21,6 +21,7 @@
#include "index/VectorIndex.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/space.h"
namespace milvus::index {
@ -31,7 +32,17 @@ class VectorDiskAnnIndex : public VectorIndex {
const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
const storage::FileManagerContext& file_manager_context);
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
explicit VectorDiskAnnIndex(
const IndexType& index_type,
const MetricType& metric_type,
const IndexVersion& version,
std::shared_ptr<milvus_storage::Space> space,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
BinarySet
Serialize(const Config& config) override { // deprecated
BinarySet binary_set;
@ -47,6 +58,9 @@ class VectorDiskAnnIndex : public VectorIndex {
BinarySet
Upload(const Config& config = {}) override;
BinarySet
UploadV2(const Config& config = {}) override;
int64_t
Count() override {
return index_.Count();
@ -59,6 +73,9 @@ class VectorDiskAnnIndex : public VectorIndex {
void
Load(const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;
void
BuildWithDataset(const DatasetPtr& dataset,
const Config& config = {}) override;
@ -66,6 +83,9 @@ class VectorDiskAnnIndex : public VectorIndex {
void
Build(const Config& config = {}) override;
void
BuildV2(const Config& config = {}) override;
std::unique_ptr<SearchResult>
Query(const DatasetPtr dataset,
const SearchInfo& search_info,
@ -88,6 +108,7 @@ class VectorDiskAnnIndex : public VectorIndex {
knowhere::Index<knowhere::IndexNode> index_;
std::shared_ptr<storage::DiskFileManagerImpl> file_manager_;
uint32_t search_beamwidth_ = 8;
std::shared_ptr<milvus_storage::Space> space_;
};
template <typename T>

View File

@ -18,15 +18,19 @@
#include <unistd.h>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <filesystem>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "common/Types.h"
#include "fmt/format.h"
#include "index/Index.h"
#include "index/IndexInfo.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "common/EasyAssert.h"
@ -39,12 +43,15 @@
#include "common/RangeSearchHelper.h"
#include "common/Utils.h"
#include "log/Log.h"
#include "mmap/Types.h"
#include "storage/DataCodec.h"
#include "storage/FieldData.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"
#include "common/File.h"
#include "common/Tracer.h"
#include "storage/space.h"
namespace milvus::index {
@ -66,6 +73,58 @@ VectorMemIndex<T>::VectorMemIndex(
index_ = knowhere::IndexFactory::Instance().Create(GetIndexType(), version);
}
// Storage-v2 constructor: keeps the Space and the full CreateIndexInfo so
// BuildV2 can later locate the field column (field_name) and dimension.
// Rejects index/metric combinations the engine does not support.
template <typename T>
VectorMemIndex<T>::VectorMemIndex(
    const CreateIndexInfo& create_index_info,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space)
    : VectorIndex(create_index_info.index_type, create_index_info.metric_type),
      space_(space),
      create_index_info_(create_index_info) {
    AssertInfo(!is_unsupported(create_index_info.index_type,
                               create_index_info.metric_type),
               create_index_info.index_type +
                   " doesn't support metric: " + create_index_info.metric_type);
    if (file_manager_context.Valid()) {
        // NOTE(review): the file manager uses file_manager_context.space_,
        // not the `space` parameter saved in space_ — confirm both always
        // point at the same Space.
        file_manager_ = std::make_shared<storage::MemFileManagerImpl>(
            file_manager_context, file_manager_context.space_);
        AssertInfo(file_manager_ != nullptr, "create file manager failed!");
    }
    auto version = create_index_info.index_engine_version;
    CheckCompatible(version);
    index_ = knowhere::IndexFactory::Instance().Create(GetIndexType(), version);
}
// V2 upload: push the serialized index through the v2 file manager, then
// return only the storage version the blobs were committed under, packed
// little-endian into an 8-byte buffer keyed "index_store_version".
template <typename T>
BinarySet
VectorMemIndex<T>::UploadV2(const Config& config) {
    auto binary_set = Serialize(config);
    file_manager_->AddFileV2(binary_set);

    auto store_version = file_manager_->space()->GetCurrentVersion();
    // 8 bytes is the fixed wire width the loader expects (the original code
    // hand-unrolled exactly eight shift/mask steps).
    constexpr size_t kVersionBytes = 8;
    std::shared_ptr<uint8_t[]> store_version_data(
        new uint8_t[kVersionBytes]);
    for (size_t i = 0; i < kVersionBytes; ++i) {
        // Little-endian: low byte first, shifting the version down each step,
        // byte-for-byte identical to the original unrolled sequence.
        store_version_data[i] = static_cast<uint8_t>(store_version & 0xFF);
        store_version = store_version >> 8;
    }
    BinarySet ret;
    ret.Append("index_store_version", store_version_data, kVersionBytes);
    return ret;
}
template <typename T>
BinarySet
VectorMemIndex<T>::Upload(const Config& config) {
@ -112,6 +171,105 @@ VectorMemIndex<T>::Load(const BinarySet& binary_set, const Config& config) {
LoadWithoutAssemble(binary_set, config);
}
// Load a serialized in-memory vector index from the storage-v2 Space.
// Steps: (1) list candidate blobs under this index's remote prefix,
// (2) if a slice-meta blob exists, reassemble sliced index files from it,
// (3) read any remaining blobs whole, (4) hand everything to knowhere.
template <typename T>
void
VectorMemIndex<T>::LoadV2(const Config& config) {
    // The mmap path takes a separate, file-backed route.
    if (config.contains(kMmapFilepath)) {
        return LoadFromFileV2(config);
    }

    auto blobs = space_->StatisticsBlobs();
    std::unordered_set<std::string> pending_index_files;
    auto index_prefix = file_manager_->GetRemoteIndexObjectPrefixV2();
    for (auto& blob : blobs) {
        // rfind(prefix, 0) == 0  <=>  name starts with prefix
        if (blob.name.rfind(index_prefix, 0) == 0) {
            pending_index_files.insert(blob.name);
        }
    }

    auto slice_meta_file = index_prefix + "/" + INDEX_FILE_SLICE_META;
    auto res = space_->GetBlobByteSize(std::string(slice_meta_file));
    std::map<std::string, storage::FieldDataPtr> index_datas{};

    // A missing slice meta is legal (index small enough not to be sliced);
    // any other failure is fatal.
    if (!res.ok() && !res.status().IsFileNotFound()) {
        PanicInfo(DataFormatBroken, "failed to read blob");
    }
    bool slice_meta_exist = res.ok();

    // Helper: read one blob fully from the Space and decode it.
    auto read_blob = [&](const std::string& file_name)
        -> std::unique_ptr<storage::DataCodec> {
        auto res = space_->GetBlobByteSize(file_name);
        if (!res.ok()) {
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        auto index_blob_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[res.value()]);
        auto status = space_->ReadBlob(file_name, index_blob_data.get());
        if (!status.ok()) {
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        return storage::DeserializeFileData(index_blob_data, res.value());
    };

    if (slice_meta_exist) {
        pending_index_files.erase(slice_meta_file);
        auto slice_meta_sz = res.value();
        auto slice_meta_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[slice_meta_sz]);
        auto status = space_->ReadBlob(slice_meta_file, slice_meta_data.get());
        if (!status.ok()) {
            PanicInfo(DataFormatBroken, "unable to read slice meta");
        }
        auto raw_slice_meta =
            storage::DeserializeFileData(slice_meta_data, slice_meta_sz);
        // Slice meta is JSON describing each sliced file: logical name,
        // slice count, and total byte length.
        Config meta_data = Config::parse(std::string(
            static_cast<const char*>(raw_slice_meta->GetFieldData()->Data()),
            raw_slice_meta->GetFieldData()->Size()));

        for (auto& item : meta_data[META]) {
            std::string prefix = item[NAME];
            int slice_num = item[SLICE_NUM];
            auto total_len = static_cast<size_t>(item[TOTAL_LEN]);

            // Concatenate the slices back into one contiguous buffer.
            auto new_field_data =
                milvus::storage::CreateFieldData(DataType::INT8, 1, total_len);
            for (auto i = 0; i < slice_num; ++i) {
                std::string file_name =
                    index_prefix + "/" + GenSlicedFileName(prefix, i);
                auto raw_index_blob = read_blob(file_name);
                new_field_data->FillFieldData(
                    raw_index_blob->GetFieldData()->Data(),
                    raw_index_blob->GetFieldData()->Size());
                pending_index_files.erase(file_name);
            }
            AssertInfo(
                new_field_data->IsFull(),
                "index len is inconsistent after disassemble and assemble");
            index_datas[prefix] = new_field_data;
        }
    }

    // Blobs not covered by the slice meta are read as whole files.
    if (!pending_index_files.empty()) {
        for (auto& file_name : pending_index_files) {
            auto raw_index_blob = read_blob(file_name);
            index_datas.insert({file_name, raw_index_blob->GetFieldData()});
        }
    }

    LOG_SEGCORE_INFO_ << "construct binary set...";
    BinarySet binary_set;
    for (auto& [key, data] : index_datas) {
        LOG_SEGCORE_INFO_ << "add index data to binary set: " << key;
        auto size = data->Size();
        // Non-owning view: index_datas retains ownership while knowhere
        // loads; the no-op deleter prevents double free.
        auto deleter = [&](uint8_t*) {};  // avoid repeated deconstruction
        auto buf = std::shared_ptr<uint8_t[]>(
            (uint8_t*)const_cast<void*>(data->Data()), deleter);
        auto file_name = key.substr(key.find_last_of('/') + 1);
        binary_set.Append(file_name, buf, size);
    }

    LOG_SEGCORE_INFO_ << "load index into Knowhere...";
    LoadWithoutAssemble(binary_set, config);
    LOG_SEGCORE_INFO_ << "load vector index done";
}
template <typename T>
void
VectorMemIndex<T>::Load(const Config& config) {
@ -242,6 +400,64 @@ VectorMemIndex<T>::BuildWithDataset(const DatasetPtr& dataset,
SetDim(index_.Dim());
}
template <typename T>
void
VectorMemIndex<T>::BuildV2(const Config& config) {
    // Build the vector index from column data scanned out of a storage-v2
    // Space (column `field_name`), instead of from legacy insert binlogs.
    auto field_name = create_index_info_.field_name;
    auto field_type = create_index_info_.field_type;
    auto dim = create_index_info_.dim;
    auto res = space_->ScanData();
    if (!res.ok()) {
        PanicInfo(IndexBuildError,
                  fmt::format("failed to create scan iterator: {}",
                              res.status().ToString()));
    }
    auto reader = res.value();
    std::vector<storage::FieldDataPtr> field_datas;
    for (auto rec : *reader) {
        if (!rec.ok()) {
            PanicInfo(IndexBuildError,
                      fmt::format("failed to read data: {}",
                                  rec.status().ToString()));
        }
        auto data = rec.ValueUnsafe();
        if (data == nullptr) {
            break;  // end of stream
        }
        auto batch_num_rows = data->num_rows();
        auto col_data = data->GetColumnByName(field_name);
        auto field_data =
            storage::CreateFieldData(field_type, dim, batch_num_rows);
        field_data->FillFieldData(col_data);
        field_datas.push_back(field_data);
    }
    int64_t total_size = 0;
    int64_t total_num_rows = 0;
    for (const auto& data : field_datas) {
        total_size += data->Size();
        total_num_rows += data->get_num_rows();
        AssertInfo(dim == 0 || dim == data->get_dim(),
                   "inconsistent dim value between field datas!");
    }
    AssertInfo(total_num_rows > 0,
               "cannot build index: no rows scanned from space");

    // Concatenate all chunks into one contiguous buffer; release each chunk
    // as soon as it has been copied to keep peak memory close to 1x.
    auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[total_size]);
    int64_t offset = 0;
    // NOTE: iterate by reference — resetting a by-value copy of the
    // shared_ptr would leave the vector's element alive and free nothing.
    for (auto& data : field_datas) {
        std::memcpy(buf.get() + offset, data->Data(), data->Size());
        offset += data->Size();
        data.reset();
    }
    field_datas.clear();

    Config build_config;
    build_config.update(config);
    build_config.erase("insert_files");  // v2 path does not read binlogs

    auto dataset = GenDataset(total_num_rows, dim, buf.get());
    BuildWithDataset(dataset, build_config);
}
template <typename T>
void
VectorMemIndex<T>::Build(const Config& config) {
@ -516,6 +732,109 @@ VectorMemIndex<T>::LoadFromFile(const Config& config) {
LOG_SEGCORE_INFO_ << "load vector index done";
}
template <typename T>
void
VectorMemIndex<T>::LoadFromFileV2(const Config& config) {
    // Load an index stored in a storage-v2 Space by streaming every index
    // blob into one local file, then mmap-deserializing that file through
    // Knowhere. The temp file is unlinked once the index owns the mapping.
    auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
    AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");
    std::filesystem::create_directories(
        std::filesystem::path(filepath.value()).parent_path());
    auto file = File::Open(filepath.value(), O_CREAT | O_TRUNC | O_RDWR);
    // Collect every blob under this index's remote prefix; blobs not listed
    // in the slice meta are appended verbatim at the end.
    auto blobs = space_->StatisticsBlobs();
    std::unordered_set<std::string> pending_index_files;
    auto index_prefix = file_manager_->GetRemoteIndexObjectPrefixV2();
    for (auto& blob : blobs) {
        if (blob.name.rfind(index_prefix, 0) == 0) {
            pending_index_files.insert(blob.name);
        }
    }
    auto slice_meta_file = index_prefix + "/" + INDEX_FILE_SLICE_META;
    auto res = space_->GetBlobByteSize(std::string(slice_meta_file));
    // A missing slice meta is legal (index small enough not to be sliced);
    // any other failure is treated as corruption.
    if (!res.ok() && !res.status().IsFileNotFound()) {
        PanicInfo(DataFormatBroken, "failed to read blob");
    }
    bool slice_meta_exist = res.ok();
    // Reads one blob fully into memory and decodes it as a DataCodec.
    auto read_blob = [&](const std::string& file_name)
        -> std::unique_ptr<storage::DataCodec> {
        auto res = space_->GetBlobByteSize(file_name);
        if (!res.ok()) {
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        auto index_blob_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[res.value()]);
        auto status = space_->ReadBlob(file_name, index_blob_data.get());
        if (!status.ok()) {
            PanicInfo(DataFormatBroken, "unable to read index blob");
        }
        return storage::DeserializeFileData(index_blob_data, res.value());
    };
    if (slice_meta_exist) {
        pending_index_files.erase(slice_meta_file);
        auto slice_meta_sz = res.value();
        auto slice_meta_data =
            std::shared_ptr<uint8_t[]>(new uint8_t[slice_meta_sz]);
        auto status = space_->ReadBlob(slice_meta_file, slice_meta_data.get());
        if (!status.ok()) {
            PanicInfo(DataFormatBroken, "unable to read slice meta");
        }
        auto raw_slice_meta =
            storage::DeserializeFileData(slice_meta_data, slice_meta_sz);
        Config meta_data = Config::parse(std::string(
            static_cast<const char*>(raw_slice_meta->GetFieldData()->Data()),
            raw_slice_meta->GetFieldData()->Size()));
        // Re-assemble each sliced blob in slice order, appending the slices
        // directly to the local file.
        for (auto& item : meta_data[META]) {
            std::string prefix = item[NAME];
            int slice_num = item[SLICE_NUM];
            // NOTE(review): total_len is decoded but never checked against
            // the bytes actually written — consider validating.
            auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
            for (auto i = 0; i < slice_num; ++i) {
                std::string file_name =
                    index_prefix + "/" + GenSlicedFileName(prefix, i);
                auto raw_index_blob = read_blob(file_name);
                // NOTE(review): `written` is not compared to the slice size;
                // a short write would corrupt the file silently.
                auto written =
                    file.Write(raw_index_blob->GetFieldData()->Data(),
                               raw_index_blob->GetFieldData()->Size());
                pending_index_files.erase(file_name);
            }
        }
    }
    // Unsliced blobs (not covered by the slice meta) are appended verbatim.
    if (!pending_index_files.empty()) {
        for (auto& file_name : pending_index_files) {
            auto raw_index_blob = read_blob(file_name);
            file.Write(raw_index_blob->GetFieldData()->Data(),
                       raw_index_blob->GetFieldData()->Size());
        }
    }
    file.Close();
    LOG_SEGCORE_INFO_ << "load index into Knowhere...";
    auto conf = config;
    conf.erase(kMmapFilepath);
    conf[kEnableMmap] = true;  // force mmap-based deserialization
    auto stat = index_.DeserializeFromFile(filepath.value(), conf);
    if (stat != knowhere::Status::success) {
        PanicInfo(DataFormatBroken,
                  fmt::format("failed to Deserialize index: {}",
                              KnowhereStatusString(stat)));
    }
    auto dim = index_.Dim();  // NOTE(review): local is unused
    this->SetDim(index_.Dim());
    // The index keeps the file mapped; unlinking the directory entry lets
    // the kernel reclaim the space automatically when the mapping is dropped.
    auto ok = unlink(filepath->data());
    AssertInfo(ok == 0,
               fmt::format("failed to unlink mmap index file {}: {}",
                           filepath.value(),
                           strerror(errno)));
    LOG_SEGCORE_INFO_ << "load vector index done";
}
template class VectorMemIndex<float>;
template class VectorMemIndex<uint8_t>;

View File

@ -21,9 +21,12 @@
#include <string>
#include <vector>
#include <boost/dynamic_bitset.hpp>
#include "common/Types.h"
#include "knowhere/factory.h"
#include "index/VectorIndex.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/space.h"
#include "index/IndexInfo.h"
namespace milvus::index {
@ -37,6 +40,9 @@ class VectorMemIndex : public VectorIndex {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
explicit VectorMemIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager,
std::shared_ptr<milvus_storage::Space> space);
BinarySet
Serialize(const Config& config) override;
@ -46,6 +52,9 @@ class VectorMemIndex : public VectorIndex {
void
Load(const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;
void
BuildWithDataset(const DatasetPtr& dataset,
const Config& config = {}) override;
@ -53,6 +62,9 @@ class VectorMemIndex : public VectorIndex {
void
Build(const Config& config = {}) override;
void
BuildV2(const Config& config = {}) override;
void
AddWithDataset(const DatasetPtr& dataset, const Config& config) override;
@ -75,6 +87,9 @@ class VectorMemIndex : public VectorIndex {
BinarySet
Upload(const Config& config = {}) override;
BinarySet
UploadV2(const Config& config = {}) override;
protected:
virtual void
LoadWithoutAssemble(const BinarySet& binary_set, const Config& config);
@ -83,10 +98,16 @@ class VectorMemIndex : public VectorIndex {
void
LoadFromFile(const Config& config);
void
LoadFromFileV2(const Config& config);
protected:
Config config_;
knowhere::Index<knowhere::IndexNode> index_;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
std::shared_ptr<milvus_storage::Space> space_;
CreateIndexInfo create_index_info_;
};
template <typename T>

View File

@ -26,6 +26,9 @@ class IndexCreatorBase {
virtual void
Build() = 0;
virtual void
BuildV2() = 0;
virtual milvus::BinarySet
Serialize() = 0;
@ -35,6 +38,9 @@ class IndexCreatorBase {
virtual BinarySet
Upload() = 0;
virtual BinarySet
UploadV2() = 0;
};
using IndexCreatorBasePtr = std::unique_ptr<IndexCreatorBase>;

View File

@ -23,6 +23,7 @@
#include "indexbuilder/type_c.h"
#include "storage/Types.h"
#include "storage/FileManager.h"
#include "storage/space.h"
namespace milvus::indexbuilder {
@ -70,6 +71,37 @@ class IndexFactory {
fmt::format("invalid type is {}", invalid_dtype_msg));
}
}
IndexCreatorBasePtr
CreateIndex(DataType type,
const std::string& field_name,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space) {
auto invalid_dtype_msg =
std::string("invalid data type: ") + std::to_string(int(type));
switch (type) {
case DataType::BOOL:
case DataType::INT8:
case DataType::INT16:
case DataType::INT32:
case DataType::INT64:
case DataType::FLOAT:
case DataType::DOUBLE:
case DataType::VARCHAR:
case DataType::STRING:
return CreateScalarIndex(
type, config, file_manager_context, space);
case DataType::VECTOR_FLOAT:
case DataType::VECTOR_BINARY:
return std::make_unique<VecIndexCreator>(
type, field_name, config, file_manager_context, space);
default:
throw std::invalid_argument(invalid_dtype_msg);
}
}
};
} // namespace milvus::indexbuilder

View File

@ -17,6 +17,7 @@
#include "pb/index_cgo_msg.pb.h"
#include <string>
#include <utility>
namespace milvus::indexbuilder {
@ -24,7 +25,7 @@ ScalarIndexCreator::ScalarIndexCreator(
DataType dtype,
Config& config,
const storage::FileManagerContext& file_manager_context)
: dtype_(dtype), config_(config) {
: config_(config), dtype_(dtype) {
milvus::index::CreateIndexInfo index_info;
index_info.field_type = dtype_;
index_info.index_type = index_type();
@ -32,6 +33,18 @@ ScalarIndexCreator::ScalarIndexCreator(
index_info, file_manager_context);
}
// Storage-v2 variant: forwards the Space handle so the underlying index can
// read and persist blobs through the space rather than the chunk manager.
ScalarIndexCreator::ScalarIndexCreator(
    DataType dtype,
    Config& config,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space)
    : config_(config), dtype_(dtype) {
    milvus::index::CreateIndexInfo create_info;
    create_info.field_type = dtype_;
    create_info.index_type = index_type();
    index_ = index::IndexFactory::GetInstance().CreateIndex(
        create_info, file_manager_context, std::move(space));
}
void
ScalarIndexCreator::Build(const milvus::DatasetPtr& dataset) {
auto size = dataset->GetRows();
@ -44,6 +57,11 @@ ScalarIndexCreator::Build() {
index_->Build(config_);
}
// Build the scalar index from data scanned out of the storage-v2 Space
// (no in-memory dataset argument; the index pulls its own input).
void
ScalarIndexCreator::BuildV2() {
    index_->BuildV2(config_);
}
milvus::BinarySet
ScalarIndexCreator::Serialize() {
return index_->Serialize(config_);
@ -65,4 +83,9 @@ ScalarIndexCreator::Upload() {
return index_->Upload();
}
// Storage-v2 counterpart of Upload(): serializes the built index and hands
// back the resulting BinarySet from the underlying index.
BinarySet
ScalarIndexCreator::UploadV2() {
    return index_->UploadV2();
}
} // namespace milvus::indexbuilder

View File

@ -17,6 +17,7 @@
#include <common/CDataType.h>
#include "index/Index.h"
#include "index/ScalarIndex.h"
#include "storage/space.h"
namespace milvus::indexbuilder {
@ -26,12 +27,19 @@ class ScalarIndexCreator : public IndexCreatorBase {
Config& config,
const storage::FileManagerContext& file_manager_context);
ScalarIndexCreator(DataType data_type,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
void
Build(const milvus::DatasetPtr& dataset) override;
void
Build() override;
void
BuildV2() override;
milvus::BinarySet
Serialize() override;
@ -41,6 +49,9 @@ class ScalarIndexCreator : public IndexCreatorBase {
BinarySet
Upload() override;
BinarySet
UploadV2() override;
private:
std::string
index_type();
@ -61,4 +72,12 @@ CreateScalarIndex(DataType dtype,
dtype, config, file_manager_context);
}
// Factory helper for the storage-v2 path: builds a scalar index creator
// that persists through the given Space.
// `space` is a sink parameter — move it into the creator instead of copying
// the shared_ptr (avoids an unnecessary atomic refcount round-trip).
inline ScalarIndexCreatorPtr
CreateScalarIndex(DataType dtype,
                  Config& config,
                  const storage::FileManagerContext& file_manager_context,
                  std::shared_ptr<milvus_storage::Space> space) {
    return std::make_unique<ScalarIndexCreator>(
        dtype, config, file_manager_context, std::move(space));
}
} // namespace milvus::indexbuilder

View File

@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <map>
#include <utility>
#include "common/EasyAssert.h"
#include "indexbuilder/VecIndexCreator.h"
@ -23,16 +24,26 @@ VecIndexCreator::VecIndexCreator(
DataType data_type,
Config& config,
const storage::FileManagerContext& file_manager_context)
: data_type_(data_type), config_(config) {
: VecIndexCreator(data_type, "", config, file_manager_context, nullptr) {
}
// Storage-v2-aware constructor; `space` may be null for the legacy path
// (the delegating two-argument constructor passes nullptr).
VecIndexCreator::VecIndexCreator(
    DataType data_type,
    const std::string& field_name,
    Config& config,
    const storage::FileManagerContext& file_manager_context,
    std::shared_ptr<milvus_storage::Space> space)
    : config_(config), data_type_(data_type), space_(std::move(space)) {
    // Assemble the index descriptor from the build config.
    index::CreateIndexInfo create_info;
    create_info.field_type = data_type_;
    create_info.index_type = index::GetIndexTypeFromConfig(config_);
    create_info.metric_type = index::GetMetricTypeFromConfig(config_);
    create_info.field_name = field_name;
    create_info.index_engine_version =
        index::GetIndexEngineVersionFromConfig(config_);

    index_ = index::IndexFactory::GetInstance().CreateIndex(
        create_info, file_manager_context, space_);
    AssertInfo(index_ != nullptr,
               "[VecIndexCreator]Index is null after create index");
}
@ -52,6 +63,11 @@ VecIndexCreator::Build() {
index_->Build(config_);
}
// Build the vector index by scanning input rows from the storage-v2 Space.
void
VecIndexCreator::BuildV2() {
    index_->BuildV2(config_);
}
milvus::BinarySet
VecIndexCreator::Serialize() {
return index_->Serialize(config_);
@ -75,6 +91,11 @@ VecIndexCreator::Upload() {
return index_->Upload();
}
// Storage-v2 counterpart of Upload(): serializes the built index and hands
// back the resulting BinarySet from the underlying index.
BinarySet
VecIndexCreator::UploadV2() {
    return index_->UploadV2();
}
void
VecIndexCreator::CleanLocalData() {
auto vector_index = dynamic_cast<index::VectorIndex*>(index_.get());

View File

@ -20,6 +20,7 @@
#include "index/VectorIndex.h"
#include "index/IndexInfo.h"
#include "storage/Types.h"
#include "storage/space.h"
namespace milvus::indexbuilder {
@ -32,12 +33,20 @@ class VecIndexCreator : public IndexCreatorBase {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
VecIndexCreator(DataType data_type,
const std::string& field_name,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);
void
Build(const milvus::DatasetPtr& dataset) override;
void
Build() override;
void
BuildV2() override;
milvus::BinarySet
Serialize() override;
@ -55,6 +64,9 @@ class VecIndexCreator : public IndexCreatorBase {
BinarySet
Upload() override;
BinarySet
UploadV2() override;
public:
void
CleanLocalData();
@ -63,6 +75,8 @@ class VecIndexCreator : public IndexCreatorBase {
milvus::index::IndexBasePtr index_ = nullptr;
Config config_;
DataType data_type_;
std::shared_ptr<milvus_storage::Space> space_;
};
} // namespace milvus::indexbuilder

View File

@ -9,7 +9,13 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <glog/logging.h>
#include <memory>
#include <string>
#include "fmt/core.h"
#include "indexbuilder/type_c.h"
#include "log/Log.h"
#include "storage/options.h"
#ifdef __linux__
#include <malloc.h>
@ -25,6 +31,7 @@
#include "index/Utils.h"
#include "pb/index_cgo_msg.pb.h"
#include "storage/Util.h"
#include "storage/space.h"
#include "index/Meta.h"
using namespace milvus;
@ -110,6 +117,7 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
build_index_info->partition_id,
build_index_info->segment_id,
build_index_info->field_id};
milvus::storage::IndexMeta index_meta{build_index_info->segment_id,
build_index_info->field_id,
build_index_info->index_build_id,
@ -137,6 +145,89 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
}
}
// C-API entry for the storage-v2 build path: opens the segment-data Space
// and the index Space described by the build info, creates the matching
// index creator, and runs BuildV2 (which scans rows from the data Space).
// On success *res_index owns the released creator.
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
    try {
        auto build_index_info = (BuildIndexInfo*)c_build_index_info;
        auto field_type = build_index_info->field_type;

        milvus::index::CreateIndexInfo index_info;
        index_info.field_type = build_index_info->field_type;

        auto& config = build_index_info->config;
        // get index type
        auto index_type = milvus::index::GetValueFromConfig<std::string>(
            config, "index_type");
        AssertInfo(index_type.has_value(), "index type is empty");
        index_info.index_type = index_type.value();

        // Propagate the engine version into the build config so the
        // underlying index sees the same value.
        auto engine_version = build_index_info->index_engine_version;
        index_info.index_engine_version = engine_version;
        config[milvus::index::INDEX_ENGINE_VERSION] =
            std::to_string(engine_version);

        // get metric type (required for vector fields only)
        if (milvus::datatype_is_vector(field_type)) {
            auto metric_type = milvus::index::GetValueFromConfig<std::string>(
                config, "metric_type");
            AssertInfo(metric_type.has_value(), "metric type is empty");
            index_info.metric_type = metric_type.value();
        }

        milvus::storage::FieldDataMeta field_meta{
            build_index_info->collection_id,
            build_index_info->partition_id,
            build_index_info->segment_id,
            build_index_info->field_id};
        milvus::storage::IndexMeta index_meta{
            build_index_info->segment_id,
            build_index_info->field_id,
            build_index_info->index_build_id,
            build_index_info->index_version,
            build_index_info->field_name,
            "",
            build_index_info->field_type,
            build_index_info->dim,
        };
        // Source data space, pinned at the recorded storage version.
        auto store_space = milvus_storage::Space::Open(
            build_index_info->data_store_path,
            milvus_storage::Options{nullptr,
                                    build_index_info->data_store_version});
        AssertInfo(store_space.ok() && store_space.has_value(),
                   fmt::format("create space failed: {}",
                               store_space.status().ToString()));

        // Destination index space reuses the data space's schema.
        auto index_space = milvus_storage::Space::Open(
            build_index_info->index_store_path,
            milvus_storage::Options{.schema = store_space.value()->schema()});
        AssertInfo(index_space.ok() && index_space.has_value(),
                   fmt::format("create space failed: {}",
                               index_space.status().ToString()));

        LOG_SEGCORE_INFO_ << "init space success";
        auto chunk_manager = milvus::storage::CreateChunkManager(
            build_index_info->storage_config);
        milvus::storage::FileManagerContext fileManagerContext(
            field_meta,
            index_meta,
            chunk_manager,
            std::move(index_space.value()));

        auto index =
            milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
                build_index_info->field_type,
                build_index_info->field_name,
                config,
                fileManagerContext,
                std::move(store_space.value()));
        index->BuildV2();
        // Ownership transfers to the caller through the C handle.
        *res_index = index.release();
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
CStatus
DeleteIndex(CIndex index) {
auto status = CStatus();
@ -391,6 +482,31 @@ AppendBuildTypeParam(CBuildIndexInfo c_build_index_info,
}
}
// Fills collection/partition/segment/field identity plus the field's name,
// type and dimension into the build info. The v2 build path needs the extra
// name/dim to locate the column inside the storage-v2 Space.
CStatus
AppendFieldMetaInfoV2(CBuildIndexInfo c_build_index_info,
                      int64_t collection_id,
                      int64_t partition_id,
                      int64_t segment_id,
                      int64_t field_id,
                      const char* field_name,
                      enum CDataType field_type,
                      int64_t dim) {
    try {
        auto info = (BuildIndexInfo*)c_build_index_info;
        info->collection_id = collection_id;
        info->partition_id = partition_id;
        info->segment_id = segment_id;
        info->field_id = field_id;
        info->field_type = milvus::DataType(field_type);
        info->field_name = field_name;
        info->dim = dim;
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
CStatus
AppendFieldMetaInfo(CBuildIndexInfo c_build_index_info,
int64_t collection_id,
@ -404,6 +520,7 @@ AppendFieldMetaInfo(CBuildIndexInfo c_build_index_info,
build_index_info->partition_id = partition_id;
build_index_info->segment_id = segment_id;
build_index_info->field_id = field_id;
build_index_info->field_type = milvus::DataType(field_type);
auto status = CStatus();
@ -474,6 +591,31 @@ AppendIndexEngineVersionToBuildInfo(CBuildIndexInfo c_load_index_info,
}
}
// Records where the segment data lives (path + pinned version) and where the
// built index should be written, for the storage-v2 build path.
// Uses SuccessCStatus/FailureCStatus like the other V2 C-API functions in
// this file instead of hand-assembling a CStatus (consistency; also avoids
// the hand-rolled strdup path).
CStatus
AppendIndexStorageInfo(CBuildIndexInfo c_build_index_info,
                       const char* c_data_store_path,
                       const char* c_index_store_path,
                       int64_t data_store_version) {
    try {
        auto build_index_info = (BuildIndexInfo*)c_build_index_info;
        build_index_info->data_store_path = std::string(c_data_store_path);
        build_index_info->index_store_path = std::string(c_index_store_path);
        build_index_info->data_store_version = data_store_version;
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
CStatus
SerializeIndexAndUpLoad(CIndex index, CBinarySet* c_binary_set) {
auto status = CStatus();
@ -494,3 +636,26 @@ SerializeIndexAndUpLoad(CIndex index, CBinarySet* c_binary_set) {
}
return status;
}
// C binding: serialize the built index and upload it through the storage-v2
// path, handing the resulting BinarySet back to the caller via c_binary_set.
CStatus
SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set) {
    auto status = CStatus();
    try {
        AssertInfo(
            index,
            "failed to serialize index to binary set, passed index was null");
        auto real_index =
            reinterpret_cast<milvus::indexbuilder::IndexCreatorBase*>(index);
        auto binary =
            std::make_unique<knowhere::BinarySet>(real_index->UploadV2());
        // Ownership of the BinarySet transfers to the caller.
        *c_binary_set = binary.release();
        status.error_code = Success;
        status.error_msg = "";
    } catch (std::exception& e) {
        status.error_code = UnexpectedError;
        // NOTE(review): strdup'd message is presumably freed by the Go
        // caller of the C API — confirm.
        status.error_msg = strdup(e.what());
    }
    return status;
}

View File

@ -80,6 +80,16 @@ AppendFieldMetaInfo(CBuildIndexInfo c_build_index_info,
int64_t field_id,
enum CDataType field_type);
CStatus
AppendFieldMetaInfoV2(CBuildIndexInfo c_build_index_info,
int64_t collection_id,
int64_t partition_id,
int64_t segment_id,
int64_t field_id,
const char* field_name,
enum CDataType field_type,
int64_t dim);
CStatus
AppendIndexMetaInfo(CBuildIndexInfo c_build_index_info,
int64_t index_id,
@ -96,6 +106,18 @@ AppendIndexEngineVersionToBuildInfo(CBuildIndexInfo c_load_index_info,
CStatus
SerializeIndexAndUpLoad(CIndex index, CBinarySet* c_binary_set);
CStatus
SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set);
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info);
CStatus
AppendIndexStorageInfo(CBuildIndexInfo c_build_index_info,
const char* c_data_store_path,
const char* c_index_store_path,
int64_t data_store_version);
#ifdef __cplusplus
};
#endif

View File

@ -33,5 +33,10 @@ struct BuildIndexInfo {
std::vector<std::string> insert_files;
milvus::storage::StorageConfig storage_config;
milvus::Config config;
std::string field_name;
std::string data_store_path;
int64_t data_store_version;
std::string index_store_path;
int64_t dim;
int32_t index_engine_version;
};
};

View File

@ -42,6 +42,6 @@ set(SEGCORE_FILES
SkipIndex.cpp)
add_library(milvus_segcore SHARED ${SEGCORE_FILES})
target_link_libraries(milvus_segcore milvus_query ${OpenMP_CXX_FLAGS})
target_link_libraries(milvus_segcore milvus_query ${OpenMP_CXX_FLAGS} milvus-storage)
install(TARGETS milvus_segcore DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -11,6 +11,8 @@
#include <google/protobuf/text_format.h>
#include <memory>
#include "pb/schema.pb.h"
#include "segcore/Collection.h"
#include "log/Log.h"

View File

@ -33,6 +33,8 @@
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "storage/options.h"
#include "storage/space.h"
namespace milvus::segcore {
@ -230,6 +232,86 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
reserved_offset + num_rows);
}
// Load all columns of a growing segment from a storage-v2 Space identified
// by infos.url / infos.storage_version. The Space is opened once and shared
// by every per-field loader task (it was previously re-opened per field —
// pure loop-invariant work, since fields are consumed sequentially).
void
SegmentGrowingImpl::LoadFieldDataV2(const LoadFieldDataInfo& infos) {
    // schema don't include system field
    AssertInfo(infos.field_infos.size() == schema_->size() + 2,
               "lost some field data when load for growing segment");
    AssertInfo(infos.field_infos.find(TimestampFieldID.get()) !=
                   infos.field_infos.end(),
               "timestamps field data should be included");
    AssertInfo(
        infos.field_infos.find(RowFieldID.get()) != infos.field_infos.end(),
        "rowID field data should be included");
    auto primary_field_id =
        schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(primary_field_id.get() != INVALID_FIELD_ID, "Primary key is -1");
    AssertInfo(infos.field_infos.find(primary_field_id.get()) !=
                   infos.field_infos.end(),
               "primary field data should be included");

    size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
    auto reserved_offset = PreInsert(num_rows);

    // Open the space once, outside the per-field loop.
    auto res = milvus_storage::Space::Open(
        infos.url, milvus_storage::Options{nullptr, infos.storage_version});
    AssertInfo(res.ok(), "init space failed");
    std::shared_ptr<milvus_storage::Space> space = std::move(res.value());
    auto& pool =
        ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

    for (auto& [id, info] : infos.field_infos) {
        auto field_id = FieldId(id);
        auto field_data_info = FieldDataInfo(field_id.get(), num_rows);
        auto load_future = pool.Submit(
            LoadFieldDatasFromRemote2, space, schema_, field_data_info);
        // Drain the channel; this blocks until the loader task finishes.
        auto field_data = CollectFieldDataChannel(field_data_info.channel);
        if (field_id == TimestampFieldID) {
            // query node already guarantees that the timestamp is ordered,
            // avoid field data copy in c++; fill Segment.ConcurrentVector
            insert_record_.timestamps_.set_data_raw(reserved_offset,
                                                    field_data);
            continue;
        }
        if (field_id == RowFieldID) {
            insert_record_.row_ids_.set_data_raw(reserved_offset, field_data);
            continue;
        }
        if (!indexing_record_.SyncDataWithIndex(field_id)) {
            insert_record_.get_field_data_base(field_id)->set_data_raw(
                reserved_offset, field_data);
        }
        if (segcore_config_.get_enable_interim_segment_index()) {
            // Feed the interim (growing) index chunk by chunk.
            auto offset = reserved_offset;
            for (auto& data : field_data) {
                auto row_count = data->get_num_rows();
                indexing_record_.AppendingIndex(
                    offset, row_count, field_id, data, insert_record_);
                offset += row_count;
            }
        }
        try_remove_chunks(field_id);
        if (field_id == primary_field_id) {
            insert_record_.insert_pks(field_data);
        }
        // update average row data size
        auto field_meta = (*schema_)[field_id];
        if (datatype_is_variable(field_meta.get_data_type())) {
            SegmentInternalInterface::set_field_avg_size(
                field_id,
                num_rows,
                storage::GetByteSizeOfFieldDatas(field_data));
        }
    }
    // publish the newly-inserted range so readers can see it
    insert_record_.ack_responder_.AddSegment(reserved_offset,
                                             reserved_offset + num_rows);
}
SegcoreError
SegmentGrowingImpl::Delete(int64_t reserved_begin,
int64_t size,

View File

@ -67,6 +67,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
LoadFieldData(const LoadFieldDataInfo& info) override;
void
LoadFieldDataV2(const LoadFieldDataInfo& info) override;
std::string
debug() const override;

View File

@ -100,6 +100,9 @@ class SegmentInterface {
virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0;
virtual void
LoadFieldDataV2(const LoadFieldDataInfo& info) = 0;
virtual int64_t
get_segment_id() const = 0;

View File

@ -20,11 +20,13 @@
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
#include "Utils.h"
#include "Types.h"
#include "common/Json.h"
#include "common/LoadInfo.h"
#include "common/EasyAssert.h"
#include "common/Array.h"
#include "google/protobuf/message_lite.h"
@ -34,6 +36,7 @@
#include "common/Types.h"
#include "log/Log.h"
#include "pb/schema.pb.h"
#include "mmap/Types.h"
#include "query/ScalarIndex.h"
#include "query/SearchBruteForce.h"
#include "query/SearchOnSealed.h"
@ -220,6 +223,53 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
}
}
// Load field columns of a sealed segment from a storage-v2 Space identified
// by load_info.url / load_info.storage_version. Each field is produced by a
// pooled loader task and consumed here (in-memory or mmap-ed, depending on
// mmap_dir_path).
// Cleanup vs the original: removed the unused `insert_files` local (it
// deep-copied a vector<string> per field) and the commented-out v1 Submit.
void
SegmentSealedImpl::LoadFieldDataV2(const LoadFieldDataInfo& load_info) {
    // NOTE: lock only when data is ready to avoid starvation
    // only one field for now, parallel load field data in golang
    size_t num_rows = storage::GetNumRowsForLoadInfo(load_info);

    for (auto& [id, info] : load_info.field_infos) {
        AssertInfo(info.row_count > 0, "The row count of field data is 0");

        auto field_id = FieldId(id);
        auto field_data_info =
            FieldDataInfo(field_id.get(), num_rows, load_info.mmap_dir_path);
        // Bound the channel so the producer cannot race too far ahead of
        // the consumer and blow up memory.
        auto parallel_degree = static_cast<uint64_t>(
            DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
        field_data_info.channel->set_capacity(parallel_degree * 2);

        auto& pool =
            ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
        auto res = milvus_storage::Space::Open(
            load_info.url,
            milvus_storage::Options{nullptr, load_info.storage_version});
        AssertInfo(res.ok(),
                   fmt::format("init space failed: {}, error: {}",
                               load_info.url,
                               res.status().ToString()));
        std::shared_ptr<milvus_storage::Space> space = std::move(res.value());
        auto load_future = pool.Submit(
            LoadFieldDatasFromRemote2, space, schema_, field_data_info);
        LOG_SEGCORE_INFO_ << "finish submitting LoadFieldDatasFromRemote task "
                             "to thread pool, "
                          << "segmentID:" << this->id_
                          << ", fieldID:" << info.field_id;
        // System fields and non-mmap loads stay in memory; everything else
        // is written to the mmap dir and mapped.
        if (load_info.mmap_dir_path.empty() ||
            SystemProperty::Instance().IsSystem(field_id)) {
            LoadFieldData(field_id, field_data_info);
        } else {
            MapFieldData(field_id, field_data_info);
        }
        LOG_SEGCORE_INFO_ << "finish loading segment field, "
                          << "segmentID:" << this->id_
                          << ", fieldID:" << info.field_id;
    }
}
void
SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
auto num_rows = data.row_count;
@ -1341,7 +1391,9 @@ SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
"num chunk not equal to 1 for sealed segment");
const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0);
AssertInfo(timestamps_data.size() == get_row_count(),
"Timestamp size not equal to row count");
fmt::format("Timestamp size not equal to row count: {}, {}",
timestamps_data.size(),
get_row_count()));
auto range = insert_record_.timestamp_index_.get_active_range(timestamp);
// range == (size_, size_) and size_ is this->timestamps_.size().

View File

@ -50,6 +50,8 @@ class SegmentSealedImpl : public SegmentSealed {
void
LoadFieldData(const LoadFieldDataInfo& info) override;
void
LoadFieldDataV2(const LoadFieldDataInfo& info) override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
void
LoadSegmentMeta(

View File

@ -43,6 +43,8 @@ struct LoadIndexInfo {
std::map<std::string, std::string> index_params;
std::vector<std::string> index_files;
index::IndexBasePtr index;
std::string uri;
int64_t index_store_version;
IndexVersion index_engine_version;
};

View File

@ -699,6 +699,40 @@ ReverseDataFromIndex(const index::IndexBase* index,
return data_array;
}
// Scans the storage-v2 Space and pushes, for the single field identified by
// field_data_info.field_id, one FieldData per record batch into the channel.
// The channel is closed when the scan is exhausted so the consumer can stop.
// Cleanup vs the original: removed a leftover empty loop over the schema
// fields ("log all schema ids") that had no body and no effect.
void
LoadFieldDatasFromRemote2(std::shared_ptr<milvus_storage::Space> space,
                          SchemaPtr schema,
                          FieldDataInfo& field_data_info) {
    auto res = space->ScanData();
    if (!res.ok()) {
        PanicInfo(S3Error, "failed to create scan iterator");
    }
    auto reader = res.value();
    for (auto rec = reader->Next(); rec != nullptr; rec = reader->Next()) {
        if (!rec.ok()) {
            // NOTE(review): the channel is not closed on this path — the
            // consumer relies on the panic unwinding; confirm.
            PanicInfo(DataFormatBroken, "failed to read data");
        }
        auto data = rec.ValueUnsafe();
        auto total_num_rows = data->num_rows();
        // Locate the requested field's column in this batch.
        for (auto& field : schema->get_fields()) {
            if (field.second.get_id().get() != field_data_info.field_id) {
                continue;
            }
            auto col_data =
                data->GetColumnByName(field.second.get_name().get());
            auto field_data = storage::CreateFieldData(
                field.second.get_data_type(),
                field.second.is_vector() ? field.second.get_dim() : 0,
                total_num_rows);
            field_data->FillFieldData(col_data);
            field_data_info.channel->push(field_data);
        }
    }
    field_data_info.channel->close();
}
// init segcore storage config first, and create default remote chunk manager
// segcore use default remote chunk manager to load data from minio/s3
void

View File

@ -21,11 +21,13 @@
#include <vector>
#include "common/QueryResult.h"
// #include "common/Schema.h"
#include "common/Types.h"
#include "segcore/DeletedRecord.h"
#include "segcore/InsertRecord.h"
#include "index/Index.h"
#include "storage/FieldData.h"
#include "storage/space.h"
namespace milvus::segcore {
@ -159,6 +161,10 @@ void
LoadFieldDatasFromRemote(std::vector<std::string>& remote_files,
storage::FieldDataChannelPtr channel);
void
LoadFieldDatasFromRemote2(std::shared_ptr<milvus_storage::Space> space,
SchemaPtr schema,
FieldDataInfo& field_data_info);
/**
* Returns an index pointing to the first element in the range [first, last) such that `value < element` is true
* (i.e. that is strictly greater than value), or last if no such element is found.

View File

@ -89,10 +89,23 @@ AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info,
load_field_data_info->mmap_dir_path = std::string(c_dir_path);
}
// Records the storagev2 Space uri on the load info so field data can be
// loaded through the storage v2 path.
void
SetUri(CLoadFieldDataInfo c_load_field_data_info, const char* uri) {
    // static_cast from the opaque handle, consistent with EnableMmap.
    auto load_field_data_info =
        static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
    load_field_data_info->url = std::string(uri);
}

// Records the storagev2 version the data was written with; 0 denotes the
// legacy (v1) storage layout.
void
SetStorageVersion(CLoadFieldDataInfo c_load_field_data_info,
                  int64_t storage_version) {
    auto load_field_data_info =
        static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
    load_field_data_info->storage_version = storage_version;
}
// Toggles mmap-backed loading for one field on the load info; creates the
// per-field entry if it does not exist yet.
void
EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
           int64_t field_id,
           bool enabled) {
    auto load_info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
    auto& field_info = load_info->field_infos[field_id];
    field_info.enable_mmap = enabled;
}
}

View File

@ -46,6 +46,13 @@ void
AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info,
const char* dir_path);
void
SetUri(CLoadFieldDataInfo c_load_field_data_info, const char* uri);
void
SetStorageVersion(CLoadFieldDataInfo c_load_field_data_info,
int64_t storage_version);
void
EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,

View File

@ -281,6 +281,77 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
}
}
// Loads a field index through the storagev2 path: opens the Space at
// load_index_info->uri with the recorded index_store_version, creates the
// index via the factory, and loads it with LoadV2. Mirrors AppendIndexV2,
// which serves the legacy (chunk-manager) path.
//
// Returns Success on completion; any thrown exception is converted to a
// CStatus with UnexpectedError and a heap-allocated message.
CStatus
AppendIndexV3(CLoadIndexInfo c_load_index_info) {
    try {
        auto load_index_info =
            (milvus::segcore::LoadIndexInfo*)c_load_index_info;
        auto& index_params = load_index_info->index_params;
        auto field_type = load_index_info->field_type;

        milvus::index::CreateIndexInfo index_info;
        index_info.field_type = load_index_info->field_type;

        // get index type
        AssertInfo(index_params.find("index_type") != index_params.end(),
                   "index type is empty");
        index_info.index_type = index_params.at("index_type");

        // get metric type (only meaningful for vector indexes)
        if (milvus::datatype_is_vector(field_type)) {
            AssertInfo(index_params.find("metric_type") != index_params.end(),
                       "metric type is empty for vector index");
            index_info.metric_type = index_params.at("metric_type");
        }

        milvus::storage::FieldDataMeta field_meta{
            load_index_info->collection_id,
            load_index_info->partition_id,
            load_index_info->segment_id,
            load_index_info->field_id};
        milvus::storage::IndexMeta index_meta{load_index_info->segment_id,
                                              load_index_info->field_id,
                                              load_index_info->index_build_id,
                                              load_index_info->index_version};
        auto config = milvus::index::ParseConfigFromIndexParams(
            load_index_info->index_params);

        // Open the storagev2 Space that holds the serialized index blobs.
        auto res = milvus_storage::Space::Open(
            load_index_info->uri,
            milvus_storage::Options{nullptr,
                                    load_index_info->index_store_version});
        AssertInfo(res.ok(), "init space failed");
        std::shared_ptr<milvus_storage::Space> space = std::move(res.value());
        milvus::storage::FileManagerContext fileManagerContext(
            field_meta, index_meta, nullptr, space);
        load_index_info->index =
            milvus::index::IndexFactory::GetInstance().CreateIndex(
                index_info, fileManagerContext, space);

        // Route the index through an mmap-backed file when an mmap dir is
        // configured and the index type supports it.
        if (!load_index_info->mmap_dir_path.empty() &&
            load_index_info->index->IsMmapSupported()) {
            auto filepath =
                std::filesystem::path(load_index_info->mmap_dir_path) /
                std::to_string(load_index_info->segment_id) /
                std::to_string(load_index_info->field_id) /
                std::to_string(load_index_info->index_id);
            config[kMmapFilepath] = filepath.string();
        }

        load_index_info->index->LoadV2(config);
        auto status = CStatus();
        status.error_code = milvus::Success;
        status.error_msg = "";
        return status;
    } catch (std::exception& e) {
        auto status = CStatus();
        status.error_code = milvus::UnexpectedError;
        // strdup: the message must outlive this frame for the C caller.
        status.error_msg = strdup(e.what());
        return status;
    }
}
CStatus
AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) {
try {
@ -369,3 +440,12 @@ CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
return status;
}
}
// Attaches the storagev2 Space uri and version to the load-index info;
// AppendIndexV3 later opens the Space from exactly these two values.
void
AppendStorageInfo(CLoadIndexInfo c_load_index_info,
                  const char* uri,
                  int64_t version) {
    auto info =
        static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
    info->uri = uri;
    info->index_store_version = version;
}

View File

@ -59,6 +59,9 @@ AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* file_path);
CStatus
AppendIndexV2(CLoadIndexInfo c_load_index_info);
CStatus
AppendIndexV3(CLoadIndexInfo c_load_index_info);
CStatus
AppendIndexEngineVersionToLoadInfo(CLoadIndexInfo c_load_index_info,
int32_t index_engine_version);
@ -66,6 +69,10 @@ AppendIndexEngineVersionToLoadInfo(CLoadIndexInfo c_load_index_info,
CStatus
CleanLoadedIndex(CLoadIndexInfo c_load_index_info);
void
AppendStorageInfo(CLoadIndexInfo c_load_index_info,
const char* uri,
int64_t version);
#ifdef __cplusplus
}
#endif

View File

@ -25,6 +25,7 @@
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "mmap/Types.h"
#include "storage/space.h"
////////////////////////////// common interfaces //////////////////////////////
CSegmentInterface
@ -242,6 +243,20 @@ LoadFieldData(CSegmentInterface c_segment,
}
}
// C wrapper: loads field data into a segment through the storagev2 path
// (the Space is described by load_info->url and load_info->storage_version).
// Any exception is converted into a failure CStatus for the cgo caller.
CStatus
LoadFieldDataV2(CSegmentInterface c_segment,
                CLoadFieldDataInfo c_load_field_data_info) {
    try {
        auto segment =
            reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
        AssertInfo(segment != nullptr, "segment conversion failed");
        auto load_info = (LoadFieldDataInfo*)c_load_field_data_info;
        segment->LoadFieldDataV2(*load_info);
        return milvus::SuccessCStatus();
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
// just for test
CStatus
LoadFieldRawData(CSegmentInterface c_segment,

View File

@ -89,6 +89,10 @@ CStatus
LoadFieldData(CSegmentInterface c_segment,
CLoadFieldDataInfo load_field_data_info);
CStatus
LoadFieldDataV2(CSegmentInterface c_segment,
CLoadFieldDataInfo load_field_data_info);
CStatus
LoadFieldRawData(CSegmentInterface c_segment,
int64_t field_id,

View File

@ -65,12 +65,14 @@ if (DEFINED AZURE_BUILD_DIR)
target_link_libraries(milvus_storage PUBLIC
"-L${AZURE_BUILD_DIR} -lblob-chunk-manager"
milvus_common
milvus-storage
pthread
${CONAN_LIBS}
)
else ()
target_link_libraries(milvus_storage PUBLIC
milvus_common
milvus-storage
pthread
${CONAN_LIBS}
)

View File

@ -16,6 +16,7 @@
#include <algorithm>
#include <boost/filesystem.hpp>
#include <memory>
#include <mutex>
#include <utility>
@ -24,6 +25,7 @@
#include "log/Log.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/FileManager.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/IndexData.h"
#include "storage/Util.h"
@ -31,6 +33,15 @@
namespace milvus::storage {
// Constructs a disk file manager bound to a storagev2 Space; when space_ is
// non-null, remote index reads/writes are routed through the Space instead
// of the remote chunk manager (which is still kept for the legacy path).
DiskFileManagerImpl::DiskFileManagerImpl(
    const FileManagerContext& fileManagerContext,
    std::shared_ptr<milvus_storage::Space> space)
    : FileManagerImpl(fileManagerContext.fieldDataMeta,
                      fileManagerContext.indexMeta),
      space_(space) {
    rcm_ = fileManagerContext.chunkManagerPtr;
}
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
@ -53,10 +64,44 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
// Composes the remote object key for one slice of an index file:
// "<prefix>/<file_name>_<slice_num>". The storagev2 prefix is used whenever
// a Space is attached; otherwise the legacy chunk-manager prefix applies.
std::string
DiskFileManagerImpl::GetRemoteIndexPath(const std::string& file_name,
                                        int64_t slice_num) const {
    const auto remote_prefix = space_ != nullptr
                                   ? GetRemoteIndexObjectPrefixV2()
                                   : GetRemoteIndexObjectPrefix();
    return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num);
}
// Uploads slices of a local index file into the storagev2 Space as blobs.
//
// `local_file_offsets[i]` and `remote_file_sizes[i]` describe the byte range
// of `local_file_name` that becomes blob `remote_files[i]`. Returns false on
// the first failed write, true when every slice is stored.
bool
DiskFileManagerImpl::AddFileUsingSpace(
    const std::string& local_file_name,
    const std::vector<int64_t>& local_file_offsets,
    const std::vector<std::string>& remote_files,
    const std::vector<int64_t>& remote_file_sizes) {
    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    // Reads data_size bytes at offset from the local file into a fresh buffer.
    auto read_local_slice =
        [&](const std::string& file,
            const int64_t offset,
            const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
        auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
        local_chunk_manager->Read(file, offset, buf.get(), data_size);
        return buf;
    };
    for (size_t i = 0; i < remote_files.size(); ++i) {
        auto data = read_local_slice(
            local_file_name, local_file_offsets[i], remote_file_sizes[i]);
        // NOTE: WriteBolb is the (misspelled) upstream milvus-storage API.
        auto status = space_->WriteBolb(
            remote_files[i], data.get(), remote_file_sizes[i]);
        if (!status.ok()) {
            return false;
        }
    }
    return true;
}
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
auto local_chunk_manager =
@ -85,6 +130,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
AddBatchIndexFiles(file,
local_file_offsets,
batch_remote_files,
remote_file_sizes);
batch_remote_files.clear();
remote_file_sizes.clear();
@ -149,17 +195,86 @@ DiskFileManagerImpl::AddBatchIndexFiles(
data_slices.emplace_back(res.get());
}
auto res = PutIndexData(rcm_.get(),
data_slices,
remote_file_sizes,
remote_files,
field_meta_,
index_meta_);
std::map<std::string, int64_t> res;
if (space_ != nullptr) {
res = PutIndexData(space_,
data_slices,
remote_file_sizes,
remote_files,
field_meta_,
index_meta_);
} else {
res = PutIndexData(rcm_.get(),
data_slices,
remote_file_sizes,
remote_files,
field_meta_,
index_meta_);
}
for (auto iter = res.begin(); iter != res.end(); ++iter) {
remote_paths_to_size_[iter->first] = iter->second;
}
}
// Downloads every index blob stored in the attached storagev2 Space and
// reassembles the slices into per-index local files.
//
// Blob names have the form "<prefix>_<slice_no>". Slices are grouped by
// prefix, sorted by slice number, and concatenated in order into
// {GetLocalIndexObjectPrefix()}{basename(prefix)}. Downloads are batched so
// one batch stays under DEFAULT_FIELD_MAX_MEMORY_LIMIT, using the first
// slice's size as the per-slice estimate. Completed paths are appended to
// local_paths_.
void
DiskFileManagerImpl::CacheIndexToDisk() {
    auto blobs = space_->StatisticsBlobs();
    std::vector<std::string> remote_files;
    for (auto& blob : blobs) {
        remote_files.push_back(blob.name);
    }
    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();

    // prefix -> slice numbers parsed from the trailing "_<n>" suffix.
    std::map<std::string, std::vector<int>> index_slices;
    for (auto& file_path : remote_files) {
        auto pos = file_path.find_last_of("_");
        index_slices[file_path.substr(0, pos)].emplace_back(
            std::stoi(file_path.substr(pos + 1)));
    }

    // Slices must be written back in ascending order to rebuild the file.
    for (auto& slices : index_slices) {
        std::sort(slices.second.begin(), slices.second.end());
    }

    // How many slices fit in the memory budget, assuming all slices are
    // roughly the size of `file`.
    auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t {
        auto fileSize = space_->GetBlobByteSize(file);
        return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize.value());
    };

    for (auto& slices : index_slices) {
        auto prefix = slices.first;
        auto local_index_file_name =
            GetLocalIndexObjectPrefix() +
            prefix.substr(prefix.find_last_of('/') + 1);
        local_chunk_manager->CreateFile(local_index_file_name);
        int64_t offset = 0;
        std::vector<std::string> batch_remote_files;
        uint64_t max_parallel_degree = INT_MAX;
        for (int& iter : slices.second) {
            // Flush a full batch before adding the next slice.
            if (batch_remote_files.size() == max_parallel_degree) {
                auto next_offset = CacheBatchIndexFilesToDiskV2(
                    batch_remote_files, local_index_file_name, offset);
                offset = next_offset;
                batch_remote_files.clear();
            }
            auto origin_file = prefix + "_" + std::to_string(iter);
            if (batch_remote_files.size() == 0) {
                // Use first file size as average size to estimate
                max_parallel_degree = EstimateParallelDegree(origin_file);
            }
            batch_remote_files.push_back(origin_file);
        }
        // Flush the trailing partial batch.
        if (batch_remote_files.size() > 0) {
            auto next_offset = CacheBatchIndexFilesToDiskV2(
                batch_remote_files, local_index_file_name, offset);
            offset = next_offset;
            batch_remote_files.clear();
        }
        local_paths_.emplace_back(local_index_file_name);
    }
}
void
DiskFileManagerImpl::CacheIndexToDisk(
const std::vector<std::string>& remote_files) {
@ -240,6 +355,91 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(
return offset;
}
// Downloads one batch of index slices from the storagev2 Space and appends
// them, in order, to `local_file_name` starting at `local_file_init_offfset`.
//
// Returns the file offset one past the last byte written, so callers can
// chain consecutive batches.
uint64_t
DiskFileManagerImpl::CacheBatchIndexFilesToDiskV2(
    const std::vector<std::string>& remote_files,
    const std::string& local_file_name,
    uint64_t local_file_init_offfset) {
    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto index_datas = GetObjectData(space_, remote_files);
    // size_t, matching index_datas.size(); avoids signed/unsigned compare.
    auto batch_size = remote_files.size();
    AssertInfo(index_datas.size() == batch_size,
               "inconsistent file num and index data num!");

    uint64_t offset = local_file_init_offfset;
    for (size_t i = 0; i < batch_size; ++i) {
        auto index_data = index_datas[i];
        auto index_size = index_data->Size();
        auto uint8_data =
            reinterpret_cast<uint8_t*>(const_cast<void*>(index_data->Data()));
        local_chunk_manager->Write(
            local_file_name, offset, uint8_data, index_size);
        offset += index_size;
    }
    return offset;
}
// Scans the whole field column out of the storagev2 Space and writes it to
// one local file for disk-based index build.
//
// File layout: num_rows(uint32) | dim(uint32) | raw data. The header is
// back-filled after the scan since num_rows is only known at the end.
// NOTE(review): the per-batch byte count is computed as
// num_rows * index_meta_.dim * sizeof(float), i.e. this path appears to
// assume a float-vector field — confirm before reusing for other types.
std::string
DiskFileManagerImpl::CacheRawDataToDisk(
    std::shared_ptr<milvus_storage::Space> space) {
    auto segment_id = GetFieldDataMeta().segment_id;
    auto field_id = GetFieldDataMeta().field_id;

    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto local_data_path = storage::GenFieldRawDataPathPrefix(
                               local_chunk_manager, segment_id, field_id) +
                           "raw_data";
    local_chunk_manager->CreateFile(local_data_path);
    // file format
    // num_rows(uint32) | dim(uint32) | index_data ([]uint8_t)
    uint32_t num_rows = 0;
    uint32_t dim = 0;
    // Reserve room for the header; it is written after the scan completes.
    int64_t write_offset = sizeof(num_rows) + sizeof(dim);
    auto res = space->ScanData();
    if (!res.ok()) {
        PanicInfo(IndexBuildError,
                  fmt::format("failed to create scan iterator: {}",
                              res.status().ToString()));
    }
    auto reader = res.value();
    for (auto rec : *reader) {
        if (!rec.ok()) {
            PanicInfo(IndexBuildError,
                      fmt::format("failed to read data: {}",
                                  rec.status().ToString()));
        }
        auto data = rec.ValueUnsafe();
        if (data == nullptr) {
            break;
        }
        auto total_num_rows = data->num_rows();
        num_rows += total_num_rows;
        auto col_data = data->GetColumnByName(index_meta_.field_name);
        auto field_data = storage::CreateFieldData(
            index_meta_.field_type, index_meta_.dim, total_num_rows);
        field_data->FillFieldData(col_data);
        dim = field_data->get_dim();
        auto data_size =
            field_data->get_num_rows() * index_meta_.dim * sizeof(float);
        local_chunk_manager->Write(local_data_path,
                                   write_offset,
                                   const_cast<void*>(field_data->Data()),
                                   data_size);
        write_offset += data_size;
    }

    // write num_rows and dim value to file header
    write_offset = 0;
    local_chunk_manager->Write(
        local_data_path, write_offset, &num_rows, sizeof(num_rows));
    write_offset += sizeof(num_rows);
    local_chunk_manager->Write(
        local_data_path, write_offset, &dim, sizeof(dim));
    return local_data_path;
}
std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
std::sort(remote_files.begin(),

View File

@ -25,6 +25,7 @@
#include "storage/IndexData.h"
#include "storage/FileManager.h"
#include "storage/ChunkManager.h"
#include "storage/space.h"
#include "common/Consts.h"
@ -34,6 +35,9 @@ class DiskFileManagerImpl : public FileManagerImpl {
public:
explicit DiskFileManagerImpl(const FileManagerContext& fileManagerContext);
explicit DiskFileManagerImpl(const FileManagerContext& fileManagerContext,
std::shared_ptr<milvus_storage::Space> space);
virtual ~DiskFileManagerImpl();
virtual bool
@ -73,11 +77,19 @@ class DiskFileManagerImpl : public FileManagerImpl {
void
CacheIndexToDisk(const std::vector<std::string>& remote_files);
void
CacheIndexToDisk();
uint64_t
CacheBatchIndexFilesToDisk(const std::vector<std::string>& remote_files,
const std::string& local_file_name,
uint64_t local_file_init_offfset);
uint64_t
CacheBatchIndexFilesToDiskV2(const std::vector<std::string>& remote_files,
const std::string& local_file_name,
uint64_t local_file_init_offfset);
void
AddBatchIndexFiles(const std::string& local_file_name,
const std::vector<int64_t>& local_file_offsets,
@ -87,6 +99,15 @@ class DiskFileManagerImpl : public FileManagerImpl {
std::string
CacheRawDataToDisk(std::vector<std::string> remote_files);
std::string
CacheRawDataToDisk(std::shared_ptr<milvus_storage::Space> space);
virtual bool
AddFileUsingSpace(const std::string& local_file_name,
const std::vector<int64_t>& local_file_offsets,
const std::vector<std::string>& remote_files,
const std::vector<int64_t>& remote_file_sizes);
private:
int64_t
GetIndexBuildId() {
@ -105,6 +126,8 @@ class DiskFileManagerImpl : public FileManagerImpl {
// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;
std::shared_ptr<milvus_storage::Space> space_;
};
using DiskANNFileManagerImplPtr = std::shared_ptr<DiskFileManagerImpl>;

View File

@ -45,7 +45,10 @@ FieldDataImpl<Type, is_scalar>::FillFieldData(const void* source,
template <typename ArrayType, arrow::Type::type ArrayDataType>
std::pair<const void*, int64_t>
GetDataInfoFromArray(const std::shared_ptr<arrow::Array> array) {
AssertInfo(array->type()->id() == ArrayDataType, "inconsistent data type");
AssertInfo(array->type()->id() == ArrayDataType,
fmt::format("inconsistent data type, expected {}, actual {}",
ArrayDataType,
array->type()->id()));
auto typed_array = std::dynamic_pointer_cast<ArrayType>(array);
auto element_count = array->length();

View File

@ -25,6 +25,7 @@
#include "storage/ChunkManager.h"
#include "storage/Types.h"
#include "log/Log.h"
#include "storage/space.h"
namespace milvus::storage {
@ -38,6 +39,16 @@ struct FileManagerContext {
indexMeta(indexMeta),
chunkManagerPtr(chunkManagerPtr) {
}
    // Context variant carrying both the legacy chunk manager and a storagev2
    // Space; file managers built from this route reads/writes through the
    // Space when it is non-null.
    FileManagerContext(const FieldDataMeta& fieldDataMeta,
                       const IndexMeta& indexMeta,
                       const ChunkManagerPtr& chunkManagerPtr,
                       std::shared_ptr<milvus_storage::Space> space)
        : fieldDataMeta(fieldDataMeta),
          indexMeta(indexMeta),
          chunkManagerPtr(chunkManagerPtr),
          space_(space) {
    }
bool
Valid() const {
return chunkManagerPtr != nullptr;
@ -46,6 +57,7 @@ struct FileManagerContext {
FieldDataMeta fieldDataMeta;
IndexMeta indexMeta;
ChunkManagerPtr chunkManagerPtr;
std::shared_ptr<milvus_storage::Space> space_;
};
#define FILEMANAGER_TRY try {
@ -128,6 +140,15 @@ class FileManagerImpl : public knowhere::FileManager {
std::to_string(field_meta_.segment_id);
}
virtual std::string
GetRemoteIndexObjectPrefixV2() const {
return std::string(INDEX_ROOT_PATH) + "/" +
std::to_string(index_meta_.build_id) + "/" +
std::to_string(index_meta_.index_version) + "/" +
std::to_string(field_meta_.partition_id) + "/" +
std::to_string(field_meta_.segment_id);
}
protected:
// collection meta
FieldDataMeta field_meta_;

View File

@ -18,12 +18,23 @@
#include <memory>
#include <unordered_map>
#include "log/Log.h"
#include "storage/FieldData.h"
#include "storage/FileManager.h"
#include "storage/Util.h"
#include "common/Common.h"
namespace milvus::storage {
// Constructs a memory file manager bound to a storagev2 Space; index blobs
// are then read/written through the Space (see AddFileV2) instead of the
// remote chunk manager, which is still kept for the legacy path.
MemFileManagerImpl::MemFileManagerImpl(
    const FileManagerContext& fileManagerContext,
    std::shared_ptr<milvus_storage::Space> space)
    : FileManagerImpl(fileManagerContext.fieldDataMeta,
                      fileManagerContext.indexMeta),
      space_(space) {
    rcm_ = fileManagerContext.chunkManagerPtr;
}
MemFileManagerImpl::MemFileManagerImpl(
const FileManagerContext& fileManagerContext)
: FileManagerImpl(fileManagerContext.fieldDataMeta,
@ -80,6 +91,50 @@ MemFileManagerImpl::AddFile(const BinarySet& binary_set) {
return true;
}
// Uploads every binary in `binary_set` to the storagev2 Space, batching
// uploads so that at most DEFAULT_FIELD_MAX_MEMORY_LIMIT bytes are staged
// per PutIndexData call. Remote object keys and their serialized sizes are
// recorded in remote_paths_to_size_. Always returns true.
bool
MemFileManagerImpl::AddFileV2(const BinarySet& binary_set) {
    std::vector<const uint8_t*> data_slices;
    std::vector<int64_t> slice_sizes;
    std::vector<std::string> slice_names;

    // Flushes the staged batch to the Space and records resulting sizes.
    auto flush_batch = [&]() {
        auto res = PutIndexData(space_,
                                data_slices,
                                slice_sizes,
                                slice_names,
                                field_meta_,
                                index_meta_);
        for (auto& [file, size] : res) {
            remote_paths_to_size_[file] = size;
        }
    };

    auto remote_prefix = GetRemoteIndexObjectPrefixV2();
    int64_t batch_size = 0;
    for (const auto& [name, binary] : binary_set.binary_map_) {
        if (batch_size >= DEFAULT_FIELD_MAX_MEMORY_LIMIT) {
            flush_batch();
            data_slices.clear();
            slice_sizes.clear();
            slice_names.clear();
            batch_size = 0;
        }

        data_slices.emplace_back(binary->data.get());
        slice_sizes.emplace_back(binary->size);
        slice_names.emplace_back(remote_prefix + "/" + name);
        batch_size += binary->size;
    }

    // Flush the trailing partial batch, if any.
    if (!data_slices.empty()) {
        flush_batch();
    }

    return true;
}
bool
MemFileManagerImpl::LoadFile(const std::string& filename) noexcept {
return true;
@ -169,4 +224,4 @@ MemFileManagerImpl::RemoveFile(const std::string& filename) noexcept {
return false;
}
} // namespace milvus::storage
} // namespace milvus::storage

View File

@ -25,6 +25,7 @@
#include "storage/IndexData.h"
#include "storage/FileManager.h"
#include "storage/ChunkManager.h"
#include "storage/space.h"
namespace milvus::storage {
@ -32,6 +33,9 @@ class MemFileManagerImpl : public FileManagerImpl {
public:
explicit MemFileManagerImpl(const FileManagerContext& fileManagerContext);
MemFileManagerImpl(const FileManagerContext& fileManagerContext,
std::shared_ptr<milvus_storage::Space> space);
virtual bool
LoadFile(const std::string& filename) noexcept;
@ -59,6 +63,14 @@ class MemFileManagerImpl : public FileManagerImpl {
bool
AddFile(const BinarySet& binary_set);
bool
AddFileV2(const BinarySet& binary_set);
std::shared_ptr<milvus_storage::Space>
space() const {
return space_;
}
std::map<std::string, int64_t>
GetRemotePathsToFileSize() const {
return remote_paths_to_size_;
@ -67,8 +79,9 @@ class MemFileManagerImpl : public FileManagerImpl {
private:
// remote file path
std::map<std::string, int64_t> remote_paths_to_size_;
std::shared_ptr<milvus_storage::Space> space_;
};
using MemFileManagerImplPtr = std::shared_ptr<MemFileManagerImpl>;
} // namespace milvus::storage
} // namespace milvus::storage

View File

@ -79,6 +79,9 @@ struct IndexMeta {
int64_t build_id;
int64_t index_version;
std::string key;
std::string field_name;
DataType field_type;
int64_t dim;
};
struct StorageConfig {

View File

@ -22,6 +22,7 @@
#include "common/EasyAssert.h"
#include "common/Consts.h"
#include "fmt/format.h"
#include "log/Log.h"
#include "storage/ChunkManager.h"
#ifdef AZURE_BUILD_DIR
#include "storage/AzureChunkManager.h"
@ -35,6 +36,7 @@
#include "storage/OpenDALChunkManager.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/Types.h"
namespace milvus::storage {
@ -413,6 +415,22 @@ DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager,
return DeserializeFileData(buf, fileSize);
}
// Reads one serialized blob from the storagev2 Space and deserializes it
// into a DataCodec (index or field data envelope). Panics with
// FileReadFailed when the blob cannot be sized or read.
std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFileV2(std::shared_ptr<milvus_storage::Space> space,
                              const std::string& file) {
    auto fileSize = space->GetBlobByteSize(file);
    if (!fileSize.ok()) {
        PanicInfo(FileReadFailed, fileSize.status().ToString());
    }
    auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize.value()]);
    auto status = space->ReadBlob(file, buf.get());
    if (!status.ok()) {
        PanicInfo(FileReadFailed, status.ToString());
    }

    return DeserializeFileData(buf, fileSize.value());
}
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
uint8_t* buf,
@ -432,6 +450,27 @@ EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
return std::make_pair(std::move(object_key), serialized_index_size);
}
// Wraps one raw index slice in the serialization envelope (field/index meta
// headers + payload) and writes it to the storagev2 Space as `object_key`.
//
// Returns {object_key, serialized size}; the size covers the envelope, not
// just `batch_size`. Asserts on a failed write.
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
                           uint8_t* buf,
                           int64_t batch_size,
                           IndexMeta index_meta,
                           FieldDataMeta field_meta,
                           std::string object_key) {
    auto field_data = CreateFieldData(DataType::INT8);
    field_data->FillFieldData(buf, batch_size);
    auto indexData = std::make_shared<IndexData>(field_data);
    indexData->set_index_meta(index_meta);
    indexData->SetFieldDataMeta(field_meta);
    auto serialized_index_data = indexData->serialize_to_remote_file();
    auto serialized_index_size = serialized_index_data.size();
    // NOTE: WriteBolb is the (misspelled) upstream milvus-storage API name.
    auto status = space->WriteBolb(
        object_key, serialized_index_data.data(), serialized_index_size);
    // fmt uses "{}" placeholders; the original "%s" was emitted literally and
    // dropped the status text from the assertion message.
    AssertInfo(status.ok(),
               fmt::format("write to space error: {}", status.ToString()));
    return std::make_pair(std::move(object_key), serialized_index_size);
}
std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
@ -470,6 +509,25 @@ GetObjectData(ChunkManager* remote_chunk_manager,
return datas;
}
// Downloads and decodes a set of blobs from the storagev2 Space in parallel
// on the HIGH-priority pool, returning the decoded field data in the same
// order as `remote_files`.
std::vector<FieldDataPtr>
GetObjectData(std::shared_ptr<milvus_storage::Space> space,
              const std::vector<std::string>& remote_files) {
    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
    std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
    futures.reserve(remote_files.size());
    for (auto& file : remote_files) {
        futures.emplace_back(
            pool.Submit(DownloadAndDecodeRemoteFileV2, space, file));
    }

    std::vector<FieldDataPtr> datas;
    datas.reserve(futures.size());
    // future::get re-throws any download/decode exception to the caller.
    for (auto& future : futures) {
        auto res = future.get();
        datas.emplace_back(res->GetFieldData());
    }

    ReleaseArrowUnused();
    return datas;
}
std::map<std::string, int64_t>
PutIndexData(ChunkManager* remote_chunk_manager,
const std::vector<const uint8_t*>& data_slices,
@ -504,6 +562,40 @@ PutIndexData(ChunkManager* remote_chunk_manager,
return remote_paths_to_size;
}
// Serializes each in-memory index slice and uploads it to the storagev2
// Space under the matching entry of `slice_names`, in parallel on the
// MIDDLE-priority pool.
//
// Returns a map from remote object key to serialized (enveloped) byte size.
// Asserts that the three slice vectors are the same length.
std::map<std::string, int64_t>
PutIndexData(std::shared_ptr<milvus_storage::Space> space,
             const std::vector<const uint8_t*>& data_slices,
             const std::vector<int64_t>& slice_sizes,
             const std::vector<std::string>& slice_names,
             FieldDataMeta& field_meta,
             IndexMeta& index_meta) {
    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
    std::vector<std::future<std::pair<std::string, size_t>>> futures;
    AssertInfo(data_slices.size() == slice_sizes.size(),
               "inconsistent size of data slices with slice sizes!");
    AssertInfo(data_slices.size() == slice_names.size(),
               "inconsistent size of data slices with slice names!");

    futures.reserve(data_slices.size());
    for (size_t i = 0; i < data_slices.size(); ++i) {
        futures.push_back(pool.Submit(EncodeAndUploadIndexSlice2,
                                      space,
                                      const_cast<uint8_t*>(data_slices[i]),
                                      slice_sizes[i],
                                      index_meta,
                                      field_meta,
                                      slice_names[i]));
    }

    std::map<std::string, int64_t> remote_paths_to_size;
    // future::get re-throws any upload failure to the caller.
    for (auto& future : futures) {
        auto res = future.get();
        remote_paths_to_size[res.first] = res.second;
    }

    ReleaseArrowUnused();
    return remote_paths_to_size;
}
int64_t
GetTotalNumRowsForFieldDatas(const std::vector<FieldDataPtr>& field_datas) {
int64_t count = 0;

View File

@ -29,6 +29,8 @@
#include "knowhere/comp/index_param.h"
#include "parquet/schema.h"
#include "common/LoadInfo.h"
#include "storage/Types.h"
#include "storage/space.h"
namespace milvus::storage {
@ -86,6 +88,10 @@ std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager,
const std::string& file);
std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFileV2(std::shared_ptr<milvus_storage::Space> space,
const std::string& file);
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
uint8_t* buf,
@ -95,6 +101,13 @@ EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
std::string object_key);
std::pair<std::string, size_t>
EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
uint8_t* buf,
int64_t batch_size,
IndexMeta index_meta,
FieldDataMeta field_meta,
std::string object_key);
std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
int64_t element_count,
@ -106,6 +119,10 @@ std::vector<FieldDataPtr>
GetObjectData(ChunkManager* remote_chunk_manager,
const std::vector<std::string>& remote_files);
std::vector<FieldDataPtr>
GetObjectData(std::shared_ptr<milvus_storage::Space> space,
const std::vector<std::string>& remote_files);
std::map<std::string, int64_t>
PutIndexData(ChunkManager* remote_chunk_manager,
const std::vector<const uint8_t*>& data_slices,
@ -114,6 +131,13 @@ PutIndexData(ChunkManager* remote_chunk_manager,
FieldDataMeta& field_meta,
IndexMeta& index_meta);
std::map<std::string, int64_t>
PutIndexData(std::shared_ptr<milvus_storage::Space> space,
const std::vector<const uint8_t*>& data_slices,
const std::vector<int64_t>& slice_sizes,
const std::vector<std::string>& slice_names,
FieldDataMeta& field_meta,
IndexMeta& index_meta);
int64_t
GetTotalNumRowsForFieldDatas(const std::vector<FieldDataPtr>& field_datas);

View File

@ -12,7 +12,7 @@
#-------------------------------------------------------------------------------
# Using default c and cxx compiler in our build tree
# Thirdpart cxx and c flags
append_flags(CMAKE_CXX_FLAGS FLAGS "-O3 -fPIC -Wno-error -fopenmp")
append_flags(CMAKE_CXX_FLAGS FLAGS "-O3 -fPIC -Wno-error -fopenmp -Wno-macro-redefined")
if (NOT KNOWHERE_VERBOSE_THIRDPARTY_BUILD)
set(EP_LOG_OPTIONS LOG_CONFIGURE 1 LOG_BUILD 1 LOG_INSTALL 1 LOG_DOWNLOAD 1)
@ -38,6 +38,8 @@ add_subdirectory(rdkafka)
add_subdirectory(simdjson)
add_subdirectory(opendal)
add_subdirectory(milvus-storage)
if (LINUX)
add_subdirectory(jemalloc)
endif()

View File

@ -0,0 +1,48 @@
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
set( MILVUS_STORAGE_VERSION c7107a0)
message(STATUS "Building milvus-storage-${MILVUS_STORAGE_VERSION} from source")
message(STATUS ${CMAKE_BUILD_TYPE})
# message(FATAL_ERROR ${CMAKE_CURRENT_SOURCE_DIR}/milvus-storage.patch)
# set(milvus-storage-patch git apply --ignore-whitespace ${CMAKE_CURRENT_SOURCE_DIR}/milvus-storage.patch)
set( CMAKE_PREFIX_PATH ${CONAN_BOOST_ROOT} )
FetchContent_Declare(
milvus-storage
GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git"
GIT_TAG ${MILVUS_STORAGE_VERSION}
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/milvus-storage-src
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/milvus-storage-build
SOURCE_SUBDIR cpp
PATCH_COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/milvus-storage_CMakeLists.txt <SOURCE_DIR>/cpp/CMakeLists.txt
DOWNLOAD_DIR ${THIRDPARTY_DOWNLOAD_PATH} )
FetchContent_MakeAvailable(milvus-storage)
# target_compile_features(milvus-storage PUBLIC cxx_std_20)
# FetchContent_GetProperties( milvus-storage )
# if ( NOT milvus-storage_POPULATED )
# FetchContent_Populate( milvus-storage)
# # Adding the following target:
# add_subdirectory( ${milvus-storage_SOURCE_DIR}/cpp
# ${milvus-storage_BINARY_DIR} )
# endif()
# message(FATAL_ERROR ${milvus-storage_SOURCE_DIR} ${milvus-storage_BINARY_DIR})
# get prometheus COMPILE_OPTIONS
# get_property( var DIRECTORY "${milvus-storage_SOURCE_DIR}" PROPERTY COMPILE_OPTIONS )
message( STATUS "milvus-storage src compile options: ${var}" )
# unset(CMAKE_CXX_STANDARD)

View File

@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.20.0)
project(milvus-storage VERSION 0.1.0)
option(WITH_UT "Build the testing tree." ON)
option(WITH_ASAN "Build with address sanitizer." OFF)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
find_package(Boost REQUIRED)
find_package(Arrow REQUIRED)
find_package(Protobuf REQUIRED)
find_package(glog REQUIRED)
find_package(AWSSDK REQUIRED)
file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
message(STATUS "SRC_FILES: ${SRC_FILES}")
add_library(milvus-storage ${SRC_FILES})
target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(milvus-storage PUBLIC arrow::arrow arrow::parquet Boost::boost protobuf::protobuf AWS::aws-sdk-cpp-core glog::glog opendal)
if (WITH_UT)
enable_testing()
add_subdirectory(test)
endif()

View File

@ -113,6 +113,7 @@ if (LINUX)
milvus_segcore
milvus_storage
milvus_indexbuilder
milvus_common
)
install(TARGETS index_builder_test DESTINATION unittest)
endif()
@ -127,6 +128,7 @@ target_link_libraries(all_tests
milvus_storage
milvus_indexbuilder
pthread
milvus_common
)
install(TARGETS all_tests DESTINATION unittest)

View File

@ -9,15 +9,21 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <arrow/record_batch.h>
#include <arrow/type_fwd.h>
#include <gtest/gtest.h>
#include <boost/filesystem/operations.hpp>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <vector>
#include "arrow/type.h"
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "index/Index.h"
#include "knowhere/comp/index_param.h"
#include "nlohmann/json.hpp"
#include "query/SearchBruteForce.h"
@ -25,11 +31,13 @@
#include "index/IndexFactory.h"
#include "common/QueryResult.h"
#include "segcore/Types.h"
#include "storage/options.h"
#include "test_utils/indexbuilder_test_utils.h"
#include "test_utils/storage_test_utils.h"
#include "test_utils/DataGen.h"
#include "test_utils/Timer.h"
#include "storage/Util.h"
#include <boost/filesystem.hpp>
using namespace milvus;
using namespace milvus::segcore;
@ -649,3 +657,260 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
std::runtime_error);
}
#endif
// Parameterized fixture that builds and queries vector indexes through the
// storage v2 path (milvus-storage Space). Parameter tuple:
//   <0> Param   — (index_type, metric_type) pair
//   <1> int64_t — index file slice size used while building/uploading
//   <2> bool    — request mmap-backed loading (ignored for DISKANN)
class IndexTestV2
    : public ::testing::TestWithParam<std::tuple<Param, int64_t, bool>> {
 protected:
    // Arrow schema of the test space: int64 "pk", int64 "ts" and a
    // fixed-size-binary "vec" column of vec_size bytes per row.
    std::shared_ptr<arrow::Schema>
    TestSchema(int vec_size) {
        arrow::FieldVector fields;
        fields.push_back(arrow::field("pk", arrow::int64()));
        fields.push_back(arrow::field("ts", arrow::int64()));
        fields.push_back(
            arrow::field("vec", arrow::fixed_size_binary(vec_size)));
        return std::make_shared<arrow::Schema>(fields);
    }

    // Materializes NB rows from `dataset` as a single-batch reader. Float
    // vectors are reinterpreted as raw bytes; binary vectors are appended
    // as-is. Also caches the raw column into xb_data / xb_bin_data so the
    // query stage can reuse it.
    std::shared_ptr<arrow::RecordBatchReader>
    TestRecords(int vec_size, GeneratedData& dataset) {
        arrow::Int64Builder pk_builder;
        arrow::Int64Builder ts_builder;
        arrow::FixedSizeBinaryBuilder vec_builder(
            arrow::fixed_size_binary(vec_size));
        if (!is_binary) {
            xb_data = dataset.get_col<float>(milvus::FieldId(100));
            auto data = reinterpret_cast<char*>(xb_data.data());
            for (auto i = 0; i < NB; ++i) {
                EXPECT_TRUE(pk_builder.Append(i).ok());
                EXPECT_TRUE(ts_builder.Append(i).ok());
                EXPECT_TRUE(vec_builder.Append(data + i * vec_size).ok());
            }
        } else {
            xb_bin_data = dataset.get_col<uint8_t>(milvus::FieldId(100));
            for (auto i = 0; i < NB; ++i) {
                EXPECT_TRUE(pk_builder.Append(i).ok());
                EXPECT_TRUE(ts_builder.Append(i).ok());
                EXPECT_TRUE(
                    vec_builder.Append(xb_bin_data.data() + i * vec_size).ok());
            }
        }
        std::shared_ptr<arrow::Array> pk_array;
        EXPECT_TRUE(pk_builder.Finish(&pk_array).ok());
        std::shared_ptr<arrow::Array> ts_array;
        EXPECT_TRUE(ts_builder.Finish(&ts_array).ok());
        std::shared_ptr<arrow::Array> vec_array;
        EXPECT_TRUE(vec_builder.Finish(&vec_array).ok());
        auto schema = TestSchema(vec_size);
        auto rec_batch = arrow::RecordBatch::Make(
            schema, NB, {pk_array, ts_array, vec_array});
        auto reader =
            arrow::RecordBatchReader::Make({rec_batch}, schema).ValueOrDie();
        return reader;
    }

    // Opens a file://-backed Space rooted at temp_path and writes the
    // generated rows into it.
    std::shared_ptr<milvus_storage::Space>
    TestSpace(int vec_size, GeneratedData& dataset) {
        auto arrow_schema = TestSchema(vec_size);
        auto schema_options = std::make_shared<milvus_storage::SchemaOptions>();
        schema_options->primary_column = "pk";
        schema_options->version_column = "ts";
        schema_options->vector_column = "vec";
        auto schema = std::make_shared<milvus_storage::Schema>(arrow_schema,
                                                               schema_options);
        EXPECT_TRUE(schema->Validate().ok());
        auto space_res = milvus_storage::Space::Open(
            "file://" + boost::filesystem::canonical(temp_path).string(),
            milvus_storage::Options{schema});
        EXPECT_TRUE(space_res.has_value());
        auto space = std::move(space_res.value());
        auto rec = TestRecords(vec_size, dataset);
        auto write_opt = milvus_storage::WriteOption{NB};
        // NOTE(review): Write()'s status is ignored — confirm it cannot fail
        // for a local-filesystem space, or assert on it.
        space->Write(rec.get(), &write_opt);
        // Return the local by value: implicit move applies to a named local,
        // so an explicit std::move would only inhibit copy elision.
        return space;
    }

    void
    SetUp() override {
        temp_path = boost::filesystem::temp_directory_path() /
                    boost::filesystem::unique_path();
        boost::filesystem::create_directory(temp_path);
        storage_config_ = get_default_local_storage_config();

        auto param = GetParam();
        index_type = std::get<0>(param).first;
        metric_type = std::get<0>(param).second;
        file_slice_size = std::get<1>(param);
        // DISKANN manages its own files and does not support mmap loading.
        enable_mmap = index_type != knowhere::IndexEnum::INDEX_DISKANN &&
                      std::get<2>(param);
        if (enable_mmap) {
            mmap_file_path = boost::filesystem::temp_directory_path() /
                             boost::filesystem::unique_path();
        }
        NB = 3000;
        // try to reduce the test time,
        // but the large dataset is needed for the case below.
        auto test_name = std::string(
            testing::UnitTest::GetInstance()->current_test_info()->name());
        if (test_name == "Mmap" &&
            index_type == knowhere::IndexEnum::INDEX_HNSW) {
            NB = 270000;
        }
        build_conf = generate_build_conf(index_type, metric_type);
        load_conf = generate_load_conf(index_type, metric_type, NB);
        search_conf = generate_search_conf(index_type, metric_type);
        range_search_conf = generate_range_search_conf(index_type, metric_type);
        // NOTE(review): keyed by index type even though the declared key type
        // is MetricType (a string alias) — works, but the name is misleading.
        std::map<knowhere::MetricType, bool> is_binary_map = {
            {knowhere::IndexEnum::INDEX_FAISS_IDMAP, false},
            {knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false},
            {knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, false},
            {knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, false},
            {knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true},
            {knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, true},
            {knowhere::IndexEnum::INDEX_HNSW, false},
            {knowhere::IndexEnum::INDEX_DISKANN, false},
        };
        is_binary = is_binary_map[index_type];
        // vec_size is the per-row byte width of the vector column.
        int vec_size;
        if (is_binary) {
            vec_size = DIM / 8;
            vec_field_data_type = milvus::DataType::VECTOR_BINARY;
        } else {
            vec_size = DIM * 4;
            vec_field_data_type = milvus::DataType::VECTOR_FLOAT;
        }

        auto dataset = GenDataset(NB, metric_type, is_binary);
        space = TestSpace(vec_size, dataset);
        // Queries reuse rows starting at query_offset as their query vectors.
        if (!is_binary) {
            xb_data = dataset.get_col<float>(milvus::FieldId(100));
            xq_dataset = knowhere::GenDataSet(
                NQ, DIM, xb_data.data() + DIM * query_offset);
        } else {
            xb_bin_data = dataset.get_col<uint8_t>(milvus::FieldId(100));
            xq_dataset = knowhere::GenDataSet(
                NQ, DIM, xb_bin_data.data() + DIM * query_offset);
        }
    }

    void
    TearDown() override {
        boost::filesystem::remove_all(temp_path);
        if (enable_mmap) {
            boost::filesystem::remove_all(mmap_file_path);
        }
    }

 protected:
    std::string index_type, metric_type;
    bool is_binary;
    milvus::Config build_conf;
    milvus::Config load_conf;
    milvus::Config search_conf;
    milvus::Config range_search_conf;
    milvus::DataType vec_field_data_type;
    knowhere::DataSetPtr xb_dataset;
    FixedVector<float> xb_data;
    FixedVector<uint8_t> xb_bin_data;
    knowhere::DataSetPtr xq_dataset;
    int64_t query_offset = 100;
    int64_t NB = 3000;
    StorageConfig storage_config_;

    boost::filesystem::path temp_path;
    std::shared_ptr<milvus_storage::Space> space;
    int64_t file_slice_size = DEFAULT_INDEX_FILE_SLICE_SIZE;
    bool enable_mmap;
    boost::filesystem::path mmap_file_path;
};
// Cartesian product of (index type, metric) pairs × slice sizes × mmap
// on/off. DISKANN is only included when the build enables it.
INSTANTIATE_TEST_CASE_P(
    IndexTypeParameters,
    IndexTestV2,
    testing::Combine(
        ::testing::Values(
            std::pair(knowhere::IndexEnum::INDEX_FAISS_IDMAP,
                      knowhere::metric::L2),
            std::pair(knowhere::IndexEnum::INDEX_FAISS_IVFPQ,
                      knowhere::metric::L2),
            std::pair(knowhere::IndexEnum::INDEX_FAISS_IVFFLAT,
                      knowhere::metric::L2),
            std::pair(knowhere::IndexEnum::INDEX_FAISS_IVFSQ8,
                      knowhere::metric::L2),
            std::pair(knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT,
                      knowhere::metric::JACCARD),
            std::pair(knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP,
                      knowhere::metric::JACCARD),
#ifdef BUILD_DISK_ANN
            std::pair(knowhere::IndexEnum::INDEX_DISKANN, knowhere::metric::L2),
#endif
            std::pair(knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::L2)),
        testing::Values(DEFAULT_INDEX_FILE_SLICE_SIZE, 5000L),
        testing::Bool()));
// Builds a vector index via BuildV2/UploadV2 against the Space, reloads it
// with LoadV2 (optionally mmap-backed), then verifies top-k and range search.
TEST_P(IndexTestV2, BuildAndQuery) {
    // Global knob: must be set before building so uploaded index files are
    // sliced at the parameterized size.
    FILE_SLICE_SIZE = file_slice_size;
    milvus::index::CreateIndexInfo create_index_info;
    create_index_info.index_type = index_type;
    create_index_info.metric_type = metric_type;
    create_index_info.field_type = vec_field_data_type;
    create_index_info.field_name = "vec";
    create_index_info.dim = DIM;
    create_index_info.index_engine_version =
        knowhere::Version::GetCurrentVersion().VersionNumber();
    index::IndexBasePtr index;

    milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100};
    milvus::storage::IndexMeta index_meta{.segment_id = 3,
                                          .field_id = 100,
                                          .build_id = 1000,
                                          .index_version = 1,
                                          .field_name = "vec",
                                          .field_type = vec_field_data_type,
                                          .dim = DIM};
    auto chunk_manager = milvus::storage::CreateChunkManager(storage_config_);
    milvus::storage::FileManagerContext file_manager_context(
        field_data_meta, index_meta, chunk_manager, space);
    index = milvus::index::IndexFactory::GetInstance().CreateIndex(
        create_index_info, file_manager_context, space);

    // NOTE(review): this local shadows the fixture member of the same name
    // set in SetUp(); both hold the same value here.
    auto build_conf = generate_build_conf(index_type, metric_type);
    index->BuildV2(build_conf);
    milvus::index::IndexBasePtr new_index;
    milvus::index::VectorIndex* vec_index = nullptr;

    // Upload the built index, drop the builder, then load into a fresh
    // instance to exercise the full persist/restore round trip.
    auto binary_set = index->UploadV2();
    index.reset();

    new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(
        create_index_info, file_manager_context, space);
    vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());

    load_conf = generate_load_conf(index_type, metric_type, 0);
    if (enable_mmap) {
        load_conf[kMmapFilepath] = mmap_file_path.string();
    }
    ASSERT_NO_THROW(vec_index->LoadV2(load_conf));
    EXPECT_EQ(vec_index->Count(), NB);
    EXPECT_EQ(vec_index->GetDim(), DIM);

    milvus::SearchInfo search_info;
    search_info.topk_ = K;
    search_info.metric_type_ = metric_type;
    search_info.search_params_ = search_conf;
    auto result = vec_index->Query(xq_dataset, search_info, nullptr);
    EXPECT_EQ(result->total_nq_, NQ);
    EXPECT_EQ(result->unity_topK_, K);
    EXPECT_EQ(result->distances_.size(), NQ * K);
    EXPECT_EQ(result->seg_offsets_.size(), NQ * K);
    // Query vectors are rows starting at query_offset, so the first hit for
    // exact float search should be that row itself.
    if (!is_binary) {
        EXPECT_EQ(result->seg_offsets_[0], query_offset);
    }

    // Range search: result contents are not asserted, only that it runs.
    search_info.search_params_ = range_search_conf;
    vec_index->Query(xq_dataset, search_info, nullptr);
}

View File

@ -9,18 +9,25 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <gtest/gtest.h>
#include "gtest/gtest-typed-test.h"
#include "index/IndexFactory.h"
#include "common/CDataType.h"
#include "knowhere/comp/index_param.h"
#include "test_utils/indexbuilder_test_utils.h"
#include "test_utils/AssertUtils.h"
#include "test_utils/DataGen.h"
#include <boost/filesystem.hpp>
#include "test_utils/storage_test_utils.h"
constexpr int64_t nb = 100;
namespace indexcgo = milvus::proto::indexcgo;
namespace schemapb = milvus::proto::schema;
using milvus::index::ScalarIndexPtr;
using milvus::segcore::GeneratedData;
template <typename T>
class TypedScalarIndexTest : public ::testing::Test {
protected:
@ -196,3 +203,173 @@ REGISTER_TYPED_TEST_CASE_P(TypedScalarIndexTest,
Reverse);
INSTANTIATE_TYPED_TEST_CASE_P(ArithmeticCheck, TypedScalarIndexTest, ScalarT);
template <typename T>
class TypedScalarIndexTestV2 : public ::testing::Test {
public:
struct Helper {};
protected:
std::unordered_map<std::type_index, const std::shared_ptr<arrow::DataType>>
m_fields = {{typeid(int8_t), arrow::int8()},
{typeid(int16_t), arrow::int16()},
{typeid(int32_t), arrow::int32()},
{typeid(int64_t), arrow::int64()},
{typeid(float), arrow::float32()},
{typeid(double), arrow::float64()}};
std::shared_ptr<arrow::Schema>
TestSchema(int vec_size) {
arrow::FieldVector fields;
fields.push_back(arrow::field("pk", arrow::int64()));
fields.push_back(arrow::field("ts", arrow::int64()));
fields.push_back(arrow::field("scalar", m_fields[typeid(T)]));
fields.push_back(
arrow::field("vec", arrow::fixed_size_binary(vec_size)));
return std::make_shared<arrow::Schema>(fields);
}
std::shared_ptr<arrow::RecordBatchReader>
TestRecords(int vec_size, GeneratedData& dataset, std::vector<T>& scalars) {
arrow::Int64Builder pk_builder;
arrow::Int64Builder ts_builder;
arrow::NumericBuilder<typename Helper::C> scalar_builder;
arrow::FixedSizeBinaryBuilder vec_builder(
arrow::fixed_size_binary(vec_size));
auto xb_data = dataset.get_col<float>(milvus::FieldId(100));
auto data = reinterpret_cast<char*>(xb_data.data());
for (auto i = 0; i < nb; ++i) {
EXPECT_TRUE(pk_builder.Append(i).ok());
EXPECT_TRUE(ts_builder.Append(i).ok());
EXPECT_TRUE(vec_builder.Append(data + i * vec_size).ok());
}
for (auto& v : scalars) {
EXPECT_TRUE(scalar_builder.Append(v).ok());
}
std::shared_ptr<arrow::Array> pk_array;
EXPECT_TRUE(pk_builder.Finish(&pk_array).ok());
std::shared_ptr<arrow::Array> ts_array;
EXPECT_TRUE(ts_builder.Finish(&ts_array).ok());
std::shared_ptr<arrow::Array> scalar_array;
EXPECT_TRUE(scalar_builder.Finish(&scalar_array).ok());
std::shared_ptr<arrow::Array> vec_array;
EXPECT_TRUE(vec_builder.Finish(&vec_array).ok());
auto schema = TestSchema(vec_size);
auto rec_batch = arrow::RecordBatch::Make(
schema, nb, {pk_array, ts_array, scalar_array, vec_array});
auto reader =
arrow::RecordBatchReader::Make({rec_batch}, schema).ValueOrDie();
return reader;
}
std::shared_ptr<milvus_storage::Space>
TestSpace(int vec_size, GeneratedData& dataset, std::vector<T>& scalars) {
auto arrow_schema = TestSchema(vec_size);
auto schema_options = std::make_shared<milvus_storage::SchemaOptions>();
schema_options->primary_column = "pk";
schema_options->version_column = "ts";
schema_options->vector_column = "vec";
auto schema = std::make_shared<milvus_storage::Schema>(arrow_schema,
schema_options);
EXPECT_TRUE(schema->Validate().ok());
auto space_res = milvus_storage::Space::Open(
"file://" + boost::filesystem::canonical(temp_path).string(),
milvus_storage::Options{schema});
EXPECT_TRUE(space_res.has_value());
auto space = std::move(space_res.value());
auto rec = TestRecords(vec_size, dataset, scalars);
auto write_opt = milvus_storage::WriteOption{nb};
space->Write(rec.get(), &write_opt);
return std::move(space);
}
void
SetUp() override {
temp_path = boost::filesystem::temp_directory_path() /
boost::filesystem::unique_path();
boost::filesystem::create_directory(temp_path);
auto vec_size = DIM * 4;
auto dataset = GenDataset(nb, knowhere::metric::L2, false);
auto scalars = GenArr<T>(nb);
space = TestSpace(vec_size, dataset, scalars);
}
void
TearDown() override {
boost::filesystem::remove_all(temp_path);
}
protected:
boost::filesystem::path temp_path;
std::shared_ptr<milvus_storage::Space> space;
};
// Helper specializations: map each tested C++ scalar type to the Arrow type
// class used by arrow::NumericBuilder in TestRecords.
template <>
struct TypedScalarIndexTestV2<int8_t>::Helper {
    using C = arrow::Int8Type;
};

template <>
struct TypedScalarIndexTestV2<int16_t>::Helper {
    using C = arrow::Int16Type;
};

template <>
struct TypedScalarIndexTestV2<int32_t>::Helper {
    using C = arrow::Int32Type;
};

template <>
struct TypedScalarIndexTestV2<int64_t>::Helper {
    using C = arrow::Int64Type;
};

template <>
struct TypedScalarIndexTestV2<float>::Helper {
    using C = arrow::FloatType;
};

template <>
struct TypedScalarIndexTestV2<double>::Helper {
    using C = arrow::DoubleType;
};
TYPED_TEST_CASE_P(TypedScalarIndexTestV2);

// For every scalar index type supported for T: build via BuildV2 against the
// Space, upload, reload into a fresh instance with LoadV2, and check the row
// count survives the round trip.
TYPED_TEST_P(TypedScalarIndexTestV2, Base) {
    using T = TypeParam;
    auto dtype = milvus::GetDType<T>();
    auto index_types = GetIndexTypes<T>();
    for (const auto& index_type : index_types) {
        milvus::index::CreateIndexInfo create_index_info;
        create_index_info.field_type = milvus::DataType(dtype);
        create_index_info.index_type = index_type;
        create_index_info.field_name = "scalar";
        auto storage_config = get_default_local_storage_config();
        auto chunk_manager =
            milvus::storage::CreateChunkManager(storage_config);
        milvus::storage::FileManagerContext file_manager_context(
            {}, {.field_name = "scalar"}, chunk_manager, this->space);
        auto index =
            milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
                create_index_info, file_manager_context, this->space);
        auto scalar_index =
            dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
        scalar_index->BuildV2();
        scalar_index->UploadV2();

        auto new_index =
            milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
                create_index_info, file_manager_context, this->space);
        auto new_scalar_index =
            dynamic_cast<milvus::index::ScalarIndex<T>*>(new_index.get());
        new_scalar_index->LoadV2();
        // NOTE(review): Count() is checked on the builder, not the reloaded
        // index — presumably intentional, but verifying new_scalar_index
        // would exercise the load path more directly.
        ASSERT_EQ(nb, scalar_index->Count());
    }
}

REGISTER_TYPED_TEST_CASE_P(TypedScalarIndexTestV2, Base);

INSTANTIATE_TYPED_TEST_CASE_P(ArithmeticCheck, TypedScalarIndexTestV2, ScalarT);

View File

@ -9,6 +9,8 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <arrow/array/builder_binary.h>
#include <arrow/type.h>
#include <gtest/gtest.h>
#include "index/Index.h"
@ -20,11 +22,14 @@
#include "index/IndexFactory.h"
#include "test_utils/indexbuilder_test_utils.h"
#include "test_utils/AssertUtils.h"
#include <boost/filesystem.hpp>
#include "test_utils/storage_test_utils.h"
constexpr int64_t nb = 100;
namespace schemapb = milvus::proto::schema;
class StringIndexBaseTest : public ::testing::Test {
protected:
void
SetUp() override {
strs = GenStrArr(nb);
@ -338,3 +343,114 @@ TEST_F(StringIndexMarisaTest, BaseIndexCodec) {
}
}
}
using milvus::segcore::GeneratedData;
// Fixture for the marisa string index over the storage v2 path: extends the
// base string-index test with a Space containing a utf8 "scalar" column
// filled from the inherited `strs`.
class StringIndexMarisaTestV2 : public StringIndexBaseTest {
    // Arrow schema: int64 "pk", int64 "ts", utf8 "scalar" and a
    // fixed-size-binary "vec" column of vec_size bytes.
    std::shared_ptr<arrow::Schema>
    TestSchema(int vec_size) {
        arrow::FieldVector fields;
        fields.push_back(arrow::field("pk", arrow::int64()));
        fields.push_back(arrow::field("ts", arrow::int64()));
        fields.push_back(arrow::field("scalar", arrow::utf8()));
        fields.push_back(
            arrow::field("vec", arrow::fixed_size_binary(vec_size)));
        return std::make_shared<arrow::Schema>(fields);
    }

    // Materializes nb rows (pk/ts/vec from `dataset`, scalar column from
    // `scalars`) as a single-batch reader.
    std::shared_ptr<arrow::RecordBatchReader>
    TestRecords(int vec_size,
                GeneratedData& dataset,
                std::vector<std::string>& scalars) {
        arrow::Int64Builder pk_builder;
        arrow::Int64Builder ts_builder;
        arrow::StringBuilder scalar_builder;
        arrow::FixedSizeBinaryBuilder vec_builder(
            arrow::fixed_size_binary(vec_size));
        auto xb_data = dataset.get_col<float>(milvus::FieldId(100));
        auto data = reinterpret_cast<char*>(xb_data.data());
        for (auto i = 0; i < nb; ++i) {
            EXPECT_TRUE(pk_builder.Append(i).ok());
            EXPECT_TRUE(ts_builder.Append(i).ok());
            EXPECT_TRUE(vec_builder.Append(data + i * vec_size).ok());
        }
        for (auto& v : scalars) {
            EXPECT_TRUE(scalar_builder.Append(v).ok());
        }
        std::shared_ptr<arrow::Array> pk_array;
        EXPECT_TRUE(pk_builder.Finish(&pk_array).ok());
        std::shared_ptr<arrow::Array> ts_array;
        EXPECT_TRUE(ts_builder.Finish(&ts_array).ok());
        std::shared_ptr<arrow::Array> scalar_array;
        EXPECT_TRUE(scalar_builder.Finish(&scalar_array).ok());
        std::shared_ptr<arrow::Array> vec_array;
        EXPECT_TRUE(vec_builder.Finish(&vec_array).ok());
        auto schema = TestSchema(vec_size);
        auto rec_batch = arrow::RecordBatch::Make(
            schema, nb, {pk_array, ts_array, scalar_array, vec_array});
        auto reader =
            arrow::RecordBatchReader::Make({rec_batch}, schema).ValueOrDie();
        return reader;
    }

    // Opens a file://-backed Space rooted at temp_path and writes the rows.
    std::shared_ptr<milvus_storage::Space>
    TestSpace(int vec_size,
              GeneratedData& dataset,
              std::vector<std::string>& scalars) {
        auto arrow_schema = TestSchema(vec_size);
        auto schema_options = std::make_shared<milvus_storage::SchemaOptions>();
        schema_options->primary_column = "pk";
        schema_options->version_column = "ts";
        schema_options->vector_column = "vec";
        auto schema = std::make_shared<milvus_storage::Schema>(arrow_schema,
                                                               schema_options);
        EXPECT_TRUE(schema->Validate().ok());
        auto space_res = milvus_storage::Space::Open(
            "file://" + boost::filesystem::canonical(temp_path).string(),
            milvus_storage::Options{schema});
        EXPECT_TRUE(space_res.has_value());
        auto space = std::move(space_res.value());
        auto rec = TestRecords(vec_size, dataset, scalars);
        auto write_opt = milvus_storage::WriteOption{nb};
        // NOTE(review): Write()'s status is ignored — confirm it cannot fail
        // for a local-filesystem space, or assert on it.
        space->Write(rec.get(), &write_opt);
        // Implicit move applies to a named local; explicit std::move would
        // only inhibit copy elision.
        return space;
    }

    void
    SetUp() override {
        StringIndexBaseTest::SetUp();
        temp_path = boost::filesystem::temp_directory_path() /
                    boost::filesystem::unique_path();
        boost::filesystem::create_directory(temp_path);

        auto vec_size = DIM * 4;
        auto dataset = GenDataset(nb, knowhere::metric::L2, false);
        space = TestSpace(vec_size, dataset, strs);
    }

    void
    TearDown() override {
        boost::filesystem::remove_all(temp_path);
    }

 protected:
    boost::filesystem::path temp_path;
    std::shared_ptr<milvus_storage::Space> space;
};
// Builds a marisa string index via BuildV2 against the Space, uploads it,
// reloads into a fresh instance with LoadV2, and checks the row count.
TEST_F(StringIndexMarisaTestV2, Base) {
    auto storage_config = get_default_local_storage_config();
    auto chunk_manager = milvus::storage::CreateChunkManager(storage_config);
    milvus::storage::FileManagerContext file_manager_context(
        {}, {.field_name = "scalar"}, chunk_manager, space);
    auto index =
        milvus::index::CreateStringIndexMarisa(file_manager_context, space);
    index->BuildV2();
    index->UploadV2();

    auto new_index =
        milvus::index::CreateStringIndexMarisa(file_manager_context, space);
    new_index->LoadV2();
    // NOTE(review): Count() is checked on the builder, not the reloaded
    // index — verifying new_index would exercise the load path directly.
    ASSERT_EQ(strs.size(), index->Count());
}

View File

@ -263,16 +263,9 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
{FieldID: 9, Name: "field8", DataType: schemapb.DataType_VarChar},
{FieldID: 10, Name: "field9", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
{FieldID: 11, Name: "field10", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "4"}}},
{FieldID: 12, Name: "field11", DataType: schemapb.DataType_JSON},
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "4"}}},
{FieldID: 14, Name: "field13", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Int32},
{FieldID: 15, Name: "field14", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Bool},
{FieldID: 16, Name: "field15", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Int8},
{FieldID: 17, Name: "field16", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Int16},
{FieldID: 18, Name: "field17", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Int64},
{FieldID: 19, Name: "field18", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Float},
{FieldID: 20, Name: "field19", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Double},
{FieldID: 21, Name: "field20", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_String},
{FieldID: 12, Name: "field11", DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Int32},
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON},
{FieldID: 14, Name: "field12", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "4"}}},
}
schema, err := metacache.ConvertToArrowSchema(fieldSchemas)
@ -297,17 +290,7 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
Dim: 4,
},
12: &storage.JSONFieldData{
Data: [][]byte{
[]byte(`{"batch":2}`),
[]byte(`{"key":"world"}`),
},
},
13: &storage.Float16VectorFieldData{
Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255},
Dim: 4,
},
14: &storage.ArrayFieldData{
12: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Int32,
Data: []*schemapb.ScalarField{
{
@ -322,110 +305,15 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
},
},
},
15: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Bool,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{Data: []bool{false, false, false}},
},
},
{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{Data: []bool{false, false, false}},
},
},
13: &storage.JSONFieldData{
Data: [][]byte{
[]byte(`{"batch":2}`),
[]byte(`{"key":"world"}`),
},
},
16: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Int8,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{3, 2, 1}},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{6, 5, 4}},
},
},
},
},
17: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Int16,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{3, 2, 1}},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{6, 5, 4}},
},
},
},
},
18: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Int64,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{Data: []int64{3, 2, 1}},
},
},
{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{Data: []int64{3, 2, 1}},
},
},
},
},
19: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Float,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{Data: []float32{3, 2, 1}},
},
},
{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{Data: []float32{3, 2, 1}},
},
},
},
},
20: &storage.ArrayFieldData{
ElementType: schemapb.DataType_Double,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{Data: []float64{3, 2, 1}},
},
},
{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{Data: []float64{3, 2, 1}},
},
},
},
},
21: &storage.ArrayFieldData{
ElementType: schemapb.DataType_String,
Data: []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{Data: []string{"a", "b", "c"}},
},
},
{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{Data: []string{"a", "b", "c"}},
},
},
},
14: &storage.Float16VectorFieldData{
Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255},
Dim: 4,
},
},
}
@ -435,9 +323,6 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
s.EqualValues(2, b.NewRecord().NumRows())
}
func (s *SyncTaskSuiteV2) TestAppendLists() {
}
func TestSyncTaskV2(t *testing.T) {
suite.Run(t, new(SyncTaskSuiteV2))
}

File diff suppressed because it is too large Load Diff

View File

@ -63,7 +63,7 @@ popd
pushd ${ROOT_DIR}/cmake_build/thirdparty
git clone --depth=1 --branch v0.42.0 https://github.com/apache/incubator-opendal.git opendal
git clone --depth=1 --branch v0.43.0-rc.2 https://github.com/apache/incubator-opendal.git opendal
cd opendal
if command -v cargo >/dev/null 2>&1; then
echo "cargo exists"