Add all compaction test cases (#12149)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>

Verify issue and update case

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/12177/head
ThreadDao 2021-11-22 15:33:14 +08:00 committed by GitHub
parent e4c5713265
commit ab9fd0f9ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 525 additions and 93 deletions

View File

@ -195,3 +195,22 @@ class TestcaseBase(Base):
conn.flush([collection_w.name])
collection_w.load()
return collection_w, partition_w, df_partition, df_default
def collection_insert_multi_segments_one_shard(self, collection_prefix, num_of_segment=2, nb_of_segment=1,
is_dup=True, is_binary=False):
"""
init collection with one shard, insert data into two segments on one shard (they can be merged)
:param collection_prefix: collection name prefix
:param num_of_segment: number of segments
:param nb_of_segment: number of entities per segment
:param is_dup: whether the primary keys of each segment is duplicated
:return: collection wrap and partition wrap
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(collection_prefix), shards_num=1)
for i in range(num_of_segment):
start = 0 if is_dup else i * nb_of_segment
df = cf.gen_default_dataframe_data(nb_of_segment, start=start)
collection_w.insert(df)
assert collection_w.num_entities == nb_of_segment * (i + 1)
return collection_w

View File

@ -1,10 +1,13 @@
import asyncio
import sys
from pymilvus import Collection
from pymilvus.client.types import State
sys.path.append("..")
from check.func_check import ResponseChecker
from utils.api_request import api_request
from utils.util_log import test_log as log
TIMEOUT = 20
@ -16,7 +19,8 @@ TIMEOUT = 20
class ApiCollectionWrapper:
collection = None
def init_collection(self, name, schema=None, using="default", shards_num=2, check_task=None, check_items=None, **kwargs):
def init_collection(self, name, schema=None, using="default", shards_num=2, check_task=None, check_items=None,
**kwargs):
""" In order to distinguish the same name of collection """
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([Collection, name, schema, using, shards_num], **kwargs)
@ -239,14 +243,22 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def get_compaction_state(self, timeout=None, **kwargs):
def get_compaction_state(self, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
res = self.collection.get_compaction_state(timeout, **kwargs)
return res
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.get_compaction_state, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def get_compaction_plans(self, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.get_compaction_plans, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
return res, check_result
def wait_for_compaction_completed(self, timeout=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
res = self.collection.wait_for_compaction_completed(timeout, **kwargs)
log.debug(res)
return res

View File

@ -13,6 +13,7 @@ default_nq = 2
default_limit = 10
default_search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
default_index = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
default_binary_index = {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128}, "metric_type": "JACCARD"}
max_top_k = 16384
max_partition_num = 4096 # 256
default_segment_row_limit = 1000
@ -44,6 +45,9 @@ binary_vec_field_desc = "binary vector type field"
max_dim = 32768
gracefulTime = 1
default_nlist = 128
compact_segment_num_threshold = 10
# compact_delta_binlog_ratio is 0.2
compact_delta_ratio_reciprocal = 5
Not_Exist = "Not_Exist"
Connect_Object_Name = "Milvus"

View File

@ -1,5 +1,7 @@
from time import sleep
import pytest
from pymilvus.client.types import State
from pymilvus.grpc_gen.common_pb2 import SegmentState
from base.client_base import TestcaseBase
from common import common_func as cf
@ -11,7 +13,7 @@ prefix = "compact"
tmp_nb = 100
@pytest.mark.skip(reason="Waiting for development")
@pytest.mark.skip(reason="Ci failed")
class TestCompactionParams(TestcaseBase):
@pytest.mark.tags(CaseLabel.L1)
@ -31,23 +33,41 @@ class TestCompactionParams(TestcaseBase):
error = {ct.err_code: 0, ct.err_msg: "should create connect first"}
collection_w.compact(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.xfail(reason="Issue #12075")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_twice(self):
"""
target: test compact twice
method: compact twice
expected: No exception
method: 1.create with shard_num=1
2.insert and flush twice (two segments)
3.compact
4.insert new data
5.compact
expected: Merge into one segment
"""
# init collection with tmp_nb default data
collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0:4]
collection_w.delete(f'{ct.default_int64_field_name} in {ids}')
# init collection with one shard, insert into two segments
collection_w = self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=tmp_nb)
# first compact two segments
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans1 = collection_w.get_compaction_plans()[0]
target_1 = c_plans1.plans[0].target
# insert new data
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
log.debug(collection_w.num_entities)
# second compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_state()
c_plans2 = collection_w.get_compaction_plans()[0]
assert target_1 in c_plans2.plans[0].sources
log.debug(c_plans2.plans[0].target)
@pytest.mark.tags(CaseLabel.L2)
def test_compact_partition(self):
"""
@ -55,17 +75,53 @@ class TestCompactionParams(TestcaseBase):
method: compact partition
expected: Verify partition segments merged
"""
pass
# create collection with shard_num=1, and create partition
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
# insert flush twice
for i in range(2):
df = cf.gen_default_dataframe_data(tmp_nb)
partition_w.insert(df)
assert partition_w.num_entities == tmp_nb * (i + 1)
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans) == 1
assert len(c_plans.plans[0].sources) == 2
target = c_plans.plans[0].target
# verify queryNode load the compacted segments
collection_w.load()
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert target == segment_info[0].segmentID
@pytest.mark.tags(CaseLabel.L2)
def test_compact_growing_segment(self):
def test_compact_only_growing_segment(self):
"""
target: test compact growing data
method: 1.insert into multi segments without flush
2.compact
expected: No compaction (compact just for sealed data)
"""
pass
# create and insert without flush
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
# compact when only growing segment
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans) == 0
collection_w.load()
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
for segment_info in segments_info:
assert segment_info.state == SegmentState.Growing
@pytest.mark.tags(CaseLabel.L2)
def test_compact_empty_collection(self):
@ -83,64 +139,93 @@ class TestCompactionParams(TestcaseBase):
assert len(c_plans.plans) == 0
@pytest.mark.tags(CaseLabel.L2)
def test_compact_after_delete_single(self):
@pytest.mark.parametrize("delete_pos", [1, tmp_nb // 2])
def test_compact_after_delete(self, delete_pos):
"""
target: test delete one entity and compact
method: 1.create with shard_num=1
2.delete one sealed entity
2.delete one sealed entity, half entities
2.compact
expected: Verify compact result todo
expected: Verify compact result
"""
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
single_expr = f'{ct.default_int64_field_name} in {[0]}'
# create, insert without flush
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data(tmp_nb)
insert_res, _ = collection_w.insert(df)
# delete single entity, flush
single_expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:delete_pos]}'
collection_w.delete(single_expr)
assert collection_w.num_entities == tmp_nb
# compact, get plan
collection_w.compact()
while True:
c_state = collection_w.get_compaction_state()
if c_state.state == State.Completed:
log.debug(f'state: {c_state.state}')
break
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
for plan in c_plans.plans:
assert len(plan.sources) == 1
self.utility_wrap.get_query_segment_info(collection_w.name)
# Delete type compaction just merge insert log and delta log of one segment
# todo assert len(c_plans.plans[0].sources) == 1
@pytest.mark.tags(CaseLabel.L2)
def test_compact_after_delete_half(self):
collection_w.load()
collection_w.query(single_expr, check_items=CheckTasks.check_query_empty)
res = df.iloc[-1:, :1].to_dict('records')
collection_w.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[-1:]}',
check_items={'exp_res': res})
@pytest.mark.tags(CaseLabel.L1)
def test_compact_delete_ratio(self):
"""
target: test delete half entity and compact
target: test delete entities reaches ratio and auto-compact
method: 1.create with shard_num=1
2.insert and flush
3.delete half of nb
4.compact
expected: collection num_entities decrease
2.insert (compact load delta log, not from dmlChannel)
3.delete 20% of nb, flush
expected: Verify auto compaction, merge insert log and delta log
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data(tmp_nb)
res, _ = collection_w.insert(df)
assert collection_w.num_entities == tmp_nb
# delete half
half_expr = f'{ct.default_int64_field_name} in {res.primary_keys[:tmp_nb // 2]}'
collection_w.delete(half_expr)
insert_res, _ = collection_w.insert(df)
collection_w.compact()
while True:
c_state = collection_w.get_compaction_state()
if c_state.state == State.Completed:
log.debug(f'state: {c_state.state}')
break
c_plans = collection_w.get_compaction_plans()[0]
for plan in c_plans.plans:
assert len(plan.sources) == 1
# delete 20% entities
ratio_expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:tmp_nb // ct.compact_delta_ratio_reciprocal]}'
collection_w.delete(ratio_expr)
assert collection_w.num_entities == tmp_nb
# auto_compact
sleep(1)
# Delete type compaction just merge insert log and delta log of one segment
# todo assert len(c_plans.plans[0].sources) == 1
collection_w.load()
log.debug(collection_w.num_entities)
collection_w.query(ratio_expr, check_items=CheckTasks.check_query_empty)
self.utility_wrap.get_query_segment_info(collection_w.name)
res = df.iloc[-1:, :1].to_dict('records')
collection_w.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[-1:]}',
check_items={'exp_res': res})
@pytest.mark.tags(CaseLabel.L2)
def test_compact_delete_less_ratio(self):
"""
target: test delete entities less ratio and no compact
method: 1.create collection shard_num=1
2.insert without flush
3.delete 10% entities and flush
expected: Verify no compact (can't), delete successfully
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data(tmp_nb)
insert_res, _ = collection_w.insert(df)
# delete 10% entities, ratio = 0.1
less_ratio_reciprocal = 10
ratio_expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:tmp_nb // less_ratio_reciprocal]}'
collection_w.delete(ratio_expr)
assert collection_w.num_entities == tmp_nb
collection_w.load()
collection_w.query(ratio_expr, check_task=CheckTasks.check_query_empty)
@pytest.mark.tags(CaseLabel.L0)
def test_compact_after_delete_all(self):
"""
target: test delete all and compact
@ -153,20 +238,76 @@ class TestCompactionParams(TestcaseBase):
df = cf.gen_default_dataframe_data()
res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
expr = f'{ct.default_int64_field_name} in {res.primary_keys}'
collection_w.delete(expr)
assert collection_w.num_entities == ct.default_nb
# assert collection_w.num_entities == ct.default_nb
# currently no way to verify whether it is compact after delete,
# because the merge compact plan is generate first
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
log.debug(collection_w.num_entities)
collection_w.load()
collection_w.query(expr, check_items=CheckTasks.check_query_empty)
@pytest.mark.skip(reason="Waiting for development")
@pytest.mark.skip(reason="TODO")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_delete_max_delete_size(self):
"""
target: test compact delta log reaches max delete size 10MiB
method: todo
expected: auto merge single segment
"""
pass
@pytest.mark.skip(reason="Ci failed")
class TestCompactionOperation(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
def test_compact_both_delete_merge(self):
"""
target: test compact both delete and merge
method: 1.create collection with shard_num=1
2.insert data into two segments
3.delete and flush (new insert)
4.compact (
5.load and search
expected:
"""
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
ids = []
for i in range(2):
df = cf.gen_default_dataframe_data(tmp_nb, start=i * tmp_nb)
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == (i + 1) * tmp_nb
ids.extend(insert_res.primary_keys)
expr = f'{ct.default_int64_field_name} in {[0, 2*tmp_nb-1]}'
collection_w.delete(expr)
collection_w.insert(cf.gen_default_dataframe_data(1, start=2*tmp_nb))
assert collection_w.num_entities == 2*tmp_nb + 1
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# search
sleep(5)
ids.pop(0)
ids.pop(-1)
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
check_items={"nq": ct.default_nq,
"ids": ids,
"limit": ct.default_limit})
@pytest.mark.tags(CaseLabel.L1)
def test_compact_after_index(self):
"""
target: test compact after create index
@ -176,8 +317,27 @@ class TestCompactionOperation(TestcaseBase):
4.search
expected: Verify segment info and index info
"""
pass
collection_w = self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=ct.default_nb)
# create index
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
log.debug(collection_w.index())
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# search
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res) == ct.default_nq
for hits in search_res:
assert len(hits) == ct.default_limit
@pytest.mark.xfail(reason="Issue #12148")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_after_binary_index(self):
"""
@ -188,9 +348,34 @@ class TestCompactionOperation(TestcaseBase):
4.search
expected: Verify segment info and index info
"""
pass
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1,
schema=cf.gen_default_binary_collection_schema())
@pytest.mark.tags(CaseLabel.L2)
df, _ = cf.gen_default_binary_dataframe_data(ct.default_nb)
for i in range(2):
collection_w.insert(data=df)
assert collection_w.num_entities == (i + 1) * ct.default_nb
# create index
collection_w.create_index(ct.default_binary_vec_field_name, ct.default_binary_index)
log.debug(collection_w.index())
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# search
collection_w.load()
search_res, _ = collection_w.search(cf.gen_binary_vectors(ct.default_nq, ct.default_dim)[1],
ct.default_binary_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res) == ct.default_nq
for hits in search_res:
assert len(hits) == ct.default_limit
@pytest.mark.xfail(reason="Issue #12146")
@pytest.mark.tags(CaseLabel.L1)
def test_compact_and_index(self):
"""
target: test compact and create index
@ -200,10 +385,59 @@ class TestCompactionOperation(TestcaseBase):
4.load and search
expected: Verify search result and index info
"""
pass
collection_w = self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=ct.default_nb)
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# create index
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
log.debug(collection_w.index())
# search
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res) == ct.default_nq
for hits in search_res:
assert len(hits) == ct.default_limit
@pytest.mark.tags(CaseLabel.L1)
def test_compact_delete_and_search(self):
"""
target: test delete and compact segment, and search
method: 1.create collection and insert
2.delete part entities
3.compact
4.load and search
expected: Verify search result
"""
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:ct.default_nb // 2]}'
collection_w.delete(expr)
assert collection_w.num_entities == ct.default_nb
collection_w.compact()
# search
sleep(2)
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"ids": insert_res.primary_keys[ct.default_nb // 2:],
"limit": ct.default_limit}
)
@pytest.mark.tags(CaseLabel.L0)
def test_compact_and_search(self):
def test_compact_merge_and_search(self):
"""
target: test compact and search
method: 1.insert data into two segments
@ -211,8 +445,23 @@ class TestCompactionOperation(TestcaseBase):
3.load and search
expected: Verify search result
"""
pass
collection_w = self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=ct.default_nb)
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# search
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res) == ct.default_nq
for hits in search_res:
assert len(hits) == ct.default_limit
# @pytest.mark.skip(reason="Todo")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_search_after_delete_channel(self):
"""
@ -222,10 +471,36 @@ class TestCompactionOperation(TestcaseBase):
2.delete half
3.compact
4.search
expected: No exception
expected: No compact, compact get delta log from storage
"""
pass
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.load()
expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:ct.default_nb // 2]}'
collection_w.delete(expr)
collection_w.compact()
c_plans = collection_w.get_compaction_plans()[0]
assert len(c_plans.plans) == 0
# search
sleep(2)
collection_w.load()
search_res, _ = collection_w.search(cf.gen_vectors(ct.default_nq, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"ids": insert_res.primary_keys[ct.default_nb // 2:],
"limit": ct.default_limit}
)
@pytest.mark.skip(reason="Todo")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_delete_inside_time_travel(self):
"""
@ -239,6 +514,7 @@ class TestCompactionOperation(TestcaseBase):
"""
pass
@pytest.mark.skip(reason="Todo")
@pytest.mark.tags(CaseLabel.L1)
def test_compact_delete_outside_time_travel(self):
"""
@ -253,26 +529,29 @@ class TestCompactionOperation(TestcaseBase):
"""
target: test compact merge two segments
method: 1.create with shard_num=1
2.insert half nb and flush
3.insert half nb and flush
2.insert and flush
3.insert and flush again
4.compact
5.search
5.load
expected: Verify segments are merged
"""
half = ct.default_nb // 2
# create collection
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
df = cf.gen_default_dataframe_data()
res, _ = collection_w.insert(df[:half])
log.debug(collection_w.num_entities)
collection_w.insert(df[half:])
log.debug(collection_w.num_entities)
num_of_segment = 2
# create collection shard_num=1, insert 2 segments, each with tmp_nb entities
collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment, tmp_nb)
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
state, _ = collection_w.get_compaction_state()
plan, _ = collection_w.get_compaction_plans()
# verify the two segments are merged into one
assert len(c_plans.plans) == 1
assert len(c_plans.plans[0].sources) == 2
target = c_plans.plans[0].target
# verify queryNode load the compacted segments
collection_w.load()
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert target == segment_info[0].segmentID
@pytest.mark.tags(CaseLabel.L2)
def test_compact_no_merge(self):
@ -290,15 +569,12 @@ class TestCompactionOperation(TestcaseBase):
assert collection_w.num_entities == tmp_nb
collection_w.compact()
while True:
c_state = collection_w.get_compaction_state()
log.debug(c_state)
if c_state.state == State.Completed and c_state.in_timeout == 0:
break
collection_w.wait_for_compaction_completed()
c_plans, _ = collection_w.get_compaction_plans()
assert len(c_plans.plans) == 0
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="Issue #12075")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_merge_multi_segments(self):
"""
target: test compact and merge multi small segments
@ -308,8 +584,21 @@ class TestCompactionOperation(TestcaseBase):
4.load and search
expected: Verify segments info
"""
pass
# greater than auto-merge threshold 10
num_of_segment = ct.compact_segment_num_threshold + 1
# create collection shard_num=1, insert 11 segments, each with one entity
collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=num_of_segment)
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
collection_w.load()
self.utility_wrap.get_query_segment_info(collection_w.name)
# assert 0 == 1
@pytest.mark.skip(reason="todo")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_merge_inside_time_travel(self):
"""
@ -322,21 +611,48 @@ class TestCompactionOperation(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
def test_compact_threshold_auto_merge(self):
"""
target: test num (segment_size < 1/2Max) reaches auto-merge threshold
method: todo
expected: Auto-merge segments
target: test num (segment_size < 1/2Max) reaches auto-merge threshold 10
method: 1.create with shard_num=1
2.insert flush 10 times (merge threshold 10)
3.wait for compaction, load
expected: Get query segments into to verify segments auto-merged into one
"""
pass
threshold = ct.compact_segment_num_threshold
# create collection shard_num=1, insert 10 segments, each with one entity
collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=threshold)
# todo compaction cost
collection_w.get_compaction_plans()
collection_w.load()
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
# verify segments reaches threshold, auto-merge ten segments into one
assert len(segments_info) == 1
@pytest.mark.xfail(reason="Issue #12131")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_less_threshold_no_merge(self):
"""
target: test compact the num of segments that size less than 1/2Max, does not reach the threshold
method: todo
expected: No auto-merge
method: 1.create collection with shard_num = 1
2.insert flush 9 times (segments threshold 10)
3.after a while, load
expected: Verify segments no merge
"""
pass
less_threshold = ct.compact_segment_num_threshold - 1
# create collection shard_num=1, insert 9 segments, each with one entity
collection_w = self.collection_insert_multi_segments_one_shard(prefix, num_of_segment=less_threshold)
sleep(3)
# load and verify no auto-merge
collection_w.load()
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert len(segments_info) == less_threshold
@pytest.mark.skip(reason="Todo")
@pytest.mark.tags(CaseLabel.L2)
def test_compact_multi_collections(self):
"""
@ -356,7 +672,18 @@ class TestCompactionOperation(TestcaseBase):
4.load and search
expected: Verify search result and segment info
"""
pass
# create collection shard_num=1, insert 2 segments, each with tmp_nb entities
collection_w = self.collection_insert_multi_segments_one_shard(prefix, nb_of_segment=tmp_nb)
# compact two segments
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# insert new data, verify insert flush successfully
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
assert collection_w.num_entities == tmp_nb * 3
@pytest.mark.tags(CaseLabel.L1)
def test_compact_and_delete(self):
@ -367,4 +694,74 @@ class TestCompactionOperation(TestcaseBase):
3.delete and query
expected: Verify deleted ids
"""
pass
# init collection with one shard, insert into two segments
collection_w = self.collection_insert_multi_segments_one_shard(prefix, is_dup=False)
# compact and complete
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.get_compaction_plans()
# delete and query
expr = f'{ct.default_int64_field_name} in {[0]}'
collection_w.delete(expr)
collection_w.load()
collection_w.query(expr, check_task=CheckTasks.check_query_empty)
expr_1 = f'{ct.default_int64_field_name} in {[1]}'
collection_w.query(expr_1, check_task=CheckTasks.check_query_results, check_items={'exp_res': [{'int64': 1}]})
@pytest.mark.tags(CaseLabel.L2)
def test_compact_cross_shards(self):
"""
target: test compact cross shards
method: 1.create with shard_num=2
2.insert once and flush (two segments, belonging to two shards)
3.compact and completed
expected: Verify no compact
"""
# insert into two segments with two shard
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2)
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
assert collection_w.num_entities == tmp_nb
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed(timeout=1)
c_plans = collection_w.get_compaction_plans()[0]
# Actually no merged
assert len(c_plans.plans) == 0
@pytest.mark.tags(CaseLabel.L2)
def test_compact_cross_partition(self):
"""
target: test compact cross partitions
method: 1.create with shard_num=1
2.create partition and insert, flush
3.insert _default partition and flush
4.compact
expected: Verify no compact
"""
# create collection and partition
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
# insert
df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(df)
assert collection_w.num_entities == tmp_nb
partition_w.insert(df)
assert collection_w.num_entities == tmp_nb * 2
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
c_plans = collection_w.get_compaction_plans()[0]
# Actually no merged
assert len(c_plans.plans) == 0
collection_w.load()
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
assert segments_info[0].partitionID != segments_info[-1].partitionID