mirror of https://github.com/milvus-io/milvus.git
448 lines
15 KiB
Python
448 lines
15 KiB
Python
"""
|
|
Parquet Metadata Parser Component
|
|
Responsible for parsing parquet file metadata information
|
|
"""
|
|
|
|
import pyarrow.parquet as pq
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
|
|
class ParquetMetaParser:
    """Parquet file metadata parser.

    Loads a parquet file and exposes its footer information: basic file
    stats, file-level and schema-level key/value metadata, per-column
    statistics and row-group layout. Also parses the custom serialized
    metadata strings stored as file-level key/value pairs
    ("row_group_metadata" and "group_field_id_list").
    """

    def __init__(self, file_path: str):
        """
        Initialize parser

        Args:
            file_path: parquet file path
        """
        self.file_path = Path(file_path)
        # Populated by load(); all None until a file has been loaded.
        self.parquet_file = None
        self.metadata = None
        self.schema = None

    @staticmethod
    def _decode(value):
        """Decode bytes to str; return non-bytes values unchanged.

        Parquet key/value metadata stores keys and values as raw bytes,
        so they need decoding before use as plain strings.
        """
        return value.decode() if isinstance(value, bytes) else value

    def load(self) -> bool:
        """
        Load parquet file

        Returns:
            bool: whether loading was successful
        """
        try:
            self.parquet_file = pq.ParquetFile(self.file_path)
            self.metadata = self.parquet_file.metadata
            self.schema = self.parquet_file.schema_arrow
            return True
        except Exception as e:
            # Best-effort loader: report and signal failure instead of
            # raising, so callers can decide how to handle a bad file.
            print(f"❌ Failed to load parquet file: {e}")
            return False

    def get_basic_info(self) -> Dict[str, Any]:
        """
        Get basic information

        Returns:
            Dict: dictionary containing basic file information
                (empty if no file has been loaded)
        """
        if not self.metadata:
            return {}

        file_size = self.file_path.stat().st_size
        return {
            "name": self.file_path.name,
            "size_bytes": file_size,
            "size_mb": file_size / 1024 / 1024,
            "num_rows": self.metadata.num_rows,
            "num_columns": self.metadata.num_columns,
            "num_row_groups": self.metadata.num_row_groups,
            "created_by": self.metadata.created_by,
            "format_version": self.metadata.format_version,
            "footer_size_bytes": self.metadata.serialized_size
        }

    def get_file_metadata(self) -> Dict[str, str]:
        """
        Get file-level metadata

        Returns:
            Dict: file-level metadata with keys and values decoded to str
                (empty if no file is loaded or it carries no metadata)
        """
        if not self.metadata or not self.metadata.metadata:
            return {}

        return {
            self._decode(key): self._decode(value)
            for key, value in self.metadata.metadata.items()
        }

    def get_schema_metadata(self) -> List[Dict[str, Any]]:
        """
        Get Schema-level metadata

        Returns:
            List: one entry per column that carries field metadata, with
                column index, name, type string and decoded metadata mapping
        """
        if not self.schema:
            return []

        result = []
        for i, field in enumerate(self.schema):
            # Columns without field-level metadata are skipped entirely.
            if field.metadata:
                field_metadata = {
                    self._decode(k): self._decode(v)
                    for k, v in field.metadata.items()
                }
                result.append({
                    "column_index": i,
                    "column_name": field.name,
                    "column_type": str(field.type),
                    "metadata": field_metadata
                })

        return result

    def get_column_statistics(self) -> List[Dict[str, Any]]:
        """
        Get column statistics (read from the first row group)

        Returns:
            List: column statistics list (empty if no file is loaded or
                the file contains no row groups)
        """
        # Guard against zero row groups: row_group(0) would raise.
        if not self.metadata or self.metadata.num_row_groups == 0:
            return []

        result = []
        first_row_group = self.metadata.row_group(0)
        for i in range(self.metadata.num_columns):
            col_meta = first_row_group.column(i)
            col_stats = {
                "column_name": col_meta.path_in_schema,
                "compression": str(col_meta.compression),
                "encodings": [str(enc) for enc in col_meta.encodings],
                "file_offset": col_meta.file_offset,
                "compressed_size": col_meta.total_compressed_size,
                "uncompressed_size": col_meta.total_uncompressed_size
            }

            if col_meta.statistics:
                stats = col_meta.statistics
                col_stats["statistics"] = {}
                if getattr(stats, 'null_count', None) is not None:
                    col_stats["statistics"]["null_count"] = stats.null_count
                if getattr(stats, 'distinct_count', None) is not None:
                    col_stats["statistics"]["distinct_count"] = stats.distinct_count
                # min/max may be raw bytes (e.g. binary columns); hex-encode
                # those so the result stays JSON-serializable.
                if getattr(stats, 'min', None) is not None:
                    col_stats["statistics"]["min"] = (
                        stats.min.hex() if isinstance(stats.min, bytes) else stats.min
                    )
                if getattr(stats, 'max', None) is not None:
                    col_stats["statistics"]["max"] = (
                        stats.max.hex() if isinstance(stats.max, bytes) else stats.max
                    )

            result.append(col_stats)

        return result

    def get_row_group_info(self) -> List[Dict[str, Any]]:
        """
        Get row group information

        Returns:
            List: row group information list (index, rows, byte size, columns)
        """
        if not self.metadata:
            return []

        result = []
        for i in range(self.metadata.num_row_groups):
            row_group = self.metadata.row_group(i)
            result.append({
                "rowgroup_index": i,
                "num_rows": row_group.num_rows,
                "total_byte_size": row_group.total_byte_size,
                "num_columns": row_group.num_columns
            })

        return result

    def parse_row_group_metadata(self, metadata_str: str) -> List[Dict[str, Any]]:
        """
        Parse row group metadata string

        Args:
            metadata_str: metadata string in format
                "bytes|rows|offset;bytes|rows|offset;..."

        Returns:
            List: parsed row group metadata. Groups with non-integer fields
                are skipped with a warning; groups with fewer than three
                fields are silently ignored.
        """
        if not metadata_str:
            return []

        result = []
        for i, group in enumerate(metadata_str.split(';')):
            if not group.strip():
                continue

            parts = group.split('|')
            if len(parts) >= 3:
                try:
                    result.append({
                        "rowgroup_index": i,
                        "bytes": int(parts[0]),
                        "rows": int(parts[1]),
                        "offset": int(parts[2])
                    })
                except (ValueError, IndexError):
                    print(f"⚠️ Warning: Invalid row group metadata format: {group}")
                    continue

        return result

    def format_row_group_metadata(self, metadata_str: str) -> str:
        """
        Format row group metadata string to readable format

        Args:
            metadata_str: metadata string in format
                "bytes|rows|offset;bytes|rows|offset;..."

        Returns:
            str: formatted string, one line per row group
        """
        parsed = self.parse_row_group_metadata(metadata_str)

        if not parsed:
            return "No valid row group metadata found"

        return '\n'.join(
            f"row group {group['rowgroup_index']}: {group['bytes']} bytes, "
            f"{group['rows']} rows, {group['offset']} offset"
            for group in parsed
        )

    def parse_group_field_id_list(self, field_list_str: str) -> List[Dict[str, Any]]:
        """
        Parse group field ID list string

        Args:
            field_list_str: field list string in format "field_ids;field_ids;..."
                where each group is a comma-separated list of integer field IDs

        Returns:
            List: parsed field group metadata; malformed groups are skipped
                with a warning
        """
        if not field_list_str:
            return []

        result = []
        for i, group in enumerate(field_list_str.split(';')):
            if not group.strip():
                continue

            try:
                field_ids = [int(fid.strip()) for fid in group.split(',') if fid.strip()]
                result.append({
                    "group_index": i,
                    "field_ids": field_ids,
                    "field_count": len(field_ids)
                })
            except (ValueError, IndexError):
                print(f"⚠️ Warning: Invalid field group format: {group}")
                continue

        return result

    def format_group_field_id_list(self, field_list_str: str) -> str:
        """
        Format group field ID list string to readable format

        Args:
            field_list_str: field list string in format "field_ids;field_ids;..."

        Returns:
            str: formatted string, one line per column group
        """
        parsed = self.parse_group_field_id_list(field_list_str)

        if not parsed:
            return "No valid field group metadata found"

        lines = []
        for group in parsed:
            field_ids_str = ','.join(map(str, group['field_ids']))
            lines.append(f"column group {group['group_index']}: {field_ids_str}")

        return '\n'.join(lines)

    def parse_custom_metadata(self, metadata_dict: Dict[str, str]) -> Dict[str, Any]:
        """
        Parse custom metadata and format special fields

        Known special keys ("row_group_metadata", "group_field_id_list") are
        expanded into raw/parsed/formatted representations; any other key is
        passed through unchanged.

        Args:
            metadata_dict: metadata dictionary

        Returns:
            Dict: {"raw_metadata": ..., "formatted_metadata": ...}
        """
        result = {
            "raw_metadata": metadata_dict,
            "formatted_metadata": {}
        }

        for key, value in metadata_dict.items():
            if key == "row_group_metadata":
                result["formatted_metadata"]["row_group_metadata"] = {
                    "raw": value,
                    "parsed": self.parse_row_group_metadata(value),
                    "formatted": self.format_row_group_metadata(value)
                }
            elif key == "group_field_id_list":
                result["formatted_metadata"]["group_field_id_list"] = {
                    "raw": value,
                    "parsed": self.parse_group_field_id_list(value),
                    "formatted": self.format_group_field_id_list(value)
                }
            else:
                result["formatted_metadata"][key] = value

        return result

    def get_metadata_summary(self) -> Dict[str, Any]:
        """
        Get metadata summary information

        Returns:
            Dict: counts of file-level, schema-level and total metadata items
        """
        file_metadata = self.get_file_metadata()
        schema_metadata = self.get_schema_metadata()

        return {
            "file_metadata_count": len(file_metadata),
            "schema_metadata_count": len(schema_metadata),
            "total_metadata_count": len(file_metadata) + len(schema_metadata)
        }

    def export_metadata(self, output_file: Optional[str] = None) -> str:
        """
        Export metadata to JSON file

        Args:
            output_file: output file path, if None will auto-generate
                "<stem>_metadata.json" in the current working directory

        Returns:
            str: output file path
        """
        if output_file is None:
            output_file = f"{self.file_path.stem}_metadata.json"

        file_metadata = self.get_file_metadata()
        parsed_metadata = self.parse_custom_metadata(file_metadata)

        metadata_export = {
            "file_info": self.get_basic_info(),
            "file_metadata": file_metadata,
            "parsed_metadata": parsed_metadata,
            "schema_metadata": self.get_schema_metadata(),
            "column_statistics": self.get_column_statistics(),
            "row_group_info": self.get_row_group_info(),
            "metadata_summary": self.get_metadata_summary()
        }

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(metadata_export, f, indent=2, ensure_ascii=False)

        return output_file

    def print_summary(self):
        """Print metadata summary"""
        if not self.metadata:
            print("❌ No parquet file loaded")
            return

        basic_info = self.get_basic_info()
        summary = self.get_metadata_summary()
        file_metadata = self.get_file_metadata()

        print(f"📊 Parquet File Metadata Summary: {basic_info['name']}")
        print("=" * 60)
        print(f" File Size: {basic_info['size_mb']:.2f} MB")
        print(f" Total Rows: {basic_info['num_rows']:,}")
        print(f" Columns: {basic_info['num_columns']}")
        print(f" Row Groups: {basic_info['num_row_groups']}")
        print(f" Created By: {basic_info['created_by']}")
        print(f" Parquet Version: {basic_info['format_version']}")
        print(f" Footer Size: {basic_info['footer_size_bytes']:,} bytes")
        print(f" File-level Metadata: {summary['file_metadata_count']} items")
        print(f" Schema-level Metadata: {summary['schema_metadata_count']} items")
        print(f" Total Metadata: {summary['total_metadata_count']} items")

        # Print formatted custom metadata
        if file_metadata:
            parsed_metadata = self.parse_custom_metadata(file_metadata)
            formatted = parsed_metadata.get("formatted_metadata", {})

            if "row_group_metadata" in formatted:
                print("\n📋 Row Group Metadata:")
                print("-" * 30)
                print(formatted["row_group_metadata"]["formatted"])

            if "group_field_id_list" in formatted:
                print("\n📋 Column Group Metadata:")
                print("-" * 30)
                print(formatted["group_field_id_list"]["formatted"])

    def print_formatted_metadata(self, metadata_key: Optional[str] = None):
        """
        Print formatted metadata for specific keys

        Args:
            metadata_key: specific metadata key to format, if None prints all
        """
        if not self.metadata:
            print("❌ No parquet file loaded")
            return

        file_metadata = self.get_file_metadata()
        if not file_metadata:
            print("❌ No file metadata found")
            return

        parsed_metadata = self.parse_custom_metadata(file_metadata)
        formatted = parsed_metadata.get("formatted_metadata", {})

        if metadata_key:
            if metadata_key in formatted:
                print(f"\n📋 {metadata_key.upper()} Metadata:")
                print("-" * 50)
                print(formatted[metadata_key]["formatted"])
            else:
                print(f"❌ Metadata key '{metadata_key}' not found")
        else:
            # Print all formatted metadata
            for key, value in formatted.items():
                if isinstance(value, dict) and "formatted" in value:
                    print(f"\n📋 {key.upper()} Metadata:")
                    print("-" * 50)
                    print(value["formatted"])
                else:
                    print(f"\n📋 {key.upper()}: {value}")