updated tests to include the validate_federated endpoint
parent
845dd6b308
commit
6b6a687a00
|
@ -16,14 +16,13 @@ class ValidateFederatedEndpoint(SeleneEndpoint):
|
|||
else:
|
||||
access_token, refresh_token = self._generate_tokens()
|
||||
self._generate_token_cookies(access_token, refresh_token)
|
||||
self._update_refresh_token_on_db(refresh_token)
|
||||
self._add_refresh_token_to_db(refresh_token)
|
||||
self.response = 'account validated', HTTPStatus.OK
|
||||
|
||||
return self.response
|
||||
|
||||
def _get_account(self):
|
||||
request_data = json.loads(self.request.data)
|
||||
email_address = request_data['email']
|
||||
email_address = self.request.form['email']
|
||||
with get_db_connection(self.config['DB_CONNECTION_POOL']) as db:
|
||||
acct_repository = AccountRepository(db)
|
||||
self.account = acct_repository.get_account_by_email(email_address)
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
from behave import fixture, use_fixture
|
||||
|
||||
from sso_api.api import sso
|
||||
from selene.account import RefreshTokenRepository
|
||||
from selene.util.db import get_db_connection
|
||||
|
||||
|
||||
@fixture
|
||||
def sso_client(context):
|
||||
sso.testing = True
|
||||
context.db_pool = sso.config['DB_CONNECTION_POOL']
|
||||
context.client = sso.test_client()
|
||||
|
||||
yield context.client
|
||||
|
@ -13,3 +16,10 @@ def sso_client(context):
|
|||
|
||||
def before_feature(context, _):
|
||||
use_fixture(sso_client, context)
|
||||
|
||||
|
||||
def after_scenario(context, _):
|
||||
if hasattr(context, 'refresh_token'):
|
||||
with get_db_connection(context.db_pool) as db:
|
||||
token_repository = RefreshTokenRepository(db, context.account)
|
||||
token_repository.delete_refresh_token(context.refresh_token)
|
||||
|
|
|
@ -10,4 +10,14 @@ Feature: internal login
|
|||
Scenario: User signs in with invalid email/password combination
|
||||
Given user enters email address "devops@mycroft.ai" and password "foo"
|
||||
When user attempts to login
|
||||
Then login fails
|
||||
Then login fails with "provided credentials not found" error
|
||||
|
||||
Scenario: User with existing account signs in via Facebook
|
||||
Given user "devops@mycroft.ai" authenticates through facebook
|
||||
When single sign on validates the account
|
||||
Then login succeeds
|
||||
|
||||
Scenario: User without account signs in via Facebook
|
||||
Given user "foo@mycroft.ai" authenticates through facebook
|
||||
When single sign on validates the account
|
||||
Then login fails with "account not found" error
|
||||
|
|
|
@ -4,21 +4,35 @@ from behave import given, then, when
|
|||
from hamcrest import assert_that, contains, equal_to, has_item
|
||||
|
||||
from selene.account import Account, AccountRepository
|
||||
from selene.util.db import get_db_connection
|
||||
|
||||
|
||||
# TODO: add a step here when the add account logic is built
|
||||
@given('user enters email address "{user}" and password "{password}"')
|
||||
def add_credentials_to_db(context, user, password):
|
||||
context.user = user
|
||||
@given('user enters email address "{email}" and password "{password}"')
|
||||
def add_credentials_to_db(context, email, password):
|
||||
context.email = email
|
||||
context.password = password
|
||||
|
||||
|
||||
@given('user "{email}" authenticates through facebook')
|
||||
def add_credentials_to_db(context, email):
|
||||
context.email = email
|
||||
|
||||
|
||||
@when('single sign on validates the account')
|
||||
def call_validate_federated_endpoint(context):
|
||||
context.response = context.client.post(
|
||||
'/api/validate-federated',
|
||||
data=dict(email=context.email)
|
||||
)
|
||||
|
||||
|
||||
@when('user attempts to login')
|
||||
def call_internal_login_endpoint(context):
|
||||
credentials = '{}:{}'.format(context.user, context.password).encode()
|
||||
credentials = '{}:{}'.format(context.email, context.password).encode()
|
||||
credentials = b2a_base64(credentials, newline=False).decode()
|
||||
context.response = context.client.get(
|
||||
'/api/login/internal',
|
||||
'/api/internal-login',
|
||||
headers=dict(Authorization='Basic ' + credentials))
|
||||
|
||||
|
||||
|
@ -36,24 +50,26 @@ def check_for_login_success(context):
|
|||
assert_that(ingredient_names, has_item('seleneAccess'))
|
||||
elif cookie.startswith('seleneRefresh'):
|
||||
assert_that(ingredient_names, has_item('seleneRefresh'))
|
||||
context.refresh_token = ingredients['seleneRefresh']
|
||||
else:
|
||||
raise ValueError('unexpected cookie found: ' + cookie)
|
||||
for ingredient_name in ('Domain', 'Expires', 'Max-Age', 'HttpOnly'):
|
||||
for ingredient_name in ('Domain', 'Expires', 'Max-Age'):
|
||||
assert_that(ingredient_names, has_item(ingredient_name))
|
||||
with get_db_connection(context.db_pool) as db:
|
||||
acct_repository = AccountRepository(db)
|
||||
context.account = acct_repository.get_account_by_email(context.email)
|
||||
assert_that(context.account.refresh_tokens, has_item(context.refresh_token))
|
||||
|
||||
|
||||
@then('login fails')
|
||||
def check_for_login_fail(context):
|
||||
@then('login fails with "{error_message}" error')
|
||||
def check_for_login_fail(context, error_message):
|
||||
assert_that(context.response.status_code, equal_to(401))
|
||||
assert_that(
|
||||
context.response.headers['Access-Control-Allow-Origin'],
|
||||
equal_to('*')
|
||||
)
|
||||
assert_that(context.response.is_json, equal_to(True))
|
||||
assert_that(
|
||||
context.response.get_json(),
|
||||
equal_to('provided credentials not found')
|
||||
)
|
||||
assert_that(context.response.get_json(), equal_to(error_message))
|
||||
|
||||
|
||||
def parse_cookie(cookie: str) -> dict:
|
||||
|
|
Loading…
Reference in New Issue