[test] Add and update compaction cases (#15680)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/15694/head
ThreadDao 2022-02-22 17:49:52 +08:00 committed by GitHub
parent ce9662c140
commit 74ffd5e549
4 changed files with 206 additions and 61 deletions

View File

@ -253,7 +253,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def get_compaction_plans(self, timeout=None, check_task=None, check_items=None, **kwargs):
def get_compaction_plans(self, timeout=None, check_task=None, check_items={}, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.get_compaction_plans, timeout], **kwargs)
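Side note on the new default: check_items={} lets ResponseChecker call check_items.get(...) without a None guard, but a mutable default argument is one shared dict object across all calls. A defensive variant of the same wrapper (a sketch reusing the body shown in this hunk, not what this commit ships) keeps the None default and normalizes inside the body:

    def get_compaction_plans(self, timeout=None, check_task=None, check_items=None, **kwargs):
        # Normalize here so one dict object is never shared across calls
        # (the classic mutable-default-argument pitfall).
        check_items = {} if check_items is None else check_items
        timeout = TIMEOUT if timeout is None else timeout
        func_name = sys._getframe().f_code.co_name
        res, check = api_request([self.collection.get_compaction_plans, timeout], **kwargs)
        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
        return res, check_result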

View File

@ -1,3 +1,5 @@
from pymilvus.client.types import CompactionPlans
from utils.util_log import test_log as log
from common import common_type as ct
from common import common_func as cf
@ -64,6 +66,12 @@ class ResponseChecker:
# Response check for the calculate distance interface
result = self.check_distance(self.response, self.func_name, self.check_items)
elif self.check_task == CheckTasks.check_delete_compact:
result = self.check_delete_compact_plan(self.response, self.func_name, self.check_items)
elif self.check_task == CheckTasks.check_merge_compact:
result = self.check_merge_compact_plan(self.response, self.func_name, self.check_items)
# Add a new check task here if something new needs verifying
return result
@ -112,7 +120,6 @@ class ResponseChecker:
# assert res_obj == class_obj
if func_name == "has_connection":
value_content = params.get(ct.value_content, False)
res_obj = res if res is not None else False
assert res_obj == value_content
@ -288,3 +295,56 @@ class ResponseChecker:
metric, sqrt)
return True
@staticmethod
def check_delete_compact_plan(compaction_plans, func_name, check_items):
"""
Verify the delete-type compaction plan
:param compaction_plans: the compaction plans to check
:type compaction_plans: CompactionPlans
:param func_name: get_compaction_plans API name
:type func_name: str
:param check_items: which items you wish to check;
plans_num represents the expected number of delete compaction plans, default is 1
:type check_items: dict
"""
to_check_func = 'get_compaction_plans'
if func_name != to_check_func:
log.warning("The function name is {} rather than {}".format(func_name, to_check_func))
if not isinstance(compaction_plans, CompactionPlans):
raise Exception("The compaction_plans result to check isn't a CompactionPlans object")
plans_num = check_items.get("plans_num", 1)
assert len(compaction_plans.plans) == plans_num
for plan in compaction_plans.plans:
assert len(plan.sources) == 1
assert plan.sources[0] != plan.target
@staticmethod
def check_merge_compact_plan(compaction_plans, func_name, check_items):
"""
Verify the merge-type compaction plan
:param compaction_plans: the compaction plans to check
:type compaction_plans: CompactionPlans
:param func_name: get_compaction_plans API name
:type func_name: str
:param check_items: which items you wish to check;
segment_num represents how many segments are expected to be merged, default is 2
:type check_items: dict
"""
to_check_func = 'get_compaction_plans'
if func_name != to_check_func:
log.warning("The function name is {} rather than {}".format(func_name, to_check_func))
if not isinstance(compaction_plans, CompactionPlans):
raise Exception("The compaction_plans result to check isn't a CompactionPlans object")
segment_num = check_items.get("segment_num", 2)
assert len(compaction_plans.plans) == 1
assert len(compaction_plans.plans[0].sources) == segment_num
assert compaction_plans.plans[0].target not in compaction_plans.plans[0].sources
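For reference, this is how the new check tasks are driven from a test; a minimal usage sketch following the cases below, where collection_w is an ApiCollectionWrapper:

    # Delete-type compaction: expect plans_num plans, each compacting a single segment
    collection_w.compact()
    collection_w.wait_for_compaction_completed()
    collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact,
                                      check_items={"plans_num": 1})

    # Merge-type compaction: expect one plan whose sources are the merged segments
    collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
                                      check_items={"segment_num": 2})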

View File

@ -47,7 +47,7 @@ gracefulTime = 1
default_nlist = 128
compact_segment_num_threshold = 10
compact_delta_ratio_reciprocal = 5 # compact_delta_binlog_ratio is 0.2
compact_retention_duration = 20 # compaction travel time retention range 20s
compact_retention_duration = 40 # compaction travel time retention range 40s
max_compaction_interval = 60 # the max time interval (s) from the last compaction
max_field_num = 256 # Maximum number of fields in a collection
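The retention bump from 20s to 40s matters because the delete-compaction cases sleep past this window before compacting, so the deleted entities become eligible for cleanup. The typical sequence in the tests below (names as used there):

    # Wait out the retention window, then compact and verify the delete-type plan
    sleep(ct.compact_retention_duration + 1)
    collection_w.compact()
    collection_w.wait_for_compaction_completed()
    collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)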
@ -179,6 +179,8 @@ class CheckTasks:
check_query_results = "check_query_results"
check_query_empty = "check_query_empty" # verify that query result is empty
check_distance = "check_distance"
check_delete_compact = "check_delete_compact"
check_merge_compact = "check_merge_compact"
class CaseLabel:

View File

@ -137,7 +137,7 @@ class TestCompactionParams(TestcaseBase):
c_plans, _ = collection_w.get_compaction_plans()
assert len(c_plans.plans) == 0
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("delete_pos", [1, tmp_nb // 2])
def test_compact_after_delete(self, delete_pos):
"""
@ -148,7 +148,7 @@ class TestCompactionParams(TestcaseBase):
expected: Verify compact result
"""
# create, insert without flush
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data(tmp_nb)
insert_res, _ = collection_w.insert(df)
@ -158,12 +158,10 @@ class TestCompactionParams(TestcaseBase):
assert collection_w.num_entities == tmp_nb
# compact, get plan
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
# Delete type compaction just merge insert log and delta log of one segment
# todo assert len(c_plans.plans[0].sources) == 1
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
collection_w.load()
collection_w.query(single_expr, check_task=CheckTasks.check_query_empty)
@ -172,7 +170,6 @@ class TestCompactionParams(TestcaseBase):
collection_w.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[-1:]}',
check_items={'exp_res': res})
@pytest.mark.xfail("Issue #15499")
@pytest.mark.tags(CaseLabel.L3)
def test_compact_after_delete_index(self):
"""
@ -199,11 +196,10 @@ class TestCompactionParams(TestcaseBase):
log.debug(collection_w.index())
# compact, get plan
sleep(50)
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans[0].sources) == 1
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
collection_w.load()
res, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
@ -230,10 +226,9 @@ class TestCompactionParams(TestcaseBase):
collection_w.delete(ratio_expr)
assert collection_w.num_entities == tmp_nb
# auto_compact
sleep(1)
# Delete type compaction just merge insert log and delta log of one segment
# todo assert len(c_plans.plans[0].sources) == 1
# Flush a new segment to meet the condition of 20% deleted entities, which triggers compaction, but there is no way to get the plan
collection_w.insert(cf.gen_default_dataframe_data(1, start=tmp_nb))
assert collection_w.num_entities == tmp_nb + 1
collection_w.load()
collection_w.query(ratio_expr, check_task=CheckTasks.check_query_empty)
@ -264,14 +259,14 @@ class TestCompactionParams(TestcaseBase):
collection_w.load()
collection_w.query(ratio_expr, check_task=CheckTasks.check_query_empty)
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.tags(CaseLabel.L3)
def test_compact_after_delete_all(self):
"""
target: test delete all and compact
method: 1.create with shard_num=1
2.delete all sealed data
3.compact
expected: collection num_entities is close to 0
expected: Get compaction plan
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data()
@ -283,10 +278,10 @@ class TestCompactionParams(TestcaseBase):
# currently no way to verify whether it compacts after delete,
# because the merge compact plan is generated first
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
log.debug(collection_w.num_entities)
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
collection_w.load()
collection_w.query(expr, check_task=CheckTasks.check_query_empty)
@ -334,14 +329,15 @@ class TestCompactionParams(TestcaseBase):
"""
target: test merge insert and delta log triggered by max_compaction_interval
method: todo
expected: auto merge
expected: auto compact binlogs
"""
pass
class TestCompactionOperation(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.xfail(reason="Issue https://github.com/milvus-io/milvus/issues/15665")
@pytest.mark.tags(CaseLabel.L3)
def test_compact_both_delete_merge(self):
"""
target: test compact both delete and merge
@ -350,7 +346,7 @@ class TestCompactionOperation(TestcaseBase):
3.delete and flush (new insert)
4.compact
5.load and search
expected:
expected: Trigger two types of compaction
"""
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
ids = []
@ -366,12 +362,16 @@ class TestCompactionOperation(TestcaseBase):
collection_w.insert(cf.gen_default_dataframe_data(1, start=2 * tmp_nb))
assert collection_w.num_entities == 2 * tmp_nb + 1
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans) == 2
# todo assert the two types of compaction plan
# search
sleep(5)
ids.pop(0)
ids.pop(-1)
collection_w.load()
@ -402,7 +402,7 @@ class TestCompactionOperation(TestcaseBase):
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# search
collection_w.load()
@ -426,7 +426,7 @@ class TestCompactionOperation(TestcaseBase):
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1,
schema=cf.gen_default_binary_collection_schema())
for i in range(2):
df, _ = cf.gen_default_binary_dataframe_data(ct.default_nb)
df, _ = cf.gen_default_binary_dataframe_data()
collection_w.insert(data=df)
assert collection_w.num_entities == (i + 1) * ct.default_nb
@ -448,7 +448,7 @@ class TestCompactionOperation(TestcaseBase):
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# verify index re-build and re-load
search_params = {"metric_type": "L1", "params": {"nprobe": 10}}
@ -457,17 +457,16 @@ class TestCompactionOperation(TestcaseBase):
search_params, ct.default_limit,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "Metric type of field index isn't "
"the same with search info"})
ct.err_msg: "metric type not found: (L1)"})
# verify search result
search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
search_res_two, _ = collection_w.search(vectors,
ct.default_binary_vec_field_name,
search_params, ct.default_limit)
assert len(search_res_two) == ct.default_nq
for hits in search_res_two:
assert len(hits) == ct.default_limit
for i in range(ct.default_nq):
for j in range(ct.default_limit):
assert search_res_two[i][j].id == search_res_one[i][j].id
@pytest.mark.tags(CaseLabel.L1)
def test_compact_and_index(self):
@ -485,7 +484,7 @@ class TestCompactionOperation(TestcaseBase):
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# create index
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
@ -500,7 +499,7 @@ class TestCompactionOperation(TestcaseBase):
for hits in search_res:
assert len(hits) == ct.default_limit
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.tags(CaseLabel.L3)
def test_compact_delete_and_search(self):
"""
target: test delete and compact segment, and search
@ -517,10 +516,13 @@ class TestCompactionOperation(TestcaseBase):
expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:ct.default_nb // 2]}'
collection_w.delete(expr)
assert collection_w.num_entities == ct.default_nb
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
# search
sleep(2)
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
@ -546,7 +548,7 @@ class TestCompactionOperation(TestcaseBase):
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# search
collection_w.load()
@ -584,7 +586,6 @@ class TestCompactionOperation(TestcaseBase):
assert len(c_plans.plans) == 0
# search
sleep(2)
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
@ -595,7 +596,34 @@ class TestCompactionOperation(TestcaseBase):
"limit": ct.default_limit}
)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.tags(CaseLabel.L3)
def test_compact_repeatedly_delete_same_id(self):
"""
target: test compact after repeatedly delete same entity
method: 1.Create and insert entities
2.repeatedly delete the same id
3.compact
expected: No exception; the delta log effectively deletes the entity only once
"""
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
expr = f'{ct.default_int64_field_name} in [0]'
for _ in range(100):
collection_w.delete(expr=expr)
assert collection_w.num_entities == ct.default_nb
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
collection_w.load()
collection_w.query(expr, check_task=CheckTasks.check_query_empty)
@pytest.mark.tags(CaseLabel.L3)
def test_compact_delete_inside_time_travel(self):
"""
target: test compact inside time_travel range
@ -618,7 +646,8 @@ class TestCompactionOperation(TestcaseBase):
delete_res, _ = collection_w.delete(expr)
log.debug(collection_w.num_entities)
collection_w.compact()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans) == 0
collection_w.load()
search_one, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
@ -652,20 +681,20 @@ class TestCompactionOperation(TestcaseBase):
log.debug(collection_w.num_entities)
# ensure compaction removes delta data whose deletes fall outside the retention range
# sleep(ct.compact_retention_duration)
sleep(60)
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
collection_w.load()
# search with travel_time tt
search_res, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
travel_timestamp=tt)
log.debug(search_res[0].ids)
assert len(search_res[0]) == 0
collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
travel_timestamp=tt,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "only support to travel back to"})
@pytest.mark.tags(CaseLabel.L0)
def test_compact_merge_two_segments(self):
@ -684,17 +713,15 @@ class TestCompactionOperation(TestcaseBase):
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# verify the two segments are merged into one
assert len(c_plans.plans) == 1
assert len(c_plans.plans[0].sources) == 2
target = c_plans.plans[0].target
c_plans = collection_w.get_compaction_plans()[0]
# verify queryNode loads the compacted segments
collection_w.load()
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert target == segment_info[0].segmentID
assert c_plans.plans[0].target == segment_info[0].segmentID
@pytest.mark.tags(CaseLabel.L2)
def test_compact_no_merge(self):
@ -734,7 +761,7 @@ class TestCompactionOperation(TestcaseBase):
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()[0]
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact, check_items={"segment_num": 2})
collection_w.load()
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
@ -759,8 +786,8 @@ class TestCompactionOperation(TestcaseBase):
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans[0].sources) == num_of_segment
c_plans = collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
check_items={"segment_num": num_of_segment})[0]
target = c_plans.plans[0].target
collection_w.load()
@ -792,7 +819,7 @@ class TestCompactionOperation(TestcaseBase):
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()[0]
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
collection_w.load()
search_res, _ = collection_w.search(df2[ct.default_float_vec_field_name][:1].to_list(),
@ -845,7 +872,6 @@ class TestCompactionOperation(TestcaseBase):
# create collection shard_num=1, insert 9 segments, each with one entity
collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=less_threshold)
sleep(3)
# load and verify no auto-merge
collection_w.load()
@ -896,11 +922,10 @@ class TestCompactionOperation(TestcaseBase):
"""
# init collection with one shard, insert into two segments
collection_w = self.collection_insert_multi_segments_one_shard(prefix, is_dup=False)
# compact and complete
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
# delete and query
expr = f'{ct.default_int64_field_name} in {[0]}'
@ -934,6 +959,32 @@ class TestCompactionOperation(TestcaseBase):
# Actually not merged
assert len(c_plans.plans) == 0
@pytest.mark.tags(CaseLabel.L3)
def test_compact_delete_cross_shards(self):
"""
target: test delete compact cross different shards
method: 1.create with 2 shards
2.insert entities into 2 segments
3.delete one entity from each segment
4.call compact and get compact plan
expected: Generate a compaction plan for each segment
"""
shards_num = 2
# insert into two segments across two shards
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=shards_num)
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
expr = f"{ct.default_int64_field_name} in [0, 99]"
collection_w.delete(expr)
assert collection_w.num_entities == tmp_nb
# compact
sleep(ct.compact_retention_duration + 1)
collection_w.compact()
collection_w.wait_for_compaction_completed(timeout=1)
collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact,
check_items={"plans_num": shards_num})
@pytest.mark.tags(CaseLabel.L1)
def test_compact_cross_partition(self):
"""
@ -1020,3 +1071,35 @@ class TestCompactionOperation(TestcaseBase):
collection_w.load()
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert len(seg_info) == 1
def test_compact_during_search(self):
"""
target: test compact during search
method: while compacting the collection, start a thread to search
expected: No exception
"""
# less than auto-merge threshold 10
num_of_segment = 5
# create collection with shard_num=1, insert 5 segments, each with 100 entities
collection_w = self.collection_insert_multi_segments_one_shard(prefix,
num_of_segment=num_of_segment,
nb_of_segment=100)
def do_search():
for _ in range(5):
search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res[0]) == ct.default_limit
# compact during search
collection_w.load()
t = threading.Thread(target=do_search, args=())
t.start()
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
check_items={"segment_num": num_of_segment})
t.join()
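One caveat with this threading pattern: an AssertionError raised inside do_search ends the worker thread but does not fail the test. A hedged alternative (a sketch reusing the names from the test above, not part of this commit) submits the search to concurrent.futures so any failure re-raises in the main thread:

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(do_search)  # search runs concurrently with compact
        collection_w.compact()
        collection_w.wait_for_compaction_completed()
        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
                                          check_items={"segment_num": num_of_segment})
        future.result()  # re-raises any assertion failure from the search thread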