# Source: milvus/cmd/tools/binlogv2/minio_client.py

#!/usr/bin/env python3
"""
MinIO Client for Parquet Analysis
Downloads files from MinIO and passes them to parquet_analyzer_cli.py for analysis
"""
import argparse
import datetime
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional, List, Dict, Any
try:
from minio import Minio
from minio.error import S3Error
except ImportError:
print("❌ MinIO client library not found. Please install it:")
print(" pip install minio")
sys.exit(1)
class MinioParquetAnalyzer:
    """MinIO client for downloading and analyzing Parquet files and Milvus binlog files"""

    def __init__(self, endpoint: str, port: int = 9000, secure: bool = False,
                 access_key: str = None, secret_key: str = None):
        """
        Initialize MinIO client

        Args:
            endpoint: MinIO server endpoint (hostname/IP)
            port: MinIO server port (default: 9000)
            secure: Use HTTPS (default: False)
            access_key: MinIO access key (optional, for public buckets can be None)
            secret_key: MinIO secret key (optional, for public buckets can be None)
        """
        self.endpoint = endpoint
        self.port = port
        self.secure = secure
        self.access_key = access_key
        self.secret_key = secret_key
        # Connect eagerly so configuration/connectivity errors surface at startup.
        # (Default port fixed to 9000 to match the docstring and the CLI default.)
        try:
            self.client = Minio(
                f"{endpoint}:{port}",
                access_key=access_key,
                secret_key=secret_key,
                secure=secure
            )
            print(f"✅ Connected to MinIO server: {endpoint}:{port}")
        except Exception as e:
            print(f"❌ Failed to connect to MinIO server: {e}")
            sys.exit(1)

    def list_buckets(self) -> List[Dict[str, Any]]:
        """List all buckets.

        Returns:
            List of dicts with 'name' and ISO-formatted 'creation_date'
            (empty list on error — errors are printed, not raised).
        """
        try:
            buckets = []
            for bucket in self.client.list_buckets():
                buckets.append({
                    'name': bucket.name,
                    'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None
                })
            return buckets
        except S3Error as e:
            print(f"❌ Failed to list buckets: {e}")
            return []

    def list_objects(self, bucket_name: str, prefix: str = "", recursive: bool = True) -> List[Dict[str, Any]]:
        """List objects in a bucket.

        Args:
            bucket_name: Name of the bucket
            prefix: Only list objects whose key starts with this prefix
            recursive: Descend into "subdirectories" (default: True)

        Returns:
            List of dicts with 'name', 'size' (bytes), ISO 'last_modified'
            and 'etag' (empty list on error — errors are printed, not raised).
        """
        try:
            objects = []
            for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=recursive):
                objects.append({
                    'name': obj.object_name,
                    'size': obj.size,
                    'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
                    'etag': obj.etag
                })
            return objects
        except S3Error as e:
            print(f"❌ Failed to list objects in bucket '{bucket_name}': {e}")
            return []

    def filter_objects(self, objects: List[Dict[str, Any]],
                       prefix: str = None, suffix: str = None, contains: str = None,
                       size_min: float = None, size_max: float = None,
                       date_from: str = None, date_to: str = None) -> List[Dict[str, Any]]:
        """
        Filter objects based on various criteria

        Args:
            objects: List of objects to filter (as returned by list_objects)
            prefix: Filter by object name prefix
            suffix: Filter by object name suffix
            contains: Filter by object name containing string
            size_min: Minimum size in MB (may be fractional)
            size_max: Maximum size in MB (may be fractional)
            date_from: Filter objects modified after date (YYYY-MM-DD)
            date_to: Filter objects modified before date (YYYY-MM-DD)

        Returns:
            Filtered list of objects (invalid date filters are ignored with a warning)
        """
        filtered = objects
        if prefix:
            filtered = [obj for obj in filtered if obj['name'].startswith(prefix)]
        if suffix:
            filtered = [obj for obj in filtered if obj['name'].endswith(suffix)]
        if contains:
            # Support complex logic with parentheses, OR (comma) and AND (&) logic
            filtered = self._apply_contains_filter(filtered, contains)
        if size_min is not None:
            size_min_bytes = size_min * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes]
        if size_max is not None:
            size_max_bytes = size_max * 1024 * 1024
            filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes]
        # Objects without a last_modified timestamp never match a date filter.
        if date_from:
            try:
                from_date = datetime.datetime.fromisoformat(date_from).date()
                filtered = [obj for obj in filtered
                            if obj['last_modified'] and
                            datetime.datetime.fromisoformat(obj['last_modified']).date() >= from_date]
            except ValueError:
                print(f"⚠️ Invalid date format for --filter-date-from: {date_from}")
        if date_to:
            try:
                to_date = datetime.datetime.fromisoformat(date_to).date()
                filtered = [obj for obj in filtered
                            if obj['last_modified'] and
                            datetime.datetime.fromisoformat(obj['last_modified']).date() <= to_date]
            except ValueError:
                print(f"⚠️ Invalid date format for --filter-date-to: {date_to}")
        return filtered

    def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]:
        """
        Apply complex contains filter with parentheses support

        Args:
            objects: List of objects to filter
            contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic

        Returns:
            Filtered list of objects
        """
        def evaluate_expression(expr: str, obj_name: str) -> bool:
            """Evaluate a single expression for an object name"""
            expr = expr.strip()
            # Handle parentheses first
            if '(' in expr and ')' in expr:
                # Find the innermost parentheses
                start = expr.rfind('(')
                end = expr.find(')', start)
                if start != -1 and end != -1:
                    # Extract the content inside parentheses
                    inner_expr = expr[start+1:end]
                    # Evaluate the inner expression
                    inner_result = evaluate_expression(inner_expr, obj_name)
                    # Replace the parentheses expression with the result
                    new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:]
                    return evaluate_expression(new_expr, obj_name)
            # Handle AND logic (&)
            if '&' in expr:
                parts = [p.strip() for p in expr.split('&')]
                return all(evaluate_expression(part, obj_name) for part in parts)
            # Handle OR logic (,)
            if ',' in expr:
                parts = [p.strip() for p in expr.split(',')]
                return any(evaluate_expression(part, obj_name) for part in parts)
            # BUG FIX: 'true'/'false' are the placeholders substituted for already
            # evaluated parenthesized groups above; they must be treated as boolean
            # literals here. Previously they fell through to the substring test,
            # so any expression using parentheses effectively never matched.
            if expr == 'true':
                return True
            if expr == 'false':
                return False
            # Single keyword: plain substring match
            return expr in obj_name
        return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])]

    def download_file(self, bucket_name: str, object_name: str, local_path: str = None) -> Optional[str]:
        """
        Download a file from MinIO

        Args:
            bucket_name: Name of the bucket
            object_name: Name of the object in the bucket
            local_path: Local path to save the file (optional, will use temp file if not provided)

        Returns:
            Local file path if successful, None otherwise
        """
        try:
            if not local_path:
                # Create temporary file
                temp_dir = tempfile.gettempdir()
                filename = Path(object_name).name
                # BUG FIX: the computed filename was dropped from the temp path
                # (broken f-string); include it so each object gets its own file.
                local_path = os.path.join(temp_dir, f"minio_{filename}")
            print(f"📥 Downloading {object_name} from bucket {bucket_name}...")
            self.client.fget_object(bucket_name, object_name, local_path)
            print(f"✅ Downloaded to: {local_path}")
            return local_path
        except S3Error as e:
            print(f"❌ Failed to download {object_name}: {e}")
            return None
        except Exception as e:
            print(f"❌ Unexpected error downloading {object_name}: {e}")
            return None

    def analyze_parquet_from_minio(self, bucket_name: str, object_name: str,
                                   command: str = "analyze", output_file: str = None,
                                   rows: int = 10, verbose: bool = False,
                                   id_value: str = None, id_column: str = None) -> bool:
        """
        Download Parquet file from MinIO and analyze it using parquet_analyzer_cli.py

        Args:
            bucket_name: Name of the bucket
            object_name: Name of the object in the bucket
            command: Analysis command (analyze, metadata, vector, export, data, query)
            output_file: Output file path for export/data commands
            rows: Number of rows to export (for data command)
            verbose: Verbose output
            id_value: ID value to look up (for query command)
            id_column: ID column name (for query command, auto-detected if None)

        Returns:
            True if successful, False otherwise
        """
        # Download the file
        local_path = self.download_file(bucket_name, object_name)
        if not local_path:
            return False
        try:
            # Build command for parquet_analyzer_cli.py (expected next to this script)
            cli_script = Path(__file__).parent / "parquet_analyzer_cli.py"
            if not cli_script.exists():
                print(f"❌ parquet_analyzer_cli.py not found at: {cli_script}")
                return False
            cmd = [sys.executable, str(cli_script), command, local_path]
            # Add optional arguments; --rows is only forwarded when non-default
            if output_file:
                cmd.extend(["--output", output_file])
            if rows != 10:
                cmd.extend(["--rows", str(rows)])
            if verbose:
                cmd.append("--verbose")
            if id_value:
                cmd.extend(["--id-value", str(id_value)])
            if id_column:
                cmd.extend(["--id-column", id_column])
            print(f"🔍 Running analysis command: {' '.join(cmd)}")
            print("=" * 60)
            # Execute the command; output streams directly to the console
            result = subprocess.run(cmd, capture_output=False, text=True)
            if result.returncode == 0:
                print("✅ Analysis completed successfully")
                return True
            else:
                print(f"❌ Analysis failed with return code: {result.returncode}")
                return False
        except Exception as e:
            print(f"❌ Failed to run analysis: {e}")
            return False
        finally:
            # Clean up temporary file if it was created (only files we placed in
            # the temp dir; caller-provided paths are left alone)
            if local_path and local_path.startswith(tempfile.gettempdir()):
                try:
                    os.remove(local_path)
                    print(f"🧹 Cleaned up temporary file: {local_path}")
                except OSError:
                    # Best-effort cleanup: a missing/locked temp file is not fatal
                    pass

    def interactive_mode(self):
        """Interactive mode for browsing and analyzing files"""
        print("🔍 MinIO Interactive Mode")
        print("=" * 40)
        # List buckets
        buckets = self.list_buckets()
        if not buckets:
            print("❌ No buckets found or access denied")
            return
        print(f"📦 Found {len(buckets)} bucket(s):")
        for i, bucket in enumerate(buckets):
            print(f" {i+1}. {bucket['name']}")
        # Select bucket
        while True:
            try:
                choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
                if choice.lower() == 'q':
                    return
                bucket_idx = int(choice) - 1
                if 0 <= bucket_idx < len(buckets):
                    selected_bucket = buckets[bucket_idx]['name']
                    break
                else:
                    print("❌ Invalid selection")
            except ValueError:
                print("❌ Please enter a valid number")
        # Main interactive loop
        while True:
            print(f"\n📁 Current bucket: '{selected_bucket}'")
            print("=" * 50)
            # List objects in selected bucket
            print(f"📁 Objects in bucket '{selected_bucket}':")
            objects = self.list_objects(selected_bucket)
            if not objects:
                print("❌ No objects found in this bucket")
                return
            # Apply filters if user wants to
            print("\n🔍 Filter options:")
            print(" 1. No filter (show all)")
            print(" 2. Apply custom filters")
            filter_choice = input("Select filter option (1-2): ").strip()
            if filter_choice == "2":
                print("\n📋 Available filters:")
                print(" - prefix: Filter by object name prefix")
                print(" - suffix: Filter by object name suffix")
                print(" - contains: Filter by object name containing string(s). Use comma for OR logic, & for AND logic, () for grouping")
                print(" - size_min: Filter by minimum size in MB")
                print(" - size_max: Filter by maximum size in MB")
                print(" - date_from: Filter by modification date (YYYY-MM-DD)")
                print(" - date_to: Filter by modification date (YYYY-MM-DD)")
                prefix = input("Prefix filter (or press Enter to skip): ").strip() or None
                suffix = input("Suffix filter (or press Enter to skip): ").strip() or None
                contains = input("Contains filter (or press Enter to skip): ").strip() or None
                size_min_str = input("Minimum size in MB (or press Enter to skip): ").strip()
                size_min = int(size_min_str) if size_min_str else None
                size_max_str = input("Maximum size in MB (or press Enter to skip): ").strip()
                size_max = int(size_max_str) if size_max_str else None
                date_from = input("Date from (YYYY-MM-DD, or press Enter to skip): ").strip() or None
                date_to = input("Date to (YYYY-MM-DD, or press Enter to skip): ").strip() or None
                original_count = len(objects)
                objects = self.filter_objects(
                    objects, prefix, suffix, contains, size_min, size_max, date_from, date_to
                )
                if original_count != len(objects):
                    # BUG FIX: separator between the two counts was missing,
                    # printing e.g. "10050 objects" instead of "100 → 50 objects"
                    print(f"🔍 Applied filters: {original_count}{len(objects)} objects")
            # Filter Parquet files and Milvus binlog files
            # Milvus binlog files don't have .parquet extension but are actually Parquet format
            parquet_files = []
            for obj in objects:
                name = obj['name'].lower()
                # Check for .parquet files
                if name.endswith('.parquet'):
                    parquet_files.append(obj)
                # Check for Milvus binlog files (insert_log, delta_log, etc.)
                elif any(log_type in name for log_type in ['insert_log', 'delta_log', 'stats_log', 'index_files']):
                    parquet_files.append(obj)
            if not parquet_files:
                print("❌ No Parquet files or Milvus binlog files found in this bucket")
                return
            print(f"📊 Found {len(parquet_files)} Parquet file(s):")
            for i, obj in enumerate(parquet_files):
                size_mb = obj['size'] / (1024 * 1024)
                modified_str = ""
                if obj['last_modified']:
                    modified_str = f" (modified: {obj['last_modified'][:10]})"
                print(f" {i+1}. {obj['name']} ({size_mb:.2f} MB){modified_str}")
            # Select file
            while True:
                try:
                    choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'b' to change bucket or 'q' to quit: ").strip()
                    if choice.lower() == 'q':
                        return
                    elif choice.lower() == 'b':
                        # Go back to bucket selection
                        break
                    file_idx = int(choice) - 1
                    if 0 <= file_idx < len(parquet_files):
                        selected_file = parquet_files[file_idx]['name']
                        break
                    else:
                        print("❌ Invalid selection")
                except ValueError:
                    print("❌ Please enter a valid number")
            if choice.lower() == 'b':
                # Re-select bucket
                print(f"\n📦 Available buckets:")
                for i, bucket in enumerate(buckets):
                    print(f" {i+1}. {bucket['name']}")
                while True:
                    try:
                        choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
                        if choice.lower() == 'q':
                            return
                        bucket_idx = int(choice) - 1
                        if 0 <= bucket_idx < len(buckets):
                            selected_bucket = buckets[bucket_idx]['name']
                            break
                        else:
                            print("❌ Invalid selection")
                    except ValueError:
                        print("❌ Please enter a valid number")
                continue
            # File analysis loop
            while True:
                print(f"\n📄 Selected file: {selected_file}")
                print("-" * 40)
                # Select analysis command
                commands = ["analyze", "metadata", "vector", "export", "data", "query"]
                print(f"🔍 Available analysis commands:")
                for i, cmd in enumerate(commands):
                    print(f" {i+1}. {cmd}")
                while True:
                    try:
                        choice = input(f"\nSelect command (1-{len(commands)}) or 'f' to change file or 'b' to change bucket or 'q' to quit: ").strip()
                        if choice.lower() == 'q':
                            return
                        elif choice.lower() == 'b':
                            # Go back to bucket selection
                            break
                        elif choice.lower() == 'f':
                            # Go back to file selection
                            break
                        cmd_idx = int(choice) - 1
                        if 0 <= cmd_idx < len(commands):
                            selected_command = commands[cmd_idx]
                            break
                        else:
                            print("❌ Invalid selection")
                    except ValueError:
                        print("❌ Please enter a valid number")
                if choice.lower() in ['b', 'f']:
                    break
                # Additional options
                output_file = None
                rows = 10
                verbose = False
                id_value = None
                id_column = None
                if selected_command in ["export", "data"]:
                    output_choice = input("Enter output file path (or press Enter for default): ").strip()
                    if output_choice:
                        output_file = output_choice
                if selected_command == "data":
                    rows_choice = input("Enter number of rows to export (default: 10): ").strip()
                    if rows_choice:
                        try:
                            rows = int(rows_choice)
                        except ValueError:
                            print("❌ Invalid number, using default: 10")
                if selected_command == "query":
                    id_value_choice = input("Enter ID value to query (or press Enter to see available ID columns): ").strip()
                    if id_value_choice:
                        id_value = id_value_choice
                    id_column_choice = input("Enter ID column name (or press Enter for auto-detection): ").strip()
                    if id_column_choice:
                        id_column = id_column_choice
                verbose_choice = input("Verbose output? (y/N): ").strip().lower()
                verbose = verbose_choice in ['y', 'yes']
                # Run analysis
                print(f"\n🚀 Starting analysis...")
                success = self.analyze_parquet_from_minio(
                    selected_bucket, selected_file, selected_command,
                    output_file, rows, verbose, id_value, id_column
                )
                if success:
                    print("✅ Analysis completed successfully!")
                else:
                    print("❌ Analysis failed")
                # Ask if user wants to continue with the same file
                continue_choice = input(f"\nContinue with the same file '{selected_file}'? (y/N): ").strip().lower()
                if continue_choice not in ['y', 'yes']:
                    break
            if choice.lower() == 'b':
                continue
def main():
    """CLI entry point.

    Parses command-line arguments, connects to the MinIO server, then
    dispatches to exactly one operation: interactive mode, bucket listing,
    object listing (with optional filters), or analysis of a single object.
    Exits with status 1 on usage errors or failed analysis.
    """
    parser = argparse.ArgumentParser(
        description="MinIO Client for Parquet Analysis and Milvus Binlog Analysis",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Interactive mode
  python minio_client.py --endpoint localhost --interactive
  # Analyze specific Parquet file
  python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze
  # Analyze Milvus binlog file
  python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command analyze
  # Query by ID
  python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command query --id-value 123
  # Export metadata
  python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command export --output result.json
  # List buckets
  python minio_client.py --endpoint localhost --list-buckets
  # List objects in bucket
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects
  # Filter objects by prefix (insert_log files only)
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/insert_log/"
  # Filter objects by size (files larger than 1MB)
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-size-min 1
  # Filter objects by date range
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-date-from "2024-01-01" --filter-date-to "2024-01-31"
  # Combine multiple filters
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert"
  # Filter with OR logic (files containing 'insert' OR 'delete')
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert,delete"
  # Filter with AND logic (files containing 'insert' AND 'log')
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert&log"
  # Filter with parentheses grouping ((insert OR delete) AND log)
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&log"
  # Complex nested parentheses ((insert OR delete) AND (log OR bin))
  python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&(log,bin)"
"""
    )
    # Connection arguments
    parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint")
    parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)")
    parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS")
    parser.add_argument("--access-key", "-a", help="MinIO access key")
    parser.add_argument("--secret-key", "-k", help="MinIO secret key")
    # Operation arguments
    parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode")
    parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets")
    parser.add_argument("--bucket", help="Bucket name")
    parser.add_argument("--list-objects", "-l", action="store_true", help="List objects in bucket")
    parser.add_argument("--object", "-o", help="Object name in bucket")
    # Filter arguments
    parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')")
    parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')")
    parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')")
    # BUG FIX: these were type=int, which rejected fractional sizes such as the
    # documented example `--filter-size-min 0.5`; float accepts both forms.
    parser.add_argument("--filter-size-min", type=float, help="Filter objects by minimum size in MB")
    parser.add_argument("--filter-size-max", type=float, help="Filter objects by maximum size in MB")
    parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)")
    parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)")
    # Analysis arguments
    parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data", "query"],
                        default="analyze", help="Analysis command (default: analyze)")
    parser.add_argument("--output", help="Output file path (for export/data commands)")
    parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--id-value", "-q", help="ID value to query (for query command)")
    parser.add_argument("--id-column", help="ID column name (for query command, auto-detected if not specified)")
    args = parser.parse_args()
    # Initialize MinIO client (exits on connection failure)
    client = MinioParquetAnalyzer(
        endpoint=args.endpoint,
        port=args.port,
        secure=args.secure,
        access_key=args.access_key,
        secret_key=args.secret_key
    )
    # Execute requested operation (first matching flag wins)
    if args.interactive:
        client.interactive_mode()
    elif args.list_buckets:
        buckets = client.list_buckets()
        if buckets:
            print(f"📦 Found {len(buckets)} bucket(s):")
            for bucket in buckets:
                print(f" - {bucket['name']}")
        else:
            print("❌ No buckets found or access denied")
    elif args.list_objects:
        if not args.bucket:
            print("❌ --bucket is required for --list-objects")
            sys.exit(1)
        objects = client.list_objects(args.bucket)
        if objects:
            # Apply filters if specified
            original_count = len(objects)
            objects = client.filter_objects(
                objects,
                prefix=args.filter_prefix,
                suffix=args.filter_suffix,
                contains=args.filter_contains,
                size_min=args.filter_size_min,
                size_max=args.filter_size_max,
                date_from=args.filter_date_from,
                date_to=args.filter_date_to
            )
            if original_count != len(objects):
                # BUG FIX: separator between the two counts was missing,
                # printing e.g. "10050 objects" instead of "100 → 50 objects"
                print(f"🔍 Applied filters: {original_count}{len(objects)} objects")
            if objects:
                print(f"📁 Found {len(objects)} object(s) in bucket '{args.bucket}':")
                for obj in objects:
                    size_mb = obj['size'] / (1024 * 1024)
                    modified_str = ""
                    if obj['last_modified']:
                        modified_str = f" (modified: {obj['last_modified'][:10]})"
                    print(f" - {obj['name']} ({size_mb:.2f} MB){modified_str}")
            else:
                print("❌ No objects match the specified filters")
        else:
            print("❌ No objects found or access denied")
    elif args.object:
        if not args.bucket:
            print("❌ --bucket is required when specifying --object")
            sys.exit(1)
        success = client.analyze_parquet_from_minio(
            args.bucket, args.object, args.command,
            args.output, args.rows, args.verbose, args.id_value, args.id_column
        )
        if not success:
            sys.exit(1)
    else:
        print("❌ No operation specified. Use --interactive, --list-buckets, --list-objects, or specify --object")
        sys.exit(1)


if __name__ == "__main__":
    main()