""" Parquet Analyzer Main Component Main analyzer that integrates metadata parsing and vector deserialization functionality """ import json from pathlib import Path from typing import Dict, List, Any, Optional from .meta_parser import ParquetMetaParser from .vector_deserializer import VectorDeserializer class ParquetAnalyzer: """Main Parquet file analyzer class""" def __init__(self, file_path: str): """ Initialize analyzer Args: file_path: parquet file path """ self.file_path = Path(file_path) self.meta_parser = ParquetMetaParser(file_path) self.vector_deserializer = VectorDeserializer() def load(self) -> bool: """ Load parquet file Returns: bool: whether loading was successful """ return self.meta_parser.load() def analyze_metadata(self) -> Dict[str, Any]: """ Analyze metadata information Returns: Dict: metadata analysis results """ if not self.meta_parser.metadata: return {} return { "basic_info": self.meta_parser.get_basic_info(), "file_metadata": self.meta_parser.get_file_metadata(), "schema_metadata": self.meta_parser.get_schema_metadata(), "column_statistics": self.meta_parser.get_column_statistics(), "row_group_info": self.meta_parser.get_row_group_info(), "metadata_summary": self.meta_parser.get_metadata_summary() } def analyze_vectors(self) -> List[Dict[str, Any]]: """ Analyze vector data Returns: List: vector analysis results list """ if not self.meta_parser.metadata: return [] vector_analysis = [] column_stats = self.meta_parser.get_column_statistics() for col_stats in column_stats: if "statistics" in col_stats and col_stats["statistics"]: stats = col_stats["statistics"] col_name = col_stats["column_name"] # Check if there's binary data (vector) if "min" in stats: min_value = stats["min"] if isinstance(min_value, bytes): min_analysis = VectorDeserializer.deserialize_with_analysis( min_value, col_name ) if min_analysis: vector_analysis.append({ "column_name": col_name, "stat_type": "min", "analysis": min_analysis }) elif isinstance(min_value, str) and len(min_value) > 32: # May be hex string, try to convert back to bytes try: min_bytes = bytes.fromhex(min_value) min_analysis = VectorDeserializer.deserialize_with_analysis( min_bytes, col_name ) if min_analysis: vector_analysis.append({ "column_name": col_name, "stat_type": "min", "analysis": min_analysis }) except ValueError: pass if "max" in stats: max_value = stats["max"] if isinstance(max_value, bytes): max_analysis = VectorDeserializer.deserialize_with_analysis( max_value, col_name ) if max_analysis: vector_analysis.append({ "column_name": col_name, "stat_type": "max", "analysis": max_analysis }) elif isinstance(max_value, str) and len(max_value) > 32: # May be hex string, try to convert back to bytes try: max_bytes = bytes.fromhex(max_value) max_analysis = VectorDeserializer.deserialize_with_analysis( max_bytes, col_name ) if max_analysis: vector_analysis.append({ "column_name": col_name, "stat_type": "max", "analysis": max_analysis }) except ValueError: pass return vector_analysis def analyze(self) -> Dict[str, Any]: """ Complete parquet file analysis Returns: Dict: complete analysis results """ if not self.load(): return {} return { "metadata": self.analyze_metadata(), "vectors": self.analyze_vectors() } def export_analysis(self, output_file: Optional[str] = None) -> str: """ Export analysis results Args: output_file: output file path, if None will auto-generate Returns: str: output file path """ if output_file is None: output_file = f"{self.file_path.stem}_analysis.json" analysis_result = self.analyze() with 
    def analyze(self) -> Dict[str, Any]:
        """
        Complete parquet file analysis

        Returns:
            Dict: complete analysis results
        """
        if not self.load():
            return {}

        return {
            "metadata": self.analyze_metadata(),
            "vectors": self.analyze_vectors(),
        }

    def export_analysis(self, output_file: Optional[str] = None) -> str:
        """
        Export analysis results

        Args:
            output_file: output file path; if None, one is auto-generated

        Returns:
            str: output file path
        """
        if output_file is None:
            output_file = f"{self.file_path.stem}_analysis.json"

        analysis_result = self.analyze()

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(analysis_result, f, indent=2, ensure_ascii=False)

        return output_file

    def print_summary(self):
        """Print analysis summary"""
        if not self.meta_parser.metadata:
            print("āŒ No parquet file loaded")
            return

        # Print metadata summary
        self.meta_parser.print_summary()

        # Print vector analysis summary
        vector_analysis = self.analyze_vectors()
        if vector_analysis:
            print("\nšŸ” Vector Analysis Summary:")
            print("=" * 60)

            for vec_analysis in vector_analysis:
                col_name = vec_analysis["column_name"]
                stat_type = vec_analysis["stat_type"]
                analysis = vec_analysis["analysis"]

                print(f"  Column: {col_name} ({stat_type})")
                print(f"    Vector Type: {analysis['vector_type']}")
                print(f"    Dimension: {analysis['dimension']}")

                if analysis.get("statistics"):
                    stats = analysis["statistics"]
                    print(f"    Min: {stats.get('min', 'N/A')}")
                    print(f"    Max: {stats.get('max', 'N/A')}")
                    print(f"    Mean: {stats.get('mean', 'N/A')}")
                    print(f"    Std: {stats.get('std', 'N/A')}")

                if analysis["vector_type"] == "BinaryVector" and "statistics" in analysis:
                    stats = analysis["statistics"]
                    print(f"    Zero Count: {stats.get('zero_count', 'N/A')}")
                    print(f"    One Count: {stats.get('one_count', 'N/A')}")

                print()

    def get_vector_samples(self, column_name: str, sample_count: int = 5) -> List[Dict[str, Any]]:
        """
        Get vector sample data

        Args:
            column_name: column name
            sample_count: number of vector elements to include per sample

        Returns:
            List: vector sample list
        """
        # This can be extended to read samples from actual data.
        # Currently returns min/max from statistics as samples.
        vector_analysis = self.analyze_vectors()
        samples = []

        for vec_analysis in vector_analysis:
            if vec_analysis["column_name"] == column_name:
                analysis = vec_analysis["analysis"]
                samples.append({
                    "type": vec_analysis["stat_type"],
                    "vector_type": analysis["vector_type"],
                    "dimension": analysis["dimension"],
                    "data": analysis["deserialized"][:sample_count] if analysis["deserialized"] else [],
                    "statistics": analysis.get("statistics", {}),
                })

        return samples

    def compare_vectors(self, column_name: str) -> Dict[str, Any]:
        """
        Compare different vector statistics for the same column

        Args:
            column_name: column name

        Returns:
            Dict: comparison results
        """
        vector_analysis = self.analyze_vectors()
        column_vectors = [v for v in vector_analysis if v["column_name"] == column_name]

        if len(column_vectors) < 2:
            return {}

        comparison = {
            "column_name": column_name,
            "vector_count": len(column_vectors),
            "comparison": {},
        }

        for vec_analysis in column_vectors:
            stat_type = vec_analysis["stat_type"]
            analysis = vec_analysis["analysis"]
            comparison["comparison"][stat_type] = {
                "vector_type": analysis["vector_type"],
                "dimension": analysis["dimension"],
                "statistics": analysis.get("statistics", {}),
            }

        return comparison

    def validate_vector_consistency(self) -> Dict[str, Any]:
        """
        Validate vector data consistency

        Returns:
            Dict: validation results
        """
        vector_analysis = self.analyze_vectors()
        validation_result = {
            "total_vectors": len(vector_analysis),
            "consistent_columns": [],
            "inconsistent_columns": [],
            "details": {},
        }

        # Group by column
        columns = {}
        for vec_analysis in vector_analysis:
            col_name = vec_analysis["column_name"]
            columns.setdefault(col_name, []).append(vec_analysis)

        for col_name, vec_list in columns.items():
            if len(vec_list) >= 2:
                # Check if vector types and dimensions are consistent for the same column
                vector_types = set(v["analysis"]["vector_type"] for v in vec_list)
                dimensions = set(v["analysis"]["dimension"] for v in vec_list)
                is_consistent = len(vector_types) == 1 and len(dimensions) == 1

                validation_result["details"][col_name] = {
                    "vector_types": list(vector_types),
                    "dimensions": list(dimensions),
                    "is_consistent": is_consistent,
                    "vector_count": len(vec_list),
                }

                if is_consistent:
                    validation_result["consistent_columns"].append(col_name)
                else:
                    validation_result["inconsistent_columns"].append(col_name)

        return validation_result
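    # Shape of a validation result, for reference (hypothetical values;
    # "embedding" is an illustrative column name, not one this module assumes):
    #
    #   {
    #       "total_vectors": 2,
    #       "consistent_columns": ["embedding"],
    #       "inconsistent_columns": [],
    #       "details": {
    #           "embedding": {
    #               "vector_types": ["FloatVector"],
    #               "dimensions": [768],
    #               "is_consistent": True,
    #               "vector_count": 2,
    #           }
    #       },
    #   }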
    def query_by_id(self, id_value: Any, id_column: Optional[str] = None) -> Dict[str, Any]:
        """
        Query data by ID value

        Args:
            id_value: ID value to search for
            id_column: ID column name (if None, will try to find primary key column)

        Returns:
            Dict: query results
        """
        try:
            import pandas as pd
            import pyarrow.parquet as pq  # noqa: F401 -- imported only to check availability
        except ImportError:
            return {"error": "pandas and pyarrow are required for ID query"}

        if not self.meta_parser.metadata:
            return {"error": "Parquet file not loaded"}

        try:
            # Read the parquet file
            df = pd.read_parquet(self.file_path)

            # If no ID column is specified, try to find a primary key column
            if id_column is None:
                # Common primary key column names
                pk_candidates = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']
                for candidate in pk_candidates:
                    if candidate in df.columns:
                        id_column = candidate
                        break

                if id_column is None:
                    # If no common PK name is found, fall back to the first column
                    id_column = df.columns[0]

            if id_column not in df.columns:
                return {
                    "error": f"ID column '{id_column}' not found in the data",
                    "available_columns": list(df.columns),
                }

            # Query by ID
            result = df[df[id_column] == id_value]

            if result.empty:
                return {
                    "found": False,
                    "id_column": id_column,
                    "id_value": id_value,
                    "message": f"No record found with {id_column} = {id_value}",
                }

            # Convert to dict for JSON serialization
            record = result.iloc[0].to_dict()

            # Handle vector columns if present
            vector_columns = []
            for col_name, value in record.items():
                if isinstance(value, bytes) and len(value) > 32:
                    # This might be a vector; try to deserialize it
                    try:
                        vector_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
                        if vector_analysis:
                            vector_columns.append({
                                "column_name": col_name,
                                "analysis": vector_analysis,
                            })

                            # Replace bytes with an analysis summary
                            if vector_analysis["vector_type"] in ("JSON", "Array"):
                                # For JSON and Array, show the actual content
                                record[col_name] = vector_analysis["deserialized"]
                            else:
                                # For vectors, show type, dimension and a short preview
                                record[col_name] = {
                                    "vector_type": vector_analysis["vector_type"],
                                    "dimension": vector_analysis["dimension"],
                                    "data_preview": vector_analysis["deserialized"][:5] if vector_analysis["deserialized"] else [],
                                }
                    except Exception:
                        # If deserialization fails, replace the raw bytes with a
                        # short placeholder so the record stays displayable
                        record[col_name] = f"<binary data: {len(value)} bytes>"

            return {
                "found": True,
                "id_column": id_column,
                "id_value": id_value,
                "record": record,
                "vector_columns": vector_columns,
                "total_columns": len(df.columns),
                "total_rows": len(df),
            }

        except Exception as e:
            return {"error": f"Query failed: {str(e)}"}

    def get_id_column_info(self) -> Dict[str, Any]:
        """
        Get information about ID columns in the data

        Returns:
            Dict: ID column information
        """
        try:
            import pandas as pd
        except ImportError:
            return {"error": "pandas is required for ID column analysis"}

        if not self.meta_parser.metadata:
            return {"error": "Parquet file not loaded"}

        try:
            df = pd.read_parquet(self.file_path)

            # Find potential ID columns
            id_columns = []
            for col in df.columns:
                col_data = df[col]

                # Check whether the column looks like an ID column
                is_unique = col_data.nunique() == len(col_data)
                is_numeric = pd.api.types.is_numeric_dtype(col_data)
                is_integer = pd.api.types.is_integer_dtype(col_data)

                id_columns.append({
                    "column_name": col,
                    "is_unique": is_unique,
                    "is_numeric": is_numeric,
                    "is_integer": is_integer,
                    "unique_count": col_data.nunique(),
                    "total_count": len(col_data),
                    "min_value": col_data.min() if is_numeric else None,
                    "max_value": col_data.max() if is_numeric else None,
                    "sample_values": col_data.head(5).tolist(),
                })

            return {
                "total_columns": len(df.columns),
                "total_rows": len(df),
                "id_columns": id_columns,
                "recommended_id_column": self._get_recommended_id_column(id_columns),
            }

        except Exception as e:
            return {"error": f"ID column analysis failed: {str(e)}"}
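    # Illustrative lookup flow (file and column names here are hypothetical;
    # nothing in this module requires them):
    #
    #   >>> analyzer = ParquetAnalyzer("embeddings.parquet")
    #   >>> analyzer.load()
    #   True
    #   >>> info = analyzer.get_id_column_info()
    #   >>> info["recommended_id_column"]
    #   'id'
    #   >>> analyzer.query_by_id(42)["found"]
    #   True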
    def _get_recommended_id_column(self, id_columns: List[Dict[str, Any]]) -> str:
        """
        Get recommended ID column based on heuristics

        Args:
            id_columns: List of ID column information

        Returns:
            str: Recommended ID column name
        """
        # Priority order for ID column names (matched case-insensitively)
        priority_names = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']

        # First, look for columns with priority names that are unique
        for priority_name in priority_names:
            for col_info in id_columns:
                if (col_info["column_name"].lower() == priority_name.lower()
                        and col_info["is_unique"]):
                    return col_info["column_name"]

        # Then, look for any unique integer column
        for col_info in id_columns:
            if col_info["is_unique"] and col_info["is_integer"]:
                return col_info["column_name"]

        # Finally, look for any unique column
        for col_info in id_columns:
            if col_info["is_unique"]:
                return col_info["column_name"]

        # If no unique column is found, fall back to the first column
        return id_columns[0]["column_name"] if id_columns else ""
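# A minimal command-line entry point (illustrative sketch). Because this
# module uses relative imports, it is assumed to be run as part of its
# package, e.g. ``python -m <package>.analyzer data.parquet [id]`` -- the
# module and file names here are assumptions, not part of the original API.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m <package>.analyzer <file.parquet> [id_value]")
        sys.exit(1)

    analyzer = ParquetAnalyzer(sys.argv[1])
    if not analyzer.load():
        print(f"āŒ Failed to load {sys.argv[1]}")
        sys.exit(1)

    analyzer.print_summary()

    # Optional point lookup when an ID value is given on the command line.
    if len(sys.argv) > 2:
        raw_id = sys.argv[2]
        # IDs arrive as strings on the command line; try an integer cast
        # first so numeric ID columns match.
        try:
            id_value: Any = int(raw_id)
        except ValueError:
            id_value = raw_id
        result = analyzer.query_by_id(id_value)
        print(json.dumps(result, indent=2, default=str))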