diff --git a/tests/python_client/base/collection_wrapper.py b/tests/python_client/base/collection_wrapper.py
index 9998b3e80c..9f26dc39a7 100644
--- a/tests/python_client/base/collection_wrapper.py
+++ b/tests/python_client/base/collection_wrapper.py
@@ -310,6 +310,14 @@ class ApiCollectionWrapper:
         check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
         return res, check_result
 
+    @trace()
+    def upsert(self, data, partition_name=None, timeout=None, check_task=None, check_items=None, **kwargs):
+        timeout = TIMEOUT if timeout is None else timeout
+        func_name = sys._getframe().f_code.co_name
+        res, check = api_request([self.collection.upsert, data, partition_name, timeout], **kwargs)
+        check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
+        return res, check_result
+
     @trace()
     def compact(self, timeout=None, check_task=None, check_items=None, **kwargs):
         timeout = TIMEOUT if timeout is None else timeout
diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 3dff79e665..84f646c510 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -238,6 +238,20 @@ def gen_default_dataframe_data(nb=ct.default_nb, dim=ct.default_dim, start=0):
     return df
 
 
+def gen_default_data_for_upsert(nb=ct.default_nb, dim=ct.default_dim, start=0, size=10000):
+    int_values = pd.Series(data=[i for i in range(start, start + nb)])
+    float_values = pd.Series(data=[np.float32(i + size) for i in range(start, start + nb)], dtype="float32")
+    string_values = pd.Series(data=[str(i + size) for i in range(start, start + nb)], dtype="string")
+    float_vec_values = gen_vectors(nb, dim)
+    df = pd.DataFrame({
+        ct.default_int64_field_name: int_values,
+        ct.default_float_field_name: float_values,
+        ct.default_string_field_name: string_values,
+        ct.default_float_vec_field_name: float_vec_values
+    })
+    return df, float_values
+
+
 def gen_dataframe_multi_vec_fields(vec_fields, nb=ct.default_nb):
     """
         gen dataframe data for fields: int64, float, float_vec and vec_fields
@@ -845,6 +859,7 @@ def gen_grant_list(collection_name):
                   {"object": "User", "object_name": "*", "privilege": "SelectUser"}]
     return grant_list
 
+
 def install_milvus_operator_specific_config(namespace, milvus_mode, release_name, image,
                                             rate_limit_enable, collection_rate_limit):
     """
diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt
index fdc5a2f276..0f50eaff25 100644
--- a/tests/python_client/requirements.txt
+++ b/tests/python_client/requirements.txt
@@ -12,7 +12,7 @@ allure-pytest==2.7.0
 pytest-print==0.2.1
 pytest-level==0.1.1
 pytest-xdist==2.5.0
-pymilvus==2.3.0.dev38
+pymilvus==2.3.0.dev45
 pytest-rerunfailures==9.1.1
 git+https://github.com/Projectplace/pytest-tags
 ndg-httpsclient
diff --git a/tests/python_client/testcases/test_insert.py b/tests/python_client/testcases/test_insert.py
index e7b6d16a83..6f420a8d5e 100644
--- a/tests/python_client/testcases/test_insert.py
+++ b/tests/python_client/testcases/test_insert.py
@@ -15,10 +15,12 @@ from common import common_type as ct
 from common.common_type import CaseLabel, CheckTasks
 
 prefix = "insert"
+pre_upsert = "upsert"
 exp_name = "name"
 exp_schema = "schema"
 exp_num = "num_entities"
 exp_primary = "primary"
+default_float_name = ct.default_float_field_name
 default_schema = cf.gen_default_collection_schema()
 default_binary_schema = cf.gen_default_binary_collection_schema()
"metric_type": "L2", "params": {"nlist": 64}} @@ -1285,3 +1287,249 @@ class TestInsertString(TestcaseBase): data[2] = [""for _ in range(nb)] collection_w.insert(data) assert collection_w.num_entities == nb + + +class TestUpsertValid(TestcaseBase): + """ Valid test case of Upsert interface """ + + @pytest.mark.tags(CaseLabel.L1) + def test_upsert_data_pk_not_exist(self): + """ + target: test upsert with collection has no data + method: 1. create a collection with no initialized data + 2. upsert data + expected: upsert run normally as inert + """ + c_name = cf.gen_unique_str(pre_upsert) + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_dataframe_data() + collection_w.upsert(data=data) + assert collection_w.num_entities == ct.default_nb + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("start", [0, 1500, 2500, 3500]) + def test_upsert_data_pk_exist(self, start): + """ + target: test upsert data and collection pk exists + method: 1. create a collection and insert data + 2. upsert data whose pk exists + expected: upsert succeed + """ + upsert_nb = 1000 + collection_w = self.init_collection_general(pre_upsert, True)[0] + upsert_data, float_values = cf.gen_default_data_for_upsert(upsert_nb, start=start) + collection_w.upsert(data=upsert_data) + exp = f"int64 >= {start} && int64 <= {upsert_nb + start}" + res = collection_w.query(exp, output_fields=[default_float_name])[0] + assert [res[i][default_float_name] for i in range(upsert_nb)] == float_values.to_list() + + @pytest.mark.tags(CaseLabel.L2) + def test_upsert_with_primary_key_string(self): + """ + target: test upsert with string primary key + method: 1. create a collection with pk string + 2. insert data + 3. upsert data with ' ' before or after string + expected: raise no exception + """ + c_name = cf.gen_unique_str(pre_upsert) + fields = [cf.gen_string_field(), cf.gen_float_vec_field(dim=ct.default_dim)] + schema = cf.gen_collection_schema(fields=fields, primary_field=ct.default_string_field_name) + collection_w = self.init_collection_wrap(name=c_name, schema=schema) + vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(2)] + collection_w.insert([["a", "b"], vectors]) + collection_w.upsert([[" a", "b "], vectors]) + assert collection_w.num_entities == 4 + + @pytest.mark.tags(CaseLabel.L1) + def test_upsert_same_with_inserted_data(self): + """ + target: test upsert with data same with collection inserted data + method: 1. create a collection and insert data + 2. upsert data same with inserted + 3. check the update data number + expected: upsert successfully + """ + upsert_nb = 1000 + c_name = cf.gen_unique_str(pre_upsert) + collection_w = self.init_collection_wrap(name=c_name) + data = cf.gen_default_dataframe_data() + collection_w.insert(data=data) + upsert_data = data[:upsert_nb] + res = collection_w.upsert(data=upsert_data)[0] + assert res.insert_count == upsert_nb, res.delete_count == upsert_nb + + @pytest.mark.tags(CaseLabel.L2) + def test_upsert_data_is_none(self): + """ + target: test upsert with data=None + method: 1. create a collection + 2. insert data + 3. 
+                3. upsert data=None
+        expected: raise no exception
+        """
+        collection_w = self.init_collection_general(pre_upsert, insert_data=True, is_index=False)[0]
+        assert collection_w.num_entities == ct.default_nb
+        collection_w.upsert(data=None)
+        assert collection_w.num_entities == ct.default_nb
+
+    @pytest.mark.tags(CaseLabel.L1)
+    def test_upsert_in_specific_partition(self):
+        """
+        target: test upsert in specific partition
+        method: 1. create a collection and 2 partitions
+                2. insert data
+                3. upsert in the given partition
+        expected: raise no exception
+        """
+        upsert_nb = 10
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        collection_w.create_partition("partition_1")
+        collection_w.create_partition("partition_2")
+        cf.insert_data(collection_w)
+        collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
+        collection_w.load()
+        data, float_values = cf.gen_default_data_for_upsert(upsert_nb)
+        collection_w.upsert(data=data, partition_name="partition_2")
+        res = collection_w.query("int64 >= 0 && int64<=20", output_fields=[default_float_name],
+                                 partition_names=["partition_2"])[0]
+        assert [res[i][default_float_name] for i in range(upsert_nb)] == float_values.to_list()
+
+
+class TestUpsertInvalid(TestcaseBase):
+    """ Invalid test case of Upsert interface """
+
+    @pytest.mark.tags(CaseLabel.L2)
+    @pytest.mark.parametrize("data", ct.get_invalid_strs[:12])
+    def test_upsert_non_data_type(self, data):
+        """
+        target: test upsert with invalid data type
+        method: upsert data type string, set, number, float...
+        expected: raise exception
+        """
+        if data is None:
+            pytest.skip("data=None is valid")
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, expected: "
+                                             "['int64', 'float', 'varchar', 'float_vector']"}
+        collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_upsert_pk_type_invalid(self):
+        """
+        target: test upsert with invalid pk type
+        method: upsert data type string, float...
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        data = [['a', 1.5], [np.float32(i) for i in range(2)], [str(i) for i in range(2)],
+                cf.gen_vectors(2, ct.default_dim)]
+        error = {ct.err_code: 1, ct.err_msg: "The data type of field int64 doesn't match, "
+                                             "expected: INT64, got VARCHAR"}
+        collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_upsert_data_unmatch(self):
+        """
+        target: test upsert with unmatched data type
+        method: 1. create a collection with default schema [int, float, string, vector]
+                2. upsert with data [int, string, float, vector]
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        vector = [random.random() for _ in range(ct.default_dim)]
+        data = [1, "a", 2.0, vector]
+        error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, "
+                                             "expected: ['int64', 'float', 'varchar', 'float_vector']"}
+        collection_w.upsert(data=[data], check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    @pytest.mark.parametrize("vector", [[], [1.0, 2.0], "a", 1.0, None])
+    def test_upsert_vector_unmatch(self, vector):
+        """
+        target: test upsert with unmatched data vector
+        method: 1. create a collection with dim=128
+                2. upsert with vector dim unmatch
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        data = [2.0, "a", vector]
+        error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, "
+                                             "expected: ['int64', 'float', 'varchar', 'float_vector']"}
+        collection_w.upsert(data=[data], check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    @pytest.mark.skip(reason="issue #22499")
+    @pytest.mark.parametrize("partition_name", ct.get_invalid_strs)
+    def test_upsert_partition_name_invalid(self, partition_name):
+        """
+        target: test upsert partition name invalid
+        method: 1. create a collection with partitions
+                2. upsert with invalid partition name
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        p_name = cf.gen_unique_str('partition_')
+        collection_w.create_partition(p_name)
+        cf.insert_data(collection_w)
+        data = cf.gen_default_dataframe_data(nb=100)
+        error = {ct.err_code: 1, ct.err_msg: "The type of partition_name should be .."}
+        collection_w.upsert(data=data, partition_name=partition_name,
+                            check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    @pytest.mark.skip(reason="issue #22499")
+    def test_upsert_partition_name_nonexistent(self):
+        """
+        target: test upsert partition name nonexistent
+        method: 1. create a collection
+                2. upsert with nonexistent partition name
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        data = cf.gen_default_dataframe_data(nb=2)
+        partition_name = "partition1"
+        error = {ct.err_code: 1, ct.err_msg: "The type of partition_name should be .."}
+        collection_w.upsert(data=data, partition_name=partition_name,
+                            check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_upsert_multi_partitions(self):
+        """
+        target: test upsert two partitions
+        method: 1. create a collection and two partitions
+                2. upsert two partitions
+        expected: raise exception
+        """
+        c_name = cf.gen_unique_str(pre_upsert)
+        collection_w = self.init_collection_wrap(name=c_name)
+        collection_w.create_partition("partition_1")
+        collection_w.create_partition("partition_2")
+        cf.insert_data(collection_w)
+        data = cf.gen_default_dataframe_data(nb=1000)
+        error = {ct.err_code: 1, ct.err_msg: "['partition_1', 'partition_2'] has type <class 'list'>, "
+                                             "but expected one of: (<class 'bytes'>, <class 'str'>)"}
+        collection_w.upsert(data=data, partition_name=["partition_1", "partition_2"],
+                            check_task=CheckTasks.err_res, check_items=error)
+
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_upsert_with_auto_id(self):
+        """
+        target: test upsert with auto id
+        method: 1. create a collection with autoID=true
+                2. upsert data no pk
+        expected: raise exception
+        """
+        collection_w = self.init_collection_general(pre_upsert, auto_id=True, is_index=False)[0]
+        error = {ct.err_code: 1, ct.err_msg: "Upsert don't support autoid == true"}
+        float_vec_values = cf.gen_vectors(ct.default_nb, ct.default_dim)
+        data = [[np.float32(i) for i in range(ct.default_nb)], [str(i) for i in range(ct.default_nb)],
+                float_vec_values]
+        collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py
index c0819f53bc..244072e591 100644
--- a/tests/python_client/testcases/test_query.py
+++ b/tests/python_client/testcases/test_query.py
@@ -1125,6 +1125,31 @@ class TestQueryParams(TestcaseBase):
                              ct.err_msg: "offset [%s] is invalid, should be in range "
                                          "[1, 16384], but got %s" % (offset, offset)})
 
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_query_during_upsert(self):
+        """
+        target: test query during upsert
+        method: 1. create a collection and query
+                2. query during upsert
+                3. compare two query results
+        expected: the two query results are the same
+        """
+        upsert_nb = 1000
+        expr = f"int64 >= 0 && int64 <= {upsert_nb}"
+        collection_w = self.init_collection_general(prefix, True)[0]
+        res1 = collection_w.query(expr, output_fields=[default_float_field_name])[0]
+
+        def do_upsert():
+            data = cf.gen_default_data_for_upsert(upsert_nb)[0]
+            collection_w.upsert(data=data)
+
+        t = threading.Thread(target=do_upsert, args=())
+        t.start()
+        res2 = collection_w.query(expr, output_fields=[default_float_field_name])[0]
+        t.join()
+        assert [res1[i][default_float_field_name] for i in range(upsert_nb)] == \
+               [res2[i][default_float_field_name] for i in range(upsert_nb)]
+
 
 class TestQueryOperation(TestcaseBase):
     """
diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py
index ec8e67d02f..f10f28dac9 100644
--- a/tests/python_client/testcases/test_search.py
+++ b/tests/python_client/testcases/test_search.py
@@ -2,7 +2,7 @@ import multiprocessing
 import numbers
 import random
 import numpy
-
+import threading
 import pytest
 import pandas as pd
 from time import sleep
@@ -2901,7 +2901,7 @@ class TestCollectionSearch(TestcaseBase):
                                              check_items={"name": collection_name, "schema": schema})
         collection_w.create_index(field_name1, default_index_params, index_name=index_name)
         int_values = pd.Series(data=[i for i in range(0, nb)])
-        float_vec_values = gen_vectors(nb, dim)
+        float_vec_values = cf.gen_vectors(nb, dim)
        dataframe = pd.DataFrame({ct.default_int64_field_name: int_values,
                                  field_name1: int_values, field_name2: float_vec_values})
         collection_w.insert(dataframe)
@@ -2975,6 +2975,31 @@
                                          "ids": insert_ids,
                                          "limit": default_limit})
 
+    @pytest.mark.tags(CaseLabel.L2)
+    def test_search_during_upsert(self):
+        """
+        target: test search during upsert
+        method: 1. create a collection and search
+                2. search during upsert
+                3. compare two search results
+        expected: the two search results are the same
+        """
+        nq = 5
+        upsert_nb = 1000
+        collection_w = self.init_collection_general(prefix, True)[0]
+        vectors = [[random.random() for _ in range(default_dim)] for _ in range(nq)]
+        res1 = collection_w.search(vectors[:nq], default_search_field, default_search_params, default_limit)[0]
+
+        def do_upsert():
+            data = cf.gen_default_data_for_upsert(upsert_nb)[0]
+            collection_w.upsert(data=data)
+
+        t = threading.Thread(target=do_upsert, args=())
+        t.start()
+        res2 = collection_w.search(vectors[:nq], default_search_field, default_search_params, default_limit)[0]
+        t.join()
+        assert [res1[i].ids for i in range(nq)] == [res2[i].ids for i in range(nq)]
+
 
 class TestSearchBase(TestcaseBase):
     @pytest.fixture(
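
Note: a minimal usage sketch of the pieces this patch introduces, assuming a locally running Milvus and the pinned pymilvus==2.3.0.dev45; the connection endpoint and collection name below are illustrative, not part of the patch. It shows what the new ApiCollectionWrapper.upsert wrapper ultimately forwards to (Collection.upsert(data, partition_name, timeout)) together with the new gen_default_data_for_upsert helper, whose float/varchar values are offset by `size` so an upsert over existing pks is observable.

    # Illustrative sketch only -- not part of the patch.
    from pymilvus import connections, Collection
    import common.common_func as cf

    connections.connect(host="localhost", port="19530")          # assumed endpoint
    collection = Collection(name="upsert_demo",                  # hypothetical collection name
                            schema=cf.gen_default_collection_schema())

    collection.insert(cf.gen_default_dataframe_data(nb=1000))    # pks 0..999
    df, float_values = cf.gen_default_data_for_upsert(nb=1000)   # same pks, values offset by size=10000
    res = collection.upsert(df)                                  # what ApiCollectionWrapper.upsert wraps
    print(res.insert_count, res.delete_count)                    # both expected to equal nb when the pks exist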