diff --git a/tests/python_client/customize/milvus_operator.py b/tests/python_client/customize/milvus_operator.py index bfe35336e8..df3dd29b4f 100644 --- a/tests/python_client/customize/milvus_operator.py +++ b/tests/python_client/customize/milvus_operator.py @@ -1,90 +1,111 @@ +import time from benedict import benedict from utils.util_log import test_log as log from common.cus_resource_opts import CustomResourceOperations as CusResource -import time + template_yaml = 'template/default.yaml' MILVUS_GRP = 'milvus.io' MILVUS_VER = 'v1alpha1' MILVUS_PLURAL = 'milvusclusters' +MILVUS_KIND = 'MilvusCluster' -def update_configs(configs, template=template_yaml): - if not isinstance(configs, dict): - log.error("customize configurations must be in dict type") - return None - - d_configs = benedict.from_yaml(template) - - for key in configs.keys(): - d_configs[key] = configs[key] - - # return a python dict for common use - log.info(f"customized configs: {d_configs._dict}") - return d_configs._dict +class MilvusOperator(object): + def __init__(self): + self.group = MILVUS_GRP + self.version = MILVUS_VER + self.plural = MILVUS_PLURAL.lower() -def install_milvus(configs, template): + @staticmethod + def _update_configs(configs, template=None): + if not isinstance(configs, dict): + log.error("customize configurations must be in dict type") + return None - new_configs = update_configs(configs, template) - namespace = new_configs['metadata'].get('namespace', 'default') - # apply custom resource object to deploy milvus - cus_res = CusResource(kind=MILVUS_PLURAL, group=MILVUS_GRP, - version=MILVUS_VER, namespace=namespace) - return cus_res.create(new_configs) + if template is None: + d_configs = benedict() + d_configs['apiVersion'] = f'{MILVUS_GRP}/{MILVUS_VER}' + d_configs['kind'] = MILVUS_KIND + else: + d_configs = benedict.from_yaml(template) + for key in configs.keys(): + d_configs[key] = configs[key] -def uninstall_milvus(release_name, namespace='default'): + # return a python dict if it is not none + return d_configs._dict if d_configs._dict is not None else d_configs - # delete custom resource object to uninstall milvus - cus_res = CusResource(kind=MILVUS_PLURAL, group=MILVUS_GRP, - version=MILVUS_VER, namespace=namespace) - cus_res.delete(release_name) + def install(self, configs, template=None): + new_configs = self._update_configs(configs, template) + print(new_configs) + namespace = new_configs['metadata'].get('namespace', 'default') + # apply custom resource object to deploy milvus + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + log.info(f"install milvus with configs: {new_configs}") + return cus_res.create(new_configs) + def uninstall(self, release_name, namespace='default', delete_depends=True, delete_pvc=True): + # delete custom resource object to uninstall milvus + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + del_configs = {} + if delete_depends: + del_configs = {'spec.dependencies.etcd.inCluster.deletionPolicy': 'Delete', + 'spec.dependencies.pulsar.inCluster.deletionPolicy': 'Delete', + 'spec.dependencies.storage.inCluster.deletionPolicy': 'Delete' + } + if delete_pvc: + del_configs.update({'spec.dependencies.etcd.inCluster.pvcDeletion': True, + 'spec.dependencies.pulsar.inCluster.pvcDeletion': True, + 'spec.dependencies.storage.inCluster.pvcDeletion': True + }) + if delete_depends or delete_pvc: + self.upgrade(release_name, del_configs, namespace=namespace) + cus_res.delete(release_name) -def upgrade_milvus(release_name, configs, namespace='default'): - if not isinstance(configs, dict): - log.error("customize configurations must be in dict type") - return None + def upgrade(self, release_name, configs, namespace='default'): + if not isinstance(configs, dict): + log.error("customize configurations must be in dict type") + return None - d_configs = benedict() + d_configs = benedict() - for key in configs.keys(): - d_configs[key] = configs[key] + for key in configs.keys(): + d_configs[key] = configs[key] - cus_res = CusResource(kind=MILVUS_PLURAL, group=MILVUS_GRP, - version=MILVUS_VER, namespace=namespace) - log.debug(f"upgrade milvus with configs: {d_configs}") - cus_res.patch(release_name, d_configs) + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + log.debug(f"upgrade milvus with configs: {d_configs}") + cus_res.patch(release_name, d_configs) + def wait_for_healthy(self, release_name, namespace='default', timeout=600): -def wait_for_milvus_healthy(release_name, namespace='default', timeout=600): + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) + starttime = time.time() + log.info(f"start to check healthy: {starttime}") + while time.time() < starttime + timeout: + time.sleep(10) + res_object = cus_res.get(release_name) + if res_object.get('status', None) is not None: + if 'Healthy' == res_object['status']['status']: + log.info(f"milvus healthy in {time.time()-starttime} seconds") + return True + log.info(f"end to check healthy until timeout {timeout}") + return False - cus_res = CusResource(kind=MILVUS_PLURAL, group=MILVUS_GRP, - version=MILVUS_VER, namespace=namespace) - starttime = time.time() - log.info(f"start to check healthy: {starttime}") - while time.time() < starttime + timeout: - time.sleep(10) + def endpoint(self, release_name, namespace='default'): + endpoint = None + cus_res = CusResource(kind=self.plural, group=self.group, + version=self.version, namespace=namespace) res_object = cus_res.get(release_name) if res_object.get('status', None) is not None: - if 'Healthy' == res_object['status']['status']: - log.info(f"milvus healthy in {time.time()-starttime} seconds") - return True - log.info(f"end to check healthy until timeout {timeout}") - return False + endpoint = res_object['status']['endpoint'] - -def get_milvus_endpoint(release_name, namespace='default'): - - endpoint = None - cus_res = CusResource(kind=MILVUS_PLURAL, group=MILVUS_GRP, - version=MILVUS_VER, namespace=namespace) - res_object = cus_res.get(release_name) - if res_object.get('status', None) is not None: - endpoint = res_object['status']['endpoint'] - - return endpoint + return endpoint if __name__ == '__main__': @@ -94,25 +115,27 @@ if __name__ == '__main__': cus_configs = {'spec.components.image': 'milvusdb/milvus-dev:master-20211020-b40513b', 'metadata.namespace': namespace, 'metadata.name': name, + 'apiVersion': 'milvus.io/v1alpha1', + 'kind': 'MilvusCluster', 'spec.components.queryNode.replicas': 2, 'spec.components.queryNode.resources.limits.memory': '2048Mi' } - milvus_instance = install_milvus(cus_configs, template_yaml) - result = wait_for_milvus_healthy(name, namespace=namespace) - endpoint = get_milvus_endpoint(name, namespace=namespace) + milvusOp = MilvusOperator() + milvus_instance = milvusOp.install(cus_configs, template=None) + result = milvusOp.wait_for_healthy(name, namespace=namespace) + endpoint = milvusOp.endpoint(name, namespace=namespace) print(endpoint) log.info(f"install milvus healthy: {result}") - n_configs = {'spec.components.queryNode.replicas': 1, - 'spec.components.proxy.serviceType': 'LoadBalancer', - 'spec.components.dataNode.resources.limits.memory': '2048Mi' - } - upgrade_milvus(name, n_configs, namespace=namespace) - result = wait_for_milvus_healthy(name, namespace=namespace) - log.info(f"upgrade milvus healthy: {result}") - endpoint = get_milvus_endpoint(name, namespace=namespace) - print(endpoint) + # n_configs = {'spec.components.queryNode.replicas': 1, + # 'spec.components.proxy.serviceType': 'LoadBalancer' + # } + # milvusOp.upgrade(name, n_configs, namespace=namespace) + # result = milvusOp.wait_for_healthy(name, namespace=namespace) + # log.info(f"upgrade milvus healthy: {result}") + # endpoint = milvusOp.endpoint(name, namespace=namespace) + # print(endpoint) - # uninstall_milvus(name, namespace=namespace) + milvusOp.uninstall(name, namespace=namespace) diff --git a/tests/python_client/customize/test_simd_compat.py b/tests/python_client/customize/test_simd_compat.py index ff854a08fa..bab2aca2a7 100644 --- a/tests/python_client/customize/test_simd_compat.py +++ b/tests/python_client/customize/test_simd_compat.py @@ -1,171 +1,176 @@ import pytest +import time from pymilvus import connections from utils.util_log import test_log as log from base.collection_wrapper import ApiCollectionWrapper from common import common_func as cf from common import common_type as ct -# from milvus_operator import * -# from common.milvus_sys import MilvusSys -# from common.common_type import CaseLabel -# from pytest_dependency import depends +from milvus_operator import MilvusOperator +from common.milvus_sys import MilvusSys +from common.common_type import CaseLabel supported_simd_types = ["sse4_2", "avx", "avx2", "avx512"] # TODO: implement simd config after supported -# @pytest.mark.tags(CaseLabel.L3) -# class TestSimdCompatibility: -# """ -# steps -# 1. [test_milvus_install]: set up milvus with customized simd configured -# 2. [test_simd_compat_e2e]: verify milvus is working well -# 4. [test_milvus_cleanup]: clear the env "avx", "avx2", "avx512" -# """ -# -# @pytest.mark.parametrize('simd', [ -# pytest.param("sse4_2", marks=pytest.mark.dependency(name='ins_sse4_2')), -# # pytest.param("avx", marks=pytest.mark.dependency(name='ins_avx')), -# # pytest.param("avx2", marks=pytest.mark.dependency(name='ins_avx2')), -# pytest.param("avx512", marks=pytest.mark.dependency(name='ins_avx512')) -# ]) -# def test_milvus_install(self, request, simd): -# release_name = "mil-simd-" + cf.gen_digits_by_length(6) -# namespace = 'chaos-testing' -# cus_configs = {'spec.components.image': 'milvusdb/milvus-dev:master-latest', -# 'metadata.namespace': namespace, -# 'metadata.name': release_name, -# 'spec.components.proxy.serviceType': 'LoadBalancer', -# # TODO: use simd config instead of replicas -# 'spec.components.queryNode.replicas': 2 -# } -# install_milvus(cus_configs, template=template_yaml) -# healthy = wait_for_milvus_healthy(release_name, namespace) -# log.info(f"milvus healthy: {healthy}") -# assert healthy -# endpoint = get_milvus_endpoint(release_name, namespace) -# log.info(f"milvus endpoint: {endpoint}") -# host = endpoint.split(':')[0] -# port = endpoint.split(':')[1] -# conn = connections.connect(simd, host=host, port=port) -# assert conn is not None -# mil = MilvusSys(alias=simd) -# log.info(f"milvus build version: {mil.build_version}") -# # TODO: Verify simd config instead of replicas -# assert len(mil.query_nodes) == 2 -# -# # cache results for dependent tests -# cache = {'release_name': release_name, -# 'namespace': namespace, -# 'alias': simd, -# 'simd': simd -# } -# request.config.cache.set(simd, cache) -# -# @pytest.mark.parametrize('simd', [ -# pytest.param("sse4_2", marks=pytest.mark.dependency(name='e2e_sse4_2', depends=["ins_sse4_2"])), -# # pytest.param("avx", marks=pytest.mark.dependency(name='e2e_avx', depends=["ins_avx"])), -# # pytest.param("avx2", marks=pytest.mark.dependency(name='e2e_avx2', depends=["ins_avx2"])), -# pytest.param("avx512", marks=pytest.mark.dependency(name='e2e_avx512', depends=["ins_avx512"])) -# ]) -# def test_simd_compat_e2e(self, request, simd): -# log.info(f"start to e2e verification: {simd}") -# # parse results from previous results -# results = request.config.cache.get(simd, None) -# alias = results.get('alias', simd) -# conn = connections.connect(alias=alias) -# assert conn is not None -# simd_cache = request.config.cache.get(simd, None) -# log.info(f"simd_cache: {simd_cache}") -# # create -# name = cf.gen_unique_str("compat") -# t0 = time.time() -# collection_w = ApiCollectionWrapper() -# collection_w.init_collection(name=name, -# schema=cf.gen_default_collection_schema(), -# using=alias, -# timeout=40) -# tt = time.time() - t0 -# assert collection_w.name == name -# entities = collection_w.num_entities -# log.info(f"assert create collection: {tt}, init_entities: {entities}") -# -# # insert -# data = cf.gen_default_list_data() -# t0 = time.time() -# _, res = collection_w.insert(data) -# tt = time.time() - t0 -# log.info(f"assert insert: {tt}") -# assert res -# -# # flush -# t0 = time.time() -# assert collection_w.num_entities == len(data[0]) + entities -# tt = time.time() - t0 -# entities = collection_w.num_entities -# log.info(f"assert flush: {tt}, entities: {entities}") -# -# # search -# collection_w.load() -# search_vectors = cf.gen_vectors(1, ct.default_dim) -# search_params = {"metric_type": "L2", "params": {"nprobe": 16}} -# t0 = time.time() -# res_1, _ = collection_w.search(data=search_vectors, -# anns_field=ct.default_float_vec_field_name, -# param=search_params, limit=1) -# tt = time.time() - t0 -# log.info(f"assert search: {tt}") -# assert len(res_1) == 1 -# collection_w.release() -# -# # index -# d = cf.gen_default_list_data() -# collection_w.insert(d) -# log.info(f"assert index entities: {collection_w.num_entities}") -# _index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"} -# t0 = time.time() -# index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name, -# index_params=_index_params, -# name=cf.gen_unique_str()) -# tt = time.time() - t0 -# log.info(f"assert index: {tt}") -# assert len(collection_w.indexes) == 1 -# -# # search -# t0 = time.time() -# collection_w.load() -# tt = time.time() - t0 -# log.info(f"assert load: {tt}") -# search_vectors = cf.gen_vectors(1, ct.default_dim) -# t0 = time.time() -# res_1, _ = collection_w.search(data=search_vectors, -# anns_field=ct.default_float_vec_field_name, -# param=search_params, limit=1) -# tt = time.time() - t0 -# log.info(f"assert search: {tt}") -# -# # query -# term_expr = f'{ct.default_int64_field_name} in [1001,1201,4999,2999]' -# t0 = time.time() -# res, _ = collection_w.query(term_expr) -# tt = time.time() - t0 -# log.info(f"assert query result {len(res)}: {tt}") -# -# @pytest.mark.parametrize('simd', [ -# pytest.param("sse4_2", marks=pytest.mark.dependency(name='clear_sse4_2', depends=["ins_sse4_2", "e2e_sse4_2"])), -# # pytest.param("avx", marks=pytest.mark.dependency(name='clear_avx', depends=["ins_avx", "e2e_avx"])), -# # pytest.param("avx2", marks=pytest.mark.dependency(name='clear_avx2', depends=["ins_avx2", "e2e_avx2"])), -# pytest.param("avx512", marks=pytest.mark.dependency(name='clear_avx512', depends=["ins_avx512", "e2e_avx512"])) -# ]) -# def test_milvus_cleanup(self, request, simd): -# # get release name from previous results -# results = request.config.cache.get(simd, None) -# release_name = results.get('release_name', "name-not-found") -# namespace = results.get('namespace', "namespace-not-found") -# simd_cache = request.config.cache.get(simd, None) -# log.info(f"stat to cleanup: {simd}") -# log.info(f"simd_cache: {simd_cache}") -# log.info(f"release_name: {release_name}") -# log.info(f"namespace: {namespace}") -# -# uninstall_milvus(release_name, namespace) +@pytest.mark.skip(reason='simd config is not supported yet') +class TestSimdCompatibility: + """ + steps + 1. [test_milvus_install]: set up milvus with customized simd configured + 2. [test_simd_compat_e2e]: verify milvus is working well + 4. [test_milvus_cleanup]: clear the env "avx", "avx2", "avx512" + """ + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize('simd', [ + pytest.param("sse4_2", marks=pytest.mark.dependency(name='ins_sse4_2')), + # pytest.param("avx", marks=pytest.mark.dependency(name='ins_avx')), + # pytest.param("avx2", marks=pytest.mark.dependency(name='ins_avx2')), + pytest.param("avx512", marks=pytest.mark.dependency(name='ins_avx512')) + ]) + def test_milvus_install(self, request, simd): + release_name = "mil-simd-" + cf.gen_digits_by_length(6) + namespace = 'chaos-testing' + cus_configs = {'spec.components.image': 'milvusdb/milvus-dev:master-latest', + 'metadata.namespace': namespace, + 'metadata.name': release_name, + 'spec.components.proxy.serviceType': 'LoadBalancer', + # TODO: use simd config instead of replicas + 'spec.components.queryNode.replicas': 2 + } + milvus_op = MilvusOperator() + milvus_op.install(cus_configs) + healthy = milvus_op.wait_for_healthy(release_name, namespace) + log.info(f"milvus healthy: {healthy}") + assert healthy + endpoint = milvus_op.endpoint(release_name, namespace) + log.info(f"milvus endpoint: {endpoint}") + host = endpoint.split(':')[0] + port = endpoint.split(':')[1] + conn = connections.connect(simd, host=host, port=port) + assert conn is not None + mil = MilvusSys(alias=simd) + log.info(f"milvus build version: {mil.build_version}") + # TODO: Verify simd config instead of replicas + assert len(mil.query_nodes) == 2 + + # cache results for dependent tests + cache = {'release_name': release_name, + 'namespace': namespace, + 'alias': simd, + 'simd': simd + } + request.config.cache.set(simd, cache) + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize('simd', [ + pytest.param("sse4_2", marks=pytest.mark.dependency(name='e2e_sse4_2', depends=["ins_sse4_2"])), + # pytest.param("avx", marks=pytest.mark.dependency(name='e2e_avx', depends=["ins_avx"])), + # pytest.param("avx2", marks=pytest.mark.dependency(name='e2e_avx2', depends=["ins_avx2"])), + pytest.param("avx512", marks=pytest.mark.dependency(name='e2e_avx512', depends=["ins_avx512"])) + ]) + def test_simd_compat_e2e(self, request, simd): + log.info(f"start to e2e verification: {simd}") + # parse results from previous results + results = request.config.cache.get(simd, None) + alias = results.get('alias', simd) + conn = connections.connect(alias=alias) + assert conn is not None + simd_cache = request.config.cache.get(simd, None) + log.info(f"simd_cache: {simd_cache}") + # create + name = cf.gen_unique_str("compat") + t0 = time.time() + collection_w = ApiCollectionWrapper() + collection_w.init_collection(name=name, + schema=cf.gen_default_collection_schema(), + using=alias, + timeout=40) + tt = time.time() - t0 + assert collection_w.name == name + entities = collection_w.num_entities + log.info(f"assert create collection: {tt}, init_entities: {entities}") + + # insert + data = cf.gen_default_list_data() + t0 = time.time() + _, res = collection_w.insert(data) + tt = time.time() - t0 + log.info(f"assert insert: {tt}") + assert res + + # flush + t0 = time.time() + assert collection_w.num_entities == len(data[0]) + entities + tt = time.time() - t0 + entities = collection_w.num_entities + log.info(f"assert flush: {tt}, entities: {entities}") + + # search + collection_w.load() + search_vectors = cf.gen_vectors(1, ct.default_dim) + search_params = {"metric_type": "L2", "params": {"nprobe": 16}} + t0 = time.time() + res_1, _ = collection_w.search(data=search_vectors, + anns_field=ct.default_float_vec_field_name, + param=search_params, limit=1) + tt = time.time() - t0 + log.info(f"assert search: {tt}") + assert len(res_1) == 1 + collection_w.release() + + # index + d = cf.gen_default_list_data() + collection_w.insert(d) + log.info(f"assert index entities: {collection_w.num_entities}") + _index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"} + t0 = time.time() + index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name, + index_params=_index_params, + name=cf.gen_unique_str()) + tt = time.time() - t0 + log.info(f"assert index: {tt}") + assert len(collection_w.indexes) == 1 + + # search + t0 = time.time() + collection_w.load() + tt = time.time() - t0 + log.info(f"assert load: {tt}") + search_vectors = cf.gen_vectors(1, ct.default_dim) + t0 = time.time() + res_1, _ = collection_w.search(data=search_vectors, + anns_field=ct.default_float_vec_field_name, + param=search_params, limit=1) + tt = time.time() - t0 + log.info(f"assert search: {tt}") + + # query + term_expr = f'{ct.default_int64_field_name} in [1001,1201,4999,2999]' + t0 = time.time() + res, _ = collection_w.query(term_expr) + tt = time.time() - t0 + log.info(f"assert query result {len(res)}: {tt}") + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize('simd', [ + pytest.param("sse4_2", marks=pytest.mark.dependency(name='clear_sse4_2', depends=["ins_sse4_2", "e2e_sse4_2"])), + # pytest.param("avx", marks=pytest.mark.dependency(name='clear_avx', depends=["ins_avx", "e2e_avx"])), + # pytest.param("avx2", marks=pytest.mark.dependency(name='clear_avx2', depends=["ins_avx2", "e2e_avx2"])), + pytest.param("avx512", marks=pytest.mark.dependency(name='clear_avx512', depends=["ins_avx512", "e2e_avx512"])) + ]) + def test_milvus_cleanup(self, request, simd): + # get release name from previous results + results = request.config.cache.get(simd, None) + release_name = results.get('release_name', "name-not-found") + namespace = results.get('namespace', "namespace-not-found") + simd_cache = request.config.cache.get(simd, None) + log.info(f"stat to cleanup: {simd}") + log.info(f"simd_cache: {simd_cache}") + log.info(f"release_name: {release_name}") + log.info(f"namespace: {namespace}") + + milvus_op = MilvusOperator() + milvus_op.uninstall(release_name, namespace)