"""Provides a UPNP discovery method that mimicks Hue hubs.""" import threading import socket import logging import os import select from aiohttp import web from homeassistant import core from homeassistant.components.http import HomeAssistantView _LOGGER = logging.getLogger(__name__) class DescriptionXmlView(HomeAssistantView): """Handles requests for the description.xml file.""" url = '/description.xml' name = 'description:xml' requires_auth = False def __init__(self, config): """Initialize the instance of the view.""" self.config = config @core.callback def get(self, request): """Handle a GET request.""" xml_template = """ 1 0 http://{0}:{1}/ urn:schemas-upnp-org:device:Basic:1 HASS Bridge ({0}) Royal Philips Electronics http://www.philips.com Philips hue Personal Wireless Lighting Philips hue bridge 2015 BSB002 http://www.meethue.com 1234 uuid:2f402f80-da50-11e1-9b23-001788255acc """ resp_text = xml_template.format( self.config.advertise_ip, self.config.advertise_port) return web.Response(text=resp_text, content_type='text/xml') class UPNPResponderThread(threading.Thread): """Handle responding to UPNP/SSDP discovery requests.""" _interrupted = False def __init__(self, host_ip_addr, listen_port, upnp_bind_multicast, advertise_ip, advertise_port): """Initialize the class.""" threading.Thread.__init__(self) self.host_ip_addr = host_ip_addr self.listen_port = listen_port self.upnp_bind_multicast = upnp_bind_multicast # Note that the double newline at the end of # this string is required per the SSDP spec resp_template = """HTTP/1.1 200 OK CACHE-CONTROL: max-age=60 EXT: LOCATION: http://{0}:{1}/description.xml SERVER: FreeRTOS/6.0.5, UPnP/1.0, IpBridge/0.1 hue-bridgeid: 1234 ST: urn:schemas-upnp-org:device:basic:1 USN: uuid:Socket-1_0-221438K0100073::urn:schemas-upnp-org:device:basic:1 """ self.upnp_response = resp_template.format( advertise_ip, advertise_port).replace("\n", "\r\n") \ .encode('utf-8') # Set up a pipe for signaling to the receiver that it's time to # shutdown. Essentially, we place the SSDP socket into nonblocking # mode and use select() to wait for data to arrive on either the SSDP # socket or the pipe. If data arrives on either one, select() returns # and tells us which filenos have data ready to read. # # When we want to stop the responder, we write data to the pipe, which # causes the select() to return and indicate that said pipe has data # ready to be read, which indicates to us that the responder needs to # be shutdown. self._interrupted_read_pipe, self._interrupted_write_pipe = os.pipe() def run(self): """Run the server.""" # Listen for UDP port 1900 packets sent to SSDP multicast address ssdp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ssdp_socket.setblocking(False) # Required for receiving multicast ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssdp_socket.setsockopt( socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.host_ip_addr)) ssdp_socket.setsockopt( socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton("239.255.255.250") + socket.inet_aton(self.host_ip_addr)) if self.upnp_bind_multicast: ssdp_socket.bind(("239.255.255.250", 1900)) else: ssdp_socket.bind((self.host_ip_addr, 1900)) while True: if self._interrupted: clean_socket_close(ssdp_socket) return try: read, _, _ = select.select( [self._interrupted_read_pipe, ssdp_socket], [], [ssdp_socket]) if self._interrupted_read_pipe in read: # Implies self._interrupted is True clean_socket_close(ssdp_socket) return elif ssdp_socket in read: data, addr = ssdp_socket.recvfrom(1024) else: continue except socket.error as ex: if self._interrupted: clean_socket_close(ssdp_socket) return _LOGGER.error("UPNP Responder socket exception occured: %s", ex.__str__) if "M-SEARCH" in data.decode('utf-8'): # SSDP M-SEARCH method received, respond to it with our info resp_socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM) resp_socket.sendto(self.upnp_response, addr) resp_socket.close() def stop(self): """Stop the server.""" # Request for server self._interrupted = True os.write(self._interrupted_write_pipe, bytes([0])) self.join() def clean_socket_close(sock): """Close a socket connection and logs its closure.""" _LOGGER.info("UPNP responder shutting down.") sock.close()