mirror of https://github.com/milvus-io/milvus.git

[test]Fix create index after load (#21260)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

branch: pull/21275/head
parent: 4df91ddf60
commit: 44cc62b81d
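In outline, the diff below does four things: the chaos install script gains a milvus_mode argument that switches between ../cluster-values.yaml and ../standalone-values.yaml; the deploy-test entrypoints run their tasks as started-and-joined threads instead of sequential calls; the deploy utilities move from print to a thread-aware loguru logger (newly pinned in the test requirements); and, the fix itself, create_index() and create_index_flat() now release a loaded collection before building an index and re-load it with its previous replica number afterwards. Two test modules also correct binary_field_name to reference ct.default_binary_vec_field_name.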
@@ -1,8 +1,17 @@
 release=${1:-"milvs-chaos"}
+milvus_mode=${2:-"cluster"}
 ns=${2:-"chaos-testing"}
 bash uninstall_milvus.sh ${release} ${ns} || true
 
 helm repo add milvus https://milvus-io.github.io/milvus-helm/
 helm repo update
-helm install --wait --timeout 360s ${release} milvus/milvus -f ../cluster-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
+if [[ ${milvus_mode} == "cluster" ]];
+then
+    helm install --wait --timeout 360s ${release} milvus/milvus -f ../cluster-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
+fi
+
+if [[ ${milvus_mode} == "standalone" ]];
+then
+    helm install --wait --timeout 360s ${release} milvus/milvus -f ../standalone-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
+fi
 
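With this change the install script chooses the Helm values file from its second positional argument, e.g. "bash install_milvus.sh milvs-chaos standalone" (script name assumed from context). Note that milvus_mode and ns both expand ${2} as written, so passing a mode also overrides the namespace default.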
@@ -35,12 +35,18 @@ def task_2(data_zise, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
     host = args.host
     data_size = args.data_size
-    print(f"data size: {data_size}")
-    task_1(data_size, host)
-    task_2(data_size, host)
+    logger.info(f"data size: {data_size}")
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
 
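The entrypoint above switches from running task_1 and task_2 back-to-back to a start-then-join thread pattern, so the deploy-test tasks execute concurrently. A minimal self-contained sketch of that pattern (the task bodies here are hypothetical stand-ins):

import threading

def task_1(data_size, host):
    # stand-in for the real deploy-test task
    print(f"task_1: insert {data_size} rows via {host}")

def task_2(data_size, host):
    print(f"task_2: search {data_size} rows via {host}")

tasks = [
    threading.Thread(target=task_1, args=(3000, "127.0.0.1")),
    threading.Thread(target=task_2, args=(3000, "127.0.0.1")),
]
for task in tasks:
    task.start()   # launch every task before waiting on any of them
for task in tasks:
    task.join()    # block until all tasks finish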
@@ -91,22 +91,28 @@ def task_5(data_size, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
    data_size = args.data_size
     host = args.host
-    print(f"data size: {data_size}")
+    logger.info(f"data size: {data_size}")
     connections.connect(host=host, port=19530, timeout=60)
     ms = MilvusSys()
     # create index for flat
-    print("create index for flat start")
+    logger.info("create index for flat start")
     create_index_flat()
-    print("create index for flat done")
-    task_1(data_size, host)
-    task_2(data_size, host)
+    logger.info("create index for flat done")
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
     if len(ms.query_nodes) >= NUM_REPLICAS:
-        task_3(data_size, host)
-        task_4(data_size, host)
-        task_5(data_size, host)
+        tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
 
@@ -36,12 +36,18 @@ def task_2(data_size, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
     data_size = args.data_size
     host = args.host
-    print(f"data_size: {data_size}")
-    task_1(data_size, host)
-    task_2(data_size, host)
+    logger.info(f"data_size: {data_size}")
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
 
@@ -73,18 +73,25 @@ def task_5(data_size, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
     data_size = args.data_size
     host = args.host
-    print(f"data size: {data_size}")
+    logger.info(f"data size: {data_size}")
     connections.connect(host=host, port=19530, timeout=60)
     ms = MilvusSys()
-    task_1(data_size, host)
-    task_2(data_size, host)
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
     if len(ms.query_nodes) >= NUM_REPLICAS:
-        task_3(data_size, host)
-        task_4(data_size, host)
-        task_5(data_size, host)
+        tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
 
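In both multi-task entrypoints, task_3 through task_5 exercise multi-replica loading, so they are only scheduled when the cluster reports at least NUM_REPLICAS query nodes. A sketch of that gate; the MilvusSys import path and the NUM_REPLICAS value are assumptions here:

import threading
from pymilvus import connections
from common.milvus_sys import MilvusSys  # helper from the milvus python tests; path assumed

NUM_REPLICAS = 2  # assumed value

connections.connect(host="127.0.0.1", port=19530, timeout=60)
ms = MilvusSys()
tasks = [threading.Thread(target=print, args=("always-run tasks",))]
if len(ms.query_nodes) >= NUM_REPLICAS:
    # loading with replica_number greater than the query node count would fail,
    # so the replica tests are skipped on small clusters
    tasks.append(threading.Thread(target=print, args=("replica tasks",)))
for task in tasks:
    task.start()
for task in tasks:
    task.join()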
@@ -1,10 +1,18 @@
+import sys
 import copy
 import time
+from loguru import logger
 import pymilvus
 from pymilvus import (
     FieldSchema, CollectionSchema, DataType,
     Collection, list_collections,
 )
+logger.remove()
+logger.add(sys.stderr, format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
+                              "<level>{level: <8}</level> | "
+                              "<cyan>{thread.name}</cyan> |"
+                              "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+           level="INFO")
 
 pymilvus_version = pymilvus.__version__
 
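The new sink replaces loguru's default so every record carries {thread.name}, which makes the interleaved output of the threaded tasks attributable to a task. A small sketch of the resulting behavior (the message text is hypothetical):

import sys
from loguru import logger

logger.remove()  # drop the default stderr sink
logger.add(sys.stderr,
           format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
                  "<level>{level: <8}</level> | "
                  "<cyan>{thread.name}</cyan> | "
                  "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
                  "<level>{message}</level>",
           level="INFO")

logger.info("create index for flat start")
# emits e.g.:
# 2022-12-20 10:00:00.000 | INFO     | MainThread | __main__:<module>:15 - create index for flat start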
@@ -46,15 +54,15 @@ def gen_search_param(index_type, metric_type="L2"):
         annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}}
         search_params.append(annoy_search_param)
     else:
-        print("Invalid index_type.")
+        logger.info("Invalid index_type.")
         raise Exception("Invalid index_type.")
     return search_params
 
 
 def get_collections(prefix, check=False):
-    print("\nList collections...")
+    logger.info("\nList collections...")
     col_list = filter_collections_by_prefix(prefix)
-    print(f"collections_nums: {len(col_list)}")
+    logger.info(f"collections_nums: {len(col_list)}")
     # list entities if collections
     for name in col_list:
         c = Collection(name=name)
 
@@ -63,7 +71,7 @@ def get_collections(prefix, check=False):
         else:
             c.num_entities
         num_entities = c.num_entities
-        print(f"{name}: {num_entities}")
+        logger.info(f"{name}: {num_entities}")
         if check:
             assert num_entities >= 3000
     return col_list
 
@@ -80,11 +88,11 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
     ]
     default_schema = CollectionSchema(fields=default_fields, description="test collection")
     for index_name in all_index_types[:collection_cnt]:
-        print("\nCreate collection...")
+        logger.info("\nCreate collection...")
         col_name = prefix + index_name
         collection = Collection(name=col_name, schema=default_schema)
-        print(f"collection name: {col_name}")
-        print(f"begin insert, count: {count} nb: {nb}")
+        logger.info(f"collection name: {col_name}")
+        logger.info(f"begin insert, count: {count} nb: {nb}")
         times = int(count // nb)
         total_time = 0.0
         vectors = [[random.random() for _ in range(dim)] for _ in range(count)]
 
@@ -98,22 +106,22 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
                ]
            )
            end_time = time.time()
-            print(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
+            logger.info(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
            total_time += end_time - start_time
 
-        print(f"end insert, time: {total_time:.4f}")
+        logger.info(f"end insert, time: {total_time:.4f}")
        if flush:
-            print("Get collection entities")
+            logger.info("Get collection entities")
            start_time = time.time()
            if pymilvus_version >= "2.2.0":
                collection.flush()
            else:
                collection.num_entities
-            print(f"collection entities: {collection.num_entities}")
+            logger.info(f"collection entities: {collection.num_entities}")
            end_time = time.time()
-            print("Get collection entities time = %.4fs" % (end_time - start_time))
-            print("\nList collections...")
-            print(get_collections(prefix))
+            logger.info("Get collection entities time = %.4fs" % (end_time - start_time))
+            logger.info("\nList collections...")
+            logger.info(get_collections(prefix))
 
 
 def create_index_flat():
 
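The context lines above show the flush idiom this suite relies on: pymilvus >= 2.2.0 exposes an explicit Collection.flush(), while older clients sealed segments as a side effect of reading num_entities. A sketch of the version gate, mirroring the source's string comparison:

import pymilvus
from pymilvus import Collection

pymilvus_version = pymilvus.__version__

def flush_collection(collection: Collection):
    # explicit flush on pymilvus >= 2.2.0; the bare attribute read below
    # triggered the same sealing behavior on older clients
    if pymilvus_version >= "2.2.0":
        collection.flush()
    else:
        collection.num_entities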
@@ -124,25 +132,41 @@ def create_index_flat():
     for col_name in all_col_list:
         if "FLAT" in col_name and "task" in col_name and "IVF" not in col_name:
             col_list.append(col_name)
-    print("\nCreate index for FLAT...")
+    logger.info("\nCreate index for FLAT...")
     for col_name in col_list:
         c = Collection(name=col_name)
-        print(c)
+        logger.info(c)
+        try:
+            replicas = c.get_replicas()
+            replica_number = len(replicas.groups)
+            c.release()
+        except Exception as e:
+            replica_number = 0
+            logger.info(e)
         t0 = time.time()
         c.create_index(field_name="float_vector", index_params=default_flat_index)
-        print(f"create index time: {time.time() - t0:.4f}")
+        logger.info(f"create index time: {time.time() - t0:.4f}")
+        if replica_number > 0:
+            c.load(replica_number=replica_number)
 
 
 def create_index(prefix):
     # create index
     default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
     col_list = get_collections(prefix)
-    print("\nCreate index...")
+    logger.info("\nCreate index...")
     for col_name in col_list:
         c = Collection(name=col_name)
+        try:
+            replicas = c.get_replicas()
+            replica_number = len(replicas.groups)
+            c.release()
+        except Exception as e:
+            replica_number = 0
+            logger.info(e)
         index_name = col_name.replace(prefix, "")
-        print(index_name)
-        print(c)
+        logger.info(index_name)
+        logger.info(c)
         index = copy.deepcopy(default_index)
         index["index_type"] = index_name
         index["params"] = index_params_map[index_name]
 
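This hunk is the substance of the fix for #21260: building an index on a collection that is already loaded fails, so both helpers now record the current replica count, release the collection, create the index, and then restore the previous load state. A condensed sketch of the sequence (the function name, collection name, and index params are hypothetical):

import time
from pymilvus import Collection

def create_index_after_release(col_name: str, index_params: dict):
    c = Collection(name=col_name)
    try:
        replicas = c.get_replicas()        # only succeeds if the collection is loaded
        replica_number = len(replicas.groups)
        c.release()                        # index creation requires a released collection
    except Exception as e:
        replica_number = 0                 # not loaded: nothing to release or restore
        print(e)
    t0 = time.time()
    c.create_index(field_name="float_vector", index_params=index_params)
    print(f"create index time: {time.time() - t0:.4f}")
    if replica_number > 0:
        c.load(replica_number=replica_number)  # reload with the original replica count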
@@ -150,45 +174,47 @@ def create_index(prefix):
             index["metric_type"] = "HAMMING"
         t0 = time.time()
         c.create_index(field_name="float_vector", index_params=index)
-        print(f"create index time: {time.time() - t0:.4f}")
+        logger.info(f"create index time: {time.time() - t0:.4f}")
+        if replica_number > 0:
+            c.load(replica_number=replica_number)
 
 
 def release_collection(prefix):
     col_list = get_collections(prefix)
-    print("release collection")
+    logger.info("release collection")
     for col_name in col_list:
         c = Collection(name=col_name)
         c.release()
 
 
 def load_and_search(prefix, replicas=1):
-    print("search data starts")
+    logger.info("search data starts")
     col_list = get_collections(prefix)
     for col_name in col_list:
         c = Collection(name=col_name)
-        print(f"collection name: {col_name}")
-        print("load collection")
+        logger.info(f"collection name: {col_name}")
+        logger.info("load collection")
         if replicas == 1:
             t0 = time.time()
             c.load()
-            print(f"load time: {time.time() - t0:.4f}")
+            logger.info(f"load time: {time.time() - t0:.4f}")
         if replicas > 1:
-            print("release collection before load if replicas > 1")
+            logger.info("release collection before load if replicas > 1")
             t0 = time.time()
             c.release()
-            print(f"release time: {time.time() - t0:.4f}")
+            logger.info(f"release time: {time.time() - t0:.4f}")
             t0 = time.time()
             c.load(replica_number=replicas)
-            print(f"load time: {time.time() - t0:.4f}")
-            print(c.get_replicas())
+            logger.info(f"load time: {time.time() - t0:.4f}")
+            logger.info(c.get_replicas())
         topK = 5
         vectors = [[1.0 for _ in range(128)] for _ in range(3000)]
         index_name = col_name.replace(prefix, "")
         search_params = gen_search_param(index_name)[0]
-        print(search_params)
+        logger.info(search_params)
         # search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
         start_time = time.time()
-        print(f"\nSearch...")
+        logger.info(f"\nSearch...")
         # define output_fields of search result
         v_search = vectors[:1]
         res = c.search(
 
@@ -200,22 +226,22 @@ def load_and_search(prefix, replicas=1):
         for hits in res:
             for hit in hits:
                 # Get value of the random value field for search result
-                print(hit, hit.entity.get("random_value"))
+                logger.info(str(hits), hit.entity.get("random_value"))
             ids = hits.ids
             assert len(ids) == topK, f"get {len(ids)} results, but topK is {topK}"
-            print(ids)
+            logger.info(ids)
         assert len(res) == len(v_search), f"get {len(res)} results, but search num is {len(v_search)}"
-        print("search latency: %.4fs" % (end_time - start_time))
+        logger.info("search latency: %.4fs" % (end_time - start_time))
         t0 = time.time()
         expr = "count in [2,4,6,8]"
         output_fields = ["count", "random_value"]
         res = c.query(expr, output_fields, timeout=120)
         sorted_res = sorted(res, key=lambda k: k['count'])
         for r in sorted_res:
-            print(r)
+            logger.info(r)
         t1 = time.time()
         assert len(res) == 4
-        print("query latency: %.4fs" % (t1 - t0))
+        logger.info("query latency: %.4fs" % (t1 - t0))
         # c.release()
-        print("###########")
-        print("search data ends")
+        logger.info("###########")
+        logger.info("search data ends")
 
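One caveat in the hunk above: loguru applies positional arguments to the message with str.format(), so logger.info(str(hits), hit.entity.get("random_value")) silently drops the second value when the message contains no {} placeholder (and can even raise if the repr happens to contain braces). A sketch of a form that keeps both values (the sample values are hypothetical):

from loguru import logger

hit_repr = "id: 0, distance: 0.0"  # stand-in for one search hit
random_value = 0.42                # stand-in for hit.entity.get("random_value")

logger.info(hit_repr, random_value)                       # 0.42 is dropped: no {} in the message
logger.info(f"{hit_repr}, random_value: {random_value}")  # both values reach the log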
@@ -21,7 +21,7 @@ default_int64_field_name = ct.default_int64_field_name
 default_float_field_name = ct.default_float_field_name
 default_bool_field_name = ct.default_bool_field_name
 default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
 default_search_exp = "int64 >= 0"
 default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
 
@@ -1,5 +1,6 @@
 import pytest
 import re
 import time
+import pymilvus
 from common import common_func as cf
 from common import common_type as ct
 
@@ -20,7 +21,7 @@ default_int64_field_name = ct.default_int64_field_name
 default_float_field_name = ct.default_float_field_name
 default_bool_field_name = ct.default_bool_field_name
 default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
 default_search_exp = "int64 >= 0"
 default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
 
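The binary_field_name change in both files qualifies the constant with its defining module: the bare name default_binary_vec_field_name is not bound in these modules (the shown imports only bind the aliases cf and ct), so referencing it raises NameError at import time. A minimal self-contained reproduction (the stand-in class mimics the ct module):

class ct:  # stand-in for common.common_type
    default_binary_vec_field_name = "binary_vector"

# binary_field_name = default_binary_vec_field_name   # NameError: name is not defined
binary_field_name = ct.default_binary_vec_field_name  # attribute lookup on the alias works
print(binary_field_name)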
|
@ -39,3 +39,6 @@ minio==7.1.5
|
|||
|
||||
# for benchmark
|
||||
h5py==3.1.0
|
||||
|
||||
# for log
|
||||
loguru==0.5.3
|
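The loguru pin backs the print-to-logger migration above; reinstalling the updated test requirements (for example, pip install -r requirements.txt in the test client directory, path assumed) pulls in loguru==0.5.3.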