test: [E2e Refactor] gen collection name by testcase name and update search pagination test (#41170)

1. Generate collection names from the test case name
2. Update the search pagination tests to use milvus client v2
3. Use collection-shared mode for some tests
related issue: #40698

---------

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
yanliang567 2025-04-09 17:44:28 +08:00 committed by GitHub
parent 13f31af35a
commit bbaa3c71f4
6 changed files with 329 additions and 390 deletions


@@ -182,7 +182,7 @@ class TestcaseBase(Base):
def init_collection_wrap(self, name=None, schema=None, check_task=None, check_items=None,
enable_dynamic_field=False, with_json=True, **kwargs):
name = cf.gen_unique_str('coll_') if name is None else name
name = cf.gen_collection_name_by_testcase_name(2) if name is None else name
schema = cf.gen_default_collection_schema(enable_dynamic_field=enable_dynamic_field, with_json=with_json) \
if schema is None else schema
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
@@ -276,7 +276,7 @@ class TestcaseBase(Base):
log.info("Test case of search interface: initialize before test case")
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
self._connect()
collection_name = cf.gen_unique_str(prefix)
collection_name = cf.gen_collection_name_by_testcase_name(2)
if name is not None:
collection_name = name
if not isinstance(nullable_fields, dict):
@@ -363,7 +363,8 @@ class TestcaseBase(Base):
:return: collection wrap and partition wrap
"""
self._connect()
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_name = cf.gen_collection_name_by_testcase_name(2)
collection_w = self.init_collection_wrap(name=collection_name)
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
# insert [0, half) into partition_w
df_partition = cf.gen_default_dataframe_data(nb=half, start=0)
@@ -387,7 +388,8 @@ class TestcaseBase(Base):
:param is_dup: whether the primary keys of each segment are duplicated
:return: collection wrap and partition wrap
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(collection_prefix), shards_num=1)
collection_name = cf.gen_collection_name_by_testcase_name(2)
collection_w = self.init_collection_wrap(name=collection_name, shards_num=1)
for i in range(num_of_segment):
start = 0 if is_dup else i * nb_of_segment


@@ -76,8 +76,8 @@ class TestMilvusClientV2Base(Base):
@trace()
def create_collection(self, client, collection_name, dimension=None, primary_field_name='id',
id_type='int', vector_field_name='vector', metric_type='COSINE',
auto_id=False, schema=None, index_params=None, timeout=None, check_task=None,
check_items=None, **kwargs):
auto_id=False, schema=None, index_params=None, timeout=None, force_teardown=True,
check_task=None, check_items=None, **kwargs):
timeout = TIMEOUT if timeout is None else timeout
consistency_level = kwargs.get("consistency_level", "Strong")
kwargs.update({"consistency_level": consistency_level})
@@ -89,8 +89,9 @@ class TestMilvusClientV2Base(Base):
check_result = ResponseChecker(res, func_name, check_task, check_items, check,
collection_name=collection_name, dimension=dimension,
**kwargs).run()
# self.tear_down_collection_names.append(collection_name)
if force_teardown:
# when running in collection-shared mode, do not tear down here; do it in the specific test class instead
self.tear_down_collection_names.append(collection_name)
return res, check_result
def has_collection(self, client, collection_name, timeout=None, check_task=None,
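For context, a minimal sketch of how the new force_teardown flag is meant to be used, mirroring the shared-collection fixture later in this PR (class and attribute names here are illustrative):

class TestSharedCollection(TestMilvusClientV2Base):
    @pytest.fixture(scope="class", autouse=True)
    def prepare_collection(self, request):
        client = self._client()
        # skip the per-test teardown registration; the class owns the collection
        self.create_collection(client, self.collection_name, schema=self.schema, force_teardown=False)
        # drop it once, after every test in the class has run
        request.addfinalizer(lambda: self.drop_collection(self._client(), self.collection_name))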


@@ -24,8 +24,9 @@ from collections import Counter
import bm25s
import jieba
import re
import inspect
from pymilvus import CollectionSchema, DataType, FunctionType, Function
from pymilvus import CollectionSchema, DataType, FunctionType, Function, MilvusException
from bm25s.tokenization import Tokenizer
@@ -3101,8 +3102,8 @@ def install_milvus_operator_specific_config(namespace, milvus_mode, release_name
}
mil = MilvusOperator()
mil.install(data_config)
if mil.wait_for_healthy(release_name, NAMESPACE, timeout=TIMEOUT):
host = mic.endpoint(release_name, NAMESPACE).split(':')[0]
if mil.wait_for_healthy(release_name, namespace, timeout=1800):
host = mil.endpoint(release_name, namespace).split(':')[0]
else:
raise MilvusException(message=f'Milvus healthy timeout 1800s')
@@ -3404,3 +3405,12 @@ def iter_insert_list_data(data: list, batch: int, total_len: int):
data_obj = [iter(d) for d in data]
for n in nb_list:
yield [[next(o) for _ in range(n)] for o in data_obj]
def gen_collection_name_by_testcase_name(module_index=1):
"""
Gen a unique collection name by testcase name
if calling from the test base class, module_index=2
if calling from the testcase, module_index=1
"""
return inspect.stack()[module_index][3] + gen_unique_str("_")
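A quick usage sketch (the test name is illustrative; gen_unique_str("_") appends a random suffix):

def test_search_demo(self):
    # inspect.stack()[1] is this test function itself
    name = cf.gen_collection_name_by_testcase_name()  # e.g. "test_search_demo_a1B2c3D4"

When the call happens inside a base-class helper such as init_collection_wrap, the test sits one frame further up the stack, hence module_index=2.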


@@ -26,7 +26,6 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
""" Test case of end-to-end interface """
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.skip(reason="issue #40686")
@pytest.mark.parametrize("flush_enable", [True, False])
@pytest.mark.parametrize("scalar_index_enable", [True, False])
def test_milvus_client_e2e_default(self, flush_enable, scalar_index_enable):
@@ -40,7 +39,7 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
client = self._client()
# 1. Create collection with custom schema
collection_name = cf.gen_unique_str("test_e2e")
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
# Primary key and vector field
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
@@ -66,7 +65,7 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
self.create_collection(client, collection_name, schema=schema)
# 2. Insert data with null values for nullable fields
num_inserts = 3 # insert data for 3 times
num_inserts = 5  # insert data 5 times
total_rows = []
for batch in range(num_inserts):
vectors = cf.gen_vectors(default_nb, default_dim)
@@ -112,6 +111,7 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
t0 = time.time()
self.insert(client, collection_name, rows)
t1 = time.time()
time.sleep(0.5)
log.info(f"Insert batch {batch + 1}: {default_nb} entities cost {t1 - t0:.4f} seconds")
log.info(f"Total inserted {num_inserts * default_nb} entities")
@@ -135,6 +135,7 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
index_params.add_index(field_name="float_field", index_type="AUTOINDEX")
index_params.add_index(field_name="double_field", index_type="AUTOINDEX")
index_params.add_index(field_name="varchar_field", index_type="AUTOINDEX")
index_params.add_index(field_name="array_field", index_type="AUTOINDEX")
# 3. create index
self.create_index(client, collection_name, index_params)
@@ -142,8 +143,8 @@ class TestMilvusClientE2E(TestMilvusClientV2Base):
# Verify scalar indexes are created if enabled
indexes = self.list_indexes(client, collection_name)[0]
log.info(f"Created indexes: {indexes}")
expected_scalar_indexes = ["int8_field", "int16_field", "int32_field", "int64_field",
"float_field", "double_field", "varchar_field"]
expected_scalar_indexes = ["int8_field", "int16_field", "int32_field", "int64_field",
"float_field", "double_field", "varchar_field", "array_field"]
if scalar_index_enable:
for field in expected_scalar_indexes:
assert field in indexes, f"Scalar index not created for field: {field}"


@@ -1,10 +1,6 @@
import logging
import numpy as np
from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_SESSION, CONSISTENCY_EVENTUALLY
from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker
from pymilvus import (
FieldSchema, CollectionSchema, DataType,
Collection
)
from common.constants import *
from utils.util_pymilvus import *
from common.common_type import CaseLabel, CheckTasks
@@ -17,6 +13,7 @@ import random
import pytest
import pandas as pd
from faker import Faker
import inspect
Faker.seed(19530)
fake_en = Faker("en_US")
@@ -56,8 +53,6 @@ half_nb = ct.default_nb // 2
default_primary_key_field_name = "id"
default_vector_field_name = "vector"
default_float_field_name = ct.default_float_field_name
default_string_field_name = ct.default_string_field_name
@pytest.mark.xdist_group("TestMilvusClientSearchPagination")
@@ -66,7 +61,17 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
def setup_class(self):
super().setup_class(self)
self.collection_name = cf.gen_unique_str("test_search_pagination")
self.collection_name = "TestMilvusClientSearchPagination" + cf.gen_unique_str("_")
self.float_vector_field_name = "float_vector"
self.bfloat16_vector_field_name = "bfloat16_vector"
self.sparse_vector_field_name = "sparse_vector"
self.binary_vector_field_name = "binary_vector"
self.float_vector_dim = 128
self.bf16_vector_dim = 200
self.binary_vector_dim = 256
self.primary_keys = []
self.enable_dynamic_field = False
self.datas = []
@pytest.fixture(scope="class", autouse=True)
def prepare_collection(self, request):
@@ -75,46 +80,80 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
"""
# Get client connection
client = self._client()
# Create collection
self.collection_schema = self.create_schema(client, enable_dynamic_field=False)[0]
self.collection_schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
self.collection_schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
self.collection_schema.add_field(default_float_field_name, DataType.FLOAT)
self.collection_schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=65535)
self.create_collection(client, self.collection_name, schema=self.collection_schema)
# Insert data 5 times with non-duplicated primary keys
for j in range(5):
rows = [{default_primary_key_field_name: i + j * default_nb,
default_vector_field_name: list(cf.gen_vectors(1, default_dim)[0]),
default_float_field_name: (i + j * default_nb) * 1.0,
default_string_field_name: str(i + j * default_nb)}
for i in range(default_nb)]
self.insert(client, self.collection_name, rows)
# Create collection
collection_schema = self.create_schema(client, enable_dynamic_field=self.enable_dynamic_field)[0]
collection_schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
collection_schema.add_field(self.float_vector_field_name, DataType.FLOAT_VECTOR, dim=128)
collection_schema.add_field(self.bfloat16_vector_field_name, DataType.BFLOAT16_VECTOR, dim=200)
collection_schema.add_field(self.sparse_vector_field_name, DataType.SPARSE_FLOAT_VECTOR)
collection_schema.add_field(self.binary_vector_field_name, DataType.BINARY_VECTOR, dim=256)
collection_schema.add_field(default_float_field_name, DataType.FLOAT)
collection_schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=256)
collection_schema.add_field(default_int64_field_name, DataType.INT64)
self.create_collection(client, self.collection_name, schema=collection_schema, force_teardown=False)
# Define number of insert iterations
insert_times = 10
# Generate vectors for each type and store in self
float_vectors = cf.gen_vectors(default_nb * insert_times, dim=self.float_vector_dim, vector_data_type='FLOAT_VECTOR')
bfloat16_vectors = cf.gen_vectors(default_nb * insert_times, dim=self.bf16_vector_dim, vector_data_type='BFLOAT16_VECTOR')
sparse_vectors = cf.gen_sparse_vectors(default_nb * insert_times, empty_percentage=2)
_, binary_vectors = cf.gen_binary_vectors(default_nb * insert_times, dim=self.binary_vector_dim)
# Insert data multiple times with non-duplicated primary keys
for j in range(insert_times):
rows = [{
default_primary_key_field_name: i + j * default_nb,
self.float_vector_field_name: list(float_vectors[i + j * default_nb]),
self.bfloat16_vector_field_name: bfloat16_vectors[i + j * default_nb],
self.sparse_vector_field_name: sparse_vectors[i + j * default_nb],
self.binary_vector_field_name: binary_vectors[i + j * default_nb],
default_float_field_name: (i + j * default_nb) * 1.0,
default_string_field_name: str(i + j * default_nb),
default_int64_field_name: i + j * default_nb
}
for i in range(default_nb)]
self.datas.extend(rows)
self.primary_keys.extend([i + j * default_nb for i in range(default_nb)])
self.insert(client, self.collection_name, data=rows)
self.flush(client, self.collection_name)
# Create index
self.index_params = self.prepare_index_params(client)[0]
self.index_params.add_index(field_name=default_vector_field_name,
metric_type="COSINE",
index_type="IVF_FLAT",
params={"nlist": 128})
self.create_index(client, self.collection_name, index_params=self.index_params)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name=self.float_vector_field_name,
metric_type="COSINE",
index_type="IVF_FLAT",
params={"nlist": 128})
index_params.add_index(field_name=self.bfloat16_vector_field_name,
metric_type="L2",
index_type="DISKANN",
params={})
index_params.add_index(field_name=self.sparse_vector_field_name,
metric_type="IP",
index_type="SPARSE_INVERTED_INDEX",
params={})
index_params.add_index(field_name=self.binary_vector_field_name,
metric_type="JACCARD",
index_type="BIN_IVF_FLAT",
params={"nlist": 128})
self.create_index(client, self.collection_name, index_params=index_params)
# Load collection
self.load_collection(client, self.collection_name)
def teardown():
self.drop_collection(self._client(), self.collection_name)
request.addfinalizer(teardown)
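Note on the class-shared collection above: the @pytest.mark.xdist_group mark keeps all tests of this class on a single pytest-xdist worker so they can safely share one collection, and it only takes effect when xdist runs with --dist loadgroup (e.g. pytest -n 4 --dist loadgroup). force_teardown=False then defers the drop to the addfinalizer hook registered here.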
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_search_with_pagination_default(self):
def test_search_float_vectors_with_pagination_default(self):
"""
target: test search with pagination
target: test search float vectors with pagination
method: 1. connect and create a collection
2. search pagination with offset
2. search float vectors with pagination
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
@@ -135,7 +174,7 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.float_vector_field_name,
search_params=search_params,
limit=limit,
check_task=CheckTasks.check_search_results,
@@ -152,7 +191,7 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.float_vector_field_name,
search_params=search_params_full,
limit=limit * pages
)
@@ -166,11 +205,11 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
assert page_ids == ids_in_full
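Condensed, the invariant each of these pagination tests checks, using the names defined above (a sketch, not framework code):

# page p of a paginated search must equal the corresponding slice of one big search
full_ids = [hit.get('id') for hit in search_res_full[i]]
page_ids = [hit.get('id') for hit in all_pages_results[p][i]]
assert page_ids == full_ids[p * limit:(p + 1) * limit]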
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_search_with_pagination_default1(self):
def test_search_bfloat16_with_pagination_default(self):
"""
target: test search with pagination
target: test search bfloat16 vectors with pagination
method: 1. connect and create a collection
2. search pagination with offset
2. search bfloat16 vectors with pagination
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
@@ -182,16 +221,16 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
# 2. Search with pagination for 10 pages
limit = 100
pages = 10
vectors_to_search = cf.gen_vectors(default_nq, default_dim)
vectors_to_search = cf.gen_vectors(default_nq, self.bf16_vector_dim, vector_data_type='BFLOAT16_VECTOR')
all_pages_results = []
for page in range(pages):
offset = page * limit
search_params = {"metric_type": "COSINE", "params": {"nprobe": 100}, "offset": offset}
search_params = {"offset": offset}
search_res_with_offset, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.bfloat16_vector_field_name,
search_params=search_params,
limit=limit,
check_task=CheckTasks.check_search_results,
@@ -203,12 +242,69 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
all_pages_results.append(search_res_with_offset)
# 3. Search without pagination
search_params_full = {"metric_type": "COSINE", "params": {"nprobe": 100}}
search_params_full = {}
search_res_full, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.bfloat16_vector_field_name,
search_params=search_params_full,
limit=limit * pages
)
# 4. Compare results - verify pagination results equal the results in full search with offsets
for p in range(pages):
page_res = all_pages_results[p]
for i in range(default_nq):
page_ids = [page_res[i][j].get('id') for j in range(limit)]
ids_in_full = [search_res_full[i][p * limit:p * limit + limit][j].get('id') for j in range(limit)]
intersection_ids = set(ids_in_full).intersection(set(page_ids))
log.debug(f"page[{p}], nq[{i}], intersection_ids: {len(intersection_ids)}")
@pytest.mark.tags(CaseLabel.L0)
def test_search_sparse_with_pagination_default(self):
"""
target: test search sparse vectors with pagination
method: 1. connect and create a collection
2. search sparse vectors with pagination
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
"""
client = self._client()
# 1. Create collection with schema
collection_name = self.collection_name
# 2. Search with pagination for 10 pages
limit = 100
pages = 10
vectors_to_search = cf.gen_sparse_vectors(default_nq, empty_percentage=2)
all_pages_results = []
for page in range(pages):
offset = page * limit
search_params = {"params": {"drop_ratio_search": "0.2"}, "offset": offset}
search_res_with_offset, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.sparse_vector_field_name,
search_params=search_params,
limit=limit,
check_task=CheckTasks.check_search_results,
check_items={"enable_milvus_client_api": True,
"nq": default_nq,
"limit": limit
}
)
all_pages_results.append(search_res_with_offset)
# 3. Search without pagination
search_params_full = {"params": {"drop_ratio_search": "0.2"}}
search_res_full, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.sparse_vector_field_name,
search_params=search_params_full,
limit=limit * pages
)
@@ -222,11 +318,11 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
assert page_ids == ids_in_full
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_search_with_pagination_default2(self):
def test_search_binary_with_pagination_default(self):
"""
target: test search with pagination
target: test search binary vectors with pagination
method: 1. connect and create a collection
2. search pagination with offset
2. search binary vectors with pagination
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
@@ -238,16 +334,16 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
# 2. Search with pagination for 10 pages
limit = 100
pages = 10
vectors_to_search = cf.gen_vectors(default_nq, default_dim)
vectors_to_search = cf.gen_binary_vectors(default_nq, dim=self.binary_vector_dim)[1]
all_pages_results = []
for page in range(pages):
offset = page * limit
search_params = {"metric_type": "COSINE", "params": {"nprobe": 100}, "offset": offset}
search_params = {"params": {"nprobe": 32}, "offset": offset}
search_res_with_offset, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.binary_vector_field_name,
search_params=search_params,
limit=limit,
check_task=CheckTasks.check_search_results,
@@ -259,12 +355,12 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
all_pages_results.append(search_res_with_offset)
# 3. Search without pagination
search_params_full = {"metric_type": "COSINE", "params": {"nprobe": 100}}
search_params_full = {"params": {"nprobe": 32}}
search_res_full, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=default_vector_field_name,
anns_field=self.binary_vector_field_name,
search_params=search_params_full,
limit=limit * pages
)
@@ -276,73 +372,145 @@ class TestMilvusClientSearchPagination(TestMilvusClientV2Base):
page_ids = [page_res[i][j].get('id') for j in range(limit)]
ids_in_full = [search_res_full[i][p * limit:p * limit + limit][j].get('id') for j in range(limit)]
assert page_ids == ids_in_full
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("limit", [100, 3000, 10000])
def test_search_with_pagination_topk(self, limit):
"""
target: Test search pagination when limit + offset equals topK
method: 1. Get client connection
2. Calculate offset as topK - limit
3. Perform search with calculated offset and limit
4. Verify search results are returned correctly
expected: Search should complete successfully with correct number of results
based on the specified limit and offset
"""
client = self._client()
# 1. Create collection with schema
collection_name = self.collection_name
# @pytest.mark.tags(CaseLabel.L0)
# def test_milvus_client_search_with_pagination_default(self):
# """
# target: test search with pagination
# method: 1. connect and create a collection
# 2. search pagination with offset
# 3. search with offset+limit
# 4. compare with the search results whose corresponding ids should be the same
# expected: search successfully and ids is correct
# """
# client = self._client()
# # 1. Create collection with schema
# collection_name = cf.gen_unique_str("test_search_pagination")
# self.create_collection(client, collection_name, default_dim)
#
# # Insert data 5 times with non-duplicated primary keys
# for j in range(5):
# rows = [{default_primary_key_field_name: i + j * default_nb,
# default_vector_field_name: list(cf.gen_vectors(1, default_dim)[0]),
# default_float_field_name: (i + j * default_nb) * 1.0,
# default_string_field_name: str(i + j * default_nb)}
# for i in range(default_nb)]
# self.insert(client, collection_name, rows)
# self.flush(client, collection_name)
#
# # 2. Search with pagination for 10 pages
# limit = 100
# pages = 10
# vectors_to_search = cf.gen_vectors(default_nq, default_dim)
# all_pages_results = []
# for page in range(pages):
# offset = page * limit
# search_params = {"metric_type": "COSINE", "params": {"nprobe": 100}, "offset": offset}
# search_res_with_offset, _ = self.search(
# client,
# collection_name,
# vectors_to_search[:default_nq],
# anns_field=default_vector_field_name,
# search_params=search_params,
# limit=limit,
# check_task=CheckTasks.check_search_results,
# check_items={"enable_milvus_client_api": True,
# "nq": default_nq,
# "limit": limit
# }
# )
# all_pages_results.append(search_res_with_offset)
#
# # 3. Search without pagination
# search_params_full = {"metric_type": "COSINE", "params": {"nprobe": 100}}
# search_res_full, _ = self.search(
# client,
# collection_name,
# vectors_to_search[:default_nq],
# anns_field=default_vector_field_name,
# search_params=search_params_full,
# limit=limit * pages
# )
#
# # 4. Compare results - verify pagination results equal the results in full search with offsets
# for p in range(pages):
# page_res = all_pages_results[p]
# for i in range(default_nq):
# page_ids = [page_res[i][j].get('id') for j in range(limit)]
# ids_in_full = [search_res_full[i][p*limit:p*limit+limit][j].get('id') for j in range(limit)]
# assert page_ids == ids_in_full
# 2. Search with pagination
topK = 16384
offset = topK - limit
search_param = {"params": {"nprobe": 10}, "offset": offset}
vectors_to_search = [[random.random() for _ in range(default_dim)]
                     for _ in range(default_nq)]
self.search(client, collection_name, vectors_to_search[:default_nq], anns_field=self.float_vector_field_name,
            search_params=search_param, limit=limit, check_task=CheckTasks.check_search_results,
            check_items={"enable_milvus_client_api": True,
                         "nq": default_nq,
                         "limit": limit})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("offset", [0, 100])
def test_search_pagination_with_expression(self, offset):
"""
target: Test search pagination functionality with filtering expressions
method: 1. Create collection and insert test data
2. Search with pagination offset and expression filter
3. Search with full limit and expression filter
4. Compare paginated results match full results with offset
expected: Paginated search results should match corresponding subset of full search results
"""
client = self._client()
collection_name = self.collection_name
# filter result with expression in collection
total_datas = self.datas
for expressions in cf.gen_normal_expressions_and_templates():
log.debug(f"search with expression: {expressions}")
expr = expressions[0].replace("&&", "and").replace("||", "or")
filter_ids = []
for i, _id in enumerate(self.primary_keys):
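# bind row values to the bare names (int64, float) used inside expr so eval(expr) below can resolve them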
int64 = total_datas[i][ct.default_int64_field_name]
float = total_datas[i][ct.default_float_field_name]
if not expr or eval(expr):
filter_ids.append(_id)
# 2. search
limit = min(default_limit, len(filter_ids))
if offset >= len(filter_ids):
limit = 0
elif len(filter_ids) - offset < default_limit:
limit = len(filter_ids) - offset
search_params = {"metric_type": "COSINE", "params": {"nprobe": 128}, "offset": offset}
vectors_to_search = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
search_res_with_offset, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.float_vector_field_name,
search_params=search_params,
limit=default_limit,
filter=expr,
check_task=CheckTasks.check_search_results,
check_items={"enable_milvus_client_api": True,
"nq": default_nq,
"limit": limit}
)
# 3. search with offset+limit
search_params_full = {"metric_type": "COSINE", "params": {"nprobe": 128}}
search_res_full, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.float_vector_field_name,
search_params=search_params_full,
limit=default_limit + offset,
filter=expr
)
# 4. Compare results
filter_ids_set = set(filter_ids)
for hits in search_res_with_offset:
ids = [hit.get('id') for hit in hits]
assert set(ids).issubset(filter_ids_set)
# Compare pagination results with full results
page_ids = [search_res_with_offset[0][j].get('id') for j in range(limit)]
ids_in_full = [search_res_full[0][offset:offset + limit][j].get('id') for j in range(limit)]
assert page_ids == ids_in_full
# 5. search again with expression template
expr = cf.get_expr_from_template(expressions[1]).replace("&&", "and").replace("||", "or")
expr_params = cf.get_expr_params_from_template(expressions[1])
search_res_with_offset, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.float_vector_field_name,
search_params=search_params,
limit=default_limit,
filter=expr,
filter_params=expr_params,
check_task=CheckTasks.check_search_results,
check_items={"enable_milvus_client_api": True,
"nq": default_nq,
"limit": limit}
)
# 6. search with offset+limit
search_res_full, _ = self.search(
client,
collection_name,
vectors_to_search[:default_nq],
anns_field=self.float_vector_field_name,
search_params=search_params_full,
limit=default_limit + offset,
filter=expr,
filter_params=expr_params
)
# Compare results
filter_ids_set = set(filter_ids)
for hits in search_res_with_offset:
ids = [hit.get('id') for hit in hits]
assert set(ids).issubset(filter_ids_set)
# Compare pagination results with full results
page_ids = [search_res_with_offset[0][j].get('id') for j in range(limit)]
ids_in_full = [search_res_full[0][offset:offset + limit][j].get('id') for j in range(limit)]
assert page_ids == ids_in_full
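For readers new to the helper: cf.gen_normal_expressions_and_templates() yields pairs of a literal filter and a parameterized template, roughly shaped as below (values illustrative; the exact set lives in common_func):

expressions = [
    "int64 > 0 && float <= 100.0",                        # expressions[0]: literal expr, eval()-ed above
    {"expr": "int64 > {value_0} && float <= {value_1}",   # expressions[1]: template consumed by
     "expr_params": {"value_0": 0, "value_1": 100.0}},    # get_expr_from_template / get_expr_params_from_template
]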
class TestSearchPagination(TestcaseBase):
@@ -374,251 +542,6 @@ class TestSearchPagination(TestcaseBase):
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
def test_search_string_with_pagination(self, offset, _async):
"""
target: test search string with pagination
method: 1. connect and create a collection
2. search pagination with offset
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
"""
# 1. create a collection
auto_id = True
enable_dynamic_field = True
collection_w, _, _, insert_ids = \
self.init_collection_general(prefix, True, auto_id=auto_id, dim=default_dim,
enable_dynamic_field=enable_dynamic_field)[0:4]
# 2. search
search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset}
vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
output_fields = [default_string_field_name, default_float_field_name]
search_res = collection_w.search(vectors[:default_nq], default_search_field,
search_param, default_limit,
default_search_string_exp,
output_fields=output_fields,
_async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_ids,
"limit": default_limit,
"_async": _async})[0]
# 3. search with offset+limit
res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
default_limit + offset, default_search_string_exp, _async=_async)[0]
if _async:
search_res.done()
search_res = search_res.result()
res.done()
res = res.result()
res_distance = res[0].distances[offset:]
# assert sorted(search_res[0].distances, key=numpy.float32) == sorted(res_distance, key=numpy.float32)
assert set(search_res[0].ids) == set(res[0].ids[offset:])
@pytest.mark.tags(CaseLabel.L1)
def test_search_binary_with_pagination(self, offset):
"""
target: test search binary with pagination
method: 1. connect and create a collection
2. search pagination with offset
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
"""
# 1. create a collection
auto_id = False
collection_w, _, _, insert_ids = \
self.init_collection_general(
prefix, True, is_binary=True, auto_id=auto_id, dim=default_dim)[0:4]
# 2. search
search_param = {"metric_type": "JACCARD",
"params": {"nprobe": 10}, "offset": offset}
binary_vectors = cf.gen_binary_vectors(default_nq, default_dim)[1]
search_res = collection_w.search(binary_vectors[:default_nq], "binary_vector",
search_param, default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_ids,
"limit": default_limit})[0]
# 3. search with offset+limit
search_binary_param = {
"metric_type": "JACCARD", "params": {"nprobe": 10}}
res = collection_w.search(binary_vectors[:default_nq], "binary_vector", search_binary_param,
default_limit + offset)[0]
assert len(search_res[0].ids) == len(res[0].ids[offset:])
assert sorted(search_res[0].distances, key=np.float32) == sorted(
res[0].distances[offset:], key=np.float32)
@pytest.mark.tags(CaseLabel.L1)
def test_search_all_vector_type_with_pagination(self, vector_data_type):
"""
target: test search with pagination using different vector datatype
method: 1. connect and create a collection
2. search pagination with offset
3. search with offset+limit
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
"""
# 1. create a collection
auto_id = False
enable_dynamic_field = True
offset = 100
limit = 20
collection_w = self.init_collection_general(prefix, True, auto_id=auto_id, dim=default_dim,
enable_dynamic_field=enable_dynamic_field,
vector_data_type=vector_data_type)[0]
# 2. search pagination with offset
search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset}
vectors = cf.gen_vectors_based_on_vector_type(default_nq, default_dim, vector_data_type)
search_res = collection_w.search(vectors[:default_nq], default_search_field,
search_param, limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": limit})[0]
# 3. search with offset+limit
res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
limit + offset, default_search_exp)[0]
res_distance = res[0].distances[offset:]
# assert sorted(search_res[0].distances, key=numpy.float32) == sorted(res_distance, key=numpy.float32)
assert set(search_res[0].ids) == set(res[0].ids[offset:])
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("limit", [100, 3000, 10000])
def test_search_with_pagination_topK(self, limit, _async):
"""
target: test search with pagination limit + offset = topK
method: 1. connect and create a collection
2. search pagination with offset
3. search with topK
4. compare with the search results whose corresponding ids should be the same
expected: search successfully and ids is correct
"""
# 1. create a collection
topK = 16384
auto_id = True
offset = topK - limit
collection_w = self.init_collection_general(
prefix, True, nb=20000, auto_id=auto_id, dim=default_dim)[0]
# 2. search
search_param = {"metric_type": "COSINE",
"params": {"nprobe": 10}, "offset": offset}
vectors = [[random.random() for _ in range(default_dim)]
for _ in range(default_nq)]
search_res = collection_w.search(vectors[:default_nq], default_search_field,
search_param, limit,
default_search_exp, _async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"limit": limit,
"_async": _async})[0]
# 3. search with topK
res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
topK, default_search_exp, _async=_async)[0]
if _async:
search_res.done()
search_res = search_res.result()
res.done()
res = res.result()
res_distance = res[0].distances[offset:]
# assert sorted(search_res[0].distances, key=numpy.float32) == sorted(res_distance, key=numpy.float32)
assert set(search_res[0].ids) == set(res[0].ids[offset:])
@pytest.mark.tags(CaseLabel.L2)
def test_search_pagination_with_expression(self, offset):
"""
target: test search pagination with expression
method: create connection, collection, insert and search with expression
expected: search successfully
"""
# 1. create a collection
nb = 2500
dim = 38
enable_dynamic_field = False
collection_w, _vectors, _, insert_ids = \
self.init_collection_general(prefix, True, nb=nb, dim=dim,
enable_dynamic_field=enable_dynamic_field)[0:4]
collection_w.load()
# filter result with expression in collection
_vectors = _vectors[0]
for _async in [False, True]:
for expressions in cf.gen_normal_expressions_and_templates():
log.debug(f"search with expression: {expressions} with _async: {_async}")
expr = expressions[0].replace("&&", "and").replace("||", "or")
filter_ids = []
for i, _id in enumerate(insert_ids):
if enable_dynamic_field:
int64 = _vectors[i][ct.default_int64_field_name]
float = _vectors[i][ct.default_float_field_name]
else:
int64 = _vectors.int64[i]
float = _vectors.float[i]
if not expr or eval(expr):
filter_ids.append(_id)
# 2. search
limit = min(default_limit, len(filter_ids))
if offset >= len(filter_ids):
limit = 0
elif len(filter_ids) - offset < default_limit:
limit = len(filter_ids) - offset
search_param = {"metric_type": "COSINE",
"params": {"nprobe": 10}, "offset": offset}
vectors = [[random.random() for _ in range(dim)]
for _ in range(default_nq)]
search_res, _ = collection_w.search(vectors[:default_nq], default_search_field,
search_param, default_limit,
expr=expr,
_async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_ids,
"limit": limit,
"_async": _async})
# 3. search with offset+limit
res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
default_limit + offset,
expr=expr, _async=_async)[0]
if _async:
res.done()
res = res.result()
search_res.done()
search_res = search_res.result()
filter_ids_set = set(filter_ids)
for hits in search_res:
ids = hits.ids
assert set(ids).issubset(filter_ids_set)
assert set(search_res[0].ids) == set(res[0].ids[offset:])
# 4. search again with expression template
expr = cf.get_expr_from_template(expressions[1]).replace("&&", "and").replace("||", "or")
expr_params = cf.get_expr_params_from_template(expressions[1])
search_res, _ = collection_w.search(vectors[:default_nq], default_search_field,
search_param, default_limit,
expr=expr, expr_params=expr_params,
_async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_ids,
"limit": limit,
"_async": _async})
# 3. search with offset+limit
res = collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
default_limit + offset,
expr=expr, expr_params=expr_params, _async=_async)[0]
if _async:
res.done()
res = res.result()
search_res.done()
search_res = search_res.result()
filter_ids_set = set(filter_ids)
for hits in search_res:
ids = hits.ids
assert set(ids).issubset(filter_ids_set)
assert set(search_res[0].ids) == set(res[0].ids[offset:])
@pytest.mark.tags(CaseLabel.L2)
def test_search_pagination_with_index_partition(self, offset, _async):
"""
@@ -843,7 +766,8 @@ class TestSearchPagination(TestcaseBase):
res = collection_w.search(search_vectors[:default_nq], ct.default_sparse_vec_field_name, _search_param,
default_limit + offset)[0]
assert len(search_res[0].ids) == len(res[0].ids[offset:])
assert sorted(search_res[0].distances, key=np.float32) == sorted(res[0].distances[offset:], key=np.float32)
assert sorted(search_res[0].distances, key=np.float32) == sorted(
res[0].distances[offset:], key=np.float32)
class TestSearchPaginationInvalid(TestMilvusClientV2Base):
@@ -853,6 +777,7 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
# The following are invalid cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
def test_search_pagination_with_invalid_offset_type(self):
"""
@@ -861,9 +786,9 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
expected: raise exception
"""
client = self._client()
# 1. Create collection with schema
collection_name = cf.gen_unique_str("test_search_pagination")
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client, collection_name, default_dim)
# Insert data
@@ -875,7 +800,7 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
# Search with invalid offset types
vectors_to_search = cf.gen_vectors(default_nq, default_dim)
invalid_offsets = [" ", [1, 2], {1}, "12 s"]
for offset in invalid_offsets:
log.debug(f"assert search error if offset={offset}")
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset}
@@ -901,9 +826,9 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
expected: raise exception
"""
client = self._client()
# 1. Create collection with schema
collection_name = cf.gen_unique_str("test_search_pagination")
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client, collection_name, default_dim)
# Insert data
@@ -915,7 +840,7 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
# Search with invalid offset values
vectors_to_search = cf.gen_vectors(default_nq, default_dim)
invalid_offsets = [-1, 16385]
for offset in invalid_offsets:
log.debug(f"assert search error if offset={offset}")
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}, "offset": offset}
@@ -931,4 +856,4 @@ class TestSearchPaginationInvalid(TestMilvusClientV2Base):
"err_code": 1,
"err_msg": f"offset [{offset}] is invalid, it should be in range [1, 16384]"
}
)
)


@@ -15,11 +15,11 @@ class TestE2e(TestcaseBase):
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_default(self):
# create
name = cf.gen_unique_str(prefix)
collection_name = cf.gen_collection_name_by_testcase_name()
t0 = time.time()
collection_w = self.init_collection_wrap(name=name, active_trace=True)
collection_w = self.init_collection_wrap(name=collection_name, active_trace=True)
tt = time.time() - t0
assert collection_w.name == name
assert collection_w.name == collection_name
# index
index_params = {"index_type": "IVF_SQ8", "params": {"nlist": 64}, "metric_type": "L2"}