Add test cases for L0 delete (#28380)

- test delete records in delta logs, WAL, L0 segment

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/28280/head
ThreadDao 2023-11-14 10:26:24 +08:00 committed by GitHub
parent b1eb1ea506
commit ec23107146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 78 additions and 6 deletions

View File

@ -419,9 +419,9 @@ class TestDeleteOperation(TestcaseBase):
assert len(inter) == 0
@pytest.mark.tags(CaseLabel.L1)
def test_delete_query_ids_both_sealed_and_channel(self):
def test_delete_query_ids_both_L0_segment_and_WAL(self):
"""
target: test query that delete ids from both channel and sealed
target: test query that delete records from both L0 segment and WAL
method: 1.create and insert
2.delete id 0 and flush
3.load and query id 0
@ -439,12 +439,12 @@ class TestDeleteOperation(TestcaseBase):
assert del_res.delete_count == 1
assert collection_w.num_entities == tmp_nb
# load and query id 0
# load and query id 0, delete records from L0 segment
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
# insert id tmp_nb and delete id 0 and tmp_nb
# insert id tmp_nb and delete id 0 and tmp_nb, delete records from WAL
df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb)
collection_w.insert(df_new)
collection_w.delete(expr=f'{ct.default_int64_field_name} in {[tmp_nb]}')
@ -453,6 +453,50 @@ class TestDeleteOperation(TestcaseBase):
collection_w.query(expr=f'{ct.default_int64_field_name} in {[0, tmp_nb]}',
check_task=CheckTasks.check_query_empty)
# insert deleted ids and query last two rows
df_same = cf.gen_default_dataframe_data(tmp_nb+1)
collection_w.insert(df_same)
res = df_same.iloc[-2:, [0, 1, -1]].to_dict('records')
collection_w.query(expr=f'{ct.default_int64_field_name} >= {tmp_nb-1}',
output_fields=[ct.default_float_vec_field_name, ct.default_float_field_name],
check_task=CheckTasks.check_query_results, check_items={'exp_res': res, 'with_vec': True})
@pytest.mark.tags(CaseLabel.L1)
def test_delete_query_delta_logs(self):
"""
target: test delete and query which delete records comes from delta logs(L1 delta)
method: 1. insert -> flush
2. delete and flush 11 twice -> gen more than 10 L0 segment binlog -> L0 compaction
3. load and query
expected: gets empty
"""
L0_binlog_num_compaction = 10
# init collection and insert data without flush
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
# delete id 0 and flush
for i in range(L0_binlog_num_compaction + 1):
delete_expr = f'{ct.default_int64_field_name} in {[i, i+1]}'
del_res, _ = collection_w.delete(delete_expr)
assert del_res.delete_count == 2
collection_w.flush()
# load and query id 0, delete records from L0 segment
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
query_expr = f'{ct.default_int64_field_name} < {L0_binlog_num_compaction+2}'
collection_w.query(query_expr, check_task=CheckTasks.check_query_empty)
# insert deleted ids and query last two rows
df_same = cf.gen_default_dataframe_data(L0_binlog_num_compaction+2)
collection_w.insert(df_same)
res = df_same.iloc[:, [0, 1, -1]].to_dict('records')
collection_w.query(expr=f'{ct.default_int64_field_name} < {L0_binlog_num_compaction+2}',
output_fields=[ct.default_float_vec_field_name, ct.default_float_field_name],
check_task=CheckTasks.check_query_results, check_items={'exp_res': res, 'with_vec': True})
@pytest.mark.tags(CaseLabel.L1)
def test_delete_search(self):
"""
@ -700,12 +744,21 @@ class TestDeleteOperation(TestcaseBase):
# delete
del_res, _ = collection_w.delete(tmp_expr)
assert del_res.delete_count == 1
collection_w.flush()
# load and query with id
# load and query with id -> delete records from L0 segment or WAL
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
# insert deleted id and query
df_same = cf.gen_default_dataframe_data(1)
collection_w.insert(df_same)
res = df_same.iloc[:, [0, 1, -1]].to_dict('records')
collection_w.query(expr=tmp_expr,
output_fields=[ct.default_float_vec_field_name, ct.default_float_field_name],
check_task=CheckTasks.check_query_results, check_items={'exp_res': res, 'with_vec': True})
@pytest.mark.tags(CaseLabel.L1)
def test_delete_growing_data_channel_delete(self):
"""
@ -1276,7 +1329,7 @@ class TestDeleteString(TestcaseBase):
assert len(inter) == 0
@pytest.mark.tags(CaseLabel.L1)
def test_delete_query_ids_both_sealed_and_channel_with_string(self):
def test_delete_query_ids_both_L0_segment_and_WAL_with_string(self):
"""
target: test query that delete ids from both channel and sealed
method: 1.create and insert, string field is primary
@ -1312,6 +1365,15 @@ class TestDeleteString(TestcaseBase):
collection_w.query(expr=f'{ct.default_string_field_name} in ["0", "tmp_nb"]',
check_task=CheckTasks.check_query_empty)
# insert deleted id and query
df_same = cf.gen_default_dataframe_data(1)
collection_w.insert(df_same)
res = df_same.iloc[:, [2, -1]].to_dict('records')
collection_w.query(expr=default_string_expr,
output_fields=[ct.default_float_vec_field_name],
check_task=CheckTasks.check_query_results,
check_items={'exp_res': res, 'with_vec': True, "primary_field": ct.default_string_field_name})
@pytest.mark.tags(CaseLabel.L1)
def test_delete_search_with_string(self):
"""
@ -1440,6 +1502,7 @@ class TestDeleteString(TestcaseBase):
# delete
del_res, _ = collection_w.delete(default_string_expr)
assert del_res.delete_count == 1
collection_w.flush()
# load and query with id
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
@ -1447,6 +1510,15 @@ class TestDeleteString(TestcaseBase):
collection_w.query(default_string_expr,
check_task=CheckTasks.check_query_empty)
# insert deleted id and query
df_same = cf.gen_default_dataframe_data(1)
collection_w.insert(df_same)
res = df_same.iloc[:, [2, -1]].to_dict('records')
collection_w.query(expr=default_string_expr,
output_fields=[ct.default_float_vec_field_name],
check_task=CheckTasks.check_query_results,
check_items={'exp_res': res, 'with_vec': True, "primary_field": ct.default_string_field_name})
@pytest.mark.tags(CaseLabel.L1)
def test_delete_growing_data_channel_delete_with_string(self):
"""