mirror of https://github.com/nucypher/nucypher.git
Added unit and integration tests for keyring.
parent
cbe03d9dc1
commit
305bf0a7fb
|
@ -702,7 +702,7 @@ class NucypherKeyring:
|
|||
public_key_dir=public_key_dir,
|
||||
private_key_dir=private_key_dir)
|
||||
|
||||
# Remove the parsed paths from the disk, weather they exist or not.
|
||||
# Remove the parsed paths from the disk, whether they exist or not.
|
||||
for filepath in keypaths.values():
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
os.remove(filepath)
|
||||
|
|
|
@ -16,9 +16,13 @@
|
|||
"""
|
||||
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
from unittest.mock import ANY
|
||||
|
||||
import pytest
|
||||
from constant_sorrow.constants import FEDERATED_ADDRESS
|
||||
from cryptography.hazmat.primitives.serialization.base import Encoding
|
||||
from flask import Flask
|
||||
from umbral.keys import UmbralPrivateKey
|
||||
from umbral.signing import Signer
|
||||
|
||||
|
@ -26,7 +30,11 @@ from nucypher.characters.lawful import Alice, Bob, Ursula
|
|||
from nucypher.config.constants import TEMPORARY_DOMAIN
|
||||
from nucypher.config.keyring import NucypherKeyring
|
||||
from nucypher.crypto.powers import DecryptingPower, DelegatingPower
|
||||
from nucypher.datastore.datastore import Datastore
|
||||
from nucypher.network.server import TLSHostingPower, ProxyRESTServer
|
||||
from nucypher.utilities.networking import LOOPBACK_ADDRESS
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from tests.utils.matchers import IsType
|
||||
|
||||
|
||||
def test_generate_alice_keyring(tmpdir):
|
||||
|
@ -75,7 +83,7 @@ def test_characters_use_keyring(tmpdir):
|
|||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
encrypting=True,
|
||||
rest=True,
|
||||
host='127.0.0.1',
|
||||
host=LOOPBACK_ADDRESS,
|
||||
keyring_root=tmpdir)
|
||||
keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
alice = Alice(federated_only=True, start_learning_now=False, keyring=keyring)
|
||||
|
@ -83,8 +91,57 @@ def test_characters_use_keyring(tmpdir):
|
|||
Ursula(federated_only=True,
|
||||
start_learning_now=False,
|
||||
keyring=keyring,
|
||||
rest_host='127.0.0.1',
|
||||
rest_host=LOOPBACK_ADDRESS,
|
||||
rest_port=12345,
|
||||
db_filepath=tempfile.mkdtemp(),
|
||||
domain=TEMPORARY_DOMAIN)
|
||||
alice.disenchant() # To stop Alice's publication threadpool. TODO: Maybe only start it at first enactment?
|
||||
|
||||
|
||||
def test_tls_hosting_certificate_remains_the_same(tmpdir, mocker):
|
||||
keyring = NucypherKeyring.generate(
|
||||
checksum_address=FEDERATED_ADDRESS,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
encrypting=True,
|
||||
rest=True,
|
||||
host=LOOPBACK_ADDRESS,
|
||||
keyring_root=tmpdir)
|
||||
keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
rest_port = 12345
|
||||
db_filepath = tempfile.mkdtemp()
|
||||
|
||||
ursula = Ursula(federated_only=True,
|
||||
start_learning_now=False,
|
||||
keyring=keyring,
|
||||
rest_host=LOOPBACK_ADDRESS,
|
||||
rest_port=rest_port,
|
||||
db_filepath=db_filepath,
|
||||
domain=TEMPORARY_DOMAIN)
|
||||
|
||||
assert ursula.keyring is keyring
|
||||
assert ursula.certificate == ursula._crypto_power.power_ups(TLSHostingPower).keypair.certificate
|
||||
|
||||
original_certificate_bytes = ursula.certificate.public_bytes(encoding=Encoding.PEM)
|
||||
ursula.disenchant()
|
||||
del ursula
|
||||
|
||||
spy_rest_server_init = mocker.spy(ProxyRESTServer, '__init__')
|
||||
recreated_ursula = Ursula(federated_only=True,
|
||||
start_learning_now=False,
|
||||
keyring=keyring,
|
||||
rest_host=LOOPBACK_ADDRESS,
|
||||
rest_port=rest_port,
|
||||
db_filepath=db_filepath,
|
||||
domain=TEMPORARY_DOMAIN)
|
||||
|
||||
assert recreated_ursula.keyring is keyring
|
||||
assert recreated_ursula.certificate.public_bytes(encoding=Encoding.PEM) == original_certificate_bytes
|
||||
tls_hosting_power = recreated_ursula._crypto_power.power_ups(TLSHostingPower)
|
||||
spy_rest_server_init.assert_called_once_with(ANY, # self
|
||||
rest_host=LOOPBACK_ADDRESS,
|
||||
rest_port=rest_port,
|
||||
rest_app=IsType(Flask),
|
||||
datastore=IsType(Datastore),
|
||||
hosting_power=tls_hosting_power)
|
||||
recreated_ursula.disenchant()
|
||||
|
|
|
@ -17,6 +17,10 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
|||
from functools import partial
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from constant_sorrow.constants import FEDERATED_ADDRESS
|
||||
from cryptography.hazmat.primitives.serialization.base import Encoding
|
||||
|
||||
from nucypher.config.keyring import (
|
||||
_assemble_key_data,
|
||||
_generate_tls_keys,
|
||||
|
@ -25,10 +29,107 @@ from nucypher.config.keyring import (
|
|||
_serialize_private_key_to_pem,
|
||||
_deserialize_private_key_from_pem,
|
||||
_write_private_keyfile,
|
||||
_read_keyfile
|
||||
_read_keyfile, NucypherKeyring
|
||||
)
|
||||
from nucypher.crypto.api import _TLS_CURVE
|
||||
from nucypher.crypto.powers import DecryptingPower, SigningPower
|
||||
from nucypher.network.server import TLSHostingPower
|
||||
from nucypher.utilities.networking import LOOPBACK_ADDRESS
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
|
||||
def test_keyring_invalid_password(tmpdir):
|
||||
with pytest.raises(NucypherKeyring.AuthenticationFailed):
|
||||
_generate_keyring(tmpdir, password='tobeornottobe') # password less than 16 characters
|
||||
|
||||
|
||||
def test_keyring_lock_unlock(tmpdir):
|
||||
keyring = _generate_keyring(tmpdir)
|
||||
assert not keyring.is_unlocked
|
||||
|
||||
keyring.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keyring.is_unlocked
|
||||
keyring.unlock(INSECURE_DEVELOPMENT_PASSWORD) # unlock when already unlocked
|
||||
assert keyring.is_unlocked
|
||||
|
||||
keyring.lock()
|
||||
assert not keyring.is_unlocked
|
||||
keyring.lock() # lock when already locked
|
||||
assert not keyring.is_unlocked
|
||||
|
||||
|
||||
def test_keyring_derive_crypto_power_without_unlock(tmpdir):
|
||||
keyring = _generate_keyring(tmpdir)
|
||||
with pytest.raises(NucypherKeyring.KeyringLocked):
|
||||
keyring.derive_crypto_power(power_class=DecryptingPower)
|
||||
|
||||
|
||||
def test_keyring_restoration(tmpdir):
|
||||
keyring = _generate_keyring(tmpdir)
|
||||
keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
account = keyring.account
|
||||
checksum_address = keyring.checksum_address
|
||||
certificate_filepath = keyring.certificate_filepath
|
||||
encrypting_public_key_hex = keyring.encrypting_public_key.hex()
|
||||
signing_public_key_hex = keyring.signing_public_key.hex()
|
||||
|
||||
# tls power
|
||||
tls_hosting_power = keyring.derive_crypto_power(power_class=TLSHostingPower, host=LOOPBACK_ADDRESS)
|
||||
tls_hosting_power_public_key_numbers = tls_hosting_power.public_key().public_numbers()
|
||||
tls_hosting_power_certificate_public_bytes = \
|
||||
tls_hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM)
|
||||
tls_hosting_power_certificate_filepath = tls_hosting_power.keypair.certificate_filepath
|
||||
|
||||
# decrypting power
|
||||
decrypting_power = keyring.derive_crypto_power(power_class=DecryptingPower)
|
||||
decrypting_power_public_key_hex = decrypting_power.public_key().hex()
|
||||
decrypting_power_fingerprint = decrypting_power.keypair.fingerprint()
|
||||
|
||||
# signing power
|
||||
signing_power = keyring.derive_crypto_power(power_class=SigningPower)
|
||||
signing_power_public_key_hex = signing_power.public_key().hex()
|
||||
signing_power_fingerprint = signing_power.keypair.fingerprint()
|
||||
|
||||
# get rid of object, but not persistent data
|
||||
del keyring
|
||||
|
||||
restored_keyring = NucypherKeyring(keyring_root=tmpdir, account=account)
|
||||
restored_keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
assert restored_keyring.account == account
|
||||
assert restored_keyring.checksum_address == checksum_address
|
||||
assert restored_keyring.certificate_filepath == certificate_filepath
|
||||
assert restored_keyring.encrypting_public_key.hex() == encrypting_public_key_hex
|
||||
assert restored_keyring.signing_public_key.hex() == signing_public_key_hex
|
||||
|
||||
# tls power
|
||||
restored_tls_hosting_power = restored_keyring.derive_crypto_power(power_class=TLSHostingPower,
|
||||
host=LOOPBACK_ADDRESS)
|
||||
assert restored_tls_hosting_power.public_key().public_numbers() == tls_hosting_power_public_key_numbers
|
||||
assert restored_tls_hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM) == \
|
||||
tls_hosting_power_certificate_public_bytes
|
||||
assert restored_tls_hosting_power.keypair.certificate_filepath == tls_hosting_power_certificate_filepath
|
||||
|
||||
# decrypting power
|
||||
restored_decrypting_power = restored_keyring.derive_crypto_power(power_class=DecryptingPower)
|
||||
assert restored_decrypting_power.public_key().hex() == decrypting_power_public_key_hex
|
||||
assert restored_decrypting_power.keypair.fingerprint() == decrypting_power_fingerprint
|
||||
|
||||
# signing power
|
||||
restored_signing_power = restored_keyring.derive_crypto_power(power_class=SigningPower)
|
||||
assert restored_signing_power.public_key().hex() == signing_power_public_key_hex
|
||||
assert restored_signing_power.keypair.fingerprint() == signing_power_fingerprint
|
||||
|
||||
|
||||
def test_keyring_destroy(tmpdir):
|
||||
keyring = _generate_keyring(tmpdir)
|
||||
keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
keyring.destroy()
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
keyring.encrypting_public_key
|
||||
|
||||
|
||||
def test_private_key_serialization():
|
||||
|
@ -94,3 +195,19 @@ def test_tls_write_read_private_keyfile(temp_dir_path):
|
|||
deserializer=tls_deserializer)
|
||||
|
||||
assert private_key.private_numbers() == deserialized_private_key_from_file.private_numbers()
|
||||
|
||||
|
||||
def _generate_keyring(root,
|
||||
checksum_address=FEDERATED_ADDRESS,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
encrypting=True,
|
||||
rest=True,
|
||||
host=LOOPBACK_ADDRESS):
|
||||
keyring = NucypherKeyring.generate(
|
||||
checksum_address=checksum_address,
|
||||
password=password,
|
||||
encrypting=encrypting,
|
||||
rest=rest,
|
||||
host=host,
|
||||
keyring_root=root)
|
||||
return keyring
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
|
||||
class IsType:
|
||||
def __init__(self, expected_type):
|
||||
self.expected_type = expected_type
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, self.expected_type)
|
||||
|
||||
def __repr__(self):
|
||||
return f'<IsType({self.expected_type})>'
|
Loading…
Reference in New Issue