"""
|
|
Parquet Analyzer Main Component
|
|
Main analyzer that integrates metadata parsing and vector deserialization functionality
|
|
"""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from .meta_parser import ParquetMetaParser
|
|
from .vector_deserializer import VectorDeserializer
|
|
|
|
|
|

class ParquetAnalyzer:
    """Main Parquet file analyzer class"""

    def __init__(self, file_path: str):
        """
        Initialize analyzer

        Args:
            file_path: parquet file path
        """
        self.file_path = Path(file_path)
        self.meta_parser = ParquetMetaParser(file_path)
        self.vector_deserializer = VectorDeserializer()

    def load(self) -> bool:
        """
        Load parquet file

        Returns:
            bool: whether loading was successful
        """
        return self.meta_parser.load()

    def analyze_metadata(self) -> Dict[str, Any]:
        """
        Analyze metadata information

        Returns:
            Dict: metadata analysis results
        """
        if not self.meta_parser.metadata:
            return {}

        return {
            "basic_info": self.meta_parser.get_basic_info(),
            "file_metadata": self.meta_parser.get_file_metadata(),
            "schema_metadata": self.meta_parser.get_schema_metadata(),
            "column_statistics": self.meta_parser.get_column_statistics(),
            "row_group_info": self.meta_parser.get_row_group_info(),
            "metadata_summary": self.meta_parser.get_metadata_summary()
        }

    def analyze_vectors(self) -> List[Dict[str, Any]]:
        """
        Analyze vector data

        Returns:
            List: vector analysis results list
        """
        if not self.meta_parser.metadata:
            return []

        vector_analysis = []
        column_stats = self.meta_parser.get_column_statistics()

        for col_stats in column_stats:
            if not ("statistics" in col_stats and col_stats["statistics"]):
                continue
            stats = col_stats["statistics"]
            col_name = col_stats["column_name"]

            # Check the min/max statistics for binary data (potential vectors);
            # the two cases are identical, so handle both in one loop
            for stat_type in ("min", "max"):
                if stat_type not in stats:
                    continue
                value = stats[stat_type]
                if isinstance(value, str) and len(value) > 32:
                    # May be a hex string; try to convert it back to bytes
                    try:
                        value = bytes.fromhex(value)
                    except ValueError:
                        continue
                if not isinstance(value, bytes):
                    continue
                analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
                if analysis:
                    vector_analysis.append({
                        "column_name": col_name,
                        "stat_type": stat_type,
                        "analysis": analysis
                    })

        return vector_analysis
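
    # Illustrative sketch of the structure analyze_vectors() returns; the
    # column name "embedding" and the FloatVector/128 values are hypothetical,
    # while the "analysis" keys are the ones consumed elsewhere in this class:
    #
    #     [{
    #         "column_name": "embedding",
    #         "stat_type": "min",                # or "max"
    #         "analysis": {
    #             "vector_type": "FloatVector",
    #             "dimension": 128,
    #             "deserialized": [...],
    #             "statistics": {...},
    #         },
    #     }]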

    def analyze(self) -> Dict[str, Any]:
        """
        Complete parquet file analysis

        Returns:
            Dict: complete analysis results
        """
        if not self.load():
            return {}

        return {
            "metadata": self.analyze_metadata(),
            "vectors": self.analyze_vectors()
        }

    def export_analysis(self, output_file: Optional[str] = None) -> str:
        """
        Export analysis results to a JSON file

        Args:
            output_file: output file path; auto-generated from the input file name if None

        Returns:
            str: output file path
        """
        if output_file is None:
            output_file = f"{self.file_path.stem}_analysis.json"

        analysis_result = self.analyze()

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(analysis_result, f, indent=2, ensure_ascii=False)

        return output_file
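
    # Usage sketch (hypothetical path): with no argument, the output name is
    # derived from the input file's stem, so "/tmp/data.parquet" is written to
    # "data_analysis.json" in the current working directory:
    #
    #     analyzer = ParquetAnalyzer("/tmp/data.parquet")
    #     report_path = analyzer.export_analysis()  # -> "data_analysis.json"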

    def print_summary(self):
        """Print analysis summary"""
        if not self.meta_parser.metadata:
            print("❌ No parquet file loaded")
            return

        # Print metadata summary
        self.meta_parser.print_summary()

        # Print vector analysis summary
        vector_analysis = self.analyze_vectors()
        if vector_analysis:
            print("\n🔍 Vector Analysis Summary:")
            print("=" * 60)
            for vec_analysis in vector_analysis:
                col_name = vec_analysis["column_name"]
                stat_type = vec_analysis["stat_type"]
                analysis = vec_analysis["analysis"]

                print(f"  Column: {col_name} ({stat_type})")
                print(f"    Vector Type: {analysis['vector_type']}")
                print(f"    Dimension: {analysis['dimension']}")

                if "statistics" in analysis and analysis["statistics"]:
                    stats = analysis["statistics"]
                    print(f"    Min: {stats.get('min', 'N/A')}")
                    print(f"    Max: {stats.get('max', 'N/A')}")
                    print(f"    Mean: {stats.get('mean', 'N/A')}")
                    print(f"    Std: {stats.get('std', 'N/A')}")

                if analysis["vector_type"] == "BinaryVector" and "statistics" in analysis:
                    stats = analysis["statistics"]
                    print(f"    Zero Count: {stats.get('zero_count', 'N/A')}")
                    print(f"    One Count: {stats.get('one_count', 'N/A')}")

                print()

    def get_vector_samples(self, column_name: str, sample_count: int = 5) -> List[Dict[str, Any]]:
        """
        Get vector sample data

        Args:
            column_name: column name
            sample_count: number of samples

        Returns:
            List: vector sample list
        """
        # This can be extended to read samples from the actual data;
        # currently it returns the min/max values from statistics as samples
        vector_analysis = self.analyze_vectors()
        samples = []

        for vec_analysis in vector_analysis:
            if vec_analysis["column_name"] == column_name:
                analysis = vec_analysis["analysis"]
                samples.append({
                    "type": vec_analysis["stat_type"],
                    "vector_type": analysis["vector_type"],
                    "dimension": analysis["dimension"],
                    "data": analysis["deserialized"][:sample_count] if analysis["deserialized"] else [],
                    "statistics": analysis.get("statistics", {})
                })

        return samples

    def compare_vectors(self, column_name: str) -> Dict[str, Any]:
        """
        Compare different vector statistics for the same column

        Args:
            column_name: column name

        Returns:
            Dict: comparison results
        """
        vector_analysis = self.analyze_vectors()
        column_vectors = [v for v in vector_analysis if v["column_name"] == column_name]

        if len(column_vectors) < 2:
            return {}

        comparison = {
            "column_name": column_name,
            "vector_count": len(column_vectors),
            "comparison": {}
        }

        for vec_analysis in column_vectors:
            stat_type = vec_analysis["stat_type"]
            analysis = vec_analysis["analysis"]

            comparison["comparison"][stat_type] = {
                "vector_type": analysis["vector_type"],
                "dimension": analysis["dimension"],
                "statistics": analysis.get("statistics", {})
            }

        return comparison

    def validate_vector_consistency(self) -> Dict[str, Any]:
        """
        Validate vector data consistency

        Returns:
            Dict: validation results
        """
        vector_analysis = self.analyze_vectors()
        validation_result = {
            "total_vectors": len(vector_analysis),
            "consistent_columns": [],
            "inconsistent_columns": [],
            "details": {}
        }

        # Group by column
        columns = {}
        for vec_analysis in vector_analysis:
            col_name = vec_analysis["column_name"]
            if col_name not in columns:
                columns[col_name] = []
            columns[col_name].append(vec_analysis)

        for col_name, vec_list in columns.items():
            if len(vec_list) >= 2:
                # Check whether vector types and dimensions agree within the column
                vector_types = set(v["analysis"]["vector_type"] for v in vec_list)
                dimensions = set(v["analysis"]["dimension"] for v in vec_list)

                is_consistent = len(vector_types) == 1 and len(dimensions) == 1

                validation_result["details"][col_name] = {
                    "vector_types": list(vector_types),
                    "dimensions": list(dimensions),
                    "is_consistent": is_consistent,
                    "vector_count": len(vec_list)
                }

                if is_consistent:
                    validation_result["consistent_columns"].append(col_name)
                else:
                    validation_result["inconsistent_columns"].append(col_name)

        return validation_result
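
    # Illustrative sketch: for a hypothetical "embedding" column whose min and
    # max statistics deserialize to the same type and dimension, the result
    # would look like:
    #
    #     {
    #         "total_vectors": 2,
    #         "consistent_columns": ["embedding"],
    #         "inconsistent_columns": [],
    #         "details": {
    #             "embedding": {
    #                 "vector_types": ["FloatVector"],
    #                 "dimensions": [128],
    #                 "is_consistent": True,
    #                 "vector_count": 2,
    #             }
    #         },
    #     }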

    def query_by_id(self, id_value: Any, id_column: Optional[str] = None) -> Dict[str, Any]:
        """
        Query data by ID value

        Args:
            id_value: ID value to search for
            id_column: ID column name (if None, will try to find the primary key column)

        Returns:
            Dict: query results
        """
        try:
            import pandas as pd
            import pyarrow.parquet as pq  # ensures pyarrow is installed for the parquet engine
        except ImportError:
            return {"error": "pandas and pyarrow are required for ID query"}

        if not self.meta_parser.metadata:
            return {"error": "Parquet file not loaded"}

        try:
            # Read the parquet file
            df = pd.read_parquet(self.file_path)

            # If no ID column is specified, try to find the primary key column
            if id_column is None:
                # Common primary key column names
                pk_candidates = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']
                for candidate in pk_candidates:
                    if candidate in df.columns:
                        id_column = candidate
                        break

                if id_column is None:
                    # If no common PK is found, fall back to the first column
                    id_column = df.columns[0]

            if id_column not in df.columns:
                return {
                    "error": f"ID column '{id_column}' not found in the data",
                    "available_columns": list(df.columns)
                }

            # Query by ID
            result = df[df[id_column] == id_value]

            if result.empty:
                return {
                    "found": False,
                    "id_column": id_column,
                    "id_value": id_value,
                    "message": f"No record found with {id_column} = {id_value}"
                }

            # Convert to dict for JSON serialization
            record = result.iloc[0].to_dict()

            # Handle vector columns if present
            vector_columns = []
            for col_name, value in record.items():
                if isinstance(value, bytes) and len(value) > 32:
                    # This might be a vector; try to deserialize it
                    try:
                        vector_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
                        if vector_analysis:
                            vector_columns.append({
                                "column_name": col_name,
                                "analysis": vector_analysis
                            })
                            # Replace the raw bytes with an analysis summary
                            if vector_analysis["vector_type"] in ("JSON", "Array"):
                                # For JSON and Array, show the actual content
                                record[col_name] = vector_analysis["deserialized"]
                            else:
                                # For vectors, show the type, dimension and a data preview
                                record[col_name] = {
                                    "vector_type": vector_analysis["vector_type"],
                                    "dimension": vector_analysis["dimension"],
                                    "data_preview": vector_analysis["deserialized"][:5] if vector_analysis["deserialized"] else []
                                }
                    except Exception:
                        # If deserialization fails, keep a truncated placeholder for display
                        record[col_name] = f"<binary data: {len(value)} bytes>"

            return {
                "found": True,
                "id_column": id_column,
                "id_value": id_value,
                "record": record,
                "vector_columns": vector_columns,
                "total_columns": len(df.columns),
                "total_rows": len(df)
            }

        except Exception as e:
            return {"error": f"Query failed: {str(e)}"}
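
    # Usage sketch (hypothetical values): look up a row while letting the
    # method auto-detect the ID column, then retry with an explicit column
    # name if the auto-detection picks the wrong one:
    #
    #     result = analyzer.query_by_id(42)
    #     if not result.get("found"):
    #         result = analyzer.query_by_id(42, id_column="row_id")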

    def get_id_column_info(self) -> Dict[str, Any]:
        """
        Get information about ID columns in the data

        Returns:
            Dict: ID column information
        """
        try:
            import pandas as pd
        except ImportError:
            return {"error": "pandas is required for ID column analysis"}

        if not self.meta_parser.metadata:
            return {"error": "Parquet file not loaded"}

        try:
            df = pd.read_parquet(self.file_path)

            # Find potential ID columns
            id_columns = []
            for col in df.columns:
                col_data = df[col]

                # Check whether the column looks like an ID column
                is_unique = col_data.nunique() == len(col_data)
                is_numeric = pd.api.types.is_numeric_dtype(col_data)
                is_integer = pd.api.types.is_integer_dtype(col_data)

                id_columns.append({
                    "column_name": col,
                    "is_unique": is_unique,
                    "is_numeric": is_numeric,
                    "is_integer": is_integer,
                    "unique_count": col_data.nunique(),
                    "total_count": len(col_data),
                    "min_value": col_data.min() if is_numeric else None,
                    "max_value": col_data.max() if is_numeric else None,
                    "sample_values": col_data.head(5).tolist()
                })

            return {
                "total_columns": len(df.columns),
                "total_rows": len(df),
                "id_columns": id_columns,
                "recommended_id_column": self._get_recommended_id_column(id_columns)
            }

        except Exception as e:
            return {"error": f"ID column analysis failed: {str(e)}"}

    def _get_recommended_id_column(self, id_columns: List[Dict[str, Any]]) -> str:
        """
        Get recommended ID column based on heuristics

        Args:
            id_columns: list of ID column information

        Returns:
            str: recommended ID column name
        """
        # Priority order for ID columns
        priority_names = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']

        # First, look for columns with priority names that are unique
        for priority_name in priority_names:
            for col_info in id_columns:
                if (col_info["column_name"].lower() == priority_name.lower() and
                        col_info["is_unique"]):
                    return col_info["column_name"]

        # Then, look for any unique integer column
        for col_info in id_columns:
            if col_info["is_unique"] and col_info["is_integer"]:
                return col_info["column_name"]

        # Finally, look for any unique column
        for col_info in id_columns:
            if col_info["is_unique"]:
                return col_info["column_name"]

        # If no unique column is found, fall back to the first column
        return id_columns[0]["column_name"] if id_columns else ""
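

if __name__ == "__main__":
    # Minimal command-line sketch. Because this module uses relative imports,
    # it has to be run as part of its package (python -m <package>.<module>);
    # the package and module names here stand in for whatever the repository uses.
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m <package>.<module> <file.parquet>")
        sys.exit(1)

    analyzer = ParquetAnalyzer(sys.argv[1])
    if not analyzer.load():
        print("❌ Failed to load parquet file")
        sys.exit(1)

    analyzer.print_summary()
    print(f"📄 Full analysis written to {analyzer.export_analysis()}")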