added test for logout function

pull/39/head
Chris Veilleux 2019-02-06 13:37:39 -06:00
parent bea58504d6
commit fa5da2a914
7 changed files with 128 additions and 66 deletions

View File

@ -3,35 +3,27 @@
from http import HTTPStatus
from logging import getLogger
import requests
from selene.api import SeleneEndpoint, APIError
from selene.account import RefreshTokenRepository
from selene.api import SeleneEndpoint
from selene.util.db import get_db_connection
_log = getLogger(__package__)
class LogoutEndpoint(SeleneEndpoint):
def __init__(self):
super(LogoutEndpoint, self).__init__()
def get(self):
try:
self._authenticate()
self._authenticate()
if self.authenticated or self.refresh_token_expired:
self._logout()
except APIError:
pass
return self.response
def _logout(self):
service_request_headers = {
'Authorization': 'Bearer ' + self.tartarus_token
}
service_url = self.config['TARTARUS_BASE_URL'] + '/auth/logout'
auth_service_response = requests.get(
service_url,
headers=service_request_headers
)
self._check_for_service_errors(auth_service_response)
logout_response = auth_service_response.content.decode()
self.response = (logout_response, HTTPStatus.OK)
request_refresh_token = self.request.cookies['seleneRefresh']
with get_db_connection(self.config['DB_CONNECTION_POOL']) as db:
token_repository = RefreshTokenRepository(db, self.account)
token_repository.delete_refresh_token(request_refresh_token)
access_token, refresh_token = self._generate_tokens()
self._set_token_cookies(access_token, refresh_token, expire=True)
self.response = ('logged out', HTTPStatus.OK)

View File

@ -11,6 +11,7 @@ from selene.util.db import get_db_connection
def sso_client(context):
sso.testing = True
context.db_pool = sso.config['DB_CONNECTION_POOL']
context.client_config = sso.config
context.client = sso.test_client()
yield context.client

View File

@ -0,0 +1,14 @@
Feature: federated login
User signs into a selene web app after authenticating with a 3rd party.
Scenario: User with existing account signs in via Facebook
Given user "foo@mycroft.ai" authenticates through facebook
When single sign on validates the account
Then login request succeeds
And response contains authentication tokens
And account has new refresh token
Scenario: User without account signs in via Facebook
Given user "bar@mycroft.ai" authenticates through facebook
When single sign on validates the account
Then login fails with "account not found" error

View File

@ -5,22 +5,11 @@ Feature: internal login
Scenario: User signs in with valid email/password combination
Given user enters email address "foo@mycroft.ai" and password "foo"
When user attempts to login
Then login succeeds
Then login request succeeds
And response contains authentication tokens
And account has new refresh token
Scenario: User signs in with invalid email/password combination
Given user enters email address "foo@mycroft.ai" and password "bar"
When user attempts to login
Then login fails with "provided credentials not found" error
Scenario: User with existing account signs in via Facebook
Given user "foo@mycroft.ai" authenticates through facebook
When single sign on validates the account
Then login succeeds
Scenario: User without account signs in via Facebook
Given user "bar@mycroft.ai" authenticates through facebook
When single sign on validates the account
Then login fails with "account not found" error
# Scenario: Logged in user requests logout
# Given: user "devops@mycroft.ai" is authenticated

View File

@ -0,0 +1,10 @@
Feature: logout
Regardless of how a user logs in, logging out consists of expiring the
tokens we use to identify logged-in users.
Scenario: Logged in user requests logout
Given user "foo@mycroft.ai" is authenticated
When user attempts to logout
Then request is successful
And response contains expired token cookies
And refresh token in request is removed from account

View File

@ -3,8 +3,7 @@ from http import HTTPStatus
from behave import given, then, when
from hamcrest import assert_that, equal_to, has_item
from selene.account import AccountRepository
from selene.util.db import get_db_connection
from selene.api.testing import get_account, validate_token_cookies
@given('user enters email address "{email}" and password "{password}"')
@ -14,7 +13,6 @@ def save_credentials(context, email, password):
@given('user "{email}" authenticates through facebook')
@given('user "{email}" is authenticated')
def save_email(context, email):
context.email = email
@ -36,28 +34,23 @@ def call_internal_login_endpoint(context):
headers=dict(Authorization='Basic ' + credentials))
@then('login succeeds')
@then('login request succeeds')
def check_for_login_success(context):
assert_that(context.response.status_code, equal_to(HTTPStatus.OK))
assert_that(
context.response.headers['Access-Control-Allow-Origin'],
equal_to('*')
)
for cookie in context.response.headers.getlist('Set-Cookie'):
ingredients = parse_cookie(cookie)
ingredient_names = list(ingredients.keys())
if cookie.startswith('seleneAccess'):
assert_that(ingredient_names, has_item('seleneAccess'))
elif cookie.startswith('seleneRefresh'):
assert_that(ingredient_names, has_item('seleneRefresh'))
context.refresh_token = ingredients['seleneRefresh']
else:
raise ValueError('unexpected cookie found: ' + cookie)
for ingredient_name in ('Domain', 'Expires', 'Max-Age'):
assert_that(ingredient_names, has_item(ingredient_name))
with get_db_connection(context.db_pool) as db:
acct_repository = AccountRepository(db)
account = acct_repository.get_account_by_email(context.email)
@then('response contains authentication tokens')
def check_token_cookies(context):
validate_token_cookies(context)
@then('account has new refresh token')
def check_account_has_refresh_token(context):
account = get_account(context)
assert_that(account.refresh_tokens, has_item(context.refresh_token))
@ -70,15 +63,3 @@ def check_for_login_fail(context, error_message):
)
assert_that(context.response.is_json, equal_to(True))
assert_that(context.response.get_json(), equal_to(error_message))
def parse_cookie(cookie: str) -> dict:
ingredients = {}
for ingredient in cookie.split('; '):
if '=' in ingredient:
key, value = ingredient.split('=')
ingredients[key] = value
else:
ingredients[ingredient] = None
return ingredients

View File

@ -0,0 +1,75 @@
from http import HTTPStatus
from behave import given, then, when
from hamcrest import assert_that, equal_to, has_item, is_not
from selene.account import RefreshTokenRepository
from selene.api.testing import get_account, validate_token_cookies
from selene.util.auth import AuthenticationTokenGenerator
from selene.util.db import get_db_connection
ACCESS_TOKEN_COOKIE_KEY = 'seleneAccess'
REFRESH_TOKEN_COOKIE_KEY = 'seleneRefresh'
@given('user "{email}" is authenticated')
def save_email(context, email):
context.email = email
@when('user attempts to logout')
def call_logout_endpoint(context):
token_generator = AuthenticationTokenGenerator(
context.account.id,
context.client_config['ACCESS_SECRET'],
context.client_config['REFRESH_SECRET']
)
context.client.set_cookie(
context.client_config['DOMAIN'],
ACCESS_TOKEN_COOKIE_KEY,
token_generator.access_token
)
context.client.set_cookie(
context.client_config['DOMAIN'],
REFRESH_TOKEN_COOKIE_KEY,
token_generator.refresh_token
)
context.request_refresh_token = token_generator.refresh_token
with get_db_connection(context.client_config['DB_CONNECTION_POOL']) as db:
token_repository = RefreshTokenRepository(db, context.account)
token_repository.add_refresh_token(token_generator.refresh_token)
context.response = context.client.get('/api/logout')
@then('request is successful')
def check_for_logout_success(context):
assert_that(context.response.status_code, equal_to(HTTPStatus.OK))
assert_that(
context.response.headers['Access-Control-Allow-Origin'],
equal_to('*')
)
@then('response contains expired token cookies')
def check_response_cookies(context):
validate_token_cookies(context, expired=True)
@then('refresh token in request is removed from account')
def check_refresh_token_removed(context):
account = get_account(context)
assert_that(
account.refresh_tokens,
is_not(has_item(context.request_refresh_token))
)
@then('logout fails with "{error_message}" error')
def check_for_login_fail(context, error_message):
assert_that(context.response.status_code, equal_to(HTTPStatus.UNAUTHORIZED))
assert_that(
context.response.headers['Access-Control-Allow-Origin'],
equal_to('*')
)
assert_that(context.response.is_json, equal_to(True))
assert_that(context.response.get_json(), equal_to(error_message))