Add flush all test cases (#23129) (#23516)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/23575/head
ThreadDao 2023-04-20 16:37:02 +08:00 committed by GitHub
parent 0f1d9afe00
commit fba57d95ae
4 changed files with 214 additions and 26 deletions

View File

@@ -63,6 +63,10 @@ class ApiCollectionWrapper:
self.flush()
return self.collection.num_entities
@property
def num_entities_without_flush(self):
return self.collection.num_entities
@property
def primary_field(self):
return self.collection.primary_field
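A note on the new property added above: num_entities flushes the collection before reading the row count, while num_entities_without_flush reads the count as-is. The flush_all tests in this change rely on the latter so the assertions verify the utility-level flush rather than triggering a fresh per-collection flush. A minimal usage sketch, assuming an already established connection and an existing collection with unflushed data (the collection name below is illustrative):

# Sketch only: assumes connections.connect(...) was called earlier and that a
# collection named "flush_all_demo" already exists with data inserted but not flushed.
from base.collection_wrapper import ApiCollectionWrapper

cw = ApiCollectionWrapper()
cw.init_collection(name="flush_all_demo")

n_flushed = cw.num_entities               # flushes the collection first, then reads the row count
n_as_is = cw.num_entities_without_flush   # reads the current row count without flushing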

View File

@@ -9,6 +9,7 @@ from utils.api_request import api_request
from pymilvus import BulkInsertState
from pymilvus import Role
from utils.util_log import test_log as log
TIMEOUT = 20
@@ -33,14 +34,16 @@ class ApiUtilityWrapper:
log.info(f"after bulk load, there are {len(working_tasks)} working tasks")
return res, check_result
def get_bulk_insert_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None, **kwargs):
def get_bulk_insert_state(self, task_id, timeout=None, using="default", check_task=None, check_items=None,
**kwargs):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.get_bulk_insert_state, task_id, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
task_id=task_id, using=using).run()
return res, check_result
def list_bulk_insert_tasks(self, limit=0, collection_name=None, timeout=None, using="default", check_task=None, check_items=None, **kwargs):
def list_bulk_insert_tasks(self, limit=0, collection_name=None, timeout=None, using="default", check_task=None,
check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.list_bulk_insert_tasks, limit, collection_name, timeout, using], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
@@ -88,7 +91,7 @@ class ApiUtilityWrapper:
unknown = unknown + 1
log.info("There are", len(tasks), "bulkload tasks.", pending, "pending,", started, "started,", persisted,
"persisted,", completed, "completed,", failed, "failed", failed_and_cleaned, "failed_and_cleaned",
"persisted,", completed, "completed,", failed, "failed", failed_and_cleaned, "failed_and_cleaned",
unknown, "unknown")
def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted,
@@ -109,7 +112,8 @@ class ApiUtilityWrapper:
log.info(f"wait bulk load timeout is {task_timeout}")
pending_tasks = self.get_bulk_insert_pending_list()
log.info(f"before waiting, there are {len(pending_tasks)} pending tasks")
while len(tasks_state_distribution["success"])+len(tasks_state_distribution["failed"]) < len(task_ids) and end-start <= task_timeout:
while len(tasks_state_distribution["success"]) + len(tasks_state_distribution["failed"]) < len(
task_ids) and end - start <= task_timeout:
time.sleep(2)
for task_id in task_ids:
@@ -134,21 +138,22 @@ class ApiUtilityWrapper:
if task_id in tasks_state_distribution["in_progress"]:
tasks_state_distribution["in_progress"].remove(task_id)
tasks_state_distribution["success"].add(task_id)
elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted, BulkInsertState.ImportPersisted]:
elif state.state in [BulkInsertState.ImportPending, BulkInsertState.ImportStarted,
BulkInsertState.ImportPersisted]:
tasks_state_distribution["in_progress"].add(task_id)
else:
tasks_state_distribution["failed"].add(task_id)
end = time.time()
pending_tasks = self.get_bulk_insert_pending_list()
log.info(f"after waiting, there are {len(pending_tasks)} pending tasks")
log.info(f"task state distribution: {tasks_state_distribution}")
log.info(tasks_state)
if len(tasks_state_distribution["success"]) == len(task_ids):
log.info(f"wait for bulk load tasks completed successfully, cost time: {end-start}")
log.info(f"wait for bulk load tasks completed successfully, cost time: {end - start}")
return True, tasks_state
else:
log.info(f"wait for bulk load tasks completed failed, cost time: {end-start}")
log.info(f"wait for bulk load tasks completed failed, cost time: {end - start}")
return False, tasks_state
def wait_all_pending_tasks_finished(self):
@@ -162,7 +167,8 @@ class ApiUtilityWrapper:
log.info(f"current tasks states: {task_states_map}")
pending_tasks = self.get_bulk_insert_pending_list()
working_tasks = self.get_bulk_insert_working_list()
log.info(f"in the start, there are {len(working_tasks)} working tasks, {working_tasks} {len(pending_tasks)} pending tasks, {pending_tasks}")
log.info(
f"in the start, there are {len(working_tasks)} working tasks, {working_tasks} {len(pending_tasks)} pending tasks, {pending_tasks}")
time_cnt = 0
pending_task_ids = set()
while len(pending_tasks) > 0:
@@ -174,7 +180,8 @@ class ApiUtilityWrapper:
for task_id in pending_tasks.keys():
cur_pending_task_ids.append(task_id)
pending_task_ids.add(task_id)
log.info(f"after {time_cnt}, there are {len(working_tasks)} working tasks, {len(pending_tasks)} pending tasks")
log.info(
f"after {time_cnt}, there are {len(working_tasks)} working tasks, {len(pending_tasks)} pending tasks")
log.debug(f"total pending tasks: {pending_task_ids} current pending tasks: {cur_pending_task_ids}")
log.info(f"after {time_cnt}, all pending tasks are finished")
all_tasks, _ = self.list_bulk_insert_tasks()
@@ -331,7 +338,7 @@ class ApiUtilityWrapper:
def create_user(self, user, password, using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.create_user, user, password, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,using=using).run()
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, using=using).run()
return res, check_result
def list_usernames(self, using="default", check_task=None, check_items=None):
@@ -475,24 +482,35 @@ class ApiUtilityWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
def transfer_node(self, source, target, num_node, using="default", timeout=None, check_task=None, check_items=None,
**kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.transfer_node, source, target, num_node, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
def transfer_replica(self, source, target, collection_name, num_replica, using="default", timeout=None,
check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.transfer_replica, source, target, collection_name,num_replica, using, timeout], **kwargs)
res, check = api_request(
[self.ut.transfer_replica, source, target, collection_name, num_replica, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def rename_collection(self, old_collection_name, new_collection_name, timeout=None, check_task=None,
check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.rename_collection, old_collection_name, new_collection_name, timeout], **kwargs)
res, check = api_request([self.ut.rename_collection, old_collection_name, new_collection_name, timeout],
**kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check,
old_collection_name=old_collection_name, new_collection_name=new_collection_name,
timeout=timeout, **kwargs).run()
return res, check_result
def flush_all(self, using="default", timeout=None, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.ut.flush_all, using, timeout], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check,
using=using, timeout=timeout, **kwargs).run()
return res, check_result
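For orientation, a minimal sketch of how the tests below exercise this new wrapper method; it assumes the TestcaseBase context used throughout the test suite (self.utility_wrap, CheckTasks, ct), and the timeout values are illustrative:

# Sketch only, inside a TestcaseBase-derived test with an active connection.
# Synchronous flush of all collections, bounded by a timeout:
self.utility_wrap.flush_all(timeout=60)

# Asynchronous variant: returns immediately; callers then poll
# num_entities_without_flush until the expected row count is reached.
self.utility_wrap.flush_all(_async=True)

# Negative case: an intentionally small timeout is expected to fail with
# "wait for flush all timeout", checked through the err_res task.
error = {ct.err_code: 1, ct.err_msg: "wait for flush all timeout"}
self.utility_wrap.flush_all(timeout=5, check_task=CheckTasks.err_res, check_items=error)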

View File

@@ -63,6 +63,7 @@ max_field_num = 64 # Maximum number of fields in a collection
max_name_length = 255 # Maximum length of name for a collection or alias
default_replica_num = 1
default_graceful_time = 5 #
max_shards_num = 64
IMAGE_REPOSITORY_MILVUS = "harbor.milvus.io/dockerhub/milvusdb/milvus"
NAMESPACE_CHAOS_TESTING = "chaos-testing"

View File

@@ -5,6 +5,7 @@ import pytest
from pymilvus import DefaultConfig
from pymilvus.exceptions import MilvusException
from base.client_base import TestcaseBase
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from utils.util_log import test_log as log
from common import common_func as cf
@@ -534,7 +535,8 @@ class TestUtilityParams(TestcaseBase):
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "`collection_name` value {} is illegal".format(old_collection_name)})
"err_msg": "`collection_name` value {} is illegal".format(
old_collection_name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_old_invalid_value(self, get_invalid_value_collection_name):
@@ -601,7 +603,8 @@ class TestUtilityParams(TestcaseBase):
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "can't find collection: {}".format(collection_w.name)})
"err_msg": "can't find collection: {}".format(
collection_w.name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_existed_collection_name(self):
@@ -617,7 +620,8 @@ class TestUtilityParams(TestcaseBase):
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "duplicated new collection name :{} with other "
"collection name or alias".format(collection_w.name)})
"collection name or alias".format(
collection_w.name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_existed_collection_alias(self):
@@ -656,6 +660,7 @@ class TestUtilityParams(TestcaseBase):
"err_msg": "unsupported use an alias to "
"rename collection, alias:{}".format(alias)})
class TestUtilityBase(TestcaseBase):
""" Test case of index interface """
@@ -1642,6 +1647,7 @@ class TestUtilityBase(TestcaseBase):
assert collection_alias[0] in collections
assert old_collection_name not in collections
class TestUtilityAdvanced(TestcaseBase):
""" Test case of index interface """
@@ -1848,7 +1854,7 @@ class TestUtilityAdvanced(TestcaseBase):
if x in segment_distribution else 0, reverse=True)
# add node id greater than all querynodes, which is not exist for querynode, to src_node_ids
max_query_node_id = max(all_querynodes)
invalid_src_node_id = max_query_node_id+1
invalid_src_node_id = max_query_node_id + 1
src_node_id = all_querynodes[0]
dst_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
@@ -2364,8 +2370,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
# 3.reset password with the wrong username
self.utility_wrap.update_password(user="hobo", old_password=old_password, new_password="qwaszx1",
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
@pytest.mark.tags(ct.CaseLabel.L3)
@pytest.mark.parametrize("user", ["demo"])
@@ -2386,8 +2392,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
# 3.reset password with the wrong new password
self.utility_wrap.update_password(user=user, old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 5})
@pytest.mark.tags(ct.CaseLabel.L3)
@pytest.mark.parametrize("user", ["genny"])
@@ -2401,8 +2407,8 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password="qwaszx0")
self.utility_wrap.update_password(user=user, old_password="waszx0", new_password="123456",
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
@pytest.mark.tags(ct.CaseLabel.L3)
def test_delete_user_root(self, host, port):
@@ -2764,7 +2770,7 @@ class TestUtilityRBAC(TestcaseBase):
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_revoke("Global", "*", "CreateCollection")
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
@@ -4051,3 +4057,162 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.create_role()
error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name}
self.utility_wrap.role_revoke("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L3)
class TestUtilityFlushAll(TestcaseBase):
def test_flush_all_multi_collections(self):
"""
target: test flush_all with multiple collections
method: 1. create multiple collections
2. insert data into each collection without flushing
3. delete some data
4. flush all collections
5. verify num entities
6. create index -> load -> query deleted ids -> query non-deleted ids
expected: the inserted and deleted data of all collections are flushed
"""
collection_num = 5
collection_names = []
delete_num = 100
for i in range(collection_num):
collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False,
is_index=True)
collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}')
collection_names.append(collection_w.name)
self.utility_wrap.flush_all(timeout=60)
cw = ApiCollectionWrapper()
for c in collection_names:
cw.init_collection(name=c)
assert cw.num_entities_without_flush == ct.default_nb
cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
cw.load()
cw.query(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}', check_task=CheckTasks.check_query_empty)
res, _ = cw.query(f'{ct.default_int64_field_name} not in {insert_ids[:delete_num]}')
assert len(res) == ct.default_nb - delete_num
def test_flush_all_multi_shards_timeout(self):
"""
target: test flush_all on a collection with max shards_num
method: 1. create a collection with max shards_num
2. insert data
3. flush all with a small timeout and expect a timeout error
4. flush all again with a larger timeout and verify num entities
expected: the first flush_all raises a timeout error; the second succeeds and num entities is correct
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=ct.max_shards_num)
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
error = {ct.err_code: 1, ct.err_msg: "wait for flush all timeout"}
self.utility_wrap.flush_all(timeout=5, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.flush_all(timeout=120)
assert collection_w.num_entities_without_flush == ct.default_nb
def test_flush_all_no_collections(self):
"""
target: test flush all without any collections
method: connect and flush all
expected: no exception
"""
self._connect()
self.utility_wrap.flush_all(check_task=None)
def test_flush_all_async(self):
"""
target: test flush all collections with _async
method: flush all collections and _async=True
expected: all collections finish flushing within the expected period of time
"""
collection_num = 5
collection_names = []
delete_num = 100
for i in range(collection_num):
collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False,
is_index=True)
collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}')
collection_names.append(collection_w.name)
self.utility_wrap.flush_all(_async=True)
_async_timeout = 60
cw = ApiCollectionWrapper()
start = time.time()
while time.time() - start < _async_timeout:
time.sleep(2.0)
# treat the flush as finished only when every collection reaches the expected row count
flush_flag = True
for c in collection_names:
cw.init_collection(name=c)
if cw.num_entities_without_flush != ct.default_nb:
log.debug(f"collection num entities: {cw.num_entities_without_flush} of collection {c}")
flush_flag = False
if flush_flag:
break
if time.time() - start > _async_timeout:
raise MilvusException(1, f"Waiting more than {_async_timeout}s for the flush all")
def test_flush_all_while_insert_delete(self):
"""
target: test flush_all while inserting and deleting
method: 1. prepare multiple collections with data
2. call flush_all while inserting and deleting concurrently
expected: flush_all succeeds and both the inserted and the deleted data are flushed
"""
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
collection_num = 5
collection_names = []
delete_num = 100
delete_ids = [_i for _i in range(delete_num)]
for i in range(collection_num):
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=4)
df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=0)
collection_w.insert(df)
collection_names.append(collection_w.name)
def do_insert():
cw = ApiCollectionWrapper()
df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=ct.default_nb)
for c_name in collection_names:
cw.init_collection(c_name)
insert_res, _ = cw.insert(df)
assert insert_res.insert_count == ct.default_nb
def do_delete():
cw = ApiCollectionWrapper()
for c_name in collection_names:
cw.init_collection(c_name)
del_res, _ = cw.delete(f"{ct.default_int64_field_name} in {delete_ids}")
assert del_res.delete_count == delete_num
def do_flush_all():
time.sleep(2)
self.utility_wrap.flush_all(timeout=600)
executor = ThreadPoolExecutor(max_workers=3)
insert_task = executor.submit(do_insert)
delete_task = executor.submit(do_delete)
flush_task = executor.submit(do_flush_all)
# wait all tasks completed
wait([insert_task, delete_task, flush_task], return_when=ALL_COMPLETED)
# verify
for c in collection_names:
cw = ApiCollectionWrapper()
cw.init_collection(name=c)
log.debug(f"num entities: {cw.num_entities_without_flush}")
assert cw.num_entities_without_flush >= ct.default_nb
assert cw.num_entities_without_flush <= ct.default_nb * 2
cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
cw.load()
cw.query(f'{ct.default_int64_field_name} in {delete_ids}', check_task=CheckTasks.check_query_empty)
res, _ = cw.query(f'{ct.default_int64_field_name} not in {delete_ids}')
assert len(res) == ct.default_nb * 2 - delete_num