From a5d561f9b6429a68d1c43f668d7312d3496026ef Mon Sep 17 00:00:00 2001 From: Kieran Prasch Date: Thu, 20 Sep 2018 12:49:22 -0700 Subject: [PATCH] type hints for crypto.api SSl functions --- nucypher/crypto/api.py | 71 ++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/nucypher/crypto/api.py b/nucypher/crypto/api.py index b92924aa2..0f388d57f 100644 --- a/nucypher/crypto/api.py +++ b/nucypher/crypto/api.py @@ -1,21 +1,20 @@ -import os - import datetime +import os from random import SystemRandom - -from cryptography.hazmat.primitives.serialization import Encoding -from cryptography.x509 import Certificate -from typing import Union +from typing import Union, Tuple import sha3 from constant_sorrow import constants from cryptography import x509 from cryptography.exceptions import InvalidSignature from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePrivateKey from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509 import Certificate from cryptography.x509.oid import NameOID - from umbral import pre from umbral.keys import UmbralPrivateKey, UmbralPublicKey @@ -72,7 +71,9 @@ def keccak_digest(*messages: bytes) -> bytes: return hash.digest() -def ecdsa_sign(message: bytes, privkey: UmbralPrivateKey) -> bytes: +def ecdsa_sign(message: bytes, + privkey: UmbralPrivateKey + ) -> bytes: """ Accepts a hashed message and signs it with the private key given. @@ -86,11 +87,10 @@ def ecdsa_sign(message: bytes, privkey: UmbralPrivateKey) -> bytes: return signature_der_bytes -def ecdsa_verify( - message: bytes, - signature: bytes, - pubkey: UmbralPublicKey -) -> bool: +def ecdsa_verify(message: bytes, + signature: bytes, + pubkey: UmbralPublicKey + ) -> bool: """ Accepts a message and signature and verifies it with the provided public key. @@ -132,19 +132,24 @@ def _save_tls_certificate(certificate: Certificate, return certificate_filepath -def load_tls_certificate(filepath): - with open(filepath, 'r') as certificate_file: - cert = x509.load_pem_x509_certificate(certificate_file.read(), - backend=default_backend()) - return cert +def load_tls_certificate(filepath: str) -> Certificate: + """Deserialize an X509 certificate from a filepath""" + try: + with open(filepath, 'r') as certificate_file: + cert = x509.load_pem_x509_certificate(certificate_file.read(), + backend=default_backend()) + return cert + except FileNotFoundError: + raise # TODO: Better error message here -def generate_self_signed_certificate(common_name, - curve, - host, - certificate_dir, - private_key=None, - days_valid=365): +def generate_self_signed_certificate(common_name: str, + curve: EllipticCurve, + host: str, + certificate_dir: str, + private_key: _EllipticCurvePrivateKey = None, + days_valid: int = 365 + ) -> Tuple[Certificate, _EllipticCurvePrivateKey, str]: if not private_key: private_key = ec.generate_private_key(curve, default_backend()) @@ -165,19 +170,19 @@ def generate_self_signed_certificate(common_name, cert = cert.add_extension(x509.SubjectAlternativeName([x509.DNSName(host)]), critical=False) cert = cert.sign(private_key, hashes.SHA512(), default_backend()) - # if certificate_dir: - tls_certificate_filepath = _save_tls_certificate(cert, directory=certificate_dir, common_name=common_name) - # else: - # tls_certificate_filepath = constants.CERTIFICATE_NOT_SAVED + if certificate_dir: # TODO: Make this more configurable? + tls_certificate_filepath = _save_tls_certificate(cert, directory=certificate_dir, common_name=common_name) + else: + tls_certificate_filepath = constants.CERTIFICATE_NOT_SAVED return cert, private_key, tls_certificate_filepath def encrypt_and_sign(recipient_pubkey_enc: UmbralPublicKey, - plaintext: bytes, - signer: Union["SignatureStamp", "_Constant"], - sign_plaintext=True, - ) -> tuple: + plaintext: bytes, + signer: 'SignatureStamp', + sign_plaintext: bool = True + ) -> Tuple[UmbralMessageKit, 'SignatureStamp']: if signer is not constants.DO_NOT_SIGN: # The caller didn't expressly tell us not to sign; we'll sign. @@ -197,7 +202,7 @@ def encrypt_and_sign(recipient_pubkey_enc: UmbralPublicKey, else: # Don't sign. signature = sig_header = constants.NOT_SIGNED - alice_pubkey = None + alice_pubkey = None # TODO: ..eh? ciphertext, capsule = pre.encrypt(recipient_pubkey_enc, sig_header + plaintext) message_kit = UmbralMessageKit(ciphertext=ciphertext, capsule=capsule)