Change initial kill to SIGINT

This sends a ctrl+c signal to each process which will allow code to exit properly by handling KeyboardInterrupt
Other notable changes:
 - create_daemon method used to clean up create daemon threads
 - create_echo_function used to reduce code duplication with messagebus
 echo functions
 - wait_for_exit_signal used to wait for ctrl+c (SIGINT)
 - reset_sigint_handler used to ensure SIGINT will raise KeyboardInterrupt
pull/1519/head
Matthew D. Scholefield 2018-04-03 09:50:53 -05:00 committed by Matthew D. Scholefield
parent 70b9b320ed
commit 10bd9a1cf3
10 changed files with 189 additions and 144 deletions

View File

@ -18,17 +18,18 @@
This handles playback of audio and speech
"""
import imp
import json
import sys
import time
from os import listdir
from os.path import abspath, dirname, basename, isdir, join
import mycroft.audio.speech as speech
from mycroft.configuration import Configuration
from mycroft.messagebus.client.ws import WebsocketClient
from mycroft.messagebus.message import Message
from mycroft.util import reset_sigint_handler, wait_for_exit_signal, \
create_daemon, create_echo_function
from mycroft.util.log import LOG
try:
@ -36,7 +37,6 @@ try:
except ImportError:
pulsectl = None
MAINMODULE = '__init__'
sys.path.append(abspath(dirname(__file__)))
@ -139,6 +139,7 @@ class AudioService(object):
Handles playback of audio and selecting proper backend for the uri
to be played.
"""
def __init__(self, ws):
"""
Args:
@ -458,31 +459,20 @@ class AudioService(object):
def main():
""" Main function. Run when file is invoked. """
reset_sigint_handler()
ws = WebsocketClient()
Configuration.init(ws)
speech.init(ws)
def echo(message):
""" Echo message bus messages. """
try:
_message = json.loads(message)
if 'mycroft.audio.service' not in _message.get('type'):
return
message = json.dumps(_message)
except Exception as e:
LOG.exception(e)
LOG.debug(message)
LOG.info("Staring Audio Services")
ws.on('message', echo)
LOG.info("Starting Audio Services")
ws.on('message', create_echo_function('AUDIO', ['mycroft.audio.service']))
audio = AudioService(ws) # Connect audio service instance to message bus
try:
ws.run_forever()
except KeyboardInterrupt as e:
LOG.exception(e)
create_daemon(ws.run_forever)
wait_for_exit_signal()
speech.shutdown()
audio.shutdown()
sys.exit()
if __name__ == "__main__":

View File

@ -151,7 +151,6 @@ def init(websocket):
def shutdown():
global tts
if tts:
tts.playback.stop()
tts.playback.join()

View File

@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from threading import Thread, Lock
from threading import Lock
from mycroft import dialog
from mycroft.client.enclosure.api import EnclosureAPI
@ -23,14 +22,14 @@ from mycroft.identity import IdentityManager
from mycroft.lock import Lock as PIDLock # Create/Support PID locking file
from mycroft.messagebus.client.ws import WebsocketClient
from mycroft.messagebus.message import Message
from mycroft.util import create_daemon, wait_for_exit_signal, \
reset_sigint_handler
from mycroft.util.log import LOG
ws = None
lock = Lock()
loop = None
config = Configuration.get()
def handle_record_begin():
LOG.info("Begin Recording...")
@ -132,17 +131,12 @@ def handle_open():
EnclosureAPI(ws).reset()
def connect():
ws.run_forever()
def main():
global ws
global loop
global config
lock = PIDLock("voice")
reset_sigint_handler()
PIDLock("voice")
ws = WebsocketClient()
config = Configuration.get()
Configuration.init(ws)
loop = RecognizerLoop()
loop.on('recognizer_loop:utterance', handle_utterance)
@ -163,15 +157,11 @@ def main():
ws.on('recognizer_loop:audio_output_start', handle_audio_start)
ws.on('recognizer_loop:audio_output_end', handle_audio_end)
ws.on('mycroft.stop', handle_stop)
event_thread = Thread(target=connect)
event_thread.setDaemon(True)
event_thread.start()
try:
loop.run()
except KeyboardInterrupt as e:
LOG.exception(e)
sys.exit()
create_daemon(ws.run_forever)
create_daemon(loop.run)
wait_for_exit_signal()
if __name__ == "__main__":

View File

@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from signal import getsignal, signal, SIGKILL, SIGINT, SIGTERM # signals
from signal import getsignal, signal, SIGKILL, SIGINT, SIGTERM, \
SIG_DFL, default_int_handler, SIG_IGN # signals
import os # Operating System functions
@ -20,16 +21,19 @@ import os # Operating System functions
#
# Wrapper around chain of handler functions for a specific system level signal.
# Often used to trap Ctrl-C for specific application purposes.
from mycroft.util import LOG
class Signal(object): # python 3+ class Signal
'''
"""
Capture and replace a signal handler with a user supplied function.
The user supplied function is always called first then the previous
handler, if it exists, will be called. It is possible to chain several
signal handlers together by creating multiply instances of objects of
this class, providing a different user functions for each instance. All
provided user functions will be called in LIFO order.
'''
"""
#
# Constructor
@ -37,38 +41,40 @@ class Signal(object): # python 3+ class Signal
# as the new handler function for this signal
def __init__(self, sig_value, func):
'''
"""
Create an instance of the signal handler class.
sig_value: The ID value of the signal to be captured.
func: User supplied function that will act as the new signal handler.
'''
"""
super(Signal, self).__init__() # python 3+ 'super().__init__()
self.__sig_value = sig_value
self.__user_func = func # store user passed function
self.__previous_func = getsignal(sig_value) # get current handler
signal(sig_value, self)
self.__previous_func = signal(sig_value, self)
self.__previous_func = { # Convert signal codes to functions
SIG_DFL: default_int_handler,
SIG_IGN: lambda a, b: None
}.get(self.__previous_func, self.__previous_func)
#
# Called to handle the passed signal
def __call__(self, signame, sf):
'''
"""
Allows the instance of this class to be called as a function.
When called it runs the user supplied signal handler than
checks to see if there is a previously defined handler. If
there is a previously defined handler call it.
'''
self.__user_func() # call user function
if self.__previous_func:
"""
self.__user_func()
self.__previous_func(signame, sf)
#
# reset the signal handler
def __del__(self):
'''
"""
Class destructor. Called during garbage collection.
Resets the signal handler to the previous function.
'''
"""
signal(self.__sig_value, self.__previous_func)
# End class Signal
@ -83,12 +89,12 @@ class Signal(object): # python 3+ class Signal
# ------------------------------------------------------------------------------
class Lock(object): # python 3+ 'class Lock'
'''
"""
Create and maintains the PID lock file for this application process.
The PID lock file is located in /tmp/mycroft/*.pid. If another process
of the same type is started, this class will 'attempt' to stop the
previously running process and then change the process ID in the lock file.
'''
"""
#
# Class constants
@ -98,13 +104,13 @@ class Lock(object): # python 3+ 'class Lock'
#
# Constructor
def __init__(self, service):
'''
"""
Builds the instance of this object. Holds the lock until the
object is garbage collected.
service: Text string. The name of the service application
to be locked (ie: skills, voice)
'''
"""
super(Lock, self).__init__() # python 3+ 'super().__init__()'
self.__pid = os.getpid() # PID of this application
self.path = Lock.DIRECTORY + Lock.FILE.format(service)
@ -114,11 +120,11 @@ class Lock(object): # python 3+ 'class Lock'
#
# Reset the signal handlers to the 'delete' function
def set_handlers(self):
'''
"""
Trap both SIGINT and SIGTERM to gracefully clean up PID files
'''
self.__handlers = {SIGINT: Signal(SIGINT, self.delete)}
self.__handlers = {SIGTERM: Signal(SIGTERM, self.delete)}
"""
self.__handlers = {SIGINT: Signal(SIGINT, self.delete),
SIGTERM: Signal(SIGTERM, self.delete)}
#
# Check to see if the PID already exists
@ -126,12 +132,12 @@ class Lock(object): # python 3+ 'class Lock'
# Stop the current process
# Delete the exiting file
def exists(self):
'''
"""
Check to see if the PID lock file currently exists. If it does
than send a SIGTERM signal to the process defined by the value
in the lock file. Catch the keyboard interrupt exception to
prevent propagation if stopped by use of Ctrl-C.
'''
"""
if not os.path.isfile(self.path):
return
with open(self.path, 'r') as L:
@ -143,11 +149,11 @@ class Lock(object): # python 3+ 'class Lock'
#
# Create a lock file for this server process
def touch(self):
'''
"""
If needed, create the '/tmp/mycroft' directory than open the
lock file for writting and store the current process ID (PID)
as text.
'''
"""
if not os.path.exists(Lock.DIRECTORY):
os.makedirs(Lock.DIRECTORY)
with open(self.path, 'w') as L:
@ -156,12 +162,12 @@ class Lock(object): # python 3+ 'class Lock'
#
# Create the PID file
def create(self):
'''
"""
Checks to see if a lock file for this service already exists,
if so have it killed. In either case write the process ID of
the current service process to to the existing or newly created
lock file in /tmp/mycroft/
'''
"""
self.exists() # check for current running process
self.touch()
@ -169,12 +175,12 @@ class Lock(object): # python 3+ 'class Lock'
# Delete the PID file - but only if it has not been overwritten
# by a duplicate service application
def delete(self, *args):
'''
"""
If the PID lock file contains the PID of this process delete it.
*args: Ignored. Required as this fuction is called as a signel
handler.
'''
"""
try:
with open(self.path, 'r') as L:
pid = int(L.read())

View File

@ -14,16 +14,16 @@
#
import json
import time
import monotonic
from multiprocessing.pool import ThreadPool
from threading import Event
import monotonic
from pyee import EventEmitter
from websocket import WebSocketApp
from websocket import WebSocketApp, WebSocketConnectionClosedException
from mycroft.configuration import Configuration
from mycroft.messagebus.message import Message
from mycroft.util import validate_param
from mycroft.util import validate_param, create_echo_function
from mycroft.util.log import LOG
@ -68,11 +68,19 @@ class WebsocketClient(object):
self.emitter.emit("close")
def on_error(self, ws, error):
if isinstance(error, WebSocketConnectionClosedException):
LOG.warning('Could not send message because connection has closed')
return
LOG.exception(
'=== ' + error.__class__.__name__ + ': ' + str(error) + ' ===')
try:
self.emitter.emit('error', error)
if self.client.keep_running:
self.client.close()
except Exception as e:
LOG.error(repr(e))
LOG.error('Exception closing websocket: ' + repr(e))
LOG.warning("WS Client will reconnect in %d seconds." % self.retry)
time.sleep(self.retry)
self.retry = min(self.retry * 2, 60)
@ -92,10 +100,14 @@ class WebsocketClient(object):
'before emitting messages')
self.connected_event.wait()
try:
if hasattr(message, 'serialize'):
self.client.send(message.serialize())
else:
self.client.send(json.dumps(message.__dict__))
except WebSocketConnectionClosedException:
LOG.warning('Could not send {} message because connection '
'has been closed'.format(message.type))
def wait_for_response(self, message, reply_type=None, timeout=None):
"""Send a message and wait for a response.
@ -141,7 +153,10 @@ class WebsocketClient(object):
self.emitter.once(event_name, func)
def remove(self, event_name, func):
try:
self.emitter.remove_listener(event_name, func)
except ValueError as e:
LOG.warning('Failed to remove event {}: {}'.format(event_name, e))
def remove_all_listeners(self, event_name):
'''
@ -166,14 +181,11 @@ class WebsocketClient(object):
def echo():
ws = WebsocketClient()
def echo(message):
LOG.info(message)
def repeat_utterance(message):
message.type = 'speak'
ws.emit(message)
ws.on('message', echo)
ws.on('message', create_echo_function(None))
ws.on('recognizer_loop:utterance', repeat_utterance)
ws.run_forever()

View File

@ -17,8 +17,8 @@ from tornado import autoreload, web, ioloop
from mycroft.configuration import Configuration
from mycroft.lock import Lock # creates/supports PID locking file
from mycroft.messagebus.service.ws import WebsocketEventHandler
from mycroft.util import validate_param
from mycroft.util import validate_param, reset_sigint_handler, create_daemon, \
wait_for_exit_signal
settings = {
'debug': True
@ -27,6 +27,7 @@ settings = {
def main():
import tornado.options
reset_sigint_handler()
lock = Lock("service")
tornado.options.parse_command_line()
@ -50,7 +51,9 @@ def main():
]
application = web.Application(routes, **settings)
application.listen(port, host)
ioloop.IOLoop.instance().start()
create_daemon(ioloop.IOLoop.instance().start)
wait_for_exit_signal()
if __name__ == "__main__":

View File

@ -910,7 +910,7 @@ class MycroftSkill(object):
Returns:
str: name unique to this skill
"""
return str(self.skill_id) + ':' + name
return str(self.skill_id) + ':' + (name or '')
def _schedule_event(self, handler, when, data=None, name=None,
repeat=None):

View File

@ -13,7 +13,6 @@
# limitations under the License.
#
import gc
import json
import os
import subprocess
import sys
@ -26,16 +25,19 @@ from os.path import exists, join
import mycroft.lock
from mycroft import MYCROFT_ROOT_PATH, dialog
from mycroft.api import is_paired, BackendDown
from mycroft.client.enclosure.api import EnclosureAPI
from mycroft.configuration import Configuration
from mycroft.messagebus.client.ws import WebsocketClient
from mycroft.messagebus.message import Message
from mycroft.skills.core import load_skill, create_skill_descriptor, \
MainModule, FallbackSkill
from mycroft.client.enclosure.api import EnclosureAPI
from mycroft.skills.event_scheduler import EventScheduler
from mycroft.skills.intent_service import IntentService
from mycroft.skills.padatious_service import PadatiousService
from mycroft.util import connected, wait_while_speaking
from mycroft.util import (
connected, wait_while_speaking, reset_sigint_handler,
create_echo_function, create_daemon, wait_for_exit_signal
)
from mycroft.util.log import LOG
ws = None
@ -146,8 +148,8 @@ def check_connection():
# Time moved by over an hour in the NTP sync. Force a reboot to
# prevent weird things from occcurring due to the 'time warp'.
#
ws.emit(Message("speak", {'utterance':
dialog.get("time.changed.reboot")}))
data = {'utterance': dialog.get("time.changed.reboot")}
ws.emit(Message("speak", data))
wait_while_speaking()
# provide visual indicators of the reboot
@ -315,8 +317,8 @@ class SkillManager(Thread):
self.next_download = time.time() + 60 * MINUTES
if res == 0 and speak:
self.ws.emit(Message("speak", {'utterance':
dialog.get("skills updated")}))
data = {'utterance': dialog.get("skills updated")}
self.ws.emit(Message("speak", data))
return True
elif not connected():
LOG.error('msm failed, network connection not available')
@ -386,11 +388,10 @@ class SkillManager(Thread):
# Remove two local references that are known
refs = sys.getrefcount(skill["instance"]) - 2
if refs > 0:
LOG.warning(
"After shutdown of {} there are still "
msg = ("After shutdown of {} there are still "
"{} references remaining. The skill "
"won't be cleaned from memory."
.format(skill['instance'].name, refs))
"won't be cleaned from memory.")
LOG.warning(msg.format(skill['instance'].name, refs))
del skill["instance"]
self.ws.emit(Message("mycroft.skills.shutdown",
{"folder": skill_folder,
@ -478,13 +479,6 @@ class SkillManager(Thread):
# Pause briefly before beginning next scan
time.sleep(2)
# Do a clean shutdown of all skills
for skill in self.loaded_skills:
try:
self.loaded_skills[skill]['instance'].shutdown()
except BaseException:
pass
def send_skill_list(self, message=None):
"""
Send list of loaded skills.
@ -504,6 +498,15 @@ class SkillManager(Thread):
""" Tell the manager to shutdown """
self._stop_event.set()
# Do a clean shutdown of all skills
for name, skill_info in self.loaded_skills.items():
instance = skill_info.get('instance')
if instance:
try:
instance.shutdown()
except Exception:
LOG.exception('Shutting down skill: ' + name)
def handle_converse_request(self, message):
""" Check if the targeted skill id can handle conversation
@ -538,39 +541,23 @@ class SkillManager(Thread):
def main():
global ws
reset_sigint_handler()
# Create PID file, prevent multiple instancesof this service
mycroft.lock.Lock('skills')
# Connect this Skill management process to the websocket
ws = WebsocketClient()
Configuration.init(ws)
ignore_logs = Configuration.get().get("ignore_logs")
# Listen for messages and echo them for logging
def _echo(message):
try:
_message = json.loads(message)
if _message.get("type") in ignore_logs:
return
if _message.get("type") == "registration":
# do not log tokens from registration messages
_message["data"]["token"] = None
message = json.dumps(_message)
except BaseException:
pass
LOG('SKILLS').debug(message)
ws.on('message', _echo)
ws.on('message', create_echo_function('SKILLS'))
# Startup will be called after websocket is fully live
ws.once('open', _starting_up)
ws.run_forever()
create_daemon(ws.run_forever)
wait_for_exit_signal()
shutdown()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
def shutdown():
if event_scheduler:
event_scheduler.shutdown()
@ -579,5 +566,6 @@ if __name__ == "__main__":
skill_manager.stop()
skill_manager.join()
finally:
sys.exit()
if __name__ == "__main__":
main()

View File

@ -12,13 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import socket
import subprocess
from threading import Thread
from time import sleep
import json
import os.path
import psutil
from stat import S_ISREG, ST_MTIME, ST_MODE, ST_SIZE
import signal as sig
import mycroft.audio
import mycroft.configuration
from mycroft.util.format import nice_number
@ -266,3 +272,53 @@ def stop_speaking():
def get_arch():
""" Get architecture string of system. """
return os.uname()[4]
def reset_sigint_handler():
"""
Reset the sigint handler to the default. This fixes KeyboardInterrupt
not getting raised when started via start-mycroft.sh
"""
sig.signal(sig.SIGINT, sig.default_int_handler)
def create_daemon(target, args=(), kwargs=None):
"""Helper to quickly create and start a thread with daemon = True"""
t = Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return t
def wait_for_exit_signal():
"""Blocks until KeyboardInterrupt is received"""
try:
while True:
sleep(100)
except KeyboardInterrupt:
pass
def create_echo_function(name, whitelist=None):
from mycroft.configuration import Configuration
blacklist = Configuration.get().get("ignore_logs")
def echo(message):
"""Listen for messages and echo them for logging"""
try:
js_msg = json.loads(message)
if whitelist and js_msg.get("type") not in whitelist:
return
if blacklist and js_msg.get("type") in blacklist:
return
if js_msg.get("type") == "registration":
# do not log tokens from registration messages
js_msg["data"]["token"] = None
message = json.dumps(js_msg)
except Exception:
pass
LOG(name).debug(message)
return echo

View File

@ -53,7 +53,7 @@ function end-process() {
if process-running $1 ; then
pid=$( ps aux | grep "[p]ython .*${1}/main.py" | awk '{print $2}' )
kill ${pid}
kill -SIGINT ${pid}
c=1
while [ $c -le 20 ]
@ -67,6 +67,7 @@ function end-process() {
done
if process-running $1 ; then
echo "Killing $1..."
kill -9 ${pid}
fi
fi