mirror of https://github.com/nucypher/nucypher.git
localize certificate references on VerifiableNode
parent
9061da4fdf
commit
b255416b1b
|
@ -115,8 +115,8 @@ def ecdsa_verify(message: bytes,
|
|||
|
||||
|
||||
def _save_tls_certificate(certificate: Certificate,
|
||||
full_filepath,
|
||||
force: bool = True,
|
||||
full_filepath: str,
|
||||
force: bool = True, # TODO
|
||||
) -> str:
|
||||
if force is False and os.path.isfile(full_filepath):
|
||||
raise FileExistsError('A TLS certificate already exists at {}.'.format(full_filepath))
|
||||
|
@ -125,6 +125,8 @@ def _save_tls_certificate(certificate: Certificate,
|
|||
public_pem_bytes = certificate.public_bytes(Encoding.PEM)
|
||||
certificate_file.write(public_pem_bytes)
|
||||
|
||||
return full_filepath
|
||||
|
||||
|
||||
def load_tls_certificate(filepath: str) -> Certificate:
|
||||
"""Deserialize an X509 certificate from a filepath"""
|
||||
|
@ -140,10 +142,9 @@ def load_tls_certificate(filepath: str) -> Certificate:
|
|||
def generate_self_signed_certificate(common_name: str,
|
||||
curve: EllipticCurve,
|
||||
host: str,
|
||||
certificate_dir: str,
|
||||
private_key: _EllipticCurvePrivateKey = None,
|
||||
days_valid: int = 365
|
||||
) -> Tuple[Certificate, _EllipticCurvePrivateKey, str]:
|
||||
) -> Tuple[Certificate, _EllipticCurvePrivateKey]:
|
||||
|
||||
if not private_key:
|
||||
private_key = ec.generate_private_key(curve, default_backend())
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
import os
|
||||
|
||||
import OpenSSL
|
||||
from constant_sorrow import constants
|
||||
from cryptography.x509 import Certificate
|
||||
from eth_keys.datatypes import Signature as EthSignature
|
||||
|
||||
from nucypher.crypto.api import _save_tls_certificate
|
||||
|
@ -17,13 +20,19 @@ class VerifiableNode:
|
|||
_verified_node = False
|
||||
|
||||
def __init__(self,
|
||||
certificate: Certificate,
|
||||
certificate_filepath: str,
|
||||
interface_signature=constants.NOT_SIGNED.bool_value(False),
|
||||
certificate_filepath: str = None,
|
||||
) -> None:
|
||||
|
||||
self.certificate_filepath = certificate_filepath # TODO: This gets messy when it is None (although it being None is actually reasonable in some cases, at least for testing). Let's make this a method instead that inspects the TLSHostingPower (similar to get_deployer()).
|
||||
self.certificate = certificate
|
||||
self.certificate_filepath = certificate_filepath
|
||||
self._interface_signature_object = interface_signature
|
||||
|
||||
# TODO: This gets messy when it is None
|
||||
# (although it being None is actually reasonable in some cases, at least for testing).
|
||||
# Let's make this a method instead that inspects the TLSHostingPower (similar to get_deployer()).
|
||||
|
||||
class InvalidNode(SuspiciousActivity):
|
||||
"""
|
||||
Raised when a node has an invalid characteristic - stamp, interface, or address.
|
||||
|
@ -34,6 +43,12 @@ class VerifiableNode:
|
|||
Raise when a Character tries to use another Character as decentralized when the latter is federated_only.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def from_tls_hosting_power(cls, tls_hosting_power: TLSHostingPower, *args, **kwargs):
|
||||
certificate_filepath = tls_hosting_power.keypair.certificate_filepath
|
||||
certificate = tls_hosting_power.keypair.certificate
|
||||
return cls(certificate=certificate, certificate_filepath=certificate_filepath, *args, **kwargs)
|
||||
|
||||
def _stamp_has_valid_wallet_signature(self):
|
||||
signature_bytes = self._evidence_of_decentralized_identity
|
||||
if signature_bytes is constants.NOT_SIGNED:
|
||||
|
@ -141,21 +156,28 @@ class VerifiableNode:
|
|||
raise NoSigningPower("This Ursula is a Stranger; you didn't init with an interface signature, so you can't verify.")
|
||||
return self._interface_signature_object
|
||||
|
||||
def certificate(self):
|
||||
return self._crypto_power.power_ups(TLSHostingPower).keypair.certificate
|
||||
|
||||
def save_certificate_to_disk(self, directory):
|
||||
x509 = OpenSSL.crypto.X509.from_cryptography(self.certificate())
|
||||
@property
|
||||
def common_name(self):
|
||||
x509 = OpenSSL.crypto.X509.from_cryptography(self.certificate)
|
||||
subject_components = x509.get_subject().get_components()
|
||||
common_name_as_bytes = subject_components[0][1]
|
||||
common_name_from_cert = common_name_as_bytes.decode()
|
||||
return common_name_from_cert
|
||||
|
||||
@property
|
||||
def certificate_filename(self):
|
||||
return self.common_name + '.pem' # TODO: use cert encoding..?
|
||||
|
||||
def save_certificate_to_disk(self, directory):
|
||||
x509 = OpenSSL.crypto.X509.from_cryptography(self.certificate)
|
||||
subject_components = x509.get_subject().get_components()
|
||||
common_name_as_bytes = subject_components[0][1]
|
||||
common_name_from_cert = common_name_as_bytes.decode()
|
||||
|
||||
if not self.checksum_public_address == common_name_from_cert:
|
||||
# TODO: It's better for us to have checked this a while ago so that this situation is impossible. #443
|
||||
raise ValueError(
|
||||
"You passed a common_name that is not the same one as the cert. Why? FWIW, You don't even need to pass a common name here; the cert will be saved according to the name on the cert itself.")
|
||||
|
||||
certificate_filepath = "{}/{}".format(directory,
|
||||
common_name_from_cert) # TODO: Do this with proper path tooling.
|
||||
_save_tls_certificate(self.certificate(), full_filepath=certificate_filepath)
|
||||
raise ValueError("You passed a common_name that is not the same one as the cert. Why? FWIW, You don't even need to pass a common name here; the cert will be saved according to the name on the cert itself.")
|
||||
|
||||
certificate_filepath = os.path.join(directory, self.certificate_filename)
|
||||
_save_tls_certificate(self.certificate, full_filepath=certificate_filepath)
|
||||
self.certificate_filepath = certificate_filepath
|
Loading…
Reference in New Issue