diff --git a/shared/selene/util/auth.py b/shared/selene/util/auth.py index a462dd38..4839b203 100644 --- a/shared/selene/util/auth.py +++ b/shared/selene/util/auth.py @@ -1,55 +1,89 @@ from datetime import datetime -from logging import getLogger from time import time import jwt -ONE_DAY = 86400 - -_log = getLogger(__package__) +FIFTEEN_MINUTES = 900 +ONE_MONTH = 2628000 class AuthenticationError(Exception): pass -def encode_auth_token(secret_key, user_uuid): - """ - Generates the Auth Token - :return: string - """ - token_expiration = time() + ONE_DAY - payload = dict(iat=datetime.utcnow(), exp=token_expiration, sub=user_uuid) - selene_token = jwt.encode( - payload, - secret_key, - algorithm='HS256' - ) +class AuthenticationTokenGenerator(object): + _access_token = None + _refresh_token = None - # before returning the token, convert it from bytes to string so that - # it can be included in a JSON response object - return selene_token.decode() + def __init__(self, account_id: str): + self.account_id = account_id + self.access_secret = None + self.refresh_secret = None + + def _generate_token(self, token_duration: int): + """ + Generates a JWT token + """ + token_expiration = time() + token_duration + payload = dict( + iat=datetime.utcnow(), + exp=token_expiration, + sub=self.account_id + ) + + if token_duration == FIFTEEN_MINUTES: + secret = self.access_secret + else: + secret = self.refresh_secret + + if secret is None: + raise ValueError('cannot generate a token without a secret') + + token = jwt.encode( + payload, + secret, + algorithm='HS256' + ) + + # convert the token from byte-array to string so that + # it can be included in a JSON response object + return token.decode() + + @property + def access_token(self): + """ + Generates a JWT access token + """ + if self._access_token is None: + self._access_token = self._generate_token(FIFTEEN_MINUTES) + + return self._access_token + + @property + def refresh_token(self): + """ + Generates a JWT access token + """ + if self._refresh_token is None: + self._refresh_token = self._generate_token(ONE_MONTH) + + return self._refresh_token -def decode_auth_token(auth_token: str, secret_key: str) -> tuple: - """ - Decodes the auth token - :param auth_token: the Selene JSON Web Token extracted from cookies. - :param secret_key: the key needed to decode the token - :return: two-value tuple containing a boolean value indicating if the - token is good and the user UUID extracted from the token. UUID will - be None if token is invalid. - """ - try: - payload = jwt.decode(auth_token, secret_key) - user_uuid = payload['sub'] - except jwt.ExpiredSignatureError: - error_msg = 'Selene token expired' - _log.info(error_msg) - raise AuthenticationError(error_msg) - except jwt.InvalidTokenError: - error_msg = 'Invalid Selene token' - _log.info(error_msg) - raise AuthenticationError(error_msg) +class AuthenticationTokenValidator(object): + def __init__(self, token: str, secret: str): + self.token = token + self.secret = secret + self.account_id = None + self.token_is_expired = False + self.token_is_invalid = False - return user_uuid + def validate_token(self): + """Decodes the auth token""" + try: + payload = jwt.decode(self.token, self.secret) + self.account_id = payload['sub'] + except jwt.ExpiredSignatureError: + self.token_is_expired = True + except jwt.InvalidTokenError: + self.token_is_invalid = True