Created endpoint to refresh device oauth token

pull/63/head
Matheus Lima 2019-03-04 12:49:26 -03:00
parent 66ce602e65
commit e8e30f9d6e
4 changed files with 119 additions and 0 deletions

View File

@ -12,6 +12,7 @@ from .endpoints.device_activate import DeviceActivateEndpoint
from .endpoints.device_code import DeviceCodeEndpoint
from .endpoints.device_email import DeviceEmailEndpoint
from .endpoints.device_metrics import DeviceMetricsEndpoint, MetricsService
from .endpoints.device_refresh_token import DeviceRefreshTokenEndpoint
from .endpoints.device_setting import DeviceSettingEndpoint
from .endpoints.device_skill import DeviceSkillEndpoint
from .endpoints.device_skills import DeviceSkillsEndpoint
@ -108,3 +109,8 @@ public.add_url_rule(
view_func=DeviceMetricsEndpoint.as_view('device_metric_api'),
methods=['POST']
)
public.add_url_rule(
'/auth/token',
view_func=DeviceRefreshTokenEndpoint.as_view('refresh_token_api'),
methods=['GET']
)

View File

@ -0,0 +1,54 @@
import hashlib
import json
import uuid
from http import HTTPStatus
from selene.api import SeleneEndpoint
from selene.util.cache import SeleneCache
class DeviceRefreshTokenEndpoint(SeleneEndpoint):
ONE_DAY = 86400
def __init__(self):
super(DeviceRefreshTokenEndpoint, self).__init__()
self.cache: SeleneCache = self.config['SELENE_CACHE']
self.sha512 = hashlib.sha512()
def get(self):
refresh = self.request.headers['Authorization']
if refresh.startswith('Bearer '):
refresh = refresh[len('Bearer '):]
session = self._refresh_session_token(refresh)
if session:
response = session, HTTPStatus.OK
else:
response = '', HTTPStatus.UNAUTHORIZED
else:
response = '', HTTPStatus.UNAUTHORIZED
return response
def _refresh_session_token(self, refresh: str):
refresh_key = 'device.token.refresh:{}'.format(refresh)
session = self.cache.get(refresh_key)
if session:
old_login = json.loads(session)
device_id = old_login['uuid']
self.cache.delete(refresh_key)
self.sha512.update(bytes(str(uuid.uuid4()), 'utf-8'))
access = self.sha512.hexdigest()
self.sha512.update(bytes(str(uuid.uuid4()), 'utf-8'))
refresh = self.sha512.hexdigest()
login = dict(
uuid=device_id,
accessToken=access,
refreshToken=refresh,
expiration=self.ONE_DAY
)
new_login = json.dumps(login)
# Storing device access token for one:
self.cache.set_with_expiration('device.session:{uuid}'.format(uuid=device_id), new_login, self.ONE_DAY)
# Storing device refresh token for ever:
self.cache.set('device.token.refresh:{refresh}'.format(refresh=refresh), new_login)
return new_login

View File

@ -0,0 +1,13 @@
Feature: Refresh device token
Test the endpoint used to refresh the device session login
Scenario: A valid login session is returned after the refreshing token is performed
Given a device pairing code
When a device is added to an account using the pairing code
And device is activated
And the session token is refreshed
Then a valid new session entity should be returned
Scenario: An error status code is returned after trying to refresh an invalid token
When try to refresh an invalid refresh token
Then 401 status code should be returned

View File

@ -0,0 +1,46 @@
import json
from http import HTTPStatus
from behave import when, then
from hamcrest import assert_that, equal_to, has_key, is_not
@when('the session token is refreshed')
def refresh_token(context):
login = json.loads(context.activate_device_response.data)
refresh = login['refreshToken']
context.refresh_token_response = context.client.get(
'/auth/token',
headers={'Authorization': 'Bearer {token}'.format(token=refresh)}
)
@then('a valid new session entity should be returned')
def validate_refresh_token(context):
response = context.refresh_token_response
assert_that(response.status_code, equal_to(HTTPStatus.OK))
new_login = json.loads(response.data)
assert_that(new_login, has_key(equal_to('uuid')))
assert_that(new_login, has_key(equal_to('accessToken')))
assert_that(new_login, has_key(equal_to('refreshToken')))
assert_that(new_login, has_key(equal_to('expiration')))
old_login = json.loads(context.activate_device_response.data)
assert_that(new_login['uuid']), equal_to(old_login['uuid'])
assert_that(new_login['accessToken'], is_not(equal_to(old_login['accessToken'])))
assert_that(new_login['refreshToken'], is_not(equal_to(old_login['refreshToken'])))
@when('try to refresh an invalid refresh token')
def refresh_invalid_token(context):
context.refresh_invalid_token_response = context.client.get(
'/auth/token',
headers={'Authorization': 'Bearer {token}'.format(token='123')}
)
@then('401 status code should be returned')
def validate_refresh_invalid_token(context):
response = context.refresh_invalid_token_response
assert_that(response.status_code, equal_to(HTTPStatus.UNAUTHORIZED))