"""Support for Australian BOM (Bureau of Meteorology) weather service.""" import datetime import ftplib import gzip import io import json import logging import os import re import zipfile import requests import voluptuous as vol import homeassistant.helpers.config_validation as cv import homeassistant.util.dt as dt_util from homeassistant.components.sensor import PLATFORM_SCHEMA from homeassistant.const import ( CONF_MONITORED_CONDITIONS, TEMP_CELSIUS, CONF_NAME, ATTR_ATTRIBUTION, CONF_LATITUDE, CONF_LONGITUDE, ) from homeassistant.helpers.entity import Entity from homeassistant.util import Throttle _RESOURCE = "http://www.bom.gov.au/fwo/{}/{}.{}.json" _LOGGER = logging.getLogger(__name__) ATTR_LAST_UPDATE = "last_update" ATTR_SENSOR_ID = "sensor_id" ATTR_STATION_ID = "station_id" ATTR_STATION_NAME = "station_name" ATTR_ZONE_ID = "zone_id" ATTRIBUTION = "Data provided by the Australian Bureau of Meteorology" CONF_STATION = "station" CONF_ZONE_ID = "zone_id" CONF_WMO_ID = "wmo_id" MIN_TIME_BETWEEN_UPDATES = datetime.timedelta(seconds=60) SENSOR_TYPES = { "wmo": ["wmo", None], "name": ["Station Name", None], "history_product": ["Zone", None], "local_date_time": ["Local Time", None], "local_date_time_full": ["Local Time Full", None], "aifstime_utc": ["UTC Time Full", None], "lat": ["Lat", None], "lon": ["Long", None], "apparent_t": ["Feels Like C", TEMP_CELSIUS], "cloud": ["Cloud", None], "cloud_base_m": ["Cloud Base", None], "cloud_oktas": ["Cloud Oktas", None], "cloud_type_id": ["Cloud Type ID", None], "cloud_type": ["Cloud Type", None], "delta_t": ["Delta Temp C", TEMP_CELSIUS], "gust_kmh": ["Wind Gust kmh", "km/h"], "gust_kt": ["Wind Gust kt", "kt"], "air_temp": ["Air Temp C", TEMP_CELSIUS], "dewpt": ["Dew Point C", TEMP_CELSIUS], "press": ["Pressure mb", "mbar"], "press_qnh": ["Pressure qnh", "qnh"], "press_msl": ["Pressure msl", "msl"], "press_tend": ["Pressure Tend", None], "rain_trace": ["Rain Today", "mm"], "rel_hum": ["Relative Humidity", "%"], "sea_state": ["Sea State", None], "swell_dir_worded": ["Swell Direction", None], "swell_height": ["Swell Height", "m"], "swell_period": ["Swell Period", None], "vis_km": ["Visability km", "km"], "weather": ["Weather", None], "wind_dir": ["Wind Direction", None], "wind_spd_kmh": ["Wind Speed kmh", "km/h"], "wind_spd_kt": ["Wind Speed kt", "kt"], } def validate_station(station): """Check that the station ID is well-formed.""" if station is None: return station = station.replace(".shtml", "") if not re.fullmatch(r"ID[A-Z]\d\d\d\d\d\.\d\d\d\d\d", station): raise vol.error.Invalid("Malformed station ID") return station PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Inclusive(CONF_ZONE_ID, "Deprecated partial station ID"): cv.string, vol.Inclusive(CONF_WMO_ID, "Deprecated partial station ID"): cv.string, vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_STATION): validate_station, vol.Required(CONF_MONITORED_CONDITIONS, default=[]): vol.All( cv.ensure_list, [vol.In(SENSOR_TYPES)] ), } ) def setup_platform(hass, config, add_entities, discovery_info=None): """Set up the BOM sensor.""" station = config.get(CONF_STATION) zone_id, wmo_id = config.get(CONF_ZONE_ID), config.get(CONF_WMO_ID) if station is not None: if zone_id and wmo_id: _LOGGER.warning( "Using config %s, not %s and %s for BOM sensor", CONF_STATION, CONF_ZONE_ID, CONF_WMO_ID, ) elif zone_id and wmo_id: station = f"{zone_id}.{wmo_id}" else: station = closest_station( config.get(CONF_LATITUDE), config.get(CONF_LONGITUDE), hass.config.config_dir, ) if station is None: _LOGGER.error("Could not get BOM weather station from lat/lon") return bom_data = BOMCurrentData(station) try: bom_data.update() except ValueError as err: _LOGGER.error("Received error from BOM Current: %s", err) return add_entities( [ BOMCurrentSensor(bom_data, variable, config.get(CONF_NAME)) for variable in config[CONF_MONITORED_CONDITIONS] ] ) class BOMCurrentSensor(Entity): """Implementation of a BOM current sensor.""" def __init__(self, bom_data, condition, stationname): """Initialize the sensor.""" self.bom_data = bom_data self._condition = condition self.stationname = stationname @property def name(self): """Return the name of the sensor.""" if self.stationname is None: return "BOM {}".format(SENSOR_TYPES[self._condition][0]) return "BOM {} {}".format(self.stationname, SENSOR_TYPES[self._condition][0]) @property def state(self): """Return the state of the sensor.""" return self.bom_data.get_reading(self._condition) @property def device_state_attributes(self): """Return the state attributes of the device.""" attr = { ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_LAST_UPDATE: self.bom_data.last_updated, ATTR_SENSOR_ID: self._condition, ATTR_STATION_ID: self.bom_data.latest_data["wmo"], ATTR_STATION_NAME: self.bom_data.latest_data["name"], ATTR_ZONE_ID: self.bom_data.latest_data["history_product"], } return attr @property def unit_of_measurement(self): """Return the units of measurement.""" return SENSOR_TYPES[self._condition][1] def update(self): """Update current conditions.""" self.bom_data.update() class BOMCurrentData: """Get data from BOM.""" def __init__(self, station_id): """Initialize the data object.""" self._zone_id, self._wmo_id = station_id.split(".") self._data = None self.last_updated = None def _build_url(self): """Build the URL for the requests.""" url = _RESOURCE.format(self._zone_id, self._zone_id, self._wmo_id) _LOGGER.debug("BOM URL: %s", url) return url @property def latest_data(self): """Return the latest data object.""" if self._data: return self._data[0] return None def get_reading(self, condition): """Return the value for the given condition. BOM weather publishes condition readings for weather (and a few other conditions) at intervals throughout the day. To avoid a `-` value in the frontend for these conditions, we traverse the historical data for the latest value that is not `-`. Iterators are used in this method to avoid iterating needlessly through the entire BOM provided dataset. """ condition_readings = (entry[condition] for entry in self._data) return next((x for x in condition_readings if x != "-"), None) def should_update(self): """Determine whether an update should occur. BOM provides updated data every 30 minutes. We manually define refreshing logic here rather than a throttle to keep updates in lock-step with BOM. If 35 minutes has passed since the last BOM data update, then an update should be done. """ if self.last_updated is None: # Never updated before, therefore an update should occur. return True now = dt_util.utcnow() update_due_at = self.last_updated + datetime.timedelta(minutes=35) return now > update_due_at @Throttle(MIN_TIME_BETWEEN_UPDATES) def update(self): """Get the latest data from BOM.""" if not self.should_update(): _LOGGER.debug( "BOM was updated %s minutes ago, skipping update as" " < 35 minutes, Now: %s, LastUpdate: %s", (dt_util.utcnow() - self.last_updated), dt_util.utcnow(), self.last_updated, ) return try: result = requests.get(self._build_url(), timeout=10).json() self._data = result["observations"]["data"] # set lastupdate using self._data[0] as the first element in the # array is the latest date in the json self.last_updated = dt_util.as_utc( datetime.datetime.strptime( str(self._data[0]["local_date_time_full"]), "%Y%m%d%H%M%S" ) ) return except ValueError as err: _LOGGER.error("Check BOM %s", err.args) self._data = None raise def _get_bom_stations(): """Return {CONF_STATION: (lat, lon)} for all stations, for auto-config. This function does several MB of internet requests, so please use the caching version to minimise latency and hit-count. """ latlon = {} with io.BytesIO() as file_obj: with ftplib.FTP("ftp.bom.gov.au") as ftp: ftp.login() ftp.cwd("anon2/home/ncc/metadata/sitelists") ftp.retrbinary("RETR stations.zip", file_obj.write) file_obj.seek(0) with zipfile.ZipFile(file_obj) as zipped: with zipped.open("stations.txt") as station_txt: for _ in range(4): station_txt.readline() # skip header while True: line = station_txt.readline().decode().strip() if len(line) < 120: break # end while loop, ignoring any footer text wmo, lat, lon = ( line[a:b].strip() for a, b in [(128, 134), (70, 78), (79, 88)] ) if wmo != "..": latlon[wmo] = (float(lat), float(lon)) zones = {} pattern = ( r'' ) for state in ("nsw", "vic", "qld", "wa", "tas", "nt"): url = "http://www.bom.gov.au/{0}/observations/{0}all.shtml".format(state) for zone_id, wmo_id in re.findall(pattern, requests.get(url).text): zones[wmo_id] = zone_id return {"{}.{}".format(zones[k], k): latlon[k] for k in set(latlon) & set(zones)} def bom_stations(cache_dir): """Return {CONF_STATION: (lat, lon)} for all stations, for auto-config. Results from internet requests are cached as compressed JSON, making subsequent calls very much faster. """ cache_file = os.path.join(cache_dir, ".bom-stations.json.gz") if not os.path.isfile(cache_file): stations = _get_bom_stations() with gzip.open(cache_file, "wt") as cache: json.dump(stations, cache, sort_keys=True) return stations with gzip.open(cache_file, "rt") as cache: return {k: tuple(v) for k, v in json.load(cache).items()} def closest_station(lat, lon, cache_dir): """Return the ZONE_ID.WMO_ID of the closest station to our lat/lon.""" if lat is None or lon is None or not os.path.isdir(cache_dir): return stations = bom_stations(cache_dir) def comparable_dist(wmo_id): """Create a psudeo-distance from latitude/longitude.""" station_lat, station_lon = stations[wmo_id] return (lat - station_lat) ** 2 + (lon - station_lon) ** 2 return min(stations, key=comparable_dist)