mirror of https://github.com/milvus-io/milvus.git
parent
b21395f487
commit
0cb8153f6b
|
@ -310,6 +310,14 @@ class ApiCollectionWrapper:
|
|||
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
|
||||
return res, check_result
|
||||
|
||||
@trace()
|
||||
def upsert(self, data, partition_name=None, timeout=None, check_task=None, check_items=None, **kwargs):
|
||||
timeout = TIMEOUT if timeout is None else timeout
|
||||
func_name = sys._getframe().f_code.co_name
|
||||
res, check = api_request([self.collection.upsert, data, partition_name, timeout], **kwargs)
|
||||
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
|
||||
return res, check_result
|
||||
|
||||
@trace()
|
||||
def compact(self, timeout=None, check_task=None, check_items=None, **kwargs):
|
||||
timeout = TIMEOUT if timeout is None else timeout
|
||||
|
|
|
@ -238,6 +238,20 @@ def gen_default_dataframe_data(nb=ct.default_nb, dim=ct.default_dim, start=0):
|
|||
return df
|
||||
|
||||
|
||||
def gen_default_data_for_upsert(nb=ct.default_nb, dim=ct.default_dim, start=0, size=10000):
|
||||
int_values = pd.Series(data=[i for i in range(start, start + nb)])
|
||||
float_values = pd.Series(data=[np.float32(i + size) for i in range(start, start + nb)], dtype="float32")
|
||||
string_values = pd.Series(data=[str(i + size) for i in range(start, start + nb)], dtype="string")
|
||||
float_vec_values = gen_vectors(nb, dim)
|
||||
df = pd.DataFrame({
|
||||
ct.default_int64_field_name: int_values,
|
||||
ct.default_float_field_name: float_values,
|
||||
ct.default_string_field_name: string_values,
|
||||
ct.default_float_vec_field_name: float_vec_values
|
||||
})
|
||||
return df, float_values
|
||||
|
||||
|
||||
def gen_dataframe_multi_vec_fields(vec_fields, nb=ct.default_nb):
|
||||
"""
|
||||
gen dataframe data for fields: int64, float, float_vec and vec_fields
|
||||
|
@ -845,6 +859,7 @@ def gen_grant_list(collection_name):
|
|||
{"object": "User", "object_name": "*", "privilege": "SelectUser"}]
|
||||
return grant_list
|
||||
|
||||
|
||||
def install_milvus_operator_specific_config(namespace, milvus_mode, release_name, image,
|
||||
rate_limit_enable, collection_rate_limit):
|
||||
"""
|
||||
|
|
|
@ -12,7 +12,7 @@ allure-pytest==2.7.0
|
|||
pytest-print==0.2.1
|
||||
pytest-level==0.1.1
|
||||
pytest-xdist==2.5.0
|
||||
pymilvus==2.3.0.dev38
|
||||
pymilvus==2.3.0.dev45
|
||||
pytest-rerunfailures==9.1.1
|
||||
git+https://github.com/Projectplace/pytest-tags
|
||||
ndg-httpsclient
|
||||
|
|
|
@ -15,10 +15,12 @@ from common import common_type as ct
|
|||
from common.common_type import CaseLabel, CheckTasks
|
||||
|
||||
prefix = "insert"
|
||||
pre_upsert = "upsert"
|
||||
exp_name = "name"
|
||||
exp_schema = "schema"
|
||||
exp_num = "num_entities"
|
||||
exp_primary = "primary"
|
||||
default_float_name = ct.default_float_field_name
|
||||
default_schema = cf.gen_default_collection_schema()
|
||||
default_binary_schema = cf.gen_default_binary_collection_schema()
|
||||
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
|
||||
|
@ -1285,3 +1287,249 @@ class TestInsertString(TestcaseBase):
|
|||
data[2] = [""for _ in range(nb)]
|
||||
collection_w.insert(data)
|
||||
assert collection_w.num_entities == nb
|
||||
|
||||
|
||||
class TestUpsertValid(TestcaseBase):
|
||||
""" Valid test case of Upsert interface """
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_upsert_data_pk_not_exist(self):
|
||||
"""
|
||||
target: test upsert with collection has no data
|
||||
method: 1. create a collection with no initialized data
|
||||
2. upsert data
|
||||
expected: upsert run normally as inert
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = cf.gen_default_dataframe_data()
|
||||
collection_w.upsert(data=data)
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("start", [0, 1500, 2500, 3500])
|
||||
def test_upsert_data_pk_exist(self, start):
|
||||
"""
|
||||
target: test upsert data and collection pk exists
|
||||
method: 1. create a collection and insert data
|
||||
2. upsert data whose pk exists
|
||||
expected: upsert succeed
|
||||
"""
|
||||
upsert_nb = 1000
|
||||
collection_w = self.init_collection_general(pre_upsert, True)[0]
|
||||
upsert_data, float_values = cf.gen_default_data_for_upsert(upsert_nb, start=start)
|
||||
collection_w.upsert(data=upsert_data)
|
||||
exp = f"int64 >= {start} && int64 <= {upsert_nb + start}"
|
||||
res = collection_w.query(exp, output_fields=[default_float_name])[0]
|
||||
assert [res[i][default_float_name] for i in range(upsert_nb)] == float_values.to_list()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_with_primary_key_string(self):
|
||||
"""
|
||||
target: test upsert with string primary key
|
||||
method: 1. create a collection with pk string
|
||||
2. insert data
|
||||
3. upsert data with ' ' before or after string
|
||||
expected: raise no exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
fields = [cf.gen_string_field(), cf.gen_float_vec_field(dim=ct.default_dim)]
|
||||
schema = cf.gen_collection_schema(fields=fields, primary_field=ct.default_string_field_name)
|
||||
collection_w = self.init_collection_wrap(name=c_name, schema=schema)
|
||||
vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(2)]
|
||||
collection_w.insert([["a", "b"], vectors])
|
||||
collection_w.upsert([[" a", "b "], vectors])
|
||||
assert collection_w.num_entities == 4
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_upsert_same_with_inserted_data(self):
|
||||
"""
|
||||
target: test upsert with data same with collection inserted data
|
||||
method: 1. create a collection and insert data
|
||||
2. upsert data same with inserted
|
||||
3. check the update data number
|
||||
expected: upsert successfully
|
||||
"""
|
||||
upsert_nb = 1000
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = cf.gen_default_dataframe_data()
|
||||
collection_w.insert(data=data)
|
||||
upsert_data = data[:upsert_nb]
|
||||
res = collection_w.upsert(data=upsert_data)[0]
|
||||
assert res.insert_count == upsert_nb, res.delete_count == upsert_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_data_is_none(self):
|
||||
"""
|
||||
target: test upsert with data=None
|
||||
method: 1. create a collection
|
||||
2. insert data
|
||||
3. upsert data=None
|
||||
expected: raise no exception
|
||||
"""
|
||||
collection_w = self.init_collection_general(pre_upsert, insert_data=True, is_index=False)[0]
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
collection_w.upsert(data=None)
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_upsert_in_specific_partition(self):
|
||||
"""
|
||||
target: test upsert in specific partition
|
||||
method: 1. create a collection and 2 partitions
|
||||
2. insert data
|
||||
3. upsert in the given partition
|
||||
expected: raise no exception
|
||||
"""
|
||||
upsert_nb = 10
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
collection_w.create_partition("partition_1")
|
||||
collection_w.create_partition("partition_2")
|
||||
cf.insert_data(collection_w)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
|
||||
collection_w.load()
|
||||
data, float_values = cf.gen_default_data_for_upsert(upsert_nb)
|
||||
collection_w.upsert(data=data, partition_name="partition_2")
|
||||
res = collection_w.query("int64 >= 0 && int64<=20", output_fields=[default_float_name],
|
||||
partition_names=["partition_2"])[0]
|
||||
assert [res[i][default_float_name] for i in range(upsert_nb)] == float_values.to_list()
|
||||
|
||||
|
||||
class TestUpsertInvalid(TestcaseBase):
|
||||
""" Invalid test case of Upsert interface """
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("data", ct.get_invalid_strs[:12])
|
||||
def test_upsert_non_data_type(self, data):
|
||||
"""
|
||||
target: test upsert with invalid data type
|
||||
method: upsert data type string, set, number, float...
|
||||
expected: raise exception
|
||||
"""
|
||||
if data is None:
|
||||
pytest.skip("data=None is valid")
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, expected: "
|
||||
"['int64', 'float', 'varchar', 'float_vector']"}
|
||||
collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_pk_type_invalid(self):
|
||||
"""
|
||||
target: test upsert with invalid pk type
|
||||
method: upsert data type string, float...
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = [['a', 1.5], [np.float32(i) for i in range(2)], [str(i) for i in range(2)],
|
||||
cf.gen_vectors(2, ct.default_dim)]
|
||||
error = {ct.err_code: 1, ct.err_msg: "The data type of field int64 doesn't match, "
|
||||
"expected: INT64, got VARCHAR"}
|
||||
collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_data_unmatch(self):
|
||||
"""
|
||||
target: test upsert with unmatched data type
|
||||
method: 1. create a collection with default schema [int, float, string, vector]
|
||||
2. upsert with data [int, string, float, vector]
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
vector = [random.random() for _ in range(ct.default_dim)]
|
||||
data = [1, "a", 2.0, vector]
|
||||
error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, "
|
||||
"expected: ['int64', 'float', 'varchar', 'float_vector']"}
|
||||
collection_w.upsert(data=[data], check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("vector", [[], [1.0, 2.0], "a", 1.0, None])
|
||||
def test_upsert_vector_unmatch(self, vector):
|
||||
"""
|
||||
target: test upsert with unmatched data vector
|
||||
method: 1. create a collection with dim=128
|
||||
2. upsert with vector dim unmatch
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = [2.0, "a", vector]
|
||||
error = {ct.err_code: 1, ct.err_msg: "The fields don't match with schema fields, "
|
||||
"expected: ['int64', 'float', 'varchar', 'float_vector']"}
|
||||
collection_w.upsert(data=[data], check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.skip(reason="issue #22499")
|
||||
@pytest.mark.parametrize("partition_name", ct.get_invalid_strs)
|
||||
def test_upsert_partition_name_invalid(self, partition_name):
|
||||
"""
|
||||
target: test upsert partition name invalid
|
||||
method: 1. create a collection with partitions
|
||||
2. upsert with invalid partition name
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
p_name = cf.gen_unique_str('partition_')
|
||||
collection_w.create_partition(p_name)
|
||||
cf.insert_data(collection_w)
|
||||
data = cf.gen_default_dataframe_data(nb=100)
|
||||
error = {ct.err_code: 1, ct.err_msg: "The type of partition_name should be .."}
|
||||
collection_w.upsert(data=data, partition_name=partition_name,
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.skip(reason="issue #22499")
|
||||
def test_upsert_partition_name_nonexistent(self):
|
||||
"""
|
||||
target: test upsert partition name nonexistent
|
||||
method: 1. create a collection
|
||||
2. upsert with nonexistent partition name
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = cf.gen_default_dataframe_data(nb=2)
|
||||
partition_name = "partition1"
|
||||
error = {ct.err_code: 1, ct.err_msg: "The type of partition_name should be .."}
|
||||
collection_w.upsert(data=data, partition_name=partition_name,
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_multi_partitions(self):
|
||||
"""
|
||||
target: test upsert two partitions
|
||||
method: 1. create a collection and two partitions
|
||||
2. upsert two partitions
|
||||
expected: raise exception
|
||||
"""
|
||||
c_name = cf.gen_unique_str(pre_upsert)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
collection_w.create_partition("partition_1")
|
||||
collection_w.create_partition("partition_2")
|
||||
cf.insert_data(collection_w)
|
||||
data = cf.gen_default_dataframe_data(nb=1000)
|
||||
error = {ct.err_code: 1, ct.err_msg: "['partition_1', 'partition_2'] has type <class 'list'>, "
|
||||
"but expected one of: (<class 'bytes'>, <class 'str'>)"}
|
||||
collection_w.upsert(data=data, partition_name=["partition_1", "partition_2"],
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_upsert_with_auto_id(self):
|
||||
"""
|
||||
target: test upsert with auto id
|
||||
method: 1. create a collection with autoID=true
|
||||
2. upsert data no pk
|
||||
expected: raise exception
|
||||
"""
|
||||
collection_w = self.init_collection_general(pre_upsert, auto_id=True, is_index=False)[0]
|
||||
error = {ct.err_code: 1, ct.err_msg: "Upsert don't support autoid == true"}
|
||||
float_vec_values = cf.gen_vectors(ct.default_nb, ct.default_dim)
|
||||
data = [[np.float32(i) for i in range(ct.default_nb)], [str(i) for i in range(ct.default_nb)],
|
||||
float_vec_values]
|
||||
collection_w.upsert(data=data, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
|
|
@ -1125,6 +1125,31 @@ class TestQueryParams(TestcaseBase):
|
|||
ct.err_msg: "offset [%s] is invalid, should be in range "
|
||||
"[1, 16384], but got %s" % (offset, offset)})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_query_during_upsert(self):
|
||||
"""
|
||||
target: test query during upsert
|
||||
method: 1. create a collection and query
|
||||
2. query during upsert
|
||||
3. compare two query results
|
||||
expected: the two query results is the same
|
||||
"""
|
||||
upsert_nb = 1000
|
||||
expr = f"int64 >= 0 && int64 <= {upsert_nb}"
|
||||
collection_w = self.init_collection_general(prefix, True)[0]
|
||||
res1 = collection_w.query(expr, output_fields=[default_float_field_name])[0]
|
||||
|
||||
def do_upsert():
|
||||
data = cf.gen_default_data_for_upsert(upsert_nb)[0]
|
||||
collection_w.upsert(data=data)
|
||||
|
||||
t = threading.Thread(target=do_upsert, args=())
|
||||
t.start()
|
||||
res2 = collection_w.query(expr, output_fields=[default_float_field_name])[0]
|
||||
t.join()
|
||||
assert [res1[i][default_float_field_name] for i in range(upsert_nb)] == \
|
||||
[res2[i][default_float_field_name] for i in range(upsert_nb)]
|
||||
|
||||
|
||||
class TestQueryOperation(TestcaseBase):
|
||||
"""
|
||||
|
|
|
@ -2,7 +2,7 @@ import multiprocessing
|
|||
import numbers
|
||||
import random
|
||||
import numpy
|
||||
|
||||
import threading
|
||||
import pytest
|
||||
import pandas as pd
|
||||
from time import sleep
|
||||
|
@ -2901,7 +2901,7 @@ class TestCollectionSearch(TestcaseBase):
|
|||
check_items={"name": collection_name, "schema": schema})
|
||||
collection_w.create_index(field_name1, default_index_params, index_name=index_name)
|
||||
int_values = pd.Series(data=[i for i in range(0, nb)])
|
||||
float_vec_values = gen_vectors(nb, dim)
|
||||
float_vec_values = cf.gen_vectors(nb, dim)
|
||||
dataframe = pd.DataFrame({ct.default_int64_field_name: int_values,
|
||||
field_name1: int_values, field_name2: float_vec_values})
|
||||
collection_w.insert(dataframe)
|
||||
|
@ -2975,6 +2975,31 @@ class TestCollectionSearch(TestcaseBase):
|
|||
"ids": insert_ids,
|
||||
"limit": default_limit})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_search_during_upsert(self):
|
||||
"""
|
||||
target: test search during upsert
|
||||
method: 1. create a collection and search
|
||||
2. search during upsert
|
||||
3. compare two search results
|
||||
expected: the two search results is the same
|
||||
"""
|
||||
nq = 5
|
||||
upsert_nb = 1000
|
||||
collection_w = self.init_collection_general(prefix, True)[0]
|
||||
vectors = [[random.random() for _ in range(default_dim)] for _ in range(nq)]
|
||||
res1 = collection_w.search(vectors[:nq], default_search_field, default_search_params, default_limit)[0]
|
||||
|
||||
def do_upsert():
|
||||
data = cf.gen_default_data_for_upsert(upsert_nb)[0]
|
||||
collection_w.upsert(data=data)
|
||||
|
||||
t = threading.Thread(target=do_upsert, args=())
|
||||
t.start()
|
||||
res2 = collection_w.search(vectors[:nq], default_search_field, default_search_params, default_limit)[0]
|
||||
t.join()
|
||||
assert [res1[i].ids for i in range(nq)] == [res2[i].ids for i in range(nq)]
|
||||
|
||||
|
||||
class TestSearchBase(TestcaseBase):
|
||||
@pytest.fixture(
|
||||
|
|
Loading…
Reference in New Issue