Add column-based insert interface in segcore (#16100)

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
pull/16184/head
Letian Jiang 2022-03-23 19:05:25 +08:00 committed by GitHub
parent 1c4b949a1d
commit 321105cc01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 264 additions and 0 deletions

View File

@ -150,6 +150,43 @@ Insert(CSegmentInterface c_segment,
}
}
CStatus
InsertColumnData(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps,
void* raw_data,
int64_t count) {
try {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
milvus::segcore::ColumnBasedRawData dataChunk{};
auto& schema = segment->get_schema();
auto sizeof_infos = schema.get_sizeof_infos();
dataChunk.columns_ = std::vector<milvus::aligned_vector<uint8_t>>(schema.size());
// reverse space for each field
for (int fid = 0; fid < schema.size(); ++fid) {
auto len = sizeof_infos[fid];
dataChunk.columns_[fid].resize(len * size);
}
auto col_data = reinterpret_cast<const char*>(raw_data);
int64_t offset = 0;
for (int fid = 0; fid < schema.size(); ++fid) {
auto len = sizeof_infos[fid] * size;
auto src = col_data + offset;
auto dst = dataChunk.columns_[fid].data();
memcpy(dst, src, len);
offset += len;
}
dataChunk.count = count;
segment->Insert(reserved_offset, size, row_ids, timestamps, dataChunk);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
CStatus
PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset) {
try {

View File

@ -71,6 +71,15 @@ Insert(CSegmentInterface c_segment,
int sizeof_per_row,
int64_t count);
CStatus
InsertColumnData(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps,
void* raw_data,
int64_t count);
CStatus
PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset);

View File

@ -103,6 +103,29 @@ generate_data(int N) {
return std::make_tuple(raw_data, timestamps, uids);
}
auto
generate_column_data(int N) {
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
std::default_random_engine e(42);
std::normal_distribution<> dis(0.0, 1.0);
for (int i = 0; i < N; ++i) {
uids.push_back(10 * N + i);
timestamps.push_back(0);
float vec[DIM];
for (auto& x : vec) {
x = dis(e);
}
raw_data.insert(raw_data.end(), (const char*)std::begin(vec), (const char*)std::end(vec));
}
for (int i = 0; i < N; ++i) {
int age = e() % 100;
raw_data.insert(raw_data.end(), (const char*)&age, ((const char*)&age) + sizeof(age));
}
return std::make_tuple(raw_data, timestamps, uids);
}
std::string
generate_query_data(int nq) {
namespace ser = milvus::proto::milvus;
@ -216,6 +239,23 @@ TEST(CApiTest, InsertTest) {
DeleteSegment(segment);
}
TEST(CApiTest, InsertColumnDataTest) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_column_data(N);
int64_t offset;
PreInsert(segment, N, &offset);
auto res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
assert(res.error_code == Success);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, DeleteTest) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
@ -300,6 +340,73 @@ TEST(CApiTest, SearchTest) {
DeleteSegment(segment);
}
TEST(CApiTest, SearchTest2) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_column_data(N);
int64_t ts_offset = 1000;
for (int i = 0; i < N; i++) {
timestamps[i] = ts_offset + i;
}
int64_t offset;
PreInsert(segment, N, &offset);
auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
ASSERT_EQ(ins_res.error_code, Success);
const char* dsl_string = R"(
{
"bool": {
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 10
},
"query": "$0",
"topk": 10,
"round_decimal": 3
}
}
}
})";
int num_queries = 10;
auto blob = generate_query_data(num_queries);
void* plan = nullptr;
auto status = CreateSearchPlan(collection, dsl_string, &plan);
ASSERT_EQ(status.error_code, Success);
void* placeholderGroup = nullptr;
status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
ASSERT_EQ(status.error_code, Success);
std::vector<CPlaceholderGroup> placeholderGroups;
placeholderGroups.push_back(placeholderGroup);
timestamps.clear();
timestamps.push_back(1);
CSearchResult search_result;
auto res = Search(segment, plan, placeholderGroup, N + ts_offset, &search_result, -1);
ASSERT_EQ(res.error_code, Success);
CSearchResult search_result2;
auto res2 = Search(segment, plan, placeholderGroup, ts_offset, &search_result2, -1);
ASSERT_EQ(res2.error_code, Success);
DeleteSearchPlan(plan);
DeletePlaceholderGroup(placeholderGroup);
DeleteSearchResult(search_result);
DeleteSearchResult(search_result2);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, SearchTestWithExpr) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
@ -352,6 +459,57 @@ TEST(CApiTest, SearchTestWithExpr) {
DeleteSegment(segment);
}
TEST(CApiTest, SearchTestWithExpr2) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_column_data(N);
int64_t offset;
PreInsert(segment, N, &offset);
auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
ASSERT_EQ(ins_res.error_code, Success);
const char* serialized_expr_plan = R"(vector_anns: <
field_id: 100
query_info: <
topk: 10
metric_type: "L2"
search_params: "{\"nprobe\": 10}"
>
placeholder_tag: "$0"
>)";
int num_queries = 10;
auto blob = generate_query_data(num_queries);
void* plan = nullptr;
auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan);
auto status = CreateSearchPlanByExpr(collection, binary_plan.data(), binary_plan.size(), &plan);
ASSERT_EQ(status.error_code, Success);
void* placeholderGroup = nullptr;
status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
ASSERT_EQ(status.error_code, Success);
std::vector<CPlaceholderGroup> placeholderGroups;
placeholderGroups.push_back(placeholderGroup);
timestamps.clear();
timestamps.push_back(1);
CSearchResult search_result;
auto res = Search(segment, plan, placeholderGroup, timestamps[0], &search_result, -1);
ASSERT_EQ(res.error_code, Success);
DeleteSearchPlan(plan);
DeletePlaceholderGroup(placeholderGroup);
DeleteSearchResult(search_result);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, RetrieveTestWithExpr) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
@ -388,6 +546,41 @@ TEST(CApiTest, RetrieveTestWithExpr) {
DeleteSegment(segment);
}
TEST(CApiTest, RetrieveTestWithExpr2) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_column_data(N);
int64_t offset;
PreInsert(segment, N, &offset);
auto ins_res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
ASSERT_EQ(ins_res.error_code, Success);
auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
auto plan = std::make_unique<query::RetrievePlan>(*schema);
// create retrieve plan "age in [0]"
std::vector<int64_t> values(1, 0);
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldOffset(1), DataType::INT32, values);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldOffset> target_offsets{FieldOffset(0), FieldOffset(1)};
plan->field_offsets_ = target_offsets;
CRetrieveResult retrieve_result;
auto res = Retrieve(segment, plan.release(), timestamps[0], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
@ -414,6 +607,31 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
DeleteSegment(segment);
}
TEST(CApiTest, GetMemoryUsageInBytesTest2) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
// std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
assert(old_memory_usage_size == 0);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_column_data(N);
int64_t offset;
PreInsert(segment, N, &offset);
auto res = InsertColumnData(segment, offset, N, uids.data(), timestamps.data(), raw_data.data(), N);
assert(res.error_code == Success);
auto memory_usage_size = GetMemoryUsageInBytes(segment);
// std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl;
assert(memory_usage_size == 2785280);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, GetDeletedCountTest) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);