mirror of https://github.com/milvus-io/milvus.git
enhance: reduce 1x copy while retrieving data from growing segment (#28323)
Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/28349/head
parent
70995383bf
commit
267c67dfee
|
@ -23,6 +23,7 @@
|
|||
#include "common/EasyAssert.h"
|
||||
#include "common/Types.h"
|
||||
#include "fmt/format.h"
|
||||
#include "log/Log.h"
|
||||
#include "nlohmann/json.hpp"
|
||||
#include "query/PlanNode.h"
|
||||
#include "query/SearchOnSealed.h"
|
||||
|
@ -347,42 +348,52 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id,
|
|||
auto vec_ptr = insert_record_.get_field_data_base(field_id);
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
if (field_meta.is_vector()) {
|
||||
aligned_vector<char> output(field_meta.get_sizeof() * count);
|
||||
auto result = CreateVectorDataArray(count, field_meta);
|
||||
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
|
||||
bulk_subscript_impl<FloatVector>(field_id,
|
||||
field_meta.get_sizeof(),
|
||||
vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
result->mutable_vectors()
|
||||
->mutable_float_vector()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
} else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
|
||||
bulk_subscript_impl<BinaryVector>(field_id,
|
||||
bulk_subscript_impl<BinaryVector>(
|
||||
field_id,
|
||||
field_meta.get_sizeof(),
|
||||
vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
result->mutable_vectors()->mutable_binary_vector()->data());
|
||||
} else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
|
||||
bulk_subscript_impl<Float16Vector>(field_id,
|
||||
bulk_subscript_impl<Float16Vector>(
|
||||
field_id,
|
||||
field_meta.get_sizeof(),
|
||||
vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
result->mutable_vectors()->mutable_float16_vector()->data());
|
||||
} else {
|
||||
PanicInfo(DataTypeInvalid, "logical error");
|
||||
}
|
||||
return CreateVectorDataArrayFrom(output.data(), count, field_meta);
|
||||
return result;
|
||||
}
|
||||
|
||||
AssertInfo(!field_meta.is_vector(),
|
||||
"Scalar field meta type is vector type");
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::BOOL: {
|
||||
FixedVector<bool> output(count);
|
||||
bulk_subscript_impl<bool>(
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto result = CreateScalarDataArray(count, field_meta);
|
||||
bulk_subscript_impl<bool>(vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
result->mutable_scalars()
|
||||
->mutable_bool_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return result;
|
||||
}
|
||||
case DataType::INT8: {
|
||||
FixedVector<int8_t> output(count);
|
||||
|
@ -397,28 +408,48 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id,
|
|||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT32: {
|
||||
FixedVector<int32_t> output(count);
|
||||
bulk_subscript_impl<int32_t>(
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto result = CreateScalarDataArray(count, field_meta);
|
||||
bulk_subscript_impl<int32_t>(vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
result->mutable_scalars()
|
||||
->mutable_int_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return result;
|
||||
}
|
||||
case DataType::INT64: {
|
||||
FixedVector<int64_t> output(count);
|
||||
bulk_subscript_impl<int64_t>(
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto result = CreateScalarDataArray(count, field_meta);
|
||||
bulk_subscript_impl<int64_t>(vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
result->mutable_scalars()
|
||||
->mutable_long_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return result;
|
||||
}
|
||||
case DataType::FLOAT: {
|
||||
FixedVector<float> output(count);
|
||||
bulk_subscript_impl<float>(
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto result = CreateScalarDataArray(count, field_meta);
|
||||
bulk_subscript_impl<float>(vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
result->mutable_scalars()
|
||||
->mutable_float_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return result;
|
||||
}
|
||||
case DataType::DOUBLE: {
|
||||
FixedVector<double> output(count);
|
||||
bulk_subscript_impl<double>(
|
||||
vec_ptr, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto result = CreateScalarDataArray(count, field_meta);
|
||||
bulk_subscript_impl<double>(vec_ptr,
|
||||
seg_offsets,
|
||||
count,
|
||||
result->mutable_scalars()
|
||||
->mutable_double_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return result;
|
||||
}
|
||||
case DataType::VARCHAR: {
|
||||
FixedVector<std::string> output(count);
|
||||
|
|
|
@ -100,6 +100,7 @@ TEST(Growing, RealCount) {
|
|||
TEST(Growing, FillData) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
auto bool_field = schema->AddDebugField("bool", DataType::BOOL);
|
||||
auto int8_field = schema->AddDebugField("int8", DataType::INT8);
|
||||
auto int16_field = schema->AddDebugField("int16", DataType::INT16);
|
||||
auto int32_field = schema->AddDebugField("int32", DataType::INT32);
|
||||
|
@ -145,6 +146,7 @@ TEST(Growing, FillData) {
|
|||
int64_t dim = 128;
|
||||
for (int64_t i = 0; i < n_batch; i++) {
|
||||
auto dataset = DataGen(schema, per_batch);
|
||||
auto bool_values = dataset.get_col<bool>(bool_field);
|
||||
auto int8_values = dataset.get_col<int8_t>(int8_field);
|
||||
auto int16_values = dataset.get_col<int16_t>(int16_field);
|
||||
auto int32_values = dataset.get_col<int32_t>(int32_field);
|
||||
|
@ -172,6 +174,8 @@ TEST(Growing, FillData) {
|
|||
dataset.raw_);
|
||||
auto num_inserted = (i + 1) * per_batch;
|
||||
auto ids_ds = GenRandomIds(num_inserted);
|
||||
auto bool_result =
|
||||
segment->bulk_subscript(bool_field, ids_ds->GetIds(), num_inserted);
|
||||
auto int8_result =
|
||||
segment->bulk_subscript(int8_field, ids_ds->GetIds(), num_inserted);
|
||||
auto int16_result = segment->bulk_subscript(
|
||||
|
@ -203,6 +207,7 @@ TEST(Growing, FillData) {
|
|||
auto vec_result =
|
||||
segment->bulk_subscript(vec, ids_ds->GetIds(), num_inserted);
|
||||
|
||||
EXPECT_EQ(bool_result->scalars().bool_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted);
|
||||
EXPECT_EQ(int32_result->scalars().int_data().data_size(), num_inserted);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include <map>
|
||||
#include <tuple>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "indexbuilder/IndexFactory.h"
|
||||
#include "indexbuilder/VecIndexCreator.h"
|
||||
#include "common/QueryResult.h"
|
||||
|
@ -103,8 +104,8 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
|
|||
bool is_binary;
|
||||
DataType vec_field_data_type;
|
||||
knowhere::DataSetPtr xb_dataset;
|
||||
std::vector<float> xb_data;
|
||||
std::vector<uint8_t> xb_bin_data;
|
||||
FixedVector<float> xb_data;
|
||||
FixedVector<uint8_t> xb_bin_data;
|
||||
knowhere::DataSetPtr xq_dataset;
|
||||
int64_t query_offset = 100;
|
||||
int64_t NB = 10000;
|
||||
|
@ -141,8 +142,8 @@ TEST_P(IndexWrapperTest, BuildAndQuery) {
|
|||
|
||||
auto dataset = GenDataset(NB, metric_type, is_binary);
|
||||
knowhere::DataSetPtr xb_dataset;
|
||||
std::vector<uint8_t> bin_vecs;
|
||||
std::vector<float> f_vecs;
|
||||
FixedVector<uint8_t> bin_vecs;
|
||||
FixedVector<float> f_vecs;
|
||||
if (is_binary) {
|
||||
bin_vecs = dataset.get_col<uint8_t>(milvus::FieldId(100));
|
||||
xb_dataset = knowhere::GenDataSet(NB, DIM, bin_vecs.data());
|
||||
|
@ -153,7 +154,7 @@ TEST_P(IndexWrapperTest, BuildAndQuery) {
|
|||
|
||||
ASSERT_NO_THROW(index->Build(xb_dataset));
|
||||
auto binary_set = index->Serialize();
|
||||
std::vector<std::string> index_files;
|
||||
FixedVector<std::string> index_files;
|
||||
for (auto& binary : binary_set.binary_map_) {
|
||||
index_files.emplace_back(binary.first);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/Types.h"
|
||||
#include "knowhere/comp/index_param.h"
|
||||
#include "nlohmann/json.hpp"
|
||||
#include "query/SearchBruteForce.h"
|
||||
|
@ -356,8 +357,8 @@ class IndexTest : public ::testing::TestWithParam<Param> {
|
|||
milvus::Config range_search_conf;
|
||||
milvus::DataType vec_field_data_type;
|
||||
knowhere::DataSetPtr xb_dataset;
|
||||
std::vector<float> xb_data;
|
||||
std::vector<uint8_t> xb_bin_data;
|
||||
FixedVector<float> xb_data;
|
||||
FixedVector<uint8_t> xb_bin_data;
|
||||
knowhere::DataSetPtr xq_dataset;
|
||||
int64_t query_offset = 100;
|
||||
int64_t NB = 3000;
|
||||
|
@ -610,7 +611,7 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
|
|||
|
||||
// build disk ann index
|
||||
auto dataset = GenDataset(NB, metric_type, false);
|
||||
std::vector<float> xb_data =
|
||||
FixedVector<float> xb_data =
|
||||
dataset.get_col<float>(milvus::FieldId(field_id));
|
||||
knowhere::DataSetPtr xb_dataset =
|
||||
knowhere::GenDataSet(NB, DIM, xb_data.data());
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "Constants.h"
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/Schema.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/ScalarIndexSort.h"
|
||||
#include "index/StringIndexSort.h"
|
||||
#include "index/VectorMemIndex.h"
|
||||
|
@ -81,9 +82,9 @@ struct GeneratedData {
|
|||
}
|
||||
|
||||
template <typename T>
|
||||
std::vector<T>
|
||||
FixedVector<T>
|
||||
get_col(FieldId field_id) const {
|
||||
std::vector<T> ret(raw_->num_rows());
|
||||
FixedVector<T> ret(raw_->num_rows());
|
||||
for (auto i = 0; i < raw_->fields_data_size(); i++) {
|
||||
auto target_field_data = raw_->fields_data(i);
|
||||
if (field_id.get() != target_field_data.field_id()) {
|
||||
|
|
Loading…
Reference in New Issue