[test]Enable standby during rolling update and refine bulk insert (#26039)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2023-08-01 09:37:04 +08:00 committed by GitHub
parent 8b5e276361
commit 037a58a60d
4 changed files with 51 additions and 18 deletions

@@ -8,6 +8,7 @@ from base.collection_wrapper import ApiCollectionWrapper
 from base.utility_wrapper import ApiUtilityWrapper
 from common import common_func as cf
 from common import common_type as ct
+from common.milvus_sys import MilvusSys
 from chaos import constants
 from common.common_type import CheckTasks
@@ -30,7 +31,7 @@ class Op(Enum):
     unknown = 'unknown'

-timeout = 120
+timeout = 10
 enable_traceback = False
 DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}'
@@ -103,6 +104,8 @@ class Checker:
         self.rsp_times = []
         self.average_time = 0
         self.files = []
+        self.ms = MilvusSys()
+        self.bucket_name = self.ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
         self.c_wrap = ApiCollectionWrapper()
         self.utility_wrap = ApiUtilityWrapper()
         c_name = collection_name if collection_name is not None else cf.gen_unique_str(
@@ -180,16 +183,21 @@
                                  nb=constants.ENTITIES_FOR_BULKINSERT,
                                  file_type="npy",
                                  minio_endpoint="127.0.0.1:9000",
-                                 bucket_name="milvus-bucket"):
+                                 bucket_name=None):
         schema = self.schema
+        bucket_name = self.bucket_name if bucket_name is None else bucket_name
         log.info(f"prepare data for bulk insert")
-        files = cf.prepare_bulk_insert_data(schema=schema,
-                                            nb=nb,
-                                            file_type=file_type,
-                                            minio_endpoint=minio_endpoint,
-                                            bucket_name=bucket_name)
-        self.files = files
-        return files
+        try:
+            files = cf.prepare_bulk_insert_data(schema=schema,
+                                                nb=nb,
+                                                file_type=file_type,
+                                                minio_endpoint=minio_endpoint,
+                                                bucket_name=bucket_name)
+            self.files = files
+            return files, True
+        except Exception as e:
+            log.error(f"prepare data for bulk insert failed with error {e}")
+            return [], False

     def do_bulk_insert(self):
         log.info(f"bulk insert collection name: {self.c_name}")

@@ -2,9 +2,10 @@ import time
 import json
 from collections import defaultdict
 import pytest
+from pymilvus import Collection
 from base.client_base import TestcaseBase
 from deploy.common import get_chaos_test_collections
 from chaos import constants
 from common.common_type import CaseLabel
 from utils.util_log import test_log as log
@@ -19,6 +20,8 @@ class TestGetCollections(TestcaseBase):
         all_collections = [c_name for c_name in all_collections if "Checker" in c_name]
         selected_collections_map = {}
         for c_name in all_collections:
+            if Collection(name=c_name).num_entities < constants.ENTITIES_FOR_SEARCH:
+                continue
             prefix = c_name.split("_")[0]
             if prefix not in selected_collections_map:
                 selected_collections_map[prefix] = [c_name]
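The new guard skips collections that do not yet hold enough entities to make search and query checking meaningful. A standalone sketch of the same selection rule; the 3000 threshold is an assumed stand-in for the real constant:

    from pymilvus import Collection, connections, utility

    ENTITIES_FOR_SEARCH = 3000  # assumed value; the real one lives in chaos/constants.py

    connections.connect(host="127.0.0.1", port="19530")
    # Checker collections are the ones the chaos framework created earlier.
    candidates = [c for c in utility.list_collections() if "Checker" in c]
    # num_entities queries per-collection statistics, so only pay that
    # cost for the candidate collections.
    selected = [c for c in candidates
                if Collection(name=c).num_entities >= ENTITIES_FOR_SEARCH]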

@@ -75,13 +75,22 @@ class TestOperations(TestBase):
         self.init_health_checkers(collection_name=c_name)
         # prepare data by bulk insert
         log.info("*********************Prepare Data by bulk insert**********************")
-        cc.start_monitor_threads(self.health_checkers)
         for k, v in self.health_checkers.items():
-            log.info(f"prepare bulk insert data for {k}")
-            v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint)
-            v.do_bulk_insert()
+            if k in [Op.search, Op.query]:
+                log.info(f"prepare bulk insert data for {k}")
+                v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint)
+                completed = False
+                retry_times = 0
+                while not completed and retry_times < 3:
+                    completed, result = v.do_bulk_insert()
+                    if not completed:
+                        log.info(f"do bulk insert failed: {result}")
+                        retry_times += 1
+                        sleep(5)
+        # how to make sure the bulk insert done before rolling update?
         log.info("*********************Load Start**********************")
+        cc.start_monitor_threads(self.health_checkers)
         # wait request_duration
         request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
         if request_duration[-1] == "+":
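The replace chain turns a human-readable duration into an arithmetic expression, and the trailing "+" check handles strings that end in an h or m unit. A worked example of the conversion; the final eval step is assumed, since the hunk ends before the expression is evaluated:

    def duration_to_seconds(request_duration: str) -> int:
        # "1h30m" -> "1*3600+30*60+", "90s" -> "90"
        expr = (request_duration.replace("h", "*3600+")
                                .replace("m", "*60+")
                                .replace("s", ""))
        if expr[-1] == "+":  # h/m suffixes leave a dangling '+'
            expr = expr[:-1]
        return eval(expr)    # assumed final step: 1*3600+30*60 -> 5400

    assert duration_to_seconds("1h30m") == 5400
    assert duration_to_seconds("90s") == 90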
@@ -95,8 +104,11 @@
             v.pause()
         for k, v in self.health_checkers.items():
             v.check_result()
-        for k, v in self.health_checkers.items():
+        for k, v in self.health_checkers.items():
+            log.info(f"{k} failed request: {v.fail_records}")
+        for k, v in self.health_checkers.items():
+            log.info(f"{k} rto: {v.get_rto()}")
         if is_check:
             assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
             assert_expectations()
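fail_records and get_rto() are Checker internals this diff does not show. As a hedged sketch of the idea only, an RTO (recovery time objective) readout can be taken as the longest window between consecutive successful requests; the real implementation may differ:

    def rto_from_records(records):
        # records: (timestamp_in_seconds, succeeded) pairs, one per request.
        ok_times = sorted(t for t, ok in records if ok)
        gaps = [later - earlier for earlier, later in zip(ok_times, ok_times[1:])]
        return max(gaps, default=0.0)

    # One outage between t=2.0 and t=9.5 -> reported RTO of 8.5 seconds.
    print(rto_from_records([(0.0, True), (1.0, True), (2.0, False), (9.5, True)]))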

@@ -2,7 +2,7 @@
 apiVersion: milvus.io/v1beta1
 kind: Milvus
 metadata:
-  name: kafka-demo
+  name: operator-demo
   namespace: chaos-testing
   labels:
     app: milvus
@@ -12,6 +12,16 @@ spec:
     dataNode:
       memory:
         forceSyncEnable: false
+    rootCoord:
+      enableActiveStandby: true
+    dataCoord:
+      enableActiveStandby: true
+    queryCoord:
+      enableActiveStandby: true
+    indexCoord:
+      enableActiveStandby: true
+    # mixCoord:
+    #   enableActiveStandby: true
     quotaAndLimits:
       enable: false
     log:
@@ -28,7 +38,7 @@ spec:
     queryNode:
       replicas: 2
     mixCoord:
-      replicas: 1
+      replicas: 1
   dependencies:
     msgStreamType: kafka
     etcd:
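With enableActiveStandby, a coordinator scaled beyond a single replica keeps the extra pods as standbys that can take over during a rolling update, which is what lets the chaos test keep serving requests while pods restart. A hedged sketch for watching pod readiness during the update, assuming the kubernetes Python client and the chaos-testing namespace from the manifest:

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Report readiness of the Milvus pods; with active/standby coordinators,
    # at least one coordinator pod should stay Ready at every point of the
    # rolling update. Assumes the operator propagates the app=milvus label
    # from the manifest to the pods it creates.
    pods = v1.list_namespaced_pod("chaos-testing", label_selector="app=milvus")
    for pod in pods.items:
        ready = any(c.type == "Ready" and c.status == "True"
                    for c in (pod.status.conditions or []))
        print(pod.metadata.name, "Ready" if ready else "NotReady")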