Issues 351 - Merging service and client web socket configuration
parent
74abe9267d
commit
db9c12c122
|
@ -13,15 +13,10 @@
|
|||
"version": "v1",
|
||||
"update": true
|
||||
},
|
||||
"messagebus_service": {
|
||||
"websocket": {
|
||||
"host": "localhost",
|
||||
"port": 8000,
|
||||
"route": "/events/ws"
|
||||
},
|
||||
"messagebus_client": {
|
||||
"host": "localhost",
|
||||
"port": 8000,
|
||||
"route": "/events/ws",
|
||||
"port": 8181,
|
||||
"route": "/core",
|
||||
"ssl": false
|
||||
},
|
||||
"metrics_client": {
|
||||
|
|
|
@ -23,56 +23,42 @@ from multiprocessing.pool import ThreadPool
|
|||
from pyee import EventEmitter
|
||||
from websocket import WebSocketApp
|
||||
|
||||
import mycroft.util.log
|
||||
from mycroft.configuration import ConfigurationManager
|
||||
from mycroft.messagebus.message import Message
|
||||
from mycroft.util import str2bool
|
||||
from mycroft.util import validate_param
|
||||
from mycroft.util.log import getLogger
|
||||
|
||||
__author__ = 'seanfitz'
|
||||
__author__ = 'seanfitz', 'jdorleans'
|
||||
|
||||
logger = mycroft.util.log.getLogger(__name__)
|
||||
|
||||
config = ConfigurationManager.get()
|
||||
client_config = config.get("messagebus_client")
|
||||
|
||||
|
||||
def validate_param(value, name):
|
||||
if not value:
|
||||
raise ValueError("Missing or empty %s in mycroft.ini "
|
||||
"[messagebus_client] section", name)
|
||||
LOG = getLogger(__name__)
|
||||
config = ConfigurationManager.get().get("websocket")
|
||||
|
||||
|
||||
class WebsocketClient(object):
|
||||
def __init__(self, host=client_config.get("host"),
|
||||
port=client_config.get("port"),
|
||||
path=client_config.get("route"),
|
||||
ssl=str2bool(client_config.get("ssl"))):
|
||||
def __init__(self, host=config.get("host"), port=config.get("port"),
|
||||
route=config.get("route"), ssl=config.get("ssl")):
|
||||
|
||||
validate_param(host, "host")
|
||||
validate_param(port, "port")
|
||||
validate_param(path, "route")
|
||||
# validate_param(ssl, "ssl")
|
||||
# ssl = str2bool(ssl)
|
||||
validate_param(host, "websocket.host")
|
||||
validate_param(port, "websocket.port")
|
||||
validate_param(route, "websocket.route")
|
||||
|
||||
self.build_url(host, port, route, ssl)
|
||||
self.emitter = EventEmitter()
|
||||
self.scheme = "wss" if ssl else "ws"
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.path = path
|
||||
self.exp_backoff_counter = 1
|
||||
self.client = self._create_new_connection()
|
||||
self.client = self.create_client()
|
||||
self.pool = ThreadPool(10)
|
||||
self.retry = 1
|
||||
|
||||
def _create_new_connection(self):
|
||||
return WebSocketApp(
|
||||
self.scheme + "://" + self.host + ":" + str(self.port) + self.path,
|
||||
on_open=self.on_open,
|
||||
on_close=self.on_close,
|
||||
on_error=self.on_error,
|
||||
on_message=self.on_message)
|
||||
def build_url(self, host, port, route, ssl):
|
||||
scheme = "wss" if ssl else "ws"
|
||||
self.url = scheme + "://" + host + ":" + str(port) + route
|
||||
|
||||
def create_client(self):
|
||||
return WebSocketApp(self.url,
|
||||
on_open=self.on_open, on_close=self.on_close,
|
||||
on_error=self.on_error, on_message=self.on_message)
|
||||
|
||||
def on_open(self, ws):
|
||||
logger.info("Connected")
|
||||
LOG.info("Connected")
|
||||
self.emitter.emit("open")
|
||||
|
||||
def on_close(self, ws):
|
||||
|
@ -83,13 +69,11 @@ class WebsocketClient(object):
|
|||
self.emitter.emit('error', error)
|
||||
self.client.close()
|
||||
except Exception, e:
|
||||
logger.error(repr(e))
|
||||
sleep_time = self.exp_backoff_counter
|
||||
logger.warn(
|
||||
"Disconnecting on error, reconnecting in %d seconds." % sleep_time)
|
||||
self.exp_backoff_counter = min(self.exp_backoff_counter * 2, 60)
|
||||
time.sleep(sleep_time)
|
||||
self.client = self._create_new_connection()
|
||||
LOG.error(repr(e))
|
||||
LOG.warn("WS Client Error: reconnecting in %d seconds." % self.retry)
|
||||
time.sleep(self.retry)
|
||||
self.retry = min(self.retry * 2, 60)
|
||||
self.client = self.create_client()
|
||||
self.run_forever()
|
||||
|
||||
def on_message(self, ws, message):
|
||||
|
@ -127,14 +111,16 @@ def echo():
|
|||
client = WebsocketClient()
|
||||
|
||||
def echo(message):
|
||||
logger.info(message)
|
||||
LOG.info(message)
|
||||
|
||||
def repeat_utterance(message):
|
||||
message.message_type = 'speak'
|
||||
client.emit(message)
|
||||
|
||||
client.on('message', echo)
|
||||
client.on('recognizer_loop:utterance', repeat_utterance)
|
||||
client.run_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
echo()
|
||||
|
|
|
@ -20,42 +20,33 @@ import tornado.web as web
|
|||
|
||||
from mycroft.configuration import ConfigurationManager
|
||||
from mycroft.messagebus.service.ws import WebsocketEventHandler
|
||||
from mycroft.util import validate_param
|
||||
|
||||
__author__ = 'seanfitz'
|
||||
__author__ = 'seanfitz', 'jdorleans'
|
||||
|
||||
settings = {
|
||||
'debug': True
|
||||
}
|
||||
|
||||
|
||||
def validate_param(value, name):
|
||||
if not value:
|
||||
raise ValueError("Missing or empty %s in mycroft.ini "
|
||||
"[messagebus_service] section", name)
|
||||
|
||||
|
||||
def main():
|
||||
import tornado.options
|
||||
tornado.options.parse_command_line()
|
||||
config = ConfigurationManager.get()
|
||||
service_config = config.get("messagebus_service")
|
||||
config = ConfigurationManager.get().get("websocket")
|
||||
|
||||
route = service_config.get('route')
|
||||
validate_param(route, 'route')
|
||||
host = config.get("host")
|
||||
port = config.get("port")
|
||||
route = config.get("route")
|
||||
validate_param(host, "websocket.host")
|
||||
validate_param(port, "websocket.port")
|
||||
validate_param(route, "websocket.route")
|
||||
|
||||
routes = [
|
||||
(route, WebsocketEventHandler)
|
||||
]
|
||||
|
||||
application = web.Application(routes, **settings)
|
||||
host = service_config.get("host")
|
||||
port = service_config.get("port")
|
||||
validate_param(host, 'host')
|
||||
validate_param(port, 'port')
|
||||
|
||||
application.listen(port, host)
|
||||
loop = ioloop.IOLoop.instance()
|
||||
loop.start()
|
||||
ioloop.IOLoop.instance().start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -16,15 +16,15 @@
|
|||
# along with Mycroft Core. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import traceback
|
||||
import sys
|
||||
import json
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
from pyee import EventEmitter
|
||||
import tornado.websocket
|
||||
from pyee import EventEmitter
|
||||
|
||||
from mycroft.messagebus.message import Message
|
||||
import mycroft.util.log
|
||||
from mycroft.messagebus.message import Message
|
||||
|
||||
logger = mycroft.util.log.getLogger(__name__)
|
||||
__author__ = 'seanfitz'
|
||||
|
|
|
@ -63,7 +63,7 @@ class SkillContainer(object):
|
|||
return parser.parse_args(args)
|
||||
|
||||
def __init_client(self, params):
|
||||
config = ConfigurationManager.get().get("messagebus_client")
|
||||
config = ConfigurationManager.get().get("websocket")
|
||||
|
||||
if not params.host:
|
||||
params.host = config.get('host')
|
||||
|
|
|
@ -127,5 +127,10 @@ def check_for_signal(signalName):
|
|||
return False
|
||||
|
||||
|
||||
def validate_param(value, name):
|
||||
if not value:
|
||||
raise ValueError("Missing or empty %s in mycroft.conf " % name)
|
||||
|
||||
|
||||
class CerberusAccessDenied(Exception):
|
||||
pass
|
||||
|
|
Loading…
Reference in New Issue