# -*- coding: utf-8 -*-
import os
import sys
import pdb
import time
import json
import datetime
import argparse
import threading
import logging
import string
import random
# import multiprocessing
import numpy as np
# import psutil
import sklearn.preprocessing
import h5py
# import docker
from yaml import full_load, dump
import yaml
import tableprint as tp
from pprint import pprint

logger = logging.getLogger("milvus_benchmark.utils")

REGISTRY_URL = "registry.zilliz.com/milvus/engine"


class literal_str(str):
    pass


def change_style(style, representer):
    def new_representer(dumper, data):
        scalar = representer(dumper, data)
        scalar.style = style
        return scalar
    return new_representer


from yaml.representer import SafeRepresenter

# represent_str does handle some corner cases, so use that
# instead of calling represent_scalar directly
represent_literal_str = change_style('|', SafeRepresenter.represent_str)

yaml.add_representer(literal_str, represent_literal_str)


def normalize(metric_type, X):
    """
    Normalize the vectors.

    If the metric type is "ip", use sklearn.preprocessing.normalize to L2-normalize the vectors;
    for the binary metrics ("jaccard", "hamming", "sub", "super"), pack each bit vector into bytes.
    """
    if metric_type == "ip":
        logger.info("Set normalize for metric_type: %s" % metric_type)
        X = sklearn.preprocessing.normalize(X, axis=1, norm='l2')
        X = X.tolist()
    elif metric_type in ["jaccard", "hamming", "sub", "super"]:
        tmp = []
        for _, item in enumerate(X):
            new_vector = bytes(np.packbits(item, axis=-1).tolist())
            tmp.append(new_vector)
        X = tmp
    return X


def get_unique_name(prefix=None):
    if prefix is None:
        prefix = "milvus-benchmark-test-"
    return prefix + "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8)).lower()


def get_current_time():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


def print_table(headers, columns, data):
    bodys = []
    for index, value in enumerate(columns):
        tmp = [value]
        tmp.extend(data[index])
        bodys.append(tmp)
    tp.table(bodys, headers)


def get_dataset(hdf5_file_path):
    if not os.path.exists(hdf5_file_path):
        raise Exception("%s does not exist" % hdf5_file_path)
    dataset = h5py.File(hdf5_file_path, "r")
    return dataset


def modify_config(k, v, type=None, file_path="conf/server_config.yaml", db_slave=None):
    if not os.path.isfile(file_path):
        raise Exception('File: %s not found' % file_path)
    with open(file_path) as f:
        config_dict = full_load(f)
    if config_dict:
        if k.find("use_blas_threshold") != -1:
            config_dict['engine_config']['use_blas_threshold'] = int(v)
        elif k.find("use_gpu_threshold") != -1:
            config_dict['engine_config']['gpu_search_threshold'] = int(v)
        elif k.find("cpu_cache_capacity") != -1:
            config_dict['cache_config']['cpu_cache_capacity'] = int(v)
        elif k.find("enable_gpu") != -1:
            config_dict['gpu_resource_config']['enable'] = v
        elif k.find("gpu_cache_capacity") != -1:
            config_dict['gpu_resource_config']['cache_capacity'] = int(v)
        elif k.find("index_build_device") != -1:
            config_dict['gpu_resource_config']['build_index_resources'] = v
        elif k.find("search_resources") != -1:
            config_dict['resource_config']['resources'] = v
        # if db_slave:
        #     config_dict['db_config']['db_slave_path'] = MULTI_DB_SLAVE_PATH
        with open(file_path, 'w') as f:
            dump(config_dict, f, default_flow_style=False)
    else:
        raise Exception('Load file:%s error' % file_path)


# update server_config.yaml
def update_server_config(file_path, server_config):
    if not os.path.isfile(file_path):
        raise Exception('File: %s not found' % file_path)
    with open(file_path) as f:
        values_dict = full_load(f)
    for k, v in server_config.items():
        if k.find("primary_path") != -1:
values_dict["db_config"]["primary_path"] = v elif k.find("use_blas_threshold") != -1: values_dict['engine_config']['use_blas_threshold'] = int(v) elif k.find("gpu_search_threshold") != -1: values_dict['engine_config']['gpu_search_threshold'] = int(v) elif k.find("cpu_cache_capacity") != -1: values_dict['cache_config']['cpu_cache_capacity'] = int(v) elif k.find("cache_insert_data") != -1: values_dict['cache_config']['cache_insert_data'] = v elif k.find("enable") != -1: values_dict['gpu_resource_config']['enable'] = v elif k.find("gpu_cache_capacity") != -1: values_dict['gpu_resource_config']['cache_capacity'] = int(v) elif k.find("build_index_resources") != -1: values_dict['gpu_resource_config']['build_index_resources'] = v elif k.find("search_resources") != -1: values_dict['gpu_resource_config']['search_resources'] = v with open(file_path, 'w') as f: dump(values_dict, f, default_flow_style=False) f.close() # update values.yaml def update_values(file_path, deploy_mode, hostname, server_config): from kubernetes import client, config client.rest.logger.setLevel(logging.WARNING) if not os.path.isfile(file_path): raise Exception('File: %s not found' % file_path) # bak values.yaml file_name = os.path.basename(file_path) bak_file_name = file_name+".bak" file_parent_path = os.path.dirname(file_path) bak_file_path = file_parent_path+'/'+bak_file_name if os.path.exists(bak_file_path): os.system("cp %s %s" % (bak_file_path, file_path)) else: os.system("cp %s %s" % (file_path, bak_file_path)) with open(file_path) as f: values_dict = full_load(f) f.close() for k, v in server_config.items(): if k.find("primary_path") != -1: suffix_path = server_config["suffix_path"] if "suffix_path" in server_config else None path_value = v if suffix_path: path_value = v + "_" + str(int(time.time())) values_dict["primaryPath"] = path_value values_dict['wal']['path'] = path_value+"/wal" values_dict['logs']['path'] = path_value+"/logs" # elif k.find("use_blas_threshold") != -1: # values_dict['useBLASThreshold'] = int(v) elif k.find("gpu_search_threshold") != -1: values_dict['gpu']['gpuSearchThreshold'] = int(v) elif k.find("cpu_cache_capacity") != -1: values_dict['cache']['cacheSize'] = v # elif k.find("cache_insert_data") != -1: # values_dict['cache']['cacheInsertData'] = v elif k.find("insert_buffer_size") != -1: values_dict['cache']['insertBufferSize'] = v elif k.find("gpu_resource_config.enable") != -1: values_dict['gpu']['enabled'] = v elif k.find("gpu_resource_config.cache_capacity") != -1: values_dict['gpu']['cacheSize'] = v elif k.find("build_index_resources") != -1: values_dict['gpu']['buildIndexDevices'] = v elif k.find("search_resources") != -1: values_dict['gpu']['searchDevices'] = v # wal elif k.find("auto_flush_interval") != -1: values_dict['storage']['autoFlushInterval'] = v elif k.find("wal_enable") != -1: values_dict['wal']['enabled'] = v # if values_dict['nodeSelector']: # logger.warning("nodeSelector has been set: %s" % str(values_dict['engine']['nodeSelector'])) # return values_dict["wal"]["recoveryErrorIgnore"] = True # enable monitor values_dict["metrics"]["enabled"] = True values_dict["metrics"]["address"] = "192.168.1.237" values_dict["metrics"]["port"] = 9091 # Using sqlite for single mode if deploy_mode == "single": values_dict["mysql"]["enabled"] = False # update values.yaml with the given host if hostname: config.load_kube_config() v1 = client.CoreV1Api() values_dict['nodeSelector'] = {'kubernetes.io/hostname': hostname} # node = v1.read_node(hostname) cpus = 
        # set limit/request cpus in resources
        values_dict["image"]['resources'] = {
            "limits": {
                "cpu": str(int(cpus)) + ".0"
            },
            "requests": {
                "cpu": str(int(cpus) - 1) + ".0"
            }
        }
        values_dict['tolerations'] = [{
            "key": "worker",
            "operator": "Equal",
            "value": "performance",
            "effect": "NoSchedule"
        }]
    # add extra volumes
    values_dict['extraVolumes'] = [{
        'name': 'test',
        'flexVolume': {
            'driver': "fstab/cifs",
            'fsType': "cifs",
            'secretRef': {
                'name': "cifs-test-secret"
            },
            'options': {
                'networkPath': "//192.168.1.126/test",
                'mountOptions': "vers=1.0"
            }
        }
    }]
    values_dict['extraVolumeMounts'] = [{
        'name': 'test',
        'mountPath': '/test'
    }]
    # add extra volumes for mysql
    # values_dict['mysql']['persistence']['enabled'] = True
    # values_dict['mysql']['configurationFilesPath'] = "/etc/mysql/mysql.conf.d/"
    # values_dict['mysql']['imageTag'] = '5.6'
    # values_dict['mysql']['securityContext'] = {
    #     'enabled': True}
    # mysql_db_path = "/test"
    if deploy_mode == "shards":
        # mount_path = values_dict["primaryPath"] + '/data'
        # long_str = '- name: test-mysql\n flexVolume:\n driver: fstab/cifs\n fsType: cifs\n secretRef:\n name: cifs-test-secret\n options:\n networkPath: //192.168.1.126/test\n mountOptions: vers=1.0'
        # values_dict['mysql']['extraVolumes'] = literal_str(long_str)
        # long_str_2 = "- name: test-mysql\n mountPath: %s" % mysql_db_path
        # values_dict['mysql']['extraVolumeMounts'] = literal_str(long_str_2)
        # mysql_cnf_str = '[mysqld]\npid-file=%s/mysql.pid\ndatadir=%s' % (mount_path, mount_path)
        # values_dict['mysql']['configurationFiles'] = {}
        # values_dict['mysql']['configurationFiles']['mysqld.cnf'] = literal_str(mysql_cnf_str)
        values_dict['mysql']['enabled'] = False
        values_dict['externalMysql']['enabled'] = True
        values_dict['externalMysql']["ip"] = "192.168.1.197"
        values_dict['externalMysql']["port"] = 3306
        values_dict['externalMysql']["user"] = "root"
        values_dict['externalMysql']["password"] = "Fantast1c"
        values_dict['externalMysql']["database"] = "db"
    logger.debug(values_dict)
    # print(dump(values_dict))
    with open(file_path, 'w') as f:
        dump(values_dict, f, default_flow_style=False)
    # DEBUG: log the rendered values.yaml
    with open(file_path) as f:
        for line in f.readlines():
            line = line.strip("\n")
            logger.debug(line)


def helm_install_server(helm_path, deploy_mode, image_tag, image_type, name, namespace):
    """Deploy the server with helm and return its cluster-internal service host."""
    from kubernetes import client, config
    client.rest.logger.setLevel(logging.WARNING)
    timeout = 300
    logger.debug("Server deploy mode: %s" % deploy_mode)
    host = "%s.%s.svc.cluster.local" % (name, namespace)
    if deploy_mode == "single":
        install_cmd = "helm install --wait --timeout %ds \
                --set image.repository=%s \
                --set image.tag=%s \
                --set image.pullPolicy=Always \
                --set service.type=ClusterIP \
                -f ci/filebeat/values.yaml \
                --namespace %s \
                %s ." % (timeout, REGISTRY_URL, image_tag, namespace, name)
    elif deploy_mode == "shards":
        install_cmd = "helm install --wait --timeout %ds \
                --set cluster.enabled=true \
                --set persistence.enabled=true \
                --set mishards.image.tag=test \
                --set mishards.image.pullPolicy=Always \
                --set image.repository=%s \
                --set image.tag=%s \
                --set image.pullPolicy=Always \
                --set service.type=ClusterIP \
                -f ci/filebeat/values.yaml \
                --namespace %s \
                %s ." % (timeout, REGISTRY_URL, image_tag, namespace, name)
    logger.debug(install_cmd)
    logger.debug(host)
    if os.system("cd %s && %s" % (helm_path, install_cmd)):
        logger.error("Helm install failed")
        return None
    time.sleep(5)
    # config.load_kube_config()
    # v1 = client.CoreV1Api()
    # pod_name = None
    # pod_id = None
    # pods = v1.list_namespaced_pod(namespace)
    # for i in pods.items:
    #     if i.metadata.name.find(name) != -1:
    #         pod_name = i.metadata.name
    #         pod_ip = i.status.pod_ip
    # logger.debug(pod_name)
    # logger.debug(pod_ip)
    # return pod_name, pod_ip
    return host


def helm_del_server(name, namespace):
    """
    Delete the server with helm uninstall.

    Return True if the uninstall succeeds, otherwise False.
    """
    # logger.debug("Sleep 600s before uninstall server")
    # time.sleep(600)
    del_cmd = "helm uninstall -n milvus %s" % name
    logger.debug(del_cmd)
    if os.system(del_cmd):
        logger.error("Helm delete name:%s failed" % name)
        return False
    return True


def restart_server(helm_release_name, namespace):
    res = True
    timeout = 120
    from kubernetes import client, config
    client.rest.logger.setLevel(logging.WARNING)
    # service_name = "%s.%s.svc.cluster.local" % (helm_release_name, namespace)
    config.load_kube_config()
    v1 = client.CoreV1Api()
    pod_name = None
    # config_map_names = v1.list_namespaced_config_map(namespace, pretty='true')
    # body = {"replicas": 0}
    pods = v1.list_namespaced_pod(namespace)
    for i in pods.items:
        if i.metadata.name.find(helm_release_name) != -1 and i.metadata.name.find("mysql") == -1:
            pod_name = i.metadata.name
            break
    # v1.patch_namespaced_config_map(config_map_name, namespace, body, pretty='true')
    # status_res = v1.read_namespaced_service_status(helm_release_name, namespace, pretty='true')
    logger.debug("Pod name: %s" % pod_name)
    if pod_name is not None:
        try:
            v1.delete_namespaced_pod(pod_name, namespace)
        except Exception as e:
            logger.error(str(e))
            logger.error("Exception when calling CoreV1Api->delete_namespaced_pod")
            res = False
            return res
        logger.debug("Sleep 10s after pod deleted")
        time.sleep(10)
        # check if the pod restarted successfully
        pods = v1.list_namespaced_pod(namespace)
        for i in pods.items:
            pod_name_tmp = i.metadata.name
            logger.debug(pod_name_tmp)
            if pod_name_tmp == pod_name:
                continue
            elif pod_name_tmp.find(helm_release_name) == -1 or pod_name_tmp.find("mysql") != -1:
                continue
            else:
                status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
                logger.debug(status_res.status.phase)
                start_time = time.time()
                while time.time() - start_time <= timeout:
                    logger.debug(time.time())
                    status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
                    if status_res.status.phase == "Running":
                        logger.debug("Running")
                        logger.debug("Sleep after restart")
                        break
                    else:
                        time.sleep(1)
                if time.time() - start_time > timeout:
                    logger.error("Restart pod: %s timeout" % pod_name_tmp)
                    res = False
                    return res
    else:
        raise Exception("Pod of helm release: %s not found" % helm_release_name)
    follow = True
    pretty = True
    previous = True  # bool | Return previous terminated container logs. Defaults to false. (optional)
    since_seconds = 56  # int | A relative time in seconds before the current time from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified. (optional)
    timestamps = True  # bool | If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output. Defaults to false. (optional)
    container = "milvus"
    try:
        api_response = v1.read_namespaced_pod_log(pod_name_tmp, namespace, container=container, follow=follow,
                                                  pretty=pretty, previous=previous, since_seconds=since_seconds,
                                                  timestamps=timestamps)
        logger.debug(api_response)
    except Exception as e:
        logger.error("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
        res = False
        return res
    time.sleep(30)
    return res


# def pull_image(image):
#     registry = image.split(":")[0]
#     image_tag = image.split(":")[1]
#     client = docker.APIClient(base_url='unix://var/run/docker.sock')
#     logger.info("Start pulling image: %s" % image)
#     return client.pull(registry, image_tag)


# def run_server(image, mem_limit=None, timeout=30, test_type="local", volume_name=None, db_slave=None):
#     import colors
#     client = docker.from_env()
#     # if mem_limit is None:
#     #     mem_limit = psutil.virtual_memory().available
#     # logger.info('Memory limit:', mem_limit)
#     # cpu_limit = "0-%d" % (multiprocessing.cpu_count() - 1)
#     # logger.info('Running on CPUs:', cpu_limit)
#     for dir_item in ['logs', 'db']:
#         try:
#             os.mkdir(os.path.abspath(dir_item))
#         except Exception as e:
#             pass
#     if test_type == "local":
#         volumes = {
#             os.path.abspath('conf'):
#                 {'bind': '/opt/milvus/conf', 'mode': 'ro'},
#             os.path.abspath('logs'):
#                 {'bind': '/opt/milvus/logs', 'mode': 'rw'},
#             os.path.abspath('db'):
#                 {'bind': '/opt/milvus/db', 'mode': 'rw'},
#         }
#     elif test_type == "remote":
#         if volume_name is None:
#             raise Exception("No volume name")
#         remote_log_dir = volume_name + '/logs'
#         remote_db_dir = volume_name + '/db'
#         for dir_item in [remote_log_dir, remote_db_dir]:
#             if not os.path.isdir(dir_item):
#                 os.makedirs(dir_item, exist_ok=True)
#         volumes = {
#             os.path.abspath('conf'):
#                 {'bind': '/opt/milvus/conf', 'mode': 'ro'},
#             remote_log_dir:
#                 {'bind': '/opt/milvus/logs', 'mode': 'rw'},
#             remote_db_dir:
#                 {'bind': '/opt/milvus/db', 'mode': 'rw'}
#         }
#         # add volumes
#         if db_slave and isinstance(db_slave, int):
#             for i in range(2, db_slave + 1):
#                 remote_db_dir = volume_name + '/data' + str(i)
#                 if not os.path.isdir(remote_db_dir):
#                     os.makedirs(remote_db_dir, exist_ok=True)
#                 volumes[remote_db_dir] = {'bind': '/opt/milvus/data' + str(i), 'mode': 'rw'}
#     container = client.containers.run(
#         image,
#         volumes=volumes,
#         runtime="nvidia",
#         ports={'19530/tcp': 19530, '8080/tcp': 8080},
#         # environment=["OMP_NUM_THREADS=48"],
#         # cpuset_cpus=cpu_limit,
#         # mem_limit=mem_limit,
#         # environment=[""],
#         detach=True)
#
#     def stream_logs():
#         for line in container.logs(stream=True):
#             logger.info(colors.color(line.decode().rstrip(), fg='blue'))
#
#     if sys.version_info >= (3, 0):
#         t = threading.Thread(target=stream_logs, daemon=True)
#     else:
#         t = threading.Thread(target=stream_logs)
#         t.daemon = True
#     t.start()
#     logger.info('Container: %s started' % container)
#     return container
#     # exit_code = container.wait(timeout=timeout)
#     # # Exit if exit code
#     # if exit_code == 0:
#     #     return container
#     # elif exit_code is not None:
#     #     print(colors.color(container.logs().decode(), fg='red'))


# def restart_server(container):
#     client = docker.APIClient(base_url='unix://var/run/docker.sock')
#     client.restart(container.name)
#     logger.info('Container: %s restarted' % container.name)
#     return container


# def remove_container(container):
#     container.remove(force=True)
#     logger.info('Container: %s removed' % container)


# def remove_all_containers(image):
#     client = docker.from_env()
#     try:
#         for container in client.containers.list():
#             if image in container.image.tags:
#                 container.stop(timeout=30)
#                 container.remove(force=True)
#     except Exception as e:
#         logger.error("Failed to remove containers")


# def container_exists(image):
#     '''
#     Check if a container exists for the given image name
#     @params: image name
#     @return: the container if it exists, otherwise False
#     '''
#     res = False
#     client = docker.from_env()
#     for container in client.containers.list():
#         if image in container.image.tags:
#             # True
#             res = container
#     return res


if __name__ == '__main__':
    update_values("", "shards", None, None)
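
# Illustrative usage sketch (kept commented out, like the legacy docker helpers above, so importing
# this module stays side-effect free). It shows one plausible way a benchmark runner could chain
# these helpers; the chart path, values.yaml path, hostname, image tag and config keys below are
# placeholders chosen for the example, not values taken from this repository:
#
# release_name = get_unique_name()
# server_config = {"cpu_cache_capacity": 16, "wal_enable": True}
# update_values("milvus-helm/charts/milvus/values.yaml", "single", "test-host-1", server_config)
# host = helm_install_server("milvus-helm/charts/milvus", "single", "0.10.0", "gpu",
#                            release_name, "milvus")
# if host:
#     # ... run the benchmark against host:19530 ...
#     helm_del_server(release_name, "milvus")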