Avoid reschedule churn in Storage.async_delay_save (#111091)
* Avoid circular import in Storage.async_delay_save We call Storage.async_delay_save for every entity being added or removed from the registry. The late import took more time than everything else in the function. * Avoid reschedule churn in Storage.async_delay_save When we are adding or removing entities we will call async_delay_save quite often which has to add and remove a TimerHandle on the event loop which can add up when there are a lot of registry items changing. If the timer handle still has 80% of the time remaining on it we will avoid resceduling and let it fire at the time the original async_delay_save call was made. This ensures we do not force the event loop to rebuild its heapq because too many timer handlers were cancelled at once * div0 * add coverage for 0 since we had none * fix bad conflict * tweaks * tweaks * tweaks * tweaks * tweaks * tweaks * more test fixes * mqtt tests rely on event loop overheadpull/111258/head
parent
ff0e0b3e77
commit
5b8591ec7e
|
@ -42,6 +42,7 @@ _LOGGER = logging.getLogger(__name__)
|
|||
|
||||
STORAGE_SEMAPHORE = "storage_semaphore"
|
||||
|
||||
|
||||
_T = TypeVar("_T", bound=Mapping[str, Any] | Sequence[Any])
|
||||
|
||||
|
||||
|
@ -108,13 +109,14 @@ class Store(Generic[_T]):
|
|||
self.hass = hass
|
||||
self._private = private
|
||||
self._data: dict[str, Any] | None = None
|
||||
self._unsub_delay_listener: asyncio.TimerHandle | None = None
|
||||
self._delay_handle: asyncio.TimerHandle | None = None
|
||||
self._unsub_final_write_listener: CALLBACK_TYPE | None = None
|
||||
self._write_lock = asyncio.Lock()
|
||||
self._load_task: asyncio.Future[_T | None] | None = None
|
||||
self._encoder = encoder
|
||||
self._atomic_writes = atomic_writes
|
||||
self._read_only = read_only
|
||||
self._next_write_time = 0.0
|
||||
|
||||
@cached_property
|
||||
def path(self):
|
||||
|
@ -286,6 +288,11 @@ class Store(Generic[_T]):
|
|||
"data_func": data_func,
|
||||
}
|
||||
|
||||
next_when = self.hass.loop.time() + delay
|
||||
if self._delay_handle and self._delay_handle.when() < next_when:
|
||||
self._next_write_time = next_when
|
||||
return
|
||||
|
||||
self._async_cleanup_delay_listener()
|
||||
self._async_ensure_final_write_listener()
|
||||
|
||||
|
@ -293,13 +300,24 @@ class Store(Generic[_T]):
|
|||
return
|
||||
|
||||
# We use call_later directly here to avoid a circular import
|
||||
self._unsub_delay_listener = self.hass.loop.call_later(
|
||||
delay, self._async_schedule_callback_delayed_write
|
||||
self._async_reschedule_delayed_write(next_when)
|
||||
|
||||
@callback
|
||||
def _async_reschedule_delayed_write(self, when: float) -> None:
|
||||
"""Reschedule a delayed write."""
|
||||
self._delay_handle = self.hass.loop.call_at(
|
||||
when, self._async_schedule_callback_delayed_write
|
||||
)
|
||||
|
||||
@callback
|
||||
def _async_schedule_callback_delayed_write(self) -> None:
|
||||
"""Schedule the delayed write in a task."""
|
||||
if self.hass.loop.time() < self._next_write_time:
|
||||
# Timer fired too early because there were multiple
|
||||
# calls to async_delay_save before the first one
|
||||
# wrote. Reschedule the timer to the next write time.
|
||||
self._async_reschedule_delayed_write(self._next_write_time)
|
||||
return
|
||||
self.hass.async_create_task(self._async_callback_delayed_write())
|
||||
|
||||
@callback
|
||||
|
@ -320,9 +338,9 @@ class Store(Generic[_T]):
|
|||
@callback
|
||||
def _async_cleanup_delay_listener(self) -> None:
|
||||
"""Clean up a delay listener."""
|
||||
if self._unsub_delay_listener is not None:
|
||||
self._unsub_delay_listener.cancel()
|
||||
self._unsub_delay_listener = None
|
||||
if self._delay_handle is not None:
|
||||
self._delay_handle.cancel()
|
||||
self._delay_handle = None
|
||||
|
||||
async def _async_callback_delayed_write(self) -> None:
|
||||
"""Handle a delayed write callback."""
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
"""The tests for Home Assistant frontend."""
|
||||
from datetime import timedelta
|
||||
from http import HTTPStatus
|
||||
import re
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.frontend import (
|
||||
|
@ -20,7 +20,6 @@ from homeassistant.components.websocket_api.const import TYPE_RESULT
|
|||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.loader import async_get_integration
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from tests.common import MockUser, async_capture_events, async_fire_time_changed
|
||||
from tests.typing import MockHAClientWebSocket, WebSocketGenerator
|
||||
|
@ -220,7 +219,10 @@ async def test_themes_persist(
|
|||
|
||||
|
||||
async def test_themes_save_storage(
|
||||
hass: HomeAssistant, hass_storage: dict[str, Any], frontend_themes
|
||||
hass: HomeAssistant,
|
||||
hass_storage: dict[str, Any],
|
||||
freezer: FrozenDateTimeFactory,
|
||||
frontend_themes,
|
||||
) -> None:
|
||||
"""Test that theme settings are restores after restart."""
|
||||
|
||||
|
@ -233,7 +235,8 @@ async def test_themes_save_storage(
|
|||
)
|
||||
|
||||
# To trigger the call_later
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=60))
|
||||
freezer.tick(60.0)
|
||||
async_fire_time_changed(hass)
|
||||
# To execute the save
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
|
|
@ -1400,6 +1400,8 @@ async def test_replaying_payload_same_topic(
|
|||
hass, "test/state", "online", qos=0, retain=True
|
||||
) # Simulate a (retained) message played back
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(calls_a) == 1
|
||||
mqtt_client_mock.subscribe.assert_called()
|
||||
calls_a = []
|
||||
|
@ -1498,6 +1500,7 @@ async def test_replaying_payload_after_resubscribing(
|
|||
await hass.async_block_till_done()
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
mqtt_client_mock.subscribe.assert_called()
|
||||
|
||||
# Simulate a (retained) message played back
|
||||
|
@ -1638,6 +1641,7 @@ async def test_not_calling_unsubscribe_with_active_subscribers(
|
|||
await hass.async_block_till_done()
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||
await hass.async_block_till_done()
|
||||
await hass.async_block_till_done()
|
||||
assert mqtt_client_mock.subscribe.called
|
||||
|
||||
unsub()
|
||||
|
|
|
@ -6,6 +6,7 @@ import os
|
|||
from typing import Any, NamedTuple
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import py
|
||||
import pytest
|
||||
|
||||
|
@ -19,7 +20,11 @@ from homeassistant.helpers import issue_registry as ir, storage
|
|||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.color import RGBColor
|
||||
|
||||
from tests.common import async_fire_time_changed, async_test_home_assistant
|
||||
from tests.common import (
|
||||
async_fire_time_changed,
|
||||
async_fire_time_changed_exact,
|
||||
async_test_home_assistant,
|
||||
)
|
||||
|
||||
MOCK_VERSION = 1
|
||||
MOCK_VERSION_2 = 2
|
||||
|
@ -115,7 +120,7 @@ async def test_loading_parallel(
|
|||
|
||||
|
||||
async def test_saving_with_delay(
|
||||
hass: HomeAssistant, store, hass_storage: dict[str, Any]
|
||||
hass: HomeAssistant, store: storage.Store, hass_storage: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test saving data after a delay."""
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
|
@ -131,6 +136,88 @@ async def test_saving_with_delay(
|
|||
}
|
||||
|
||||
|
||||
async def test_saving_with_delay_churn_reduction(
|
||||
hass: HomeAssistant,
|
||||
store: storage.Store,
|
||||
hass_storage: dict[str, Any],
|
||||
freezer: FrozenDateTimeFactory,
|
||||
) -> None:
|
||||
"""Test saving data after a delay with timer churn reduction."""
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
assert store.key not in hass_storage
|
||||
|
||||
freezer.tick(0.2)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
|
||||
freezer.tick(1)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert hass_storage[store.key] == {
|
||||
"version": MOCK_VERSION,
|
||||
"minor_version": 1,
|
||||
"key": MOCK_KEY,
|
||||
"data": MOCK_DATA,
|
||||
}
|
||||
|
||||
del hass_storage[store.key]
|
||||
# Simulate what some of the registries do when they add 100 entities
|
||||
for _ in range(100):
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
|
||||
freezer.tick(0.2)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
|
||||
freezer.tick(1)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key in hass_storage
|
||||
|
||||
del hass_storage[store.key]
|
||||
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
freezer.tick(0.5)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
freezer.tick(0.8)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
freezer.tick(0.8)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
|
||||
freezer.tick(0.2)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key in hass_storage
|
||||
|
||||
# Make sure if we do another delayed save
|
||||
# and one with a shorter delay, the shorter delay wins
|
||||
del hass_storage[store.key]
|
||||
store.async_delay_save(lambda: MOCK_DATA, 2)
|
||||
freezer.tick(0.2)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key not in hass_storage
|
||||
|
||||
store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
freezer.tick(1.0)
|
||||
async_fire_time_changed_exact(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key in hass_storage
|
||||
|
||||
|
||||
async def test_saving_on_final_write(
|
||||
hass: HomeAssistant, hass_storage: dict[str, Any]
|
||||
) -> None:
|
||||
|
@ -281,6 +368,23 @@ async def test_multiple_delay_save_calls(
|
|||
assert data == {"delay": "no"}
|
||||
|
||||
|
||||
async def test_delay_save_zero(
|
||||
hass: HomeAssistant, store: storage.Store, hass_storage: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test async_delay_save accepts 0."""
|
||||
store.async_delay_save(lambda: {"delay": "0"}, 0)
|
||||
# sleep is to run one event loop to get the task scheduled
|
||||
await asyncio.sleep(0)
|
||||
await hass.async_block_till_done()
|
||||
assert store.key in hass_storage
|
||||
assert hass_storage[store.key] == {
|
||||
"version": MOCK_VERSION,
|
||||
"minor_version": 1,
|
||||
"key": MOCK_KEY,
|
||||
"data": {"delay": "0"},
|
||||
}
|
||||
|
||||
|
||||
async def test_multiple_save_calls(
|
||||
hass: HomeAssistant, store, hass_storage: dict[str, Any]
|
||||
) -> None:
|
||||
|
@ -706,7 +810,7 @@ async def test_os_error_is_fatal(tmpdir: py.path.local) -> None:
|
|||
|
||||
|
||||
async def test_read_only_store(
|
||||
hass: HomeAssistant, read_only_store, hass_storage: dict[str, Any]
|
||||
hass: HomeAssistant, read_only_store: storage.Store, hass_storage: dict[str, Any]
|
||||
) -> None:
|
||||
"""Test store opened in read only mode does not save."""
|
||||
read_only_store.async_delay_save(lambda: MOCK_DATA, 1)
|
||||
|
|
|
@ -8,6 +8,7 @@ import logging
|
|||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, Mock, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
|
@ -733,13 +734,13 @@ async def test_entries_excludes_ignore_and_disabled(
|
|||
]
|
||||
|
||||
|
||||
async def test_saving_and_loading(hass: HomeAssistant) -> None:
|
||||
async def test_saving_and_loading(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test that we're saving and loading correctly."""
|
||||
mock_integration(
|
||||
hass,
|
||||
MockModule(
|
||||
"test", async_setup_entry=lambda *args: AsyncMock(return_value=True)
|
||||
),
|
||||
MockModule("test", async_setup_entry=AsyncMock(return_value=True)),
|
||||
)
|
||||
mock_platform(hass, "test.config_flow", None)
|
||||
|
||||
|
@ -784,7 +785,8 @@ async def test_saving_and_loading(hass: HomeAssistant) -> None:
|
|||
)
|
||||
|
||||
# To trigger the call_later
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))
|
||||
freezer.tick(1.0)
|
||||
async_fire_time_changed(hass)
|
||||
# To execute the save
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
|
Loading…
Reference in New Issue