milvus/tests/milvus_python_test/test_index.py

1747 lines
73 KiB
Python

"""
For testing index operations, including `create_index`, `describe_index` and `drop_index` interfaces
"""
import logging
import pytest
import time
import pdb
import threading
from multiprocessing import Pool, Process
import numpy
import sklearn.preprocessing
from milvus import IndexType, MetricType
from utils import *
# Size of the shared dataset: `nb` vectors, each of dimension `dim`.
nb = 6000
dim = 128
# Passed as 'index_file_size' when this module creates its own tables.
index_file_size = 10
# Shared dataset, built once at import time: generated by gen_vectors,
# L2-normalized row by row, then converted to a plain Python list.
vectors = gen_vectors(nb, dim)
vectors = sklearn.preprocessing.normalize(vectors, axis=1, norm='l2')
vectors = vectors.tolist()
# Value handed to @pytest.mark.timeout on the index-building tests
# (presumably seconds -- confirm against the timeout plugin in use).
BUILD_TIMEOUT = 300
# Used by tests that search with {"nprobe": nprobe}.
nprobe = 1
# Partition tag used by the partition tests.
tag = "1970-01-01"
# nlist used in the IVF index params built by these tests.
# NOTE(review): 4046 may be a typo for 4096 -- confirm.
NLIST = 4046
# Not referenced in the visible part of this file; presumably an
# out-of-range nlist for negative tests -- confirm.
INVALID_NLIST = 100000000
class TestIndexBase:
@pytest.fixture(
    scope="function",
    params=gen_index()
)
def get_index(self, request, connect):
    '''
    Parametrized fixture: yields one index description from `gen_index()`.

    Skips the combinations this fixture treats as unsupported: IVF_SQ8H
    when the server reports mode "CPU", IVF_PQ when it reports "GPU".
    Returns `request.param` (tests read its "index_type" and
    "index_param" keys).
    '''
    # Query the server mode once; the original asked twice per invocation.
    mode = str(connect._cmd("mode")[1])
    index_type = request.param["index_type"]
    if mode == "CPU" and index_type == IndexType.IVF_SQ8H:
        pytest.skip("sq8h not support in CPU mode")
    if mode == "GPU" and index_type == IndexType.IVF_PQ:
        pytest.skip("ivfpq not support in GPU mode")
    return request.param
@pytest.fixture(
    scope="function",
    params=gen_simple_index()
)
def get_simple_index(self, request, connect):
    '''
    Parametrized fixture: yields one index description from
    `gen_simple_index()`.

    Skips IVF_SQ8H when the server reports mode "CPU" and IVF_PQ when it
    reports "GPU". Returns `request.param` (tests read its "index_type"
    and "index_param" keys).
    '''
    # One round trip for the mode instead of one per check.
    mode = str(connect._cmd("mode")[1])
    index_type = request.param["index_type"]
    if mode == "CPU" and index_type == IndexType.IVF_SQ8H:
        pytest.skip("sq8h not support in CPU mode")
    if mode == "GPU" and index_type == IndexType.IVF_PQ:
        pytest.skip("ivfpq not support in GPU mode")
    return request.param
"""
******************************************************************
The following cases are used to test `create_index` function
******************************************************************
"""
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index(self, connect, table, get_simple_index):
    '''
    target: test create index interface
    method: add the shared vectors to the table, then create the index
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    _add_status, _ids = connect.add_vectors(table, vectors)
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_no_vectors(self, connect, table, get_simple_index):
    '''
    target: test create index interface on an empty table
    method: create index on the table without adding any vectors
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_partition(self, connect, table, get_simple_index):
    '''
    target: test create index interface with a partition present
    method: create a partition, add vectors under its tag, create index
            on the table
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    connect.create_partition(table, tag)
    _add_status, _ids = connect.add_vectors(table, vectors, partition_tag=tag)
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_partition_flush(self, connect, table, get_simple_index):
    '''
    target: test create index interface after an explicit flush
    method: create a partition, add vectors under its tag, call flush,
            then create index on the table
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    connect.create_partition(table, tag)
    _add_status, _ids = connect.add_vectors(table, vectors, partition_tag=tag)
    connect.flush()
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
@pytest.mark.level(2)
def test_create_index_without_connect(self, dis_connect, table):
    '''
    target: test create index without connection
    method: call create_index through the `dis_connect` client
    expected: raise exception
    '''
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    with pytest.raises(Exception):
        dis_connect.create_index(table, kind, params)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_search_with_query_vectors(self, connect, table, get_simple_index):
    '''
    target: test create index interface, then search with several queries
    method: add vectors, create index, search with the first three vectors
    expected: search status is OK and one result set per query vector
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    _add_status, _ids = connect.add_vectors(table, vectors)
    connect.create_index(table, kind, params)
    logger.info(connect.describe_index(table))
    queries = [vectors[pos] for pos in (0, 1, 2)]
    search_status, hits = connect.search_vectors(
        table, 5, queries, params=get_search_param(kind))
    assert search_status.OK()
    assert len(hits) == len(queries)
    logger.info(hits)
@pytest.mark.timeout(BUILD_TIMEOUT)
@pytest.mark.level(2)
def test_create_index_multithread(self, connect, table, args):
    '''
    target: test create index interface from multiple threads
    method: add vectors, then build the same index from several threads,
            each thread using its own client connection
    expected: every create_index returns OK, and a search for the first
              vector returns it as the top hit
    '''
    status, ids = connect.add_vectors(table, vectors)
    failures = []

    def build(client):
        # An exception raised in a worker thread does not fail the test on
        # its own, so record it and re-check from the main thread.
        try:
            status = client.create_index(table, IndexType.IVFLAT, {"nlist": NLIST})
            assert status.OK()
        except Exception as exc:
            failures.append(exc)

    threads_num = 8
    threads = []
    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for _ in range(threads_num):
        m = get_milvus(args["handler"])
        m.connect(uri=uri)
        t = threading.Thread(target=build, args=(m,))
        threads.append(t)
        t.start()
        time.sleep(0.2)
    for t in threads:
        t.join()
    assert not failures, failures
    query_vec = [vectors[0]]
    top_k = 1
    search_param = {"nprobe": nprobe}
    status, result = connect.search_vectors(table, top_k, query_vec, params=search_param)
    assert len(result) == 1
    assert len(result[0]) == top_k
    assert result[0][0].distance == 0.0
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_multithread_multitable(self, connect, args):
    '''
    target: test create index interface from multiple threads on
            multiple tables
    method: create threads_num * loop_num tables; each thread, with its
            own client, adds vectors to its own slice of tables, creates
            an index and searches
    expected: every create_index returns OK, and each search returns the
              query vector itself as the top hit
    '''
    threads_num = 8
    loop_num = 8
    threads = []
    failures = []
    table = []
    for _ in range(threads_num * loop_num):
        table_name = gen_unique_str("test_create_index_multiprocessing")
        table.append(table_name)
        param = {'table_name': table_name,
                 'dimension': dim,
                 'index_type': IndexType.FLAT,
                 'store_raw_vector': False}
        connect.create_table(param)

    def create_index(client, thread_id):
        # The worker must accept the (client, id) pair the Thread passes;
        # the original took no parameters and read `ids` before assigning
        # it. Failures are recorded because an exception in a thread does
        # not fail the test by itself.
        try:
            for i in range(loop_num):
                # Each thread owns `loop_num` consecutive tables.
                name = table[thread_id * loop_num + i]
                status, _ = client.add_vectors(name, vectors)
                status = client.create_index(name, IndexType.IVFLAT, {"nlist": NLIST})
                assert status.OK()
                query_vec = [vectors[0]]
                top_k = 1
                search_param = {"nprobe": nprobe}
                status, result = client.search_vectors(name, top_k, query_vec, params=search_param)
                assert len(result) == 1
                assert len(result[0]) == top_k
                assert result[0][0].distance == 0.0
        except Exception as exc:
            failures.append(exc)

    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for thread_id in range(threads_num):
        m = get_milvus(args["handler"])
        m.connect(uri=uri)
        t = threading.Thread(target=create_index, args=(m, thread_id))
        threads.append(t)
        t.start()
        time.sleep(0.2)
    for t in threads:
        t.join()
    assert not failures, failures
@pytest.mark.timeout(BUILD_TIMEOUT)
@pytest.mark.level(2)
def test_create_index_a_multithreads(self, connect, table, args):
    '''
    target: test create index running concurrently with row counting
    method: add vectors, then start threads that alternate between
            creating the index and reading the table row count, each
            with its own client connection
    expected: every create_index returns OK and every row count equals nb
    '''
    status, ids = connect.add_vectors(table, vectors)
    failures = []

    def guarded(action):
        # Wrap a worker so an exception raised in its thread is recorded;
        # otherwise it would not fail the test.
        def runner(client):
            try:
                action(client)
            except Exception as exc:
                failures.append(exc)
        return runner

    def build(connect):
        status = connect.create_index(table, IndexType.IVFLAT, {"nlist": NLIST})
        assert status.OK()

    def count(connect):
        status, count = connect.get_table_row_count(table)
        assert status.OK()
        assert count == nb

    threads_num = 8
    threads = []
    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for i in range(threads_num):
        m = get_milvus(args["handler"])
        m.connect(uri=uri)
        # Even slots build the index, odd slots count rows.
        worker = build if i % 2 == 0 else count
        p = threading.Thread(target=guarded(worker), args=(m,))
        threads.append(p)
        p.start()
        time.sleep(0.2)
    for p in threads:
        p.join()
    assert not failures, failures
# TODO: enable
@pytest.mark.timeout(BUILD_TIMEOUT)
@pytest.mark.level(2)
def _test_create_index_multiprocessing(self, connect, table, args):
    '''
    target: test create index interface with multiprocess
    method: add vectors, then create the same index from several
            processes, each given its own client
    expected: search for the first vector returns one hit at distance 0.0
    '''
    _add_status, _ids = connect.add_vectors(table, vectors)

    def build(connect):
        build_status = connect.create_index(table, IndexType.IVFLAT, {"nlist": NLIST})
        assert build_status.OK()

    process_num = 8
    workers = []
    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for _ in range(process_num):
        client = get_milvus(args["handler"])
        client.connect(uri=uri)
        worker = Process(target=build, args=(client,))
        workers.append(worker)
        worker.start()
        time.sleep(0.2)
    for worker in workers:
        worker.join()
    top_k = 1
    _search_status, hits = connect.search_vectors(
        table, top_k, [vectors[0]], params={"nprobe": nprobe})
    assert len(hits) == 1
    assert len(hits[0]) == top_k
    assert hits[0][0].distance == 0.0
# TODO: enable
@pytest.mark.timeout(BUILD_TIMEOUT)
def _test_create_index_multiprocessing_multitable(self, connect, args):
    '''
    target: test create index interface with multiprocess on multiple
            tables
    method: create process_num * loop_num tables; each process, with its
            own client, adds vectors to its own slice of tables, creates
            an index and searches
    expected: every child process exits with code 0 (all of its
              assertions held)
    '''
    process_num = 8
    loop_num = 8
    processes = []
    table = []
    for _ in range(process_num * loop_num):
        table_name = gen_unique_str("test_create_index_multiprocessing")
        table.append(table_name)
        param = {'table_name': table_name,
                 'dimension': dim,
                 'index_type': IndexType.FLAT,
                 'store_raw_vector': False}
        connect.create_table(param)

    def create_index(client, process_id):
        # The worker must accept the (client, id) pair the Process passes;
        # the original took no parameters and read `ids` before assigning
        # it.
        for i in range(loop_num):
            # Each process owns `loop_num` consecutive tables.
            name = table[process_id * loop_num + i]
            status, _ = client.add_vectors(name, vectors)
            status = client.create_index(name, IndexType.IVFLAT, {"nlist": NLIST})
            assert status.OK()
            query_vec = [vectors[0]]
            top_k = 1
            search_param = {"nprobe": nprobe}
            status, result = client.search_vectors(name, top_k, query_vec, params=search_param)
            assert len(result) == 1
            assert len(result[0]) == top_k
            assert result[0][0].distance == 0.0

    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for process_id in range(process_num):
        m = get_milvus(args["handler"])
        m.connect(uri=uri)
        p = Process(target=create_index, args=(m, process_id))
        processes.append(p)
        p.start()
        time.sleep(0.2)
    for p in processes:
        p.join()
        # A failed assertion in a child is visible here only through its
        # exit code.
        assert p.exitcode == 0
def test_create_index_table_not_existed(self, connect):
    '''
    target: test create index interface when the table does not exist
    method: create index on a freshly generated, never-created table name
    expected: create_index returns a non-OK status
    '''
    missing_table = gen_unique_str(self.__class__.__name__)
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    build_status = connect.create_index(missing_table, kind, params)
    assert not build_status.OK()
def test_create_index_table_None(self, connect):
    '''
    target: test create index interface when table name is None
    method: call create_index with table_name None
    expected: raise exception
    '''
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    with pytest.raises(Exception):
        connect.create_index(None, kind, params)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_no_vectors_then_add_vectors(self, connect, table, get_simple_index):
    '''
    target: test that creating an index on an empty table does not break
            a later insert
    method: create index on the empty table, then add vectors
    expected: add_vectors returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    connect.create_index(table, kind, params)
    add_status, _ids = connect.add_vectors(table, vectors)
    assert add_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_same_index_repeatedly(self, connect, table, get_simple_index):
    '''
    target: check if index can be created repeatedly with the same params
    method: call create_index twice with identical arguments
    expected: the second create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    connect.create_index(table, kind, params)
    second_status = connect.create_index(table, kind, params)
    assert second_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_different_index_repeatedly(self, connect, table):
    '''
    target: check if index can be re-created with a different index type
    method: add vectors, create an IVF_SQ8 index, then an IVFLAT index
    expected: both calls return OK; describe_index reports the second
              index type with the same nlist
    '''
    nlist = NLIST
    _add_status, _ids = connect.add_vectors(table, vectors)
    first_kind = IndexType.IVF_SQ8
    second_kind = IndexType.IVFLAT
    indexs = [
        {"index_type": first_kind, "index_param": {"nlist": nlist}},
        {"index_type": second_kind, "index_param": {"nlist": nlist}},
    ]
    logging.getLogger().info(indexs)
    for spec in indexs:
        build_status = connect.create_index(table, spec["index_type"], spec["index_param"])
        assert build_status.OK()
    _desc_status, described = connect.describe_index(table)
    assert described._params["nlist"] == nlist
    assert described._table_name == table
    assert described._index_type == second_kind
"""
******************************************************************
The following cases are used to test `describe_index` function
******************************************************************
"""
def test_describe_index(self, connect, table, get_index):
    '''
    target: test describe index interface
    method: create index on the table (no vectors are added), then call
            describe_index
    expected: described params, table name and index type match what was
              requested
    '''
    params = get_index["index_param"]
    kind = get_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_index)
    connect.create_index(table, kind, params)
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    assert described._params == params
    assert described._table_name == table
    assert described._index_type == kind
def test_describe_and_drop_index_multi_tables(self, connect, get_simple_index):
    '''
    target: test create, describe and drop index with multiple L2 tables
    method: create 10 tables, add vectors and create index on each;
            describe every index; then drop every index and describe again
    expected: describe matches the requested index before the drop and
              reports IndexType.FLAT after it
    '''
    nq = 100
    vectors = gen_vectors(nq, dim)
    logger = logging.getLogger()
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    table_list = []
    for _ in range(10):
        name = gen_unique_str()
        table_list.append(name)
        connect.create_table({'table_name': name,
                              'dimension': dim,
                              'index_file_size': index_file_size,
                              'metric_type': MetricType.L2})
        logger.info(get_simple_index)
        _add_status, _ids = connect.add_vectors(table_name=name, records=vectors)
        build_status = connect.create_index(name, kind, params)
        assert build_status.OK()
    for name in table_list:
        _desc_status, described = connect.describe_index(name)
        logger.info(described)
        assert described._params == params
        assert described._table_name == name
        assert described._index_type == kind
    for name in table_list:
        drop_status = connect.drop_index(name)
        assert drop_status.OK()
        _desc_status, described = connect.describe_index(name)
        logger.info(described)
        assert described._table_name == name
        assert described._index_type == IndexType.FLAT
@pytest.mark.level(2)
def test_describe_index_without_connect(self, dis_connect, table):
    '''
    target: test describe index without connection
    method: call describe_index through the `dis_connect` client
    expected: raise exception
    '''
    with pytest.raises(Exception):
        dis_connect.describe_index(table)
def test_describe_index_table_not_existed(self, connect):
    '''
    target: test describe index interface when the table does not exist
    method: describe index on a freshly generated, never-created table name
    expected: describe_index returns a non-OK status
    '''
    missing_table = gen_unique_str(self.__class__.__name__)
    desc_status, _described = connect.describe_index(missing_table)
    assert not desc_status.OK()
def test_describe_index_table_None(self, connect):
    '''
    target: test describe index interface when table name is None
    method: call describe_index with table_name None
    expected: raise exception
    '''
    with pytest.raises(Exception):
        connect.describe_index(None)
def test_describe_index_not_create(self, connect, table):
    '''
    target: test describe index interface when no index was created
    method: add vectors to the table, then describe index without ever
            calling create_index
    expected: describe_index returns an OK status
    '''
    _add_status, _ids = connect.add_vectors(table, vectors)
    desc_status, described = connect.describe_index(table)
    logging.getLogger().info(described)
    assert desc_status.OK()
"""
******************************************************************
The following cases are used to test `drop_index` function
******************************************************************
"""
def test_drop_index(self, connect, table, get_simple_index):
    '''
    target: test drop index interface
    method: create index on the table, drop it, then describe index
    expected: create and drop return OK; describe reports IndexType.FLAT
    '''
    logger = logging.getLogger()
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    drop_status = connect.drop_index(table)
    assert drop_status.OK()
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    assert described._table_name == table
    assert described._index_type == IndexType.FLAT
def test_drop_index_repeatly(self, connect, table, get_simple_index):
    '''
    target: test dropping an index twice in a row
    method: create index, drop index, drop index again, describe index
    expected: both drops return OK; describe reports IndexType.FLAT
    '''
    logger = logging.getLogger()
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    build_status = connect.create_index(table, kind, params)
    assert build_status.OK()
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    first_drop = connect.drop_index(table)
    assert first_drop.OK()
    second_drop = connect.drop_index(table)
    assert second_drop.OK()
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    assert described._table_name == table
    assert described._index_type == IndexType.FLAT
@pytest.mark.level(2)
def test_drop_index_without_connect(self, dis_connect, table):
    '''
    target: test drop index without connection
    method: call drop_index through the `dis_connect` client
    expected: raise exception
    '''
    with pytest.raises(Exception):
        dis_connect.drop_index(table)
def test_drop_index_table_not_existed(self, connect):
    '''
    target: test drop index interface when the table does not exist
    method: drop index on a freshly generated, never-created table name
    expected: drop_index returns a non-OK status
    '''
    missing_table = gen_unique_str(self.__class__.__name__)
    drop_status = connect.drop_index(missing_table)
    assert not drop_status.OK()
def test_drop_index_table_None(self, connect):
    '''
    target: test drop index interface when table name is None
    method: call drop_index with table_name None
    expected: raise exception
    '''
    with pytest.raises(Exception):
        connect.drop_index(None)
def test_drop_index_table_not_create(self, connect, table):
    '''
    target: test drop index interface when no index was created
    method: add vectors, describe index, then drop index without ever
            calling create_index
    expected: drop_index returns an OK status
    '''
    logger = logging.getLogger()
    _add_status, _ids = connect.add_vectors(table, vectors)
    _desc_status, described = connect.describe_index(table)
    logger.info(described)
    # create_index is deliberately never called here.
    drop_status = connect.drop_index(table)
    logger.info(drop_status)
    assert drop_status.OK()
def test_create_drop_index_repeatly(self, connect, table, get_simple_index):
    '''
    target: test create / drop index repeatedly with the same index params
    method: run the create index -> drop index cycle twice
    expected: every call returns OK; describe reports IndexType.FLAT
              after each drop
    '''
    logger = logging.getLogger()
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    for _ in range(2):
        build_status = connect.create_index(table, kind, params)
        assert build_status.OK()
        _desc_status, described = connect.describe_index(table)
        logger.info(described)
        drop_status = connect.drop_index(table)
        assert drop_status.OK()
        _desc_status, described = connect.describe_index(table)
        logger.info(described)
        assert described._table_name == table
        assert described._index_type == IndexType.FLAT
def test_create_drop_index_repeatly_different_index_params(self, connect, table):
    '''
    target: test create / drop index repeatedly with different index types
    method: run the create index -> drop index cycle twice, first with
            IVFLAT and then with IVF_SQ8
    expected: every call returns OK; describe reports IndexType.FLAT
              after each drop
    '''
    logger = logging.getLogger()
    nlist = NLIST
    specs = [
        {"index_type": IndexType.IVFLAT, "index_param": {"nlist": nlist}},
        {"index_type": IndexType.IVF_SQ8, "index_param": {"nlist": nlist}},
    ]
    for spec in specs:
        build_status = connect.create_index(table, spec["index_type"], spec["index_param"])
        assert build_status.OK()
        _desc_status, described = connect.describe_index(table)
        logger.info(described)
        drop_status = connect.drop_index(table)
        assert drop_status.OK()
        _desc_status, described = connect.describe_index(table)
        logger.info(described)
        assert described._table_name == table
        assert described._index_type == IndexType.FLAT
class TestIndexIP:
@pytest.fixture(
    scope="function",
    params=gen_index()
)
def get_index(self, request, connect):
    '''
    Parametrized fixture: yields one index description from `gen_index()`.

    Skips IVF_SQ8H when the server reports mode "CPU" and IVF_PQ when it
    reports "GPU". Returns `request.param` (tests read its "index_type"
    and "index_param" keys).
    '''
    # Fetch the server mode a single time; the original issued the same
    # command twice per invocation.
    mode = str(connect._cmd("mode")[1])
    index_type = request.param["index_type"]
    if mode == "CPU" and index_type == IndexType.IVF_SQ8H:
        pytest.skip("sq8h not support in CPU mode")
    if mode == "GPU" and index_type == IndexType.IVF_PQ:
        pytest.skip("ivfpq not support in GPU mode")
    return request.param
@pytest.fixture(
    scope="function",
    params=gen_simple_index()
)
def get_simple_index(self, request, connect):
    '''
    Parametrized fixture: yields one index description from
    `gen_simple_index()`.

    Skips IVF_SQ8H when the server reports mode "CPU" and IVF_PQ when it
    reports "GPU". Returns `request.param` (tests read its "index_type"
    and "index_param" keys).
    '''
    # Fetch the server mode a single time instead of once per check.
    mode = str(connect._cmd("mode")[1])
    index_type = request.param["index_type"]
    if mode == "CPU" and index_type == IndexType.IVF_SQ8H:
        pytest.skip("sq8h not support in CPU mode")
    if mode == "GPU" and index_type == IndexType.IVF_PQ:
        pytest.skip("ivfpq not support in GPU mode")
    return request.param
"""
******************************************************************
The following cases are used to test `create_index` function
******************************************************************
"""
@pytest.mark.level(2)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index(self, connect, ip_table, get_simple_index):
    '''
    target: test create index interface
    method: add the shared vectors to `ip_table`, then create the index
            (RNSG is skipped)
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logging.getLogger().info(get_simple_index)
    if kind in (IndexType.RNSG,):
        pytest.skip("Skip some RNSG cases")
    _add_status, _ids = connect.add_vectors(ip_table, vectors)
    build_status = connect.create_index(ip_table, kind, params)
    assert build_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_table(self, connect, ip_table, get_simple_index):
    '''
    target: test create index interface with a partition present
    method: create a partition, add vectors under its tag, create index
            on the table (RNSG is skipped)
    expected: create_index returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    if kind in (IndexType.RNSG,):
        pytest.skip("Skip some RNSG cases")
    logging.getLogger().info(get_simple_index)
    connect.create_partition(ip_table, tag)
    _add_status, _ids = connect.add_vectors(ip_table, vectors, partition_tag=tag)
    build_status = connect.create_index(ip_table, kind, params)
    assert build_status.OK()
@pytest.mark.level(2)
def test_create_index_without_connect(self, dis_connect, ip_table):
    '''
    target: test create index without connection
    method: call create_index on `ip_table` through the `dis_connect`
            client
    expected: raise exception
    '''
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    with pytest.raises(Exception):
        dis_connect.create_index(ip_table, kind, params)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_search_with_query_vectors(self, connect, ip_table, get_simple_index):
    '''
    target: test create index interface, then search with several queries
    method: add vectors, create index, search with the first three
            vectors (RNSG is skipped)
    expected: search status is OK and one result set per query vector
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    logger = logging.getLogger()
    logger.info(get_simple_index)
    if kind in (IndexType.RNSG,):
        pytest.skip("Skip some RNSG cases")
    _add_status, _ids = connect.add_vectors(ip_table, vectors)
    connect.create_index(ip_table, kind, params)
    logger.info(connect.describe_index(ip_table))
    queries = [vectors[pos] for pos in (0, 1, 2)]
    search_status, hits = connect.search_vectors(
        ip_table, 5, queries, params=get_search_param(kind))
    logger.info(hits)
    assert search_status.OK()
    assert len(hits) == len(queries)
# TODO: enable
@pytest.mark.timeout(BUILD_TIMEOUT)
@pytest.mark.level(2)
def _test_create_index_multiprocessing(self, connect, ip_table, args):
    '''
    target: test create index interface with multiprocess
    method: add vectors, then create the same index from several
            processes, each given its own client
    expected: search for the first vector returns one hit at distance 0.0
    '''
    # NOTE(review): if `ip_table` uses the inner-product metric, a
    # normalized vector matched with itself presumably scores 1.0 rather
    # than 0.0 -- confirm before enabling this test.
    _add_status, _ids = connect.add_vectors(ip_table, vectors)

    def build(connect):
        build_status = connect.create_index(ip_table, IndexType.IVFLAT, {"nlist": NLIST})
        assert build_status.OK()

    process_num = 8
    workers = []
    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for _ in range(process_num):
        client = get_milvus(args["handler"])
        client.connect(uri=uri)
        worker = Process(target=build, args=(client,))
        workers.append(worker)
        worker.start()
        time.sleep(0.2)
    for worker in workers:
        worker.join()
    top_k = 1
    _search_status, hits = connect.search_vectors(
        ip_table, top_k, [vectors[0]], params={"nprobe": nprobe})
    assert len(hits) == 1
    assert len(hits[0]) == top_k
    assert hits[0][0].distance == 0.0
# TODO: enable
@pytest.mark.timeout(BUILD_TIMEOUT)
def _test_create_index_multiprocessing_multitable(self, connect, args):
    '''
    target: test create index interface with multiprocess on multiple
            tables
    method: create process_num * loop_num tables; each process, with its
            own client, adds vectors to its own slice of tables, creates
            an index and searches
    expected: every child process exits with code 0 (all of its
              assertions held)
    '''
    process_num = 8
    loop_num = 8
    processes = []
    table = []
    for _ in range(process_num * loop_num):
        table_name = gen_unique_str("test_create_index_multiprocessing")
        table.append(table_name)
        # NOTE(review): no 'metric_type' is passed, so these tables
        # presumably use the server default rather than IP -- confirm.
        param = {'table_name': table_name,
                 'dimension': dim}
        connect.create_table(param)

    def create_index(client, process_id):
        # The worker must accept the (client, id) pair the Process passes;
        # the original took no parameters and read `ids` before assigning
        # it.
        for i in range(loop_num):
            # Each process owns `loop_num` consecutive tables.
            name = table[process_id * loop_num + i]
            status, _ = client.add_vectors(name, vectors)
            status = client.create_index(name, IndexType.IVFLAT, {"nlist": NLIST})
            assert status.OK()
            query_vec = [vectors[0]]
            top_k = 1
            search_param = {"nprobe": nprobe}
            status, result = client.search_vectors(name, top_k, query_vec, params=search_param)
            assert len(result) == 1
            assert len(result[0]) == top_k
            assert result[0][0].distance == 0.0

    uri = "tcp://%s:%s" % (args["ip"], args["port"])
    for process_id in range(process_num):
        m = get_milvus(args["handler"])
        m.connect(uri=uri)
        p = Process(target=create_index, args=(m, process_id))
        processes.append(p)
        p.start()
        time.sleep(0.2)
    for p in processes:
        p.join()
        # A failed assertion in a child is visible here only through its
        # exit code.
        assert p.exitcode == 0
def test_create_index_no_vectors(self, connect, ip_table):
    '''
    target: test create index interface when the table holds no vectors
    method: create an IVF_SQ8 index on `ip_table` without adding vectors
    expected: create_index returns an OK status
    '''
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    build_status = connect.create_index(ip_table, kind, params)
    assert build_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_no_vectors_then_add_vectors(self, connect, ip_table, get_simple_index):
    '''
    target: test that creating an index on an empty table does not break
            a later insert
    method: create index on the empty `ip_table`, then add vectors
            (RNSG is skipped)
    expected: add_vectors returns an OK status
    '''
    params = get_simple_index["index_param"]
    kind = get_simple_index["index_type"]
    if kind in (IndexType.RNSG,):
        pytest.skip("Skip some RNSG cases")
    connect.create_index(ip_table, kind, params)
    add_status, _ids = connect.add_vectors(ip_table, vectors)
    assert add_status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_same_index_repeatedly(self, connect, ip_table):
    '''
    target: check if index can be created repeatedly with the same params
    method: add vectors, call create_index twice with identical
            arguments, then search with the first vector
    expected: the second create_index returns OK; search returns one
              result set holding top_k hits
    '''
    _add_status, _ids = connect.add_vectors(ip_table, vectors)
    kind = IndexType.IVF_SQ8
    params = {"nlist": NLIST}
    connect.create_index(ip_table, kind, params)
    second_status = connect.create_index(ip_table, kind, params)
    assert second_status.OK()
    top_k = 1
    _search_status, hits = connect.search_vectors(
        ip_table, top_k, [vectors[0]], params={"nprobe": nprobe})
    assert len(hits) == 1
    assert len(hits[0]) == top_k
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_different_index_repeatedly(self, connect, ip_table):
    '''
    target: check if index can be re-created with a different index type
    method: add vectors, create an IVF_SQ8 index, then an IVFLAT index
    expected: both calls return OK; describe_index reports the second
              index type with the same nlist
    '''
    nlist = NLIST
    _add_status, _ids = connect.add_vectors(ip_table, vectors)
    first_kind = IndexType.IVF_SQ8
    second_kind = IndexType.IVFLAT
    indexs = [
        {"index_type": first_kind, "index_param": {"nlist": nlist}},
        {"index_type": second_kind, "index_param": {"nlist": nlist}},
    ]
    logging.getLogger().info(indexs)
    for spec in indexs:
        build_status = connect.create_index(ip_table, spec["index_type"], spec["index_param"])
        assert build_status.OK()
    _desc_status, described = connect.describe_index(ip_table)
    assert described._params["nlist"] == nlist
    assert described._table_name == ip_table
    assert described._index_type == second_kind
"""
******************************************************************
The following cases are used to test `describe_index` function
******************************************************************
"""
def test_describe_index(self, connect, ip_table, get_simple_index):
    '''
    target: test describe index interface
    method: create an index on the table (no vectors are added by this test), then call describe index
    expected: table name, index type and params match the request;
              when the server mode is GPU and the index is IVF_PQ, FLAT with nlist == NLIST is expected instead
    '''
    log = logging.getLogger()
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    log.info(get_simple_index)
    if index_type == IndexType.RNSG:
        pytest.skip()
    status = connect.create_index(ip_table, index_type, index_param)
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._table_name == ip_table
    status, mode = connect._cmd("mode")
    pq_on_gpu = str(mode) == "GPU" and index_type == IndexType.IVF_PQ
    if not pq_on_gpu:
        assert described._index_type == index_type
        assert described._params == index_param
    else:
        assert described._index_type == IndexType.FLAT
        assert described._params["nlist"] == NLIST
def test_describe_index_partition(self, connect, ip_table, get_simple_index):
    '''
    target: test describe index interface on a table that has one partition
    method: create a partition, add vectors under its tag, create the index on the table, call describe index
    expected: described params, table name and index type equal the requested ones
    '''
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    if index_type == IndexType.RNSG:
        pytest.skip("Skip some RNSG cases")
    log = logging.getLogger()
    log.info(get_simple_index)
    status = connect.create_partition(ip_table, tag)
    status, ids = connect.add_vectors(ip_table, vectors, partition_tag=tag)
    status = connect.create_index(ip_table, index_type, index_param)
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._params == index_param
    assert described._table_name == ip_table
    assert described._index_type == index_type
def test_describe_index_partition_A(self, connect, ip_table, get_simple_index):
    '''
    target: test describe index interface on a table that has two partitions
    method: create two partitions, add vectors under each tag, create the index on the table, call describe index
    expected: described params, table name and index type equal the requested ones
    '''
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    partition_tags = (tag, "new_tag")
    if index_type == IndexType.RNSG:
        pytest.skip("Skip some RNSG cases")
    log = logging.getLogger()
    log.info(get_simple_index)
    # Both partitions are created first, then each one receives the same vectors.
    for partition in partition_tags:
        status = connect.create_partition(ip_table, partition)
    for partition in partition_tags:
        status, ids = connect.add_vectors(ip_table, vectors, partition_tag=partition)
    status = connect.create_index(ip_table, index_type, index_param)
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._params == index_param
    assert described._table_name == ip_table
    assert described._index_type == index_type
def test_describe_and_drop_index_multi_tables(self, connect, get_simple_index):
    '''
    target: test create, describe and drop index interface with multiple tables of IP
    method: create 10 IP tables, add vectors and create an index in each one,
            describe every index, then drop every index and describe again
    expected: create / drop return OK, describe reports the requested index before the drop
              and IndexType.FLAT after it
    '''
    index_param = get_simple_index["index_param"]
    index_type = get_simple_index["index_type"]
    # Decide whether to skip before any table is created, so a skipped case
    # does not leave a freshly created table behind.
    if index_type in [IndexType.RNSG]:
        pytest.skip("Skip some RNSG cases")
    logging.getLogger().info(get_simple_index)
    table_count = 10
    nq = 100
    # Local name differs from the module-level `vectors` to avoid shadowing it.
    records = gen_vectors(nq, dim)
    table_list = []
    for _ in range(table_count):
        table_name = gen_unique_str()
        table_list.append(table_name)
        param = {'table_name': table_name,
                 'dimension': dim,
                 'index_file_size': index_file_size,
                 'metric_type': MetricType.IP}
        connect.create_table(param)
        status, ids = connect.add_vectors(table_name=table_name, records=records)
        status = connect.create_index(table_name, index_type, index_param)
        assert status.OK()
    # Every table must report the index that was requested for it.
    for table_name in table_list:
        status, result = connect.describe_index(table_name)
        logging.getLogger().info(result)
        assert result._params == index_param
        assert result._table_name == table_name
        assert result._index_type == index_type
    # After dropping, every table must report FLAT.
    for table_name in table_list:
        status = connect.drop_index(table_name)
        assert status.OK()
        status, result = connect.describe_index(table_name)
        logging.getLogger().info(result)
        assert result._table_name == table_name
        assert result._index_type == IndexType.FLAT
@pytest.mark.level(2)
def test_describe_index_without_connect(self, dis_connect, ip_table):
    '''
    target: test describe index without connection
    method: call describe_index through the `dis_connect` client
    expected: raise exception
    '''
    with pytest.raises(Exception) as exc_info:
        dis_connect.describe_index(ip_table)
def test_describe_index_not_create(self, connect, ip_table):
    '''
    target: test describe index interface when no index has been created by the test
    method: add vectors to the table, do not call create_index, call describe index
    expected: describe_index returns an OK status
    '''
    status, ids = connect.add_vectors(ip_table, vectors)
    # create_index is deliberately not called here.
    status, described = connect.describe_index(ip_table)
    logging.getLogger().info(described)
    assert status.OK()
"""
******************************************************************
The following cases are used to test `drop_index` function
******************************************************************
"""
def test_drop_index(self, connect, ip_table, get_simple_index):
    '''
    target: test drop index interface
    method: query the server mode, create an index, describe it, drop it and describe again
    expected: drop_index returns OK, and describe_index afterwards reports IndexType.FLAT
    '''
    log = logging.getLogger()
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    status, mode = connect._cmd("mode")
    assert status.OK()
    if index_type == IndexType.RNSG:
        pytest.skip()
    status = connect.create_index(ip_table, index_type, index_param)
    # Creating IVF_PQ is expected to fail when the server reports GPU mode.
    expect_failure = str(mode) == "GPU" and (index_type == IndexType.IVF_PQ)
    if not expect_failure:
        assert status.OK()
    else:
        assert not status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    status = connect.drop_index(ip_table)
    assert status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._table_name == ip_table
    assert described._index_type == IndexType.FLAT
def test_drop_index_partition(self, connect, ip_table, get_simple_index):
    '''
    target: test drop index interface on a table that has one partition
    method: create a partition, add vectors under its tag, create the index on the table, drop the table index
    expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
    '''
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    if index_type == IndexType.RNSG:
        pytest.skip("Skip some RNSG cases")
    log = logging.getLogger()
    status = connect.create_partition(ip_table, tag)
    status, ids = connect.add_vectors(ip_table, vectors, partition_tag=tag)
    status = connect.create_index(ip_table, index_type, index_param)
    assert status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    status = connect.drop_index(ip_table)
    assert status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._table_name == ip_table
    assert described._index_type == IndexType.FLAT
def test_drop_index_partition_C(self, connect, ip_table, get_simple_index):
    '''
    target: test drop index interface on a table that has two partitions
    method: create two partitions, add vectors to the table (no partition tag is passed),
            create the index on the table, then drop the table index
    expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
    '''
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    if index_type == IndexType.RNSG:
        pytest.skip("Skip some RNSG cases")
    for partition in (tag, "new_tag"):
        status = connect.create_partition(ip_table, partition)
    status, ids = connect.add_vectors(ip_table, vectors)
    status = connect.create_index(ip_table, index_type, index_param)
    assert status.OK()
    status = connect.drop_index(ip_table)
    assert status.OK()
    status, described = connect.describe_index(ip_table)
    logging.getLogger().info(described)
    assert described._table_name == ip_table
    assert described._index_type == IndexType.FLAT
def test_drop_index_repeatly(self, connect, ip_table, get_simple_index):
    '''
    target: test drop index repeatedly
    method: create an index, call drop index, and drop again
    expected: both drop_index calls return OK, and describe_index afterwards reports IndexType.FLAT
    '''
    log = logging.getLogger()
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    status, mode = connect._cmd("mode")
    assert status.OK()
    if index_type == IndexType.RNSG:
        pytest.skip()
    status = connect.create_index(ip_table, index_type, index_param)
    # Creating IVF_PQ is expected to fail when the server reports GPU mode.
    expect_failure = str(mode) == "GPU" and (index_type == IndexType.IVF_PQ)
    if not expect_failure:
        assert status.OK()
    else:
        assert not status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    # The second drop acts on a table whose index has already been dropped.
    for _ in range(2):
        status = connect.drop_index(ip_table)
        assert status.OK()
    status, described = connect.describe_index(ip_table)
    log.info(described)
    assert described._table_name == ip_table
    assert described._index_type == IndexType.FLAT
@pytest.mark.level(2)
def test_drop_index_without_connect(self, dis_connect, ip_table):
    '''
    target: test drop index without connection
    method: call drop_index through the `dis_connect` client
    expected: raise exception
    '''
    # drop_index is called with the table name only, as the connected
    # drop_index tests do. Passing extra positional arguments (index type /
    # params) risks the expected exception coming from the argument mismatch
    # rather than from the missing connection.
    with pytest.raises(Exception) as e:
        status = dis_connect.drop_index(ip_table)
def test_drop_index_table_not_create(self, connect, ip_table):
    '''
    target: test drop index interface when no index has been created by the test
    method: add vectors to the table, describe the index, then drop the index without creating one
    expected: drop_index returns an OK status
    '''
    log = logging.getLogger()
    status, ids = connect.add_vectors(ip_table, vectors)
    status, described = connect.describe_index(ip_table)
    log.info(described)
    # create_index is deliberately not called before the drop.
    status = connect.drop_index(ip_table)
    log.info(status)
    assert status.OK()
def test_create_drop_index_repeatly(self, connect, ip_table, get_simple_index):
    '''
    target: test create / drop index repeatedly, use the same index params
    method: add vectors, then run two rounds of create index, describe, drop index, describe
    expected: every create / drop returns OK, and after each drop describe_index reports IndexType.FLAT
    '''
    index_type = get_simple_index["index_type"]
    index_param = get_simple_index["index_param"]
    if index_type == IndexType.RNSG:
        pytest.skip("Skip some RNSG cases")
    log = logging.getLogger()
    status, ids = connect.add_vectors(ip_table, vectors)
    for _ in range(2):
        status = connect.create_index(ip_table, index_type, index_param)
        assert status.OK()
        status, described = connect.describe_index(ip_table)
        log.info(described)
        status = connect.drop_index(ip_table)
        assert status.OK()
        status, described = connect.describe_index(ip_table)
        log.info(described)
        assert described._table_name == ip_table
        assert described._index_type == IndexType.FLAT
def test_create_drop_index_repeatly_different_index_params(self, connect, ip_table):
    '''
    target: test create / drop index repeatedly, use different index params
    method: add vectors, then for IVFLAT and for IVF_SQ8: create index, describe, drop index, describe
    expected: every create / drop returns OK, describe reports the created index before the drop
              and IndexType.FLAT after it
    '''
    log = logging.getLogger()
    indexs = [
        {"index_type": IndexType.IVFLAT, "index_param": {"nlist": NLIST}},
        {"index_type": IndexType.IVF_SQ8, "index_param": {"nlist": NLIST}},
    ]
    status, ids = connect.add_vectors(ip_table, vectors)
    for spec in indexs:
        status = connect.create_index(ip_table, spec["index_type"], spec["index_param"])
        assert status.OK()
        status, described = connect.describe_index(ip_table)
        assert described._params == spec["index_param"]
        assert described._table_name == ip_table
        assert described._index_type == spec["index_type"]
        # The index is described a second time only to log it.
        status, described = connect.describe_index(ip_table)
        log.info(described)
        status = connect.drop_index(ip_table)
        assert status.OK()
        status, described = connect.describe_index(ip_table)
        log.info(described)
        assert described._table_name == ip_table
        assert described._index_type == IndexType.FLAT
class TestIndexJAC:
    '''
    Create / describe / drop index tests that run against the `jac_table`
    fixture and insert the binary vectors stored on the class.
    '''
    # Vectors produced by gen_binary_vectors and shared by the tests below;
    # `tmp` (the first returned value) is not referenced inside this class.
    tmp, vectors = gen_binary_vectors(nb, dim)

    @pytest.fixture(
        scope="function",
        params=gen_index()
    )
    def get_index(self, request, connect):
        # Skip index types the reported server mode is not expected to support.
        if str(connect._cmd("mode")[1]) == "CPU":
            if request.param["index_type"] == IndexType.IVF_SQ8H:
                pytest.skip("sq8h not support in CPU mode")
        if str(connect._cmd("mode")[1]) == "GPU":
            if request.param["index_type"] == IndexType.IVF_PQ:
                pytest.skip("ivfpq not support in GPU mode")
        return request.param

    @pytest.fixture(
        scope="function",
        params=gen_simple_index()
    )
    def get_simple_index(self, request, connect):
        # Skip index types the reported server mode is not expected to support.
        if str(connect._cmd("mode")[1]) == "CPU":
            if request.param["index_type"] == IndexType.IVF_SQ8H:
                pytest.skip("sq8h not support in CPU mode")
        if str(connect._cmd("mode")[1]) == "GPU":
            if request.param["index_type"] == IndexType.IVF_PQ:
                pytest.skip("ivfpq not support in GPU mode")
        return request.param

    @pytest.fixture(
        scope="function",
        params=gen_simple_index()
    )
    def get_jaccard_index(self, request, connect):
        # Only FLAT and IVFLAT are exercised by this class; everything else is skipped.
        logging.getLogger().info(request.param)
        if request.param["index_type"] == IndexType.IVFLAT or request.param["index_type"] == IndexType.FLAT:
            return request.param
        else:
            pytest.skip("Skip index Temporary")

    """
    ******************************************************************
      The following cases are used to test `create_index` function
    ******************************************************************
    """
    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index(self, connect, jac_table, get_jaccard_index):
        '''
        target: test create index interface
        method: add the class's binary vectors to the table, create index
        expected: create_index returns OK for FLAT / IVFLAT and a non-OK status for any other type
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        logging.getLogger().info(get_jaccard_index)
        status, ids = connect.add_vectors(jac_table, self.vectors)
        status = connect.create_index(jac_table, index_type, index_param)
        if index_type != IndexType.FLAT and index_type != IndexType.IVFLAT:
            assert not status.OK()
        else:
            assert status.OK()

    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index_partition(self, connect, jac_table, get_jaccard_index):
        '''
        target: test create index interface
        method: create a partition, add the class's binary vectors under its tag, create index
        expected: create_index returns OK
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        logging.getLogger().info(get_jaccard_index)
        status = connect.create_partition(jac_table, tag)
        status, ids = connect.add_vectors(jac_table, self.vectors, partition_tag=tag)
        status = connect.create_index(jac_table, index_type, index_param)
        assert status.OK()

    @pytest.mark.level(2)
    def test_create_index_without_connect(self, dis_connect, jac_table):
        '''
        target: test create index without connection
        method: call create_index through the `dis_connect` client
        expected: raise exception
        '''
        nlist = NLIST
        index_param = {"nlist": nlist}
        with pytest.raises(Exception) as e:
            status = dis_connect.create_index(jac_table, IndexType.IVF_SQ8, index_param)

    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index_search_with_query_vectors(self, connect, jac_table, get_jaccard_index):
        '''
        target: test create index interface, search with more query vectors
        method: add the class's binary vectors, create index, search with three of them
        expected: search returns OK and one result entry per query vector
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        logging.getLogger().info(get_jaccard_index)
        status, ids = connect.add_vectors(jac_table, self.vectors)
        status = connect.create_index(jac_table, index_type, index_param)
        logging.getLogger().info(connect.describe_index(jac_table))
        query_vecs = [self.vectors[0], self.vectors[1], self.vectors[2]]
        top_k = 5
        search_param = get_search_param(index_type)
        status, result = connect.search_vectors(jac_table, top_k, query_vecs, params=search_param)
        logging.getLogger().info(result)
        assert status.OK()
        assert len(result) == len(query_vecs)

    """
    ******************************************************************
      The following cases are used to test `describe_index` function
    ******************************************************************
    """
    def test_describe_index(self, connect, jac_table, get_jaccard_index):
        '''
        target: test describe index interface
        method: create index on the table (no vectors are added by this test), call describe index
        expected: described table name, index type and params equal the requested ones
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        logging.getLogger().info(get_jaccard_index)
        status = connect.create_index(jac_table, index_type, index_param)
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        assert result._table_name == jac_table
        assert result._index_type == index_type
        assert result._params == index_param

    def test_describe_index_partition(self, connect, jac_table, get_jaccard_index):
        '''
        target: test describe index interface
        method: create a partition, add the class's binary vectors under its tag, create index, call describe index
        expected: described params, table name and index type equal the requested ones
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        logging.getLogger().info(get_jaccard_index)
        status = connect.create_partition(jac_table, tag)
        # Use the class's binary vectors (self.vectors), like the other tests
        # of this class, rather than the module-level `vectors`.
        status, ids = connect.add_vectors(jac_table, self.vectors, partition_tag=tag)
        status = connect.create_index(jac_table, index_type, index_param)
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        assert result._params == index_param
        assert result._table_name == jac_table
        assert result._index_type == index_type

    """
    ******************************************************************
      The following cases are used to test `drop_index` function
    ******************************************************************
    """
    def test_drop_index(self, connect, jac_table, get_jaccard_index):
        '''
        target: test drop index interface
        method: create index on the table (no vectors are added by this test), call drop index
        expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        status, mode = connect._cmd("mode")
        assert status.OK()
        status = connect.create_index(jac_table, index_type, index_param)
        assert status.OK()
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        status = connect.drop_index(jac_table)
        assert status.OK()
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        assert result._table_name == jac_table
        assert result._index_type == IndexType.FLAT

    def test_drop_index_partition(self, connect, jac_table, get_jaccard_index):
        '''
        target: test drop index interface
        method: create a partition, add the class's binary vectors under its tag, create index on the table, drop the table index
        expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
        '''
        index_param = get_jaccard_index["index_param"]
        index_type = get_jaccard_index["index_type"]
        status = connect.create_partition(jac_table, tag)
        # Use the class's binary vectors (self.vectors), like the other tests
        # of this class, rather than the module-level `vectors`.
        status, ids = connect.add_vectors(jac_table, self.vectors, partition_tag=tag)
        status = connect.create_index(jac_table, index_type, index_param)
        assert status.OK()
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        status = connect.drop_index(jac_table)
        assert status.OK()
        status, result = connect.describe_index(jac_table)
        logging.getLogger().info(result)
        assert result._table_name == jac_table
        assert result._index_type == IndexType.FLAT
class TestIndexHAM:
    '''
    Create / describe / drop index tests that run against the `ham_table`
    fixture and insert the binary vectors stored on the class.
    '''
    # Vectors produced by gen_binary_vectors and shared by the tests below;
    # `tmp` (the first returned value) is not referenced inside this class.
    tmp, vectors = gen_binary_vectors(nb, dim)

    @pytest.fixture(
        scope="function",
        params=gen_index()
    )
    def get_index(self, request, connect):
        # IVF_SQ8H is skipped when the server reports CPU mode; IVF_PQ and HNSW are always skipped.
        if str(connect._cmd("mode")[1]) == "CPU":
            if request.param["index_type"] == IndexType.IVF_SQ8H:
                pytest.skip("sq8h not support in CPU mode")
        if request.param["index_type"] == IndexType.IVF_PQ or request.param["index_type"] == IndexType.HNSW:
            pytest.skip("Skip PQ Temporary")
        return request.param

    @pytest.fixture(
        scope="function",
        params=gen_simple_index()
    )
    def get_simple_index(self, request, connect):
        # IVF_SQ8H is skipped when the server reports CPU mode; IVF_PQ and HNSW are always skipped.
        if str(connect._cmd("mode")[1]) == "CPU":
            if request.param["index_type"] == IndexType.IVF_SQ8H:
                pytest.skip("sq8h not support in CPU mode")
        if request.param["index_type"] == IndexType.IVF_PQ or request.param["index_type"] == IndexType.HNSW:
            pytest.skip("Skip PQ Temporary")
        return request.param

    @pytest.fixture(
        scope="function",
        params=gen_simple_index()
    )
    def get_hamming_index(self, request, connect):
        # Only FLAT and IVFLAT are exercised by this class; everything else is skipped.
        logging.getLogger().info(request.param)
        if request.param["index_type"] == IndexType.IVFLAT or request.param["index_type"] == IndexType.FLAT:
            return request.param
        else:
            pytest.skip("Skip index Temporary")

    """
    ******************************************************************
      The following cases are used to test `create_index` function
    ******************************************************************
    """
    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index(self, connect, ham_table, get_hamming_index):
        '''
        target: test create index interface
        method: add the class's binary vectors to the table, create index
        expected: create_index returns OK for FLAT / IVFLAT and a non-OK status for any other type
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        logging.getLogger().info(get_hamming_index)
        status, ids = connect.add_vectors(ham_table, self.vectors)
        status = connect.create_index(ham_table, index_type, index_param)
        if index_type != IndexType.FLAT and index_type != IndexType.IVFLAT:
            assert not status.OK()
        else:
            assert status.OK()

    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index_partition(self, connect, ham_table, get_hamming_index):
        '''
        target: test create index interface
        method: create a partition, add the class's binary vectors under its tag, create index
        expected: create_index returns OK, and the table row count equals the number of added vectors
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        logging.getLogger().info(get_hamming_index)
        status = connect.create_partition(ham_table, tag)
        status, ids = connect.add_vectors(ham_table, self.vectors, partition_tag=tag)
        status = connect.create_index(ham_table, index_type, index_param)
        assert status.OK()
        status, res = connect.get_table_row_count(ham_table)
        assert res == len(self.vectors)

    @pytest.mark.level(2)
    def test_create_index_without_connect(self, dis_connect, ham_table):
        '''
        target: test create index without connection
        method: call create_index through the `dis_connect` client
        expected: raise exception
        '''
        nlist = NLIST
        index_param = {"nlist": nlist}
        with pytest.raises(Exception) as e:
            status = dis_connect.create_index(ham_table, IndexType.IVF_SQ8, index_param)

    @pytest.mark.timeout(BUILD_TIMEOUT)
    def test_create_index_search_with_query_vectors(self, connect, ham_table, get_hamming_index):
        '''
        target: test create index interface, search with more query vectors
        method: add the class's binary vectors, create index, search with three of them
        expected: search returns OK and one result entry per query vector
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        logging.getLogger().info(get_hamming_index)
        status, ids = connect.add_vectors(ham_table, self.vectors)
        status = connect.create_index(ham_table, index_type, index_param)
        logging.getLogger().info(connect.describe_index(ham_table))
        query_vecs = [self.vectors[0], self.vectors[1], self.vectors[2]]
        top_k = 5
        search_param = get_search_param(index_type)
        status, result = connect.search_vectors(ham_table, top_k, query_vecs, params=search_param)
        logging.getLogger().info(result)
        assert status.OK()
        assert len(result) == len(query_vecs)

    """
    ******************************************************************
      The following cases are used to test `describe_index` function
    ******************************************************************
    """
    def test_describe_index(self, connect, ham_table, get_hamming_index):
        '''
        target: test describe index interface
        method: create index on the table (no vectors are added by this test), call describe index
        expected: described table name, index type and params equal the requested ones
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        logging.getLogger().info(get_hamming_index)
        status = connect.create_index(ham_table, index_type, index_param)
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        assert result._table_name == ham_table
        assert result._index_type == index_type
        assert result._params == index_param

    def test_describe_index_partition(self, connect, ham_table, get_hamming_index):
        '''
        target: test describe index interface
        method: create a partition, add the class's binary vectors under its tag, create index, call describe index
        expected: described params, table name and index type equal the requested ones
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        logging.getLogger().info(get_hamming_index)
        status = connect.create_partition(ham_table, tag)
        # Use the class's binary vectors (self.vectors), like the other tests
        # of this class, rather than the module-level `vectors`.
        status, ids = connect.add_vectors(ham_table, self.vectors, partition_tag=tag)
        status = connect.create_index(ham_table, index_type, index_param)
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        assert result._params == index_param
        assert result._table_name == ham_table
        assert result._index_type == index_type

    """
    ******************************************************************
      The following cases are used to test `drop_index` function
    ******************************************************************
    """
    def test_drop_index(self, connect, ham_table, get_hamming_index):
        '''
        target: test drop index interface
        method: create index on the table (no vectors are added by this test), call drop index
        expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        status, mode = connect._cmd("mode")
        assert status.OK()
        status = connect.create_index(ham_table, index_type, index_param)
        assert status.OK()
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        status = connect.drop_index(ham_table)
        assert status.OK()
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        assert result._table_name == ham_table
        assert result._index_type == IndexType.FLAT

    def test_drop_index_partition(self, connect, ham_table, get_hamming_index):
        '''
        target: test drop index interface
        method: create a partition, add the class's binary vectors under its tag, create index on the table, drop the table index
        expected: create / drop return OK, and describe_index afterwards reports IndexType.FLAT
        '''
        index_param = get_hamming_index["index_param"]
        index_type = get_hamming_index["index_type"]
        status = connect.create_partition(ham_table, tag)
        # Use the class's binary vectors (self.vectors), like the other tests
        # of this class, rather than the module-level `vectors`.
        status, ids = connect.add_vectors(ham_table, self.vectors, partition_tag=tag)
        status = connect.create_index(ham_table, index_type, index_param)
        assert status.OK()
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        status = connect.drop_index(ham_table)
        assert status.OK()
        status, result = connect.describe_index(ham_table)
        logging.getLogger().info(result)
        assert result._table_name == ham_table
        assert result._index_type == IndexType.FLAT
class TestIndexTableInvalid(object):
    """
    Create / describe / drop index calls made with an invalid table name
    are each expected to return a non-OK status.
    """

    @pytest.fixture(scope="function", params=gen_invalid_table_names())
    def get_table_name(self, request):
        # One test run per value produced by gen_invalid_table_names().
        yield request.param

    @pytest.mark.level(1)
    def test_create_index_with_invalid_tablename(self, connect, get_table_name):
        bad_name = get_table_name
        status = connect.create_index(bad_name, IndexType.IVF_SQ8, {"nlist": NLIST})
        assert not status.OK()

    @pytest.mark.level(1)
    def test_describe_index_with_invalid_tablename(self, connect, get_table_name):
        bad_name = get_table_name
        status, described = connect.describe_index(bad_name)
        assert not status.OK()

    @pytest.mark.level(1)
    def test_drop_index_with_invalid_tablename(self, connect, get_table_name):
        bad_name = get_table_name
        status = connect.drop_index(bad_name)
        assert not status.OK()
class TestCreateIndexParamsInvalid(object):
    """
    Test building an index with invalid index types / index params
    """

    @pytest.fixture(scope="function", params=gen_invalid_index())
    def get_index(self, request):
        # One test run per value produced by gen_invalid_index().
        yield request.param

    @pytest.mark.level(1)
    def test_create_index_with_invalid_index_params(self, connect, table, get_index):
        '''
        target: test create index with an invalid index type or invalid params
        method: call create_index with each generated invalid combination
        expected: an exception when index_type is falsy or not an IndexType, otherwise a non-OK status
        '''
        index_type = get_index["index_type"]
        index_param = get_index["index_param"]
        logging.getLogger().info(get_index)
        if index_type and isinstance(index_type, IndexType):
            status = connect.create_index(table, index_type, index_param)
            assert not status.OK()
        else:
            with pytest.raises(Exception) as exc_info:
                connect.create_index(table, index_type, index_param)

    # Index types used by the nlist / empty-param tests below.
    @pytest.fixture(
        scope="function",
        params=[IndexType.FLAT, IndexType.IVFLAT, IndexType.IVF_SQ8, IndexType.IVF_SQ8H]
    )
    def get_index_type(self, request):
        yield request.param

    def test_create_index_with_invalid_nlist(self, connect, table, get_index_type):
        '''
        target: test create index with an invalid nlist
        method: add vectors, call create_index with nlist = INVALID_NLIST
        expected: a non-OK status for every index type except FLAT (nothing is asserted for FLAT)
        '''
        status, ids = connect.add_vectors(table, vectors)
        status = connect.create_index(table, get_index_type, {"nlist": INVALID_NLIST})
        is_flat = get_index_type == IndexType.FLAT
        if not is_flat:
            assert not status.OK()

    def test_create_index_with_empty_param(self, connect, table, get_index_type):
        '''
        target: test create index with empty params
        method: call create_index with an empty params dict, then describe the index
        expected: a non-OK status for every index type except FLAT, and describe_index reports IndexType.FLAT
        '''
        log = logging.getLogger()
        log.info(get_index_type)
        status = connect.create_index(table, get_index_type, {})
        is_flat = get_index_type == IndexType.FLAT
        if not is_flat:
            assert not status.OK()
        status, described = connect.describe_index(table)
        log.info(described)
        assert described._table_name == table
        assert described._index_type == IndexType.FLAT