mirror of https://github.com/milvus-io/milvus.git
Add indexNode and queryNode scale test (#6581)
* add shrink queryNode case Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * add scale indexNode case Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * fix assert has index Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * [skip ci] to skip ci Signed-off-by: ThreadDao <yufen.zong@zilliz.com>pull/6588/head^2
parent
b4e1f47732
commit
10ac2d01ce
|
@ -1,4 +1,5 @@
|
|||
import os
|
||||
from time import sleep
|
||||
|
||||
from scale import constants
|
||||
from utils.util_log import test_log as log
|
||||
|
@ -88,3 +89,4 @@ if __name__ == '__main__':
|
|||
# env.helm_install_cluster_milvus()
|
||||
# env.helm_upgrade_cluster_milvus(queryNode=2)
|
||||
env.helm_uninstall_cluster_milvus()
|
||||
sleep(5)
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
import datetime
|
||||
# import pdb
|
||||
|
||||
from pymilvus_orm import connections
|
||||
|
||||
from base.collection_wrapper import ApiCollectionWrapper
|
||||
from scale.helm_env import HelmEnv
|
||||
from common import common_func as cf
|
||||
from common import common_type as ct
|
||||
from utils.util_log import test_log as log
|
||||
|
||||
nb = 50000
|
||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 128}}
|
||||
|
||||
|
||||
class TestIndexNodeScale:
|
||||
|
||||
def test_expand_index_node(self):
|
||||
"""
|
||||
target: test expand indexNode from 1 to 2
|
||||
method: 1.deploy two indexNode
|
||||
2.create index with two indexNode
|
||||
3.expand indexNode from 1 to 2
|
||||
4.create index with one indexNode
|
||||
expected: The cost of one indexNode is about twice that of two indexNodes
|
||||
"""
|
||||
release_name = "scale-index"
|
||||
env = HelmEnv(release_name=release_name)
|
||||
env.helm_install_cluster_milvus()
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
|
||||
data = cf.gen_default_dataframe_data(nb)
|
||||
|
||||
# create
|
||||
c_name = "index_scale_one"
|
||||
collection_w = ApiCollectionWrapper()
|
||||
# collection_w.init_collection(name=c_name)
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
# insert
|
||||
loop = 10
|
||||
for i in range(loop):
|
||||
collection_w.insert(data)
|
||||
assert collection_w.num_entities == nb*loop
|
||||
|
||||
# create index on collection one and two
|
||||
start = datetime.datetime.now()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
t0 = datetime.datetime.now() - start
|
||||
|
||||
log.debug(f't0: {t0}')
|
||||
|
||||
collection_w.drop_index()
|
||||
assert not collection_w.has_index()[0]
|
||||
|
||||
# expand indexNode from 1 to 2
|
||||
env.helm_upgrade_cluster_milvus(indexNode=2)
|
||||
|
||||
start = datetime.datetime.now()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
t1 = datetime.datetime.now() - start
|
||||
|
||||
log.debug(f't1: {t1}')
|
||||
assert round(t0 / t1) == 2
|
||||
|
||||
def test_shrink_index_node(self):
|
||||
"""
|
||||
target: test shrink indexNode from 2 to 1
|
||||
method: 1.deploy two indexNode
|
||||
2.create index with two indexNode
|
||||
3.shrink indexNode from 2 to 1
|
||||
4.create index with 1 indexNode
|
||||
expected: The cost of one indexNode is about twice that of two indexNodes
|
||||
"""
|
||||
release_name = "scale-index"
|
||||
env = HelmEnv(release_name=release_name, indexNode=2)
|
||||
env.helm_install_cluster_milvus()
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
|
||||
data = cf.gen_default_dataframe_data(nb)
|
||||
|
||||
# create
|
||||
c_name = "index_scale_one"
|
||||
collection_w = ApiCollectionWrapper()
|
||||
# collection_w.init_collection(name=c_name)
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
# insert
|
||||
loop = 10
|
||||
for i in range(loop):
|
||||
collection_w.insert(data)
|
||||
assert collection_w.num_entities == nb * loop
|
||||
|
||||
# create index on collection one and two
|
||||
start = datetime.datetime.now()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
t0 = datetime.datetime.now() - start
|
||||
|
||||
log.debug(f'two indexNodes: {t0}')
|
||||
|
||||
collection_w.drop_index()
|
||||
assert not collection_w.has_index()[0]
|
||||
|
||||
# expand indexNode from 1 to 2
|
||||
# pdb.set_trace()
|
||||
env.helm_upgrade_cluster_milvus(indexNode=1)
|
||||
|
||||
start = datetime.datetime.now()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
t1 = datetime.datetime.now() - start
|
||||
|
||||
log.debug(f'one indexNode: {t1}')
|
||||
log.debug(t1 / t0)
|
||||
assert round(t1 / t0) == 2
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
import pdb
|
||||
import random
|
||||
|
||||
from base.collection_wrapper import ApiCollectionWrapper
|
||||
|
@ -5,18 +6,21 @@ from scale.helm_env import HelmEnv
|
|||
from utils.util_log import test_log as log
|
||||
from common import common_func as cf
|
||||
from common import common_type as ct
|
||||
from scale import constants
|
||||
from pymilvus_orm import Index, connections
|
||||
|
||||
prefix = "search_scale"
|
||||
nb = 5000
|
||||
nq = 5
|
||||
default_schema = cf.gen_default_collection_schema()
|
||||
default_search_exp = "int64 >= 0"
|
||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
||||
|
||||
|
||||
class TestSearchScale:
|
||||
def test_search_scale(self):
|
||||
release_name = "scale-test"
|
||||
class TestQueryNodeScale:
|
||||
|
||||
def test_expand_query_node(self):
|
||||
release_name = "scale-query"
|
||||
env = HelmEnv(release_name=release_name)
|
||||
env.helm_install_cluster_milvus()
|
||||
|
||||
|
@ -25,7 +29,7 @@ class TestSearchScale:
|
|||
connections.connect(alias='default')
|
||||
|
||||
# create
|
||||
c_name = "data_scale_one"
|
||||
c_name = "query_scale_one"
|
||||
collection_w = ApiCollectionWrapper()
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
# insert
|
||||
|
@ -34,7 +38,7 @@ class TestSearchScale:
|
|||
assert mutation_res.insert_count == ct.default_nb
|
||||
# # create index
|
||||
# collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
# assert collection_w.has_index()
|
||||
# assert collection_w.has_index()[0]
|
||||
# assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
|
||||
# default_index_params)
|
||||
collection_w.load()
|
||||
|
@ -45,7 +49,7 @@ class TestSearchScale:
|
|||
# scale queryNode pod
|
||||
env.helm_upgrade_cluster_milvus(queryNode=2)
|
||||
|
||||
c_name_2 = "data_scale_two"
|
||||
c_name_2 = "query_scale_two"
|
||||
collection_w2 = ApiCollectionWrapper()
|
||||
collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema())
|
||||
collection_w2.insert(data)
|
||||
|
@ -55,3 +59,58 @@ class TestSearchScale:
|
|||
ct.default_search_params, ct.default_limit)
|
||||
|
||||
assert res1[0].ids == res2[0].ids
|
||||
|
||||
def test_shrink_query_node(self):
|
||||
"""
|
||||
target: test shrink queryNode from 2 to 1
|
||||
method: 1.deploy two queryNode
|
||||
2.search two collections in two queryNode
|
||||
3.upgrade queryNode from 2 to 1
|
||||
4.search second collection
|
||||
expected: search result is correct
|
||||
"""
|
||||
# deploy
|
||||
release_name = "scale-query"
|
||||
env = HelmEnv(release_name=release_name, queryNode=2)
|
||||
env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT)
|
||||
|
||||
# connect
|
||||
connections.add_connection(default={"host": '10.98.0.8', "port": 19530})
|
||||
connections.connect(alias='default')
|
||||
|
||||
# collection one
|
||||
data = cf.gen_default_list_data(nb)
|
||||
c_name = "query_scale_one"
|
||||
collection_w = ApiCollectionWrapper()
|
||||
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema())
|
||||
collection_w.insert(data)
|
||||
assert collection_w.num_entities == nb
|
||||
collection_w.load()
|
||||
res1, _ = collection_w.search(data[-1][:nq], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
assert res1[0].ids[0] == data[0][0]
|
||||
|
||||
# collection two
|
||||
c_name_2 = "query_scale_two"
|
||||
collection_w2 = ApiCollectionWrapper()
|
||||
collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema())
|
||||
collection_w2.insert(data)
|
||||
assert collection_w2.num_entities == nb
|
||||
collection_w2.load()
|
||||
res2, _ = collection_w2.search(data[-1][:nq], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
assert res2[0].ids[0] == data[0][0]
|
||||
|
||||
# scale queryNode pod
|
||||
pdb.set_trace()
|
||||
env.helm_upgrade_cluster_milvus(queryNode=1)
|
||||
|
||||
# search
|
||||
res1, _ = collection_w.search(data[-1][:nq], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
assert res1[0].ids[0] == data[0][0]
|
||||
res2, _ = collection_w2.search(data[-1][:nq], ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
assert res2[0].ids[0] == data[0][0]
|
||||
|
||||
# env.helm_uninstall_cluster_milvus()
|
|
@ -445,7 +445,7 @@ class TestInsertOperation(TestcaseBase):
|
|||
collection_w.insert(data=df)
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()
|
||||
assert collection_w.has_index()[0]
|
||||
index, _ = collection_w.index()
|
||||
assert index == Index(collection_w.collection, ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.indexes[0] == index
|
||||
|
@ -459,7 +459,7 @@ class TestInsertOperation(TestcaseBase):
|
|||
"""
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()
|
||||
assert collection_w.has_index()[0]
|
||||
index, _ = collection_w.index()
|
||||
assert index == Index(collection_w.collection, ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.indexes[0] == index
|
||||
|
@ -477,7 +477,7 @@ class TestInsertOperation(TestcaseBase):
|
|||
schema = cf.gen_default_binary_collection_schema()
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w.create_index(ct.default_binary_vec_field_name, default_binary_index_params)
|
||||
assert collection_w.has_index()
|
||||
assert collection_w.has_index()[0]
|
||||
index, _ = collection_w.index()
|
||||
assert index == Index(collection_w.collection, ct.default_binary_vec_field_name, default_binary_index_params)
|
||||
assert collection_w.indexes[0] == index
|
||||
|
@ -501,7 +501,7 @@ class TestInsertOperation(TestcaseBase):
|
|||
assert collection_w.num_entities == ct.default_nb
|
||||
# create index
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.has_index()
|
||||
assert collection_w.has_index()[0]
|
||||
index, _ = collection_w.index()
|
||||
assert index == Index(collection_w.collection, ct.default_float_vec_field_name, default_index_params)
|
||||
assert collection_w.indexes[0] == index
|
||||
|
|
Loading…
Reference in New Issue