Merge pull request #2233 from forslund/refactor/event-emitter

Small cleanup of the event scheduler
pull/2241/head
Åke 2019-07-26 17:10:23 +02:00 committed by GitHub
commit 7ebf58919e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 45 deletions

View File

@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Event scheduler system for calling skill (and other) methods at a specific
times.
"""
import json
import time
from threading import Thread, Lock
from os.path import isfile, join, expanduser
from mycroft.configuration import Configuration
@ -41,23 +43,22 @@ def repeat_time(sched_time, repeat):
class EventScheduler(Thread):
def __init__(self, bus, schedule_file='schedule.json'):
"""
Create an event scheduler thread. Will send messages at a
predetermined time to the registered targets.
"""Create an event scheduler thread. Will send messages at a
predetermined time to the registered targets.
Args:
bus: Mycroft messagebus (mycroft.messagebus)
schedule_file: File to store pending events to on shutdown
"""
super(EventScheduler, self).__init__()
Arguments:
bus: Mycroft messagebus (mycroft.messagebus)
schedule_file: File to store pending events to on shutdown
"""
def __init__(self, bus, schedule_file='schedule.json'):
super().__init__()
data_dir = expanduser(Configuration.get()['data_dir'])
self.events = {}
self.event_lock = Lock()
self.bus = bus
self.isRunning = True
self.is_running = True
self.schedule_file = join(data_dir, schedule_file)
if self.schedule_file:
self.load()
@ -73,9 +74,7 @@ class EventScheduler(Thread):
self.start()
def load(self):
"""
Load json data with active events from json file.
"""
"""Load json data with active events from json file."""
if isfile(self.schedule_file):
json_data = {}
with open(self.schedule_file) as f:
@ -92,14 +91,12 @@ class EventScheduler(Thread):
if e[0] > current_time or e[1]]
def run(self):
while self.isRunning:
while self.is_running:
self.check_state()
time.sleep(0.5)
def check_state(self):
"""
Check if an event should be triggered.
"""
"""Check if an event should be triggered."""
with self.event_lock:
# Check all events
pending_messages = []
@ -128,9 +125,9 @@ class EventScheduler(Thread):
self.bus.emit(msg)
def schedule_event(self, event, sched_time, repeat=None, data=None):
""" Add event to pending event schedule using thread safe queue.
"""Add event to pending event schedule.
Args:
Arguments:
event (str): Handler for the event
sched_time ([type]): [description]
repeat ([type], optional): Defaults to None. [description]
@ -151,16 +148,14 @@ class EventScheduler(Thread):
self.events[event] = event_list
def schedule_event_handler(self, message):
"""
Messagebus interface to the schedule_event method.
Required data in the message envelope is
event: event to emit
time: time to emit the event
optional data is
repeat: repeat interval
data: data to send along with the event
"""Messagebus interface to the schedule_event method.
Required data in the message envelope is
event: event to emit
time: time to emit the event
Optional data is
repeat: repeat interval
data: data to send along with the event
"""
event = message.data.get('event')
sched_time = message.data.get('time')
@ -174,16 +169,30 @@ class EventScheduler(Thread):
LOG.error('Scheduled event time not provided')
def remove_event(self, event):
"""Remove an event from the list of scheduled events.
Arguments:
event (str): event identifier
"""
with self.event_lock:
if event in self.events:
self.events.pop(event)
def remove_event_handler(self, message):
""" Messagebus interface to the remove_event method. """
"""Messagebus interface to the remove_event method."""
event = message.data.get('event')
self.remove_event(event)
def update_event(self, event, data):
"""Change an existing events data.
This will only update the first call if multiple calls are registered
to the same event identifier.
Arguments:
event (str): event identifier
data (dict): new data
"""
with self.event_lock:
# if there is an active event with this name
if len(self.events.get(event, [])) > 0:
@ -191,15 +200,15 @@ class EventScheduler(Thread):
self.events[event][0] = (time, repeat, data)
def update_event_handler(self, message):
""" Messagebus interface to the update_event method. """
"""Messagebus interface to the update_event method."""
event = message.data.get('event')
data = message.data.get('data')
self.update_event(event, data)
def get_event_handler(self, message):
"""
Messagebus interface to get_event.
Emits another event sending event status
"""Messagebus interface to get_event.
Emits another event sending event status.
"""
event_name = message.data.get("name")
event = None
@ -210,32 +219,26 @@ class EventScheduler(Thread):
self.bus.emit(message.reply(emitter_name, data=event))
def store(self):
"""
Write current schedule to disk.
"""
"""Write current schedule to disk."""
with self.event_lock:
with open(self.schedule_file, 'w') as f:
json.dump(self.events, f)
def clear_repeating(self):
"""
Remove repeating events from events dict.
"""
"""Remove repeating events from events dict."""
with self.event_lock:
for e in self.events:
self.events[e] = [i for i in self.events[e] if i[1] is None]
def clear_empty(self):
"""
Remove empty event entries from events dict
"""
"""Remove empty event entries from events dict."""
with self.event_lock:
self.events = {k: self.events[k] for k in self.events
if self.events[k] != []}
def shutdown(self):
""" Stop the running thread. """
self.isRunning = False
"""Stop the running thread."""
self.is_running = False
# Remove listeners
self.bus.remove_all_listeners('mycroft.scheduler.schedule_event')
self.bus.remove_all_listeners('mycroft.scheduler.remove_event')