[test]Add test case for bulk insert (#19898)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

pull/19937/head
zhuwenxing 2022-10-20 18:19:29 +08:00 committed by GitHub
parent e081eb287d
commit c2bf419cb3
2 changed files with 355 additions and 21 deletions


@@ -2,6 +2,7 @@ import logging
import time
import pytest
import random
import numpy as np
from pathlib import Path
from base.client_base import TestcaseBase
from common import common_func as cf
@@ -140,7 +141,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 2
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -220,7 +221,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 3
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
time.sleep(5)
res, _ = self.collection_wrap.search(
search_data,
@@ -278,11 +279,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
p_name = cf.gen_unique_str("bulk_insert")
m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name)
# build index before bulk insert
index_params = {
"index_type": "IVF_SQ8",
"params": {"nlist": 128},
"metric_type": "L2",
}
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
@@ -320,7 +317,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 10
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -468,11 +465,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# build index before bulk insert
# build index
index_params = {
"index_type": "HNSW",
"params": {"M": 8, "efConstruction": 100},
"metric_type": "L2",
}
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
@@ -516,7 +509,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "IP", "params": {"ef": 64}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -538,6 +531,102 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
df.pk_field, 1
)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("insert_before_bulk_insert", [True, False])
def test_insert_before_or_after_bulk_insert(self, insert_before_bulk_insert):
"""
collection schema: [pk, float, float_vector]
Steps:
1. create collection and build index
2. insert data if insert_before_bulk_insert is True
3. import data via bulk insert
4. insert data if insert_before_bulk_insert is False
5. verify the number of entities
6. verify search and query
"""
bulk_insert_row = 500
direct_insert_row = 3000
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=True,
rows=bulk_insert_row,
dim=16,
data_fields=[df.pk_field, df.float_field, df.vec_field],
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_field(name=df.float_field),
cf.gen_float_vec_field(name=df.vec_field, dim=16),
]
data = [
[i for i in range(direct_insert_row)],
[np.float32(i) for i in range(direct_insert_row)],
cf.gen_vectors(direct_insert_row, 16),
]
schema = cf.gen_collection_schema(fields=fields)
self.collection_wrap.init_collection(c_name, schema=schema)
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
if insert_before_bulk_insert:
# insert data
self.collection_wrap.insert(data)
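# accessing num_entities triggers a flush of the pending insert; the returned value is intentionally unused here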
self.collection_wrap.num_entities
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name, is_row_based=True, files=files
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
if not insert_before_bulk_insert:
# insert data
self.collection_wrap.insert(data)
self.collection_wrap.num_entities
num_entities = self.collection_wrap.num_entities
log.info(f"collection entities: {num_entities}")
assert num_entities == bulk_insert_row + direct_insert_row
# verify the index build covers all inserted rows
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities}
assert res == exp_res
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, 16)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
expr = f"{df.pk_field} in {ids}"
expr = expr.replace("'", '"')
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("create_index_before_bulk_insert", [True, False])
@pytest.mark.parametrize("loaded_before_bulk_insert", [True, False])
@@ -610,7 +699,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, 16)
search_params = {"metric_type": "IP", "params": {"ef": 64}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -715,7 +804,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
search_data = cf.gen_vectors(1, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -956,7 +1045,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
nq = 2
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -1092,7 +1181,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
search_data = cf.gen_vectors(1, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -1166,7 +1255,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
time.sleep(5)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -1636,7 +1725,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
if is_row_based:
failed_reason = f"field {dismatch_scalar_field} missed at the row 0"
else:
failed_reason = f"field '{dismatch_scalar_field}' row count 0 is not equal to other fields row count"
failed_reason = f"field {dismatch_scalar_field} row count 0 is not equal to other fields row count"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@@ -2428,7 +2517,7 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert):
loaded_segs = len(self.utility_wrap.get_query_segment_info(c_name)[0])
log.info(f"query seg info: {loaded_segs} segs loaded.")
search_data = cf.gen_vectors(1, dim)
search_params = {"metric_type": "L2", "params": {"nprobe": 2}}
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
vec_field,

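For reference, the hunks above replace hardcoded index and search parameters with the shared defaults imported as ct (common/common_type.py). A minimal sketch of what such defaults could look like, assuming an IVF_SQ8 index with L2 metric; the exact values are defined in that module, not in this diff:

# hypothetical defaults -- the real ones live in common/common_type.py (imported as ct)
default_index = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 128}}
default_search_params = {"metric_type": "L2", "params": {"nprobe": 16}}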

@@ -0,0 +1,245 @@
import logging
import time
import pytest
import random
import numpy as np
from pathlib import Path
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel, CheckTasks
from utils.util_k8s import (
get_pod_ip_name_pairs,
get_milvus_instance_name,
)
from utils.util_log import test_log as log
from bulk_insert_data import (
prepare_bulk_insert_json_files,
prepare_bulk_insert_numpy_files,
DataField as df,
DataErrorType,
)
default_vec_only_fields = [df.vec_field]
default_multi_fields = [
df.vec_field,
df.int_field,
df.string_field,
df.bool_field,
df.float_field,
]
default_vec_n_int_fields = [df.vec_field, df.int_field]
milvus_ns = "chaos-testing"
base_dir = "/tmp/bulk_insert_data"
def entity_suffix(entities):
if entities // 1000000 > 0:
suffix = f"{entities // 1000000}m"
elif entities // 1000 > 0:
suffix = f"{entities // 1000}k"
else:
suffix = f"{entities}"
return suffix
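# e.g. entity_suffix(3_000_000) -> "3m", entity_suffix(5000) -> "5k", entity_suffix(100) -> "100"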
class TestcaseBaseBulkInsert(TestcaseBase):
@pytest.fixture(scope="function", autouse=True)
def init_minio_client(self, host):
Path("/tmp/bulk_insert_data").mkdir(parents=True, exist_ok=True)
self._connect()
self.instance_name = get_milvus_instance_name(milvus_ns, host)
minio_ip_pod_pair = get_pod_ip_name_pairs(
milvus_ns, f"release={self.instance_name}, app=minio"
)
ms = MilvusSys()
minio_ip = list(minio_ip_pod_pair.keys())[0]
minio_port = "9000"
self.minio_endpoint = f"{minio_ip}:{minio_port}"
self.bucket_name = ms.index_nodes[0]["infos"]["system_configurations"][
"minio_bucket_name"
]
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8, 128
@pytest.mark.parametrize("entities", [100]) # 100, 1000
def test_success_task_not_cleaned(self, is_row_based, auto_id, dim, entities):
"""
collection: auto_id, customized_id
collection schema: [pk, float_vector]
Steps:
1. create collection
2. import data
3. verify the number of entities equals the number of imported rows
4. load the collection
5. verify search succeeds
6. verify query succeeds
7. wait for the task-clean routine to be triggered
8. verify the successful task is not cleaned
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_vec_only_fields,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
nq = 2
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
log.info("wait for task clean triggered")
time.sleep(6 * 60)  # wait 6 minutes for the task-clean routine to be triggered
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True, False])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8, 128
@pytest.mark.parametrize("entities", [100]) # 100, 1000
def test_failed_task_was_cleaned(self, is_row_based, auto_id, dim, entities):
"""
collection: auto_id, customized_id
collection schema: [pk, float_vector]
Steps:
1. create collection
2. import data with wrong dimension
3. verify the number of entities is 0 and the task failed
4. wait for the task-clean routine to be triggered
5. verify the task was cleaned
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_vec_only_fields,
err_type=DataErrorType.one_entity_wrong_dim,
wrong_position=entities // 2,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_ids, _ = self.utility_wrap.bulk_insert(
collection_name=c_name,
partition_name=None,
is_row_based=is_row_based,
files=files,
)
logging.info(f"bulk insert task ids:{task_ids}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert not success
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == 0
log.info("wait for task clean triggered")
time.sleep(6 * 60)  # wait 6 minutes for the task-clean routine to be triggered
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == 0
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=task_ids, timeout=90
)
assert not success
for state in states.values():
assert state.state_name in ["Failed and cleaned"]