mirror of https://github.com/milvus-io/milvus.git
[test]Fix growing segment in testcases (#18259)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>pull/18266/head
parent
54d17bc5da
commit
b9d7027522
|
@ -678,18 +678,11 @@ def get_segment_distribution(res):
|
|||
Get segment distribution
|
||||
"""
|
||||
from collections import defaultdict
|
||||
segment_distribution = defaultdict(lambda: {"growing": [], "sealed": []})
|
||||
segment_distribution = defaultdict(lambda: {"sealed": []})
|
||||
for r in res:
|
||||
for node_id in r.nodeIds:
|
||||
if node_id not in segment_distribution:
|
||||
segment_distribution[node_id] = {
|
||||
"growing": [],
|
||||
"sealed": []
|
||||
}
|
||||
if r.state == 3:
|
||||
segment_distribution[node_id]["sealed"].append(r.segmentID)
|
||||
if r.state == 2:
|
||||
segment_distribution[node_id]["growing"].append(r.segmentID)
|
||||
|
||||
return segment_distribution
|
||||
|
||||
|
|
|
@ -90,7 +90,6 @@ class TestAutoLoadBalance(object):
|
|||
seg_distribution = cf.get_segment_distribution(seg_info)
|
||||
for k in seg_distribution.keys():
|
||||
log.info(f"collection {c}'s segment distribution in node {k} is {seg_distribution[k]['sealed']}")
|
||||
log.info(f"collection {c}'s growing segment distribution in node {k} is {seg_distribution[k]['growing']}")
|
||||
# first assert
|
||||
log.info("first assert")
|
||||
assert_statistic(self.health_checkers)
|
||||
|
@ -107,7 +106,6 @@ class TestAutoLoadBalance(object):
|
|||
seg_distribution = cf.get_segment_distribution(seg_info)
|
||||
for k in seg_distribution.keys():
|
||||
log.info(f"collection {c}'s sealed segment distribution in node {k} is {seg_distribution[k]['sealed']}")
|
||||
log.info(f"collection {c}'s growing segment distribution in node {k} is {seg_distribution[k]['growing']}")
|
||||
# second assert
|
||||
log.info("second assert")
|
||||
assert_statistic(self.health_checkers)
|
||||
|
|
|
@ -118,11 +118,6 @@ class TestCompactionParams(TestcaseBase):
|
|||
c_plans = collection_w.get_compaction_plans()[0]
|
||||
assert len(c_plans.plans) == 0
|
||||
|
||||
collection_w.load()
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
for segment_info in segments_info:
|
||||
assert segment_info.state == SegmentState.Growing
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_compact_empty_collection(self):
|
||||
"""
|
||||
|
|
|
@ -1408,59 +1408,6 @@ class TestUtilityAdvanced(TestcaseBase):
|
|||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
assert len(res) == 0
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_get_growing_query_segment_info(self):
|
||||
"""
|
||||
target: test getting growing query segment info of collection with data
|
||||
method: init a collection, insert data, load, search, and get query segment info
|
||||
expected:
|
||||
1. length of segment is greater than 0
|
||||
2. the sum num_rows of each segment is equal to num of entities
|
||||
"""
|
||||
import random
|
||||
dim = 128
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
nb = 3000
|
||||
nq = 2
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
collection_w.insert(df)
|
||||
collection_w.load()
|
||||
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
|
||||
collection_w.search(vectors, default_field_name, ct.default_search_params, ct.default_limit)
|
||||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
assert len(res) > 0
|
||||
segment_ids = []
|
||||
cnt = 0
|
||||
for r in res:
|
||||
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows} ")
|
||||
if r.segmentID not in segment_ids:
|
||||
segment_ids.append(r.segmentID)
|
||||
cnt += r.num_rows
|
||||
assert cnt == nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_get_growing_segment_info_after_load(self):
|
||||
"""
|
||||
target: test get growing segment info
|
||||
method: 1.create and load collection
|
||||
2.insert data and no flush
|
||||
3.get the growing segment
|
||||
expected: Verify growing segment num entities
|
||||
"""
|
||||
from pymilvus.grpc_gen.common_pb2 import SegmentState
|
||||
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
|
||||
|
||||
collection_w.load()
|
||||
collection_w.insert(cf.gen_default_dataframe_data())
|
||||
collection_w.search(cf.gen_vectors(1, ct.default_dim), default_field_name, ct.default_search_params, ct.default_limit)
|
||||
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
num_entities = 0
|
||||
for seg in seg_info:
|
||||
assert seg.state == SegmentState.Growing
|
||||
num_entities += seg.num_rows
|
||||
assert num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_get_sealed_query_segment_info(self):
|
||||
"""
|
||||
|
@ -1665,11 +1612,8 @@ class TestUtilityAdvanced(TestcaseBase):
|
|||
dst_node_ids = all_querynodes[1:]
|
||||
dst_node_ids.append([node["identifier"] for node in ms.index_nodes][0])
|
||||
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
|
||||
# add a segment id which is not exist or a growing segment
|
||||
if len(segment_distribution[src_node_id]["growing"]) > 0:
|
||||
sealed_segment_ids.append(segment_distribution[src_node_id]["growing"][0])
|
||||
else:
|
||||
sealed_segment_ids.append(max(segment_distribution[src_node_id]["sealed"]) + 1)
|
||||
# add a segment id which is not exist
|
||||
sealed_segment_ids.append(max(segment_distribution[src_node_id]["sealed"]) + 1)
|
||||
# load balance
|
||||
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
|
||||
check_task=CheckTasks.err_res,
|
||||
|
|
Loading…
Reference in New Issue