From e6d1534d1feffb35e8353fd7a893cea7a881c6f6 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Tue, 14 May 2024 16:55:14 -0400 Subject: [PATCH] Initial plumbing for authenticating :userAddress context variable information provided using SIWE instead of EIP712 which we've traditionally used. Leave some room for other possibilities. --- nucypher/policy/conditions/auth.py | 75 +++++++++++++++++++++++++++ nucypher/policy/conditions/context.py | 54 +++++++------------ 2 files changed, 95 insertions(+), 34 deletions(-) create mode 100644 nucypher/policy/conditions/auth.py diff --git a/nucypher/policy/conditions/auth.py b/nucypher/policy/conditions/auth.py new file mode 100644 index 000000000..4d7836f23 --- /dev/null +++ b/nucypher/policy/conditions/auth.py @@ -0,0 +1,75 @@ +from enum import Enum + +from eth_account.account import Account +from eth_account.messages import HexBytes, encode_typed_data +from siwe import SiweMessage, VerificationError + + +class Auth: + class AuthScheme(Enum): + EIP712 = "EIP712" + SIWE = "SIWE" + + class InvalidData(Exception): + pass + + class AuthenticationFailed(Exception): + pass + + @classmethod + def authenticate(cls, data, signature, expected_address): + raise NotImplementedError + + @classmethod + def from_scheme(cls, scheme: AuthScheme): + if scheme == cls.AuthScheme.EIP712: + return EIP712Auth + elif scheme == cls.AuthScheme.SIWE: + return SIWEAuth + + raise ValueError(f"Invalid authentication scheme: {scheme}") + + +class EIP712Auth(Auth): + @classmethod + def authenticate(cls, data, signature, expected_address): + try: + # convert hex data for byte fields - bytes are expected by underlying library + # 1. salt + salt = data["domain"]["salt"] + data["domain"]["salt"] = HexBytes(salt) + # 2. blockHash + blockHash = data["message"]["blockHash"] + data["message"]["blockHash"] = HexBytes(blockHash) + + signable_message = encode_typed_data(full_message=data) + address_for_signature = Account.recover_message( + signable_message=signable_message, signature=signature + ) + except Exception as e: + # data could not be processed + raise cls.InvalidData(f"Invalid auth data: {e.__class__.__name__} - {e}") + + if address_for_signature != expected_address: + # verification failed - addresses don't match + raise cls.AuthenticationFailed( + f"Invalid EIP712 signature; does not match expected address, {expected_address}" + ) + + +class SIWEAuth(Auth): + @classmethod + def authenticate(cls, data, signature, expected_address): + try: + siwe_message = SiweMessage(message=data) + except Exception as e: + raise cls.InvalidData( + f"Invalid SIWE message - {e.__class__.__name__} - {e}" + ) + + try: + siwe_message.verify(signature=signature) + except VerificationError as e: + raise cls.AuthenticationFailed( + f"Invalid SIWE signature - {e.__class__.__name__} - {e}" + ) diff --git a/nucypher/policy/conditions/context.py b/nucypher/policy/conditions/context.py index 450b28ebf..7d76bd3a9 100644 --- a/nucypher/policy/conditions/context.py +++ b/nucypher/policy/conditions/context.py @@ -1,11 +1,10 @@ import re from typing import Any, List, Union -from eth_account.account import Account -from eth_account.messages import HexBytes, encode_typed_data from eth_typing import ChecksumAddress from eth_utils import to_checksum_address +from nucypher.policy.conditions.auth import Auth from nucypher.policy.conditions.exceptions import ( ContextVariableVerificationFailed, InvalidContextVariableData, @@ -20,7 +19,7 @@ CONTEXT_REGEX = re.compile(":[a-zA-Z_][a-zA-Z0-9_]*") def _recover_user_address(**context) -> ChecksumAddress: """ - Recovers a checksum address from a signed EIP712 message. + Recovers a checksum address from a signed message. Expected format: { @@ -28,50 +27,37 @@ def _recover_user_address(**context) -> ChecksumAddress: { "signature": "", "address": "
", - "typedData": "" + "scheme": "EIP712" | "SIWE" | ... + "typeData": ... } } """ - - # setup try: user_address_info = context[USER_ADDRESS_CONTEXT] signature = user_address_info["signature"] - user_address = to_checksum_address(user_address_info["address"]) - eip712_message = user_address_info["typedData"] + expected_address = to_checksum_address(user_address_info["address"]) + type_data = user_address_info["typedData"] - # convert hex data for byte fields - bytes are expected by underlying library - # 1. salt - salt = eip712_message["domain"]["salt"] - eip712_message["domain"]["salt"] = HexBytes(salt) - # 2. blockHash - blockHash = eip712_message["message"]["blockHash"] - eip712_message["message"]["blockHash"] = HexBytes(blockHash) - - signable_message = encode_typed_data(full_message=eip712_message) + scheme = user_address_info.get("scheme", Auth.AuthScheme.EIP712) + auth = Auth.from_scheme(scheme) + auth.authenticate( + data=type_data, signature=signature, expected_address=expected_address + ) + except Auth.InvalidData as e: + raise InvalidContextVariableData( + f"Invalid context variable data for '{USER_ADDRESS_CONTEXT}': {e}" + ) + except Auth.AuthenticationFailed: + raise ContextVariableVerificationFailed( + f"Invalid signature for '{USER_ADDRESS_CONTEXT}'; does not match expected address, {expected_address}" + ) except Exception as e: # data could not be processed raise InvalidContextVariableData( f'Invalid data provided for "{USER_ADDRESS_CONTEXT}"; {e.__class__.__name__} - {e}' ) - # actual verification - try: - address_for_signature = Account.recover_message( - signable_message=signable_message, signature=signature - ) - if address_for_signature == user_address: - return user_address - except Exception as e: - # exception during verification - raise ContextVariableVerificationFailed( - f"Could not determine address of signature for '{USER_ADDRESS_CONTEXT}'; {e.__class__.__name__} - {e}" - ) - - # verification failed - addresses don't match - raise ContextVariableVerificationFailed( - f"Signer address for '{USER_ADDRESS_CONTEXT}' signature does not match; expected {user_address}" - ) + return expected_address _DIRECTIVES = {