mirror of https://github.com/nucypher/pyUmbral.git
Initial bulk application of type hints to pyUmbral
parent
062d4d9a28
commit
2fe18c7144
|
@ -1,20 +1,21 @@
|
|||
from typing import Optional
|
||||
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.config import default_params
|
||||
from umbral.keys import UmbralPublicKey
|
||||
from umbral.params import UmbralParameters
|
||||
|
||||
|
||||
def prove_cfrag_correctness(cfrag: "CapsuleFrag",
|
||||
kfrag: "KFrag",
|
||||
capsule: "Capsule",
|
||||
metadata: bytes = None
|
||||
) -> "CorrectnessProof":
|
||||
def prove_cfrag_correctness(cfrag: 'CapsuleFrag',
|
||||
kfrag: 'KFrag',
|
||||
capsule: 'Capsule',
|
||||
metadata: Optional[bytes] = None
|
||||
) -> 'CorrectnessProof':
|
||||
|
||||
params = capsule._umbral_params
|
||||
|
||||
rk = kfrag._bn_key
|
||||
t = CurveBN.gen_rand(params.curve)
|
||||
####
|
||||
## Here are the formulaic constituents shared with `assess_cfrag_correctness`.
|
||||
# Here are the formulaic constituents shared with `assess_cfrag_correctness`.
|
||||
####
|
||||
e = capsule._point_e
|
||||
v = capsule._point_v
|
||||
|
@ -45,7 +46,7 @@ def prove_cfrag_correctness(cfrag: "CapsuleFrag",
|
|||
raise capsule.NotValid("Capsule verification failed.")
|
||||
|
||||
|
||||
def assess_cfrag_correctness(cfrag, capsule: "Capsule"):
|
||||
def assess_cfrag_correctness(cfrag: 'CapsuleFrag', capsule: 'Capsule') -> bool:
|
||||
|
||||
correctness_keys = capsule.get_correctness_keys()
|
||||
|
||||
|
@ -62,7 +63,7 @@ def assess_cfrag_correctness(cfrag, capsule: "Capsule"):
|
|||
params = capsule._umbral_params
|
||||
|
||||
####
|
||||
## Here are the formulaic constituents shared with `prove_cfrag_correctness`.
|
||||
# Here are the formulaic constituents shared with `prove_cfrag_correctness`.
|
||||
####
|
||||
e = capsule._point_e
|
||||
v = capsule._point_v
|
||||
|
@ -110,11 +111,11 @@ def assess_cfrag_correctness(cfrag, capsule: "Capsule"):
|
|||
& correct_rk_commitment
|
||||
|
||||
|
||||
def verify_kfrag(kfrag,
|
||||
def verify_kfrag(kfrag: 'KFrag',
|
||||
delegating_pubkey: UmbralPublicKey,
|
||||
signing_pubkey,
|
||||
signing_pubkey: UmbralPublicKey,
|
||||
receiving_pubkey: UmbralPublicKey
|
||||
):
|
||||
) -> bool:
|
||||
|
||||
|
||||
params = delegating_pubkey.params
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
from typing import Optional, Type, Union
|
||||
from warnings import warn
|
||||
|
||||
from umbral.curve import Curve, SECP256K1
|
||||
from umbral.params import UmbralParameters
|
||||
|
||||
|
||||
class _CONFIG:
|
||||
|
@ -18,19 +20,19 @@ class _CONFIG:
|
|||
cls.set_curve(cls.__CURVE_TO_USE_IF_NO_DEFAULT_IS_SET_BY_USER)
|
||||
|
||||
@classmethod
|
||||
def params(cls):
|
||||
def params(cls) -> UmbralParameters:
|
||||
if not cls.__params:
|
||||
cls.__set_curve_by_default()
|
||||
return cls.__params
|
||||
|
||||
@classmethod
|
||||
def curve(cls):
|
||||
def curve(cls) -> Union[Type[SECP256R1], Type[SECP256K1]]:
|
||||
if not cls.__curve:
|
||||
cls.__set_curve_by_default()
|
||||
return cls.__curve
|
||||
|
||||
@classmethod
|
||||
def set_curve(cls, curve: Curve=None):
|
||||
def set_curve(cls, curve: Optional[Curve] = None) -> None:
|
||||
if cls.__curve:
|
||||
raise cls.UmbralConfigurationError(
|
||||
"You can only set the default curve once. Do it once and then leave it alone.")
|
||||
|
@ -42,13 +44,13 @@ class _CONFIG:
|
|||
cls.__params = UmbralParameters(curve)
|
||||
|
||||
|
||||
def set_default_curve(curve: Curve=None):
|
||||
def set_default_curve(curve: Optional[Curve] = None) -> None:
|
||||
return _CONFIG.set_curve(curve)
|
||||
|
||||
|
||||
def default_curve():
|
||||
def default_curve() -> Union[Type[SECP256R1], Type[SECP256K1]]:
|
||||
return _CONFIG.curve()
|
||||
|
||||
|
||||
def default_params():
|
||||
def default_params() -> UmbralParameters:
|
||||
return _CONFIG.params()
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
import os
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||
|
||||
|
||||
|
@ -7,7 +9,7 @@ DEM_NONCE_SIZE = 12
|
|||
|
||||
|
||||
class UmbralDEM(object):
|
||||
def __init__(self, symm_key: bytes):
|
||||
def __init__(self, symm_key: bytes) -> None:
|
||||
"""
|
||||
Initializes an UmbralDEM object. Requires a key to perform
|
||||
ChaCha20-Poly1305.
|
||||
|
@ -19,7 +21,7 @@ class UmbralDEM(object):
|
|||
|
||||
self.cipher = ChaCha20Poly1305(symm_key)
|
||||
|
||||
def encrypt(self, data: bytes, authenticated_data: bytes=None):
|
||||
def encrypt(self, data: bytes, authenticated_data: Optional[bytes] = None) -> bytes:
|
||||
"""
|
||||
Encrypts data using ChaCha20-Poly1305 with optional authenticated data.
|
||||
"""
|
||||
|
@ -28,7 +30,7 @@ class UmbralDEM(object):
|
|||
# Ciphertext will be a 12 byte nonce, the ciphertext, and a 16 byte tag.
|
||||
return nonce + enc_data
|
||||
|
||||
def decrypt(self, ciphertext: bytes, authenticated_data: bytes=None):
|
||||
def decrypt(self, ciphertext: bytes, authenticated_data: Optional[bytes] = None) -> bytes:
|
||||
"""
|
||||
Decrypts data using ChaCha20-Poly1305 and validates the provided
|
||||
authenticated data.
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
from bytestring_splitter import BytestringSplitter
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
|
||||
|
||||
from bytestring_splitter import BytestringSplitter
|
||||
from umbral._pre import assess_cfrag_correctness, verify_kfrag
|
||||
from umbral.config import default_curve, default_params
|
||||
from umbral.config import default_curve
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.keys import UmbralPublicKey
|
||||
from umbral.params import UmbralParameters
|
||||
from umbral.point import Point
|
||||
|
||||
from umbral.signing import Signature
|
||||
|
||||
|
||||
class KFrag(object):
|
||||
def __init__(self, id, bn_key, point_noninteractive,
|
||||
point_commitment, point_xcoord, signature):
|
||||
def __init__(self, id: bytes, bn_key: CurveBN, point_noninteractive: Point,
|
||||
point_commitment: Point, point_xcoord: Point, signature: Signature) -> None:
|
||||
self._id = id
|
||||
self._bn_key = bn_key
|
||||
self._point_noninteractive = point_noninteractive
|
||||
|
@ -22,7 +22,7 @@ class KFrag(object):
|
|||
self.signature = signature
|
||||
|
||||
@classmethod
|
||||
def expected_bytes_length(cls, curve: ec.EllipticCurve = None):
|
||||
def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None) -> int:
|
||||
"""
|
||||
Returns the size (in bytes) of a KFrag given the curve.
|
||||
If no curve is provided, it will use the default curve.
|
||||
|
@ -34,7 +34,7 @@ class KFrag(object):
|
|||
return (bn_size * 4) + (point_size * 3)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None):
|
||||
def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'KFrag':
|
||||
"""
|
||||
Instantiate a KFrag object from the serialized data.
|
||||
"""
|
||||
|
@ -56,7 +56,7 @@ class KFrag(object):
|
|||
|
||||
return cls(*components)
|
||||
|
||||
def to_bytes(self):
|
||||
def to_bytes(self) -> bytes:
|
||||
"""
|
||||
Serialize the KFrag into a bytestring.
|
||||
"""
|
||||
|
@ -69,19 +69,19 @@ class KFrag(object):
|
|||
return self._id + key + ni + commitment + xcoord + signature
|
||||
|
||||
def verify(self,
|
||||
signing_pubkey,
|
||||
signing_pubkey: UmbralPublicKey,
|
||||
delegating_pubkey: UmbralPublicKey,
|
||||
receiving_pubkey: UmbralPublicKey):
|
||||
receiving_pubkey: UmbralPublicKey) -> bool:
|
||||
|
||||
return verify_kfrag(self, delegating_pubkey, signing_pubkey, receiving_pubkey)
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
|
||||
class CorrectnessProof(object):
|
||||
def __init__(self, point_e2, point_v2, point_kfrag_commitment,
|
||||
point_kfrag_pok, bn_sig, kfrag_signature: bytes, metadata: bytes = None):
|
||||
def __init__(self, point_e2: Point, point_v2: Point, point_kfrag_commitment: Point,
|
||||
point_kfrag_pok: Point, bn_sig: CurveBN, kfrag_signature: bytes, metadata: Optional[bytes] = None) -> None:
|
||||
self._point_e2 = point_e2
|
||||
self._point_v2 = point_v2
|
||||
self._point_kfrag_commitment = point_kfrag_commitment
|
||||
|
@ -91,7 +91,7 @@ class CorrectnessProof(object):
|
|||
self.kfrag_signature = kfrag_signature
|
||||
|
||||
@classmethod
|
||||
def expected_bytes_length(cls, curve: ec.EllipticCurve = None):
|
||||
def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None):
|
||||
"""
|
||||
Returns the size (in bytes) of a CorrectnessProof without the metadata.
|
||||
If no curve is given, it will use the default curve.
|
||||
|
@ -103,7 +103,7 @@ class CorrectnessProof(object):
|
|||
return (bn_size * 3) + (point_size * 4)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None):
|
||||
def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'CorrectnessProof':
|
||||
"""
|
||||
Instantiate CorrectnessProof from serialized data.
|
||||
"""
|
||||
|
@ -149,8 +149,13 @@ class CorrectnessProof(object):
|
|||
|
||||
|
||||
class CapsuleFrag(object):
|
||||
def __init__(self, point_e1, point_v1, kfrag_id, point_noninteractive,
|
||||
point_xcoord, proof: CorrectnessProof = None):
|
||||
def __init__(self,
|
||||
point_e1: Point,
|
||||
point_v1: Point,
|
||||
kfrag_id: bytes,
|
||||
point_noninteractive: Point,
|
||||
point_xcoord: Point, proof: Optional[CorrectnessProof] = None) -> None:
|
||||
|
||||
self._point_e1 = point_e1
|
||||
self._point_v1 = point_v1
|
||||
self._kfrag_id = kfrag_id
|
||||
|
@ -164,7 +169,7 @@ class CapsuleFrag(object):
|
|||
"""
|
||||
|
||||
@classmethod
|
||||
def expected_bytes_length(cls, curve: ec.EllipticCurve = None):
|
||||
def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None) -> int:
|
||||
"""
|
||||
Returns the size (in bytes) of a CapsuleFrag given the curve without
|
||||
the CorrectnessProof.
|
||||
|
@ -177,7 +182,7 @@ class CapsuleFrag(object):
|
|||
return (bn_size * 1) + (point_size * 4)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes, curve: ec.EllipticCurve = None):
|
||||
def from_bytes(cls, data: bytes, curve: Optional[EllipticCurve] = None) -> 'CapsuleFrag':
|
||||
"""
|
||||
Instantiates a CapsuleFrag object from the serialized data.
|
||||
"""
|
||||
|
@ -200,7 +205,7 @@ class CapsuleFrag(object):
|
|||
proof = CorrectnessProof.from_bytes(proof, curve) if proof else None
|
||||
return cls(*components, proof)
|
||||
|
||||
def to_bytes(self):
|
||||
def to_bytes(self) -> bytes:
|
||||
"""
|
||||
Serialize the CapsuleFrag into a bytestring.
|
||||
"""
|
||||
|
@ -216,10 +221,10 @@ class CapsuleFrag(object):
|
|||
|
||||
return serialized_cfrag
|
||||
|
||||
def verify_correctness(self, capsule: "Capsule"):
|
||||
def verify_correctness(self, capsule: 'Capsule') -> bool:
|
||||
return assess_cfrag_correctness(self, capsule)
|
||||
|
||||
def attach_proof(self, e2, v2, u1, u2, z3, kfrag_signature, metadata):
|
||||
def attach_proof(self, e2: Point, v2: Point, u1: Point, u2: Point, z3: CurveBN, kfrag_signature: Signature, metadata: Optional[bytes]) -> None:
|
||||
self.proof = CorrectnessProof(point_e2=e2,
|
||||
point_v2=v2,
|
||||
point_kfrag_commitment=u1,
|
||||
|
@ -229,5 +234,5 @@ class CapsuleFrag(object):
|
|||
metadata=metadata,
|
||||
)
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
|
|
@ -1,26 +1,22 @@
|
|||
import os
|
||||
import base64
|
||||
from typing import Callable
|
||||
from typing import Callable, Optional, Union
|
||||
|
||||
|
||||
from nacl.secret import SecretBox
|
||||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.backends.openssl.ec import (
|
||||
_EllipticCurvePublicKey, _EllipticCurvePrivateKey
|
||||
)
|
||||
from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePrivateKey, _EllipticCurvePublicKey
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
||||
from nacl.secret import SecretBox
|
||||
|
||||
from umbral import openssl
|
||||
from umbral.config import default_params
|
||||
from umbral.point import Point
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.params import UmbralParameters
|
||||
from umbral.point import Point
|
||||
|
||||
|
||||
|
||||
class UmbralPrivateKey(object):
|
||||
def __init__(self, bn_key: CurveBN, params: UmbralParameters):
|
||||
def __init__(self, bn_key: CurveBN, params: UmbralParameters) -> None:
|
||||
"""
|
||||
Initializes an Umbral private key.
|
||||
"""
|
||||
|
@ -29,7 +25,7 @@ class UmbralPrivateKey(object):
|
|||
self.pubkey = UmbralPublicKey(self.bn_key * params.g, params=params)
|
||||
|
||||
@classmethod
|
||||
def gen_key(cls, params: UmbralParameters = None):
|
||||
def gen_key(cls, params: Optional[UmbralParameters] = None) -> 'UmbralPrivateKey':
|
||||
"""
|
||||
Generates a private key and returns it.
|
||||
"""
|
||||
|
@ -40,9 +36,12 @@ class UmbralPrivateKey(object):
|
|||
return cls(bn_key, params)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, key_bytes: bytes, params: UmbralParameters = None,
|
||||
password: bytes = None, _scrypt_cost: int = 20,
|
||||
decoder: Callable = None):
|
||||
def from_bytes(cls,
|
||||
key_bytes: bytes,
|
||||
params: Optional[UmbralParameters] = None,
|
||||
password: Optional[bytes] = None,
|
||||
_scrypt_cost: int = 20,
|
||||
decoder: Optional[Callable] = None) -> 'UmbralPrivateKey':
|
||||
"""
|
||||
Loads an Umbral private key from bytes.
|
||||
Optionally, allows a decoder function to be passed as a param to decode
|
||||
|
@ -79,8 +78,10 @@ class UmbralPrivateKey(object):
|
|||
bn_key = CurveBN.from_bytes(key_bytes, params.curve)
|
||||
return cls(bn_key, params)
|
||||
|
||||
def to_bytes(self, password: bytes = None, _scrypt_cost: int = 20,
|
||||
encoder: Callable = None):
|
||||
def to_bytes(self,
|
||||
password: Optional[bytes] = None,
|
||||
_scrypt_cost: int = 20,
|
||||
encoder: Optional[Callable] = None) -> bytes:
|
||||
"""
|
||||
Returns an Umbral private key as bytes optional symmetric encryption
|
||||
via nacl's Salsa20-Poly1305 and Scrypt key derivation. If a password
|
||||
|
@ -114,13 +115,13 @@ class UmbralPrivateKey(object):
|
|||
|
||||
return umbral_privkey
|
||||
|
||||
def get_pubkey(self):
|
||||
def get_pubkey(self) -> 'UmbralPublicKey':
|
||||
"""
|
||||
Calculates and returns the public key of the private key.
|
||||
"""
|
||||
return self.pubkey
|
||||
|
||||
def to_cryptography_privkey(self):
|
||||
def to_cryptography_privkey(self) -> _EllipticCurvePrivateKey:
|
||||
"""
|
||||
Returns a cryptography.io EllipticCurvePrivateKey from the Umbral key.
|
||||
"""
|
||||
|
@ -160,7 +161,7 @@ class UmbralPrivateKey(object):
|
|||
|
||||
|
||||
class UmbralPublicKey(object):
|
||||
def __init__(self, point_key, params: UmbralParameters):
|
||||
def __init__(self, point_key: Point, params: UmbralParameters) -> None:
|
||||
"""
|
||||
Initializes an Umbral public key.
|
||||
"""
|
||||
|
@ -172,8 +173,10 @@ class UmbralPublicKey(object):
|
|||
self.point_key = point_key
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, key_bytes: bytes, params: UmbralParameters = None,
|
||||
decoder: Callable = None):
|
||||
def from_bytes(cls,
|
||||
key_bytes: bytes,
|
||||
params: Optional[UmbralParameters] = None,
|
||||
decoder: Optional[Callable] = None) -> 'UmbralPublicKey':
|
||||
"""
|
||||
Loads an Umbral public key from bytes.
|
||||
Optionally, if an decoder function is provided it will be used to decode
|
||||
|
@ -188,7 +191,7 @@ class UmbralPublicKey(object):
|
|||
point_key = Point.from_bytes(key_bytes, params.curve)
|
||||
return cls(point_key, params)
|
||||
|
||||
def to_bytes(self, encoder: Callable = None):
|
||||
def to_bytes(self, encoder: Optional[Callable] = None) -> bytes:
|
||||
"""
|
||||
Returns an Umbral public key as bytes.
|
||||
Optionally, if an encoder function is provided it will be used to encode
|
||||
|
@ -204,7 +207,7 @@ class UmbralPublicKey(object):
|
|||
def get_pubkey(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def to_cryptography_pubkey(self):
|
||||
def to_cryptography_pubkey(self) -> _EllipticCurvePublicKey:
|
||||
"""
|
||||
Returns a cryptography.io EllipticCurvePublicKey from the Umbral key.
|
||||
"""
|
||||
|
@ -230,7 +233,7 @@ class UmbralPublicKey(object):
|
|||
evp_pkey = backend._ec_cdata_to_evp_pkey(ec_key)
|
||||
return _EllipticCurvePublicKey(backend, ec_key, evp_pkey)
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
"""
|
||||
Returns an Umbral Public key as a bytestring.
|
||||
"""
|
||||
|
@ -239,7 +242,7 @@ class UmbralPublicKey(object):
|
|||
def __repr__(self):
|
||||
return "{}:{}".format(self.__class__, self.point_key.to_bytes().hex()[:15])
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: Optional[Union[bytes, 'UmbralPublicKey', int]]) -> bool:
|
||||
if type(other) == bytes:
|
||||
is_eq = bytes(other) == bytes(self)
|
||||
elif hasattr(other, "point_key"):
|
||||
|
@ -248,7 +251,7 @@ class UmbralPublicKey(object):
|
|||
is_eq = False
|
||||
return is_eq
|
||||
|
||||
def __hash__(self):
|
||||
def __hash__(self) -> int:
|
||||
return int.from_bytes(self, byteorder="big")
|
||||
|
||||
|
||||
|
@ -260,7 +263,7 @@ class UmbralKeyingMaterial(object):
|
|||
|
||||
"""
|
||||
|
||||
def __init__(self, keying_material: bytes = None):
|
||||
def __init__(self, keying_material: Optional[bytes] = None) -> None:
|
||||
"""
|
||||
Initializes an UmbralKeyingMaterial.
|
||||
"""
|
||||
|
@ -271,8 +274,10 @@ class UmbralKeyingMaterial(object):
|
|||
else:
|
||||
self.keying_material = os.urandom(64)
|
||||
|
||||
def derive_privkey_by_label(self, label: bytes, salt: bytes = None,
|
||||
params: UmbralParameters = None):
|
||||
def derive_privkey_by_label(self,
|
||||
label: bytes,
|
||||
salt: Optional[bytes] = None,
|
||||
params: Optional[UmbralParameters] = None) -> UmbralPrivateKey:
|
||||
"""
|
||||
Derives an UmbralPrivateKey using a KDF from this instance of
|
||||
UmbralKeyingMaterial, a label, and an optional salt.
|
||||
|
@ -291,7 +296,10 @@ class UmbralKeyingMaterial(object):
|
|||
return UmbralPrivateKey(bn_key, params)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, key_bytes: bytes, password: bytes=None, _scrypt_cost: int=20):
|
||||
def from_bytes(cls,
|
||||
key_bytes: bytes,
|
||||
password: Optional[bytes] = None,
|
||||
_scrypt_cost: int = 20) -> 'UmbralKeyingMaterial':
|
||||
"""
|
||||
Loads an UmbralKeyingMaterial from a urlsafe base64 encoded string.
|
||||
Optionally, if a password is provided it will decrypt the key using
|
||||
|
@ -320,7 +328,7 @@ class UmbralKeyingMaterial(object):
|
|||
|
||||
return cls(key_bytes)
|
||||
|
||||
def to_bytes(self, password: bytes = None, _scrypt_cost: int = 20):
|
||||
def to_bytes(self, password: Optional[bytes] = None, _scrypt_cost: int = 20) -> bytes:
|
||||
"""
|
||||
Returns an UmbralKeyingMaterial as a urlsafe base64 encoded string with
|
||||
optional symmetric encryption via nacl's Salsa20-Poly1305 and Scrypt
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
from cryptography.hazmat.backends.openssl import backend
|
||||
|
||||
from umbral import openssl
|
||||
from umbral.curve import Curve
|
||||
|
||||
|
||||
class UmbralParameters(object):
|
||||
def __init__(self, curve: Curve):
|
||||
def __init__(self, curve: Curve) -> None:
|
||||
from umbral.point import Point, unsafe_hash_to_point
|
||||
from umbral.utils import get_field_order_size_in_bytes
|
||||
|
||||
|
@ -17,7 +18,7 @@ class UmbralParameters(object):
|
|||
parameters_seed = b'NuCypherKMS/UmbralParameters/'
|
||||
self.u = unsafe_hash_to_point(g_bytes, self, parameters_seed + b'u')
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: 'UmbralParameters') -> bool:
|
||||
|
||||
self_curve_nid = self.curve.curve_nid
|
||||
other_curve_nid = other.curve.curve_nid
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
from typing import Tuple, Union, List
|
||||
import os
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
|
||||
|
||||
from bytestring_splitter import BytestringSplitter
|
||||
from cryptography.hazmat.backends.openssl import backend
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
from umbral._pre import prove_cfrag_correctness, assess_cfrag_correctness
|
||||
from umbral._pre import prove_cfrag_correctness
|
||||
from umbral.config import default_curve
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.config import default_params, default_curve
|
||||
from umbral.dem import UmbralDEM, DEM_KEYSIZE
|
||||
from umbral.fragments import KFrag, CapsuleFrag
|
||||
from umbral.keys import UmbralPrivateKey, UmbralPublicKey
|
||||
|
@ -15,7 +16,6 @@ from umbral.params import UmbralParameters
|
|||
from umbral.point import Point
|
||||
from umbral.signing import Signer
|
||||
from umbral.utils import poly_eval, lambda_coeff, kdf
|
||||
import os
|
||||
|
||||
|
||||
class GenericUmbralError(Exception):
|
||||
|
@ -23,7 +23,7 @@ class GenericUmbralError(Exception):
|
|||
|
||||
|
||||
class UmbralCorrectnessError(GenericUmbralError):
|
||||
def __init__(self, message, offending_cfrags):
|
||||
def __init__(self, message: str, offending_cfrags: List[CapsuleFrag]) -> None:
|
||||
super().__init__(message)
|
||||
self.offending_cfrags = offending_cfrags
|
||||
|
||||
|
@ -31,15 +31,16 @@ class UmbralCorrectnessError(GenericUmbralError):
|
|||
class Capsule(object):
|
||||
def __init__(self,
|
||||
params: UmbralParameters,
|
||||
point_e=None,
|
||||
point_v=None,
|
||||
bn_sig=None,
|
||||
point_e_prime=None,
|
||||
point_v_prime=None,
|
||||
point_noninteractive=None,
|
||||
delegating_pubkey: UmbralPublicKey = None,
|
||||
receiving_pubkey: UmbralPublicKey = None,
|
||||
verifying_pubkey=None):
|
||||
point_e: Optional[Point] = None,
|
||||
point_v: Optional[Union[int, Point]] = None,
|
||||
bn_sig: Optional[Union[int, CurveBN]] = None,
|
||||
point_e_prime: Optional[Point] = None,
|
||||
point_v_prime: Optional[Union[int, Point]] = None,
|
||||
point_noninteractive: Optional[Union[int, Point]] = None,
|
||||
delegating_pubkey: Optional[UmbralPublicKey] = None,
|
||||
receiving_pubkey: Optional[UmbralPublicKey] = None,
|
||||
verifying_pubkey: None = None
|
||||
) -> None:
|
||||
|
||||
self._umbral_params = params
|
||||
|
||||
|
@ -69,7 +70,7 @@ class Capsule(object):
|
|||
self._attached_cfrags = list()
|
||||
|
||||
@classmethod
|
||||
def expected_bytes_length(cls, curve: ec.EllipticCurve = None, activated=False):
|
||||
def expected_bytes_length(cls, curve: Optional[EllipticCurve] = None, activated: bool = False) -> int:
|
||||
"""
|
||||
Returns the size (in bytes) of a Capsule given the curve.
|
||||
If no curve is provided, it will use the default curve.
|
||||
|
@ -89,7 +90,7 @@ class Capsule(object):
|
|||
"""
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, capsule_bytes: bytes, params: UmbralParameters):
|
||||
def from_bytes(cls, capsule_bytes: bytes, params: UmbralParameters) -> 'Capsule':
|
||||
"""
|
||||
Instantiates a Capsule object from the serialized data.
|
||||
"""
|
||||
|
@ -123,7 +124,7 @@ class Capsule(object):
|
|||
components = splitter(capsule_bytes)
|
||||
return cls(params, *components)
|
||||
|
||||
def _set_cfrag_correctness_key(self, key_type, key: UmbralPublicKey):
|
||||
def _set_cfrag_correctness_key(self, key_type: str, key: UmbralPublicKey) -> bool:
|
||||
if key_type not in ("delegating", "receiving", "verifying"):
|
||||
raise ValueError("You can only set 'delegating', 'receiving' or 'verifying' keys.")
|
||||
|
||||
|
@ -142,15 +143,15 @@ class Capsule(object):
|
|||
else:
|
||||
raise ValueError("The {} key is already set; you can't set it again.".format(key_type))
|
||||
|
||||
|
||||
def get_correctness_keys(self):
|
||||
def get_correctness_keys(self) -> Dict[str, Union[UmbralPublicKey, None]]:
|
||||
return dict(self._cfrag_correctness_keys)
|
||||
|
||||
def set_correctness_keys(self,
|
||||
delegating: UmbralPublicKey = None,
|
||||
receiving: UmbralPublicKey = None,
|
||||
verifying=None
|
||||
):
|
||||
delegating: Optional[UmbralPublicKey] = None,
|
||||
receiving: Optional[UmbralPublicKey] = None,
|
||||
verifying: Optional[UmbralPublicKey] = None
|
||||
) -> Tuple[bool, bool, bool]:
|
||||
|
||||
delegating_key_details = self._set_cfrag_correctness_key("delegating", delegating)
|
||||
receiving_key_details = self._set_cfrag_correctness_key("receiving", receiving)
|
||||
verifying_key_details = self._set_cfrag_correctness_key("verifying", verifying)
|
||||
|
@ -236,10 +237,10 @@ class Capsule(object):
|
|||
self._point_v_prime = v
|
||||
self._point_noninteractive = ni
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: 'Capsule') -> bool:
|
||||
"""
|
||||
If both Capsules are activated, we compare only the activated components.
|
||||
Otherwise, we compare only original components.
|
||||
|
@ -260,7 +261,7 @@ class Capsule(object):
|
|||
# Again, it's hard to imagine why.
|
||||
return False
|
||||
|
||||
def __hash__(self):
|
||||
def __hash__(self) -> int:
|
||||
# We only ever want to store in a hash table based on original components;
|
||||
# A Capsule that is part of a dict needs to continue to be lookup-able even
|
||||
# after activation.
|
||||
|
@ -268,7 +269,7 @@ class Capsule(object):
|
|||
component_bytes = tuple(component.to_bytes() for component in self.original_components())
|
||||
return hash(component_bytes)
|
||||
|
||||
def __len__(self):
|
||||
def __len__(self) -> int:
|
||||
return len(self._attached_cfrags)
|
||||
|
||||
|
||||
|
@ -349,8 +350,8 @@ def split_rekey(delegating_privkey: UmbralPrivateKey, signer: Signer,
|
|||
return kfrags
|
||||
|
||||
|
||||
def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof = True,
|
||||
metadata: bytes = None) -> CapsuleFrag:
|
||||
def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof: bool = True,
|
||||
metadata: Optional[bytes] = None) -> CapsuleFrag:
|
||||
|
||||
if not capsule.verify():
|
||||
raise capsule.NotValid
|
||||
|
@ -371,7 +372,7 @@ def reencrypt(kfrag: KFrag, capsule: Capsule, provide_proof = True,
|
|||
|
||||
|
||||
def _encapsulate(alice_pubkey: UmbralPublicKey,
|
||||
key_length=DEM_KEYSIZE) -> Tuple[bytes, Capsule]:
|
||||
key_length: int = DEM_KEYSIZE) -> Tuple[bytes, Capsule]:
|
||||
"""Generates a symmetric key and its associated KEM ciphertext"""
|
||||
|
||||
params = alice_pubkey.params
|
||||
|
@ -395,7 +396,7 @@ def _encapsulate(alice_pubkey: UmbralPublicKey,
|
|||
|
||||
|
||||
def _decapsulate_original(priv_key: UmbralPrivateKey, capsule: Capsule,
|
||||
key_length=DEM_KEYSIZE) -> bytes:
|
||||
key_length: int = DEM_KEYSIZE) -> bytes:
|
||||
"""Derive the same symmetric key"""
|
||||
|
||||
priv_key = priv_key.bn_key
|
||||
|
@ -412,7 +413,7 @@ def _decapsulate_original(priv_key: UmbralPrivateKey, capsule: Capsule,
|
|||
|
||||
|
||||
def _decapsulate_reencrypted(receiving_privkey: UmbralPrivateKey, capsule: Capsule,
|
||||
key_length=DEM_KEYSIZE) -> bytes:
|
||||
key_length: int = DEM_KEYSIZE) -> bytes:
|
||||
"""Derive the same symmetric key"""
|
||||
params = capsule._umbral_params
|
||||
|
||||
|
@ -459,7 +460,7 @@ def encrypt(alice_pubkey: UmbralPublicKey, plaintext: bytes) -> Tuple[bytes, Cap
|
|||
|
||||
|
||||
def _open_capsule(capsule: Capsule, receiving_privkey: UmbralPrivateKey,
|
||||
check_proof=True) -> bytes:
|
||||
check_proof: bool = True) -> bytes:
|
||||
"""
|
||||
Activates the Capsule from the attached CFrags,
|
||||
opens the Capsule and returns what is inside.
|
||||
|
@ -486,7 +487,7 @@ def _open_capsule(capsule: Capsule, receiving_privkey: UmbralPrivateKey,
|
|||
|
||||
|
||||
def decrypt(ciphertext: bytes, capsule: Capsule, decrypting_key: UmbralPrivateKey,
|
||||
check_proof=True) -> bytes:
|
||||
check_proof: bool = True) -> bytes:
|
||||
"""
|
||||
Opens the capsule and gets what's inside.
|
||||
|
||||
|
|
|
@ -1,17 +1,19 @@
|
|||
import hmac
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import Curve
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
|
||||
|
||||
from umbral.config import default_curve
|
||||
from umbral.curve import Curve
|
||||
from umbral.keys import UmbralPublicKey, UmbralPrivateKey
|
||||
from umbral.curvebn import CurveBN
|
||||
from umbral.keys import UmbralPublicKey, UmbralPrivateKey
|
||||
from umbral.utils import get_field_order_size_in_bytes
|
||||
|
||||
|
||||
_BLAKE2B = hashes.BLAKE2b(64)
|
||||
|
||||
|
||||
|
@ -21,7 +23,7 @@ class Signature:
|
|||
between (r, s) and DER formatting.
|
||||
"""
|
||||
|
||||
def __init__(self, r: CurveBN, s: CurveBN):
|
||||
def __init__(self, r: CurveBN, s: CurveBN) -> None:
|
||||
self.r = r
|
||||
self.s = s
|
||||
|
||||
|
@ -29,7 +31,7 @@ class Signature:
|
|||
return "ECDSA Signature: {}".format(bytes(self).hex()[:15])
|
||||
|
||||
@classmethod
|
||||
def expected_bytes_length(cls, curve: Curve = None):
|
||||
def expected_bytes_length(cls, curve: Optional[Curve] = None) -> int:
|
||||
curve = curve if curve is not None else default_curve()
|
||||
return get_field_order_size_in_bytes(curve) * 2
|
||||
|
||||
|
@ -56,7 +58,7 @@ class Signature:
|
|||
return True
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, signature_as_bytes, der_encoded=False, curve: Curve = None):
|
||||
def from_bytes(cls, signature_as_bytes: bytes, der_encoded: bool = False, curve: Optional[Curve] = None) -> 'Signature':
|
||||
curve = curve if curve is not None else default_curve()
|
||||
if der_encoded:
|
||||
r, s = decode_dss_signature(signature_as_bytes)
|
||||
|
@ -71,10 +73,10 @@ class Signature:
|
|||
|
||||
return cls(CurveBN.from_int(r, curve), CurveBN.from_int(s, curve))
|
||||
|
||||
def _der_encoded_bytes(self):
|
||||
def _der_encoded_bytes(self) -> bytes:
|
||||
return encode_dss_signature(int(self.r), int(self.s))
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.r.to_bytes() + self.s.to_bytes()
|
||||
|
||||
def __len__(self):
|
||||
|
@ -83,10 +85,10 @@ class Signature:
|
|||
def __add__(self, other):
|
||||
return bytes(self) + other
|
||||
|
||||
def __radd__(self, other):
|
||||
def __radd__(self, other: bytes) -> bytes:
|
||||
return other + bytes(self)
|
||||
|
||||
def __eq__(self, other):
|
||||
def __eq__(self, other: 'Signature') -> bool:
|
||||
simple_bytes_match = hmac.compare_digest(bytes(self), bytes(other))
|
||||
der_encoded_match = hmac.compare_digest(self._der_encoded_bytes(), bytes(other))
|
||||
return simple_bytes_match or der_encoded_match
|
||||
|
@ -94,11 +96,11 @@ class Signature:
|
|||
|
||||
class Signer:
|
||||
|
||||
def __init__(self, private_key: UmbralPrivateKey):
|
||||
def __init__(self, private_key: UmbralPrivateKey) -> None:
|
||||
self.__cryptography_private_key = private_key.to_cryptography_privkey()
|
||||
self._curve = private_key.params.curve
|
||||
|
||||
def __call__(self, message):
|
||||
def __call__(self, message: bytes) -> Signature:
|
||||
"""
|
||||
Signs the message with this instance's private key.
|
||||
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
import math
|
||||
from typing import List
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
|
||||
from umbral import openssl
|
||||
from umbral.curve import Curve
|
||||
|
||||
|
||||
def lambda_coeff(id_i, selected_ids):
|
||||
def lambda_coeff(id_i: 'CurveBN', selected_ids: List['CurveBN']) -> 'CurveBN':
|
||||
ids = [x for x in selected_ids if x != id_i]
|
||||
|
||||
if not ids:
|
||||
|
@ -22,7 +23,7 @@ def lambda_coeff(id_i, selected_ids):
|
|||
return result
|
||||
|
||||
|
||||
def poly_eval(coeff, x):
|
||||
def poly_eval(coeff: List['CurveBN'], x: 'CurveBN') -> 'CurveBN':
|
||||
result = coeff[-1]
|
||||
for i in range(-2, -len(coeff) - 1, -1):
|
||||
result = ((result * x) + coeff[i])
|
||||
|
@ -30,7 +31,7 @@ def poly_eval(coeff, x):
|
|||
return result
|
||||
|
||||
|
||||
def kdf(ecpoint, key_length):
|
||||
def kdf(ecpoint: 'Point', key_length: int) -> bytes:
|
||||
data = ecpoint.to_bytes(is_compressed=True)
|
||||
|
||||
return HKDF(
|
||||
|
@ -42,7 +43,7 @@ def kdf(ecpoint, key_length):
|
|||
).derive(data)
|
||||
|
||||
|
||||
def get_field_order_size_in_bytes(curve):
|
||||
def get_field_order_size_in_bytes(curve: Curve) -> int:
|
||||
backend = default_backend()
|
||||
size_in_bits = openssl._get_ec_group_degree(curve.ec_group)
|
||||
return (size_in_bits + 7) // 8
|
||||
|
|
Loading…
Reference in New Issue