Sonos lock subscription actions (#66204)
* Move exception logging to helper * Wrap subscription in lock * Rewrite subscribe method to use new logging helper * Mark entitites as unavailable sooner to avoid unnecessary polls * Rename unclear method and update docstring * Move lock to unsubscribepull/66304/head
parent
7d70b0a16b
commit
9fd8428254
|
@ -175,7 +175,7 @@ class SonosSpeaker:
|
|||
# Subscriptions and events
|
||||
self.subscriptions_failed: bool = False
|
||||
self._subscriptions: list[SubscriptionBase] = []
|
||||
self._resubscription_lock: asyncio.Lock | None = None
|
||||
self._subscription_lock: asyncio.Lock | None = None
|
||||
self._event_dispatchers: dict[str, Callable] = {}
|
||||
self._last_activity: float = NEVER_TIME
|
||||
self._last_event_cache: dict[str, Any] = {}
|
||||
|
@ -343,8 +343,41 @@ class SonosSpeaker:
|
|||
#
|
||||
# Subscription handling and event dispatchers
|
||||
#
|
||||
async def async_subscribe(self) -> bool:
|
||||
"""Initiate event subscriptions."""
|
||||
def log_subscription_result(
|
||||
self, result: Any, event: str, level: str = logging.DEBUG
|
||||
) -> None:
|
||||
"""Log a message if a subscription action (create/renew/stop) results in an exception."""
|
||||
if not isinstance(result, Exception):
|
||||
return
|
||||
|
||||
if isinstance(result, asyncio.exceptions.TimeoutError):
|
||||
message = "Request timed out"
|
||||
exc_info = None
|
||||
else:
|
||||
message = result
|
||||
exc_info = result if not str(result) else None
|
||||
|
||||
_LOGGER.log(
|
||||
level,
|
||||
"%s failed for %s: %s",
|
||||
event,
|
||||
self.zone_name,
|
||||
message,
|
||||
exc_info=exc_info,
|
||||
)
|
||||
|
||||
async def async_subscribe(self) -> None:
|
||||
"""Initiate event subscriptions under an async lock."""
|
||||
if not self._subscription_lock:
|
||||
self._subscription_lock = asyncio.Lock()
|
||||
|
||||
async with self._subscription_lock:
|
||||
if self._subscriptions:
|
||||
return
|
||||
await self._async_subscribe()
|
||||
|
||||
async def _async_subscribe(self) -> None:
|
||||
"""Create event subscriptions."""
|
||||
_LOGGER.debug("Creating subscriptions for %s", self.zone_name)
|
||||
|
||||
# Create a polling task in case subscriptions fail or callback events do not arrive
|
||||
|
@ -359,24 +392,15 @@ class SonosSpeaker:
|
|||
SCAN_INTERVAL,
|
||||
)
|
||||
|
||||
try:
|
||||
await self.hass.async_add_executor_job(self.set_basic_info)
|
||||
|
||||
if self._subscriptions:
|
||||
raise RuntimeError(
|
||||
f"Attempted to attach subscriptions to player: {self.soco} "
|
||||
f"when existing subscriptions exist: {self._subscriptions}"
|
||||
)
|
||||
|
||||
subscriptions = [
|
||||
self._subscribe(getattr(self.soco, service), self.async_dispatch_event)
|
||||
for service in SUBSCRIPTION_SERVICES
|
||||
]
|
||||
await asyncio.gather(*subscriptions)
|
||||
except SoCoException as ex:
|
||||
_LOGGER.warning("Could not connect %s: %s", self.zone_name, ex)
|
||||
return False
|
||||
return True
|
||||
subscriptions = [
|
||||
self._subscribe(getattr(self.soco, service), self.async_dispatch_event)
|
||||
for service in SUBSCRIPTION_SERVICES
|
||||
]
|
||||
results = await asyncio.gather(*subscriptions, return_exceptions=True)
|
||||
for result in results:
|
||||
self.log_subscription_result(
|
||||
result, "Creating subscription", logging.WARNING
|
||||
)
|
||||
|
||||
async def _subscribe(
|
||||
self, target: SubscriptionBase, sub_callback: Callable
|
||||
|
@ -399,49 +423,24 @@ class SonosSpeaker:
|
|||
return_exceptions=True,
|
||||
)
|
||||
for result in results:
|
||||
if isinstance(result, asyncio.exceptions.TimeoutError):
|
||||
message = "Request timed out"
|
||||
exc_info = None
|
||||
elif isinstance(result, Exception):
|
||||
message = result
|
||||
exc_info = result if not str(result) else None
|
||||
else:
|
||||
continue
|
||||
_LOGGER.debug(
|
||||
"Unsubscribe failed for %s: %s",
|
||||
self.zone_name,
|
||||
message,
|
||||
exc_info=exc_info,
|
||||
)
|
||||
self.log_subscription_result(result, "Unsubscribe")
|
||||
self._subscriptions = []
|
||||
|
||||
@callback
|
||||
def async_renew_failed(self, exception: Exception) -> None:
|
||||
"""Handle a failed subscription renewal."""
|
||||
self.hass.async_create_task(self.async_resubscribe(exception))
|
||||
self.hass.async_create_task(self._async_renew_failed(exception))
|
||||
|
||||
async def async_resubscribe(self, exception: Exception) -> None:
|
||||
"""Attempt to resubscribe when a renewal failure is detected."""
|
||||
if not self._resubscription_lock:
|
||||
self._resubscription_lock = asyncio.Lock()
|
||||
async def _async_renew_failed(self, exception: Exception) -> None:
|
||||
"""Mark the speaker as offline after a subscription renewal failure.
|
||||
|
||||
async with self._resubscription_lock:
|
||||
if not self.available:
|
||||
return
|
||||
This is to reset the state to allow a future clean subscription attempt.
|
||||
"""
|
||||
if not self.available:
|
||||
return
|
||||
|
||||
if isinstance(exception, asyncio.exceptions.TimeoutError):
|
||||
message = "Request timed out"
|
||||
exc_info = None
|
||||
else:
|
||||
message = exception
|
||||
exc_info = exception if not str(exception) else None
|
||||
_LOGGER.warning(
|
||||
"Subscription renewals for %s failed, marking unavailable: %s",
|
||||
self.zone_name,
|
||||
message,
|
||||
exc_info=exc_info,
|
||||
)
|
||||
await self.async_offline()
|
||||
self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
|
||||
await self.async_offline()
|
||||
|
||||
@callback
|
||||
def async_dispatch_event(self, event: SonosEvent) -> None:
|
||||
|
@ -576,17 +575,22 @@ class SonosSpeaker:
|
|||
|
||||
async def async_offline(self) -> None:
|
||||
"""Handle removal of speaker when unavailable."""
|
||||
if not self.available:
|
||||
return
|
||||
|
||||
self.available = False
|
||||
self.async_write_entity_states()
|
||||
|
||||
self._share_link_plugin = None
|
||||
|
||||
if self._poll_timer:
|
||||
self._poll_timer()
|
||||
self._poll_timer = None
|
||||
|
||||
await self.async_unsubscribe()
|
||||
async with self._subscription_lock:
|
||||
await self.async_unsubscribe()
|
||||
|
||||
self.hass.data[DATA_SONOS].discovery_known.discard(self.soco.uid)
|
||||
self.async_write_entity_states()
|
||||
|
||||
async def async_vanished(self, reason: str) -> None:
|
||||
"""Handle removal of speaker when marked as vanished."""
|
||||
|
|
Loading…
Reference in New Issue