Fix empty query results when the same pk is inserted after deletion (#17222)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/17183/head
xige-16 2022-05-25 22:06:00 +08:00 committed by GitHub
parent 2c9ffce8bc
commit 7d810ac159
4 changed files with 229 additions and 7 deletions


@@ -305,14 +305,25 @@ get_deleted_bitmap(int64_t del_barrier,
auto [iter_b, iter_e] = pk2offset.equal_range(pk);
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto insert_row_offset = iter->second;
// for now, insert_barrier == insert count of segment, so this Assert will always work
AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier");

// insert after delete with the same pk: the delete does not take effect
// on this insert record, so reset its bitmap bit to 0
if (insert_record.timestamps_[insert_row_offset] > delete_record.timestamps_[del_index]) {
    bitmap->reset(insert_row_offset);
    continue;
}

// the deletion record does not take effect in search/query,
// so reset the bitmap bit to 0
if (delete_record.timestamps_[del_index] > query_timestamp) {
    bitmap->reset(insert_row_offset);
    continue;
}

// insert data corresponding to insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
delete_record.insert_lru_entry(current);
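
For reference, the new control flow above collapses into a single visibility predicate: a row is hidden from search/query only when a delete on its primary key falls at or after the row's insert timestamp and at or before the query timestamp. Below is a minimal standalone sketch of that rule; row_visible and its parameters are hypothetical names introduced here for illustration, not part of the segcore API.

#include <cassert>
#include <cstdint>

// Hedged sketch of the rule implemented by the hunk above (hypothetical helper).
bool row_visible(uint64_t insert_ts, uint64_t delete_ts, uint64_t query_ts) {
    // insert after delete with the same pk: the delete does not apply to it
    if (insert_ts > delete_ts) {
        return true;
    }
    // the delete lies beyond the query timestamp, so it is not in effect yet
    if (delete_ts > query_ts) {
        return true;
    }
    return false;
}

int main() {
    // mirrors InsertSamePkAfterDeleteOnGrowingSegment below:
    // a pk inserted at ts 1 and deleted at ts 9 is hidden when querying at ts 9
    assert(!row_visible(1, 9, 9));
    // the same pk re-inserted at ts 11 is visible again at ts 19 (the bug fixed here)
    assert(row_visible(11, 9, 19));
    return 0;
}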


@@ -577,6 +577,154 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
DeleteSegment(segment);
}
TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Growing, -1);
auto col = (milvus::segcore::Collection*)collection;
int N = 10;
auto dataset = DataGen(col->get_schema(), N);
auto insert_data = serialize(dataset.raw_);
// first insert data
// insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, timestamps = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
int64_t offset;
PreInsert(segment, N, &offset);
auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
insert_data.size());
assert(res.error_code == Success);
// delete data pks = {1, 2, 3}, timestamps = {9, 9, 9}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
auto ids = std::make_unique<IdArray>();
ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end());
auto delete_data = serialize(ids.get());
std::vector<uint64_t> delete_timestamps(3, dataset.timestamps_[N - 1]);
offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_data.data(), delete_data.size(), delete_timestamps.data());
assert(del_res.error_code == Success);
// create retrieve plan pks in {1, 2, 3}, timestamp = 9
std::vector<int64_t> retrive_row_ids = {1, 2, 3};
auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
auto plan = std::make_unique<query::RetrievePlan>(*schema);
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_row_ids);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
// second insert data
// insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
dataset = DataGen(col->get_schema(), N, 42, N);
insert_data = serialize(dataset.raw_);
PreInsert(segment, N, &offset);
res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
insert_data.size());
assert(res.error_code == Success);
// retrieve pks in {1, 2, 3}, timestamp = 19
res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
query_result = std::make_unique<proto::segcore::RetrieveResults>();
suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 3);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
auto collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(collection, Sealed, -1);
auto col = (milvus::segcore::Collection*)collection;
int N = 10;
auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);
// insert data with pks = {0, 0, 1, 1, 2, 2, 3, 3, 4, 4}, timestamps = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
for (auto& [field_id, field_meta] : col->get_schema()->get_fields()) {
auto array = dataset.get_col(field_id);
auto data = serialize(array.get());
auto load_info = CLoadFieldDataInfo{field_id.get(), data.data(), data.size(), N};
auto res = LoadFieldData(segment, load_info);
assert(res.error_code == Success);
auto count = GetRowCount(segment);
assert(count == N);
}
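// load the system Timestamp and RowID fields as well before running Delete/Retrieve on the sealed segment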
FieldMeta ts_field_meta(FieldName("Timestamp"), TimestampFieldID, DataType::INT64);
auto ts_array = CreateScalarDataArrayFrom(dataset.timestamps_.data(), N, ts_field_meta);
auto ts_data = serialize(ts_array.get());
auto load_info = CLoadFieldDataInfo{TimestampFieldID.get(), ts_data.data(), ts_data.size(), N};
auto res = LoadFieldData(segment, load_info);
assert(res.error_code == Success);
auto count = GetRowCount(segment);
assert(count == N);
FieldMeta row_id_field_meta(FieldName("RowID"), RowFieldID, DataType::INT64);
auto row_id_array = CreateScalarDataArrayFrom(dataset.row_ids_.data(), N, row_id_field_meta);
auto row_id_data = serialize(row_id_array.get());
load_info = CLoadFieldDataInfo{RowFieldID.get(), row_id_data.data(), row_id_data.size(), N};
res = LoadFieldData(segment, load_info);
assert(res.error_code == Success);
count = GetRowCount(segment);
assert(count == N);
// delete data pks = {1, 2, 3}, timestamps = {4, 4, 4}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
auto ids = std::make_unique<IdArray>();
ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end());
auto delete_data = serialize(ids.get());
std::vector<uint64_t> delete_timestamps(3, dataset.timestamps_[4]);
auto offset = PreDelete(segment, 3);
auto del_res = Delete(segment, offset, 3, delete_data.data(), delete_data.size(), delete_timestamps.data());
assert(del_res.error_code == Success);
// create retrieve plan pks in {1, 2, 3}, timestamp = 9
std::vector<int64_t> retrive_row_ids = {1, 2, 3};
auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
auto plan = std::make_unique<query::RetrievePlan>(*schema);
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_row_ids);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
ASSERT_TRUE(suc);
ASSERT_EQ(query_result->ids().int_id().data().size(), 3);
DeleteRetrievePlan(plan.release());
DeleteRetrieveResult(&retrieve_result);
DeleteCollection(collection);
DeleteSegment(segment);
}
TEST(CApiTest, SearchTest) {
auto c_collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(c_collection, Growing, -1);


@@ -15,7 +15,7 @@
#include "common/Utils.h"
#include "query/Utils.h"
#include "segcore/Utils.h"
#include "test_utils/DataGen.h"
TEST(Util, FaissMetricTypeToString) {
using namespace milvus::segcore;
@@ -54,3 +54,67 @@ TEST(Util, StringMatch) {
ASSERT_FALSE(PrefixMatch("dontmatch", "prefix"));
ASSERT_FALSE(PostfixMatch("dontmatch", "postfix"));
}
TEST(Util, GetDeleteBitmap) {
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
auto schema = std::make_shared<Schema>();
auto vec_fid = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2);
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
schema->set_primary_field_id(i64_fid);
auto N = 10;
Pk2OffsetType pk2offset;
InsertRecord insert_record(*schema, N);
DeletedRecord delete_record;
// fill the insert record; all insert records have the same pk = 1, timestamps = {1 ... N}
std::vector<int64_t> age_data(N);
std::vector<Timestamp> tss(N);
for (int i = 0; i < N; ++i) {
age_data[i] = 1;
tss[i] = i + 1;
pk2offset.insert(std::make_pair(1, i));
}
auto insert_offset = insert_record.reserved.fetch_add(N);
insert_record.timestamps_.fill_chunk_data(tss.data(), N);
auto field_data = insert_record.get_field_data_base(i64_fid);
field_data->fill_chunk_data(age_data.data(), N);
insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);
// test case delete pk1 (ts = 0) -> insert repeated pk1 (ts = {1 ... N}) -> query (ts = N)
std::vector<Timestamp> delete_ts = {0};
std::vector<PkType> delete_pk = {1};
auto offset = delete_record.reserved.fetch_add(1);
delete_record.timestamps_.set_data_raw(offset, delete_ts.data(), 1);
delete_record.pks_.set_data_raw(offset, delete_pk.data(), 1);
delete_record.ack_responder_.AddSegment(offset, offset + 1);
auto query_timestamp = tss[N - 1];
auto del_barrier = get_barrier(delete_record, query_timestamp);
auto insert_barrier = get_barrier(insert_record, query_timestamp);
auto res_bitmap =
get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N)
delete_ts = {uint64_t(N)};
delete_pk = {1};
offset = delete_record.reserved.fetch_add(1);
delete_record.timestamps_.set_data_raw(offset, delete_ts.data(), 1);
delete_record.pks_.set_data_raw(offset, delete_pk.data(), 1);
delete_record.ack_responder_.AddSegment(offset, offset + 1);
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap =
get_deleted_bitmap(del_barrier, insert_barrier, delete_record, insert_record, pk2offset, query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N);
// test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2)
query_timestamp = tss[N - 1] / 2;
del_barrier = get_barrier(delete_record, query_timestamp);
res_bitmap = get_deleted_bitmap(del_barrier, N, delete_record, insert_record, pk2offset, query_timestamp);
ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0);
}
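
The three cases above are pure timestamp arithmetic. A condensed, self-contained restatement follows; the visible lambda is a hypothetical illustration rather than the actual get_deleted_bitmap interface, and in segcore the delete barrier additionally filters out deletes that lie beyond the query timestamp.

#include <cassert>
#include <cstdint>

int main() {
    // Condensed form of the rule the test exercises: a row is hidden only
    // when insert_ts <= delete_ts <= query_ts.
    auto visible = [](uint64_t insert_ts, uint64_t delete_ts, uint64_t query_ts) {
        return insert_ts > delete_ts || delete_ts > query_ts;
    };
    assert(visible(1, 0, 10));    // case 1: delete at ts 0 precedes inserts at ts 1..10
    assert(!visible(10, 10, 10)); // case 2: delete at ts N covers every insert
    assert(visible(1, 10, 5));    // case 3: delete at ts N is in the query's future
    return 0;
}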


@@ -780,7 +780,6 @@ class TestDeleteOperation(TestcaseBase):
collection_w.query(tmp_expr, output_fields=[ct.default_float_vec_field_name],
check_task=CheckTasks.check_query_results, check_items={'exp_res': res, 'with_vec': True})
@pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/15744")
@pytest.mark.parametrize("to_query", [True, False])
def test_delete_insert_same_id_sealed(self, to_query):
"""