mirror of https://github.com/milvus-io/milvus.git
[skip ci] Add part of the chaos experiments: pod failure and container kill (#7024)
* fix chaos checker
  Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
* add chaos queryNode pod failure
  Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
* add datanode pod failure chaos experiment; add export pods logs func
  Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/7063/head
parent
ecfebff801
commit
8bf4524f6f
|
@ -0,0 +1,18 @@
|
||||||
|
apiVersion: chaos-mesh.org/v1alpha1
|
||||||
|
kind: PodChaos
|
||||||
|
metadata:
|
||||||
|
name: test-datanode-container-kill
|
||||||
|
namespace: chaos-testing
|
||||||
|
spec:
|
||||||
|
action: container-kill
|
||||||
|
mode: one
|
||||||
|
containerName: 'datanode'
|
||||||
|
selector:
|
||||||
|
namespaces:
|
||||||
|
- chaos-testing # target namespace of milvus deployment
|
||||||
|
labelSelectors:
|
||||||
|
app.kubernetes.io/instance: milvus-chaos
|
||||||
|
app.kubernetes.io/name: milvus
|
||||||
|
component: datanode
|
||||||
|
scheduler:
|
||||||
|
cron: '@every 2s'
|
|
@ -0,0 +1,19 @@
|
||||||
|
apiVersion: chaos-mesh.org/v1alpha1
|
||||||
|
kind: PodChaos
|
||||||
|
metadata:
|
||||||
|
name: test-datanode-pod-failure
|
||||||
|
namespace: chaos-testing
|
||||||
|
spec:
|
||||||
|
action: pod-failure
|
||||||
|
mode: one
|
||||||
|
value: ''
|
||||||
|
duration: '20s'
|
||||||
|
selector:
|
||||||
|
namespaces:
|
||||||
|
- chaos-testing # target namespace of milvus deployment
|
||||||
|
labelSelectors:
|
||||||
|
app.kubernetes.io/instance: milvus-chaos
|
||||||
|
app.kubernetes.io/name: milvus
|
||||||
|
component: datanode
|
||||||
|
scheduler:
|
||||||
|
cron: '@every 30s'
|
|
@ -0,0 +1,19 @@
|
||||||
|
apiVersion: chaos-mesh.org/v1alpha1
|
||||||
|
kind: PodChaos
|
||||||
|
metadata:
|
||||||
|
name: test-querynode-pod-failure
|
||||||
|
namespace: chaos-testing
|
||||||
|
spec:
|
||||||
|
action: pod-failure
|
||||||
|
mode: one
|
||||||
|
value: ''
|
||||||
|
duration: '20s'
|
||||||
|
selector:
|
||||||
|
namespaces:
|
||||||
|
- chaos-testing # target namespace of milvus deployment
|
||||||
|
labelSelectors:
|
||||||
|
app.kubernetes.io/instance: milvus-chaos
|
||||||
|
app.kubernetes.io/name: milvus
|
||||||
|
component: querynode
|
||||||
|
scheduler:
|
||||||
|
cron: '@every 30s'
|
|
@ -140,4 +140,26 @@ Collections:
|
||||||
testcase:
|
testcase:
|
||||||
name: test_querynode_network_isolation
|
name: test_querynode_network_isolation
|
||||||
chaos: chaos_querynode_network_isolation.yaml
|
chaos: chaos_querynode_network_isolation.yaml
|
||||||
# and 10 more for the other pods
|
# and 10 more for the other pods
|
||||||
|
|
||||||
|
-
|
||||||
|
testcase:
|
||||||
|
name: test_datanode_container_kill
|
||||||
|
chaos: chaos_datanode_container_kill.yaml
|
||||||
|
expectation:
|
||||||
|
cluster_1_node:
|
||||||
|
insert: succ
|
||||||
|
flush: fail
|
||||||
|
cluster_n_nodes:
|
||||||
|
insert: degrade
|
||||||
|
|
||||||
|
-
|
||||||
|
testcase:
|
||||||
|
name: test_datanode_pod_failure
|
||||||
|
chaos: chaos_datanode_pod_failure.yaml
|
||||||
|
expectation:
|
||||||
|
cluster_1_node:
|
||||||
|
insert: succ
|
||||||
|
flush: fail
|
||||||
|
cluster_n_nodes:
|
||||||
|
insert: degrade
|
|
@ -34,7 +34,7 @@ class Checker:
|
||||||
schema=cf.gen_default_collection_schema(),
|
schema=cf.gen_default_collection_schema(),
|
||||||
timeout=timeout)
|
timeout=timeout)
|
||||||
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
|
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
|
||||||
timeout=timeout, check_task='check_nothing')
|
timeout=timeout)
|
||||||
self.initial_entities = self.c_wrap.num_entities # do as a flush
|
self.initial_entities = self.c_wrap.num_entities # do as a flush
|
||||||
|
|
||||||
def total(self):
|
def total(self):
|
||||||
|
@ -63,7 +63,7 @@ class SearchChecker(Checker):
|
||||||
data=search_vec,
|
data=search_vec,
|
||||||
anns_field=ct.default_float_vec_field_name,
|
anns_field=ct.default_float_vec_field_name,
|
||||||
param={"nprobe": 32},
|
param={"nprobe": 32},
|
||||||
limit=1, timeout=timeout, check_task='check_nothing'
|
limit=1, timeout=timeout
|
||||||
)
|
)
|
||||||
if result:
|
if result:
|
||||||
self._succ += 1
|
self._succ += 1
|
||||||
|
@ -82,7 +82,7 @@ class InsertFlushChecker(Checker):
|
||||||
while self._running:
|
while self._running:
|
||||||
_, insert_result = \
|
_, insert_result = \
|
||||||
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
|
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
|
||||||
timeout=timeout, check_task='check_nothing')
|
timeout=timeout)
|
||||||
if not self._flush:
|
if not self._flush:
|
||||||
if insert_result:
|
if insert_result:
|
||||||
self._succ += 1
|
self._succ += 1
|
||||||
|
@ -106,11 +106,11 @@ class CreateChecker(Checker):
|
||||||
_, result = self.c_wrap.init_collection(
|
_, result = self.c_wrap.init_collection(
|
||||||
name=cf.gen_unique_str("CreateChecker_"),
|
name=cf.gen_unique_str("CreateChecker_"),
|
||||||
schema=cf.gen_default_collection_schema(),
|
schema=cf.gen_default_collection_schema(),
|
||||||
timeout=timeout, check_task='check_nothing'
|
timeout=timeout
|
||||||
)
|
)
|
||||||
if result:
|
if result:
|
||||||
self._succ += 1
|
self._succ += 1
|
||||||
self.c_wrap.drop(timeout=timeout, check_task="check_nothing")
|
self.c_wrap.drop(timeout=timeout)
|
||||||
else:
|
else:
|
||||||
self._fail += 1
|
self._fail += 1
|
||||||
sleep(constants.WAIT_PER_OP / 10)
|
sleep(constants.WAIT_PER_OP / 10)
|
||||||
|
@ -120,7 +120,7 @@ class IndexChecker(Checker):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.c_wrap.insert(data=cf.gen_default_list_data(nb=5*constants.ENTITIES_FOR_SEARCH),
|
self.c_wrap.insert(data=cf.gen_default_list_data(nb=5*constants.ENTITIES_FOR_SEARCH),
|
||||||
timeout=timeout, check_task='check_nothing')
|
timeout=timeout)
|
||||||
log.debug(f"Index ready entities: {self.c_wrap.num_entities }") # do as a flush before indexing
|
log.debug(f"Index ready entities: {self.c_wrap.num_entities }") # do as a flush before indexing
|
||||||
|
|
||||||
def keep_running(self):
|
def keep_running(self):
|
||||||
|
@ -128,10 +128,10 @@ class IndexChecker(Checker):
|
||||||
_, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
|
_, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
|
||||||
constants.DEFAULT_INDEX_PARAM,
|
constants.DEFAULT_INDEX_PARAM,
|
||||||
name=cf.gen_unique_str('index_'),
|
name=cf.gen_unique_str('index_'),
|
||||||
timeout=timeout, check_task='check_nothing')
|
timeout=timeout)
|
||||||
if result:
|
if result:
|
||||||
self._succ += 1
|
self._succ += 1
|
||||||
self.c_wrap.drop_index(timeout=timeout, check_task='check_nothing')
|
self.c_wrap.drop_index(timeout=timeout)
|
||||||
else:
|
else:
|
||||||
self._fail += 1
|
self._fail += 1
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ class QueryChecker(Checker):
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
|
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
|
||||||
term_expr = f'{ct.default_int64_field_name} in {int_values}'
|
term_expr = f'{ct.default_int64_field_name} in {int_values}'
|
||||||
_, result = self.c_wrap.query(term_expr, timeout=timeout, check_task='check_nothing')
|
_, result = self.c_wrap.query(term_expr, timeout=timeout)
|
||||||
if result:
|
if result:
|
||||||
self._succ += 1
|
self._succ += 1
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -129,7 +129,7 @@ class TestChaos(TestChaosBase):
|
||||||
chaos_opt = ChaosOpt(chaos_config['kind'])
|
chaos_opt = ChaosOpt(chaos_config['kind'])
|
||||||
chaos_opt.create_chaos_object(chaos_config)
|
chaos_opt.create_chaos_object(chaos_config)
|
||||||
log.debug("chaos injected")
|
log.debug("chaos injected")
|
||||||
|
sleep(constants.WAIT_PER_OP * 2.1)
|
||||||
# reset counting
|
# reset counting
|
||||||
reset_counting(self.health_checkers)
|
reset_counting(self.health_checkers)
|
||||||
|
|
||||||
|
|
|
@ -12,3 +12,4 @@ QUERY_NODE = "queryNode"
|
||||||
# my values.yaml path
|
# my values.yaml path
|
||||||
MILVUS_CHART_ENV = 'MILVUS_CHART_ENV'
|
MILVUS_CHART_ENV = 'MILVUS_CHART_ENV'
|
||||||
MILVUS_CHART_PATH = '/home/zong/milvus-helm/charts/milvus'
|
MILVUS_CHART_PATH = '/home/zong/milvus-helm/charts/milvus'
|
||||||
|
MILVUS_LOGS_PATH = '/tmp/milvus'
|
||||||
|
|
|
@ -95,14 +95,37 @@ class HelmEnv:
|
||||||
service = v1.read_namespaced_service(f'{self.release_name}-milvus', constants.NAMESPACE)
|
service = v1.read_namespaced_service(f'{self.release_name}-milvus', constants.NAMESPACE)
|
||||||
return service.status.load_balancer.ingress[0].ip
|
return service.status.load_balancer.ingress[0].ip
|
||||||
|
|
||||||
|
def export_all_logs(self):
    """
    Export the logs of every pod of this release to constants.MILVUS_LOGS_PATH.

    One file named ``<pod>.log`` is written per pod returned by
    list_all_pods(). Both stdout and stderr of ``kubectl logs`` are captured
    in that file, so a failed fetch leaves the error text in the file
    instead of raising.
    NOTE: according to the original author, minio pod logs are currently
    not exported.

    :return: None; the logs are written under constants.MILVUS_LOGS_PATH
    """
    import subprocess

    # Create the target directory up front; writing into a missing directory
    # would otherwise fail for every pod.
    os.makedirs(constants.MILVUS_LOGS_PATH, exist_ok=True)

    for pod in self.list_all_pods():
        log_file = os.path.join(constants.MILVUS_LOGS_PATH, f'{pod}.log')
        with open(log_file, 'w') as f:
            try:
                # The pods were listed from constants.NAMESPACE, so the logs
                # must be requested from that same namespace rather than the
                # kubectl context's default one.
                subprocess.run(['kubectl', 'logs', pod, '-n', constants.NAMESPACE],
                               stdout=f, stderr=subprocess.STDOUT, check=False)
            except OSError as e:
                # Keep the export best-effort (e.g. kubectl not installed):
                # record the failure in the log file and continue.
                f.write(f'failed to run kubectl logs for {pod}: {e}\n')
|
||||||
|
|
||||||
|
def list_all_pods(self):
    """
    List the names of all pods that belong to this helm release.

    Pods are looked up in constants.NAMESPACE and filtered by the label
    ``app.kubernetes.io/instance=<release_name>``.

    :return: list of pod names
    """
    from kubernetes import client, config
    config.load_kube_config()
    core_api = client.CoreV1Api()
    release_selector = f'app.kubernetes.io/instance={self.release_name}'
    pod_list = core_api.list_namespaced_pod(namespace=constants.NAMESPACE,
                                            label_selector=release_selector)
    return [item.metadata.name for item in pod_list.items]
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# default deploy q replicas
|
# default deploy q replicas
|
||||||
release_name = "scale-test"
|
release_name = "milvus-chaos"
|
||||||
env = HelmEnv(release_name=release_name)
|
env = HelmEnv(release_name=release_name)
|
||||||
# host = env.get_svc_external_ip()
|
# host = env.get_svc_external_ip()
|
||||||
# log.debug(host)
|
# log.debug(host)
|
||||||
# env.helm_install_cluster_milvus()
|
# env.helm_install_cluster_milvus()
|
||||||
# env.helm_upgrade_cluster_milvus(queryNode=2)
|
# env.helm_upgrade_cluster_milvus(queryNode=2)
|
||||||
env.helm_uninstall_cluster_milvus()
|
env.helm_uninstall_cluster_milvus()
|
||||||
sleep(5)
|
# sleep(5)
|
||||||
|
# env.export_all_logs()
|
||||||
|
|
Loading…
Reference in New Issue