#!/usr/bin/env python3 """ Parquet Analyzer Command Line Tool Provides a simple command line interface to use the parquet analyzer """ import argparse import sys import json import pandas as pd import pyarrow.parquet as pq from pathlib import Path # Add current directory to Python path sys.path.append(str(Path(__file__).parent)) from parquet_analyzer import ParquetAnalyzer, ParquetMetaParser, VectorDeserializer def main(): """Main function""" parser = argparse.ArgumentParser( description="Parquet Analyzer Command Line Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Usage Examples: python parquet_analyzer_cli.py analyze test_large_batch.parquet python parquet_analyzer_cli.py metadata test_large_batch.parquet python parquet_analyzer_cli.py vector test_large_batch.parquet python parquet_analyzer_cli.py export test_large_batch.parquet --output result.json python parquet_analyzer_cli.py data test_large_batch.parquet --rows 10 --output data.json python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123 python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123 --id-column user_id """ ) parser.add_argument( "command", choices=["analyze", "metadata", "vector", "export", "data", "query"], help="Command to execute" ) parser.add_argument( "file", help="Parquet file path" ) parser.add_argument( "--output", "-o", help="Output file path (for export and data commands)" ) parser.add_argument( "--rows", "-r", type=int, default=10, help="Number of rows to export (only for data command, default: 10 rows)" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Verbose output" ) # Query-specific arguments parser.add_argument( "--id-value", "-i", help="ID value to query (for query command)" ) parser.add_argument( "--id-column", "-c", help="ID column name (for query command, auto-detected if not specified)" ) args = parser.parse_args() # Check if file exists if not Path(args.file).exists(): print(f"āŒ File does not exist: {args.file}") sys.exit(1) if args.command == "analyze": analyze_file(args.file, args.verbose) elif args.command == "metadata": analyze_metadata(args.file, args.verbose) elif args.command == "vector": analyze_vectors(args.file, args.verbose) elif args.command == "export": export_analysis(args.file, args.output, args.verbose) elif args.command == "data": export_data(args.file, args.output, args.rows, args.verbose) elif args.command == "query": query_by_id(args.file, args.id_value, args.id_column, args.verbose) def analyze_file(file_path: str, verbose: bool = False): """Analyze parquet file""" print(f"šŸ” Analyzing parquet file: {Path(file_path).name}") print("=" * 60) analyzer = ParquetAnalyzer(file_path) if not analyzer.load(): print("āŒ Failed to load parquet file") sys.exit(1) # Print summary analyzer.print_summary() if verbose: # Detailed analysis analysis = analyzer.analyze() print(f"\nšŸ“Š Detailed Analysis Results:") print(f" File Info: {analysis['metadata']['basic_info']['name']}") print(f" Size: {analysis['metadata']['basic_info']['size_mb']:.2f} MB") print(f" Rows: {analysis['metadata']['basic_info']['num_rows']:,}") print(f" Columns: {analysis['metadata']['basic_info']['num_columns']}") # Display vector analysis if analysis['vectors']: print(f"\nšŸ” Vector Analysis:") for vec_analysis in analysis['vectors']: col_name = vec_analysis['column_name'] stat_type = vec_analysis['stat_type'] analysis_data = vec_analysis['analysis'] print(f" {col_name} ({stat_type}):") print(f" Vector Type: {analysis_data['vector_type']}") print(f" Dimension: {analysis_data['dimension']}") if analysis_data['statistics']: stats = analysis_data['statistics'] print(f" Min: {stats.get('min', 'N/A')}") print(f" Max: {stats.get('max', 'N/A')}") print(f" Mean: {stats.get('mean', 'N/A')}") print(f" Std: {stats.get('std', 'N/A')}") def analyze_metadata(file_path: str, verbose: bool = False): """Analyze metadata""" print(f"šŸ“„ Analyzing metadata: {Path(file_path).name}") print("=" * 60) meta_parser = ParquetMetaParser(file_path) if not meta_parser.load(): print("āŒ Failed to load parquet file") sys.exit(1) # Basic information basic_info = meta_parser.get_basic_info() print(f"šŸ“Š File Information:") print(f" Name: {basic_info['name']}") print(f" Size: {basic_info['size_mb']:.2f} MB") print(f" Rows: {basic_info['num_rows']:,}") print(f" Columns: {basic_info['num_columns']}") print(f" Row Groups: {basic_info['num_row_groups']}") print(f" Created By: {basic_info['created_by']}") print(f" Parquet Version: {basic_info['format_version']}") # File-level metadata file_metadata = meta_parser.get_file_metadata() if file_metadata: print(f"\nšŸ“„ File-level Metadata:") for key, value in file_metadata.items(): print(f" {key}: {value}") # Schema-level metadata schema_metadata = meta_parser.get_schema_metadata() if schema_metadata: print(f"\nšŸ“‹ Schema-level Metadata:") for field in schema_metadata: print(f" {field['column_name']}: {field['column_type']}") for k, v in field['metadata'].items(): print(f" {k}: {v}") # Column statistics column_stats = meta_parser.get_column_statistics() if column_stats: print(f"\nšŸ“ˆ Column Statistics:") for col_stats in column_stats: print(f" {col_stats['column_name']}:") print(f" Compression: {col_stats['compression']}") print(f" Encodings: {', '.join(col_stats['encodings'])}") print(f" Compressed Size: {col_stats['compressed_size']:,} bytes") print(f" Uncompressed Size: {col_stats['uncompressed_size']:,} bytes") if 'statistics' in col_stats and col_stats['statistics']: stats = col_stats['statistics'] if 'null_count' in stats: print(f" Null Count: {stats['null_count']}") if 'distinct_count' in stats: print(f" Distinct Count: {stats['distinct_count']}") if 'min' in stats: print(f" Min: {stats['min']}") if 'max' in stats: print(f" Max: {stats['max']}") def analyze_vectors(file_path: str, verbose: bool = False): """Analyze vector data""" print(f"šŸ” Analyzing vector data: {Path(file_path).name}") print("=" * 60) analyzer = ParquetAnalyzer(file_path) if not analyzer.load(): print("āŒ Failed to load parquet file") sys.exit(1) vector_analysis = analyzer.analyze_vectors() if not vector_analysis: print("āŒ No vector data found") return print(f"šŸ“Š Found {len(vector_analysis)} vector statistics:") for vec_analysis in vector_analysis: col_name = vec_analysis['column_name'] stat_type = vec_analysis['stat_type'] analysis = vec_analysis['analysis'] print(f"\nšŸ” {col_name} ({stat_type}):") print(f" Vector Type: {analysis['vector_type']}") print(f" Dimension: {analysis['dimension']}") if analysis['statistics']: stats = analysis['statistics'] print(f" Min: {stats.get('min', 'N/A')}") print(f" Max: {stats.get('max', 'N/A')}") print(f" Mean: {stats.get('mean', 'N/A')}") print(f" Std: {stats.get('std', 'N/A')}") if analysis['vector_type'] == "BinaryVector": print(f" Zero Count: {stats.get('zero_count', 'N/A')}") print(f" One Count: {stats.get('one_count', 'N/A')}") if verbose and analysis['deserialized']: print(f" First 5 elements: {analysis['deserialized'][:5]}") # Validate consistency validation = analyzer.validate_vector_consistency() print(f"\nāœ… Vector Consistency Validation:") print(f" Total Vectors: {validation['total_vectors']}") print(f" Consistent Columns: {validation['consistent_columns']}") print(f" Inconsistent Columns: {validation['inconsistent_columns']}") def export_analysis(file_path: str, output_file: str = None, verbose: bool = False): """Export analysis results""" print(f"šŸ’¾ Exporting analysis results: {Path(file_path).name}") print("=" * 60) analyzer = ParquetAnalyzer(file_path) if not analyzer.load(): print("āŒ Failed to load parquet file") sys.exit(1) # Export analysis results output_file = analyzer.export_analysis(output_file) print(f"āœ… Analysis results exported to: {output_file}") if verbose: # Show file size file_size = Path(output_file).stat().st_size print(f"šŸ“Š Output file size: {file_size:,} bytes ({file_size/1024:.2f} KB)") # Show summary analysis = analyzer.analyze() print(f"šŸ“ˆ Analysis Summary:") print(f" Metadata Count: {analysis['metadata']['metadata_summary']['total_metadata_count']}") print(f" Vector Count: {len(analysis['vectors'])}") def export_data(file_path: str, output_file: str = None, num_rows: int = 10, verbose: bool = False): """Export first N rows of parquet file data""" print(f"šŸ“Š Exporting data: {Path(file_path).name}") print("=" * 60) try: # Read parquet file table = pq.read_table(file_path) df = table.to_pandas() # Get first N rows data_subset = df.head(num_rows) # Process vector columns, convert bytes to readable format processed_data = [] for idx, row in data_subset.iterrows(): row_dict = {} for col_name, value in row.items(): if isinstance(value, bytes): # Try to deserialize as vector try: vec_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name) if vec_analysis and vec_analysis['deserialized']: if vec_analysis['vector_type'] == "JSON": # For JSON, show the actual content row_dict[col_name] = vec_analysis['deserialized'] elif vec_analysis['vector_type'] == "Array": # For Array, show the actual content row_dict[col_name] = vec_analysis['deserialized'] else: # For vectors, show type and dimension row_dict[col_name] = { "vector_type": vec_analysis['vector_type'], "dimension": vec_analysis['dimension'], "data": vec_analysis['deserialized'][:10], # Only show first 10 elements "raw_hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex() } else: row_dict[col_name] = { "type": "binary", "size": len(value), "hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex() } except Exception as e: row_dict[col_name] = { "type": "binary", "size": len(value), "hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex(), "error": str(e) } else: row_dict[col_name] = value processed_data.append(row_dict) # Prepare output result = { "file_info": { "name": Path(file_path).name, "total_rows": len(df), "total_columns": len(df.columns), "exported_rows": len(processed_data) }, "columns": list(df.columns), "data": processed_data } # Determine output file if not output_file: output_file = f"{Path(file_path).stem}_data_{num_rows}rows.json" # Save to file with open(output_file, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f"āœ… Data exported to: {output_file}") print(f"šŸ“Š Exported {len(processed_data)} rows (total {len(df)} rows)") print(f"šŸ“‹ Columns: {len(df.columns)}") if verbose: print(f"\nšŸ“ˆ Data Preview:") for i, row_data in enumerate(processed_data[:3]): # Only show first 3 rows preview print(f" Row {i+1}:") for col_name, value in row_data.items(): if isinstance(value, dict) and 'vector_type' in value: print(f" {col_name}: {value['vector_type']}({value['dimension']}) - {value['data'][:5]}...") elif isinstance(value, dict) and 'type' in value: print(f" {col_name}: {value['type']} ({value['size']} bytes)") else: print(f" {col_name}: {value}") print() return output_file except Exception as e: print(f"āŒ Failed to export data: {e}") sys.exit(1) def query_by_id(file_path: str, id_value: str = None, id_column: str = None, verbose: bool = False): """Query data by ID value""" print(f"šŸ” Querying by ID: {Path(file_path).name}") print("=" * 60) analyzer = ParquetAnalyzer(file_path) if not analyzer.load(): print("āŒ Failed to load parquet file") sys.exit(1) # If no ID value provided, show ID column information if id_value is None: print("šŸ“‹ ID Column Information:") print("-" * 40) id_info = analyzer.get_id_column_info() if "error" in id_info: print(f"āŒ {id_info['error']}") sys.exit(1) print(f"šŸ“Š Total rows: {id_info['total_rows']}") print(f"šŸ“‹ Total columns: {id_info['total_columns']}") print(f"šŸŽÆ Recommended ID column: {id_info['recommended_id_column']}") print() print("šŸ“‹ Available ID columns:") for col_info in id_info['id_columns']: status = "āœ…" if col_info['is_unique'] else "āš ļø" print(f" {status} {col_info['column_name']}") print(f" - Unique: {col_info['is_unique']}") print(f" - Type: {'Integer' if col_info['is_integer'] else 'Numeric' if col_info['is_numeric'] else 'Other'}") print(f" - Range: {col_info['min_value']} to {col_info['max_value']}" if col_info['is_numeric'] else " - Range: N/A") print(f" - Sample values: {col_info['sample_values'][:3]}") print() print("šŸ’” Usage: python parquet_analyzer_cli.py query --id-value [--id-column ]") return # Convert ID value to appropriate type try: # Try to convert to integer first if id_value.isdigit(): id_value = int(id_value) elif id_value.replace('.', '').replace('-', '').isdigit(): id_value = float(id_value) except ValueError: # Keep as string if conversion fails pass # Perform the query result = analyzer.query_by_id(id_value, id_column) if "error" in result: print(f"āŒ Query failed: {result['error']}") sys.exit(1) if not result['found']: print(f"āŒ {result['message']}") return # Display results print(f"āœ… Found record with {result['id_column']} = {result['id_value']}") print(f"šŸ“Š Total columns: {result['total_columns']}") print(f"šŸ“ˆ Total rows in file: {result['total_rows']}") print() print("šŸ“‹ Record Data:") print("-" * 40) for col_name, value in result['record'].items(): if isinstance(value, dict) and 'vector_type' in value: # Vector data print(f" {col_name}:") print(f" Type: {value['vector_type']}") print(f" Dimension: {value['dimension']}") print(f" Data preview: {value['data_preview'][:5]}...") elif isinstance(value, dict) and 'name' in value: # JSON data (likely a person record) print(f" {col_name}:") for key, val in value.items(): print(f" {key}: {val}") elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], str): # String array data print(f" {col_name}: {value}") elif isinstance(value, list): # Array data print(f" {col_name}: {value}") elif isinstance(value, str) and value.startswith('