mirror of https://github.com/milvus-io/milvus.git
test: Share one collection for group search tests (#36427)
related issue: #36407 1. add partial load tests 2. use new test class to share one collection for all grouping search tests Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>pull/36410/head
parent
3b01b7dc9a
commit
1fb8b46db0
|
@ -0,0 +1,471 @@
|
|||
import pytest
|
||||
from base.client_base import TestcaseBase
|
||||
from common import common_func as cf
|
||||
from common import common_type as ct
|
||||
from common.common_type import CaseLabel, CheckTasks
|
||||
from utils.util_pymilvus import *
|
||||
|
||||
|
||||
class TestFieldPartialLoad(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_field_partial_load_default(self):
|
||||
"""
|
||||
target: test field partial load
|
||||
method:
|
||||
1. create a collection with fields
|
||||
2. index/not index fields to be loaded; index/not index fields to be skipped
|
||||
3. load a part of the fields
|
||||
expected:
|
||||
1. verify the collection loaded successfully
|
||||
2. verify the loaded fields can be searched in expr and output_fields
|
||||
3. verify the skipped fields not loaded, and cannot search with them in expr or output_fields
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 128
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_int64_field = cf.gen_int64_field(name="int64_load")
|
||||
not_load_int64_field = cf.gen_int64_field(name="int64_not_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
not_load_string_field = cf.gen_string_field(name="string_not_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, not_load_int64_field,
|
||||
load_string_field, not_load_string_field, vector_field],
|
||||
auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
int_values = [i for i in range(nb)]
|
||||
string_values = [str(i) for i in range(nb)]
|
||||
float_vec_values = gen_vectors(nb, dim)
|
||||
collection_w.insert([int_values, int_values, string_values, string_values, float_vec_values])
|
||||
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name],
|
||||
replica_number=1)
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 2
|
||||
search_vectors = float_vec_values[0:nq]
|
||||
res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=100, output_fields=["*"])
|
||||
assert pk_field.name in res[0][0].fields.keys() \
|
||||
and vector_field.name in res[0][0].fields.keys() \
|
||||
and load_string_field.name in res[0][0].fields.keys() \
|
||||
and load_int64_field.name in res[0][0].fields.keys() \
|
||||
and not_load_string_field.name not in res[0][0].fields.keys() \
|
||||
and not_load_int64_field.name not in res[0][0].fields.keys()
|
||||
|
||||
# release and reload with some other fields
|
||||
collection_w.release()
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name,
|
||||
not_load_string_field.name, not_load_int64_field.name])
|
||||
res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=100, output_fields=["*"])
|
||||
assert pk_field.name in res[0][0].fields.keys() \
|
||||
and vector_field.name in res[0][0].fields.keys() \
|
||||
and load_string_field.name not in res[0][0].fields.keys() \
|
||||
and load_int64_field.name not in res[0][0].fields.keys() \
|
||||
and not_load_string_field.name in res[0][0].fields.keys() \
|
||||
and not_load_int64_field.name in res[0][0].fields.keys()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.xfail(reason="issue #36353")
|
||||
def test_skip_load_dynamic_field(self):
|
||||
"""
|
||||
target: test skip load dynamic field
|
||||
method:
|
||||
1. create a collection with dynamic field
|
||||
2. load
|
||||
3. search on dynamic field in expr and/or output_fields
|
||||
expected: search successfully
|
||||
4. release and reload with skip load dynamic field=true
|
||||
5. search on dynamic field in expr and/or output_fields
|
||||
expected: raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 128
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_int64_field = cf.gen_int64_field(name="int64_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, load_string_field, vector_field],
|
||||
auto_id=True, enable_dynamic_field=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
data = []
|
||||
for i in range(nb):
|
||||
data.append({
|
||||
f"{load_int64_field.name}": i,
|
||||
f"{load_string_field.name}": str(i),
|
||||
f"{vector_field.name}": [random.uniform(-1, 1) for _ in range(dim)],
|
||||
"color": i,
|
||||
"tag": i,
|
||||
})
|
||||
collection_w.insert(data)
|
||||
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
collection_w.load()
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 2
|
||||
search_vectors = cf.gen_vectors(nq, dim)
|
||||
res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
expr="color > 0",
|
||||
limit=100, output_fields=["*"],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": nq, "limit": 100})
|
||||
|
||||
collection_w.release()
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name],
|
||||
skip_load_dynamic_field=True)
|
||||
error = {ct.err_code: 999, ct.err_msg: f"field color cannot be returned since dynamic field not loaded"}
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=100, output_fields=["color"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
error = {ct.err_code: 999, ct.err_msg: f"field color is dynamic but dynamic field is not loaded"}
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
expr="color > 0", limit=100, output_fields=["*"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_skip_load_some_vector_fields(self):
|
||||
"""
|
||||
target: test skip load some vector fields
|
||||
method:
|
||||
1. create a collection with multiple vector fields
|
||||
2. not create index for skip load vector fields
|
||||
2. load some vector fields
|
||||
3. search on vector fields in expr and/or output_fields
|
||||
expected: search successfully
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 128
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
vector_field = cf.gen_float_vec_field(name="vec_float32", dim=dim)
|
||||
sparse_vector_field = cf.gen_float_vec_field(name="sparse", vector_data_type="SPARSE_FLOAT_VECTOR")
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_string_field, vector_field, sparse_vector_field],
|
||||
auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
string_values = [str(i) for i in range(nb)]
|
||||
float_vec_values = cf.gen_vectors(nb, dim)
|
||||
sparse_vec_values = cf.gen_vectors(nb, dim, vector_data_type="SPARSE_FLOAT_VECTOR")
|
||||
collection_w.insert([string_values, float_vec_values, sparse_vec_values])
|
||||
|
||||
# build index on one of vector fields
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# not load sparse vector field
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name])
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 2
|
||||
search_vectors = float_vec_values[0:nq]
|
||||
res, _ = collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=100, output_fields=["*"],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": nq, "limit": 100})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.xfail(reason="fail to load 2 partition with same load_fields #36037 ")
|
||||
def test_partial_load_with_partition(self):
|
||||
"""
|
||||
target: test partial load with partitions
|
||||
method:
|
||||
1. create a collection with fields
|
||||
2. create 2 partitions: p1, p2
|
||||
3. partial load p1
|
||||
4. search on p1
|
||||
5. load p2 with different fields
|
||||
expected: p2 load fail
|
||||
6. load p2 with the same partial fields
|
||||
7. search on p2
|
||||
expected: search successfully
|
||||
8. load the collection with all fields
|
||||
expected: load fail
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 128
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_int64_field = cf.gen_int64_field(name="int64_load")
|
||||
not_load_int64_field = cf.gen_int64_field(name="int64_not_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
not_load_string_field = cf.gen_string_field(name="string_not_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, not_load_int64_field,
|
||||
load_string_field, not_load_string_field, vector_field],
|
||||
auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
p1 = self.init_partition_wrap(collection_w, name='p1')
|
||||
p2 = self.init_partition_wrap(collection_w, name='p2')
|
||||
int_values = [i for i in range(nb)]
|
||||
string_values = [str(i) for i in range(nb)]
|
||||
float_vec_values = gen_vectors(nb, dim)
|
||||
p1.insert([int_values, int_values, string_values, string_values, float_vec_values])
|
||||
p2.insert([int_values, int_values, string_values, string_values, float_vec_values])
|
||||
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# p1 load with partial fields
|
||||
p1.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name])
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 2
|
||||
search_vectors = float_vec_values[0:nq]
|
||||
res, _ = p1.search(data=search_vectors, anns_field=vector_field.name, params=search_params,
|
||||
limit=100, output_fields=["*"])
|
||||
assert pk_field.name in res[0][0].fields.keys() \
|
||||
and vector_field.name in res[0][0].fields.keys()
|
||||
# load p2 with different fields
|
||||
error = {ct.err_code: 999, ct.err_msg: f"can't change the load field list for loaded collection"}
|
||||
p2.load(load_fields=[pk_field.name, vector_field.name, not_load_string_field.name, not_load_int64_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
# load p2 with the same partial fields
|
||||
p2.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name, load_int64_field.name])
|
||||
res, _ = p2.search(data=search_vectors, anns_field=vector_field.name, params=search_params,
|
||||
limit=100, output_fields=["*"])
|
||||
assert pk_field.name in res[0][0].fields.keys() \
|
||||
and vector_field.name in res[0][0].fields.keys()
|
||||
|
||||
# load the collection with all fields
|
||||
collection_w.load(check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, params=search_params,
|
||||
limit=100, output_fields=["*"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_skip_load_on_all_scalar_field_types(self):
|
||||
"""
|
||||
target: test skip load on all scalar field types
|
||||
method:
|
||||
1. create a collection with fields define skip load on all scalar field types
|
||||
expected:
|
||||
1. load and search successfully
|
||||
"""
|
||||
prefix = "partial_load"
|
||||
collection_w = self.init_collection_general(prefix, insert_data=True, is_index=True,
|
||||
is_all_data_type=True, with_json=True)[0]
|
||||
collection_w.release()
|
||||
# load with only pk field and vector field
|
||||
collection_w.load(load_fields=[ct.default_int64_field_name, ct.default_float_vec_field_name])
|
||||
search_vectors = cf.gen_vectors(1, ct.default_dim)
|
||||
search_params = {"params": {}}
|
||||
res = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
|
||||
param=search_params, limit=10, output_fields=["*"],
|
||||
check_tasks=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 10})[0]
|
||||
assert len(res[0][0].fields.keys()) == 2
|
||||
|
||||
|
||||
class TestFieldPartialLoadInvalid(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_skip_load_on_pk_field_or_vector_field(self):
|
||||
"""
|
||||
target: test skip load on pk field
|
||||
method:
|
||||
1. create a collection with fields define skip load on pk field
|
||||
expected:
|
||||
1. raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_int64_field = cf.gen_int64_field(name="int64_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, vector_field], auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# load without pk field
|
||||
error = {ct.err_code: 999, ct.err_msg: f"does not contain primary key field {pk_field.name}"}
|
||||
collection_w.load(load_fields=[vector_field.name, load_int64_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
error = {ct.err_code: 999, ct.err_msg: f"does not contain vector field"}
|
||||
collection_w.load(load_fields=[pk_field.name, load_int64_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_skip_load_on_partition_key_field(self):
|
||||
"""
|
||||
target: test skip load on partition key field
|
||||
method:
|
||||
1. create a collection with fields define skip load on partition key field
|
||||
expected:
|
||||
1. raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
partition_key_field = cf.gen_int64_field(name="int64_load", is_partition_key=True)
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, partition_key_field, vector_field], auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# load without pk field
|
||||
error = {ct.err_code: 999, ct.err_msg: f"does not contain partition key field {partition_key_field.name}"}
|
||||
collection_w.load(load_fields=[vector_field.name, pk_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_skip_load_on_clustering_key_field(self):
|
||||
"""
|
||||
target: test skip load on clustering key field
|
||||
method:
|
||||
1. create a collection with fields define skip load on clustering key field
|
||||
expected:
|
||||
1. raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
clustering_key_field = cf.gen_int64_field(name="int64_load", is_clustering_key=True)
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, clustering_key_field, vector_field], auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# load without pk field
|
||||
error = {ct.err_code: 999, ct.err_msg: f"does not contain clustering key field {clustering_key_field.name}"}
|
||||
collection_w.load(load_fields=[vector_field.name, pk_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_update_load_fields_list_when_reloading_collection(self):
|
||||
"""
|
||||
target: test update load fields list when reloading collection
|
||||
method:
|
||||
1. create a collection with fields
|
||||
2. load a part of the fields
|
||||
3. update load fields list when reloading collection
|
||||
expected:
|
||||
1. raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
not_load_int64_field = cf.gen_int64_field(name="not_int64_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, not_load_int64_field, load_string_field, vector_field],
|
||||
auto_id=True, enable_dynamic_field=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
int_values = [i for i in range(nb)]
|
||||
string_values = [str(i) for i in range(nb)]
|
||||
float_vec_values = cf.gen_vectors(nb, dim)
|
||||
collection_w.insert([int_values, string_values, float_vec_values])
|
||||
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name])
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 1
|
||||
search_vectors = float_vec_values[0:nq]
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=10, output_fields=[load_string_field.name],
|
||||
check_task=CheckTasks.check_search_results, check_items={"nq": nq, "limit": 10})
|
||||
# try to add more fields in load fields list when reloading
|
||||
error = {ct.err_code: 999, ct.err_msg: f"can't change the load field list for loaded collection"}
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name,
|
||||
load_string_field.name, not_load_int64_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
# try to remove fields in load fields list when reloading
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_one_of_dynamic_fields_in_load_fields_list(self):
|
||||
"""
|
||||
target: test one of dynamic fields in load fields list
|
||||
method:
|
||||
1. create a collection with fields
|
||||
3. add one of dynamic fields in load fields list when loading
|
||||
expected: raise exception
|
||||
4. add non_existing field in load fields list when loading
|
||||
expected: raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
load_int64_field = cf.gen_int64_field(name="int64_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, load_int64_field, load_string_field, vector_field],
|
||||
auto_id=True, enable_dynamic_field=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
data = []
|
||||
for i in range(nb):
|
||||
data.append({
|
||||
f"{load_int64_field.name}": i,
|
||||
f"{load_string_field.name}": str(i),
|
||||
f"{vector_field.name}": [random.uniform(-1, 1) for _ in range(dim)],
|
||||
"color": i,
|
||||
"tag": i,
|
||||
})
|
||||
collection_w.insert(data)
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
# add one of dynamic fields in load fields list
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: f"failed to get field schema by name: fieldName(color) not found"}
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, "color"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
# add non_existing field in load fields list
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: f"failed to get field schema by name: fieldName(not_existing) not found"}
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, "not_existing"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_search_on_not_loaded_fields(self):
|
||||
"""
|
||||
target: test search on skipped fields
|
||||
method:
|
||||
1. create a collection with fields
|
||||
2. load a part of the fields
|
||||
3. search on skipped fields in expr and/or output_fields
|
||||
expected:
|
||||
1. raise exception
|
||||
"""
|
||||
self._connect()
|
||||
name = cf.gen_unique_str()
|
||||
dim = 32
|
||||
nb = 2000
|
||||
pk_field = cf.gen_int64_field(name='pk', is_primary=True)
|
||||
not_load_int64_field = cf.gen_int64_field(name="not_int64_load")
|
||||
load_string_field = cf.gen_string_field(name="string_load")
|
||||
vector_field = cf.gen_float_vec_field(dim=dim)
|
||||
schema = cf.gen_collection_schema(fields=[pk_field, not_load_int64_field, load_string_field, vector_field],
|
||||
auto_id=True, enable_dynamic_field=True)
|
||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
||||
int_values = [i for i in range(nb)]
|
||||
string_values = [str(i) for i in range(nb)]
|
||||
float_vec_values = cf.gen_vectors(nb, dim)
|
||||
collection_w.insert([int_values, string_values, float_vec_values])
|
||||
|
||||
# build index
|
||||
collection_w.create_index(field_name=vector_field.name, index_params=ct.default_index)
|
||||
collection_w.load(load_fields=[pk_field.name, vector_field.name, load_string_field.name])
|
||||
# search
|
||||
search_params = ct.default_search_params
|
||||
nq = 1
|
||||
search_vectors = float_vec_values[0:nq]
|
||||
error = {ct.err_code: 999, ct.err_msg: f"field {not_load_int64_field.name} is not loaded"}
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
limit=10, output_fields=[not_load_int64_field.name, load_string_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
error = {ct.err_code: 999, ct.err_msg: f"cannot parse expression"}
|
||||
collection_w.search(data=search_vectors, anns_field=vector_field.name, param=search_params,
|
||||
expr=f"{not_load_int64_field.name} > 0",
|
||||
limit=10, output_fields=[load_string_field.name],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
|
@ -2,10 +2,14 @@ import re
|
|||
import math # do not remove `math`
|
||||
import pytest
|
||||
from pymilvus import DataType, AnnSearchRequest, RRFRanker
|
||||
import numpy as np
|
||||
import random
|
||||
from pymilvus import AnnSearchRequest, RRFRanker, WeightedRanker
|
||||
|
||||
from common.common_type import CaseLabel, CheckTasks
|
||||
from common import common_type as ct
|
||||
from common import common_func as cf
|
||||
from utils.util_log import test_log as log
|
||||
from common.code_mapping import QueryErrorMessage as qem
|
||||
from common.common_params import (
|
||||
FieldParams, MetricType, DefaultVectorIndexParams, DefaultScalarIndexParams, Expr, AlterIndexParams
|
||||
|
@ -1156,3 +1160,349 @@ class TestMixScenes(TestcaseBase):
|
|||
# re-query
|
||||
self.collection_wrap.query(expr=expr, output_fields=scalar_fields, check_task=CheckTasks.check_query_results,
|
||||
check_items={"exp_res": []})
|
||||
|
||||
|
||||
@pytest.mark.xdist_group("TestMultiVectorsGroupSearch")
|
||||
class TestGroupSearch(TestCaseClassBase):
|
||||
"""
|
||||
Testing group search scenarios
|
||||
1. collection schema: int64_pk(auto_id), float16_vector, float_vector, bfloat16_vector, varchar
|
||||
2. varchar field is inserted with dup values for group by
|
||||
3. index for each vector field with different index types, dims and metric types
|
||||
Author: Yanliang567
|
||||
"""
|
||||
def setup_class(self):
|
||||
super().setup_class(self)
|
||||
|
||||
# connect to server before testing
|
||||
self._connect(self)
|
||||
|
||||
# init params
|
||||
self.primary_field = "int64_pk"
|
||||
self.indexed_string_field = "varchar_with_index"
|
||||
|
||||
# create a collection with fields
|
||||
self.collection_wrap.init_collection(
|
||||
name=cf.gen_unique_str("test_group_search"),
|
||||
schema=cf.set_collection_schema(
|
||||
fields=[DataType.VARCHAR.name, self.primary_field, DataType.FLOAT16_VECTOR.name,
|
||||
DataType.FLOAT_VECTOR.name, DataType.BFLOAT16_VECTOR.name,
|
||||
DataType.INT8.name, DataType.INT64.name, DataType.BOOL.name,
|
||||
self.indexed_string_field],
|
||||
field_params={
|
||||
self.primary_field: FieldParams(is_primary=True).to_dict,
|
||||
DataType.FLOAT16_VECTOR.name: FieldParams(dim=31).to_dict,
|
||||
DataType.FLOAT_VECTOR.name: FieldParams(dim=64).to_dict,
|
||||
DataType.BFLOAT16_VECTOR.name: FieldParams(dim=24).to_dict,
|
||||
},
|
||||
auto_id=True
|
||||
)
|
||||
)
|
||||
|
||||
self.vector_fields = [DataType.FLOAT16_VECTOR.name, DataType.FLOAT_VECTOR.name, DataType.BFLOAT16_VECTOR.name]
|
||||
self.dims = [31, 64, 24]
|
||||
self.index_types = ["IVF_SQ8", "HNSW", "IVF_FLAT"]
|
||||
|
||||
@pytest.fixture(scope="class", autouse=True)
|
||||
def prepare_data(self):
|
||||
# prepare data (> 1024 triggering index building)
|
||||
import pandas as pd
|
||||
nb = 100
|
||||
for _ in range(100):
|
||||
string_values = pd.Series(data=[str(i) for i in range(nb)], dtype="string")
|
||||
data = [string_values]
|
||||
for i in range(len(self.vector_fields)):
|
||||
data.append(cf.gen_vectors(dim=self.dims[i], nb=nb, vector_data_type=self.vector_fields[i]))
|
||||
data.append(pd.Series(data=[np.int8(i) for i in range(nb)], dtype="int8"))
|
||||
data.append(pd.Series(data=[np.int64(i) for i in range(nb)], dtype="int64"))
|
||||
data.append(pd.Series(data=[np.bool_(i) for i in range(nb)], dtype="bool"))
|
||||
data.append(pd.Series(data=[str(i) for i in range(nb)], dtype="string"))
|
||||
self.collection_wrap.insert(data)
|
||||
|
||||
# flush collection, segment sealed
|
||||
self.collection_wrap.flush()
|
||||
|
||||
# build index for each vector field
|
||||
index_params = {
|
||||
**DefaultVectorIndexParams.IVF_SQ8(DataType.FLOAT16_VECTOR.name, metric_type=MetricType.L2),
|
||||
**DefaultVectorIndexParams.HNSW(DataType.FLOAT_VECTOR.name, metric_type=MetricType.IP),
|
||||
**DefaultVectorIndexParams.IVF_FLAT(DataType.BFLOAT16_VECTOR.name, metric_type=MetricType.COSINE),
|
||||
# index params for varchar field
|
||||
**DefaultScalarIndexParams.INVERTED(self.indexed_string_field)
|
||||
}
|
||||
|
||||
self.build_multi_index(index_params=index_params)
|
||||
assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())
|
||||
|
||||
# load collection
|
||||
self.collection_wrap.load()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.xfail(reason="issue #36407")
|
||||
@pytest.mark.parametrize("group_by_field", [DataType.VARCHAR.name, "varchar_with_index"])
|
||||
def test_search_group_size(self, group_by_field):
|
||||
"""
|
||||
target:
|
||||
1. search on 3 different float vector fields with group by varchar field with group size
|
||||
verify results entity = limit * group_size and group size is full if group_strict_size is True
|
||||
verify results group counts = limit if group_strict_size is False
|
||||
"""
|
||||
nq = 2
|
||||
limit = 50
|
||||
group_size = 5
|
||||
for j in range(len(self.vector_fields)):
|
||||
search_vectors = cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j])
|
||||
search_params = {"params": cf.get_search_params_params(self.index_types[j])}
|
||||
# when group_strict_size=true, it shall return results with entities = limit * group_size
|
||||
res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j],
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=group_by_field,
|
||||
group_size=group_size, group_strict_size=True,
|
||||
output_fields=[group_by_field])[0]
|
||||
for i in range(nq):
|
||||
assert len(res1[i]) == limit * group_size
|
||||
for l in range(limit):
|
||||
group_values = []
|
||||
for k in range(group_size):
|
||||
group_values.append(res1[i][l*group_size+k].fields.get(group_by_field))
|
||||
assert len(set(group_values)) == 1
|
||||
|
||||
# when group_strict_size=false, it shall return results with group counts = limit
|
||||
res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j],
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=group_by_field,
|
||||
group_size=group_size, group_strict_size=False,
|
||||
output_fields=[group_by_field])[0]
|
||||
for i in range(nq):
|
||||
group_values = []
|
||||
for l in range(len(res1[i])):
|
||||
group_values.append(res1[i][l].fields.get(group_by_field))
|
||||
assert len(set(group_values)) == limit
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.xfail(reason="issue #36407")
|
||||
def test_hybrid_search_group_size(self):
|
||||
"""
|
||||
hybrid search group by on 3 different float vector fields with group by varchar field with group size
|
||||
verify results returns with de-dup group values and group distances are in order as rank_group_scorer
|
||||
"""
|
||||
nq = 2
|
||||
limit = 50
|
||||
group_size = 5
|
||||
req_list = []
|
||||
for j in range(len(self.vector_fields)):
|
||||
search_params = {
|
||||
"data": cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j]),
|
||||
"anns_field": self.vector_fields[j],
|
||||
"param": {"params": cf.get_search_params_params(self.index_types[j])},
|
||||
"limit": limit,
|
||||
"expr": f"{self.primary_field} > 0"}
|
||||
req = AnnSearchRequest(**search_params)
|
||||
req_list.append(req)
|
||||
# 4. hybrid search group by
|
||||
rank_scorers = ["max", "avg", "sum"]
|
||||
for scorer in rank_scorers:
|
||||
res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.3, 0.3, 0.3), limit=limit,
|
||||
group_by_field=DataType.VARCHAR.name,
|
||||
group_size=group_size, rank_group_scorer=scorer,
|
||||
output_fields=[DataType.VARCHAR.name])[0]
|
||||
for i in range(nq):
|
||||
group_values = []
|
||||
for l in range(len(res[i])):
|
||||
group_values.append(res[i][l].fields.get(DataType.VARCHAR.name))
|
||||
assert len(set(group_values)) == limit
|
||||
|
||||
# group_distances = []
|
||||
tmp_distances = [100 for _ in range(group_size)] # init with a large value
|
||||
group_distances = [res[i][0].distance] # init with the first value
|
||||
for l in range(len(res[i]) - 1):
|
||||
curr_group_value = res[i][l].fields.get(DataType.VARCHAR.name)
|
||||
next_group_value = res[i][l + 1].fields.get(DataType.VARCHAR.name)
|
||||
if curr_group_value == next_group_value:
|
||||
group_distances.append(res[i][l + 1].distance)
|
||||
else:
|
||||
if scorer == 'sum':
|
||||
assert np.sum(group_distances) < np.sum(tmp_distances)
|
||||
elif scorer == 'avg':
|
||||
assert np.mean(group_distances) < np.mean(tmp_distances)
|
||||
else: # default max
|
||||
assert np.max(group_distances) < np.max(tmp_distances)
|
||||
|
||||
tmp_distances = group_distances
|
||||
group_distances = [res[i][l + 1].distance]
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.xfail(reason="issue #36407")
|
||||
def test_hybrid_search_group_by(self):
|
||||
"""
|
||||
verify hybrid search group by works with different Rankers
|
||||
"""
|
||||
# 3. prepare search params
|
||||
req_list = []
|
||||
for i in range(len(self.vector_fields)):
|
||||
search_param = {
|
||||
"data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], vector_data_type=self.vector_fields[i]),
|
||||
"anns_field": self.vector_fields[i],
|
||||
"param": {},
|
||||
"limit": ct.default_limit,
|
||||
"expr": f"{self.primary_field} > 0"}
|
||||
req = AnnSearchRequest(**search_param)
|
||||
req_list.append(req)
|
||||
# 4. hybrid search group by
|
||||
res = self.collection_wrap.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 0.2), ct.default_limit,
|
||||
group_by_field=DataType.VARCHAR.name,
|
||||
output_fields=[DataType.VARCHAR.name],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": ct.default_nq, "limit": ct.default_limit})[0]
|
||||
print(res)
|
||||
for i in range(ct.default_nq):
|
||||
group_values = []
|
||||
for l in range(ct.default_limit):
|
||||
group_values.append(res[i][l].fields.get(DataType.VARCHAR.name))
|
||||
assert len(group_values) == len(set(group_values))
|
||||
|
||||
# 5. hybrid search with RRFRanker on one vector field with group by
|
||||
req_list = []
|
||||
for i in range(1, len(self.vector_fields)):
|
||||
search_param = {
|
||||
"data": cf.gen_vectors(ct.default_nq, dim=self.dims[i], vector_data_type=self.vector_fields[i]),
|
||||
"anns_field": self.vector_fields[i],
|
||||
"param": {},
|
||||
"limit": ct.default_limit,
|
||||
"expr": f"{self.primary_field} > 0"}
|
||||
req = AnnSearchRequest(**search_param)
|
||||
req_list.append(req)
|
||||
self.collection_wrap.hybrid_search(req_list, RRFRanker(), ct.default_limit,
|
||||
group_by_field=self.indexed_string_field,
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": ct.default_nq, "limit": ct.default_limit})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("support_field", [DataType.INT8.name, DataType.INT64.name,
|
||||
DataType.BOOL.name, DataType.VARCHAR.name])
|
||||
def test_search_group_by_supported_scalars(self, support_field):
|
||||
"""
|
||||
verify search group by works with supported scalar fields
|
||||
"""
|
||||
nq = 2
|
||||
limit = 15
|
||||
for j in range(len(self.vector_fields)):
|
||||
search_vectors = cf.gen_vectors(nq, dim=self.dims[j], vector_data_type=self.vector_fields[j])
|
||||
search_params = {"params": cf.get_search_params_params(self.index_types[j])}
|
||||
res1 = self.collection_wrap.search(data=search_vectors, anns_field=self.vector_fields[j],
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=support_field,
|
||||
output_fields=[support_field])[0]
|
||||
for i in range(nq):
|
||||
grpby_values = []
|
||||
dismatch = 0
|
||||
results_num = 2 if support_field == DataType.BOOL.name else limit
|
||||
for l in range(results_num):
|
||||
top1 = res1[i][l]
|
||||
top1_grpby_pk = top1.id
|
||||
top1_grpby_value = top1.fields.get(support_field)
|
||||
expr = f"{support_field}=={top1_grpby_value}"
|
||||
if support_field == DataType.VARCHAR.name:
|
||||
expr = f"{support_field}=='{top1_grpby_value}'"
|
||||
grpby_values.append(top1_grpby_value)
|
||||
res_tmp = self.collection_wrap.search(data=[search_vectors[i]], anns_field=self.vector_fields[j],
|
||||
param=search_params, limit=1, expr=expr,
|
||||
output_fields=[support_field])[0]
|
||||
top1_expr_pk = res_tmp[0][0].id
|
||||
if top1_grpby_pk != top1_expr_pk:
|
||||
dismatch += 1
|
||||
log.info(f"{support_field} on {self.vector_fields[j]} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}")
|
||||
log.info(f"{support_field} on {self.vector_fields[j]} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}")
|
||||
baseline = 1 if support_field == DataType.BOOL.name else 0.2 # skip baseline check for boolean
|
||||
assert dismatch / results_num <= baseline
|
||||
# verify no dup values of the group_by_field in results
|
||||
assert len(grpby_values) == len(set(grpby_values))
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_search_pagination_group_by(self):
|
||||
"""
|
||||
verify search group by works with pagination
|
||||
"""
|
||||
limit = 10
|
||||
page_rounds = 3
|
||||
search_param = {}
|
||||
default_search_exp = f"{self.primary_field} >= 0"
|
||||
grpby_field = self.indexed_string_field
|
||||
default_search_field = self.vector_fields[1]
|
||||
search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1])
|
||||
all_pages_ids = []
|
||||
all_pages_grpby_field_values = []
|
||||
for r in range(page_rounds):
|
||||
page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit, offset=limit * r,
|
||||
expr=default_search_exp, group_by_field=grpby_field,
|
||||
output_fields=[grpby_field],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit},
|
||||
)[0]
|
||||
for j in range(limit):
|
||||
all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field))
|
||||
all_pages_ids += page_res[0].ids
|
||||
hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3)
|
||||
assert hit_rate >= 0.8
|
||||
|
||||
total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit * page_rounds,
|
||||
expr=default_search_exp, group_by_field=grpby_field,
|
||||
output_fields=[grpby_field],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit * page_rounds}
|
||||
)[0]
|
||||
hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids)))
|
||||
hit_rate = round(hit_num / (limit * page_rounds), 3)
|
||||
assert hit_rate >= 0.8
|
||||
log.info(f"search pagination with groupby hit_rate: {hit_rate}")
|
||||
grpby_field_values = []
|
||||
for i in range(limit * page_rounds):
|
||||
grpby_field_values.append(total_res[0][i].fields.get(grpby_field))
|
||||
assert len(grpby_field_values) == len(set(grpby_field_values))
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.skip(reason="issue #36401")
|
||||
def test_search_pagination_group_size(self):
|
||||
limit = 10
|
||||
group_size = 5
|
||||
page_rounds = 3
|
||||
search_param = {}
|
||||
default_search_exp = f"{self.primary_field} >= 0"
|
||||
grpby_field = self.indexed_string_field
|
||||
default_search_field = self.vector_fields[1]
|
||||
search_vectors = cf.gen_vectors(1, dim=self.dims[1], vector_data_type=self.vector_fields[1])
|
||||
all_pages_ids = []
|
||||
all_pages_grpby_field_values = []
|
||||
for r in range(page_rounds):
|
||||
page_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit, offset=limit * r,
|
||||
expr=default_search_exp,
|
||||
group_by_field=grpby_field, group_size=group_size,
|
||||
output_fields=[grpby_field],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit},
|
||||
)[0]
|
||||
for j in range(limit):
|
||||
all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field))
|
||||
all_pages_ids += page_res[0].ids
|
||||
hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3)
|
||||
assert hit_rate >= 0.8
|
||||
|
||||
total_res = self.collection_wrap.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit * page_rounds,
|
||||
expr=default_search_exp,
|
||||
group_by_field=grpby_field, group_size=group_size,
|
||||
output_fields=[grpby_field],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit * page_rounds}
|
||||
)[0]
|
||||
hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids)))
|
||||
hit_rate = round(hit_num / (limit * page_rounds), 3)
|
||||
assert hit_rate >= 0.8
|
||||
log.info(f"search pagination with groupby hit_rate: {hit_rate}")
|
||||
grpby_field_values = []
|
||||
for i in range(limit * page_rounds):
|
||||
grpby_field_values.append(total_res[0][i].fields.get(grpby_field))
|
||||
assert len(grpby_field_values) == len(set(grpby_field_values))
|
|
@ -10240,202 +10240,6 @@ class TestSearchIterator(TestcaseBase):
|
|||
class TestSearchGroupBy(TestcaseBase):
|
||||
""" Test case of search group by """
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("index_type, metric", zip(["FLAT", "IVF_FLAT", "HNSW"], ct.float_metrics))
|
||||
@pytest.mark.parametrize("vector_data_type", ["FLOAT16_VECTOR", "FLOAT_VECTOR", "BFLOAT16_VECTOR"])
|
||||
def test_search_group_by_default(self, index_type, metric, vector_data_type):
|
||||
"""
|
||||
target: test search group by
|
||||
method: 1. create a collection with data
|
||||
2. create index with different metric types
|
||||
3. search with group by
|
||||
verify no duplicate values for group_by_field
|
||||
4. search with filtering every value of group_by_field
|
||||
verify: verify that every record in groupby results is the top1 for that value of the group_by_field
|
||||
"""
|
||||
collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False,
|
||||
vector_data_type=vector_data_type,
|
||||
is_all_data_type=True, with_json=False)[0]
|
||||
_index_params = {"index_type": index_type, "metric_type": metric, "params": {"M": 16, "efConstruction": 128}}
|
||||
if index_type in ["IVF_FLAT", "FLAT"]:
|
||||
_index_params = {"index_type": index_type, "metric_type": metric, "params": {"nlist": 128}}
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params)
|
||||
# insert with the same values for scalar fields
|
||||
for _ in range(50):
|
||||
data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False)
|
||||
collection_w.insert(data)
|
||||
|
||||
collection_w.flush()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index_params)
|
||||
collection_w.load()
|
||||
|
||||
search_params = {"metric_type": metric, "params": {"ef": 128}}
|
||||
nq = 2
|
||||
limit = 15
|
||||
search_vectors = cf.gen_vectors(nq, dim=ct.default_dim)
|
||||
# # verify the results are same if gourp by pk
|
||||
# res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
|
||||
# param=search_params, limit=limit, consistency_level=CONSISTENCY_STRONG,
|
||||
# group_by_field=ct.default_int64_field_name)[0]
|
||||
# res2 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
|
||||
# param=search_params, limit=limit, consistency_level=CONSISTENCY_STRONG)[0]
|
||||
# hits_num = 0
|
||||
# for i in range(nq):
|
||||
# assert res1[i].ids == res2[i].ids
|
||||
# hits_num += len(set(res1[i].ids).intersection(set(res2[i].ids)))
|
||||
# hit_rate = hits_num / (nq * limit)
|
||||
# log.info(f"groupy primary key hits_num: {hits_num}, nq: {nq}, limit: {limit}, hit_rate: {hit_rate}")
|
||||
# assert hit_rate >= 0.60
|
||||
|
||||
# verify that every record in groupby results is the top1 for that value of the group_by_field
|
||||
supported_grpby_fields = [ct.default_int8_field_name, ct.default_int16_field_name,
|
||||
ct.default_int32_field_name, ct.default_bool_field_name,
|
||||
ct.default_string_field_name]
|
||||
for grpby_field in supported_grpby_fields:
|
||||
res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=grpby_field,
|
||||
output_fields=[grpby_field])[0]
|
||||
for i in range(nq):
|
||||
grpby_values = []
|
||||
dismatch = 0
|
||||
results_num = 2 if grpby_field == ct.default_bool_field_name else limit
|
||||
for l in range(results_num):
|
||||
top1 = res1[i][l]
|
||||
top1_grpby_pk = top1.id
|
||||
top1_grpby_value = top1.fields.get(grpby_field)
|
||||
expr = f"{grpby_field}=={top1_grpby_value}"
|
||||
if grpby_field == ct.default_string_field_name:
|
||||
expr = f"{grpby_field}=='{top1_grpby_value}'"
|
||||
grpby_values.append(top1_grpby_value)
|
||||
res_tmp = collection_w.search(data=[search_vectors[i]], anns_field=ct.default_float_vec_field_name,
|
||||
param=search_params, limit=1,
|
||||
expr=expr,
|
||||
output_fields=[grpby_field])[0]
|
||||
top1_expr_pk = res_tmp[0][0].id
|
||||
if top1_grpby_pk != top1_expr_pk:
|
||||
dismatch += 1
|
||||
log.info(f"{grpby_field} on {metric} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}")
|
||||
log.info(f"{grpby_field} on {metric} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}")
|
||||
baseline = 1 if grpby_field == ct.default_bool_field_name else 0.2 # skip baseline check for boolean
|
||||
assert dismatch / results_num <= baseline
|
||||
# verify no dup values of the group_by_field in results
|
||||
assert len(grpby_values) == len(set(grpby_values))
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_search_group_size_default(self):
|
||||
"""
|
||||
target: test search group by
|
||||
method: 1. create a collection with 3 different float vectors
|
||||
2. build index with 3 different index types and metrics
|
||||
2. search on 3 different float vector fields with group by varchar field with group size
|
||||
verify results entity = limit * group_size and group size is full if group_strict_size is True
|
||||
verify results group counts = limit if group_strict_size is False
|
||||
"""
|
||||
self._connect()
|
||||
dense_types = ["FLOAT16_VECTOR", "FLOAT_VECTOR", "BFLOAT16_VECTOR"]
|
||||
dims = [16, 128, 64]
|
||||
index_types = ["FLAT", "IVF_SQ8", "HNSW"]
|
||||
metrics = ct.float_metrics
|
||||
fields = [cf.gen_int64_field(is_primary=True), cf.gen_string_field()]
|
||||
for i in range(len(dense_types)):
|
||||
fields.append(cf.gen_float_vec_field(name=dense_types[i],
|
||||
vector_data_type=dense_types[i], dim=dims[i]))
|
||||
schema = cf.gen_collection_schema(fields, auto_id=True)
|
||||
collection_w = self.init_collection_wrap(name=prefix, schema=schema)
|
||||
|
||||
# insert with the same values for scalar fields
|
||||
nb = 100
|
||||
for _ in range(100):
|
||||
string_values = pd.Series(data=[str(i) for i in range(nb)], dtype="string")
|
||||
data = [string_values]
|
||||
for i in range(len(dense_types)):
|
||||
data.append(cf.gen_vectors(dim=dims[i], nb=nb, vector_data_type=dense_types[i]))
|
||||
collection_w.insert(data)
|
||||
|
||||
collection_w.flush()
|
||||
for i in range(len(dense_types)):
|
||||
_index_params = {"index_type": index_types[i], "metric_type": metrics[i],
|
||||
"params": cf.get_index_params_params(index_types[i])}
|
||||
collection_w.create_index(dense_types[i], _index_params)
|
||||
collection_w.load()
|
||||
|
||||
nq = 2
|
||||
limit = 50
|
||||
group_size = 5
|
||||
for j in range(len(dense_types)):
|
||||
search_vectors = cf.gen_vectors(nq, dim=dims[j], vector_data_type=dense_types[j])
|
||||
search_params = {"params": cf.get_search_params_params(index_types[j])}
|
||||
# when group_strict_size=true, it shall return results with entities = limit * group_size
|
||||
res1 = collection_w.search(data=search_vectors, anns_field=dense_types[j],
|
||||
param=search_params, limit=limit, # consistency_level=CONSISTENCY_STRONG,
|
||||
group_by_field=ct.default_string_field_name,
|
||||
group_size=group_size, group_strict_size=True,
|
||||
output_fields=[ct.default_string_field_name])[0]
|
||||
for i in range(nq):
|
||||
for l in range(limit):
|
||||
group_values = []
|
||||
for k in range(10):
|
||||
group_values.append(res1[i][l].fields.get(ct.default_string_field_name))
|
||||
assert len(set(group_values)) == 1
|
||||
assert len(res1[i]) == limit * group_size
|
||||
|
||||
# when group_strict_size=false, it shall return results with group counts = limit
|
||||
res1 = collection_w.search(data=search_vectors, anns_field=dense_types[j],
|
||||
param=search_params, limit=limit, # consistency_level=CONSISTENCY_STRONG,
|
||||
group_by_field=ct.default_string_field_name,
|
||||
group_size=group_size, group_strict_size=False,
|
||||
output_fields=[ct.default_string_field_name])[0]
|
||||
for i in range(nq):
|
||||
group_values = []
|
||||
for l in range(len(res1[i])):
|
||||
group_values.append(res1[i][l].fields.get(ct.default_string_field_name))
|
||||
assert len(set(group_values)) == limit
|
||||
|
||||
# hybrid search group by
|
||||
req_list = []
|
||||
for j in range(len(dense_types)):
|
||||
search_params = {
|
||||
"data": cf.gen_vectors(nq, dim=dims[j], vector_data_type=dense_types[j]),
|
||||
"anns_field": dense_types[j],
|
||||
"param": {"params": cf.get_search_params_params(index_types[j])},
|
||||
"limit": limit,
|
||||
"expr": "int64 > 0"}
|
||||
req = AnnSearchRequest(**search_params)
|
||||
req_list.append(req)
|
||||
# 4. hybrid search group by
|
||||
import numpy as np
|
||||
rank_scorers = ["max", "avg", "sum"]
|
||||
for scorer in rank_scorers:
|
||||
res = collection_w.hybrid_search(req_list, WeightedRanker(0.3, 0.3, 0.3), limit=limit,
|
||||
group_by_field=ct.default_string_field_name,
|
||||
group_size=group_size, rank_group_scorer=scorer,
|
||||
output_fields=[ct.default_string_field_name])[0]
|
||||
for i in range(nq):
|
||||
group_values = []
|
||||
for l in range(len(res[i])):
|
||||
group_values.append(res[i][l].fields.get(ct.default_string_field_name))
|
||||
assert len(set(group_values)) == limit
|
||||
|
||||
# group_distances = []
|
||||
tmp_distances = [100 for _ in range(group_size)] # init with a large value
|
||||
group_distances = [res[i][0].distance] # init with the first value
|
||||
for l in range(len(res[i])-1):
|
||||
curr_group_value = res[i][l].fields.get(ct.default_string_field_name)
|
||||
next_group_value = res[i][l+1].fields.get(ct.default_string_field_name)
|
||||
if curr_group_value == next_group_value:
|
||||
group_distances.append(res[i][l+1].distance)
|
||||
else:
|
||||
if scorer == 'sum':
|
||||
assert np.sum(group_distances) < np.sum(tmp_distances)
|
||||
elif scorer == 'avg':
|
||||
assert np.mean(group_distances) < np.mean(tmp_distances)
|
||||
else: # default max
|
||||
assert np.max(group_distances) < np.max(tmp_distances)
|
||||
|
||||
tmp_distances = group_distances
|
||||
group_distances = [res[i][l+1].distance]
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_search_max_group_size_and_max_limit(self):
|
||||
"""
|
||||
|
@ -10501,70 +10305,6 @@ class TestSearchGroupBy(TestcaseBase):
|
|||
check_task=CheckTasks.err_res,
|
||||
check_items={"err_code": err_code, "err_msg": err_msg})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("grpby_field", [ct.default_string_field_name, ct.default_int8_field_name])
|
||||
def test_search_group_by_with_field_indexed(self, grpby_field):
|
||||
"""
|
||||
target: test search group by with the field indexed
|
||||
method: 1. create a collection with data
|
||||
2. create index for the vector field and the groupby field
|
||||
3. search with group by
|
||||
4. search with filtering every value of group_by_field
|
||||
verify: verify that every record in groupby results is the top1 for that value of the group_by_field
|
||||
"""
|
||||
metric = "COSINE"
|
||||
collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False,
|
||||
is_all_data_type=True, with_json=False)[0]
|
||||
_index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}}
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index)
|
||||
# insert with the same values(by insert rounds) for scalar fields
|
||||
for _ in range(50):
|
||||
data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False)
|
||||
collection_w.insert(data)
|
||||
|
||||
collection_w.flush()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index)
|
||||
collection_w.create_index(grpby_field)
|
||||
collection_w.load()
|
||||
|
||||
search_params = {"metric_type": metric, "params": {"ef": 128}}
|
||||
nq = 2
|
||||
limit = 20
|
||||
search_vectors = cf.gen_vectors(nq, dim=ct.default_dim)
|
||||
|
||||
# verify that every record in groupby results is the top1 for that value of the group_by_field
|
||||
res1 = collection_w.search(data=search_vectors, anns_field=ct.default_float_vec_field_name,
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=grpby_field,
|
||||
output_fields=[grpby_field])[0]
|
||||
for i in range(nq):
|
||||
grpby_values = []
|
||||
dismatch = 0
|
||||
results_num = 2 if grpby_field == ct.default_bool_field_name else limit
|
||||
for l in range(results_num):
|
||||
top1 = res1[i][l]
|
||||
top1_grpby_pk = top1.id
|
||||
top1_grpby_value = top1.fields.get(grpby_field)
|
||||
expr = f"{grpby_field}=={top1_grpby_value}"
|
||||
if grpby_field == ct.default_string_field_name:
|
||||
expr = f"{grpby_field}=='{top1_grpby_value}'"
|
||||
grpby_values.append(top1_grpby_value)
|
||||
res_tmp = collection_w.search(data=[search_vectors[i]], anns_field=ct.default_float_vec_field_name,
|
||||
param=search_params, limit=1,
|
||||
expr=expr,
|
||||
output_fields=[grpby_field])[0]
|
||||
top1_expr_pk = res_tmp[0][0].id
|
||||
log.info(f"nq={i}, limit={l}")
|
||||
# assert top1_grpby_pk == top1_expr_pk
|
||||
if top1_grpby_pk != top1_expr_pk:
|
||||
dismatch += 1
|
||||
log.info(f"{grpby_field} on {metric} dismatch_item, top1_grpby_dis: {top1.distance}, top1_expr_dis: {res_tmp[0][0].distance}")
|
||||
log.info(f"{grpby_field} on {metric} top1_dismatch_num: {dismatch}, results_num: {results_num}, dismatch_rate: {dismatch / results_num}")
|
||||
baseline = 1 if grpby_field == ct.default_bool_field_name else 0.2 # skip baseline check for boolean
|
||||
assert dismatch / results_num <= baseline
|
||||
# verify no dup values of the group_by_field in results
|
||||
assert len(grpby_values) == len(set(grpby_values))
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.parametrize("grpby_unsupported_field", [ct.default_float_field_name, ct.default_json_field_name,
|
||||
ct.default_double_field_name, ct.default_float_vec_field_name])
|
||||
|
@ -10698,68 +10438,6 @@ class TestSearchGroupBy(TestcaseBase):
|
|||
check_task=CheckTasks.err_res,
|
||||
check_items={"err_code": err_code, "err_msg": err_msg})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
# @pytest.mark.xfail(reason="issue #30828")
|
||||
def test_search_pagination_group_by(self):
|
||||
"""
|
||||
target: test search pagination with group by
|
||||
method: 1. create a collection with data
|
||||
2. create index HNSW
|
||||
3. search with groupby and pagination
|
||||
4. search with groupby and limits=pages*page_rounds
|
||||
verify: search with groupby and pagination returns correct results
|
||||
"""
|
||||
# 1. create a collection
|
||||
metric = "COSINE"
|
||||
collection_w = self.init_collection_general(prefix, auto_id=True, insert_data=False, is_index=False,
|
||||
is_all_data_type=True, with_json=False)[0]
|
||||
# insert with the same values for scalar fields
|
||||
for _ in range(50):
|
||||
data = cf.gen_dataframe_all_data_type(nb=100, auto_id=True, with_json=False)
|
||||
collection_w.insert(data)
|
||||
|
||||
collection_w.flush()
|
||||
_index = {"index_type": "HNSW", "metric_type": metric, "params": {"M": 16, "efConstruction": 128}}
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params=_index)
|
||||
collection_w.load()
|
||||
# 2. search pagination with offset
|
||||
limit = 10
|
||||
page_rounds = 3
|
||||
search_param = {"metric_type": metric}
|
||||
grpby_field = ct.default_string_field_name
|
||||
search_vectors = cf.gen_vectors(1, dim=ct.default_dim)
|
||||
all_pages_ids = []
|
||||
all_pages_grpby_field_values = []
|
||||
for r in range(page_rounds):
|
||||
page_res = collection_w.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit, offset=limit * r,
|
||||
expr=default_search_exp, group_by_field=grpby_field,
|
||||
output_fields=["*"],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit},
|
||||
)[0]
|
||||
for j in range(limit):
|
||||
all_pages_grpby_field_values.append(page_res[0][j].get(grpby_field))
|
||||
all_pages_ids += page_res[0].ids
|
||||
hit_rate = round(len(set(all_pages_grpby_field_values)) / len(all_pages_grpby_field_values), 3)
|
||||
assert hit_rate >= 0.8
|
||||
|
||||
total_res = collection_w.search(search_vectors, anns_field=default_search_field,
|
||||
param=search_param, limit=limit * page_rounds,
|
||||
expr=default_search_exp, group_by_field=grpby_field,
|
||||
output_fields=[grpby_field],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": limit * page_rounds}
|
||||
)[0]
|
||||
hit_num = len(set(total_res[0].ids).intersection(set(all_pages_ids)))
|
||||
hit_rate = round(hit_num / (limit * page_rounds), 3)
|
||||
assert hit_rate >= 0.8
|
||||
log.info(f"search pagination with groupby hit_rate: {hit_rate}")
|
||||
grpby_field_values = []
|
||||
for i in range(limit * page_rounds):
|
||||
grpby_field_values.append(total_res[0][i].fields.get(grpby_field))
|
||||
assert len(grpby_field_values) == len(set(grpby_field_values))
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_search_iterator_not_support_group_by(self):
|
||||
"""
|
||||
|
@ -10834,108 +10512,6 @@ class TestSearchGroupBy(TestcaseBase):
|
|||
check_task=CheckTasks.err_res,
|
||||
check_items={"err_code": err_code, "err_msg": err_msg})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_hybrid_search_support_group_by(self):
|
||||
"""
|
||||
target: verify that hybrid search does not support groupby
|
||||
method: 1. create a collection with multiple vector fields
|
||||
2. create index hnsw and load
|
||||
3. hybrid_search with group by
|
||||
verify: the error code and msg
|
||||
"""
|
||||
# 1. initialize collection with data
|
||||
dim = 128
|
||||
supported_index = ["HNSW", "FLAT", "IVF_FLAT", "IVF_SQ8"]
|
||||
metric = ct.default_L0_metric
|
||||
collection_w, _, _, insert_ids, time_stamp = \
|
||||
self.init_collection_general(prefix, True, dim=dim, is_index=False,
|
||||
enable_dynamic_field=False,
|
||||
multiple_dim_array=[dim, dim, dim])[0:5]
|
||||
# 2. extract vector field name
|
||||
vector_name_list = cf.extract_vector_field_name_list(collection_w)
|
||||
vector_name_list.append(ct.default_float_vec_field_name)
|
||||
for i in range(len(vector_name_list)):
|
||||
index = supported_index[i]
|
||||
_index_params = {"index_type": index, "metric_type": metric,
|
||||
"params": cf.get_index_params_params(index)}
|
||||
collection_w.create_index(vector_name_list[i], _index_params)
|
||||
collection_w.load()
|
||||
# 3. prepare search params
|
||||
req_list = []
|
||||
for vector_name in vector_name_list:
|
||||
search_param = {
|
||||
"data": [[random.random() for _ in range(dim)] for _ in range(ct.default_nq)],
|
||||
"anns_field": vector_name,
|
||||
"param": {"metric_type": metric, "offset": 0},
|
||||
"limit": default_limit,
|
||||
"expr": "int64 > 0"}
|
||||
req = AnnSearchRequest(**search_param)
|
||||
req_list.append(req)
|
||||
# 4. hybrid search group by
|
||||
res = collection_w.hybrid_search(req_list, WeightedRanker(0.1, 0.9, 1, 0.2), default_limit,
|
||||
group_by_field=ct.default_string_field_name,
|
||||
output_fields=[ct.default_string_field_name],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": ct.default_nq, "limit": default_limit})[0]
|
||||
print(res)
|
||||
for i in range(ct.default_nq):
|
||||
group_values = []
|
||||
for l in range(ct.default_limit):
|
||||
group_values.append(res[i][l].fields.get(ct.default_string_field_name))
|
||||
assert len(group_values) == len(set(group_values))
|
||||
|
||||
# 5. hybrid search with RRFRanker on one vector field with group by
|
||||
req_list = []
|
||||
for vector_name in vector_name_list[:1]:
|
||||
search_param = {
|
||||
"data": [[random.random() for _ in range(dim)] for _ in range(1)],
|
||||
"anns_field": vector_name,
|
||||
"param": {"metric_type": metric, "offset": 0},
|
||||
"limit": default_limit,
|
||||
"expr": "int64 > 0"}
|
||||
req = AnnSearchRequest(**search_param)
|
||||
req_list.append(req)
|
||||
collection_w.hybrid_search(req_list, RRFRanker(), default_limit,
|
||||
group_by_field=ct.default_string_field_name,
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": default_limit})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_multi_vectors_search_one_vector_group_by(self):
|
||||
"""
|
||||
target: test search group by works on a collection with multi vectors
|
||||
method: 1. create a collection with multiple vector fields
|
||||
2. create index hnsw and load
|
||||
3. search on the vector with hnsw index with group by
|
||||
verify: search successfully
|
||||
"""
|
||||
dim = 33
|
||||
index_type = "HNSW"
|
||||
metric_type = "COSINE"
|
||||
_index_params = {"index_type": index_type, "metric_type": metric_type,
|
||||
"params": {"M": 16, "efConstruction": 128}}
|
||||
collection_w, _, _, insert_ids, time_stamp = \
|
||||
self.init_collection_general(prefix, True, dim=dim, is_index=False,
|
||||
enable_dynamic_field=False, multiple_dim_array=[dim, dim])[0:5]
|
||||
# 2. extract vector field name
|
||||
vector_name_list = cf.extract_vector_field_name_list(collection_w)
|
||||
vector_name_list.append(ct.default_float_vec_field_name)
|
||||
for vector_name in vector_name_list:
|
||||
collection_w.create_index(vector_name, _index_params)
|
||||
collection_w.load()
|
||||
|
||||
nq = 2
|
||||
limit = 10
|
||||
search_params = {"metric_type": metric_type, "params": {"ef": 32}}
|
||||
for vector_name in vector_name_list:
|
||||
search_vectors = cf.gen_vectors(nq, dim=dim)
|
||||
# verify the results are same if gourp by pk
|
||||
collection_w.search(data=search_vectors, anns_field=vector_name,
|
||||
param=search_params, limit=limit,
|
||||
group_by_field=ct.default_int64_field_name,
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": nq, "limit": limit})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("index", ct.all_index_types[9:11])
|
||||
def test_sparse_vectors_group_by(self, index):
|
||||
|
|
Loading…
Reference in New Issue