[test]Add performance monitor for ApiCollectionWrapper (#16221)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/16223/head
zhuwenxing 2022-03-28 16:25:27 +08:00 committed by GitHub
parent e0ffe54547
commit 974371c06c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 3 deletions

View File

@ -1,11 +1,13 @@
import sys
import time
import timeout_decorator
from pymilvus import Collection
sys.path.append("..")
from check.func_check import ResponseChecker
from utils.api_request import api_request
from utils.wrapper import trace
from utils.util_log import test_log as log
from pymilvus.orm.types import CONSISTENCY_STRONG
@ -19,7 +21,11 @@ TIMEOUT = 20
class ApiCollectionWrapper:
collection = None
def init_collection(self, name, schema=None, using="default", shards_num=2, check_task=None, check_items=None, **kwargs):
def __init__(self, active_trace=False):
self.active_trace = active_trace
def init_collection(self, name, schema=None, using="default", shards_num=2, check_task=None, check_items=None, active_trace=False, **kwargs):
self.active_trace = active_trace
consistency_level = kwargs.get("consistency_level", CONSISTENCY_STRONG)
kwargs.update({"consistency_level": consistency_level})
@ -59,6 +65,7 @@ class ApiCollectionWrapper:
def _shards_num(self):
return self.collection._shards_num
@trace()
def construct_from_dataframe(self, name, dataframe, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([Collection.construct_from_dataframe, name, dataframe], **kwargs)
@ -67,6 +74,7 @@ class ApiCollectionWrapper:
name=name, dataframe=dataframe, **kwargs).run()
return res, check_result
@trace()
def drop(self, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -76,6 +84,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def load(self, partition_names=None, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -86,6 +95,7 @@ class ApiCollectionWrapper:
partition_names=partition_names, **kwargs).run()
return res, check_result
@trace()
def release(self, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -96,6 +106,7 @@ class ApiCollectionWrapper:
check_items, check, **kwargs).run()
return res, check_result
@trace()
def insert(self, data, partition_name=None, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -107,6 +118,24 @@ class ApiCollectionWrapper:
**kwargs).run()
return res, check_result
@trace()
def flush(self, check_task=None, check_items=None, **kwargs):
    """Read the collection's entity count under a time limit, as a stand-in for flush.

    The read of ``self.collection.num_entities`` is wrapped with
    ``timeout_decorator.timeout`` so that it is aborted with ``TimeoutError``
    once the limit is exceeded.

    :param check_task: accepted for signature parity with the other wrapper
        methods; not used by this method.
    :param check_items: accepted for signature parity; not used by this method.
    :param kwargs: only ``timeout`` is read (seconds; defaults to ``TIMEOUT``).
    :return: ``(num_entities, True)`` on success, ``(None, False)`` on timeout.
    """
    # TODO: currently, flush is not supported by sdk in milvus
    # NOTE(review): presumably reading num_entities makes the server flush - confirm.
    time_limit = kwargs.get("timeout", TIMEOUT)

    @timeout_decorator.timeout(time_limit, timeout_exception=TimeoutError)
    def _read_entity_count():
        return self.collection.num_entities

    try:
        entity_count = _read_entity_count()
    except TimeoutError as err:
        log.error(f"flush timeout error: {err}")
        return None, False
    return entity_count, True
@trace()
def search(self, data, anns_field, param, limit, expr=None,
partition_names=None, output_fields=None, timeout=None, round_decimal=-1,
check_task=None, check_items=None, **kwargs):
@ -122,6 +151,7 @@ class ApiCollectionWrapper:
timeout=timeout, **kwargs).run()
return res, check_result
@trace()
def query(self, expr, output_fields=None, partition_names=None, timeout=None, check_task=None, check_items=None,
**kwargs):
# time.sleep(5)
@ -139,6 +169,7 @@ class ApiCollectionWrapper:
def partitions(self):
return self.collection.partitions
@trace()
def partition(self, partition_name, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, succ = api_request([self.collection.partition, partition_name])
@ -146,6 +177,7 @@ class ApiCollectionWrapper:
succ, partition_name=partition_name).run()
return res, check_result
@trace()
def has_partition(self, partition_name, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, succ = api_request([self.collection.has_partition, partition_name])
@ -153,6 +185,7 @@ class ApiCollectionWrapper:
succ, partition_name=partition_name).run()
return res, check_result
@trace()
def drop_partition(self, partition_name, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -163,6 +196,7 @@ class ApiCollectionWrapper:
**kwargs).run()
return res, check_result
@trace()
def create_partition(self, partition_name, check_task=None, check_items=None, description=""):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.create_partition, partition_name, description])
@ -174,12 +208,14 @@ class ApiCollectionWrapper:
def indexes(self):
return self.collection.indexes
@trace()
def index(self, check_task=None, check_items=None):
    """Call ``self.collection.index`` through ``api_request`` and check the response.

    :param check_task: forwarded unchanged to ``ResponseChecker``.
    :param check_items: forwarded unchanged to ``ResponseChecker``.
    :return: ``(response, check_result)`` where ``response`` is the first value
        returned by ``api_request`` and ``check_result`` is whatever
        ``ResponseChecker(...).run()`` returns.
    """
    # The name of this method is what ResponseChecker receives as the API name.
    func_name = sys._getframe().f_code.co_name
    response, succeeded = api_request([self.collection.index])
    checker = ResponseChecker(response, func_name, check_task, check_items, succeeded)
    return response, checker.run()
@trace()
def create_index(self, field_name, index_params, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT * 2)
kwargs.update({"timeout": timeout})
@ -190,12 +226,14 @@ class ApiCollectionWrapper:
field_name=field_name, index_params=index_params, **kwargs).run()
return res, check_result
@trace()
def has_index(self, check_task=None, check_items=None):
    """Call ``self.collection.has_index`` through ``api_request`` and check the response.

    :param check_task: forwarded unchanged to ``ResponseChecker``.
    :param check_items: forwarded unchanged to ``ResponseChecker``.
    :return: ``(response, check_result)`` where ``response`` is the first value
        returned by ``api_request`` and ``check_result`` is whatever
        ``ResponseChecker(...).run()`` returns.
    """
    # The name of this method is what ResponseChecker receives as the API name.
    func_name = sys._getframe().f_code.co_name
    response, succeeded = api_request([self.collection.has_index])
    checker = ResponseChecker(response, func_name, check_task, check_items, succeeded)
    return response, checker.run()
@trace()
def drop_index(self, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -205,6 +243,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def create_alias(self, alias_name, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -214,6 +253,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def drop_alias(self, alias_name, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -223,6 +263,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def alter_alias(self, alias_name, check_task=None, check_items=None, **kwargs):
timeout = kwargs.get("timeout", TIMEOUT)
kwargs.update({"timeout": timeout})
@ -232,6 +273,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def delete(self, expr, partition_name=None, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@ -239,6 +281,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def compact(self, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@ -246,6 +289,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def get_compaction_state(self, timeout=None, check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@ -253,6 +297,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def get_compaction_plans(self, timeout=None, check_task=None, check_items={}, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
@ -260,6 +305,7 @@ class ApiCollectionWrapper:
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
@trace()
def wait_for_compaction_completed(self, timeout=None, **kwargs):
timeout = TIMEOUT * 3 if timeout is None else timeout
res = self.collection.wait_for_compaction_completed(timeout, **kwargs)

View File

@ -26,6 +26,7 @@ pytest-random-order
# for customize config test
python-benedict==0.24.3
timeout-decorator==0.5.0
# version needs to be consistent with the protobuf version used in pymilvus
protobuf==3.17.1

View File

@ -17,7 +17,7 @@ class TestE2e(TestcaseBase):
# create
name = cf.gen_unique_str(prefix)
t0 = time.time()
collection_w = self.init_collection_wrap(name=name)
collection_w = self.init_collection_wrap(name=name, active_trace=True)
tt = time.time() - t0
assert collection_w.name == name
entities = collection_w.num_entities
@ -33,7 +33,9 @@ class TestE2e(TestcaseBase):
# flush
t0 = time.time()
assert collection_w.num_entities == len(data[0]) + entities
num_entities, check_result = collection_w.flush(timeout=30)
assert check_result
assert num_entities == len(data[0]) + entities
tt = time.time() - t0
entities = collection_w.num_entities
log.info(f"assert flush: {tt}, entities: {entities}")

View File

@ -0,0 +1,49 @@
import time
from datetime import datetime
import functools
from utils.util_log import test_log as log
DEFAULT_FMT = '[{start_time}][{end_time}][{elapsed:0.8f}s] {collection_name} {func_name} ({arg_str}) -> {result!r}'


def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
    """Build a decorator that logs timing information for each call.

    When the first positional argument of the decorated callable exposes an
    ``active_trace`` attribute (as ``ApiCollectionWrapper`` instances do), that
    attribute decides whether the call is traced and the instance itself is
    left out of the logged argument list. Otherwise ``flag`` decides, and all
    positional arguments are logged.

    :param fmt: ``str.format`` template for the log line. Available fields:
        ``start_time``, ``end_time``, ``elapsed``, ``collection_name``,
        ``func_name``, ``arg_str``, ``result`` and ``prefix``.
    :param prefix: text placed in square brackets in front of the log line.
    :param flag: fallback on/off switch, used when the first positional
        argument has no ``active_trace`` attribute (or there is none).
    :return: a decorator; the wrapped callable returns exactly what the
        original returns. Exceptions raised by the callable propagate
        unchanged and nothing is logged for that call.
    """
    # Function-scope import: the module only imports ``datetime`` itself.
    from datetime import timezone

    time_format = '%Y-%m-%dT%H:%M:%SZ'

    def decorate(func):
        @functools.wraps(func)
        def inner_wrapper(*args, **kwargs):
            owner = args[0] if args else None
            # Only objects carrying ``active_trace`` are treated as wrapper instances.
            is_wrapper = hasattr(owner, 'active_trace')
            active = owner.active_trace if is_wrapper else flag
            if not active:
                return func(*args, **kwargs)

            start_time = datetime.now(timezone.utc).strftime(time_format)
            t0 = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - t0
            end_time = datetime.now(timezone.utc).strftime(time_format)

            # ``collection`` may still be None (e.g. before the wrapper is
            # initialised), so resolve the name defensively.
            collection_name = getattr(getattr(owner, 'collection', None), 'name', None)
            traced_args = args[1:] if is_wrapper else args
            # Cap the number of positional arguments and the rendered length
            # so that large payloads do not flood the log.
            arg_lst = [repr(arg) for arg in traced_args[:100]]
            arg_lst.extend(f'{k}={v!r}' for k, v in kwargs.items())
            arg_str = ', '.join(arg_lst)[:200]

            log_str = f"[{prefix}]" + fmt.format(
                start_time=start_time,
                end_time=end_time,
                elapsed=elapsed,
                collection_name=collection_name,
                func_name=func.__name__,
                arg_str=arg_str,
                result=result,
                prefix=prefix,
            )
            # TODO: add a report function here, e.g. uploading to influxdb.
            # An asynchronous report would be preferable so that it does not
            # block request processing.
            log.info(log_str)
            return result
        return inner_wrapper
    return decorate
if __name__ == '__main__':
    # Manual smoke test for the decorator. The wrapper produced by ``trace``
    # reads ``active_trace`` and ``collection.name`` from the first positional
    # argument, so the demo goes through a small stand-in object that
    # provides both.
    class _DemoCollection:
        name = 'demo_collection'

    class _DemoWrapper:
        active_trace = True
        collection = _DemoCollection()

        @trace()
        def snooze(self, seconds, name='snooze'):
            time.sleep(seconds)
            return name

    demo = _DemoWrapper()
    for i in range(3):
        res = demo.snooze(.123, name=i)
        print("res:", res)