added refresh token logic
parent
4554cbdd0b
commit
fa94d5b1df
|
@ -1,55 +1,89 @@
|
|||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
from time import time
|
||||
|
||||
import jwt
|
||||
|
||||
ONE_DAY = 86400
|
||||
|
||||
_log = getLogger(__package__)
|
||||
FIFTEEN_MINUTES = 900
|
||||
ONE_MONTH = 2628000
|
||||
|
||||
|
||||
class AuthenticationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def encode_auth_token(secret_key, user_uuid):
|
||||
"""
|
||||
Generates the Auth Token
|
||||
:return: string
|
||||
"""
|
||||
token_expiration = time() + ONE_DAY
|
||||
payload = dict(iat=datetime.utcnow(), exp=token_expiration, sub=user_uuid)
|
||||
selene_token = jwt.encode(
|
||||
payload,
|
||||
secret_key,
|
||||
algorithm='HS256'
|
||||
)
|
||||
class AuthenticationTokenGenerator(object):
|
||||
_access_token = None
|
||||
_refresh_token = None
|
||||
|
||||
# before returning the token, convert it from bytes to string so that
|
||||
# it can be included in a JSON response object
|
||||
return selene_token.decode()
|
||||
def __init__(self, account_id: str):
|
||||
self.account_id = account_id
|
||||
self.access_secret = None
|
||||
self.refresh_secret = None
|
||||
|
||||
def _generate_token(self, token_duration: int):
|
||||
"""
|
||||
Generates a JWT token
|
||||
"""
|
||||
token_expiration = time() + token_duration
|
||||
payload = dict(
|
||||
iat=datetime.utcnow(),
|
||||
exp=token_expiration,
|
||||
sub=self.account_id
|
||||
)
|
||||
|
||||
if token_duration == FIFTEEN_MINUTES:
|
||||
secret = self.access_secret
|
||||
else:
|
||||
secret = self.refresh_secret
|
||||
|
||||
if secret is None:
|
||||
raise ValueError('cannot generate a token without a secret')
|
||||
|
||||
token = jwt.encode(
|
||||
payload,
|
||||
secret,
|
||||
algorithm='HS256'
|
||||
)
|
||||
|
||||
# convert the token from byte-array to string so that
|
||||
# it can be included in a JSON response object
|
||||
return token.decode()
|
||||
|
||||
@property
|
||||
def access_token(self):
|
||||
"""
|
||||
Generates a JWT access token
|
||||
"""
|
||||
if self._access_token is None:
|
||||
self._access_token = self._generate_token(FIFTEEN_MINUTES)
|
||||
|
||||
return self._access_token
|
||||
|
||||
@property
|
||||
def refresh_token(self):
|
||||
"""
|
||||
Generates a JWT access token
|
||||
"""
|
||||
if self._refresh_token is None:
|
||||
self._refresh_token = self._generate_token(ONE_MONTH)
|
||||
|
||||
return self._refresh_token
|
||||
|
||||
|
||||
def decode_auth_token(auth_token: str, secret_key: str) -> tuple:
|
||||
"""
|
||||
Decodes the auth token
|
||||
:param auth_token: the Selene JSON Web Token extracted from cookies.
|
||||
:param secret_key: the key needed to decode the token
|
||||
:return: two-value tuple containing a boolean value indicating if the
|
||||
token is good and the user UUID extracted from the token. UUID will
|
||||
be None if token is invalid.
|
||||
"""
|
||||
try:
|
||||
payload = jwt.decode(auth_token, secret_key)
|
||||
user_uuid = payload['sub']
|
||||
except jwt.ExpiredSignatureError:
|
||||
error_msg = 'Selene token expired'
|
||||
_log.info(error_msg)
|
||||
raise AuthenticationError(error_msg)
|
||||
except jwt.InvalidTokenError:
|
||||
error_msg = 'Invalid Selene token'
|
||||
_log.info(error_msg)
|
||||
raise AuthenticationError(error_msg)
|
||||
class AuthenticationTokenValidator(object):
|
||||
def __init__(self, token: str, secret: str):
|
||||
self.token = token
|
||||
self.secret = secret
|
||||
self.account_id = None
|
||||
self.token_is_expired = False
|
||||
self.token_is_invalid = False
|
||||
|
||||
return user_uuid
|
||||
def validate_token(self):
|
||||
"""Decodes the auth token"""
|
||||
try:
|
||||
payload = jwt.decode(self.token, self.secret)
|
||||
self.account_id = payload['sub']
|
||||
except jwt.ExpiredSignatureError:
|
||||
self.token_is_expired = True
|
||||
except jwt.InvalidTokenError:
|
||||
self.token_is_invalid = True
|
||||
|
|
Loading…
Reference in New Issue