From 2fe18c7144d2d6cc7eefc5c491965747f8c7feec Mon Sep 17 00:00:00 2001 From: Kieran Prasch Date: Fri, 6 Jul 2018 15:44:33 -0700 Subject: [PATCH] Initial bulk application of type hints to pyUmbral --- umbral/_pre.py | 27 ++++++++-------- umbral/config.py | 14 +++++---- umbral/dem.py | 8 +++-- umbral/fragments.py | 55 +++++++++++++++++--------------- umbral/keys.py | 74 ++++++++++++++++++++++++------------------- umbral/params.py | 5 +-- umbral/pre.py | 77 +++++++++++++++++++++++---------------------- umbral/signing.py | 28 +++++++++-------- umbral/utils.py | 13 ++++---- 9 files changed, 162 insertions(+), 139 deletions(-) diff --git a/umbral/_pre.py b/umbral/_pre.py index de9de8e..09839df 100644 --- a/umbral/_pre.py +++ b/umbral/_pre.py @@ -1,20 +1,21 @@ +from typing import Optional + from umbral.curvebn import CurveBN -from umbral.config import default_params from umbral.keys import UmbralPublicKey -from umbral.params import UmbralParameters -def prove_cfrag_correctness(cfrag: "CapsuleFrag", - kfrag: "KFrag", - capsule: "Capsule", - metadata: bytes = None - ) -> "CorrectnessProof": +def prove_cfrag_correctness(cfrag: 'CapsuleFrag', + kfrag: 'KFrag', + capsule: 'Capsule', + metadata: Optional[bytes] = None + ) -> 'CorrectnessProof': + params = capsule._umbral_params rk = kfrag._bn_key t = CurveBN.gen_rand(params.curve) #### - ## Here are the formulaic constituents shared with `assess_cfrag_correctness`. + # Here are the formulaic constituents shared with `assess_cfrag_correctness`. #### e = capsule._point_e v = capsule._point_v @@ -45,7 +46,7 @@ def prove_cfrag_correctness(cfrag: "CapsuleFrag", raise capsule.NotValid("Capsule verification failed.") -def assess_cfrag_correctness(cfrag, capsule: "Capsule"): +def assess_cfrag_correctness(cfrag: 'CapsuleFrag', capsule: 'Capsule') -> bool: correctness_keys = capsule.get_correctness_keys() @@ -62,7 +63,7 @@ def assess_cfrag_correctness(cfrag, capsule: "Capsule"): params = capsule._umbral_params #### - ## Here are the formulaic constituents shared with `prove_cfrag_correctness`. + # Here are the formulaic constituents shared with `prove_cfrag_correctness`. #### e = capsule._point_e v = capsule._point_v @@ -110,11 +111,11 @@ def assess_cfrag_correctness(cfrag, capsule: "Capsule"): & correct_rk_commitment -def verify_kfrag(kfrag, +def verify_kfrag(kfrag: 'KFrag', delegating_pubkey: UmbralPublicKey, - signing_pubkey, + signing_pubkey: UmbralPublicKey, receiving_pubkey: UmbralPublicKey - ): + ) -> bool: params = delegating_pubkey.params diff --git a/umbral/config.py b/umbral/config.py index 2021d56..708c883 100644 --- a/umbral/config.py +++ b/umbral/config.py @@ -1,6 +1,8 @@ +from typing import Optional, Type, Union from warnings import warn from umbral.curve import Curve, SECP256K1 +from umbral.params import UmbralParameters class _CONFIG: @@ -18,19 +20,19 @@ class _CONFIG: cls.set_curve(cls.__CURVE_TO_USE_IF_NO_DEFAULT_IS_SET_BY_USER) @classmethod - def params(cls): + def params(cls) -> UmbralParameters: if not cls.__params: cls.__set_curve_by_default() return cls.__params @classmethod - def curve(cls): + def curve(cls) -> Union[Type[SECP256R1], Type[SECP256K1]]: if not cls.__curve: cls.__set_curve_by_default() return cls.__curve @classmethod - def set_curve(cls, curve: Curve=None): + def set_curve(cls, curve: Optional[Curve] = None) -> None: if cls.__curve: raise cls.UmbralConfigurationError( "You can only set the default curve once. Do it once and then leave it alone.") @@ -42,13 +44,13 @@ class _CONFIG: cls.__params = UmbralParameters(curve) -def set_default_curve(curve: Curve=None): +def set_default_curve(curve: Optional[Curve] = None) -> None: return _CONFIG.set_curve(curve) -def default_curve(): +def default_curve() -> Union[Type[SECP256R1], Type[SECP256K1]]: return _CONFIG.curve() -def default_params(): +def default_params() -> UmbralParameters: return _CONFIG.params() diff --git a/umbral/dem.py b/umbral/dem.py index 9923963..4a2196f 100644 --- a/umbral/dem.py +++ b/umbral/dem.py @@ -1,4 +1,6 @@ import os +from typing import Optional + from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 @@ -7,7 +9,7 @@ DEM_NONCE_SIZE = 12 class UmbralDEM(object): - def __init__(self, symm_key: bytes): + def __init__(self, symm_key: bytes) -> None: """ Initializes an UmbralDEM object. Requires a key to perform ChaCha20-Poly1305. @@ -19,7 +21,7 @@ class UmbralDEM(object): self.cipher = ChaCha20Poly1305(symm_key) - def encrypt(self, data: bytes, authenticated_data: bytes=None): + def encrypt(self, data: bytes, authenticated_data: Optional[bytes] = None) -> bytes: """ Encrypts data using ChaCha20-Poly1305 with optional authenticated data. """ @@ -28,7 +30,7 @@ class UmbralDEM(object): # Ciphertext will be a 12 byte nonce, the ciphertext, and a 16 byte tag. return nonce + enc_data - def decrypt(self, ciphertext: bytes, authenticated_data: bytes=None): + def decrypt(self, ciphertext: bytes, authenticated_data: Optional[bytes] = None) -> bytes: """ Decrypts data using ChaCha20-Poly1305 and validates the provided authenticated data. diff --git a/umbral/fragments.py b/umbral/fragments.py index 262e2a9..963be87 100644 --- a/umbral/fragments.py +++ b/umbral/fragments.py @@ -1,19 +1,19 @@ -from bytestring_splitter import BytestringSplitter -from cryptography.hazmat.primitives.asymmetric import ec +from typing import Optional +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve + +from bytestring_splitter import BytestringSplitter from umbral._pre import assess_cfrag_correctness, verify_kfrag -from umbral.config import default_curve, default_params +from umbral.config import default_curve from umbral.curvebn import CurveBN from umbral.keys import UmbralPublicKey -from umbral.params import UmbralParameters from umbral.point import Point - from umbral.signing import Signature class KFrag(object): - def __init__(self, id, bn_key, point_noninteractive, - point_commitment, point_xcoord, signature): + def __init__(self, id: bytes, bn_key: CurveBN, point_noninteractive: Point, + point_commitment: Point, point_xcoord: Point, signature: Signature) -> None: self._id = id self._bn_key = bn_key self._point_noninteractive = point_noninteractive @@ -22,7 +22,7 @@ class KFrag(object): self.signature = signature @classmethod - def expected_bytes_length(cls, curve: ec.EllipticCurve = None): + def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None) -> int: """ Returns the size (in bytes) of a KFrag given the curve. If no curve is provided, it will use the default curve. @@ -34,7 +34,7 @@ class KFrag(object): return (bn_size * 4) + (point_size * 3) @classmethod - def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None): + def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'KFrag': """ Instantiate a KFrag object from the serialized data. """ @@ -56,7 +56,7 @@ class KFrag(object): return cls(*components) - def to_bytes(self): + def to_bytes(self) -> bytes: """ Serialize the KFrag into a bytestring. """ @@ -69,19 +69,19 @@ class KFrag(object): return self._id + key + ni + commitment + xcoord + signature def verify(self, - signing_pubkey, + signing_pubkey: UmbralPublicKey, delegating_pubkey: UmbralPublicKey, - receiving_pubkey: UmbralPublicKey): + receiving_pubkey: UmbralPublicKey) -> bool: return verify_kfrag(self, delegating_pubkey, signing_pubkey, receiving_pubkey) - def __bytes__(self): + def __bytes__(self) -> bytes: return self.to_bytes() class CorrectnessProof(object): - def __init__(self, point_e2, point_v2, point_kfrag_commitment, - point_kfrag_pok, bn_sig, kfrag_signature: bytes, metadata: bytes = None): + def __init__(self, point_e2: Point, point_v2: Point, point_kfrag_commitment: Point, + point_kfrag_pok: Point, bn_sig: CurveBN, kfrag_signature: bytes, metadata: Optional[bytes] = None) -> None: self._point_e2 = point_e2 self._point_v2 = point_v2 self._point_kfrag_commitment = point_kfrag_commitment @@ -91,7 +91,7 @@ class CorrectnessProof(object): self.kfrag_signature = kfrag_signature @classmethod - def expected_bytes_length(cls, curve: ec.EllipticCurve = None): + def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None): """ Returns the size (in bytes) of a CorrectnessProof without the metadata. If no curve is given, it will use the default curve. @@ -103,7 +103,7 @@ class CorrectnessProof(object): return (bn_size * 3) + (point_size * 4) @classmethod - def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None): + def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'CorrectnessProof': """ Instantiate CorrectnessProof from serialized data. """ @@ -149,8 +149,13 @@ class CorrectnessProof(object): class CapsuleFrag(object): - def __init__(self, point_e1, point_v1, kfrag_id, point_noninteractive, - point_xcoord, proof: CorrectnessProof = None): + def __init__(self, + point_e1: Point, + point_v1: Point, + kfrag_id: bytes, + point_noninteractive: Point, + point_xcoord: Point, proof: Optional[CorrectnessProof] = None) -> None: + self._point_e1 = point_e1 self._point_v1 = point_v1 self._kfrag_id = kfrag_id @@ -164,7 +169,7 @@ class CapsuleFrag(object): """ @classmethod - def expected_bytes_length(cls, curve: ec.EllipticCurve = None): + def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None) -> int: """ Returns the size (in bytes) of a CapsuleFrag given the curve without the CorrectnessProof. @@ -177,7 +182,7 @@ class CapsuleFrag(object): return (bn_size * 1) + (point_size * 4) @classmethod - def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None): + def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'CapsuleFrag': """ Instantiates a CapsuleFrag object from the serialized data. """ @@ -200,7 +205,7 @@ class CapsuleFrag(object): proof = CorrectnessProof.from_bytes(proof, curve) if proof else None return cls(*components, proof) - def to_bytes(self): + def to_bytes(self) -> bytes: """ Serialize the CapsuleFrag into a bytestring. """ @@ -216,10 +221,10 @@ class CapsuleFrag(object): return serialized_cfrag - def verify_correctness(self, capsule: "Capsule"): + def verify_correctness(self, capsule: 'Capsule') -> bool: return assess_cfrag_correctness(self, capsule) - def attach_proof(self, e2, v2, u1, u2, z3, kfrag_signature, metadata): + def attach_proof(self, e2: Point, v2: Point, u1: Point, u2: Point, z3: CurveBN, kfrag_signature: Signature, metadata: Optional[bytes]) -> None: self.proof = CorrectnessProof(point_e2=e2, point_v2=v2, point_kfrag_commitment=u1, @@ -229,5 +234,5 @@ class CapsuleFrag(object): metadata=metadata, ) - def __bytes__(self): + def __bytes__(self) -> bytes: return self.to_bytes() diff --git a/umbral/keys.py b/umbral/keys.py index 9b64dab..85a432c 100644 --- a/umbral/keys.py +++ b/umbral/keys.py @@ -1,26 +1,22 @@ import os -import base64 -from typing import Callable +from typing import Callable, Optional, Union - -from nacl.secret import SecretBox -from cryptography.hazmat.primitives.kdf.scrypt import Scrypt from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.backends.openssl.ec import ( - _EllipticCurvePublicKey, _EllipticCurvePrivateKey -) +from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePrivateKey, _EllipticCurvePublicKey from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.hkdf import HKDF +from cryptography.hazmat.primitives.kdf.scrypt import Scrypt +from nacl.secret import SecretBox from umbral import openssl from umbral.config import default_params -from umbral.point import Point -from umbral.curvebn import CurveBN +from umbral.curvebn import CurveBN from umbral.params import UmbralParameters +from umbral.point import Point + - class UmbralPrivateKey(object): - def __init__(self, bn_key: CurveBN, params: UmbralParameters): + def __init__(self, bn_key: CurveBN, params: UmbralParameters) -> None: """ Initializes an Umbral private key. """ @@ -29,7 +25,7 @@ class UmbralPrivateKey(object): self.pubkey = UmbralPublicKey(self.bn_key * params.g, params=params) @classmethod - def gen_key(cls, params: UmbralParameters = None): + def gen_key(cls, params: Optional[UmbralParameters] = None) -> 'UmbralPrivateKey': """ Generates a private key and returns it. """ @@ -40,9 +36,12 @@ class UmbralPrivateKey(object): return cls(bn_key, params) @classmethod - def from_bytes(cls, key_bytes: bytes, params: UmbralParameters = None, - password: bytes = None, _scrypt_cost: int = 20, - decoder: Callable = None): + def from_bytes(cls, + key_bytes: bytes, + params: Optional[UmbralParameters] = None, + password: Optional[bytes] = None, + _scrypt_cost: int = 20, + decoder: Optional[Callable] = None) -> 'UmbralPrivateKey': """ Loads an Umbral private key from bytes. Optionally, allows a decoder function to be passed as a param to decode @@ -79,8 +78,10 @@ class UmbralPrivateKey(object): bn_key = CurveBN.from_bytes(key_bytes, params.curve) return cls(bn_key, params) - def to_bytes(self, password: bytes = None, _scrypt_cost: int = 20, - encoder: Callable = None): + def to_bytes(self, + password: Optional[bytes] = None, + _scrypt_cost: int = 20, + encoder: Optional[Callable] = None) -> bytes: """ Returns an Umbral private key as bytes optional symmetric encryption via nacl's Salsa20-Poly1305 and Scrypt key derivation. If a password @@ -114,13 +115,13 @@ class UmbralPrivateKey(object): return umbral_privkey - def get_pubkey(self): + def get_pubkey(self) -> 'UmbralPublicKey': """ Calculates and returns the public key of the private key. """ return self.pubkey - def to_cryptography_privkey(self): + def to_cryptography_privkey(self) -> _EllipticCurvePrivateKey: """ Returns a cryptography.io EllipticCurvePrivateKey from the Umbral key. """ @@ -160,7 +161,7 @@ class UmbralPrivateKey(object): class UmbralPublicKey(object): - def __init__(self, point_key, params: UmbralParameters): + def __init__(self, point_key: Point, params: UmbralParameters) -> None: """ Initializes an Umbral public key. """ @@ -172,8 +173,10 @@ class UmbralPublicKey(object): self.point_key = point_key @classmethod - def from_bytes(cls, key_bytes: bytes, params: UmbralParameters = None, - decoder: Callable = None): + def from_bytes(cls, + key_bytes: bytes, + params: Optional[UmbralParameters] = None, + decoder: Optional[Callable] = None) -> 'UmbralPublicKey': """ Loads an Umbral public key from bytes. Optionally, if an decoder function is provided it will be used to decode @@ -188,7 +191,7 @@ class UmbralPublicKey(object): point_key = Point.from_bytes(key_bytes, params.curve) return cls(point_key, params) - def to_bytes(self, encoder: Callable = None): + def to_bytes(self, encoder: Optional[Callable] = None) -> bytes: """ Returns an Umbral public key as bytes. Optionally, if an encoder function is provided it will be used to encode @@ -204,7 +207,7 @@ class UmbralPublicKey(object): def get_pubkey(self): raise NotImplementedError - def to_cryptography_pubkey(self): + def to_cryptography_pubkey(self) -> _EllipticCurvePublicKey: """ Returns a cryptography.io EllipticCurvePublicKey from the Umbral key. """ @@ -230,7 +233,7 @@ class UmbralPublicKey(object): evp_pkey = backend._ec_cdata_to_evp_pkey(ec_key) return _EllipticCurvePublicKey(backend, ec_key, evp_pkey) - def __bytes__(self): + def __bytes__(self) -> bytes: """ Returns an Umbral Public key as a bytestring. """ @@ -239,7 +242,7 @@ class UmbralPublicKey(object): def __repr__(self): return "{}:{}".format(self.__class__, self.point_key.to_bytes().hex()[:15]) - def __eq__(self, other): + def __eq__(self, other: Optional[Union[bytes, 'UmbralPublicKey', int]]) -> bool: if type(other) == bytes: is_eq = bytes(other) == bytes(self) elif hasattr(other, "point_key"): @@ -248,7 +251,7 @@ class UmbralPublicKey(object): is_eq = False return is_eq - def __hash__(self): + def __hash__(self) -> int: return int.from_bytes(self, byteorder="big") @@ -260,7 +263,7 @@ class UmbralKeyingMaterial(object): """ - def __init__(self, keying_material: bytes = None): + def __init__(self, keying_material: Optional[bytes] = None) -> None: """ Initializes an UmbralKeyingMaterial. """ @@ -271,8 +274,10 @@ class UmbralKeyingMaterial(object): else: self.keying_material = os.urandom(64) - def derive_privkey_by_label(self, label: bytes, salt: bytes = None, - params: UmbralParameters = None): + def derive_privkey_by_label(self, + label: bytes, + salt: Optional[bytes] = None, + params: Optional[UmbralParameters] = None) -> UmbralPrivateKey: """ Derives an UmbralPrivateKey using a KDF from this instance of UmbralKeyingMaterial, a label, and an optional salt. @@ -291,7 +296,10 @@ class UmbralKeyingMaterial(object): return UmbralPrivateKey(bn_key, params) @classmethod - def from_bytes(cls, key_bytes: bytes, password: bytes=None, _scrypt_cost: int=20): + def from_bytes(cls, + key_bytes: bytes, + password: Optional[bytes] = None, + _scrypt_cost: int = 20) -> 'UmbralKeyingMaterial': """ Loads an UmbralKeyingMaterial from a urlsafe base64 encoded string. Optionally, if a password is provided it will decrypt the key using @@ -320,7 +328,7 @@ class UmbralKeyingMaterial(object): return cls(key_bytes) - def to_bytes(self, password: bytes = None, _scrypt_cost: int = 20): + def to_bytes(self, password: Optional[bytes] = None, _scrypt_cost: int = 20) -> bytes: """ Returns an UmbralKeyingMaterial as a urlsafe base64 encoded string with optional symmetric encryption via nacl's Salsa20-Poly1305 and Scrypt diff --git a/umbral/params.py b/umbral/params.py index fca2e26..aca65cc 100644 --- a/umbral/params.py +++ b/umbral/params.py @@ -1,10 +1,11 @@ from cryptography.hazmat.backends.openssl import backend + from umbral import openssl from umbral.curve import Curve class UmbralParameters(object): - def __init__(self, curve: Curve): + def __init__(self, curve: Curve) -> None: from umbral.point import Point, unsafe_hash_to_point from umbral.utils import get_field_order_size_in_bytes @@ -17,7 +18,7 @@ class UmbralParameters(object): parameters_seed = b'NuCypherKMS/UmbralParameters/' self.u = unsafe_hash_to_point(g_bytes, self, parameters_seed + b'u') - def __eq__(self, other): + def __eq__(self, other: 'UmbralParameters') -> bool: self_curve_nid = self.curve.curve_nid other_curve_nid = other.curve.curve_nid diff --git a/umbral/pre.py b/umbral/pre.py index d6b5113..b7ec2d0 100644 --- a/umbral/pre.py +++ b/umbral/pre.py @@ -1,13 +1,14 @@ -from typing import Tuple, Union, List +import os +from typing import Dict, List, Optional, Tuple, Union + +from cryptography.hazmat.backends.openssl import backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve from bytestring_splitter import BytestringSplitter -from cryptography.hazmat.backends.openssl import backend -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives import hashes - -from umbral._pre import prove_cfrag_correctness, assess_cfrag_correctness +from umbral._pre import prove_cfrag_correctness +from umbral.config import default_curve from umbral.curvebn import CurveBN -from umbral.config import default_params, default_curve from umbral.dem import UmbralDEM, DEM_KEYSIZE from umbral.fragments import KFrag, CapsuleFrag from umbral.keys import UmbralPrivateKey, UmbralPublicKey @@ -15,7 +16,6 @@ from umbral.params import UmbralParameters from umbral.point import Point from umbral.signing import Signer from umbral.utils import poly_eval, lambda_coeff, kdf -import os class GenericUmbralError(Exception): @@ -23,7 +23,7 @@ class GenericUmbralError(Exception): class UmbralCorrectnessError(GenericUmbralError): - def __init__(self, message, offending_cfrags): + def __init__(self, message: str, offending_cfrags: List[CapsuleFrag]) -> None: super().__init__(message) self.offending_cfrags = offending_cfrags @@ -31,15 +31,16 @@ class UmbralCorrectnessError(GenericUmbralError): class Capsule(object): def __init__(self, params: UmbralParameters, - point_e=None, - point_v=None, - bn_sig=None, - point_e_prime=None, - point_v_prime=None, - point_noninteractive=None, - delegating_pubkey: UmbralPublicKey = None, - receiving_pubkey: UmbralPublicKey = None, - verifying_pubkey=None): + point_e: Optional[Point] = None, + point_v: Optional[Union[int, Point]] = None, + bn_sig: Optional[Union[int, CurveBN]] = None, + point_e_prime: Optional[Point] = None, + point_v_prime: Optional[Union[int, Point]] = None, + point_noninteractive: Optional[Union[int, Point]] = None, + delegating_pubkey: Optional[UmbralPublicKey] = None, + receiving_pubkey: Optional[UmbralPublicKey] = None, + verifying_pubkey: None = None + ) -> None: self._umbral_params = params @@ -69,7 +70,7 @@ class Capsule(object): self._attached_cfrags = list() @classmethod - def expected_bytes_length(cls, curve: ec.EllipticCurve = None, activated=False): + def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None, activated: bool = False) -> int: """ Returns the size (in bytes) of a Capsule given the curve. If no curve is provided, it will use the default curve. @@ -89,7 +90,7 @@ class Capsule(object): """ @classmethod - def from_bytes(cls, capsule_bytes: bytes, params: UmbralParameters): + def from_bytes(cls, capsule_bytes: bytes, params: UmbralParameters) -> 'Capsule': """ Instantiates a Capsule object from the serialized data. """ @@ -123,7 +124,7 @@ class Capsule(object): components = splitter(capsule_bytes) return cls(params, *components) - def _set_cfrag_correctness_key(self, key_type, key: UmbralPublicKey): + def _set_cfrag_correctness_key(self, key_type: str, key: UmbralPublicKey) -> bool: if key_type not in ("delegating", "receiving", "verifying"): raise ValueError("You can only set 'delegating', 'receiving' or 'verifying' keys.") @@ -142,15 +143,15 @@ class Capsule(object): else: raise ValueError("The {} key is already set; you can't set it again.".format(key_type)) - - def get_correctness_keys(self): + def get_correctness_keys(self) -> Dict[str, Union[UmbralPublicKey, None]]: return dict(self._cfrag_correctness_keys) def set_correctness_keys(self, - delegating: UmbralPublicKey = None, - receiving: UmbralPublicKey = None, - verifying=None - ): + delegating: Optional[UmbralPublicKey] = None, + receiving: Optional[UmbralPublicKey] = None, + verifying: Optional[UmbralPublicKey] = None + ) -> Tuple[bool, bool, bool]: + delegating_key_details = self._set_cfrag_correctness_key("delegating", delegating) receiving_key_details = self._set_cfrag_correctness_key("receiving", receiving) verifying_key_details = self._set_cfrag_correctness_key("verifying", verifying) @@ -236,10 +237,10 @@ class Capsule(object): self._point_v_prime = v self._point_noninteractive = ni - def __bytes__(self): + def __bytes__(self) -> bytes: return self.to_bytes() - def __eq__(self, other): + def __eq__(self, other: 'Capsule') -> bool: """ If both Capsules are activated, we compare only the activated components. Otherwise, we compare only original components. @@ -260,7 +261,7 @@ class Capsule(object): # Again, it's hard to imagine why. return False - def __hash__(self): + def __hash__(self) -> int: # We only ever want to store in a hash table based on original components; # A Capsule that is part of a dict needs to continue to be lookup-able even # after activation. @@ -268,7 +269,7 @@ class Capsule(object): component_bytes = tuple(component.to_bytes() for component in self.original_components()) return hash(component_bytes) - def __len__(self): + def __len__(self) -> int: return len(self._attached_cfrags) @@ -349,8 +350,8 @@ def split_rekey(delegating_privkey: UmbralPrivateKey, signer: Signer, return kfrags -def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof = True, - metadata: bytes = None) -> CapsuleFrag: +def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof: bool = True, + metadata: Optional[bytes] = None) -> CapsuleFrag: if not capsule.verify(): raise capsule.NotValid @@ -371,7 +372,7 @@ def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof = True, def _encapsulate(alice_pubkey: UmbralPublicKey, - key_length=DEM_KEYSIZE) -> Tuple[bytes, Capsule]: + key_length: int = DEM_KEYSIZE) -> Tuple[bytes, Capsule]: """Generates a symmetric key and its associated KEM ciphertext""" params = alice_pubkey.params @@ -395,7 +396,7 @@ def _encapsulate(alice_pubkey: UmbralPublicKey, def _decapsulate_original(priv_key: UmbralPrivateKey, capsule: Capsule, - key_length=DEM_KEYSIZE) -> bytes: + key_length: int = DEM_KEYSIZE) -> bytes: """Derive the same symmetric key""" priv_key = priv_key.bn_key @@ -412,7 +413,7 @@ def _decapsulate_original(priv_key: UmbralPrivateKey, capsule: Capsule, def _decapsulate_reencrypted(receiving_privkey: UmbralPrivateKey, capsule: Capsule, - key_length=DEM_KEYSIZE) -> bytes: + key_length: int = DEM_KEYSIZE) -> bytes: """Derive the same symmetric key""" params = capsule._umbral_params @@ -459,7 +460,7 @@ def encrypt(alice_pubkey: UmbralPublicKey, plaintext: bytes) -> Tuple[bytes, Cap def _open_capsule(capsule: Capsule, receiving_privkey: UmbralPrivateKey, - check_proof=True) -> bytes: + check_proof: bool = True) -> bytes: """ Activates the Capsule from the attached CFrags, opens the Capsule and returns what is inside. @@ -486,7 +487,7 @@ def _open_capsule(capsule: Capsule, receiving_privkey: UmbralPrivateKey, def decrypt(ciphertext: bytes, capsule: Capsule, decrypting_key: UmbralPrivateKey, - check_proof=True) -> bytes: + check_proof: bool = True) -> bytes: """ Opens the capsule and gets what's inside. diff --git a/umbral/signing.py b/umbral/signing.py index 71905dd..87d4474 100644 --- a/umbral/signing.py +++ b/umbral/signing.py @@ -1,17 +1,19 @@ import hmac +from typing import Optional from cryptography.exceptions import InvalidSignature - -from cryptography.hazmat.primitives.asymmetric.ec import ECDSA -from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA +from cryptography.hazmat.primitives.asymmetric.ec import Curve +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature from umbral.config import default_curve from umbral.curve import Curve -from umbral.keys import UmbralPublicKey, UmbralPrivateKey from umbral.curvebn import CurveBN +from umbral.keys import UmbralPublicKey, UmbralPrivateKey from umbral.utils import get_field_order_size_in_bytes + _BLAKE2B = hashes.BLAKE2b(64) @@ -21,7 +23,7 @@ class Signature: between (r, s) and DER formatting. """ - def __init__(self, r: CurveBN, s: CurveBN): + def __init__(self, r: CurveBN, s: CurveBN) -> None: self.r = r self.s = s @@ -29,7 +31,7 @@ class Signature: return "ECDSA Signature: {}".format(bytes(self).hex()[:15]) @classmethod - def expected_bytes_length(cls, curve: Curve = None): + def expected_bytes_length(cls, curve: Optional[Curve] = None) -> int: curve = curve if curve is not None else default_curve() return get_field_order_size_in_bytes(curve) * 2 @@ -56,7 +58,7 @@ class Signature: return True @classmethod - def from_bytes(cls, signature_as_bytes, der_encoded=False, curve: Curve = None): + def from_bytes(cls, signature_as_bytes: bytes, der_encoded: bool = False, curve: Optional[Curve] = None) -> 'Signature': curve = curve if curve is not None else default_curve() if der_encoded: r, s = decode_dss_signature(signature_as_bytes) @@ -71,10 +73,10 @@ class Signature: return cls(CurveBN.from_int(r, curve), CurveBN.from_int(s, curve)) - def _der_encoded_bytes(self): + def _der_encoded_bytes(self) -> bytes: return encode_dss_signature(int(self.r), int(self.s)) - def __bytes__(self): + def __bytes__(self) -> bytes: return self.r.to_bytes() + self.s.to_bytes() def __len__(self): @@ -83,10 +85,10 @@ class Signature: def __add__(self, other): return bytes(self) + other - def __radd__(self, other): + def __radd__(self, other: bytes) -> bytes: return other + bytes(self) - def __eq__(self, other): + def __eq__(self, other: 'Signature') -> bool: simple_bytes_match = hmac.compare_digest(bytes(self), bytes(other)) der_encoded_match = hmac.compare_digest(self._der_encoded_bytes(), bytes(other)) return simple_bytes_match or der_encoded_match @@ -94,11 +96,11 @@ class Signature: class Signer: - def __init__(self, private_key: UmbralPrivateKey): + def __init__(self, private_key: UmbralPrivateKey) -> None: self.__cryptography_private_key = private_key.to_cryptography_privkey() self._curve = private_key.params.curve - def __call__(self, message): + def __call__(self, message: bytes) -> Signature: """ Signs the message with this instance's private key. diff --git a/umbral/utils.py b/umbral/utils.py index 52221dd..ec43e50 100644 --- a/umbral/utils.py +++ b/umbral/utils.py @@ -1,13 +1,14 @@ -import math +from typing import List from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.hkdf import HKDF -from cryptography.hazmat.primitives.asymmetric import ec + from umbral import openssl +from umbral.curve import Curve -def lambda_coeff(id_i, selected_ids): +def lambda_coeff(id_i: 'CurveBN', selected_ids: List['CurveBN']) -> 'CurveBN': ids = [x for x in selected_ids if x != id_i] if not ids: @@ -22,7 +23,7 @@ def lambda_coeff(id_i, selected_ids): return result -def poly_eval(coeff, x): +def poly_eval(coeff: List['CurveBN'], x: 'CurveBN') -> 'CurveBN': result = coeff[-1] for i in range(-2, -len(coeff) - 1, -1): result = ((result * x) + coeff[i]) @@ -30,7 +31,7 @@ def poly_eval(coeff, x): return result -def kdf(ecpoint, key_length): +def kdf(ecpoint: 'Point', key_length: int) -> bytes: data = ecpoint.to_bytes(is_compressed=True) return HKDF( @@ -42,7 +43,7 @@ def kdf(ecpoint, key_length): ).derive(data) -def get_field_order_size_in_bytes(curve): +def get_field_order_size_in_bytes(curve: Curve) -> int: backend = default_backend() size_in_bits = openssl._get_ec_group_degree(curve.ec_group) return (size_in_bits + 7) // 8