mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: ThreadDao <yufen.zong@zilliz.com> Signed-off-by: ThreadDao <yufen.zong@zilliz.com> Signed-off-by: ThreadDao <yufen.zong@zilliz.com>pull/19044/head
parent
362c663b22
commit
56693836ae
|
@ -17,7 +17,7 @@ pipeline {
|
||||||
|
|
||||||
agent {
|
agent {
|
||||||
kubernetes {
|
kubernetes {
|
||||||
label 'milvus-scale-test'
|
// label 'milvus-scale-test'
|
||||||
// inheritFrom 'milvus-test'
|
// inheritFrom 'milvus-test'
|
||||||
defaultContainer 'milvus-test'
|
defaultContainer 'milvus-test'
|
||||||
yamlFile "build/ci/jenkins/pod/scale-test.yaml"
|
yamlFile "build/ci/jenkins/pod/scale-test.yaml"
|
||||||
|
|
|
@ -6,7 +6,7 @@ metadata:
|
||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- name: milvus-test
|
- name: milvus-test
|
||||||
image: harbor.milvus.io/qa/krte:dev-4
|
image: harbor.milvus.io/qa/fouram:1.1
|
||||||
# image: dockerhub-mirror-sh.zilliz.cc/milvusdb/pytest:20211209-cef343f
|
# image: dockerhub-mirror-sh.zilliz.cc/milvusdb/pytest:20211209-cef343f
|
||||||
command:
|
command:
|
||||||
- cat
|
- cat
|
||||||
|
|
|
@ -74,7 +74,7 @@ class TestIndexNodeScale:
|
||||||
# create index
|
# create index
|
||||||
# Note that the num of segments and the num of indexNode are related to indexing time
|
# Note that the num of segments and the num of indexNode are related to indexing time
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
t0 = datetime.datetime.now() - start
|
t0 = datetime.datetime.now() - start
|
||||||
log.info(f'Create index on {init_replicas} indexNode cost t0: {t0}')
|
log.info(f'Create index on {init_replicas} indexNode cost t0: {t0}')
|
||||||
|
@ -90,14 +90,14 @@ class TestIndexNodeScale:
|
||||||
|
|
||||||
# create index again
|
# create index again
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
t1 = datetime.datetime.now() - start
|
t1 = datetime.datetime.now() - start
|
||||||
log.info(f'Create index on {expand_replicas} indexNode cost t1: {t1}')
|
log.info(f'Create index on {expand_replicas} indexNode cost t1: {t1}')
|
||||||
collection_w.drop_index()
|
collection_w.drop_index()
|
||||||
|
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
t2 = datetime.datetime.now() - start
|
t2 = datetime.datetime.now() - start
|
||||||
log.info(f'Create index on {expand_replicas} indexNode cost t2: {t2}')
|
log.info(f'Create index on {expand_replicas} indexNode cost t2: {t2}')
|
||||||
|
@ -164,7 +164,7 @@ class TestIndexNodeScale:
|
||||||
|
|
||||||
# create index on collection one and two
|
# create index on collection one and two
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
t0 = datetime.datetime.now() - start
|
t0 = datetime.datetime.now() - start
|
||||||
|
|
||||||
|
@ -187,7 +187,7 @@ class TestIndexNodeScale:
|
||||||
assert not collection_w.has_index()[0]
|
assert not collection_w.has_index()[0]
|
||||||
|
|
||||||
start = datetime.datetime.now()
|
start = datetime.datetime.now()
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
t2 = datetime.datetime.now() - start
|
t2 = datetime.datetime.now() - start
|
||||||
log.info(f'Create index on 1 indexNode cost t2: {t2}')
|
log.info(f'Create index on 1 indexNode cost t2: {t2}')
|
||||||
|
|
|
@ -7,6 +7,7 @@ import pytest
|
||||||
from base.collection_wrapper import ApiCollectionWrapper
|
from base.collection_wrapper import ApiCollectionWrapper
|
||||||
from base.utility_wrapper import ApiUtilityWrapper
|
from base.utility_wrapper import ApiUtilityWrapper
|
||||||
from common.common_type import CaseLabel, CheckTasks
|
from common.common_type import CaseLabel, CheckTasks
|
||||||
|
from common.milvus_sys import MilvusSys
|
||||||
from customize.milvus_operator import MilvusOperator
|
from customize.milvus_operator import MilvusOperator
|
||||||
from common import common_func as cf
|
from common import common_func as cf
|
||||||
from common import common_type as ct
|
from common import common_type as ct
|
||||||
|
@ -21,11 +22,49 @@ nb = 10000
|
||||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
||||||
|
|
||||||
|
|
||||||
|
def verify_load_balance(c_name, host, port=19530):
|
||||||
|
"""
|
||||||
|
verify load balance is available after scale
|
||||||
|
"""
|
||||||
|
connections.connect('default', host=host, port=port)
|
||||||
|
# verify load balance
|
||||||
|
utility_w = ApiUtilityWrapper()
|
||||||
|
collection_w = ApiCollectionWrapper()
|
||||||
|
collection_w.init_collection(c_name)
|
||||||
|
ms = MilvusSys()
|
||||||
|
res, _ = utility_w.get_query_segment_info(collection_w.name)
|
||||||
|
log.debug(res)
|
||||||
|
segment_distribution = cf.get_segment_distribution(res)
|
||||||
|
all_querynodes = [node["identifier"] for node in ms.query_nodes]
|
||||||
|
assert len(all_querynodes) > 1
|
||||||
|
all_querynodes = sorted(all_querynodes,
|
||||||
|
key=lambda x: len(segment_distribution[x]["sealed"])
|
||||||
|
if x in segment_distribution else 0, reverse=True)
|
||||||
|
log.debug(all_querynodes)
|
||||||
|
src_node_id = all_querynodes[0]
|
||||||
|
des_node_ids = all_querynodes[1:]
|
||||||
|
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
|
||||||
|
# load balance
|
||||||
|
utility_w.load_balance(collection_w.name, src_node_id, des_node_ids, sealed_segment_ids)
|
||||||
|
# get segments distribution after load balance
|
||||||
|
res, _ = utility_w.get_query_segment_info(collection_w.name)
|
||||||
|
log.debug(res)
|
||||||
|
segment_distribution = cf.get_segment_distribution(res)
|
||||||
|
sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"]
|
||||||
|
# assert src node has no sealed segments
|
||||||
|
assert sealed_segment_ids_after_load_banalce == []
|
||||||
|
des_sealed_segment_ids = []
|
||||||
|
for des_node_id in des_node_ids:
|
||||||
|
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
|
||||||
|
# assert sealed_segment_ids is subset of des_sealed_segment_ids
|
||||||
|
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L3)
|
||||||
class TestQueryNodeScale:
|
class TestQueryNodeScale:
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L3)
|
||||||
def test_scale_query_node(self):
|
def test_scale_query_node(self, host):
|
||||||
"""
|
"""
|
||||||
target: test scale queryNode
|
target: test scale queryNode
|
||||||
method: 1.deploy milvus cluster with 1 queryNode
|
method: 1.deploy milvus cluster with 1 queryNode
|
||||||
|
@ -65,16 +104,17 @@ class TestQueryNodeScale:
|
||||||
c_name = cf.gen_unique_str("scale_query")
|
c_name = cf.gen_unique_str("scale_query")
|
||||||
# c_name = 'scale_query_DymS7kI4'
|
# c_name = 'scale_query_DymS7kI4'
|
||||||
collection_w = ApiCollectionWrapper()
|
collection_w = ApiCollectionWrapper()
|
||||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
|
utility_w = ApiUtilityWrapper()
|
||||||
|
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||||
|
|
||||||
# insert two segments
|
# insert two segments
|
||||||
for i in range(3):
|
for i in range(30):
|
||||||
df = cf.gen_default_dataframe_data(nb)
|
df = cf.gen_default_dataframe_data(nb)
|
||||||
collection_w.insert(df)
|
collection_w.insert(df)
|
||||||
log.debug(collection_w.num_entities)
|
log.debug(collection_w.num_entities)
|
||||||
|
|
||||||
# create index
|
# create index
|
||||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
collection_w.create_index(ct.default_float_vec_field_name, default_index_params, timeout=60)
|
||||||
assert collection_w.has_index()[0]
|
assert collection_w.has_index()[0]
|
||||||
assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
|
assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
|
||||||
default_index_params)
|
default_index_params)
|
||||||
|
@ -105,6 +145,9 @@ class TestQueryNodeScale:
|
||||||
mic.wait_for_healthy(release_name, constants.NAMESPACE)
|
mic.wait_for_healthy(release_name, constants.NAMESPACE)
|
||||||
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
|
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
|
||||||
|
|
||||||
|
# verify load balance
|
||||||
|
verify_load_balance(c_name, host=host)
|
||||||
|
|
||||||
@counter
|
@counter
|
||||||
def do_insert():
|
def do_insert():
|
||||||
""" do insert """
|
""" do insert """
|
||||||
|
@ -308,4 +351,4 @@ class TestQueryNodeScale:
|
||||||
label = f"app.kubernetes.io/instance={release_name}"
|
label = f"app.kubernetes.io/instance={release_name}"
|
||||||
log.info('Start to export milvus pod logs')
|
log.info('Start to export milvus pod logs')
|
||||||
read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
|
read_pod_log(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
|
||||||
mic.uninstall(release_name, namespace=constants.NAMESPACE)
|
mic.uninstall(release_name, namespace=constants.NAMESPACE)
|
Loading…
Reference in New Issue