270 lines
7.7 KiB
Python
270 lines
7.7 KiB
Python
"""Validate requirements."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from functools import cache
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from typing import Any
|
|
|
|
from awesomeversion import AwesomeVersion, AwesomeVersionStrategy
|
|
from tqdm import tqdm
|
|
|
|
import homeassistant.util.package as pkg_util
|
|
from script.gen_requirements_all import (
|
|
EXCLUDED_REQUIREMENTS_ALL,
|
|
normalize_package_name,
|
|
)
|
|
|
|
from .model import Config, Integration
|
|
|
|
PACKAGE_REGEX = re.compile(
|
|
r"^(?:--.+\s)?([-_,\.\w\d\[\]]+)(==|>=|<=|~=|!=|<|>|===)*(.*)$"
|
|
)
|
|
PIP_REGEX = re.compile(r"^(--.+\s)?([-_\.\w\d]+.*(?:==|>=|<=|~=|!=|<|>|===)?.*$)")
|
|
PIP_VERSION_RANGE_SEPARATOR = re.compile(r"^(==|>=|<=|~=|!=|<|>|===)?(.*)$")
|
|
|
|
|
|
def validate(integrations: dict[str, Integration], config: Config) -> None:
|
|
"""Handle requirements for integrations."""
|
|
# Check if we are doing format-only validation.
|
|
if not config.requirements:
|
|
for integration in integrations.values():
|
|
validate_requirements_format(integration)
|
|
return
|
|
|
|
# check for incompatible requirements
|
|
|
|
disable_tqdm = bool(config.specific_integrations or os.environ.get("CI"))
|
|
|
|
for integration in tqdm(integrations.values(), disable=disable_tqdm):
|
|
validate_requirements(integration)
|
|
|
|
|
|
def validate_requirements_format(integration: Integration) -> bool:
|
|
"""Validate requirements format.
|
|
|
|
Returns if valid.
|
|
"""
|
|
start_errors = len(integration.errors)
|
|
|
|
for req in integration.requirements:
|
|
if " " in req:
|
|
integration.add_error(
|
|
"requirements",
|
|
f'Requirement "{req}" contains a space',
|
|
)
|
|
continue
|
|
|
|
if not (match := PACKAGE_REGEX.match(req)):
|
|
integration.add_error(
|
|
"requirements",
|
|
f'Requirement "{req}" does not match package regex pattern',
|
|
)
|
|
continue
|
|
pkg, sep, version = match.groups()
|
|
|
|
if integration.core and sep != "==":
|
|
integration.add_error(
|
|
"requirements",
|
|
f'Requirement {req} need to be pinned "<pkg name>==<version>".',
|
|
)
|
|
continue
|
|
|
|
if not version:
|
|
continue
|
|
|
|
if integration.core:
|
|
for part in version.split(";", 1)[0].split(","):
|
|
version_part = PIP_VERSION_RANGE_SEPARATOR.match(part)
|
|
if (
|
|
version_part
|
|
and AwesomeVersion(version_part.group(2)).strategy
|
|
== AwesomeVersionStrategy.UNKNOWN
|
|
):
|
|
integration.add_error(
|
|
"requirements",
|
|
f"Unable to parse package version ({version}) for {pkg}.",
|
|
)
|
|
continue
|
|
|
|
return len(integration.errors) == start_errors
|
|
|
|
|
|
def validate_requirements(integration: Integration) -> None:
|
|
"""Validate requirements."""
|
|
if not validate_requirements_format(integration):
|
|
return
|
|
|
|
integration_requirements = set()
|
|
integration_packages = set()
|
|
for req in integration.requirements:
|
|
package = normalize_package_name(req)
|
|
if not package:
|
|
integration.add_error(
|
|
"requirements",
|
|
f"Failed to normalize package name from requirement {req}",
|
|
)
|
|
return
|
|
if package in EXCLUDED_REQUIREMENTS_ALL:
|
|
continue
|
|
integration_requirements.add(req)
|
|
integration_packages.add(package)
|
|
|
|
if integration.disabled:
|
|
return
|
|
|
|
install_ok = install_requirements(integration, integration_requirements)
|
|
|
|
if not install_ok:
|
|
return
|
|
|
|
all_integration_requirements = get_requirements(integration, integration_packages)
|
|
|
|
if integration_requirements and not all_integration_requirements:
|
|
integration.add_error(
|
|
"requirements",
|
|
f"Failed to resolve requirements {integration_requirements}",
|
|
)
|
|
return
|
|
|
|
# Check for requirements incompatible with standard library.
|
|
standard_library_violations = set()
|
|
for req in all_integration_requirements:
|
|
if req in sys.stdlib_module_names:
|
|
standard_library_violations.add(req)
|
|
|
|
if standard_library_violations:
|
|
integration.add_error(
|
|
"requirements",
|
|
(
|
|
f"Package {req} has dependencies {standard_library_violations} which "
|
|
"are not compatible with the Python standard library"
|
|
),
|
|
)
|
|
|
|
|
|
@cache
|
|
def get_pipdeptree() -> dict[str, dict[str, Any]]:
|
|
"""Get pipdeptree output. Cached on first invocation.
|
|
|
|
{
|
|
"flake8-docstring": {
|
|
"key": "flake8-docstrings",
|
|
"package_name": "flake8-docstrings",
|
|
"installed_version": "1.5.0"
|
|
"dependencies": {"flake8"}
|
|
}
|
|
}
|
|
"""
|
|
deptree = {}
|
|
|
|
for item in json.loads(
|
|
subprocess.run(
|
|
["pipdeptree", "-w", "silence", "--json"],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
).stdout
|
|
):
|
|
deptree[item["package"]["key"]] = {
|
|
**item["package"],
|
|
"dependencies": {dep["key"] for dep in item["dependencies"]},
|
|
}
|
|
return deptree
|
|
|
|
|
|
def get_requirements(integration: Integration, packages: set[str]) -> set[str]:
|
|
"""Return all (recursively) requirements for an integration."""
|
|
deptree = get_pipdeptree()
|
|
|
|
all_requirements = set()
|
|
|
|
to_check = deque(packages)
|
|
|
|
while to_check:
|
|
package = to_check.popleft()
|
|
|
|
if package in all_requirements:
|
|
continue
|
|
|
|
all_requirements.add(package)
|
|
|
|
item = deptree.get(package)
|
|
|
|
if item is None:
|
|
# Only warn if direct dependencies could not be resolved
|
|
if package in packages:
|
|
integration.add_error(
|
|
"requirements", f"Failed to resolve requirements for {package}"
|
|
)
|
|
continue
|
|
|
|
to_check.extend(item["dependencies"])
|
|
|
|
return all_requirements
|
|
|
|
|
|
def install_requirements(integration: Integration, requirements: set[str]) -> bool:
|
|
"""Install integration requirements.
|
|
|
|
Return True if successful.
|
|
"""
|
|
deptree = get_pipdeptree()
|
|
|
|
for req in requirements:
|
|
match = PIP_REGEX.search(req)
|
|
|
|
if not match:
|
|
integration.add_error(
|
|
"requirements",
|
|
f"Failed to parse requirement {req} before installation",
|
|
)
|
|
continue
|
|
|
|
install_args = match.group(1)
|
|
requirement_arg = match.group(2)
|
|
|
|
is_installed = False
|
|
|
|
normalized = normalize_package_name(requirement_arg)
|
|
|
|
if normalized and "==" in requirement_arg:
|
|
ver = requirement_arg.split("==")[-1]
|
|
item = deptree.get(normalized)
|
|
is_installed = bool(item and item["installed_version"] == ver)
|
|
|
|
if not is_installed:
|
|
try:
|
|
is_installed = pkg_util.is_installed(req)
|
|
except ValueError:
|
|
is_installed = False
|
|
|
|
if is_installed:
|
|
continue
|
|
|
|
args = ["uv", "pip", "install", "--quiet"]
|
|
if install_args:
|
|
args.append(install_args)
|
|
args.append(requirement_arg)
|
|
try:
|
|
result = subprocess.run(args, check=True, capture_output=True, text=True)
|
|
except subprocess.SubprocessError:
|
|
integration.add_error(
|
|
"requirements",
|
|
f"Requirement {req} failed to install",
|
|
)
|
|
else:
|
|
# Clear the pipdeptree cache if something got installed
|
|
if "Successfully installed" in result.stdout:
|
|
get_pipdeptree.cache_clear()
|
|
|
|
if integration.errors:
|
|
return False
|
|
|
|
return True
|