From 58baeee8f115a5963160e10713a9843f5165bd1d Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Wed, 25 Sep 2024 15:17:13 +0800 Subject: [PATCH] test: add query with text match filter (#36381) Signed-off-by: zhuwenxing Co-authored-by: yanliang567 <82361606+yanliang567@users.noreply.github.com> --- tests/python_client/common/common_func.py | 208 +++++ tests/python_client/requirements.txt | 15 +- tests/python_client/testcases/test_query.py | 976 +++++++++++++++++++- 3 files changed, 1191 insertions(+), 8 deletions(-) diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index 201deac56a..103775730c 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -21,8 +21,12 @@ from common.common_params import ExprCheckParams from utils.util_log import test_log as log from customize.milvus_operator import MilvusOperator import pickle +from collections import Counter +import bm25s +import jieba fake = Faker() + from common.common_params import Expr """" Methods of processing data """ @@ -72,6 +76,210 @@ class ParamInfo: param_info = ParamInfo() +def analyze_documents(texts, language="en"): + stopwords = "en" + if language in ["en", "english"]: + stopwords = "en" + if language in ["zh", "cn", "chinese"]: + stopword = " " + new_texts = [] + for doc in texts: + seg_list = jieba.cut(doc, cut_all=True) + new_texts.append(" ".join(seg_list)) + texts = new_texts + stopwords = [stopword] + # Start timing + t0 = time.time() + + # Tokenize the corpus + tokenized = bm25s.tokenize(texts, lower=True, stopwords=stopwords) + # log.info(f"Tokenized: {tokenized}") + # Create a frequency counter + freq = Counter() + + # Count the frequency of each token + for doc_ids in tokenized.ids: + freq.update(doc_ids) + # Create a reverse vocabulary mapping + id_to_word = {id: word for word, id in tokenized.vocab.items()} + + # Convert token ids back to words + word_freq = Counter({id_to_word[token_id]: count for token_id, count in freq.items()}) + + # End timing + tt = time.time() - t0 + log.info(f"Analyze document cost time: {tt}") + + return word_freq + + +def split_dataframes(df, fields, language="en"): + df_copy = df.copy() + if language in ["zh", "cn", "chinese"]: + for col in fields: + new_texts = [] + for doc in df[col]: + seg_list = jieba.cut(doc) + new_texts.append(seg_list) + df_copy[col] = new_texts + return df_copy + for col in fields: + texts = df[col].to_list() + tokenized = bm25s.tokenize(texts, lower=True, stopwords="en") + new_texts = [] + id_vocab_map = {id: word for word, id in tokenized.vocab.items()} + for doc_ids in tokenized.ids: + new_texts.append([id_vocab_map[token_id] for token_id in doc_ids]) + + df_copy[col] = new_texts + return df_copy + + +def generate_pandas_text_match_result(expr, df): + def manual_check(expr): + if "not" in expr: + key = expr["not"]["field"] + value = expr["not"]["value"] + return lambda row: value not in row[key] + key = expr["field"] + value = expr["value"] + return lambda row: value in row[key] + if "not" in expr: + key = expr["not"]["field"] + else: + key = expr["field"] + manual_result = df[df.apply(manual_check(expr), axis=1)] + log.info(f"pandas filter result {len(manual_result)}\n{manual_result[key]}") + return manual_result + + +def generate_text_match_expr(query_dict): + """ + Generate a TextMatch expression with multiple logical operators and field names. + :param query_dict: A dictionary representing the query structure + :return: A string representing the TextMatch expression + """ + + def process_node(node): + if isinstance(node, dict) and 'field' in node and 'value' in node: + return f"TextMatch({node['field']}, '{node['value']}')" + elif isinstance(node, dict) and 'not' in node: + return f"not {process_node(node['not'])}" + elif isinstance(node, list): + return ' '.join(process_node(item) for item in node) + elif isinstance(node, str): + return node + else: + raise ValueError(f"Invalid node type: {type(node)}") + + return f"({process_node(query_dict)})" + + +def generate_pandas_query_string(query): + def process_node(node): + if isinstance(node, dict): + if 'field' in node and 'value' in node: + return f"('{node['value']}' in row['{node['field']}'])" + elif 'not' in node: + return f"not {process_node(node['not'])}" + elif isinstance(node, str): + return node + else: + raise ValueError(f"Invalid node type: {type(node)}") + + parts = [process_node(item) for item in query] + expression = ' '.join(parts).replace('and', 'and').replace('or', 'or') + log.info(f"Generated pandas query: {expression}") + return lambda row: eval(expression) + + +def evaluate_expression(step_by_step_results): + # merge result of different steps to final result + def apply_operator(operators, operands): + operator = operators.pop() + right = operands.pop() + left = operands.pop() + if operator == "and": + operands.append(left.intersection(right)) + elif operator == "or": + operands.append(left.union(right)) + + operators = [] + operands = [] + + for item in step_by_step_results: + if isinstance(item, list): + operands.append(set(item)) + elif item in ("and", "or"): + while operators and operators[-1] == "and" and item == "or": + apply_operator(operators, operands) + operators.append(item) + while operators: + apply_operator(operators, operands) + + return operands[0] if operands else set() + + +def generate_random_query_from_freq_dict(freq_dict, min_freq=1, max_terms=3, p_not=0.2): + """ + Generate a random query expression from a dictionary of field frequencies. + :param freq_dict: A dictionary where keys are field names and values are word frequency dictionaries + :param min_freq: Minimum frequency for a word to be included in the query (default: 1) + :param max_terms: Maximum number of terms in the query (default: 3) + :param p_not: Probability of using NOT for any term (default: 0.2) + :return: A tuple of (query list, query expression string) + example: + freq_dict = { + "title": {"The": 3, "Lord": 2, "Rings": 2, "Harry": 1, "Potter": 1}, + "author": {"Tolkien": 2, "Rowling": 1, "Orwell": 1}, + "description": {"adventure": 4, "fantasy": 3, "magic": 1, "dystopian": 2} + } + print("Random queries from frequency dictionary:") + for _ in range(5): + query_list, expr = generate_random_query_from_freq_dict(freq_dict, min_freq=1, max_terms=4, p_not=0.2) + print(f"Query: {query_list}") + print(f"Expression: {expr}") + print() + """ + + def random_term(field, words): + term = {"field": field, "value": random.choice(words)} + if random.random() < p_not: + return {"not": term} + return term + + # Filter words based on min_freq + filtered_dict = { + field: [word for word, freq in words.items() if freq >= min_freq] + for field, words in freq_dict.items() + } + + # Remove empty fields + filtered_dict = {k: v for k, v in filtered_dict.items() if v} + + if not filtered_dict: + return [], "" + + # Randomly select fields and terms + query = [] + for _ in range(min(max_terms, sum(len(words) for words in filtered_dict.values()))): + if not filtered_dict: + break + field = random.choice(list(filtered_dict.keys())) + if filtered_dict[field]: + term = random_term(field, filtered_dict[field]) + query.append(term) + # Insert random AND/OR between terms + if query and _ < max_terms - 1: + query.append(random.choice(["and", "or"])) + # Remove the used word to avoid repetition + used_word = term['value'] if isinstance(term, dict) and 'value' in term else term['not']['value'] + filtered_dict[field].remove(used_word) + if not filtered_dict[field]: + del filtered_dict[field] + return query, generate_text_match_expr(query), generate_pandas_query_string(query) + + def generate_array_dataset(size, array_length, hit_probabilities, target_values): dataset = [] target_array_length = target_values.get('array_length_field', None) diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 3e2af12e12..0ee88da6d6 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -12,8 +12,7 @@ allure-pytest==2.7.0 pytest-print==0.2.1 pytest-level==0.1.1 pytest-xdist==2.5.0 -pymilvus==2.5.0rc81 -pymilvus[bulk_writer]==2.5.0rc81 + pytest-rerunfailures==9.1.1 git+https://github.com/Projectplace/pytest-tags ndg-httpsclient @@ -27,6 +26,10 @@ pytest-sugar==0.9.5 pytest-parallel pytest-random-order +# pymilvus +pymilvus==2.5.0rc81 +pymilvus[bulk_writer]==2.5.0rc81 + # for customize config test python-benedict==0.24.3 timeout-decorator==0.5.0 @@ -51,7 +54,7 @@ rich==13.7.0 etcd-sdk-python==0.0.4 deepdiff==6.7.1 -# for test result anaylszer +# for test result analyzer prettytable==3.8.0 pyarrow==14.0.1 fastparquet==2023.7.0 @@ -59,3 +62,9 @@ fastparquet==2023.7.0 # for bf16 datatype ml-dtypes==0.2.0 +# for text match +bm25s==0.2.0 +jieba==0.42.1 + +# for perf test +locust==2.25.0 diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index 70427e8f94..e40f5c4fdc 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -1,25 +1,34 @@ +import jieba + import utils.util_pymilvus as ut from utils.util_log import test_log as log from common.common_type import CaseLabel, CheckTasks from common import common_type as ct from common import common_func as cf -from common.code_mapping import CollectionErrorMessage as clem from common.code_mapping import ConnectionErrorMessage as cem from base.client_base import TestcaseBase from pymilvus.orm.types import CONSISTENCY_STRONG, CONSISTENCY_BOUNDED, CONSISTENCY_EVENTUALLY from pymilvus import ( - FieldSchema, CollectionSchema, DataType, - Collection + FieldSchema, + CollectionSchema, + DataType, ) import threading from pymilvus import DefaultConfig -from datetime import datetime import time import pytest import random import numpy as np import pandas as pd +from collections import Counter +from faker import Faker + +Faker.seed(19530) + +fake_en = Faker("en_US") +fake_zh = Faker("zh_CN") +fake_de = Faker("de_DE") pd.set_option("expand_frame_repr", False) @@ -4489,4 +4498,961 @@ class TestQueryNoneAndDefaultData(TestcaseBase): term_expr = f'{ct.default_int64_field_name} in {int_values[:pos]}' collection_w.query(term_expr, output_fields=[ct.default_int64_field_name, default_float_field_name], - check_task=CheckTasks.check_query_results, check_items={exp_res: res}) \ No newline at end of file + check_task=CheckTasks.check_query_results, check_items={exp_res: res}) + +class TestQueryTextMatch(TestcaseBase): + """ + ****************************************************************** + The following cases are used to test query text match + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("enable_partition_key", [True, False]) + @pytest.mark.parametrize("enable_inverted_index", [True, False]) + @pytest.mark.parametrize("tokenizer", ["jieba", "default"]) + def test_query_text_match_normal( + self, tokenizer, enable_inverted_index, enable_partition_key + ): + """ + target: test text match normal + method: 1. enable text match and insert data with varchar + 2. get the most common words and query with text match + 3. verify the result + expected: text match successfully and result is correct + """ + analyzer_params = { + "tokenizer": tokenizer, + } + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + is_partition_key=enable_partition_key, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 3000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + if tokenizer == "jieba": + language = "zh" + fake = fake_zh + else: + language = "en" + + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + log.info(f"dataframe\n{df}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + if enable_inverted_index: + collection_w.create_index("word", {"index_type": "INVERTED"}) + collection_w.load() + # analyze the croup + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + # query single field for one token + for field in text_fields: + token = wf_map[field].most_common()[0][0] + expr = f"TextMatch({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + assert len(res) > 0 + log.info(f"res len {len(res)}") + for r in res: + assert token in r[field] + + # verify inverted index + if enable_inverted_index: + if field == "word": + expr = f"{field} == '{token}'" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert r[field] == token + # query single field for multi-word + for field in text_fields: + # match top 10 most common words + top_10_tokens = [] + for word, count in wf_map[field].most_common(10): + top_10_tokens.append(word) + string_of_top_10_words = " ".join(top_10_tokens) + expr = f"TextMatch({field}, '{string_of_top_10_words}')" + log.info(f"expr {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert any([token in r[field] for token in top_10_tokens]) + + @pytest.mark.skip("unimplemented") + @pytest.mark.tags(CaseLabel.L0) + def test_query_text_match_custom_analyzer(self): + """ + target: test text match with custom analyzer + method: 1. enable text match, use custom analyzer and insert data with varchar + 2. get the most common words and query with text match + 3. verify the result + expected: get the correct token, text match successfully and result is correct + """ + analyzer_params = { + "tokenizer": "standard", + "alpha_num_only": True, + "ascii_folding": True, + "lower_case": True, + "max_token_length": 40, + "split_compound_words": [ + "dampf", + "schiff", + "fahrt", + "brot", + "backen", + "automat", + ], + "stemmer": "English", + "stop": { + "language": "English", + "words": ["an", "the"], + }, + } + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + language = "en" + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + log.info(f"dataframe\n{df}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # analyze the croup + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + # query single field for one word + for field in text_fields: + token = list(wf_map[field].keys())[0] + expr = f"TextMatch({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert token in r[field] + + # query single field for multi-word + for field in text_fields: + # match top 10 most common words + top_10_tokens = [] + for word, count in wf_map[field].most_common(10): + top_10_tokens.append(word) + string_of_top_10_words = " ".join(top_10_tokens) + expr = f"TextMatch({field}, '{string_of_top_10_words}')" + log.info(f"expr {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert any([token in r[field] for token in top_10_tokens]) + + @pytest.mark.tags(CaseLabel.L0) + def test_query_text_match_with_combined_expression_for_single_field(self): + """ + target: test query text match with combined expression for single field + method: 1. enable text match, and insert data with varchar + 2. get the most common words and form the combined expression with and operator + 3. verify the result + expected: query successfully and result is correct + """ + analyzer_params = { + "tokenizer": "default", + } + # 1. initialize with data + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + language = "en" + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # analyze the croup and get the tf-idf, then base on it to crate expr and ground truth + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + + df_new = cf.split_dataframes(df, fields=text_fields) + log.info(f"new df \n{df_new}") + for field in text_fields: + expr_list = [] + wf_counter = Counter(wf_map[field]) + pd_tmp_res_list = [] + for word, count in wf_counter.most_common(2): + tmp = f"TextMatch({field}, '{word}')" + log.info(f"tmp expr {tmp}") + expr_list.append(tmp) + manual_result = df_new[ + df_new.apply(lambda row: word in row[field], axis=1) + ] + tmp_res = set(manual_result["id"].tolist()) + log.info(f"manual check result for {tmp} {len(manual_result)}") + pd_tmp_res_list.append(tmp_res) + final_res = set(pd_tmp_res_list[0]) + for i in range(1, len(pd_tmp_res_list)): + final_res = final_res.intersection(set(pd_tmp_res_list[i])) + log.info(f"intersection res {len(final_res)}") + and_expr = " and ".join(expr_list) + log.info(f"expr: {and_expr}") + res, _ = collection_w.query(expr=and_expr, output_fields=text_fields) + log.info(f"res len {len(res)}, final res {len(final_res)}") + assert len(res) == len(final_res) + + @pytest.mark.tags(CaseLabel.L0) + def test_query_text_match_with_combined_expression_for_multi_field(self): + """ + target: test query text match with combined expression for multi field + method: 1. enable text match, and insert data with varchar + 2. create the combined expression with `and`, `or` and `not` operator for multi field + 3. verify the result + expected: query successfully and result is correct + """ + analyzer_params = { + "tokenizer": "default", + } + # 1. initialize with data + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + language = "en" + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # analyze the croup and get the tf-idf, then base on it to crate expr and ground truth + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + + df_new = cf.split_dataframes(df, fields=text_fields) + log.info(f"new df \n{df_new}") + for i in range(2): + query, text_match_expr, pandas_expr = ( + cf.generate_random_query_from_freq_dict( + wf_map, min_freq=3, max_terms=5, p_not=0.2 + ) + ) + log.info(f"expr: {text_match_expr}") + res, _ = collection_w.query(expr=text_match_expr, output_fields=text_fields) + onetime_res = res + log.info(f"res len {len(res)}") + step_by_step_results = [] + for expr in query: + if isinstance(expr, dict): + if "not" in expr: + key = expr["not"]["field"] + else: + key = expr["field"] + + tmp_expr = cf.generate_text_match_expr(expr) + res, _ = collection_w.query( + expr=tmp_expr, output_fields=text_fields + ) + text_match_df = pd.DataFrame(res) + log.info( + f"text match res {len(text_match_df)}\n{text_match_df[key]}" + ) + log.info(f"tmp expr {tmp_expr} {len(res)}") + tmp_idx = [r["id"] for r in res] + step_by_step_results.append(tmp_idx) + pandas_filter_res = cf.generate_pandas_text_match_result( + expr, df_new + ) + tmp_pd_idx = pandas_filter_res["id"].tolist() + diff_id = set(tmp_pd_idx).union(set(tmp_idx)) - set( + tmp_pd_idx + ).intersection(set(tmp_idx)) + log.info(f"diff between text match and manual check {diff_id}") + assert len(diff_id) == 0 + for idx in diff_id: + log.info(df[df["id"] == idx][key].values) + log.info( + f"pandas_filter_res {len(pandas_filter_res)} \n {pandas_filter_res}" + ) + if isinstance(expr, str): + step_by_step_results.append(expr) + final_res = cf.evaluate_expression(step_by_step_results) + log.info(f"one time res {len(onetime_res)}, final res {len(final_res)}") + if len(onetime_res) != len(final_res): + log.info("res is not same") + assert False + + @pytest.mark.tags(CaseLabel.L2) + def test_query_text_match_with_multi_lang(self): + """ + target: test text match with multi-language text data + method: 1. enable text match, and insert data with varchar in different language + 2. get the most common words and query with text match + 3. verify the result + expected: get the correct token, text match successfully and result is correct + """ + + # 1. initialize with data + analyzer_params = { + "tokenizer": "default", + } + # 1. initialize with data + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + language = "en" + data_en = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size // 2) + ] + fake = fake_de + data_de = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size // 2, data_size) + ] + data = data_en + data_de + df = pd.DataFrame(data) + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # analyze the croup and get the tf-idf, then base on it to crate expr and ground truth + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + + df_new = cf.split_dataframes(df, fields=text_fields) + log.info(f"new df \n{df_new}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # query single field for one word + for field in text_fields: + token = wf_map[field].most_common()[-1][0] + expr = f"TextMatch({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + assert len(res) > 0 + for r in res: + assert token in r[field] + + # query single field for multi-word + for field in text_fields: + # match top 3 most common words + multi_words = [] + for word, count in wf_map[field].most_common(3): + multi_words.append(word) + string_of_multi_words = " ".join(multi_words) + expr = f"TextMatch({field}, '{string_of_multi_words}')" + log.info(f"expr {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + assert len(res) > 0 + for r in res: + assert any([token in r[field] for token in multi_words]) + + @pytest.mark.tags(CaseLabel.L1) + def test_query_text_match_with_addition_inverted_index(self): + """ + target: test text match with addition inverted index + method: 1. enable text match, and insert data with varchar + 2. create inverted index + 3. get the most common words and query with text match + 4. query with inverted index and verify the result + expected: get the correct token, text match successfully and result is correct + """ + # 1. initialize with data + fake_en = Faker("en_US") + analyzer_params = { + "tokenizer": "default", + } + dim = 128 + default_fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + default_schema = CollectionSchema( + fields=default_fields, description="test collection" + ) + + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=default_schema + ) + data = [] + data_size = 10000 + for i in range(data_size): + d = { + "id": i, + "word": fake_en.word().lower(), + "sentence": fake_en.sentence().lower(), + "paragraph": fake_en.paragraph().lower(), + "text": fake_en.text().lower(), + "emb": cf.gen_vectors(1, dim)[0], + } + data.append(d) + batch_size = 5000 + for i in range(0, data_size, batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < data_size + else data[i:data_size] + ) + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.create_index("word", {"index_type": "INVERTED"}) + collection_w.load() + df = pd.DataFrame(data) + df_split = cf.split_dataframes(df, fields=["word", "sentence", "paragraph", "text"]) + log.info(f"dataframe\n{df}") + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language="en") + # query single field for one word + for field in text_fields: + token = wf_map[field].most_common()[-1][0] + expr = f"TextMatch({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + pandas_res = df_split[df_split.apply(lambda row: token in row[field], axis=1)] + log.info(f"res len {len(res)}, pandas res len {len(pandas_res)}") + log.info(f"pandas res\n{pandas_res}") + assert len(res) == len(pandas_res) + log.info(f"res len {len(res)}") + for r in res: + assert token in r[field] + if field == "word": + assert len(res) == wf_map[field].most_common()[-1][1] + expr = f"{field} == '{token}'" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + assert len(res) == wf_map[field].most_common()[-1][1] + + @pytest.mark.tags(CaseLabel.L1) + def test_query_text_match_with_some_empty_string(self): + """ + target: test text match normal + method: 1. enable text match and insert data with varchar with some empty string + 2. get the most common words and query with text match + 3. verify the result + expected: text match successfully and result is correct + """ + # 1. initialize with data + analyzer_params = { + "tokenizer": "default", + } + # 1. initialize with data + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + language = "en" + data_en = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size // 2) + ] + data_empty = [ + { + "id": i, + "word": "", + "sentence": " ", + "paragraph": "", + "text": " ", + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size // 2, data_size) + ] + data = data_en + data_empty + df = pd.DataFrame(data) + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # analyze the croup and get the tf-idf, then base on it to crate expr and ground truth + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + + df_new = cf.split_dataframes(df, fields=text_fields) + log.info(f"new df \n{df_new}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i: i + batch_size] + if i + batch_size < len(df) + else data[i: len(df)] + ) + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + collection_w.load() + # query single field for one word + for field in text_fields: + token = wf_map[field].most_common()[-1][0] + expr = f"TextMatch({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + assert len(res) > 0 + for r in res: + assert token in r[field] + # query single field for multi-word + for field in text_fields: + # match top 3 most common words + multi_words = [] + for word, count in wf_map[field].most_common(3): + multi_words.append(word) + string_of_multi_words = " ".join(multi_words) + expr = f"TextMatch({field}, '{string_of_multi_words}')" + log.info(f"expr {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + assert len(res) > 0 + for r in res: + assert any([token in r[field] for token in multi_words]) + + +class TestQueryTextMatchNegative(TestcaseBase): + @pytest.mark.tags(CaseLabel.L0) + def test_query_text_match_with_unsupported_tokenizer(self): + """ + target: test query text match with unsupported tokenizer + method: 1. enable text match, and use unsupported tokenizer + 2. create collection + expected: create collection failed and return error + """ + analyzer_params = { + "tokenizer": "Unsupported", + } + dim = 128 + default_fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="title", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="overview", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="genres", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="producer", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema( + name="cast", + dtype=DataType.VARCHAR, + max_length=65535, + enable_match=True, + analyzer_params=analyzer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + default_schema = CollectionSchema( + fields=default_fields, description="test collection" + ) + error = {ct.err_code: 2000, ct.err_msg: "invalid tokenizer parameters"} + self.init_collection_wrap( + name=cf.gen_unique_str(prefix), + schema=default_schema, + check_task=CheckTasks.err_res, + check_items=error, + )