mycroft-core/mycroft/metrics/__init__.py

218 lines
6.2 KiB
Python

# Copyright 2017 Mycroft AI Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from http import HTTPStatus
from queue import Queue, Empty
import threading
import time
import requests
from mycroft.api import DeviceApi
from mycroft.configuration import Configuration
from mycroft.session import SessionManager
from mycroft.util.log import LOG
from mycroft.version import CORE_VERSION_STR
from copy import copy
class _MetricSender(threading.Thread):
"""Thread responsible for sending metrics data."""
def __init__(self):
super().__init__()
self.queue = Queue()
self.daemon = True
self.start()
def run(self):
while True:
time.sleep(30)
try:
while True: # Try read the queue until it fails
report_metric(*self.queue.get_nowait())
time.sleep(0.5)
except Empty:
pass # If the queue is empty just continue the loop
except Exception as e:
LOG.error('Could not send Metrics: {}'.format(repr(e)))
_metric_uploader = _MetricSender()
def report_metric(name, data):
"""
Report a general metric to the Mycroft servers
Args:
name (str): Name of metric. Must use only letters and hyphens
data (dict): JSON dictionary to report. Must be valid JSON
"""
try:
if Configuration().get()['opt_in']:
DeviceApi().report_metric(name, data)
except requests.HTTPError as http_error:
if http_error.response.status_code == HTTPStatus.UNAUTHORIZED:
LOG.warning('Metric not uploaded - device not paired')
else:
LOG.exception(
'Metric couldn\'t be uploaded, due to a network error'
)
except requests.RequestException:
LOG.exception('Metric couldn\'t be uploaded, due to a network error')
def report_timing(ident, system, timing, additional_data=None):
"""Create standardized message for reporting timing.
Args:
ident (str): identifier of user interaction
system (str): system the that's generated the report
timing (stopwatch): Stopwatch object with recorded timing
additional_data (dict): dictionary with related data
"""
additional_data = additional_data or {}
report = copy(additional_data)
report['id'] = ident
report['system'] = system
report['start_time'] = timing.timestamp
report['time'] = timing.time
_metric_uploader.queue.put(('timing', report))
class Stopwatch:
"""
Simple time measuring class.
"""
def __init__(self):
self.timestamp = None
self.time = None
def start(self):
"""
Start a time measurement
"""
self.timestamp = time.time()
def lap(self):
cur_time = time.time()
start_time = self.timestamp
self.timestamp = cur_time
return cur_time - start_time
def stop(self):
"""
Stop a running time measurement. returns the measured time
"""
cur_time = time.time()
start_time = self.timestamp
self.time = cur_time - start_time
return self.time
def __enter__(self):
"""
Start stopwatch when entering with-block.
"""
self.start()
def __exit__(self, tpe, value, tb):
"""
Stop stopwatch when exiting with-block.
"""
self.stop()
def __str__(self):
cur_time = time.time()
if self.timestamp:
return str(self.time or cur_time - self.timestamp)
else:
return 'Not started'
class MetricsAggregator:
"""
MetricsAggregator is not threadsafe, and multiple clients writing the
same metric "concurrently" may result in data loss.
"""
def __init__(self):
self._counters = {}
self._timers = {}
self._levels = {}
self._attributes = {}
self.attr("version", CORE_VERSION_STR)
def increment(self, name, value=1):
cur = self._counters.get(name, 0)
self._counters[name] = cur + value
def timer(self, name, value):
cur = self._timers.get(name)
if not cur:
self._timers[name] = []
cur = self._timers[name] = []
cur.append(value)
def level(self, name, value):
self._levels[name] = value
def clear(self):
self._counters = {}
self._timers = {}
self._levels = {}
self._attributes = {}
self.attr("version", CORE_VERSION_STR)
def attr(self, name, value):
self._attributes[name] = value
def flush(self):
publisher = MetricsPublisher()
payload = {
'counters': self._counters,
'timers': self._timers,
'levels': self._levels,
'attributes': self._attributes
}
self.clear()
count = (len(payload['counters']) + len(payload['timers']) +
len(payload['levels']))
if count > 0:
# LOG.debug(json.dumps(payload))
def publish():
publisher.publish(payload)
threading.Thread(target=publish).start()
class MetricsPublisher:
def __init__(self, url=None, enabled=False):
conf = Configuration().get()['server']
self.url = url or conf['url']
self.enabled = enabled or conf['metrics']
def publish(self, events):
if 'session_id' not in events:
session_id = SessionManager.get().session_id
events['session_id'] = session_id
if self.enabled:
requests.post(
self.url,
headers={'Content-Type': 'application/json'},
data=json.dumps(events), verify=False)