Migrate legacy typehints in core to PEP-526 (#26403)

* Migrate legacy typehints in core to PEP-526

* Fix one type
pull/26413/head
Franck Nijhof 2019-09-04 05:36:04 +02:00 committed by Paulus Schoutsen
parent 2dc90be94f
commit 2f0eb07624
45 changed files with 221 additions and 263 deletions

View File

@ -7,7 +7,7 @@ import platform
import subprocess
import sys
import threading
from typing import List, Dict, Any, TYPE_CHECKING # noqa pylint: disable=unused-import
from typing import List, Dict, Any, TYPE_CHECKING
from homeassistant import monkey_patch
from homeassistant.const import __version__, REQUIRED_PYTHON_VER, RESTART_EXIT_CODE
@ -280,7 +280,7 @@ async def setup_and_run_hass(config_dir: str, args: argparse.Namespace) -> int:
hass = core.HomeAssistant()
if args.demo_mode:
config = {"frontend": {}, "demo": {}} # type: Dict[str, Any]
config: Dict[str, Any] = {"frontend": {}, "demo": {}}
bootstrap.async_from_config_dict(
config,
hass,

View File

@ -47,7 +47,7 @@ async def auth_manager_from_config(
else:
providers = ()
# So returned auth providers are in same order as config
provider_hash = OrderedDict() # type: _ProviderDict
provider_hash: _ProviderDict = OrderedDict()
for provider in providers:
key = (provider.type, provider.id)
provider_hash[key] = provider
@ -59,7 +59,7 @@ async def auth_manager_from_config(
else:
modules = ()
# So returned auth modules are in same order as config
module_hash = OrderedDict() # type: _MfaModuleDict
module_hash: _MfaModuleDict = OrderedDict()
for module in modules:
module_hash[module.id] = module
@ -168,11 +168,11 @@ class AuthManager:
async def async_create_user(self, name: str) -> models.User:
"""Create a user."""
kwargs = {
kwargs: Dict[str, Any] = {
"name": name,
"is_active": True,
"group_ids": [GROUP_ID_ADMIN],
} # type: Dict[str, Any]
}
if await self._user_should_be_owner():
kwargs["is_owner"] = True
@ -238,7 +238,7 @@ class AuthManager:
group_ids: Optional[List[str]] = None,
) -> None:
"""Update a user."""
kwargs = {} # type: Dict[str,Any]
kwargs: Dict[str, Any] = {}
if name is not None:
kwargs["name"] = name
if group_ids is not None:
@ -299,7 +299,7 @@ class AuthManager:
async def async_get_enabled_mfa(self, user: models.User) -> Dict[str, str]:
"""List enabled mfa modules for user."""
modules = OrderedDict() # type: Dict[str, str]
modules: Dict[str, str] = OrderedDict()
for module_id, module in self._mfa_modules.items():
if await module.async_is_user_setup(user.id):
modules[module_id] = module.name

View File

@ -4,7 +4,7 @@ from collections import OrderedDict
from datetime import timedelta
import hmac
from logging import getLogger
from typing import Any, Dict, List, Optional # noqa: F401
from typing import Any, Dict, List, Optional
from homeassistant.auth.const import ACCESS_TOKEN_EXPIRATION
from homeassistant.core import HomeAssistant, callback
@ -13,7 +13,7 @@ from homeassistant.util import dt as dt_util
from . import models
from .const import GROUP_ID_ADMIN, GROUP_ID_USER, GROUP_ID_READ_ONLY
from .permissions import PermissionLookup, system_policies
from .permissions.types import PolicyType # noqa: F401
from .permissions.types import PolicyType
STORAGE_VERSION = 1
STORAGE_KEY = "auth"
@ -34,9 +34,9 @@ class AuthStore:
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the auth store."""
self.hass = hass
self._users = None # type: Optional[Dict[str, models.User]]
self._groups = None # type: Optional[Dict[str, models.Group]]
self._perm_lookup = None # type: Optional[PermissionLookup]
self._users: Optional[Dict[str, models.User]] = None
self._groups: Optional[Dict[str, models.Group]] = None
self._perm_lookup: Optional[PermissionLookup] = None
self._store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY, private=True
)
@ -97,13 +97,13 @@ class AuthStore:
raise ValueError(f"Invalid group specified {group_id}")
groups.append(group)
kwargs = {
kwargs: Dict[str, Any] = {
"name": name,
# Until we get group management, we just put everyone in the
# same group.
"groups": groups,
"perm_lookup": self._perm_lookup,
} # type: Dict[str, Any]
}
if is_owner is not None:
kwargs["is_owner"] = is_owner
@ -210,12 +210,12 @@ class AuthStore:
access_token_expiration: timedelta = ACCESS_TOKEN_EXPIRATION,
) -> models.RefreshToken:
"""Create a new token for a user."""
kwargs = {
kwargs: Dict[str, Any] = {
"user": user,
"client_id": client_id,
"token_type": token_type,
"access_token_expiration": access_token_expiration,
} # type: Dict[str, Any]
}
if client_name:
kwargs["client_name"] = client_name
if client_icon:
@ -307,8 +307,8 @@ class AuthStore:
self._set_defaults()
return
users = OrderedDict() # type: Dict[str, models.User]
groups = OrderedDict() # type: Dict[str, models.Group]
users: Dict[str, models.User] = OrderedDict()
groups: Dict[str, models.Group] = OrderedDict()
# Soft-migrating data as we load. We are going to make sure we have a
# read only group and an admin group. There are two states that we can
@ -325,7 +325,7 @@ class AuthStore:
# was added.
for group_dict in data.get("groups", []):
policy = None # type: Optional[PolicyType]
policy: Optional[PolicyType] = None
if group_dict["id"] == GROUP_ID_ADMIN:
has_admin_group = True
@ -503,11 +503,11 @@ class AuthStore:
groups = []
for group in self._groups.values():
g_dict = {
g_dict: Dict[str, Any] = {
"id": group.id,
# Name not read for sys groups. Kept here for backwards compat
"name": group.name,
} # type: Dict[str, Any]
}
if not group.system_generated:
g_dict["policy"] = group.policy
@ -558,7 +558,7 @@ class AuthStore:
"""Set default values for auth store."""
self._users = OrderedDict()
groups = OrderedDict() # type: Dict[str, models.Group]
groups: Dict[str, models.Group] = OrderedDict()
admin_group = _system_admin_group()
groups[admin_group.id] = admin_group
user_group = _system_user_group()

View File

@ -109,7 +109,7 @@ class SetupFlow(data_entry_flow.FlowHandler):
Return self.async_show_form(step_id='init') if user_input is None.
Return self.async_create_entry(data={'result': result}) if finish.
"""
errors = {} # type: Dict[str, str]
errors: Dict[str, str] = {}
if user_input:
result = await self._auth_module.async_setup_user(self._user_id, user_input)

View File

@ -95,7 +95,7 @@ class NotifyAuthModule(MultiFactorAuthModule):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._user_settings = None # type: Optional[_UsersDict]
self._user_settings: Optional[_UsersDict] = None
self._user_store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY, private=True
)
@ -279,18 +279,18 @@ class NotifySetupFlow(SetupFlow):
"""Initialize the setup flow."""
super().__init__(auth_module, setup_schema, user_id)
# to fix typing complaint
self._auth_module = auth_module # type: NotifyAuthModule
self._auth_module: NotifyAuthModule = auth_module
self._available_notify_services = available_notify_services
self._secret = None # type: Optional[str]
self._count = None # type: Optional[int]
self._notify_service = None # type: Optional[str]
self._target = None # type: Optional[str]
self._secret: Optional[str] = None
self._count: Optional[int] = None
self._notify_service: Optional[str] = None
self._target: Optional[str] = None
async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Let user select available notify services."""
errors = {} # type: Dict[str, str]
errors: Dict[str, str] = {}
hass = self._auth_module.hass
if user_input:
@ -304,7 +304,7 @@ class NotifySetupFlow(SetupFlow):
if not self._available_notify_services:
return self.async_abort(reason="no_available_service")
schema = OrderedDict() # type: Dict[str, Any]
schema: Dict[str, Any] = OrderedDict()
schema["notify_service"] = vol.In(self._available_notify_services)
schema["target"] = vol.Optional(str)
@ -316,7 +316,7 @@ class NotifySetupFlow(SetupFlow):
self, user_input: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Verify user can recevie one-time password."""
errors = {} # type: Dict[str, str]
errors: Dict[str, str] = {}
hass = self._auth_module.hass
if user_input:

View File

@ -2,7 +2,7 @@
import asyncio
import logging
from io import BytesIO
from typing import Any, Dict, Optional, Tuple # noqa: F401
from typing import Any, Dict, Optional, Tuple
import voluptuous as vol
@ -75,7 +75,7 @@ class TotpAuthModule(MultiFactorAuthModule):
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._users = None # type: Optional[Dict[str, str]]
self._users: Optional[Dict[str, str]] = None
self._user_store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY, private=True
)
@ -107,7 +107,7 @@ class TotpAuthModule(MultiFactorAuthModule):
"""Create a ota_secret for user."""
import pyotp
ota_secret = secret or pyotp.random_base32() # type: str
ota_secret: str = secret or pyotp.random_base32()
self._users[user_id] = ota_secret # type: ignore
return ota_secret
@ -181,9 +181,9 @@ class TotpSetupFlow(SetupFlow):
"""Initialize the setup flow."""
super().__init__(auth_module, setup_schema, user.id)
# to fix typing complaint
self._auth_module = auth_module # type: TotpAuthModule
self._auth_module: TotpAuthModule = auth_module
self._user = user
self._ota_secret = None # type: Optional[str]
self._ota_secret: Optional[str] = None
self._url = None # type Optional[str]
self._image = None # type Optional[str]
@ -197,7 +197,7 @@ class TotpSetupFlow(SetupFlow):
"""
import pyotp
errors = {} # type: Dict[str, str]
errors: Dict[str, str] = {}
if user_input:
verified = await self.hass.async_add_executor_job( # type: ignore

View File

@ -1,6 +1,6 @@
"""Auth models."""
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple, Optional # noqa: F401
from typing import Dict, List, NamedTuple, Optional
import uuid
import attr
@ -31,9 +31,7 @@ class User:
"""A user."""
name = attr.ib(type=str) # type: Optional[str]
perm_lookup = attr.ib(
type=perm_mdl.PermissionLookup, cmp=False
) # type: perm_mdl.PermissionLookup
perm_lookup = attr.ib(type=perm_mdl.PermissionLookup, cmp=False)
id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex)
is_owner = attr.ib(type=bool, default=False)
is_active = attr.ib(type=bool, default=False)

View File

@ -1,6 +1,6 @@
"""Entity permissions."""
from collections import OrderedDict
from typing import Callable, Optional # noqa: F401
from typing import Callable, Optional
import voluptuous as vol
@ -8,8 +8,7 @@ from .const import SUBCAT_ALL, POLICY_READ, POLICY_CONTROL, POLICY_EDIT
from .models import PermissionLookup
from .types import CategoryType, SubCategoryDict, ValueType
# pylint: disable=unused-import
from .util import SubCatLookupType, lookup_all, compile_policy # noqa
from .util import SubCatLookupType, lookup_all, compile_policy
SINGLE_ENTITY_SCHEMA = vol.Any(
True,
@ -90,7 +89,7 @@ def compile_entities(
policy: CategoryType, perm_lookup: PermissionLookup
) -> Callable[[str, str], bool]:
"""Compile policy into a function that tests policy."""
subcategories = OrderedDict() # type: SubCatLookupType
subcategories: SubCatLookupType = OrderedDict()
subcategories[ENTITY_ENTITY_IDS] = _lookup_entity_id
subcategories[ENTITY_DEVICE_IDS] = _lookup_device
subcategories[ENTITY_AREAS] = _lookup_area

View File

@ -1,13 +1,13 @@
"""Merging of policies."""
from typing import cast, Dict, List, Set # noqa: F401
from typing import cast, Dict, List, Set
from .types import PolicyType, CategoryType
def merge_policies(policies: List[PolicyType]) -> PolicyType:
"""Merge policies."""
new_policy = {} # type: Dict[str, CategoryType]
seen = set() # type: Set[str]
new_policy: Dict[str, CategoryType] = {}
seen: Set[str] = set()
for policy in policies:
for category in policy:
if category in seen:
@ -33,8 +33,8 @@ def _merge_policies(sources: List[CategoryType]) -> CategoryType:
# If there are multiple sources with a dict as policy, we recursively
# merge each key in the source.
policy = None # type: CategoryType
seen = set() # type: Set[str]
policy: CategoryType = None
seen: Set[str] = set()
for source in sources:
if source is None:
continue

View File

@ -1,7 +1,7 @@
"""Helpers to deal with permissions."""
from functools import wraps
from typing import Callable, Dict, List, Optional, cast # noqa: F401
from typing import Callable, Dict, List, Optional, cast
from .const import SUBCAT_ALL
from .models import PermissionLookup
@ -45,7 +45,7 @@ def compile_policy(
assert isinstance(policy, dict)
funcs = [] # type: List[Callable[[str, str], Optional[bool]]]
funcs: List[Callable[[str, str], Optional[bool]]] = []
for key, lookup_func in subcategories.items():
lookup_value = policy.get(key)
@ -85,7 +85,7 @@ def _gen_dict_test_func(
def test_value(object_id: str, key: str) -> Optional[bool]:
"""Test if permission is allowed based on the keys."""
schema = lookup_func(perm_lookup, lookup_dict, object_id) # type: ValueType
schema: ValueType = lookup_func(perm_lookup, lookup_dict, object_id)
if schema is None or isinstance(schema, bool):
return schema

View File

@ -16,7 +16,7 @@ from homeassistant.util.decorator import Registry
from ..auth_store import AuthStore
from ..const import MFA_SESSION_EXPIRATION
from ..models import Credentials, User, UserMeta # noqa: F401
from ..models import Credentials, User, UserMeta
_LOGGER = logging.getLogger(__name__)
DATA_REQS = "auth_prov_reqs_processed"
@ -175,12 +175,12 @@ class LoginFlow(data_entry_flow.FlowHandler):
def __init__(self, auth_provider: AuthProvider) -> None:
"""Initialize the login flow."""
self._auth_provider = auth_provider
self._auth_module_id = None # type: Optional[str]
self._auth_module_id: Optional[str] = None
self._auth_manager = auth_provider.hass.auth # type: ignore
self.available_mfa_modules = {} # type: Dict[str, str]
self.available_mfa_modules: Dict[str, str] = {}
self.created_at = dt_util.utcnow()
self.invalid_mfa_times = 0
self.user = None # type: Optional[User]
self.user: Optional[User] = None
async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None
@ -255,10 +255,10 @@ class LoginFlow(data_entry_flow.FlowHandler):
if not errors:
return await self.async_finish(self.user)
description_placeholders = {
description_placeholders: Dict[str, Optional[str]] = {
"mfa_module_name": auth_module.name,
"mfa_module_id": auth_module.id,
} # type: Dict[str, Optional[str]]
}
return self.async_show_form(
step_id="mfa",

View File

@ -53,7 +53,7 @@ class CommandLineAuthProvider(AuthProvider):
attributes provided by external programs.
"""
super().__init__(*args, **kwargs)
self._user_meta = {} # type: Dict[str, Dict[str, Any]]
self._user_meta: Dict[str, Dict[str, Any]] = {}
async def async_login_flow(self, context: Optional[dict]) -> LoginFlow:
"""Return a flow to login."""
@ -85,7 +85,7 @@ class CommandLineAuthProvider(AuthProvider):
raise InvalidAuthError
if self.config[CONF_META]:
meta = {} # type: Dict[str, str]
meta: Dict[str, str] = {}
for _line in stdout.splitlines():
try:
line = _line.decode().lstrip()
@ -146,7 +146,7 @@ class CommandLineLoginFlow(LoginFlow):
user_input.pop("password")
return await self.async_finish(user_input)
schema = collections.OrderedDict() # type: Dict[str, type]
schema: Dict[str, type] = collections.OrderedDict()
schema["username"] = str
schema["password"] = str

View File

@ -4,7 +4,7 @@ import base64
from collections import OrderedDict
import logging
from typing import Any, Dict, List, Optional, Set, cast # noqa: F401
from typing import Any, Dict, List, Optional, Set, cast
import bcrypt
import voluptuous as vol
@ -53,7 +53,7 @@ class Data:
self._store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY, private=True
)
self._data = None # type: Optional[Dict[str, Any]]
self._data: Optional[Dict[str, Any]] = None
# Legacy mode will allow usernames to start/end with whitespace
# and will compare usernames case-insensitive.
# Remove in 2020 or when we launch 1.0.
@ -74,7 +74,7 @@ class Data:
if data is None:
data = {"users": []}
seen = set() # type: Set[str]
seen: Set[str] = set()
for user in data["users"]:
username = user["username"]
@ -210,7 +210,7 @@ class HassAuthProvider(AuthProvider):
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize an Home Assistant auth provider."""
super().__init__(*args, **kwargs)
self.data = None # type: Optional[Data]
self.data: Optional[Data] = None
self._init_lock = asyncio.Lock()
async def async_initialize(self) -> None:
@ -296,7 +296,7 @@ class HassLoginFlow(LoginFlow):
user_input.pop("password")
return await self.async_finish(user_input)
schema = OrderedDict() # type: Dict[str, type]
schema: Dict[str, type] = OrderedDict()
schema["username"] = str
schema["password"] = str

View File

@ -112,7 +112,7 @@ class ExampleLoginFlow(LoginFlow):
user_input.pop("password")
return await self.async_finish(user_input)
schema = OrderedDict() # type: Dict[str, type]
schema: Dict[str, type] = OrderedDict()
schema["username"] = str
schema["password"] = str

View File

@ -206,9 +206,9 @@ def async_enable_logging(
):
if log_rotate_days:
err_handler = logging.handlers.TimedRotatingFileHandler(
err_handler: logging.FileHandler = logging.handlers.TimedRotatingFileHandler(
err_log_path, when="midnight", backupCount=log_rotate_days
) # type: logging.FileHandler
)
else:
err_handler = logging.FileHandler(err_log_path, mode="w", delay=True)
@ -335,7 +335,7 @@ async def _async_set_up_integrations(
)
# Load all integrations
after_dependencies = {} # type: Dict[str, Set[str]]
after_dependencies: Dict[str, Set[str]] = {}
for int_or_exc in await asyncio.gather(
*(loader.async_get_integration(hass, domain) for domain in stage_2_domains),

View File

@ -532,7 +532,7 @@ class Device(RestoreEntity):
class DeviceScanner:
"""Device scanner object."""
hass = None # type: HomeAssistantType
hass: HomeAssistantType = None
def scan_devices(self) -> List[str]:
"""Scan for devices."""

View File

@ -7,17 +7,7 @@ import logging
import os
import re
import shutil
from typing import ( # noqa: F401 pylint: disable=unused-import
Any,
Tuple,
Optional,
Dict,
List,
Union,
Callable,
Sequence,
Set,
)
from typing import Any, Tuple, Optional, Dict, Union, Callable, Sequence, Set
from types import ModuleType
import voluptuous as vol
from voluptuous.humanize import humanize_error
@ -118,7 +108,7 @@ def _no_duplicate_auth_provider(
Each type of auth provider can only have one config without optional id.
Unique id is required if same type of auth provider used multiple times.
"""
config_keys = set() # type: Set[Tuple[str, Optional[str]]]
config_keys: Set[Tuple[str, Optional[str]]] = set()
for config in configs:
key = (config[CONF_TYPE], config.get(CONF_ID))
if key in config_keys:
@ -142,7 +132,7 @@ def _no_duplicate_auth_mfa_module(
times.
Note: this is different than auth provider
"""
config_keys = set() # type: Set[str]
config_keys: Set[str] = set()
for config in configs:
key = config.get(CONF_ID, config[CONF_TYPE])
if key in config_keys:
@ -623,7 +613,7 @@ def _identify_config_schema(module: ModuleType) -> Tuple[Optional[str], Optional
def _recursive_merge(conf: Dict[str, Any], package: Dict[str, Any]) -> Union[bool, str]:
"""Merge package into conf, recursively."""
error = False # type: Union[bool, str]
error: Union[bool, str] = False
for key, pack_conf in package.items():
if isinstance(pack_conf, dict):
if not pack_conf:

View File

@ -138,10 +138,10 @@ class ConfigEntry:
self.state = state
# Listeners to call on update
self.update_listeners = [] # type: list
self.update_listeners: List = []
# Function to cancel a scheduled retry
self._async_cancel_retry_setup = None # type: Optional[Callable[[], Any]]
self._async_cancel_retry_setup: Optional[Callable[[], Any]] = None
async def async_setup(
self,
@ -386,14 +386,14 @@ class ConfigEntries:
)
self.options = OptionsFlowManager(hass)
self._hass_config = hass_config
self._entries = [] # type: List[ConfigEntry]
self._entries: List[ConfigEntry] = []
self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
EntityRegistryDisabledHandler(hass).async_setup()
@callback
def async_domains(self) -> List[str]:
"""Return domains for which we have entries."""
seen = set() # type: Set[str]
seen: Set[str] = set()
result = []
for entry in self._entries:

View File

@ -260,8 +260,8 @@ ATTR_ICON = "icon"
# The unit of measurement if applicable
ATTR_UNIT_OF_MEASUREMENT = "unit_of_measurement"
CONF_UNIT_SYSTEM_METRIC = "metric" # type: str
CONF_UNIT_SYSTEM_IMPERIAL = "imperial" # type: str
CONF_UNIT_SYSTEM_METRIC: str = "metric"
CONF_UNIT_SYSTEM_IMPERIAL: str = "imperial"
# Electrical attributes
ATTR_VOLTAGE = "voltage"
@ -334,39 +334,39 @@ TEMP_CELSIUS = "°C"
TEMP_FAHRENHEIT = "°F"
# Length units
LENGTH_CENTIMETERS = "cm" # type: str
LENGTH_METERS = "m" # type: str
LENGTH_KILOMETERS = "km" # type: str
LENGTH_CENTIMETERS: str = "cm"
LENGTH_METERS: str = "m"
LENGTH_KILOMETERS: str = "km"
LENGTH_INCHES = "in" # type: str
LENGTH_FEET = "ft" # type: str
LENGTH_YARD = "yd" # type: str
LENGTH_MILES = "mi" # type: str
LENGTH_INCHES: str = "in"
LENGTH_FEET: str = "ft"
LENGTH_YARD: str = "yd"
LENGTH_MILES: str = "mi"
# Pressure units
PRESSURE_PA = "Pa" # type: str
PRESSURE_HPA = "hPa" # type: str
PRESSURE_BAR = "bar" # type: str
PRESSURE_MBAR = "mbar" # type: str
PRESSURE_INHG = "inHg" # type: str
PRESSURE_PSI = "psi" # type: str
PRESSURE_PA: str = "Pa"
PRESSURE_HPA: str = "hPa"
PRESSURE_BAR: str = "bar"
PRESSURE_MBAR: str = "mbar"
PRESSURE_INHG: str = "inHg"
PRESSURE_PSI: str = "psi"
# Volume units
VOLUME_LITERS = "L" # type: str
VOLUME_MILLILITERS = "mL" # type: str
VOLUME_LITERS: str = "L"
VOLUME_MILLILITERS: str = "mL"
VOLUME_GALLONS = "gal" # type: str
VOLUME_FLUID_OUNCE = "fl. oz." # type: str
VOLUME_GALLONS: str = "gal"
VOLUME_FLUID_OUNCE: str = "fl. oz."
# Mass units
MASS_GRAMS = "g" # type: str
MASS_KILOGRAMS = "kg" # type: str
MASS_GRAMS: str = "g"
MASS_KILOGRAMS: str = "kg"
MASS_OUNCES = "oz" # type: str
MASS_POUNDS = "lb" # type: str
MASS_OUNCES: str = "oz"
MASS_POUNDS: str = "lb"
# UV Index units
UNIT_UV_INDEX = "UV index" # type: str
UNIT_UV_INDEX: str = "UV index"
# #### SERVICES ####
SERVICE_HOMEASSISTANT_STOP = "stop"
@ -460,15 +460,15 @@ CONTENT_TYPE_TEXT_PLAIN = "text/plain"
# The exit code to send to request a restart
RESTART_EXIT_CODE = 100
UNIT_NOT_RECOGNIZED_TEMPLATE = "{} is not a recognized {} unit." # type: str
UNIT_NOT_RECOGNIZED_TEMPLATE: str = "{} is not a recognized {} unit."
LENGTH = "length" # type: str
MASS = "mass" # type: str
PRESSURE = "pressure" # type: str
VOLUME = "volume" # type: str
TEMPERATURE = "temperature" # type: str
SPEED_MS = "speed_ms" # type: str
ILLUMINANCE = "illuminance" # type: str
LENGTH: str = "length"
MASS: str = "mass"
PRESSURE: str = "pressure"
VOLUME: str = "volume"
TEMPERATURE: str = "temperature"
SPEED_MS: str = "speed_ms"
ILLUMINANCE: str = "illuminance"
WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]

View File

@ -17,7 +17,7 @@ from time import monotonic
import uuid
from types import MappingProxyType
from typing import ( # noqa: F401 pylint: disable=unused-import
from typing import (
Optional,
Any,
Callable,
@ -28,7 +28,6 @@ from typing import ( # noqa: F401 pylint: disable=unused-import
Set,
TYPE_CHECKING,
Awaitable,
Iterator,
)
from async_timeout import timeout
@ -170,10 +169,10 @@ class HomeAssistant:
"""Initialize new Home Assistant object."""
self.loop: asyncio.events.AbstractEventLoop = (loop or asyncio.get_event_loop())
executor_opts = {
executor_opts: Dict[str, Any] = {
"max_workers": None,
"thread_name_prefix": "SyncWorker",
} # type: Dict[str, Any]
}
self.executor = ThreadPoolExecutor(**executor_opts)
self.loop.set_default_executor(self.executor)
@ -733,7 +732,7 @@ class State:
)
self.entity_id = entity_id.lower()
self.state = state # type: str
self.state: str = state
self.attributes = MappingProxyType(attributes or {})
self.last_updated = last_updated or dt_util.utcnow()
self.last_changed = last_changed or self.last_updated
@ -836,7 +835,7 @@ class StateMachine:
def __init__(self, bus: EventBus, loop: asyncio.events.AbstractEventLoop) -> None:
"""Initialize state machine."""
self._states = {} # type: Dict[str, State]
self._states: Dict[str, State] = {}
self._bus = bus
self._loop = loop
@ -1050,7 +1049,7 @@ class ServiceRegistry:
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize a service registry."""
self._services = {} # type: Dict[str, Dict[str, Service]]
self._services: Dict[str, Dict[str, Service]] = {}
self._hass = hass
@property
@ -1269,29 +1268,29 @@ class Config:
"""Initialize a new config object."""
self.hass = hass
self.latitude = 0 # type: float
self.longitude = 0 # type: float
self.elevation = 0 # type: int
self.location_name = "Home" # type: str
self.time_zone = dt_util.UTC # type: datetime.tzinfo
self.units = METRIC_SYSTEM # type: UnitSystem
self.latitude: float = 0
self.longitude: float = 0
self.elevation: int = 0
self.location_name: str = "Home"
self.time_zone: datetime.tzinfo = dt_util.UTC
self.units: UnitSystem = METRIC_SYSTEM
self.config_source = "default" # type: str
self.config_source: str = "default"
# If True, pip install is skipped for requirements on startup
self.skip_pip = False # type: bool
self.skip_pip: bool = False
# List of loaded components
self.components = set() # type: set
self.components: set = set()
# API (HTTP) server configuration, see components.http.ApiConfig
self.api = None # type: Optional[Any]
self.api: Optional[Any] = None
# Directory that holds the configuration
self.config_dir = None # type: Optional[str]
self.config_dir: Optional[str] = None
# List of allowed external dirs to access
self.whitelist_external_dirs = set() # type: Set[str]
self.whitelist_external_dirs: Set[str] = set()
def distance(self, lat: float, lon: float) -> Optional[float]:
"""Calculate distance from Home Assistant.

View File

@ -1,13 +1,6 @@
"""Classes to help gather user submissions."""
import logging
from typing import (
Dict,
Any,
Callable,
Hashable,
List,
Optional,
) # noqa pylint: disable=unused-import
from typing import Dict, Any, Callable, Hashable, List, Optional
import uuid
import voluptuous as vol
from .core import callback, HomeAssistant
@ -52,7 +45,7 @@ class FlowManager:
) -> None:
"""Initialize the flow manager."""
self.hass = hass
self._progress = {} # type: Dict[str, Any]
self._progress: Dict[str, Any] = {}
self._async_create_flow = async_create_flow
self._async_finish_flow = async_finish_flow
@ -136,7 +129,7 @@ class FlowManager:
)
)
result = await getattr(flow, method)(user_input) # type: Dict
result: Dict = await getattr(flow, method)(user_input)
if result["type"] not in (
RESULT_TYPE_FORM,

View File

@ -1,9 +1,9 @@
"""Helper for aiohttp webclient stuff."""
import asyncio
import sys
from ssl import SSLContext # noqa: F401
from ssl import SSLContext
from typing import Any, Awaitable, Optional, cast
from typing import Union # noqa: F401
from typing import Union
import aiohttp
from aiohttp.hdrs import USER_AGENT, CONTENT_TYPE
@ -171,7 +171,7 @@ def _async_get_connector(
return cast(aiohttp.BaseConnector, hass.data[key])
if verify_ssl:
ssl_context = ssl_util.client_context() # type: Union[bool, SSLContext]
ssl_context: Union[bool, SSLContext] = ssl_util.client_context()
else:
ssl_context = False

View File

@ -3,7 +3,7 @@ import logging
import uuid
from asyncio import Event
from collections import OrderedDict
from typing import MutableMapping # noqa: F401
from typing import MutableMapping
from typing import Iterable, Optional, cast
import attr
@ -36,7 +36,7 @@ class AreaRegistry:
def __init__(self, hass: HomeAssistantType) -> None:
"""Initialize the area registry."""
self.hass = hass
self.areas = {} # type: MutableMapping[str, AreaEntry]
self.areas: MutableMapping[str, AreaEntry] = {}
self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
@callback
@ -119,7 +119,7 @@ class AreaRegistry:
"""Load the area registry."""
data = await self._store.async_load()
areas = OrderedDict() # type: OrderedDict[str, AreaEntry]
areas: MutableMapping[str, AreaEntry] = OrderedDict()
if data is not None:
for area in data["areas"]:

View File

@ -36,7 +36,7 @@ CheckConfigError = namedtuple("CheckConfigError", "message domain config")
class HomeAssistantConfig(OrderedDict):
"""Configuration result with errors attribute."""
errors = attr.ib(default=attr.Factory(list)) # type: List[CheckConfigError]
errors: List[CheckConfigError] = attr.ib(default=attr.Factory(list))
def add_error(self, message, domain=None, config=None):
"""Add a single error."""

View File

@ -823,7 +823,7 @@ OR_CONDITION_SCHEMA = vol.Schema(
}
)
CONDITION_SCHEMA = vol.Any(
CONDITION_SCHEMA: vol.Schema = vol.Any(
NUMERIC_STATE_CONDITION_SCHEMA,
STATE_CONDITION_SCHEMA,
SUN_CONDITION_SCHEMA,
@ -832,7 +832,7 @@ CONDITION_SCHEMA = vol.Any(
ZONE_CONDITION_SCHEMA,
AND_CONDITION_SCHEMA,
OR_CONDITION_SCHEMA,
) # type: vol.Schema
)
_SCRIPT_DELAY_SCHEMA = vol.Schema(
{

View File

@ -91,7 +91,7 @@ class Entity:
entity_id = None # type: str
# Owning hass instance. Will be set by EntityPlatform
hass = None # type: Optional[HomeAssistant]
hass: Optional[HomeAssistant] = None
# Owning platform instance. Will be set by EntityPlatform
platform = None
@ -109,10 +109,10 @@ class Entity:
parallel_updates = None
# Entry in the entity registry
registry_entry = None # type: Optional[RegistryEntry]
registry_entry: Optional[RegistryEntry] = None
# Hold list for functions to call on remove.
_on_remove = None # type: Optional[List[CALLBACK_TYPE]]
_on_remove: Optional[List[CALLBACK_TYPE]] = None
# Context
_context = None

View File

@ -2,7 +2,7 @@
from collections import OrderedDict
import fnmatch
import re
from typing import Any, Dict, Optional, Pattern # noqa: F401
from typing import Any, Dict, Optional, Pattern
from homeassistant.core import split_entity_id
@ -17,12 +17,12 @@ class EntityValues:
glob: Optional[Dict] = None,
) -> None:
"""Initialize an EntityConfigDict."""
self._cache = {} # type: Dict[str, Dict]
self._cache: Dict[str, Dict] = {}
self._exact = exact
self._domain = domain
if glob is None:
compiled = None # type: Optional[Dict[Pattern[str], Any]]
compiled: Optional[Dict[Pattern[str], Any]] = None
else:
compiled = OrderedDict()
for key, value in glob.items():

View File

@ -55,7 +55,7 @@ async def async_handle(
text_input: Optional[str] = None,
) -> "IntentResponse":
"""Handle an intent."""
handler = hass.data.get(DATA_KEY, {}).get(intent_type) # type: IntentHandler
handler: IntentHandler = hass.data.get(DATA_KEY, {}).get(intent_type)
if handler is None:
raise UnknownIntent(f"Unknown intent {intent_type}")
@ -122,10 +122,10 @@ def async_test_feature(state: State, feature: int, feature_name: str) -> None:
class IntentHandler:
"""Intent handler registration."""
intent_type = None # type: Optional[str]
slot_schema = None # type: Optional[vol.Schema]
intent_type: Optional[str] = None
slot_schema: Optional[vol.Schema] = None
_slot_schema = None
platforms = [] # type: Optional[Iterable[str]]
platforms: Optional[Iterable[str]] = []
@callback
def async_can_handle(self, intent_obj: "Intent") -> bool:
@ -236,8 +236,8 @@ class IntentResponse:
def __init__(self, intent: Optional[Intent] = None) -> None:
"""Initialize an IntentResponse."""
self.intent = intent
self.speech = {} # type: Dict[str, Dict[str, Any]]
self.card = {} # type: Dict[str, Dict[str, str]]
self.speech: Dict[str, Dict[str, Any]] = {}
self.card: Dict[str, Dict[str, str]] = {}
@callback
def async_set_speech(

View File

@ -2,7 +2,7 @@
import asyncio
import logging
from datetime import timedelta, datetime
from typing import Any, Dict, List, Set, Optional # noqa pylint_disable=unused-import
from typing import Any, Dict, List, Set, Optional
from homeassistant.core import (
HomeAssistant,
@ -17,7 +17,7 @@ from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import Store # noqa pylint_disable=unused-import
from homeassistant.helpers.storage import Store
# mypy: allow-untyped-calls, allow-untyped-defs, no-check-untyped-defs
@ -108,12 +108,12 @@ class RestoreStateData:
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the restore state data class."""
self.hass = hass # type: HomeAssistant
self.store = Store(
self.hass: HomeAssistant = hass
self.store: Store = Store(
hass, STORAGE_VERSION, STORAGE_KEY, encoder=JSONEncoder
) # type: Store
self.last_states = {} # type: Dict[str, StoredState]
self.entity_ids = set() # type: Set[str]
)
self.last_states: Dict[str, StoredState] = {}
self.entity_ids: Set[str] = set()
def async_get_stored_states(self) -> List[StoredState]:
"""Get the set of states which should be stored.

View File

@ -102,15 +102,15 @@ class Script:
self.name = name
self._change_listener = change_listener
self._cur = -1
self._exception_step = None # type: Optional[int]
self._exception_step: Optional[int] = None
self.last_action = None
self.last_triggered = None # type: Optional[datetime]
self.last_triggered: Optional[datetime] = None
self.can_cancel = any(
CONF_DELAY in action or CONF_WAIT_TEMPLATE in action
for action in self.sequence
)
self._async_listener = [] # type: List[CALLBACK_TYPE]
self._config_cache = {} # type: Dict[Set[Tuple], Callable[..., bool]]
self._async_listener: List[CALLBACK_TYPE] = []
self._config_cache: Dict[Set[Tuple], Callable[..., bool]] = {}
self._actions = {
ACTION_DELAY: self._async_delay,
ACTION_WAIT_TEMPLATE: self._async_wait_template,

View File

@ -5,16 +5,7 @@ import json
import logging
from collections import defaultdict
from types import ModuleType, TracebackType
from typing import ( # noqa: F401 pylint: disable=unused-import
Awaitable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
from typing import Awaitable, Dict, Iterable, List, Optional, Tuple, Type, Union
from homeassistant.loader import bind_hass, async_get_integration, IntegrationNotFound
import homeassistant.util.dt as dt_util
@ -99,7 +90,7 @@ class AsyncTrackStates:
def __init__(self, hass: HomeAssistantType) -> None:
"""Initialize a TrackStates block."""
self.hass = hass
self.states = [] # type: List[State]
self.states: List[State] = []
# pylint: disable=attribute-defined-outside-init
def __enter__(self) -> List[State]:
@ -147,7 +138,7 @@ async def async_reproduce_state(
if isinstance(states, State):
states = [states]
to_call = defaultdict(list) # type: Dict[str, List[State]]
to_call: Dict[str, List[State]] = defaultdict(list)
for state in states:
to_call[state.domain].append(state)
@ -191,7 +182,7 @@ async def async_reproduce_state_legacy(
context: Optional[Context] = None,
) -> None:
"""Reproduce given state."""
to_call = defaultdict(list) # type: Dict[Tuple[str, str], List[str]]
to_call: Dict[Tuple[str, str], List[str]] = defaultdict(list)
if domain == GROUP_DOMAIN:
service_domain = HASS_DOMAIN
@ -238,7 +229,7 @@ async def async_reproduce_state_legacy(
key = (service, json.dumps(dict(state.attributes), sort_keys=True))
to_call[key].append(state.entity_id)
domain_tasks = [] # type: List[Awaitable[Optional[bool]]]
domain_tasks: List[Awaitable[Optional[bool]]] = []
for (service, service_data), entity_ids in to_call.items():
data = json.loads(service_data)
data[ATTR_ENTITY_ID] = entity_ids

View File

@ -70,11 +70,11 @@ class Store:
self.key = key
self.hass = hass
self._private = private
self._data = None # type: Optional[Dict[str, Any]]
self._data: Optional[Dict[str, Any]] = None
self._unsub_delay_listener = None
self._unsub_stop_listener = None
self._write_lock = asyncio.Lock()
self._load_task = None # type: Optional[asyncio.Future]
self._load_task: Optional[asyncio.Future] = None
self._encoder = encoder
@property

View File

@ -68,14 +68,14 @@ def get_location_astral_event_next(
mod = -1
while True:
try:
next_dt = (
next_dt: datetime.datetime = (
getattr(location, event)(
dt_util.as_local(utc_point_in_time).date()
+ datetime.timedelta(days=mod),
local=False,
)
+ offset
) # type: datetime.datetime
)
if next_dt > utc_point_in_time:
return next_dt
except AstralError:

View File

@ -82,7 +82,7 @@ def build_resources(
) -> Dict[str, Dict[str, Any]]:
"""Build the resources response for the given components."""
# Build response
resources = {} # type: Dict[str, Dict[str, Any]]
resources: Dict[str, Dict[str, Any]] = {}
for component in components:
if "." not in component:
domain = component

View File

@ -127,7 +127,7 @@ async def async_get_config_flows(hass: "HomeAssistant") -> Set[str]:
"""Return cached list of config flows."""
from homeassistant.generated.config_flows import FLOWS
flows = set() # type: Set[str]
flows: Set[str] = set()
flows.update(FLOWS)
integrations = await async_get_custom_components(hass)
@ -201,14 +201,14 @@ class Integration:
self.hass = hass
self.pkg_path = pkg_path
self.file_path = file_path
self.name = manifest["name"] # type: str
self.domain = manifest["domain"] # type: str
self.dependencies = manifest["dependencies"] # type: List[str]
self.after_dependencies = manifest.get(
self.name: str = manifest["name"]
self.domain: str = manifest["domain"]
self.dependencies: List[str] = manifest["dependencies"]
self.after_dependencies: Optional[List[str]] = manifest.get(
"after_dependencies"
) # type: Optional[List[str]]
self.requirements = manifest["requirements"] # type: List[str]
self.config_flow = manifest.get("config_flow", False) # type: bool
)
self.requirements: List[str] = manifest["requirements"]
self.config_flow: bool = manifest.get("config_flow", False)
_LOGGER.info("Loaded %s from %s", self.domain, pkg_path)
@property
@ -246,9 +246,7 @@ async def async_get_integration(hass: "HomeAssistant", domain: str) -> Integrati
raise IntegrationNotFound(domain)
cache = hass.data[DATA_INTEGRATIONS] = {}
int_or_evt = cache.get(
domain, _UNDEF
) # type: Union[Integration, asyncio.Event, None]
int_or_evt: Union[Integration, asyncio.Event, None] = cache.get(domain, _UNDEF)
if isinstance(int_or_evt, asyncio.Event):
await int_or_evt.wait()
@ -428,7 +426,7 @@ class Components:
integration = self._hass.data.get(DATA_INTEGRATIONS, {}).get(comp_name)
if isinstance(integration, Integration):
component = integration.get_component() # type: Optional[ModuleType]
component: Optional[ModuleType] = integration.get_component()
else:
# Fallback to importing old-school
component = _load_file(self._hass, comp_name, LOOKUP_PATHS)

View File

@ -15,7 +15,7 @@ from homeassistant.util import dt as dt_util
# mypy: allow-untyped-calls, allow-untyped-defs, no-check-untyped-defs
# mypy: no-warn-return-any
BENCHMARKS = {} # type: Dict[str, Callable]
BENCHMARKS: Dict[str, Callable] = {}
def run(args):

View File

@ -21,11 +21,11 @@ REQUIREMENTS = ("colorlog==4.0.2",)
_LOGGER = logging.getLogger(__name__)
# pylint: disable=protected-access
MOCKS = {
MOCKS: Dict[str, Tuple[str, Callable]] = {
"load": ("homeassistant.util.yaml.loader.load_yaml", yaml_loader.load_yaml),
"load*": ("homeassistant.config.load_yaml", yaml_loader.load_yaml),
"secrets": ("homeassistant.util.yaml.loader.secret_yaml", yaml_loader.secret_yaml),
} # type: Dict[str, Tuple[str, Callable]]
}
SILENCE = ("homeassistant.scripts.check_config.yaml_loader.clear_secret_cache",)
PATCHES: Dict[str, Any] = {}
@ -82,7 +82,7 @@ def run(script_args: List) -> int:
res = check(config_dir, args.secrets)
domain_info = [] # type: List[str]
domain_info: List[str] = []
if args.info:
domain_info = args.info.split(",")
@ -122,7 +122,7 @@ def run(script_args: List) -> int:
dump_dict(res["components"].get(domain, None))
if args.secrets:
flatsecret = {} # type: Dict[str, str]
flatsecret: Dict[str, str] = {}
for sfn, sdict in res["secret_cache"].items():
sss = []
@ -153,13 +153,13 @@ def run(script_args: List) -> int:
def check(config_dir, secrets=False):
"""Perform a check by mocking hass load functions."""
logging.getLogger("homeassistant.loader").setLevel(logging.CRITICAL)
res = {
res: Dict[str, Any] = {
"yaml_files": OrderedDict(), # yaml_files loaded
"secrets": OrderedDict(), # secret cache and secrets loaded
"except": OrderedDict(), # exceptions raised (with config)
#'components' is a HomeAssistantConfig # noqa: E265
"secret_cache": None,
} # type: Dict[str, Any]
}
# pylint: disable=possibly-unused-variable
def mock_load(filename):

View File

@ -22,7 +22,7 @@ class MockRequest:
self.method = method
self.url = url
self.status = status
self.headers = CIMultiDict(headers or {}) # type: CIMultiDict[str]
self.headers: CIMultiDict[str] = CIMultiDict(headers or {})
self.query_string = query_string or ""
self._content = content

View File

@ -103,11 +103,11 @@ def _chain_future(
raise TypeError("A future is required for destination argument")
# pylint: disable=protected-access
if isinstance(source, Future):
source_loop = source._loop # type: Optional[AbstractEventLoop]
source_loop: Optional[AbstractEventLoop] = source._loop
else:
source_loop = None
if isinstance(destination, Future):
dest_loop = destination._loop # type: Optional[AbstractEventLoop]
dest_loop: Optional[AbstractEventLoop] = destination._loop
else:
dest_loop = None
@ -152,7 +152,7 @@ def run_coroutine_threadsafe(
if not coroutines.iscoroutine(coro):
raise TypeError("A coroutine object is required")
future = concurrent.futures.Future() # type: concurrent.futures.Future
future: concurrent.futures.Future = concurrent.futures.Future()
def callback() -> None:
"""Handle the call to the coroutine."""
@ -200,7 +200,7 @@ def run_callback_threadsafe(
if ident is not None and ident == threading.get_ident():
raise RuntimeError("Cannot be called from within the event loop")
future = concurrent.futures.Future() # type: concurrent.futures.Future
future: concurrent.futures.Future = concurrent.futures.Future()
def run_callback() -> None:
"""Run callback and store result."""

View File

@ -1,25 +1,17 @@
"""Helper methods to handle the time in Home Assistant."""
import datetime as dt
import re
from typing import (
Any,
Union,
Optional, # noqa pylint: disable=unused-import
Tuple,
List,
cast,
Dict,
)
from typing import Any, Union, Optional, Tuple, List, cast, Dict
import pytz
import pytz.exceptions as pytzexceptions
import pytz.tzinfo as pytzinfo # noqa pylint: disable=unused-import
import pytz.tzinfo as pytzinfo
from homeassistant.const import MATCH_ALL
DATE_STR_FORMAT = "%Y-%m-%d"
UTC = pytz.utc
DEFAULT_TIME_ZONE = pytz.utc # type: dt.tzinfo
DEFAULT_TIME_ZONE: dt.tzinfo = pytz.utc
# Copyright (c) Django Software Foundation and individual contributors.
@ -83,7 +75,7 @@ def as_utc(dattim: dt.datetime) -> dt.datetime:
def as_timestamp(dt_value: dt.datetime) -> float:
"""Convert a date/time into a unix time (seconds since 1970)."""
if hasattr(dt_value, "timestamp"):
parsed_dt = dt_value # type: Optional[dt.datetime]
parsed_dt: Optional[dt.datetime] = dt_value
else:
parsed_dt = parse_datetime(str(dt_value))
if parsed_dt is None:
@ -111,7 +103,7 @@ def start_of_local_day(
) -> dt.datetime:
"""Return local datetime object of start of day from date or datetime."""
if dt_or_d is None:
date = now().date() # type: dt.date
date: dt.date = now().date()
elif isinstance(dt_or_d, dt.datetime):
date = dt_or_d.date()
return DEFAULT_TIME_ZONE.localize( # type: ignore
@ -133,12 +125,12 @@ def parse_datetime(dt_str: str) -> Optional[dt.datetime]:
match = DATETIME_RE.match(dt_str)
if not match:
return None
kws = match.groupdict() # type: Dict[str, Any]
kws: Dict[str, Any] = match.groupdict()
if kws["microsecond"]:
kws["microsecond"] = kws["microsecond"].ljust(6, "0")
tzinfo_str = kws.pop("tzinfo")
tzinfo = None # type: Optional[dt.tzinfo]
tzinfo: Optional[dt.tzinfo] = None
if tzinfo_str == "Z":
tzinfo = UTC
elif tzinfo_str is not None:
@ -324,7 +316,7 @@ def find_next_time_expression_time(
# Now we need to handle timezones. We will make this datetime object
# "naive" first and then re-convert it to the target timezone.
# This is so that we can call pytz's localize and handle DST changes.
tzinfo = result.tzinfo # type: pytzinfo.DstTzInfo
tzinfo: pytzinfo.DstTzInfo = result.tzinfo
result = result.replace(tzinfo=None)
try:

View File

@ -34,7 +34,7 @@ class AsyncHandler:
"""Initialize async logging handler wrapper."""
self.handler = handler
self.loop = loop
self._queue = asyncio.Queue(loop=loop) # type: asyncio.Queue
self._queue: asyncio.Queue = asyncio.Queue(loop=loop)
self._thread = threading.Thread(target=self._process)
# Delegate from handler

View File

@ -22,7 +22,7 @@ JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name
class ExtSafeConstructor(SafeConstructor):
"""Extended SafeConstructor."""
name = None # type: Optional[str]
name: Optional[str] = None
class UnsupportedYamlError(HomeAssistantError):
@ -67,7 +67,7 @@ def object_to_yaml(data: JSON_TYPE) -> str:
stream = StringIO()
try:
yaml.dump(data, stream)
result = stream.getvalue() # type: str
result: str = stream.getvalue()
return result
except YAMLError as exc:
_LOGGER.error("YAML error: %s", exc)
@ -78,7 +78,7 @@ def yaml_to_object(data: str) -> JSON_TYPE:
"""Create object from yaml string."""
yaml = YAML(typ="rt")
try:
result = yaml.load(data) # type: Union[List, Dict, str]
result: Union[List, Dict, str] = yaml.load(data)
return result
except YAMLError as exc:
_LOGGER.error("YAML error: %s", exc)

View File

@ -75,7 +75,7 @@ class UnitSystem:
pressure: str,
) -> None:
"""Initialize the unit system object."""
errors = ", ".join(
errors: str = ", ".join(
UNIT_NOT_RECOGNIZED_TEMPLATE.format(unit, unit_type)
for unit, unit_type in [
(temperature, TEMPERATURE),
@ -85,7 +85,7 @@ class UnitSystem:
(pressure, PRESSURE),
]
if not is_valid_unit(unit, unit_type)
) # type: str
)
if errors:
raise ValueError(errors)

View File

@ -29,7 +29,7 @@ def represent_odict( # type: ignore
dump, tag, mapping, flow_style=None
) -> yaml.MappingNode:
"""Like BaseRepresenter.represent_mapping but does not issue the sort()."""
value = [] # type: list
value: list = []
node = yaml.MappingNode(tag, value, flow_style=flow_style)
if dump.alias_key is not None:
dump.represented_objects[dump.alias_key] = node

View File

@ -26,12 +26,12 @@ from .objects import NodeListClass, NodeStrClass
# mypy: allow-untyped-calls, no-warn-return-any
_LOGGER = logging.getLogger(__name__)
__SECRET_CACHE = {} # type: Dict[str, JSON_TYPE]
JSON_TYPE = Union[List, Dict, str] # pylint: disable=invalid-name
DICT_T = TypeVar("DICT_T", bound=Dict) # pylint: disable=invalid-name
_LOGGER = logging.getLogger(__name__)
__SECRET_CACHE: Dict[str, JSON_TYPE] = {}
def clear_secret_cache() -> None:
"""Clear the secret cache.
@ -47,10 +47,8 @@ class SafeLineLoader(yaml.SafeLoader):
def compose_node(self, parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
"""Annotate a node with the first line it was seen."""
last_line = self.line # type: int
node = super(SafeLineLoader, self).compose_node(
parent, index
) # type: yaml.nodes.Node
last_line: int = self.line
node: yaml.nodes.Node = super(SafeLineLoader, self).compose_node(parent, index)
node.__line__ = last_line + 1 # type: ignore
return node
@ -141,7 +139,7 @@ def _include_dir_named_yaml(
loader: SafeLineLoader, node: yaml.nodes.Node
) -> OrderedDict:
"""Load multiple files from directory as a dictionary."""
mapping = OrderedDict() # type: OrderedDict
mapping: OrderedDict = OrderedDict()
loc = os.path.join(os.path.dirname(loader.name), node.value)
for fname in _find_files(loc, "*.yaml"):
filename = os.path.splitext(os.path.basename(fname))[0]
@ -155,7 +153,7 @@ def _include_dir_merge_named_yaml(
loader: SafeLineLoader, node: yaml.nodes.Node
) -> OrderedDict:
"""Load multiple files from directory as a merged dictionary."""
mapping = OrderedDict() # type: OrderedDict
mapping: OrderedDict = OrderedDict()
loc = os.path.join(os.path.dirname(loader.name), node.value)
for fname in _find_files(loc, "*.yaml"):
if os.path.basename(fname) == SECRET_YAML:
@ -182,8 +180,8 @@ def _include_dir_merge_list_yaml(
loader: SafeLineLoader, node: yaml.nodes.Node
) -> JSON_TYPE:
"""Load multiple files from directory as a merged list."""
loc = os.path.join(os.path.dirname(loader.name), node.value) # type: str
merged_list = [] # type: List[JSON_TYPE]
loc: str = os.path.join(os.path.dirname(loader.name), node.value)
merged_list: List[JSON_TYPE] = []
for fname in _find_files(loc, "*.yaml"):
if os.path.basename(fname) == SECRET_YAML:
continue
@ -198,7 +196,7 @@ def _ordered_dict(loader: SafeLineLoader, node: yaml.nodes.MappingNode) -> Order
loader.flatten_mapping(node)
nodes = loader.construct_pairs(node)
seen = {} # type: Dict
seen: Dict = {}
for (key, _), (child_node, _) in zip(nodes, node.value):
line = child_node.start_mark.line