mycroft-core/mycroft/api/__init__.py

159 lines
4.7 KiB
Python

from copy import copy
import requests
from requests import HTTPError
from mycroft.configuration import ConfigurationManager
from mycroft.identity import IdentityManager
from mycroft.version import VersionManager
__author__ = 'jdorleans'
class Api(object):
def __init__(self, path):
self.path = path
config = ConfigurationManager().get()
config_server = config.get("server")
self.url = config_server.get("url")
self.version = config_server.get("version")
self.identity = IdentityManager.get()
def request(self, params):
self.check_token()
self.build_path(params)
self.old_params = copy(params)
return self.send(params)
def check_token(self):
if self.identity.refresh and self.identity.is_expired():
self.identity = IdentityManager.load()
if self.identity.is_expired():
self.refresh_token()
def refresh_token(self):
data = self.send({
"path": "auth/token",
"headers": {
"Authorization": "Bearer " + self.identity.refresh
}
})
IdentityManager.save(data)
def send(self, params):
method = params.get("method", "GET")
headers = self.build_headers(params)
data = self.build_data(params)
json = self.build_json(params)
query = self.build_query(params)
url = self.build_url(params)
response = requests.request(method, url, headers=headers, params=query,
data=data, json=json, timeout=(3.05, 15))
return self.get_response(response)
def get_response(self, response):
data = self.get_data(response)
if 200 <= response.status_code < 300:
return data
elif response.status_code == 401\
and not response.url.endswith("auth/token"):
self.refresh_token()
return self.send(self.old_params)
raise HTTPError(data, response=response)
def get_data(self, response):
try:
return response.json()
except:
return response.text
def build_headers(self, params):
headers = params.get("headers", {})
self.add_content_type(headers)
self.add_authorization(headers)
params["headers"] = headers
return headers
def add_content_type(self, headers):
if not headers.__contains__("Content-Type"):
headers["Content-Type"] = "application/json"
def add_authorization(self, headers):
if not headers.__contains__("Authorization"):
headers["Authorization"] = "Bearer " + self.identity.access
def build_data(self, params):
return params.get("data")
def build_json(self, params):
json = params.get("json")
if json and params["headers"]["Content-Type"] == "application/json":
for k, v in json.iteritems():
if v == "":
json[k] = None
params["json"] = json
return json
def build_query(self, params):
return params.get("query")
def build_path(self, params):
path = params.get("path", "")
params["path"] = self.path + path
return params["path"]
def build_url(self, params):
path = params.get("path", "")
version = params.get("version", self.version)
return self.url + "/" + version + "/" + path
class DeviceApi(Api):
def __init__(self):
super(DeviceApi, self).__init__("device")
def get_code(self, state):
IdentityManager.update()
return self.request({
"path": "/code?state=" + state
})
def activate(self, state, token):
version = VersionManager.get()
return self.request({
"method": "POST",
"path": "/activate",
"json": {"state": state,
"token": token,
"coreVersion": version.get("coreVersion"),
"enclosureVersion": version.get("enclosureVersion")}
})
def find(self):
return self.request({
"path": "/" + self.identity.uuid
})
def find_setting(self):
return self.request({
"path": "/" + self.identity.uuid + "/setting"
})
def find_location(self):
return self.request({
"path": "/" + self.identity.uuid + "/location"
})
class STTApi(Api):
def __init__(self):
super(STTApi, self).__init__("stt")
def stt(self, audio, language, limit):
return self.request({
"method": "POST",
"headers": {"Content-Type": "audio/x-flac"},
"query": {"lang": language, "limit": limit},
"data": audio
})