mirror of https://github.com/nucypher/nucypher.git
Merge pull request #3508 from derekpierre/siwe-part-2
SIWE (EIP4361) Authentication Part 2pull/3510/head
commit
f1914f37a2
|
@ -0,0 +1,2 @@
|
|||
Add functionality for ``:userAddressEIP712`` and ``:userAddressEIP4361`` to provide specific authentication
|
||||
support for user address context values for conditions. ``:userAddress`` will allow any valid authentication scheme.
|
|
@ -10,7 +10,7 @@ from siwe import SiweMessage, VerificationError
|
|||
class Auth:
|
||||
class AuthScheme(Enum):
|
||||
EIP712 = "EIP712"
|
||||
SIWE = "SIWE"
|
||||
EIP4361 = "EIP4361"
|
||||
|
||||
@classmethod
|
||||
def values(cls) -> List[str]:
|
||||
|
@ -30,8 +30,8 @@ class Auth:
|
|||
def from_scheme(cls, scheme: str):
|
||||
if scheme == cls.AuthScheme.EIP712.value:
|
||||
return EIP712Auth
|
||||
elif scheme == cls.AuthScheme.SIWE.value:
|
||||
return SIWEAuth
|
||||
elif scheme == cls.AuthScheme.EIP4361.value:
|
||||
return EIP4361Auth
|
||||
|
||||
raise ValueError(f"Invalid authentication scheme: {scheme}")
|
||||
|
||||
|
@ -65,8 +65,7 @@ class EIP712Auth(Auth):
|
|||
)
|
||||
|
||||
|
||||
class SIWEAuth(Auth):
|
||||
# TODO; this is a safety precaution - is this what we want and is this the correct value?
|
||||
class EIP4361Auth(Auth):
|
||||
FRESHNESS_IN_HOURS = 2
|
||||
|
||||
@classmethod
|
||||
|
@ -75,25 +74,27 @@ class SIWEAuth(Auth):
|
|||
siwe_message = SiweMessage(message=data)
|
||||
except Exception as e:
|
||||
raise cls.InvalidData(
|
||||
f"Invalid SIWE message - {str(e) or e.__class__.__name__}"
|
||||
)
|
||||
|
||||
# enforce a freshness check
|
||||
issued_at = maya.MayaDT.from_iso8601(siwe_message.issued_at)
|
||||
if maya.now() > issued_at.add(hours=cls.FRESHNESS_IN_HOURS):
|
||||
raise cls.AuthenticationFailed(
|
||||
f"SIWE message is stale; more than {cls.FRESHNESS_IN_HOURS} hours old (issued at {issued_at.iso8601()})"
|
||||
f"Invalid EIP4361 message - {str(e) or e.__class__.__name__}"
|
||||
)
|
||||
|
||||
try:
|
||||
siwe_message.verify(signature=signature)
|
||||
except VerificationError as e:
|
||||
raise cls.AuthenticationFailed(
|
||||
f"SIWE verification failed - {str(e) or e.__class__.__name__}"
|
||||
f"EIP4361 verification failed - {str(e) or e.__class__.__name__}"
|
||||
)
|
||||
|
||||
# enforce a freshness check
|
||||
# TODO: "not-before" throws off the freshness timing; so skip if specified. Is this safe / what we want?
|
||||
if not siwe_message.not_before:
|
||||
issued_at = maya.MayaDT.from_iso8601(siwe_message.issued_at)
|
||||
if maya.now() > issued_at.add(hours=cls.FRESHNESS_IN_HOURS):
|
||||
raise cls.AuthenticationFailed(
|
||||
f"EIP4361 message is stale; more than {cls.FRESHNESS_IN_HOURS} hours old (issued at {issued_at.iso8601()})"
|
||||
)
|
||||
|
||||
if siwe_message.address != expected_address:
|
||||
# verification failed - addresses don't match
|
||||
raise cls.AuthenticationFailed(
|
||||
f"Invalid SIWE signature; does not match expected address, {expected_address}"
|
||||
f"Invalid EIP4361 signature; does not match expected address, {expected_address}"
|
||||
)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import re
|
||||
from functools import partial
|
||||
from typing import Any, List, Union
|
||||
|
||||
from eth_typing import ChecksumAddress
|
||||
|
@ -12,18 +13,30 @@ from nucypher.policy.conditions.exceptions import (
|
|||
)
|
||||
|
||||
USER_ADDRESS_CONTEXT = ":userAddress"
|
||||
USER_ADDRESS_EIP712_CONTEXT = ":userAddressEIP712"
|
||||
USER_ADDRESS_EIP4361_CONTEXT = ":userAddressEIP4361"
|
||||
|
||||
CONTEXT_PREFIX = ":"
|
||||
CONTEXT_REGEX = re.compile(":[a-zA-Z_][a-zA-Z0-9_]*")
|
||||
|
||||
USER_ADDRESS_SCHEMES = {
|
||||
USER_ADDRESS_CONTEXT: None, # any of the available auth types
|
||||
USER_ADDRESS_EIP712_CONTEXT: Auth.AuthScheme.EIP712.value,
|
||||
USER_ADDRESS_EIP4361_CONTEXT: Auth.AuthScheme.EIP4361.value,
|
||||
}
|
||||
|
||||
def _recover_user_address(**context) -> ChecksumAddress:
|
||||
|
||||
class UnexpectedScheme(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _resolve_user_address(user_address_context_variable, **context) -> ChecksumAddress:
|
||||
"""
|
||||
Recovers a checksum address from a signed message.
|
||||
|
||||
Expected format:
|
||||
{
|
||||
":userAddress":
|
||||
":userAddress...":
|
||||
{
|
||||
"signature": "<signature>",
|
||||
"address": "<address>",
|
||||
|
@ -33,35 +46,50 @@ def _recover_user_address(**context) -> ChecksumAddress:
|
|||
}
|
||||
"""
|
||||
try:
|
||||
user_address_info = context[USER_ADDRESS_CONTEXT]
|
||||
user_address_info = context[user_address_context_variable]
|
||||
signature = user_address_info["signature"]
|
||||
expected_address = to_checksum_address(user_address_info["address"])
|
||||
type_data = user_address_info["typedData"]
|
||||
typed_data = user_address_info["typedData"]
|
||||
|
||||
scheme = user_address_info.get("scheme", Auth.AuthScheme.EIP712.value)
|
||||
expected_scheme = USER_ADDRESS_SCHEMES[user_address_context_variable]
|
||||
if expected_scheme and scheme != expected_scheme:
|
||||
raise UnexpectedScheme(
|
||||
f"Expected {expected_scheme} authentication scheme, but received {scheme}"
|
||||
)
|
||||
|
||||
auth = Auth.from_scheme(scheme)
|
||||
auth.authenticate(
|
||||
data=type_data, signature=signature, expected_address=expected_address
|
||||
data=typed_data, signature=signature, expected_address=expected_address
|
||||
)
|
||||
except Auth.InvalidData as e:
|
||||
raise InvalidContextVariableData(
|
||||
f"Invalid context variable data for '{USER_ADDRESS_CONTEXT}'; {e}"
|
||||
f"Invalid context variable data for '{user_address_context_variable}'; {e}"
|
||||
)
|
||||
except Auth.AuthenticationFailed as e:
|
||||
raise ContextVariableVerificationFailed(
|
||||
f"Authentication failed for '{USER_ADDRESS_CONTEXT}'; {e}"
|
||||
f"Authentication failed for '{user_address_context_variable}'; {e}"
|
||||
)
|
||||
except Exception as e:
|
||||
# data could not be processed
|
||||
raise InvalidContextVariableData(
|
||||
f"Invalid context variable data for '{USER_ADDRESS_CONTEXT}'; {e.__class__.__name__} - {e}"
|
||||
f"Invalid context variable data for '{user_address_context_variable}'; {e.__class__.__name__} - {e}"
|
||||
)
|
||||
|
||||
return expected_address
|
||||
|
||||
|
||||
_DIRECTIVES = {
|
||||
USER_ADDRESS_CONTEXT: _recover_user_address,
|
||||
USER_ADDRESS_CONTEXT: partial(
|
||||
_resolve_user_address, user_address_context_variable=USER_ADDRESS_CONTEXT
|
||||
),
|
||||
USER_ADDRESS_EIP712_CONTEXT: partial(
|
||||
_resolve_user_address, user_address_context_variable=USER_ADDRESS_EIP712_CONTEXT
|
||||
),
|
||||
USER_ADDRESS_EIP4361_CONTEXT: partial(
|
||||
_resolve_user_address,
|
||||
user_address_context_variable=USER_ADDRESS_EIP4361_CONTEXT,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import copy
|
||||
import json
|
||||
import os
|
||||
from unittest import mock
|
||||
|
@ -14,10 +13,8 @@ from nucypher.blockchain.eth.agents import (
|
|||
SubscriptionManagerAgent,
|
||||
)
|
||||
from nucypher.blockchain.eth.constants import NULL_ADDRESS
|
||||
from nucypher.policy.conditions.auth import Auth
|
||||
from nucypher.policy.conditions.context import (
|
||||
USER_ADDRESS_CONTEXT,
|
||||
_recover_user_address,
|
||||
get_context_value,
|
||||
)
|
||||
from nucypher.policy.conditions.evm import (
|
||||
|
@ -25,10 +22,7 @@ from nucypher.policy.conditions.evm import (
|
|||
RPCCondition,
|
||||
)
|
||||
from nucypher.policy.conditions.exceptions import (
|
||||
ContextVariableVerificationFailed,
|
||||
InvalidCondition,
|
||||
InvalidConditionContext,
|
||||
InvalidContextVariableData,
|
||||
NoConnectionToChain,
|
||||
RequiredContextVariable,
|
||||
RPCExecutionFailed,
|
||||
|
@ -64,76 +58,6 @@ def test_required_context_variable(
|
|||
) # no context
|
||||
|
||||
|
||||
@pytest.mark.parametrize("expected_entry", ["address", "signature", "typedData"])
|
||||
@pytest.mark.parametrize(
|
||||
"valid_user_address_context", Auth.AuthScheme.values(), indirect=True
|
||||
)
|
||||
def test_user_address_context_missing_required_entries(
|
||||
expected_entry, valid_user_address_context
|
||||
):
|
||||
context = copy.deepcopy(valid_user_address_context)
|
||||
del context[USER_ADDRESS_CONTEXT][expected_entry]
|
||||
with pytest.raises(InvalidContextVariableData):
|
||||
get_context_value(USER_ADDRESS_CONTEXT, **context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"valid_user_address_context", Auth.AuthScheme.values(), indirect=True
|
||||
)
|
||||
def test_user_address_context_invalid_typed_data(valid_user_address_context):
|
||||
# invalid typed data
|
||||
context = copy.deepcopy(valid_user_address_context)
|
||||
context[USER_ADDRESS_CONTEXT]["typedData"] = dict(
|
||||
randomSaying="Comparison is the thief of joy." # -– Theodore Roosevelt
|
||||
)
|
||||
with pytest.raises(InvalidContextVariableData):
|
||||
get_context_value(USER_ADDRESS_CONTEXT, **context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"valid_user_address_context", Auth.AuthScheme.values(), indirect=True
|
||||
)
|
||||
def test_user_address_context_variable_verification(
|
||||
valid_user_address_context, accounts
|
||||
):
|
||||
# call underlying directive directly (appease codecov)
|
||||
address = _recover_user_address(**valid_user_address_context)
|
||||
assert address == valid_user_address_context[USER_ADDRESS_CONTEXT]["address"]
|
||||
|
||||
# valid user address context
|
||||
address = get_context_value(USER_ADDRESS_CONTEXT, **valid_user_address_context)
|
||||
assert address == valid_user_address_context[USER_ADDRESS_CONTEXT]["address"]
|
||||
|
||||
# invalid user address context - signature does not match address
|
||||
# internals are mutable - deepcopy
|
||||
mismatch_with_address_context = copy.deepcopy(valid_user_address_context)
|
||||
mismatch_with_address_context[USER_ADDRESS_CONTEXT][
|
||||
"address"
|
||||
] = accounts.etherbase_account
|
||||
with pytest.raises(ContextVariableVerificationFailed):
|
||||
get_context_value(USER_ADDRESS_CONTEXT, **mismatch_with_address_context)
|
||||
|
||||
# invalid user address context - signature does not match address
|
||||
# internals are mutable - deepcopy
|
||||
mismatch_with_address_context = copy.deepcopy(valid_user_address_context)
|
||||
signature = (
|
||||
"0x93252ddff5f90584b27b5eef1915b23a8b01a703be56c8bf0660647c15cb75e9"
|
||||
"1983bde9877eaad11da5a3ebc9b64957f1c182536931f9844d0c600f0c41293d1b"
|
||||
)
|
||||
mismatch_with_address_context[USER_ADDRESS_CONTEXT]["signature"] = signature
|
||||
with pytest.raises(ContextVariableVerificationFailed):
|
||||
get_context_value(USER_ADDRESS_CONTEXT, **mismatch_with_address_context)
|
||||
|
||||
# invalid signature
|
||||
# internals are mutable - deepcopy
|
||||
invalid_signature_context = copy.deepcopy(valid_user_address_context)
|
||||
invalid_signature_context[USER_ADDRESS_CONTEXT][
|
||||
"signature"
|
||||
] = "0xdeadbeef" # invalid signature
|
||||
with pytest.raises(InvalidConditionContext):
|
||||
get_context_value(USER_ADDRESS_CONTEXT, **invalid_signature_context)
|
||||
|
||||
|
||||
@mock.patch(
|
||||
GET_CONTEXT_VALUE_IMPORT_PATH,
|
||||
side_effect=_dont_validate_user_address,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import tempfile
|
||||
from datetime import timedelta
|
||||
|
@ -648,9 +649,14 @@ def rpc_condition():
|
|||
return condition
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def valid_user_address_context(request):
|
||||
if request.param == Auth.AuthScheme.EIP712.value:
|
||||
@pytest.fixture(scope="function")
|
||||
def valid_user_address_auth_message(request):
|
||||
auth_message_type = request.param
|
||||
if auth_message_type is None:
|
||||
# pick one at random
|
||||
auth_message_type = random.choice(Auth.AuthScheme.values())
|
||||
|
||||
if auth_message_type == Auth.AuthScheme.EIP712.value:
|
||||
auth_message = {
|
||||
"signature": "0x488a7acefdc6d098eedf73cdfd379777c0f4a4023a660d350d3bf309a51dd4251abaad9cdd11b71c400cfb4625c14ca142f72b39165bd980c8da1ea32892ff071c",
|
||||
"address": "0x5ce9454909639D2D17A3F753ce7d93fa0b9aB12E",
|
||||
|
@ -685,7 +691,7 @@ def valid_user_address_context(request):
|
|||
},
|
||||
},
|
||||
}
|
||||
elif request.param == Auth.AuthScheme.SIWE.value:
|
||||
elif auth_message_type == Auth.AuthScheme.EIP4361.value:
|
||||
signer = InMemorySigner()
|
||||
siwe_message_data = {
|
||||
"domain": "login.xyz",
|
||||
|
@ -704,13 +710,13 @@ def valid_user_address_context(request):
|
|||
auth_message = {
|
||||
"signature": f"{signature.hex()}",
|
||||
"address": f"{signer.accounts[0]}",
|
||||
"scheme": f"{Auth.AuthScheme.SIWE.value}",
|
||||
"scheme": f"{Auth.AuthScheme.EIP4361.value}",
|
||||
"typedData": f"{siwe_message}",
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"No context for provided scheme, {request.param}")
|
||||
|
||||
return {USER_ADDRESS_CONTEXT: auth_message}
|
||||
return auth_message
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
|
|
|
@ -3,13 +3,14 @@ import pytest
|
|||
from siwe import SiweMessage
|
||||
|
||||
from nucypher.blockchain.eth.signers import InMemorySigner
|
||||
from nucypher.policy.conditions.auth import Auth, EIP712Auth, SIWEAuth
|
||||
from nucypher.policy.conditions.context import USER_ADDRESS_CONTEXT
|
||||
from nucypher.policy.conditions.auth import Auth, EIP712Auth, EIP4361Auth
|
||||
|
||||
|
||||
def test_auth_scheme():
|
||||
for scheme in Auth.AuthScheme:
|
||||
expected_scheme = EIP712Auth if scheme == Auth.AuthScheme.EIP712 else SIWEAuth
|
||||
expected_scheme = (
|
||||
EIP712Auth if scheme == Auth.AuthScheme.EIP712 else EIP4361Auth
|
||||
)
|
||||
assert Auth.from_scheme(scheme=scheme.value) == expected_scheme
|
||||
|
||||
# non-existent scheme
|
||||
|
@ -18,12 +19,14 @@ def test_auth_scheme():
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"valid_user_address_context", [Auth.AuthScheme.EIP712.value], indirect=True
|
||||
"valid_user_address_auth_message", [Auth.AuthScheme.EIP712.value], indirect=True
|
||||
)
|
||||
def test_authenticate_eip712(valid_user_address_context, get_random_checksum_address):
|
||||
data = valid_user_address_context[USER_ADDRESS_CONTEXT]["typedData"]
|
||||
signature = valid_user_address_context[USER_ADDRESS_CONTEXT]["signature"]
|
||||
address = valid_user_address_context[USER_ADDRESS_CONTEXT]["address"]
|
||||
def test_authenticate_eip712(
|
||||
valid_user_address_auth_message, get_random_checksum_address
|
||||
):
|
||||
data = valid_user_address_auth_message["typedData"]
|
||||
signature = valid_user_address_auth_message["signature"]
|
||||
address = valid_user_address_auth_message["address"]
|
||||
|
||||
# invalid data
|
||||
invalid_data = dict(data) # make a copy
|
||||
|
@ -69,7 +72,7 @@ def test_authenticate_eip712(valid_user_address_context, get_random_checksum_add
|
|||
EIP712Auth.authenticate(data, signature, address)
|
||||
|
||||
|
||||
def test_authenticate_siwe(get_random_checksum_address):
|
||||
def test_authenticate_eip4361(get_random_checksum_address):
|
||||
signer = InMemorySigner()
|
||||
siwe_message_data = {
|
||||
"domain": "login.xyz",
|
||||
|
@ -88,14 +91,14 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
valid_address_for_signature = signer.accounts[0]
|
||||
|
||||
# everything valid
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
valid_message, valid_message_signature, valid_address_for_signature
|
||||
)
|
||||
|
||||
# invalid data
|
||||
invalid_data = "just a regular old string"
|
||||
with pytest.raises(Auth.InvalidData):
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
data=invalid_data,
|
||||
signature=valid_message_signature,
|
||||
expected_address=valid_address_for_signature,
|
||||
|
@ -107,9 +110,10 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
"1983bde9877eaad11da5a3ebc9b64957f1c182536931f9844d0c600f0c41293d1b"
|
||||
)
|
||||
with pytest.raises(
|
||||
Auth.AuthenticationFailed, match="SIWE verification failed - InvalidSignature"
|
||||
Auth.AuthenticationFailed,
|
||||
match="EIP4361 verification failed - InvalidSignature",
|
||||
):
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
data=valid_message,
|
||||
signature=incorrect_signature,
|
||||
expected_address=valid_address_for_signature,
|
||||
|
@ -118,9 +122,10 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
# invalid signature
|
||||
invalid_signature = "0xdeadbeef"
|
||||
with pytest.raises(
|
||||
Auth.AuthenticationFailed, match="SIWE verification failed - InvalidSignature"
|
||||
Auth.AuthenticationFailed,
|
||||
match="EIP4361 verification failed - InvalidSignature",
|
||||
):
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
data=valid_message,
|
||||
signature=invalid_signature,
|
||||
expected_address=valid_address_for_signature,
|
||||
|
@ -130,7 +135,7 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
with pytest.raises(
|
||||
Auth.AuthenticationFailed, match="does not match expected address"
|
||||
):
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
data=valid_message,
|
||||
signature=valid_message_signature,
|
||||
expected_address=get_random_checksum_address(),
|
||||
|
@ -139,21 +144,21 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
# stale message
|
||||
stale_message_data = dict(siwe_message_data)
|
||||
stale_message_data["issued_at"] = (
|
||||
f"{maya.now().subtract(hours=SIWEAuth.FRESHNESS_IN_HOURS + 1).iso8601()}"
|
||||
f"{maya.now().subtract(hours=EIP4361Auth.FRESHNESS_IN_HOURS + 1).iso8601()}"
|
||||
)
|
||||
stale_message = SiweMessage(stale_message_data).prepare_message()
|
||||
stale_message_signature = signer.sign_message(
|
||||
account=valid_address_for_signature, message=stale_message.encode()
|
||||
)
|
||||
with pytest.raises(Auth.AuthenticationFailed, match="SIWE message is stale"):
|
||||
SIWEAuth.authenticate(
|
||||
with pytest.raises(Auth.AuthenticationFailed, match="EIP4361 message is stale"):
|
||||
EIP4361Auth.authenticate(
|
||||
stale_message, stale_message_signature.hex(), valid_address_for_signature
|
||||
)
|
||||
|
||||
# old, but not stale and still valid
|
||||
old_but_not_stale_message_data = dict(siwe_message_data)
|
||||
old_but_not_stale_message_data["issued_at"] = (
|
||||
f"{maya.now().subtract(hours=SIWEAuth.FRESHNESS_IN_HOURS - 1).iso8601()}"
|
||||
f"{maya.now().subtract(hours=EIP4361Auth.FRESHNESS_IN_HOURS - 1).iso8601()}"
|
||||
)
|
||||
old_but_not_stale_message = SiweMessage(
|
||||
old_but_not_stale_message_data
|
||||
|
@ -161,7 +166,7 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
old_not_stale_message_signature = signer.sign_message(
|
||||
account=valid_address_for_signature, message=old_but_not_stale_message.encode()
|
||||
)
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
old_but_not_stale_message,
|
||||
old_not_stale_message_signature.hex(),
|
||||
valid_address_for_signature,
|
||||
|
@ -180,10 +185,50 @@ def test_authenticate_siwe(get_random_checksum_address):
|
|||
message=not_stale_but_past_expiry_message.encode(),
|
||||
)
|
||||
with pytest.raises(
|
||||
Auth.AuthenticationFailed, match="SIWE verification failed - ExpiredMessage"
|
||||
Auth.AuthenticationFailed, match="EIP4361 verification failed - ExpiredMessage"
|
||||
):
|
||||
SIWEAuth.authenticate(
|
||||
EIP4361Auth.authenticate(
|
||||
not_stale_but_past_expiry_message,
|
||||
not_stale_but_past_expiry_signature.hex(),
|
||||
valid_address_for_signature,
|
||||
)
|
||||
|
||||
# not before specified
|
||||
not_before_message_data = dict(siwe_message_data)
|
||||
not_before_message_data["not_before"] = f"{maya.now().add(hours=1).iso8601()}"
|
||||
not_before_message = SiweMessage(not_before_message_data).prepare_message()
|
||||
not_before_message_signature = signer.sign_message(
|
||||
account=valid_address_for_signature, message=not_before_message.encode()
|
||||
)
|
||||
with pytest.raises(
|
||||
Auth.AuthenticationFailed,
|
||||
match="EIP4361 verification failed - NotYetValidMessage",
|
||||
):
|
||||
EIP4361Auth.authenticate(
|
||||
not_before_message,
|
||||
not_before_message_signature.hex(),
|
||||
valid_address_for_signature,
|
||||
)
|
||||
|
||||
# not before specified, so stale message check not performed
|
||||
not_before_no_stale_check_message_data = dict(siwe_message_data)
|
||||
not_before_no_stale_check_message_data["not_before"] = (
|
||||
f"{maya.now().subtract(hours=EIP4361Auth.FRESHNESS_IN_HOURS - 1).iso8601()}"
|
||||
)
|
||||
# issued more than freshness check hours ago
|
||||
old_but_not_stale_message_data["issued_at"] = (
|
||||
f"{maya.now().subtract(hours=EIP4361Auth.FRESHNESS_IN_HOURS - 2).iso8601()}"
|
||||
)
|
||||
not_before_no_stale_check_message = SiweMessage(
|
||||
not_before_no_stale_check_message_data
|
||||
).prepare_message()
|
||||
not_before_no_stale_check_message_signature = signer.sign_message(
|
||||
account=valid_address_for_signature,
|
||||
message=not_before_no_stale_check_message.encode(),
|
||||
)
|
||||
# even though stale, "not-before" causes check to be skipped
|
||||
EIP4361Auth.authenticate(
|
||||
not_before_no_stale_check_message,
|
||||
not_before_no_stale_check_message_signature.hex(),
|
||||
valid_address_for_signature,
|
||||
)
|
||||
|
|
|
@ -1,14 +1,28 @@
|
|||
import copy
|
||||
import itertools
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from nucypher.policy.conditions.auth import Auth
|
||||
from nucypher.policy.conditions.context import (
|
||||
USER_ADDRESS_EIP712_CONTEXT,
|
||||
USER_ADDRESS_EIP4361_CONTEXT,
|
||||
USER_ADDRESS_SCHEMES,
|
||||
_resolve_context_variable,
|
||||
_resolve_user_address,
|
||||
get_context_value,
|
||||
is_context_variable,
|
||||
resolve_any_context_variables,
|
||||
)
|
||||
from nucypher.policy.conditions.lingo import ReturnValueTest
|
||||
from nucypher.policy.conditions.exceptions import (
|
||||
ContextVariableVerificationFailed,
|
||||
InvalidConditionContext,
|
||||
InvalidContextVariableData,
|
||||
)
|
||||
from nucypher.policy.conditions.lingo import (
|
||||
ReturnValueTest,
|
||||
)
|
||||
|
||||
INVALID_CONTEXT_PARAM_NAMES = [
|
||||
":",
|
||||
|
@ -81,3 +95,113 @@ def test_resolve_any_context_variables():
|
|||
assert resolved_return_value.comparator == return_value_test.comparator
|
||||
assert resolved_return_value.index == return_value_test.index
|
||||
assert resolved_return_value.value == resolved_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize("expected_entry", ["address", "signature", "typedData"])
|
||||
@pytest.mark.parametrize(
|
||||
"context_variable_name, valid_user_address_auth_message",
|
||||
list(USER_ADDRESS_SCHEMES.items()),
|
||||
indirect=["valid_user_address_auth_message"],
|
||||
)
|
||||
def test_user_address_context_missing_required_entries(
|
||||
expected_entry, context_variable_name, valid_user_address_auth_message
|
||||
):
|
||||
context = {context_variable_name: valid_user_address_auth_message}
|
||||
del context[context_variable_name][expected_entry]
|
||||
with pytest.raises(InvalidContextVariableData):
|
||||
get_context_value(context_variable_name, **context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context_variable_name, valid_user_address_auth_message",
|
||||
list(USER_ADDRESS_SCHEMES.items()),
|
||||
indirect=["valid_user_address_auth_message"],
|
||||
)
|
||||
def test_user_address_context_invalid_typed_data(
|
||||
context_variable_name, valid_user_address_auth_message
|
||||
):
|
||||
# invalid typed data
|
||||
context = {context_variable_name: valid_user_address_auth_message}
|
||||
context[context_variable_name]["typedData"] = dict(
|
||||
randomSaying="Comparison is the thief of joy." # -– Theodore Roosevelt
|
||||
)
|
||||
with pytest.raises(InvalidContextVariableData):
|
||||
get_context_value(context_variable_name, **context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context_variable_name, valid_user_address_auth_message",
|
||||
list(
|
||||
zip(
|
||||
[
|
||||
USER_ADDRESS_EIP712_CONTEXT,
|
||||
USER_ADDRESS_EIP4361_CONTEXT,
|
||||
],
|
||||
[
|
||||
Auth.AuthScheme.EIP4361.value,
|
||||
Auth.AuthScheme.EIP712.value,
|
||||
],
|
||||
)
|
||||
),
|
||||
indirect=["valid_user_address_auth_message"],
|
||||
)
|
||||
def test_user_address_context_unexpected_scheme_data(
|
||||
context_variable_name, valid_user_address_auth_message
|
||||
):
|
||||
# scheme in message is unexpected for context variable name
|
||||
context = {context_variable_name: valid_user_address_auth_message}
|
||||
with pytest.raises(InvalidContextVariableData, match="UnexpectedScheme"):
|
||||
get_context_value(context_variable_name, **context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context_variable_name, valid_user_address_auth_message",
|
||||
list(USER_ADDRESS_SCHEMES.items()),
|
||||
indirect=["valid_user_address_auth_message"],
|
||||
)
|
||||
def test_user_address_context_variable_verification(
|
||||
context_variable_name, valid_user_address_auth_message, get_random_checksum_address
|
||||
):
|
||||
valid_user_address_context = {
|
||||
context_variable_name: valid_user_address_auth_message
|
||||
}
|
||||
|
||||
# call underlying directive directly (appease codecov)
|
||||
address = _resolve_user_address(
|
||||
user_address_context_variable=context_variable_name,
|
||||
**valid_user_address_context,
|
||||
)
|
||||
assert address == valid_user_address_context[context_variable_name]["address"]
|
||||
|
||||
# valid user address context
|
||||
address = get_context_value(context_variable_name, **valid_user_address_context)
|
||||
assert address == valid_user_address_context[context_variable_name]["address"]
|
||||
|
||||
# invalid user address context - signature does not match address
|
||||
# internals are mutable - deepcopy
|
||||
mismatch_with_address_context = copy.deepcopy(valid_user_address_context)
|
||||
mismatch_with_address_context[context_variable_name][
|
||||
"address"
|
||||
] = get_random_checksum_address()
|
||||
with pytest.raises(ContextVariableVerificationFailed):
|
||||
get_context_value(context_variable_name, **mismatch_with_address_context)
|
||||
|
||||
# invalid user address context - signature does not match address
|
||||
# internals are mutable - deepcopy
|
||||
mismatch_with_address_context = copy.deepcopy(valid_user_address_context)
|
||||
signature = (
|
||||
"0x93252ddff5f90584b27b5eef1915b23a8b01a703be56c8bf0660647c15cb75e9"
|
||||
"1983bde9877eaad11da5a3ebc9b64957f1c182536931f9844d0c600f0c41293d1b"
|
||||
)
|
||||
mismatch_with_address_context[context_variable_name]["signature"] = signature
|
||||
with pytest.raises(ContextVariableVerificationFailed):
|
||||
get_context_value(context_variable_name, **mismatch_with_address_context)
|
||||
|
||||
# invalid signature
|
||||
# internals are mutable - deepcopy
|
||||
invalid_signature_context = copy.deepcopy(valid_user_address_context)
|
||||
invalid_signature_context[context_variable_name][
|
||||
"signature"
|
||||
] = "0xdeadbeef" # invalid signature
|
||||
with pytest.raises(InvalidConditionContext):
|
||||
get_context_value(context_variable_name, **invalid_signature_context)
|
||||
|
|
Loading…
Reference in New Issue