Merge pull request #1036 from balloob/owntracks_beacon2

Improve owntracks events, add support for stationary ibeacons.
pull/1047/head
Greg Dowling 2016-01-29 09:47:20 +00:00
commit dd7a7f4c75
2 changed files with 353 additions and 41 deletions

View File

@ -8,16 +8,23 @@ https://home-assistant.io/components/device_tracker.owntracks/
"""
import json
import logging
import threading
from collections import defaultdict
import homeassistant.components.mqtt as mqtt
from homeassistant.const import (STATE_HOME, STATE_NOT_HOME)
from homeassistant.const import STATE_HOME
DEPENDENCIES = ['mqtt']
CONF_TRANSITION_EVENTS = 'use_events'
REGIONS_ENTERED = defaultdict(list)
LOCATION_TOPIC = 'owntracks/+/+'
EVENT_TOPIC = 'owntracks/+/+/event'
_LOGGER = logging.getLogger(__name__)
LOCK = threading.Lock()
def setup_scanner(hass, config, see):
""" Set up an OwnTracks tracker. """
@ -31,27 +38,27 @@ def setup_scanner(hass, config, see):
data = json.loads(payload)
except ValueError:
# If invalid JSON
logging.getLogger(__name__).error(
_LOGGER.error(
'Unable to parse payload as JSON: %s', payload)
return
if not isinstance(data, dict) or data.get('_type') != 'location':
return
parts = topic.split('/')
kwargs = {
'dev_id': '{}_{}'.format(parts[1], parts[2]),
'host_name': parts[1],
'gps': (data['lat'], data['lon']),
}
if 'acc' in data:
kwargs['gps_accuracy'] = data['acc']
if 'batt' in data:
kwargs['battery'] = data['batt']
dev_id, kwargs = _parse_see_args(topic, data)
# Block updates if we're in a region
with LOCK:
if REGIONS_ENTERED[dev_id]:
_LOGGER.debug(
"location update ignored - inside region %s",
REGIONS_ENTERED[-1])
return
see(**kwargs)
def owntracks_event_update(topic, payload, qos):
# pylint: disable=too-many-branches
""" MQTT event (geofences) received. """
# Docs on available data:
@ -60,47 +67,91 @@ def setup_scanner(hass, config, see):
data = json.loads(payload)
except ValueError:
# If invalid JSON
logging.getLogger(__name__).error(
_LOGGER.error(
'Unable to parse payload as JSON: %s', payload)
return
if not isinstance(data, dict) or data.get('_type') != 'transition':
return
# check if in "home" fence or other zone
location = ''
if data['event'] == 'enter':
if data['desc'].lower() == 'home':
# OwnTracks uses - at the start of a beacon zone
# to switch on 'hold mode' - ignore this
location = data['desc'].lstrip("-")
if location.lower() == 'home':
location = STATE_HOME
else:
location = data['desc']
dev_id, kwargs = _parse_see_args(topic, data)
if data['event'] == 'enter':
zone = hass.states.get("zone.{}".format(location))
with LOCK:
if zone is not None:
kwargs['location_name'] = location
regions = REGIONS_ENTERED[dev_id]
if location not in regions:
regions.append(location)
_LOGGER.info("Enter region %s", location)
_set_gps_from_zone(kwargs, zone)
see(**kwargs)
elif data['event'] == 'leave':
location = STATE_NOT_HOME
regions = REGIONS_ENTERED[dev_id]
if location in regions:
regions.remove(location)
new_region = regions[-1] if regions else None
if new_region:
# Exit to previous region
zone = hass.states.get("zone.{}".format(new_region))
kwargs['location_name'] = new_region
_set_gps_from_zone(kwargs, zone)
_LOGGER.info("Exit from %s to %s", location, new_region)
else:
logging.getLogger(__name__).error(
_LOGGER.info("Exit from %s to GPS", location)
see(**kwargs)
else:
_LOGGER.error(
'Misformatted mqtt msgs, _type=transition, event=%s',
data['event'])
return
mqtt.subscribe(hass, LOCATION_TOPIC, owntracks_location_update, 1)
mqtt.subscribe(hass, EVENT_TOPIC, owntracks_event_update, 1)
return True
def _parse_see_args(topic, data):
""" Parse the OwnTracks location parameters,
into the format see expects. """
parts = topic.split('/')
dev_id = '{}_{}'.format(parts[1], parts[2])
host_name = parts[1]
kwargs = {
'dev_id': '{}_{}'.format(parts[1], parts[2]),
'host_name': parts[1],
'gps': (data['lat'], data['lon']),
'location_name': location,
'dev_id': dev_id,
'host_name': host_name,
'gps': (data['lat'], data['lon'])
}
if 'acc' in data:
kwargs['gps_accuracy'] = data['acc']
if 'batt' in data:
kwargs['battery'] = data['batt']
return dev_id, kwargs
see(**kwargs)
use_events = config.get(CONF_TRANSITION_EVENTS)
def _set_gps_from_zone(kwargs, zone):
""" Set the see parameters from the zone parameters """
if use_events:
mqtt.subscribe(hass, EVENT_TOPIC, owntracks_event_update, 1)
else:
mqtt.subscribe(hass, LOCATION_TOPIC, owntracks_location_update, 1)
return True
if zone is not None:
kwargs['gps'] = (
zone.attributes['latitude'],
zone.attributes['longitude'])
kwargs['gps_accuracy'] = zone.attributes['radius']
return kwargs

View File

@ -0,0 +1,261 @@
"""
tests.components.sensor.template
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests template sensor.
"""
import json
import os
import unittest
from collections import defaultdict
from homeassistant.components import device_tracker
from homeassistant.const import (STATE_NOT_HOME, CONF_PLATFORM)
import homeassistant.components.device_tracker.owntracks as owntracks
from tests.common import (
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
USER = 'greg'
DEVICE = 'phone'
LOCATION_TOPIC = "owntracks/{}/{}".format(USER, DEVICE)
EVENT_TOPIC = "owntracks/{}/{}/event".format(USER, DEVICE)
DEVICE_TRACKER_STATE = "device_tracker.{}_{}".format(USER, DEVICE)
LOCATION_MESSAGE = {
'batt': 92,
'cog': 248,
'tid': 'user',
'lon': 1.0,
't': 'u',
'alt': 27,
'acc': 60,
'p': 101.3977584838867,
'vac': 4,
'lat': 2.0,
'_type': 'location',
'tst': 1,
'vel': 0}
REGION_ENTER_MESSAGE = {
'lon': 1.0,
'event': 'enter',
'tid': 'user',
'desc': 'inner',
'wtst': 1,
't': 'b',
'acc': 60,
'tst': 2,
'lat': 2.0,
'_type': 'transition'}
REGION_LEAVE_MESSAGE = {
'lon': 1.0,
'event': 'leave',
'tid': 'user',
'desc': 'inner',
'wtst': 1,
't': 'b',
'acc': 60,
'tst': 2,
'lat': 2.0,
'_type': 'transition'}
class TestDeviceTrackerOwnTracks(unittest.TestCase):
""" Test the Template sensor. """
def setup_method(self, method):
""" Init needed objects. """
self.hass = get_test_home_assistant()
mock_mqtt_component(self.hass)
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'owntracks'
}}))
self.hass.states.set(
'zone.inner', 'zoning',
{
'name': 'zone',
'latitude': 2.1,
'longitude': 1.1,
'radius': 10
})
self.hass.states.set(
'zone.inner_2', 'zoning',
{
'name': 'zone',
'latitude': 2.1,
'longitude': 1.1,
'radius': 10
})
self.hass.states.set(
'zone.outer', 'zoning',
{
'name': 'zone',
'latitude': 2.0,
'longitude': 1.0,
'radius': 100000
})
# Clear state between teste
self.hass.states.set(DEVICE_TRACKER_STATE, None)
owntracks.REGIONS_ENTERED = defaultdict(list)
def teardown_method(self, method):
""" Stop down stuff we started. """
self.hass.stop()
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
def send_message(self, topic, message):
fire_mqtt_message(
self.hass, topic, json.dumps(message))
self.hass.pool.block_till_done()
def assert_location_state(self, location):
state = self.hass.states.get(DEVICE_TRACKER_STATE)
self.assertEqual(state.state, location)
def assert_location_latitude(self, latitude):
state = self.hass.states.get(DEVICE_TRACKER_STATE)
self.assertEqual(state.attributes.get('latitude'), latitude)
def assert_location_accuracy(self, accuracy):
state = self.hass.states.get(DEVICE_TRACKER_STATE)
self.assertEqual(state.attributes.get('gps_accuracy'), accuracy)
def test_location_update(self):
self.send_message(LOCATION_TOPIC, LOCATION_MESSAGE)
self.assert_location_latitude(2.0)
self.assert_location_accuracy(60.0)
self.assert_location_state('outer')
def test_event_entry_exit(self):
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
# Enter uses the zone's gps co-ords
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
self.assert_location_state('inner')
self.send_message(LOCATION_TOPIC, LOCATION_MESSAGE)
# Updates ignored when in a zone
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
self.assert_location_state('inner')
self.send_message(EVENT_TOPIC, REGION_LEAVE_MESSAGE)
# Exit switches back to GPS
self.assert_location_latitude(2.0)
self.assert_location_accuracy(60.0)
self.assert_location_state('outer')
# Left clean zone state
self.assertFalse(owntracks.REGIONS_ENTERED[USER])
def test_event_exit_outside_zone_sets_away(self):
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
self.assert_location_state('inner')
# Exit message far away GPS location
message = REGION_LEAVE_MESSAGE.copy()
message['lon'] = 90.1
message['lat'] = 90.1
self.send_message(EVENT_TOPIC, message)
# Exit forces zone change to away
self.assert_location_state(STATE_NOT_HOME)
def test_event_entry_exit_right_order(self):
# Enter inner zone
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
self.assert_location_state('inner')
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
# Enter inner2 zone
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_location_state('inner_2')
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
# Exit inner_2 - should be in 'inner'
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_location_state('inner')
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
# Exit inner - should be in 'outer'
self.send_message(EVENT_TOPIC, REGION_LEAVE_MESSAGE)
self.assert_location_state('outer')
self.assert_location_latitude(2.0)
self.assert_location_accuracy(60.0)
def test_event_entry_exit_wrong_order(self):
# Enter inner zone
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
self.assert_location_state('inner')
# Enter inner2 zone
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_location_state('inner_2')
# Exit inner - should still be in 'inner_2'
self.send_message(EVENT_TOPIC, REGION_LEAVE_MESSAGE)
self.assert_location_state('inner_2')
# Exit inner_2 - should be in 'outer'
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_location_state('outer')
def test_event_entry_unknown_zone(self):
# Just treat as location update
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "unknown"
self.send_message(EVENT_TOPIC, message)
self.assert_location_latitude(2.0)
self.assert_location_state('outer')
def test_event_exit_unknown_zone(self):
# Just treat as location update
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = "unknown"
self.send_message(EVENT_TOPIC, message)
self.assert_location_latitude(2.0)
self.assert_location_state('outer')
def test_event_entry_zone_loading_dash(self):
# Make sure the leading - is ignored
# Ownracks uses this to switch on hold
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "-inner"
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
self.assert_location_state('inner')