core/homeassistant/util/__init__.py

413 lines
12 KiB
Python
Raw Normal View History

"""
homeassistant.util
~~~~~~~~~~~~~~~~~~
Helper methods for various modules.
"""
2014-11-28 23:34:42 +00:00
import collections
from itertools import chain
import threading
import queue
from datetime import datetime
2013-10-07 07:15:47 +00:00
import re
2014-04-15 06:48:00 +00:00
import enum
import socket
import random
import string
from functools import wraps
2016-02-10 07:27:01 +00:00
from types import MappingProxyType
2013-10-07 07:15:47 +00:00
2015-08-04 20:21:09 +00:00
from .dt import datetime_to_local_str, utcnow
2014-03-16 00:57:16 +00:00
RE_SANITIZE_FILENAME = re.compile(r'(~|\.\.|/|\\)')
2014-10-22 06:52:24 +00:00
RE_SANITIZE_PATH = re.compile(r'(~|\.(\.)+)')
2015-09-16 06:35:28 +00:00
RE_SLUGIFY = re.compile(r'[^a-z0-9_]+')
2013-11-11 00:46:48 +00:00
2013-10-07 07:15:47 +00:00
def sanitize_filename(filename):
""" Sanitizes a filename by removing .. / and \\. """
return RE_SANITIZE_FILENAME.sub("", filename)
2014-10-22 06:52:24 +00:00
def sanitize_path(path):
2014-11-23 07:31:52 +00:00
""" Sanitizes a path by removing ~ and .. """
2014-10-22 06:52:24 +00:00
return RE_SANITIZE_PATH.sub("", path)
def slugify(text):
""" Slugifies a given text. """
2015-09-16 06:35:28 +00:00
text = text.lower().replace(" ", "_")
return RE_SLUGIFY.sub("", text)
def repr_helper(inp):
""" Helps creating a more readable string representation of objects. """
2016-02-10 07:27:01 +00:00
if isinstance(inp, (dict, MappingProxyType)):
return ", ".join(
repr_helper(key)+"="+repr_helper(item) for key, item
2014-03-12 05:49:54 +00:00
in inp.items())
elif isinstance(inp, datetime):
return datetime_to_local_str(inp)
else:
return str(inp)
def convert(value, to_type, default=None):
""" Converts value to to_type, returns default if fails. """
try:
2014-03-26 07:08:50 +00:00
return default if value is None else to_type(value)
except (ValueError, TypeError):
# If value could not be converted
return default
def ensure_unique_string(preferred_string, current_strings):
""" Returns a string that is not present in current_strings.
If preferred string exists will append _2, _3, .. """
test_string = preferred_string
2015-09-10 06:37:15 +00:00
current_strings = set(current_strings)
tries = 1
while test_string in current_strings:
tries += 1
test_string = "{}_{}".format(preferred_string, tries)
return test_string
# Taken from: http://stackoverflow.com/a/11735897
def get_local_ip():
""" Tries to determine the local IP address of the machine. """
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Use Google Public DNS server to determine own IP
sock.connect(('8.8.8.8', 80))
2015-08-03 15:05:33 +00:00
return sock.getsockname()[0]
except socket.error:
return socket.gethostbyname(socket.gethostname())
2015-08-03 15:05:33 +00:00
finally:
sock.close()
# Taken from http://stackoverflow.com/a/23728630
def get_random_string(length=10):
""" Returns a random string with letters and digits. """
generator = random.SystemRandom()
source_chars = string.ascii_letters + string.digits
return ''.join(generator.choice(source_chars) for _ in range(length))
2014-04-15 06:48:00 +00:00
class OrderedEnum(enum.Enum):
""" Taken from Python 3.4.0 docs. """
2014-11-23 20:57:29 +00:00
# pylint: disable=no-init, too-few-public-methods
2014-04-15 06:48:00 +00:00
def __ge__(self, other):
if self.__class__ is other.__class__:
return self.value >= other.value
return NotImplemented
def __gt__(self, other):
if self.__class__ is other.__class__:
return self.value > other.value
return NotImplemented
def __le__(self, other):
if self.__class__ is other.__class__:
return self.value <= other.value
return NotImplemented
def __lt__(self, other):
if self.__class__ is other.__class__:
return self.value < other.value
return NotImplemented
2014-11-28 23:34:42 +00:00
class OrderedSet(collections.MutableSet):
""" Ordered set taken from http://code.activestate.com/recipes/576694/ """
def __init__(self, iterable=None):
self.end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.map = {} # key --> [key, prev, next]
if iterable is not None:
self |= iterable
def __len__(self):
return len(self.map)
def __contains__(self, key):
return key in self.map
def add(self, key):
""" Add an element to the end of the set. """
2014-11-28 23:34:42 +00:00
if key not in self.map:
end = self.end
curr = end[1]
curr[2] = end[1] = self.map[key] = [key, curr, end]
def promote(self, key):
""" Promote element to beginning of the set, add if not there. """
if key in self.map:
self.discard(key)
begin = self.end[2]
curr = begin[1]
curr[2] = begin[1] = self.map[key] = [key, curr, begin]
2014-11-28 23:34:42 +00:00
def discard(self, key):
""" Discard an element from the set. """
if key in self.map:
key, prev_item, next_item = self.map.pop(key)
prev_item[2] = next_item
next_item[1] = prev_item
def __iter__(self):
end = self.end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def pop(self, last=True): # pylint: disable=arguments-differ
2016-01-26 23:08:06 +00:00
"""
Pops element of the end of the set.
Set last=False to pop from the beginning.
"""
2014-11-28 23:34:42 +00:00
if not self:
raise KeyError('set is empty')
key = self.end[1][0] if last else self.end[2][0]
self.discard(key)
return key
def update(self, *args):
""" Add elements from args to the set. """
for item in chain(*args):
self.add(item)
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, list(self))
def __eq__(self, other):
if isinstance(other, OrderedSet):
return len(self) == len(other) and list(self) == list(other)
return set(self) == set(other)
class Throttle(object):
"""
A method decorator to add a cooldown to a method to prevent it from being
called more then 1 time within the timedelta interval `min_time` after it
returned its result.
Calling a method a second time during the interval will return None.
Pass keyword argument `no_throttle=True` to the wrapped method to make
the call not throttled.
Decorator takes in an optional second timedelta interval to throttle the
'no_throttle' calls.
Adds a datetime attribute `last_call` to the method.
"""
# pylint: disable=too-few-public-methods
def __init__(self, min_time, limit_no_throttle=None):
self.min_time = min_time
self.limit_no_throttle = limit_no_throttle
def __call__(self, method):
if self.limit_no_throttle is not None:
method = Throttle(self.limit_no_throttle)(method)
2015-10-11 17:42:42 +00:00
# Different methods that can be passed in:
# - a function
# - an unbound function on a class
# - a method (bound function on a class)
# We want to be able to differentiate between function and unbound
# methods (which are considered functions).
2015-10-09 06:49:55 +00:00
# All methods have the classname in their qualname seperated by a '.'
# Functions have a '.' in their qualname if defined inline, but will
# be prefixed by '.<locals>.' so we strip that out.
2015-10-11 17:42:42 +00:00
is_func = (not hasattr(method, '__self__') and
'.' not in method.__qualname__.split('.<locals>.')[-1])
2015-10-09 06:49:55 +00:00
@wraps(method)
def wrapper(*args, **kwargs):
"""
Wrapper that allows wrapped to be called only once per min_time.
If we cannot acquire the lock, it is running so return None.
"""
2015-10-11 18:04:16 +00:00
# pylint: disable=protected-access
2015-10-11 17:42:42 +00:00
if hasattr(method, '__self__'):
host = method.__self__
elif is_func:
host = wrapper
else:
host = args[0] if args else wrapper
2015-10-09 06:49:55 +00:00
if not hasattr(host, '_throttle_lock'):
host._throttle_lock = threading.Lock()
if not host._throttle_lock.acquire(False):
2015-09-13 05:56:49 +00:00
return None
2015-10-09 06:49:55 +00:00
last_call = getattr(host, '_throttle_last_call', None)
# Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False)
2015-09-13 05:56:49 +00:00
2015-10-09 06:49:55 +00:00
try:
2015-09-13 05:56:49 +00:00
if force or utcnow() - last_call > self.min_time:
result = method(*args, **kwargs)
2015-10-09 06:49:55 +00:00
host._throttle_last_call = utcnow()
2015-09-13 05:56:49 +00:00
return result
else:
return None
finally:
2015-10-09 06:49:55 +00:00
host._throttle_lock.release()
return wrapper
class ThreadPool(object):
2016-02-15 07:01:49 +00:00
"""A priority queue-based thread pool."""
2014-11-23 20:57:29 +00:00
# pylint: disable=too-many-instance-attributes
def __init__(self, job_handler, worker_count=0, busy_callback=None):
"""
job_handler: method to be called from worker thread to handle job
worker_count: number of threads to run that handle jobs
busy_callback: method to be called when queue gets too big.
Parameters: worker_count, list of current_jobs,
pending_jobs_count
"""
self._job_handler = job_handler
self._busy_callback = busy_callback
self.worker_count = 0
self.busy_warning_limit = 0
self._work_queue = queue.PriorityQueue()
self.current_jobs = []
2014-11-23 17:51:16 +00:00
self._lock = threading.RLock()
self._quit_task = object()
self.running = True
for _ in range(worker_count):
self.add_worker()
def add_worker(self):
2016-02-15 07:01:49 +00:00
"""Add worker to the thread pool and reset warning limit."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")
worker = threading.Thread(target=self._worker)
worker.daemon = True
worker.start()
self.worker_count += 1
self.busy_warning_limit = self.worker_count * 3
def remove_worker(self):
2016-02-15 07:01:49 +00:00
"""Remove worker from the thread pool and reset warning limit."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")
self._work_queue.put(PriorityQueueItem(0, self._quit_task))
self.worker_count -= 1
self.busy_warning_limit = self.worker_count * 3
2014-11-23 17:51:16 +00:00
def add_job(self, priority, job):
""" Add a job to the queue. """
2014-11-23 17:51:16 +00:00
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")
2014-11-23 17:51:16 +00:00
self._work_queue.put(PriorityQueueItem(priority, job))
2014-11-23 17:51:16 +00:00
# check if our queue is getting too big
if self._work_queue.qsize() > self.busy_warning_limit \
and self._busy_callback is not None:
2014-11-23 17:51:16 +00:00
# Increase limit we will issue next warning
self.busy_warning_limit *= 2
self._busy_callback(
self.worker_count, self.current_jobs,
self._work_queue.qsize())
2014-11-23 05:40:01 +00:00
def block_till_done(self):
2016-02-15 07:01:49 +00:00
"""Block till current work is done."""
self._work_queue.join()
2016-02-15 07:01:49 +00:00
# import traceback
# traceback.print_stack()
2014-11-23 17:51:16 +00:00
def stop(self):
2016-02-15 07:01:49 +00:00
"""Finish all the jobs and stops all the threads."""
self.block_till_done()
2014-11-23 17:51:16 +00:00
with self._lock:
2014-11-23 20:57:29 +00:00
if not self.running:
return
2014-11-23 17:51:16 +00:00
# Tell the workers to quit
2014-11-23 20:57:29 +00:00
for _ in range(self.worker_count):
self.remove_worker()
2014-11-23 17:51:16 +00:00
self.running = False
# Wait till all workers have quit
2014-11-23 17:51:16 +00:00
self.block_till_done()
2014-11-23 05:40:01 +00:00
def _worker(self):
2016-02-15 07:01:49 +00:00
"""Handle jobs for the thread pool."""
while True:
# Get new item from work_queue
job = self._work_queue.get().item
if job == self._quit_task:
self._work_queue.task_done()
return
# Add to current running jobs
job_log = (utcnow(), job)
self.current_jobs.append(job_log)
# Do the job
self._job_handler(job)
# Remove from current running job
self.current_jobs.remove(job_log)
# Tell work_queue the task is done
self._work_queue.task_done()
class PriorityQueueItem(object):
""" Holds a priority and a value. Used within PriorityQueue. """
# pylint: disable=too-few-public-methods
def __init__(self, priority, item):
self.priority = priority
self.item = item
def __lt__(self, other):
return self.priority < other.priority