mirror of https://github.com/milvus-io/milvus.git
[test]Add standby test and adapt to different schemas (#24781)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Branch: pull/24790/head
parent a9dccec03a
commit 2bcd1bb0d8
@@ -3,6 +3,7 @@ import pytest
 def pytest_addoption(parser):
     parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type")
+    parser.addoption("--role_type", action="store", default="activated", help="role_type")
     parser.addoption("--target_component", action="store", default="querynode", help="target_component")
     parser.addoption("--target_pod", action="store", default="etcd_leader", help="target_pod")
     parser.addoption("--target_number", action="store", default="1", help="target_number")
@@ -17,6 +18,11 @@ def chaos_type(request):
     return request.config.getoption("--chaos_type")
 
 
+@pytest.fixture
+def role_type(request):
+    return request.config.getoption("--role_type")
+
+
 @pytest.fixture
 def target_component(request):
     return request.config.getoption("--target_component")
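For orientation, the new --role_type option and its fixture are consumed by the standby tests below; a minimal, hypothetical consumer (the test body is illustrative, not part of this PR):

# Run as: pytest <standby test file> --role_type standby
def test_role_type_example(role_type):
    # the fixture resolves to the --role_type CLI option, "activated" by default
    assert role_type in ("activated", "standby")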
@@ -0,0 +1,139 @@
import threading
import pytest
import time
from time import sleep
from pathlib import Path
from pymilvus import connections
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
import logging as log
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool, find_activate_standby_coord_pod
from utils.util_common import update_key_value, update_key_name, gen_experiment_config
import constants


class TestChaosApply:

    @pytest.fixture(scope="function", autouse=True)
    def init_env(self, host, port, user, password, milvus_ns):
        if user and password:
            # log.info(f"connect to {host}:{port} with user {user} and password {password}")
            connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
        else:
            connections.connect('default', host=host, port=port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def reconnect(self):
        if self.user and self.password:
            connections.connect('default', host=self.host, port=self.port,
                                user=self.user,
                                password=self.password,
                                secure=True)
        else:
            connections.connect('default', host=self.host, port=self.port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")

    def teardown(self):
        chaos_res = CusResource(kind=self.chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        meta_name = self.chaos_config.get('metadata', None).get('name', None)
        chaos_res.delete(meta_name, raise_ex=False)
        sleep(2)

    def test_chaos_apply(self, chaos_type, role_type, target_component, chaos_duration, chaos_interval):
        # check which coord pods are activated and which are standby before injecting chaos
        log.info("*********************Chaos Test Start**********************")
        log.info(connections.get_connection_addr('default'))

        activate_pod_list, standby_pod_list = find_activate_standby_coord_pod(self.milvus_ns, self.release_name,
                                                                              target_component)
        log.info(f"activated pod list: {activate_pod_list}, standby pod list: {standby_pod_list}")
        target_pod_list = activate_pod_list + standby_pod_list
        if role_type == "standby":
            target_pod_list = standby_pod_list
        if role_type == "activated":
            target_pod_list = activate_pod_list
        chaos_type = chaos_type.replace("_", "-")
        chaos_config = gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/"
                                             f"chaos_objects/template/{chaos_type}-by-pod-list.yaml")
        chaos_config['metadata']['name'] = f"test-{target_component}-standby-{int(time.time())}"

        meta_name = chaos_config.get('metadata', None).get('name', None)
        chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
        self.chaos_config = chaos_config
        # convert chaos_duration from a string like "1h30m" to an int number of seconds
        chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0'
        chaos_duration = eval(chaos_duration)
        if self.deploy_by == "milvus-operator":
            update_key_name(chaos_config, "component", "app.kubernetes.io/component")
        self._chaos_config = chaos_config  # cache the chaos config for tear down
        log.info(f"chaos_config: {chaos_config}")
        # apply chaos object
        chaos_res = CusResource(kind=chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        chaos_res.create(chaos_config)
        log.info("chaos injected")
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name in chaos_list
        res = chaos_res.get(meta_name)
        log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}")
        sleep(chaos_duration)
        # delete chaos
        chaos_res.delete(meta_name)
        log.info("chaos deleted")
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        # verify the chaos is deleted
        sleep(10)
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name not in chaos_list
        # wait for all pods to be ready
        t0 = time.time()
        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={meta_name}")
        wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={meta_name}")
        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={meta_name}")
        wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={meta_name}")
        log.info("all pods are ready")
        pods_ready_time = time.time() - t0
        log.info(f"pods ready time: {pods_ready_time}")
        # reconnect to check that the service is healthy
        start_time = time.time()
        end_time = start_time + 120
        while time.time() < end_time:
            try:
                self.reconnect()
                break
            except Exception as e:
                log.error(e)
                sleep(2)
        recovery_time = time.time() - start_time
        log.info(f"recovery time: {recovery_time}")
        time.sleep(30)
        activate_pod_list_after_chaos, standby_pod_list_after_chaos = find_activate_standby_coord_pod(self.milvus_ns, self.release_name,
                                                                                                      target_component)
        log.info(f"activated pod list: {activate_pod_list_after_chaos}, standby pod list: {standby_pod_list_after_chaos}")
        if role_type == "standby":
            # if chaos is injected into the standby pod, the activated pod should not change
            assert activate_pod_list_after_chaos[0] == activate_pod_list[0]
        if role_type == "activated":
            # if chaos is injected into the activated pod, one of the standby pods should become activated
            assert activate_pod_list_after_chaos[0] in standby_pod_list
        log.info("*********************Chaos Test Completed**********************")
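A note on the chaos_duration conversion above: the replacements build an arithmetic string, e.g. "1h30m" -> "1*3600+30*60++0", which eval() accepts (the doubled plus parses as a unary plus) and yields 5400. A minimal eval-free sketch of the same conversion, with a function name of our own choosing:

import re

def duration_to_seconds(duration):
    # "1h30m15s" -> 5415; units may appear in any combination
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(n) * units[u] for n, u in re.findall(r"(\d+)([hms])", duration))

assert duration_to_seconds("1h30m15s") == 5415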
@@ -1,6 +1,6 @@
 import time
 import pytest
 
+from pymilvus import Collection
 from base.client_base import TestcaseBase
 from common import common_func as cf
 from common import common_type as ct
@@ -26,11 +26,12 @@ class TestAllCollection(TestcaseBase):
 
     @pytest.mark.tags(CaseLabel.L1)
     def test_milvus_default(self, collection_name):
         self._connect()
         # create
         name = collection_name if collection_name else cf.gen_unique_str("Checker_")
         t0 = time.time()
-        collection_w = self.init_collection_wrap(name=name, active_trace=True, enable_dynamic_field=False,
-                                                 with_json=False)
+        schema = Collection(name=name).schema
+        collection_w = self.init_collection_wrap(name=name, schema=schema)
         tt = time.time() - t0
         assert collection_w.name == name
         # compact collection before getting num_entities
@@ -43,7 +44,11 @@ class TestAllCollection(TestcaseBase):
 
         # insert
         insert_batch = 3000
-        data = cf.gen_default_list_data(start=-insert_batch, with_json=False)
+        with_json = False
+        for field in collection_w.schema.fields:
+            if field.dtype.name == "JSON":
+                with_json = True
+        data = cf.gen_default_list_data(start=-insert_batch, with_json=with_json)
         t0 = time.time()
         _, res = collection_w.insert(data)
         tt = time.time() - t0
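The probe above keys off field.dtype.name; a minimal sketch of how it behaves on a client-side pymilvus schema (field names here are illustrative):

from pymilvus import CollectionSchema, FieldSchema, DataType

fields = [FieldSchema("int64", DataType.INT64, is_primary=True),
          FieldSchema("json_field", DataType.JSON),
          FieldSchema("float_vector", DataType.FLOAT_VECTOR, dim=8)]
schema = CollectionSchema(fields)
# same check as in the test: scan the fields for a JSON dtype
with_json = any(f.dtype.name == "JSON" for f in schema.fields)
assert with_json is True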
@@ -89,7 +94,7 @@ class TestAllCollection(TestcaseBase):
         log.info(f"assert search: {tt}")
         assert len(res_1) == 1
         # query
-        term_expr = f'{ct.default_int64_field_name} in {[i for i in range(-insert_batch,0)]}'
+        term_expr = f'{ct.default_int64_field_name} in {[i for i in range(-insert_batch, 0)]}'
         t0 = time.time()
         res, _ = collection_w.query(term_expr)
         tt = time.time() - t0
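For reference, the expression that f-string builds, shown with a small batch (the real field name comes from ct.default_int64_field_name; "int64" here is illustrative):

insert_batch = 3
term_expr = f'int64 in {[i for i in range(-insert_batch, 0)]}'
assert term_expr == 'int64 in [-3, -2, -1]'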
@@ -98,7 +103,7 @@ class TestAllCollection(TestcaseBase):
         collection_w.release()
 
         # insert data
-        d = cf.gen_default_list_data(with_json=False)
+        d = cf.gen_default_list_data(with_json=with_json)
         collection_w.insert(d)
 
         # load
@@ -3,7 +3,7 @@ from time import sleep
 from pymilvus import connections
 from chaos.checker import (CreateChecker,
                            InsertChecker,
                            FlushChecker,
                            SearchChecker,
                            QueryChecker,
                            IndexChecker,
@@ -17,6 +17,7 @@ from chaos.chaos_commons import assert_statistic
 from chaos import constants
 from delayed_assert import assert_expectations
 
+
 class TestBase:
     expect_create = constants.SUCC
     expect_insert = constants.SUCC
@@ -45,7 +46,7 @@ class TestOperations(TestBase):
         self.host = host
         self.port = port
         self.user = user
         self.password = password
 
     def init_health_checkers(self, collection_name=None):
         c_name = collection_name
@@ -57,7 +58,7 @@ class TestOperations(TestBase):
             Op.search: SearchChecker(collection_name=c_name),
             Op.query: QueryChecker(collection_name=c_name),
             Op.delete: DeleteChecker(collection_name=c_name),
-            Op.drop:DropChecker(collection_name=c_name)
+            Op.drop: DropChecker(collection_name=c_name)
         }
         self.health_checkers = checkers
 
@@ -71,13 +72,13 @@ class TestOperations(TestBase):
         cc.start_monitor_threads(self.health_checkers)
         log.info("*********************Load Start**********************")
         # wait request_duration
-        request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","")
+        request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
         if request_duration[-1] == "+":
             request_duration = request_duration[:-1]
         request_duration = eval(request_duration)
         for i in range(10):
-            sleep(request_duration//10)
-            for k,v in self.health_checkers.items():
+            sleep(request_duration // 10)
+            for k, v in self.health_checkers.items():
                 v.check_result()
         if is_check:
             assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
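Why the trailing "+" strip above is needed: unlike the chaos test's variant, "s" is replaced with an empty string rather than "*1+", so only durations ending in h or m leave a dangling operator:

request_duration = "1h"
expr = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if expr[-1] == "+":
    expr = expr[:-1]   # "1*3600+" -> "1*3600"
assert eval(expr) == 3600   # while "90s" becomes plain "90"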
@@ -0,0 +1,102 @@
import pytest
import threading
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker,
                           InsertChecker,
                           FlushChecker,
                           SearchChecker,
                           QueryChecker,
                           IndexChecker,
                           DeleteChecker,
                           Op)
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
from utils.util_k8s import (get_milvus_instance_name,
                            get_milvus_deploy_tool,
                            reset_healthy_checker_after_standby_activated)


class TestBase:
    expect_create = constants.SUCC
    expect_insert = constants.SUCC
    expect_flush = constants.SUCC
    expect_index = constants.SUCC
    expect_search = constants.SUCC
    expect_query = constants.SUCC
    host = '127.0.0.1'
    port = 19530
    _chaos_config = None
    health_checkers = {}


class TestOperations(TestBase):

    @pytest.fixture(scope="function", autouse=True)
    def connection(self, host, port, user, password, milvus_ns):
        if user and password:
            # log.info(f"connect to {host}:{port} with user {user} and password {password}")
            connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
        else:
            connections.connect('default', host=host, port=port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        log.info("connect to milvus successfully")
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def init_health_checkers(self, collection_name=None):
        c_name = collection_name
        checkers = {
            Op.create: CreateChecker(collection_name=c_name),
            Op.insert: InsertChecker(collection_name=c_name),
            Op.flush: FlushChecker(collection_name=c_name),
            Op.index: IndexChecker(collection_name=c_name),
            Op.search: SearchChecker(collection_name=c_name),
            Op.query: QueryChecker(collection_name=c_name),
            Op.delete: DeleteChecker(collection_name=c_name),
        }
        self.health_checkers = checkers

    @pytest.mark.tags(CaseLabel.L3)
    def test_operations(self, request_duration, target_component, is_check):
        # start the monitor threads to check the milvus ops
        log.info("*********************Test Start**********************")
        log.info(connections.get_connection_addr('default'))
        c_name = None
        self.init_health_checkers(collection_name=c_name)
        cc.start_monitor_threads(self.health_checkers)
        log.info("*********************Load Start**********************")
        # wait request_duration
        request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
        if request_duration[-1] == "+":
            request_duration = request_duration[:-1]
        request_duration = eval(request_duration)
        # start a thread to reset health_checkers when standby is activated.
        t = threading.Thread(target=reset_healthy_checker_after_standby_activated,
                             args=(self.milvus_ns, self.release_name, target_component, self.health_checkers),
                             kwargs={"timeout": request_duration//2},
                             daemon=True)
        t.start()
        # t.join()
        log.info('start a thread to reset health_checkers when standby is activated')
        for i in range(10):
            sleep(request_duration//10)
            for k, v in self.health_checkers.items():
                v.check_result()
        if is_check:
            assert_statistic(self.health_checkers)
            assert_expectations()
        log.info("*********************Chaos Test Completed**********************")
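The watcher above runs with daemon=True and its t.join() is left commented out, so it never blocks the request loop; if standby never activates, the thread simply dies with the process. A minimal, self-contained sketch of that pattern (names are ours, not the suite's):

import threading, time

def watch_and_reset(state, timeout=5):
    # poll until the switch is observed or the timeout expires
    deadline = time.time() + timeout
    while time.time() < deadline:
        if state["switched"]:
            state["reset_done"] = True
            return
        time.sleep(0.1)

state = {"switched": False, "reset_done": False}
t = threading.Thread(target=watch_and_reset, args=(state,), kwargs={"timeout": 5}, daemon=True)
t.start()
state["switched"] = True   # simulate the standby -> activated switch
t.join(timeout=2)
assert state["reset_done"]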
@@ -42,3 +42,5 @@ loguru==0.6.0
 
 # util
 psutil==5.9.4
+# for standby test
+etcd-sdk-python==0.0.2
@@ -1,13 +1,14 @@
 import json
 import os.path
 import time
 
+import pyetcd
 import requests
 from pymilvus import connections
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
 from common.milvus_sys import MilvusSys
 from utils.util_log import test_log as log
 from chaos import chaos_commons as cc
 from common.common_type import in_cluster_env
@@ -187,7 +188,7 @@ def get_milvus_instance_name(namespace, host="127.0.0.1", port="19530", milvus_s
     query_node_ip = ms.query_nodes[0]["infos"]['hardware_infos']["ip"].split(":")[0]
     ip_name_pairs = get_pod_ip_name_pairs(namespace, "app.kubernetes.io/name=milvus")
     pod_name = ip_name_pairs[query_node_ip]
 
     init_k8s_client_config()
     api_instance = client.CoreV1Api()
     try:
@@ -222,7 +223,7 @@ def get_milvus_deploy_tool(namespace, milvus_sys):
         log.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e)
         raise Exception(str(e))
     if ("app.kubernetes.io/managed-by" in api_response.metadata.labels and
             api_response.metadata.labels["app.kubernetes.io/managed-by"] == "milvus-operator"):
         deploy_tool = "milvus-operator"
     else:
         deploy_tool = "helm"
@@ -315,6 +316,7 @@ def get_metrics_querynode_sq_req_count():
     else:
         raise Exception(-1, f"Failed to get metrics with status code {response.status_code}")
 
+
 def get_svc_ip(namespace, label_selector):
     """ get svc ip from svc list """
     init_k8s_client_config()
@@ -393,6 +395,66 @@ def get_etcd_followers(release_name, deploy_tool="helm"):
     return followers
 
 
+def find_activate_standby_coord_pod(namespace, release_name, coord_type):
+    init_k8s_client_config()
+    api_instance = client.CoreV1Api()
+    etcd_service_name = release_name + "-etcd"
+    service = api_instance.read_namespaced_service(name=etcd_service_name, namespace=namespace)
+    etcd_cluster_ip = service.spec.cluster_ip
+    etcd_port = service.spec.ports[0].port
+    etcd = pyetcd.client(host=etcd_cluster_ip, port=etcd_port)
+    v = etcd.get(f'by-dev/meta/session/{coord_type}')
+    log.info(f"coord_type: {coord_type}, etcd session value: {v}")
+    activated_pod_ip = json.loads(v[0])["Address"].split(":")[0]
+    label_selector = f'app.kubernetes.io/instance={release_name}, component={coord_type}'
+    items = get_pod_list(namespace, label_selector=label_selector)
+    all_pod_list = []
+    for item in items:
+        pod_name = item.metadata.name
+        all_pod_list.append(pod_name)
+    activate_pod_list = []
+    standby_pod_list = []
+    for item in items:
+        pod_name = item.metadata.name
+        ip = item.status.pod_ip
+        if ip == activated_pod_ip:
+            activate_pod_list.append(pod_name)
+    standby_pod_list = list(set(all_pod_list) - set(activate_pod_list))
+    return activate_pod_list, standby_pod_list
+
+
+def reset_healthy_checker_after_standby_activated(namespace, release_name, coord_type, health_checkers, timeout=360):
+    activate_pod_list_before, standby_pod_list_before = find_activate_standby_coord_pod(namespace, release_name,
+                                                                                        coord_type)
+    log.info(f"check standby switch: activate_pod_list_before {activate_pod_list_before}, "
+             f"standby_pod_list_before {standby_pod_list_before}")
+    standby_activated = False
+    start_time = time.time()
+    end_time = time.time()
+    while not standby_activated and end_time - start_time < timeout:
+        try:
+            activate_pod_list_after, standby_pod_list_after = find_activate_standby_coord_pod(namespace, release_name,
+                                                                                              coord_type)
+            if activate_pod_list_after[0] in standby_pod_list_before:
+                standby_activated = True
+                log.info(f"Standby {coord_type} pod {activate_pod_list_after[0]} activated")
+                log.info(f"check standby switch: activate_pod_list_after {activate_pod_list_after}, "
+                         f"standby_pod_list_after {standby_pod_list_after}")
+                break
+        except Exception as e:
+            log.error(f"Exception when check standby switch: {e}")
+        time.sleep(10)
+        end_time = time.time()
+    if standby_activated:
+        time.sleep(30)
+        cc.reset_counting(health_checkers)
+        for k, v in health_checkers.items():
+            log.info("reset health checkers")
+            v.check_result()
+    else:
+        log.info(f"Standby {coord_type} pod does not switch standby mode")
+
+
 if __name__ == '__main__':
     label = "app.kubernetes.io/name=milvus, component=querynode"
     instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")
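For reference, find_activate_standby_coord_pod above relies only on the Address field of the coordinator's etcd session entry; a minimal parsing sketch with a made-up payload:

import json

# example payload under by-dev/meta/session/<coord_type>; only Address is used
session_value = b'{"ServerID": 5, "ServerName": "rootcoord", "Address": "10.104.20.7:53100"}'
activated_pod_ip = json.loads(session_value)["Address"].split(":")[0]
assert activated_pod_ip == "10.104.20.7"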