[test]Fix bulk insert perf (#20927)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

zhuwenxing 2022-12-05 09:01:17 +08:00 committed by GitHub
parent 25a3b9ae19
commit c1b5e6aeb9
5 changed files with 139 additions and 43 deletions

@@ -143,7 +143,7 @@ class ApiUtilityWrapper:
             pending_tasks = self.get_bulk_insert_pending_list()
             log.info(f"after waiting, there are {len(pending_tasks)} pending tasks")
             log.info(f"task state distribution: {tasks_state_distribution}")
-            log.debug(tasks_state)
+            log.info(tasks_state)
             if len(tasks_state_distribution["success"]) == len(task_ids):
                 log.info(f"wait for bulk load tasks completed successfully, cost time: {end-start}")
                 return True, tasks_state

@@ -0,0 +1,25 @@
+import pytest
+
+
+def pytest_addoption(parser):
+    parser.addoption("--file_type", action="store", default="json", help="file type")
+    parser.addoption("--create_index", action="store", default="create_index", help="whether to create index")
+    parser.addoption("--nb", action="store", default="", help="nb")
+    parser.addoption("--dim", action="store", default="2048", help="dim")
+
+
+@pytest.fixture
+def file_type(request):
+    return request.config.getoption("--file_type")
+
+@pytest.fixture
+def create_index(request):
+    return request.config.getoption("--create_index")
+
+@pytest.fixture
+def nb(request):
+    return request.config.getoption("--nb")
+
+@pytest.fixture
+def dim(request):
+    return request.config.getoption("--dim")
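Note: these options are consumed by the bulk insert perf test in the chaos test file further below. An illustrative invocation (the test module path is a placeholder, not part of this commit) would look like:

    pytest <bulk_insert_perf_test>.py --file_type npy --nb 65000 --dim 2048

--create_index defaults to the string "create_index", which the test below interprets as "build the index before bulk insert".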

@@ -1,5 +1,8 @@
 import pytest
-import json
+import os
+import time
+import threading
+from pathlib import Path
 from time import sleep
 from minio import Minio
 from pymilvus import connections
@@ -58,6 +61,11 @@ class TestChaosBase:

 class TestChaos(TestChaosBase):

+    def teardown(self):
+        sleep(10)
+        log.info(f'Alive threads: {threading.enumerate()}')
+
     @pytest.fixture(scope="function", autouse=True)
     def connection(self, host, port, milvus_ns):
         connections.add_connection(default={"host": host, "port": port})
@@ -74,17 +82,16 @@
         self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
         self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

-    @pytest.fixture(scope="function", autouse=True)
-    def init_health_checkers(self, collection_name=None):
+    def init_health_checkers(self, collection_name=None, create_index=True, dim=2048):
         log.info("init health checkers")
         c_name = collection_name if collection_name else cf.gen_unique_str("Checker_")
         checkers = {
-            Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=True),
+            Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=False,
+                                              dim=dim, create_index=create_index),
         }
         self.health_checkers = checkers

-    @pytest.fixture(scope="function", autouse=True)
-    def prepare_bulk_insert(self, nb=100000):
+    def prepare_bulk_insert(self, nb=3000, file_type="json", dim=2048):
         if Op.bulk_insert not in self.health_checkers:
             log.info("bulk_insert checker is not in health checkers, skip prepare bulk load")
             return
@@ -100,40 +107,53 @@
         minio_port = "9000"
         minio_endpoint = f"{minio_ip}:{minio_port}"
         bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
-        schema = cf.gen_default_collection_schema()
-        data = cf.gen_default_list_data_for_bulk_insert(nb=nb)
-        fields_name = [field.name for field in schema.fields]
-        entities = []
-        for i in range(nb):
-            entity_value = [field_values[i] for field_values in data]
-            entity = dict(zip(fields_name, entity_value))
-            entities.append(entity)
-        data_dict = {"rows": entities}
-        data_source = "/tmp/ci_logs/bulk_insert_data_source.json"
-        file_name = "bulk_insert_data_source.json"
-        files = ["bulk_insert_data_source.json"]
-        # TODO: npy file type is not supported so far
-        log.info("generate bulk load file")
-        with open(data_source, "w") as f:
-            f.write(json.dumps(data_dict, indent=4))
+        schema = cf.gen_default_collection_schema(dim=dim)
+        data = cf.gen_default_list_data_for_bulk_insert(nb=nb, dim=dim)
+        data_dir = "/tmp/bulk_insert_data"
+        Path(data_dir).mkdir(parents=True, exist_ok=True)
+        files = []
+        if file_type == "json":
+            files = cf.gen_json_files_for_bulk_insert(data, schema, data_dir)
+        if file_type == "npy":
+            files = cf.gen_npy_files_for_bulk_insert(data, schema, data_dir)
         log.info("upload file to minio")
         client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
-        client.fput_object(bucket_name, file_name, data_source)
+        for file_name in files:
+            file_size = os.path.getsize(os.path.join(data_dir, file_name)) / 1024 / 1024
+            t0 = time.time()
+            client.fput_object(bucket_name, file_name, os.path.join(data_dir, file_name))
+            log.info(f"upload file {file_name} to minio, size: {file_size:.2f} MB, cost {time.time() - t0:.2f} s")
         self.health_checkers[Op.bulk_insert].update(schema=schema, files=files)
         log.info("prepare data for bulk load done")

     @pytest.mark.tags(CaseLabel.L3)
-    def test_bulk_insert(self):
+    def test_bulk_insert_perf(self, file_type, create_index, nb, dim):
         # start the monitor threads to check the milvus ops
         log.info("*********************Test Start**********************")
         log.info(connections.get_connection_addr('default'))
-        # c_name = cf.gen_unique_str("BulkInsertChecker_")
-        # self.init_health_checkers(collection_name=c_name)
+        log.info(f"file_type: {file_type}, create_index: {create_index}")
+        if create_index == "create_index":
+            create_index = True
+        else:
+            create_index = False
+        self.init_health_checkers(create_index=create_index, dim=int(dim))
+        if nb == "None":
+            nb = 3000
+            if file_type == "json":
+                nb = 13800
+            if file_type == "npy":
+                nb = 65000
+        else:
+            nb = int(nb)
+        self.prepare_bulk_insert(file_type=file_type, nb=nb, dim=int(dim))
         cc.start_monitor_threads(self.health_checkers)
         # wait 600s
-        sleep(constants.WAIT_PER_OP * 60)
+        sleep(constants.WAIT_PER_OP * 30)
         assert_statistic(self.health_checkers)
         assert_expectations()
+        for k, checker in self.health_checkers.items():
+            checker.check_result()
+            checker.terminate()
         log.info("*********************Test Completed**********************")

@@ -88,7 +88,7 @@ class Checker:
        b. count operations and success rate
     """

-    def __init__(self, collection_name=None, shards_num=2):
+    def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim):
         self._succ = 0
         self._fail = 0
         self._keep_running = True
@@ -98,12 +98,12 @@
         c_name = collection_name if collection_name is not None else cf.gen_unique_str(
             'Checker_')
         self.c_wrap.init_collection(name=c_name,
-                                    schema=cf.gen_default_collection_schema(),
+                                    schema=cf.gen_default_collection_schema(dim=dim),
                                     shards_num=shards_num,
                                     timeout=timeout,
                                     # active_trace=True,
                                     enable_traceback=enable_traceback)
-        self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
+        self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH, dim=dim),
                            timeout=timeout,
                            enable_traceback=enable_traceback)
         self.initial_entities = self.c_wrap.num_entities  # do as a flush
@@ -125,6 +125,7 @@
         checker_name = self.__class__.__name__
         checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
         log.info(checkers_result)
+        log.info(f"{checker_name} rsp times: {self.rsp_times}")
         return checkers_result

     def terminate(self):
@@ -579,18 +580,19 @@ class LoadBalanceChecker(Checker):

 class BulkInsertChecker(Checker):
     """check bulk load operations in a dependent thread"""
-    def __init__(self, collection_name=None, files=[], use_one_collection=False):
+    def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim, create_index=True):
         if collection_name is None:
             collection_name = cf.gen_unique_str("BulkInsertChecker_")
-        super().__init__(collection_name=collection_name)
-        res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
-                                               constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
-                                               timeout=timeout,
-                                               enable_traceback=enable_traceback,
-                                               check_task=CheckTasks.check_nothing)
-        self.c_wrap.load()
+        super().__init__(collection_name=collection_name, dim=dim)
+        self.create_index = create_index
+        if self.create_index:
+            res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                                   constants.DEFAULT_INDEX_PARAM,
+                                                   index_name=cf.gen_unique_str(
+                                                       'index_'),
+                                                   timeout=timeout,
+                                                   enable_traceback=enable_traceback,
+                                                   check_task=CheckTasks.check_nothing)
         self.utility_wrap = ApiUtilityWrapper()
         self.schema = cf.gen_default_collection_schema()
         self.files = files
@@ -610,7 +612,7 @@ class BulkInsertChecker(Checker):
         log.info(f"bulk insert collection name: {self.c_name}")
         task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name,
                                                             files=self.files)
-        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=60)
+        completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=120)
         return task_ids, completed

     @exception_handler()
@@ -622,6 +624,14 @@ class BulkInsertChecker(Checker):
         else:
             self.c_name = cf.gen_unique_str("BulkInsertChecker_")
             self.c_wrap.init_collection(name=self.c_name, schema=self.schema)
+        if self.create_index:
+            res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
+                                                   constants.DEFAULT_INDEX_PARAM,
+                                                   index_name=cf.gen_unique_str(
+                                                       'index_'),
+                                                   timeout=timeout,
+                                                   enable_traceback=enable_traceback,
+                                                   check_task=CheckTasks.check_nothing)
         # bulk insert data
         task_ids, completed = self.bulk_insert()
         if not completed:

@@ -2,6 +2,8 @@ import os
 import random
 import math
 import string
+import json
+from functools import singledispatch
 import numpy as np
 import pandas as pd
 from sklearn import preprocessing
@@ -15,6 +17,17 @@ from customize.milvus_operator import MilvusOperator

 """" Methods of processing data """

+@singledispatch
+def to_serializable(val):
+    """Used by default."""
+    return str(val)
+
+
+@to_serializable.register(np.float32)
+def ts_float32(val):
+    """Used if *val* is an instance of numpy.float32."""
+    return np.float64(val)
+
 class ParamInfo:
     def __init__(self):
         self.param_host = ""
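The to_serializable helper added above exists because numpy scalar types (notably np.float32, which the bulk insert data generator below now produces) are not serializable by Python's json module, while np.float64 is a subclass of float and therefore is. A minimal sketch of how it is used (illustrative only; assumes to_serializable from the hunk above is in scope):

    import json
    import numpy as np

    # np.float32 alone would make json.dumps raise TypeError;
    # default=to_serializable converts it to np.float64 (a float subclass) first.
    row = {"float": np.float32(1.0)}
    print(json.dumps(row, default=to_serializable))  # -> {"float": 1.0}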
@@ -321,13 +334,41 @@ def gen_default_list_data(nb=ct.default_nb, dim=ct.default_dim):

 def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, dim=ct.default_dim):
     int_values = [i for i in range(nb)]
-    float_values = [float(i) for i in range(nb)]
+    float_values = [np.float32(i) for i in range(nb)]
     string_values = [str(i) for i in range(nb)]
     float_vec_values = gen_vectors(nb, dim)
     data = [int_values, float_values, string_values, float_vec_values]
     return data


+def gen_json_files_for_bulk_insert(data, schema, data_dir):
+    nb = len(data[0])
+    fields_name = [field.name for field in schema.fields]
+    entities = []
+    for i in range(nb):
+        entity_value = [field_values[i] for field_values in data]
+        entity = dict(zip(fields_name, entity_value))
+        entities.append(entity)
+    data_dict = {"rows": entities}
+    file_name = "bulk_insert_data_source.json"
+    files = ["bulk_insert_data_source.json"]
+    data_source = os.path.join(data_dir, file_name)
+    with open(data_source, "w") as f:
+        f.write(json.dumps(data_dict, indent=4, default=to_serializable))
+    return files
+
+
+def gen_npy_files_for_bulk_insert(data, schema, data_dir):
+    fields_name = [field.name for field in schema.fields]
+    files = []
+    for field in fields_name:
+        files.append(f"{field}.npy")
+    for i, file in enumerate(files):
+        data_source = os.path.join(data_dir, file)
+        np.save(data_source, np.array(data[i]))
+    return files
+
+
 def gen_default_tuple_data(nb=ct.default_nb, dim=ct.default_dim):
     int_values = [i for i in range(nb)]
     float_values = [np.float32(i) for i in range(nb)]
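For reference, the row-based JSON file written by gen_json_files_for_bulk_insert has roughly the shape sketched below; the field names depend on cf.gen_default_collection_schema and are shown here only as an illustration, not copied from this commit:

    {
        "rows": [
            {"int64": 0, "float": 0.0, "varchar": "0", "float_vector": [0.12, 0.98, ...]},
            {"int64": 1, "float": 1.0, "varchar": "1", "float_vector": [0.44, 0.07, ...]}
        ]
    }

gen_npy_files_for_bulk_insert instead writes one <field_name>.npy file per schema field, i.e. a column-based layout with one numpy array per field.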