core/homeassistant/components/google/__init__.py

458 lines
15 KiB
Python

"""Support for Google - Calendar Event Devices."""
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from enum import Enum
import logging
import os
from typing import Any
from googleapiclient import discovery as google_discovery
import httplib2
from oauth2client.client import (
FlowExchangeError,
OAuth2DeviceCodeError,
OAuth2WebServerFlow,
)
from oauth2client.file import Storage
import voluptuous as vol
from voluptuous.error import Error as VoluptuousError
import yaml
from homeassistant.components import persistent_notification
from homeassistant.const import (
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
CONF_DEVICE_ID,
CONF_ENTITIES,
CONF_NAME,
CONF_OFFSET,
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import convert
# Logger for this module.
_LOGGER = logging.getLogger(__name__)
# Integration domain; CONFIG_SCHEMA looks for this integration's settings
# under this key and services are registered under it.
DOMAIN = "google"
# Entity-id template for the domain (not referenced in the code visible here).
ENTITY_ID_FORMAT = DOMAIN + ".{}"
# --- Configuration keys -----------------------------------------------------
# CONF_TRACK_NEW and CONF_CALENDAR_ACCESS are options of CONFIG_SCHEMA;
# CONF_CAL_ID is the calendar id key of DEVICE_SCHEMA; the remaining keys are
# per-entity options accepted by _SINGLE_CALSEARCH_CONFIG.
CONF_TRACK_NEW = "track_new_calendar"
CONF_CAL_ID = "cal_id"
CONF_TRACK = "track"
CONF_SEARCH = "search"
CONF_IGNORE_AVAILABILITY = "ignore_availability"
CONF_MAX_RESULTS = "max_results"
CONF_CALENDAR_ACCESS = "calendar_access"
# Fallback used by do_setup when CONF_TRACK_NEW is not configured.
DEFAULT_CONF_TRACK_NEW = True
# NOTE(review): not referenced in the code visible here; presumably the default
# offset marker consumed by the calendar platform -- confirm.
DEFAULT_CONF_OFFSET = "!!"
# --- Field names of the add_event service payload ----------------------------
# Used as keys in ADD_EVENT_SERVICE_SCHEMA and read back in the add_event
# handler. EVENT_START_CONF / EVENT_END_CONF / EVENT_TYPES_CONF are the names
# of voluptuous exclusion groups, not payload keys.
EVENT_CALENDAR_ID = "calendar_id"
EVENT_DESCRIPTION = "description"
EVENT_END_CONF = "end"
EVENT_END_DATE = "end_date"
EVENT_END_DATETIME = "end_date_time"
EVENT_IN = "in"
EVENT_IN_DAYS = "days"
EVENT_IN_WEEKS = "weeks"
EVENT_START_CONF = "start"
EVENT_START_DATE = "start_date"
EVENT_START_DATETIME = "start_date_time"
EVENT_SUMMARY = "summary"
EVENT_TYPES_CONF = "event_types"
# Id and title shared by every persistent notification raised during
# authentication, so each new notification replaces the previous one.
NOTIFICATION_ID = "google_calendar_notification"
NOTIFICATION_TITLE = "Google Calendar Setup"
# Not referenced in the code visible here.
GROUP_NAME_ALL_CALENDARS = "Google Calendar Sensors"
# Names of the services registered under DOMAIN by setup_services.
SERVICE_SCAN_CALENDARS = "scan_for_calendars"
SERVICE_FOUND_CALENDARS = "found_calendar"
SERVICE_ADD_EVENT = "add_event"
# Key in hass.data holding the mapping of calendar id -> calendar config.
DATA_INDEX = "google_calendars"
# File names, resolved against the Home Assistant config directory via
# hass.config.path(): the known-calendars YAML file and the stored OAuth token.
YAML_DEVICES = f"{DOMAIN}_calendars.yaml"
TOKEN_FILE = f".{DOMAIN}.token"
class FeatureAccess(Enum):
    """Access levels the integration can request from Google Calendar.

    Each member's value is the OAuth scope URL for that access level; the
    same URL is available through the read-only ``scope`` property.
    """

    read_only = "https://www.googleapis.com/auth/calendar.readonly"
    read_write = "https://www.googleapis.com/auth/calendar"

    def __init__(self, scope: str) -> None:
        """Remember the member's value (the scope URL) on the instance."""
        self._scope = scope

    @property
    def scope(self) -> str:
        """Return the OAuth scope URL associated with this access level."""
        return self._scope
# Schema for the integration's section of the Home Assistant configuration.
# Keys belonging to other integrations are tolerated at the top level
# (ALLOW_EXTRA); the section under DOMAIN itself is strict.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                # OAuth client credentials; both are mandatory.
                vol.Required(CONF_CLIENT_ID): cv.string,
                vol.Required(CONF_CLIENT_SECRET): cv.string,
                # Whether newly discovered calendars are tracked; when the
                # key is absent do_setup falls back to DEFAULT_CONF_TRACK_NEW.
                vol.Optional(CONF_TRACK_NEW): cv.boolean,
                # Requested access level, validated against FeatureAccess.
                vol.Optional(
                    CONF_CALENDAR_ACCESS, default="read_write"
                ): cv.enum(FeatureAccess),
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)
# Schema for one entity entry listed under a calendar in the devices YAML.
# The deprecation marker for CONF_MAX_RESULTS runs first, then the entry is
# validated against the strict key schema below.
_SINGLE_CALSEARCH_CONFIG = vol.All(
    cv.deprecated(CONF_MAX_RESULTS),
    vol.Schema(
        {
            # Mandatory identification of the entity.
            vol.Required(CONF_NAME): cv.string,
            vol.Required(CONF_DEVICE_ID): cv.string,
            # Optional per-entity settings.
            vol.Optional(CONF_IGNORE_AVAILABILITY, default=True): cv.boolean,
            vol.Optional(CONF_OFFSET): cv.string,
            vol.Optional(CONF_SEARCH): cv.string,
            vol.Optional(CONF_TRACK): cv.boolean,
            # Still accepted so existing files validate, but no longer used.
            vol.Optional(CONF_MAX_RESULTS): cv.positive_int,
        }
    ),
)
# Schema for one calendar record: its Google calendar id plus the entities
# derived from it. Unknown keys on the record are kept (ALLOW_EXTRA).
DEVICE_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_CAL_ID): cv.string,
        # The second argument of vol.Required is the custom error message;
        # None keeps voluptuous' default message.
        vol.Required(CONF_ENTITIES, msg=None): vol.All(
            # A single entity mapping is accepted and wrapped into a list.
            cv.ensure_list,
            [_SINGLE_CALSEARCH_CONFIG],
        ),
    },
    extra=vol.ALLOW_EXTRA,
)
# Schema for the relative "in" offset of the add_event service: a positive
# number of days or of weeks. Both keys share one exclusion group, so at most
# one of them may be supplied.
_EVENT_IN_TYPES = vol.Schema(
    {
        vol.Exclusive(unit, EVENT_TYPES_CONF): cv.positive_int
        for unit in (EVENT_IN_DAYS, EVENT_IN_WEEKS)
    }
)
# Payload schema of the add_event service.
#
# The start of the event can be given as a date, a datetime or a relative
# "in" offset; these three keys share the EVENT_START_CONF exclusion group.
# The end can be given as a date or a datetime (EVENT_END_CONF group).
#
# NOTE(review): nothing here requires that a start is supplied, nor that an
# end accompanies an explicit start -- the service handler indexes the end
# key directly, so a missing end surfaces there as a KeyError.
ADD_EVENT_SERVICE_SCHEMA = vol.Schema(
    {
        vol.Required(EVENT_CALENDAR_ID): cv.string,
        vol.Required(EVENT_SUMMARY): cv.string,
        vol.Optional(EVENT_DESCRIPTION, default=""): cv.string,
        # All-day event boundaries.
        vol.Exclusive(EVENT_START_DATE, EVENT_START_CONF): cv.date,
        vol.Exclusive(EVENT_END_DATE, EVENT_END_CONF): cv.date,
        # Timed event boundaries.
        vol.Exclusive(EVENT_START_DATETIME, EVENT_START_CONF): cv.datetime,
        vol.Exclusive(EVENT_END_DATETIME, EVENT_END_CONF): cv.datetime,
        # NOTE(review): the third argument of vol.Exclusive is the error
        # message, so EVENT_END_CONF is used as message text here and "in" is
        # only exclusive with the start keys -- confirm this is intended.
        vol.Exclusive(
            EVENT_IN, EVENT_START_CONF, msg=EVENT_END_CONF
        ): _EVENT_IN_TYPES,
    }
)
def do_authentication(
    hass: HomeAssistant, hass_config: ConfigType, config: ConfigType
) -> bool:
    """Run the OAuth device flow and finish setup once the user authorizes.

    A persistent notification shows the verification URL and user code.
    A time listener then polls Google at the interval given by the device
    flow until either the code is exchanged for credentials (which are stored
    in the token file and followed by ``do_setup``) or the code expires.

    Args:
        hass: The Home Assistant instance.
        hass_config: The full Home Assistant configuration, forwarded to
            ``do_setup``.
        config: This integration's validated configuration section.

    Returns:
        False when the device/user codes could not be obtained, True once
        polling has been scheduled (authorization itself completes later).
    """
    oauth = OAuth2WebServerFlow(
        client_id=config[CONF_CLIENT_ID],
        client_secret=config[CONF_CLIENT_SECRET],
        scope=config[CONF_CALENDAR_ACCESS].scope,
        redirect_uri="Home-Assistant.io",
    )
    try:
        dev_flow = oauth.step1_get_device_and_user_codes()
    except OAuth2DeviceCodeError as err:
        persistent_notification.create(
            hass,
            f"Error: {err}<br />You will need to restart hass after fixing.",
            title=NOTIFICATION_TITLE,
            notification_id=NOTIFICATION_ID,
        )
        return False

    persistent_notification.create(
        hass,
        (
            f"In order to authorize Home-Assistant to view your calendars "
            f'you must visit: <a href="{dev_flow.verification_url}" target="_blank">{dev_flow.verification_url}</a> and enter '
            f"code: {dev_flow.user_code}"
        ),
        title=NOTIFICATION_TITLE,
        notification_id=NOTIFICATION_ID,
    )

    def step2_exchange(now: datetime) -> None:
        """Keep trying to validate the user_code until it expires."""
        _LOGGER.debug("Attempting to validate user code")

        # For some reason, oauth.step1_get_device_and_user_codes() returns a
        # datetime object without tzinfo. For the comparison below to work,
        # it needs one.
        user_code_expiry = dev_flow.user_code_expiry.replace(tzinfo=timezone.utc)
        if now >= user_code_expiry:
            persistent_notification.create(
                hass,
                "Authentication code expired, please restart "
                "Home-Assistant and try again",
                title=NOTIFICATION_TITLE,
                notification_id=NOTIFICATION_ID,
            )
            listener()
            return

        try:
            credentials = oauth.step2_exchange(device_flow_info=dev_flow)
        except FlowExchangeError:
            # Not authorized yet; the next tick tries again.
            return

        storage = Storage(hass.config.path(TOKEN_FILE))
        storage.put(credentials)

        # Stop polling as soon as the credentials are stored. If setup below
        # fails, the listener must not keep re-submitting a device code that
        # has already been exchanged (and later report it as "expired").
        listener()
        do_setup(hass, hass_config, config)
        persistent_notification.create(
            hass,
            (
                f"We are all setup now. Check {YAML_DEVICES} for calendars that have "
                f"been found"
            ),
            title=NOTIFICATION_TITLE,
            notification_id=NOTIFICATION_ID,
        )

    # ``listener`` is the cancel callback; step2_exchange closes over it and
    # only runs after this assignment has happened.
    listener = track_utc_time_change(
        hass, step2_exchange, second=range(0, 60, dev_flow.interval)
    )
    return True
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Google integration from the Home Assistant configuration.

    Ensures the calendar index exists in ``hass.data``. When the integration
    has a configuration section, it either starts authentication (no token
    file yet, or the stored token lacks the requested scope) or proceeds
    straight to ``do_setup``.

    Always returns True.
    """
    hass.data.setdefault(DATA_INDEX, {})

    conf = config.get(DOMAIN, {})
    if not conf:
        # Nothing configured for calendars; per the original note the
        # component is then being set up by the tts platform.
        return True

    token_file = hass.config.path(TOKEN_FILE)
    if not os.path.isfile(token_file):
        _LOGGER.debug("Token file does not exist, authenticating for first time")
        do_authentication(hass, config, conf)
    elif not check_correct_scopes(hass, token_file, conf):
        _LOGGER.debug("Existing scopes are not sufficient, re-authenticating")
        do_authentication(hass, config, conf)
    else:
        do_setup(hass, config, conf)

    return True
def check_correct_scopes(
    hass: HomeAssistant, token_file: str, config: ConfigType
) -> bool:
    """Tell whether the stored token already grants the configured scope.

    Returns False when the token file yields no credentials or the
    credentials carry no scopes; otherwise returns whether the scope of the
    configured access level is among the stored scopes. ``hass`` is unused.
    """
    stored = Storage(token_file).get()
    granted = stored.scopes if stored else None
    if not granted:
        return False
    return config[CONF_CALENDAR_ACCESS].scope in granted
class GoogleCalendarService:
    """Factory for Google Calendar API clients backed by a stored token."""

    def __init__(self, token_file: str) -> None:
        """Remember the path of the token file to read credentials from."""
        self.token_file = token_file

    def get(self) -> google_discovery.Resource:
        """Build a Calendar v3 client from the credentials in the token file.

        The credentials are re-read from storage on every call.
        """
        credentials = Storage(self.token_file).get()
        authorized_http = credentials.authorize(httplib2.Http())
        return google_discovery.build(
            "calendar", "v3", http=authorized_http, cache_discovery=False
        )
def setup_services(
    hass: HomeAssistant,
    hass_config: ConfigType,
    config: ConfigType,
    track_new_found_calendars: bool,
    calendar_service: GoogleCalendarService,
) -> None:
    """Register the integration's services.

    ``found_calendar`` and ``scan_for_calendars`` are always registered;
    ``add_event`` only when the configured access level is read/write.
    """

    def _found_calendar(call: ServiceCall) -> None:
        """Record a not-yet-known calendar and load its calendar platform."""
        calendar = get_calendar_info(hass, call.data)
        cal_id = calendar[CONF_CAL_ID]
        known = hass.data[DATA_INDEX]
        if known.get(cal_id) is not None:
            # Already known; nothing to persist or load.
            return

        known[cal_id] = calendar
        update_config(hass.config.path(YAML_DEVICES), calendar)
        discovery.load_platform(
            hass, Platform.CALENDAR, DOMAIN, calendar, hass_config
        )

    def _scan_for_calendars(call: ServiceCall) -> None:
        """Fetch the calendar list and announce each calendar found."""
        client = calendar_service.get()
        found = client.calendarList().list().execute()["items"]
        for item in found:
            item["track"] = track_new_found_calendars
            hass.services.call(DOMAIN, SERVICE_FOUND_CALENDARS, item)

    def _add_event(call: ServiceCall) -> None:
        """Insert a new event into the requested calendar."""
        service = calendar_service.get()
        data = call.data
        start = {}
        end = {}

        if EVENT_IN in data:
            relative = data[EVENT_IN]
            if EVENT_IN_DAYS in relative:
                offset = timedelta(days=relative[EVENT_IN_DAYS])
            elif EVENT_IN_WEEKS in relative:
                offset = timedelta(weeks=relative[EVENT_IN_WEEKS])
            else:
                # Neither unit supplied: start/end stay empty.
                offset = None

            if offset is not None:
                # One-day all-day event starting ``offset`` from today.
                # NOTE(review): datetime.now() is naive system-local time,
                # not hass.config.time_zone -- confirm this is acceptable.
                first_day = datetime.now() + offset
                next_day = first_day + timedelta(days=1)
                start = {"date": first_day.strftime("%Y-%m-%d")}
                end = {"date": next_day.strftime("%Y-%m-%d")}
        elif EVENT_START_DATE in data:
            start = {"date": str(data[EVENT_START_DATE])}
            end = {"date": str(data[EVENT_END_DATE])}
        elif EVENT_START_DATETIME in data:
            stamp_format = "%Y-%m-%dT%H:%M:%S"
            start = {
                "dateTime": data[EVENT_START_DATETIME].strftime(stamp_format),
                "timeZone": str(hass.config.time_zone),
            }
            end = {
                "dateTime": data[EVENT_END_DATETIME].strftime(stamp_format),
                "timeZone": str(hass.config.time_zone),
            }

        body = {
            "summary": data[EVENT_SUMMARY],
            "description": data[EVENT_DESCRIPTION],
            "start": start,
            "end": end,
        }
        service.events().insert(
            calendarId=data[EVENT_CALENDAR_ID], body=body
        ).execute()

    hass.services.register(DOMAIN, SERVICE_FOUND_CALENDARS, _found_calendar)
    hass.services.register(DOMAIN, SERVICE_SCAN_CALENDARS, _scan_for_calendars)

    # Only expose the add event service if we have the correct permissions
    if config.get(CONF_CALENDAR_ACCESS) is FeatureAccess.read_write:
        hass.services.register(
            DOMAIN, SERVICE_ADD_EVENT, _add_event, schema=ADD_EVENT_SERVICE_SCHEMA
        )
def do_setup(hass: HomeAssistant, hass_config: ConfigType, config: ConfigType) -> None:
    """Finish setting up the integration once a usable token exists.

    Replaces the calendar index in ``hass.data`` with the calendars read from
    the devices YAML, registers the services, loads the calendar platform for
    every known calendar and finally triggers a scan for new calendars.
    """
    _LOGGER.debug("Setting up integration")

    # Calendars the user already has on file.
    hass.data[DATA_INDEX] = load_config(hass.config.path(YAML_DEVICES))

    token_path = hass.config.path(TOKEN_FILE)
    calendar_service = GoogleCalendarService(token_path)

    configured_track_new = config.get(CONF_TRACK_NEW)
    track_new_found_calendars = convert(
        configured_track_new, bool, DEFAULT_CONF_TRACK_NEW
    )
    assert track_new_found_calendars is not None

    setup_services(
        hass,
        hass_config,
        config,
        track_new_found_calendars,
        calendar_service,
    )

    for known_calendar in hass.data[DATA_INDEX].values():
        discovery.load_platform(
            hass, Platform.CALENDAR, DOMAIN, known_calendar, hass_config
        )

    # Ask the freshly registered scan service to look for calendars that are
    # not on file yet.
    hass.services.call(DOMAIN, SERVICE_SCAN_CALENDARS, None)
def get_calendar_info(
    hass: HomeAssistant, calendar: Mapping[str, Any]
) -> dict[str, Any]:
    """Turn a calendar record into a validated DEVICE_SCHEMA mapping.

    Reads the ``id``, ``track`` and ``summary`` keys of ``calendar`` and
    produces a record with a single entity named after the summary, whose
    device id is generated from that summary.
    """
    cal_id = calendar["id"]
    track = calendar["track"]
    summary = calendar["summary"]

    entity = {
        CONF_TRACK: track,
        CONF_NAME: summary,
        CONF_DEVICE_ID: generate_entity_id("{}", summary, hass=hass),
    }
    calendar_info: dict[str, Any] = DEVICE_SCHEMA(
        {CONF_CAL_ID: cal_id, CONF_ENTITIES: [entity]}
    )
    return calendar_info
def load_config(path: str) -> dict[str, Any]:
    """Load and validate the calendars stored in the devices YAML file.

    Args:
        path: Location of the YAML file holding a list of calendar records.

    Returns:
        A mapping of validated calendar id to the validated calendar record.
        The mapping is empty when the file does not exist, is empty, or does
        not contain a list. Records that fail DEVICE_SCHEMA validation are
        skipped with a warning.
    """
    try:
        with open(path, encoding="utf8") as file:
            data = yaml.safe_load(file)
    except FileNotFoundError:
        # No calendars have been stored yet.
        return {}

    if data is None:
        # yaml.safe_load returns None for an empty document.
        return {}
    if not isinstance(data, list):
        _LOGGER.warning(
            "Calendar Invalid Data: expected a list of calendars in %s", path
        )
        return {}

    calendars: dict[str, Any] = {}
    for calendar in data:
        # Validate before reading the id so that a record without a calendar
        # id (or one that is not a mapping) is reported and skipped instead
        # of aborting the whole load with a KeyError/TypeError.
        try:
            validated = DEVICE_SCHEMA(calendar)
        except VoluptuousError as exception:
            # keep going
            _LOGGER.warning("Calendar Invalid Data: %s", exception)
            continue
        calendars[validated[CONF_CAL_ID]] = validated
    return calendars
def update_config(path: str, calendar: dict[str, Any]) -> None:
    """Append one calendar record to the devices YAML file.

    The record is written as a single-item block-style YAML list, preceded
    by a newline, at the end of the file at ``path`` (opened in append mode).
    """
    with open(path, "a", encoding="utf8") as devices_file:
        devices_file.write("\n")
        yaml.dump([calendar], devices_file, default_flow_style=False)