"""Support for Google - Calendar Event Devices."""
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from enum import Enum
import logging
import os
from typing import Any

from googleapiclient import discovery as google_discovery
import httplib2
from oauth2client.client import (
    FlowExchangeError,
    OAuth2DeviceCodeError,
    OAuth2WebServerFlow,
)
from oauth2client.file import Storage
import voluptuous as vol
from voluptuous.error import Error as VoluptuousError
import yaml

from homeassistant.components import persistent_notification
from homeassistant.const import (
    CONF_CLIENT_ID,
    CONF_CLIENT_SECRET,
    CONF_DEVICE_ID,
    CONF_ENTITIES,
    CONF_NAME,
    CONF_OFFSET,
    Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import convert

_LOGGER = logging.getLogger(__name__)

# Integration domain; also used to derive the YAML and token file names.
DOMAIN = "google"
ENTITY_ID_FORMAT = DOMAIN + ".{}"

# Configuration keys read from the integration config and the per-calendar
# device entries.
CONF_TRACK_NEW = "track_new_calendar"

CONF_CAL_ID = "cal_id"
CONF_TRACK = "track"
CONF_SEARCH = "search"
CONF_IGNORE_AVAILABILITY = "ignore_availability"
CONF_MAX_RESULTS = "max_results"
CONF_CALENDAR_ACCESS = "calendar_access"

# Defaults applied when the corresponding option is not configured.
DEFAULT_CONF_TRACK_NEW = True

# NOTE(review): presumably the marker that introduces an offset in an event
# title; it is not used in the visible part of this module - confirm.
DEFAULT_CONF_OFFSET = "!!"
# Field names accepted by the ``add_event`` service.
EVENT_CALENDAR_ID = "calendar_id"
EVENT_DESCRIPTION = "description"
EVENT_END_CONF = "end"
EVENT_END_DATE = "end_date"
EVENT_END_DATETIME = "end_date_time"
EVENT_IN = "in"
EVENT_IN_DAYS = "days"
EVENT_IN_WEEKS = "weeks"
EVENT_START_CONF = "start"
EVENT_START_DATE = "start_date"
EVENT_START_DATETIME = "start_date_time"
EVENT_SUMMARY = "summary"
EVENT_TYPES_CONF = "event_types"

NOTIFICATION_ID = "google_calendar_notification"
NOTIFICATION_TITLE = "Google Calendar Setup"
GROUP_NAME_ALL_CALENDARS = "Google Calendar Sensors"

# Service names registered under DOMAIN.
SERVICE_SCAN_CALENDARS = "scan_for_calendars"
SERVICE_FOUND_CALENDARS = "found_calendar"
SERVICE_ADD_EVENT = "add_event"

# Key in ``hass.data`` holding the known calendars, indexed by calendar id.
DATA_INDEX = "google_calendars"

# Files stored in the Home Assistant configuration directory.
YAML_DEVICES = f"{DOMAIN}_calendars.yaml"

TOKEN_FILE = f".{DOMAIN}.token"


class FeatureAccess(Enum):
    """Class to represent different access scopes.

    Each member's value is the Google OAuth scope URL requested for it.
    """

    read_only = "https://www.googleapis.com/auth/calendar.readonly"
    read_write = "https://www.googleapis.com/auth/calendar"

    def __init__(self, scope: str) -> None:
        """Init instance."""
        self._scope = scope

    @property
    def scope(self) -> str:
        """Google calendar scope for the feature."""
        return self._scope


# Schema for the ``google:`` section of the Home Assistant configuration.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_CLIENT_ID): cv.string,
                vol.Required(CONF_CLIENT_SECRET): cv.string,
                vol.Optional(CONF_TRACK_NEW): cv.boolean,
                vol.Optional(CONF_CALENDAR_ACCESS, default="read_write"): cv.enum(
                    FeatureAccess
                ),
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)

# Schema for one entity entry of a calendar in the YAML devices file.
_SINGLE_CALSEARCH_CONFIG = vol.All(
    cv.deprecated(CONF_MAX_RESULTS),
    vol.Schema(
        {
            vol.Required(CONF_NAME): cv.string,
            vol.Required(CONF_DEVICE_ID): cv.string,
            vol.Optional(CONF_IGNORE_AVAILABILITY, default=True): cv.boolean,
            vol.Optional(CONF_OFFSET): cv.string,
            vol.Optional(CONF_SEARCH): cv.string,
            vol.Optional(CONF_TRACK): cv.boolean,
            vol.Optional(CONF_MAX_RESULTS): cv.positive_int,  # Now unused
        }
    ),
)

# Schema for one calendar (a calendar id plus its entities).
DEVICE_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_CAL_ID): cv.string,
        vol.Required(CONF_ENTITIES, None): vol.All(
            cv.ensure_list, [_SINGLE_CALSEARCH_CONFIG]
        ),
    },
    extra=vol.ALLOW_EXTRA,
)

# Relative start for ``add_event``: either a number of days or of weeks.
_EVENT_IN_TYPES = vol.Schema(
    {
        vol.Exclusive(EVENT_IN_DAYS, EVENT_TYPES_CONF): cv.positive_int,
        vol.Exclusive(EVENT_IN_WEEKS, EVENT_TYPES_CONF): cv.positive_int,
    }
)

# Schema for the ``add_event`` service call data.
ADD_EVENT_SERVICE_SCHEMA = vol.Schema(
    {
        vol.Required(EVENT_CALENDAR_ID): cv.string,
        vol.Required(EVENT_SUMMARY): cv.string,
        vol.Optional(EVENT_DESCRIPTION, default=""): cv.string,
        vol.Exclusive(EVENT_START_DATE, EVENT_START_CONF): cv.date,
        vol.Exclusive(EVENT_END_DATE, EVENT_END_CONF): cv.date,
        vol.Exclusive(EVENT_START_DATETIME, EVENT_START_CONF): cv.datetime,
        vol.Exclusive(EVENT_END_DATETIME, EVENT_END_CONF): cv.datetime,
        vol.Exclusive(EVENT_IN, EVENT_START_CONF, EVENT_END_CONF): _EVENT_IN_TYPES,
    }
)


def do_authentication(
    hass: HomeAssistant, hass_config: ConfigType, config: ConfigType
) -> bool:
    """Notify user of actions and authenticate.

    Notify user of user_code and verification_url then poll until we have
    an access token.

    Returns False when the device and user codes could not be obtained, and
    True once polling for the token exchange has been scheduled. The token
    exchange itself completes later, inside the scheduled callback.
    """
    oauth = OAuth2WebServerFlow(
        client_id=config[CONF_CLIENT_ID],
        client_secret=config[CONF_CLIENT_SECRET],
        scope=config[CONF_CALENDAR_ACCESS].scope,
        redirect_uri="Home-Assistant.io",
    )
    try:
        dev_flow = oauth.step1_get_device_and_user_codes()
    except OAuth2DeviceCodeError as err:
        persistent_notification.create(
            hass,
            # The line break is written as an escape: a raw newline inside a
            # single-quoted string literal is not valid Python.
            f"Error: {err}\nYou will need to restart hass after fixing.",
            title=NOTIFICATION_TITLE,
            notification_id=NOTIFICATION_ID,
        )
        return False

    persistent_notification.create(
        hass,
        (
            f"In order to authorize Home-Assistant to view your calendars "
            f'you must visit: {dev_flow.verification_url} and enter '
            f"code: {dev_flow.user_code}"
        ),
        title=NOTIFICATION_TITLE,
        notification_id=NOTIFICATION_ID,
    )

    def step2_exchange(now: datetime) -> None:
        """Keep trying to validate the user_code until it expires."""
        _LOGGER.debug("Attempting to validate user code")

        # For some reason, oauth.step1_get_device_and_user_codes() returns a
        # datetime object without tzinfo. For the comparison below to work,
        # it needs one.
        user_code_expiry = dev_flow.user_code_expiry.replace(tzinfo=timezone.utc)

        if now >= user_code_expiry:
            persistent_notification.create(
                hass,
                "Authentication code expired, please restart "
                "Home-Assistant and try again",
                title=NOTIFICATION_TITLE,
                notification_id=NOTIFICATION_ID,
            )
            # Stop polling: the code can no longer be exchanged.
            listener()
            return

        try:
            credentials = oauth.step2_exchange(device_flow_info=dev_flow)
        except FlowExchangeError:
            # not ready yet, call again
            return

        storage = Storage(hass.config.path(TOKEN_FILE))
        storage.put(credentials)
        do_setup(hass, hass_config, config)
        # Stop polling now that the credentials have been stored.
        listener()
        persistent_notification.create(
            hass,
            (
                f"We are all setup now. \n"
                f"Check {YAML_DEVICES} for calendars that have "
                f"been found"
            ),
            title=NOTIFICATION_TITLE,
            notification_id=NOTIFICATION_ID,
        )

    # ``listener`` is the callable that cancels the time tracking; it is
    # bound here, before step2_exchange can first be invoked.
    listener = track_utc_time_change(
        hass, step2_exchange, second=range(0, 60, dev_flow.interval)
    )

    return True


def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Google platform.

    Authenticates when no token file exists or the stored token lacks the
    configured scope; otherwise runs the regular setup. Always returns True.
    """
    if DATA_INDEX not in hass.data:
        hass.data[DATA_INDEX] = {}

    if not (conf := config.get(DOMAIN, {})):
        # No (or an empty) ``google:`` section: nothing to set up here.
        return True

    token_file = hass.config.path(TOKEN_FILE)
    if not os.path.isfile(token_file):
        _LOGGER.debug("Token file does not exist, authenticating for first time")
        do_authentication(hass, config, conf)
    elif not check_correct_scopes(hass, token_file, conf):
        _LOGGER.debug("Existing scopes are not sufficient, re-authenticating")
        do_authentication(hass, config, conf)
    else:
        do_setup(hass, config, conf)

    return True


def check_correct_scopes(
    hass: HomeAssistant, token_file: str, config: ConfigType
) -> bool:
    """Check for the correct scopes in file.

    Returns True only when credentials can be read from ``token_file`` and
    they include the scope of the configured calendar access level.
    """
    creds = Storage(token_file).get()
    if not creds or not creds.scopes:
        return False
    target_scope = config[CONF_CALENDAR_ACCESS].scope
    return target_scope in creds.scopes


class GoogleCalendarService:
    """Calendar service interface to Google."""

    def __init__(self, token_file: str) -> None:
        """Init the Google Calendar service."""
        self.token_file = token_file

    def get(self) -> google_discovery.Resource:
        """Get the calendar service from the storage file token."""
        credentials = Storage(self.token_file).get()
        http = credentials.authorize(httplib2.Http())
        service = google_discovery.build(
            "calendar", "v3", http=http, cache_discovery=False
        )
        return service


def setup_services(
    hass: HomeAssistant,
    hass_config: ConfigType,
    config: ConfigType,
    track_new_found_calendars: bool,
    calendar_service: GoogleCalendarService,
) -> None:
    """Set up the service listeners.

    Registers the ``found_calendar`` and ``scan_for_calendars`` services,
    and ``add_event`` only when the configured access is read/write.
    """

    def _found_calendar(call: ServiceCall) -> None:
        """Check if we know about a calendar and generate PLATFORM_DISCOVER."""
        calendar = get_calendar_info(hass, call.data)
        cal_id = calendar[CONF_CAL_ID]
        if hass.data[DATA_INDEX].get(cal_id) is not None:
            # Already known: nothing to persist or load.
            return

        hass.data[DATA_INDEX][cal_id] = calendar

        update_config(hass.config.path(YAML_DEVICES), calendar)

        discovery.load_platform(
            hass,
            Platform.CALENDAR,
            DOMAIN,
            calendar,
            hass_config,
        )

    hass.services.register(DOMAIN, SERVICE_FOUND_CALENDARS, _found_calendar)

    def _scan_for_calendars(call: ServiceCall) -> None:
        """Scan for new calendars."""
        service = calendar_service.get()
        cal_list = service.calendarList()
        calendars = cal_list.list().execute()["items"]
        for calendar in calendars:
            calendar["track"] = track_new_found_calendars
            hass.services.call(DOMAIN, SERVICE_FOUND_CALENDARS, calendar)

    hass.services.register(DOMAIN, SERVICE_SCAN_CALENDARS, _scan_for_calendars)

    def _add_event(call: ServiceCall) -> None:
        """Add a new event to calendar."""
        service = calendar_service.get()
        start: dict[str, str] = {}
        end: dict[str, str] = {}

        if EVENT_IN in call.data:
            # Relative all-day event: starts N days/weeks from now and ends
            # one day later.
            offset = call.data[EVENT_IN]
            delta = None
            if EVENT_IN_DAYS in offset:
                delta = timedelta(days=offset[EVENT_IN_DAYS])
            elif EVENT_IN_WEEKS in offset:
                delta = timedelta(weeks=offset[EVENT_IN_WEEKS])

            if delta is not None:
                start_in = datetime.now() + delta
                end_in = start_in + timedelta(days=1)
                start = {"date": start_in.strftime("%Y-%m-%d")}
                end = {"date": end_in.strftime("%Y-%m-%d")}

        elif EVENT_START_DATE in call.data:
            # NOTE: the service schema does not require the matching end
            # field, so a start without an end raises KeyError here.
            start = {"date": str(call.data[EVENT_START_DATE])}
            end = {"date": str(call.data[EVENT_END_DATE])}

        elif EVENT_START_DATETIME in call.data:
            # Same caveat as above for a missing end date-time.
            start_dt = call.data[EVENT_START_DATETIME].strftime("%Y-%m-%dT%H:%M:%S")
            end_dt = call.data[EVENT_END_DATETIME].strftime("%Y-%m-%dT%H:%M:%S")
            start = {"dateTime": start_dt, "timeZone": str(hass.config.time_zone)}
            end = {"dateTime": end_dt, "timeZone": str(hass.config.time_zone)}

        event = {
            "summary": call.data[EVENT_SUMMARY],
            "description": call.data[EVENT_DESCRIPTION],
            "start": start,
            "end": end,
        }
        service_data = {"calendarId": call.data[EVENT_CALENDAR_ID], "body": event}
        service.events().insert(**service_data).execute()

    # Only expose the add event service if we have the correct permissions
    if config.get(CONF_CALENDAR_ACCESS) is FeatureAccess.read_write:
        hass.services.register(
            DOMAIN, SERVICE_ADD_EVENT, _add_event, schema=ADD_EVENT_SERVICE_SCHEMA
        )


def do_setup(hass: HomeAssistant, hass_config: ConfigType, config: ConfigType) -> None:
    """Run the setup after we have everything configured.

    Loads the calendars from the YAML devices file, registers the services,
    loads the calendar platform for each known calendar and finally triggers
    a scan for new calendars.
    """
    _LOGGER.debug("Setting up integration")
    # Load calendars the user has configured
    hass.data[DATA_INDEX] = load_config(hass.config.path(YAML_DEVICES))

    calendar_service = GoogleCalendarService(hass.config.path(TOKEN_FILE))
    track_new_found_calendars = convert(
        config.get(CONF_TRACK_NEW), bool, DEFAULT_CONF_TRACK_NEW
    )
    # Narrows the type for the call below; the default passed above is a bool.
    assert track_new_found_calendars is not None
    setup_services(
        hass, hass_config, config, track_new_found_calendars, calendar_service
    )

    for calendar in hass.data[DATA_INDEX].values():
        discovery.load_platform(hass, Platform.CALENDAR, DOMAIN, calendar, hass_config)

    # Look for any new calendars
    hass.services.call(DOMAIN, SERVICE_SCAN_CALENDARS, None)


def get_calendar_info(
    hass: HomeAssistant, calendar: Mapping[str, Any]
) -> dict[str, Any]:
    """Convert data from Google into DEVICE_SCHEMA.

    Reads the ``id``, ``summary`` and ``track`` keys of ``calendar`` and
    returns the validated calendar entry with a single entity.
    """
    calendar_info: dict[str, Any] = DEVICE_SCHEMA(
        {
            CONF_CAL_ID: calendar["id"],
            CONF_ENTITIES: [
                {
                    CONF_TRACK: calendar["track"],
                    CONF_NAME: calendar["summary"],
                    CONF_DEVICE_ID: generate_entity_id(
                        "{}", calendar["summary"], hass=hass
                    ),
                }
            ],
        }
    )
    return calendar_info


def load_config(path: str) -> dict[str, Any]:
    """Load the calendars stored in the YAML devices file.

    Returns a dict mapping each calendar id to its validated DEVICE_SCHEMA
    entry. A missing or empty file, or one that does not contain a list,
    yields an empty dict; invalid entries are logged and skipped.
    """
    try:
        with open(path, encoding="utf8") as file:
            data = yaml.safe_load(file)
    except FileNotFoundError:
        # The file has not been written yet.
        return {}

    if data is None:
        # yaml.safe_load returns None for an empty document.
        return {}
    if not isinstance(data, list):
        _LOGGER.warning("Calendar Invalid Data: expected a list in %s", path)
        return {}

    calendars: dict[str, Any] = {}
    for calendar in data:
        # Validate before reading the id, so that an entry without a
        # calendar id (or one that is not a mapping) is skipped with a
        # warning instead of raising.
        try:
            validated = DEVICE_SCHEMA(calendar)
        except VoluptuousError as exception:
            # keep going
            _LOGGER.warning("Calendar Invalid Data: %s", exception)
            continue
        calendars[validated[CONF_CAL_ID]] = validated

    return calendars


def update_config(path: str, calendar: dict[str, Any]) -> None:
    """Append a calendar entry to the YAML devices file.

    The entry is written as a one-element YAML list after a blank line, so
    successive appends together form a single list.
    """
    with open(path, "a", encoding="utf8") as out:
        out.write("\n")
        yaml.dump([calendar], out, default_flow_style=False)