mirror of https://github.com/milvus-io/milvus.git
Reduce 1x memory copy for retrieving data (#28106)
Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/28184/head
parent
33f17ae5bd
commit
863e26969a
|
@ -1032,7 +1032,6 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
|
|||
// we have to clone the shared pointer,
|
||||
// to make sure it won't get released if segment released
|
||||
auto column = fields_.at(field_id);
|
||||
|
||||
if (datatype_is_variable(field_meta.get_data_type())) {
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::VARCHAR:
|
||||
|
@ -1071,10 +1070,15 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
|
|||
auto src_vec = column->Data();
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::BOOL: {
|
||||
FixedVector<bool> output(count);
|
||||
bulk_subscript_impl<bool>(
|
||||
src_vec, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl<bool>(src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_scalars()
|
||||
->mutable_bool_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::INT8: {
|
||||
FixedVector<int8_t> output(count);
|
||||
|
@ -1089,40 +1093,81 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
|
|||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
}
|
||||
case DataType::INT32: {
|
||||
FixedVector<int32_t> output(count);
|
||||
bulk_subscript_impl<int32_t>(
|
||||
src_vec, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl<int32_t>(src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_scalars()
|
||||
->mutable_int_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::INT64: {
|
||||
FixedVector<int64_t> output(count);
|
||||
bulk_subscript_impl<int64_t>(
|
||||
src_vec, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl<int64_t>(src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_scalars()
|
||||
->mutable_long_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::FLOAT: {
|
||||
FixedVector<float> output(count);
|
||||
bulk_subscript_impl<float>(
|
||||
src_vec, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl<float>(src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_scalars()
|
||||
->mutable_float_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::DOUBLE: {
|
||||
FixedVector<double> output(count);
|
||||
bulk_subscript_impl<double>(
|
||||
src_vec, seg_offsets, count, output.data());
|
||||
return CreateScalarDataArrayFrom(output.data(), count, field_meta);
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl<double>(src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_scalars()
|
||||
->mutable_double_data()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
|
||||
case DataType::VECTOR_FLOAT:
|
||||
case DataType::VECTOR_FLOAT16:
|
||||
case DataType::VECTOR_BINARY: {
|
||||
aligned_vector<char> output(field_meta.get_sizeof() * count);
|
||||
case DataType::VECTOR_FLOAT: {
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl(field_meta.get_sizeof(),
|
||||
src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
output.data());
|
||||
return CreateVectorDataArrayFrom(output.data(), count, field_meta);
|
||||
ret->mutable_vectors()
|
||||
->mutable_float_vector()
|
||||
->mutable_data()
|
||||
->mutable_data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::VECTOR_FLOAT16: {
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl(
|
||||
field_meta.get_sizeof(),
|
||||
src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_vectors()->mutable_float16_vector()->data());
|
||||
return ret;
|
||||
}
|
||||
case DataType::VECTOR_BINARY: {
|
||||
auto ret = fill_with_empty(field_id, count);
|
||||
bulk_subscript_impl(
|
||||
field_meta.get_sizeof(),
|
||||
src_vec,
|
||||
seg_offsets,
|
||||
count,
|
||||
ret->mutable_vectors()->mutable_binary_vector()->data());
|
||||
return ret;
|
||||
}
|
||||
|
||||
default: {
|
||||
|
|
|
@ -279,6 +279,8 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) {
|
|||
case DataType::ARRAY: {
|
||||
auto obj = scalar_array->mutable_array_data();
|
||||
obj->mutable_data()->Reserve(count);
|
||||
obj->set_element_type(static_cast<milvus::proto::schema::DataType>(
|
||||
field_meta.get_element_type()));
|
||||
for (int i = 0; i < count; i++) {
|
||||
*(obj->mutable_data()->Add()) = proto::schema::ScalarField();
|
||||
}
|
||||
|
@ -406,6 +408,8 @@ CreateScalarDataArrayFrom(const void* data_raw,
|
|||
case DataType::ARRAY: {
|
||||
auto data = reinterpret_cast<const ScalarArray*>(data_raw);
|
||||
auto obj = scalar_array->mutable_array_data();
|
||||
obj->set_element_type(static_cast<milvus::proto::schema::DataType>(
|
||||
field_meta.get_element_type()));
|
||||
for (auto i = 0; i < count; i++) {
|
||||
*(obj->mutable_data()->Add()) = data[i];
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "knowhere/comp/index_param.h"
|
||||
#include "query/Expr.h"
|
||||
#include "query/ExprImpl.h"
|
||||
#include "segcore/ScalarIndex.h"
|
||||
|
@ -291,6 +293,50 @@ TEST(Retrieve, Limit) {
|
|||
Assert(field2.vectors().float_vector().data_size() == N * DIM);
|
||||
}
|
||||
|
||||
TEST(Retrieve, FillEntry) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
|
||||
auto DIM = 16;
|
||||
auto fid_bool = schema->AddDebugField("bool", DataType::BOOL);
|
||||
auto fid_f32 = schema->AddDebugField("f32", DataType::FLOAT);
|
||||
auto fid_f64 = schema->AddDebugField("f64", DataType::DOUBLE);
|
||||
auto fid_vec32 = schema->AddDebugField(
|
||||
"vector_32", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
|
||||
auto fid_vecbin = schema->AddDebugField(
|
||||
"vec_bin", DataType::VECTOR_BINARY, DIM, knowhere::metric::L2);
|
||||
schema->set_primary_field_id(fid_64);
|
||||
|
||||
int64_t N = 101;
|
||||
auto dataset = DataGen(schema, N, 42);
|
||||
auto segment = CreateSealedSegment(schema);
|
||||
SealedLoadFieldData(dataset, *segment);
|
||||
|
||||
auto plan = std::make_unique<query::RetrievePlan>(*schema);
|
||||
auto term_expr = std::make_unique<query::UnaryRangeExprImpl<int64_t>>(
|
||||
milvus::query::ColumnInfo(
|
||||
fid_64, DataType::INT64, std::vector<std::string>()),
|
||||
OpType::GreaterEqual,
|
||||
0,
|
||||
proto::plan::GenericValue::kInt64Val);
|
||||
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
|
||||
plan->plan_node_->predicate_ = std::move(term_expr);
|
||||
|
||||
// test query results exceed the limit size
|
||||
std::vector<FieldId> target_fields{TimestampFieldID,
|
||||
fid_64,
|
||||
fid_bool,
|
||||
fid_f32,
|
||||
fid_f64,
|
||||
fid_vec32,
|
||||
fid_vecbin};
|
||||
plan->field_ids_ = target_fields;
|
||||
EXPECT_THROW(segment->Retrieve(plan.get(), N, 1), std::runtime_error);
|
||||
|
||||
auto retrieve_results =
|
||||
segment->Retrieve(plan.get(), N, DEFAULT_MAX_OUTPUT_SIZE);
|
||||
Assert(retrieve_results->fields_data_size() == target_fields.size());
|
||||
}
|
||||
|
||||
TEST(Retrieve, LargeTimestamp) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
|
||||
|
|
Loading…
Reference in New Issue