Add functionality to Mastodon (#112862)

* Adds functionality to Mastodon

* protect media type

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* update log warning

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* protect upload media

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Update protected functions

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
pull/113080/head^2
Jeffrey Stone 2024-05-16 06:08:50 -04:00 committed by GitHub
parent 962dd81eb7
commit e6f5b08264
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 65 additions and 4 deletions

View File

@ -2,13 +2,18 @@
from __future__ import annotations
import mimetypes
from typing import Any
from mastodon import Mastodon
from mastodon.Mastodon import MastodonAPIError, MastodonUnauthorizedError
import voluptuous as vol
from homeassistant.components.notify import PLATFORM_SCHEMA, BaseNotificationService
from homeassistant.components.notify import (
ATTR_DATA,
PLATFORM_SCHEMA,
BaseNotificationService,
)
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
@ -16,6 +21,11 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import CONF_BASE_URL, DEFAULT_URL, LOGGER
ATTR_MEDIA = "media"
ATTR_TARGET = "target"
ATTR_MEDIA_WARNING = "media_warning"
ATTR_CONTENT_WARNING = "content_warning"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_ACCESS_TOKEN): cv.string,
@ -60,8 +70,59 @@ class MastodonNotificationService(BaseNotificationService):
self._api = api
def send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send a message to a user."""
"""Toot a message, with media perhaps."""
data = kwargs.get(ATTR_DATA)
media = None
mediadata = None
target = None
sensitive = False
content_warning = None
if data:
media = data.get(ATTR_MEDIA)
if media:
if not self.hass.config.is_allowed_path(media):
LOGGER.warning("'%s' is not a whitelisted directory", media)
return
mediadata = self._upload_media(media)
target = data.get(ATTR_TARGET)
sensitive = data.get(ATTR_MEDIA_WARNING)
content_warning = data.get(ATTR_CONTENT_WARNING)
if mediadata:
try:
self._api.toot(message)
self._api.status_post(
message,
media_ids=mediadata["id"],
sensitive=sensitive,
visibility=target,
spoiler_text=content_warning,
)
except MastodonAPIError:
LOGGER.error("Unable to send message")
else:
try:
self._api.status_post(
message, visibility=target, spoiler_text=content_warning
)
except MastodonAPIError:
LOGGER.error("Unable to send message")
def _upload_media(self, media_path: Any = None) -> Any:
"""Upload media."""
with open(media_path, "rb"):
media_type = self._media_type(media_path)
try:
mediadata = self._api.media_post(media_path, mime_type=media_type)
except MastodonAPIError:
LOGGER.error(f"Unable to upload image {media_path}")
return mediadata
def _media_type(self, media_path: Any = None) -> Any:
"""Get media Type."""
(media_type, _) = mimetypes.guess_type(media_path)
return media_type