enhance: Optimize index format for improved load performance (#41041)

related: #40838
pr: https://github.com/milvus-io/milvus/pull/40839

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
pull/41269/head
Chun Han 2025-04-15 07:07:38 +08:00 committed by GitHub
parent 2498eb0691
commit afc0ca77e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 599 additions and 366 deletions

View File

@ -575,16 +575,8 @@ BitmapIndex<T>::Load(milvus::tracer::TraceContext ctx, const Config& config) {
AssertInfo(index_files.has_value(),
"index file paths is empty when load bitmap index");
auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_datas, binary_set);
LoadWithoutAssemble(binary_set, config);
}

View File

@ -362,16 +362,8 @@ HybridScalarIndex<T>::Load(milvus::tracer::TraceContext ctx,
auto index_datas = mem_file_manager_->LoadIndexToMemory(
std::vector<std::string>{index_type_file});
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_datas, binary_set);
DeserializeIndexType(binary_set);
auto index = GetInternalIndex();

View File

@ -192,7 +192,6 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
inverted_index_files.end());
std::vector<std::string> null_offset_files;
std::shared_ptr<FieldDataBase> null_offset_data;
auto find_file = [&](const std::string& target) -> auto {
return std::find_if(inverted_index_files.begin(),
@ -202,6 +201,12 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
});
};
auto fill_null_offsets = [&](const uint8_t* data, int64_t size) {
folly::SharedMutex::WriteHolder lock(mutex_);
null_offset_.resize((size_t)size / sizeof(size_t));
memcpy(null_offset_.data(), data, (size_t)size);
};
if (auto it = find_file(INDEX_FILE_SLICE_META);
it != inverted_index_files.end()) {
// SLICE_META only exist if null_offset_files are sliced
@ -219,24 +224,24 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
auto index_datas =
mem_file_manager_->LoadIndexToMemory(null_offset_files);
AssembleIndexDatas(index_datas);
null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
auto null_offsets_data = CompactIndexDatas(index_datas);
auto null_offsets_data_codecs =
std::move(null_offsets_data.at(INDEX_NULL_OFFSET_FILE_NAME));
for (auto&& null_offsets_codec : null_offsets_data_codecs.codecs_) {
fill_null_offsets(null_offsets_codec->PayloadData(),
null_offsets_codec->PayloadSize());
}
} else if (auto it = find_file(INDEX_NULL_OFFSET_FILE_NAME);
it != inverted_index_files.end()) {
// null offset file is not sliced
null_offset_files.push_back(*it);
auto index_datas = mem_file_manager_->LoadIndexToMemory({*it});
null_offset_data = index_datas.at(INDEX_NULL_OFFSET_FILE_NAME);
auto null_offset_data =
std::move(index_datas.at(INDEX_NULL_OFFSET_FILE_NAME));
fill_null_offsets(null_offset_data->PayloadData(),
null_offset_data->PayloadSize());
}
if (null_offset_data) {
auto data = null_offset_data->Data();
auto size = null_offset_data->DataSize();
folly::SharedMutex::WriteHolder lock(mutex_);
null_offset_.resize((size_t)size / sizeof(size_t));
memcpy(null_offset_.data(), data, (size_t)size);
}
// remove from inverted_index_files
inverted_index_files.erase(
std::remove_if(inverted_index_files.begin(),

View File

@ -207,16 +207,8 @@ ScalarIndexSort<T>::Load(milvus::tracer::TraceContext ctx,
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index");
auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_datas, binary_set);
LoadWithoutAssemble(binary_set, config);
}

View File

@ -234,16 +234,8 @@ StringIndexMarisa::Load(milvus::tracer::TraceContext ctx,
AssertInfo(index_files.has_value(),
"index file paths is empty when load index");
auto index_datas = file_manager_->LoadIndexToMemory(index_files.value());
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_datas, binary_set);
LoadWithoutAssemble(binary_set, config);
}

View File

@ -137,15 +137,8 @@ TextMatchIndex::Load(const Config& config) {
file.push_back(*it);
files_value.erase(it);
auto index_datas = mem_file_manager_->LoadIndexToMemory(file);
AssembleIndexDatas(index_datas);
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
auto size = data->DataSize();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_datas, binary_set);
auto index_valid_data = binary_set.GetByName("index_null_offset");
folly::SharedMutex::WriteHolder lock(mutex_);
null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t));

View File

@ -262,37 +262,78 @@ ParseConfigFromIndexParams(
return config;
}
void
AssembleIndexDatas(std::map<std::string, FieldDataPtr>& index_datas) {
std::map<std::string, IndexDataCodec>
CompactIndexDatas(
std::map<std::string, std::unique_ptr<storage::DataCodec>>& index_datas) {
std::map<std::string, IndexDataCodec> index_file_slices;
std::unordered_set<std::string> compacted_files;
if (index_datas.find(INDEX_FILE_SLICE_META) != index_datas.end()) {
auto slice_meta = index_datas.at(INDEX_FILE_SLICE_META);
Config meta_data = Config::parse(
std::string(static_cast<const char*>(slice_meta->Data()),
slice_meta->DataSize()));
auto slice_meta = std::move(index_datas.at(INDEX_FILE_SLICE_META));
Config meta_data = Config::parse(std::string(
reinterpret_cast<const char*>(slice_meta->PayloadData()),
slice_meta->PayloadSize()));
compacted_files.insert(INDEX_FILE_SLICE_META);
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
// build index skip null value, so not need to set nullable == true
auto new_field_data =
storage::CreateFieldData(DataType::INT8, false, 1, total_len);
auto data_len = 0;
index_file_slices.insert({prefix, IndexDataCodec{}});
auto& index_data_codec = index_file_slices.at(prefix);
for (auto i = 0; i < slice_num; ++i) {
std::string file_name = GenSlicedFileName(prefix, i);
AssertInfo(index_datas.find(file_name) != index_datas.end(),
"lost index slice data");
auto data = index_datas.at(file_name);
auto len = data->DataSize();
new_field_data->FillFieldData(data->Data(), len);
index_datas.erase(file_name);
index_data_codec.codecs_.push_back(
std::move(index_datas.at(file_name)));
compacted_files.insert(file_name);
data_len += index_data_codec.codecs_.back()->PayloadSize();
}
AssertInfo(
new_field_data->IsFull(),
total_len == data_len,
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
if (index_datas.count(prefix) > 0) {
index_data_codec.codecs_.push_back(
std::move(index_datas[prefix]));
compacted_files.insert(prefix);
}
index_data_codec.size_ = data_len;
}
}
for (auto& index_data : index_datas) {
if (compacted_files.find(index_data.first) == compacted_files.end()) {
index_file_slices.insert({index_data.first, IndexDataCodec{}});
auto& index_data_codec = index_file_slices.at(index_data.first);
index_data_codec.size_ = index_data.second->PayloadSize();
index_data_codec.codecs_.push_back(std::move(index_data.second));
}
}
return index_file_slices;
}
void
AssembleIndexDatas(
std::map<std::string, std::unique_ptr<storage::DataCodec>>& index_datas,
BinarySet& index_binary_set) {
auto index_file_slices = CompactIndexDatas(index_datas);
AssembleIndexDatas(index_file_slices, index_binary_set);
}
void
AssembleIndexDatas(std::map<std::string, IndexDataCodec>& index_file_slices,
BinarySet& index_binary_set) {
for (auto& [key, index_slices] : index_file_slices) {
auto index_size = index_slices.size_;
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[index_size]);
int64_t offset = 0;
for (auto&& index_slice : index_slices.codecs_) {
std::memcpy(buf.get() + offset,
index_slice->PayloadData(),
index_slice->PayloadSize());
offset += index_slice->PayloadSize();
}
index_binary_set.Append(key, buf, index_size);
}
}
void

View File

@ -34,6 +34,7 @@
#include "common/RangeSearchHelper.h"
#include "index/IndexInfo.h"
#include "storage/Types.h"
#include "storage/DataCodec.h"
namespace milvus::index {
@ -148,8 +149,23 @@ Config
ParseConfigFromIndexParams(
const std::map<std::string, std::string>& index_params);
struct IndexDataCodec {
std::list<std::unique_ptr<storage::DataCodec>> codecs_{};
int64_t size_{0};
};
std::map<std::string, IndexDataCodec>
CompactIndexDatas(
std::map<std::string, std::unique_ptr<storage::DataCodec>>& index_datas);
void
AssembleIndexDatas(std::map<std::string, FieldDataPtr>& index_datas);
AssembleIndexDatas(
std::map<std::string, std::unique_ptr<storage::DataCodec>>& index_datas,
BinarySet& index_binary_set);
void
AssembleIndexDatas(std::map<std::string, IndexDataCodec>& index_datas,
BinarySet& index_binary_set);
void
AssembleIndexDatas(std::map<std::string, FieldDataChannelPtr>& index_datas,

View File

@ -181,11 +181,7 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
index_files->end());
LOG_INFO("load index files: {}", index_files.value().size());
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::map<std::string, FieldDataPtr> index_datas{};
std::map<std::string, IndexDataCodec> index_data_codecs{};
// try to read slice meta first
std::string slice_meta_filepath;
for (auto& file : pending_index_files) {
@ -212,17 +208,14 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
auto result =
file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
auto raw_slice_meta = std::move(result[INDEX_FILE_SLICE_META]);
Config meta_data = Config::parse(std::string(
reinterpret_cast<const char*>(raw_slice_meta->PayloadData()),
raw_slice_meta->PayloadSize()));
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
auto new_field_data = milvus::storage::CreateFieldData(
DataType::INT8, false, 1, total_len);
std::vector<std::string> batch;
batch.reserve(slice_num);
@ -232,23 +225,26 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
}
auto batch_data = file_manager_->LoadIndexToMemory(batch);
int64_t payload_size = 0;
index_data_codecs.insert({prefix, IndexDataCodec{}});
auto& index_data_codec = index_data_codecs.at(prefix);
for (const auto& file_path : batch) {
const std::string file_name =
file_path.substr(file_path.find_last_of('/') + 1);
AssertInfo(batch_data.find(file_name) != batch_data.end(),
"lost index slice data: {}",
file_name);
auto data = batch_data[file_name];
new_field_data->FillFieldData(data->Data(), data->Size());
payload_size += batch_data[file_name]->PayloadSize();
index_data_codec.codecs_.push_back(
std::move(batch_data[file_name]));
}
for (auto& file : batch) {
pending_index_files.erase(file);
}
AssertInfo(
new_field_data->IsFull(),
payload_size == total_len,
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
index_data_codec.size_ = payload_size;
}
}
@ -257,7 +253,12 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
file_manager_->LoadIndexToMemory(std::vector<std::string>(
pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
auto prefix = index_data.first;
index_data_codecs.insert({prefix, IndexDataCodec{}});
auto& index_data_codec = index_data_codecs.at(prefix);
index_data_codec.size_ = index_data.second->PayloadSize();
index_data_codec.codecs_.push_back(
std::move(index_data.second));
}
}
@ -266,14 +267,7 @@ VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
LOG_INFO("construct binary set...");
BinarySet binary_set;
for (auto& [key, data] : index_datas) {
LOG_INFO("add index data to binary set: {}", key);
auto size = data->Size();
auto deleter = [&](uint8_t*) {}; // avoid repeated deconstruction
auto buf = std::shared_ptr<uint8_t[]>(
(uint8_t*)const_cast<void*>(data->Data()), deleter);
binary_set.Append(key, buf, size);
}
AssembleIndexDatas(index_data_codecs, binary_set);
// start engine load index span
auto span_load_engine =
@ -568,10 +562,10 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
batch.reserve(parallel_degree);
auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
auto raw_slice_meta = std::move(result[INDEX_FILE_SLICE_META]);
Config meta_data = Config::parse(std::string(
reinterpret_cast<const char*>(raw_slice_meta->PayloadData()),
raw_slice_meta->PayloadSize()));
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
@ -586,13 +580,14 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
std::string file_name = GenSlicedFileName(prefix, j);
AssertInfo(batch_data.find(file_name) != batch_data.end(),
"lost index slice data");
auto data = batch_data[file_name];
auto&& data = batch_data[file_name];
auto start_write_file = std::chrono::system_clock::now();
auto written = file.Write(data->Data(), data->Size());
auto written =
file.Write(data->PayloadData(), data->PayloadSize());
write_disk_duration_sum +=
(std::chrono::system_clock::now() - start_write_file);
AssertInfo(
written == data->Size(),
written == data->PayloadSize(),
fmt::format("failed to write index data to disk {}: {}",
filepath->data(),
strerror(errno)));
@ -624,7 +619,7 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
//2. write data into files
auto start_write_file = std::chrono::system_clock::now();
for (auto& [_, index_data] : result) {
file.Write(index_data->Data(), index_data->Size());
file.Write(index_data->PayloadData(), index_data->PayloadSize());
}
write_disk_duration_sum +=
(std::chrono::system_clock::now() - start_write_file);

View File

@ -30,6 +30,19 @@ class BinlogReader {
: data_(binlog_data), size_(length), tell_(0) {
}
template <typename T>
SegcoreError
ReadSingleValue(T& val) {
auto needed_size = sizeof(T);
if (needed_size > size_ - tell_) {
return SegcoreError(milvus::UnexpectedError,
"out range of binlog data");
}
val = *reinterpret_cast<T*>(data_.get() + tell_);
tell_ += needed_size;
return SegcoreError(milvus::Success, "");
}
SegcoreError
Read(int64_t nbytes, void* out);

View File

@ -49,13 +49,8 @@ DeserializeRemoteFileData(BinlogReaderPtr reader, bool is_field_data) {
reader, event_data_length, data_type, nullable, is_field_data);
std::unique_ptr<InsertData> insert_data;
if (is_field_data) {
insert_data =
std::make_unique<InsertData>(insert_event_data.field_data);
} else {
insert_data = std::make_unique<InsertData>(
insert_event_data.payload_reader);
}
insert_data =
std::make_unique<InsertData>(insert_event_data.payload_reader);
insert_data->SetFieldDataMeta(data_meta);
insert_data->SetTimestamps(insert_event_data.start_timestamp,
insert_event_data.end_timestamp);
@ -66,9 +61,13 @@ DeserializeRemoteFileData(BinlogReaderPtr reader, bool is_field_data) {
header.event_length_ - GetEventHeaderSize(header);
auto index_event_data =
IndexEventData(reader, event_data_length, data_type, nullable);
auto field_data = index_event_data.field_data;
// for compatible with golang indexcode.Serialize, which set dataType to String
if (data_type == DataType::STRING) {
if (index_event_data.payload_reader->get_payload_datatype() ==
DataType::STRING) {
AssertInfo(index_event_data.payload_reader->has_field_data(),
"old index having no field_data");
auto field_data =
index_event_data.payload_reader->get_field_data();
AssertInfo(field_data->get_data_type() == DataType::STRING,
"wrong index type in index binlog file");
AssertInfo(
@ -79,10 +78,11 @@ DeserializeRemoteFileData(BinlogReaderPtr reader, bool is_field_data) {
(*static_cast<const std::string*>(field_data->RawValue(0)))
.c_str(),
field_data->Size());
field_data = new_field_data;
index_event_data.payload_reader =
std::make_shared<PayloadReader>(new_field_data);
}
auto index_data = std::make_unique<IndexData>(field_data);
auto index_data =
std::make_unique<IndexData>(index_event_data.payload_reader);
index_data->SetFieldDataMeta(data_meta);
IndexMeta index_meta;
index_meta.segment_id = data_meta.segment_id;

View File

@ -32,12 +32,16 @@ namespace milvus::storage {
class DataCodec {
public:
explicit DataCodec(FieldDataPtr data, CodecType type)
: field_data_(std::move(data)), codec_type_(type) {
explicit DataCodec(std::shared_ptr<PayloadReader>& reader, CodecType type)
: codec_type_(type), payload_reader_(std::move(reader)) {
}
explicit DataCodec(std::shared_ptr<PayloadReader> reader, CodecType type)
: payload_reader_(reader), codec_type_(type) {
explicit DataCodec(const uint8_t* payload_data,
int64_t length,
CodecType type)
: codec_type_(type) {
payload_reader_ = std::make_shared<PayloadReader>(
payload_data, length, DataType::NONE, false, false);
}
virtual ~DataCodec() = default;
@ -68,12 +72,19 @@ class DataCodec {
DataType
GetDataType() {
return field_data_->get_data_type();
AssertInfo(payload_reader_ != nullptr,
"payload_reader in the data_codec is invalid, wrong state");
return payload_reader_->get_payload_datatype();
}
FieldDataPtr
GetFieldData() const {
return field_data_;
AssertInfo(payload_reader_ != nullptr,
"payload_reader in the data_codec is invalid, wrong state");
AssertInfo(payload_reader_->has_field_data(),
"payload_reader in the data_codec has no field data, "
"wrongly calling the method");
return payload_reader_->get_field_data();
}
virtual std::shared_ptr<ArrowDataWrapper>
@ -90,12 +101,43 @@ class DataCodec {
data_ = data;
}
bool
HasBinaryPayload() const {
AssertInfo(payload_reader_ != nullptr,
"payload_reader in the data_codec is invalid, wrong state");
return payload_reader_->has_binary_payload();
}
const uint8_t*
PayloadData() const {
if (HasBinaryPayload()) {
return payload_reader_->get_payload_data();
}
if (payload_reader_->has_field_data()) {
return reinterpret_cast<const uint8_t*>(
const_cast<void*>(payload_reader_->get_field_data()->Data()));
}
return nullptr;
}
int64_t
PayloadSize() const {
if (HasBinaryPayload()) {
return payload_reader_->get_payload_size();
}
if (payload_reader_->has_field_data()) {
return payload_reader_->get_field_data_size();
}
return 0;
}
protected:
CodecType codec_type_;
std::pair<Timestamp, Timestamp> time_range_;
FieldDataPtr field_data_;
std::shared_ptr<PayloadReader> payload_reader_;
std::shared_ptr<uint8_t[]> data_;
//the shared ptr to keep the original input data alive for zero-copy target
};
// Deserialize the data stream of the file obtained from remote or local

View File

@ -260,13 +260,12 @@ DiskFileManagerImpl::CacheIndexToDiskInternal(
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
auto appendIndexFiles = [&]() {
auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->DataSize();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
auto index_chunks_futures =
GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk_future : index_chunks_futures) {
auto chunk_codec = chunk_future.get();
file.Write(chunk_codec->PayloadData(),
chunk_codec->PayloadSize());
}
batch_remote_files.clear();
};

View File

@ -212,9 +212,9 @@ BaseEventData::BaseEventData(BinlogReaderPtr reader,
DataType data_type,
bool nullable,
bool is_field_data) {
auto ast = reader->Read(sizeof(start_timestamp), &start_timestamp);
auto ast = reader->ReadSingleValue<Timestamp>(start_timestamp);
AssertInfo(ast.ok(), "read start timestamp failed");
ast = reader->Read(sizeof(end_timestamp), &end_timestamp);
ast = reader->ReadSingleValue<Timestamp>(end_timestamp);
AssertInfo(ast.ok(), "read end timestamp failed");
int payload_length =
@ -223,102 +223,119 @@ BaseEventData::BaseEventData(BinlogReaderPtr reader,
AssertInfo(res.first.ok(), "read payload failed");
payload_reader = std::make_shared<PayloadReader>(
res.second.get(), payload_length, data_type, nullable, is_field_data);
if (is_field_data) {
field_data = payload_reader->get_field_data();
}
}
std::vector<uint8_t>
BaseEventData::Serialize() {
auto data_type = field_data->get_data_type();
std::shared_ptr<PayloadWriter> payload_writer;
if (IsVectorDataType(data_type) &&
!IsSparseFloatVectorDataType(data_type)) {
payload_writer = std::make_unique<PayloadWriter>(
data_type, field_data->get_dim(), field_data->IsNullable());
if (payload_reader->has_binary_payload()) {
// for index slice, directly copy payload slice
auto payload_size = payload_reader->get_payload_size();
auto payload_data = payload_reader->get_payload_data();
auto len =
sizeof(start_timestamp) + sizeof(end_timestamp) + payload_size;
std::vector<uint8_t> res(len);
int offset = 0;
memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp));
offset += sizeof(start_timestamp);
memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp));
offset += sizeof(end_timestamp);
memcpy(res.data() + offset, payload_data, payload_size);
return res;
} else {
payload_writer = std::make_unique<PayloadWriter>(
data_type, field_data->IsNullable());
}
switch (data_type) {
case DataType::VARCHAR:
case DataType::STRING: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto str = static_cast<const std::string*>(
field_data->RawValue(offset));
auto size = field_data->is_valid(offset) ? str->size() : -1;
payload_writer->add_one_string_payload(str->c_str(), size);
}
break;
// for insert bin log, use field_data to serialize
auto field_data = payload_reader->get_field_data();
auto data_type = field_data->get_data_type();
std::shared_ptr<PayloadWriter> payload_writer;
if (IsVectorDataType(data_type) &&
!IsSparseFloatVectorDataType(data_type)) {
payload_writer = std::make_unique<PayloadWriter>(
data_type, field_data->get_dim(), field_data->IsNullable());
} else {
payload_writer = std::make_unique<PayloadWriter>(
data_type, field_data->IsNullable());
}
case DataType::ARRAY: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto array =
static_cast<const Array*>(field_data->RawValue(offset));
auto array_string = array->output_data().SerializeAsString();
auto size =
field_data->is_valid(offset) ? array_string.size() : -1;
payload_writer->add_one_binary_payload(
reinterpret_cast<const uint8_t*>(array_string.c_str()),
size);
}
break;
}
case DataType::JSON: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto string_view =
static_cast<const Json*>(field_data->RawValue(offset))
->data();
auto size =
field_data->is_valid(offset) ? string_view.size() : -1;
payload_writer->add_one_binary_payload(
reinterpret_cast<const uint8_t*>(
std::string(string_view).c_str()),
size);
}
break;
}
case DataType::VECTOR_SPARSE_FLOAT: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto row =
static_cast<const knowhere::sparse::SparseRow<float>*>(
switch (data_type) {
case DataType::VARCHAR:
case DataType::STRING: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto str = static_cast<const std::string*>(
field_data->RawValue(offset));
payload_writer->add_one_binary_payload(
static_cast<const uint8_t*>(row->data()),
row->data_byte_size());
auto size = field_data->is_valid(offset) ? str->size() : -1;
payload_writer->add_one_string_payload(str->c_str(), size);
}
break;
}
case DataType::ARRAY: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto array =
static_cast<const Array*>(field_data->RawValue(offset));
auto array_string =
array->output_data().SerializeAsString();
auto size =
field_data->is_valid(offset) ? array_string.size() : -1;
payload_writer->add_one_binary_payload(
reinterpret_cast<const uint8_t*>(array_string.c_str()),
size);
}
break;
}
case DataType::JSON: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto string_view =
static_cast<const Json*>(field_data->RawValue(offset))
->data();
auto size =
field_data->is_valid(offset) ? string_view.size() : -1;
payload_writer->add_one_binary_payload(
reinterpret_cast<const uint8_t*>(
std::string(string_view).c_str()),
size);
}
break;
}
case DataType::VECTOR_SPARSE_FLOAT: {
for (size_t offset = 0; offset < field_data->get_num_rows();
++offset) {
auto row =
static_cast<const knowhere::sparse::SparseRow<float>*>(
field_data->RawValue(offset));
payload_writer->add_one_binary_payload(
static_cast<const uint8_t*>(row->data()),
row->data_byte_size());
}
break;
}
default: {
auto payload =
Payload{data_type,
static_cast<const uint8_t*>(field_data->Data()),
field_data->ValidData(),
field_data->get_num_rows(),
field_data->get_dim(),
field_data->IsNullable()};
payload_writer->add_payload(payload);
}
break;
}
default: {
auto payload =
Payload{data_type,
static_cast<const uint8_t*>(field_data->Data()),
field_data->ValidData(),
field_data->get_num_rows(),
field_data->get_dim(),
field_data->IsNullable()};
payload_writer->add_payload(payload);
}
payload_writer->finish();
auto payload_buffer = payload_writer->get_payload_buffer();
auto len = sizeof(start_timestamp) + sizeof(end_timestamp) +
payload_buffer.size();
std::vector<uint8_t> res(len);
int offset = 0;
memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp));
offset += sizeof(start_timestamp);
memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp));
offset += sizeof(end_timestamp);
memcpy(
res.data() + offset, payload_buffer.data(), payload_buffer.size());
return res;
}
payload_writer->finish();
auto payload_buffer = payload_writer->get_payload_buffer();
auto len =
sizeof(start_timestamp) + sizeof(end_timestamp) + payload_buffer.size();
std::vector<uint8_t> res(len);
int offset = 0;
memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp));
offset += sizeof(start_timestamp);
memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp));
offset += sizeof(end_timestamp);
memcpy(res.data() + offset, payload_buffer.data(), payload_buffer.size());
return res;
}
BaseEvent::BaseEvent(BinlogReaderPtr reader,

View File

@ -76,9 +76,7 @@ struct DescriptorEventData {
struct BaseEventData {
Timestamp start_timestamp;
Timestamp end_timestamp;
FieldDataPtr field_data;
std::shared_ptr<PayloadReader> payload_reader;
BaseEventData() = default;
explicit BaseEventData(BinlogReaderPtr reader,
int event_length,
@ -117,10 +115,10 @@ struct BaseEvent {
using InsertEvent = BaseEvent;
using InsertEventData = BaseEventData;
using IndexEvent = BaseEvent;
using IndexEventData = BaseEventData;
using DeleteEvent = BaseEvent;
using DeleteEventData = BaseEventData;
using IndexEvent = BaseEvent;
using IndexEventData = BaseEventData;
using CreateCollectionEvent = BaseEvent;
using CreateCollectionEventData = BaseEventData;
using CreatePartitionEvent = BaseEvent;

View File

@ -50,10 +50,6 @@ std::vector<uint8_t>
IndexData::serialize_to_remote_file() {
AssertInfo(field_data_meta_.has_value(), "field data not exist");
AssertInfo(index_meta_.has_value(), "index meta not exist");
AssertInfo(field_data_ != nullptr, "empty field data");
DataType data_type = field_data_->get_data_type();
// create descriptor event
DescriptorEvent descriptor_event;
auto& des_event_data = descriptor_event.event_data;
@ -64,7 +60,8 @@ IndexData::serialize_to_remote_file() {
des_fix_part.field_id = field_data_meta_->field_id;
des_fix_part.start_timestamp = time_range_.first;
des_fix_part.end_timestamp = time_range_.second;
des_fix_part.data_type = milvus::proto::schema::DataType(data_type);
des_fix_part.data_type =
milvus::proto::schema::DataType(milvus::proto::schema::DataType::None);
for (auto i = int8_t(EventType::DescriptorEvent);
i < int8_t(EventType::EventTypeEnd);
i++) {
@ -72,14 +69,12 @@ IndexData::serialize_to_remote_file() {
GetEventFixPartSize(EventType(i)));
}
des_event_data.extras[ORIGIN_SIZE_KEY] =
std::to_string(field_data_->Size());
std::to_string(payload_reader_->get_payload_size());
des_event_data.extras[INDEX_BUILD_ID_KEY] =
std::to_string(index_meta_->build_id);
auto& des_event_header = descriptor_event.event_header;
// TODO :: set timestamp
des_event_header.timestamp_ = 0;
// serialize descriptor event data
auto des_event_bytes = descriptor_event.Serialize();
@ -89,7 +84,7 @@ IndexData::serialize_to_remote_file() {
auto& index_event_data = index_event.event_data;
index_event_data.start_timestamp = time_range_.first;
index_event_data.end_timestamp = time_range_.second;
index_event_data.field_data = field_data_;
index_event_data.payload_reader = payload_reader_;
auto& index_event_header = index_event.event_header;
index_event_header.event_type_ = EventType::IndexFileEvent;
@ -98,11 +93,9 @@ IndexData::serialize_to_remote_file() {
// serialize insert event
auto index_event_bytes = index_event.Serialize();
des_event_bytes.insert(des_event_bytes.end(),
index_event_bytes.begin(),
index_event_bytes.end());
return des_event_bytes;
}
@ -110,7 +103,7 @@ IndexData::serialize_to_remote_file() {
std::vector<uint8_t>
IndexData::serialize_to_local_file() {
LocalIndexEvent event;
event.field_data = field_data_;
event.field_data = GetFieldData();
return event.Serialize();
}

View File

@ -27,8 +27,12 @@ namespace milvus::storage {
// TODO :: indexParams storage in a single file
class IndexData : public DataCodec {
public:
explicit IndexData(FieldDataPtr data)
: DataCodec(data, CodecType::IndexDataType) {
explicit IndexData(std::shared_ptr<PayloadReader>& payload_reader)
: DataCodec(payload_reader, CodecType::IndexDataType) {
}
explicit IndexData(const uint8_t* payload_data, int64_t length)
: DataCodec(payload_data, length, CodecType::IndexDataType) {
}
std::vector<uint8_t>

View File

@ -46,9 +46,9 @@ InsertData::Serialize(StorageType medium) {
std::vector<uint8_t>
InsertData::serialize_to_remote_file() {
AssertInfo(field_data_meta_.has_value(), "field data not exist");
AssertInfo(field_data_ != nullptr, "empty field data");
DataType data_type = field_data_->get_data_type();
AssertInfo(payload_reader_->has_field_data(), "empty field data");
auto field_data = payload_reader_->get_field_data();
DataType data_type = field_data->get_data_type();
// create descriptor event
DescriptorEvent descriptor_event;
@ -67,9 +67,8 @@ InsertData::serialize_to_remote_file() {
des_event_data.post_header_lengths.push_back(
GetEventFixPartSize(EventType(i)));
}
des_event_data.extras[ORIGIN_SIZE_KEY] =
std::to_string(field_data_->Size());
des_event_data.extras[NULLABLE] = field_data_->IsNullable();
des_event_data.extras[ORIGIN_SIZE_KEY] = std::to_string(field_data->Size());
des_event_data.extras[NULLABLE] = field_data->IsNullable();
auto& des_event_header = descriptor_event.event_header;
// TODO :: set timestamp
@ -84,7 +83,7 @@ InsertData::serialize_to_remote_file() {
auto& insert_event_data = insert_event.event_data;
insert_event_data.start_timestamp = time_range_.first;
insert_event_data.end_timestamp = time_range_.second;
insert_event_data.field_data = field_data_;
insert_event_data.payload_reader = payload_reader_;
auto& insert_event_header = insert_event.event_header;
// TODO :: set timestamps
@ -108,7 +107,7 @@ InsertData::serialize_to_remote_file() {
std::vector<uint8_t>
InsertData::serialize_to_local_file() {
LocalInsertEvent event;
event.field_data = field_data_;
event.field_data = GetFieldData();
return event.Serialize();
}

View File

@ -26,10 +26,6 @@ namespace milvus::storage {
class InsertData : public DataCodec {
public:
explicit InsertData(FieldDataPtr data)
: DataCodec(data, CodecType::InsertDataType) {
}
explicit InsertData(std::shared_ptr<PayloadReader> payload_reader)
: DataCodec(payload_reader, CodecType::InsertDataType) {
}

View File

@ -96,10 +96,10 @@ MemFileManagerImpl::LoadFile(const std::string& filename) noexcept {
return true;
}
std::map<std::string, FieldDataPtr>
std::map<std::string, std::unique_ptr<DataCodec>>
MemFileManagerImpl::LoadIndexToMemory(
const std::vector<std::string>& remote_files) {
std::map<std::string, FieldDataPtr> file_to_index_data;
std::map<std::string, std::unique_ptr<DataCodec>> file_to_index_data;
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::vector<std::string> batch_files;
@ -109,8 +109,7 @@ MemFileManagerImpl::LoadIndexToMemory(
for (size_t idx = 0; idx < batch_files.size(); ++idx) {
auto file_name =
batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
file_to_index_data[file_name] =
index_datas[idx].get()->GetFieldData();
file_to_index_data[file_name] = index_datas[idx].get();
}
};

View File

@ -51,7 +51,7 @@ class MemFileManagerImpl : public FileManagerImpl {
return "MemIndexFileManagerImpl";
}
std::map<std::string, FieldDataPtr>
std::map<std::string, std::unique_ptr<DataCodec>>
LoadIndexToMemory(const std::vector<std::string>& remote_files);
std::vector<FieldDataPtr>

View File

@ -25,70 +25,80 @@
namespace milvus::storage {
// Wrap an already-materialized FieldData: no parsing is performed, the reader
// simply exposes `fieldData`; column type and nullability are copied from it.
PayloadReader::PayloadReader(const milvus::FieldDataPtr& fieldData)
: column_type_(fieldData->get_data_type()),
nullable_(fieldData->IsNullable()) {
field_data_ = fieldData;
}
PayloadReader::PayloadReader(const uint8_t* data,
int length,
DataType data_type,
bool nullable,
bool is_field_data)
: column_type_(data_type), nullable_(nullable) {
auto input = std::make_shared<arrow::io::BufferReader>(data, length);
init(input, is_field_data);
init(data, length, is_field_data);
}
void
PayloadReader::init(std::shared_ptr<arrow::io::BufferReader> input,
bool is_field_data) {
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Configure general Parquet reader settings
auto reader_properties = parquet::ReaderProperties(pool);
reader_properties.set_buffer_size(4096 * 4);
reader_properties.enable_buffered_stream();
// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = parquet::ArrowReaderProperties();
arrow_reader_props.set_batch_size(128 * 1024); // default 64 * 1024
arrow_reader_props.set_pre_buffer(false);
parquet::arrow::FileReaderBuilder reader_builder;
auto st = reader_builder.Open(input, reader_properties);
AssertInfo(st.ok(), "file to read file");
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
st = reader_builder.Build(&arrow_reader);
AssertInfo(st.ok(), "build file reader");
int64_t column_index = 0;
auto file_meta = arrow_reader->parquet_reader()->metadata();
// dim is unused for sparse float vector
dim_ = (IsVectorDataType(column_type_) &&
!IsSparseFloatVectorDataType(column_type_))
? GetDimensionFromFileMetaData(
file_meta->schema()->Column(column_index), column_type_)
: 1;
auto total_num_rows = file_meta->num_rows();
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
st = arrow_reader->GetRecordBatchReader(&rb_reader);
AssertInfo(st.ok(), "get record batch reader");
if (is_field_data) {
field_data_ =
CreateFieldData(column_type_, nullable_, dim_, total_num_rows);
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch :
*rb_reader) {
AssertInfo(maybe_batch.ok(), "get batch record success");
auto array = maybe_batch.ValueOrDie()->column(column_index);
// to read
field_data_->FillFieldData(array);
}
AssertInfo(field_data_->IsFull(), "field data hasn't been filled done");
PayloadReader::init(const uint8_t* data, int length, bool is_field_data) {
if (column_type_ == DataType::NONE) {
payload_buf_ = std::make_shared<BytesBuf>(data, length);
} else {
arrow_reader_ = std::move(arrow_reader);
record_batch_reader_ = std::move(rb_reader);
auto input = std::make_shared<arrow::io::BufferReader>(data, length);
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Configure general Parquet reader settings
auto reader_properties = parquet::ReaderProperties(pool);
reader_properties.set_buffer_size(4096 * 4);
// reader_properties.enable_buffered_stream();
// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = parquet::ArrowReaderProperties();
arrow_reader_props.set_batch_size(128 * 1024); // default 64 * 1024
arrow_reader_props.set_pre_buffer(false);
parquet::arrow::FileReaderBuilder reader_builder;
auto st = reader_builder.Open(input, reader_properties);
AssertInfo(st.ok(), "file to read file");
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
st = reader_builder.Build(&arrow_reader);
AssertInfo(st.ok(), "build file reader");
int64_t column_index = 0;
auto file_meta = arrow_reader->parquet_reader()->metadata();
// dim is unused for sparse float vector
dim_ =
(IsVectorDataType(column_type_) &&
!IsSparseFloatVectorDataType(column_type_))
? GetDimensionFromFileMetaData(
file_meta->schema()->Column(column_index), column_type_)
: 1;
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
st = arrow_reader->GetRecordBatchReader(&rb_reader);
AssertInfo(st.ok(), "get record batch reader");
if (is_field_data) {
auto total_num_rows = file_meta->num_rows();
field_data_ =
CreateFieldData(column_type_, nullable_, dim_, total_num_rows);
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>>
maybe_batch : *rb_reader) {
AssertInfo(maybe_batch.ok(), "get batch record success");
auto array = maybe_batch.ValueOrDie()->column(column_index);
// to read
field_data_->FillFieldData(array);
}
AssertInfo(field_data_->IsFull(),
"field data hasn't been filled done");
} else {
arrow_reader_ = std::move(arrow_reader);
record_batch_reader_ = std::move(rb_reader);
}
}
}

View File

@ -27,6 +27,8 @@ namespace milvus::storage {
class PayloadReader {
public:
explicit PayloadReader(const FieldDataPtr& fieldData);
explicit PayloadReader(const uint8_t* data,
int length,
DataType data_type,
@ -36,13 +38,20 @@ class PayloadReader {
~PayloadReader() = default;
void
init(std::shared_ptr<arrow::io::BufferReader> buffer, bool is_field_data);
init(const uint8_t* data, int length, bool is_field_data);
// Returns the parsed field data, or nullptr when this reader holds a raw byte
// payload or a streaming record-batch reader instead (see has_field_data()).
const FieldDataPtr
get_field_data() const {
return field_data_;
}
// Size in bytes of the materialized field data; 0 when none is held.
int64_t
get_field_data_size() const {
    return field_data_ ? field_data_->Size() : 0;
}
std::shared_ptr<arrow::RecordBatchReader>
get_reader() {
return record_batch_reader_;
@ -53,6 +62,45 @@ class PayloadReader {
return arrow_reader_;
}
// Size in bytes of the raw binary payload; 0 when no payload buffer is set.
int64_t
get_payload_size() {
    return payload_buf_ ? payload_buf_->Size() : 0;
}
// Pointer to the raw binary payload bytes; nullptr when no payload buffer is
// set.
const uint8_t*
get_payload_data() {
    return payload_buf_ ? payload_buf_->Data() : nullptr;
}
// True when this reader wraps an opaque byte buffer rather than parsed
// columnar data.
bool
has_binary_payload() {
    return static_cast<bool>(payload_buf_);
}
// True when parsed field data has been materialized.
bool
has_field_data() {
    return static_cast<bool>(field_data_);
}
// Data type of the held payload: DataType::NONE for a raw binary payload,
// otherwise the type of the parsed field data. Exactly one of payload_buf_ /
// field_data_ is expected to be set.
DataType
get_payload_datatype() {
    AssertInfo(payload_buf_ != nullptr || field_data_ != nullptr,
               "Neither payload_buf nor field_data is "
               "available, wrong state");
    if (payload_buf_) {
        return DataType::NONE;
    }
    // field_data_ is guaranteed non-null by the assertion above. The return
    // is unconditional so control can never fall off the end of this
    // non-void function — the original allowed that (undefined behavior)
    // whenever the assertion is compiled out.
    return field_data_->get_data_type();
}
private:
DataType column_type_;
int dim_;
@ -61,6 +109,9 @@ class PayloadReader {
std::shared_ptr<parquet::arrow::FileReader> arrow_reader_;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader_;
// buffer for zero-copy bytes
std::shared_ptr<BytesBuf> payload_buf_;
};
} // namespace milvus::storage

View File

@ -255,3 +255,25 @@ struct fmt::formatter<milvus::storage::StorageType> : formatter<string_view> {
return formatter<string_view>::format("unknown", ctx);
}
};
// Non-owning, immutable view over a contiguous byte buffer (pointer + size).
// The caller must keep the underlying storage alive for the lifetime of the
// BytesBuf — e.g. the DataCodec that downloaded the bytes (zero-copy usage).
struct BytesBuf {
 public:
    explicit BytesBuf(const uint8_t* data, int64_t length)
        : data_(data), size_(length) {
        AssertInfo(data != nullptr, "Data pointer for slice cannot be null.");
        // Reject negative sizes early instead of propagating a bogus length.
        AssertInfo(length >= 0, "Slice length cannot be negative.");
    }

    // Number of bytes in the view.
    int64_t
    Size() const noexcept {
        return size_;
    }

    // Pointer to the first byte; never null (enforced by the constructor).
    const uint8_t*
    Data() const noexcept {
        return data_;
    }

 private:
    const uint8_t* data_;  // borrowed, never owned by this view
    int64_t size_;
};

View File

@ -584,12 +584,13 @@ std::unique_ptr<DataCodec>
DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager,
const std::string& file,
bool is_field_data) {
// TODO remove this Size() cost
auto fileSize = chunk_manager->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
chunk_manager->Read(file, buf.get(), fileSize);
auto res = DeserializeFileData(buf, fileSize, is_field_data);
res->SetData(buf);
// DataCodec must keep the buf alive for zero-copy usage, otherwise segmentation violation will occur
return res;
}
@ -601,9 +602,7 @@ EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
FieldDataMeta field_meta,
std::string object_key) {
// index not use valid_data, so no need to set nullable==true
auto field_data = CreateFieldData(DataType::INT8, false);
field_data->FillFieldData(buf, batch_size);
auto indexData = std::make_shared<IndexData>(field_data);
auto indexData = std::make_shared<IndexData>(buf, batch_size);
indexData->set_index_meta(index_meta);
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
@ -627,7 +626,8 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
auto field_data =
CreateFieldData(field_meta.get_data_type(), false, dim, 0);
field_data->FillFieldData(buf, element_count);
auto insertData = std::make_shared<InsertData>(field_data);
auto payload_reader = std::make_shared<PayloadReader>(field_data);
auto insertData = std::make_shared<InsertData>(payload_reader);
insertData->SetFieldDataMeta(field_data_meta);
auto serialized_inserted_data = insertData->serialize_to_remote_file();
auto serialized_inserted_data_size = serialized_inserted_data.size();

View File

@ -206,7 +206,9 @@ class ArrayBitmapIndexTest : public testing::Test {
} else {
field_data->FillFieldData(data_.data(), data_.size());
}
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -117,7 +117,9 @@ class BitmapIndexTest : public testing::Test {
} else {
field_data->FillFieldData(data_.data(), data_.size());
}
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -39,7 +39,9 @@ TEST(chunk, test_int64_field) {
milvus::storage::CreateFieldData(storage::DataType::INT64);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
@ -75,7 +77,9 @@ TEST(chunk, test_variable_field) {
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
@ -108,7 +112,9 @@ TEST(chunk, test_null_field) {
uint8_t* valid_data_ = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data_, data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
@ -154,7 +160,9 @@ TEST(chunk, test_array) {
milvus::storage::CreateFieldData(storage::DataType::ARRAY);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
@ -195,7 +203,9 @@ TEST(chunk, test_sparse_float) {
field_data->FillFieldData(vecs.get(), n_rows);
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),
@ -262,7 +272,9 @@ TEST(chunk, multiple_chunk_mmap) {
milvus::storage::CreateFieldData(storage::DataType::INT64);
field_data->FillFieldData(data.data(), data.size());
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
event_data.payload_reader = payload_reader;
auto ser_data = event_data.Serialize();
auto buffer = std::make_shared<arrow::io::BufferReader>(
ser_data.data() + 2 * sizeof(milvus::Timestamp),

View File

@ -35,7 +35,9 @@ TEST(storage, InsertDataBool) {
milvus::storage::CreateFieldData(storage::DataType::BOOL, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -65,7 +67,9 @@ TEST(storage, InsertDataBoolNullable) {
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -98,7 +102,9 @@ TEST(storage, InsertDataInt8) {
milvus::storage::CreateFieldData(storage::DataType::INT8, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -127,7 +133,9 @@ TEST(storage, InsertDataInt8Nullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -158,7 +166,9 @@ TEST(storage, InsertDataInt16) {
milvus::storage::CreateFieldData(storage::DataType::INT16, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -187,7 +197,9 @@ TEST(storage, InsertDataInt16Nullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -218,7 +230,9 @@ TEST(storage, InsertDataInt32) {
milvus::storage::CreateFieldData(storage::DataType::INT32, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -247,7 +261,9 @@ TEST(storage, InsertDataInt32Nullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -278,7 +294,9 @@ TEST(storage, InsertDataInt64) {
milvus::storage::CreateFieldData(storage::DataType::INT64, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -307,7 +325,9 @@ TEST(storage, InsertDataInt64Nullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -339,7 +359,9 @@ TEST(storage, InsertDataString) {
milvus::storage::CreateFieldData(storage::DataType::VARCHAR, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -373,7 +395,9 @@ TEST(storage, InsertDataStringNullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -407,7 +431,9 @@ TEST(storage, InsertDataFloat) {
milvus::storage::CreateFieldData(storage::DataType::FLOAT, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -436,7 +462,9 @@ TEST(storage, InsertDataFloatNullable) {
std::array<uint8_t, 1> valid_data = {0x13};
field_data->FillFieldData(data.data(), valid_data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -466,7 +494,9 @@ TEST(storage, InsertDataDouble) {
milvus::storage::CreateFieldData(storage::DataType::DOUBLE, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -495,7 +525,9 @@ TEST(storage, InsertDataDoubleNullable) {
uint8_t* valid_data = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -527,7 +559,9 @@ TEST(storage, InsertDataFloatVector) {
storage::DataType::VECTOR_FLOAT, false, DIM);
field_data->FillFieldData(data.data(), data.size() / DIM);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -559,7 +593,9 @@ TEST(storage, InsertDataSparseFloat) {
storage::DataType::VECTOR_SPARSE_FLOAT, false, kTestSparseDim, n_rows);
field_data->FillFieldData(vecs.get(), n_rows);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -598,7 +634,9 @@ TEST(storage, InsertDataBinaryVector) {
storage::DataType::VECTOR_BINARY, false, DIM);
field_data->FillFieldData(data.data(), data.size() * 8 / DIM);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -627,7 +665,9 @@ TEST(storage, InsertDataFloat16Vector) {
storage::DataType::VECTOR_FLOAT16, false, DIM);
field_data->FillFieldData(data.data(), data.size() / DIM);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -653,11 +693,7 @@ TEST(storage, InsertDataFloat16Vector) {
TEST(storage, IndexData) {
std::vector<uint8_t> data = {1, 2, 3, 4, 5, 6, 7, 8};
auto field_data =
milvus::storage::CreateFieldData(storage::DataType::INT8, false);
field_data->FillFieldData(data.data(), data.size());
storage::IndexData index_data(field_data);
storage::IndexData index_data(data.data(), data.size());
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
index_data.SetFieldDataMeta(field_data_meta);
index_data.SetTimestamps(0, 100);
@ -672,11 +708,11 @@ TEST(storage, IndexData) {
ASSERT_EQ(new_index_data->GetCodecType(), storage::IndexDataType);
ASSERT_EQ(new_index_data->GetTimeRage(),
std::make_pair(Timestamp(0), Timestamp(100)));
auto new_field_data = new_index_data->GetFieldData();
ASSERT_EQ(new_field_data->get_data_type(), storage::DataType::INT8);
ASSERT_EQ(new_field_data->Size(), data.size());
ASSERT_TRUE(new_index_data->HasBinaryPayload());
std::vector<uint8_t> new_data(data.size());
memcpy(new_data.data(), new_field_data->Data(), new_field_data->DataSize());
memcpy(new_data.data(),
new_index_data->PayloadData(),
new_index_data->PayloadSize());
ASSERT_EQ(data, new_data);
}
@ -693,7 +729,9 @@ TEST(storage, InsertDataStringArray) {
milvus::storage::CreateFieldData(storage::DataType::ARRAY, false);
field_data->FillFieldData(data.data(), data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -738,7 +776,9 @@ TEST(storage, InsertDataStringArrayNullable) {
uint8_t* valid_data = new uint8_t[1]{0x01};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);
@ -774,7 +814,9 @@ TEST(storage, InsertDataJsonNullable) {
uint8_t* valid_data = new uint8_t[1]{0x00};
field_data->FillFieldData(data.data(), valid_data, data.size());
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
storage::FieldDataMeta field_data_meta{100, 101, 102, 103};
insert_data.SetFieldDataMeta(field_data_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -268,7 +268,9 @@ PrepareInsertData(const int64_t opt_field_data_range) -> std::string {
PrepareRawFieldData<NativeType>(opt_field_data_range);
auto field_data = storage::CreateFieldData(DT, false, 1, kEntityCnt);
field_data->FillFieldData(data.data(), kEntityCnt);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(kOptVecFieldDataMeta);
insert_data.SetTimestamps(0, 100);
auto serialized_data = insert_data.Serialize(storage::StorageType::Remote);

View File

@ -119,7 +119,9 @@ class HybridIndexTestV1 : public testing::Test {
} else {
field_data->FillFieldData(data_.data(), data_.size());
}
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -161,7 +161,9 @@ test_run() {
field_data->FillFieldData(data.data(), data.size());
}
// std::cout << "length:" << field_data->get_num_rows() << std::endl;
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);
@ -436,7 +438,9 @@ test_string() {
} else {
field_data->FillFieldData(data.data(), data.size());
}
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -91,7 +91,9 @@ class JsonKeyStatsIndexTest : public ::testing::TestWithParam<bool> {
field_data->FillFieldData(data_.data(), data_.size());
}
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);

View File

@ -196,7 +196,9 @@ test_run() {
}
auto field_data = storage::CreateFieldData(dtype, false, dim);
field_data->FillFieldData(data_gen.data(), data_gen.size() / dim);
storage::InsertData insert_data(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);
auto serialized_bytes = insert_data.Serialize(storage::Remote);

View File

@ -72,7 +72,9 @@ PrepareInsertBinlog(int64_t collection_id,
auto SaveFieldData = [&](const FieldDataPtr field_data,
const std::string& file,
const int64_t field_id) {
auto insert_data = std::make_shared<InsertData>(field_data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
auto insert_data = std::make_shared<InsertData>(payload_reader);
FieldDataMeta field_data_meta{
collection_id, partition_id, segment_id, field_id};
insert_data->SetFieldDataMeta(field_data_meta);