Improve voluptuous and login errors for Asus device tracker (#3170)

pull/3176/head
Johann Kellerman 2016-09-04 01:32:43 +02:00 committed by Teagan Glenn
parent fe7f797ad9
commit 91a3522100
3 changed files with 31 additions and 16 deletions

View File

@ -26,19 +26,20 @@ CONF_PROTOCOL = 'protocol'
CONF_MODE = 'mode'
CONF_SSH_KEY = 'ssh_key'
CONF_PUB_KEY = 'pub_key'
SECRET_GROUP = 'Password or SSH Key'
PLATFORM_SCHEMA = vol.All(
cv.has_at_least_one_key(CONF_PASSWORD, CONF_PUB_KEY, CONF_SSH_KEY),
PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD): cv.string,
vol.Optional(CONF_PROTOCOL, default='ssh'):
vol.In(['ssh', 'telnet']),
vol.Optional(CONF_MODE, default='router'):
vol.In(['router', 'ap']),
vol.Optional(CONF_SSH_KEY): cv.isfile,
vol.Optional(CONF_PUB_KEY): cv.isfile
vol.Exclusive(CONF_PASSWORD, SECRET_GROUP): cv.string,
vol.Exclusive(CONF_SSH_KEY, SECRET_GROUP): cv.isfile,
vol.Exclusive(CONF_PUB_KEY, SECRET_GROUP): cv.isfile
}))
@ -101,6 +102,21 @@ class AsusWrtDeviceScanner(object):
self.protocol = config[CONF_PROTOCOL]
self.mode = config[CONF_MODE]
if self.protocol == 'ssh':
if self.ssh_key:
self.ssh_secret = {'ssh_key': self.ssh_key}
elif self.password:
self.ssh_secret = {'password': self.password}
else:
_LOGGER.error('No password or private key specified')
self.success_init = False
return
else:
if not self.password:
_LOGGER.error('No password specified')
self.success_init = False
return
self.lock = threading.Lock()
self.last_results = {}
@ -149,15 +165,17 @@ class AsusWrtDeviceScanner(object):
"""Retrieve data from ASUSWRT via the ssh protocol."""
from pexpect import pxssh, exceptions
ssh = pxssh.pxssh()
try:
ssh.login(self.host, self.username, **self.ssh_secret)
except exceptions.EOF as err:
_LOGGER.error('Connection refused. Is SSH enabled?')
return None
except pxssh.ExceptionPxssh as err:
_LOGGER.error('Unable to connect via SSH: %s', str(err))
return None
try:
ssh = pxssh.pxssh()
if self.ssh_key:
ssh.login(self.host, self.username, ssh_key=self.ssh_key)
elif self.password:
ssh.login(self.host, self.username, self.password)
else:
_LOGGER.error('No password or private key specified')
return None
ssh.sendline(_IP_NEIGH_CMD)
ssh.prompt()
neighbors = ssh.before.split(b'\n')[1:-1]
@ -178,9 +196,6 @@ class AsusWrtDeviceScanner(object):
except pxssh.ExceptionPxssh as exc:
_LOGGER.error('Unexpected response from router: %s', exc)
return None
except exceptions.EOF:
_LOGGER.error('Connection refused or no route to host')
return None
def telnet_connection(self):
"""Retrieve data from ASUSWRT via the telnet protocol."""

View File

@ -70,7 +70,7 @@ def isfile(value: Any) -> str:
"""Validate that the value is an existing file."""
if value is None:
raise vol.Invalid('None is not file')
file_in = str(value)
file_in = os.path.expanduser(str(value))
if not os.path.isfile(file_in):
raise vol.Invalid('not a file')

View File

@ -138,7 +138,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.ssh_connection()
ssh.login.assert_called_once_with('fake_host', 'fake_user',
'fake_pass')
password='fake_pass')
def test_ssh_login_without_password_or_pubkey(self): \
# pylint: disable=invalid-name