test: add stats task feature cases and DefaultVectorSearchParams (#36768)

issue: #36767
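
Besides the new stats-task cases, this adds a DefaultVectorSearchParams helper (with SearchInsidePrams for optional knobs such as radius, range_filter and group_by_field) to common.common_params, and shuffle/shuffle_pk options to the bulk-insert data generators so that imported primary keys arrive unsorted. A minimal, illustrative sketch of how the search helper is intended to be used (the vector field name and query vectors below are placeholders, not taken from the tests):

    from common.common_params import DefaultVectorSearchParams, SearchInsidePrams

    # default HNSW search params (ef=200); optional range-search knobs are merged into "params"
    param = DefaultVectorSearchParams.HNSW(
        ef=64, inside_params=SearchInsidePrams(radius=1.0, range_filter=0.5))

    collection_wrap.search(data=vectors, anns_field="float_vector",
                           param=param, limit=10)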

---------

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
ThreadDao 2024-10-14 19:15:23 +08:00 committed by GitHub
parent 937ebec2ce
commit d566b0ceff
5 changed files with 306 additions and 30 deletions

View File

@@ -389,9 +389,10 @@ def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, vector_t
return file_name
def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False, **kwargs):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
shuffle_pk = kwargs.get("shuffle_pk", False)
if not os.path.exists(file) or force:
# non vector columns
data = []
@@ -399,7 +400,9 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
data = [gen_unique_str(str(i)) for i in range(start, rows+start)]
arr = np.array(data)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
if shuffle_pk:
np.random.shuffle(arr)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
np.save(file, arr)
return file_name
@@ -463,9 +466,10 @@ def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False):
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False, **kwargs):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
shuffle_pk = kwargs.get("shuffle_pk", False)
if not os.path.exists(file) or force:
# non vector columns
data = []
@@ -477,13 +481,15 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False,
data = [np.float64(random.random()) for _ in range(rows)]
elif data_field == DataField.pk_field:
data = [i for i in range(start, start + rows)]
if shuffle_pk:
random.shuffle(data)
elif data_field == DataField.int_field:
if not nullable:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
np.save(file, arr)
return file_name
@@ -694,6 +700,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False, **kwargs):
schema = kwargs.get("schema", None)
shuffle = kwargs.get("shuffle", False)
schema = schema.to_dict() if schema is not None else None
data = []
nullable = False
@@ -785,7 +792,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
d["name"] = fake.name()
d["address"] = fake.address()
data.append(d)
if shuffle:
random.shuffle(data)
log.info(f"shuffle={shuffle}")
return data
@@ -850,6 +859,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
files = []
start_uid = 0
nullable = False
shuffle_pk = kwargs.get("shuffle_pk", False)
if file_nums == 1:
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
@@ -878,7 +888,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
vector_type=vector_type, rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, shuffle_pk=shuffle_pk)
elif data_field == DataField.text_field:
file_name = gen_text_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, nullable=nullable)
elif data_field == DataField.bool_field:
@@ -887,7 +897,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
rows=rows, force=force, nullable=nullable)
rows=rows, force=force, nullable=nullable, shuffle_pk=shuffle_pk)
files.append(file_name)
if enable_dynamic_field and include_meta:
file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)

View File

@@ -1846,7 +1846,7 @@ def gen_values(schema: CollectionSchema, nb, start_id=0, default_values: dict =
if default_value is not None:
data.append(default_value)
elif field.auto_id is False:
data.append(gen_data_by_collection_field(field, nb, start_id * nb))
data.append(gen_data_by_collection_field(field, nb, start_id))
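# note: start_id is forwarded unchanged; callers may pass start_id=None to get random values (see the stats-task tests below)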
return data

View File

@@ -1,7 +1,5 @@
from dataclasses import dataclass
from typing import List, Dict
from pymilvus import DataType
from typing import List, Dict, Optional
""" Define param names"""
@@ -284,6 +282,18 @@ class IndexPrams(BasePrams):
params: dict = None
metric_type: str = None
@dataclass
class SearchInsidePrams(BasePrams):
# inside params
radius: Optional[float] = None
range_filter: Optional[float] = None
group_by_field: Optional[str] = None
@dataclass
class SearchPrams(BasePrams):
metric_type: str = MetricType.L2
params: dict = None
""" Define default params """
@@ -307,9 +317,15 @@ class DefaultVectorIndexParams:
}
@staticmethod
def HNSW(field: str, m: int = 8, ef: int = 200, metric_type=MetricType.L2):
def HNSW(field: str, m: int = 8, efConstruction: int = 200, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": ef}, metric_type=metric_type)
field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": efConstruction}, metric_type=metric_type)
}
@staticmethod
def SCANN(field: str, nlist: int = 128, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.SCANN, params={"nlist": nlist}, metric_type=metric_type)
}
@staticmethod
@@ -330,20 +346,19 @@ class DefaultVectorIndexParams:
}
@staticmethod
def SPARSE_WAND(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
def SPARSE_WAND(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_WAND, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}
@staticmethod
def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_INVERTED_INDEX, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}
class DefaultScalarIndexParams:
@staticmethod
@@ -389,6 +404,107 @@ class AlterIndexParams:
def index_mmap(enable: bool = True):
return {'mmap.enabled': enable}
class DefaultVectorSearchParams:
@staticmethod
def FLAT(metric_type=MetricType.L2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def IVF_FLAT(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def IVF_SQ8(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def HNSW(metric_type=MetricType.L2, ef: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"ef": ef}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def SCANN(metric_type=MetricType.L2, nprobe: int = 32, reorder_k: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe, "reorder_k": reorder_k}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def DISKANN(metric_type=MetricType.L2, search_list: int = 30, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"search_list": search_list}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def BIN_FLAT(metric_type=MetricType.JACCARD, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def BIN_IVF_FLAT(metric_type=MetricType.JACCARD, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def SPARSE_WAND(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"drop_ratio_search": drop_ratio_search}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
@staticmethod
def SPARSE_INVERTED_INDEX(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"drop_ratio_search": drop_ratio_search}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)
sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp
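# Illustrative example (not part of this file): SearchInsidePrams fields and extra kwargs
# are merged into the returned dict, e.g.
#   DefaultVectorSearchParams.IVF_SQ8(nprobe=64, inside_params=SearchInsidePrams(radius=1.0, range_filter=0.5))
# is expected to yield roughly {"metric_type": "L2", "params": {"nprobe": 64, "radius": 1.0, "range_filter": 0.5}},
# assuming BasePrams.to_dict drops fields that are left as None.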
@dataclass
class ExprCheckParams:

View File

@@ -9,6 +9,7 @@ from pathlib import Path
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.common_params import DefaultVectorIndexParams, DefaultVectorSearchParams
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel, CheckTasks
from utils.util_log import test_log as log
@@ -2160,3 +2161,77 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
empty_partition_num += 1
num_entities += p.num_entities
assert num_entities == entities * file_nums
@pytest.mark.parametrize("pk_field", [df.pk_field, df.string_field])
@pytest.mark.tags(CaseLabel.L3)
def test_bulk_import_random_pk_stats_task(self, pk_field):
# connect -> prepare json data
self._connect()
collection_name = cf.gen_unique_str("stats_task")
nb = 3000
fields = []
files = ""
# prepare data: int64_pk -> json data; varchar_pk -> numpy data
if pk_field == df.pk_field:
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=False),
cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
is_row_based=True, rows=nb, dim=ct.default_dim, auto_id=False, data_fields=data_fields, force=True,
shuffle=True
)
elif pk_field == df.string_field:
fields = [
cf.gen_string_field(name=df.string_field, is_primary=True, auto_id=False),
cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_numpy_files(
minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
rows=nb, dim=ct.default_dim, data_fields=data_fields, enable_dynamic_field=False, force=True,
shuffle_pk=True
)
else:
log.error(f"pk_field name {pk_field} not supported now, [{df.pk_field}, {df.string_field}] expected~")
# create collection -> create vector index
schema = cf.gen_collection_schema(fields=fields)
self.collection_wrap.init_collection(collection_name, schema=schema)
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(df.float_vec_field))
# bulk_insert data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=collection_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{completed} with latency {tt}")
assert completed
# load -> get_segment_info -> verify stats task
self.collection_wrap.load()
res_segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_segment_info) > 0  # compaction may have merged the data into a single segment
cnt = 0
for r in res_segment_info:
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
cnt += r.num_rows
assert r.is_sorted is True
assert cnt == nb
# verify search
self.collection_wrap.search(
data=cf.gen_vectors(ct.default_nq, ct.default_dim, vector_data_type=DataType.FLOAT_VECTOR.name),
anns_field=df.float_vec_field, param=DefaultVectorSearchParams.IVF_SQ8(),
limit=ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"limit": ct.default_limit})

View File

@@ -7,6 +7,7 @@ from pymilvus.exceptions import MilvusException
from base.client_base import TestcaseBase
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common.common_params import FieldParams, DefaultVectorIndexParams, DefaultVectorSearchParams
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
@@ -1359,24 +1360,98 @@ class TestUtilityAdvanced(TestcaseBase):
assert len(res) == 0
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("index must created before load, but create_index will trigger flush")
def test_get_sealed_query_segment_info(self):
@pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
def test_get_sealed_query_segment_info(self, primary_field):
"""
target: test getting sealed query segment info of collection without index
method: init a collection, insert data, flush, load, and get query segment info
target: test getting sealed query segment info of collection with data
method: init a collection, insert data, flush, index, load, and get query segment info
expected:
1. length of segment is equal to 0
1. length of segment is greater than 0
2. the sum of num_rows across segments equals the number of inserted entities
3. every segment reports is_sorted as true
"""
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
collection_w.num_entities
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
res, _ = self.utility_wrap.get_query_segment_info(c_name)
assert len(res) == 0
segment_num = 2
collection_name = cf.gen_unique_str(prefix)
# connect -> create collection
self._connect()
self.collection_wrap.init_collection(
name=collection_name,
schema=cf.set_collection_schema(
fields=[primary_field, ct.default_float_vec_field_name],
field_params={
primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
},
)
)
for _ in range(segment_num):
# insert random pks, ***start=None will generate random data***
data = cf.gen_values(self.collection_wrap.schema, nb=nb, start_id=None)
self.collection_wrap.insert(data)
self.collection_wrap.flush()
# flush -> index -> load -> sealed segments is sorted
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
self.collection_wrap.load()
# get_query_segment_info and verify results
res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_sealed) > 0  # compaction may have merged the data into a single segment
cnt = 0
for r in res_sealed:
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
cnt += r.num_rows
assert r.is_sorted is True
assert cnt == nb * segment_num
# verify search
self.collection_wrap.search(data=cf.gen_vectors(ct.default_nq, ct.default_dim),
anns_field=ct.default_float_vec_field_name, param=DefaultVectorSearchParams.IVF_SQ8(),
limit=ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"limit": ct.default_limit})
@pytest.mark.tags(CaseLabel.L1)
def test_get_growing_query_segment_info(self):
"""
target: test getting growing query segment info of collection with data
method: init a collection, index, load, insert data, and get query segment info
expected:
1. length of segment is 0, growing segment is not visible for get_query_segment_info
"""
nb = 3000
primary_field = "int64"
collection_name = cf.gen_unique_str(prefix)
# connect -> create collection
self._connect()
self.collection_wrap.init_collection(
name=collection_name,
schema=cf.set_collection_schema(
fields=[primary_field, ct.default_float_vec_field_name],
field_params={
primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
},
)
)
# index -> load
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
self.collection_wrap.load()
# insert random pks, ***start=None will generate random data***
data = cf.gen_values(self.collection_wrap.schema, nb=nb, start_id=None)
self.collection_wrap.insert(data)
# get_query_segment_info and verify results
res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_sealed) == 0
@pytest.mark.tags(CaseLabel.L1)
def test_get_sealed_query_segment_info_after_create_index(self):