#!/usr/bin/env python3
"""
MinIO Parquet Analyzer - Integrated Tool
Combines MinIO client with parquet_analyzer_cli.py for seamless analysis
"""

import argparse
import sys
import os
import tempfile
import subprocess
import json
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime

try:
    from minio import Minio
    from minio.error import S3Error
except ImportError:
    print("❌ MinIO client library not found. Please install it:")
    print("   pip install minio")
    sys.exit(1)


class MinioParquetAnalyzer:
    """Integrated MinIO and Parquet Analyzer"""

    def __init__(self, endpoint: str, port: int = 9000, secure: bool = False,
                 access_key: Optional[str] = None, secret_key: Optional[str] = None):
        """Initialize the integrated analyzer"""
        self.endpoint = endpoint
        self.port = port
        self.secure = secure
        self.access_key = access_key
        self.secret_key = secret_key

        # Initialize MinIO client
        try:
            self.client = Minio(
                f"{endpoint}:{port}",
                access_key=access_key,
                secret_key=secret_key,
                secure=secure
            )
            print(f"✅ Connected to MinIO server: {endpoint}:{port}")
        except Exception as e:
            print(f"❌ Failed to connect to MinIO server: {e}")
            sys.exit(1)

        # Check if parquet_analyzer_cli.py exists
        self.cli_script = Path(__file__).parent / "parquet_analyzer_cli.py"
        if not self.cli_script.exists():
            print(f"❌ parquet_analyzer_cli.py not found at: {self.cli_script}")
            sys.exit(1)
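
    # A minimal usage sketch, assuming a local MinIO server (the endpoint and
    # credentials below are illustrative, not defaults of this tool):
    #
    #   analyzer = MinioParquetAnalyzer("localhost", port=9000,
    #                                   access_key="minioadmin", secret_key="minioadmin")
    #   for f in analyzer.list_parquet_files("my-bucket"):
    #       print(f["name"], f["size_mb"])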

    def list_buckets(self) -> List[Dict[str, Any]]:
        """List all buckets"""
        try:
            buckets = []
            for bucket in self.client.list_buckets():
                buckets.append({
                    'name': bucket.name,
                    'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None
                })
            return buckets
        except S3Error as e:
            print(f"❌ Failed to list buckets: {e}")
            return []

    def filter_objects(self, objects: List[Dict[str, Any]],
                       prefix: Optional[str] = None, suffix: Optional[str] = None,
                       contains: Optional[str] = None,
                       size_min: Optional[float] = None, size_max: Optional[float] = None,
                       date_from: Optional[str] = None, date_to: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Filter objects based on various criteria

        Args:
            objects: List of objects to filter
            prefix: Filter by object name prefix
            suffix: Filter by object name suffix
            contains: Filter by object name containing string
            size_min: Minimum size in MB
            size_max: Maximum size in MB
            date_from: Filter objects modified on or after date (YYYY-MM-DD)
            date_to: Filter objects modified on or before date (YYYY-MM-DD)

        Returns:
            Filtered list of objects
        """
        filtered = objects

        if prefix:
            filtered = [obj for obj in filtered if obj['name'].startswith(prefix)]

        if suffix:
            filtered = [obj for obj in filtered if obj['name'].endswith(suffix)]

        if contains:
            # Supports complex expressions with parentheses, OR (comma), and AND (&) logic
            filtered = self._apply_contains_filter(filtered, contains)

        if size_min is not None:
            size_min_bytes = size_min * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes]

        if size_max is not None:
            size_max_bytes = size_max * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes]

        if date_from:
            try:
                from_date = datetime.fromisoformat(date_from).date()
                filtered = [obj for obj in filtered
                            if obj['last_modified'] and
                            datetime.fromisoformat(obj['last_modified']).date() >= from_date]
            except ValueError:
                print(f"⚠️ Invalid date format for --filter-date-from: {date_from}")

        if date_to:
            try:
                to_date = datetime.fromisoformat(date_to).date()
                filtered = [obj for obj in filtered
                            if obj['last_modified'] and
                            datetime.fromisoformat(obj['last_modified']).date() <= to_date]
            except ValueError:
                print(f"⚠️ Invalid date format for --filter-date-to: {date_to}")

        return filtered
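
    # For example, keeping only insert-log files of at least 0.5 MB that were
    # modified in January 2024 (the bucket layout and values are illustrative):
    #
    #   files = analyzer.filter_objects(files, prefix="files/insert_log/",
    #                                   size_min=0.5, date_from="2024-01-01",
    #                                   date_to="2024-01-31")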

    def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]:
        """
        Apply a complex contains filter with parentheses support

        Args:
            objects: List of objects to filter
            contains_expr: Contains expression with parentheses, OR (comma), and AND (&) logic

        Returns:
            Filtered list of objects
        """
        def evaluate_expression(expr: str, obj_name: str) -> bool:
            """Evaluate a single expression for an object name"""
            expr = expr.strip()

            # Boolean literals produced by reducing parenthesized sub-expressions below
            if expr == 'true':
                return True
            if expr == 'false':
                return False

            # Handle parentheses first
            if '(' in expr and ')' in expr:
                # Find the innermost parentheses
                start = expr.rfind('(')
                end = expr.find(')', start)
                if start != -1 and end != -1:
                    # Extract the content inside the parentheses
                    inner_expr = expr[start+1:end]
                    # Evaluate the inner expression
                    inner_result = evaluate_expression(inner_expr, obj_name)
                    # Replace the parenthesized expression with its boolean result
                    new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:]
                    return evaluate_expression(new_expr, obj_name)

            # Handle AND logic (&)
            if '&' in expr:
                parts = [p.strip() for p in expr.split('&')]
                return all(evaluate_expression(part, obj_name) for part in parts)

            # Handle OR logic (,)
            if ',' in expr:
                parts = [p.strip() for p in expr.split(',')]
                return any(evaluate_expression(part, obj_name) for part in parts)

            # Single keyword: substring match against the object name
            return expr in obj_name

        return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])]
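
    # How the expression grammar above reads (keywords are illustrative):
    #
    #   "insert"              -> name contains "insert"
    #   "insert,delete"       -> name contains "insert" OR "delete"
    #   "insert&log"          -> name contains "insert" AND "log"
    #   "(insert,delete)&log" -> (name contains "insert" OR "delete") AND name contains "log"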

    def list_parquet_files(self, bucket_name: str, prefix: str = "") -> List[Dict[str, Any]]:
        """List Parquet files in a bucket"""
        try:
            parquet_files = []
            for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=True):
                if obj.object_name.lower().endswith('.parquet'):
                    parquet_files.append({
                        'name': obj.object_name,
                        'size': obj.size,
                        'size_mb': obj.size / (1024 * 1024),
                        'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
                        'etag': obj.etag
                    })
            return parquet_files
        except S3Error as e:
            print(f"❌ Failed to list objects in bucket '{bucket_name}': {e}")
            return []
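
    # Listing is recursive and the '.parquet' extension check is case-insensitive,
    # so nested layouts such as "files/insert_log/..." are matched as well.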

    def download_and_analyze(self, bucket_name: str, object_name: str,
                             command: str = "analyze", output_file: Optional[str] = None,
                             rows: int = 10, verbose: bool = False,
                             keep_local: bool = False) -> Dict[str, Any]:
        """
        Download a Parquet file from MinIO and analyze it

        Returns:
            Dictionary with analysis results and metadata
        """
        result = {
            'success': False,
            'bucket': bucket_name,
            'object': object_name,
            'command': command,
            'local_path': None,
            'analysis_output': None,
            'error': None,
            'timestamp': datetime.now().isoformat()
        }

        # Download the file
        try:
            local_path = self._download_file(bucket_name, object_name, keep_local)
            if not local_path:
                result['error'] = "Failed to download file"
                return result

            result['local_path'] = local_path

            # Run the analysis
            analysis_result = self._run_analysis(local_path, command, output_file, rows, verbose)
            result['analysis_output'] = analysis_result

            if analysis_result['success']:
                result['success'] = True
            else:
                result['error'] = analysis_result['error']

        except Exception as e:
            result['error'] = str(e)

        return result

    def _download_file(self, bucket_name: str, object_name: str, keep_local: bool = False) -> Optional[str]:
        """Download a file from MinIO"""
        try:
            if keep_local:
                # Save to the current directory
                local_path = Path(object_name).name
            else:
                # Create a temporary file
                temp_dir = tempfile.gettempdir()
                filename = Path(object_name).name
                local_path = os.path.join(temp_dir, f"minio_{filename}")

            print(f"📥 Downloading {object_name} from bucket {bucket_name}...")
            self.client.fget_object(bucket_name, object_name, local_path)
            print(f"✅ Downloaded to: {local_path}")
            return local_path

        except S3Error as e:
            print(f"❌ Failed to download {object_name}: {e}")
            return None
        except Exception as e:
            print(f"❌ Unexpected error downloading {object_name}: {e}")
            return None
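
    # Temporary downloads go to tempfile.gettempdir() with a "minio_" filename
    # prefix; batch_analyze() only deletes local copies that live under that
    # temp directory, so files kept via keep_local in the working directory survive.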

    def _run_analysis(self, local_path: str, command: str, output_file: Optional[str] = None,
                      rows: int = 10, verbose: bool = False) -> Dict[str, Any]:
        """Run parquet_analyzer_cli.py on a local file"""
        result = {
            'success': False,
            'command': command,
            'output_file': output_file,
            'error': None
        }

        try:
            # Build the command
            cmd = [sys.executable, str(self.cli_script), command, local_path]

            # Add optional arguments
            if output_file:
                cmd.extend(["--output", output_file])
            if rows != 10:
                cmd.extend(["--rows", str(rows)])
            if verbose:
                cmd.append("--verbose")

            print(f"🔍 Running analysis: {' '.join(cmd)}")
            print("=" * 60)

            # Execute the command
            process = subprocess.run(cmd, capture_output=True, text=True)

            result['return_code'] = process.returncode
            result['stdout'] = process.stdout
            result['stderr'] = process.stderr

            if process.returncode == 0:
                result['success'] = True
                print("✅ Analysis completed successfully")
            else:
                result['error'] = f"Analysis failed with return code: {process.returncode}"
                print(f"❌ {result['error']}")
                if process.stderr:
                    print(f"Error output: {process.stderr}")

        except Exception as e:
            result['error'] = str(e)
            print(f"❌ Failed to run analysis: {e}")

        return result
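
    # The subprocess call above is equivalent to invoking the CLI by hand,
    # e.g. (the local path here is hypothetical):
    #
    #   python parquet_analyzer_cli.py data /tmp/minio_data.parquet --rows 20 --verbose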

    def batch_analyze(self, bucket_name: str, prefix: str = "", command: str = "analyze",
                      output_dir: Optional[str] = None, verbose: bool = False) -> List[Dict[str, Any]]:
        """
        Analyze multiple Parquet files in a bucket

        Args:
            bucket_name: Name of the bucket
            prefix: Prefix to filter files
            command: Analysis command
            output_dir: Directory to save output files
            verbose: Verbose output

        Returns:
            List of analysis results
        """
        print(f"🔍 Batch analyzing Parquet files in bucket '{bucket_name}'")
        if prefix:
            print(f"📁 Filtering by prefix: '{prefix}'")

        # List Parquet files
        parquet_files = self.list_parquet_files(bucket_name, prefix)

        if not parquet_files:
            print("❌ No Parquet files found")
            return []

        print(f"📊 Found {len(parquet_files)} Parquet file(s)")

        # Create the output directory if specified
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            print(f"📁 Output directory: {output_dir}")

        results = []

        for i, file_info in enumerate(parquet_files, 1):
            print(f"\n{'='*60}")
            print(f"📊 Processing file {i}/{len(parquet_files)}: {file_info['name']}")
            print(f"📏 Size: {file_info['size_mb']:.2f} MB")

            # Determine the output file
            output_file = None
            if output_dir:
                base_name = Path(file_info['name']).stem
                output_file = os.path.join(output_dir, f"{base_name}_{command}_result.json")

            # Analyze the file
            result = self.download_and_analyze(
                bucket_name, file_info['name'], command, output_file, verbose=verbose
            )

            results.append(result)

            # Clean up the temporary file
            if result['local_path'] and result['local_path'].startswith(tempfile.gettempdir()):
                try:
                    os.remove(result['local_path'])
                except OSError:
                    pass

        # Print summary
        successful = sum(1 for r in results if r['success'])
        failed = len(results) - successful

        print(f"\n{'='*60}")
        print("📊 Batch Analysis Summary:")
        print(f"   Total files: {len(results)}")
        print(f"   Successful: {successful}")
        print(f"   Failed: {failed}")

        return results
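
    # For example (bucket name and output directory are illustrative):
    #
    #   results = analyzer.batch_analyze("my-bucket", prefix="files/",
    #                                    command="metadata", output_dir="results")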

    def interactive_mode(self):
        """Interactive mode for browsing and analyzing files"""
        print("🔍 MinIO Parquet Analyzer - Interactive Mode")
        print("=" * 50)

        # List buckets
        buckets = self.list_buckets()
        if not buckets:
            print("❌ No buckets found or access denied")
            return

        print(f"📦 Found {len(buckets)} bucket(s):")
        for i, bucket in enumerate(buckets):
            print(f"  {i+1}. {bucket['name']}")

        # Select a bucket
        while True:
            try:
                choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                bucket_idx = int(choice) - 1
                if 0 <= bucket_idx < len(buckets):
                    selected_bucket = buckets[bucket_idx]['name']
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        # List Parquet files in the selected bucket
        print(f"\n📁 Parquet files in bucket '{selected_bucket}':")
        parquet_files = self.list_parquet_files(selected_bucket)

        if not parquet_files:
            print("❌ No Parquet files found in this bucket")
            return

        print(f"📊 Found {len(parquet_files)} Parquet file(s):")
        for i, obj in enumerate(parquet_files):
            print(f"  {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)")

        # Select an operation mode
        print("\n🔍 Operation modes:")
        print("  1. Analyze single file")
        print("  2. Batch analyze all files")
        print("  3. Select specific files")

        while True:
            try:
                mode_choice = input("\nSelect mode (1-3) or 'q' to quit: ").strip()
                if mode_choice.lower() == 'q':
                    return

                mode = int(mode_choice)
                if mode in [1, 2, 3]:
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        if mode == 1:
            # Single file analysis
            self._interactive_single_file(selected_bucket, parquet_files)
        elif mode == 2:
            # Batch analysis
            self._interactive_batch_analysis(selected_bucket, parquet_files)
        elif mode == 3:
            # Select specific files
            self._interactive_select_files(selected_bucket, parquet_files)

    def _interactive_single_file(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactive single-file analysis"""
        # Select a file
        while True:
            try:
                choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                file_idx = int(choice) - 1
                if 0 <= file_idx < len(parquet_files):
                    selected_file = parquet_files[file_idx]['name']
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        # Select an analysis command
        commands = ["analyze", "metadata", "vector", "export", "data"]
        print("\n🔍 Available analysis commands:")
        for i, cmd in enumerate(commands):
            print(f"  {i+1}. {cmd}")

        while True:
            try:
                choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                cmd_idx = int(choice) - 1
                if 0 <= cmd_idx < len(commands):
                    selected_command = commands[cmd_idx]
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        # Additional options
        output_file = None
        rows = 10
        verbose = False

        if selected_command in ["export", "data"]:
            output_choice = input("Enter output file path (or press Enter for default): ").strip()
            if output_choice:
                output_file = output_choice

        if selected_command == "data":
            rows_choice = input("Enter number of rows to export (default: 10): ").strip()
            if rows_choice:
                try:
                    rows = int(rows_choice)
                except ValueError:
                    print("❌ Invalid number, using default: 10")

        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run the analysis
        print("\n🚀 Starting analysis...")
        result = self.download_and_analyze(
            bucket_name, selected_file, selected_command,
            output_file, rows, verbose
        )

        if result['success']:
            print("✅ Analysis completed successfully!")
        else:
            print(f"❌ Analysis failed: {result['error']}")

    def _interactive_batch_analysis(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactive batch analysis"""
        print(f"\n📊 Batch analysis for {len(parquet_files)} files")

        # Select a command
        commands = ["analyze", "metadata", "vector", "export", "data"]
        print("\n🔍 Available analysis commands:")
        for i, cmd in enumerate(commands):
            print(f"  {i+1}. {cmd}")

        while True:
            try:
                choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                cmd_idx = int(choice) - 1
                if 0 <= cmd_idx < len(commands):
                    selected_command = commands[cmd_idx]
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        # Output directory
        output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
        if not output_dir:
            output_dir = "."

        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run the batch analysis (it prints its own summary)
        print("\n🚀 Starting batch analysis...")
        self.batch_analyze(bucket_name, "", selected_command, output_dir, verbose)

        print("✅ Batch analysis completed!")

    def _interactive_select_files(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactively select specific files to analyze"""
        print("\n📋 Select files to analyze (comma-separated numbers, e.g., 1,3,5)")
        print("Available files:")
        for i, obj in enumerate(parquet_files):
            print(f"  {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)")

        while True:
            try:
                choice = input(f"\nEnter file numbers (1-{len(parquet_files)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                selected_indices = [int(x.strip()) - 1 for x in choice.split(',')]
                if all(0 <= idx < len(parquet_files) for idx in selected_indices):
                    selected_files = [parquet_files[idx]['name'] for idx in selected_indices]
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter valid numbers")

        # Select a command
        commands = ["analyze", "metadata", "vector", "export", "data"]
        print("\n🔍 Available analysis commands:")
        for i, cmd in enumerate(commands):
            print(f"  {i+1}. {cmd}")

        while True:
            try:
                choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return

                cmd_idx = int(choice) - 1
                if 0 <= cmd_idx < len(commands):
                    selected_command = commands[cmd_idx]
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")

        # Additional options
        output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
        if not output_dir:
            output_dir = "."

        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run the analysis for the selected files
        print(f"\n🚀 Starting analysis for {len(selected_files)} selected files...")

        results = []
        for i, file_name in enumerate(selected_files, 1):
            print(f"\n📊 Processing file {i}/{len(selected_files)}: {file_name}")

            # Determine the output file
            output_file = None
            if output_dir:
                base_name = Path(file_name).stem
                output_file = os.path.join(output_dir, f"{base_name}_{selected_command}_result.json")

            result = self.download_and_analyze(
                bucket_name, file_name, selected_command, output_file, verbose=verbose
            )
            results.append(result)

        # Print summary
        successful = sum(1 for r in results if r['success'])
        print(f"\n✅ Analysis completed! {successful}/{len(results)} files processed successfully.")


def main():
    """Main function"""
    parser = argparse.ArgumentParser(
        description="MinIO Parquet Analyzer - Integrated Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Interactive mode
  python minio_parquet_analyzer.py --endpoint localhost --interactive

  # Analyze a specific file
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze

  # Batch analyze all Parquet files
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --batch --command metadata --output-dir results

  # List buckets
  python minio_parquet_analyzer.py --endpoint localhost --list-buckets

  # List Parquet files in a bucket
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files

  # Filter files by prefix (insert_log files only)
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/insert_log/"

  # Filter files by size (files larger than 1 MB)
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-size-min 1

  # Filter files by date range
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-date-from "2024-01-01" --filter-date-to "2024-01-31"

  # Combine multiple filters
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert"

  # Filter with OR logic (files containing 'insert' OR 'delete')
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert,delete"

  # Filter with AND logic (files containing 'insert' AND 'log')
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert&log"

  # Filter with parentheses grouping ((insert OR delete) AND log)
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&log"

  # Complex nested parentheses ((insert OR delete) AND (log OR bin))
  python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&(log,bin)"
"""
    )

    # Connection arguments
    parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint")
    parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)")
    parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS")
    parser.add_argument("--access-key", "-a", help="MinIO access key")
    parser.add_argument("--secret-key", "-k", help="MinIO secret key")

    # Operation arguments
    parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode")
    parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets")
    parser.add_argument("--bucket", help="Bucket name")
    parser.add_argument("--list-files", "-l", action="store_true", help="List Parquet files in bucket")
    parser.add_argument("--object", "-o", help="Object name in bucket")
    parser.add_argument("--batch", action="store_true", help="Batch analyze all Parquet files in bucket")

    # Filter arguments
    parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')")
    parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')")
    parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), & for AND logic (e.g., 'insert&log'), and () for grouping (e.g., '(insert,delete)&log')")
    parser.add_argument("--filter-size-min", type=float, help="Filter objects by minimum size in MB")
    parser.add_argument("--filter-size-max", type=float, help="Filter objects by maximum size in MB")
    parser.add_argument("--filter-date-from", help="Filter objects modified on or after date (YYYY-MM-DD)")
    parser.add_argument("--filter-date-to", help="Filter objects modified on or before date (YYYY-MM-DD)")

    # Analysis arguments
    parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data"],
                        default="analyze", help="Analysis command (default: analyze)")
    parser.add_argument("--output", help="Output file path (for single file analysis)")
    parser.add_argument("--output-dir", help="Output directory (for batch analysis)")
    parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--keep-local", action="store_true", help="Keep downloaded files locally")

    args = parser.parse_args()

    # Initialize the analyzer
    analyzer = MinioParquetAnalyzer(
        endpoint=args.endpoint,
        port=args.port,
        secure=args.secure,
        access_key=args.access_key,
        secret_key=args.secret_key
    )

    # Execute the requested operation
    if args.interactive:
        analyzer.interactive_mode()
    elif args.list_buckets:
        buckets = analyzer.list_buckets()
        if buckets:
            print(f"📦 Found {len(buckets)} bucket(s):")
            for bucket in buckets:
                print(f"  - {bucket['name']}")
        else:
            print("❌ No buckets found or access denied")
    elif args.list_files:
        if not args.bucket:
            print("❌ --bucket is required for --list-files")
            sys.exit(1)

        files = analyzer.list_parquet_files(args.bucket)
        if files:
            # Apply filters if specified
            original_count = len(files)
            files = analyzer.filter_objects(
                files,
                prefix=args.filter_prefix,
                suffix=args.filter_suffix,
                contains=args.filter_contains,
                size_min=args.filter_size_min,
                size_max=args.filter_size_max,
                date_from=args.filter_date_from,
                date_to=args.filter_date_to
            )

            if original_count != len(files):
                print(f"🔍 Applied filters: {original_count} → {len(files)} files")

            if files:
                print(f"📊 Found {len(files)} Parquet file(s) in bucket '{args.bucket}':")
                for obj in files:
                    modified_str = ""
                    if obj.get('last_modified'):
                        modified_str = f" (modified: {obj['last_modified'][:10]})"
                    print(f"  - {obj['name']} ({obj['size_mb']:.2f} MB){modified_str}")
            else:
                print("❌ No files match the specified filters")
        else:
            print("❌ No Parquet files found or access denied")
    elif args.batch:
        if not args.bucket:
            print("❌ --bucket is required for --batch")
            sys.exit(1)

        results = analyzer.batch_analyze(
            args.bucket, "", args.command, args.output_dir, args.verbose
        )

        if not results:
            sys.exit(1)
    elif args.object:
        if not args.bucket:
            print("❌ --bucket is required when specifying --object")
            sys.exit(1)

        result = analyzer.download_and_analyze(
            args.bucket, args.object, args.command,
            args.output, args.rows, args.verbose, args.keep_local
        )

        if not result['success']:
            sys.exit(1)
    else:
        print("❌ No operation specified. Use --interactive, --list-buckets, --list-files, --batch, or specify --object")
        sys.exit(1)


if __name__ == "__main__":
    main()