mirror of https://github.com/milvus-io/milvus.git

Add case to test search with db (#26596)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
Branch: pull/26668/head
parent dc88ef0399
commit 69495bd23d
@@ -1,3 +1,4 @@
+import pandas as pd
 import pytest
 
 from base.client_base import TestcaseBase
@@ -575,7 +576,7 @@ class TestDatabaseOtherApi(TestcaseBase):
                                  user=ct.default_user, password=ct.default_password,
                                  secure=cf.param_info.param_secure,
                                  check_task=CheckTasks.err_res,
-                                 check_items={ct.err_code: 2, ct.err_msg: "Fail connecting to server"})
+                                 check_items={ct.err_code: 1, ct.err_msg: "database not found:"})
 
     def test_connect_not_existed_db(self, host, port):
         """
@@ -628,3 +629,206 @@ class TestDatabaseOtherApi(TestcaseBase):
         collections_default, _ = self.utility_wrap.list_collections()
         assert collection_w.name not in collections_default
 
+    def test_connect_after_using_db(self):
+        """
+        target: test connect after using db
+        method: 1. connect
+                2. create a db and switch to it with using_database
+                3. create a collection in the db
+                4. connect again
+                5. list collections
+        expected: the current db is the one given in the connect params; if None, the default db is used
+        """
+        # create db
+        self._connect()
+        db_name = cf.gen_unique_str(prefix)
+        self.database_wrap.create_database(db_name)
+        self.database_wrap.using_database(db_name)
+
+        # create collection
+        self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix), schema=cf.gen_default_collection_schema())
+
+        # connect again
+        self._connect()
+        collections_default, _ = self.utility_wrap.list_collections()
+        assert self.collection_wrap.name not in collections_default
+
+    def test_search_db(self):
+        """
+        target: test search with db
+        method: 1. create a collection in a db
+                2. search with an expr on some partitions
+                3. search with output_fields
+                4. search with an output vector field and ignore_growing
+                5. search with pagination
+                6. range search (filter with radius)
+                7. search iterator
+        expected: no error
+        """
+        # prepare data:
+        # 1. create a collection with pk_field + vector_field, enable dynamic field
+        # 2. insert [0, nb) into the default partition and flush
+        # 3. create index and load
+        # 4. insert data with a dynamic extra field into a new partition, pk from [nb, 2*nb)
+        _, partition_name = self.prepare_data_for_db_search()
+
+        query_vec = cf.gen_vectors(ct.default_nq, ct.default_dim)
+
+        # search with a dynamic field expr on the default partition
+        self.collection_wrap.search(data=query_vec, anns_field=ct.default_float_vec_field_name,
+                                    param=ct.default_search_params, limit=ct.default_limit,
+                                    expr=f'{ct.default_int64_field_name} < 2800 or {ct.default_int8_field_name} > 500',
+                                    partition_names=[ct.default_partition_name],
+                                    check_task=CheckTasks.check_search_results,
+                                    check_items={"nq": ct.default_nq,
+                                                 "limit": ct.default_limit})
+
+        # search with output pk + dynamic fields, ignore_growing=True
+        ignore_growing_search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}, "ignore_growing": True}
+        search_res, _ = self.collection_wrap.search(data=query_vec, anns_field=ct.default_float_vec_field_name,
+                                                    param=ignore_growing_search_params, limit=ct.default_limit,
+                                                    output_fields=[ct.default_int64_field_name,
+                                                                   ct.default_string_field_name],
+                                                    check_task=CheckTasks.check_search_results,
+                                                    check_items={"nq": ct.default_nq,
+                                                                 "limit": ct.default_limit})
+        assert ct.default_int64_field_name in set(search_res[0][0].entity.fields)
+
+        # search with an output vector field, ignore_growing=False
+        ignore_growing_search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}, "ignore_growing": False}
+        self.collection_wrap.search(data=query_vec, anns_field=ct.default_float_vec_field_name,
+                                    param=ignore_growing_search_params, limit=ct.default_limit,
+                                    output_fields=[ct.default_int64_field_name,
+                                                   ct.default_float_vec_field_name],
+                                    check_task=CheckTasks.check_search_results,
+                                    check_items={"nq": ct.default_nq,
+                                                 "limit": ct.default_limit,
+                                                 "output_fields": [ct.default_int64_field_name,
+                                                                   ct.default_float_vec_field_name]})
+
+        # search with pagination
+        self.collection_wrap.search(data=query_vec, anns_field=ct.default_float_vec_field_name,
+                                    param=ct.default_search_params, limit=ct.default_limit, offset=ct.default_limit,
+                                    check_task=CheckTasks.check_search_results,
+                                    check_items={"nq": ct.default_nq,
+                                                 "limit": ct.default_limit})
+
+        # range search
+        range_search_params = {"metric_type": "COSINE", "params": {"radius": 0.0,
+                                                                   "range_filter": 1000}}
+        self.collection_wrap.search(query_vec, ct.default_float_vec_field_name,
+                                    range_search_params, ct.default_limit,
+                                    expr=None,
+                                    check_task=CheckTasks.check_search_results,
+                                    check_items={"nq": ct.default_nq,
+                                                 "limit": ct.default_limit})
+
+        # search iterator
+        self.collection_wrap.search_iterator(query_vec[:1], ct.default_float_vec_field_name, ct.default_search_params,
+                                             ct.default_limit * 100, partition_names=[partition_name],
+                                             check_task=CheckTasks.check_search_iterator,
+                                             check_items={"limit": ct.default_limit * 100})
+
+    def test_query_db(self):
+        """
+        target: test query with db
+        method: 1. create a collection in a db
+                2. query from partitions
+                3. query with output fields: pk + dynamic
+                4. query with an output vector field and ignore_growing
+                5. query with pagination
+                6. query iterator
+        expected: no error
+        """
+        # prepare data:
+        # 1. create a collection with pk_field + vector_field, enable dynamic field
+        # 2. insert [0, nb) into the default partition and flush
+        # 3. create index and load
+        # 4. insert data with a dynamic extra field into a new partition, pk from [nb, 2*nb)
+        _, partition_name = self.prepare_data_for_db_search()
+
+        # query from a partition
+        query_expr = f'{ct.default_int64_field_name} in [0, {ct.default_nb}]'
+        res, _ = self.collection_wrap.query(query_expr, partition_names=[partition_name])
+        assert len(res) == 1
+
+        # query output pk + dynamic fields
+        res_dynamic, _ = self.collection_wrap.query(query_expr, output_fields=[ct.default_int64_field_name,
+                                                                               ct.default_string_field_name])
+        assert ct.default_int64_field_name in res_dynamic[0].keys()
+
+        # query output vector field
+        vec_res, _ = self.collection_wrap.query(query_expr, output_fields=[ct.default_float_vec_field_name])
+        assert set(vec_res[0].keys()) == {ct.default_float_vec_field_name, ct.default_int64_field_name}
+
+        # query with pagination
+        expr = f'1000 <= {ct.default_int64_field_name} < 4000'
+        page_res, _ = self.collection_wrap.query(expr, offset=1000, limit=1000)
+        assert len(page_res) == 1000
+
+        # delete and query
+        del_expr = f'{ct.default_int64_field_name} in [0, {ct.default_nb}]'
+        self.collection_wrap.delete(del_expr)
+        self.collection_wrap.query(del_expr, check_task=CheckTasks.check_query_empty)
+
+        # upsert and query
+        # TODO https://github.com/milvus-io/milvus/issues/26595
+        # upsert_data = cf.gen_default_rows_data(start=0, nb=1, with_json=False)
+        # upsert_df = pd.DataFrame({
+        #     ct.default_int64_field_name: pd.Series(data=[0]),
+        #     ct.default_float_vec_field_name: cf.gen_vectors(1, ct.default_dim)
+        # })
+        # self.collection_wrap.upsert(data=upsert_df)
+        # upsert_entity, _ = self.collection_wrap.query(del_expr, output_fields=[ct.default_string_field_name])
+        # assert set(upsert_entity[0].keys()) == {ct.default_int64_field_name}
+
+        # query iterator
+        self.collection_wrap.query_iterator(f"{ct.default_int64_field_name} <= 3000", limit=ct.default_limit * 10,
+                                            partition_names=[partition_name],
+                                            check_task=CheckTasks.check_query_iterator,
+                                            check_items={"count": 1000,
+                                                         "limit": ct.default_limit * 10})
+
+    def prepare_data_for_db_search(self):
+        """
+        prepare data in a db collection
+        :return: the db name and the partition name
+        :rtype: (str, str)
+        """
+        self._connect()
+
+        # create a db
+        db_name = cf.gen_unique_str("a")
+        self.database_wrap.create_database(db_name)
+
+        # using db
+        self.database_wrap.using_database(db_name)
+
+        # create a collection and a partition
+        partition_name = "p1"
+        self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix),
+                                             schema=cf.gen_default_collection_schema(enable_dynamic_field=True))
+        self.partition_wrap.init_partition(self.collection_wrap.collection, partition_name)
+
+        # insert data into the collection
+        df = pd.DataFrame({
+            ct.default_int64_field_name: pd.Series(data=[i for i in range(ct.default_nb)]),
+            ct.default_float_vec_field_name: cf.gen_vectors(ct.default_nb, ct.default_dim)
+        })
+        self.collection_wrap.insert(df)
+        self.collection_wrap.flush()
+
+        # create index with the COSINE metric
+        _index = {"index_type": "HNSW", "metric_type": "COSINE", "params": {"M": 8, "efConstruction": 200}}
+        self.collection_wrap.create_index(ct.default_float_vec_field_name, _index)
+
+        # load collection
+        self.collection_wrap.load()
+
+        # insert data with a dynamic field into the partition
+        data_par = cf.gen_default_rows_data(start=ct.default_nb)
+        log.info(data_par[0].keys())
+        self.collection_wrap.insert(data_par, partition_name=self.partition_wrap.name)
+
+        return db_name, partition_name
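
Note on the expected-error change in the second hunk: connecting with a db_name that does not exist now fails with a server-side "database not found" error (err_code 1) rather than the generic "Fail connecting to server" (err_code 2). A minimal sketch of the behavior the updated check asserts, assuming a pymilvus client whose connect() accepts db_name and a local server at localhost:19530 (both assumptions here):

from pymilvus import connections, MilvusException

try:
    # "no_such_db" is a made-up database name for illustration
    connections.connect(alias="default", host="localhost", port="19530", db_name="no_such_db")
except MilvusException as e:
    # expected to surface the "database not found: ..." message checked in the test
    print(e.code, e.message)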
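For context on the db-switching calls used throughout (database_wrap.create_database / using_database wrap pymilvus's db module): using_database changes which database subsequent operations on the connection target, which is why reconnecting in test_connect_after_using_db makes the collection invisible again. A rough standalone sketch, assuming pymilvus's db and utility modules and a hypothetical database name "demo_db":

from pymilvus import connections, db, utility

connections.connect(host="localhost", port="19530")  # starts on the "default" database
db.create_database("demo_db")
db.using_database("demo_db")        # subsequent ops now target demo_db
print(utility.list_collections())   # lists collections in demo_db only

db.using_database("default")        # switch back; demo_db's collections are no longer listed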
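One more note, on the range search parameters in test_search_db: for similarity metrics like COSINE where larger scores mean more similar, Milvus treats "radius" as the lower score bound and "range_filter" as the upper bound, so {"radius": 0.0, "range_filter": 1000} effectively keeps every hit with a positive score. A tighter, purely illustrative configuration might look like:

range_search_params = {
    "metric_type": "COSINE",
    "params": {
        "radius": 0.5,        # drop hits with cosine similarity <= 0.5 (hypothetical bound)
        "range_filter": 1.0,  # keep scores up to the COSINE maximum of 1.0
    },
}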