test: add freshness checker (#30280)

add freshness checker

insert/upsert --> query: measure the time until inserted/upserted data becomes queryable

delete --> query: measure the time until deleted data is no longer queryable

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2024-01-29 12:09:01 +08:00 committed by GitHub
parent 033eae9e73
commit aab7cc9ecd
7 changed files with 329 additions and 65 deletions
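
Both checks reduce to a timed write-then-poll loop: perform the write, then repeatedly query until the expected visibility change shows up. A minimal sketch of the pattern against bare pymilvus (the host, the collection name freshness_demo, and the pk field name int64_pk are illustrative assumptions, not from this commit; the committed checkers wrap the same loop inside the chaos-test Checker classes in the diff below):

import time
from pymilvus import Collection, connections

connections.connect("default", host="127.0.0.1", port="19530")
collection = Collection("freshness_demo")  # assumed to exist, be loaded, and use an int64 pk

def insert_freshness_seconds(collection, data, pk_field="int64_pk", poll=0.05):
    """Insert one batch, then poll until its newest pk becomes queryable."""
    latest = data[0][-1]  # newest scaled-timestamp pk in this batch
    collection.insert(data)
    t0 = time.perf_counter()
    while True:
        res = collection.query(f"{pk_field} == {latest}", output_fields=[pk_field])
        if len(res) == 1:
            return time.perf_counter() - t0
        time.sleep(poll)

def delete_freshness_seconds(collection, delete_expr, poll=0.05):
    """Delete by expression, then poll until the matching rows stop being queryable."""
    collection.delete(delete_expr)
    t0 = time.perf_counter()
    while True:
        if len(collection.query(delete_expr)) == 0:
            return time.perf_counter() - t0
        time.sleep(poll)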

View File

@ -1,7 +1,7 @@
import pytest
import unittest
from enum import Enum
from random import randint
import random
import time
import threading
import os
@ -202,7 +202,9 @@ class Op(Enum):
create_collection = 'create_collection'
create_partition = 'create_partition'
insert = 'insert'
insert_freshness = 'insert_freshness'
upsert = 'upsert'
upsert_freshness = 'upsert_freshness'
flush = 'flush'
index = 'index'
create_index = 'create_index'
@ -216,6 +218,7 @@ class Op(Enum):
search = 'search'
query = 'query'
delete = 'delete'
delete_freshness = 'delete_freshness'
compact = 'compact'
drop = 'drop' # short name for drop collection
drop_db = 'drop_db'
@ -282,14 +285,22 @@ def exception_handler():
def wrapper(func):
@functools.wraps(func)
def inner_wrapper(self, *args, **kwargs):
class_name = None
function_name = None
try:
function_name = func.__name__
class_name = getattr(self, '__class__', None).__name__ if self else None
res, result = func(self, *args, **kwargs)
return res, result
except Exception as e:
log_row_length = 300
e_str = str(e)
log_e = e_str[0:log_row_length] + \
'......' if len(e_str) > log_row_length else e_str
log_e = e_str[0:log_row_length] + '......' if len(e_str) > log_row_length else e_str
if class_name:
log_message = f"Error in {class_name}.{function_name}: {log_e}"
else:
log_message = f"Error in {function_name}: {log_e}"
log.error(log_message)
log.error(log_e)
return Error(e), False
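
Reduced to its essentials, the widened error logging above is a standard functools.wraps decorator that captures the owning class name before calling through (a sketch; the suite's Error wrapper is replaced here by returning the exception itself):

import functools
import logging

log = logging.getLogger(__name__)
LOG_ROW_LENGTH = 300  # truncate long error strings, as the checker does

def exception_handler():
    def wrapper(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception as e:
                e_str = str(e)
                log_e = e_str[:LOG_ROW_LENGTH] + "......" if len(e_str) > LOG_ROW_LENGTH else e_str
                class_name = type(self).__name__ if self else None
                where = f"{class_name}.{func.__name__}" if class_name else func.__name__
                log.error(f"Error in {where}: {log_e}")
                return e, False  # the suite returns Error(e), False
        return inner_wrapper
    return wrapper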
@ -306,7 +317,7 @@ class Checker:
"""
def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=ct.default_dim, insert_data=True,
schema=None):
schema=None, replica_number=1, **kwargs):
self.recovery_time = 0
self._succ = 0
self._fail = 0
@ -337,26 +348,45 @@ class Checker:
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback)
self.index_name = "vec_index"
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.replica_number = replica_number
self.c_wrap.load(replica_number=self.replica_number)
self.p_wrap.init_partition(self.c_name, self.p_name)
if insert_data:
log.info(f"collection {c_name} created, start to insert data")
t0 = time.perf_counter()
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0),
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema),
partition_name=self.p_name,
timeout=timeout,
enable_traceback=enable_traceback)
log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")
self.initial_entities = self.c_wrap.num_entities # do as a flush
self.scale = 100000 # timestamp scale to make time.time() as int64
def insert_data(self, nb=constants.ENTITIES_FOR_SEARCH, partition_name=None):
def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None):
partition_name = self.p_name if partition_name is None else partition_name
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=nb, schema=self.schema, start=0),
partition_name=partition_name,
timeout=timeout,
enable_traceback=enable_traceback)
data = cf.get_column_data_by_schema(nb=nb, schema=self.schema)
ts_data = []
for i in range(nb):
time.sleep(0.001)
offset_ts = int(time.time() * self.scale)
ts_data.append(offset_ts)
data[0] = ts_data # set timestamp (ms) as int64
log.debug(f"insert data: {ts_data}")
res, result = self.c_wrap.insert(data=data,
partition_name=partition_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
return res, result
def total(self):
return self._succ + self._fail
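
The reworked insert_data above writes scaled wall-clock timestamps into the int64 pk column, so each inserted row is individually addressable by a later query. The generation step in isolation (a sketch; 100000 mirrors the checker's self.scale, and the 1 ms sleep keeps consecutive values distinct):

import time

SCALE = 100_000  # time.time() in 10 µs units still fits comfortably in an int64

def timestamp_pks(nb: int) -> list[int]:
    """Strictly increasing int64 pks derived from wall-clock time."""
    pks = []
    for _ in range(nb):
        time.sleep(0.001)  # keep consecutive values distinct at this scale
        pks.append(int(time.time() * SCALE))
    return pks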
@ -450,7 +480,7 @@ class CollectionLoadChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -483,7 +513,7 @@ class CollectionReleaseChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -513,14 +543,16 @@ class PartitionLoadChecker(Checker):
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
collection_name = cf.gen_unique_str("PartitionLoadChecker_")
p_name = cf.gen_unique_str("PartitionLoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.release()
@trace()
def load_partition(self):
@ -546,14 +578,16 @@ class PartitionReleaseChecker(Checker):
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
collection_name = cf.gen_unique_str("PartitionReleaseChecker_")
p_name = cf.gen_unique_str("PartitionReleaseChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.release()
self.p_wrap.load(replica_number=self.replica_number)
@trace()
@ -583,7 +617,7 @@ class SearchChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -718,7 +752,7 @@ class InsertChecker(Checker):
ts_data.append(offset_ts)
data[0] = ts_data # set timestamp (ms) as int64
log.debug(f"insert data: {ts_data}")
log.debug(f"insert data: {len(ts_data)}")
res, result = self.c_wrap.insert(data=data,
partition_names=self.p_names,
timeout=timeout,
@ -728,6 +762,7 @@ class InsertChecker(Checker):
@exception_handler()
def run_task(self):
res, result = self.insert_entities()
return res, result
@ -740,7 +775,7 @@ class InsertChecker(Checker):
try:
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -766,18 +801,76 @@ class InsertChecker(Checker):
pytest.assume(set(data_in_server) == set(data_in_client))
class InsertFreshnessChecker(Checker):
"""check insert freshness operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
self.latest_data = None
if collection_name is None:
collection_name = cf.gen_unique_str("InsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
self.inserted_data = []
self.scale = 1 * 10 ** 6
self.start_time_stamp = int(time.time() * self.scale) # us
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
def insert_entities(self):
data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
ts_data = []
for i in range(constants.DELTA_PER_INS):
time.sleep(0.001)
offset_ts = int(time.time() * self.scale)
ts_data.append(offset_ts)
data[0] = ts_data # set timestamp (ms) as int64
log.debug(f"insert data: {len(ts_data)}")
res, result = self.c_wrap.insert(data=data,
partition_names=self.p_names,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.latest_data = ts_data[-1]
self.term_expr = f'{self.int64_field_name} == {self.latest_data}'
return res, result
@trace()
def insert_freshness(self):
while True:
res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
output_fields=[f'{self.int64_field_name}'],
check_task=CheckTasks.check_nothing)
if len(res) == 1 and res[0][f"{self.int64_field_name}"] == self.latest_data:
break
return res, result
@exception_handler()
def run_task(self):
res, result = self.insert_entities()
res, result = self.insert_freshness()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
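
insert_freshness polls with no upper bound, which is acceptable under the suite's outer timeouts; a deadline-guarded variant (an assumption, not part of this commit) would look like:

import time

def poll_until_visible(query_fn, expr, deadline_s=60.0, poll=0.05):
    """Return elapsed seconds once expr matches; raise if the deadline passes.

    query_fn stands in for c_wrap.query / Collection.query (assumed call shape).
    """
    t0 = time.perf_counter()
    while True:
        if len(query_fn(expr)) >= 1:
            return time.perf_counter() - t0
        if time.perf_counter() - t0 > deadline_s:
            raise TimeoutError(f"{expr!r} not visible within {deadline_s}s")
        time.sleep(poll)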
class UpsertChecker(Checker):
"""check upsert operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("InsertChecker_")
collection_name = cf.gen_unique_str("UpsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
@trace()
def upsert_entities(self):
data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
res, result = self.c_wrap.upsert(data=data,
res, result = self.c_wrap.upsert(data=self.data,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -785,13 +878,70 @@ class UpsertChecker(Checker):
@exception_handler()
def run_task(self):
# half of the data is upsert, the other half is insert
rows = len(self.data[0])
pk_old = self.data[0][:rows // 2]
self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
pk_new = self.data[0][rows // 2:]
pk_update = pk_old + pk_new
self.data[0] = pk_update
res, result = self.upsert_entities()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
sleep(constants.WAIT_PER_OP * 6)
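
run_task reuses the first half of the previous batch's pks and takes fresh pks for the second half, so every upsert exercises both the update path (existing pks) and the insert path (new ones). The pk bookkeeping on its own, with plain lists standing in for the column data:

def next_upsert_pks(prev_pks: list[int], fresh_pks: list[int]) -> list[int]:
    """First half old pks (updates) + second half new pks (inserts)."""
    rows = len(prev_pks)
    return prev_pks[:rows // 2] + fresh_pks[rows // 2:]

# e.g. next_upsert_pks([1, 2, 3, 4], [10, 11, 12, 13]) -> [1, 2, 12, 13]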
class UpsertFreshnessChecker(Checker):
"""check upsert freshness operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
self.term_expr = None
self.latest_data = None
if collection_name is None:
collection_name = cf.gen_unique_str("UpsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
def upsert_entities(self):
res, result = self.c_wrap.upsert(data=self.data,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
return res, result
@trace()
def upsert_freshness(self):
while True:
res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
output_fields=[f'{self.int64_field_name}'],
check_task=CheckTasks.check_nothing)
if len(res) == 1 and res[0][f"{self.int64_field_name}"] == self.latest_data:
break
return res, result
@exception_handler()
def run_task(self):
# half of the data is upsert, the other half is insert
rows = len(self.data[0])
pk_old = self.data[0][:rows // 2]
self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
pk_new = self.data[0][rows // 2:]
pk_update = pk_old + pk_new
self.data[0] = pk_update
self.latest_data = self.data[0][-1]
self.term_expr = f'{self.int64_field_name} == {self.latest_data}'
res, result = self.upsert_entities()
res, result = self.upsert_freshness()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP * 6)
class CollectionCreateChecker(Checker):
@ -815,8 +965,10 @@ class CollectionCreateChecker(Checker):
@exception_handler()
def run_task(self):
res, result = self.init_collection()
if result:
self.c_wrap.drop(timeout=timeout)
# if result:
# # 50% chance to drop collection
# if random.randint(0, 1) == 0:
# self.c_wrap.drop(timeout=timeout)
return res, result
def keep_running(self):
@ -878,6 +1030,17 @@ class PartitionCreateChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("PartitionCreateChecker_")
super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
c_name = cf.gen_unique_str("PartitionDropChecker_")
self.c_wrap.init_collection(name=c_name, schema=self.schema)
self.c_name = c_name
log.info(f"collection {c_name} created")
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
log.info(f"partition: {self.p_wrap}")
@trace()
def create_partition(self):
@ -892,8 +1055,6 @@ class PartitionCreateChecker(Checker):
@exception_handler()
def run_task(self):
res, result = self.create_partition()
if result:
self.p_wrap.drop(timeout=timeout)
return res, result
def keep_running(self):
@ -909,12 +1070,17 @@ class PartitionDropChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("PartitionDropChecker_")
super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
c_name = cf.gen_unique_str("PartitionDropChecker_")
self.c_wrap.init_collection(name=c_name, schema=self.schema)
self.c_name = c_name
log.info(f"collection {c_name} created")
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
log.info(f"partition: {self.p_wrap}")
@trace()
def drop_partition(self):
@ -925,12 +1091,14 @@ class PartitionDropChecker(Checker):
def run_task(self):
res, result = self.drop_partition()
if result:
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
# create two partition then drop one
for i in range(2):
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
return res, result
def keep_running(self):
@ -1004,7 +1172,6 @@ class IndexCreateChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.index_name = cf.gen_unique_str('index_')
for i in range(5):
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout, enable_traceback=enable_traceback)
@ -1022,9 +1189,11 @@ class IndexCreateChecker(Checker):
@exception_handler()
def run_task(self):
c_name = cf.gen_unique_str("IndexCreateChecker_")
self.c_wrap.init_collection(name=c_name, schema=self.schema)
res, result = self.create_index()
if result:
self.c_wrap.drop_index(timeout=timeout)
self.c_wrap.drop_index(timeout=timeout, index_name=self.index_name)
return res, result
def keep_running(self):
@ -1040,17 +1209,11 @@ class IndexDropChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.index_name = cf.gen_unique_str('index_')
for i in range(5):
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout, enable_traceback=enable_traceback)
# do as a flush before indexing
log.debug(f"Index ready entities: {self.c_wrap.num_entities}")
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@trace()
def drop_index(self):
@ -1061,6 +1224,7 @@ class IndexDropChecker(Checker):
def run_task(self):
res, result = self.drop_index()
if result:
self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
@ -1070,6 +1234,12 @@ class IndexDropChecker(Checker):
def keep_running(self):
while self._keep_running:
self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.run_task()
sleep(constants.WAIT_PER_OP * 6)
@ -1083,8 +1253,7 @@ class QueryChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -1102,7 +1271,7 @@ class QueryChecker(Checker):
def run_task(self):
int_values = []
for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
int_values.append(random.randint(0, constants.ENTITIES_FOR_SEARCH))
self.term_expr = f'{self.int64_field_name} in {int_values}'
res, result = self.query()
return res, result
@ -1122,35 +1291,118 @@ class DeleteChecker(Checker):
super().__init__(collection_name=collection_name, schema=schema)
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load() # load before query
self.insert_data()
term_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[
self.int64_field_name])
query_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
self.ids = [r[self.int64_field_name] for r in res]
self.expr = None
self.query_expr = query_expr
delete_ids = self.ids[:len(self.ids) // 2] # delete half of ids
self.delete_expr = f'{self.int64_field_name} in {delete_ids}'
def update_delete_expr(self):
res, _ = self.c_wrap.query(self.query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
all_ids = [r[self.int64_field_name] for r in res]
if len(all_ids) < 100:
# insert data to make sure there are enough ids to delete
self.insert_data(nb=10000)
res, _ = self.c_wrap.query(self.query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
all_ids = [r[self.int64_field_name] for r in res]
delete_ids = all_ids[:len(all_ids) // 2] # delete half of ids
self.delete_expr = f'{self.int64_field_name} in {delete_ids}'
@trace()
def delete_entities(self):
res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout)
res, result = self.c_wrap.delete(expr=self.delete_expr, timeout=timeout, partition_name=self.p_name)
return res, result
@exception_handler()
def run_task(self):
delete_ids = self.ids.pop()
self.expr = f'{self.int64_field_name} in {[delete_ids]}'
self.update_delete_expr()
res, result = self.delete_entities()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
sleep(constants.WAIT_PER_OP)
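
update_delete_expr re-reads the live ids before every delete and tops the partition up whenever fewer than 100 remain, so the checker never exhausts its deletable rows. The same logic with the wrapper calls abstracted away (query_ids and insert_more are hypothetical stand-ins for the checker's c_wrap query and insert calls):

def refresh_delete_expr(query_ids, insert_more, pk_field="int64_pk", floor=100):
    """Build an expression deleting half of the live ids, topping up first if needed."""
    ids = query_ids()
    if len(ids) < floor:
        insert_more(10_000)  # same top-up size the checker uses
        ids = query_ids()
    return f"{pk_field} in {ids[:len(ids) // 2]}"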
class DeleteFreshnessChecker(Checker):
"""check delete freshness operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name, schema=schema)
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load() # load before query
self.insert_data()
query_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
self.ids = [r[self.int64_field_name] for r in res]
self.query_expr = query_expr
delete_ids = self.ids[:len(self.ids) // 2] # delete half of ids
self.delete_expr = f'{self.int64_field_name} in {delete_ids}'
def update_delete_expr(self):
res, _ = self.c_wrap.query(self.query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
all_ids = [r[self.int64_field_name] for r in res]
if len(all_ids) < 100:
# insert data to make sure there are enough ids to delete
self.insert_data(nb=10000)
res, _ = self.c_wrap.query(self.query_expr,
output_fields=[self.int64_field_name],
partition_name=self.p_name)
all_ids = [r[self.int64_field_name] for r in res]
delete_ids = all_ids[:len(all_ids) // 2] # delete half of ids
self.delete_expr = f'{self.int64_field_name} in {delete_ids}'
def delete_entities(self):
res, result = self.c_wrap.delete(expr=self.delete_expr, timeout=timeout, partition_name=self.p_name)
return res, result
@trace()
def delete_freshness(self):
while True:
res, result = self.c_wrap.query(self.delete_expr, timeout=timeout,
output_fields=[f'{self.int64_field_name}'],
check_task=CheckTasks.check_nothing)
if len(res) == 0:
break
return res, result
@exception_handler()
def run_task(self):
self.update_delete_expr()
res, result = self.delete_entities()
res, result = self.delete_freshness()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class CompactChecker(Checker):
@ -1163,8 +1415,7 @@ class CompactChecker(Checker):
self.ut = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -1198,8 +1449,7 @@ class LoadBalanceChecker(Checker):
self.utility_wrap = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)

View File

@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org' # chaos mesh group
CHAOS_VERSION = 'v1alpha1' # chaos mesh version
SUCC = 'succ'
FAIL = 'fail'
DELTA_PER_INS = 10 # entities per insert
DELTA_PER_INS = 3000 # entities per insert
ENTITIES_FOR_SEARCH = 3000 # entities for search_collection
ENTITIES_FOR_BULKINSERT = 1000000 # entities for bulk insert
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chaos path

View File

@ -2,7 +2,7 @@ import time
import pytest
from time import sleep
from pymilvus import connections
from pymilvus import connections, db
from chaos.checker import (
DatabaseCreateChecker,
DatabaseDropChecker,
@ -52,7 +52,7 @@ class TestBase:
class TestOperations(TestBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port, user, password, milvus_ns):
def connection(self, host, port, user, password, milvus_ns, database_name):
if user and password:
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
@ -60,7 +60,11 @@ class TestOperations(TestBase):
connections.connect('default', host=host, port=port)
if connections.has_connection("default") is False:
raise Exception("no connections")
log.info("connect to milvus successfully")
all_dbs = db.list_database()
if database_name not in all_dbs:
db.create_database(database_name)
db.using_database(database_name)
log.info(f"connect to milvus {host}:{port}, db {database_name} successfully")
self.host = host
self.port = port
self.user = user

View File

@ -77,6 +77,7 @@ class TestOperations(TestBase):
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
}
log.info(f"init_health_checkers: {checkers}")
self.health_checkers = checkers
@pytest.fixture(scope="function", params=get_all_collections())

View File

@ -31,6 +31,7 @@ def pytest_addoption(parser):
parser.addoption("--handler", action="store", default="GRPC", help="handler of request")
parser.addoption("--tag", action="store", default="all", help="only run tests matching the tag.")
parser.addoption('--dry_run', action='store_true', default=False, help="")
parser.addoption('--database_name', action='store', default="default", help="name of database")
parser.addoption('--partition_name', action='store', default="partition_name", help="name of partition")
parser.addoption('--connect_name', action='store', default="connect_name", help="name of connect")
parser.addoption('--descriptions', action='store', default="partition_des", help="descriptions of partition")
@ -116,6 +117,11 @@ def connect_name(request):
return request.config.getoption("--connect_name")
@pytest.fixture
def database_name(request):
return request.config.getoption("--database_name")
@pytest.fixture
def partition_name(request):
return request.config.getoption("--partition_name")

View File

@ -47,6 +47,7 @@ pandas==1.5.3
tenacity==8.1.0
# for standby test
etcd-sdk-python==0.0.4
deepdiff==6.7.1
# for test result analyzer
prettytable==3.8.0

View File

@ -4,6 +4,7 @@ import json
from time import sleep
from pymilvus import connections
from chaos.checker import (InsertChecker,
UpsertChecker,
SearchChecker,
QueryChecker,
DeleteChecker,
@ -63,6 +64,7 @@ class TestOperations(TestBase):
c_name = collection_name
checkers = {
Op.insert: InsertChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),