milvus/cmd/tools/binlogv2/minio_parquet_analyzer.py

804 lines
32 KiB
Python

#!/usr/bin/env python3
"""
MinIO Parquet Analyzer - Integrated Tool
Combines MinIO client with parquet_analyzer_cli.py for seamless analysis
"""
import argparse
import sys
import os
import tempfile
import subprocess
import json
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
try:
from minio import Minio
from minio.error import S3Error
except ImportError:
print("❌ MinIO client library not found. Please install it:")
print(" pip install minio")
sys.exit(1)
class MinioParquetAnalyzer:
    """Integrated MinIO and Parquet Analyzer.

    Lists buckets and Parquet objects on a MinIO server, downloads objects to
    the local disk and runs ``parquet_analyzer_cli.py`` (expected next to this
    file) on them as a subprocess.
    """

    # Analysis sub-commands offered to the user; the chosen one is passed
    # through as the first argument of parquet_analyzer_cli.py.
    COMMANDS = ["analyze", "metadata", "vector", "export", "data"]

    def __init__(self, endpoint: str, port: int = 9000, secure: bool = False,
                 access_key: Optional[str] = None, secret_key: Optional[str] = None):
        """Initialize the integrated analyzer.

        Exits the process (status 1) when the MinIO client cannot be created
        or when parquet_analyzer_cli.py is not found next to this file.

        Args:
            endpoint: MinIO server host name or address (without port)
            port: MinIO server port
            secure: Use HTTPS when True
            access_key: MinIO access key (optional)
            secret_key: MinIO secret key (optional)
        """
        self.endpoint = endpoint
        self.port = port
        self.secure = secure
        self.access_key = access_key
        self.secret_key = secret_key

        # Initialize MinIO client
        # NOTE(review): constructing the client presumably does not contact the
        # server, so "Connected" may be optimistic — confirm against minio-py.
        try:
            self.client = Minio(
                f"{endpoint}:{port}",
                access_key=access_key,
                secret_key=secret_key,
                secure=secure
            )
            print(f"✅ Connected to MinIO server: {endpoint}:{port}")
        except Exception as e:
            print(f"❌ Failed to connect to MinIO server: {e}")
            sys.exit(1)

        # Check if parquet_analyzer_cli.py exists
        self.cli_script = Path(__file__).parent / "parquet_analyzer_cli.py"
        if not self.cli_script.exists():
            print(f"❌ parquet_analyzer_cli.py not found at: {self.cli_script}")
            sys.exit(1)

    def list_buckets(self) -> List[Dict[str, Any]]:
        """List all buckets.

        Returns:
            One dict per bucket with 'name' and 'creation_date' (ISO string or
            None); an empty list when the listing fails with S3Error.
        """
        try:
            return [
                {
                    'name': bucket.name,
                    'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None
                }
                for bucket in self.client.list_buckets()
            ]
        except S3Error as e:
            print(f"❌ Failed to list buckets: {e}")
            return []

    def filter_objects(self, objects: List[Dict[str, Any]],
                       prefix: Optional[str] = None, suffix: Optional[str] = None,
                       contains: Optional[str] = None,
                       size_min: Optional[float] = None, size_max: Optional[float] = None,
                       date_from: Optional[str] = None,
                       date_to: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Filter objects based on various criteria

        Args:
            objects: List of objects to filter (dicts as produced by list_parquet_files)
            prefix: Filter by object name prefix
            suffix: Filter by object name suffix
            contains: Filter by object name containing string (see _apply_contains_filter)
            size_min: Minimum size in MB (fractions allowed)
            size_max: Maximum size in MB (fractions allowed)
            date_from: Filter objects modified on or after date (YYYY-MM-DD)
            date_to: Filter objects modified on or before date (YYYY-MM-DD)

        Returns:
            Filtered list of objects. An unparsable date bound prints a warning
            and is ignored; objects without a usable 'last_modified' are dropped
            whenever a date bound is active.
        """
        filtered = objects
        if prefix:
            filtered = [obj for obj in filtered if obj['name'].startswith(prefix)]
        if suffix:
            filtered = [obj for obj in filtered if obj['name'].endswith(suffix)]
        if contains:
            # Support complex logic with parentheses, OR (comma) and AND (&) logic
            filtered = self._apply_contains_filter(filtered, contains)
        if size_min is not None:
            size_min_bytes = size_min * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes]
        if size_max is not None:
            size_max_bytes = size_max * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes]

        lower = self._parse_filter_date(date_from, "--filter-date-from") if date_from else None
        upper = self._parse_filter_date(date_to, "--filter-date-to") if date_to else None
        if lower is not None or upper is not None:
            kept = []
            for obj in filtered:
                modified = self._modified_date(obj)
                if modified is None:
                    continue
                if lower is not None and modified < lower:
                    continue
                if upper is not None and modified > upper:
                    continue
                kept.append(obj)
            filtered = kept
        return filtered

    @staticmethod
    def _parse_filter_date(value: str, flag: str):
        """Parse an ISO date bound; print a warning and return None when invalid."""
        try:
            # `datetime` here is the class imported via `from datetime import datetime`.
            return datetime.fromisoformat(value).date()
        except ValueError:
            print(f"⚠️ Invalid date format for {flag}: {value}")
            return None

    @staticmethod
    def _modified_date(obj: Dict[str, Any]):
        """Return the calendar date of obj['last_modified'], or None if missing/unparsable."""
        raw = obj.get('last_modified')
        if not raw:
            return None
        try:
            return datetime.fromisoformat(raw).date()
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _compile_contains_expr(expr: str):
        """Compile a contains-expression into a predicate over object names.

        Grammar (``,`` binds tighter than ``&``, matching how unparenthesised
        expressions have always been evaluated)::

            and_expr := or_expr ('&' or_expr)*
            or_expr  := atom (',' atom)*
            atom     := '(' and_expr ')' | keyword | <empty>

        A keyword matches when it is a substring of the name; an empty term
        matches every name (same as ``'' in name``).

        Raises:
            ValueError: on unbalanced parentheses or a keyword glued to a group.
        """
        # --- tokenize: operators are single characters, everything else is keyword text
        tokens = []
        buf = []

        def flush():
            word = "".join(buf).strip()
            if word:
                tokens.append(("kw", word))
            del buf[:]

        for ch in expr:
            if ch in "(),&":
                flush()
                tokens.append(("op", ch))
            else:
                buf.append(ch)
        flush()

        # --- recursive descent over the token list
        pos = 0

        def peek():
            return tokens[pos] if pos < len(tokens) else None

        def parse_atom():
            nonlocal pos
            tok = peek()
            if tok == ("op", "("):
                pos += 1
                inner = parse_and()
                if peek() != ("op", ")"):
                    raise ValueError("missing ')'")
                pos += 1
                return inner
            if tok is not None and tok[0] == "kw":
                pos += 1
                keyword = tok[1]
                return lambda name: keyword in name
            # Empty term (e.g. trailing operator): matches everything.
            return lambda name: True

        def parse_or():
            nonlocal pos
            terms = [parse_atom()]
            while peek() == ("op", ","):
                pos += 1
                terms.append(parse_atom())
            if len(terms) == 1:
                return terms[0]
            return lambda name: any(term(name) for term in terms)

        def parse_and():
            nonlocal pos
            terms = [parse_or()]
            while peek() == ("op", "&"):
                pos += 1
                terms.append(parse_or())
            if len(terms) == 1:
                return terms[0]
            return lambda name: all(term(name) for term in terms)

        predicate = parse_and()
        if pos != len(tokens):
            raise ValueError(f"unexpected {tokens[pos][1]!r}")
        return predicate

    def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]:
        """
        Apply complex contains filter with parentheses support

        Args:
            objects: List of objects to filter
            contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic.
                Without parentheses, OR binds tighter than AND: 'a,b&c' means '(a OR b) AND c'.

        Returns:
            Filtered list of objects. A malformed expression prints a warning
            and is applied as one literal substring.
        """
        try:
            matches = self._compile_contains_expr(contains_expr)
        except ValueError as e:
            print(f"⚠️ Invalid --filter-contains expression '{contains_expr}' ({e}); "
                  f"treating it as a literal substring")
            return [obj for obj in objects if contains_expr in obj['name']]
        return [obj for obj in objects if matches(obj['name'])]

    def list_parquet_files(self, bucket_name: str, prefix: str = "") -> List[Dict[str, Any]]:
        """List Parquet files in a bucket.

        Args:
            bucket_name: Name of the bucket
            prefix: Object-name prefix passed to the server-side listing

        Returns:
            One dict per object whose name ends with '.parquet' (case-insensitive),
            with keys 'name', 'size', 'size_mb', 'last_modified' (ISO string or
            None) and 'etag'; an empty list when the listing fails with S3Error.
        """
        try:
            parquet_files = []
            for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=True):
                if obj.object_name.lower().endswith('.parquet'):
                    parquet_files.append({
                        'name': obj.object_name,
                        'size': obj.size,
                        'size_mb': obj.size / (1024 * 1024),
                        'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
                        'etag': obj.etag
                    })
            return parquet_files
        except S3Error as e:
            print(f"❌ Failed to list objects in bucket '{bucket_name}': {e}")
            return []

    def download_and_analyze(self, bucket_name: str, object_name: str,
                             command: str = "analyze", output_file: Optional[str] = None,
                             rows: int = 10, verbose: bool = False,
                             keep_local: bool = False) -> Dict[str, Any]:
        """
        Download Parquet file from MinIO and analyze it

        Args:
            bucket_name: Name of the bucket
            object_name: Object name inside the bucket
            command: Analysis command passed to parquet_analyzer_cli.py
            output_file: Optional output path passed as --output
            rows: Row count passed as --rows (only when different from 10)
            verbose: Pass --verbose to the CLI
            keep_local: Keep the download in the current directory; when False
                the temporary download is deleted once analysis has finished

        Returns:
            Dictionary with analysis results and metadata. 'local_path' records
            where the file was downloaded even if it has since been removed.
        """
        result = {
            'success': False,
            'bucket': bucket_name,
            'object': object_name,
            'command': command,
            'local_path': None,
            'analysis_output': None,
            'error': None,
            'timestamp': datetime.now().isoformat()
        }
        local_path = None
        try:
            # Download the file
            local_path = self._download_file(bucket_name, object_name, keep_local)
            if not local_path:
                result['error'] = "Failed to download file"
                return result
            result['local_path'] = local_path

            # Run analysis
            analysis_result = self._run_analysis(local_path, command, output_file, rows, verbose)
            result['analysis_output'] = analysis_result
            if analysis_result['success']:
                result['success'] = True
            else:
                result['error'] = analysis_result['error']
        except Exception as e:
            result['error'] = str(e)
        finally:
            # Temporary downloads must not accumulate in the temp directory.
            if local_path and not keep_local:
                self._remove_temp_file(local_path)
        return result

    @staticmethod
    def _remove_temp_file(path: str) -> None:
        """Best-effort removal of a temporary download; never raises."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # already gone — nothing to clean up
        except OSError as e:
            print(f"⚠️ Could not remove temporary file {path}: {e}")

    def _download_file(self, bucket_name: str, object_name: str, keep_local: bool = False) -> Optional[str]:
        """Download file from MinIO.

        Returns:
            Local path of the download (current directory when keep_local,
            otherwise '<tempdir>/minio_<basename>'), or None on failure.
        """
        filename = Path(object_name).name
        try:
            if keep_local:
                # Save to current directory
                local_path = filename
            else:
                # Create temporary file
                local_path = os.path.join(tempfile.gettempdir(), f"minio_{filename}")
            print(f"📥 Downloading {object_name} from bucket {bucket_name}...")
            self.client.fget_object(bucket_name, object_name, local_path)
            print(f"✅ Downloaded to: {local_path}")
            return local_path
        except S3Error as e:
            print(f"❌ Failed to download {object_name}: {e}")
            return None
        except Exception as e:
            print(f"❌ Unexpected error downloading {object_name}: {e}")
            return None

    def _run_analysis(self, local_path: str, command: str, output_file: Optional[str] = None,
                      rows: int = 10, verbose: bool = False) -> Dict[str, Any]:
        """Run parquet_analyzer_cli.py on local file.

        Returns:
            Dict with 'success', 'command', 'output_file', 'error' and, once the
            subprocess has run, 'return_code', 'stdout' and 'stderr'.
        """
        result = {
            'success': False,
            'command': command,
            'output_file': output_file,
            'error': None
        }
        try:
            # Build command (argument list, no shell involved)
            cmd = [sys.executable, str(self.cli_script), command, local_path]
            # Add optional arguments
            if output_file:
                cmd.extend(["--output", output_file])
            if rows != 10:
                cmd.extend(["--rows", str(rows)])
            if verbose:
                cmd.append("--verbose")
            print(f"🔍 Running analysis: {' '.join(cmd)}")
            print("=" * 60)

            # Execute the command
            process = subprocess.run(cmd, capture_output=True, text=True)
            result['return_code'] = process.returncode
            result['stdout'] = process.stdout
            result['stderr'] = process.stderr
            if process.returncode == 0:
                result['success'] = True
                print("✅ Analysis completed successfully")
            else:
                result['error'] = f"Analysis failed with return code: {process.returncode}"
                print(f"{result['error']}")
                if process.stderr:
                    print(f"Error output: {process.stderr}")
        except Exception as e:
            result['error'] = str(e)
            print(f"❌ Failed to run analysis: {e}")
        return result

    @staticmethod
    def _result_path(output_dir: str, object_name: str, command: str) -> str:
        """Build '<output_dir>/<object stem>_<command>_result.json'."""
        return os.path.join(output_dir, f"{Path(object_name).stem}_{command}_result.json")

    def batch_analyze(self, bucket_name: str, prefix: str = "", command: str = "analyze",
                      output_dir: Optional[str] = None, verbose: bool = False) -> List[Dict[str, Any]]:
        """
        Analyze multiple Parquet files in a bucket

        Args:
            bucket_name: Name of the bucket
            prefix: Prefix to filter files
            command: Analysis command
            output_dir: Directory to save output files
            verbose: Verbose output

        Returns:
            List of analysis results
        """
        print(f"🔍 Batch analyzing Parquet files in bucket '{bucket_name}'")
        if prefix:
            print(f"📁 Filtering by prefix: '{prefix}'")

        # List Parquet files
        parquet_files = self.list_parquet_files(bucket_name, prefix)
        if not parquet_files:
            print("❌ No Parquet files found")
            return []
        print(f"📊 Found {len(parquet_files)} Parquet file(s)")

        # Create output directory if specified
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            print(f"📁 Output directory: {output_dir}")

        results = []
        for i, file_info in enumerate(parquet_files, 1):
            print(f"\n{'='*60}")
            print(f"📊 Processing file {i}/{len(parquet_files)}: {file_info['name']}")
            print(f"📏 Size: {file_info['size_mb']:.2f} MB")

            # Determine output file
            output_file = None
            if output_dir:
                output_file = self._result_path(output_dir, file_info['name'], command)

            # Analyze the file; download_and_analyze removes the temporary download itself
            results.append(self.download_and_analyze(
                bucket_name, file_info['name'], command, output_file, verbose=verbose
            ))

        # Print summary
        successful = sum(1 for r in results if r['success'])
        failed = len(results) - successful
        print(f"\n{'='*60}")
        print(f"📊 Batch Analysis Summary:")
        print(f" Total files: {len(results)}")
        print(f" Successful: {successful}")
        print(f" Failed: {failed}")
        return results

    @staticmethod
    def _prompt_index(what: str, count: int) -> Optional[int]:
        """Ask for a 1-based choice until valid; return the 0-based index, or None on 'q'."""
        while True:
            choice = input(f"\nSelect {what} (1-{count}) or 'q' to quit: ").strip()
            if choice.lower() == 'q':
                return None
            try:
                index = int(choice) - 1
            except ValueError:
                print("❌ Please enter a valid number")
                continue
            if 0 <= index < count:
                return index
            print("❌ Invalid selection")

    def _prompt_command(self) -> Optional[str]:
        """Show the analysis commands and return the chosen one, or None on 'q'."""
        print(f"\n🔍 Available analysis commands:")
        for i, cmd in enumerate(self.COMMANDS, 1):
            print(f" {i}. {cmd}")
        index = self._prompt_index("command", len(self.COMMANDS))
        return None if index is None else self.COMMANDS[index]

    def interactive_mode(self):
        """Interactive mode for browsing and analyzing files"""
        print("🔍 MinIO Parquet Analyzer - Interactive Mode")
        print("=" * 50)

        # List buckets
        buckets = self.list_buckets()
        if not buckets:
            print("❌ No buckets found or access denied")
            return
        print(f"📦 Found {len(buckets)} bucket(s):")
        for i, bucket in enumerate(buckets, 1):
            print(f" {i}. {bucket['name']}")

        # Select bucket
        bucket_idx = self._prompt_index("bucket", len(buckets))
        if bucket_idx is None:
            return
        selected_bucket = buckets[bucket_idx]['name']

        # List Parquet files in selected bucket
        print(f"\n📁 Parquet files in bucket '{selected_bucket}':")
        parquet_files = self.list_parquet_files(selected_bucket)
        if not parquet_files:
            print("❌ No Parquet files found in this bucket")
            return
        print(f"📊 Found {len(parquet_files)} Parquet file(s):")
        for i, obj in enumerate(parquet_files, 1):
            print(f" {i}. {obj['name']} ({obj['size_mb']:.2f} MB)")

        # Select operation mode
        print(f"\n🔍 Operation modes:")
        print(" 1. Analyze single file")
        print(" 2. Batch analyze all files")
        print(" 3. Select specific files")
        mode_idx = self._prompt_index("mode", 3)
        if mode_idx is None:
            return
        handlers = (
            self._interactive_single_file,     # 1. single file analysis
            self._interactive_batch_analysis,  # 2. batch analysis
            self._interactive_select_files,    # 3. select specific files
        )
        handlers[mode_idx](selected_bucket, parquet_files)

    def _interactive_single_file(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactive single file analysis"""
        # Select file
        file_idx = self._prompt_index("file", len(parquet_files))
        if file_idx is None:
            return
        selected_file = parquet_files[file_idx]['name']

        # Select analysis command
        selected_command = self._prompt_command()
        if selected_command is None:
            return

        # Additional options
        output_file = None
        rows = 10
        verbose = False
        if selected_command in ["export", "data"]:
            output_choice = input("Enter output file path (or press Enter for default): ").strip()
            if output_choice:
                output_file = output_choice
        if selected_command == "data":
            rows_choice = input("Enter number of rows to export (default: 10): ").strip()
            if rows_choice:
                try:
                    rows = int(rows_choice)
                except ValueError:
                    print("❌ Invalid number, using default: 10")
        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run analysis
        print(f"\n🚀 Starting analysis...")
        result = self.download_and_analyze(
            bucket_name, selected_file, selected_command,
            output_file, rows, verbose
        )
        if result['success']:
            print("✅ Analysis completed successfully!")
        else:
            print(f"❌ Analysis failed: {result['error']}")

    def _interactive_batch_analysis(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactive batch analysis.

        `parquet_files` is only used for the count shown to the user;
        batch_analyze lists the bucket again itself.
        """
        print(f"\n📊 Batch analysis for {len(parquet_files)} files")

        # Select command
        selected_command = self._prompt_command()
        if selected_command is None:
            return

        # Output directory
        output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
        if not output_dir:
            output_dir = "."
        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run batch analysis
        print(f"\n🚀 Starting batch analysis...")
        self.batch_analyze(bucket_name, "", selected_command, output_dir, verbose)
        print(f"✅ Batch analysis completed!")

    def _interactive_select_files(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
        """Interactive select specific files"""
        print(f"\n📋 Select files to analyze (comma-separated numbers, e.g., 1,3,5)")
        print(f"Available files:")
        for i, obj in enumerate(parquet_files, 1):
            print(f" {i}. {obj['name']} ({obj['size_mb']:.2f} MB)")

        while True:
            choice = input(f"\nEnter file numbers (1-{len(parquet_files)}) or 'q' to quit: ").strip()
            if choice.lower() == 'q':
                return
            try:
                selected_indices = [int(x.strip()) - 1 for x in choice.split(',')]
            except ValueError:
                print("❌ Please enter valid numbers")
                continue
            if all(0 <= idx < len(parquet_files) for idx in selected_indices):
                selected_files = [parquet_files[idx]['name'] for idx in selected_indices]
                break
            print("❌ Invalid selection")

        # Select command
        selected_command = self._prompt_command()
        if selected_command is None:
            return

        # Additional options
        output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
        if not output_dir:
            output_dir = "."
        # The CLI writes into this directory, so make sure it exists (as batch_analyze does).
        os.makedirs(output_dir, exist_ok=True)
        verbose_choice = input("Verbose output? (y/N): ").strip().lower()
        verbose = verbose_choice in ['y', 'yes']

        # Run analysis for selected files
        print(f"\n🚀 Starting analysis for {len(selected_files)} selected files...")
        results = []
        for i, file_name in enumerate(selected_files, 1):
            print(f"\n📊 Processing file {i}/{len(selected_files)}: {file_name}")
            output_file = self._result_path(output_dir, file_name, selected_command)
            results.append(self.download_and_analyze(
                bucket_name, file_name, selected_command, output_file, verbose=verbose
            ))

        # Print summary
        successful = sum(1 for r in results if r['success'])
        print(f"\n✅ Analysis completed! {successful}/{len(results)} files processed successfully.")
# Usage examples appended verbatim to --help (RawDescriptionHelpFormatter keeps the layout).
_CLI_EPILOG = """
Examples:
# Interactive mode
python minio_parquet_analyzer.py --endpoint localhost --interactive
# Analyze specific file
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze
# Batch analyze all Parquet files
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --batch --command metadata --output-dir results
# List buckets
python minio_parquet_analyzer.py --endpoint localhost --list-buckets
# List Parquet files in bucket
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files
# Filter files by prefix (insert_log files only)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/insert_log/"
# Filter files by size (files larger than 1MB)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-size-min 1
# Filter files by date range
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-date-from "2024-01-01" --filter-date-to "2024-01-31"
# Combine multiple filters
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert"
# Filter with OR logic (files containing 'insert' OR 'delete')
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert,delete"
# Filter with AND logic (files containing 'insert' AND 'log')
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert&log"
# Filter with parentheses grouping ((insert OR delete) AND log)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&log"
# Complex nested parentheses ((insert OR delete) AND (log OR bin))
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&(log,bin)"
"""


def build_arg_parser():
    """Build the command-line parser for the MinIO Parquet Analyzer.

    Kept separate from main() so the option definitions can be exercised
    without creating a MinIO client.

    Returns:
        A configured argparse.ArgumentParser.
    """
    parser = argparse.ArgumentParser(
        description="MinIO Parquet Analyzer - Integrated Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_CLI_EPILOG
    )

    # Connection arguments
    parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint")
    parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)")
    parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS")
    parser.add_argument("--access-key", "-a", help="MinIO access key")
    parser.add_argument("--secret-key", "-k", help="MinIO secret key")

    # Operation arguments
    parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode")
    parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets")
    parser.add_argument("--bucket", help="Bucket name")
    parser.add_argument("--list-files", "-l", action="store_true", help="List Parquet files in bucket")
    parser.add_argument("--object", "-o", help="Object name in bucket")
    parser.add_argument("--batch", action="store_true", help="Batch analyze all Parquet files in bucket")

    # Filter arguments
    parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')")
    parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')")
    parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')")
    # Sizes are floats: the documented examples use fractional MB (e.g. 0.5).
    parser.add_argument("--filter-size-min", type=float, help="Filter objects by minimum size in MB")
    parser.add_argument("--filter-size-max", type=float, help="Filter objects by maximum size in MB")
    parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)")
    parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)")

    # Analysis arguments
    parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data"],
                        default="analyze", help="Analysis command (default: analyze)")
    parser.add_argument("--output", help="Output file path (for single file analysis)")
    parser.add_argument("--output-dir", help="Output directory (for batch analysis)")
    parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--keep-local", action="store_true", help="Keep downloaded files locally")
    return parser


def main():
    """Main function.

    Parses the command line, creates the analyzer and runs exactly one
    operation, checked in this order: --interactive, --list-buckets,
    --list-files, --batch, --object. Exits with status 1 when a required
    --bucket is missing, when no operation is given, when a batch run yields
    no results, or when a single-object analysis fails.
    """
    args = build_arg_parser().parse_args()

    # Initialize analyzer
    analyzer = MinioParquetAnalyzer(
        endpoint=args.endpoint,
        port=args.port,
        secure=args.secure,
        access_key=args.access_key,
        secret_key=args.secret_key
    )

    # Execute requested operation
    if args.interactive:
        analyzer.interactive_mode()
    elif args.list_buckets:
        buckets = analyzer.list_buckets()
        if buckets:
            print(f"📦 Found {len(buckets)} bucket(s):")
            for bucket in buckets:
                print(f" - {bucket['name']}")
        else:
            print("❌ No buckets found or access denied")
    elif args.list_files:
        if not args.bucket:
            print("❌ --bucket is required for --list-files")
            sys.exit(1)
        files = analyzer.list_parquet_files(args.bucket)
        if not files:
            print("❌ No Parquet files found or access denied")
            return

        # Apply filters if specified
        original_count = len(files)
        files = analyzer.filter_objects(
            files,
            prefix=args.filter_prefix,
            suffix=args.filter_suffix,
            contains=args.filter_contains,
            size_min=args.filter_size_min,
            size_max=args.filter_size_max,
            date_from=args.filter_date_from,
            date_to=args.filter_date_to
        )
        if original_count != len(files):
            # Separate the before/after counts so they do not read as one number.
            print(f"🔍 Applied filters: {original_count} → {len(files)} files")
        if not files:
            print("❌ No files match the specified filters")
            return
        print(f"📊 Found {len(files)} Parquet file(s) in bucket '{args.bucket}':")
        for obj in files:
            modified_str = ""
            if obj.get('last_modified'):
                # First 10 characters of the ISO timestamp are the YYYY-MM-DD date.
                modified_str = f" (modified: {obj['last_modified'][:10]})"
            print(f" - {obj['name']} ({obj['size_mb']:.2f} MB){modified_str}")
    elif args.batch:
        if not args.bucket:
            print("❌ --bucket is required for --batch")
            sys.exit(1)
        # Honour --filter-prefix for batch runs; an empty prefix means the whole bucket.
        results = analyzer.batch_analyze(
            args.bucket, args.filter_prefix or "", args.command, args.output_dir, args.verbose
        )
        if not results:
            sys.exit(1)
    elif args.object:
        if not args.bucket:
            print("❌ --bucket is required when specifying --object")
            sys.exit(1)
        result = analyzer.download_and_analyze(
            args.bucket, args.object, args.command,
            args.output, args.rows, args.verbose, args.keep_local
        )
        if not result['success']:
            sys.exit(1)
    else:
        print("❌ No operation specified. Use --interactive, --list-buckets, --list-files, --batch, or specify --object")
        sys.exit(1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()