milvus/tests/python_client/scale/test_query_node_scale.py

354 lines
16 KiB
Python

import random
import threading
import time
import pytest
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from customize.milvus_operator import MilvusOperator
from common import common_func as cf
from common import common_type as ct
from scale import constants, scale_common
from pymilvus import Index, connections, MilvusException
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, read_pod_log
from utils.util_pymilvus import get_latest_tag
from utils.wrapper import counter
nb = 10000
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
def verify_load_balance(c_name, host, port=19530):
"""
verify load balance is available after scale
"""
connections.connect('default', host=host, port=port)
# verify load balance
utility_w = ApiUtilityWrapper()
collection_w = ApiCollectionWrapper()
collection_w.init_collection(c_name)
ms = MilvusSys()
res, _ = utility_w.get_query_segment_info(collection_w.name)
log.debug(res)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
assert len(all_querynodes) > 1
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
log.debug(all_querynodes)
src_node_id = all_querynodes[0]
des_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
utility_w.load_balance(collection_w.name, src_node_id, des_node_ids, sealed_segment_ids)
# get segments distribution after load balance
res, _ = utility_w.get_query_segment_info(collection_w.name)
log.debug(res)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"]
# assert src node has no sealed segments
assert sealed_segment_ids_after_load_banalce == []
des_sealed_segment_ids = []
for des_node_id in des_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
# assert sealed_segment_ids is subset of des_sealed_segment_ids
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.tags(CaseLabel.L3)
class TestQueryNodeScale:
@pytest.mark.tags(CaseLabel.L3)
def test_scale_query_node(self, host):
"""
target: test scale queryNode
method: 1.deploy milvus cluster with 1 queryNode
2.prepare work (connect, create, insert, index and load)
3.continuously search (daemon thread)
4.expand queryNode from 2 to 5
5.continuously insert new data (daemon thread)
6.shrink queryNode from 5 to 3
expected: Verify milvus remains healthy and search successfully during scale
"""
release_name = "scale-query"
image_tag = get_latest_tag()
image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
query_config = {
'metadata.namespace': constants.NAMESPACE,
'spec.mode': 'cluster',
'metadata.name': release_name,
'spec.components.image': image,
'spec.components.proxy.serviceType': 'LoadBalancer',
'spec.components.queryNode.replicas': 1,
'spec.config.common.retentionDuration': 60
}
mic = MilvusOperator()
mic.install(query_config)
if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
else:
raise MilvusException(message=f'Milvus healthy timeout 1800s')
try:
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
# create
c_name = cf.gen_unique_str("scale_query")
# c_name = 'scale_query_DymS7kI4'
collection_w = ApiCollectionWrapper()
utility_w = ApiUtilityWrapper()
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
# insert two segments
for i in range(30):
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
log.debug(collection_w.num_entities)
# create index
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
assert collection_w.has_index()[0]
assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
default_index_params)
# load
collection_w.load()
# scale queryNode to 5
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)
@counter
def do_search():
""" do search """
search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name, ct.default_search_params,
ct.default_limit, check_task=CheckTasks.check_nothing)
assert len(search_res) == 1
return search_res, is_succ
def loop_search():
""" continuously search """
while True:
do_search()
threading.Thread(target=loop_search, args=(), daemon=True).start()
# wait new QN running, continuously insert
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# verify load balance
verify_load_balance(c_name, host=host)
@counter
def do_insert():
""" do insert """
return collection_w.insert(cf.gen_default_dataframe_data(1000), check_task=CheckTasks.check_nothing)
def loop_insert():
""" loop insert """
while True:
do_insert()
threading.Thread(target=loop_insert, args=(), daemon=True).start()
log.debug(collection_w.num_entities)
time.sleep(20)
log.debug("Expand querynode test finished")
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE)
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
log.debug(collection_w.num_entities)
time.sleep(60)
scale_common.check_succ_rate(do_search)
scale_common.check_succ_rate(do_insert)
log.debug("Shrink querynode test finished")
except Exception as e:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
def test_scale_query_node_replicas(self):
"""
target: test scale out querynode when load multi replicas
method: 1.Deploy cluster with 5 querynodes
2.Create collection with 2 shards
3.Insert 10 segments and flushed
4.Load collection with 2 replicas
5.Scale out querynode from 5 to 6 while search and insert growing data
expected: Verify search succ rate is 100%
"""
release_name = "scale-replica"
image_tag = get_latest_tag()
image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
query_config = {
'metadata.namespace': constants.NAMESPACE,
'metadata.name': release_name,
'spec.mode': 'cluster',
'spec.components.image': image,
'spec.components.proxy.serviceType': 'LoadBalancer',
'spec.components.queryNode.replicas': 5,
'spec.config.common.retentionDuration': 60
}
mic = MilvusOperator()
mic.install(query_config)
if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
else:
raise MilvusException(message=f'Milvus healthy timeout 1800s')
try:
scale_querynode = random.choice([6, 7, 4, 3])
connections.connect("scale-replica", host=host, port=19530)
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=cf.gen_unique_str("scale_out"), schema=cf.gen_default_collection_schema(),
using='scale-replica', shards_num=3)
# insert 10 sealed segments
for i in range(5):
df = cf.gen_default_dataframe_data(nb=nb, start=i * nb)
collection_w.insert(df)
assert collection_w.num_entities == (i + 1) * nb
collection_w.load(replica_number=2)
@counter
def do_search():
""" do search """
search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name, ct.default_search_params,
ct.default_limit, check_task=CheckTasks.check_nothing)
assert len(search_res) == 1
return search_res, is_succ
def loop_search():
""" continuously search """
while True:
do_search()
threading.Thread(target=loop_search, args=(), daemon=True).start()
# scale out
mic.upgrade(release_name, {'spec.components.queryNode.replicas': scale_querynode}, constants.NAMESPACE)
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
log.debug("Scale out querynode success")
time.sleep(100)
scale_common.check_succ_rate(do_search)
log.debug("Scale out test finished")
except Exception as e:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
def test_scale_in_query_node_less_than_replicas(self):
"""
target: test scale in cluster and querynode < replica
method: 1.Deploy cluster with 3 querynodes
2.Create and insert data, flush
3.Load collection with 2 replica number
4.Scale in querynode from 3 to 1 and query
5.Scale out querynode from 1 back to 3
expected: Verify search successfully after scale out
"""
release_name = "scale-in-query"
image_tag = get_latest_tag()
image = f'{constants.IMAGE_REPOSITORY}:{image_tag}'
query_config = {
'metadata.namespace': constants.NAMESPACE,
'metadata.name': release_name,
'spec.mode': 'cluster',
'spec.components.image': image,
'spec.components.proxy.serviceType': 'LoadBalancer',
'spec.components.queryNode.replicas': 2,
'spec.config.common.retentionDuration': 60
}
mic = MilvusOperator()
mic.install(query_config)
if mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1800):
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
else:
raise MilvusException(message=f'Milvus healthy timeout 1800s')
try:
# prepare collection
connections.connect("scale-in", host=host, port=19530)
utility_w = ApiUtilityWrapper()
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=cf.gen_unique_str("scale_in"), schema=cf.gen_default_collection_schema(),
using="scale-in")
collection_w.insert(cf.gen_default_dataframe_data())
assert collection_w.num_entities == ct.default_nb
# load multi replicas and search success
collection_w.load(replica_number=2)
search_res, is_succ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
assert len(search_res[0].ids) == ct.default_limit
log.info("Search successfully after load with 2 replicas")
log.debug(collection_w.get_replicas()[0])
log.debug(utility_w.get_query_segment_info(collection_w.name, using="scale-in"))
# scale in querynode from 2 to 1, less than replica number
log.debug("Scale in querynode from 2 to 1")
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 1}, constants.NAMESPACE)
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# search and not assure success
collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name, ct.default_search_params,
ct.default_limit, check_task=CheckTasks.check_nothing)
log.debug(collection_w.get_replicas(check_task=CheckTasks.check_nothing)[0])
# scale querynode from 1 back to 2
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 2}, constants.NAMESPACE)
mic.wait_for_healthy(release_name, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# verify search success
collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name, ct.default_search_params, ct.default_limit)
# Verify replica info is correct
replicas = collection_w.get_replicas()[0]
assert len(replicas.groups) == 2
for group in replicas.groups:
assert len(group.group_nodes) == 1
# Verify loaded segment info is correct
seg_info = utility_w.get_query_segment_info(collection_w.name, using="scale-in")[0]
num_entities = 0
for seg in seg_info:
assert len(seg.nodeIds) == 2
num_entities += seg.num_rows
assert num_entities == ct.default_nb
except Exception as e:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)