mycroft-core/mycroft/skills/event_scheduler.py

457 lines
17 KiB
Python

# Copyright 2017 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Event scheduler system for calling skill (and other) methods at a specific
times.
"""
import json
import shutil
import time
from datetime import datetime, timedelta
from threading import Thread, Lock
from os.path import isfile, join, expanduser
import xdg.BaseDirectory
from mycroft.configuration import Configuration
from mycroft.messagebus.message import Message
from mycroft.util.log import LOG
from .mycroft_skill.event_container import EventContainer, create_basic_wrapper
def repeat_time(sched_time, repeat):
"""Next scheduled time for repeating event. Guarantees that the
time is not in the past (but could skip interim events)
Args:
sched_time (float): Scheduled unix time for the event
repeat (float): Repeat period in seconds
Returns: (float) time for next event
"""
next_time = sched_time + repeat
while next_time < time.time():
# Schedule at an offset to assure no doubles
next_time = time.time() + abs(repeat)
return next_time
class EventScheduler(Thread):
"""Create an event scheduler thread. Will send messages at a
predetermined time to the registered targets.
Args:
bus: Mycroft messagebus (mycroft.messagebus)
schedule_file: File to store pending events to on shutdown
"""
def __init__(self, bus, schedule_file='schedule.json'):
super().__init__()
self.events = {}
self.event_lock = Lock()
self.bus = bus
self.is_running = True
old_schedule_path = join(expanduser(Configuration.get()['data_dir']),
schedule_file)
new_schedule_path = join(
xdg.BaseDirectory.load_first_config('mycroft'), schedule_file
)
if isfile(old_schedule_path):
shutil.move(old_schedule_path, new_schedule_path)
self.schedule_file = new_schedule_path
if self.schedule_file:
self.load()
self.bus.on('mycroft.scheduler.schedule_event',
self.schedule_event_handler)
self.bus.on('mycroft.scheduler.remove_event',
self.remove_event_handler)
self.bus.on('mycroft.scheduler.update_event',
self.update_event_handler)
self.bus.on('mycroft.scheduler.get_event',
self.get_event_handler)
self.start()
def load(self):
"""Load json data with active events from json file."""
if isfile(self.schedule_file):
json_data = {}
with open(self.schedule_file) as f:
try:
json_data = json.load(f)
except Exception as e:
LOG.error(e)
current_time = time.time()
with self.event_lock:
for key in json_data:
event_list = json_data[key]
# discard non repeating events that has already happened
self.events[key] = [tuple(e) for e in event_list
if e[0] > current_time or e[1]]
def run(self):
while self.is_running:
self.check_state()
time.sleep(0.5)
def check_state(self):
"""Check if an event should be triggered."""
with self.event_lock:
# Check all events
pending_messages = []
for event in self.events:
current_time = time.time()
e = self.events[event]
# Get scheduled times that has passed
passed = [(t, r, d, c) for
(t, r, d, c) in e if t <= current_time]
# and remaining times that we're still waiting for
remaining = [(t, r, d, c) for
t, r, d, c in e if t > current_time]
# Trigger registered methods
for sched_time, repeat, data, context in passed:
pending_messages.append(Message(event, data, context))
# if this is a repeated event add a new trigger time
if repeat:
next_time = repeat_time(sched_time, repeat)
remaining.append((next_time, repeat, data, context))
# update list of events
self.events[event] = remaining
# Remove events have are now completed
self.clear_empty()
# Finally, emit the queued up events that triggered
for msg in pending_messages:
self.bus.emit(msg)
def schedule_event(self, event, sched_time, repeat=None,
data=None, context=None):
"""Add event to pending event schedule.
Args:
event (str): Handler for the event
sched_time ([type]): [description]
repeat ([type], optional): Defaults to None. [description]
data ([type], optional): Defaults to None. [description]
context (dict, optional): context (dict, optional): message
context to send when the
handler is called
"""
data = data or {}
with self.event_lock:
# get current list of scheduled times for event, [] if missing
event_list = self.events.get(event, [])
# Don't schedule if the event is repeating and already scheduled
if repeat and event in self.events:
LOG.debug('Repeating event {} is already scheduled, discarding'
.format(event))
else:
# add received event and time
event_list.append((sched_time, repeat, data, context))
self.events[event] = event_list
def schedule_event_handler(self, message):
"""Messagebus interface to the schedule_event method.
Required data in the message envelope is
event: event to emit
time: time to emit the event
Optional data is
repeat: repeat interval
data: data to send along with the event
"""
event = message.data.get('event')
sched_time = message.data.get('time')
repeat = message.data.get('repeat')
data = message.data.get('data')
context = message.context
if event and sched_time:
self.schedule_event(event, sched_time, repeat, data, context)
elif not event:
LOG.error('Scheduled event name not provided')
else:
LOG.error('Scheduled event time not provided')
def remove_event(self, event):
"""Remove an event from the list of scheduled events.
Args:
event (str): event identifier
"""
with self.event_lock:
if event in self.events:
self.events.pop(event)
def remove_event_handler(self, message):
"""Messagebus interface to the remove_event method."""
event = message.data.get('event')
self.remove_event(event)
def update_event(self, event, data):
"""Change an existing events data.
This will only update the first call if multiple calls are registered
to the same event identifier.
Args:
event (str): event identifier
data (dict): new data
"""
with self.event_lock:
# if there is an active event with this name
if len(self.events.get(event, [])) > 0:
time, repeat, _, context = self.events[event][0]
self.events[event][0] = (time, repeat, data, context)
def update_event_handler(self, message):
"""Messagebus interface to the update_event method."""
event = message.data.get('event')
data = message.data.get('data')
self.update_event(event, data)
def get_event_handler(self, message):
"""Messagebus interface to get_event.
Emits another event sending event status.
"""
event_name = message.data.get("name")
event = None
with self.event_lock:
if event_name in self.events:
event = self.events[event_name]
emitter_name = 'mycroft.event_status.callback.{}'.format(event_name)
self.bus.emit(message.reply(emitter_name, data=event))
def store(self):
"""Write current schedule to disk."""
with self.event_lock:
with open(self.schedule_file, 'w') as f:
json.dump(self.events, f)
def clear_repeating(self):
"""Remove repeating events from events dict."""
with self.event_lock:
for e in self.events:
self.events[e] = [i for i in self.events[e] if i[1] is None]
def clear_empty(self):
"""Remove empty event entries from events dict."""
with self.event_lock:
self.events = {k: self.events[k] for k in self.events
if self.events[k] != []}
def shutdown(self):
"""Stop the running thread."""
self.is_running = False
# Remove listeners
self.bus.remove_all_listeners('mycroft.scheduler.schedule_event')
self.bus.remove_all_listeners('mycroft.scheduler.remove_event')
self.bus.remove_all_listeners('mycroft.scheduler.update_event')
# Wait for thread to finish
self.join()
# Prune event list in preparation for saving
self.clear_repeating()
self.clear_empty()
# Store all pending scheduled events
self.store()
class EventSchedulerInterface:
"""Interface for accessing the event scheduler over the message bus."""
def __init__(self, name, sched_id=None, bus=None):
self.name = name
self.sched_id = sched_id
self.bus = bus
self.events = EventContainer(bus)
self.scheduled_repeats = []
def set_bus(self, bus):
self.bus = bus
self.events.set_bus(bus)
def set_id(self, sched_id):
self.sched_id = sched_id
def _create_unique_name(self, name):
"""Return a name unique to this skill using the format
[skill_id]:[name].
Args:
name: Name to use internally
Returns:
str: name unique to this skill
"""
return str(self.sched_id) + ':' + (name or '')
def _schedule_event(self, handler, when, data, name,
repeat_interval=None, context=None):
"""Underlying method for schedule_event and schedule_repeating_event.
Takes scheduling information and sends it off on the message bus.
Args:
handler: method to be called
when (datetime): time (in system timezone) for first
calling the handler, or None to
initially trigger <frequency> seconds
from now
data (dict, optional): data to send when the handler is called
name (str, optional): reference name, must be unique
repeat_interval (float/int): time in seconds between calls
context (dict, optional): message context to send
when the handler is called
"""
if isinstance(when, (int, float)) and when >= 0:
when = datetime.now() + timedelta(seconds=when)
if not name:
name = self.name + handler.__name__
unique_name = self._create_unique_name(name)
if repeat_interval:
self.scheduled_repeats.append(name) # store "friendly name"
data = data or {}
def on_error(e):
LOG.exception('An error occured executing the scheduled event '
'{}'.format(repr(e)))
wrapped = create_basic_wrapper(handler, on_error)
self.events.add(unique_name, wrapped, once=not repeat_interval)
event_data = {'time': time.mktime(when.timetuple()),
'event': unique_name,
'repeat': repeat_interval,
'data': data}
self.bus.emit(Message('mycroft.scheduler.schedule_event',
data=event_data, context=context))
def schedule_event(self, handler, when, data=None, name=None,
context=None):
"""Schedule a single-shot event.
Args:
handler: method to be called
when (datetime/int/float): datetime (in system timezone) or
number of seconds in the future when the
handler should be called
data (dict, optional): data to send when the handler is called
name (str, optional): reference name
NOTE: This will not warn or replace a
previously scheduled event of the same
name.
context (dict, optional): message context to send
when the handler is called
"""
self._schedule_event(handler, when, data, name, context=context)
def schedule_repeating_event(self, handler, when, interval,
data=None, name=None, context=None):
"""Schedule a repeating event.
Args:
handler: method to be called
when (datetime): time (in system timezone) for first
calling the handler, or None to
initially trigger <frequency> seconds
from now
interval (float/int): time in seconds between calls
data (dict, optional): data to send when the handler is called
name (str, optional): reference name, must be unique
context (dict, optional): message context to send
when the handler is called
"""
# Do not schedule if this event is already scheduled by the skill
if name not in self.scheduled_repeats:
# If only interval is given set to trigger in [interval] seconds
# from now.
if not when:
when = datetime.now() + timedelta(seconds=interval)
self._schedule_event(handler, when, data, name, interval,
context=context)
else:
LOG.debug('The event is already scheduled, cancel previous '
'event if this scheduling should replace the last.')
def update_scheduled_event(self, name, data=None):
"""Change data of event.
Args:
name (str): reference name of event (from original scheduling)
"""
data = data or {}
data = {
'event': self._create_unique_name(name),
'data': data
}
self.bus.emit(Message('mycroft.schedule.update_event',
data=data))
def cancel_scheduled_event(self, name):
"""Cancel a pending event. The event will no longer be scheduled
to be executed
Args:
name (str): reference name of event (from original scheduling)
"""
unique_name = self._create_unique_name(name)
data = {'event': unique_name}
if name in self.scheduled_repeats:
self.scheduled_repeats.remove(name)
if self.events.remove(unique_name):
self.bus.emit(Message('mycroft.scheduler.remove_event',
data=data))
def get_scheduled_event_status(self, name):
"""Get scheduled event data and return the amount of time left
Args:
name (str): reference name of event (from original scheduling)
Returns:
int: the time left in seconds
Raises:
Exception: Raised if event is not found
"""
event_name = self._create_unique_name(name)
data = {'name': event_name}
reply_name = 'mycroft.event_status.callback.{}'.format(event_name)
msg = Message('mycroft.scheduler.get_event', data=data)
status = self.bus.wait_for_response(msg, reply_type=reply_name)
if status:
event_time = int(status.data[0][0])
current_time = int(time.time())
time_left_in_seconds = event_time - current_time
LOG.info(time_left_in_seconds)
return time_left_in_seconds
else:
raise Exception("Event Status Messagebus Timeout")
def cancel_all_repeating_events(self):
"""Cancel any repeating events started by the skill."""
# NOTE: Gotta make a copy of the list due to the removes that happen
# in cancel_scheduled_event().
for e in list(self.scheduled_repeats):
self.cancel_scheduled_event(e)
def shutdown(self):
"""Shutdown the interface unregistering any event handlers."""
self.cancel_all_repeating_events()
self.events.clear()