nucypher/tests/unit/crypto/test_keystore.py

338 lines
12 KiB
Python

import os
import random
import string
from pathlib import Path
import pytest
from constant_sorrow.constants import KEYSTORE_LOCKED
from cryptography.hazmat.primitives._serialization import Encoding
from mnemonic.mnemonic import Mnemonic
from nucypher_core.umbral import SecretKeyFactory
from nucypher.crypto.keystore import (
_DELEGATING_INFO,
_MNEMONIC_LANGUAGE,
InvalidPassword,
Keystore,
_assemble_keystore,
_deserialize_keystore,
_read_keystore,
_serialize_keystore,
_write_keystore,
validate_keystore_filename,
)
from nucypher.crypto.powers import (
DecryptingPower,
DelegatingPower,
SigningPower,
ThresholdRequestDecryptingPower,
TLSHostingPower,
)
from nucypher.utilities.networking import LOOPBACK_ADDRESS
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
def test_invalid_keystore_path_parts(tmp_path, tmp_path_factory):
# Setup
not_hex = 'h' + ''.join(random.choice(string.ascii_letters) for _ in range(Keystore._ID_SIZE))
invalid_paths = (
'nosuffix', # missing suffix
'deadbeef.priv', # missing created epoch
f'123-{not_hex[:3]}.priv', # too short
f'123-{not_hex}.priv', # not hex
)
# Test
for invalid_path in invalid_paths:
invalid_path = Path(invalid_path)
with pytest.raises(Keystore.Invalid, match=f'{invalid_path} is not a valid keystore filename'):
validate_keystore_filename(path=invalid_path)
def test_invalid_keystore_file_type(tmp_path, tmp_path_factory):
# Not a file
invalid_path = Path()
with pytest.raises(ValueError, match="Keystore path must be a file."):
_keystore = Keystore(invalid_path)
invalid_path = Path(tmp_path)
with pytest.raises(ValueError, match="Keystore path must be a file."):
_keystore = Keystore(invalid_path)
# Not an existing file
invalid_path = Path('does-not-exist')
with pytest.raises(Keystore.NotFound, match=f"Keystore '{invalid_path.absolute()}' does not exist."):
_keystore = Keystore(invalid_path)
def test_keystore_instantiation_defaults(tmp_path_factory):
# Setup
parent = Path(tmp_path_factory.mktemp('test-keystore-'))
parent.touch(exist_ok=True)
keystore_id = ''.join(random.choice(string.hexdigits.lower()) for _ in range(Keystore._ID_SIZE))
path = parent / f'123-{keystore_id}.priv'
path.touch()
# Test
keystore = Keystore(path)
assert keystore.keystore_path == path # retains the correct keystore path
assert keystore.id == keystore_id # accurately parses filename for ID
assert not keystore.is_unlocked # defaults to locked
assert keystore._Keystore__secret is KEYSTORE_LOCKED
assert parent in keystore.keystore_path.parents # created in the correct directory
def test_keystore_generation_defaults(tmp_path_factory):
# Setup
parent = Path(tmp_path_factory.mktemp('test-keystore-'))
parent.touch(exist_ok=True)
# Test
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=parent)
assert not keystore.is_unlocked # defaults to locked
assert keystore._Keystore__secret is KEYSTORE_LOCKED
assert parent in keystore.keystore_path.parents # created in the correct directory
def test_keystore_invalid_password(tmpdir):
with pytest.raises(InvalidPassword):
_keystore = Keystore.generate('short', keystore_dir=tmpdir)
def test_keystore_generate_report_interactive_false(tmpdir):
_keystore, words = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir,
interactive=False)
assert len(words.split(" ")) == 24
def test_keystore_derive_crypto_power_without_unlock(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
with pytest.raises(Keystore.Locked):
keystore.derive_crypto_power(power_class=DecryptingPower)
def test_keystore_serializer():
encrypted_secret, psalt, wsalt = b'peanuts! Get your peanuts!', b'sea salt', b'bath salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, password_salt=psalt, wrapper_salt=wsalt)
serialized_payload = _serialize_keystore(payload)
deserialized_key_data = _deserialize_keystore(serialized_payload)
assert deserialized_key_data['key'] == encrypted_secret
assert deserialized_key_data['password_salt'] == psalt
assert deserialized_key_data['wrapper_salt'] == wsalt
def test_keystore_lock_unlock(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
# locked by default
assert not keystore.is_unlocked
assert keystore._Keystore__secret is KEYSTORE_LOCKED
# incorrect password
with pytest.raises(Keystore.AuthenticationFailed):
keystore.unlock('opensaysme')
# unlock
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
assert keystore.is_unlocked
assert keystore._Keystore__secret != KEYSTORE_LOCKED
assert isinstance(keystore._Keystore__secret, bytes)
# unlock when already unlocked
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
assert keystore.is_unlocked
# incorrect password when already unlocked
with pytest.raises(Keystore.AuthenticationFailed):
keystore.unlock('opensaysme')
# lock
keystore.lock()
assert not keystore.is_unlocked
# lock when already locked
keystore.lock()
assert not keystore.is_unlocked
def test_write_keystore_file(temp_dir_path):
temp_filepath = Path(temp_dir_path) / "test_private_key_serialization_file"
encrypted_secret, psalt, wsalt = b'peanuts! Get your peanuts!', b'sea salt', b'bath_salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, password_salt=psalt, wrapper_salt=wsalt)
_write_keystore(path=temp_filepath, payload=payload, serializer=_serialize_keystore)
deserialized_payload_from_file = _read_keystore(path=temp_filepath, deserializer=_deserialize_keystore)
assert deserialized_payload_from_file['key'] == encrypted_secret
assert deserialized_payload_from_file['password_salt'] == psalt
assert deserialized_payload_from_file['wrapper_salt'] == wsalt
def test_decrypt_keystore(tmpdir, mocker):
# Setup
spy = mocker.spy(Mnemonic, 'generate')
# Decrypt post-generation
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
words = spy.spy_return
secret = bytes(mnemonic.to_entropy(words))
assert keystore._Keystore__secret == secret
# Decrypt from keystore file
keystore_path = keystore.keystore_path
del words
del keystore
keystore = Keystore(keystore_path=keystore_path)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
assert keystore._Keystore__secret == secret
def test_keystore_persistence(tmpdir):
"""Regression test for keystore file persistence"""
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
path = keystore.keystore_path
del keystore
assert path.exists()
def test_restore_keystore_from_mnemonic(tmpdir, mocker):
# Setup
spy = mocker.spy(Mnemonic, 'generate')
# Decrypt post-generation
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
words = spy.spy_return
secret = bytes(mnemonic.to_entropy(words))
keystore_path = keystore.keystore_path
# remove local and disk references, simulating a
# lost keystore or forgotten password.
del keystore
os.unlink(keystore_path)
# prove the keystore is lost or missing
assert not keystore_path.exists()
with pytest.raises(Keystore.NotFound):
_keystore = Keystore(keystore_path=keystore_path)
# Restore with user-supplied words and a new password
keystore = Keystore.restore(words=words, password="ANewHope", keystore_dir=tmpdir)
keystore.unlock(password="ANewHope")
assert keystore._Keystore__secret == secret
def test_import_custom_keystore(tmpdir):
# Too short - 32 bytes is required
custom_secret = b"tooshort"
with pytest.raises(
ValueError,
match=f"Entropy bytes bust be exactly {SecretKeyFactory.seed_size()}.",
):
_keystore = Keystore.import_secure(
key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir,
)
# Too short - 32 bytes is required
custom_secret = b"thisisabunchofbytesthatisabittoolong"
with pytest.raises(
ValueError,
match=f"Entropy bytes bust be exactly {SecretKeyFactory.seed_size()}.",
):
_keystore = Keystore.import_secure(
key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir,
)
# Import private key
custom_secret = os.urandom(SecretKeyFactory.seed_size()) # insecure but works
keystore = Keystore.import_secure(
key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir,
)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
assert keystore._Keystore__secret == custom_secret
keystore.lock()
path = keystore.keystore_path
del keystore
# Restore custom secret from encrypted keystore file
keystore = Keystore(keystore_path=path)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
assert keystore._Keystore__secret == custom_secret
def test_derive_signing_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
signing_power = keystore.derive_crypto_power(power_class=SigningPower)
assert signing_power.public_key().to_compressed_bytes().hex()
assert signing_power.keypair.fingerprint()
def test_derive_decrypting_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
decrypting_power = keystore.derive_crypto_power(power_class=DecryptingPower)
assert decrypting_power.public_key().to_compressed_bytes().hex()
assert decrypting_power.keypair.fingerprint()
def test_derive_delegating_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
delegating_power = keystore.derive_crypto_power(power_class=DelegatingPower)
parent_skf = SecretKeyFactory.from_secure_randomness(keystore._Keystore__secret)
child_skf = parent_skf.make_factory(_DELEGATING_INFO)
ref_pk = child_skf.make_key(b"some-label").public_key()
assert (
delegating_power._get_privkey_from_label(label=b"some-label").public_key()
== ref_pk
)
def test_derive_hosting_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
hosting_power = keystore.derive_crypto_power(power_class=TLSHostingPower, host=LOOPBACK_ADDRESS)
assert hosting_power.public_key().public_numbers()
assert hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM)
rederived_hosting_power = keystore.derive_crypto_power(power_class=TLSHostingPower, host=LOOPBACK_ADDRESS)
assert hosting_power.public_key().public_numbers() == rederived_hosting_power.public_key().public_numbers()
def test_derive_threshold_request_decrypting_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
threshold_request_decrypting_power = keystore.derive_crypto_power(
power_class=ThresholdRequestDecryptingPower
)
ritual_id = 23
public_key = threshold_request_decrypting_power.get_pubkey_from_ritual_id(
ritual_id=ritual_id
)
other_public_key = threshold_request_decrypting_power.get_pubkey_from_ritual_id(
ritual_id=ritual_id
)
assert bytes(public_key) == bytes(other_public_key)
different_ritual_public_key = (
threshold_request_decrypting_power.get_pubkey_from_ritual_id(ritual_id=0)
)
assert bytes(public_key) != bytes(different_ritual_public_key)