From d566b0ceff3ea586cea92a841a878b0479870653 Mon Sep 17 00:00:00 2001
From: ThreadDao
Date: Mon, 14 Oct 2024 19:15:23 +0800
Subject: [PATCH] test: add stats task feature cases and DefaultVectorSearchParams (#36768)

issue: #36767

---------

Signed-off-by: ThreadDao
---
 .../python_client/common/bulk_insert_data.py  |  24 +++-
 tests/python_client/common/common_func.py     |   2 +-
 tests/python_client/common/common_params.py   | 132 ++++++++++++++++--
 .../testcases/test_bulk_insert.py             |  75 ++++++++++
 tests/python_client/testcases/test_utility.py | 103 ++++++++++++--
 5 files changed, 306 insertions(+), 30 deletions(-)

diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py
index df8b962265..632a3eb6bd 100644
--- a/tests/python_client/common/bulk_insert_data.py
+++ b/tests/python_client/common/bulk_insert_data.py
@@ -389,9 +389,10 @@ def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, vector_t
     return file_name
 
 
-def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
+def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False, **kwargs):
     file_name = f"{data_field}.npy"
     file = f"{dir}/{file_name}"
+    shuffle_pk = kwargs.get("shuffle_pk", False)
     if not os.path.exists(file) or force:
         # non vector columns
         data = []
@@ -399,7 +400,9 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
             data = [gen_unique_str(str(i)) for i in range(start, rows+start)]
         arr = np.array(data)
         # print(f"file_name: {file_name} data type: {arr.dtype}")
-        log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
+        if shuffle_pk:
+            np.random.shuffle(arr)
+        log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
         np.save(file, arr)
     return file_name
 
@@ -463,9 +466,10 @@ def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
     return file_name
 
 
-def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False):
+def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False, **kwargs):
     file_name = f"{data_field}.npy"
     file = f"{dir}/{file_name}"
+    shuffle_pk = kwargs.get("shuffle_pk", False)
     if not os.path.exists(file) or force:
         # non vector columns
         data = []
@@ -477,13 +481,15 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False,
             data = [np.float64(random.random()) for _ in range(rows)]
         elif data_field == DataField.pk_field:
             data = [i for i in range(start, start + rows)]
+            if shuffle_pk:
+                random.shuffle(data)
         elif data_field == DataField.int_field:
             if not nullable:
                 data = [random.randint(-999999, 9999999) for _ in range(rows)]
             else:
                 data = [None for _ in range(rows)]
         arr = np.array(data)
-        log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
+        log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
         np.save(file, arr)
     return file_name
 
@@ -694,6 +700,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
 def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None,
                                 enable_dynamic_field=False, **kwargs):
     schema = kwargs.get("schema", None)
+    shuffle = kwargs.get("shuffle", False)
     schema = schema.to_dict() if schema is not None else None
     data = []
     nullable = False
@@ -785,7 +792,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
             d["name"] = fake.name()
             d["address"] = fake.address()
         data.append(d)
-
+    if shuffle:
+        random.shuffle(data)
+    log.info(f"shuffle={shuffle}")
     return data
 
 
@@ -850,6 +859,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
     files = []
     start_uid = 0
     nullable = False
+    shuffle_pk = kwargs.get("shuffle_pk", False)
     if file_nums == 1:
         # gen the numpy file without subfolders if only one set of files
         for data_field in data_fields:
@@ -878,7 +888,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
                                                       vector_type=vector_type, rows=rows, dim=dim, force=force)
             elif data_field == DataField.string_field:  # string field for numpy not supported yet at 2022-10-17
-                file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
+                file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, shuffle_pk=shuffle_pk)
             elif data_field == DataField.text_field:
                 file_name = gen_text_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, nullable=nullable)
             elif data_field == DataField.bool_field:
@@ -887,7 +897,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
                 file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
             else:
                 file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
-                                                           rows=rows, force=force, nullable=nullable)
+                                                           rows=rows, force=force, nullable=nullable, shuffle_pk=shuffle_pk)
             files.append(file_name)
         if enable_dynamic_field and include_meta:
             file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
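
The shuffle_pk flag threaded through the numpy helpers above exists so that bulk-import files can carry unique but unordered primary keys, which is what forces the stats task to actually sort segments after import. A minimal standalone sketch of that behavior (the function name and file path are illustrative, not part of the test suite):

    import random
    import numpy as np

    def gen_shuffled_pk_file(path, rows, start=0, shuffle_pk=True):
        """Write an int64 primary-key column to a .npy file, optionally shuffled."""
        data = list(range(start, start + rows))   # unique, initially ordered pks
        if shuffle_pk:
            random.shuffle(data)                  # still unique, but unordered on disk
        np.save(path, np.array(data))             # dtype is inferred as int64

    gen_shuffled_pk_file("/tmp/pk.npy", rows=10)
    print(np.load("/tmp/pk.npy"))                 # e.g. [3 7 0 9 ...]
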
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 694ce1e688..c41b38d26d 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -1846,7 +1846,7 @@ def gen_values(schema: CollectionSchema, nb, start_id=0, default_values: dict =
         if default_value is not None:
             data.append(default_value)
         elif field.auto_id is False:
-            data.append(gen_data_by_collection_field(field, nb, start_id * nb))
+            data.append(gen_data_by_collection_field(field, nb, start_id))
     return data
 
 
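
The one-line gen_values fix above stops multiplying the caller's offset by nb before handing it to gen_data_by_collection_field. One plausible reading (the helper's internals are not shown in this hunk): start_id is meant to be an absolute starting id, so scaling it again produced ids nb times too large. A simplified stand-in for an int64 field illustrates the difference:

    # Simplified stand-in for gen_data_by_collection_field on an int64 field:
    # it yields nb consecutive values beginning at `start`.
    def gen_int64_column(nb, start):
        return list(range(start, start + nb))

    nb = 100
    before = gen_int64_column(nb, 200 * nb)   # old behavior: ids start at 20000
    after = gen_int64_column(nb, 200)         # fixed behavior: ids start at 200
    assert before[0] == 20000 and after[0] == 200
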
diff --git a/tests/python_client/common/common_params.py b/tests/python_client/common/common_params.py
index 3d1331781d..44cbecf8df 100644
--- a/tests/python_client/common/common_params.py
+++ b/tests/python_client/common/common_params.py
@@ -1,7 +1,5 @@
 from dataclasses import dataclass
-from typing import List, Dict
-
-from pymilvus import DataType
+from typing import List, Dict, Optional
 
 """ Define param names"""
 
@@ -284,6 +282,18 @@ class IndexPrams(BasePrams):
     params: dict = None
     metric_type: str = None
 
+@dataclass
+class SearchInsidePrams(BasePrams):
+    # inside params
+    radius: Optional[float] = None
+    range_filter: Optional[float] = None
+    group_by_field: Optional[str] = None
+
+
+@dataclass
+class SearchPrams(BasePrams):
+    metric_type: str = MetricType.L2
+    params: dict = None
 
 
 """ Define default params """
@@ -307,9 +317,15 @@ class DefaultVectorIndexParams:
         }
 
     @staticmethod
-    def HNSW(field: str, m: int = 8, ef: int = 200, metric_type=MetricType.L2):
+    def HNSW(field: str, m: int = 8, efConstruction: int = 200, metric_type=MetricType.L2):
         return {
-            field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": ef}, metric_type=metric_type)
+            field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": efConstruction}, metric_type=metric_type)
+        }
+
+    @staticmethod
+    def SCANN(field: str, nlist: int = 128, metric_type=MetricType.L2):
+        return {
+            field: IndexPrams(index_type=IndexName.SCANN, params={"nlist": nlist}, metric_type=metric_type)
         }
 
     @staticmethod
@@ -330,20 +346,19 @@ class DefaultVectorIndexParams:
         }
 
     @staticmethod
-    def SPARSE_WAND(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
+    def SPARSE_WAND(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
         return {
             field: IndexPrams(index_type=IndexName.SPARSE_WAND, params={"drop_ratio_build": drop_ratio_build},
                               metric_type=metric_type)
         }
 
     @staticmethod
-    def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
+    def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
         return {
             field: IndexPrams(index_type=IndexName.SPARSE_INVERTED_INDEX, params={"drop_ratio_build": drop_ratio_build},
                               metric_type=metric_type)
         }
 
-
 class DefaultScalarIndexParams:
 
     @staticmethod
@@ -389,6 +404,107 @@ class AlterIndexParams:
     def index_mmap(enable: bool = True):
         return {'mmap.enabled': enable}
 
+
+class DefaultVectorSearchParams:
+
+    @staticmethod
+    def FLAT(metric_type=MetricType.L2, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def IVF_FLAT(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"nprobe": nprobe}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def IVF_SQ8(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"nprobe": nprobe}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def HNSW(metric_type=MetricType.L2, ef: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"ef": ef}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def SCANN(metric_type=MetricType.L2, nprobe: int = 32, reorder_k: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"nprobe": nprobe, "reorder_k": reorder_k}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def DISKANN(metric_type=MetricType.L2, search_list: int = 30, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"search_list": search_list}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def BIN_FLAT(metric_type=MetricType.JACCARD, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def BIN_IVF_FLAT(metric_type=MetricType.JACCARD, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"nprobe": nprobe}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def SPARSE_WAND(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"drop_ratio_search": drop_ratio_search}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
+    @staticmethod
+    def SPARSE_INVERTED_INDEX(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
+        inside_params_dict = {"drop_ratio_search": drop_ratio_search}
+        if inside_params is not None:
+            inside_params_dict.update(inside_params.to_dict)
+
+        sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
+        sp.update(kwargs)
+        return sp
+
 
 @dataclass
 class ExprCheckParams:
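
Each DefaultVectorSearchParams method builds the index-specific knobs (nprobe, ef, search_list, ...), merges in any SearchInsidePrams fields under params, wraps the result in SearchPrams, and finally lets **kwargs override or extend the top-level dict. Assuming BasePrams.to_dict is a property returning the non-None fields as a dict (which is how it is used throughout this file), the helpers compose like this; the dict shapes shown in comments are approximate:

    from common.common_params import SearchInsidePrams, DefaultVectorSearchParams

    # Plain HNSW search params: the `ef` knob plus the metric type.
    DefaultVectorSearchParams.HNSW(ef=64)
    # -> {'metric_type': 'L2', 'params': {'ef': 64}}

    # Range-search knobs ride along inside `params` via SearchInsidePrams...
    DefaultVectorSearchParams.HNSW(ef=64, inside_params=SearchInsidePrams(radius=0.8, range_filter=0.2))
    # -> {'metric_type': 'L2', 'params': {'ef': 64, 'radius': 0.8, 'range_filter': 0.2}}

    # ...while extra keyword arguments land at the top level of the dict.
    DefaultVectorSearchParams.HNSW(offset=10)
    # -> {'metric_type': 'L2', 'params': {'ef': 200}, 'offset': 10}
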
diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py
index 557dfdb0f6..23d18ba054 100644
--- a/tests/python_client/testcases/test_bulk_insert.py
+++ b/tests/python_client/testcases/test_bulk_insert.py
@@ -9,6 +9,7 @@ from pathlib import Path
 from base.client_base import TestcaseBase
 from common import common_func as cf
 from common import common_type as ct
+from common.common_params import DefaultVectorIndexParams, DefaultVectorSearchParams
 from common.milvus_sys import MilvusSys
 from common.common_type import CaseLabel, CheckTasks
 from utils.util_log import test_log as log
@@ -2160,3 +2161,77 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
                 empty_partition_num += 1
             num_entities += p.num_entities
         assert num_entities == entities * file_nums
+
+    @pytest.mark.parametrize("pk_field", [df.pk_field, df.string_field])
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_bulk_import_random_pk_stats_task(self, pk_field):
+        # connect -> prepare data files
+        self._connect()
+        collection_name = cf.gen_unique_str("stats_task")
+        nb = 3000
+        fields = []
+        files = ""
+
+        # prepare data: int64_pk -> json data; varchar_pk -> numpy data
+        if pk_field == df.pk_field:
+            fields = [
+                cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=False),
+                cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
+            ]
+            data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+            files = prepare_bulk_insert_new_json_files(
+                minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
+                is_row_based=True, rows=nb, dim=ct.default_dim, auto_id=False, data_fields=data_fields, force=True,
+                shuffle=True
+            )
+        elif pk_field == df.string_field:
+            fields = [
+                cf.gen_string_field(name=df.string_field, is_primary=True, auto_id=False),
+                cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
+            ]
+            data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+            files = prepare_bulk_insert_numpy_files(
+                minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
+                rows=nb, dim=ct.default_dim, data_fields=data_fields, enable_dynamic_field=False, force=True,
+                shuffle_pk=True
+            )
+        else:
+            log.error(f"pk_field name {pk_field} is not supported; expected one of [{df.pk_field}, {df.string_field}]")
+
+        # create collection -> create vector index
+        schema = cf.gen_collection_schema(fields=fields)
+        self.collection_wrap.init_collection(collection_name, schema=schema)
+        self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(df.float_vec_field))
+
+        # bulk_insert data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=collection_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=300
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{completed} with latency {tt}")
+        assert completed
+
+        # load -> get_segment_info -> verify stats task
+        self.collection_wrap.load()
+        res_segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name)
+        assert len(res_segment_info) > 0  # mixed compaction may have merged everything into one segment
+        cnt = 0
+        for r in res_segment_info:
+            log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
+            cnt += r.num_rows
+            assert r.is_sorted is True
+        assert cnt == nb
+
+        # verify search
+        self.collection_wrap.search(
+            data=cf.gen_vectors(ct.default_nq, ct.default_dim, vector_data_type=DataType.FLOAT_VECTOR.name),
+            anns_field=df.float_vec_field, param=DefaultVectorSearchParams.IVF_SQ8(),
+            limit=ct.default_limit,
+            check_task=CheckTasks.check_search_results,
+            check_items={"nq": ct.default_nq,
+                         "limit": ct.default_limit})
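
The wrappers used above delegate to pymilvus utility calls; stripped of the test harness, the stats-task check reduces to roughly the sketch below. The host, port, and collection name are illustrative, and is_sorted is assumed to be exposed on the returned QuerySegmentInfo objects, as the new asserts rely on:

    from pymilvus import connections, utility

    connections.connect(host="localhost", port="19530")   # assumed local Milvus
    total_rows = 0
    for seg in utility.get_query_segment_info("stats_task_collection"):
        print(seg.segmentID, seg.state, seg.num_rows, seg.is_sorted)
        assert seg.is_sorted        # the stats task sorted this sealed segment by pk
        total_rows += seg.num_rows
    print("total rows:", total_rows)
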
diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py
index ee578b0d8e..b62a904a9a 100644
--- a/tests/python_client/testcases/test_utility.py
+++ b/tests/python_client/testcases/test_utility.py
@@ -7,6 +7,7 @@ from pymilvus.exceptions import MilvusException
 from base.client_base import TestcaseBase
 from base.collection_wrapper import ApiCollectionWrapper
 from base.utility_wrapper import ApiUtilityWrapper
+from common.common_params import FieldParams, DefaultVectorIndexParams, DefaultVectorSearchParams
 from utils.util_log import test_log as log
 from common import common_func as cf
 from common import common_type as ct
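
Context for the two rewritten tests below: get_query_segment_info reports sealed segments only, so data sitting in a growing segment is invisible to it until a flush. A sketch of that distinction with plain pymilvus (collection name and dimensions are illustrative, and a local Milvus is assumed):

    import random
    from pymilvus import (CollectionSchema, FieldSchema, DataType, Collection,
                          connections, utility)

    connections.connect(host="localhost", port="19530")
    schema = CollectionSchema([
        FieldSchema("pk", DataType.INT64, is_primary=True),
        FieldSchema("vec", DataType.FLOAT_VECTOR, dim=8),
    ])
    coll = Collection("seg_info_demo", schema=schema)
    coll.create_index("vec", {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 128}})
    coll.load()
    coll.insert([list(range(100)), [[random.random() for _ in range(8)] for _ in range(100)]])
    assert len(utility.get_query_segment_info("seg_info_demo")) == 0   # growing: not reported
    coll.flush()                                                       # seals the segment
    assert len(utility.get_query_segment_info("seg_info_demo")) > 0    # sealed: reported
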
@@ -1359,24 +1360,98 @@ class TestUtilityAdvanced(TestcaseBase):
         assert len(res) == 0
 
     @pytest.mark.tags(CaseLabel.L1)
-    @pytest.mark.skip("index must created before load, but create_index will trigger flush")
-    def test_get_sealed_query_segment_info(self):
+    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
+    def test_get_sealed_query_segment_info(self, primary_field):
         """
-        target: test getting sealed query segment info of collection without index
-        method: init a collection, insert data, flush, load, and get query segment info
+        target: test getting sealed query segment info of collection with data
+        method: init a collection, insert data, flush, index, load, and get query segment info
         expected:
-            1. length of segment is equal to 0
+            1. the number of segments returned is greater than 0
+            2. the sum of num_rows over all segments equals the number of inserted entities
+            3. every segment has is_sorted set to true
         """
-        c_name = cf.gen_unique_str(prefix)
-        collection_w = self.init_collection_wrap(name=c_name)
         nb = 3000
-        df = cf.gen_default_dataframe_data(nb)
-        collection_w.insert(df)
-        collection_w.num_entities
-        collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
-        collection_w.load()
-        res, _ = self.utility_wrap.get_query_segment_info(c_name)
-        assert len(res) == 0
+        segment_num = 2
+        collection_name = cf.gen_unique_str(prefix)
+
+        # connect -> create collection
+        self._connect()
+        self.collection_wrap.init_collection(
+            name=collection_name,
+            schema=cf.set_collection_schema(
+                fields=[primary_field, ct.default_float_vec_field_name],
+                field_params={
+                    primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
+                    ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
+                },
+            )
+        )
+
+        for _ in range(segment_num):
+            # insert random pks; start_id=None generates random data
+            data = cf.gen_values(self.collection_wrap.schema, nb=nb, start_id=None)
+            self.collection_wrap.insert(data)
+            self.collection_wrap.flush()
+
+        # flush -> index -> load -> sealed segments are sorted
+        self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
+        self.collection_wrap.load()
+
+        # get_query_segment_info and verify results
+        res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
+        assert len(res_sealed) > 0  # mixed compaction may have merged everything into one segment
+        cnt = 0
+        for r in res_sealed:
+            log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
+            cnt += r.num_rows
+            assert r.is_sorted is True
+        assert cnt == nb * segment_num
+
+        # verify search
+        self.collection_wrap.search(data=cf.gen_vectors(ct.default_nq, ct.default_dim),
+                                    anns_field=ct.default_float_vec_field_name, param=DefaultVectorSearchParams.IVF_SQ8(),
+                                    limit=ct.default_limit,
+                                    check_task=CheckTasks.check_search_results,
+                                    check_items={"nq": ct.default_nq,
+                                                 "limit": ct.default_limit})
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_get_growing_query_segment_info(self):
+        """
+        target: test getting growing query segment info of collection with data
+        method: init a collection, index, load, insert data, and get query segment info
+        expected:
+            1. the number of segments returned is 0; growing segments are not visible to get_query_segment_info
+        """
+        nb = 3000
+        primary_field = "int64"
+        collection_name = cf.gen_unique_str(prefix)
+
+        # connect -> create collection
+        self._connect()
+        self.collection_wrap.init_collection(
+            name=collection_name,
+            schema=cf.set_collection_schema(
+                fields=[primary_field, ct.default_float_vec_field_name],
+                field_params={
+                    primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
+                    ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
+                },
+            )
+        )
+
+        # index -> load
+        self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
+        self.collection_wrap.load()
+
+        # insert random pks; start_id=None generates random data
+        data = cf.gen_values(self.collection_wrap.schema, nb=nb, start_id=None)
+        self.collection_wrap.insert(data)
+
+        # get_query_segment_info and verify results
+        res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
+        assert len(res_sealed) == 0
 
     @pytest.mark.tags(CaseLabel.L1)
     def test_get_sealed_query_segment_info_after_create_index(self):
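
The index-side and search-side helpers pair one-to-one, so a test can build an index with DefaultVectorIndexParams.X(field) and search it with DefaultVectorSearchParams.X(). For SCANN, added on both sides by this patch but not exercised by the tests above (they use the IVF_SQ8 pairing), the combination would look like this; the field name is illustrative and the dict shapes assume the to_dict behavior noted earlier:

    from common.common_params import DefaultVectorIndexParams, DefaultVectorSearchParams

    index_params = DefaultVectorIndexParams.SCANN("float_vector", nlist=256)
    # -> {'float_vector': IndexPrams(index_type='SCANN', params={'nlist': 256}, metric_type='L2')}
    search_params = DefaultVectorSearchParams.SCANN(nprobe=64, reorder_k=400)
    # -> {'metric_type': 'L2', 'params': {'nprobe': 64, 'reorder_k': 400}}
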