mirror of https://github.com/milvus-io/milvus.git
fix: fix query count(*) concurrently (#35007)
#34778 #34849

Fixes two problems:
1. count(*) was incorrect when a growing segment inserted duplicated (pk, timestamp) pairs (pk and timestamp both identical); only one such pair should be kept.
2. count(*) could core dump when the get_real_count interface took its snapshot and applied MVCC at an inconsistent state, which mainly happens under concurrency.

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>

pull/35078/head
parent 972752258a
commit 86322e0468
@@ -58,15 +58,29 @@ class DeletedRecord {
         int64_t removed_num = 0;
         int64_t mem_add = 0;
         for (size_t i = 0; i < pks.size(); ++i) {
-            auto offsets = insert_record_->search_pk(pks[i], timestamps[i]);
+            auto delete_pk = pks[i];
+            auto delete_timestamp = timestamps[i];
+            auto offsets =
+                insert_record_->search_pk(delete_pk, delete_timestamp);
+            bool has_duplicate_pk_timestamps = false;
             for (auto offset : offsets) {
-                int64_t insert_row_offset = offset.get();
-                // Assert(insert_record->timestamps_.size() >= insert_row_offset);
-                if (insert_record_->timestamps_[insert_row_offset] <
-                    timestamps[i]) {
-                    InsertIntoInnerPairs(timestamps[i], {insert_row_offset});
+                int64_t row_offset = offset.get();
+                auto row_timestamp = insert_record_->timestamps_[row_offset];
+                // Assert(insert_record->timestamps_.size() >= row_offset);
+                if (row_timestamp < delete_timestamp) {
+                    InsertIntoInnerPairs(delete_timestamp, {row_offset});
                     removed_num++;
                     mem_add += sizeof(Timestamp) + sizeof(int64_t);
-                }
+                } else if (row_timestamp == delete_timestamp) {
+                    // if the insert record holds multiple identical (pk, timestamp)
+                    // pairs, keep only the first one and remove the rest
+                    if (!has_duplicate_pk_timestamps) {
+                        has_duplicate_pk_timestamps = true;
+                    } else {
+                        InsertIntoInnerPairs(delete_timestamp, {row_offset});
+                        removed_num++;
+                        mem_add += sizeof(Timestamp) + sizeof(int64_t);
+                    }
+                }
             }
         }

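For illustration, here is a minimal, self-contained sketch of the keep-first rule the hunk above introduces. It is not the segcore implementation: the function name, the flat vector of timestamps, and treating vector indices as insert offsets are all simplifying assumptions.

#include <cassert>
#include <cstdint>
#include <vector>

using Timestamp = uint64_t;

// Given the timestamps of all insert offsets that share one pk, decide which
// offsets must be marked deleted for a delete at `delete_ts`:
//  - every row strictly older than delete_ts, and
//  - every duplicate row at exactly delete_ts except the first one seen.
std::vector<int64_t>
OffsetsToRemove(const std::vector<Timestamp>& row_ts, Timestamp delete_ts) {
    std::vector<int64_t> removed;
    bool kept_one_duplicate = false;
    for (int64_t offset = 0; offset < static_cast<int64_t>(row_ts.size()); ++offset) {
        if (row_ts[offset] < delete_ts) {
            removed.push_back(offset);          // older version of the pk
        } else if (row_ts[offset] == delete_ts) {
            if (!kept_one_duplicate) {
                kept_one_duplicate = true;      // keep the first (pk, ts) pair
            } else {
                removed.push_back(offset);      // drop later duplicates
            }
        }
    }
    return removed;
}

int main() {
    // The same (pk, ts) pair was inserted twice (ts = 5); one older row at ts = 3.
    auto removed = OffsetsToRemove({3, 5, 5}, /*delete_ts=*/5);
    assert(removed.size() == 2);  // offset 0 (older) and offset 2 (duplicate)
    // Visible rows: 3 - 2 = 1, so count(*) sees one row, not two.
    return 0;
}

With rows at timestamps {3, 5, 5} and a delete at timestamp 5, two offsets are marked removed, so a later count(*) sees one visible row instead of two.
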
@@ -84,15 +98,24 @@ class DeletedRecord {
         auto end = deleted_pairs_.lower_bound(
             std::make_pair(timestamp, std::set<int64_t>{}));
         for (auto it = deleted_pairs_.begin(); it != end; it++) {
+            // this may happen if the lower_bound end is deleted_pairs_.end() and
+            // other threads insert nodes into deleted_pairs_ concurrently
+            if (it->first > timestamp) {
+                break;
+            }
             for (auto& v : it->second) {
-                bitset.set(v);
+                if (v < insert_barrier) {
+                    bitset.set(v);
+                }
             }
         }

         // handle the case where end points to an element with the same timestamp
         if (end != deleted_pairs_.end() && end->first == timestamp) {
             for (auto& v : end->second) {
-                bitset.set(v);
+                if (v < insert_barrier) {
+                    bitset.set(v);
+                }
             }
         }
     }

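The hunk above guards every bitset write with the snapshot's insert barrier and tolerates concurrent growth of the deleted-pairs container. Below is a self-contained sketch of that apply step; it substitutes a plain std::map and std::vector<bool> for segcore's concurrent structures, and ApplyDeletes/query_ts are made-up names, not the real API.

#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <vector>

using Timestamp = uint64_t;

// Mark deleted offsets visible at `query_ts` in a bitmap sized by the
// snapshot's insert barrier. Offsets appended after the barrier was taken are
// skipped so we never write past the end of the bitmap, and keys greater than
// query_ts (possible if another thread inserted while we searched) are ignored.
int64_t
ApplyDeletes(const std::map<Timestamp, std::set<int64_t>>& deleted_pairs,
             Timestamp query_ts,
             int64_t insert_barrier,
             std::vector<bool>& bitmap) {
    auto end = deleted_pairs.lower_bound(query_ts);
    auto apply = [&](const std::set<int64_t>& offsets) {
        for (auto v : offsets) {
            if (v < insert_barrier) {
                bitmap[v] = true;
            }
        }
    };
    for (auto it = deleted_pairs.begin(); it != end; ++it) {
        if (it->first > query_ts) {
            break;  // defensive: a concurrent insert may have shifted `end`
        }
        apply(it->second);
    }
    if (end != deleted_pairs.end() && end->first == query_ts) {
        apply(end->second);  // deletes at exactly query_ts are visible too
    }
    int64_t deleted = 0;
    for (auto b : bitmap) deleted += b;
    return insert_barrier - deleted;  // rows still visible at query_ts
}

int main() {
    std::map<Timestamp, std::set<int64_t>> deleted_pairs{{5, {0, 7}}, {9, {1}}};
    int64_t insert_barrier = 4;  // the snapshot saw 4 rows; offset 7 came later
    std::vector<bool> bitmap(insert_barrier, false);
    std::cout << ApplyDeletes(deleted_pairs, /*query_ts=*/5, insert_barrier, bitmap)
              << "\n";           // prints 3: offset 0 deleted, offsets 1/2/3 visible
    return 0;
}

Here the barrier is 4, so the delete recorded at offset 7 (a row appended after the snapshot was taken) is skipped instead of writing past the end of the bitmap, and the visible count stays consistent at 3.
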
@@ -701,7 +701,7 @@ TEST(Expr, TestArrayEqual) {
     std::vector<ScalarArray> long_array_col;
     int num_iters = 1;
     for (int iter = 0; iter < num_iters; ++iter) {
-        auto raw_data = DataGen(schema, N, iter, 0, 1, 3);
+        auto raw_data = DataGen(schema, N, iter, 0, 0, 1, 3);
         auto new_long_array_col = raw_data.get_col<ScalarArray>(long_array_fid);
         long_array_col.insert(long_array_col.end(),
                               new_long_array_col.begin(),

@@ -936,7 +936,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
     auto col = (milvus::segcore::Collection*)collection;

     int N = 20;
-    auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);
+    auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2);

     auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
     auto sealed_segment = dynamic_cast<SegmentSealed*>(segment_interface);

@@ -1156,7 +1156,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) {

     // second insert data
     // insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} , timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
-    dataset = DataGen(col->get_schema(), N, 42, N);
+    dataset = DataGen(col->get_schema(), N, 42, 0, N);
     insert_data = serialize(dataset.raw_);
     PreInsert(segment, N, &offset);
     res = Insert(segment,

@@ -1194,7 +1194,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
     auto col = (milvus::segcore::Collection*)collection;

     int N = 10;
-    auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);
+    auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2);

     // insert data with pks = {0, 0, 1, 1, 2, 2, 3, 3, 4, 4} , timestamps = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
     auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);

@@ -29,7 +29,7 @@ TEST(CApiTest, StreamReduce) {

     //2. insert data into segments
     auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
-    auto dataset_1 = DataGen(schema, N, 55, 0, 1, 10, true);
+    auto dataset_1 = DataGen(schema, N, 55, 0, 0, 1, 10, true);
     int64_t offset_1;
     PreInsert(segment_1, N, &offset_1);
     auto insert_data_1 = serialize(dataset_1.raw_);

@@ -42,7 +42,7 @@ TEST(CApiTest, StreamReduce) {
                              insert_data_1.size());
     ASSERT_EQ(ins_res_1.error_code, Success);

-    auto dataset_2 = DataGen(schema, N, 66, 0, 1, 10, true);
+    auto dataset_2 = DataGen(schema, N, 66, 0, 0, 1, 10, true);
     int64_t offset_2;
     PreInsert(segment_2, N, &offset_2);
     auto insert_data_2 = serialize(dataset_2.raw_);

@@ -95,7 +95,7 @@ TEST(GroupBY, SealedIndex) {
     size_t N = 50;

     //2. load raw data
-    auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false);
+    auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false);
     auto fields = schema->get_fields();
     for (auto field_data : raw_data.raw_->fields_data()) {
         int64_t field_id = field_data.field_id();

@@ -447,7 +447,7 @@ TEST(GroupBY, SealedData) {
     size_t N = 100;

     //2. load raw data
-    auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false);
+    auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false);
     auto fields = schema->get_fields();
     for (auto field_data : raw_data.raw_->fields_data()) {
         int64_t field_id = field_data.field_id();

@@ -542,9 +542,9 @@ TEST(GroupBY, Reduce) {
     int repeat_count_1 = 2;
     int repeat_count_2 = 5;
     auto raw_data1 =
-        DataGen(schema, N, seed, ts_offset, repeat_count_1, false, false);
+        DataGen(schema, N, seed, 0, ts_offset, repeat_count_1, false, false);
     auto raw_data2 =
-        DataGen(schema, N, seed, ts_offset, repeat_count_2, false, false);
+        DataGen(schema, N, seed, 0, ts_offset, repeat_count_2, false, false);

     auto fields = schema->get_fields();
     //load segment1 raw data

@@ -676,7 +676,7 @@ TEST(GroupBY, GrowingRawData) {
     int n_batch = 3;
     for (int i = 0; i < n_batch; i++) {
         auto data_set =
-            DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
+            DataGen(schema, rows_per_batch, 42, 0, 0, 8, 10, false, false);
         auto offset = segment_growing_impl->PreInsert(rows_per_batch);
         segment_growing_impl->Insert(offset,
                                      rows_per_batch,

@@ -774,9 +774,9 @@ TEST(GroupBY, GrowingIndex) {
     int64_t rows_per_batch = 1024;
     int n_batch = 10;
     for (int i = 0; i < n_batch; i++) {
-        auto data_set =
-            DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false);
         auto offset = segment_growing_impl->PreInsert(rows_per_batch);
+        auto data_set = DataGen(
+            schema, rows_per_batch, 42, offset, offset, 1, 10, false, false);
         segment_growing_impl->Insert(offset,
                                      rows_per_batch,
                                      data_set.row_ids_.data(),

@@ -59,7 +59,7 @@ TEST(Growing, RemoveDuplicatedRecords) {
     int64_t c = 1000;
     auto offset = 0;

-    auto dataset = DataGen(schema, c, 42, 0, 1, 10, true);
+    auto dataset = DataGen(schema, c, 42, 0, 0, 1, 10, true);
     auto pks = dataset.get_col<int64_t>(pk);
     segment->Insert(offset,
                     c,

@@ -109,6 +109,34 @@ TEST(Growing, RemoveDuplicatedRecords) {
     }
 }

+TEST(Growing, RealCountWithDuplicateRecords) {
+    auto schema = std::make_shared<Schema>();
+    auto pk = schema->AddDebugField("pk", DataType::INT64);
+    schema->set_primary_field_id(pk);
+    auto segment = CreateGrowingSegment(schema, empty_index_meta);
+
+    int64_t c = 10;
+    auto offset = 0;
+    auto dataset = DataGen(schema, c);
+    auto pks = dataset.get_col<int64_t>(pk);
+
+    // insert the same values twice
+    segment->Insert(offset,
+                    c,
+                    dataset.row_ids_.data(),
+                    dataset.timestamps_.data(),
+                    dataset.raw_);
+
+    segment->Insert(offset + c,
+                    c,
+                    dataset.row_ids_.data(),
+                    dataset.timestamps_.data(),
+                    dataset.raw_);
+
+    // the real count is still c, not 2c
+    ASSERT_EQ(c, segment->get_real_count());
+}
+
 TEST(Growing, RealCount) {
     auto schema = std::make_shared<Schema>();
     auto pk = schema->AddDebugField("pk", DataType::INT64);

@@ -383,7 +383,7 @@ TEST_P(RetrieveTest, LargeTimestamp) {
     int choose_sep = 3;
     auto choose = [=](int i) { return i * choose_sep % N; };
     uint64_t ts_offset = 100;
-    auto dataset = DataGen(schema, N, 42, ts_offset + 1);
+    auto dataset = DataGen(schema, N, 42, 0, ts_offset + 1);
     auto segment = CreateSealedSegment(schema);
     SealedLoadFieldData(dataset, *segment);
     auto i64_col = dataset.get_col<int64_t>(fid_64);

@@ -248,6 +248,7 @@ struct GeneratedData {
     DataGen(SchemaPtr schema,
             int64_t N,
             uint64_t seed,
+            uint64_t pk_offset,
             uint64_t ts_offset,
             int repeat_count,
             int array_len,

@@ -317,14 +318,16 @@ GenerateRandomSparseFloatVector(size_t rows,
     return tensor;
 }

-inline GeneratedData DataGen(SchemaPtr schema,
-                             int64_t N,
-                             uint64_t seed = 42,
-                             uint64_t ts_offset = 0,
-                             int repeat_count = 1,
-                             int array_len = 10,
-                             bool random_pk = false,
-                             bool random_val = true) {
+inline GeneratedData
+DataGen(SchemaPtr schema,
+        int64_t N,
+        uint64_t seed = 42,
+        uint64_t pk_offset = 0,
+        uint64_t ts_offset = 0,
+        int repeat_count = 1,
+        int array_len = 10,
+        bool random_pk = false,
+        bool random_val = true) {
     using std::vector;
     std::default_random_engine random(seed);
     std::normal_distribution<> distr(0, 1);

@@ -425,9 +428,11 @@ inline GeneratedData DataGen(SchemaPtr schema,
             case DataType::INT64: {
                 vector<int64_t> data(N);
                 for (int i = 0; i < N; i++) {
-                    if (random_pk && schema->get_primary_field_id()->get() ==
-                                         field_id.get()) {
-                        data[i] = random() % N;
+                    if (schema->get_primary_field_id()->get() ==
+                        field_id.get()) {
+                        data[i] = random_pk
+                                      ? random() % N + pk_offset
+                                      : data[i] = i / repeat_count + pk_offset;
                     } else {
                         data[i] = i / repeat_count;
                     }

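The two DataGen hunks above add a pk_offset parameter right after seed and route it into primary-key generation. Below is a hedged usage sketch that only reproduces the sequential branch (i / repeat_count + pk_offset) outside the test utilities; SequentialPks is not a real helper in the code base and the numbers are purely illustrative.

#include <cstdint>
#include <iostream>
#include <vector>

// Sequential primary keys as the updated generator derives them:
// pk(i) = i / repeat_count + pk_offset, so each pk repeats `repeat_count`
// times and the whole sequence is shifted by `pk_offset`.
std::vector<int64_t>
SequentialPks(int64_t n, int repeat_count, uint64_t pk_offset) {
    std::vector<int64_t> pks(n);
    for (int64_t i = 0; i < n; ++i) {
        pks[i] = i / repeat_count + static_cast<int64_t>(pk_offset);
    }
    return pks;
}

int main() {
    // N = 10, repeat_count = 2, pk_offset = 100
    for (auto pk : SequentialPks(10, 2, 100)) {
        std::cout << pk << " ";  // 100 100 101 101 102 102 103 103 104 104
    }
    std::cout << "\n";
    return 0;
}

This is what lets a test such as GroupBY.GrowingIndex pass the batch offset as pk_offset, so consecutive batches generate distinct primary keys rather than repeating the same ones.
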
@@ -2996,7 +2996,7 @@ class TestQueryCount(TestcaseBase):
         # query count
         collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
                            check_task=CheckTasks.check_query_results,
-                           check_items={exp_res: [{count: tmp_nb}]}
+                           check_items={exp_res: [{count: 1}]}
                            )

         # delete and verify count