mirror of https://github.com/nucypher/nucypher.git
type hints for crypto.api SSl functions
parent
ba0a23c889
commit
a5d561f9b6
|
@ -1,21 +1,20 @@
|
|||
import os
|
||||
|
||||
import datetime
|
||||
import os
|
||||
from random import SystemRandom
|
||||
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from cryptography.x509 import Certificate
|
||||
from typing import Union
|
||||
from typing import Union, Tuple
|
||||
|
||||
import sha3
|
||||
from constant_sorrow import constants
|
||||
from cryptography import x509
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePrivateKey
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from cryptography.x509 import Certificate
|
||||
from cryptography.x509.oid import NameOID
|
||||
|
||||
from umbral import pre
|
||||
from umbral.keys import UmbralPrivateKey, UmbralPublicKey
|
||||
|
||||
|
@ -72,7 +71,9 @@ def keccak_digest(*messages: bytes) -> bytes:
|
|||
return hash.digest()
|
||||
|
||||
|
||||
def ecdsa_sign(message: bytes, privkey: UmbralPrivateKey) -> bytes:
|
||||
def ecdsa_sign(message: bytes,
|
||||
privkey: UmbralPrivateKey
|
||||
) -> bytes:
|
||||
"""
|
||||
Accepts a hashed message and signs it with the private key given.
|
||||
|
||||
|
@ -86,11 +87,10 @@ def ecdsa_sign(message: bytes, privkey: UmbralPrivateKey) -> bytes:
|
|||
return signature_der_bytes
|
||||
|
||||
|
||||
def ecdsa_verify(
|
||||
message: bytes,
|
||||
signature: bytes,
|
||||
pubkey: UmbralPublicKey
|
||||
) -> bool:
|
||||
def ecdsa_verify(message: bytes,
|
||||
signature: bytes,
|
||||
pubkey: UmbralPublicKey
|
||||
) -> bool:
|
||||
"""
|
||||
Accepts a message and signature and verifies it with the
|
||||
provided public key.
|
||||
|
@ -132,19 +132,24 @@ def _save_tls_certificate(certificate: Certificate,
|
|||
return certificate_filepath
|
||||
|
||||
|
||||
def load_tls_certificate(filepath):
|
||||
with open(filepath, 'r') as certificate_file:
|
||||
cert = x509.load_pem_x509_certificate(certificate_file.read(),
|
||||
backend=default_backend())
|
||||
return cert
|
||||
def load_tls_certificate(filepath: str) -> Certificate:
|
||||
"""Deserialize an X509 certificate from a filepath"""
|
||||
try:
|
||||
with open(filepath, 'r') as certificate_file:
|
||||
cert = x509.load_pem_x509_certificate(certificate_file.read(),
|
||||
backend=default_backend())
|
||||
return cert
|
||||
except FileNotFoundError:
|
||||
raise # TODO: Better error message here
|
||||
|
||||
|
||||
def generate_self_signed_certificate(common_name,
|
||||
curve,
|
||||
host,
|
||||
certificate_dir,
|
||||
private_key=None,
|
||||
days_valid=365):
|
||||
def generate_self_signed_certificate(common_name: str,
|
||||
curve: EllipticCurve,
|
||||
host: str,
|
||||
certificate_dir: str,
|
||||
private_key: _EllipticCurvePrivateKey = None,
|
||||
days_valid: int = 365
|
||||
) -> Tuple[Certificate, _EllipticCurvePrivateKey, str]:
|
||||
|
||||
if not private_key:
|
||||
private_key = ec.generate_private_key(curve, default_backend())
|
||||
|
@ -165,19 +170,19 @@ def generate_self_signed_certificate(common_name,
|
|||
cert = cert.add_extension(x509.SubjectAlternativeName([x509.DNSName(host)]), critical=False)
|
||||
cert = cert.sign(private_key, hashes.SHA512(), default_backend())
|
||||
|
||||
# if certificate_dir:
|
||||
tls_certificate_filepath = _save_tls_certificate(cert, directory=certificate_dir, common_name=common_name)
|
||||
# else:
|
||||
# tls_certificate_filepath = constants.CERTIFICATE_NOT_SAVED
|
||||
if certificate_dir: # TODO: Make this more configurable?
|
||||
tls_certificate_filepath = _save_tls_certificate(cert, directory=certificate_dir, common_name=common_name)
|
||||
else:
|
||||
tls_certificate_filepath = constants.CERTIFICATE_NOT_SAVED
|
||||
|
||||
return cert, private_key, tls_certificate_filepath
|
||||
|
||||
|
||||
def encrypt_and_sign(recipient_pubkey_enc: UmbralPublicKey,
|
||||
plaintext: bytes,
|
||||
signer: Union["SignatureStamp", "_Constant"],
|
||||
sign_plaintext=True,
|
||||
) -> tuple:
|
||||
plaintext: bytes,
|
||||
signer: 'SignatureStamp',
|
||||
sign_plaintext: bool = True
|
||||
) -> Tuple[UmbralMessageKit, 'SignatureStamp']:
|
||||
|
||||
if signer is not constants.DO_NOT_SIGN:
|
||||
# The caller didn't expressly tell us not to sign; we'll sign.
|
||||
|
@ -197,7 +202,7 @@ def encrypt_and_sign(recipient_pubkey_enc: UmbralPublicKey,
|
|||
else:
|
||||
# Don't sign.
|
||||
signature = sig_header = constants.NOT_SIGNED
|
||||
alice_pubkey = None
|
||||
alice_pubkey = None # TODO: ..eh?
|
||||
ciphertext, capsule = pre.encrypt(recipient_pubkey_enc, sig_header + plaintext)
|
||||
message_kit = UmbralMessageKit(ciphertext=ciphertext, capsule=capsule)
|
||||
|
||||
|
|
Loading…
Reference in New Issue