milvus/cmd/tools/binlogv2/parquet_analyzer/meta_parser.py

448 lines
15 KiB
Python

"""
Parquet Metadata Parser Component
Responsible for parsing parquet file metadata information
"""
import pyarrow.parquet as pq
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
class ParquetMetaParser:
"""Parquet file Metadata parser"""
def __init__(self, file_path: str):
"""
Initialize parser
Args:
file_path: parquet file path
"""
self.file_path = Path(file_path)
self.parquet_file = None
self.metadata = None
self.schema = None
def load(self) -> bool:
"""
Load parquet file
Returns:
bool: whether loading was successful
"""
try:
self.parquet_file = pq.ParquetFile(self.file_path)
self.metadata = self.parquet_file.metadata
self.schema = self.parquet_file.schema_arrow
return True
except Exception as e:
print(f"❌ Failed to load parquet file: {e}")
return False
def get_basic_info(self) -> Dict[str, Any]:
"""
Get basic information
Returns:
Dict: dictionary containing basic file information
"""
if not self.metadata:
return {}
file_size = self.file_path.stat().st_size
return {
"name": self.file_path.name,
"size_bytes": file_size,
"size_mb": file_size / 1024 / 1024,
"num_rows": self.metadata.num_rows,
"num_columns": self.metadata.num_columns,
"num_row_groups": self.metadata.num_row_groups,
"created_by": self.metadata.created_by,
"format_version": self.metadata.format_version,
"footer_size_bytes": self.metadata.serialized_size
}
def get_file_metadata(self) -> Dict[str, str]:
"""
Get file-level metadata
Returns:
Dict: file-level metadata dictionary
"""
if not self.metadata or not self.metadata.metadata:
return {}
result = {}
for key, value in self.metadata.metadata.items():
key_str = key.decode() if isinstance(key, bytes) else key
value_str = value.decode() if isinstance(value, bytes) else value
result[key_str] = value_str
return result
def get_schema_metadata(self) -> List[Dict[str, Any]]:
"""
Get Schema-level metadata
Returns:
List: Schema-level metadata list
"""
if not self.schema:
return []
result = []
for i, field in enumerate(self.schema):
if field.metadata:
field_metadata = {}
for k, v in field.metadata.items():
k_str = k.decode() if isinstance(k, bytes) else k
v_str = v.decode() if isinstance(v, bytes) else v
field_metadata[k_str] = v_str
result.append({
"column_index": i,
"column_name": field.name,
"column_type": str(field.type),
"metadata": field_metadata
})
return result
def get_column_statistics(self) -> List[Dict[str, Any]]:
"""
Get column statistics
Returns:
List: column statistics list
"""
if not self.metadata:
return []
result = []
for i in range(self.metadata.num_columns):
col_meta = self.metadata.row_group(0).column(i)
col_stats = {
"column_name": col_meta.path_in_schema,
"compression": str(col_meta.compression),
"encodings": [str(enc) for enc in col_meta.encodings],
"file_offset": col_meta.file_offset,
"compressed_size": col_meta.total_compressed_size,
"uncompressed_size": col_meta.total_uncompressed_size
}
if col_meta.statistics:
stats = col_meta.statistics
col_stats["statistics"] = {}
if hasattr(stats, 'null_count') and stats.null_count is not None:
col_stats["statistics"]["null_count"] = stats.null_count
if hasattr(stats, 'distinct_count') and stats.distinct_count is not None:
col_stats["statistics"]["distinct_count"] = stats.distinct_count
if hasattr(stats, 'min') and stats.min is not None:
if isinstance(stats.min, bytes):
col_stats["statistics"]["min"] = stats.min.hex()
else:
col_stats["statistics"]["min"] = stats.min
if hasattr(stats, 'max') and stats.max is not None:
if isinstance(stats.max, bytes):
col_stats["statistics"]["max"] = stats.max.hex()
else:
col_stats["statistics"]["max"] = stats.max
result.append(col_stats)
return result
def get_row_group_info(self) -> List[Dict[str, Any]]:
"""
Get row group information
Returns:
List: row group information list
"""
if not self.metadata:
return []
result = []
for i in range(self.metadata.num_row_groups):
row_group = self.metadata.row_group(i)
result.append({
"rowgroup_index": i,
"num_rows": row_group.num_rows,
"total_byte_size": row_group.total_byte_size,
"num_columns": row_group.num_columns
})
return result
def parse_row_group_metadata(self, metadata_str: str) -> List[Dict[str, Any]]:
"""
Parse row group metadata string
Args:
metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..."
Returns:
List: parsed row group metadata
"""
if not metadata_str:
return []
result = []
groups = metadata_str.split(';')
for i, group in enumerate(groups):
if not group.strip():
continue
parts = group.split('|')
if len(parts) >= 3:
try:
bytes_size = int(parts[0])
rows = int(parts[1])
offset = int(parts[2])
result.append({
"rowgroup_index": i,
"bytes": bytes_size,
"rows": rows,
"offset": offset
})
except (ValueError, IndexError):
print(f"⚠️ Warning: Invalid row group metadata format: {group}")
continue
return result
def format_row_group_metadata(self, metadata_str: str) -> str:
"""
Format row group metadata string to readable format
Args:
metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..."
Returns:
str: formatted string
"""
parsed = self.parse_row_group_metadata(metadata_str)
if not parsed:
return "No valid row group metadata found"
lines = []
for group in parsed:
lines.append(f"row group {group['rowgroup_index']}: {group['bytes']} bytes, {group['rows']} rows, {group['offset']} offset")
return '\n'.join(lines)
def parse_group_field_id_list(self, field_list_str: str) -> List[Dict[str, Any]]:
"""
Parse group field ID list string
Args:
field_list_str: field list string in format "field_ids;field_ids;..."
Returns:
List: parsed field group metadata
"""
if not field_list_str:
return []
result = []
groups = field_list_str.split(';')
for i, group in enumerate(groups):
if not group.strip():
continue
try:
field_ids = [int(fid.strip()) for fid in group.split(',') if fid.strip()]
result.append({
"group_index": i,
"field_ids": field_ids,
"field_count": len(field_ids)
})
except (ValueError, IndexError):
print(f"⚠️ Warning: Invalid field group format: {group}")
continue
return result
def format_group_field_id_list(self, field_list_str: str) -> str:
"""
Format group field ID list string to readable format
Args:
field_list_str: field list string in format "field_ids;field_ids;..."
Returns:
str: formatted string
"""
parsed = self.parse_group_field_id_list(field_list_str)
if not parsed:
return "No valid field group metadata found"
lines = []
for group in parsed:
field_ids_str = ','.join(map(str, group['field_ids']))
lines.append(f"column group {group['group_index']}: {field_ids_str}")
return '\n'.join(lines)
def parse_custom_metadata(self, metadata_dict: Dict[str, str]) -> Dict[str, Any]:
"""
Parse custom metadata and format special fields
Args:
metadata_dict: metadata dictionary
Returns:
Dict: parsed metadata with formatted special fields
"""
result = {
"raw_metadata": metadata_dict,
"formatted_metadata": {}
}
for key, value in metadata_dict.items():
if key == "row_group_metadata":
result["formatted_metadata"]["row_group_metadata"] = {
"raw": value,
"parsed": self.parse_row_group_metadata(value),
"formatted": self.format_row_group_metadata(value)
}
elif key == "group_field_id_list":
result["formatted_metadata"]["group_field_id_list"] = {
"raw": value,
"parsed": self.parse_group_field_id_list(value),
"formatted": self.format_group_field_id_list(value)
}
else:
result["formatted_metadata"][key] = value
return result
def get_metadata_summary(self) -> Dict[str, Any]:
"""
Get metadata summary information
Returns:
Dict: metadata summary
"""
file_metadata = self.get_file_metadata()
schema_metadata = self.get_schema_metadata()
summary = {
"file_metadata_count": len(file_metadata),
"schema_metadata_count": len(schema_metadata),
"total_metadata_count": len(file_metadata) + len(schema_metadata)
}
return summary
def export_metadata(self, output_file: Optional[str] = None) -> str:
"""
Export metadata to JSON file
Args:
output_file: output file path, if None will auto-generate
Returns:
str: output file path
"""
if output_file is None:
output_file = f"{self.file_path.stem}_metadata.json"
file_metadata = self.get_file_metadata()
parsed_metadata = self.parse_custom_metadata(file_metadata)
metadata_export = {
"file_info": self.get_basic_info(),
"file_metadata": file_metadata,
"parsed_metadata": parsed_metadata,
"schema_metadata": self.get_schema_metadata(),
"column_statistics": self.get_column_statistics(),
"row_group_info": self.get_row_group_info(),
"metadata_summary": self.get_metadata_summary()
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(metadata_export, f, indent=2, ensure_ascii=False)
return output_file
def print_summary(self):
"""Print metadata summary"""
if not self.metadata:
print("❌ No parquet file loaded")
return
basic_info = self.get_basic_info()
summary = self.get_metadata_summary()
file_metadata = self.get_file_metadata()
print(f"📊 Parquet File Metadata Summary: {basic_info['name']}")
print("=" * 60)
print(f" File Size: {basic_info['size_mb']:.2f} MB")
print(f" Total Rows: {basic_info['num_rows']:,}")
print(f" Columns: {basic_info['num_columns']}")
print(f" Row Groups: {basic_info['num_row_groups']}")
print(f" Created By: {basic_info['created_by']}")
print(f" Parquet Version: {basic_info['format_version']}")
print(f" Footer Size: {basic_info['footer_size_bytes']:,} bytes")
print(f" File-level Metadata: {summary['file_metadata_count']} items")
print(f" Schema-level Metadata: {summary['schema_metadata_count']} items")
print(f" Total Metadata: {summary['total_metadata_count']} items")
# Print formatted custom metadata
if file_metadata:
parsed_metadata = self.parse_custom_metadata(file_metadata)
formatted = parsed_metadata.get("formatted_metadata", {})
if "row_group_metadata" in formatted:
print("\n📋 Row Group Metadata:")
print("-" * 30)
print(formatted["row_group_metadata"]["formatted"])
if "group_field_id_list" in formatted:
print("\n📋 Column Group Metadata:")
print("-" * 30)
print(formatted["group_field_id_list"]["formatted"])
def print_formatted_metadata(self, metadata_key: str = None):
"""
Print formatted metadata for specific keys
Args:
metadata_key: specific metadata key to format, if None prints all
"""
if not self.metadata:
print("❌ No parquet file loaded")
return
file_metadata = self.get_file_metadata()
if not file_metadata:
print("❌ No file metadata found")
return
parsed_metadata = self.parse_custom_metadata(file_metadata)
formatted = parsed_metadata.get("formatted_metadata", {})
if metadata_key:
if metadata_key in formatted:
print(f"\n📋 {metadata_key.upper()} Metadata:")
print("-" * 50)
print(formatted[metadata_key]["formatted"])
else:
print(f"❌ Metadata key '{metadata_key}' not found")
else:
# Print all formatted metadata
for key, value in formatted.items():
if isinstance(value, dict) and "formatted" in value:
print(f"\n📋 {key.upper()} Metadata:")
print("-" * 50)
print(value["formatted"])
else:
print(f"\n📋 {key.upper()}: {value}")