# Mirror of https://github.com/milvus-io/milvus.git
# 401 lines, 17 KiB, Python
import time
|
|
import copy
|
|
import logging
|
|
from . import locust_user
|
|
from .base import BaseRunner
|
|
from milvus_benchmark import parser
|
|
from milvus_benchmark import utils
|
|
from milvus_benchmark.runners import utils as runner_utils
|
|
|
|
# Module-level logger used by the locust runner classes in this file.
logger = logging.getLogger("milvus_benchmark.runners.locust")
|
|
|
|
|
|
class LocustRunner(BaseRunner):
    """Base runner that drives a Milvus collection with locust-generated load.

    Subclasses provide ``extract_cases`` / ``prepare``; ``run_case`` turns one
    prepared case into locust run parameters and hands them to
    ``locust_user.locust_executor``.
    """

    def __init__(self, env, metric):
        super(LocustRunner, self).__init__(env, metric)

    def run_case(self, case_metric, **case_param):
        """Run the locust tasks described by ``case_param`` and return the stats.

        :param case_metric: per-case metric object; not read by this method.
        :param case_param: case description; this method reads
            ``collection_name``, ``task`` (its ``during_time`` and ``types``
            entries), ``connection_type``, ``index_field_name``,
            ``vector_field_name`` and ``dimension``.
        :return: the value returned by ``locust_user.locust_executor``.
        """
        collection_name = case_param["collection_name"]
        task = case_param["task"]
        connection_type = case_param["connection_type"]

        # spawn locust requests
        # Convert the configured duration with utils.timestr_to_int, but keep the
        # result in run_params only: ``task`` is the same dict object as the
        # caller's collection config, so writing back into it leaks state.
        during_time = utils.timestr_to_int(task["during_time"])
        task_types = task["types"]
        run_params = {"tasks": {}}
        run_params.update(task)
        run_params["during_time"] = during_time

        info_in_params = {
            "index_field_name": case_param["index_field_name"],
            "vector_field_name": case_param["vector_field_name"],
            "dimension": case_param["dimension"],
            "collection_info": self.milvus.get_info(collection_name),
        }
        logger.info(info_in_params)
        run_params["op_info"] = info_in_params

        for task_type in task_types:
            # Each task type gets a weight (default 1) and optional params.
            run_params["tasks"][task_type["type"]] = {
                "weight": task_type.get("weight", 1),
                "params": task_type.get("params"),
            }

        logger.info(run_params)
        return locust_user.locust_executor(
            self.hostname, self.port, collection_name,
            connection_type=connection_type, run_params=run_params)
|
|
|
|
|
|
class LocustInsertRunner(LocustRunner):
    """Locust runner for the insert workload ("run insert")."""

    name = "locust_insert_performance"

    def __init__(self, env, metric):
        super(LocustInsertRunner, self).__init__(env, metric)

    def extract_cases(self, collection):
        """Build the single insert case described by ``collection``.

        Side effect: initialises ``self.metric`` through ``init_metric``.

        :param collection: collection config. ``ni_per`` and ``task`` (with
            ``connection_num``) are required. ``collection_name``,
            ``build_index`` and ``other_fields`` are optional. ``index_type``
            and ``index_param`` are required when ``build_index`` is ``True``.
        :return: ``(case_params, case_metrics)``, each a one-element list.
        """
        dataset = collection.get("collection_name")
        data_type, collection_size, dimension, metric_type = parser.collection_parser(dataset)
        ni_per = collection["ni_per"]
        build_index = collection.get("build_index", False)
        vector_type = runner_utils.get_vector_type(data_type)
        other_fields = collection.get("other_fields")
        collection_info = dict(
            dimension=dimension,
            metric_type=metric_type,
            dataset_name=dataset,
            collection_size=collection_size,
            other_fields=other_fields,
            ni_per=ni_per,
        )
        vector_field_name = runner_utils.get_default_field_name(vector_type)

        if build_index is True:
            index_type = collection["index_type"]
            index_param = collection["index_param"]
            index_info = dict(index_type=index_type, index_param=index_param)
            index_field_name = runner_utils.get_default_field_name(vector_type)
        else:
            index_type = index_param = index_info = index_field_name = None

        task = collection["task"]
        # connection_num > 1 selects the "multi" connection type.
        connection_type = "multi" if task["connection_num"] > 1 else "single"
        self.init_metric(self.name, collection_info, index_info, None,
                         {"task": task, "connection_type": connection_type})

        case_metric = copy.deepcopy(self.metric)
        # set metric type as case
        case_metric.set_case_metric_type()

        case_param = dict(
            collection_name=dataset,
            data_type=data_type,
            dimension=dimension,
            collection_size=collection_size,
            ni_per=ni_per,
            metric_type=metric_type,
            vector_type=vector_type,
            other_fields=other_fields,
            build_index=build_index,
            index_field_name=index_field_name,
            vector_field_name=vector_field_name,
            index_type=index_type,
            index_param=index_param,
            task=task,
            connection_type=connection_type,
        )
        return [case_param], [case_metric]

    def prepare(self, **case_param):
        """Recreate the target collection and, if requested, its index.

        Any existing collection with the same name is dropped first. No data
        is inserted here.
        """
        collection_name = case_param["collection_name"]
        dimension = case_param["dimension"]
        vector_type = case_param["vector_type"]
        other_fields = case_param["other_fields"]
        index_field_name = case_param["index_field_name"]
        build_index = case_param["build_index"]

        self.milvus.set_collection(collection_name)
        if self.milvus.exists_collection():
            logger.debug("Start drop collection")
            self.milvus.drop()
            # presumably lets the drop settle before the collection is recreated
            time.sleep(runner_utils.DELETE_INTERVAL_TIME)
        self.milvus.create_collection(dimension, data_type=vector_type, other_fields=other_fields)
        # TODO: update fields in collection_info (e.g. from self.get_fields(self.milvus, collection_name))

        if build_index is not True:
            return
        if not case_param["index_type"]:
            logger.warning("Please specify the index_type")
            return
        self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"],
                                 index_param=case_param["index_param"])
        logger.debug(self.milvus.describe_index(index_field_name))
|
|
|
|
|
|
class LocustSearchRunner(LocustRunner):
    """Locust runner for the search workload ("run search").

    ``prepare`` builds, fills, optionally indexes and loads the collection so
    the locust tasks run against populated data.
    """

    name = "locust_search_performance"

    def __init__(self, env, metric):
        super(LocustSearchRunner, self).__init__(env, metric)

    def extract_cases(self, collection):
        """Build the single search case described by ``collection``.

        Side effect: initialises ``self.metric`` through ``init_metric``.

        :param collection: collection config. ``ni_per`` and ``task`` (with
            ``connection_num``) are required. ``collection_name``,
            ``build_index`` and ``other_fields`` are optional. ``index_type``
            and ``index_param`` are required when ``build_index`` is ``True``.
        :return: ``(case_params, case_metrics)``, each a one-element list.
        """
        dataset = collection.get("collection_name")
        data_type, collection_size, dimension, metric_type = parser.collection_parser(dataset)
        ni_per = collection["ni_per"]
        build_index = collection.get("build_index", False)
        vector_type = runner_utils.get_vector_type(data_type)
        other_fields = collection.get("other_fields")
        collection_info = dict(
            dimension=dimension,
            metric_type=metric_type,
            dataset_name=dataset,
            collection_size=collection_size,
            other_fields=other_fields,
            ni_per=ni_per,
        )

        if build_index is True:
            index_type = collection["index_type"]
            index_param = collection["index_param"]
            index_info = dict(index_type=index_type, index_param=index_param)
            index_field_name = runner_utils.get_default_field_name(vector_type)
        else:
            index_type = index_param = index_info = index_field_name = None
        vector_field_name = runner_utils.get_default_field_name(vector_type)

        task = collection["task"]
        # connection_num > 1 selects the "multi" connection type.
        connection_type = "multi" if task["connection_num"] > 1 else "single"
        self.init_metric(self.name, collection_info, index_info, None,
                         {"task": task, "connection_type": connection_type})

        case_metric = copy.deepcopy(self.metric)
        # set metric type as case
        case_metric.set_case_metric_type()

        case_param = dict(
            collection_name=dataset,
            data_type=data_type,
            dimension=dimension,
            collection_size=collection_size,
            ni_per=ni_per,
            metric_type=metric_type,
            vector_type=vector_type,
            other_fields=other_fields,
            build_index=build_index,
            index_field_name=index_field_name,
            vector_field_name=vector_field_name,
            index_type=index_type,
            index_param=index_param,
            task=task,
            connection_type=connection_type,
        )
        return [case_param], [case_metric]

    def prepare(self, **case_param):
        """Recreate, fill, optionally index, and load the target collection.

        Steps: drop any existing collection with the same name, recreate it,
        create the index if requested, insert ``collection_size`` rows in
        batches of ``ni_per``, flush, rebuild the index for the last file, and
        finally load the collection. Flush/build/load durations are logged.
        """
        collection_name = case_param["collection_name"]
        dimension = case_param["dimension"]
        vector_type = case_param["vector_type"]
        other_fields = case_param["other_fields"]
        index_field_name = case_param["index_field_name"]
        metric_type = case_param["metric_type"]
        build_index = case_param["build_index"]

        self.milvus.set_collection(collection_name)
        if self.milvus.exists_collection():
            logger.debug("Start drop collection")
            self.milvus.drop()
            # presumably lets the drop settle before the collection is recreated
            time.sleep(runner_utils.DELETE_INTERVAL_TIME)
        self.milvus.create_collection(dimension, data_type=vector_type, other_fields=other_fields)
        # TODO: update fields in collection_info (e.g. from self.get_fields(self.milvus, collection_name))

        # The post-insert index build only happens if the up-front one did.
        rebuild_index = False
        if build_index is True:
            if case_param["index_type"]:
                self.milvus.create_index(index_field_name, case_param["index_type"], metric_type,
                                         index_param=case_param["index_param"])
                logger.debug(self.milvus.describe_index(index_field_name))
                rebuild_index = True
            else:
                logger.warning("Please specify the index_type")

        self.insert(self.milvus, collection_name, case_param["data_type"], dimension,
                    case_param["collection_size"], case_param["ni_per"])

        flush_started = time.time()
        self.milvus.flush()
        flush_time = round(time.time() - flush_started, 2)
        logger.debug(self.milvus.count())

        build_time = 0.0
        if rebuild_index:
            logger.debug("Start build index for last file")
            build_started = time.time()
            self.milvus.create_index(index_field_name, case_param["index_type"], metric_type,
                                     index_param=case_param["index_param"])
            build_time = round(time.time() - build_started, 2)
        logger.debug({"flush_time": flush_time, "build_time": build_time})
        logger.info(self.milvus.count())

        logger.info("Start load collection")
        load_started = time.time()
        self.milvus.load_collection()
        logger.debug({"load_time": round(time.time() - load_started, 2)})
        # TODO: enable warm query -- take the "search_param" of the first task in
        # case_param["task"]["types"] whose "type" is "query", then call
        # self.milvus.warm_query(index_field_name, search_param, metric_type, times=2)
|
|
|
|
|
|
class LocustRandomRunner(LocustRunner):
    """Locust runner for the random-interface workload ("run random interface").

    ``prepare`` builds, fills, optionally indexes and loads the collection
    before locust issues the configured task types.
    """

    name = "locust_random_performance"

    def __init__(self, env, metric):
        super(LocustRandomRunner, self).__init__(env, metric)

    def extract_cases(self, collection):
        """Build the single random-interface case described by ``collection``.

        Side effect: initialises ``self.metric`` through ``init_metric``.

        :param collection: collection config. ``ni_per`` and ``task`` (with
            ``connection_num``) are required. ``collection_name``,
            ``build_index`` and ``other_fields`` are optional. ``index_type``
            and ``index_param`` are required when ``build_index`` is ``True``.
        :return: ``(case_params, case_metrics)``, each a one-element list.
        """
        dataset = collection.get("collection_name")
        data_type, collection_size, dimension, metric_type = parser.collection_parser(dataset)
        ni_per = collection["ni_per"]
        build_index = collection.get("build_index", False)
        vector_type = runner_utils.get_vector_type(data_type)
        other_fields = collection.get("other_fields")
        collection_info = dict(
            dimension=dimension,
            metric_type=metric_type,
            dataset_name=dataset,
            collection_size=collection_size,
            other_fields=other_fields,
            ni_per=ni_per,
        )
        vector_field_name = runner_utils.get_default_field_name(vector_type)

        if build_index is not True:
            index_type = index_param = index_info = index_field_name = None
        else:
            index_type = collection["index_type"]
            index_param = collection["index_param"]
            index_info = dict(index_type=index_type, index_param=index_param)
            index_field_name = runner_utils.get_default_field_name(vector_type)

        task = collection["task"]
        connection_type = "single"
        if task["connection_num"] > 1:
            connection_type = "multi"
        self.init_metric(self.name, collection_info, index_info, None,
                         {"task": task, "connection_type": connection_type})

        case_metric = copy.deepcopy(self.metric)
        case_metric.set_case_metric_type()

        case_param = dict(
            collection_name=dataset,
            data_type=data_type,
            dimension=dimension,
            collection_size=collection_size,
            ni_per=ni_per,
            metric_type=metric_type,
            vector_type=vector_type,
            other_fields=other_fields,
            build_index=build_index,
            index_field_name=index_field_name,
            vector_field_name=vector_field_name,
            index_type=index_type,
            index_param=index_param,
            task=task,
            connection_type=connection_type,
        )
        return [case_param], [case_metric]

    def prepare(self, **case_param):
        """Recreate, fill, optionally index, and load the target collection.

        Steps: drop any existing collection with the same name, recreate it,
        create the index if requested, insert ``collection_size`` rows in
        batches of ``ni_per``, flush, rebuild the index for the last file, and
        finally load the collection. Flush/build/load durations are logged.
        """
        def timed(action, *args, **kwargs):
            # Run ``action`` and return its wall-clock duration in seconds (2 d.p.).
            started = time.time()
            action(*args, **kwargs)
            return round(time.time() - started, 2)

        collection_name = case_param["collection_name"]
        dimension = case_param["dimension"]
        vector_type = case_param["vector_type"]
        other_fields = case_param["other_fields"]
        index_field_name = case_param["index_field_name"]
        build_index = case_param["build_index"]

        self.milvus.set_collection(collection_name)
        if self.milvus.exists_collection():
            logger.debug("Start drop collection")
            self.milvus.drop()
            # presumably lets the drop settle before the collection is recreated
            time.sleep(runner_utils.DELETE_INTERVAL_TIME)
        self.milvus.create_collection(dimension, data_type=vector_type, other_fields=other_fields)
        # TODO: update fields in collection_info (e.g. from self.get_fields(self.milvus, collection_name))

        # Whether the index was created up front; only then is it rebuilt after insert.
        index_created = False
        if build_index is True:
            if not case_param["index_type"]:
                logger.warning("Please specify the index_type")
            else:
                self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"],
                                         index_param=case_param["index_param"])
                logger.debug(self.milvus.describe_index(index_field_name))
                index_created = True

        self.insert(self.milvus, collection_name, case_param["data_type"], dimension,
                    case_param["collection_size"], case_param["ni_per"])

        flush_time = timed(self.milvus.flush)
        logger.debug(self.milvus.count())

        build_time = 0.0
        if index_created:
            logger.debug("Start build index for last file")
            build_time = timed(self.milvus.create_index, index_field_name, case_param["index_type"],
                               case_param["metric_type"], index_param=case_param["index_param"])
        logger.debug({"flush_time": flush_time, "build_time": build_time})
        logger.info(self.milvus.count())

        logger.info("Start load collection")
        load_time = timed(self.milvus.load_collection)
        logger.debug({"load_time": load_time})
|