core/homeassistant/components/ps4/config_flow.py

205 lines
7.0 KiB
Python
Raw Normal View History

"""Config Flow for PlayStation 4."""
from collections import OrderedDict
from pyps4_2ndscreen.errors import CredentialTimeout
from pyps4_2ndscreen.helpers import Helper
from pyps4_2ndscreen.media_art import COUNTRIES
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
2019-07-31 19:25:30 +00:00
CONF_CODE,
CONF_HOST,
CONF_IP_ADDRESS,
CONF_NAME,
CONF_REGION,
CONF_TOKEN,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import location
from .const import (
CONFIG_ENTRY_VERSION,
COUNTRYCODE_NAMES,
DEFAULT_ALIAS,
DEFAULT_NAME,
DOMAIN,
)
2019-07-31 19:25:30 +00:00
CONF_MODE = "Config Mode"
CONF_AUTO = "Auto Discover"
CONF_MANUAL = "Manual Entry"
LOCAL_UDP_PORT = 1988
UDP_PORT = 987
TCP_PORT = 997
2019-07-31 19:25:30 +00:00
PORT_MSG = {UDP_PORT: "port_987_bind_error", TCP_PORT: "port_997_bind_error"}
PIN_LENGTH = 8
class PlayStation4FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a PlayStation 4 config flow."""
VERSION = CONFIG_ENTRY_VERSION
def __init__(self):
"""Initialize the config flow."""
self.helper = Helper()
self.creds = None
self.name = None
self.host = None
self.region = None
self.pin = None
self.m_device = None
self.location = None
self.device_list = []
async def async_step_user(self, user_input=None):
"""Handle a user config flow."""
# Check if able to bind to ports: UDP 987, TCP 997.
ports = PORT_MSG.keys()
2019-07-31 19:25:30 +00:00
failed = await self.hass.async_add_executor_job(self.helper.port_bind, ports)
if failed in ports:
reason = PORT_MSG[failed]
return self.async_abort(reason=reason)
return await self.async_step_creds()
async def async_step_creds(self, user_input=None):
"""Return PS4 credentials from 2nd Screen App."""
errors = {}
if user_input is not None:
try:
self.creds = await self.hass.async_add_executor_job(
2019-07-31 19:25:30 +00:00
self.helper.get_creds, DEFAULT_ALIAS
)
if self.creds is not None:
return await self.async_step_mode()
2019-07-31 19:25:30 +00:00
return self.async_abort(reason="credential_error")
except CredentialTimeout:
2019-07-31 19:25:30 +00:00
errors["base"] = "credential_timeout"
2019-07-31 19:25:30 +00:00
return self.async_show_form(step_id="creds", errors=errors)
async def async_step_mode(self, user_input=None):
"""Prompt for mode."""
errors = {}
mode = [CONF_AUTO, CONF_MANUAL]
if user_input is not None:
if user_input[CONF_MODE] == CONF_MANUAL:
try:
2021-10-22 09:34:45 +00:00
if device := user_input[CONF_IP_ADDRESS]:
self.m_device = device
except KeyError:
2019-07-31 19:25:30 +00:00
errors[CONF_IP_ADDRESS] = "no_ipaddress"
if not errors:
return await self.async_step_link()
mode_schema = OrderedDict()
2019-07-31 19:25:30 +00:00
mode_schema[vol.Required(CONF_MODE, default=CONF_AUTO)] = vol.In(list(mode))
mode_schema[vol.Optional(CONF_IP_ADDRESS)] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="mode", data_schema=vol.Schema(mode_schema), errors=errors
)
async def async_step_link(self, user_input=None):
"""Prompt user input. Create or edit entry."""
regions = sorted(COUNTRIES.keys())
default_region = None
errors = {}
if user_input is None:
# Search for device.
# If LOCAL_UDP_PORT cannot be used, a random port will be selected.
devices = await self.hass.async_add_executor_job(
self.helper.has_devices, self.m_device, LOCAL_UDP_PORT
2019-07-31 19:25:30 +00:00
)
# Abort if can't find device.
if not devices:
2019-07-31 19:25:30 +00:00
return self.async_abort(reason="no_devices_found")
2019-07-31 19:25:30 +00:00
self.device_list = [device["host-ip"] for device in devices]
# Check that devices found aren't configured per account.
entries = self._async_current_entries()
if entries:
# Retrieve device data from all entries if creds match.
2019-07-31 19:25:30 +00:00
conf_devices = [
device
for entry in entries
if self.creds == entry.data[CONF_TOKEN]
for device in entry.data["devices"]
]
# Remove configured device from search list.
for c_device in conf_devices:
2019-07-31 19:25:30 +00:00
if c_device["host"] in self.device_list:
# Remove configured device from search list.
2019-07-31 19:25:30 +00:00
self.device_list.remove(c_device["host"])
# If list is empty then all devices are configured.
if not self.device_list:
return self.async_abort(reason="already_configured")
# Login to PS4 with user data.
if user_input is not None:
self.region = user_input[CONF_REGION]
self.name = user_input[CONF_NAME]
# Assume pin had leading zeros, before coercing to int.
self.pin = str(user_input[CONF_CODE]).zfill(PIN_LENGTH)
self.host = user_input[CONF_IP_ADDRESS]
is_ready, is_login = await self.hass.async_add_executor_job(
self.helper.link,
self.host,
self.creds,
self.pin,
DEFAULT_ALIAS,
LOCAL_UDP_PORT,
2019-07-31 19:25:30 +00:00
)
if is_ready is False:
errors["base"] = "cannot_connect"
elif is_login is False:
2019-07-31 19:25:30 +00:00
errors["base"] = "login_failed"
else:
device = {
CONF_HOST: self.host,
CONF_NAME: self.name,
2019-07-31 19:25:30 +00:00
CONF_REGION: self.region,
}
# Create entry.
return self.async_create_entry(
2019-07-31 19:25:30 +00:00
title="PlayStation 4",
data={CONF_TOKEN: self.creds, "devices": [device]},
)
# Try to find region automatically.
if not self.location:
self.location = await location.async_detect_location_info(
async_get_clientsession(self.hass)
)
if self.location:
country = COUNTRYCODE_NAMES.get(self.location.country_code)
if country in COUNTRIES:
default_region = country
# Show User Input form.
link_schema = OrderedDict()
2019-07-31 19:25:30 +00:00
link_schema[vol.Required(CONF_IP_ADDRESS)] = vol.In(list(self.device_list))
link_schema[vol.Required(CONF_REGION, default=default_region)] = vol.In(
list(regions)
)
link_schema[vol.Required(CONF_CODE)] = vol.All(
vol.Strip, vol.Length(max=PIN_LENGTH), vol.Coerce(int)
2019-07-31 19:25:30 +00:00
)
link_schema[vol.Required(CONF_NAME, default=DEFAULT_NAME)] = str
return self.async_show_form(
2019-07-31 19:25:30 +00:00
step_id="link", data_schema=vol.Schema(link_schema), errors=errors
)