mirror of https://github.com/milvus-io/milvus.git

[test][skip-e2e] Add counter decorator and scale query multi replicas test (#16845)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
Branch: pull/16852/head
parent: 62658dcda6
commit: 8945b80a64

@@ -2,7 +2,7 @@
String cron_timezone = 'TZ=Asia/Shanghai'
String cron_string = BRANCH_NAME == "master" ? "05 21 * * * " : ""

int total_timeout_minutes = 60
int total_timeout_minutes = 90

// pipeline
pipeline {

@@ -15,6 +15,9 @@ spec:
deletionPolicy: Delete
pvcDeletion: true
values:
proxy:
configData:
httpNumThreads: "100"
image:
broker:
tag: 2.8.2

@@ -26,18 +29,19 @@ spec:
tag: 2.8.2
bastion:
tag: 2.8.2

zookeeper:
configData:
PULSAR_MEM: "\"-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote -Djute.maxbuffer=10485760DoEscapeAnalysis -XX:+DisableExplicitGC -XX:+PerfDisableSharedMem -Dzookeeper.forceSync=no\""

bookkeeper:
configData:
BOOKIE_MEM: "\"-Xms4096m -Xmx4096m -XX:MaxDirectMemorySize=8192m -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.linkCapacity=1024 -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem -verbosegc\""
# zookeeper:
# configData:
# PULSAR_MEM: "-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote -Djute.maxbuffer=10485760DoEscapeAnalysis -XX:+DisableExplicitGC -XX:+PerfDisableSharedMem -Dzookeeper.forceSync=no"
#
# bookkeeper:
# configData:
# BOOKIE_MEM: "-Xms4096m -Xmx4096m -XX:MaxDirectMemorySize=8192m -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.linkCapacity=1024 -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem -verbosegc"
storage:
inCluster:
deletionPolicy: Delete
pvcDeletion: true
config: {}
config:
log:
level: debug
components: {}

@@ -1,6 +1,6 @@
import os

from pymilvus import connections, Index
from pymilvus import connections, Index, MilvusException

from utils.util_log import test_log as log
from base.collection_wrapper import ApiCollectionWrapper

@@ -9,6 +9,7 @@ from common import common_type as ct


def e2e_milvus(host, c_name):
    """ e2e milvus """
    log.debug(f'pid: {os.getpid()}')
    # connect
    connections.add_connection(default={"host": host, "port": 19530})

@@ -41,4 +42,12 @@ def e2e_milvus(host, c_name):
    ids = search_res[0].ids[0]
    term_expr = f'{ct.default_int64_field_name} in [{ids}]'
    query_res, _ = collection_w.query(term_expr, output_fields=["*", "%"])
    assert query_res[0][ct.default_int64_field_name] == ids
    assert query_res[0][ct.default_int64_field_name] == ids


def check_succ_rate(func_obj):
    """ check func succ rate"""
    log.debug(f"{func_obj.name} total: {func_obj.total}, succ: {func_obj.succ}, fail: {func_obj.fail}")
    if func_obj.total == 0:
        raise MilvusException(0, f"{func_obj.name} request total 0")
    assert func_obj.fail == 0 and func_obj.succ // func_obj.total == 1
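
A minimal sketch of how the new `check_succ_rate` helper is meant to be driven by the `counter` decorator added in `utils/wrapper.py` (further below in this change); the module paths follow the imports used by these tests, and `do_op` is a hypothetical workload that returns the `(result, is_succ)` pair the decorator expects:

```python
from utils.wrapper import counter        # decorator added by this change
from scale import scale_common           # module containing check_succ_rate


@counter
def do_op():
    """ hypothetical workload; must return (result, is_succ) like the wrapped test ops """
    return "ok", True


for _ in range(10):
    do_op()                              # every call bumps do_op.total / do_op.succ / do_op.fail

# raises MilvusException if nothing ran at all, then asserts fail == 0 and succ == total
scale_common.check_succ_rate(do_op)
```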

@@ -7,16 +7,12 @@ from base.collection_wrapper import ApiCollectionWrapper
from common.common_type import CaseLabel
from common import common_func as cf
from customize.milvus_operator import MilvusOperator
from scale import constants
from pymilvus import connections
from scale import constants, scale_common
from pymilvus import connections, MilvusException
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, read_pod_log
from utils.util_pymilvus import get_latest_tag

prefix = "data_scale"
default_schema = cf.gen_default_collection_schema()
default_search_exp = "int64 >= 0"
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
from utils.wrapper import counter


class TestDataNodeScale:

@@ -38,7 +34,6 @@ class TestDataNodeScale:
        release_name = "scale-data"
        image_tag = get_latest_tag()
        image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
        fail_count = 0

        data_config = {
            'metadata.namespace': constants.NAMESPACE,

@@ -51,12 +46,10 @@ class TestDataNodeScale:
        }
        mic = MilvusOperator()
        mic.install(data_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200):
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            # log.warning(f'Deploy {release_name} timeout and ready to uninstall')
            # mic.uninstall(release_name, namespace=constants.NAMESPACE)
            raise MilvusException(message=f'Milvus healthy timeout 1200s')
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            # connect

@@ -65,20 +58,24 @@ class TestDataNodeScale:

            # create
            c_name = cf.gen_unique_str("scale_query")
            # c_name = 'scale_query_DymS7kI4'
            collection_w = ApiCollectionWrapper()
            collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)

            tmp_nb = 10000

            @counter
            def do_insert():
                while True:
                    tmp_df = cf.gen_default_dataframe_data(tmp_nb)
                    collection_w.insert(tmp_df)
                    log.debug(collection_w.num_entities)
                """ do insert and flush """
                insert_res, is_succ = collection_w.insert(cf.gen_default_dataframe_data(tmp_nb))
                log.debug(collection_w.num_entities)
                return insert_res, is_succ

            t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
            t_insert.start()
            def loop_insert():
                """ loop do insert """
                while True:
                    do_insert()

            threading.Thread(target=loop_insert, args=(), daemon=True).start()
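
The rewrite above pulls the endless `while True` out of the counted function so that every single insert is recorded as one `total`/`succ`/`fail` tick, while a daemon thread keeps the load running during the scale operation. A stripped-down sketch of that pattern, assuming `utils.wrapper` is importable and with a hypothetical `do_work` stub in place of the Milvus calls:

```python
import threading
import time

from utils.wrapper import counter        # decorator introduced by this change


@counter
def do_work():
    """ hypothetical single-shot operation returning (result, is_succ) """
    time.sleep(0.01)
    return None, True


def loop_work():
    """ keep the counted operation running until the main thread exits """
    while True:
        do_work()


threading.Thread(target=loop_work, daemon=True).start()
time.sleep(0.5)                           # the test would scale the cluster here
print(do_work.total, do_work.succ, do_work.fail)
```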

            # scale dataNode to 5
            mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)

@@ -91,14 +88,19 @@ class TestDataNodeScale:
            collection_w_new = ApiCollectionWrapper()
            collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)

            @counter
            def do_new_insert():
                while True:
                    tmp_df = cf.gen_default_dataframe_data(tmp_nb)
                    collection_w_new.insert(tmp_df)
                    log.debug(collection_w_new.num_entities)
                """ do new insert """
                insert_res, is_succ = collection_w_new.insert(cf.gen_default_dataframe_data(tmp_nb))
                log.debug(collection_w_new.num_entities)
                return insert_res, is_succ

            t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
            t_insert_new.start()
            def loop_new_insert():
                """ loop new insert """
                while True:
                    do_new_insert()

            threading.Thread(target=loop_new_insert, args=(), daemon=True).start()

            # scale dataNode to 3
            mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)

@@ -107,16 +109,15 @@ class TestDataNodeScale:

            log.debug(collection_w.num_entities)
            time.sleep(300)
            scale_common.check_succ_rate(do_insert)
            scale_common.check_succ_rate(do_new_insert)
            log.debug("Shrink dataNode test finished")

        except Exception as e:
            log.error(str(e))
            fail_count += 1
            # raise Exception(str(e))

        finally:
            log.info(f'Test finished with {fail_count} fail request')
            assert fail_count <= 1
            label = f"app.kubernetes.io/instance={release_name}"
            log.info('Start to export milvus pod logs')
            read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)

@@ -1,8 +1,7 @@
import datetime
import time

import pytest
from pymilvus import connections
from pymilvus import connections, MilvusException

from base.collection_wrapper import ApiCollectionWrapper
from common.common_type import CaseLabel

@@ -20,6 +19,7 @@ default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params":

class TestIndexNodeScale:

    @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/16832")
    @pytest.mark.tags(CaseLabel.L3)
    def test_expand_index_node(self):
        """

@@ -47,12 +47,13 @@ class TestIndexNodeScale:
        }
        mic = MilvusOperator()
        mic.install(data_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200):
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            # If deploy failed and want to uninstall mic
            # log.warning(f'Deploy {release_name} timeout and ready to uninstall')
            # mic.uninstall(release_name, namespace=constants.NAMESPACE)
            raise MilvusException(message=f'Milvus healthy timeout 1200s')
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            # connect

@@ -114,6 +115,7 @@ class TestIndexNodeScale:
            read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
            mic.uninstall(release_name, namespace=constants.NAMESPACE)

    @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/16832")
    @pytest.mark.tags(CaseLabel.L3)
    def test_shrink_index_node(self):
        """

@@ -139,12 +141,10 @@ class TestIndexNodeScale:
        }
        mic = MilvusOperator()
        mic.install(data_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200):
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            # log.warning(f'Deploy {release_name} timeout and ready to uninstall')
            # mic.uninstall(release_name, namespace=constants.NAMESPACE)
            raise MilvusException(message=f'Milvus healthy timeout 1200s')
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            # connect

@@ -175,7 +175,7 @@ class TestIndexNodeScale:
            collection_w.drop_index()
            assert not collection_w.has_index()[0]

            # expand indexNode from 2 to 1
            # shrink indexNode from 2 to 1
            mic.upgrade(release_name, {'spec.components.indexNode.replicas': 1}, constants.NAMESPACE)
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
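
The three calls above form the scale pattern repeated throughout these tests: patch the Milvus CR, wait for the operator to report the release healthy, then wait for the pods behind the instance label. A condensed sketch of just that flow, using the same helpers these files import (`release_name` here is a hypothetical deployment installed earlier via `mic.install(...)`):

```python
from customize.milvus_operator import MilvusOperator
from utils.util_k8s import wait_pods_ready
from scale import constants

release_name = "scale-demo"               # hypothetical, already-installed release
mic = MilvusOperator()

# patch the CR, then block until the operator and the pods settle
mic.upgrade(release_name, {'spec.components.indexNode.replicas': 2}, constants.NAMESPACE)
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
```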

@@ -1,6 +1,8 @@
import multiprocessing

import pytest
from pymilvus import MilvusException

from customize.milvus_operator import MilvusOperator
from common import common_func as cf
from common.common_type import CaseLabel

@@ -9,21 +11,21 @@ from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, read_pod_log
from utils.util_pymilvus import get_latest_tag

prefix = "proxy_scale"

def e2e_milvus_parallel(process_num, host, c_name):
    """ e2e milvus """
    process_list = []
    for i in range(process_num):
        p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()
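
The proxy test fans the end-to-end check out across processes rather than threads, so each worker gets its own connection and interpreter. A self-contained sketch of that fan-out/join pattern, with a hypothetical `work` function standing in for `sc.e2e_milvus`:

```python
import multiprocessing


def work(host, c_name):
    """ hypothetical stand-in for sc.e2e_milvus(host, c_name) """
    print(f"working against {host} / {c_name}")


def run_parallel(process_num, host, c_name):
    """ start process_num workers and wait for all of them to finish """
    procs = [multiprocessing.Process(target=work, args=(host, c_name)) for _ in range(process_num)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    run_parallel(3, "127.0.0.1", "proxy_scale_demo")
```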


class TestProxyScale:

    def e2e_milvus_parallel(self, process_num, host, c_name):
        process_list = []
        for i in range(process_num):
            p = multiprocessing.Process(target=sc.e2e_milvus, args=(host, c_name))
            p.start()
            process_list.append(p)

        for p in process_list:
            p.join()

    @pytest.mark.tags(CaseLabel.L3)
    def test_scale_proxy(self):
        """

@@ -52,16 +54,14 @@ class TestProxyScale:
        }
        mic = MilvusOperator()
        mic.install(data_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200):
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            # log.warning(f'Deploy {release_name} timeout and ready to uninstall')
            # mic.uninstall(release_name, namespace=constants.NAMESPACE)
            raise MilvusException(message=f'Milvus healthy timeout 1200s')
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            c_name = cf.gen_unique_str(prefix)
            self.e2e_milvus_parallel(5, host, c_name)
            c_name = cf.gen_unique_str("proxy_scale")
            e2e_milvus_parallel(5, host, c_name)
            log.info('Milvus test before expand')

            # expand proxy replicas from 1 to 5

@@ -69,7 +69,7 @@ class TestProxyScale:
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")

            self.e2e_milvus_parallel(5, host, c_name)
            e2e_milvus_parallel(5, host, c_name)
            log.info('Milvus test after expand')

            # expand proxy replicas from 5 to 2

@@ -77,7 +77,7 @@ class TestProxyScale:
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")

            self.e2e_milvus_parallel(2, host, c_name)
            e2e_milvus_parallel(2, host, c_name)
            log.info('Milvus test after shrink')

        except Exception as e:

@@ -1,24 +1,24 @@
import random
import threading
import time
from functools import reduce

import pytest

from base.collection_wrapper import ApiCollectionWrapper
from common.common_type import CaseLabel
from base.utility_wrapper import ApiUtilityWrapper
from common.common_type import CaseLabel, CheckTasks
from customize.milvus_operator import MilvusOperator
from common import common_func as cf
from common import common_type as ct
from scale import constants
from pymilvus import Index, connections
from scale import constants, scale_common
from pymilvus import Index, connections, MilvusException
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, read_pod_log
from utils.util_pymilvus import get_latest_tag
from utils.wrapper import counter

prefix = "search_scale"
nb = 5000
nq = 5
default_schema = cf.gen_default_collection_schema()
default_search_exp = "int64 >= 0"
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
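
These constants drive the vector index the query-node tests build before loading. A hedged sketch of how an IVF_SQ8 index is typically created with the `Index` class imported above (the endpoint is an assumption, and `collection_w.collection` is assumed to expose the underlying pymilvus collection as the wrapper does elsewhere in this suite):

```python
from pymilvus import Index, connections
from base.collection_wrapper import ApiCollectionWrapper
from common import common_func as cf
from common import common_type as ct

default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}

connections.connect(host="127.0.0.1", port=19530)   # assumed reachable Milvus endpoint

collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=cf.gen_unique_str("demo"), schema=cf.gen_default_collection_schema())
collection_w.insert(cf.gen_default_dataframe_data())

# build the IVF_SQ8 index on the float vector field, then load the collection for search
Index(collection_w.collection, ct.default_float_vec_field_name, default_index_params)
collection_w.load()
```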

@@ -36,7 +36,6 @@ class TestQueryNodeScale:
                6.shrink queryNode from 5 to 3
        expected: Verify milvus remains healthy and search successfully during scale
        """
        fail_count = 0
        release_name = "scale-query"
        image_tag = get_latest_tag()
        image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'

@@ -51,12 +50,10 @@ class TestQueryNodeScale:
        }
        mic = MilvusOperator()
        mic.install(query_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200):
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            # log.warning(f'Deploy {release_name} timeout and ready to uninstall')
            # mic.uninstall(release_name, namespace=constants.NAMESPACE)
            raise MilvusException(message=f'Milvus healthy timeout 1200s')
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            # connect

@@ -87,29 +84,37 @@ class TestQueryNodeScale:
            # scale queryNode to 5
            mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)

            # continuously search
            @counter
            def do_search():
                while True:
                    search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                                        ct.default_float_vec_field_name,
                                                        ct.default_search_params, ct.default_limit)
                    log.debug(search_res[0].ids)
                    assert len(search_res[0].ids) == ct.default_limit
                """ do search """
                search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                                          ct.default_float_vec_field_name, ct.default_search_params,
                                                          ct.default_limit, check_task=CheckTasks.check_nothing)
                assert len(search_res) == 1
                return search_res, is_succ

            t_search = threading.Thread(target=do_search, args=(), daemon=True)
            t_search.start()
            def loop_search():
                """ continuously search """
                while True:
                    do_search()

            threading.Thread(target=loop_search, args=(), daemon=True).start()

            # wait new QN running, continuously insert
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")

            @counter
            def do_insert():
                while True:
                    tmp_df = cf.gen_default_dataframe_data(1000)
                    collection_w.insert(tmp_df)
                """ do insert """
                return collection_w.insert(cf.gen_default_dataframe_data(1000), check_task=CheckTasks.check_nothing)

            t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
            t_insert.start()
            def loop_insert():
                """ loop insert """
                while True:
                    do_insert()

            threading.Thread(target=loop_insert, args=(), daemon=True).start()

            log.debug(collection_w.num_entities)
            time.sleep(20)

@@ -121,16 +126,181 @@ class TestQueryNodeScale:

            log.debug(collection_w.num_entities)
            time.sleep(60)
            scale_common.check_succ_rate(do_search)
            scale_common.check_succ_rate(do_insert)
            log.debug("Shrink querynode test finished")

        except Exception as e:
            log.error(str(e))
            fail_count += 1
            # raise Exception(str(e))
            raise Exception(str(e))

        finally:
            label = f"app.kubernetes.io/instance={release_name}"
            log.info('Start to export milvus pod logs')
            read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
            mic.uninstall(release_name, namespace=constants.NAMESPACE)

    def test_scale_query_node_replicas(self):
        """
        target: test scale out querynode when load multi replicas
        method: 1.Deploy cluster with 5 querynodes
                2.Create collection with 2 shards
                3.Insert 10 segments and flushed
                4.Load collection with 2 replicas
                5.Scale out querynode from 5 to 6 while search and insert growing data
        expected: Verify search succ rate is 100%
        """
        release_name = "scale-replica"
        image_tag = get_latest_tag()
        image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
        query_config = {
            'metadata.namespace': constants.NAMESPACE,
            'metadata.name': release_name,
            'spec.components.image': image,
            'spec.components.proxy.serviceType': 'LoadBalancer',
            'spec.components.queryNode.replicas': 5,
            'spec.config.dataCoord.enableCompaction': True,
            'spec.config.dataCoord.enableGarbageCollection': True
        }
        mic = MilvusOperator()
        mic.install(query_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            raise MilvusException(message=f'Milvus healthy timeout 1800s')

        try:
            scale_querynode = random.choice([6, 7, 4, 3])
            connections.connect("scale-replica", host=host, port=19530)

            collection_w = ApiCollectionWrapper()
            collection_w.init_collection(name=cf.gen_unique_str("scale_out"), schema=cf.gen_default_collection_schema())

            # insert 10 sealed segments
            for i in range(5):
                df = cf.gen_default_dataframe_data(start=i * ct.default_nb)
                collection_w.insert(df)
                assert collection_w.num_entities == (i + 1) * ct.default_nb

            collection_w.load(replica_number=2)
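
Loading with `replica_number=2` asks the querynodes to hold two full copies of the sealed segments, which is what the later success-rate check relies on while nodes are added or removed. A short continuation-style sketch (reusing `collection_w` and `log` from the surrounding test) of how the replica layout can be inspected right after the load:

```python
# continues from the setup above: collection already loaded with replica_number=2
replicas, _ = collection_w.get_replicas()
assert len(replicas.groups) == 2                    # one group per requested replica
for group in replicas.groups:
    log.debug(f"replica group nodes: {group.group_nodes}")
```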

            @counter
            def do_search():
                """ do search """
                search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                                          ct.default_float_vec_field_name, ct.default_search_params,
                                                          ct.default_limit, check_task=CheckTasks.check_nothing)
                assert len(search_res) == 1
                return search_res, is_succ

            def loop_search():
                """ continuously search """
                while True:
                    do_search()

            threading.Thread(target=loop_search, args=(), daemon=True).start()

            # scale out
            mic.upgrade(release_name, {'spec.components.queryNode.replicas': scale_querynode}, constants.NAMESPACE)
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
            log.debug("Scale out querynode success")

            time.sleep(100)
            scale_common.check_succ_rate(do_search)
            log.debug("Scale out test finished")

        except Exception as e:
            raise Exception(str(e))

        finally:
            log.info(f'Test finished')

    @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/16705")
    def test_scale_in_query_node_less_than_replicas(self):
        """
        target: test scale in cluster and querynode < replica
        method: 1.Deploy cluster with 3 querynodes
                2.Create and insert data, flush
                3.Load collection with 2 replica number
                4.Scale in querynode from 3 to 1 and query
                5.Scale out querynode from 1 back to 3
        expected: Verify search successfully after scale out
        """
        release_name = "scale-in-query"
        release_name = "mic-replica"
        image_tag = get_latest_tag()
        image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
        query_config = {
            'metadata.namespace': constants.NAMESPACE,
            'metadata.name': release_name,
            'spec.components.image': image,
            'spec.components.proxy.serviceType': 'LoadBalancer',
            'spec.components.queryNode.replicas': 2,
            'spec.config.dataCoord.enableCompaction': True,
            'spec.config.dataCoord.enableGarbageCollection': True
        }
        mic = MilvusOperator()
        mic.install(query_config)
        if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
            host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
        else:
            raise MilvusException(message=f'Milvus healthy timeout 1800s')
        try:
            # prepare collection
            connections.connect("scale-in", host=host, port=19530)
            utility_w = ApiUtilityWrapper()
            collection_w = ApiCollectionWrapper()
            collection_w.init_collection(name=cf.gen_unique_str("scale_in"), schema=cf.gen_default_collection_schema())
            collection_w.insert(cf.gen_default_dataframe_data())
            assert collection_w.num_entities == ct.default_nb

            # load multi replicas and search success
            collection_w.load(replica_number=2)
            search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                                      ct.default_float_vec_field_name,
                                                      ct.default_search_params, ct.default_limit)
            assert len(search_res[0].ids) == ct.default_limit
            log.info("Search successfully after load with 2 replicas")
            log.debug(collection_w.get_replicas()[0])
            log.debug(utility_w.get_query_segment_info(collection_w.name))

            # scale in querynode from 2 to 1, less than replica number
            log.debug("Scale in querynode from 2 to 1")
            mic.upgrade(release_name, {'spec.components.queryNode.replicas': 1}, constants.NAMESPACE)
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")

            # search and not assure success
            collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                ct.default_float_vec_field_name, ct.default_search_params,
                                ct.default_limit, check_task=CheckTasks.check_nothing)
            log.debug(collection_w.get_replicas(check_task=CheckTasks.check_nothing)[0])
            log.debug(utility_w.get_query_segment_info(collection_w.name, check_task=CheckTasks.check_nothing))

            # scale querynode from 1 back to 2
            mic.upgrade(release_name, {'spec.components.queryNode.replicas': 2}, constants.NAMESPACE)
            mic.wait_for_healthy(release_name, constants.NAMESPACE)
            wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")

            # verify search success
            collection_w.search(cf.gen_vectors(1, ct.default_dim),
                                ct.default_float_vec_field_name, ct.default_search_params, ct.default_limit)
            # Verify replica info is correct
            replicas = collection_w.get_replicas()[0]
            assert len(replicas.groups) == 2
            for group in replicas.groups:
                assert len(group.group_nodes) == 1
            # Verify loaded segment info is correct
            seg_info = utility_w.get_query_segment_info(collection_w.name)[0]
            seg_ids = list(map(lambda seg: seg.segmentID, seg_info))
            num_entities = list(map(lambda seg: seg.num_rows, seg_info))
            assert reduce(lambda x, y: x ^ y, seg_ids) == 0
            assert reduce(lambda x, y: x + y, num_entities) == ct.default_nb * 2
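
The two `reduce` assertions encode the 2-replica expectation: every sealed segment is loaded by both replicas, so each segment ID appears an even number of times and XOR-ing the list cancels to zero, while the reported row counts add up to twice the collection size. A tiny standalone illustration of that arithmetic with made-up numbers:

```python
from functools import reduce

# hypothetical report: two segments (ids 111 and 222, 1500 + 500 rows) seen once per replica
seg_ids = [111, 222, 111, 222]
num_entities = [1500, 500, 1500, 500]

assert reduce(lambda x, y: x ^ y, seg_ids) == 0              # ids pair up across the 2 replicas
assert reduce(lambda x, y: x + y, num_entities) == 2000 * 2  # rows are counted once per replica
```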

        except Exception as e:
            raise Exception(str(e))

        finally:
            log.info(f'Test finished with {fail_count} fail request')
            assert fail_count <= 1
            label = f"app.kubernetes.io/instance={release_name}"
            log.info('Start to export milvus pod logs')
            read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)

@@ -5,6 +5,7 @@ from utils.util_log import test_log as log

DEFAULT_FMT = '[{start_time}][{end_time}][{elapsed:0.8f}s] {collection_name} {func_name} ({arg_str}) -> {result!r}'


def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
    def decorate(func):
        @functools.wraps(func)

@@ -31,19 +32,39 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
            else:
                result = func(*args, **kwargs)
            return result

        return inner_wrapper

    return decorate


def counter(func):
    """ count func succ rate """
    def inner_wrapper(*args, **kwargs):
        """ inner wrapper """
        result, is_succ = func(*args, **kwargs)
        inner_wrapper.total += 1
        if is_succ:
            inner_wrapper.succ += 1
        else:
            inner_wrapper.fail += 1
        return result, is_succ

    inner_wrapper.name = func.__name__
    inner_wrapper.total = 0
    inner_wrapper.succ = 0
    inner_wrapper.fail = 0
    return inner_wrapper
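
A quick usage sketch for the new decorator (separate from the `trace` demo that follows): the wrapped function only has to return a `(result, is_succ)` pair, and the counters live as attributes on the wrapper itself, which is exactly what `scale_common.check_succ_rate` reads:

```python
@counter
def flaky(i):
    """ toy operation that fails on every third call """
    return i, i % 3 != 0


for i in range(9):
    flaky(i)

# accumulated across calls: flaky.total == 9, flaky.succ == 6, flaky.fail == 3
print(flaky.name, flaky.total, flaky.succ, flaky.fail)
```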


if __name__ == '__main__':

    @trace()
    def snooze(seconds, name='snooze'):
        time.sleep(seconds)
        return name
        # print(f"name: {name}")


    for i in range(3):
        res = snooze(.123, name=i)
        print("res:",res)
        print("res:", res)