milvus/tests/benchmark/milvus_benchmark/update.py

237 lines
9.0 KiB
Python

import sys
import re
import logging
import traceback
import argparse
from yaml import full_load, dump
import config
import utils
def parse_server_tag(server_tag):
""" paser server tag from server config"""
# tag format: "8c"/"8c16m"/"8c16m1g"
if server_tag[-1] == "c":
p = r"(\d+)c"
elif server_tag[-1] == "m":
p = r"(\d+)c(\d+)m"
elif server_tag[-1] == "g":
p = r"(\d+)c(\d+)m(\d+)g"
else:
raise Exception("Unable to parse server tag")
m = re.match(p, server_tag)
cpus = int(m.groups()[0])
mems = None
gpus = None
if len(m.groups()) > 1:
mems = int(m.groups()[1])
if len(m.groups()) > 2:
gpus = int(m.groups()[2])
return {"cpus": cpus, "mems": mems, "gpus": gpus}
def update_values(src_values_file, deploy_params_file):
"""
description: update values.yaml
return: no return
"""
perf_tolerations = [{
"key": "node-role.kubernetes.io/benchmark",
"operator": "Exists",
"effect": "NoSchedule"
}]
# deploy_mode, hostname, server_tag, milvus_config, server_config=None
try:
with open(src_values_file) as f:
values_dict = full_load(f)
f.close()
with open(deploy_params_file) as f:
deploy_params = full_load(f)
f.close()
except Exception as e:
logging.error(str(e))
raise Exception("File not found")
deploy_mode = utils.get_deploy_mode(deploy_params)
print("[benchmark update] deploy_mode: %s" % str(deploy_mode))
cluster = False
if deploy_mode in [config.CLUSTER_DEPLOY_MODE, config.CLUSTER_3RD_DEPLOY_MODE]:
cluster = True
elif deploy_mode == config.SINGLE_DEPLOY_MODE:
values_dict["cluster"]["enabled"] = False
values_dict["etcd"]["replicaCount"] = 1
values_dict["minio"]["mode"] = "standalone"
values_dict["pulsar"]["enabled"] = False
server_tag = utils.get_server_tag(deploy_params)
print(server_tag)
# TODO: update milvus config
# update values.yaml with the given host
# if server_name:
# node_config = {'kubernetes.io/hostname': server_name}
# elif server_tag:
# node_config = {'instance-type': server_tag}
cpus = None
mems = None
gpus = None
if server_tag:
res = parse_server_tag(server_tag)
cpus = res["cpus"]
mems = res["mems"]
gpus = res["gpus"]
if cpus:
resources = {
"limits": {
"cpu": str(int(cpus)) + ".0"
},
"requests": {
"cpu": str(int(cpus) // 2 + 1) + ".0"
}
}
if cpus and mems:
resources_cluster = {
"limits": {
"cpu": str(int(cpus)) + ".0",
"memory": str(int(mems)) + "Gi"
},
"requests": {
"cpu": str(int(cpus) // 2 + 1) + ".0",
"memory": str(int(mems) // 2 + 1) + "Gi"
# "cpu": "4.0"
# "cpu": str(int(cpus) - 1) + ".0"
}
}
# use external minio/s3
# TODO: disable temp
values_dict['minio']['enabled'] = True
# values_dict["externalS3"]["enabled"] = True
values_dict["externalS3"]["enabled"] = False
values_dict["externalS3"]["host"] = config.MINIO_HOST
values_dict["externalS3"]["port"] = config.MINIO_PORT
values_dict["externalS3"]["accessKey"] = config.MINIO_ACCESS_KEY
values_dict["externalS3"]["secretKey"] = config.MINIO_SECRET_KEY
values_dict["externalS3"]["bucketName"] = config.MINIO_BUCKET_NAME
logging.debug(values_dict["externalS3"])
if cluster is False:
# TODO: support pod affinity for standalone mode
if cpus:
# values_dict['standalone']['nodeSelector'] = node_config
# values_dict['minio']['nodeSelector'] = node_config
# values_dict['etcd']['nodeSelector'] = node_config
# # set limit/request cpus in resources
values_dict['standalone']['resources'] = resources
if mems:
values_dict['standalone']['resources']["limits"].update({"memory": str(int(mems)) + "Gi"})
values_dict['standalone']['resources']["requests"].update({"memory": str(int(mems) // 2 + 1) + "Gi"})
if gpus:
logging.info("TODO: Need to schedule pod on GPU server")
logging.debug("Add tolerations into standalone server")
values_dict['standalone']['tolerations'] = perf_tolerations
values_dict['minio']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
else:
# TODO: mem limits on distributed mode
# values_dict['pulsar']["broker"]["configData"].update({"maxMessageSize": "52428800", "PULSAR_MEM": BOOKKEEPER_PULSAR_MEM})
# values_dict['pulsar']["bookkeeper"]["configData"].update({"nettyMaxFrameSizeBytes": "52428800", "PULSAR_MEM": BROKER_PULSAR_MEM})
if cpus:
# values_dict['standalone']['nodeSelector'] = node_config
# values_dict['minio']['nodeSelector'] = node_config
# values_dict['etcd']['nodeSelector'] = node_config
# # set limit/request cpus in resources
# values_dict['proxy']['resources'] = resources
values_dict['queryNode']['resources'] = resources_cluster
values_dict['indexNode']['resources'] = resources_cluster
values_dict['dataNode']['resources'] = resources_cluster
# values_dict['minio']['resources'] = resources
# values_dict['pulsarStandalone']['resources'] = resources
if mems:
logging.debug("TODO: Update mem resources")
logging.debug("Add tolerations into cluster server")
values_dict['proxy']['tolerations'] = perf_tolerations
values_dict['queryNode']['tolerations'] = perf_tolerations
values_dict['indexNode']['tolerations'] = perf_tolerations
values_dict['dataNode']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
values_dict['minio']['tolerations'] = perf_tolerations
if deploy_mode == config.SINGLE_DEPLOY_MODE:
values_dict['pulsarStandalone']['tolerations'] = perf_tolerations
# TODO: for distributed deployment
# values_dict['pulsar']['autoRecovery']['tolerations'] = perf_tolerations
# values_dict['pulsar']['proxy']['tolerations'] = perf_tolerations
# values_dict['pulsar']['broker']['tolerations'] = perf_tolerations
# values_dict['pulsar']['bookkeeper']['tolerations'] = perf_tolerations
# values_dict['pulsar']['zookeeper']['tolerations'] = perf_tolerations
milvus_params = deploy_params["milvus"]
if "datanode" in milvus_params:
if "replicas" in milvus_params["datanode"]:
values_dict['dataNode']["replicas"] = milvus_params["datanode"]["replicas"]
if "querynode"in milvus_params:
if "replicas" in milvus_params["querynode"]:
values_dict['queryNode']["replicas"] = milvus_params["querynode"]["replicas"]
if "indexnode"in milvus_params:
if "replicas" in milvus_params["indexnode"]:
values_dict['indexNode']["replicas"] = milvus_params["indexnode"]["replicas"]
if "proxy"in milvus_params:
if "replicas" in milvus_params["proxy"]:
values_dict['proxy']["replicas"] = milvus_params["proxy"]["replicas"]
# add extra volumes
values_dict['extraVolumes'] = [{
'name': 'test',
'flexVolume': {
'driver': "fstab/cifs",
'fsType': "cifs",
'secretRef': {
'name': "cifs-test-secret"
},
'options': {
'networkPath': config.IDC_NAS_URL,
'mountOptions': "vers=1.0"
}
}
}]
values_dict['extraVolumeMounts'] = [{
'name': 'test',
'mountPath': '/test'
}]
server_resource = utils.get_server_resource(deploy_params)
print("[benchmark update] server_resource: %s" % str(server_resource))
values_dict = utils.update_dict_value(server_resource, values_dict)
print(values_dict)
# Update content of src_values_file
with open(src_values_file, 'w') as f:
dump(values_dict, f, default_flow_style=False)
f.close()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument(
'--src-values',
help='src values.yaml')
arg_parser.add_argument(
'--deploy-params',
help='deploy params')
args = arg_parser.parse_args()
src_values_file = args.src_values
deploy_params_file = args.deploy_params
if not src_values_file or not deploy_params_file:
logging.error("No valid file input")
sys.exit(-1)
try:
update_values(src_values_file, deploy_params_file)
logging.info("Values.yaml updated")
except Exception as e:
logging.error(str(e))
logging.error(traceback.format_exc())
sys.exit(-1)