added refresh token logic
parent
2a3c89961e
commit
ff875fc79f
|
@ -9,10 +9,17 @@ from binascii import a2b_base64
|
|||
from http import HTTPStatus
|
||||
from time import time
|
||||
|
||||
from flask import after_this_request
|
||||
|
||||
from selene.account import Account, AccountRepository, AuthenticationRepository
|
||||
from selene.api import SeleneEndpoint, APIError
|
||||
from selene.util.auth import encode_auth_token, SEVEN_DAYS
|
||||
from selene.util.db.connection_pool import get_db_connection
|
||||
from selene.account.repository import get_account_id_from_credentials
|
||||
from selene.util.auth import (
|
||||
AuthenticationError,
|
||||
AuthenticationTokenGenerator,
|
||||
FIFTEEN_MINUTES,
|
||||
ONE_MONTH
|
||||
)
|
||||
from selene.util.db import get_db_connection
|
||||
|
||||
|
||||
class AuthenticateInternalEndpoint(SeleneEndpoint):
|
||||
|
@ -22,16 +29,33 @@ class AuthenticateInternalEndpoint(SeleneEndpoint):
|
|||
def __init__(self):
|
||||
super(AuthenticateInternalEndpoint, self).__init__()
|
||||
self.response_status_code = HTTPStatus.OK
|
||||
self.account_uuid = None
|
||||
self.account: Account = None
|
||||
|
||||
def get(self):
|
||||
try:
|
||||
self._authenticate_credentials()
|
||||
self._generate_tokens()
|
||||
except APIError:
|
||||
pass
|
||||
else:
|
||||
self._build_response()
|
||||
|
||||
@after_this_request
|
||||
def set_cookies(response):
|
||||
response.set_cookie(
|
||||
'seleneAccess',
|
||||
str(self.access_token),
|
||||
max_age=FIFTEEN_MINUTES,
|
||||
httponly=True
|
||||
)
|
||||
response.set_cookie(
|
||||
'seleneRefresh',
|
||||
str(self.refresh_token),
|
||||
max_age=ONE_MONTH,
|
||||
httponly=True
|
||||
)
|
||||
return response
|
||||
|
||||
return self.response
|
||||
|
||||
def _authenticate_credentials(self):
|
||||
|
@ -41,18 +65,25 @@ class AuthenticateInternalEndpoint(SeleneEndpoint):
|
|||
binary_credentials = a2b_base64(basic_credentials.strip('Basic '))
|
||||
email_address, password = binary_credentials.decode().split(':')
|
||||
with get_db_connection(self.config['DB_CONNECTION_POOL']) as db:
|
||||
self.account_uuid = get_account_id_from_credentials(
|
||||
db,
|
||||
email_address,
|
||||
password
|
||||
auth_repository = AuthenticationRepository(db)
|
||||
self.account = auth_repository.get_account_from_credentials(
|
||||
email_address,
|
||||
password
|
||||
)
|
||||
if self.account is None:
|
||||
raise AuthenticationError('provided credentials not found')
|
||||
|
||||
def _generate_tokens(self):
|
||||
token_generator = AuthenticationTokenGenerator(self.account_id)
|
||||
token_generator.access_secret = self.config['ACCESS_SECRET']
|
||||
token_generator.refresh_secret = self.config['REFRESH_SECRET']
|
||||
self.access_token = token_generator.access_token
|
||||
self.refresh_token = token_generator.refresh_token
|
||||
|
||||
def _update_refresh_token_on_db(self):
|
||||
with get_db_connection(self.config['DB_CONNECTION_POOL']) as db:
|
||||
acct_repository = AccountRepository(db)
|
||||
acct_repository.update_refresh_token(self.account)
|
||||
|
||||
def _build_response(self):
|
||||
self.selene_token = encode_auth_token(
|
||||
self.config['SECRET_KEY'], self.account_uuid
|
||||
)
|
||||
response_data = dict(
|
||||
expiration=time() + SEVEN_DAYS,
|
||||
seleneToken=self.selene_token,
|
||||
)
|
||||
self.response = (response_data, HTTPStatus.OK)
|
||||
self.response = ({}, HTTPStatus.OK)
|
||||
|
|
Loading…
Reference in New Issue