Simplify
Skip the queue thread, do upload in series with skill load/reload.pull/2370/head
parent
fa1bdfdda7
commit
f5685bde96
|
@ -59,8 +59,7 @@ import json
|
|||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread, Timer, Event
|
||||
from threading import Timer
|
||||
|
||||
from mycroft.api import DeviceApi, is_paired
|
||||
from mycroft.configuration import Configuration
|
||||
|
@ -140,48 +139,6 @@ def _extract_settings_from_meta(settings_meta: dict) -> dict:
|
|||
return fields
|
||||
|
||||
|
||||
class SettingsMetaQueue(Thread):
|
||||
"""Queue for sending settings metadata to backend.
|
||||
|
||||
This queue can be used during startup to capture all settingsmeta requests
|
||||
and then processing can be triggered at a later stage.
|
||||
|
||||
After all queued settingsmeta has been processed and the queue is empty
|
||||
the queue will set the self.processed event allowing waiting for
|
||||
processing.
|
||||
"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._queue = Queue()
|
||||
self.daemon = True
|
||||
self.stopped = False
|
||||
self.processed = Event()
|
||||
|
||||
def run(self):
|
||||
timeout = 5
|
||||
while not self.stopped:
|
||||
try:
|
||||
uploader = self._queue.get(timeout=timeout)
|
||||
if uploader is None:
|
||||
break
|
||||
else:
|
||||
uploader.upload()
|
||||
except Empty:
|
||||
# When the queue is empty indicate that everything
|
||||
# sent to it before starting has been processed.
|
||||
timeout = None
|
||||
self.processed.set()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the queue processer."""
|
||||
self.stopped = True
|
||||
self._queue.put(None) # Insert dummy value to trigger a loop.
|
||||
|
||||
def put(self, value):
|
||||
"""Append a value to the queue."""
|
||||
self._queue.put(value)
|
||||
|
||||
|
||||
class SettingsMetaUploader:
|
||||
"""Synchronize the contents of the settingsmeta.json file with the backend.
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
"""Load, update and manage skills on this device."""
|
||||
import os
|
||||
from glob import glob
|
||||
from threading import Thread, Event
|
||||
from threading import Thread, Event, Lock
|
||||
from time import sleep, time
|
||||
|
||||
from mycroft.api import is_paired
|
||||
|
@ -24,13 +24,58 @@ from mycroft.configuration import Configuration
|
|||
from mycroft.messagebus.message import Message
|
||||
from mycroft.util.log import LOG
|
||||
from .msm_wrapper import create_msm as msm_creator, build_msm_config
|
||||
from .settings import SkillSettingsDownloader, SettingsMetaQueue
|
||||
from .settings import SkillSettingsDownloader
|
||||
from .skill_loader import SkillLoader
|
||||
from .skill_updater import SkillUpdater
|
||||
|
||||
SKILL_MAIN_MODULE = '__init__.py'
|
||||
|
||||
|
||||
class SettingsMetaQueue:
|
||||
"""Queue for sending settings metadata to backend.
|
||||
|
||||
This queue can be used during startup to capture all settingsmeta requests
|
||||
and then processing can be triggered at a later stage.
|
||||
|
||||
After all queued settingsmeta has been processed and the queue is empty
|
||||
the queue will set the self.started flag.
|
||||
"""
|
||||
def __init__(self):
|
||||
self._queue = []
|
||||
self.started = False
|
||||
self.lock = Lock()
|
||||
|
||||
def start(self):
|
||||
"""Start processing of the queue."""
|
||||
self.send()
|
||||
self.started = True
|
||||
|
||||
def send(self):
|
||||
"""Loop through all stored loaders triggering settingsmeta upload."""
|
||||
with self.lock:
|
||||
queue = self._queue
|
||||
self._queue = []
|
||||
if queue:
|
||||
LOG.info('New Settings meta to upload.')
|
||||
for loader in queue:
|
||||
loader.instance.settings_meta.upload()
|
||||
|
||||
def __len__(self):
|
||||
return len(self._queue)
|
||||
|
||||
def put(self, loader):
|
||||
"""Append a skill loader to the queue.
|
||||
|
||||
If a loader is already present it's removed in favor of the new entry.
|
||||
"""
|
||||
if self.started:
|
||||
LOG.info('Updating settings meta during runtime...')
|
||||
with self.lock:
|
||||
# Remove existing loader
|
||||
self._queue == [e for e in self._queue if e != loader]
|
||||
self._queue.append(loader)
|
||||
|
||||
|
||||
class SkillManager(Thread):
|
||||
_msm = None
|
||||
|
||||
|
@ -109,9 +154,8 @@ class SkillManager(Thread):
|
|||
self.skill_updater.next_download = time() - 1
|
||||
|
||||
def _start_settings_update(self):
|
||||
LOG.info('Sending settings meta')
|
||||
LOG.info('Start settings update')
|
||||
self.upload_queue.start()
|
||||
self.upload_queue.processed.wait()
|
||||
LOG.info('All settings meta has been processed or upload has started')
|
||||
self.settings_downloader.download()
|
||||
LOG.info('Skill settings downloading has started')
|
||||
|
@ -161,6 +205,11 @@ class SkillManager(Thread):
|
|||
self._load_new_skills()
|
||||
self._unload_removed_skills()
|
||||
self._update_skills()
|
||||
if is_paired() and len(self.upload_queue) > 0:
|
||||
self.msm.clear_cache()
|
||||
self.skill_updater.post_manifest()
|
||||
self.upload_queue.send()
|
||||
|
||||
sleep(2) # Pause briefly before beginning next scan
|
||||
except Exception:
|
||||
LOG.exception('Something really unexpected has occured '
|
||||
|
@ -184,37 +233,24 @@ class SkillManager(Thread):
|
|||
|
||||
def _reload_modified_skills(self):
|
||||
"""Handle reload of recently changed skill(s)"""
|
||||
reload_occured = False
|
||||
for skill_dir in self._get_skill_directories():
|
||||
try:
|
||||
skill_loader = self.skill_loaders.get(skill_dir)
|
||||
if skill_loader is not None and skill_loader.reload_needed():
|
||||
skill_loader.reload()
|
||||
reload_occured = True
|
||||
if skill_loader.instance:
|
||||
self.upload_queue.put(skill_loader)
|
||||
except Exception:
|
||||
LOG.exception('Unhandled exception occured while '
|
||||
'reloading {}'.format(skill_dir))
|
||||
|
||||
if reload_occured:
|
||||
# If a reload occured a skill gid may have changed.
|
||||
self.skill_updater.post_manifest(reload_skills_manifest=True)
|
||||
|
||||
def _load_new_skills(self):
|
||||
"""Handle load of skills installed since startup."""
|
||||
new_skills = []
|
||||
for skill_dir in self._get_skill_directories():
|
||||
if skill_dir not in self.skill_loaders:
|
||||
sl = self._load_skill(skill_dir)
|
||||
if sl:
|
||||
new_skills.append(sl)
|
||||
|
||||
# Upload manifest if paired and enque settingsmeta
|
||||
if new_skills:
|
||||
self.msm.clear_cache()
|
||||
if is_paired():
|
||||
self.skill_updater.post_manifest()
|
||||
for sl in new_skills:
|
||||
self.upload_queue.put(sl.instance.settings_meta)
|
||||
self.upload_queue.put(sl)
|
||||
|
||||
def _load_skill(self, skill_directory):
|
||||
skill_loader = SkillLoader(self.bus, skill_directory)
|
||||
|
|
|
@ -192,9 +192,7 @@ class SkillUpdater:
|
|||
upload_allowed = self.config['skills'].get('upload_skill_manifest')
|
||||
if upload_allowed and is_paired():
|
||||
if reload_skills_manifest:
|
||||
# TODO: Handle inside msm
|
||||
self.msm._device_skill_state = None
|
||||
|
||||
self.msm.clear_cache()
|
||||
try:
|
||||
device_api = DeviceApi()
|
||||
device_api.upload_skills_data(self.msm.device_skill_state)
|
||||
|
|
Loading…
Reference in New Issue