mirror of https://github.com/milvus-io/milvus.git
test: add query with text match filter (#36381)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Co-authored-by: yanliang567 <82361606+yanliang567@users.noreply.github.com>
parent aee046e52c
commit 58baeee8f1
@@ -21,8 +21,12 @@ from common.common_params import ExprCheckParams
from utils.util_log import test_log as log
from customize.milvus_operator import MilvusOperator
import pickle
from collections import Counter
import bm25s
import jieba
fake = Faker()


from common.common_params import Expr

""" Methods of processing data """

@@ -72,6 +76,210 @@ class ParamInfo:
param_info = ParamInfo()


def analyze_documents(texts, language="en"):
    stopwords = "en"
    if language in ["en", "english"]:
        stopwords = "en"
    if language in ["zh", "cn", "chinese"]:
        stopword = " "
        new_texts = []
        for doc in texts:
            seg_list = jieba.cut(doc, cut_all=True)
            new_texts.append(" ".join(seg_list))
        texts = new_texts
        stopwords = [stopword]
    # Start timing
    t0 = time.time()

    # Tokenize the corpus
    tokenized = bm25s.tokenize(texts, lower=True, stopwords=stopwords)
    # log.info(f"Tokenized: {tokenized}")
    # Create a frequency counter
    freq = Counter()

    # Count the frequency of each token
    for doc_ids in tokenized.ids:
        freq.update(doc_ids)
    # Create a reverse vocabulary mapping
    id_to_word = {id: word for word, id in tokenized.vocab.items()}

    # Convert token ids back to words
    word_freq = Counter({id_to_word[token_id]: count for token_id, count in freq.items()})

    # End timing
    tt = time.time() - t0
    log.info(f"Analyze document cost time: {tt}")

    return word_freq

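# NOTE (editor's illustrative sketch, not part of this change): analyze_documents turns a
# text corpus into a word-frequency Counter from which match terms can be sampled.
# `sample_texts` is a hypothetical variable assumed here for illustration only.
#
#   sample_texts = [fake.text() for _ in range(100)]
#   word_freq = analyze_documents(sample_texts, language="en")
#   candidate_terms = [word for word, _ in word_freq.most_common(10)]
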
def split_dataframes(df, fields, language="en"):
|
||||
df_copy = df.copy()
|
||||
if language in ["zh", "cn", "chinese"]:
|
||||
for col in fields:
|
||||
new_texts = []
|
||||
for doc in df[col]:
|
||||
seg_list = jieba.cut(doc)
|
||||
new_texts.append(seg_list)
|
||||
df_copy[col] = new_texts
|
||||
return df_copy
|
||||
for col in fields:
|
||||
texts = df[col].to_list()
|
||||
tokenized = bm25s.tokenize(texts, lower=True, stopwords="en")
|
||||
new_texts = []
|
||||
id_vocab_map = {id: word for word, id in tokenized.vocab.items()}
|
||||
for doc_ids in tokenized.ids:
|
||||
new_texts.append([id_vocab_map[token_id] for token_id in doc_ids])
|
||||
|
||||
df_copy[col] = new_texts
|
||||
return df_copy
|
||||
|
||||
|
||||
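# NOTE (editor's illustrative sketch, not part of this change): split_dataframes turns each
# text cell into a list of tokens so the pandas-side checks below can use plain `in`
# membership tests. Assumes pandas is imported as `pd` elsewhere in this module; the
# DataFrame contents are hypothetical.
#
#   df = pd.DataFrame({"title": ["The Lord of the Rings", "Harry Potter"]})
#   df_tokenized = split_dataframes(df, fields=["title"], language="en")
#   # each cell now holds a token list, roughly ["lord", "rings"] after stopword removal
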
def generate_pandas_text_match_result(expr, df):
    def manual_check(expr):
        if "not" in expr:
            key = expr["not"]["field"]
            value = expr["not"]["value"]
            return lambda row: value not in row[key]
        key = expr["field"]
        value = expr["value"]
        return lambda row: value in row[key]

    if "not" in expr:
        key = expr["not"]["field"]
    else:
        key = expr["field"]
    manual_result = df[df.apply(manual_check(expr), axis=1)]
    log.info(f"pandas filter result {len(manual_result)}\n{manual_result[key]}")
    return manual_result

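# NOTE (editor's illustrative sketch, not part of this change): a single match term (or its
# "not" form) is replayed against the tokenized DataFrame to produce the expected hits for
# one step of the query. The expr dict and `df_tokenized` reuse the hypothetical names from
# the sketches above.
#
#   expr = {"field": "title", "value": "rings"}
#   expected = generate_pandas_text_match_result(expr, df_tokenized)
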
def generate_text_match_expr(query_dict):
    """
    Generate a TextMatch expression with multiple logical operators and field names.
    :param query_dict: A dictionary (or list of term dicts and operator strings) representing the query structure
    :return: A string representing the TextMatch expression
    """

    def process_node(node):
        if isinstance(node, dict) and 'field' in node and 'value' in node:
            return f"TextMatch({node['field']}, '{node['value']}')"
        elif isinstance(node, dict) and 'not' in node:
            return f"not {process_node(node['not'])}"
        elif isinstance(node, list):
            return ' '.join(process_node(item) for item in node)
        elif isinstance(node, str):
            return node
        else:
            raise ValueError(f"Invalid node type: {type(node)}")

    return f"({process_node(query_dict)})"

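# NOTE (editor's illustrative sketch, not part of this change): a nested query structure and
# the TextMatch expression string it produces; the field names and values are hypothetical.
#
#   query = [{"field": "title", "value": "rings"}, "and", {"not": {"field": "author", "value": "rowling"}}]
#   generate_text_match_expr(query)
#   # -> "(TextMatch(title, 'rings') and not TextMatch(author, 'rowling'))"
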
def generate_pandas_query_string(query):
    def process_node(node):
        if isinstance(node, dict):
            if 'field' in node and 'value' in node:
                return f"('{node['value']}' in row['{node['field']}'])"
            elif 'not' in node:
                return f"not {process_node(node['not'])}"
        elif isinstance(node, str):
            return node
        else:
            raise ValueError(f"Invalid node type: {type(node)}")

    parts = [process_node(item) for item in query]
    expression = ' '.join(parts)
    log.info(f"Generated pandas query: {expression}")
    return lambda row: eval(expression)

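# NOTE (editor's illustrative sketch, not part of this change): the same hypothetical query,
# rendered as a pandas row predicate instead of a TextMatch string, so the two filter
# results can be compared; `query` and `df_tokenized` are the hypothetical names used above.
#
#   row_filter = generate_pandas_query_string(query)
#   expected = df_tokenized[df_tokenized.apply(row_filter, axis=1)]
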
def evaluate_expression(step_by_step_results):
    # merge result of different steps to final result
    def apply_operator(operators, operands):
        operator = operators.pop()
        right = operands.pop()
        left = operands.pop()
        if operator == "and":
            operands.append(left.intersection(right))
        elif operator == "or":
            operands.append(left.union(right))

    operators = []
    operands = []

    for item in step_by_step_results:
        if isinstance(item, list):
            operands.append(set(item))
        elif item in ("and", "or"):
            while operators and operators[-1] == "and" and item == "or":
                apply_operator(operators, operands)
            operators.append(item)
    while operators:
        apply_operator(operators, operands)

    return operands[0] if operands else set()

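# NOTE (editor's illustrative sketch, not part of this change): evaluate_expression merges
# per-term result sets with "and" -> intersection and "or" -> union, with "and" reduced
# before "or". The id lists below are hypothetical primary-key hits.
#
#   evaluate_expression([[1, 2, 3], "or", [3, 4], "and", [4, 5]])
#   # -> {1, 2, 3, 4}, i.e. {1, 2, 3} | ({3, 4} & {4, 5})
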
def generate_random_query_from_freq_dict(freq_dict, min_freq=1, max_terms=3, p_not=0.2):
    """
    Generate a random query expression from a dictionary of field frequencies.
    :param freq_dict: A dictionary where keys are field names and values are word frequency dictionaries
    :param min_freq: Minimum frequency for a word to be included in the query (default: 1)
    :param max_terms: Maximum number of terms in the query (default: 3)
    :param p_not: Probability of using NOT for any term (default: 0.2)
    :return: A tuple of (query list, text match expression string, pandas filter function)
    example:
        freq_dict = {
            "title": {"The": 3, "Lord": 2, "Rings": 2, "Harry": 1, "Potter": 1},
            "author": {"Tolkien": 2, "Rowling": 1, "Orwell": 1},
            "description": {"adventure": 4, "fantasy": 3, "magic": 1, "dystopian": 2}
        }
        print("Random queries from frequency dictionary:")
        for _ in range(5):
            query_list, expr, pandas_filter = generate_random_query_from_freq_dict(freq_dict, min_freq=1, max_terms=4, p_not=0.2)
            print(f"Query: {query_list}")
            print(f"Expression: {expr}")
            print()
    """

    def random_term(field, words):
        term = {"field": field, "value": random.choice(words)}
        if random.random() < p_not:
            return {"not": term}
        return term

    # Filter words based on min_freq
    filtered_dict = {
        field: [word for word, freq in words.items() if freq >= min_freq]
        for field, words in freq_dict.items()
    }

    # Remove empty fields
    filtered_dict = {k: v for k, v in filtered_dict.items() if v}

    if not filtered_dict:
        return [], "", None

    # Randomly select fields and terms
    query = []
    for _ in range(min(max_terms, sum(len(words) for words in filtered_dict.values()))):
        if not filtered_dict:
            break
        field = random.choice(list(filtered_dict.keys()))
        if filtered_dict[field]:
            term = random_term(field, filtered_dict[field])
            query.append(term)
            # Insert random AND/OR between terms
            if query and _ < max_terms - 1:
                query.append(random.choice(["and", "or"]))
            # Remove the used word to avoid repetition
            used_word = term['value'] if isinstance(term, dict) and 'value' in term else term['not']['value']
            filtered_dict[field].remove(used_word)
            if not filtered_dict[field]:
                del filtered_dict[field]
    return query, generate_text_match_expr(query), generate_pandas_query_string(query)

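# NOTE (editor's illustrative sketch, not part of this change): tying the helpers together.
# A random query is drawn from per-field word frequencies, the generated expression string
# is used as the query filter, and the pandas predicate recomputes the expected hits
# locally. `collection`, `word_freq_per_field`, and `df_tokenized` are hypothetical names.
#
#   query, match_expr, row_filter = generate_random_query_from_freq_dict(word_freq_per_field, max_terms=3)
#   milvus_hits = collection.query(expr=match_expr, output_fields=["id"])
#   expected = df_tokenized[df_tokenized.apply(row_filter, axis=1)]
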
def generate_array_dataset(size, array_length, hit_probabilities, target_values):
    dataset = []
    target_array_length = target_values.get('array_length_field', None)
@@ -12,8 +12,7 @@ allure-pytest==2.7.0
pytest-print==0.2.1
pytest-level==0.1.1
pytest-xdist==2.5.0
pymilvus==2.5.0rc81
pymilvus[bulk_writer]==2.5.0rc81

pytest-rerunfailures==9.1.1
git+https://github.com/Projectplace/pytest-tags
ndg-httpsclient
@@ -27,6 +26,10 @@ pytest-sugar==0.9.5
pytest-parallel
pytest-random-order

# pymilvus
pymilvus==2.5.0rc81
pymilvus[bulk_writer]==2.5.0rc81

# for customize config test
python-benedict==0.24.3
timeout-decorator==0.5.0
@@ -51,7 +54,7 @@ rich==13.7.0
etcd-sdk-python==0.0.4
deepdiff==6.7.1

# for test result anaylszer
# for test result analyzer
prettytable==3.8.0
pyarrow==14.0.1
fastparquet==2023.7.0
@@ -59,3 +62,9 @@ fastparquet==2023.7.0
# for bf16 datatype
ml-dtypes==0.2.0

# for text match
bm25s==0.2.0
jieba==0.42.1

# for perf test
locust==2.25.0
File diff suppressed because it is too large