Add query count(*) test cases (#23387)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/23402/head
ThreadDao 2023-04-12 16:46:29 +08:00 committed by GitHub
parent 296380d6e6
commit 2c038612ec
3 changed files with 705 additions and 44 deletions


@ -16,8 +16,8 @@ default_search_ip_params = {"metric_type": "IP", "params": {"nprobe": 10}}
default_search_binary_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
default_index = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
default_binary_index = {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128}, "metric_type": "JACCARD"}
default_diskann_index ={"index_type": "DISKANN", "metric_type":"L2", "params": {}}
default_diskann_search_params ={"metric_type": "L2", "params": {"search_list": 30}}
default_diskann_index = {"index_type": "DISKANN", "metric_type": "L2", "params": {}}
default_diskann_search_params = {"metric_type": "L2", "params": {"search_list": 30}}
max_top_k = 16384
max_partition_num = 4096 # 256
default_segment_row_limit = 1000
@ -62,6 +62,7 @@ max_compaction_interval = 60 # the max time interval (s) from the last compacti
max_field_num = 64 # Maximum number of fields in a collection
max_name_length = 255 # Maximum length of name for a collection or alias
default_replica_num = 1
default_graceful_time = 5  # seconds; staleness window for bounded/eventually-consistent reads
IMAGE_REPOSITORY_MILVUS = "harbor.milvus.io/dockerhub/milvusdb/milvus"
NAMESPACE_CHAOS_TESTING = "chaos-testing"
@ -77,6 +78,7 @@ in_cluster_env = "IN_CLUSTER"
default_flat_index = {"index_type": "FLAT", "params": {}, "metric_type": "L2"}
default_bin_flat_index = {"index_type": "BIN_FLAT", "params": {}, "metric_type": "JACCARD"}
default_count_output = "count(*)"
"""" List of parameters used to pass """
get_invalid_strs = [

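For orientation, here is a minimal standalone sketch of how `default_count_output` is consumed by a pymilvus query. The endpoint, collection name, and schema are hypothetical; a running Milvus instance with count(*) support is assumed.

```python
import random
from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
                      connections)

# Hypothetical endpoint and collection name; a running Milvus server is assumed.
connections.connect("default", host="localhost", port="19530")
schema = CollectionSchema([
    FieldSchema("int64", DataType.INT64, is_primary=True),
    FieldSchema("float_vector", DataType.FLOAT_VECTOR, dim=8),
])
collection = Collection("count_demo", schema)
collection.insert([list(range(100)),
                   [[random.random() for _ in range(8)] for _ in range(100)]])
collection.create_index("float_vector",
                        {"index_type": "FLAT", "params": {}, "metric_type": "L2"})
collection.load()
# count(*) is requested through output_fields, matching default_count_output above
print(collection.query(expr="int64 >= 0", output_fields=["count(*)"]))
# -> [{'count(*)': 100}]
```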

@ -2806,6 +2806,29 @@ class TestLoadCollection(TestcaseBase):
check_items={"err_code": 15,
"err_msg": "collection not found, maybe not loaded"})
@pytest.mark.tags(CaseLabel.L3)
def test_count_multi_replicas(self):
"""
target: test count(*) with multiple replicas
method: 1. insert data and load the collection with two replicas
2. query count(*)
expected: count equals the number of inserted entities
"""
# create -> insert -> flush
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
collection_w.flush()
# index -> load replicas
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load(replica_number=2)
# count
collection_w.query(expr=f'{ct.default_int64_field_name} >= 0', output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={'exp_res': [{"count(*)": ct.default_nb}]})
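Outside the test harness, the same scenario reduces to roughly this pymilvus sketch. The `count_demo` collection from the earlier sketch is assumed, and the cluster needs at least two query nodes for `replica_number=2` to succeed.

```python
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed existing and indexed
collection.release()                   # reload with two in-memory replicas
collection.load(replica_number=2)
# count(*) may be served by either replica and must agree with the inserted total
print(collection.query(expr="int64 >= 0", output_fields=["count(*)"]))
```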
@pytest.mark.tags(CaseLabel.L1)
def test_load_collection_without_creating_index(self):
"""


@ -1,9 +1,13 @@
from datetime import datetime
import time
import pytest
import random
import numpy as np
import pandas as pd
from pymilvus import DefaultConfig
import threading
from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_EVENTUALLY
from base.client_base import TestcaseBase
from common.code_mapping import ConnectionErrorMessage as cem
@ -16,9 +20,11 @@ import utils.util_pymilvus as ut
prefix = "query"
exp_res = "exp_res"
count = "count(*)"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
default_mix_expr = "int64 >= 0 && varchar >= \"0\""
default_invaild_expr = "varchar >= 0"
default_expr = f'{ct.default_int64_field_name} >= 0'
default_invalid_expr = "varchar >= 0"
default_string_term_expr = f'{ct.default_string_field_name} in [\"0\", \"1\"]'
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
binary_index_params = {"index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD", "params": {"nlist": 64}}
@ -867,7 +873,7 @@ class TestQueryParams(TestcaseBase):
# query from empty partition_names
term_expr = f'{ct.default_int64_field_name} in [0, {half}, {ct.default_nb}-1]'
res = [{'int64': 0}, {'int64': half}, {'int64': ct.default_nb-1}]
res = [{'int64': 0}, {'int64': half}, {'int64': ct.default_nb - 1}]
collection_w.query(term_expr, partition_names=[], check_task=CheckTasks.check_query_results,
check_items={exp_res: res})
@ -897,7 +903,7 @@ class TestQueryParams(TestcaseBase):
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
partition_names = cf.gen_unique_str()
error = {ct.err_code: 1, ct.err_msg: f'PartitonName: {partition_names} not found'}
error = {ct.err_code: 1, ct.err_msg: f'PartitionName: {partition_names} not found'}
collection_w.query(default_term_expr, partition_names=[partition_names],
check_task=CheckTasks.err_res, check_items=error)
@ -985,14 +991,14 @@ class TestQueryParams(TestcaseBase):
collection_w, vectors = self.init_collection_general(prefix, insert_data=True)[0:2]
int_values = vectors[0][ct.default_int64_field_name].values.tolist()
pos = 10
term_expr = f'{ct.default_int64_field_name} in {int_values[offset: pos+offset]}'
res = vectors[0].iloc[offset:pos+offset, :1].to_dict('records')
term_expr = f'{ct.default_int64_field_name} in {int_values[offset: pos + offset]}'
res = vectors[0].iloc[offset:pos + offset, :1].to_dict('records')
query_params = {"offset": offset, "limit": 10}
query_res = collection_w.query(term_expr, params=query_params,
check_task=CheckTasks.check_query_results,
check_items={exp_res: res})[0]
key_res = [item[key] for item in query_res for key in item]
assert key_res == int_values[offset: pos+offset]
assert key_res == int_values[offset: pos + offset]
@pytest.mark.tags(CaseLabel.L1)
def test_query_binary_pagination(self, offset):
@ -1007,8 +1013,8 @@ class TestQueryParams(TestcaseBase):
is_binary=True)[0:2]
int_values = vectors[0][ct.default_int64_field_name].values.tolist()
pos = 10
term_expr = f'{ct.default_int64_field_name} in {int_values[offset: pos+offset]}'
res = vectors[0].iloc[offset:pos+offset, :1].to_dict('records')
term_expr = f'{ct.default_int64_field_name} in {int_values[offset: pos + offset]}'
res = vectors[0].iloc[offset:pos + offset, :1].to_dict('records')
query_params = {"offset": offset, "limit": 10}
query_res = collection_w.query(term_expr, params=query_params,
check_task=CheckTasks.check_query_results,
@ -1131,7 +1137,7 @@ class TestQueryParams(TestcaseBase):
collection_w, vectors = self.init_collection_general(prefix, insert_data=True)[0:2]
int_values = vectors[0][ct.default_int64_field_name].values.tolist()
pos = 10
term_expr = f'{ct.default_int64_field_name} in {int_values[10: pos+10]}'
term_expr = f'{ct.default_int64_field_name} in {int_values[10: pos + 10]}'
collection_w.query(term_expr, offset=10, limit=limit,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
@ -1380,7 +1386,8 @@ class TestQueryOperation(TestcaseBase):
3. query
expected: query result is correct
"""
collection_w, vectors, binary_raw_vectors = self.init_collection_general(prefix, insert_data=True, is_index=False)[0:3]
collection_w, vectors, binary_raw_vectors = self.init_collection_general(prefix, insert_data=True,
is_index=False)[0:3]
default_field_name = ct.default_float_vec_field_name
collection_w.create_index(default_field_name, default_index_params)
@ -1447,7 +1454,8 @@ class TestQueryOperation(TestcaseBase):
method: create index and specify vec field as output field
expected: return primary field and vec field
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, is_binary=True, is_index=False)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, is_binary=True, is_index=False)[
0:2]
fields = [ct.default_int64_field_name, ct.default_binary_vec_field_name]
collection_w.create_index(ct.default_binary_vec_field_name, binary_index_params)
assert collection_w.has_index()[0]
@ -1547,7 +1555,6 @@ class TestQueryOperation(TestcaseBase):
4.query
expected: Data can be queried
"""
import time
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# load collection
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
@ -1562,7 +1569,7 @@ class TestQueryOperation(TestcaseBase):
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
class TestqueryString(TestcaseBase):
class TestQueryString(TestcaseBase):
"""
******************************************************************
The following cases are used to test query with string
@ -1593,7 +1600,8 @@ class TestqueryString(TestcaseBase):
method: specify string primary field as output field
expected: return string primary field
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
primary_field=ct.default_string_field_name)[0:2]
res, _ = collection_w.query(expression, output_fields=[ct.default_string_field_name])
assert res[0].keys() == {ct.default_string_field_name}
@ -1605,11 +1613,12 @@ class TestqueryString(TestcaseBase):
query with mix expr in string field and int field
expected: query successfully
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
primary_field=ct.default_string_field_name)[0:2]
res = vectors[0].iloc[:, 1:3].to_dict('records')
output_fields = [default_float_field_name, default_string_field_name]
collection_w.query(default_mix_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("expression", cf.gen_invaild_string_expressions())
@ -1631,7 +1640,8 @@ class TestqueryString(TestcaseBase):
method: query string expr with binary
expected: verify query successfully
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, is_binary=True, is_index=False)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, is_binary=True, is_index=False)[
0:2]
collection_w.create_index(ct.default_binary_vec_field_name, binary_index_params)
collection_w.load()
assert collection_w.has_index()[0]
@ -1645,7 +1655,8 @@ class TestqueryString(TestcaseBase):
method: specify string is primary field, use prefix string expr
expected: verify query successfully
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
primary_field=ct.default_string_field_name)[0:2]
res = vectors[0].iloc[:1, :3].to_dict('records')
expression = 'varchar like "0%"'
output_fields = [default_int_field_name, default_float_field_name, default_string_field_name]
@ -1672,7 +1683,8 @@ class TestqueryString(TestcaseBase):
method: specify string primary field, compare two fields
expected: verify query successfully
"""
collection_w = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0]
collection_w = \
self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0]
res = []
expression = 'float > int64'
output_fields = [default_int_field_name, default_float_field_name, default_string_field_name]
@ -1686,7 +1698,8 @@ class TestqueryString(TestcaseBase):
method: specify string primary field, compare string and int field
expected: raise error
"""
collection_w = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0]
collection_w = \
self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name)[0]
expression = 'varchar == int64'
collection_w.query(expression, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: f' cannot parse expression:{expression}'})
@ -1704,9 +1717,9 @@ class TestqueryString(TestcaseBase):
primary_keys = []
df_list = []
#prepare original data for parallel insert
# prepare original data for parallel insert
for i in range(thread_num):
df = cf.gen_default_dataframe_data(ct.default_nb, start=i*ct.default_nb)
df = cf.gen_default_dataframe_data(ct.default_nb, start=i * ct.default_nb)
df_list.append(df)
primary_key = df[ct.default_int64_field_name].values.tolist()
primary_keys.append(primary_key)
@ -1725,7 +1738,7 @@ class TestqueryString(TestcaseBase):
t.join()
assert collection_w.num_entities == ct.default_nb * thread_num
#Check data consistency after parallel insert
# Check data consistency after parallel insert
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
df_dict_list = []
@ -1753,15 +1766,14 @@ class TestqueryString(TestcaseBase):
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
nb = 3000
df = cf.gen_default_list_data(nb)
df[2] = [""for _ in range(nb)]
df[2] = ["" for _ in range(nb)]
collection_w.insert(df)
assert collection_w.num_entities == nb
string_exp = "varchar >= \"\""
output_fields = [default_int_field_name, default_float_field_name, default_string_field_name]
res, _ = collection_w.query(string_exp, output_fields=output_fields)
@ -1779,20 +1791,20 @@ class TestqueryString(TestcaseBase):
"""
# 1. create a collection
collection_w, vectors = self.init_collection_general(prefix, insert_data=False, is_index=False)[0:2]
nb = 3000
df = cf.gen_default_list_data(nb)
df[2] = [""for _ in range(nb)]
df[2] = ["" for _ in range(nb)]
collection_w.insert(df)
assert collection_w.num_entities == nb
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
assert collection_w.has_index()[0]
collection_w.load()
output_fields = [default_int_field_name, default_float_field_name, default_string_field_name]
expr = "varchar == \"\""
res, _ = collection_w.query(expr, output_fields=output_fields)
@ -1806,7 +1818,7 @@ class TestqueryString(TestcaseBase):
expected: verify query result
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, is_index=False)[0:2]
collection_w.create_index(ct.default_float_vec_field_name, ct.default_diskann_index)
assert collection_w.has_index()[0]
@ -1816,7 +1828,7 @@ class TestqueryString(TestcaseBase):
term_expr = f'{ct.default_int64_field_name} in {int_values}'
check_vec = vectors[0].iloc[:, [0]][0:len(int_values)].to_dict('records')
collection_w.query(term_expr, check_task=CheckTasks.check_query_results, check_items={exp_res: check_vec})
@pytest.mark.tags(CaseLabel.L2)
def test_query_with_create_diskann_with_string_pk(self):
"""
@ -1824,14 +1836,16 @@ class TestqueryString(TestcaseBase):
method: create a collection with string pk and build diskann index
expected: verify query result
"""
collection_w, vectors = self.init_collection_general(prefix, insert_data=True, primary_field=ct.default_string_field_name, is_index=False)[0:2]
collection_w, vectors = self.init_collection_general(prefix, insert_data=True,
primary_field=ct.default_string_field_name,
is_index=False)[0:2]
collection_w.create_index(ct.default_float_vec_field_name, ct.default_diskann_index)
assert collection_w.has_index()[0]
collection_w.load()
res = vectors[0].iloc[:, 1:3].to_dict('records')
output_fields = [default_float_field_name, default_string_field_name]
collection_w.query(default_mix_expr, output_fields=output_fields,
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.tags(CaseLabel.L1)
def test_query_with_scalar_field(self):
@ -1844,25 +1858,647 @@ class TestqueryString(TestcaseBase):
"""
# 1. create a collection
collection_w, vectors = self.init_collection_general(prefix, insert_data=False, is_index=False)[0:2]
nb = 3000
df = cf.gen_default_list_data(nb)
df[2] = [""for _ in range(nb)]
df[2] = ["" for _ in range(nb)]
collection_w.insert(df)
assert collection_w.num_entities == nb
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
assert collection_w.has_index()[0]
index_params = {}
collection_w.create_index(ct.default_int64_field_name, index_params=index_params)
collection_w.load()
output_fields = [default_int_field_name, default_float_field_name]
expr = "int64 in [2,4,6,8]"
res, _ = collection_w.query(expr, output_fields=output_fields)
assert len(res) == 4
class TestQueryCount(TestcaseBase):
"""
test query count(*)
"""
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("consistency_level", [CONSISTENCY_BOUNDED, CONSISTENCY_STRONG, CONSISTENCY_EVENTUALLY])
def test_count_consistency_level(self, consistency_level):
"""
target: test count(*) under different consistency levels
method: 1. create a collection with each consistency level
2. index and load the collection
3. insert and count
expected: count equals the number of inserted entities
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), consistency_level=consistency_level)
# load collection
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
if consistency_level == CONSISTENCY_BOUNDED:
time.sleep(ct.default_graceful_time)  # bounded reads may lag writes by up to the graceful time
elif consistency_level == CONSISTENCY_STRONG:
pass  # strong reads wait for all prior writes; no sleep needed
elif consistency_level == CONSISTENCY_EVENTUALLY:
time.sleep(ct.default_graceful_time)  # allow eventually-consistent reads to catch up
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb}]})
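The sleeps above reflect the consistency trade-off rather than test flakiness. A sketch of the same behavior, assuming hypothetical collections created with each level:

```python
import time
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint

# Strong: the read waits for all prior writes, so the count is exact at once.
strong = Collection("demo_strong")  # assumed created with consistency_level="Strong"
print(strong.query("int64 >= 0", output_fields=["count(*)"]))

# Bounded/Eventually: reads may lag writes, so wait out the staleness window
# (default_graceful_time above) before asserting an exact count.
bounded = Collection("demo_bounded")  # assumed created with consistency_level="Bounded"
time.sleep(5)
print(bounded.query("int64 >= 0", output_fields=["count(*)"]))
```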
@pytest.mark.tags(CaseLabel.L2)
def test_count_travel_timestamp(self):
"""
target: test count with travel_timestamp
method: 1. insert, then delete some entities
2. count at the pre-delete travel timestamp and at the current time
expected: the timestamped count still sees the deleted entities; the current count does not
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# load collection
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# insert
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
collection_w.delete(default_term_expr)
# query count with travel_timestamp
collection_w.query(expr=default_term_expr, output_fields=[ct.default_count_output],
travel_timestamp=insert_res.timestamp,
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 2}]}
)
collection_w.query(expr=default_term_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]}
)
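The pre-delete snapshot read works because insert() returns a mutation result whose timestamp predates the delete. A sketch against the hypothetical `count_demo` collection from the first example:

```python
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed loaded, dim-8 float vectors
mutation = collection.insert([[100, 101], [[0.0] * 8, [1.0] * 8]])
collection.delete("int64 in [100, 101]")
# Counting at the pre-delete timestamp still sees both rows ...
print(collection.query("int64 in [100, 101]", output_fields=["count(*)"],
                       travel_timestamp=mutation.timestamp))  # count 2
# ... while a current-time count sees none.
print(collection.query("int64 in [100, 101]", output_fields=["count(*)"]))  # count 0
```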
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("invalid_output_field", ["count", "count(int64)", "count(**)"])
def test_count_invalid_output_field(self, invalid_output_field):
"""
target: test count with an invalid output field
method: query count with an invalid output field name
expected: raise an exception
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# load collection
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# insert
df = cf.gen_default_dataframe_data(nb=2)
insert_res, _ = collection_w.insert(df)
collection_w.query(expr=default_term_expr, output_fields=[invalid_output_field],
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": f"field {invalid_output_field} not exist"})
@pytest.mark.tags(CaseLabel.L2)
def test_count_without_loading(self):
"""
target: test count without loading
method: count without loading
expected: exception
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.query(expr=default_term_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": f"has not been loaded to memory or load failed"})
@pytest.mark.tags(CaseLabel.L1)
def test_count_duplicate_ids(self):
"""
target: test count duplicate ids
method: 1. insert duplicate ids
2. count
3. delete duplicate ids
4. count
expected: verify count
"""
# create
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# insert duplicate ids
tmp_nb = 100
df = cf.gen_default_dataframe_data(tmp_nb)
df[ct.default_int64_field_name] = 0
collection_w.insert(df)
# query count
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb}]}
)
# delete and verify count
collection_w.delete(default_term_expr)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]}
)
@pytest.mark.tags(CaseLabel.L1)
def test_count_multi_partitions(self):
"""
target: test count multi partitions
method: 1. init partitions: p1, _default
2. count p1, _default, [p1, _default]
3. delete _default entities and count _default, [p1, _default]
4. drop p1 and count p1, [p1, _default]
expected: verify count
"""
half = ct.default_nb // 2
# insert [0, half) into partition_w, [half, nb) into _default
collection_w, p1, _, _ = self.insert_entities_into_two_partitions_in_half(half=half)
# count partitions p1 and _default separately
for p_name in [p1.name, ct.default_partition_name]:
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output], partition_names=[p_name],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: half}]})
# delete entities from _default
delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(half, ct.default_nb)]}"
collection_w.delete(expr=delete_expr)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[ct.default_partition_name],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]}
)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[p1.name, ct.default_partition_name],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: half}]}
)
# drop p1 partition
p1.drop()
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[p1.name],
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": f'partition name: {p1.name} not found'}
)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[ct.default_partition_name],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]}
)
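Partition-scoped counting is just the partition_names parameter on query. A sketch with hypothetical partition contents:

```python
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed loaded, with a populated partition "p1"
# Count one partition, then both; omitting partition_names counts the whole collection.
print(collection.query("int64 >= 0", output_fields=["count(*)"],
                       partition_names=["p1"]))
print(collection.query("int64 >= 0", output_fields=["count(*)"],
                       partition_names=["p1", "_default"]))
```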
@pytest.mark.tags(CaseLabel.L2)
def test_count_partition_duplicate(self):
"""
target: test count from partitions which have duplicate ids
method: 1. insert same ids into 2 partitions
2. count
3. delete some ids and count
expected: verify count
"""
# init partitions: _default and p1
p1 = "p1"
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.create_partition(p1)
df = cf.gen_default_dataframe_data()
collection_w.insert(df, partition_name=ct.default_partition_name)
collection_w.insert(df, partition_name=p1)
# index and load
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# count
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb * 2}]}
)
# delete some duplicate ids
delete_res, _ = collection_w.delete(default_term_expr)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[p1],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb - delete_res.delete_count}]}
)
@pytest.mark.tags(CaseLabel.L1)
def test_count_growing_sealed_segment(self):
"""
target: test count growing and sealed segment
method: 1. insert -> index -> load
2. count
3. new insert
4. count
expected: verify count
"""
tmp_nb = 100
# create -> insert -> index -> load -> count sealed
collection_w = self.init_collection_general(prefix, insert_data=True, nb=tmp_nb)[0]
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb}]}
)
# insert again and count growing + sealed segments
df = cf.gen_default_dataframe_data(nb=tmp_nb, start=tmp_nb)
collection_w.insert(df)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb * 2}]})
@pytest.mark.tags(CaseLabel.L2)
def test_count_during_handoff(self):
"""
target: test count during handoff
method: 1. index -> load
2. insert
3. flush while count
expected: verify count
"""
# create -> index -> load
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# flush while count
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
t_flush = threading.Thread(target=collection_w.flush, args=())
t_count = threading.Thread(target=collection_w.query, args=(default_expr,),
kwargs={
"output_fields": [ct.default_count_output],
"check_task": CheckTasks.check_query_results,
"check_items": {exp_res: [{count: ct.default_nb}]}
})
t_flush.start()
t_count.start()
t_flush.join()
t_count.join()
@pytest.mark.tags(CaseLabel.L1)
def test_count_delete_insert_duplicate_ids(self):
"""
target: test count after delete and re-insert same entities
method: 1. insert and delete
2. count
3. re-insert deleted ids with different vectors
4. count
expected: verify count
"""
tmp_nb = 100
# create -> insert ids [0, default_nb + tmp_nb) -> index -> load
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
df = cf.gen_default_dataframe_data(nb=tmp_nb, start=ct.default_nb)
insert_res, _ = collection_w.insert(df)
# delete growing and sealed ids -> count
collection_w.delete(f"{ct.default_int64_field_name} in {[i for i in range(ct.default_nb)]}")
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb}]}
)
# re-insert deleted ids [0, default_nb) with different vectors
df_same = cf.gen_default_dataframe_data()
collection_w.insert(df_same)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb + tmp_nb}]}
)
@pytest.mark.tags(CaseLabel.L1)
def test_count_compact_merge(self):
"""
target: test count after compact merge segments
method: 1. init 2 segments with same channel
2. compact
3. count
expected: verify count
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
# init compact_segment_num_threshold segments
tmp_nb = 100
for i in range(ct.compact_segment_num_threshold):
df = cf.gen_default_dataframe_data(nb=tmp_nb, start=i * tmp_nb)
collection_w.insert(df)
collection_w.flush()
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
collection_w.compact()
collection_w.wait_for_compaction_completed()
collection_w.load()
segment_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
assert len(segment_info) == 1
# count after compact
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb * ct.compact_segment_num_threshold}]})
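The single-segment assertion relies on utility.get_query_segment_info. A compact-then-count sketch (collection name hypothetical, several small flushed segments assumed):

```python
from pymilvus import Collection, connections, utility

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed indexed, with several small segments
collection.compact()
collection.wait_for_compaction_completed()
collection.load()
segments = utility.get_query_segment_info("count_demo")
print(f"{len(segments)} sealed segment(s) after compaction")
# The merged segment must still report the same total.
print(collection.query("int64 >= 0", output_fields=["count(*)"]))
```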
@pytest.mark.tags(CaseLabel.L2)
def test_count_compact_delete(self):
"""
target: test count after delete-compact
method: 1. init segments
2. delete half ids and compact
3. count
expected: verify count
"""
# create -> index -> insert
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), shards_num=1)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
# delete half entities, flush
half_expr = f'{ct.default_int64_field_name} in {[i for i in range(ct.default_nb // 2)]}'
collection_w.delete(half_expr)
assert collection_w.num_entities == ct.default_nb
# compact
collection_w.compact()
collection_w.wait_for_compaction_completed()
# load and count
collection_w.load()
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb // 2}]}
)
@pytest.mark.tags(CaseLabel.L2)
def test_count_during_compact(self):
"""
target: test count during compact merge many small segments
method: 1. init many small segments
2. compact while count
expected: verify count
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
# init 10 small segments
tmp_nb = 100
for i in range(10):
df = cf.gen_default_dataframe_data(tmp_nb, start=i * tmp_nb)
collection_w.insert(df)
collection_w.flush()
# compact while count
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
collection_w.load()
t_compact = threading.Thread(target=collection_w.compact, args=())
t_count = threading.Thread(target=collection_w.query, args=(default_expr,),
kwargs={
"output_fields": [ct.default_count_output],
"check_task": CheckTasks.check_query_results,
"check_items": {exp_res: [{count: tmp_nb * 10}]}
})
t_compact.start()
t_count.start()
t_compact.join()
t_count.join()
@pytest.mark.tags(CaseLabel.L0)
def test_count_with_expr(self):
"""
target: test count with expr
method: count with expr
expected: verify count
"""
# create -> insert -> index -> load
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
# count with expr
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb}]}
)
collection_w.query(expr=default_term_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 2}]})
# TODO count(*) with page
@pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/23368")
@pytest.mark.tags(CaseLabel.L2)
def test_count_with_pagination_param(self):
"""
target: test count with pagination params
method: count with pagination params: offset, limit
expected: exception
"""
# create -> insert -> index -> load
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
# count with offset
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output], offset=10,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "xxx"}
)
# count with limit
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output], limit=10,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "xxx"}
)
# count with pagination params
collection_w.query(default_expr, output_fields=[ct.default_count_output], params={"offset": 10, "limit": 10},
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "xxx"})
@pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/23386")
@pytest.mark.tags(CaseLabel.L2)
def test_count_alias_insert_delete_drop(self):
"""
target: test count via an alias after insert, partition drop, and delete
method: 1. init collection and create an alias
2. insert more entities via the alias and count
3. drop the new partition via the alias and count
4. delete all entities via the alias and count
expected: verify count after each step
"""
# create -> insert -> index -> load
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
# create alias
alias = cf.gen_unique_str("alias")
self.utility_wrap.create_alias(collection_w.name, alias)
collection_w_alias = self.init_collection_wrap(name=alias)
# new insert partitions and count
p_name = "p_alias"
collection_w_alias.create_partition(p_name)
collection_w_alias.insert(cf.gen_default_dataframe_data(start=ct.default_nb), partition_name=p_name)
collection_w_alias.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb * 2}]})
# alias drop partition
collection_w_alias.drop_partition(p_name)
res, _ = collection_w_alias.has_partition(p_name)
assert res is False
collection_w_alias.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb}]})
# alias delete and count
collection_w_alias.delete(f"{ct.default_int64_field_name} in {[i for i in range(ct.default_nb)]}")
collection_w_alias.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]})
collection_w_alias.drop()
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("is_growing", [True, False])
def test_count_upsert_growing_sealed(self, is_growing):
"""
target: test count after upsert growing
method: 1. create -> index -> load -> insert -> delete
2. upsert deleted id and count (+1)
3. upsert new id and count (+1)
4. upsert existed id and count (+0)
expected: verify count
"""
if is_growing:
# create -> index -> load -> insert -> delete
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.insert(cf.gen_default_dataframe_data())
# delete one entity
single_expr = f'{ct.default_int64_field_name} in [0]'
collection_w.delete(single_expr)
else:
# create -> insert -> delete -> index -> load
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
collection_w.insert(cf.gen_default_dataframe_data())
# delete one entity
single_expr = f'{ct.default_int64_field_name} in [0]'
collection_w.delete(single_expr)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# upsert deleted id
df_zero = cf.gen_default_dataframe_data(nb=1)
collection_w.upsert(df_zero)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb}]})
# upsert new id and count
df_new = cf.gen_default_dataframe_data(nb=1, start=ct.default_nb)
collection_w.upsert(df_new)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb + 1}]})
# upsert existed id and count
df_existed = cf.gen_default_dataframe_data(nb=1, start=10)
collection_w.upsert(df_existed)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb + 1}]})
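The count arithmetic in this test follows from upsert semantics: replacing an existing primary key leaves the count unchanged, while upserting a new key adds one. A sketch against the hypothetical `count_demo` collection:

```python
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed loaded, pks 0..99, dim-8 vectors
collection.upsert([[0], [[0.5] * 8]])     # existing pk: delete + reinsert, count +0
collection.upsert([[1000], [[0.5] * 8]])  # new pk: plain insert, count +1
print(collection.query("int64 >= 0", output_fields=["count(*)"]))
```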
@pytest.mark.tags(CaseLabel.L1)
def test_count_upsert_duplicate(self):
"""
target: test count after upsert duplicate
method: 1. insert many duplicate ids
2. upsert id and count
3. delete id and count
4. upsert deleted id and count
expected: verify count
"""
# init collection and insert same ids
tmp_nb = 100
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data(nb=tmp_nb)
df[ct.default_int64_field_name] = 0
collection_w.insert(df)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# upsert id and count
df_existed = cf.gen_default_dataframe_data(nb=tmp_nb, start=0)
collection_w.upsert(df_existed)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb}]}
)
# delete id and count
delete_res, _ = collection_w.delete(default_term_expr)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb - delete_res.delete_count}]})
# upsert deleted id and count
df_deleted = cf.gen_default_dataframe_data(nb=delete_res.delete_count, start=0)
collection_w.upsert(df_deleted)
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: tmp_nb}]})
@pytest.mark.tags(CaseLabel.L1)
def test_count_rename_collection(self):
"""
target: test count after rename collection
method: 1. create -> insert -> index -> load
2. rename collection
3. count
expected: verify count
"""
# create -> insert -> index -> load
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
new_name = cf.gen_unique_str("new_name")
self.utility_wrap.rename_collection(collection_w.name, new_name)
self.collection_wrap.init_collection(new_name)
self.collection_wrap.query(expr=default_expr, output_fields=[ct.default_count_output],
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: ct.default_nb}]})
@pytest.mark.tags(CaseLabel.L1)
def test_count_disable_growing_segments(self):
"""
target: test count when disable growing segments
method: 1. create -> index -> load -> insert
2. query count with ignore_growing
expected: verify count 0
"""
# create -> index -> load
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# insert
collection_w.insert(cf.gen_default_dataframe_data(nb=100))
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output], ignore_growing=True,
check_task=CheckTasks.check_query_results,
check_items={exp_res: [{count: 0}]})
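For reference, the ignore_growing flag used above behaves the same outside the harness: rows that live only in growing segments are skipped until they are flushed and handed off. A sketch (names hypothetical):

```python
from pymilvus import Collection, connections

connections.connect("default", host="localhost", port="19530")  # assumed endpoint
collection = Collection("count_demo")  # assumed indexed and loaded
# Rows from this insert sit in a growing segment until flushed/handed off.
collection.insert([[2000 + i for i in range(10)],
                   [[0.1] * 8 for _ in range(10)]])
print(collection.query("int64 >= 0", output_fields=["count(*)"],
                       ignore_growing=True))   # growing rows excluded
print(collection.query("int64 >= 0", output_fields=["count(*)"]))  # included
```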