mirror of https://github.com/milvus-io/milvus.git
3882 lines
203 KiB
Python
3882 lines
203 KiB
Python
import pytest
|
|
import numpy
|
|
|
|
from base.client_v2_base import TestMilvusClientV2Base
|
|
from utils.util_log import test_log as log
|
|
from common import common_func as cf
|
|
from common import common_type as ct
|
|
from common.common_type import CaseLabel, CheckTasks
|
|
from utils.util_pymilvus import *
|
|
from pymilvus.client.types import LoadState
|
|
|
|
# Prefix used when generating unique collection names in this module.
prefix = "client_collection"

# Numeric defaults re-exported from the shared common_type module.
epsilon = ct.epsilon
default_nb = ct.default_nb
default_nb_medium = ct.default_nb_medium
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit

# Filter expressions and result keys.
default_search_exp = 'id >= 0'
exp_res = 'exp_res'
default_search_string_exp = 'varchar >= "0"'
default_search_mix_exp = 'int64 >= 0 && varchar >= "0"'
# NOTE: the identifiers "default_invaild_string_exp" and "perfix_expr" are
# misspelled, but they are kept as-is because other code may reference them.
default_invaild_string_exp = 'varchar >= 0'
default_json_search_exp = 'json_field["number"] >= 0'
perfix_expr = "varchar like \"0%\""

# Search defaults.
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params

# Field names.
default_primary_key_field_name = 'id'
default_vector_field_name = 'vector'
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
default_int32_array_field_name = ct.default_int32_array_field_name
default_string_array_field_name = ct.default_string_array_field_name
|
|
|
|
|
|
class TestMilvusClientCollectionInvalid(TestMilvusClientV2Base):
|
|
""" Test case of create collection interface """
|
|
|
|
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    """Function-scoped parametrized fixture supplying auto_id=False, then auto_id=True."""
    current_value = request.param
    yield current_value
|
|
|
|
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
    """Function-scoped parametrized fixture supplying the "COSINE" and "L2" metric types."""
    current_value = request.param
    yield current_value
|
|
|
|
"""
|
|
******************************************************************
|
|
# The following are invalid base cases
|
|
******************************************************************
|
|
"""
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize(
    "collection_name",
    ["12-s", "12 s", "(mn)", "中文", "%$#", "español", "عربي", "हिंदी", "Русский"],
)
def test_milvus_client_collection_invalid_collection_name(self, collection_name):
    """
    target: verify that fast collection creation rejects illegal collection names
    method: call create_collection with a name that breaks the naming rules
    expected: error 1100 carrying the matching "Invalid collection name" message
    """
    milvus_client = self._client()
    # "español" is the only sample expected to fail on a later character;
    # every other sample is expected to fail on its first character.
    reason = (
        "collection name can only contain numbers, letters and underscores"
        if collection_name == "español"
        else "the first character of a collection name must be an underscore or letter"
    )
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: f"Invalid collection name: {collection_name}. {reason}: invalid parameter",
    }
    self.create_collection(milvus_client, collection_name, default_dim,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_name_over_max_length(self):
    """
    target: test fast create collection with over max collection name length
    method: create collection with a name one character longer than the
            255-character limit quoted in the expected error message
    expected: raise exception
    """
    client = self._client()
    # 1. create collection
    # Use exactly limit + 1 characters so the boundary itself is exercised.
    # The previous `"a".join("a" for i in range(256))` produced 511 characters
    # (256 items plus 255 separators), which only passed by accident.
    collection_name = "a" * 256
    error = {ct.err_code: 1100,
             ct.err_msg: "the length of a collection name must be less than 255 characters"}
    self.create_collection(client, collection_name, default_dim,
                           check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_name_empty(self):
    """
    target: verify fast collection creation rejects a blank collection name
    method: call create_collection with a name made of a single space
    expected: error 1100 with "Invalid collection name"
    """
    milvus_client = self._client()
    blank_name = " "
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "Invalid collection name",
    }
    self.create_collection(milvus_client, blank_name, default_dim,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("invalid_dim", ct.invalid_dims)
def test_milvus_client_collection_vector_invalid_dim_default_schema(self, invalid_dim):
    """
    target: Test collection with invalid vector dimension
    method: Create collection with vector field having invalid dimension
    expected: Raise exception with appropriate error message
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Determine expected error based on invalid dimension type
    if isinstance(invalid_dim, int) and invalid_dim > 32768:
        expected_msg = (f"invalid dimension: {invalid_dim} of field {default_vector_field_name}. "
                        f"float vector dimension should be in range 2 ~ 32768")
    elif isinstance(invalid_dim, int) and invalid_dim < 2:  # range errors: 1, -32
        expected_msg = f"invalid dimension: {invalid_dim}. should be in range 2 ~ 32768"
    elif isinstance(invalid_dim, str):  # type conversion errors: "vii", "十六"
        expected_msg = "wrong type of argument [dimension], expected type: [int], got type: [str]"
    elif isinstance(invalid_dim, float):  # type conversion errors: 32.1
        expected_msg = "wrong type of argument [dimension], expected type: [int], got type: [float]"
    else:
        # Without this branch an unhandled entry in ct.invalid_dims would surface
        # as an UnboundLocalError on expected_msg instead of a clear test failure.
        pytest.fail(f"no expected error message is defined for invalid_dim={invalid_dim!r}")
    # Try to create collection and expect error
    error = {ct.err_code: 65535, ct.err_msg: expected_msg}
    self.create_collection(client, collection_name, invalid_dim,
                           check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="pymilvus issue 1554")
def test_milvus_client_collection_invalid_primary_field(self):
    """
    target: verify fast collection creation rejects an unknown id_type
    method: call create_collection with id_type="invalid"
    expected: error stating that id_type must be int or string
    """
    milvus_client = self._client()
    name = cf.gen_unique_str(prefix)
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "Param id_type must be int or string",
    }
    self.create_collection(milvus_client, name, default_dim, id_type="invalid",
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_string_auto_id(self):
    """
    target: verify a string primary key with auto_id requires max_length
    method: call create_collection with id_type="string" and auto_id=True, leaving max_length out
    expected: error 65535 reporting that max_length must be specified for field "id"
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    message = f"type param(max_length) should be specified for the field(id) of collection {name}"
    expected_error = {ct.err_code: 65535, ct.err_msg: message}
    self.create_collection(milvus_client, name, default_dim, id_type="string", auto_id=True,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("auto_id", [None, 1, "string"])
def test_collection_auto_id_invalid_types(self, auto_id):
    """
    target: verify collection creation rejects non-bool auto_id values
    method: call create_collection with auto_id set to None, an int, or a str
    expected: error stating that auto_id must be bool type
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    expected_error = {
        ct.err_code: 0,
        ct.err_msg: "Param auto_id must be bool type",
    }
    self.create_collection(milvus_client, name, default_dim, auto_id=auto_id,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_auto_id_none_in_field(self):
    """
    target: verify a field definition rejects auto_id=None
    method: add a primary INT64 field with auto_id=None to a fresh schema
    expected: error stating that auto_id must be bool type
    """
    milvus_client = self._client()
    # The failure is expected at add_field time, before any collection is created.
    empty_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    expected_error = {
        ct.err_code: 0,
        ct.err_msg: "Param auto_id must be bool type",
    }
    self.add_field(empty_schema, ct.default_int64_field_name, DataType.INT64,
                   is_primary=True, auto_id=None,
                   check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_multi_fields_auto_id(self):
    """
    target: verify auto_id is refused on a second, non-primary field
    method: add a primary INT64 field with auto_id=True, then add another INT64 field
            that also sets auto_id=True
    expected: error stating auto_id can only be specified on the primary key field
    """
    milvus_client = self._client()
    pk_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    # A legitimate auto-id primary key goes in first.
    pk_schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True, auto_id=True)
    # The non-primary field with auto_id=True must be rejected.
    expected_error = {
        ct.err_code: 0,
        ct.err_msg: "auto_id can only be specified on the primary key field",
    }
    self.add_field(pk_schema, "int_field", DataType.INT64, auto_id=True,
                   check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_auto_id_non_primary_field(self):
    """
    target: verify auto_id is refused on a non-primary field
    method: add an INT64 field with auto_id=True (and no is_primary) to an empty schema
    expected: error stating auto_id can only be specified on the primary key field
    """
    milvus_client = self._client()
    empty_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    expected_error = {
        ct.err_code: 999,
        ct.err_msg: "auto_id can only be specified on the primary key field",
    }
    self.add_field(empty_schema, ct.default_int64_field_name, DataType.INT64, auto_id=True,
                   check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_create_collection_dup_name_different_params(self):
    """
    target: verify re-creating an existing collection with other parameters is refused
    method: create a collection, then retry under the same name with a different dim,
            a different schema, and a different primary field
    expected: every retry raises the duplicate-collection error and the original
              collection keeps its dim and primary field
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    self.create_collection(milvus_client, name, default_dim)

    # All three retries are expected to fail with the same error.
    dup_error = {
        ct.err_code: 1,
        ct.err_msg: f"create duplicate collection with different parameters, collection: {name}",
    }

    # Case 1: same name, different dimension.
    self.create_collection(milvus_client, name, default_dim + 1,
                           check_task=CheckTasks.err_res, check_items=dup_error)

    # Case 2: same name, completely different schema.
    varchar_pk_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    varchar_pk_schema.add_field("new_id", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False)
    varchar_pk_schema.add_field("new_vector", DataType.FLOAT_VECTOR, dim=128)
    self.create_collection(milvus_client, name, schema=varchar_pk_schema,
                           check_task=CheckTasks.err_res, check_items=dup_error)

    # Case 3: same name, different primary field name.
    other_pk_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    other_pk_schema.add_field("id_2", DataType.INT64, is_primary=True, auto_id=False)
    other_pk_schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    self.create_collection(milvus_client, name, schema=other_pk_schema,
                           check_task=CheckTasks.err_res, check_items=dup_error)

    # The first collection must still carry its original properties.
    expected_properties = {
        "collection_name": name,
        "dim": default_dim,
        "id_name": "id",
    }
    self.describe_collection(milvus_client, name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    self.drop_collection(milvus_client, name)
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("metric_type", [1, " ", "invalid"])
def test_milvus_client_collection_invalid_metric_type(self, metric_type):
    """
    target: verify collection creation rejects an invalid metric type
    method: call create_collection with metric_type set to an unsupported value
    expected: error 1100 reporting the unsupported metric type
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    message = (f"float vector index does not support metric type: {metric_type}: "
               "invalid parameter[expected=valid index params][actual=invalid index params")
    expected_error = {ct.err_code: 1100, ct.err_msg: message}
    self.create_collection(milvus_client, name, default_dim, metric_type=metric_type,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="pymilvus issue 1864")
def test_milvus_client_collection_invalid_schema_field_name(self):
    """
    target: verify collection creation rejects a schema with an invalid field name
    method: build a schema whose primary field is named "%$#" and create a collection from it
    expected: raise exception
    """
    milvus_client = self._client()
    name = cf.gen_unique_str(prefix)
    bad_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    bad_schema.add_field("%$#", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False)
    bad_schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=128)
    # NOTE(review): the expected message talks about metric types although the
    # scenario is an invalid field name - confirm once the skip is lifted.
    expected_error = {
        ct.err_code: 65535,
        ct.err_msg: "metric type not found or not supported, supported: [L2 IP COSINE HAMMING JACCARD]",
    }
    self.create_collection(milvus_client, name, schema=bad_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("dtype", [6, [[]], "int64", 5.1, (), "", "a", DataType.UNKNOWN])
def test_milvus_client_collection_invalid_field_type(self, dtype):
    """
    target: verify a schema rejects a field whose datatype is invalid
    method: call add_field with a datatype value taken from the invalid samples
    expected: error stating the field dtype must be of DataType
    """
    milvus_client = self._client()
    empty_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    expected_error = {
        ct.err_code: 999,
        ct.err_msg: "Field dtype must be of DataType",
    }
    # The rejection is expected from add_field itself.
    self.add_field(empty_schema, field_name="test", datatype=dtype,
                   check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("unsupported_field_type", [
    DataType.NONE, DataType.BOOL, DataType.INT8, DataType.INT16, DataType.INT32,
    DataType.FLOAT, DataType.DOUBLE, DataType.STRING, DataType.JSON,
    DataType.ARRAY, DataType.GEOMETRY,
    DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR, DataType.SPARSE_FLOAT_VECTOR,
    DataType.INT8_VECTOR, DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR
])
def test_milvus_client_collection_unsupported_primary_field(self, unsupported_field_type):
    """
    target: verify collection creation rejects primary keys of unsupported types
    method: declare the primary field with the parametrized type, add a float vector
            field, and create the collection
    expected: error 1100 stating the primary key must be INT64 or VARCHAR
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    pk_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]

    # Extra type parameters depend on the kind of field being declared.
    dim_based_vectors = (
        DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR, DataType.INT8_VECTOR,
        DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR,
    )
    type_params = {}
    if unsupported_field_type in dim_based_vectors:
        type_params["dim"] = default_dim
    elif unsupported_field_type == DataType.ARRAY:
        type_params["element_type"] = DataType.INT64
        type_params["max_capacity"] = 100
    # Sparse vectors and scalar types are declared without extra parameters.
    pk_schema.add_field("unsupported_primary", unsupported_field_type, is_primary=True, **type_params)
    pk_schema.add_field("vector_field", DataType.FLOAT_VECTOR, dim=default_dim)

    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "Primary key type must be DataType.INT64 or DataType.VARCHAR",
    }
    self.create_collection(milvus_client, name, schema=pk_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize(
    "invalid_name",
    ["中文", "español", "عربي", "हिंदी", "Русский", "!@#$%^&*()", "123abc"],
)
def test_milvus_client_collection_schema_with_invalid_field_name(self, invalid_name):
    """
    target: verify collection creation rejects schemas holding an invalid field name
    method: build a valid id + vector schema, append a VARCHAR field with the invalid
            name, and create the collection
    expected: error 1701 carrying the matching "Invalid field name" message
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    named_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    named_schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    named_schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    # The offending field is added last.
    named_schema.add_field(invalid_name, DataType.VARCHAR, max_length=128)

    # "español" is the only sample expected to fail on a later character.
    reason = (
        "Field name can only contain numbers, letters, and underscores."
        if invalid_name == "español"
        else "The first character of a field name must be an underscore or letter."
    )
    expected_error = {
        ct.err_code: 1701,
        ct.err_msg: f"Invalid field name: {invalid_name}. {reason}: field name invalid[field={invalid_name}]",
    }
    self.create_collection(milvus_client, name, schema=named_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("keyword", [
    "$meta", "like", "exists", "EXISTS", "and", "or", "not", "in",
    "json_contains", "JSON_CONTAINS", "json_contains_all", "JSON_CONTAINS_ALL",
    "json_contains_any", "JSON_CONTAINS_ANY", "array_contains", "ARRAY_CONTAINS",
    "array_contains_all", "ARRAY_CONTAINS_ALL", "array_contains_any", "ARRAY_CONTAINS_ANY",
    "array_length", "ARRAY_LENGTH", "true", "True", "TRUE", "false", "False", "FALSE",
    "text_match", "TEXT_MATCH", "phrase_match", "PHRASE_MATCH", "random_sample", "RANDOM_SAMPLE"
])
def test_milvus_client_collection_field_name_with_keywords(self, keyword):
    """
    target: verify collection creation rejects field names that are reserved keywords
    method: name the vector field after the parametrized keyword and create the collection
    expected: error 1701 with "Invalid field name: <keyword>"
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    keyword_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    keyword_schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    # The keyword is used as the vector field's name.
    keyword_schema.add_field(keyword, DataType.FLOAT_VECTOR, dim=default_dim)
    expected_error = {
        ct.err_code: 1701,
        ct.err_msg: f"Invalid field name: {keyword}",
    }
    self.create_collection(milvus_client, name, schema=keyword_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_empty_fields(self):
    """
    target: verify collection creation rejects a schema with no fields
    method: create a schema, add nothing to it, and create a collection from it
    expected: error 1100 stating the schema must have a primary key field
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    # Deliberately left without any field.
    fieldless_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "Schema must have a primary key field",
    }
    self.create_collection(milvus_client, name, schema=fieldless_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_over_maximum_limits(self):
    """
    target: cover every "over the maximum" schema scenario in one test
    method:
        - Scenario 1: too many fields in total
        - Scenario 2: too many vector fields
        - Scenario 3: several vector fields plus too many fields in total
        - Scenario 4: too many vector fields and too many fields in total
    expected: each scenario fails with the error asserted for it below
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    int64_pk = ct.default_int64_field_name

    # ---------- Scenario 1: too many fields in total ----------
    too_many_fields = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    too_many_fields.add_field(int64_pk, DataType.INT64, is_primary=True)
    too_many_fields.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    # Scalar fields that fill the schema up to ct.max_field_num (pk and vector already added).
    limit_num = ct.max_field_num - 2
    for _ in range(limit_num):
        too_many_fields.add_field(cf.gen_unique_str("field_name"), DataType.INT64)
    # One more field pushes the schema over the limit.
    too_many_fields.add_field(cf.gen_unique_str("extra_field"), DataType.INT64)
    error_fields_over = {
        ct.err_code: 1,
        ct.err_msg: "maximum field's number should be limited to 64",
    }
    self.create_collection(milvus_client, name, default_dim, schema=too_many_fields,
                           check_task=CheckTasks.err_res, check_items=error_fields_over)

    # ---------- Scenario 2: too many vector fields ----------
    too_many_vectors = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    for _ in range(ct.max_vector_field_num + 1):
        too_many_vectors.add_field(cf.gen_unique_str("vector_field_name"),
                                   DataType.FLOAT_VECTOR, dim=default_dim)
    too_many_vectors.add_field(int64_pk, DataType.INT64, is_primary=True)
    error_vector_over = {
        ct.err_code: 65535,
        ct.err_msg: "maximum vector field's number should be limited to 4",
    }
    self.create_collection(milvus_client, name, default_dim, schema=too_many_vectors,
                           check_task=CheckTasks.err_res, check_items=error_vector_over)

    # ---------- Scenario 3: several vector fields, too many fields in total ----------
    mixed_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    vector_limit_num = ct.max_vector_field_num - 2
    for _ in range(vector_limit_num):
        mixed_schema.add_field(cf.gen_unique_str("field_name"), DataType.FLOAT_VECTOR, dim=default_dim)
    for _ in range(ct.max_field_num):
        mixed_schema.add_field(cf.gen_unique_str("field_name"), DataType.INT64)
    mixed_schema.add_field(int64_pk, DataType.INT64, is_primary=True)
    error_fields_over_64 = {
        ct.err_code: 65535,
        ct.err_msg: "maximum field's number should be limited to 64",
    }
    self.create_collection(milvus_client, name, default_dim, schema=mixed_schema,
                           check_task=CheckTasks.err_res, check_items=error_fields_over_64)

    # ---------- Scenario 4: too many vector fields and too many fields in total ----------
    both_over_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    for _ in range(ct.max_vector_field_num + 1):
        both_over_schema.add_field(cf.gen_unique_str("field_name"), DataType.FLOAT_VECTOR, dim=default_dim)
    for _ in range(limit_num - 4):
        both_over_schema.add_field(cf.gen_unique_str("field_name"), DataType.INT64)
    both_over_schema.add_field(cf.gen_unique_str("field_name"), DataType.FLOAT_VECTOR, dim=default_dim)
    both_over_schema.add_field(int64_pk, DataType.INT64, is_primary=True)
    # The total-field error is the one asserted when both limits are exceeded.
    self.create_collection(milvus_client, name, default_dim, schema=both_over_schema,
                           check_task=CheckTasks.err_res, check_items=error_fields_over_64)
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_without_vectors(self):
    """
    target: verify collection creation rejects a schema without any vector field
    method: create a collection from a schema holding only an INT64 primary field
    expected: error 1100 stating the schema does not contain a vector field
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    scalar_only_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    scalar_only_schema.add_field("int_field", DataType.INT64, is_primary=True, auto_id=False)
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "schema does not contain vector field: invalid parameter",
    }
    self.create_collection(milvus_client, name, schema=scalar_only_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("vector_type", [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR])
def test_milvus_client_collection_vector_without_dim(self, vector_type):
    """
    target: verify collection creation rejects a vector field that has no dimension
    method: declare a vector field without dim and create the collection
    expected: error stating the dimension is not defined in the field type params
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    dimless_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    dimless_schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    # dim is intentionally omitted here.
    dimless_schema.add_field("vector_field", vector_type)
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "dimension is not defined in field type params",
    }
    self.create_collection(milvus_client, name, schema=dimless_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("vector_type", [DataType.FLOAT_VECTOR, DataType.INT8_VECTOR, DataType.BINARY_VECTOR])
def test_milvus_client_collection_without_primary_field(self, vector_type):
    """
    target: verify collection creation rejects schemas lacking a primary field
    method: create collections from (a) a schema with a non-primary INT64 field plus a
            vector field and (b) a schema with only a vector field
    expected: both attempts fail with error 1100 "Schema must have a primary key field"
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    # Both variants are expected to fail with the same error.
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "Schema must have a primary key field",
    }

    # Variant (a): scalar + vector fields, none marked primary.
    no_pk_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    no_pk_schema.add_field("int_field", DataType.INT64)
    no_pk_schema.add_field("vector_field", vector_type, dim=default_dim)
    self.create_collection(milvus_client, name, schema=no_pk_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)

    # Variant (b): only a vector field.
    vector_only_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    vector_only_schema.add_field("vector_field", vector_type, dim=default_dim)
    self.create_collection(milvus_client, name, schema=vector_only_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("primary_field", [[], 1, [1, "2", 3], (1,), {1: 1}])
def test_milvus_client_collection_non_string_primary_field(self, primary_field):
    """
    target: verify schema creation rejects an invalid primary_field argument
    method: call create_schema with primary_field set to the parametrized value
    expected: error stating primary_field must be int or str type
    """
    milvus_client = self._client()
    expected_error = {
        ct.err_code: 999,
        ct.err_msg: "Param primary_field must be int or str type",
    }
    # The failure is expected at schema-creation time.
    self.create_schema(milvus_client, enable_dynamic_field=False, primary_field=primary_field,
                       check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("is_primary", [None, 2, "string"])
def test_milvus_client_collection_invalid_is_primary(self, is_primary):
    """
    target: verify a field definition rejects non-bool is_primary values
    method: add an INT64 field whose is_primary is None, an int, or a str
    expected: error stating is_primary must be bool type
    """
    milvus_client = self._client()
    empty_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    expected_error = {
        ct.err_code: 999,
        ct.err_msg: "Param is_primary must be bool type",
    }
    self.add_field(empty_schema, "id", DataType.INT64, is_primary=is_primary,
                   check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_dup_field(self):
    """
    target: verify collection creation rejects duplicate field names
    method: build a schema in which two fields share the name "int64_field" and create
            a collection from it
    expected: error 1100 "duplicated field name" and the collection does not exist afterwards
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    dup_schema = self.create_schema(milvus_client, enable_dynamic_field=False)[0]
    dup_schema.add_field("int64_field", DataType.INT64, is_primary=True, auto_id=False)
    # Same name as the primary field above.
    dup_schema.add_field("int64_field", DataType.INT64)
    dup_schema.add_field("vector_field", DataType.FLOAT_VECTOR, dim=default_dim)

    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "duplicated field name",
    }
    self.create_collection(milvus_client, name, schema=dup_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
    # The failed creation must not leave a collection behind.
    exists = self.has_collection(milvus_client, name)[0]
    assert not exists
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_add_field_as_primary(self):
    """
    target: verify a primary key field cannot be added to an existing collection
    method: fast-create a collection, then call add_collection_field with is_primary=True
    expected: error 1100 stating that adding a pk field is not supported
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()
    dim = 8
    field_name = "field_new"
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: f"not support to add pk field, field name = {field_name}: invalid parameter",
    }
    self.create_collection(milvus_client, name, dim)
    # Make sure the collection exists before attempting the invalid field addition.
    existing = self.list_collections(milvus_client)[0]
    assert name in existing
    self.add_collection_field(milvus_client, name, field_name=field_name, data_type=DataType.INT64,
                              nullable=True, is_primary=True,
                              check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_none_desc(self):
    """
    target: verify collection creation rejects a schema whose description is None
    method: create a schema with description=None, add id and vector fields, then create
            the collection
    expected: error 1100 reporting that description has type NoneType
    """
    milvus_client = self._client()
    name = cf.gen_collection_name_by_testcase_name()

    # The schema is built with description=None; the error is asserted on create_collection.
    none_desc_schema = self.create_schema(milvus_client, enable_dynamic_field=False, description=None)[0]
    none_desc_schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    none_desc_schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=default_dim)

    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: "description [None] has type NoneType, but expected one of: bytes, str",
    }
    self.create_collection(milvus_client, name, schema=none_desc_schema,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_milvus_client_collection_invalid_schema_multi_pk(self):
    """
    target: test creating a collection from a schema that declares more than one primary key
    method: 1. flag two fields with is_primary=True
            2. name one field as primary_field on the schema and flag a different field with is_primary=True
    expected: create_collection fails with "Expected only one primary key field" in both cases
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Both scenarios are expected to fail with the same error
    multi_pk_error = {ct.err_code: 999, ct.err_msg: "Expected only one primary key field"}

    # Scenario 1: two fields are both flagged is_primary=True
    two_flag_schema = self.create_schema(client, enable_dynamic_field=False)[0]
    two_flag_schema.add_field("field1", DataType.INT64, is_primary=True, auto_id=False)
    two_flag_schema.add_field("field2", DataType.INT64, is_primary=True, auto_id=False)
    two_flag_schema.add_field("vector_field", DataType.FLOAT_VECTOR, dim=32)
    self.create_collection(client, collection_name, schema=two_flag_schema,
                           check_task=CheckTasks.err_res, check_items=multi_pk_error)

    # Scenario 2: the schema names "field2" as primary_field while "field1" is flagged is_primary=True
    mixed_schema = self.create_schema(client, enable_dynamic_field=False, primary_field="field2")[0]
    mixed_schema.add_field("field1", DataType.INT64, is_primary=True, auto_id=False)
    mixed_schema.add_field("field2", DataType.INT64)
    mixed_schema.add_field("vector_field", DataType.FLOAT_VECTOR, dim=32)
    self.create_collection(client, collection_name, schema=mixed_schema,
                           check_task=CheckTasks.err_res, check_items=multi_pk_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.parametrize("shards_num,error_type", [(ct.max_shards_num + 1, "range"), (257, "range"), (1.0, "type"), ("2", "type")])
|
|
def test_milvus_client_collection_invalid_shards(self, shards_num, error_type):
    """
    target: test creating a collection with an invalid shards_num
    method: fast-create a collection passing a shards_num that is out of range or of the wrong type
    expected: create_collection fails; the expected error depends on error_type ("range" or anything else)
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    range_error = {ct.err_code: 1,
                   ct.err_msg: f"maximum shards's number should be limited to {ct.max_shards_num}"}
    type_error = {ct.err_code: 999, ct.err_msg: "invalid num_shards type"}
    # Any error_type other than "range" is treated as a type error
    expected_error = range_error if error_type == "range" else type_error
    self.create_collection(client, collection_name, default_dim, shards_num=shards_num,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
class TestMilvusClientCollectionValid(TestMilvusClientV2Base):
|
|
""" Test case of create collection interface """
|
|
|
|
@pytest.fixture(scope="function", params=[False, True])
|
|
def auto_id(self, request):
    """Yield the parameter pytest selected for this fixture invocation."""
    selected = request.param
    yield selected
|
|
|
|
@pytest.fixture(scope="function", params=["COSINE", "L2", "IP"])
|
|
def metric_type(self, request):
    """Yield the parameter pytest selected for this fixture invocation."""
    selected = request.param
    yield selected
|
|
|
|
@pytest.fixture(scope="function", params=["int", "string"])
|
|
def id_type(self, request):
    """Yield the parameter pytest selected for this fixture invocation."""
    selected = request.param
    yield selected
|
|
|
|
"""
|
|
******************************************************************
|
|
# The following are valid base cases
|
|
******************************************************************
|
|
"""
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
@pytest.mark.parametrize("dim", [ct.min_dim, default_dim, ct.max_dim])
|
|
def test_milvus_client_collection_fast_creation_default(self, dim):
    """
    target: test fast collection creation with only a name and a dimension
    method: select the "default" database, create the collection, then describe it, list its
            indexes, load and release the "_default" partition
    expected: collection is listed, describe_collection matches, and the only index is on "vector"
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    self.using_database(client, "default")
    # 1. create collection
    self.create_collection(client, collection_name, dim)
    assert collection_name in self.list_collections(client)[0]
    expected_properties = {"collection_name": collection_name,
                           "dim": dim,
                           "consistency_level": 0}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    indexed_fields = self.list_indexes(client, collection_name)[0]
    assert indexed_fields == ['vector']
    # 2. load and release the default partition
    self.load_partitions(client, collection_name, "_default")
    self.release_partitions(client, collection_name, "_default")
    # 3. clean up if the collection still exists
    still_exists = self.has_collection(client, collection_name)[0]
    if still_exists:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.parametrize("dim", [ct.min_dim, default_dim, ct.max_dim])
|
|
def test_milvus_client_collection_fast_creation_all_params(self, dim, metric_type, id_type, auto_id):
    """
    target: test fast collection creation with id_type, metric_type, auto_id and max_length set
    method: create the collection with all of these parameters, then describe it and list its indexes
    expected: collection is listed, describe_collection matches, and the only index is on "vector"
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    max_length = 100
    # 1. create collection
    self.create_collection(client, collection_name, dim, id_type=id_type, metric_type=metric_type,
                           auto_id=auto_id, max_length=max_length)
    assert collection_name in self.list_collections(client)[0]
    expected_properties = {"collection_name": collection_name,
                           "dim": dim,
                           "auto_id": auto_id,
                           "consistency_level": 0}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    indexed_fields = self.list_indexes(client, collection_name)[0]
    assert indexed_fields == ['vector']
    # 2. release and drop
    self.release_collection(client, collection_name)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
@pytest.mark.parametrize("nullable", [True, False])
|
|
@pytest.mark.parametrize("vector_type", [DataType.FLOAT_VECTOR, DataType.INT8_VECTOR])
|
|
@pytest.mark.parametrize("add_field", [True, False])
|
|
def test_milvus_client_collection_self_creation_default(self, nullable, vector_type, add_field):
    """
    target: test creating a collection from a user-defined schema and index params
    method: build a schema with a VARCHAR primary key, one vector field of vector_type, a VARCHAR
            partition key, an INT64 field with a default value and an INT64 ARRAY field; create the
            collection with a COSINE index on the vector field; when add_field is True, add two
            more nullable fields to the existing collection
    expected: collection is listed, describe_collection matches the expected properties and the
              only listed index is on "embeddings"
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    dim = 128
    # 1. create collection
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field("id_string", DataType.VARCHAR, max_length=64, is_primary=True, auto_id=False)
    schema.add_field("embeddings", vector_type, dim=dim)
    schema.add_field("title", DataType.VARCHAR, max_length=64, is_partition_key=True)
    schema.add_field("nullable_field", DataType.INT64, nullable=nullable, default_value=10)
    schema.add_field("array_field", DataType.ARRAY, element_type=DataType.INT64, max_capacity=12,
                     max_length=64, nullable=nullable)
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index("embeddings", metric_type="COSINE")
    self.create_collection(client, collection_name, dimension=dim, schema=schema, index_params=index_params)
    collections = self.list_collections(client)[0]
    assert collection_name in collections
    check_items = {"collection_name": collection_name,
                   "dim": dim,
                   "consistency_level": 0,
                   "enable_dynamic_field": False,
                   "num_partitions": 16,
                   "id_name": "id_string",
                   "vector_name": "embeddings"}
    if nullable:
        check_items["nullable_fields"] = ["nullable_field", "array_field"]
    if add_field:
        # 2. add two nullable fields to the already created collection
        self.add_collection_field(client, collection_name, field_name="field_new_int64", data_type=DataType.INT64,
                                  nullable=True, is_cluster_key=True)
        # The keyword must be spelled `default_value`; the earlier `default_vaule` spelling was a typo,
        # so the intended default for this field was never actually requested.
        self.add_collection_field(client, collection_name, field_name="field_new_var", data_type=DataType.VARCHAR,
                                  nullable=True, default_value="field_new_var", max_length=64)
        check_items["add_fields"] = ["field_new_int64", "field_new_var"]
    # 3. verify properties and index, then clean up
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=check_items)
    index = self.list_indexes(client, collection_name)[0]
    assert index == ['embeddings']
    if self.has_collection(client, collection_name)[0]:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_milvus_client_collection_all_datatype_fields(self):
    """
    target: test creating a collection whose schema has one field per remaining DataType member
    method: add an INT64 primary key and a FLOAT_VECTOR field, then add one field for every
            DataType member that is truthy and not in the exclusion list (INT64 is skipped
            because it is already the primary key)
    expected: collection is created and describe_collection reports the expected number of fields
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    # Members that are never added through the generic loop below
    excluded_types = (DataType.UNKNOWN, DataType.STRING, DataType.VARCHAR, DataType.FLOAT_VECTOR,
                      DataType.BINARY_VECTOR, DataType.ARRAY, DataType.FLOAT16_VECTOR,
                      DataType.BFLOAT16_VECTOR, DataType.INT8_VECTOR)
    # Falsy members are dropped as well; field names are the lower-cased member names
    supported_types = [(member_name.lower(), member)
                       for member_name, member in DataType.__members__.items()
                       if member and member not in excluded_types]
    for field_name, data_type in supported_types:
        if data_type == DataType.INT64:
            # INT64 is already present as the primary key
            continue
        schema.add_field(field_name, data_type)

    self.create_collection(client, collection_name, schema=schema)
    expected_field_count = len(supported_types) + 2
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items={"collection_name": collection_name,
                                          "enable_dynamic_field": False,
                                          "fields_num": expected_field_count})
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_milvus_client_collection_self_creation_multiple_vectors(self):
    """
    target: test creating a collection from a schema with several vector fields
    method: define an INT64 primary key plus four vector fields of different types and dims,
            index each vector field with its own metric type, then create the collection
    expected: collection is listed, describe_collection matches and all four vector fields are indexed
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    dim = 128
    half_dim = dim // 2
    # (field name, vector type, dimension, index metric type)
    vector_specs = [
        ("embeddings", DataType.FLOAT_VECTOR, dim, "COSINE"),
        ("embeddings_1", DataType.INT8_VECTOR, dim * 2, "IP"),
        ("embeddings_2", DataType.FLOAT16_VECTOR, half_dim, "L2"),
        ("embeddings_3", DataType.BFLOAT16_VECTOR, half_dim, "COSINE"),
    ]
    vector_names = [spec[0] for spec in vector_specs]
    vector_dims = [spec[2] for spec in vector_specs]
    # 1. create collection
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field("id_int64", DataType.INT64, is_primary=True, auto_id=False)
    for name, vector_type, vector_dim, _ in vector_specs:
        schema.add_field(name, vector_type, dim=vector_dim)
    index_params = self.prepare_index_params(client)[0]
    for name, _, _, metric in vector_specs:
        index_params.add_index(name, metric_type=metric)
    self.create_collection(client, collection_name, dimension=dim, schema=schema, index_params=index_params)
    assert collection_name in self.list_collections(client)[0]
    # 2. verify properties and indexes
    expected_properties = {"collection_name": collection_name,
                           "dim": vector_dims,
                           "consistency_level": 0,
                           "enable_dynamic_field": False,
                           "id_name": "id_int64",
                           "vector_name": vector_names}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    indexed_fields = self.list_indexes(client, collection_name)[0]
    assert sorted(indexed_fields) == sorted(vector_names)
    # 3. clean up if the collection still exists
    still_exists = self.has_collection(client, collection_name)[0]
    if still_exists:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.parametrize("primary_key_type", ["int64", "varchar"])
|
|
def test_milvus_client_collection_max_fields_and_max_vector_fields(self, primary_key_type):
    """
    target: test the maximum total field count and the maximum vector field count in one case
    method:
        - Scenario A: INT64 primary key + one vector field + INT64 scalars up to ct.max_field_num
        - Scenario B: primary key of primary_key_type + ct.max_vector_field_num vector fields
          + INT64 scalars up to ct.max_field_num
    expected: both collections are created and describe_collection matches for each scenario
    """
    client = self._client()

    # ---------- Scenario A: maximum total fields with a single vector field ----------
    name_a = cf.gen_collection_name_by_testcase_name()
    schema_a = self.create_schema(client, enable_dynamic_field=False)[0]
    schema_a.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
    schema_a.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    # Primary key and vector field already occupy two slots
    scalar_count_a = ct.max_field_num - 2
    for _ in range(scalar_count_a):
        schema_a.add_field(cf.gen_unique_str("field_name"), DataType.INT64)
    self.create_collection(client, name_a, default_dim, schema=schema_a)
    assert name_a in self.list_collections(client)[0]
    expected_a = {"collection_name": name_a,
                  "fields_num": ct.max_field_num,
                  "enable_dynamic_field": False}
    self.describe_collection(client, name_a,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_a)
    self.drop_collection(client, name_a)

    # ---------- Scenario B: maximum vector fields and maximum total fields ----------
    name_b = cf.gen_collection_name_by_testcase_name()
    schema_b = self.create_schema(client, enable_dynamic_field=False)[0]
    if primary_key_type == "int64":
        pk_name = ct.default_int64_field_name
        schema_b.add_field(pk_name, DataType.INT64, is_primary=True)
    else:
        pk_name = "pk_string"
        schema_b.add_field(pk_name, DataType.VARCHAR, max_length=100, is_primary=True)
    vector_field_names = []
    for _ in range(ct.max_vector_field_num):
        generated_name = cf.gen_unique_str("vector_field")
        vector_field_names.append(generated_name)
        schema_b.add_field(generated_name, DataType.FLOAT_VECTOR, dim=default_dim)
    # One slot is taken by the primary key, the rest by the vector fields above
    scalar_count_b = ct.max_field_num - ct.max_vector_field_num - 1
    for _ in range(scalar_count_b):
        schema_b.add_field(cf.gen_unique_str("scalar_field"), DataType.INT64)
    self.create_collection(client, name_b, default_dim, schema=schema_b)
    assert name_b in self.list_collections(client)[0]
    expected_b = {"collection_name": name_b,
                  "dim": [default_dim] * ct.max_vector_field_num,
                  "enable_dynamic_field": False,
                  "id_name": pk_name,
                  "vector_name": vector_field_names,
                  "fields_num": ct.max_field_num}
    self.describe_collection(client, name_b,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_b)
    self.drop_collection(client, name_b)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
def test_milvus_client_collection_primary_in_schema(self):
    """
    target: test a collection whose primary field is named on the schema
    method: pass primary_field to create_schema and add that field without is_primary
    expected: describe_collection reports that field as the primary key
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    pk_name = ct.default_int64_field_name
    # The primary key is declared on the schema only, not on the field
    schema = self.create_schema(client, enable_dynamic_field=False, primary_field=pk_name)[0]
    schema.add_field(pk_name, DataType.INT64)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema)
    expected_properties = {"collection_name": collection_name,
                           "id_name": pk_name,
                           "enable_dynamic_field": False}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
def test_milvus_client_collection_primary_in_field(self):
    """
    target: test a collection whose primary field is flagged on the field itself
    method: add an INT64 field with is_primary=True, plus a FLOAT field and a vector field
    expected: describe_collection reports that field as the primary key
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    pk_name = ct.default_int64_field_name
    # The primary key is declared on the field only, not on the schema
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(pk_name, DataType.INT64, is_primary=True)
    schema.add_field("float_field", DataType.FLOAT)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema)
    expected_properties = {"collection_name": collection_name,
                           "id_name": pk_name,
                           "enable_dynamic_field": False}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
def test_milvus_client_collection_primary_field_consistency(self):
    """
    target: test a collection whose primary field is declared both on the schema and on the field
    method: pass primary_field to create_schema and add the same field with is_primary=True
    expected: describe_collection reports "primary_field" as the primary key
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    pk_name = "primary_field"
    # Same field is named on the schema and flagged on the field
    schema = self.create_schema(client, enable_dynamic_field=False, primary_field=pk_name)[0]
    schema.add_field(pk_name, DataType.INT64, is_primary=True)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema)
    expected_properties = {"collection_name": collection_name,
                           "id_name": pk_name,
                           "enable_dynamic_field": False}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
@pytest.mark.parametrize("auto_id", [True, False])
|
|
@pytest.mark.parametrize("set_in", ["field", "schema", "both"])
|
|
def test_milvus_client_collection_auto_id(self, auto_id, set_in):
    """
    target: test setting auto_id on the field schema, on the collection schema, or on both
    method: pass auto_id to add_field when set_in is "field", to create_schema when set_in is
            "schema", and to both for any other value ("both"); then create the collection
    expected: describe_collection reports auto_id equal to the value that was set
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # "field" -> field only, "schema" -> schema only, anything else -> both
    schema_kwargs = {"auto_id": auto_id} if set_in != "field" else {}
    field_kwargs = {"auto_id": auto_id} if set_in != "schema" else {}
    schema = self.create_schema(client, enable_dynamic_field=False, **schema_kwargs)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, **field_kwargs)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    # Create collection
    self.create_collection(client, collection_name, schema=schema)
    # Verify collection properties
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items={"collection_name": collection_name,
                                          "auto_id": auto_id,
                                          "enable_dynamic_field": False})
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_milvus_client_collection_auto_id_true_on_primary_and_false_on_non_primary(self):
    """
    target: test auto_id=True on the primary field combined with auto_id=False on a non-primary field
    method: add the primary key with auto_id=True and another INT64 field with auto_id=False,
            then create the collection
    expected: describe_collection reports auto_id as True
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Primary key enables auto_id; the non-primary field explicitly disables it
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=True)
    schema.add_field("field2", DataType.INT64, auto_id=False)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    # Create collection
    self.create_collection(client, collection_name, schema=schema)
    # Verify collection properties: auto_id should be True
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items={"collection_name": collection_name,
                                          "auto_id": True,
                                          "enable_dynamic_field": False})
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.parametrize("field_auto_id", [True, False])
|
|
@pytest.mark.parametrize("schema_auto_id", [True, False])
|
|
def test_milvus_client_collection_auto_id_inconsistent(self, field_auto_id, schema_auto_id):
    """
    target: test auto_id when the field schema and the collection schema may disagree
    method: pass schema_auto_id to create_schema and field_auto_id to the primary key field
    expected: describe_collection reports auto_id as (field_auto_id or schema_auto_id)
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # The test expects the two settings to combine with OR semantics
    expected_auto_id = field_auto_id or schema_auto_id
    schema = self.create_schema(client, enable_dynamic_field=False, auto_id=schema_auto_id)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=field_auto_id)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema)
    expected_properties = {"collection_name": collection_name,
                           "auto_id": expected_auto_id,
                           "enable_dynamic_field": False}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_milvus_client_create_collection_dup_name(self):
    """
    target: test creating a collection twice with the same name
    method: fast-create a collection twice with the same name and the same default params
    expected: the name appears exactly once in list_collections
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Issue the identical create request twice
    for _ in range(2):
        self.create_collection(client, collection_name, default_dim)

    listed = self.list_collections(client)[0]
    occurrences = listed.count(collection_name)
    assert collection_name in listed
    assert occurrences == 1, f"Expected only 1 collection named '{collection_name}', but found {occurrences}"

    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_milvus_client_create_collection_dup_name_same_schema(self):
    """
    target: test creating a collection twice with the same name and the same schema
    method: create the collection from a schema, describe it, create it again with the same
            schema, describe it again and compare both descriptions
    expected: name, description and field name/type mapping are unchanged, and the name is listed once
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    dim = 128
    description = "test collection description"
    schema = self.create_schema(client, enable_dynamic_field=False, description=description)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field("float_field", DataType.FLOAT)
    schema.add_field("varchar_field", DataType.VARCHAR, max_length=100)
    schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim)
    # 1. first creation and snapshot of its description
    self.create_collection(client, collection_name, schema=schema)
    info_before = self.describe_collection(client, collection_name)[0]
    description_before = info_before.get("description", "")
    # 2. second creation with the identical name and schema
    self.create_collection(client, collection_name, schema=schema)
    listed = self.list_collections(client)[0]
    assert collection_name in listed
    info_after = self.describe_collection(client, collection_name)[0]
    description_after = info_after.get("description", "")
    # 3. both snapshots must agree
    assert info_before["collection_name"] == info_after["collection_name"]
    assert description_before == description_after == description
    assert len(info_before["fields"]) == len(info_after["fields"])
    field_types_before = dict((field["name"], field["type"]) for field in info_before["fields"])
    field_types_after = dict((field["name"], field["type"]) for field in info_after["fields"])
    assert field_types_before == field_types_after
    occurrences = listed.count(collection_name)
    assert occurrences == 1, f"Expected only 1 collection named '{collection_name}', but found {occurrences}"
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_milvus_client_array_insert_search(self):
    """
    target: test insert and search with rows that carry int and string list values
    method: fast-create a collection with Strong consistency, insert default_nb rows, then search
    expected: search results pass check_search_results against the inserted ids
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    assert collection_name in self.list_collections(client)[0]
    # 2. insert
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        rows.append({
            default_primary_key_field_name: i,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: i * 1.0,
            default_int32_array_field_name: [i, i + 1, i + 2],
            default_string_array_field_name: [str(i), str(i + 1), str(i + 2)],
        })
    self.insert(client, collection_name, rows)
    # 3. search
    vectors_to_search = rng.random((1, default_dim))
    insert_ids = list(range(default_nb))
    expected_results = {"enable_milvus_client_api": True,
                        "nq": len(vectors_to_search),
                        "ids": insert_ids,
                        "limit": default_limit,
                        "pk_name": default_primary_key_field_name}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=expected_results)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.skip(reason="issue 25110")
|
|
def test_milvus_client_search_query_string(self):
    """
    target: test search and query on a collection created with id_type="string"
    method: fast-create the collection, insert default_nb rows with string ids, search, then query
    expected: search and query results pass their respective result checks
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, id_type="string", max_length=ct.default_length)
    expected_properties = {"collection_name": collection_name,
                           "dim": default_dim,
                           "consistency_level": 0}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=expected_properties)
    # 2. insert
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        rows.append({default_primary_key_field_name: str(i),
                     default_vector_field_name: list(rng.random((1, default_dim))[0]),
                     default_float_field_name: i * 1.0,
                     default_string_field_name: str(i)})
    self.insert(client, collection_name, rows)
    # 3. search
    vectors_to_search = rng.random((1, default_dim))
    expected_search = {"enable_milvus_client_api": True,
                       "nq": len(vectors_to_search),
                       "pk_name": default_primary_key_field_name,
                       "limit": default_limit}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=expected_search)
    # 4. query
    expected_query = {exp_res: rows,
                      "with_vec": True,
                      "pk_name": default_primary_key_field_name}
    self.query(client, collection_name, filter="id in [0, 1]",
               check_task=CheckTasks.check_query_results,
               check_items=expected_query)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_milvus_client_search_different_metric_types_not_specifying_in_search_params(self, metric_type, auto_id):
    """
    target: test search on collections created with different metric types, without search_params
    method: fast-create a collection with the given metric_type and auto_id, insert default_nb rows
            (without the primary key when auto_id is on), then search with limit=default_limit
    expected: search results pass check_search_results with the requested limit
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, metric_type=metric_type, auto_id=auto_id,
                           consistency_level="Strong")
    # 2. insert
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        rows.append({default_primary_key_field_name: i,
                     default_vector_field_name: list(rng.random((1, default_dim))[0]),
                     default_float_field_name: i * 1.0,
                     default_string_field_name: str(i)})
    if auto_id:
        # The primary key is removed from every row before insert when auto_id is enabled
        for row in rows:
            row.pop(default_primary_key_field_name)
    self.insert(client, collection_name, rows)
    # 3. search
    vectors_to_search = rng.random((1, default_dim))
    expected_search = {"enable_milvus_client_api": True,
                       "nq": len(vectors_to_search),
                       "pk_name": default_primary_key_field_name,
                       "limit": default_limit}
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                output_fields=[default_primary_key_field_name],
                check_task=CheckTasks.check_search_results,
                check_items=expected_search)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("pymilvus issue #1866")
def test_milvus_client_search_different_metric_types_specifying_in_search_params(self, metric_type, auto_id):
    """
    target: test search when the metric type is passed explicitly in the search params
    method: create a collection with the given metric_type and auto_id, insert rows,
            then search with search_params={"metric_type": metric_type}
    expected: search results pass check_search_results with limit default_limit
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim,
                           metric_type=metric_type, auto_id=auto_id,
                           consistency_level="Strong")
    # 2. insert
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        row = {default_primary_key_field_name: i,
               default_vector_field_name: list(rng.random((1, default_dim))[0]),
               default_float_field_name: i * 1.0,
               default_string_field_name: str(i)}
        if auto_id:
            # with auto_id the test does not supply the primary key itself
            row.pop(default_primary_key_field_name)
        rows.append(row)
    self.insert(client, collection_name, rows)
    # 3. search with the collection's metric type given explicitly
    vectors_to_search = rng.random((1, default_dim))
    search_params = {"metric_type": metric_type}
    expected = {"enable_milvus_client_api": True,
                "nq": len(vectors_to_search),
                "pk_name": default_primary_key_field_name,
                "limit": default_limit}
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                search_params=search_params,
                output_fields=[default_primary_key_field_name],
                check_task=CheckTasks.check_search_results,
                check_items=expected)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_delete_with_ids(self):
    """
    target: test delete by primary keys (high level api)
    method: create a collection, insert rows, delete the first few ids, then search and query
    expected: search/query succeed and return only the rows that were not deleted
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. insert
    nb = 1000  # local row count; named so it does not shadow the module-level default_nb
    rng = np.random.default_rng(seed=19530)
    rows = [{default_primary_key_field_name: i,
             default_vector_field_name: list(rng.random((1, default_dim))[0]),
             default_float_field_name: i * 1.0,
             default_string_field_name: str(i)} for i in range(nb)]
    self.insert(client, collection_name, rows)
    # 3. delete the first delete_num primary keys
    delete_num = 3
    self.delete(client, collection_name, ids=list(range(delete_num)))
    # 4. search: ask for every row, expect everything except the deleted ids
    vectors_to_search = rng.random((1, default_dim))
    remaining_ids = list(range(delete_num, nb))
    self.search(client, collection_name, vectors_to_search, limit=nb,
                check_task=CheckTasks.check_search_results,
                check_items={"enable_milvus_client_api": True,
                             "nq": len(vectors_to_search),
                             "ids": remaining_ids,
                             "limit": nb - delete_num,
                             "pk_name": default_primary_key_field_name})
    # 5. query: the inserted rows minus the deleted prefix must come back
    self.query(client, collection_name, filter=default_search_exp,
               check_task=CheckTasks.check_query_results,
               check_items={exp_res: rows[delete_num:],
                            "with_vec": True,
                            "pk_name": default_primary_key_field_name})
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_delete_with_filters(self):
    """
    target: test delete by filter expression (high level api)
    method: create a collection, insert rows, delete with filter "id < delete_num",
            then search and query
    expected: search/query succeed and return only the rows that were not deleted
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. insert
    nb = 1000  # local row count; named so it does not shadow the module-level default_nb
    rng = np.random.default_rng(seed=19530)
    rows = [{default_primary_key_field_name: i,
             default_vector_field_name: list(rng.random((1, default_dim))[0]),
             default_float_field_name: i * 1.0,
             default_string_field_name: str(i)} for i in range(nb)]
    self.insert(client, collection_name, rows)
    # 3. delete every row whose id is below delete_num
    delete_num = 3
    self.delete(client, collection_name, filter=f"id < {delete_num}")
    # 4. search: ask for every row, expect everything except the deleted ids
    vectors_to_search = rng.random((1, default_dim))
    remaining_ids = list(range(delete_num, nb))
    self.search(client, collection_name, vectors_to_search, limit=nb,
                check_task=CheckTasks.check_search_results,
                check_items={"enable_milvus_client_api": True,
                             "nq": len(vectors_to_search),
                             "ids": remaining_ids,
                             "limit": nb - delete_num,
                             "pk_name": default_primary_key_field_name})
    # 5. query: the inserted rows minus the deleted prefix must come back
    self.query(client, collection_name, filter=default_search_exp,
               check_task=CheckTasks.check_query_results,
               check_items={exp_res: rows[delete_num:],
                            "with_vec": True,
                            "pk_name": default_primary_key_field_name})
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_rename_collection(self):
    """
    target: test rename collection
    method: create a collection, rename it, then list/describe/load using the old and new names
    expected: only the new name is listed and usable; loading a partition through the
              old name fails with "collection not found"
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    collections = self.list_collections(client)[0]
    assert collection_name in collections
    # 2. rename and verify that only the new name is listed
    old_name = collection_name
    new_name = collection_name + "new"
    self.rename_collection(client, old_name, new_name)
    collections = self.list_collections(client)[0]
    assert new_name in collections
    assert old_name not in collections
    # 3. the renamed collection is described and indexed under the new name
    self.describe_collection(client, new_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items={"collection_name": new_name,
                                          "dim": default_dim,
                                          "consistency_level": 0})
    index = self.list_indexes(client, new_name)[0]
    assert index == ['vector']
    # 4. partition load fails for the old name and works for the new one
    error = {ct.err_code: 100, ct.err_msg: "collection not found"}
    self.load_partitions(client, old_name, "_default",
                         check_task=CheckTasks.err_res, check_items=error)
    self.load_partitions(client, new_name, "_default")
    self.release_partitions(client, new_name, "_default")
    # 5. clean up: check the new name, since the old name is gone after the rename
    if self.has_collection(client, new_name)[0]:
        self.drop_collection(client, new_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="db not ready")
def test_milvus_client_collection_rename_collection_target_db(self):
    """
    target: test rename collection with a target database
    method: create a collection, switch to database "new_db", rename the collection with
            target_db set, then list/describe/load using the old and new names
    expected: only the new name is listed and usable; loading a partition through the
              old name fails with "collection not found"
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    collections = self.list_collections(client)[0]
    assert collection_name in collections
    # 2. rename into the target database and verify that only the new name is listed
    db_name = "new_db"
    self.using_database(client, db_name)
    old_name = collection_name
    new_name = collection_name + "new"
    self.rename_collection(client, old_name, new_name, target_db=db_name)
    collections = self.list_collections(client)[0]
    assert new_name in collections
    assert old_name not in collections
    # 3. the renamed collection is described and indexed under the new name
    self.describe_collection(client, new_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items={"collection_name": new_name,
                                          "dim": default_dim,
                                          "consistency_level": 0})
    index = self.list_indexes(client, new_name)[0]
    assert index == ['vector']
    # 4. partition load fails for the old name and works for the new one
    error = {ct.err_code: 100, ct.err_msg: "collection not found"}
    self.load_partitions(client, old_name, "_default",
                         check_task=CheckTasks.err_res, check_items=error)
    self.load_partitions(client, new_name, "_default")
    self.release_partitions(client, new_name, "_default")
    # 5. clean up: check the new name, since the old name is gone after the rename
    if self.has_collection(client, new_name)[0]:
        self.drop_collection(client, new_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_dup_name_drop(self):
    """
    target: test dropping a collection that two clients created under the same name
    method: 1. create the collection with client1
            2. create a collection with the same name with client2
            3. drop the collection with client2
            4. check the collection through client1
    expected: has_collection is False for client1 and describe_collection through
              client1 fails with a "can't find collection" error
    """
    first_client = self._client(alias="client1")
    second_client = self._client(alias="client2")
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create with the first client
    self.create_collection(first_client, collection_name, default_dim)
    # 2. create again under the same name with the second client
    self.create_collection(second_client, collection_name, default_dim)
    # 3. drop through the second client
    self.drop_collection(second_client, collection_name)
    # 4. the first client must no longer see the collection
    still_exists = self.has_collection(first_client, collection_name)[0]
    assert not still_exists
    expected_error = {
        ct.err_code: 100,
        ct.err_msg: f"can't find collection[database=default]"
                    f"[collection={collection_name}]",
    }
    self.describe_collection(first_client, collection_name,
                             check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_long_desc(self):
    """
    target: test create collection with a long description
    method: build a schema whose description is longer than 255 characters and
            create a collection from it
    expected: describe_collection returns exactly the description that was set
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 256 "a" items joined by "a" gives a 511-character description
    long_desc = "a".join(["a"] * 256)
    # schema carrying the long description
    schema = self.create_schema(client, enable_dynamic_field=False, description=long_desc)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=default_dim)
    # create the collection and read the description back
    self.create_collection(client, collection_name, schema=schema)
    described = self.describe_collection(client, collection_name)[0]
    assert described.get("description", "") == long_desc
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("collection_name", ct.valid_resource_names)
def test_milvus_client_collection_valid_naming_rules(self, collection_name):
    """
    target: test create collection with each name from ct.valid_resource_names
    method: 1. build a schema whose field names also use naming rule elements
            2. create the collection under the parametrized name
            3. list and describe the collection
    expected: the collection is listed, described under the same name, and
              reports all three field names
    """
    client = self._client()
    int_field = "_1nt"        # field name using naming rule elements
    vector_field = "f10at_"   # vector field name using naming rule elements

    # schema with fields that also use naming rule elements
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(int_field, DataType.INT64)
    schema.add_field(vector_field, DataType.FLOAT_VECTOR, dim=default_dim)

    # create the collection under the valid name
    self.create_collection(client, collection_name, schema=schema)
    listed = self.list_collections(client)[0]
    assert collection_name in listed

    described = self.describe_collection(client, collection_name)[0]
    assert described["collection_name"] == collection_name

    field_names = [field["name"] for field in described["fields"]]
    assert ct.default_int64_field_name in field_names
    assert int_field in field_names
    assert vector_field in field_names

    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_binary(self):
    """
    target: test collection with a binary vector field
    method: create a collection from a schema holding an INT64 primary key and a
            BINARY_VECTOR field
    expected: the collection is listed and describe_collection reports both fields
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # schema with a binary vector field
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema)

    listed = self.list_collections(client)[0]
    assert collection_name in listed
    described = self.describe_collection(client, collection_name)[0]
    field_names = [field["name"] for field in described["fields"]]
    assert ct.default_int64_field_name in field_names
    assert ct.default_binary_vec_field_name in field_names

    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_multi_create_drop(self):
    """
    target: test repeated creation and deletion of collections
    method: in a loop, create a collection, check it is listed, drop it and check
            it is no longer listed
    expected: no exception; every collection is created and dropped successfully
    """
    client = self._client()
    c_num = 20

    for i in range(c_num):
        collection_name = cf.gen_collection_name_by_testcase_name() + f"_{i}"
        self.create_collection(client, collection_name, default_dim)
        listed_before_drop = self.list_collections(client)[0]
        assert collection_name in listed_before_drop
        # drop it again and make sure it disappears from the listing
        self.drop_collection(client, collection_name)
        listed_after_drop = self.list_collections(client)[0]
        assert collection_name not in listed_after_drop
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_after_drop(self):
    """
    target: test re-creating a collection after it was dropped
    method: 1. create a collection 2. drop it 3. create it again under the same name
    expected: no exception; has_collection is False after the drop and True after
              the re-creation
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # first life cycle: create, drop, confirm it is gone
    self.create_collection(client, collection_name, default_dim)
    self.drop_collection(client, collection_name)
    exists_after_drop = self.has_collection(client, collection_name)[0]
    assert not exists_after_drop

    # second life cycle under the same name
    self.create_collection(client, collection_name, default_dim)
    exists_after_recreate = self.has_collection(client, collection_name)[0]
    assert exists_after_recreate

    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_create_collection_multithread(self):
    """
    target: test create collection with multiple threads
    method: start several threads that each create one uniquely named collection
    expected: every collection name recorded by the threads is listed afterwards
    """
    client = self._client()
    threads_num = 8
    collection_names = []

    def create():
        """Create collection in separate thread"""
        collection_name = cf.gen_collection_name_by_testcase_name() + "_" + cf.gen_unique_str()
        collection_names.append(collection_name)
        self.create_collection(client, collection_name, default_dim)

    # start the worker threads, pausing briefly after each start
    threads = []
    for _ in range(threads_num):
        worker = MyThread(target=create, args=())
        threads.append(worker)
        worker.start()
        time.sleep(0.2)
    # wait for all threads to complete
    for worker in threads:
        worker.join()
    # verify that every collection was created, dropping each one once checked
    listed = self.list_collections(client)[0]
    for collection_name in collection_names:
        assert collection_name in listed
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_create_drop_collection_multithread(self):
    """
    target: test create and drop collection with multiple threads
    method: start several threads that each create a collection and drop it again
    expected: none of the collection names recorded by the threads exists afterwards
    """
    client = self._client()
    threads_num = 8
    collection_names = []

    def create():
        collection_name = cf.gen_collection_name_by_testcase_name()
        collection_names.append(collection_name)
        self.create_collection(client, collection_name, default_dim)
        self.drop_collection(client, collection_name)

    # start the worker threads, pausing briefly after each start
    threads = []
    for _ in range(threads_num):
        worker = MyThread(target=create, args=())
        threads.append(worker)
        worker.start()
        time.sleep(0.2)

    for worker in threads:
        worker.join()

    # verify that every collection has been dropped
    for collection_name in collection_names:
        still_exists = self.has_collection(client, collection_name)[0]
        assert not still_exists
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_count_no_vectors(self):
    """
    target: test that the row count of an empty collection is correct
    method: create a collection, insert nothing, and read row_count from
            get_collection_stats
    expected: row_count is 0
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    self.create_collection(client, collection_name, default_dim)
    # stats of a collection that never received any data
    empty_stats = self.get_collection_stats(client, collection_name)[0]
    row_count = empty_stats['row_count']
    assert row_count == 0
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_non_vector_field_dim(self):
    """
    target: test collection whose non-vector field is given a dim parameter
    method: declare the INT64 primary key field with dim and create the collection
    expected: no exception; the collection is listed and described under its name
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    # INT64 primary key field that carries a dim parameter
    schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True, dim=ct.default_dim)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    # create the collection from that schema
    self.create_collection(client, collection_name, default_dim, schema=schema)
    # the collection must be listed ...
    listed = self.list_collections(client)[0]
    assert collection_name in listed
    # ... and described under the requested name
    described = self.describe_collection(client, collection_name)[0]
    assert described['collection_name'] == collection_name
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_collection_multi_sparse_vectors(self):
    """
    target: test a collection with several vector fields including a sparse vector
    method: create a collection whose schema has one FLOAT_VECTOR field and one
            SPARSE_FLOAT_VECTOR field
    expected: the collection is created and listed
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # schema: primary key, a float scalar, a dense vector and a sparse vector
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
    schema.add_field(ct.default_float_field_name, DataType.FLOAT)
    schema.add_field(ct.default_float_vec_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field("vec_sparse", DataType.SPARSE_FLOAT_VECTOR)
    # create the collection from that schema
    self.create_collection(client, collection_name, default_dim, schema=schema)
    # the collection must be listed
    listed = self.list_collections(client)[0]
    assert collection_name in listed
    self.drop_collection(client, collection_name)
|
|
|
|
|
|
|
|
|
|
class TestMilvusClientDropCollectionInvalid(TestMilvusClientV2Base):
    """ Test case of drop collection interface """
    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    @pytest.mark.skip(reason="https://github.com/milvus-io/milvus/pull/43064 change drop alias restraint")
    def test_milvus_client_drop_collection_invalid_collection_name(self, name):
        """
        target: test drop collection with invalid params
        method: drop collection with an invalid collection name
        expected: raise exception
        """
        client = self._client()
        expected_error = {
            ct.err_code: 1100,
            ct.err_msg: f"Invalid collection name: {name}. "
                        f"the first character of a collection name must be an underscore or letter",
        }
        self.drop_collection(client, name,
                             check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_drop_collection_not_existed(self):
        """
        target: test drop collection with a name that was never created
        method: call drop_collection on a freshly generated unique name
        expected: the call passes without an error check, i.e. drop succeeds
        """
        client = self._client()
        missing_name = cf.gen_unique_str("nonexisted")
        self.drop_collection(client, missing_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("collection_name", ['', None])
    def test_milvus_client_drop_collection_with_empty_or_None_collection_name(self, collection_name):
        """
        target: test drop collection with an illegal collection name value
        method: drop collection with an empty string or None as the collection name
        expected: raise exception
        """
        client = self._client()
        # the parametrized value is embedded in the expected message
        expected_error = {ct.err_code: 1,
                          ct.err_msg: f"`collection_name` value {collection_name} is illegal"}
        self.drop_collection(client, collection_name,
                             check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_drop_collection_after_disconnect(self):
        """
        target: test drop collection after the client connection is closed
        method: 1. create a collection with a dedicated client
                2. close that client
                3. call drop_collection with the closed client
        expected: the drop fails with a "should create connection first" error
        """
        closed_client = self._client(alias="client_drop_collection")
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(closed_client, collection_name, default_dim)
        self.close(closed_client)
        expected_error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
        self.drop_collection(closed_client, collection_name,
                             check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
class TestMilvusClientReleaseCollectionInvalid(TestMilvusClientV2Base):
    """ Test case of release collection interface """
    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    def test_milvus_client_release_collection_invalid_collection_name(self, name):
        """
        target: test release collection with an invalid collection name
        method: call release_collection with a name that breaks the naming rules
        expected: raise exception with error code 1100 and an invalid-name message
        """
        client = self._client()
        error = {ct.err_code: 1100,
                 ct.err_msg: f"Invalid collection name: {name}. "
                             f"the first character of a collection name must be an underscore or letter"}
        self.release_collection(client, name,
                                check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_release_collection_not_existed(self):
        """
        target: test release collection with a name that was never created
        method: call release_collection on a freshly generated unique name
        expected: raise exception with a "collection not found" message
        """
        client = self._client()
        collection_name = cf.gen_unique_str("nonexisted")
        error = {ct.err_code: 1100, ct.err_msg: f"collection not found[database=default]"
                                                f"[collection={collection_name}]"}
        self.release_collection(client, collection_name,
                                check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_release_collection_name_over_max_length(self):
        """
        target: test release collection with an over-long collection name
        method: call release_collection with a 511-character name
        expected: raise exception stating that the name must be less than 255 characters
        """
        client = self._client()
        # 256 "a" items joined by "a" gives a 511-character name
        collection_name = "a".join("a" for i in range(256))
        error = {ct.err_code: 1100,
                 ct.err_msg: "the length of a collection name must be less than 255 characters"}
        # only the client and the name are passed, as in the other release_collection tests
        self.release_collection(client, collection_name,
                                check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
|
|
class TestMilvusClientReleaseCollectionValid(TestMilvusClientV2Base):
    """ Test case of release collection interface """

    @pytest.fixture(scope="function", params=[False, True])
    def auto_id(self, request):
        yield request.param

    @pytest.fixture(scope="function", params=["COSINE", "L2", "IP"])
    def metric_type(self, request):
        yield request.param

    @pytest.fixture(scope="function", params=["int", "string"])
    def id_type(self, request):
        yield request.param

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_release_unloaded_collection(self):
        """
        target: test releasing a collection without loading it in the test
        method: create a collection and call release_collection twice in a row
        expected: no error is raised and the collection can still be dropped
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        self.create_collection(client, collection_name, default_dim)
        # 2. release twice back to back
        for _ in range(2):
            self.release_collection(client, collection_name)
        # 3. clean up
        exists = self.has_collection(client, collection_name)[0]
        if exists:
            self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_release_partition_after_load_collection(self):
        """
        target: test releasing specific partitions around a whole-collection load
        method: 1. create collection and partition
                2. release both partitions, then the collection
                3. load the entire collection
                4. release the custom partition, then the collection
        expected: every release and load call succeeds
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        partition_name = cf.gen_unique_str("partition")
        # 1. create collection and partition
        self.create_collection(client, collection_name, default_dim)
        self.create_partition(client, collection_name, partition_name)
        # 2. release the partitions and the collection
        all_partitions = ["_default", partition_name]
        self.release_partitions(client, collection_name, all_partitions)
        self.release_collection(client, collection_name)
        # 3. load the whole collection again
        self.load_collection(client, collection_name)
        # 4. release one partition of the loaded collection, then the collection itself
        self.release_partitions(client, collection_name, [partition_name])
        self.release_collection(client, collection_name)
        exists = self.has_collection(client, collection_name)[0]
        if exists:
            self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientReleaseAdvanced(TestMilvusClientV2Base):
    """
    ******************************************************************
    The following cases are used to test release during search operations
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L0)
    def test_milvus_client_release_collection_during_searching(self):
        """
        target: test release collection during searching
        method: create and load a collection, issue a search with _async=True,
                release the collection, then search again
        expected: load state is NotLoad after the release and the second search
                  fails with "collection not loaded"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(client, collection_name, default_dim)
        self.load_collection(client, collection_name)
        load_state = self.get_load_state(client, collection_name)[0]
        assert load_state["state"] == LoadState.Loaded, f"Expected Loaded, but got {load_state['state']}"
        rng = np.random.default_rng(seed=19530)
        vectors_to_search = rng.random((1, default_dim))
        # issue the search with _async=True; its result is not inspected before the release
        self.search(client, collection_name, vectors_to_search, limit=default_limit, _async=True)
        self.release_collection(client, collection_name)
        load_state = self.get_load_state(client, collection_name)[0]
        assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after release, but got {load_state['state']}"
        # a search on the released collection must be rejected
        expected_error = {ct.err_code: 65535, ct.err_msg: "collection not loaded"}
        self.search(client, collection_name, vectors_to_search, limit=default_limit,
                    check_task=CheckTasks.err_res, check_items=expected_error)
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientLoadCollectionInvalid(TestMilvusClientV2Base):
|
|
""" Test case of load collection interface """
|
|
"""
|
|
******************************************************************
|
|
# The following are invalid base cases
|
|
******************************************************************
|
|
"""
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
def test_milvus_client_load_collection_invalid_collection_name(self, name):
    """
    target: test load collection with an invalid collection name
    method: call load_collection with a name that breaks the naming rules
    expected: raise exception with error code 1100 and an invalid-name message
    """
    client = self._client()
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: f"Invalid collection name: {name}. "
                    f"the first character of a collection name must be an underscore or letter",
    }
    self.load_collection(client, name,
                         check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_not_existed(self):
    """
    target: test load collection with a name that was never created
    method: call load_collection on a freshly generated unique name
    expected: raise exception with a "collection not found" message
    """
    client = self._client()
    missing_name = cf.gen_unique_str("nonexisted")
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: f"collection not found[database=default]"
                    f"[collection={missing_name}]",
    }
    self.load_collection(client, missing_name,
                         check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop(self):
    """
    target: test load collection after it has been dropped
    method: 1. create collection
            2. drop the collection
            3. try to load the dropped collection
    expected: raise exception indicating collection not found
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # create and immediately drop, so the name refers to nothing
    self.create_collection(client, collection_name, default_dim)
    self.drop_collection(client, collection_name)
    expected_error = {ct.err_code: 999, ct.err_msg: "collection not found"}
    self.load_collection(client, collection_name,
                         check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_release_collection(self):
    """
    target: test load and release on a collection that has been dropped
    method: 1. create collection, re-create its vector index, then load, release and drop it
            2. load and release the dropped collection
    expected: raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # Drop the index named "vector" (after releasing) and build an HNSW/L2 index instead
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    hnsw_params = self.prepare_index_params(client)[0]
    hnsw_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, hnsw_params)
    # Full lifecycle on the live collection: load, release, drop
    self.load_collection(client, collection_name)
    self.release_collection(client, collection_name)
    self.drop_collection(client, collection_name)
    # Both operations must now be rejected, load first and release second
    error = {ct.err_code: 100, ct.err_msg: "collection not found"}
    for operation in (self.load_collection, self.release_collection):
        operation(client, collection_name, check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_over_max_length(self):
    """
    target: test load collection with a name longer than the allowed length
    method: load a collection whose name is a long run of "a" characters
    expected: raise exception reporting the collection name length limit
    """
    client = self._client()
    # Joining 256 "a" items with "a" yields 511 characters (256 items + 255 separators)
    collection_name = "a".join(["a"] * 256)
    too_long_msg = (f"Invalid collection name: {collection_name}. "
                    f"the length of a collection name must be less than 255 characters: "
                    f"invalid parameter")
    error = {ct.err_code: 1100, ct.err_msg: too_long_msg}
    self.load_collection(client, collection_name,
                         check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_load_collection_without_index(self):
    """
    target: test loading a collection that has no index
    method: create a collection, release it, drop the "vector" index, then load it
    expected: load fails with an 'index not found' error
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    self.create_collection(client, collection_name, default_dim)
    # Release before dropping the index, then attempt the load that must fail
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    missing_index_msg = f"index not found[collection={collection_name}]"
    error = {ct.err_code: 700, ct.err_msg: missing_index_msg}
    self.load_collection(client, collection_name,
                         check_task=CheckTasks.err_res, check_items=error)
    # Cleanup only when the collection is still there
    still_exists = self.has_collection(client, collection_name)[0]
    if still_exists:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("partition_names", [[], None])
def test_milvus_client_load_partition_names_empty(self, partition_names):
    """
    target: test load partitions with an empty list or None as partition names
    method: 1. create collection and a custom partition
            2. insert half of the rows into "_default" and half into the custom partition
            3. flush and create an HNSW index
            4. call load_partitions with the parametrized partition_names
    expected: raise exception indicating no partition specified
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name = cf.gen_unique_str("partition")
    # 1. Collection + partition, then release and drop the "vector" index
    self.create_collection(client, collection_name, default_dim)
    self.create_partition(client, collection_name, partition_name)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    # 2. Insert data; one seeded generator feeds both batches in order
    rng = np.random.default_rng(seed=19530)
    half = default_nb // 2

    def build_rows(first_pk):
        """Build `half` rows whose primary keys start at first_pk."""
        batch = []
        for pk in range(first_pk, first_pk + half):
            batch.append({
                default_primary_key_field_name: pk,
                default_vector_field_name: list(rng.random((1, default_dim))[0]),
                default_float_field_name: pk * 1.0,
            })
        return batch

    self.insert(client, collection_name, build_rows(0), partition_name="_default")
    self.insert(client, collection_name, build_rows(half), partition_name=partition_name)
    # 3. Flush, then index the vector field
    self.flush(client, collection_name)
    hnsw_params = self.prepare_index_params(client)[0]
    hnsw_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, hnsw_params)
    # 4. Loading without any partition name must be rejected
    error = {ct.err_code: 0, ct.err_msg: "due to no partition specified"}
    self.load_partitions(client, collection_name, partition_names=partition_names,
                         check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("invalid_num_replica", [0.2, "not-int"])
def test_milvus_client_load_replica_non_number(self, invalid_num_replica):
    """
    target: test load collection with a replica_number that is not an integer
    method: prepare an indexed collection with data, then load with a float / string replica_number
    expected: raise exception stating the replica_number value is illegal
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. Create collection, release it and drop the "vector" index
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    # 2. Insert default_nb rows built from a seeded generator
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    self.insert(client, collection_name, rows)
    # Flush, then check that every row is counted
    self.flush(client, collection_name)
    collection_stats = self.get_collection_stats(client, collection_name)[0]
    assert collection_stats['row_count'] == default_nb
    # 3. Create index
    hnsw_params = self.prepare_index_params(client)[0]
    hnsw_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, hnsw_params)
    # 4. Load with the invalid replica_number and expect an error
    illegal_msg = f"`replica_number` value {invalid_num_replica} is illegal"
    error = {ct.err_code: 999, ct.err_msg: illegal_msg}
    self.load_collection(client, collection_name, replica_number=invalid_num_replica,
                         check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("replicas", [None, -1, 0])
def test_milvus_client_load_replica_invalid_input(self, replicas):
    """
    target: test load collection with replica_number set to None, -1 or 0
    method: prepare an indexed collection with data, then load with the parametrized replica_number
    expected: load succeeds and the collection reports the Loaded state
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. Create collection, release it and drop the "vector" index
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    # 2. Insert default_nb rows built from a seeded generator
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    self.insert(client, collection_name, rows)
    # Flush, then check that every row is counted
    self.flush(client, collection_name)
    collection_stats = self.get_collection_stats(client, collection_name)[0]
    assert collection_stats['row_count'] == default_nb
    # 3. Create index
    hnsw_params = self.prepare_index_params(client)[0]
    hnsw_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, hnsw_params)
    # 4. Load with the parametrized replica_number; no error is expected
    self.load_collection(client, collection_name, replica_number=replicas)
    # 5. Verify the collection ended up loaded
    state = self.get_load_state(client, collection_name)[0]["state"]
    assert state == LoadState.Loaded, f"Expected Loaded after loading collection, but got {state}"
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_replica_greater_than_querynodes(self):
    """
    target: test load with more replicas than the deployment can serve
    method: prepare an indexed collection with data, then load with replica_number=3
    expected: raise exception reporting insufficient service resource
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. Create collection, release it and drop the "vector" index
    self.create_collection(client, collection_name, default_dim)
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")
    # 2. Insert default_nb rows built from a seeded generator
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    self.insert(client, collection_name, rows)
    # 3. Flush, then check that every row is counted
    self.flush(client, collection_name)
    collection_stats = self.get_collection_stats(client, collection_name)[0]
    assert collection_stats['row_count'] == default_nb
    # 4. Create index
    hnsw_params = self.prepare_index_params(client)[0]
    hnsw_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, hnsw_params)
    # 5. Ask for 3 replicas; the expected message reports a single streaming node
    insufficient_msg = ("call query coordinator LoadCollection: when load 3 replica count: "
                        "service resource insufficient[currentStreamingNode=1][expectedStreamingNode=3]")
    error = {ct.err_code: 999, ct.err_msg: insufficient_msg}
    self.load_collection(client, collection_name, replica_number=3,
                         check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_create_collection_without_connection(self):
    """
    target: test create collection with a client whose connection was closed
    method: open a client under the alias "client_temp", close it, then create a collection
    expected: raise exception asking to create a connection first
    """
    closed_client = self._client(alias="client_temp")
    collection_name = cf.gen_collection_name_by_testcase_name()
    # Close the client before any collection operation is attempted
    self.close(closed_client)
    expected_error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
    self.create_collection(closed_client, collection_name, default_dim,
                           check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_disconnect(self):
    """
    target: test load collection after the client connection is closed
    method: 1. create collection with a client opened under the alias "client_temp"
            2. close that client
            3. load the collection through the closed client
    expected: raise exception asking to create a connection first
    """
    closed_client = self._client(alias="client_temp")
    collection_name = cf.gen_collection_name_by_testcase_name()
    # The collection is created while the client is still open
    self.create_collection(closed_client, collection_name, default_dim)
    self.close(closed_client)
    expected_error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
    self.load_collection(closed_client, collection_name,
                         check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_release_collection_after_disconnect(self):
    """
    target: test release collection after the client connection is closed
    method: 1. create collection with a client opened under the alias "client_temp2"
            2. close that client
            3. release the collection through the closed client
    expected: raise exception asking to create a connection first
    """
    closed_client = self._client(alias="client_temp2")
    collection_name = cf.gen_collection_name_by_testcase_name()
    # The collection is created while the client is still open
    self.create_collection(closed_client, collection_name, default_dim)
    self.close(closed_client)
    expected_error = {ct.err_code: 999, ct.err_msg: 'should create connection first'}
    self.release_collection(closed_client, collection_name,
                            check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
|
|
class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
|
|
""" Test case of load collection interface """
|
|
|
|
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    """Yield each parametrized auto_id flag (False, then True)."""
    flag = request.param
    yield flag
|
|
|
|
@pytest.fixture(scope="function", params=["COSINE", "L2", "IP"])
def metric_type(self, request):
    """Yield each parametrized metric type name ("COSINE", "L2", "IP")."""
    metric = request.param
    yield metric
|
|
|
|
@pytest.fixture(scope="function", params=["int", "string"])
def id_type(self, request):
    """Yield each parametrized id type label ("int", "string")."""
    kind = request.param
    yield kind
|
|
|
|
"""
|
|
******************************************************************
|
|
# The following are valid base cases
|
|
******************************************************************
|
|
"""
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_loaded_collection(self):
    """
    target: test calling load on a collection right after creating it
    method: create collection, then call load_collection on it
    expected: load_collection completes without error
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    self.create_collection(client, collection_name, default_dim)
    self.load_collection(client, collection_name)
    # Cleanup only when the collection is still there
    still_exists = self.has_collection(client, collection_name)[0]
    if still_exists:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_partition_after_release_collection(self):
    """
    target: test alternating partition loads and collection loads around a release
    method: 1. create collection and partition
            2. release collection, then load one partition
            3. load the entire collection
            4. release again, load both partitions, then load the collection again
    expected: the load state is NotLoad after each release and Loaded after each load
    """
    client = self._client()
    collection_name = cf.gen_unique_str(prefix)
    partition_name = cf.gen_unique_str("partition")

    def current_state():
        """Fetch the collection's current load state value."""
        return self.get_load_state(client, collection_name)[0]["state"]

    # Step 1: Create collection and partition
    self.create_collection(client, collection_name, default_dim)
    self.create_partition(client, collection_name, partition_name)
    # Step 2: Release collection -> NotLoad
    self.release_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.NotLoad, f"Expected NotLoad after release, but got {state}"
    # Step 3: Load a single partition -> Loaded
    self.load_partitions(client, collection_name, [partition_name])
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading partition, but got {state}"
    # Step 4: Load the entire collection -> still Loaded
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading collection, but got {state}"
    # Step 5: Release collection -> NotLoad
    self.release_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.NotLoad, f"Expected NotLoad after release, but got {state}"
    # Step 6: Load both partitions -> Loaded
    self.load_partitions(client, collection_name, ["_default", partition_name])
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading partitions, but got {state}"
    # Step 7: Load collection again -> still Loaded
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after final load collection, but got {state}"
    # Step 8: Cleanup - drop the collection if it still exists
    still_exists = self.has_collection(client, collection_name)[0]
    if still_exists:
        self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_load_partitions_after_load_collection(self):
    """
    target: test load partitions after load collection
    method: 1. create collection with two partitions
            2. load collection
            3. load both partitions
            4. search on one partition
    expected: no exception; the load state is Loaded at every check
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")

    def current_state():
        """Fetch the collection's current load state value."""
        return self.get_load_state(client, collection_name)[0]["state"]

    # Create collection and partitions
    self.create_collection(client, collection_name, default_dim)
    for partition in (partition_name_1, partition_name_2):
        self.create_partition(client, collection_name, partition)
    # The test expects Loaded right after creation, before any explicit load
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading collection, but got {state}"
    # Explicit load keeps the state Loaded
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading collection, but got {state}"
    # Loading partitions on top keeps the state Loaded
    self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after loading partitions, but got {state}"
    # Search on one partition
    query_vectors = np.random.default_rng(seed=19530).random((1, default_dim))
    self.search(client, collection_name, query_vectors,
                limit=default_limit, partition_names=[partition_name_1])
    # Searching does not change the load state
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after search, but got {state}"
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_load_release_comprehensive(self):
    """
    target: comprehensive test for collection load/release operations with search/query validation
    method: 1. load collection -> search/query succeed
            2. load again -> state stays Loaded
            3. release collection -> search/query fail
            4. release again -> state stays NotLoad
            5. load after release -> search succeeds
    expected: search/query behavior follows the collection load/release state
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()

    def current_state():
        """Fetch the collection's current load state value."""
        return self.get_load_state(client, collection_name)[0]["state"]

    # Step 1: Create the collection under test
    self.create_collection(client, collection_name, default_dim)
    # Step 2: Test point 1 - loaded collection can be searched/queried
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded, but got {state}"
    query_vectors = np.random.default_rng(seed=19530).random((1, default_dim))
    self.search(client, collection_name, query_vectors, limit=default_limit)
    self.query(client, collection_name, filter=default_search_exp)
    # Step 3: Test point 2 - loaded collection can be loaded again
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after repeated load, but got {state}"
    # Step 4: Test point 3 - released collection cannot be searched/queried
    self.release_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.NotLoad, f"Expected NotLoad, but got {state}"
    error_search = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
    self.search(client, collection_name, query_vectors, limit=default_limit,
                check_task=CheckTasks.err_res, check_items=error_search)
    error_query = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
    self.query(client, collection_name, filter=default_search_exp,
               check_task=CheckTasks.err_res, check_items=error_query)
    # Step 5: Test point 4 - released collection can be released again
    self.release_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.NotLoad, f"Expected NotLoad after repeated release, but got {state}"
    # Step 6: Test point 5 - released collection can be loaded again
    self.load_collection(client, collection_name)
    state = current_state()
    assert state == LoadState.Loaded, f"Expected Loaded after reload, but got {state}"
    self.search(client, collection_name, query_vectors, limit=default_limit)
    # Step 7: Cleanup
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_partition_load_release_comprehensive(self):
    """
    target: comprehensive test for partition load/release operations with search/query validation
    method: 1. load two partitions -> search/query succeed
            2. load them again -> search/query still succeed
            3. release one partition -> search/query on it fail, the other still works
            4. release it again -> search on it still fails
            5. load it again -> search on it succeeds
    expected: search/query behavior follows the partition load/release state
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")
    both_partitions = [partition_name_1, partition_name_2]
    # Step 1: Create collection with partitions
    self.create_collection(client, collection_name, default_dim)
    self.create_partition(client, collection_name, partition_name_1)
    self.create_partition(client, collection_name, partition_name_2)
    # Step 2: Test point 1 - loaded partitions can be searched/queried
    self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
    state = self.get_load_state(client, collection_name)[0]["state"]
    assert state == LoadState.Loaded, f"Expected Loaded, but got {state}"
    query_vectors = np.random.default_rng(seed=19530).random((1, default_dim))
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=list(both_partitions))
    self.query(client, collection_name, filter=default_search_exp, partition_names=list(both_partitions))
    # Step 3: Test point 2 - loaded partitions can be loaded again
    self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=list(both_partitions))
    self.query(client, collection_name, filter=default_search_exp, partition_names=list(both_partitions))
    # Step 4: Test point 3 - released partitions cannot be searched/queried
    self.release_partitions(client, collection_name, [partition_name_1])
    error_search = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=[partition_name_1],
                check_task=CheckTasks.err_res, check_items=error_search)
    error_query = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
    self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1],
               check_task=CheckTasks.err_res, check_items=error_query)
    # The partition that was not released must still be searchable
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=[partition_name_2])
    # Step 5: Test point 4 - released partitions can be released again
    self.release_partitions(client, collection_name, [partition_name_1])
    error_search = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=[partition_name_1],
                check_task=CheckTasks.err_res, check_items=error_search)
    # Step 6: Test point 5 - released partitions can be loaded again
    self.load_partitions(client, collection_name, [partition_name_1])
    self.search(client, collection_name, query_vectors, limit=default_limit, partition_names=[partition_name_1])
    # Step 7: Cleanup
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_mixed_collection_partition_operations_comprehensive(self):
    """
    target: comprehensive test for mixed collection/partition load/release operations
    method: 1. release collection, then release the "_default" partition -> NotLoad
            2. load one partition -> Loaded, search/query on it succeed
            3. load collection -> search/query on both custom partitions succeed
            4. release every partition individually -> NotLoad, search fails
            5. release collection -> still NotLoad, search/query fail
    expected: consistent behavior across mixed operations
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")
    # Step 1: Setup collection with partitions
    self.create_collection(client, collection_name, default_dim)
    self.create_partition(client, collection_name, partition_name_1)
    self.create_partition(client, collection_name, partition_name_2)
    vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
    # Step 2: Test Release partition after collection release
    self.release_collection(client, collection_name)
    load_state = self.get_load_state(client, collection_name)[0]
    assert load_state["state"] == LoadState.NotLoad, \
        f"Expected NotLoad after collection release, but got {load_state['state']}"
    self.release_partitions(client, collection_name, ["_default"])
    load_state = self.get_load_state(client, collection_name)[0]
    assert load_state["state"] == LoadState.NotLoad, \
        f"Expected NotLoad after default partition release, but got {load_state['state']}"
    # Step 3: Load specific partitions
    self.load_partitions(client, collection_name, [partition_name_1])
    load_state = self.get_load_state(client, collection_name)[0]
    assert load_state["state"] == LoadState.Loaded, \
        f"Expected Loaded after partition load, but got {load_state['state']}"
    # Search should work on loaded partitions
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                partition_names=[partition_name_1])
    self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1])
    # Step 4: Test load collection after partition load
    self.load_collection(client, collection_name)
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                partition_names=[partition_name_1, partition_name_2])
    self.query(client, collection_name, filter=default_search_exp,
               partition_names=[partition_name_1, partition_name_2])
    # Step 5: Test edge case - release all partitions individually
    self.release_partitions(client, collection_name, ["_default", partition_name_1, partition_name_2])
    load_state = self.get_load_state(client, collection_name)[0]
    assert load_state["state"] == LoadState.NotLoad, \
        f"Expected NotLoad after releasing all partitions, but got {load_state['state']}"
    error_search = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                check_task=CheckTasks.err_res, check_items=error_search)
    # Step 6: Test release collection after partition release
    self.release_collection(client, collection_name)
    # Re-fetch the state: reusing the Step 5 value would make this assertion
    # pass regardless of what release_collection did.
    load_state = self.get_load_state(client, collection_name)[0]
    assert load_state["state"] == LoadState.NotLoad, \
        f"Expected NotLoad after collection release, but got {load_state['state']}"
    error = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
    self.search(client, collection_name, vectors_to_search, limit=default_limit,
                check_task=CheckTasks.err_res, check_items=error)
    self.query(client, collection_name, filter=default_search_exp,
               check_task=CheckTasks.err_res, check_items=error)
    # Step 7: Cleanup
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop_partition_and_release_another(self):
    """
    target: test load collection after dropping one partition and releasing another
    method: 1. load collection
            2. release and drop the first partition
            3. release the remaining custom partition
            4. query on the released partition (expects "partition not loaded")
            5. load collection
    expected: the query is rejected; the final load raises no exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")
    self.create_collection(client, collection_name, default_dim)
    for partition in (partition_name_1, partition_name_2):
        self.create_partition(client, collection_name, partition)

    self.load_collection(client, collection_name)
    # The first partition is released before being dropped
    self.release_partitions(client, collection_name, [partition_name_1])
    self.drop_partition(client, collection_name, partition_name_1)
    self.release_partitions(client, collection_name, [partition_name_2])
    not_loaded = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
    self.query(client, collection_name, filter=default_search_exp,
               partition_names=[partition_name_2],
               check_task=CheckTasks.err_res, check_items=not_loaded)

    self.load_collection(client, collection_name)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_partition_after_drop_partition_and_release_another(self):
    """
    target: test load partition after dropping one partition and releasing another
    method: 1. load collection
            2. release and drop the first partition
            3. release the remaining custom partition
            4. load that partition again
            5. query on the partition
    expected: No exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")
    self.create_collection(client, collection_name, default_dim)
    for partition in (partition_name_1, partition_name_2):
        self.create_partition(client, collection_name, partition)

    self.load_collection(client, collection_name)
    # The first partition is released before being dropped
    self.release_partitions(client, collection_name, [partition_name_1])
    self.drop_partition(client, collection_name, partition_name_1)
    # Release and then reload the surviving partition before querying it
    self.release_partitions(client, collection_name, [partition_name_2])
    self.load_partitions(client, collection_name, [partition_name_2])
    self.query(client, collection_name, filter=default_search_exp,
               partition_names=[partition_name_2])
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_another_partition_after_drop_one_partition(self):
    """
    target: test load another partition after dropping a partition
    method: 1. load collection
            2. release and drop the first partition
            3. load the other partition
            4. query on that partition
    expected: No exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name_1 = cf.gen_unique_str("partition1")
    partition_name_2 = cf.gen_unique_str("partition2")
    self.create_collection(client, collection_name, default_dim)
    for partition in (partition_name_1, partition_name_2):
        self.create_partition(client, collection_name, partition)

    self.load_collection(client, collection_name)
    # The first partition is released before being dropped
    self.release_partitions(client, collection_name, [partition_name_1])
    self.drop_partition(client, collection_name, partition_name_1)
    # The surviving partition is loaded explicitly and then queried
    self.load_partitions(client, collection_name, [partition_name_2])
    self.query(client, collection_name, filter=default_search_exp,
               partition_names=[partition_name_2])
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop_one_partition(self):
    """
    target: verify the whole collection can be loaded again after one of its
            partitions was dropped
    method: 1. create a collection with two partitions and load the collection
            2. release and drop the first partition
            3. load the collection again
            4. query restricted to the second partition
    expected: no step raises
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    dropped_partition = cf.gen_unique_str("partition1")
    surviving_partition = cf.gen_unique_str("partition2")

    self.create_collection(client, collection_name, default_dim)
    for partition in (dropped_partition, surviving_partition):
        self.create_partition(client, collection_name, partition)

    self.load_collection(client, collection_name)
    # A partition is released before it is dropped
    self.release_partitions(client, collection_name, [dropped_partition])
    self.drop_partition(client, collection_name, dropped_partition)
    self.load_collection(client, collection_name)

    # Only the second partition still exists, so the query targets it
    self.query(
        client,
        collection_name,
        filter=default_search_exp,
        partition_names=[surviving_partition],
    )

    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("vector_type", [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR])
def test_milvus_client_load_collection_after_index(self, vector_type):
    """
    target: verify a collection can be loaded once its vector field is indexed
    method: create a collection from an explicit schema, insert and flush
            rows, create an index that matches the vector type, then load
            and release the collection
    expected: no step raises
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()

    # (vector field name, index type, metric type) per parametrized vector type
    vector_specs = {
        DataType.FLOAT_VECTOR: ("vector", "IVF_SQ8", "L2"),
        DataType.BINARY_VECTOR: ("binary_vector", "BIN_IVF_FLAT", "JACCARD"),
    }
    spec = vector_specs.get(vector_type)

    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
    if spec is not None:
        schema.add_field(spec[0], vector_type, dim=default_dim)
    self.create_collection(client, collection_name, schema=schema, consistency_level="Strong")
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")

    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.insert(client, collection_name, rows)
    self.flush(client, collection_name)

    index_params = self.prepare_index_params(client)[0]
    if spec is not None:
        field_name, index_type, metric_type = spec
        index_params.add_index(field_name=field_name, index_type=index_type, metric_type=metric_type)
    self.create_index(client, collection_name, index_params)

    self.load_collection(client, collection_name)
    self.release_collection(client, collection_name)
    self.drop_collection(client, collection_name)
|
|
|
|
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_load_collection_after_load_release(self):
    """
    target: verify a collection can be loaded again after a load/release cycle
    method: 1. insert and flush rows, then build an HNSW index
            2. load and release the collection
            3. load the collection once more
    expected: no step raises
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()

    # Set up the collection, then release it and drop the "vector" index
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")

    # Build the rows from a seeded generator
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        rows.append({
            default_primary_key_field_name: i,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: i * 1.0,
            default_string_field_name: str(i),
        })
    self.insert(client, collection_name, rows)

    # Flush, then confirm every inserted row is counted
    self.flush(client, collection_name)
    stats = self.get_collection_stats(client, collection_name)[0]
    assert stats['row_count'] == default_nb

    # Index the vector field
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, index_params)

    # load -> release -> load
    self.load_collection(client, collection_name)
    self.release_collection(client, collection_name)
    self.load_collection(client, collection_name)
    self.drop_collection(client, collection_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_repeatedly(self):
    """
    target: verify loading an already loaded collection is accepted
    method: insert and flush rows, build an HNSW index, then call
            load_collection twice in a row
    expected: no step raises
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()

    # Set up the collection, then release it and drop the "vector" index
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    self.release_collection(client, collection_name)
    self.drop_index(client, collection_name, "vector")

    # Build the rows from a seeded generator
    rng = np.random.default_rng(seed=19530)
    rows = []
    for i in range(default_nb):
        rows.append({
            default_primary_key_field_name: i,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: i * 1.0,
            default_string_field_name: str(i),
        })
    self.insert(client, collection_name, rows)

    # Flush, then confirm every inserted row is counted
    self.flush(client, collection_name)
    stats = self.get_collection_stats(client, collection_name)[0]
    assert stats['row_count'] == default_nb

    # Index the vector field
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
    self.create_index(client, collection_name, index_params)

    # Two consecutive loads are the behavior under test
    self.load_collection(client, collection_name)
    self.load_collection(client, collection_name)
    self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientDescribeCollectionInvalid(TestMilvusClientV2Base):
    """ Negative test cases of the describe collection interface """

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    def test_milvus_client_describe_collection_invalid_collection_name(self, name):
        """
        target: test describe collection with an invalid collection name
        method: call describe_collection with a name that does not start
                with an underscore or letter
        expected: error 1100 reporting the invalid collection name
        """
        client = self._client()
        expected_msg = (f"Invalid collection name: {name}. "
                        f"the first character of a collection name must be an underscore or letter")
        error = {ct.err_code: 1100, ct.err_msg: expected_msg}
        self.describe_collection(client, name, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_describe_collection_not_existed(self):
        """
        target: test describe collection with a name that was never created
        method: call describe_collection on the name "nonexisted"
        expected: error 100 reporting that the collection cannot be found
        """
        client = self._client()
        collection_name = "nonexisted"
        error = {
            ct.err_code: 100,
            ct.err_msg: "can't find collection[database=default][collection=nonexisted]",
        }
        self.describe_collection(client, collection_name, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_describe_collection_deleted_collection(self):
        """
        target: test describe collection on a collection that was dropped
        method: create a collection, drop it, then call describe_collection
        expected: error 100 reporting that the collection cannot be found
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # Create the collection and remove it straight away
        self.create_collection(client, collection_name, default_dim)
        self.drop_collection(client, collection_name)
        error = {
            ct.err_code: 100,
            ct.err_msg: f"can't find collection[database=default][collection={collection_name}]",
        }
        self.describe_collection(client, collection_name, check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
|
|
class TestMilvusClientDescribeCollectionValid(TestMilvusClientV2Base):
    """
    ******************************************************************
      The following cases are used to test `describe_collection` function
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_collection_describe(self):
        """
        target: test describe collection on a collection created from a dimension only
        method: create the collection, describe it, and compare the result
                with the expected structure after removing the id/timestamp keys
        expected: the description equals the expected structure
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(client, collection_name, default_dim, consistency_level="Strong")

        # Description the collection is expected to report
        id_field = {'field_id': 100, 'name': 'id', 'description': '', 'type': DataType.INT64,
                    'params': {}, 'is_primary': True}
        vector_field = {'field_id': 101, 'name': 'vector', 'description': '',
                        'type': DataType.FLOAT_VECTOR, 'params': {'dim': default_dim}}
        expected_description = {
            'collection_name': collection_name,
            'auto_id': False,
            'num_shards': ct.default_shards_num,
            'description': '',
            'fields': [id_field, vector_field],
            'functions': [],
            'aliases': [],
            'consistency_level': 0,
            'properties': {},
            'num_partitions': 1,
            'enable_dynamic_field': True
        }

        res = self.describe_collection(client, collection_name)[0]

        # These keys are type-checked (id and creation time) and then removed
        # so that the remainder can be compared exactly
        assert isinstance(res['collection_id'], int)
        assert isinstance(res['created_timestamp'], int)
        for volatile_key in ('collection_id', 'created_timestamp', 'update_timestamp'):
            del res[volatile_key]

        assert expected_description == res, f"Description mismatch:\nExpected: {expected_description}\nActual: {res}"
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_collection_describe_nullable_default_value(self):
        """
        target: test describe collection reports nullable and default_value settings
        method: create a collection whose schema has one nullable field and one
                field with a default value, then inspect the described fields
        expected: the nullable flag and the default value are reported as configured
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()

        # Schema with a nullable float field and a varchar field that has a default
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("float_field", DataType.FLOAT, nullable=True)
        schema.add_field("varchar_field", DataType.VARCHAR, max_length=65535, default_value="default_string")
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        self.create_collection(client, collection_name, schema=schema)

        res = self.describe_collection(client, collection_name)[0]

        # Inspect only the two fields that carry the settings under test
        for field in res["fields"]:
            field_name = field["name"]
            if field_name == "float_field":
                assert field.get("nullable") is True, f"Expected nullable=True for float_field, got {field.get('nullable')}"
            elif field_name == "varchar_field":
                assert field["default_value"].string_data == "default_string", f"Expected 'default_string', got {field['default_value'].string_data}"
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientHasCollectionValid(TestMilvusClientV2Base):
    """ Test case of has collection interface """

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_has_collection_multithread(self):
        """
        target: test has collection with multi-thread
        method: create a collection, then call has_collection for it from
                several threads that are started 0.2s apart
        expected: every thread sees has_collection return True
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(client, collection_name, default_dim)
        threads_num = 4
        threads = []

        def has():
            result = self.has_collection(client, collection_name)[0]
            # Identity check instead of `== True`: the result must be the bool True
            assert result is True

        for _ in range(threads_num):
            t = MyThread(target=has, args=())
            threads.append(t)
            t.start()
            time.sleep(0.2)

        for t in threads:
            t.join()

        # Cleanup
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
|
|
class TestMilvusClientHasCollectionInvalid(TestMilvusClientV2Base):
    """ Negative test cases of the has collection interface """

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    # Over-length collection name, defined once so that the parametrize list
    # and the branch inside the test can never drift apart.
    _over_length_name = "a".join("a" for _ in range(256))

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#", _over_length_name])
    def test_milvus_client_has_collection_invalid_collection_name(self, name):
        """
        target: test has collection with an invalid collection name
        method: call has_collection with a name that has an illegal first
                character, or with the over-length name
        expected: error 1100 reporting the invalid collection name
        """
        client = self._client()
        if name == self._over_length_name:
            error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. "
                                                    f"the length of a collection name must be less than 255 characters: "
                                                    f"invalid parameter"}
        else:
            error = {ct.err_code: 1100,
                     ct.err_msg: f"Invalid collection name: {name}. "
                                 f"the first character of a collection name must be an underscore or letter"}
        self.has_collection(client, name,
                            check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("collection_name", ['', None])
    def test_milvus_client_has_collection_with_empty_or_none_collection_name(self, collection_name):
        """
        target: test has collection with empty or None collection name
        method: call has_collection with empty string or None as collection name
        expected: raise exception with appropriate error message
        """
        client = self._client()
        if collection_name is None:
            error = {ct.err_code: -1, ct.err_msg: '`collection_name` value None is illegal'}
        else:  # empty string
            error = {ct.err_code: -1, ct.err_msg: '`collection_name` value is illegal'}
        self.has_collection(client, collection_name,
                            check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_has_collection_not_existed(self):
        """
        target: test has collection with a name that was never created
        method: call has_collection on the name "nonexisted"
        expected: has_collection returns False
        """
        client = self._client()
        collection_name = "nonexisted"
        result = self.has_collection(client, collection_name)[0]
        assert result is False

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_has_collection_deleted_collection(self):
        """
        target: test has collection on a collection that was dropped
        method: create a collection, drop it, then call has_collection
        expected: has_collection returns False
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # Create the collection and remove it straight away
        self.create_collection(client, collection_name, default_dim)
        self.drop_collection(client, collection_name)
        result = self.has_collection(client, collection_name)[0]
        assert result is False

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_has_collection_after_disconnect(self):
        """
        target: test has collection operation after connection is closed
        method: 1. create collection with client
                2. close the client connection
                3. try to has_collection with disconnected client
        expected: operation should raise appropriate connection error
        """
        client_temp = self._client(alias="client_has_collection")
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(client_temp, collection_name, default_dim)
        self.close(client_temp)
        error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
        self.has_collection(client_temp, collection_name,
                            check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
|
|
class TestMilvusClientListCollection(TestMilvusClientV2Base):
    """ Test case of list collection interface """

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L0)
    def test_milvus_client_list_collections_multi_collections(self):
        """
        target: test list collections while many collections exist
        method: create 50 collections one by one and check, right after each
                creation, that the new name is returned by list_collections
        expected: every created collection is listed
        """
        client = self._client()
        collection_num = 50
        collection_names = []
        for i in range(collection_num):
            name = cf.gen_collection_name_by_testcase_name() + f"_{i}"
            collection_names.append(name)
            self.create_collection(client, name, default_dim)
            # The listing is re-fetched after every creation
            listed = self.list_collections(client)[0]
            assert name in listed
        # Remove everything this test created
        for name in collection_names:
            self.drop_collection(client, name)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_list_collections_after_disconnect(self):
        """
        target: test list collections after the client connection is closed
        method: 1. create a collection through a client with its own alias
                2. close that client
                3. call list_collections through the closed client
        expected: error reporting that a connection must be created first
        """
        client_temp = self._client(alias="client_list_collections")
        collection_name = cf.gen_collection_name_by_testcase_name()
        self.create_collection(client_temp, collection_name, default_dim)
        self.close(client_temp)
        error = {ct.err_code: 999, ct.err_msg: 'should create connection first'}
        self.list_collections(client_temp, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_list_collections_multithread(self):
        """
        target: test list collections from several threads
        method: create a collection, then call list_collections from ten
                threads that are started 0.2s apart
        expected: every thread finds the collection in the returned list
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        # The collection must exist before any thread lists
        self.create_collection(client, collection_name, default_dim)
        threads_num = 10
        workers = []

        def _list():
            collections_list = self.list_collections(client)[0]
            assert collection_name in collections_list

        for _ in range(threads_num):
            worker = MyThread(target=_list)
            workers.append(worker)
            worker.start()
            time.sleep(0.2)
        for worker in workers:
            worker.join()

        # Cleanup
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
|
|
class TestMilvusClientRenameCollectionInValid(TestMilvusClientV2Base):
    """ Negative test cases of the rename collection interface """

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    def test_milvus_client_rename_collection_invalid_collection_name(self, name):
        """
        target: test rename collection with an invalid source collection name
        method: call rename_collection with an illegal old name
        expected: error 100 reporting that the collection is not found
        """
        client = self._client()
        expected_msg = f"collection not found[database=1][collection={name}]"
        error = {ct.err_code: 100, ct.err_msg: expected_msg}
        self.rename_collection(client, name, "new_collection",
                               check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_rename_collection_not_existed_collection(self):
        """
        target: test rename collection with a source name that was never created
        method: call rename_collection on the name "nonexisted"
        expected: error 100 reporting that the collection is not found
        """
        client = self._client()
        collection_name = "nonexisted"
        expected_msg = f"collection not found[database=1][collection={collection_name}]"
        error = {ct.err_code: 100, ct.err_msg: expected_msg}
        self.rename_collection(client, collection_name, "new_collection",
                               check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_rename_collection_duplicated_collection(self):
        """
        target: test rename collection to a name that is already in use
        method: create a collection and rename it to its own current name
        expected: error 65535 reporting the duplicated new collection name
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # The collection to rename
        self.create_collection(client, collection_name, default_dim)
        expected_msg = (f"duplicated new collection name default:{collection_name} "
                        f"with other collection name or alias")
        error = {ct.err_code: 65535, ct.err_msg: expected_msg}
        self.rename_collection(client, collection_name, collection_name,
                               check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_rename_deleted_collection(self):
        """
        target: test rename collection on a collection that was dropped
        method: create a collection, drop it, then call rename_collection on it
        expected: error 100 reporting that the collection is not found
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # Create the collection and remove it straight away
        self.create_collection(client, collection_name, default_dim)
        self.drop_collection(client, collection_name)
        expected_msg = f"{collection_name}: collection not found[collection=default]"
        error = {ct.err_code: 100, ct.err_msg: expected_msg}
        self.rename_collection(client, collection_name, "new_collection",
                               check_task=CheckTasks.err_res, check_items=error)
|
|
|
|
|
|
class TestMilvusClientRenameCollectionValid(TestMilvusClientV2Base):
    """ Test case of rename collection interface """

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_rename_collection_multiple_times(self):
        """
        target: test renaming a collection back and forth several times
        method: create a collection, then three times rename it to a second
                name and back to the original name
        expected: every rename succeeds
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # A unique target name: a fixed literal would collide with a leftover
        # of an interrupted earlier run or with a concurrently running worker.
        new_name = cf.gen_unique_str("new_name_rename")
        self.create_collection(client, collection_name, default_dim)
        times = 3
        for _ in range(times):
            self.rename_collection(client, collection_name, new_name)
            self.rename_collection(client, new_name, collection_name)
        # After the loop the collection carries its original name again
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_rename_collection_deleted_collection(self):
        """
        target: test renaming a collection to the name of a dropped collection
        method: create two collections, drop the second one, then rename the
                first one to the name the dropped collection had
        expected: the rename succeeds
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        another_collection_name = cf.gen_unique_str("another_collection")
        # 1. create 2 collections
        self.create_collection(client, collection_name, default_dim)
        self.create_collection(client, another_collection_name, default_dim)
        # 2. drop one collection
        self.drop_collection(client, another_collection_name)
        # 3. rename to dropped collection
        self.rename_collection(client, collection_name, another_collection_name)
        # 4. cleanup: the surviving collection now lives under the new name
        self.drop_collection(client, another_collection_name)
|
|
|
|
|
|
class TestMilvusClientUsingDatabaseInvalid(TestMilvusClientV2Base):
    """ Negative test cases of the using database interface """

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip(reason="pymilvus issue 1900")
    @pytest.mark.parametrize("db_name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
    def test_milvus_client_using_database_not_exist_db_name(self, db_name):
        """
        target: test using database with a database name that does not exist
        method: call using_database with each parametrized name
        expected: error 999 reporting that the database is not found
        """
        client = self._client()
        expected_msg = f"database not found[database={db_name}]"
        error = {ct.err_code: 999, ct.err_msg: expected_msg}
        self.using_database(client, db_name, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.skip(reason="# this case is dup to using a non exist db name, try to add one for create database")
    def test_milvus_client_using_database_db_name_over_max_length(self):
        """
        target: placeholder for using database with an over-length database name
        method: none, the case is skipped and has no body
        expected: none
        """
        pass
|
|
|
|
class TestMilvusClientCollectionPropertiesInvalid(TestMilvusClientV2Base):
    """ Test case of alter/drop collection properties """

    """
    ******************************************************************
    #  The following are invalid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("alter_name", ["%$#", "test", " "])
    def test_milvus_client_alter_collection_properties_invalid_collection_name(self, alter_name):
        """
        target: test alter collection properties with invalid collection name
        method: alter collection properties with non-existent collection name
        expected: raise exception
        """
        client = self._client()
        # alter collection properties
        properties = {'mmap.enabled': True}
        error = {ct.err_code: 100, ct.err_msg: f"collection not found[database=default][collection={alter_name}]"}
        self.alter_collection_properties(client, alter_name, properties,
                                         check_task=CheckTasks.err_res,
                                         check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("properties", [""])
    def test_milvus_client_alter_collection_properties_invalid_properties(self, properties):
        """
        target: test alter collection properties with invalid properties
        method: alter collection properties with invalid properties
        expected: raise exception
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        self.create_collection(client, collection_name, default_dim, id_type="string", max_length=ct.default_length)
        self.describe_collection(client, collection_name,
                                 check_task=CheckTasks.check_describe_collection_property,
                                 check_items={"collection_name": collection_name,
                                              "dim": default_dim,
                                              "consistency_level": 0})
        # 2. alter with the invalid properties value
        error = {ct.err_code: 1, ct.err_msg: f"`properties` value {properties} is illegal"}
        self.alter_collection_properties(client, collection_name, properties,
                                         check_task=CheckTasks.err_res,
                                         check_items=error)

        self.drop_collection(client, collection_name)

    # TODO properties with non-existent params

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("drop_name", ["%$#", "test", " "])
    def test_milvus_client_drop_collection_properties_invalid_collection_name(self, drop_name):
        """
        target: test drop collection properties with invalid collection name
        method: drop collection properties with non-existent collection name
        expected: raise exception
        """
        client = self._client()
        # drop collection properties: pass the keys to delete as a list of
        # names rather than a {key: value} mapping
        property_keys = ["mmap.enabled"]
        error = {ct.err_code: 100, ct.err_msg: f"collection not found[database=default][collection={drop_name}]"}
        self.drop_collection_properties(client, drop_name, property_keys,
                                        check_task=CheckTasks.err_res,
                                        check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("property_keys", ["", {}, []])
    def test_milvus_client_drop_collection_properties_invalid_properties(self, property_keys):
        """
        target: test drop collection properties with invalid properties
        method: drop collection properties with an empty string, dict or list
                as the keys to delete
        expected: raise exception
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        # 1. create collection
        self.create_collection(client, collection_name, default_dim, id_type="string", max_length=ct.default_length)
        self.describe_collection(client, collection_name,
                                 check_task=CheckTasks.check_describe_collection_property,
                                 check_items={"collection_name": collection_name,
                                              "dim": default_dim,
                                              "consistency_level": 0})
        # 2. drop with the empty keys value
        error = {ct.err_code: 65535,
                 ct.err_msg: "The collection properties to alter and keys to delete must not be empty at the same time"}
        self.drop_collection_properties(client, collection_name, property_keys,
                                        check_task=CheckTasks.err_res,
                                        check_items=error)

        self.drop_collection(client, collection_name)

    # TODO properties with non-existent params
|
|
|
|
|
|
class TestMilvusClientCollectionPropertiesValid(TestMilvusClientV2Base):
    """ Positive test cases of alter/drop collection properties """

    """
    ******************************************************************
    #  The following are valid base cases
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_collection_alter_collection_properties(self):
        """
        target: test alter collection properties
        method: create and release a collection, set "mmap.enabled" to True
                and then to False, describing the collection after each change
        expected: the described properties report 'True' and then 'False'
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        self.using_database(client, "default")
        # Create the collection and make sure it is listed
        self.create_collection(client, collection_name, default_dim)
        listed = self.list_collections(client)[0]
        assert collection_name in listed

        # Switch mmap on
        self.release_collection(client, collection_name)
        self.alter_collection_properties(client, collection_name, {"mmap.enabled": True})
        described = self.describe_collection(client, collection_name)[0]
        current = described.get("properties")
        assert current["mmap.enabled"] == 'True'

        # Switch mmap off again
        self.release_collection(client, collection_name)
        self.alter_collection_properties(client, collection_name, {"mmap.enabled": False})
        described = self.describe_collection(client, collection_name)[0]
        current = described.get("properties")
        assert current["mmap.enabled"] == 'False'
        # TODO add case that confirm the parameter is actually valid
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_collection_drop_collection_properties(self):
        """
        target: test drop collection properties
        method: create and release a collection, set "mmap.enabled" to True,
                then drop that property key and describe the collection
        expected: "mmap.enabled" is no longer among the described properties
        """
        client = self._client()
        collection_name = cf.gen_unique_str(prefix)
        self.using_database(client, "default")
        # Create the collection and make sure it is listed
        self.create_collection(client, collection_name, default_dim)
        listed = self.list_collections(client)[0]
        assert collection_name in listed

        # Set the property that is going to be dropped
        self.release_collection(client, collection_name)
        self.alter_collection_properties(client, collection_name, {"mmap.enabled": True})
        described = self.describe_collection(client, collection_name)[0]
        current = described.get("properties")
        assert current["mmap.enabled"] == 'True'

        # Drop the property by key
        self.drop_collection_properties(client, collection_name, ["mmap.enabled"])
        described = self.describe_collection(client, collection_name)[0]
        current = described.get("properties")
        assert "mmap.enabled" not in current
        # TODO add case that confirm the parameter is actually invalid
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientCollectionNullInvalid(TestMilvusClientV2Base):
    """ Invalid cases for nullable fields when creating a collection """

    # ******************************************************************
    #  The followings are invalid cases
    # ******************************************************************

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("vector_type", ct.all_float_vector_dtypes)
    def test_milvus_client_collection_set_nullable_on_pk_field(self, vector_type):
        """
        target: test create collection with nullable=True on primary key field
        method: build a schema whose INT64 primary key is nullable, add one vector
                field of the parametrized type, then create the collection
        expected: create_collection fails with code 1100 "primary field not support null"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        # primary key is (illegally) marked nullable
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False, nullable=True)
        # sparse vectors are declared without a dim, every other vector type gets default_dim
        vector_kwargs = {} if vector_type == DataType.SPARSE_FLOAT_VECTOR else {"dim": default_dim}
        schema.add_field("vector", vector_type, **vector_kwargs)
        expected_error = {ct.err_code: 1100, ct.err_msg: "primary field not support null"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("vector_type", ct.all_float_vector_dtypes)
    def test_milvus_client_collection_set_nullable_on_vector_field(self, vector_type):
        """
        target: test create collection with nullable=True on vector field
        method: build a schema with a regular INT64 primary key and a nullable vector
                field of the parametrized type, then create the collection
        expected: create_collection fails with code 1100 "vector type not support null"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        # vector field is (illegally) marked nullable; sparse vectors are declared without a dim
        if vector_type == DataType.SPARSE_FLOAT_VECTOR:
            vector_kwargs = {"nullable": True}
        else:
            vector_kwargs = {"dim": default_dim, "nullable": True}
        schema.add_field("vector", vector_type, **vector_kwargs)
        expected_error = {ct.err_code: 1100, ct.err_msg: "vector type not support null"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_collection_set_nullable_on_partition_key_field(self):
        """
        target: test create collection with nullable=True on partition key field
        method: build a schema whose VARCHAR partition key field is nullable,
                then create the collection
        expected: create_collection fails with code 1100
                  "partition key field not support nullable: invalid parameter"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        # partition key is (illegally) marked nullable
        schema.add_field("partition_key", DataType.VARCHAR, max_length=64,
                         is_partition_key=True, nullable=True)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        expected_error = {ct.err_code: 1100,
                          ct.err_msg: "partition key field not support nullable: invalid parameter"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
|
|
class TestMilvusClientCollectionDefaultValueInvalid(TestMilvusClientV2Base):
    """ Invalid cases for field default values when creating a collection """

    # ******************************************************************
    #  The followings are invalid cases
    # ******************************************************************

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("vector_type", ct.all_float_vector_dtypes)
    def test_milvus_client_create_collection_default_value_on_pk_field(self, vector_type):
        """
        target: test create collection with set default value on pk field
        method: build a schema whose INT64 primary key has default_value=10, add one
                vector field of the parametrized type, then create the collection
        expected: create_collection fails with code 1100 "primary field not support default_value"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        # primary key (illegally) carries a default value
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False, default_value=10)
        # sparse vectors are declared without a dim, every other vector type gets default_dim
        vector_kwargs = {} if vector_type == DataType.SPARSE_FLOAT_VECTOR else {"dim": default_dim}
        schema.add_field("vector", vector_type, **vector_kwargs)
        expected_error = {ct.err_code: 1100, ct.err_msg: "primary field not support default_value"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("vector_type", ct.all_float_vector_dtypes)
    def test_milvus_client_create_collection_default_value_on_vector_field(self, vector_type):
        """
        target: test create collection with set default value on vector field
        method: build a schema whose vector field of the parametrized type has
                default_value=10, then create the collection
        expected: create_collection fails with code 1100 "type not support default_value"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        # vector field (illegally) carries a default value; sparse vectors have no dim
        if vector_type == DataType.SPARSE_FLOAT_VECTOR:
            vector_kwargs = {"default_value": 10}
        else:
            vector_kwargs = {"dim": default_dim, "default_value": 10}
        schema.add_field("vector", vector_type, **vector_kwargs)
        expected_error = {ct.err_code: 1100, ct.err_msg: "type not support default_value"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("scalar_type", ["JSON", "Array"])
    def test_milvus_client_create_collection_default_value_on_not_support_scalar_field(self, scalar_type):
        """
        target: test create collection with set default value on not supported scalar field
        method: build a schema with a JSON or ARRAY field that has default_value=10,
                then create the collection
        expected: create_collection fails with code 1100
                  "type not support default_value, type:<scalar_type>"
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        # field name, data type and type-specific kwargs for each parametrized scalar type
        unsupported_fields = {
            "JSON": ("json_field", DataType.JSON, {}),
            "Array": ("array_field", DataType.ARRAY,
                      {"element_type": DataType.INT64, "max_capacity": ct.default_max_capacity}),
        }
        field_spec = unsupported_fields.get(scalar_type)
        if field_spec is not None:
            scalar_name, scalar_dtype, scalar_kwargs = field_spec
            schema.add_field(scalar_name, scalar_dtype, **scalar_kwargs, default_value=10)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)

        expected_error = {ct.err_code: 1100,
                          ct.err_msg: f"type not support default_value, type:{scalar_type}"}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("default_value", ["abc", 9.09, 1, False])
    @pytest.mark.parametrize("field_type", [DataType.INT8, DataType.FLOAT])
    def test_milvus_client_create_collection_non_match_default_value(self, default_value, field_type):
        """
        target: test create collection with set data type not matched default value
        method: add an INT8 or FLOAT field whose default_value is a Python str, float,
                int or bool, then create the collection
        expected: create_collection fails with code 1100 and a message naming the field
                  type, the field name and the type derived from the default value
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        # field name and the type label used in the expected message, per field type
        field_specs = {
            DataType.INT8: ("int8_field", "Int8"),
            DataType.FLOAT: ("float_field", "Float"),
        }
        if field_type in field_specs:
            field_name, field_type_str = field_specs[field_type]
            schema.add_field(field_name, field_type, default_value=default_value)
        # type label expected in the message for each Python type of default_value;
        # bool is listed before int because bool is a subclass of int
        default_value_type_names = (
            (str, "DataType_VarChar"),
            (bool, "DataType_Bool"),
            (float, "DataType_Double"),
            (int, "DataType_Int64"),
        )
        for python_type, type_name in default_value_type_names:
            if isinstance(default_value, python_type):
                expected_type = type_name
                break
        expected_error = {
            ct.err_code: 1100,
            ct.err_msg: f"type ({field_type_str}) of field ({field_name}) is not equal to the type({expected_type}) of default_value",
        }
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("nullable", [True, False])
    def test_milvus_client_create_collection_default_value_none(self, nullable):
        """
        target: test create field with None as default value when nullable is False or True
        method: add an INT8 field with default_value=None and the parametrized nullable flag
        expected: 1. adding the field raises an exception when nullable=False
                  2. the collection is created successfully when nullable=True
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)

        if not nullable:
            # the failure is expected from add_field itself, so no collection is created here
            expected_error = {
                ct.err_code: 1,
                ct.err_msg: "Default value cannot be None for a field that is defined as nullable == false",
            }
            self.add_field(schema, "int8_field", DataType.INT8, nullable=nullable, default_value=None,
                           check_task=CheckTasks.err_res, check_items=expected_error)
        else:
            schema.add_field("int8_field", DataType.INT8, nullable=nullable, default_value=None)
            self.create_collection(client, collection_name, schema=schema)
        # drop is issued in both branches, as in the original flow
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("default_value", ["abc"])
    def test_milvus_client_create_collection_with_invalid_default_value_string(self, default_value):
        """
        target: Test create collection with invalid default_value for string field
        method: Create collection with a VARCHAR field (max_length=2) whose default_value is longer
        expected: create_collection fails with code 1100 and a message reporting both lengths
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        max_length = 2
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("pk", DataType.INT64, is_primary=True)
        schema.add_field(ct.default_float_vec_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
        # default value is longer than the declared max_length
        schema.add_field("string_field", DataType.VARCHAR, max_length=max_length, default_value=default_value)
        expected_message = (
            f"the length ({len(default_value)}) of string exceeds max length ({max_length}): "
            "invalid parameter[expected=valid length string][actual=string length exceeds max length]"
        )
        expected_error = {ct.err_code: 1100, ct.err_msg: expected_message}
        self.create_collection(client, collection_name, schema=schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)
|
|
|
|
|
|
class TestMilvusClientCollectionDefaultValueValid(TestMilvusClientV2Base):
    """ Valid cases for default-value and nullable fields when creating a collection """

    # ******************************************************************
    #  The followings are valid cases
    # ******************************************************************

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_create_collection_default_value_twice(self):
        """
        target: test create collection with set default value twice
        method: create the same collection (same name, same schema with a FLOAT
                default-value field) two times in a row
        expected: both create calls succeed and return equal results
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("float_field", DataType.FLOAT, default_value=numpy.float32(10.0))
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        # same name and schema on both calls
        first_result = self.create_collection(client, collection_name, schema=schema)[0]
        second_result = self.create_collection(client, collection_name, schema=schema)[0]
        assert first_result == second_result
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    def test_milvus_client_create_collection_none_twice(self):
        """
        target: test create collection with nullable field twice
        method: create the same collection (same name, same schema with a nullable
                FLOAT field) two times in a row
        expected: both create calls succeed and return equal results
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("float_field", DataType.FLOAT, nullable=True)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        # same name and schema on both calls
        first_result = self.create_collection(client, collection_name, schema=schema)[0]
        second_result = self.create_collection(client, collection_name, schema=schema)[0]
        assert first_result == second_result
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("auto_id", [True, False])
    def test_milvus_client_create_collection_using_default_value(self, auto_id):
        """
        target: Test create collection with default_value fields
        method: Create a schema with int8/16/32/64, float, double, bool and varchar
                fields that all carry default values, then create and describe the collection
        expected: Collection is created and describe_collection matches the schema
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        schema = self.create_schema(client, enable_dynamic_field=False, auto_id=auto_id)[0]
        schema.add_field("pk", DataType.INT64, is_primary=True)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
        # (field name, data type, add_field kwargs), added in this order
        scalar_fields_with_defaults = [
            (ct.default_int8_field_name, DataType.INT8, {"default_value": numpy.int8(8)}),
            (ct.default_int16_field_name, DataType.INT16, {"default_value": numpy.int16(16)}),
            (ct.default_int32_field_name, DataType.INT32, {"default_value": numpy.int32(32)}),
            (ct.default_int64_field_name, DataType.INT64, {"default_value": numpy.int64(64)}),
            (ct.default_float_field_name, DataType.FLOAT, {"default_value": numpy.float32(3.14)}),
            (ct.default_double_field_name, DataType.DOUBLE, {"default_value": numpy.double(3.1415)}),
            (ct.default_bool_field_name, DataType.BOOL, {"default_value": False}),
            (ct.default_string_field_name, DataType.VARCHAR, {"max_length": 100, "default_value": "abc"}),
        ]
        for scalar_name, scalar_dtype, scalar_kwargs in scalar_fields_with_defaults:
            schema.add_field(scalar_name, scalar_dtype, **scalar_kwargs)
        self.create_collection(client, collection_name, schema=schema)
        expected_properties = {
            "collection_name": collection_name,
            "auto_id": auto_id,
            "enable_dynamic_field": False,
            "schema": schema,
        }
        self.describe_collection(client, collection_name,
                                 check_task=CheckTasks.check_describe_collection_property,
                                 check_items=expected_properties)
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientCollectionCountIP(TestMilvusClientV2Base):
    """
    Test collection count functionality with different entity counts
    params means different nb, the nb value may trigger merge, or not
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("insert_count", [1, 1000, 2001])
    def test_milvus_client_collection_count_after_index_created(self, insert_count):
        """
        target: test the row count reported after an index has been created
        method: create a collection, release it and drop the index on the vector field,
                insert and flush insert_count rows, create an HNSW/L2 index,
                then call get_collection_stats
        expected: stats['row_count'] equals insert_count
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        # Create the collection, then release it and drop the vector field index
        self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
        self.release_collection(client, collection_name)
        self.drop_index(client, collection_name, default_vector_field_name)
        # Generate rows from the described schema and insert them
        described_schema = self.describe_collection(client, collection_name)[0]
        generated_rows = cf.gen_row_data_by_schema(nb=insert_count, schema=described_schema)
        self.insert(client, collection_name, generated_rows)
        self.flush(client, collection_name)
        # Build the index only after the data has been flushed
        hnsw_index_params = self.prepare_index_params(client)[0]
        hnsw_index_params.add_index(field_name=default_vector_field_name, index_type="HNSW", metric_type="L2")
        self.create_index(client, collection_name, hnsw_index_params)
        # Verify entity count
        collection_stats = self.get_collection_stats(client, collection_name)[0]
        assert collection_stats['row_count'] == insert_count
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientCollectionCountBinary(TestMilvusClientV2Base):
    """
    Test collection count functionality with binary vectors
    Params means different nb, the nb value may trigger merge, or not
    """

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("insert_count", [8, 1000, 2001])
    def test_milvus_client_collection_count_after_index_created_binary(self, insert_count):
        """
        target: Test collection count after binary index is created
        method: Create a binary vector collection, insert and flush insert_count rows,
                create a BIN_IVF_FLAT/JACCARD index, then call get_collection_stats
        expected: stats['row_count'] equals insert_count
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        # Schema: INT64 primary key plus one binary vector field
        binary_schema = self.create_schema(client, enable_dynamic_field=False)[0]
        binary_schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
        binary_schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=default_dim)
        self.create_collection(client, collection_name, schema=binary_schema)
        # Generate rows from the schema and insert them
        generated_rows = cf.gen_row_data_by_schema(nb=insert_count, schema=binary_schema)
        self.insert(client, collection_name, generated_rows)
        self.flush(client, collection_name)
        # Build the index only after the data has been flushed
        binary_index_params = self.prepare_index_params(client)[0]
        binary_index_params.add_index(field_name=ct.default_binary_vec_field_name,
                                      index_type="BIN_IVF_FLAT", metric_type="JACCARD")
        self.create_index(client, collection_name, binary_index_params)
        # Verify entity count
        collection_stats = self.get_collection_stats(client, collection_name)[0]
        assert collection_stats['row_count'] == insert_count
        self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("auto_id", [True, False])
    def test_milvus_client_binary_collection_with_min_dim(self, auto_id):
        """
        target: Test binary collection when dim=ct.min_dim (expected to be invalid for binary vectors)
        method: Create collection with a binary vector field having dim=ct.min_dim
        expected: create_collection fails with code 1 and a message stating that the
                  binary vector dimension should be multiple of 8
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        binary_schema = self.create_schema(client, enable_dynamic_field=False, auto_id=auto_id)[0]
        binary_schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
        # the invalid dimension is accepted by add_field; the error is checked on create_collection
        binary_schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=ct.min_dim)
        expected_message = (
            f"invalid dimension: {ct.min_dim} of field {ct.default_binary_vec_field_name}. "
            "binary vector dimension should be multiple of 8."
        )
        expected_error = {ct.err_code: 1, ct.err_msg: expected_message}
        self.create_collection(client, collection_name, schema=binary_schema,
                               check_task=CheckTasks.err_res, check_items=expected_error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_collection_count_no_entities(self):
        """
        target: Test collection count when collection is empty
        method: Create a binary vector collection and query its stats without inserting data
        expected: stats['row_count'] equals 0
        """
        client = self._client()
        collection_name = cf.gen_collection_name_by_testcase_name()
        # Schema: INT64 primary key plus one binary vector field
        binary_schema = self.create_schema(client, enable_dynamic_field=False)[0]
        binary_schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
        binary_schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=default_dim)
        # No rows are inserted after creation
        self.create_collection(client, collection_name, schema=binary_schema)
        collection_stats = self.get_collection_stats(client, collection_name)[0]
        assert collection_stats['row_count'] == 0
        self.drop_collection(client, collection_name)
|
|
|
|
|
|
class TestMilvusClientCollectionMultiCollections(TestMilvusClientV2Base):
    """
    Test collection count functionality with multiple collections
    Params means different nb, the nb value may trigger merge, or not
    """

    def _populate_float_collection(self, client, collection_name, insert_count):
        """
        Create a float vector collection named collection_name with default_dim,
        then insert and flush insert_count rows generated from its described schema.
        """
        self.create_collection(client, collection_name, default_dim)
        schema_info = self.describe_collection(client, collection_name)[0]
        data = cf.gen_row_data_by_schema(nb=insert_count, schema=schema_info)
        self.insert(client, collection_name, data)
        self.flush(client, collection_name)

    def _populate_binary_collection(self, client, collection_name, insert_count):
        """
        Create a binary vector collection named collection_name (INT64 primary key plus
        one BINARY_VECTOR field of default_dim), then insert and flush insert_count rows
        generated from that schema.
        """
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True)
        schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=default_dim)
        self.create_collection(client, collection_name, schema=schema)
        data = cf.gen_row_data_by_schema(nb=insert_count, schema=schema)
        self.insert(client, collection_name, data)
        self.flush(client, collection_name)

    def _check_row_counts_and_drop(self, client, collection_names, insert_count):
        """
        Assert that every collection in collection_names reports insert_count rows,
        then drop all of them. All counts are verified before the first drop.
        """
        for collection_name in collection_names:
            stats = self.get_collection_stats(client, collection_name)[0]
            assert stats['row_count'] == insert_count
        for collection_name in collection_names:
            self.drop_collection(client, collection_name)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("insert_count", [1, 1000, 2001])
    def test_milvus_client_collection_count_multi_collections_l2(self, insert_count):
        """
        target: Test collection rows_count with multiple float vector collections
        method: Create 10 float vector collections, insert and flush insert_count rows
                into each, then verify the row count of every collection
        expected: The count equals insert_count for each collection
        """
        client = self._client()
        collection_list = []
        collection_num = 10
        for i in range(collection_num):
            # NOTE(review): the name helper presumably derives the name from the calling
            # test case, so it is kept in the test body rather than moved into a helper
            collection_name = cf.gen_collection_name_by_testcase_name() + f"_{i}"
            self._populate_float_collection(client, collection_name, insert_count)
            collection_list.append(collection_name)
        self._check_row_counts_and_drop(client, collection_list, insert_count)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("insert_count", [1, 1000, 2001])
    def test_milvus_client_collection_count_multi_collections_binary(self, insert_count):
        """
        target: Test collection rows_count with multiple binary vector collections
        method: Create 20 binary vector collections, insert and flush insert_count rows
                into each, then verify the row count of every collection
        expected: The count equals insert_count for each collection
        """
        client = self._client()
        collection_list = []
        collection_num = 20
        for i in range(collection_num):
            # NOTE(review): name generation stays in the test body (see the l2 test)
            collection_name = cf.gen_collection_name_by_testcase_name() + f"_{i}"
            self._populate_binary_collection(client, collection_name, insert_count)
            collection_list.append(collection_name)
        self._check_row_counts_and_drop(client, collection_list, insert_count)

    @pytest.mark.tags(CaseLabel.L2)
    def test_milvus_client_collection_count_multi_collections_mix(self):
        """
        target: Test collection rows_count with mixed float and binary vector collections
        method: Create 10 float and 10 binary vector collections, insert and flush
                ct.default_nb rows into each, then verify the row count of every collection
        expected: The count equals ct.default_nb for each collection
        """
        client = self._client()
        collection_list = []
        collection_num = 20
        insert_count = ct.default_nb
        # first half: float vector collections, second half: binary vector collections
        half = collection_num // 2
        for i in range(0, half):
            collection_name = cf.gen_collection_name_by_testcase_name() + f"_float_{i}"
            self._populate_float_collection(client, collection_name, insert_count)
            collection_list.append(collection_name)
        # the binary suffix continues the numbering from the float half (half .. collection_num - 1)
        for i in range(half, collection_num):
            collection_name = cf.gen_collection_name_by_testcase_name() + f"_binary_{i}"
            self._populate_binary_collection(client, collection_name, insert_count)
            collection_list.append(collection_name)
        self._check_row_counts_and_drop(client, collection_list, insert_count)
|
|
|
|
|