mirror of https://github.com/milvus-io/milvus.git
test: add restful testcases for database api (#38282)
pr: https://github.com/milvus-io/milvus/pull/38281 Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>pull/38306/head
parent
4e5828139b
commit
d2c20ed68b
|
@ -10,6 +10,7 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt
|
|||
from requests.exceptions import ConnectionError
|
||||
import urllib.parse
|
||||
|
||||
|
||||
ENABLE_LOG_SAVE = False
|
||||
|
||||
|
||||
|
@ -334,6 +335,7 @@ class CollectionClient(Requests):
|
|||
self.endpoint = endpoint
|
||||
self.api_key = token
|
||||
self.db_name = None
|
||||
self.name_list = []
|
||||
self.headers = self.update_headers()
|
||||
|
||||
@classmethod
|
||||
|
@ -435,6 +437,10 @@ class CollectionClient(Requests):
|
|||
|
||||
def collection_create(self, payload, db_name="default"):
|
||||
time.sleep(1) # wait for collection created and in case of rate limit
|
||||
c_name = payload.get("collectionName", None)
|
||||
db_name = payload.get("dbName", db_name)
|
||||
self.name_list.append((db_name, c_name))
|
||||
|
||||
url = f'{self.endpoint}/v2/vectordb/collections/create'
|
||||
if self.db_name is not None:
|
||||
payload["dbName"] = self.db_name
|
||||
|
@ -897,6 +903,56 @@ class ImportJobClient(Requests):
|
|||
return rsp, finished
|
||||
|
||||
|
||||
class DatabaseClient(Requests):
|
||||
def __init__(self, endpoint, token):
|
||||
super().__init__(url=endpoint, api_key=token)
|
||||
self.endpoint = endpoint
|
||||
self.api_key = token
|
||||
self.headers = self.update_headers()
|
||||
self.db_name = None
|
||||
self.db_names = [] # Track created databases
|
||||
|
||||
@classmethod
|
||||
def update_headers(cls):
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {cls.api_key}'
|
||||
}
|
||||
return headers
|
||||
|
||||
def database_create(self, payload):
|
||||
"""Create a database"""
|
||||
url = f"{self.endpoint}/v2/vectordb/databases/create"
|
||||
rsp = self.post(url, data=payload).json()
|
||||
if rsp['code'] == 0:
|
||||
self.db_name = payload['dbName']
|
||||
self.db_names.append(payload['dbName'])
|
||||
return rsp
|
||||
|
||||
def database_list(self, payload):
|
||||
"""List all databases"""
|
||||
url = f"{self.endpoint}/v2/vectordb/databases/list"
|
||||
return self.post(url, data=payload).json()
|
||||
|
||||
def database_describe(self, payload):
|
||||
"""Describe a database"""
|
||||
url = f"{self.endpoint}/v2/vectordb/databases/describe"
|
||||
return self.post(url, data=payload).json()
|
||||
|
||||
def database_alter(self, payload):
|
||||
"""Alter database properties"""
|
||||
url = f"{self.endpoint}/v2/vectordb/databases/alter"
|
||||
return self.post(url, data=payload).json()
|
||||
|
||||
def database_drop(self, payload):
|
||||
"""Drop a database"""
|
||||
url = f"{self.endpoint}/v2/vectordb/databases/drop"
|
||||
rsp = self.post(url, data=payload).json()
|
||||
if rsp['code'] == 0 and payload['dbName'] in self.db_names:
|
||||
self.db_names.remove(payload['dbName'])
|
||||
return rsp
|
||||
|
||||
|
||||
class StorageClient():
|
||||
|
||||
def __init__(self, endpoint, access_key, secret_key, bucket_name, root_path="file"):
|
||||
|
|
|
@ -3,10 +3,10 @@ import sys
|
|||
import pytest
|
||||
import time
|
||||
import uuid
|
||||
from pymilvus import connections, db
|
||||
from pymilvus import connections, db, MilvusClient
|
||||
from utils.util_log import test_log as logger
|
||||
from api.milvus import (VectorClient, CollectionClient, PartitionClient, IndexClient, AliasClient,
|
||||
UserClient, RoleClient, ImportJobClient, StorageClient, Requests)
|
||||
UserClient, RoleClient, ImportJobClient, StorageClient, Requests, DatabaseClient)
|
||||
from utils.utils import get_data_by_payload
|
||||
|
||||
|
||||
|
@ -33,11 +33,15 @@ class Base:
|
|||
role_client = None
|
||||
import_job_client = None
|
||||
storage_client = None
|
||||
milvus_client = None
|
||||
database_client = None
|
||||
|
||||
|
||||
class TestBase(Base):
|
||||
req = None
|
||||
|
||||
def teardown_method(self):
|
||||
# Clean up collections
|
||||
self.collection_client.api_key = self.api_key
|
||||
all_collections = self.collection_client.collection_list()['data']
|
||||
if self.name in all_collections:
|
||||
|
@ -48,11 +52,28 @@ class TestBase(Base):
|
|||
try:
|
||||
rsp = self.collection_client.collection_drop(payload)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(f"drop collection error: {e}")
|
||||
|
||||
# def setup_method(self):
|
||||
# self.req = Requests()
|
||||
# self.req.uuid = str(uuid.uuid1())
|
||||
for item in self.collection_client.name_list:
|
||||
db_name = item[0]
|
||||
c_name = item[1]
|
||||
payload = {
|
||||
"collectionName": c_name,
|
||||
"dbName": db_name
|
||||
}
|
||||
try:
|
||||
self.collection_client.collection_drop(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"drop collection error: {e}")
|
||||
|
||||
# Clean up databases created by this client
|
||||
self.database_client.api_key = self.api_key
|
||||
for db_name in self.database_client.db_names[:]: # Create a copy of the list to iterate
|
||||
logger.info(f"database {db_name} exist, drop it")
|
||||
try:
|
||||
rsp = self.database_client.database_drop({"dbName": db_name})
|
||||
except Exception as e:
|
||||
logger.error(f"drop database error: {e}")
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def init_client(self, endpoint, token, minio_host, bucket_name, root_path):
|
||||
|
@ -79,6 +100,8 @@ class TestBase(Base):
|
|||
self.import_job_client = ImportJobClient(self.endpoint, self.api_key)
|
||||
self.import_job_client.update_uuid(_uuid)
|
||||
self.storage_client = StorageClient(f"{minio_host}:9000", "minioadmin", "minioadmin", bucket_name, root_path)
|
||||
self.database_client = DatabaseClient(self.endpoint, self.api_key)
|
||||
self.database_client.update_uuid(_uuid)
|
||||
if token is None:
|
||||
self.vector_client.api_key = None
|
||||
self.collection_client.api_key = None
|
||||
|
@ -162,3 +185,13 @@ class TestBase(Base):
|
|||
self.collection_client.db_name = db_name
|
||||
self.vector_client.db_name = db_name
|
||||
self.import_job_client.db_name = db_name
|
||||
|
||||
def wait_load_completed(self, collection_name, db_name="default", timeout=5):
|
||||
t0 = time.time()
|
||||
while True and time.time() - t0 < timeout:
|
||||
rsp = self.collection_client.collection_describe(collection_name, db_name=db_name)
|
||||
if "data" in rsp and "load" in rsp["data"] and rsp["data"]["load"] == "LoadStateLoaded":
|
||||
logger.info(f"collection {collection_name} load completed in {time.time() - t0} seconds")
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
import pytest
|
||||
from base.testbase import TestBase
|
||||
from utils.utils import gen_unique_str
|
||||
|
||||
|
||||
@pytest.mark.L0
|
||||
class TestDatabaseOperation(TestBase):
|
||||
"""
|
||||
Test cases for database operations
|
||||
"""
|
||||
|
||||
def test_create_database_with_default_properties(self):
|
||||
"""
|
||||
Test creating a database with default properties
|
||||
"""
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
payload = {"dbName": db_name}
|
||||
rsp = self.database_client.database_create(payload)
|
||||
assert rsp["code"] == 0
|
||||
|
||||
# Verify database exists
|
||||
list_rsp = self.database_client.database_list({})
|
||||
assert rsp["code"] == 0
|
||||
assert db_name in list_rsp["data"]
|
||||
|
||||
def test_create_database_with_custom_properties(self):
|
||||
"""
|
||||
Test creating a database with custom properties
|
||||
"""
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
payload = {"dbName": db_name, "properties": {"mmap.enabled": True}}
|
||||
rsp = self.database_client.database_create(payload)
|
||||
assert rsp["code"] == 0
|
||||
|
||||
# Verify properties
|
||||
describe_rsp = self.database_client.database_describe({"dbName": db_name})
|
||||
assert describe_rsp["code"] == 0
|
||||
assert any(
|
||||
prop["key"] == "mmap.enabled" and prop["value"] == "true"
|
||||
for prop in describe_rsp["data"]["properties"]
|
||||
)
|
||||
|
||||
def test_alter_database_properties(self):
|
||||
"""
|
||||
Test altering database properties
|
||||
"""
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
|
||||
# Create database with initial properties
|
||||
create_payload = {"dbName": db_name, "properties": {"mmap.enabled": True}}
|
||||
rsp = self.database_client.database_create(create_payload)
|
||||
assert rsp["code"] == 0
|
||||
# Verify properties
|
||||
describe_rsp = self.database_client.database_describe({"dbName": db_name})
|
||||
assert describe_rsp["code"] == 0
|
||||
assert any(
|
||||
prop["key"] == "mmap.enabled" and prop["value"] == "true"
|
||||
for prop in describe_rsp["data"]["properties"]
|
||||
)
|
||||
|
||||
# Alter properties
|
||||
alter_payload = {"dbName": db_name, "properties": {"mmap.enabled": False}}
|
||||
alter_rsp = self.database_client.database_alter(alter_payload)
|
||||
assert alter_rsp["code"] == 0
|
||||
|
||||
# Verify altered properties
|
||||
describe_rsp = self.database_client.database_describe({"dbName": db_name})
|
||||
assert describe_rsp["code"] == 0
|
||||
assert any(
|
||||
prop["key"] == "mmap.enabled" and prop["value"] == "false"
|
||||
for prop in describe_rsp["data"]["properties"]
|
||||
)
|
||||
|
||||
def test_list_databases(self):
|
||||
"""
|
||||
Test listing databases
|
||||
"""
|
||||
# Create test database
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
self.database_client.database_create({"dbName": db_name})
|
||||
|
||||
# List databases
|
||||
rsp = self.database_client.database_list({})
|
||||
assert rsp["code"] == 0
|
||||
assert "default" in rsp["data"] # Default database should always exist
|
||||
assert db_name in rsp["data"]
|
||||
|
||||
def test_describe_database(self):
|
||||
"""
|
||||
Test describing database
|
||||
"""
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
properties = {"mmap.enabled": True}
|
||||
|
||||
# Create database
|
||||
self.database_client.database_create(
|
||||
{"dbName": db_name, "properties": properties}
|
||||
)
|
||||
|
||||
# Describe database
|
||||
rsp = self.database_client.database_describe({"dbName": db_name})
|
||||
assert rsp["code"] == 0
|
||||
assert rsp["data"]["dbName"] == db_name
|
||||
assert "dbID" in rsp["data"]
|
||||
assert len(rsp["data"]["properties"]) > 0
|
||||
|
||||
|
||||
@pytest.mark.L0
|
||||
class TestDatabaseOperationNegative(TestBase):
|
||||
"""
|
||||
Negative test cases for database operations
|
||||
"""
|
||||
|
||||
def test_create_database_with_invalid_name(self):
|
||||
"""
|
||||
Test creating database with invalid name
|
||||
"""
|
||||
invalid_names = ["", " ", "test db", "test/db", "test\\db"]
|
||||
for name in invalid_names:
|
||||
rsp = self.database_client.database_create({"dbName": name})
|
||||
assert rsp["code"] != 0
|
||||
|
||||
def test_create_duplicate_database(self):
|
||||
"""
|
||||
Test creating database with duplicate name
|
||||
"""
|
||||
db_name = f"test_db_{gen_unique_str()}"
|
||||
|
||||
# Create first database
|
||||
rsp1 = self.database_client.database_create({"dbName": db_name})
|
||||
assert rsp1["code"] == 0
|
||||
|
||||
# Try to create duplicate
|
||||
rsp2 = self.database_client.database_create({"dbName": db_name})
|
||||
assert rsp2["code"] != 0
|
||||
|
||||
def test_describe_non_existent_database(self):
|
||||
"""
|
||||
Test describing non-existent database
|
||||
"""
|
||||
rsp = self.database_client.database_describe({"dbName": "non_existent_db"})
|
||||
assert rsp["code"] != 0
|
||||
|
||||
def test_alter_non_existent_database(self):
|
||||
"""
|
||||
Test altering non-existent database
|
||||
"""
|
||||
payload = {"dbName": "non_existent_db", "properties": {"mmap.enabled": False}}
|
||||
rsp = self.database_client.database_alter(payload)
|
||||
assert rsp["code"] != 0
|
||||
|
||||
def test_drop_non_existent_database(self):
|
||||
"""
|
||||
Test dropping non-existent database
|
||||
"""
|
||||
rsp = self.database_client.database_drop({"dbName": "non_existent_db"})
|
||||
assert rsp["code"] == 0
|
||||
|
||||
def test_drop_default_database(self):
|
||||
"""
|
||||
Test dropping default database (should not be allowed)
|
||||
"""
|
||||
rsp = self.database_client.database_drop({"dbName": "default"})
|
||||
assert rsp["code"] != 0
|
Loading…
Reference in New Issue