Use assignment expressions 09 (#57790)

pull/45618/head^2
Marc Mueller 2021-10-17 20:19:56 +02:00 committed by GitHub
parent aa7dc78a1e
commit 2b72b7b7b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 37 additions and 91 deletions

View File

@ -221,8 +221,7 @@ class Alert(ToggleEntity):
async def watched_entity_change(self, ev):
"""Determine if the alert should start or stop."""
to_state = ev.data.get("new_state")
if to_state is None:
if (to_state := ev.data.get("new_state")) is None:
return
_LOGGER.debug("Watched entity (%s) has changed", ev.data.get("entity_id"))
if to_state.state == self._alert_state and not self._firing:

View File

@ -30,9 +30,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
if cur_state is None:
if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id)
return

View File

@ -1096,8 +1096,7 @@ class AlexaThermostatController(AlexaCapability):
supported_modes = []
hvac_modes = self.entity.attributes.get(climate.ATTR_HVAC_MODES)
for mode in hvac_modes:
thermostat_mode = API_THERMOSTAT_MODES.get(mode)
if thermostat_mode:
if thermostat_mode := API_THERMOSTAT_MODES.get(mode):
supported_modes.append(thermostat_mode)
preset_modes = self.entity.attributes.get(climate.ATTR_PRESET_MODES)

View File

@ -93,8 +93,7 @@ class AlexaFlashBriefingView(http.HomeAssistantView):
else:
output[ATTR_MAIN_TEXT] = item.get(CONF_TEXT)
uid = item.get(CONF_UID)
if uid is None:
if (uid := item.get(CONF_UID)) is None:
uid = str(uuid.uuid4())
output[ATTR_UID] = uid

View File

@ -1150,8 +1150,7 @@ async def async_api_adjust_range(hass, config, directive, context):
if instance == f"{cover.DOMAIN}.{cover.ATTR_POSITION}":
range_delta = int(range_delta * 20) if range_delta_default else int(range_delta)
service = SERVICE_SET_COVER_POSITION
current = entity.attributes.get(cover.ATTR_POSITION)
if not current:
if not (current := entity.attributes.get(cover.ATTR_POSITION)):
msg = f"Unable to determine {entity.entity_id} current position"
raise AlexaInvalidValueError(msg)
position = response_value = min(100, max(0, range_delta + current))
@ -1187,8 +1186,7 @@ async def async_api_adjust_range(hass, config, directive, context):
else int(range_delta)
)
service = fan.SERVICE_SET_PERCENTAGE
current = entity.attributes.get(fan.ATTR_PERCENTAGE)
if not current:
if not (current := entity.attributes.get(fan.ATTR_PERCENTAGE)):
msg = f"Unable to determine {entity.entity_id} current fan speed"
raise AlexaInvalidValueError(msg)
percentage = response_value = min(100, max(0, range_delta + current))

View File

@ -120,9 +120,7 @@ async def async_handle_message(hass, message):
req = message.get("request")
req_type = req["type"]
handler = HANDLERS.get(req_type)
if not handler:
if not (handler := HANDLERS.get(req_type)):
raise UnknownRequest(f"Received unknown request {req_type}")
return await handler(hass, message)

View File

@ -12,9 +12,8 @@ def async_describe_events(hass, async_describe_event):
def async_describe_logbook_event(event):
"""Describe a logbook event."""
data = event.data
entity_id = data["request"].get("entity_id")
if entity_id:
if entity_id := data["request"].get("entity_id"):
state = hass.states.get(entity_id)
name = state.name if state else entity_id
message = f"sent command {data['request']['namespace']}/{data['request']['name']} for {name}"

View File

@ -97,8 +97,7 @@ class APIEventStream(HomeAssistantView):
stop_obj = object()
to_write = asyncio.Queue()
restrict = request.query.get("restrict")
if restrict:
if restrict := request.query.get("restrict"):
restrict = restrict.split(",") + [EVENT_HOMEASSISTANT_STOP]
async def forward_events(event):
@ -225,8 +224,7 @@ class APIEntityStateView(HomeAssistantView):
if not user.permissions.check_entity(entity_id, POLICY_READ):
raise Unauthorized(entity_id=entity_id)
state = request.app["hass"].states.get(entity_id)
if state:
if state := request.app["hass"].states.get(entity_id):
return self.json(state)
return self.json_message("Entity not found.", HTTPStatus.NOT_FOUND)
@ -240,9 +238,7 @@ class APIEntityStateView(HomeAssistantView):
except ValueError:
return self.json_message("Invalid JSON specified.", HTTPStatus.BAD_REQUEST)
new_state = data.get("state")
if new_state is None:
if (new_state := data.get("state")) is None:
return self.json_message("No state specified.", HTTPStatus.BAD_REQUEST)
attributes = data.get("attributes")

View File

@ -270,9 +270,7 @@ class TokenView(HomeAssistantView):
# 2.2 The authorization server responds with HTTP status code 200
# if the token has been revoked successfully or if the client
# submitted an invalid token.
token = data.get("token")
if token is None:
if (token := data.get("token")) is None:
return web.Response(status=HTTPStatus.OK)
refresh_token = await hass.auth.async_get_refresh_token_by_token(token)
@ -292,9 +290,7 @@ class TokenView(HomeAssistantView):
status_code=HTTPStatus.BAD_REQUEST,
)
code = data.get("code")
if code is None:
if (code := data.get("code")) is None:
return self.json(
{"error": "invalid_request", "error_description": "Invalid code"},
status_code=HTTPStatus.BAD_REQUEST,
@ -349,9 +345,7 @@ class TokenView(HomeAssistantView):
status_code=HTTPStatus.BAD_REQUEST,
)
token = data.get("refresh_token")
if token is None:
if (token := data.get("refresh_token")) is None:
return self.json(
{"error": "invalid_request"}, status_code=HTTPStatus.BAD_REQUEST
)

View File

@ -130,8 +130,7 @@ def _prepare_result_json(result):
data = result.copy()
schema = data["data_schema"]
if schema is None:
if (schema := data["data_schema"]) is None:
data["data_schema"] = []
else:
data["data_schema"] = voluptuous_serialize.convert(schema)

View File

@ -70,8 +70,7 @@ def websocket_setup_mfa(
"""Return a setup flow for mfa auth module."""
flow_manager = hass.data[DATA_SETUP_FLOW_MGR]
flow_id = msg.get("flow_id")
if flow_id is not None:
if (flow_id := msg.get("flow_id")) is not None:
result = await flow_manager.async_configure(flow_id, msg.get("user_input"))
connection.send_message(
websocket_api.result_message(msg["id"], _prepare_result_json(result))
@ -139,8 +138,7 @@ def _prepare_result_json(result):
data = result.copy()
schema = data["data_schema"]
if schema is None:
if (schema := data["data_schema"]) is None:
data["data_schema"] = []
else:
data["data_schema"] = voluptuous_serialize.convert(schema)

View File

@ -460,8 +460,7 @@ class AutomationEntity(ToggleEntity, RestoreEntity):
self._trace_config,
) as automation_trace:
this = None
state = self.hass.states.get(self.entity_id)
if state:
if state := self.hass.states.get(self.entity_id):
this = state.as_dict()
variables = {"this": this, **(run_variables or {})}
if self._variables:
@ -589,8 +588,7 @@ class AutomationEntity(ToggleEntity, RestoreEntity):
this = None
self.async_write_ha_state()
state = self.hass.states.get(self.entity_id)
if state:
if state := self.hass.states.get(self.entity_id):
this = state.as_dict()
variables = {"this": this}
if self._trigger_variables:

View File

@ -30,9 +30,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
if cur_state is None:
if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id)
return

View File

@ -272,9 +272,7 @@ async def async_get_still_stream(
def _get_camera_from_entity_id(hass: HomeAssistant, entity_id: str) -> Camera:
"""Get camera component from entity_id."""
component = hass.data.get(DOMAIN)
if component is None:
if (component := hass.data.get(DOMAIN)) is None:
raise HomeAssistantError("Camera integration not set up")
camera = component.get_entity(entity_id)
@ -653,8 +651,7 @@ class CameraMjpegStream(CameraView):
async def handle(self, request: web.Request, camera: Camera) -> web.StreamResponse:
"""Serve camera stream, possibly with interval."""
interval_str = request.query.get("interval")
if interval_str is None:
if (interval_str := request.query.get("interval")) is None:
stream = await camera.handle_async_mjpeg_stream(request)
if stream is None:
raise web.HTTPBadGateway()

View File

@ -118,9 +118,7 @@ async def async_attach_trigger(
automation_info: AutomationTriggerInfo,
) -> CALLBACK_TYPE:
"""Attach a trigger."""
trigger_type = config[CONF_TYPE]
if trigger_type == "hvac_mode_changed":
if (trigger_type := config[CONF_TYPE]) == "hvac_mode_changed":
state_config = {
state_trigger.CONF_PLATFORM: "state",
state_trigger.CONF_ENTITY_ID: config[CONF_ENTITY_ID],

View File

@ -41,9 +41,7 @@ async def async_provide_implementation(hass: HomeAssistant, domain: str):
async def _get_services(hass):
"""Get the available services."""
services = hass.data.get(DATA_SERVICES)
if services is not None:
if (services := hass.data.get(DATA_SERVICES)) is not None:
return services
try:

View File

@ -216,9 +216,7 @@ class CloudPreferences:
@property
def remote_enabled(self):
"""Return if remote is enabled on start."""
enabled = self._prefs.get(PREF_ENABLE_REMOTE, False)
if not enabled:
if not self._prefs.get(PREF_ENABLE_REMOTE, False):
return False
if self._has_local_trusted_network or self._has_local_trusted_proxies:
@ -307,9 +305,7 @@ class CloudPreferences:
async def _load_cloud_user(self) -> User | None:
"""Load cloud user if available."""
user_id = self._prefs.get(PREF_CLOUD_USER)
if user_id is None:
if (user_id := self._prefs.get(PREF_CLOUD_USER)) is None:
return None
# Fetch the user. It can happen that the user no longer exists if

View File

@ -15,14 +15,10 @@ SUPPORT_LANGUAGES = list({key[0] for key in MAP_VOICE})
def validate_lang(value):
"""Validate chosen gender or language."""
lang = value.get(CONF_LANG)
if lang is None:
if (lang := value.get(CONF_LANG)) is None:
return value
gender = value.get(CONF_GENDER)
if gender is None:
if (gender := value.get(CONF_GENDER)) is None:
gender = value[CONF_GENDER] = next(
(chk_gender for chk_lang, chk_gender in MAP_VOICE if chk_lang == lang), None
)

View File

@ -50,9 +50,8 @@ async def websocket_get_entity(hass, connection, msg):
Async friendly.
"""
registry = await async_get_registry(hass)
entry = registry.entities.get(msg["entity_id"])
if entry is None:
if (entry := registry.entities.get(msg["entity_id"])) is None:
connection.send_message(
websocket_api.error_message(msg["id"], ERR_NOT_FOUND, "Entity not found")
)

View File

@ -80,8 +80,7 @@ class ZWaveConfigWriteView(HomeAssistantView):
def post(self, request):
"""Save cache configuration to zwcfg_xxxxx.xml."""
hass = request.app["hass"]
network = hass.data.get(const.DATA_NETWORK)
if network is None:
if (network := hass.data.get(const.DATA_NETWORK)) is None:
return self.json_message(
"No Z-Wave network data found", HTTPStatus.NOT_FOUND
)
@ -131,8 +130,7 @@ class ZWaveNodeGroupView(HomeAssistantView):
nodeid = int(node_id)
hass = request.app["hass"]
network = hass.data.get(const.DATA_NETWORK)
node = network.nodes.get(nodeid)
if node is None:
if (node := network.nodes.get(nodeid)) is None:
return self.json_message("Node not found", HTTPStatus.NOT_FOUND)
groupdata = node.groups
groups = {}
@ -158,8 +156,7 @@ class ZWaveNodeConfigView(HomeAssistantView):
nodeid = int(node_id)
hass = request.app["hass"]
network = hass.data.get(const.DATA_NETWORK)
node = network.nodes.get(nodeid)
if node is None:
if (node := network.nodes.get(nodeid)) is None:
return self.json_message("Node not found", HTTPStatus.NOT_FOUND)
config = {}
for value in node.get_values(
@ -189,8 +186,7 @@ class ZWaveUserCodeView(HomeAssistantView):
nodeid = int(node_id)
hass = request.app["hass"]
network = hass.data.get(const.DATA_NETWORK)
node = network.nodes.get(nodeid)
if node is None:
if (node := network.nodes.get(nodeid)) is None:
return self.json_message("Node not found", HTTPStatus.NOT_FOUND)
usercodes = {}
if not node.has_command_class(const.COMMAND_CLASS_USER_CODE):
@ -220,8 +216,7 @@ class ZWaveProtectionView(HomeAssistantView):
def _fetch_protection():
"""Get protection data."""
node = network.nodes.get(nodeid)
if node is None:
if (node := network.nodes.get(nodeid)) is None:
return self.json_message("Node not found", HTTPStatus.NOT_FOUND)
protection_options = {}
if not node.has_command_class(const.COMMAND_CLASS_PROTECTION):

View File

@ -65,9 +65,7 @@ def async_request_config(
if description_image is not None:
description += f"\n\n![Description image]({description_image})"
instance = hass.data.get(_KEY_INSTANCE)
if instance is None:
if (instance := hass.data.get(_KEY_INSTANCE)) is None:
instance = hass.data[_KEY_INSTANCE] = Configurator(hass)
request_id = instance.async_request_config(

View File

@ -30,9 +30,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
if cur_state is None:
if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id)
return

View File

@ -42,9 +42,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
if cur_state is None:
if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id)
return