diff --git a/tests/python_client/deploy/scripts/action_after_reinstall.py b/tests/python_client/deploy/scripts/action_after_reinstall.py
index 8a040e3cb9..cec0876991 100644
--- a/tests/python_client/deploy/scripts/action_after_reinstall.py
+++ b/tests/python_client/deploy/scripts/action_after_reinstall.py
@@ -35,12 +35,18 @@ def task_2(data_size, host):
if __name__ == '__main__':
import argparse
+ import threading
parser = argparse.ArgumentParser(description='config for deploy test')
parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
parser.add_argument('--data_size', type=int, default=3000, help='data size')
args = parser.parse_args()
host = args.host
data_size = args.data_size
- print(f"data size: {data_size}")
- task_1(data_size, host)
- task_2(data_size, host)
\ No newline at end of file
+ logger.info(f"data size: {data_size}")
+ tasks = []
+ tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
+ for task in tasks:
+ task.start()
+ for task in tasks:
+ task.join()
\ No newline at end of file
diff --git a/tests/python_client/deploy/scripts/action_after_upgrade.py b/tests/python_client/deploy/scripts/action_after_upgrade.py
index c08812bba2..32521cb4e7 100644
--- a/tests/python_client/deploy/scripts/action_after_upgrade.py
+++ b/tests/python_client/deploy/scripts/action_after_upgrade.py
@@ -91,22 +91,28 @@ def task_5(data_size, host):
if __name__ == '__main__':
import argparse
+ import threading
parser = argparse.ArgumentParser(description='config for deploy test')
parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
parser.add_argument('--data_size', type=int, default=3000, help='data size')
args = parser.parse_args()
data_size = args.data_size
host = args.host
- print(f"data size: {data_size}")
+ logger.info(f"data size: {data_size}")
connections.connect(host=host, port=19530, timeout=60)
ms = MilvusSys()
# create index for flat
- print("create index for flat start")
+ logger.info("create index for flat start")
create_index_flat()
- print("create index for flat done")
- task_1(data_size, host)
- task_2(data_size, host)
+ logger.info("create index for flat done")
+ tasks = []
+ tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
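+ # tasks 3-5 exercise replicas, so queue them only when enough query nodes exist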
if len(ms.query_nodes) >= NUM_REPLICAS:
- task_3(data_size, host)
- task_4(data_size, host)
- task_5(data_size, host)
+ tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
+ for task in tasks:
+ task.start()
+ for task in tasks:
+ task.join()
diff --git a/tests/python_client/deploy/scripts/action_before_reinstall.py b/tests/python_client/deploy/scripts/action_before_reinstall.py
index 2a61942933..712664b2d4 100644
--- a/tests/python_client/deploy/scripts/action_before_reinstall.py
+++ b/tests/python_client/deploy/scripts/action_before_reinstall.py
@@ -36,12 +36,18 @@ def task_2(data_size, host):
if __name__ == '__main__':
import argparse
+ import threading
parser = argparse.ArgumentParser(description='config for deploy test')
parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
parser.add_argument('--data_size', type=int, default=3000, help='data size')
args = parser.parse_args()
data_size = args.data_size
host = args.host
- print(f"data_size: {data_size}")
- task_1(data_size, host)
- task_2(data_size, host)
+ logger.info(f"data_size: {data_size}")
+ tasks = []
+ tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
+ for task in tasks:
+ task.start()
+ for task in tasks:
+ task.join()
\ No newline at end of file
diff --git a/tests/python_client/deploy/scripts/action_before_upgrade.py b/tests/python_client/deploy/scripts/action_before_upgrade.py
index 3d98d06a3d..da0baa4895 100644
--- a/tests/python_client/deploy/scripts/action_before_upgrade.py
+++ b/tests/python_client/deploy/scripts/action_before_upgrade.py
@@ -73,18 +73,25 @@ def task_5(data_size, host):
if __name__ == '__main__':
import argparse
+ import threading
parser = argparse.ArgumentParser(description='config for deploy test')
parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
parser.add_argument('--data_size', type=int, default=3000, help='data size')
args = parser.parse_args()
data_size = args.data_size
host = args.host
- print(f"data size: {data_size}")
+ logger.info(f"data size: {data_size}")
connections.connect(host=host, port=19530, timeout=60)
ms = MilvusSys()
- task_1(data_size, host)
- task_2(data_size, host)
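+ # queue the base tasks; replica tasks are appended below only if the cluster supports them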
+ tasks = []
+ tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
if len(ms.query_nodes) >= NUM_REPLICAS:
- task_3(data_size, host)
- task_4(data_size, host)
- task_5(data_size, host)
\ No newline at end of file
+ tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
+ tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
+ for task in tasks:
+ task.start()
+ for task in tasks:
+ task.join()
+
\ No newline at end of file
diff --git a/tests/python_client/deploy/scripts/utils.py b/tests/python_client/deploy/scripts/utils.py
index a2f4d0d3c1..1370a93739 100644
--- a/tests/python_client/deploy/scripts/utils.py
+++ b/tests/python_client/deploy/scripts/utils.py
@@ -1,10 +1,18 @@
+import sys
import copy
import time
+from loguru import logger
import pymilvus
from pymilvus import (
FieldSchema, CollectionSchema, DataType,
Collection, list_collections,
)
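+# route script logging through loguru; thread names in the format identify concurrent tasks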
+logger.remove()
+logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | "
+ "{level: <8} | "
+ "{thread.name} | "
+ "{name}:{function}:{line} - {message}",
+ level="INFO")
pymilvus_version = pymilvus.__version__
@@ -46,15 +54,15 @@ def gen_search_param(index_type, metric_type="L2"):
annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}}
search_params.append(annoy_search_param)
else:
- print("Invalid index_type.")
+ logger.info("Invalid index_type.")
raise Exception("Invalid index_type.")
return search_params
def get_collections(prefix, check=False):
- print("\nList collections...")
+ logger.info("\nList collections...")
col_list = filter_collections_by_prefix(prefix)
- print(f"collections_nums: {len(col_list)}")
+ logger.info(f"collections_nums: {len(col_list)}")
# list entities if collections
for name in col_list:
c = Collection(name=name)
@@ -63,7 +71,7 @@ def get_collections(prefix, check=False):
else:
c.num_entities
num_entities = c.num_entities
- print(f"{name}: {num_entities}")
+ logger.info(f"{name}: {num_entities}")
if check:
assert num_entities >= 3000
return col_list
@@ -80,11 +88,11 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
]
default_schema = CollectionSchema(fields=default_fields, description="test collection")
for index_name in all_index_types[:collection_cnt]:
- print("\nCreate collection...")
+ logger.info("\nCreate collection...")
col_name = prefix + index_name
collection = Collection(name=col_name, schema=default_schema)
- print(f"collection name: {col_name}")
- print(f"begin insert, count: {count} nb: {nb}")
+ logger.info(f"collection name: {col_name}")
+ logger.info(f"begin insert, count: {count} nb: {nb}")
times = int(count // nb)
total_time = 0.0
vectors = [[random.random() for _ in range(dim)] for _ in range(count)]
@@ -98,22 +106,22 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
]
)
end_time = time.time()
- print(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
+ logger.info(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
total_time += end_time - start_time
- print(f"end insert, time: {total_time:.4f}")
+ logger.info(f"end insert, time: {total_time:.4f}")
if flush:
- print("Get collection entities")
+ logger.info("Get collection entities")
start_time = time.time()
if pymilvus_version >= "2.2.0":
collection.flush()
else:
collection.num_entities
- print(f"collection entities: {collection.num_entities}")
+ logger.info(f"collection entities: {collection.num_entities}")
end_time = time.time()
- print("Get collection entities time = %.4fs" % (end_time - start_time))
- print("\nList collections...")
- print(get_collections(prefix))
+ logger.info("Get collection entities time = %.4fs" % (end_time - start_time))
+ logger.info("\nList collections...")
+ logger.info(get_collections(prefix))
def create_index_flat():
@@ -124,25 +132,41 @@ def create_index_flat():
for col_name in all_col_list:
if "FLAT" in col_name and "task" in col_name and "IVF" not in col_name:
col_list.append(col_name)
- print("\nCreate index for FLAT...")
+ logger.info("\nCreate index for FLAT...")
for col_name in col_list:
c = Collection(name=col_name)
- print(c)
+ logger.info(c)
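+ # record the current replica count (get_replicas raises if the collection is not loaded),
+ # then release so the index can be built and the collection reloaded afterwards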
+ try:
+ replicas = c.get_replicas()
+ replica_number = len(replicas.groups)
+ c.release()
+ except Exception as e:
+ replica_number = 0
+ logger.info(e)
t0 = time.time()
c.create_index(field_name="float_vector", index_params=default_flat_index)
- print(f"create index time: {time.time() - t0:.4f}")
+ logger.info(f"create index time: {time.time() - t0:.4f}")
+ if replica_number > 0:
+ c.load(replica_number=replica_number)
def create_index(prefix):
# create index
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
col_list = get_collections(prefix)
- print("\nCreate index...")
+ logger.info("\nCreate index...")
for col_name in col_list:
c = Collection(name=col_name)
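+ # same pattern as create_index_flat: release before indexing, reload afterwards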
+ try:
+ replicas = c.get_replicas()
+ replica_number = len(replicas.groups)
+ c.release()
+ except Exception as e:
+ replica_number = 0
+ logger.info(e)
index_name = col_name.replace(prefix, "")
- print(index_name)
- print(c)
+ logger.info(index_name)
+ logger.info(c)
index = copy.deepcopy(default_index)
index["index_type"] = index_name
index["params"] = index_params_map[index_name]
@@ -150,45 +174,47 @@ def create_index(prefix):
index["metric_type"] = "HAMMING"
t0 = time.time()
c.create_index(field_name="float_vector", index_params=index)
- print(f"create index time: {time.time() - t0:.4f}")
+ logger.info(f"create index time: {time.time() - t0:.4f}")
+ if replica_number > 0:
+ c.load(replica_number=replica_number)
def release_collection(prefix):
col_list = get_collections(prefix)
- print("release collection")
+ logger.info("release collection")
for col_name in col_list:
c = Collection(name=col_name)
c.release()
def load_and_search(prefix, replicas=1):
- print("search data starts")
+ logger.info("search data starts")
col_list = get_collections(prefix)
for col_name in col_list:
c = Collection(name=col_name)
- print(f"collection name: {col_name}")
- print("load collection")
+ logger.info(f"collection name: {col_name}")
+ logger.info("load collection")
if replicas == 1:
t0 = time.time()
c.load()
- print(f"load time: {time.time() - t0:.4f}")
+ logger.info(f"load time: {time.time() - t0:.4f}")
if replicas > 1:
- print("release collection before load if replicas > 1")
+ logger.info("release collection before load if replicas > 1")
t0 = time.time()
c.release()
- print(f"release time: {time.time() - t0:.4f}")
+ logger.info(f"release time: {time.time() - t0:.4f}")
t0 = time.time()
c.load(replica_number=replicas)
- print(f"load time: {time.time() - t0:.4f}")
- print(c.get_replicas())
+ logger.info(f"load time: {time.time() - t0:.4f}")
+ logger.info(c.get_replicas())
topK = 5
vectors = [[1.0 for _ in range(128)] for _ in range(3000)]
index_name = col_name.replace(prefix, "")
search_params = gen_search_param(index_name)[0]
- print(search_params)
+ logger.info(search_params)
# search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
start_time = time.time()
- print(f"\nSearch...")
+ logger.info(f"\nSearch...")
# define output_fields of search result
v_search = vectors[:1]
res = c.search(
@@ -200,22 +226,22 @@ def load_and_search(prefix, replicas=1):
for hits in res:
for hit in hits:
# Get value of the random value field for search result
- print(hit, hit.entity.get("random_value"))
+ logger.info(f"{hit}, {hit.entity.get('random_value')}")
ids = hits.ids
assert len(ids) == topK, f"get {len(ids)} results, but topK is {topK}"
- print(ids)
+ logger.info(ids)
assert len(res) == len(v_search), f"get {len(res)} results, but search num is {len(v_search)}"
- print("search latency: %.4fs" % (end_time - start_time))
+ logger.info("search latency: %.4fs" % (end_time - start_time))
t0 = time.time()
expr = "count in [2,4,6,8]"
output_fields = ["count", "random_value"]
- res = c.query(expr, output_fields, timeout=20)
+ res = c.query(expr, output_fields, timeout=120)
sorted_res = sorted(res, key=lambda k: k['count'])
for r in sorted_res:
- print(r)
+ logger.info(r)
t1 = time.time()
assert len(res) == 4
- print("query latency: %.4fs" % (t1 - t0))
+ logger.info("query latency: %.4fs" % (t1 - t0))
# c.release()
- print("###########")
- print("search data ends")
+ logger.info("###########")
+ logger.info("search data ends")
diff --git a/tests/python_client/deploy/testcases/test_action_first_deployment.py b/tests/python_client/deploy/testcases/test_action_first_deployment.py
index 3150b90cc3..96deef4cff 100644
--- a/tests/python_client/deploy/testcases/test_action_first_deployment.py
+++ b/tests/python_client/deploy/testcases/test_action_first_deployment.py
@@ -21,13 +21,13 @@ default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
prefix = "deploy_test"
-TIMEOUT = 60
+TIMEOUT = 120
class TestActionFirstDeployment(TestDeployBase):
""" Test case of action before reinstall """
@@ -39,13 +39,14 @@ class TestActionFirstDeployment(TestDeployBase):
log.info("skip drop collection")
@pytest.mark.tags(CaseLabel.L3)
- @pytest.mark.parametrize("index_type", ["HNSW","BIN_IVF_FLAT"])
- def test_task_all_empty(self,index_type):
+ @pytest.mark.parametrize("replica_number", [0])
+ @pytest.mark.parametrize("index_type", ["HNSW", "BIN_IVF_FLAT"])
+ def test_task_all_empty(self, index_type, replica_number):
"""
before reinstall: create collection
"""
name = ""
- for k,v in locals().items():
+ for k, v in locals().items():
if k in ["self", "name"]:
continue
name += f"_{k}_{v}"
@@ -59,9 +60,6 @@ class TestActionFirstDeployment(TestDeployBase):
for index_name in index_names:
collection_w.drop_index(index_name=index_name)
-
-
-
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("replica_number", [0, 1, 2])
@pytest.mark.parametrize("is_compacted", ["is_compacted", "not_compacted"])
@@ -131,6 +129,7 @@ class TestActionFirstDeployment(TestDeployBase):
log.error(
f"release collection failed: {e} maybe the collection is not loaded")
collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
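+ # block until loading has fully completed before the follow-up actions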
+ self.utility_wrap.wait_for_loading_complete(name)
# delete data for growing segment
delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(0,10)]}"
@@ -181,6 +180,7 @@ class TestActionFirstDeployment(TestDeployBase):
if replica_number > 0:
collection_w.release()
collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
+ self.utility_wrap.wait_for_loading_complete(name)
# insert data to get growing segment after reload
if segment_status == "all":
diff --git a/tests/python_client/deploy/testcases/test_action_second_deployment.py b/tests/python_client/deploy/testcases/test_action_second_deployment.py
index f1caa21d6d..63288ed955 100644
--- a/tests/python_client/deploy/testcases/test_action_second_deployment.py
+++ b/tests/python_client/deploy/testcases/test_action_second_deployment.py
@@ -1,4 +1,6 @@
import pytest
+import re
+import time
import pymilvus
from common import common_func as cf
from common import common_type as ct
@@ -19,7 +21,7 @@ default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
@@ -41,28 +43,19 @@ class TestActionSecondDeployment(TestDeployBase):
method.__name__)
log.info("show collection info")
log.info(f"collection {self.collection_w.name} has entities: {self.collection_w.num_entities}")
- try:
- replicas = self.collection_w.get_replicas(enable_traceback=False)
- replicas_loaded = len(replicas.groups)
- except Exception as e:
- log.info("get replicas failed with error {str(e)}")
- replicas_loaded = 0
- log.info(f"collection {self.collection_w.name} has {replicas_loaded} replicas")
+
+ res, _ = self.utility_wrap.get_query_segment_info(self.collection_w.name)
+ log.info(f"The segment info of collection {self.collection_w.name} is {res}")
+
index_infos = [index.to_dict() for index in self.collection_w.indexes]
log.info(f"collection {self.collection_w.name} index infos {index_infos}")
log.info("skip drop collection")
def create_index(self, collection_w, default_index_field, default_index_param):
- try:
- replicas = collection_w.get_replicas(enable_traceback=False)
- replicas_loaded = len(replicas.groups)
- except Exception as e:
- log.info("get replicas failed")
- replicas_loaded = 0
- log.info(f"before create index, collection {collection_w.name} has {replicas_loaded} replicas")
+
index_field_map = dict([(index.field_name, index.index_name) for index in collection_w.indexes])
index_infos = [index.to_dict() for index in collection_w.indexes]
- log.info(index_infos)
+ log.info(f"index info: {index_infos}")
# log.info(f"{default_index_field:} {default_index_param:}")
if len(index_infos) > 0:
log.info(
@@ -107,14 +100,23 @@ class TestActionSecondDeployment(TestDeployBase):
vector_index_types = binary_vector_index_types + float_vector_index_types
if len(vector_index_types) > 0:
vector_index_type = vector_index_types[0]
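+ # loading may still be in progress after the upgrade; wait before checking replicas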
+ try:
+ t0 = time.time()
+ self.utility_wrap.wait_for_loading_complete(name)
+ log.info(f"wait for {name} loading complete cost {time.time() - t0}")
+ except Exception as e:
+ log.error(e)
# get replicas loaded
try:
replicas = collection_w.get_replicas(enable_traceback=False)
replicas_loaded = len(replicas.groups)
except Exception as e:
- log.info(f"get replicas failed with error {str(e)}")
+ log.error(e)
replicas_loaded = 0
+
log.info(f"collection {name} has {replicas_loaded} replicas")
+ expected_replicas = re.search(r'replica_number_(.*?)_', name).group(1)
+ assert replicas_loaded == int(expected_replicas)
# params for search and query
if is_binary:
_, vectors_to_search = cf.gen_binary_vectors(
diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt
index 3e37346e18..996be58380 100644
--- a/tests/python_client/requirements.txt
+++ b/tests/python_client/requirements.txt
@@ -39,3 +39,6 @@ minio==7.1.5
# for benchmark
h5py==3.1.0
+
+# for log
+loguru==0.5.3
\ No newline at end of file