mirror of https://github.com/nucypher/nucypher.git
Validate expected issuer in JWT token
parent
a184934ac1
commit
dcc385b8ba
|
@ -77,9 +77,18 @@ class JWTVerificationCall(ExecutionCall):
|
|||
|
||||
# header = jwt.get_unverified_header(self.jwt_token)
|
||||
# algorithm = header['alg']
|
||||
|
||||
require = []
|
||||
if self.expected_issuer:
|
||||
require.append("iss")
|
||||
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
jwt_token, self.public_key, algorithms=self._valid_jwt_algorithms
|
||||
jwt=jwt_token,
|
||||
key=self.public_key,
|
||||
algorithms=self._valid_jwt_algorithms,
|
||||
options=dict(require=require),
|
||||
issuer=self.expected_issuer,
|
||||
)
|
||||
except jwt.exceptions.InvalidAlgorithmError:
|
||||
raise # TODO: raise something specific
|
||||
|
|
|
@ -66,9 +66,19 @@ def test_jwt_verification_call_invalid():
|
|||
JWTVerificationCall(jwt_token=token, public_key=TEST_ECDSA_PUBLIC_KEY)
|
||||
|
||||
|
||||
def test_jwt_verification_call_invalid2():
|
||||
def test_jwt_verification_call_valid():
|
||||
token = jwt_token()
|
||||
TestJWTVerificationCall(jwt_token=token, public_key=TEST_ECDSA_PUBLIC_KEY)
|
||||
call = TestJWTVerificationCall(jwt_token=token, public_key=TEST_ECDSA_PUBLIC_KEY)
|
||||
assert call.execute()
|
||||
|
||||
|
||||
def test_jwt_verification_call_invalid_issuer():
|
||||
token = jwt_token(with_iat=False, claims={"iss": "Isabel"})
|
||||
call = TestJWTVerificationCall(
|
||||
jwt_token=token, public_key=TEST_ECDSA_PUBLIC_KEY, expected_issuer="Isabel"
|
||||
)
|
||||
payload = call.execute()
|
||||
assert payload == {"iss": "Isabel"}
|
||||
|
||||
|
||||
def test_jwt_condition_initialization():
|
||||
|
@ -83,7 +93,7 @@ def test_jwt_condition_initialization():
|
|||
|
||||
|
||||
def test_jwt_condition_verify():
|
||||
token = jwt_token()
|
||||
token = jwt_token(with_iat=False)
|
||||
condition = JWTCondition(
|
||||
jwt_token=":anotherContextVariableForJWTs",
|
||||
public_key=TEST_ECDSA_PUBLIC_KEY,
|
||||
|
@ -92,11 +102,11 @@ def test_jwt_condition_verify():
|
|||
context = {":anotherContextVariableForJWTs": token}
|
||||
success, result = condition.verify(**context)
|
||||
assert success
|
||||
assert result is not None
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_jwt_condition_verify_of_jwt_with_custom_claims():
|
||||
token = jwt_token(claims={"foo": "bar"})
|
||||
token = jwt_token(with_iat=False, claims={"foo": "bar"})
|
||||
condition = JWTCondition(
|
||||
jwt_token=":anotherContextVariableForJWTs",
|
||||
public_key=TEST_ECDSA_PUBLIC_KEY,
|
||||
|
@ -105,4 +115,32 @@ def test_jwt_condition_verify_of_jwt_with_custom_claims():
|
|||
context = {":anotherContextVariableForJWTs": token}
|
||||
success, result = condition.verify(**context)
|
||||
assert success
|
||||
assert result is not None
|
||||
assert result == {"foo": "bar"}
|
||||
|
||||
|
||||
def test_jwt_condition_verify_with_correct_issuer():
|
||||
token = jwt_token(with_iat=False, claims={"iss": "Isabel"})
|
||||
condition = JWTCondition(
|
||||
jwt_token=":anotherContextVariableForJWTs",
|
||||
public_key=TEST_ECDSA_PUBLIC_KEY,
|
||||
expected_issuer="Isabel",
|
||||
)
|
||||
|
||||
context = {":anotherContextVariableForJWTs": token}
|
||||
success, result = condition.verify(**context)
|
||||
assert success
|
||||
assert result == {"iss": "Isabel"}
|
||||
|
||||
|
||||
def test_jwt_condition_verify_with_incorrect_issuer():
|
||||
token = jwt_token(with_iat=False, claims={"iss": "Isabel"})
|
||||
condition = JWTCondition(
|
||||
jwt_token=":anotherContextVariableForJWTs",
|
||||
public_key=TEST_ECDSA_PUBLIC_KEY,
|
||||
expected_issuer="Isobel",
|
||||
)
|
||||
|
||||
context = {":anotherContextVariableForJWTs": token}
|
||||
success, result = condition.verify(**context)
|
||||
assert not success
|
||||
assert result is None
|
||||
|
|
Loading…
Reference in New Issue