test: add test cases for code change (#32290)

issue: #31368 
pr: #32289

Signed-off-by: binbin lv <binbin.lv@zilliz.com>
pull/32293/head
binbin 2024-04-16 09:45:19 +08:00 committed by GitHub
parent 018a784989
commit 9b25ce882b
1 changed file with 403 additions and 153 deletions


@@ -154,6 +154,10 @@ class TestCollectionSearchInvalid(TestcaseBase):
def enable_dynamic_field(self, request):
yield request.param
@pytest.fixture(scope="function", params=["FLOAT_VECTOR", "FLOAT16_VECTOR", "BFLOAT16_VECTOR"])
def vector_data_type(self, request):
yield request.param
"""
******************************************************************
# The following are invalid cases
@@ -720,9 +724,8 @@ class TestCollectionSearchInvalid(TestcaseBase):
check_items={"err_code": 65535,
"err_msg": "collection not loaded"})
@pytest.mark.skip("enable this later using session/strong consistency")
@pytest.mark.tags(CaseLabel.L1)
def test_search_with_empty_collection(self):
def test_search_with_empty_collection(self, vector_data_type):
"""
target: test search with empty collection
method: 1. search the empty collection before load
@@ -733,15 +736,16 @@ class TestCollectionSearchInvalid(TestcaseBase):
3. return topk successfully
"""
# 1. initialize without data
collection_w = self.init_collection_general(prefix)[0]
collection_w = self.init_collection_general(prefix, is_index=False, vector_data_type=vector_data_type)[0]
# 2. search collection without data before load
log.info("test_search_with_empty_collection: Searching empty collection %s"
% collection_w.name)
err_msg = "collection " + collection_w.name + " was not loaded into memory"
vectors = cf.gen_vectors_based_on_vector_type(default_nq, default_dim, vector_data_type)
collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
default_limit, default_search_exp, timeout=1,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
check_items={"err_code": 101,
"err_msg": err_msg})
# 3. search collection without data after load
collection_w.create_index(
@@ -754,16 +758,15 @@ class TestCollectionSearchInvalid(TestcaseBase):
"ids": [],
"limit": 0})
# 4. search with data inserted but not load again
data = cf.gen_default_dataframe_data(nb=2000)
insert_res = collection_w.insert(data)[0]
insert_ids, time_stamp = cf.insert_data(collection_w, vector_data_type=vector_data_type)[3:5]
assert collection_w.num_entities == default_nb
# Using bounded staleness, maybe we cannot search the "inserted" requests,
# since the search requests arrived query nodes earlier than query nodes consume the insert requests.
collection_w.search(vectors[:default_nq], default_search_field, default_search_params,
default_limit, default_search_exp,
guarantee_timestamp=time_stamp,
check_task=CheckTasks.check_search_results,
check_items={"nq": default_nq,
"ids": insert_res.primary_keys,
"ids": insert_ids,
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L2)
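As a minimal sketch of the guarantee_timestamp pattern exercised above, assuming a local Milvus instance and a hypothetical loaded collection "demo" with an int64 primary key and a 128-dim float vector field "embedding" (names, dimension and connection settings are illustrative, not taken from this suite):

from pymilvus import Collection, connections

# Assumptions: a reachable local Milvus and an existing, indexed and loaded
# collection "demo" (hypothetical) with fields "id" (INT64, primary) and
# "embedding" (FLOAT_VECTOR, dim=128).
connections.connect(host="localhost", port="19530")
coll = Collection("demo")

# insert() returns a MutationResult carrying the primary keys and the timestamp
# assigned to the mutation.
mutation = coll.insert([list(range(10)), [[0.1] * 128 for _ in range(10)]])

# Under bounded staleness, a search issued right after the insert may not see
# the new rows; passing the insert timestamp as guarantee_timestamp asks the
# query nodes to consume data up to that point before answering.
res = coll.search(data=[[0.1] * 128],
                  anns_field="embedding",
                  param={"metric_type": "L2", "params": {"nprobe": 10}},
                  limit=10,
                  guarantee_timestamp=mutation.timestamp)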
@@ -2212,7 +2215,7 @@ class TestCollectionSearch(TestcaseBase):
"limit": limit,
"_async": _async})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.tags(CaseLabel.GPU)
@pytest.mark.parametrize("index, params",
zip(ct.all_index_types[:7],
@@ -2224,8 +2227,6 @@ class TestCollectionSearch(TestcaseBase):
method: test search after different index and corresponding search params
expected: search successfully with limit(topK)
"""
if index == "DISKANN":
pytest.skip("https://github.com/milvus-io/milvus/issues/30793")
# 1. initialize with data
collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True, 5000,
partition_num=1,
@@ -10616,6 +10617,105 @@ class TestCollectionHybridSearchValid(TestcaseBase):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("nq", [16384])
def test_hybrid_search_normal_max_nq(self, nq):
"""
target: test hybrid search with the maximum allowed nq
method: create connection, collection, insert and search
expected: hybrid search successfully with limit(topK)
"""
# 1. initialize collection with data
collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [1]
vectors = cf.gen_vectors_based_on_vector_type(nq, default_dim, "FLOAT_VECTOR")
log.debug("binbin")
log.debug(vectors)
# 4. get hybrid search req list
for i in range(len(vector_name_list)):
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
# 5. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})[0]
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.xfail(reason="issue 32288")
@pytest.mark.parametrize("nq", [0, 16385])
def test_hybrid_search_normal_over_max_nq(self, nq):
"""
target: test hybrid search with nq out of the allowed range
method: create connection, collection, insert and search
expected: raise exception
"""
# 1. initialize collection with data
collection_w = self.init_collection_general(prefix, True)[0]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [1]
vectors = cf.gen_vectors_based_on_vector_type(nq, default_dim, "FLOAT_VECTOR")
# 4. get hybrid search req list
for i in range(len(vector_name_list)):
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
# 5. hybrid search
err_msg = "nq (number of search vector per search request) should be in range [1, 16384]"
collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
check_task=CheckTasks.err_res,
check_items={"err_code": 65535,
"err_msg": err_msg})
@pytest.mark.tags(CaseLabel.L1)
def test_hybrid_search_no_limit(self, nq):
"""
target: test hybrid search with no limit
method: create connection, collection, insert and search
expected: raise exception
"""
# 1. initialize collection with data
multiple_dim_array = [default_dim, default_dim]
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, multiple_dim_array=multiple_dim_array)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
vectors = cf.gen_vectors_based_on_vector_type(nq, default_dim, "FLOAT_VECTOR")
# get hybrid search req list
search_param = {
"data": vectors,
"anns_field": vector_name_list[0],
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
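The AnnSearchRequest objects assembled in these cases are consumed by Collection.hybrid_search together with a ranker. A minimal pymilvus sketch of that call, assuming a hypothetical loaded collection "demo" with two 128-dim float vector fields "emb_a" and "emb_b" and an int64 scalar field "int64":

from pymilvus import AnnSearchRequest, Collection, WeightedRanker

# Assumption: a default pymilvus connection already exists and the collection
# below is created, indexed on both vector fields and loaded.
coll = Collection("demo")
query = [[0.1] * 128]

# One AnnSearchRequest per vector field, each with its own search params,
# per-field limit and optional filter expression.
req_a = AnnSearchRequest(data=query, anns_field="emb_a",
                         param={"metric_type": "COSINE"}, limit=10, expr="int64 > 0")
req_b = AnnSearchRequest(data=query, anns_field="emb_b",
                         param={"metric_type": "COSINE"}, limit=10, expr="int64 > 0")

# WeightedRanker takes one weight per request and fuses the per-field results
# into a single ranked list of at most `limit` hits per query vector.
res = coll.hybrid_search([req_a, req_b], WeightedRanker(0.6, 0.4), limit=10)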
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_WeightedRanker_empty_reqs(self, primary_field):
@@ -10895,7 +10995,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_overall_different_limit(self, primary_field, dim, enable_dynamic_field, metric_type):
def test_hybrid_search_overall_different_limit(self, nq, primary_field, dim, enable_dynamic_field, metric_type):
"""
target: test hybrid search with different limit params
method: create connection, collection, insert and search
@@ -10917,7 +11017,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
for i in range(len(vector_name_list)):
search_param = {
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(1)],
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)],
"anns_field": vector_name_list[i],
"param": {"metric_type": metric_type, "offset": 0},
"limit": default_limit - i,
@@ -10927,7 +11027,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})
@@ -10980,7 +11080,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_max_limit(self, primary_field, dim, enable_dynamic_field, metric_type):
def test_hybrid_search_max_limit(self, nq, primary_field, dim, enable_dynamic_field, metric_type):
"""
target: test hybrid search with maximum limit params
method: create connection, collection, insert and search
@@ -11002,7 +11102,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
for i in range(len(vector_name_list)):
search_param = {
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(1)],
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)],
"anns_field": vector_name_list[i],
"param": {"metric_type": metric_type},
"limit": max_limit,
@@ -11012,13 +11112,13 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_max_min_limit(self, primary_field, dim, enable_dynamic_field, metric_type):
def test_hybrid_search_max_min_limit(self, nq, primary_field, dim, enable_dynamic_field, metric_type):
"""
target: test hybrid search with maximum and minimum limit params
method: create connection, collection, insert and search
@@ -11043,7 +11143,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
if i == 1:
limit = 1
search_param = {
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(1)],
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)],
"anns_field": vector_name_list[i],
"param": {"metric_type": metric_type},
"limit": limit,
@@ -11053,13 +11153,13 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_same_anns_field(self, primary_field, dim, enable_dynamic_field, metric_type):
def test_hybrid_search_same_anns_field(self, nq, primary_field, dim, enable_dynamic_field, metric_type):
"""
target: test hybrid search: multiple search on same anns field
method: create connection, collection, insert and search
@@ -11081,7 +11181,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
for i in range(len(vector_name_list)):
search_param = {
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(1)],
"data": [[random.random() for _ in range(multiple_dim_array[i])] for _ in range(nq)],
"anns_field": vector_name_list[0],
"param": {"metric_type": metric_type, "offset": 0},
"limit": default_limit,
@@ -11091,13 +11191,13 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_different_offset_single_field(self, primary_field, dim, auto_id, is_flush,
def test_hybrid_search_different_offset_single_field(self, nq, primary_field, dim, auto_id, is_flush,
enable_dynamic_field, metric_type):
"""
target: test hybrid search for fields with different offset
@@ -11120,7 +11220,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
for i in range(len(vector_name_list)):
search_param = {
"data": [[random.random() for _ in range(dim)] for _ in range(1)],
"data": [[random.random() for _ in range(dim)] for _ in range(nq)],
"anns_field": vector_name_list[i],
"param": {"metric_type": metric_type, "offset": i},
"limit": default_limit,
@@ -11130,7 +11230,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 1), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})
@@ -11730,199 +11830,334 @@ class TestCollectionHybridSearchValid(TestcaseBase):
collection_w.hybrid_search(req_list, WeightedRanker(*weights), limit,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_with_output_fields_all_fields(self, primary_field):
def test_hybrid_search_with_output_fields(self, nq, dim, auto_id, is_flush, enable_dynamic_field,
primary_field, vector_data_type):
"""
target: test hybrid search normal case
method: create connection, collection, insert and search
expected: hybrid search successfully with limit(topK)
"""
# 1. initialize collection with data
nq = 10
multiple_dim_array = [dim, dim]
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, primary_field=primary_field,
multiple_dim_array=[default_dim, default_dim])[0:5]
self.init_collection_general(prefix, True, auto_id=auto_id, dim=dim, is_flush=is_flush,
primary_field=primary_field,
enable_dynamic_field=enable_dynamic_field,
multiple_dim_array=multiple_dim_array,
vector_data_type=vector_data_type)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [0.2, 0.3, 0.5]
search_res_dict_array = []
metrics = []
search_res_dict_array = []
search_res_dict_array_nq = []
vectors = cf.gen_vectors_based_on_vector_type(nq, dim, vector_data_type)
# get hybrid search req list
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
search_res_dict = {}
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE", "offset": 0},
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
metrics.append("COSINE")
# search to get the base line of hybrid_search
search_res = collection_w.search(vectors[:1], vector_name_list[i],
default_search_params, default_limit,
# get the result of search with the same params of the following hybrid search
single_search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}}
for k in range(nq):
for i in range(len(vector_name_list)):
search_res_dict = {}
search_res_dict_array = []
vectors_search = vectors[k]
# 5. search to get the base line of hybrid_search
search_res = collection_w.search([vectors_search], vector_name_list[i],
single_search_param, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": default_limit})[0]
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
# 4. calculate hybrid search base line
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array, weights, metrics)
# 5. hybrid search
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
search_res_dict_array_nq.append(search_res_dict_array)
# 6. calculate hybrid search base line
score_answer_nq = []
for k in range(nq):
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array_nq[k], weights, metrics)
score_answer_nq.append(score_answer)
# 7. hybrid search
output_fields = [default_int64_field_name]
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
output_fields=output_fields,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit})[0]
# 8. compare results through the re-calculated distances
for k in range(len(score_answer_nq)):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
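The baseline that cf.get_hybrid_search_base_results derives from the per-field searches follows the weighted-fusion idea sketched below. This is a simplified illustration that assumes the per-field scores are already on a comparable scale; the real helper also applies Milvus' metric-dependent score normalization before weighting.

# Naive weighted fusion over per-field result dicts of the form {primary_key: score}.
def weighted_fusion(per_field_results, weights, limit):
    fused = {}
    for weight, field_res in zip(weights, per_field_results):
        for pk, score in field_res.items():
            fused[pk] = fused.get(pk, 0.0) + weight * score
    # Higher fused score ranks first for similarity metrics such as COSINE.
    ranked = sorted(fused.items(), key=lambda kv: kv[1], reverse=True)[:limit]
    return [pk for pk, _ in ranked], [score for _, score in ranked]

# Example: two vector fields fused with weights 0.2 and 0.8.
ids, scores = weighted_fusion([{1: 0.9, 2: 0.5}, {2: 0.8, 3: 0.7}], [0.2, 0.8], limit=2)
# ids == [2, 3]; scores are approximately [0.74, 0.56].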
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_with_output_fields_all_fields(self, nq, dim, auto_id, is_flush, enable_dynamic_field,
primary_field, vector_data_type):
"""
target: test hybrid search normal case
method: create connection, collection, insert and search
expected: hybrid search successfully with limit(topK)
"""
# 1. initialize collection with data
nq = 10
multiple_dim_array = [dim, dim]
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, auto_id=auto_id, dim=dim, is_flush=is_flush,
primary_field=primary_field,
enable_dynamic_field=enable_dynamic_field,
multiple_dim_array=multiple_dim_array,
vector_data_type=vector_data_type)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [0.2, 0.3, 0.5]
metrics = []
search_res_dict_array = []
search_res_dict_array_nq = []
vectors = cf.gen_vectors_based_on_vector_type(nq, dim, vector_data_type)
# get hybrid search req list
for i in range(len(vector_name_list)):
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
metrics.append("COSINE")
# get the result of search with the same params of the following hybrid search
single_search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}}
for k in range(nq):
for i in range(len(vector_name_list)):
search_res_dict = {}
search_res_dict_array = []
vectors_search = vectors[k]
# 5. search to get the base line of hybrid_search
search_res = collection_w.search([vectors_search], vector_name_list[i],
single_search_param, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": default_limit})[0]
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
search_res_dict_array_nq.append(search_res_dict_array)
# 6. calculate hybrid search base line
score_answer_nq = []
for k in range(nq):
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array_nq[k], weights, metrics)
score_answer_nq.append(score_answer)
# 7. hybrid search
output_fields = [default_int64_field_name, default_float_field_name, default_string_field_name,
default_json_field_name]
output_fields = output_fields + vector_name_list
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
output_fields=output_fields,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit,
"output_fields": output_fields})[0]
# 6. compare results through the re-calculated distances
for i in range(len(score_answer[:default_limit])):
delta = math.fabs(score_answer[i] - hybrid_res[0].distances[i])
assert delta < hybrid_search_epsilon
"limit": default_limit})[0]
# 8. compare results through the re-calculated distances
for k in range(len(score_answer_nq)):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
@pytest.mark.tags(CaseLabel.L2)
def test_hybrid_search_with_output_fields_all_fields_wildcard(self):
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_with_output_fields_all_fields(self, nq, dim, auto_id, is_flush, enable_dynamic_field,
primary_field, vector_data_type):
"""
target: test hybrid search normal case
method: create connection, collection, insert and search
expected: hybrid search successfully with limit(topK)
"""
# 1. initialize collection with data
nq = 10
multiple_dim_array = [dim, dim]
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, multiple_dim_array=[default_dim, default_dim])[0:5]
self.init_collection_general(prefix, True, auto_id=auto_id, dim=dim, is_flush=is_flush,
primary_field=primary_field,
enable_dynamic_field=enable_dynamic_field,
multiple_dim_array=multiple_dim_array,
vector_data_type=vector_data_type)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [0.2, 0.3, 0.5]
search_res_dict_array = []
metrics = []
search_res_dict_array = []
search_res_dict_array_nq = []
vectors = cf.gen_vectors_based_on_vector_type(nq, dim, vector_data_type)
# get hybrid search req list
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
search_res_dict = {}
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE", "offset": 0},
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
metrics.append("COSINE")
# search to get the base line of hybrid_search
search_res = collection_w.search(vectors[:1], vector_name_list[i],
default_search_params, default_limit,
# get the result of search with the same params of the following hybrid search
single_search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}}
for k in range(nq):
for i in range(len(vector_name_list)):
search_res_dict = {}
search_res_dict_array = []
vectors_search = vectors[k]
# 5. search to get the base line of hybrid_search
search_res = collection_w.search([vectors_search], vector_name_list[i],
single_search_param, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": default_limit})[0]
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
# 4. calculate hybrid search base line
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array, weights, metrics)
# 5. hybrid search
output_fields = [default_int64_field_name, default_float_field_name, default_string_field_name,
default_json_field_name]
output_fields = output_fields + vector_name_list
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
search_res_dict_array_nq.append(search_res_dict_array)
# 6. calculate hybrid search base line
score_answer_nq = []
for k in range(nq):
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array_nq[k], weights, metrics)
score_answer_nq.append(score_answer)
# 7. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
output_fields = ["*"],
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit,
"output_fields": output_fields})[0]
# 6. compare results through the re-calculated distances
for i in range(len(score_answer[:default_limit])):
delta = math.fabs(score_answer[i] - hybrid_res[0].distances[i])
assert delta < hybrid_search_epsilon
"limit": default_limit})[0]
# 8. compare results through the re-calculated distances
for k in range(len(score_answer_nq)):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("output_fields", [[default_search_field], [default_search_field, default_int64_field_name]])
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_with_output_fields_sync_async(self, primary_field, output_fields, _async):
def test_hybrid_search_with_output_fields_sync_async(self, nq, primary_field, output_fields, _async):
"""
target: test hybrid search normal case
method: create connection, collection, insert and search
expected: hybrid search successfully with limit(topK)
"""
# 1. initialize collection with data
multiple_dim_array = [default_dim, default_dim]
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, primary_field=primary_field,
multiple_dim_array=[default_dim, default_dim])[0:5]
self.init_collection_general(prefix, True, dim=default_dim,
primary_field=primary_field,
multiple_dim_array=multiple_dim_array)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
vector_name_list.append(ct.default_float_vec_field_name)
# 3. prepare search params
req_list = []
weights = [0.2, 0.3, 0.5]
search_res_dict_array = []
metrics = []
search_res_dict_array = []
search_res_dict_array_nq = []
vectors = cf.gen_vectors_based_on_vector_type(nq, default_dim, "FLOAT_VECTOR")
# get hybrid search req list
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
search_res_dict = {}
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE", "offset": 0},
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
metrics.append("COSINE")
# search to get the base line of hybrid_search
search_res = collection_w.search(vectors[:1], vector_name_list[i],
default_search_params, default_limit,
# get the result of search with the same params of the following hybrid search
single_search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}}
for k in range(nq):
for i in range(len(vector_name_list)):
search_res_dict = {}
search_res_dict_array = []
vectors_search = vectors[k]
# 5. search to get the base line of hybrid_search
search_res = collection_w.search([vectors_search], vector_name_list[i],
single_search_param, default_limit,
default_search_exp,
_async = _async,
_async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": default_limit,
"_async": _async})[0]
if _async:
search_res.done()
search_res = search_res.result()
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
# 4. calculate hybrid search base line
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array, weights, metrics)
# 5. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit, _async = _async,
output_fields = output_fields,
"limit": default_limit})[0]
if _async:
search_res.done()
search_res = search_res.result()
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
search_res_dict_array_nq.append(search_res_dict_array)
# 6. calculate hybrid search base line
score_answer_nq = []
for k in range(nq):
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array_nq[k], weights, metrics)
score_answer_nq.append(score_answer)
# 7. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
output_fields=output_fields,
_async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": default_limit,
"output_fields": output_fields,
"_async": _async})[0]
"limit": default_limit})[0]
if _async:
hybrid_res.done()
hybrid_res = hybrid_res.result()
# 6. compare results through the re-calculated distances
for i in range(len(score_answer[:default_limit])):
delta = math.fabs(score_answer[i] - hybrid_res[0].distances[i])
assert delta < hybrid_search_epsilon
# 8. compare results through the re-calculated distances
for k in range(len(score_answer_nq)):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("rerank", [RRFRanker(), WeightedRanker(0.1, 0.9, 1)])
@@ -11965,7 +12200,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("limit", [1, 100, 16384])
@pytest.mark.parametrize("primary_field", [ct.default_int64_field_name, ct.default_string_field_name])
def test_hybrid_search_is_partition_key(self, primary_field, limit):
def test_hybrid_search_is_partition_key(self, nq, primary_field, limit, vector_data_type):
"""
target: test hybrid search with different valid limit and round decimal
method: create connection, collection, insert and search
@@ -11975,6 +12210,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
collection_w, _, _, insert_ids, time_stamp = \
self.init_collection_general(prefix, True, primary_field=primary_field,
multiple_dim_array=[default_dim, default_dim],
vector_data_type=vector_data_type,
is_partition_key=ct.default_float_field_name)[0:5]
# 2. extract vector field name
vector_name_list = cf.extract_vector_field_name_list(collection_w)
@@ -11982,51 +12218,63 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 3. prepare search params
req_list = []
weights = [0.2, 0.3, 0.5]
search_res_dict_array = []
if limit > default_nb:
limit = default_limit
metrics = []
search_res_dict_array = []
search_res_dict_array_nq = []
vectors = cf.gen_vectors_based_on_vector_type(nq, default_dim, vector_data_type)
# get hybrid search req list
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
search_res_dict = {}
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
"param": {"metric_type": "COSINE", "offset": 0},
"limit": limit,
"param": {"metric_type": "COSINE"},
"limit": default_limit,
"expr": "int64 > 0"}
req = AnnSearchRequest(**search_param)
req_list.append(req)
metrics.append("COSINE")
# search to get the base line of hybrid_search
search_res = collection_w.search(vectors[:1], vector_name_list[i],
default_search_params, limit,
default_search_exp, round_decimal= 5,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": limit})[0]
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
# 4. calculate hybrid search base line
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array, weights, metrics, 5)
# 5. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), limit,
round_decimal=5,
# get the result of search with the same params of the following hybrid search
single_search_param = {"metric_type": "COSINE", "params": {"nprobe": 10}}
for k in range(nq):
for i in range(len(vector_name_list)):
search_res_dict = {}
search_res_dict_array = []
vectors_search = vectors[k]
# 5. search to get the base line of hybrid_search
search_res = collection_w.search([vectors_search], vector_name_list[i],
single_search_param, default_limit,
default_search_exp,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
"ids": insert_ids,
"limit": default_limit})[0]
ids = search_res[0].ids
distance_array = search_res[0].distances
for j in range(len(ids)):
search_res_dict[ids[j]] = distance_array[j]
search_res_dict_array.append(search_res_dict)
search_res_dict_array_nq.append(search_res_dict_array)
# 6. calculate hybrid search base line
score_answer_nq = []
for k in range(nq):
ids_answer, score_answer = cf.get_hybrid_search_base_results(search_res_dict_array_nq[k], weights, metrics)
score_answer_nq.append(score_answer)
# 7. hybrid search
hybrid_res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1,
check_items={"nq": nq,
"ids": insert_ids,
"limit": limit})[0]
# 6. compare results through the re-calculated distances
for i in range(len(score_answer[:limit])):
delta = math.fabs(score_answer[i] - hybrid_res[0].distances[i])
assert delta < hybrid_search_epsilon
"limit": default_limit})[0]
# 8. compare results through the re-calculated distances
for k in range(len(score_answer_nq)):
for i in range(len(score_answer_nq[k][:default_limit])):
assert score_answer_nq[k][i] - hybrid_res[k].distances[i] < hybrid_search_epsilon
@pytest.mark.tags(CaseLabel.L1)
def test_hybrid_search_result_L2_order(self):
def test_hybrid_search_result_L2_order(self, nq):
"""
target: test hybrid search result having correct order for L2 distance
method: create connection, collection, insert and search
@@ -12048,7 +12296,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
weights = [0.2, 0.3, 0.5]
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
vectors = [[random.random() for _ in range(default_dim)] for _ in range(nq)]
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
@@ -12059,11 +12307,12 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list.append(req)
# 4. hybrid search
res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), 10)[0]
is_sorted_descend = lambda lst: all(lst[i] >= lst[i+1] for i in range(len(lst)-1))
assert is_sorted_descend(res[0].distances)
is_sorted_descend = lambda lst: all(lst[i] >= lst[i + 1] for i in range(len(lst) - 1))
for i in range(nq):
assert is_sorted_descend(res[i].distances)
@pytest.mark.tags(CaseLabel.L1)
def test_hybrid_search_result_order(self):
def test_hybrid_search_result_order(self, nq):
"""
target: test hybrid search result having correct order for cosine distance
method: create connection, collection, insert and search
@@ -12079,7 +12328,7 @@ class TestCollectionHybridSearchValid(TestcaseBase):
req_list = []
weights = [0.2, 0.3, 0.5]
for i in range(len(vector_name_list)):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(1)]
vectors = [[random.random() for _ in range(default_dim)] for _ in range(nq)]
search_param = {
"data": vectors,
"anns_field": vector_name_list[i],
@@ -12091,4 +12340,5 @@ class TestCollectionHybridSearchValid(TestcaseBase):
# 4. hybrid search
res = collection_w.hybrid_search(req_list, WeightedRanker(*weights), 10)[0]
is_sorted_descend = lambda lst: all(lst[i] >= lst[i+1] for i in range(len(lst)-1))
assert is_sorted_descend(res[0].distances)
for i in range(nq):
assert is_sorted_descend(res[i].distances)