Make fetching integrations with requirements safer (#120481)
parent
b7e7905b54
commit
cef1d35e31
|
@ -4,16 +4,16 @@ from __future__ import annotations
|
|||
|
||||
import asyncio
|
||||
from collections.abc import Iterable
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, cast
|
||||
from typing import Any
|
||||
|
||||
from packaging.requirements import Requirement
|
||||
|
||||
from .core import HomeAssistant, callback
|
||||
from .exceptions import HomeAssistantError
|
||||
from .helpers import singleton
|
||||
from .helpers.typing import UNDEFINED, UndefinedType
|
||||
from .loader import Integration, IntegrationNotFound, async_get_integration
|
||||
from .util import package as pkg_util
|
||||
|
||||
|
@ -119,11 +119,6 @@ def _install_requirements_if_missing(
|
|||
return installed, failures
|
||||
|
||||
|
||||
def _set_result_unless_done(future: asyncio.Future[None]) -> None:
|
||||
if not future.done():
|
||||
future.set_result(None)
|
||||
|
||||
|
||||
class RequirementsManager:
|
||||
"""Manage requirements."""
|
||||
|
||||
|
@ -132,7 +127,7 @@ class RequirementsManager:
|
|||
self.hass = hass
|
||||
self.pip_lock = asyncio.Lock()
|
||||
self.integrations_with_reqs: dict[
|
||||
str, Integration | asyncio.Future[None] | None | UndefinedType
|
||||
str, Integration | asyncio.Future[Integration]
|
||||
] = {}
|
||||
self.install_failure_history: set[str] = set()
|
||||
self.is_installed_cache: set[str] = set()
|
||||
|
@ -151,37 +146,32 @@ class RequirementsManager:
|
|||
else:
|
||||
done.add(domain)
|
||||
|
||||
if self.hass.config.skip_pip:
|
||||
return await async_get_integration(self.hass, domain)
|
||||
|
||||
cache = self.integrations_with_reqs
|
||||
int_or_fut = cache.get(domain, UNDEFINED)
|
||||
|
||||
if isinstance(int_or_fut, asyncio.Future):
|
||||
await int_or_fut
|
||||
|
||||
# When we have waited and it's UNDEFINED, it doesn't exist
|
||||
# We don't cache that it doesn't exist, or else people can't fix it
|
||||
# and then restart, because their config will never be valid.
|
||||
if (int_or_fut := cache.get(domain, UNDEFINED)) is UNDEFINED:
|
||||
raise IntegrationNotFound(domain)
|
||||
|
||||
if int_or_fut is not UNDEFINED:
|
||||
return cast(Integration, int_or_fut)
|
||||
if int_or_fut := cache.get(domain):
|
||||
if isinstance(int_or_fut, Integration):
|
||||
return int_or_fut
|
||||
return await int_or_fut
|
||||
|
||||
future = cache[domain] = self.hass.loop.create_future()
|
||||
|
||||
try:
|
||||
integration = await async_get_integration(self.hass, domain)
|
||||
await self._async_process_integration(integration, done)
|
||||
except Exception:
|
||||
if not self.hass.config.skip_pip:
|
||||
await self._async_process_integration(integration, done)
|
||||
except BaseException as ex:
|
||||
# We do not cache failures as we want to retry, or
|
||||
# else people can't fix it and then restart, because
|
||||
# their config will never be valid.
|
||||
del cache[domain]
|
||||
future.set_exception(ex)
|
||||
with contextlib.suppress(BaseException):
|
||||
# Clear the flag as its normal that nothing
|
||||
# will wait for this future to be resolved
|
||||
# if there are no concurrent requirements fetches.
|
||||
await future
|
||||
raise
|
||||
finally:
|
||||
_set_result_unless_done(future)
|
||||
|
||||
cache[domain] = integration
|
||||
_set_result_unless_done(future)
|
||||
future.set_result(integration)
|
||||
return integration
|
||||
|
||||
async def _async_process_integration(
|
||||
|
|
Loading…
Reference in New Issue