core/homeassistant/components/profiler/__init__.py

81 lines
2.4 KiB
Python

"""The profiler integration."""
import asyncio
import cProfile
import time
from pyprof2calltree import convert
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers.service import async_register_admin_service
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
# Name of the service, registered under DOMAIN, that starts a profiling run.
SERVICE_START = "start"
# Service-data key holding the profiling duration in seconds; the service
# schema coerces it to float and defaults it to 60.0.
CONF_SECONDS = "seconds"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the profiler component.

    Neither ``hass`` nor ``config`` is used here; the function performs no
    work and unconditionally reports success.

    Returns:
        Always ``True``.
    """
    return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
    """Set up Profiler from a config entry.

    Registers the ``SERVICE_START`` service under ``DOMAIN`` through
    ``async_register_admin_service``. The service accepts an optional
    ``CONF_SECONDS`` value (coerced to ``float``, default ``60.0``).

    Args:
        hass: The Home Assistant instance the service is registered on.
        entry: The config entry being set up (not otherwise used here).

    Returns:
        Always ``True``.
    """
    # Shared by every invocation of the handler below, so overlapping
    # service calls run their profiles one after another.
    profile_lock = asyncio.Lock()

    start_schema = vol.Schema(
        {vol.Optional(CONF_SECONDS, default=60.0): vol.Coerce(float)}
    )

    async def _handle_start(service_call: ServiceCall):
        """Generate a profile while holding the shared lock."""
        async with profile_lock:
            await _async_generate_profile(hass, service_call)

    async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_START,
        _handle_start,
        schema=start_schema,
    )
    return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
    """Unload a config entry.

    Removes the ``SERVICE_START`` service from ``DOMAIN``.

    Args:
        hass: The Home Assistant instance holding the service registry.
        entry: The config entry being unloaded (not otherwise used here).

    Returns:
        Always ``True``.
    """
    service_registry = hass.services
    service_registry.async_remove(domain=DOMAIN, service=SERVICE_START)
    return True
async def _async_generate_profile(hass: HomeAssistant, call: ServiceCall):
    """Profile the process for the requested duration and write the results.

    Creates a "Profile Started" persistent notification, collects cProfile
    data while sleeping for ``call.data[CONF_SECONDS]`` seconds, writes the
    data to two files in the configuration directory, then replaces the
    notification with a "Profile Complete" one naming both files.

    Args:
        hass: The Home Assistant instance; used for notifications, config
            path resolution and running the file writes in the executor.
        call: The service call; ``call.data[CONF_SECONDS]`` is the profiling
            duration in seconds.

    Raises:
        Whatever the wait raises (for example ``asyncio.CancelledError`` if
        the task is cancelled). In that case the profiler is still disabled,
        but no files are written and the notification is not updated.
    """
    # Microsecond timestamp; reused as the notification id and as the suffix
    # of both output file names so one run's artifacts share an identifier.
    start_time = int(time.time() * 1000000)
    hass.components.persistent_notification.async_create(
        "The profile started. This notification will be updated when it is complete.",
        title="Profile Started",
        notification_id=f"profiler_{start_time}",
    )
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        await asyncio.sleep(float(call.data[CONF_SECONDS]))
    finally:
        # Always stop collecting, even when the sleep is cancelled or the
        # duration is missing/invalid; otherwise the profiler would stay
        # enabled on this thread with nothing left to turn it off.
        profiler.disable()

    cprofile_path = hass.config.path(f"profile.{start_time}.cprof")
    callgrind_path = hass.config.path(f"callgrind.out.{start_time}")
    # The file writes are blocking, so they are handed to the executor.
    await hass.async_add_executor_job(
        _write_profile, profiler, cprofile_path, callgrind_path
    )
    # Same notification_id as above, so this updates the "started" notice.
    hass.components.persistent_notification.async_create(
        f"Wrote cProfile data to {cprofile_path} and callgrind data to {callgrind_path}",
        title="Profile Complete",
        notification_id=f"profiler_{start_time}",
    )
def _write_profile(profiler, cprofile_path, callgrind_path):
    """Write the collected profile data to disk in two formats.

    Called through ``hass.async_add_executor_job`` by the profile generator,
    so the file writes happen off the event loop.

    Args:
        profiler: The profiler object whose collected data is written.
        cprofile_path: Destination path for the cProfile stats dump.
        callgrind_path: Destination path for the callgrind conversion.
    """
    profiler.create_stats()
    profiler.dump_stats(cprofile_path)
    # Hand the raw stat entries to pyprof2calltree for the callgrind file.
    raw_entries = profiler.getstats()
    convert(raw_entries, callgrind_path)