Initial plumbing for authenticating :userAddress context variable information provided using SIWE instead of EIP712 which we've traditionally used.

Leave some room for other possibilities.
pull/3502/head
derekpierre 2024-05-14 16:55:14 -04:00
parent ac5535c3f1
commit e6d1534d1f
No known key found for this signature in database
2 changed files with 95 additions and 34 deletions

View File

@ -0,0 +1,75 @@
from enum import Enum
from eth_account.account import Account
from eth_account.messages import HexBytes, encode_typed_data
from siwe import SiweMessage, VerificationError
class Auth:
class AuthScheme(Enum):
EIP712 = "EIP712"
SIWE = "SIWE"
class InvalidData(Exception):
pass
class AuthenticationFailed(Exception):
pass
@classmethod
def authenticate(cls, data, signature, expected_address):
raise NotImplementedError
@classmethod
def from_scheme(cls, scheme: AuthScheme):
if scheme == cls.AuthScheme.EIP712:
return EIP712Auth
elif scheme == cls.AuthScheme.SIWE:
return SIWEAuth
raise ValueError(f"Invalid authentication scheme: {scheme}")
class EIP712Auth(Auth):
@classmethod
def authenticate(cls, data, signature, expected_address):
try:
# convert hex data for byte fields - bytes are expected by underlying library
# 1. salt
salt = data["domain"]["salt"]
data["domain"]["salt"] = HexBytes(salt)
# 2. blockHash
blockHash = data["message"]["blockHash"]
data["message"]["blockHash"] = HexBytes(blockHash)
signable_message = encode_typed_data(full_message=data)
address_for_signature = Account.recover_message(
signable_message=signable_message, signature=signature
)
except Exception as e:
# data could not be processed
raise cls.InvalidData(f"Invalid auth data: {e.__class__.__name__} - {e}")
if address_for_signature != expected_address:
# verification failed - addresses don't match
raise cls.AuthenticationFailed(
f"Invalid EIP712 signature; does not match expected address, {expected_address}"
)
class SIWEAuth(Auth):
@classmethod
def authenticate(cls, data, signature, expected_address):
try:
siwe_message = SiweMessage(message=data)
except Exception as e:
raise cls.InvalidData(
f"Invalid SIWE message - {e.__class__.__name__} - {e}"
)
try:
siwe_message.verify(signature=signature)
except VerificationError as e:
raise cls.AuthenticationFailed(
f"Invalid SIWE signature - {e.__class__.__name__} - {e}"
)

View File

@ -1,11 +1,10 @@
import re
from typing import Any, List, Union
from eth_account.account import Account
from eth_account.messages import HexBytes, encode_typed_data
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from nucypher.policy.conditions.auth import Auth
from nucypher.policy.conditions.exceptions import (
ContextVariableVerificationFailed,
InvalidContextVariableData,
@ -20,7 +19,7 @@ CONTEXT_REGEX = re.compile(":[a-zA-Z_][a-zA-Z0-9_]*")
def _recover_user_address(**context) -> ChecksumAddress:
"""
Recovers a checksum address from a signed EIP712 message.
Recovers a checksum address from a signed message.
Expected format:
{
@ -28,50 +27,37 @@ def _recover_user_address(**context) -> ChecksumAddress:
{
"signature": "<signature>",
"address": "<address>",
"typedData": "<a complicated EIP712 data structure>"
"scheme": "EIP712" | "SIWE" | ...
"typeData": ...
}
}
"""
# setup
try:
user_address_info = context[USER_ADDRESS_CONTEXT]
signature = user_address_info["signature"]
user_address = to_checksum_address(user_address_info["address"])
eip712_message = user_address_info["typedData"]
expected_address = to_checksum_address(user_address_info["address"])
type_data = user_address_info["typedData"]
# convert hex data for byte fields - bytes are expected by underlying library
# 1. salt
salt = eip712_message["domain"]["salt"]
eip712_message["domain"]["salt"] = HexBytes(salt)
# 2. blockHash
blockHash = eip712_message["message"]["blockHash"]
eip712_message["message"]["blockHash"] = HexBytes(blockHash)
signable_message = encode_typed_data(full_message=eip712_message)
scheme = user_address_info.get("scheme", Auth.AuthScheme.EIP712)
auth = Auth.from_scheme(scheme)
auth.authenticate(
data=type_data, signature=signature, expected_address=expected_address
)
except Auth.InvalidData as e:
raise InvalidContextVariableData(
f"Invalid context variable data for '{USER_ADDRESS_CONTEXT}': {e}"
)
except Auth.AuthenticationFailed:
raise ContextVariableVerificationFailed(
f"Invalid signature for '{USER_ADDRESS_CONTEXT}'; does not match expected address, {expected_address}"
)
except Exception as e:
# data could not be processed
raise InvalidContextVariableData(
f'Invalid data provided for "{USER_ADDRESS_CONTEXT}"; {e.__class__.__name__} - {e}'
)
# actual verification
try:
address_for_signature = Account.recover_message(
signable_message=signable_message, signature=signature
)
if address_for_signature == user_address:
return user_address
except Exception as e:
# exception during verification
raise ContextVariableVerificationFailed(
f"Could not determine address of signature for '{USER_ADDRESS_CONTEXT}'; {e.__class__.__name__} - {e}"
)
# verification failed - addresses don't match
raise ContextVariableVerificationFailed(
f"Signer address for '{USER_ADDRESS_CONTEXT}' signature does not match; expected {user_address}"
)
return expected_address
_DIRECTIVES = {