renamed from "antisocial" to "internal" and changed logic to use new architecture
parent
a47c67798a
commit
5b2d65d546
|
@ -1,4 +1,4 @@
|
|||
from .authenticate_antisocial import AuthenticateAntisocialEndpoint
|
||||
from .authenticate_internal import AuthenticateInternalEndpoint
|
||||
from .social_login_tokens import SocialLoginTokensEndpoint
|
||||
from .facebook import AuthorizeFacebookEndpoint
|
||||
from .github import AuthorizeGithubEndpoint
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
from http import HTTPStatus
|
||||
import json
|
||||
from time import time
|
||||
|
||||
import requests as service_request
|
||||
|
||||
from selene.api import SeleneEndpoint, APIError
|
||||
from selene.util.auth import encode_auth_token, ONE_DAY
|
||||
|
||||
|
||||
class AuthenticateAntisocialEndpoint(SeleneEndpoint):
|
||||
"""
|
||||
User Login Resource
|
||||
"""
|
||||
def __init__(self):
|
||||
super(AuthenticateAntisocialEndpoint, self).__init__()
|
||||
self.response_status_code = HTTPStatus.OK
|
||||
self.tartarus_token = None
|
||||
self.users_uuid = None
|
||||
|
||||
def get(self):
|
||||
try:
|
||||
self._authenticate_credentials()
|
||||
except APIError:
|
||||
pass
|
||||
else:
|
||||
self._build_response()
|
||||
|
||||
return self.response
|
||||
|
||||
def _authenticate_credentials(self):
|
||||
basic_credentials = self.request.headers['authorization']
|
||||
service_request_headers = {'Authorization': basic_credentials}
|
||||
auth_service_response = service_request.get(
|
||||
self.config['TARTARUS_BASE_URL'] + '/auth/login',
|
||||
headers=service_request_headers
|
||||
)
|
||||
self._check_for_service_errors(auth_service_response)
|
||||
auth_service_response_content = json.loads(
|
||||
auth_service_response.content
|
||||
)
|
||||
self.users_uuid = auth_service_response_content['uuid']
|
||||
self.tartarus_token = auth_service_response_content['accessToken']
|
||||
|
||||
def _build_response(self):
|
||||
self.selene_token = encode_auth_token(
|
||||
self.config['SECRET_KEY'], self.users_uuid
|
||||
)
|
||||
response_data = dict(
|
||||
expiration=time() + ONE_DAY,
|
||||
seleneToken=self.selene_token,
|
||||
tartarusToken=self.tartarus_token,
|
||||
)
|
||||
self.response = (response_data, HTTPStatus.OK)
|
|
@ -0,0 +1,58 @@
|
|||
"""Authenticate a user logging in with a email address and password
|
||||
|
||||
This type of login is considered "internal" because we are storing the email
|
||||
address and password on our servers. This is as opposed to "external"
|
||||
authentication, which uses a 3rd party authentication, like Google.
|
||||
|
||||
"""
|
||||
from binascii import a2b_base64
|
||||
from http import HTTPStatus
|
||||
from time import time
|
||||
|
||||
from selene.api import SeleneEndpoint, APIError
|
||||
from selene.util.auth import encode_auth_token, SEVEN_DAYS
|
||||
from selene.util.db.connection_pool import get_db_connection
|
||||
from selene.account.repository import get_account_id_from_credentials
|
||||
|
||||
|
||||
class AuthenticateInternalEndpoint(SeleneEndpoint):
|
||||
"""
|
||||
Sign in a user with an email address and password.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(AuthenticateInternalEndpoint, self).__init__()
|
||||
self.response_status_code = HTTPStatus.OK
|
||||
self.account_uuid = None
|
||||
|
||||
def get(self):
|
||||
try:
|
||||
self._authenticate_credentials()
|
||||
except APIError:
|
||||
pass
|
||||
else:
|
||||
self._build_response()
|
||||
|
||||
return self.response
|
||||
|
||||
def _authenticate_credentials(self):
|
||||
"""Compare credentials in request to credentials in database."""
|
||||
|
||||
basic_credentials = self.request.headers['authorization']
|
||||
binary_credentials = a2b_base64(basic_credentials.strip('Basic '))
|
||||
email_address, password = binary_credentials.decode().split(':')
|
||||
with get_db_connection(self.config['DB_CONNECTION_POOL']) as db:
|
||||
self.account_uuid = get_account_id_from_credentials(
|
||||
db,
|
||||
email_address,
|
||||
password
|
||||
)
|
||||
|
||||
def _build_response(self):
|
||||
self.selene_token = encode_auth_token(
|
||||
self.config['SECRET_KEY'], self.account_uuid
|
||||
)
|
||||
response_data = dict(
|
||||
expiration=time() + SEVEN_DAYS,
|
||||
seleneToken=self.selene_token,
|
||||
)
|
||||
self.response = (response_data, HTTPStatus.OK)
|
Loading…
Reference in New Issue