From eee2b2d54346fb83026ac3b590981edd06a60f5d Mon Sep 17 00:00:00 2001 From: Tomas Kislan Date: Tue, 20 Aug 2019 11:56:11 +0200 Subject: [PATCH] Add Minio component (#23567) * Add minio implementation * Static check changes * Added docstrings * Update docstrings * Update docstrings * Fix linter errors * Finally fix all docstring errors * Create services.yaml * Update CODEOWNERS * Final changes * Remove double underscores * Minor changes * Update config.yml * Review changes * Added tests * Fix lint errors * Move tests from unittest to pytest * Add minio as test requirement * Update test_minio_helper.py * Better event thread handling, added hass test * Update tests * Fixed lint errors * Update test_minio.py * Review changes * More review changes * Removed tests * Applied code style changes * Reformat test code --- .coveragerc | 1 + CODEOWNERS | 1 + homeassistant/components/minio/__init__.py | 265 ++++++++++++++++++ homeassistant/components/minio/manifest.json | 12 + .../components/minio/minio_helper.py | 209 ++++++++++++++ homeassistant/components/minio/services.yaml | 35 +++ requirements_all.txt | 3 + requirements_test_all.txt | 3 + script/gen_requirements_all.py | 1 + tests/components/minio/__init__.py | 1 + tests/components/minio/common.py | 48 ++++ tests/components/minio/test_minio.py | 190 +++++++++++++ 12 files changed, 769 insertions(+) create mode 100644 homeassistant/components/minio/__init__.py create mode 100644 homeassistant/components/minio/manifest.json create mode 100644 homeassistant/components/minio/minio_helper.py create mode 100644 homeassistant/components/minio/services.yaml create mode 100644 tests/components/minio/__init__.py create mode 100644 tests/components/minio/common.py create mode 100644 tests/components/minio/test_minio.py diff --git a/.coveragerc b/.coveragerc index d8153a7635c..e0bbbd66d7c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -378,6 +378,7 @@ omit = homeassistant/components/miflora/sensor.py homeassistant/components/mikrotik/* homeassistant/components/mill/climate.py + homeassistant/components/minio/* homeassistant/components/mitemp_bt/sensor.py homeassistant/components/mjpeg/camera.py homeassistant/components/mobile_app/* diff --git a/CODEOWNERS b/CODEOWNERS index f6b9e79b8bd..1425c476478 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -173,6 +173,7 @@ homeassistant/components/meteoalarm/* @rolfberkenbosch homeassistant/components/miflora/* @danielhiversen @ChristianKuehnel homeassistant/components/mill/* @danielhiversen homeassistant/components/min_max/* @fabaff +homeassistant/components/minio/* @tkislan homeassistant/components/mobile_app/* @robbiet480 homeassistant/components/monoprice/* @etsinko homeassistant/components/moon/* @fabaff diff --git a/homeassistant/components/minio/__init__.py b/homeassistant/components/minio/__init__.py new file mode 100644 index 00000000000..cede3a7aad5 --- /dev/null +++ b/homeassistant/components/minio/__init__.py @@ -0,0 +1,265 @@ +"""Minio component.""" +import logging +import os +import threading +from queue import Queue +from typing import List + +import voluptuous as vol + +from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP +import homeassistant.helpers.config_validation as cv + +from .minio_helper import create_minio_client, MinioEventThread + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "minio" +CONF_HOST = "host" +CONF_PORT = "port" +CONF_ACCESS_KEY = "access_key" +CONF_SECRET_KEY = "secret_key" +CONF_SECURE = "secure" +CONF_LISTEN = "listen" +CONF_LISTEN_BUCKET = "bucket" +CONF_LISTEN_PREFIX = "prefix" +CONF_LISTEN_SUFFIX = "suffix" +CONF_LISTEN_EVENTS = "events" + +ATTR_BUCKET = "bucket" +ATTR_KEY = "key" +ATTR_FILE_PATH = "file_path" + +DEFAULT_LISTEN_PREFIX = "" +DEFAULT_LISTEN_SUFFIX = ".*" +DEFAULT_LISTEN_EVENTS = "s3:ObjectCreated:*" + +CONFIG_SCHEMA = vol.Schema( + { + DOMAIN: vol.Schema( + { + vol.Required(CONF_HOST): cv.string, + vol.Required(CONF_PORT): cv.port, + vol.Required(CONF_ACCESS_KEY): cv.string, + vol.Required(CONF_SECRET_KEY): cv.string, + vol.Required(CONF_SECURE): cv.boolean, + vol.Optional(CONF_LISTEN, default=[]): vol.All( + cv.ensure_list, + [ + vol.Schema( + { + vol.Required(CONF_LISTEN_BUCKET): cv.string, + vol.Optional( + CONF_LISTEN_PREFIX, default=DEFAULT_LISTEN_PREFIX + ): cv.string, + vol.Optional( + CONF_LISTEN_SUFFIX, default=DEFAULT_LISTEN_SUFFIX + ): cv.string, + vol.Optional( + CONF_LISTEN_EVENTS, default=DEFAULT_LISTEN_EVENTS + ): cv.string, + } + ) + ], + ), + } + ) + }, + extra=vol.ALLOW_EXTRA, +) + +BUCKET_KEY_SCHEMA = vol.Schema( + {vol.Required(ATTR_BUCKET): cv.template, vol.Required(ATTR_KEY): cv.template} +) + +BUCKET_KEY_FILE_SCHEMA = BUCKET_KEY_SCHEMA.extend( + {vol.Required(ATTR_FILE_PATH): cv.template} +) + + +def setup(hass, config): + """Set up MinioClient and event listeners.""" + conf = config[DOMAIN] + + host = conf[CONF_HOST] + port = conf[CONF_PORT] + access_key = conf[CONF_ACCESS_KEY] + secret_key = conf[CONF_SECRET_KEY] + secure = conf[CONF_SECURE] + + queue_listener = QueueListener(hass) + queue = queue_listener.queue + + hass.bus.listen_once(EVENT_HOMEASSISTANT_START, queue_listener.start_handler) + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, queue_listener.stop_handler) + + def _setup_listener(listener_conf): + bucket = listener_conf[CONF_LISTEN_BUCKET] + prefix = listener_conf[CONF_LISTEN_PREFIX] + suffix = listener_conf[CONF_LISTEN_SUFFIX] + events = listener_conf[CONF_LISTEN_EVENTS] + + minio_listener = MinioListener( + queue, + get_minio_endpoint(host, port), + access_key, + secret_key, + secure, + bucket, + prefix, + suffix, + events, + ) + + hass.bus.listen_once(EVENT_HOMEASSISTANT_START, minio_listener.start_handler) + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, minio_listener.stop_handler) + + for listen_conf in conf[CONF_LISTEN]: + _setup_listener(listen_conf) + + minio_client = create_minio_client( + get_minio_endpoint(host, port), access_key, secret_key, secure + ) + + def _render_service_value(service, key): + value = service.data[key] + value.hass = hass + return value.async_render() + + def put_file(service): + """Upload file service.""" + bucket = _render_service_value(service, ATTR_BUCKET) + key = _render_service_value(service, ATTR_KEY) + file_path = _render_service_value(service, ATTR_FILE_PATH) + + if not hass.config.is_allowed_path(file_path): + _LOGGER.error("Invalid file_path %s", file_path) + return + + minio_client.fput_object(bucket, key, file_path) + + def get_file(service): + """Download file service.""" + bucket = _render_service_value(service, ATTR_BUCKET) + key = _render_service_value(service, ATTR_KEY) + file_path = _render_service_value(service, ATTR_FILE_PATH) + + if not hass.config.is_allowed_path(file_path): + _LOGGER.error("Invalid file_path %s", file_path) + return + + minio_client.fget_object(bucket, key, file_path) + + def remove_file(service): + """Delete file service.""" + bucket = _render_service_value(service, ATTR_BUCKET) + key = _render_service_value(service, ATTR_KEY) + + minio_client.remove_object(bucket, key) + + hass.services.register(DOMAIN, "put", put_file, schema=BUCKET_KEY_FILE_SCHEMA) + hass.services.register(DOMAIN, "get", get_file, schema=BUCKET_KEY_FILE_SCHEMA) + hass.services.register(DOMAIN, "remove", remove_file, schema=BUCKET_KEY_SCHEMA) + + return True + + +def get_minio_endpoint(host: str, port: int) -> str: + """Create minio endpoint from host and port.""" + return "{}:{}".format(host, port) + + +class QueueListener(threading.Thread): + """Forward events from queue into HASS event bus.""" + + def __init__(self, hass): + """Create queue.""" + super().__init__() + self._hass = hass + self._queue = Queue() + + def run(self): + """Listen to queue events, and forward them to HASS event bus.""" + _LOGGER.info("Running QueueListener") + while True: + event = self._queue.get() + if event is None: + break + + _, file_name = os.path.split(event[ATTR_KEY]) + + _LOGGER.debug( + "Sending event %s, %s, %s", + event["event_name"], + event[ATTR_BUCKET], + event[ATTR_KEY], + ) + self._hass.bus.fire(DOMAIN, {"file_name": file_name, **event}) + + @property + def queue(self): + """Return wrapped queue.""" + return self._queue + + def stop(self): + """Stop run by putting None into queue and join the thread.""" + _LOGGER.info("Stopping QueueListener") + self._queue.put(None) + self.join() + _LOGGER.info("Stopped QueueListener") + + def start_handler(self, _): + """Start handler helper method.""" + self.start() + + def stop_handler(self, _): + """Stop handler helper method.""" + self.stop() + + +class MinioListener: + """MinioEventThread wrapper with helper methods.""" + + def __init__( + self, + queue: Queue, + endpoint: str, + access_key: str, + secret_key: str, + secure: bool, + bucket_name: str, + prefix: str, + suffix: str, + events: List[str], + ): + """Create Listener.""" + self._queue = queue + self._endpoint = endpoint + self._access_key = access_key + self._secret_key = secret_key + self._secure = secure + self._bucket_name = bucket_name + self._prefix = prefix + self._suffix = suffix + self._events = events + self._minio_event_thread = None + + def start_handler(self, _): + """Create and start the event thread.""" + self._minio_event_thread = MinioEventThread( + self._queue, + self._endpoint, + self._access_key, + self._secret_key, + self._secure, + self._bucket_name, + self._prefix, + self._suffix, + self._events, + ) + self._minio_event_thread.start() + + def stop_handler(self, _): + """Issue stop and wait for thread to join.""" + if self._minio_event_thread is not None: + self._minio_event_thread.stop() diff --git a/homeassistant/components/minio/manifest.json b/homeassistant/components/minio/manifest.json new file mode 100644 index 00000000000..2b2f84836ea --- /dev/null +++ b/homeassistant/components/minio/manifest.json @@ -0,0 +1,12 @@ +{ + "domain": "minio", + "name": "Minio", + "documentation": "https://www.home-assistant.io/components/minio", + "requirements": [ + "minio==4.0.9" + ], + "dependencies": [], + "codeowners": [ + "@tkislan" + ] +} diff --git a/homeassistant/components/minio/minio_helper.py b/homeassistant/components/minio/minio_helper.py new file mode 100644 index 00000000000..bd7b15d27d4 --- /dev/null +++ b/homeassistant/components/minio/minio_helper.py @@ -0,0 +1,209 @@ +"""Minio helper methods.""" +import time +from collections.abc import Iterable +import json +import logging +import re +import threading +from queue import Queue +from typing import Iterator, List +from urllib.parse import unquote + +from minio import Minio +from urllib3.exceptions import HTTPError + +_LOGGER = logging.getLogger(__name__) + +_METADATA_RE = re.compile("x-amz-meta-(.*)", re.IGNORECASE) + + +def normalize_metadata(metadata: dict) -> dict: + """Normalize object metadata by stripping the prefix.""" + new_metadata = {} + for meta_key, meta_value in metadata.items(): + match = _METADATA_RE.match(meta_key) + if not match: + continue + + new_metadata[match.group(1).lower()] = meta_value + + return new_metadata + + +def create_minio_client( + endpoint: str, access_key: str, secret_key: str, secure: bool +) -> Minio: + """Create Minio client.""" + return Minio(endpoint, access_key, secret_key, secure) + + +def get_minio_notification_response( + minio_client, bucket_name: str, prefix: str, suffix: str, events: List[str] +): + """Start listening to minio events. Copied from minio-py.""" + query = {"prefix": prefix, "suffix": suffix, "events": events} + # pylint: disable=protected-access + return minio_client._url_open( + "GET", bucket_name=bucket_name, query=query, preload_content=False + ) + + +class MinioEventStreamIterator(Iterable): + """Iterator wrapper over notification http response stream.""" + + def __iter__(self) -> Iterator: + """Return self.""" + return self + + def __init__(self, response): + """Init.""" + self._response = response + self._stream = response.stream() + + def __next__(self): + """Get next not empty line.""" + while True: + line = next(self._stream) + if line.strip(): + event = json.loads(line.decode("utf-8")) + if event["Records"] is not None: + return event + + def close(self): + """Close the response.""" + self._response.close() + + +class MinioEventThread(threading.Thread): + """Thread wrapper around minio notification blocking stream.""" + + def __init__( + self, + queue: Queue, + endpoint: str, + access_key: str, + secret_key: str, + secure: bool, + bucket_name: str, + prefix: str, + suffix: str, + events: List[str], + ): + """Copy over all Minio client options.""" + super().__init__() + self._queue = queue + self._endpoint = endpoint + self._access_key = access_key + self._secret_key = secret_key + self._secure = secure + self._bucket_name = bucket_name + self._prefix = prefix + self._suffix = suffix + self._events = events + self._event_stream_it = None + self._should_stop = False + + def __enter__(self): + """Start the thread.""" + self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + """Stop and join the thread.""" + self.stop() + + def run(self): + """Create MinioClient and run the loop.""" + _LOGGER.info("Running MinioEventThread") + + self._should_stop = False + + minio_client = create_minio_client( + self._endpoint, self._access_key, self._secret_key, self._secure + ) + + while not self._should_stop: + _LOGGER.info("Connecting to minio event stream") + response = None + try: + response = get_minio_notification_response( + minio_client, + self._bucket_name, + self._prefix, + self._suffix, + self._events, + ) + + self._event_stream_it = MinioEventStreamIterator(response) + + self._iterate_event_stream(self._event_stream_it, minio_client) + except json.JSONDecodeError: + if response: + response.close() + except HTTPError as error: + _LOGGER.error("Failed to connect to Minio endpoint: %s", error) + + # Wait before attempting to connect again. + time.sleep(1) + except AttributeError: + # When response is closed, iterator will fail to access + # the underlying socket descriptor. + break + + def _iterate_event_stream(self, event_stream_it, minio_client): + for event in event_stream_it: + for event_name, bucket, key, metadata in iterate_objects(event): + presigned_url = "" + try: + presigned_url = minio_client.presigned_get_object(bucket, key) + # Fail gracefully. If for whatever reason this stops working, + # it shouldn't prevent it from firing events. + # pylint: disable=broad-except + except Exception as error: + _LOGGER.error("Failed to generate presigned url: %s", error) + + queue_entry = { + "event_name": event_name, + "bucket": bucket, + "key": key, + "presigned_url": presigned_url, + "metadata": metadata, + } + _LOGGER.debug("Queue entry, %s", queue_entry) + self._queue.put(queue_entry) + + def stop(self): + """Cancel event stream and join the thread.""" + _LOGGER.debug("Stopping event thread") + self._should_stop = True + if self._event_stream_it is not None: + self._event_stream_it.close() + self._event_stream_it = None + + _LOGGER.debug("Joining event thread") + self.join() + _LOGGER.debug("Event thread joined") + + +def iterate_objects(event): + """ + Iterate over file records of notification event. + + Most of the time it should still be only one record. + """ + records = event.get("Records", []) + + for record in records: + event_name = record.get("eventName") + bucket = record.get("s3", {}).get("bucket", {}).get("name") + key = record.get("s3", {}).get("object", {}).get("key") + metadata = normalize_metadata( + record.get("s3", {}).get("object", {}).get("userMetadata", {}) + ) + + if not bucket or not key: + _LOGGER.warning("Invalid bucket and/or key, %s, %s", bucket, key) + continue + + key = unquote(key) + + yield event_name, bucket, key, metadata diff --git a/homeassistant/components/minio/services.yaml b/homeassistant/components/minio/services.yaml new file mode 100644 index 00000000000..8fb8a267c3b --- /dev/null +++ b/homeassistant/components/minio/services.yaml @@ -0,0 +1,35 @@ +get: + description: Download file from Minio. + fields: + bucket: + description: Bucket to use. + example: camera-files + key: + description: Object key of the file. + example: front_camera/2018/01/02/snapshot_12512514.jpg + file_path: + description: File path on local filesystem. + example: /data/camera_files/snapshot.jpg + +put: + description: Upload file to Minio. + fields: + bucket: + description: Bucket to use. + example: camera-files + key: + description: Object key of the file. + example: front_camera/2018/01/02/snapshot_12512514.jpg + file_path: + description: File path on local filesystem. + example: /data/camera_files/snapshot.jpg + +remove: + description: Delete file from Minio. + fields: + bucket: + description: Bucket to use. + example: camera-files + key: + description: Object key of the file. + example: front_camera/2018/01/02/snapshot_12512514.jpg diff --git a/requirements_all.txt b/requirements_all.txt index 9c578e45b1c..6cd7f5d2822 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -797,6 +797,9 @@ miflora==0.4.0 # homeassistant.components.mill millheater==0.3.4 +# homeassistant.components.minio +minio==4.0.9 + # homeassistant.components.mitemp_bt mitemp_bt==0.0.1 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 7b757d8e1c1..e2829d2018c 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -213,6 +213,9 @@ mbddns==0.1.2 # homeassistant.components.mfi mficlient==0.3.0 +# homeassistant.components.minio +minio==4.0.9 + # homeassistant.components.discovery # homeassistant.components.ssdp netdisco==2.6.0 diff --git a/script/gen_requirements_all.py b/script/gen_requirements_all.py index bcf645034f5..6643fcf7aa9 100755 --- a/script/gen_requirements_all.py +++ b/script/gen_requirements_all.py @@ -99,6 +99,7 @@ TEST_REQUIREMENTS = ( "pyMetno", "mbddns", "mficlient", + "minio", "netdisco", "numpy", "oauth2client", diff --git a/tests/components/minio/__init__.py b/tests/components/minio/__init__.py new file mode 100644 index 00000000000..273de09788e --- /dev/null +++ b/tests/components/minio/__init__.py @@ -0,0 +1 @@ +"""Tests for the minio component.""" diff --git a/tests/components/minio/common.py b/tests/components/minio/common.py new file mode 100644 index 00000000000..4719fc79e49 --- /dev/null +++ b/tests/components/minio/common.py @@ -0,0 +1,48 @@ +"""Minio Test event.""" +TEST_EVENT = { + "Records": [ + { + "eventVersion": "2.0", + "eventSource": "minio:s3", + "awsRegion": "", + "eventTime": "2019-05-02T11:05:07Z", + "eventName": "s3:ObjectCreated:Put", + "userIdentity": {"principalId": "SO9KNO6YT9OGE39PQCZW"}, + "requestParameters": { + "accessKey": "SO9KNO6YT9OGE39PQCZW", + "region": "", + "sourceIPAddress": "172.27.0.1", + }, + "responseElements": { + "x-amz-request-id": "159AD8E6F6805783", + "x-minio-deployment-id": "90b265b8-bac5-413a-b12a-8915469fd769", + "x-minio-origin-endpoint": "http://172.27.0.2:9000", + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "Config", + "bucket": { + "name": "test", + "ownerIdentity": {"principalId": "SO9KNO6YT9OGE39PQCZW"}, + "arn": "arn:aws:s3:::test", + }, + "object": { + "key": "5jJkTAo.jpg", + "size": 108368, + "eTag": "1af324731637228cbbb0b2e8c07d4e50", + "contentType": "image/jpeg", + "userMetadata": {"content-type": "image/jpeg"}, + "versionId": "1", + "sequencer": "159AD8E6F76DD9C4", + }, + }, + "source": { + "host": "", + "port": "", + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) " + "Version/12.0.3 Safari/605.1.15", + }, + } + ] +} diff --git a/tests/components/minio/test_minio.py b/tests/components/minio/test_minio.py new file mode 100644 index 00000000000..836b456dc9b --- /dev/null +++ b/tests/components/minio/test_minio.py @@ -0,0 +1,190 @@ +"""Tests for Minio Hass related code.""" +import asyncio +import json +from unittest.mock import MagicMock + +import pytest +from asynctest import patch, call + +from homeassistant.components.minio import ( + QueueListener, + DOMAIN, + CONF_HOST, + CONF_PORT, + CONF_ACCESS_KEY, + CONF_SECRET_KEY, + CONF_SECURE, + CONF_LISTEN, + CONF_LISTEN_BUCKET, +) +from homeassistant.core import callback +from homeassistant.setup import async_setup_component + +from tests.components.minio.common import TEST_EVENT + + +@pytest.fixture(name="minio_client") +def minio_client_fixture(): + """Patch Minio client.""" + with patch("homeassistant.components.minio.minio_helper.Minio") as minio_mock: + minio_client_mock = minio_mock.return_value + + yield minio_client_mock + + +@pytest.fixture(name="minio_client_event") +def minio_client_event_fixture(): + """Patch helper function for minio notification stream.""" + with patch("homeassistant.components.minio.minio_helper.Minio") as minio_mock: + minio_client_mock = minio_mock.return_value + + response_mock = MagicMock() + stream_mock = MagicMock() + + stream_mock.__next__.side_effect = [ + "", + "", + bytearray(json.dumps(TEST_EVENT), "utf-8"), + ] + + response_mock.stream.return_value = stream_mock + minio_client_mock._url_open.return_value = response_mock + + yield minio_client_mock + + +async def test_minio_services(hass, caplog, minio_client): + """Test Minio services.""" + hass.config.whitelist_external_dirs = set("/tmp") + + await async_setup_component( + hass, + DOMAIN, + { + DOMAIN: { + CONF_HOST: "localhost", + CONF_PORT: "9000", + CONF_ACCESS_KEY: "abcdef", + CONF_SECRET_KEY: "0123456789", + CONF_SECURE: "true", + } + }, + ) + + await hass.async_start() + await hass.async_block_till_done() + + assert "Setup of domain minio took" in caplog.text + + # Call services + await hass.services.async_call( + DOMAIN, + "put", + {"file_path": "/tmp/some_file", "key": "some_key", "bucket": "some_bucket"}, + blocking=True, + ) + assert minio_client.fput_object.call_args == call( + "some_bucket", "some_key", "/tmp/some_file" + ) + minio_client.reset_mock() + + await hass.services.async_call( + DOMAIN, + "get", + {"file_path": "/tmp/some_file", "key": "some_key", "bucket": "some_bucket"}, + blocking=True, + ) + assert minio_client.fget_object.call_args == call( + "some_bucket", "some_key", "/tmp/some_file" + ) + minio_client.reset_mock() + + await hass.services.async_call( + DOMAIN, "remove", {"key": "some_key", "bucket": "some_bucket"}, blocking=True + ) + assert minio_client.remove_object.call_args == call("some_bucket", "some_key") + minio_client.reset_mock() + + +async def test_minio_listen(hass, caplog, minio_client_event): + """Test minio listen on notifications.""" + minio_client_event.presigned_get_object.return_value = "http://url" + + events = [] + + @callback + def event_callback(event): + """Handle event callbback.""" + events.append(event) + + hass.bus.async_listen("minio", event_callback) + + await async_setup_component( + hass, + DOMAIN, + { + DOMAIN: { + CONF_HOST: "localhost", + CONF_PORT: "9000", + CONF_ACCESS_KEY: "abcdef", + CONF_SECRET_KEY: "0123456789", + CONF_SECURE: "true", + CONF_LISTEN: [{CONF_LISTEN_BUCKET: "test"}], + } + }, + ) + + await hass.async_start() + await hass.async_block_till_done() + + assert "Setup of domain minio took" in caplog.text + + while not events: + await asyncio.sleep(0) + + assert 1 == len(events) + event = events[0] + + assert DOMAIN == event.event_type + assert "s3:ObjectCreated:Put" == event.data["event_name"] + assert "5jJkTAo.jpg" == event.data["file_name"] + assert "test" == event.data["bucket"] + assert "5jJkTAo.jpg" == event.data["key"] + assert "http://url" == event.data["presigned_url"] + assert 0 == len(event.data["metadata"]) + + +async def test_queue_listener(): + """Tests QueueListener firing events on Hass event bus.""" + hass = MagicMock() + + queue_listener = QueueListener(hass) + queue_listener.start() + + queue_entry = { + "event_name": "s3:ObjectCreated:Put", + "bucket": "some_bucket", + "key": "some_dir/some_file.jpg", + "presigned_url": "http://host/url?signature=secret", + "metadata": {}, + } + + queue_listener.queue.put(queue_entry) + + queue_listener.stop() + + call_domain, call_event = hass.bus.fire.call_args[0] + + expected_event = { + "event_name": "s3:ObjectCreated:Put", + "file_name": "some_file.jpg", + "bucket": "some_bucket", + "key": "some_dir/some_file.jpg", + "presigned_url": "http://host/url?signature=secret", + "metadata": {}, + } + + assert DOMAIN == call_domain + assert json.dumps(expected_event, sort_keys=True) == json.dumps( + call_event, sort_keys=True + )