diff --git a/tests/python_client/base/collection_wrapper.py b/tests/python_client/base/collection_wrapper.py
index eb082ccb0f..fb0525d359 100644
--- a/tests/python_client/base/collection_wrapper.py
+++ b/tests/python_client/base/collection_wrapper.py
@@ -253,7 +253,7 @@ class ApiCollectionWrapper:
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
         return res, check_result
 
-    def get_compaction_plans(self, timeout=None, check_task=None, check_items=None, **kwargs):
+    def get_compaction_plans(self, timeout=None, check_task=None, check_items={}, **kwargs):
         timeout = TIMEOUT if timeout is None else timeout
         func_name = sys._getframe().f_code.co_name
         res, check = api_request([self.collection.get_compaction_plans, timeout], **kwargs)
diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py
index 0d187f9b84..4b014a4402 100644
--- a/tests/python_client/check/func_check.py
+++ b/tests/python_client/check/func_check.py
@@ -1,3 +1,5 @@
+from pymilvus.client.types import CompactionPlans
+
 from utils.util_log import test_log as log
 from common import common_type as ct
 from common import common_func as cf
@@ -64,6 +66,12 @@ class ResponseChecker:
             # Calculate distance interface that response check
             result = self.check_distance(self.response, self.func_name, self.check_items)
 
+        elif self.check_task == CheckTasks.check_delete_compact:
+            result = self.check_delete_compact_plan(self.response, self.func_name, self.check_items)
+
+        elif self.check_task == CheckTasks.check_merge_compact:
+            result = self.check_merge_compact_plan(self.response, self.func_name, self.check_items)
+
         # Add check_items here if something new need verify
         return result
@@ -112,7 +120,6 @@ class ResponseChecker:
             # assert res_obj == class_obj
 
         if func_name == "has_connection":
-            value_content = params.get(ct.value_content, False)
             res_obj = res if res is not None else False
 
             assert res_obj == value_content
@@ -288,3 +295,56 @@ class ResponseChecker:
                                                        metric,
                                                        sqrt)
         return True
+
+    @staticmethod
+    def check_delete_compact_plan(compaction_plans, func_name, check_items):
+        """
+        Verify the delete-type compaction plan
+
+        :param compaction_plans: the compaction plans to check
+        :type compaction_plans: CompactionPlans
+
+        :param func_name: get_compaction_plans API name
+        :type func_name: str
+
+        :param check_items: which items you wish to check;
+                            plans_num represents the expected number of delete compaction plans, default is 1
+        :type check_items: dict
+        """
+        to_check_func = 'get_compaction_plans'
+        if func_name != to_check_func:
+            log.warning("The function name is {} rather than {}".format(func_name, to_check_func))
+        if not isinstance(compaction_plans, CompactionPlans):
+            raise Exception("The compaction_plans result to check is not a CompactionPlans object")
+
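+        # a delete-type compaction is expected to yield one plan per compacted
+        # segment, each rewriting a single source segment into a new target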
{}".format(func_name, to_check_func)) + if not isinstance(compaction_plans, CompactionPlans): + raise Exception("The compaction_plans result to check isn't CompactionPlans type object") + + segment_num = check_items.get("segment_num", 2) + assert len(compaction_plans.plans) == 1 + assert len(compaction_plans.plans[0].sources) == segment_num + assert compaction_plans.plans[0].target not in compaction_plans.plans[0].sources diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index 13e59dd0c7..cacde89e24 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -47,7 +47,7 @@ gracefulTime = 1 default_nlist = 128 compact_segment_num_threshold = 10 compact_delta_ratio_reciprocal = 5 # compact_delta_binlog_ratio is 0.2 -compact_retention_duration = 20 # compaction travel time retention range 20s +compact_retention_duration = 40 # compaction travel time retention range 20s max_compaction_interval = 60 # the max time interval (s) from the last compaction max_field_num = 256 # Maximum number of fields in a collection @@ -179,6 +179,8 @@ class CheckTasks: check_query_results = "check_query_results" check_query_empty = "check_query_empty" # verify that query result is empty check_distance = "check_distance" + check_delete_compact = "check_delete_compact" + check_merge_compact = "check_merge_compact" class CaseLabel: diff --git a/tests/python_client/testcases/test_compaction.py b/tests/python_client/testcases/test_compaction.py index 7f2424db5e..7f4e7d1aee 100644 --- a/tests/python_client/testcases/test_compaction.py +++ b/tests/python_client/testcases/test_compaction.py @@ -137,7 +137,7 @@ class TestCompactionParams(TestcaseBase): c_plans, _ = collection_w.get_compaction_plans() assert len(c_plans.plans) == 0 - @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("delete_pos", [1, tmp_nb // 2]) def test_compact_after_delete(self, delete_pos): """ @@ -148,7 +148,7 @@ class TestCompactionParams(TestcaseBase): expected: Verify compact result """ # create, insert without flush - collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix)) + collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1) df = cf.gen_default_dataframe_data(tmp_nb) insert_res, _ = collection_w.insert(df) @@ -158,12 +158,10 @@ class TestCompactionParams(TestcaseBase): assert collection_w.num_entities == tmp_nb # compact, get plan + sleep(ct.compact_retention_duration + 1) collection_w.compact() collection_w.wait_for_compaction_completed() - c_plans = collection_w.get_compaction_plans()[0] - - # Delete type compaction just merge insert log and delta log of one segment - # todo assert len(c_plans.plans[0].sources) == 1 + collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact) collection_w.load() collection_w.query(single_expr, check_items=CheckTasks.check_query_empty) @@ -172,7 +170,6 @@ class TestCompactionParams(TestcaseBase): collection_w.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[-1:]}', check_items={'exp_res': res}) - @pytest.mark.xfail("Issue #15499") @pytest.mark.tags(CaseLabel.L3) def test_compact_after_delete_index(self): """ @@ -199,11 +196,10 @@ class TestCompactionParams(TestcaseBase): log.debug(collection_w.index()) # compact, get plan - sleep(50) + sleep(ct.compact_retention_duration + 1) collection_w.compact() collection_w.wait_for_compaction_completed() - c_plans = collection_w.get_compaction_plans()[0] - assert 
+        segment_num = check_items.get("segment_num", 2)
+        assert len(compaction_plans.plans) == 1
+        assert len(compaction_plans.plans[0].sources) == segment_num
+        assert compaction_plans.plans[0].target not in compaction_plans.plans[0].sources
diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py
index 13e59dd0c7..cacde89e24 100644
--- a/tests/python_client/common/common_type.py
+++ b/tests/python_client/common/common_type.py
@@ -47,7 +47,7 @@ gracefulTime = 1
 default_nlist = 128
 compact_segment_num_threshold = 10
 compact_delta_ratio_reciprocal = 5  # compact_delta_binlog_ratio is 0.2
-compact_retention_duration = 20  # compaction travel time retention range 20s
+compact_retention_duration = 40  # compaction travel time retention range 40s
 max_compaction_interval = 60  # the max time interval (s) from the last compaction
 max_field_num = 256  # Maximum number of fields in a collection
@@ -179,6 +179,8 @@ class CheckTasks:
     check_query_results = "check_query_results"
     check_query_empty = "check_query_empty"  # verify that query result is empty
     check_distance = "check_distance"
+    check_delete_compact = "check_delete_compact"
+    check_merge_compact = "check_merge_compact"
 
 
 class CaseLabel:
diff --git a/tests/python_client/testcases/test_compaction.py b/tests/python_client/testcases/test_compaction.py
index 7f2424db5e..7f4e7d1aee 100644
--- a/tests/python_client/testcases/test_compaction.py
+++ b/tests/python_client/testcases/test_compaction.py
@@ -137,7 +137,7 @@ class TestCompactionParams(TestcaseBase):
         c_plans, _ = collection_w.get_compaction_plans()
         assert len(c_plans.plans) == 0
 
-    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("delete_pos", [1, tmp_nb // 2])
     def test_compact_after_delete(self, delete_pos):
         """
         expected: Verify compact result
         """
         # create, insert without flush
-        collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
+        collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
         df = cf.gen_default_dataframe_data(tmp_nb)
         insert_res, _ = collection_w.insert(df)
@@ -158,12 +158,10 @@
         assert collection_w.num_entities == tmp_nb
 
         # compact, get plan
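+        # wait for the deleted entities to fall outside the retention window,
+        # so that compaction can remove the delta log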
+        sleep(ct.compact_retention_duration + 1)
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        c_plans = collection_w.get_compaction_plans()[0]
-
-        # Delete type compaction just merge insert log and delta log of one segment
-        # todo assert len(c_plans.plans[0].sources) == 1
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
 
         collection_w.load()
         collection_w.query(single_expr, check_task=CheckTasks.check_query_empty)
@@ -172,7 +170,6 @@
         collection_w.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[-1:]}',
                            check_task=CheckTasks.check_query_results, check_items={'exp_res': res})
 
-    @pytest.mark.xfail("Issue #15499")
     @pytest.mark.tags(CaseLabel.L3)
     def test_compact_after_delete_index(self):
         """
@@ -199,11 +196,10 @@
         log.debug(collection_w.index())
 
         # compact, get plan
-        sleep(50)
+        sleep(ct.compact_retention_duration + 1)
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        c_plans = collection_w.get_compaction_plans()[0]
-        assert len(c_plans.plans[0].sources) == 1
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
 
         collection_w.load()
         res, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
@@ -230,10 +226,9 @@
         collection_w.delete(ratio_expr)
         assert collection_w.num_entities == tmp_nb
 
-        # auto_compact
-        sleep(1)
-        # Delete type compaction just merge insert log and delta log of one segment
-        # todo assert len(c_plans.plans[0].sources) == 1
+        # Flush a new segment to meet the condition of 20% deleted entities, which triggers compaction, but there is no way to get the plan
+        collection_w.insert(cf.gen_default_dataframe_data(1, start=tmp_nb))
+        assert collection_w.num_entities == tmp_nb + 1
 
         collection_w.load()
         collection_w.query(ratio_expr, check_task=CheckTasks.check_query_empty)
@@ -264,14 +259,14 @@
         collection_w.load()
         collection_w.query(ratio_expr, check_task=CheckTasks.check_query_empty)
 
-    @pytest.mark.tags(CaseLabel.L0)
+    @pytest.mark.tags(CaseLabel.L3)
     def test_compact_after_delete_all(self):
         """
         target: test delete all and compact
         method: 1.create with shard_num=1
                 2.delete all sealed data
                 3.compact
-        expected: collection num_entities is close to 0
+        expected: Get compaction plan
         """
         collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
         df = cf.gen_default_dataframe_data()
@@ -283,10 +278,10 @@
         # currently no way to verify whether it is compact after delete,
         # because the merge compact plan is generated first
+        sleep(ct.compact_retention_duration + 1)
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
-        log.debug(collection_w.num_entities)
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
 
         collection_w.load()
         collection_w.query(expr, check_task=CheckTasks.check_query_empty)
@@ -334,14 +329,15 @@
         """
         target: test merge insert and delta log triggered by max_compaction_interval
         method: todo
-        expected: auto merge
+        expected: auto compact binlogs
         """
         pass
 
 
 class TestCompactionOperation(TestcaseBase):
 
-    @pytest.mark.tags(CaseLabel.L2)
+    @pytest.mark.xfail(reason="Issue https://github.com/milvus-io/milvus/issues/15665")
+    @pytest.mark.tags(CaseLabel.L3)
     def test_compact_both_delete_merge(self):
         """
         target: test compact both delete and merge
         method: 1.create with shard_num=1
                 2.insert and flush
                 3.delete and flush (new insert)
                 4.compact
                 5.load and search
-        expected:
+        expected: Trigger two types of compaction
         """
         collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
         ids = []
@@ -366,12 +362,16 @@
         collection_w.insert(cf.gen_default_dataframe_data(1, start=2 * tmp_nb))
         assert collection_w.num_entities == 2 * tmp_nb + 1
 
+        sleep(ct.compact_retention_duration + 1)
+
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        c_plans = collection_w.get_compaction_plans()[0]
+
+        assert len(c_plans.plans) == 2
+        # todo assert two types compaction plan
 
         # search
-        sleep(5)
         ids.pop(0)
         ids.pop(-1)
         collection_w.load()
@@ -402,7 +402,7 @@
         # compact
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # search
         collection_w.load()
@@ -426,7 +426,7 @@ class TestCompactionOperation(TestcaseBase):
         collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1,
                                                  schema=cf.gen_default_binary_collection_schema())
         for i in range(2):
-            df, _ = cf.gen_default_binary_dataframe_data(ct.default_nb)
+            df, _ = cf.gen_default_binary_dataframe_data()
             collection_w.insert(data=df)
             assert collection_w.num_entities == (i + 1) * ct.default_nb
@@ -448,7 +448,7 @@
         # compact
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # verify index re-build and re-load
         search_params = {"metric_type": "L1", "params": {"nprobe": 10}}
         search_res_one, _ = collection_w.search(vectors, ct.default_binary_vec_field_name,
                                                 search_params, ct.default_limit,
                                                 check_task=CheckTasks.err_res,
                                                 check_items={ct.err_code: 1,
-                                                             ct.err_msg: "Metric type of field index isn't "
-                                                                         "the same with search info"})
+                                                             ct.err_msg: "metric type not found: (L1)"})
 
         # verify search result
         search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
         search_res_two, _ = collection_w.search(vectors, ct.default_binary_vec_field_name,
                                                 search_params, ct.default_limit)
-        assert len(search_res_two) == ct.default_nq
-        for hits in search_res_two:
-            assert len(hits) == ct.default_limit
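+        # compaction should not change search results: the ids returned after
+        # compaction are expected to match the earlier search (search_res_one)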
+        for i in range(ct.default_nq):
+            for j in range(ct.default_limit):
+                assert search_res_two[i][j].id == search_res_one[i][j].id
 
     @pytest.mark.tags(CaseLabel.L1)
     def test_compact_and_index(self):
         """
@@ -485,7 +484,7 @@
         # compact
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # create index
         collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
@@ -500,7 +499,7 @@
         for hits in search_res:
             assert len(hits) == ct.default_limit
 
-    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.tags(CaseLabel.L3)
     def test_compact_delete_and_search(self):
         """
         target: test delete and compact segment, and search
@@ -517,10 +516,13 @@
         expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:ct.default_nb // 2]}'
         collection_w.delete(expr)
         assert collection_w.num_entities == ct.default_nb
+
+        sleep(ct.compact_retention_duration + 1)
         collection_w.compact()
+        collection_w.wait_for_compaction_completed()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
 
         # search
-        sleep(2)
         collection_w.load()
         search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
                                             ct.default_float_vec_field_name,
@@ -546,7 +548,7 @@
         # compact
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # search
         collection_w.load()
@@ -584,7 +586,6 @@
         assert len(c_plans.plans) == 0
 
         # search
-        sleep(2)
         collection_w.load()
         search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
                                             ct.default_float_vec_field_name,
@@ -595,7 +596,34 @@
                                              "limit": ct.default_limit}
                                 )
 
-    @pytest.mark.tags(CaseLabel.L1)
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_compact_repeatedly_delete_same_id(self):
+        """
+        target: test compact after repeatedly delete same entity
+        method: 1.Create and insert entities
+                2.repeatedly delete the same id
+                3.compact
+        expected: No exception; the delta log keeps just one delete
+        """
+        collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
+
+        df = cf.gen_default_dataframe_data()
+        insert_res, _ = collection_w.insert(df)
+        expr = f'{ct.default_int64_field_name} in [0]'
+
+        for _ in range(100):
+            collection_w.delete(expr=expr)
+        assert collection_w.num_entities == ct.default_nb
+
+        sleep(ct.compact_retention_duration + 1)
+        collection_w.compact()
+        collection_w.wait_for_compaction_completed()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
+
+        collection_w.load()
+        collection_w.query(expr, check_task=CheckTasks.check_query_empty)
+
+    @pytest.mark.tags(CaseLabel.L3)
     def test_compact_delete_inside_time_travel(self):
         """
         target: test compact inside time_travel range
@@ -618,7 +646,8 @@
         delete_res, _ = collection_w.delete(expr)
         log.debug(collection_w.num_entities)
 
-        collection_w.compact()
+        c_plans = collection_w.get_compaction_plans()[0]
+        assert len(c_plans.plans) == 0
 
         collection_w.load()
         search_one, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
@@ -652,20 +681,20 @@
         log.debug(collection_w.num_entities)
 
         # ensure compact remove delta data that delete outside retention range
-        # sleep(ct.compact_retention_duration)
-        sleep(60)
+        sleep(ct.compact_retention_duration + 1)
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact)
         collection_w.load()
 
         # search with travel_time tt
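+        # tt now falls outside the retention window, so the time-travel search
+        # is expected to be rejected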
-        search_res, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
-                                            ct.default_float_vec_field_name,
-                                            ct.default_search_params, ct.default_limit,
-                                            travel_timestamp=tt)
-        log.debug(search_res[0].ids)
-        assert len(search_res[0]) == 0
+        collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
+                            ct.default_float_vec_field_name,
+                            ct.default_search_params, ct.default_limit,
+                            travel_timestamp=tt,
+                            check_task=CheckTasks.err_res,
+                            check_items={ct.err_code: 1, ct.err_msg: "only support to travel back to"})
 
     @pytest.mark.tags(CaseLabel.L0)
     def test_compact_merge_two_segments(self):
         """
@@ -684,17 +713,15 @@
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        c_plans = collection_w.get_compaction_plans()[0]
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # verify the two segments are merged into one
-        assert len(c_plans.plans) == 1
-        assert len(c_plans.plans[0].sources) == 2
-        target = c_plans.plans[0].target
+        c_plans = collection_w.get_compaction_plans()[0]
 
         # verify queryNode load the compacted segments
         collection_w.load()
         segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
-        assert target == segment_info[0].segmentID
+        assert c_plans.plans[0].target == segment_info[0].segmentID
 
     @pytest.mark.tags(CaseLabel.L2)
     def test_compact_no_merge(self):
         """
@@ -734,7 +761,7 @@
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()[0]
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact, check_items={"segment_num": 2})
 
         collection_w.load()
         segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
@@ -759,8 +786,8 @@
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        c_plans = collection_w.get_compaction_plans()[0]
-        assert len(c_plans.plans[0].sources) == num_of_segment
+        c_plans = collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
+                                                    check_items={"segment_num": num_of_segment})[0]
         target = c_plans.plans[0].target
 
         collection_w.load()
@@ -792,7 +819,7 @@
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()[0]
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         collection_w.load()
         search_res, _ = collection_w.search(df2[ct.default_float_vec_field_name][:1].to_list(),
@@ -845,7 +872,6 @@
         # create collection shard_num=1, insert 9 segments, each with one entity
         collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=less_threshold)
-        sleep(3)
 
         # load and verify no auto-merge
         collection_w.load()
@@ -896,11 +922,10 @@
         """
         # init collection with one shard, insert into two segments
         collection_w = self.collection_insert_multi_segments_one_shard(prefix, is_dup=False)
-
         # compact and complete
         collection_w.compact()
         collection_w.wait_for_compaction_completed()
-        collection_w.get_compaction_plans()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact)
 
         # delete and query
         expr = f'{ct.default_int64_field_name} in {[0]}'
@@ -934,6 +959,32 @@
         # Actually no merged
         assert len(c_plans.plans) == 0
 
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_compact_delete_cross_shards(self):
+        """
+        target: test delete compact cross different shards
+        method: 1.create with 2 shards
+                2.insert entities into 2 segments
+                3.delete one entity from each segment
+                4.call compact and get compact plan
+        expected: Generate compaction plan for each segment
+        """
+        shards_num = 2
+        # insert into two segments across two shards
+        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=shards_num)
+        df = cf.gen_default_dataframe_data(tmp_nb)
+        collection_w.insert(df)
+        expr = f"{ct.default_int64_field_name} in [0, 99]"
+        collection_w.delete(expr)
+        assert collection_w.num_entities == tmp_nb
+
+        # compact
+        sleep(ct.compact_retention_duration + 1)
+        collection_w.compact()
+        collection_w.wait_for_compaction_completed(timeout=1)
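+        # expect one delete-compaction plan per shard, as a segment never spans shards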
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_delete_compact,
+                                          check_items={"plans_num": shards_num})
+
     @pytest.mark.tags(CaseLabel.L1)
     def test_compact_cross_partition(self):
         """
@@ -1020,3 +1071,35 @@
         collection_w.load()
         seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
         assert len(seg_info) == 1
+
+    def test_compact_during_search(self):
+        """
+        target: test compact during search
+        method: start a thread to search while compacting the collection
+        expected: No exception
+        """
+        # less than auto-merge threshold 10
+        num_of_segment = 5
+
+        # create collection with shard_num=1, insert 5 segments, each with 100 entities
+        collection_w = self.collection_insert_multi_segments_one_shard(prefix,
+                                                                       num_of_segment=num_of_segment,
+                                                                       nb_of_segment=100)
+
+        def do_search():
+            for _ in range(5):
+                search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
+                                                    ct.default_float_vec_field_name,
+                                                    ct.default_search_params, ct.default_limit)
+                assert len(search_res[0]) == ct.default_limit
+
+        # compact during search
+        collection_w.load()
+        t = threading.Thread(target=do_search, args=())
+        t.start()
+
+        collection_w.compact()
+        collection_w.wait_for_compaction_completed()
+        collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact,
+                                          check_items={"segment_num": num_of_segment})
+        t.join()