mirror of https://github.com/nucypher/nucypher.git
Deterministic power derivation from keystore. Iterating on keystore drafting.
parent
11a630066e
commit
82e81fc710
|
@ -19,13 +19,14 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
|||
import json
|
||||
import os
|
||||
import stat
|
||||
import time
|
||||
import string
|
||||
from json import JSONDecodeError
|
||||
from os.path import abspath
|
||||
from pathlib import Path
|
||||
from secrets import token_bytes
|
||||
from typing import Callable, ClassVar, Dict, List, Union, Optional, Tuple
|
||||
|
||||
import time
|
||||
from constant_sorrow.constants import KEYRING_LOCKED
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
||||
|
@ -33,7 +34,6 @@ from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
|||
from mnemonic.mnemonic import Mnemonic
|
||||
from nacl.exceptions import CryptoError
|
||||
from nacl.secret import SecretBox
|
||||
from umbral.keys import UmbralPrivateKey, UmbralKeyingMaterial
|
||||
|
||||
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
|
||||
from nucypher.crypto.constants import BLAKE2B
|
||||
|
@ -42,7 +42,7 @@ from nucypher.crypto.powers import (
|
|||
DecryptingPower,
|
||||
DerivedKeyBasedPower,
|
||||
KeyPairBasedPower,
|
||||
SigningPower
|
||||
SigningPower, CryptoPowerUp, DelegatingPower
|
||||
)
|
||||
from nucypher.crypto.tls import (
|
||||
_write_tls_certificate,
|
||||
|
@ -50,13 +50,10 @@ from nucypher.crypto.tls import (
|
|||
_TLS_CURVE
|
||||
)
|
||||
from nucypher.network.server import TLSHostingPower
|
||||
from nucypher.utilities.logging import Logger
|
||||
from umbral.keys import UmbralPrivateKey, UmbralKeyingMaterial
|
||||
|
||||
# HKDF
|
||||
__WRAPPING_KEY_LENGTH = 32
|
||||
__HKDF_HASH_ALGORITHM = BLAKE2B
|
||||
_SALT_SIZE = 32
|
||||
|
||||
__INFO_BASE = b'NuCypher/'
|
||||
_WRAPPING_INFO = __INFO_BASE + b'wrap'
|
||||
_VERIFYING_INFO = __INFO_BASE + b'verify'
|
||||
|
@ -64,15 +61,25 @@ _DECRYPTING_INFO = __INFO_BASE + b'encrypt'
|
|||
_DELEGATING_INFO = __INFO_BASE + b'delegate'
|
||||
_TLS_INFO = __INFO_BASE + b'tls'
|
||||
|
||||
# Mnemonic
|
||||
# Wrapping key
|
||||
_MINIMUM_PASSWORD_LENGTH = 8
|
||||
_SALT_SIZE = 32
|
||||
__WRAPPING_KEY_LENGTH = 32
|
||||
|
||||
# Mnemonic
|
||||
_ENTROPY_BITS = 256
|
||||
_MNEMONIC_LANGUAGE = "english"
|
||||
|
||||
# Keystore
|
||||
# Keystore Filename
|
||||
_ID_SIZE = 32
|
||||
_DEFAULT_PATH: Path = DEFAULT_CONFIG_ROOT / 'keystore'
|
||||
_DELIMITER = '-'
|
||||
_SUFFIX = 'priv'
|
||||
|
||||
# Keystore File
|
||||
FILE_ENCODING = 'utf-8'
|
||||
__PRIVATE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Write, Create, Non-Existing
|
||||
__PRIVATE_MODE = stat.S_IRUSR | stat.S_IWUSR # 0o600
|
||||
__PRIVATE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Write, Create, Non-Existing
|
||||
__PRIVATE_MODE = stat.S_IRUSR | stat.S_IWUSR # 0o600
|
||||
|
||||
|
||||
class InvalidPassword(ValueError):
|
||||
|
@ -109,7 +116,7 @@ def _derive_wrapping_key(password: str, salt: bytes) -> bytes:
|
|||
p=1,
|
||||
backend=default_backend()
|
||||
)
|
||||
derived_key = kdf.derive(password.encode())
|
||||
derived_key = kdf.derive(key_material=password.encode())
|
||||
return derived_key
|
||||
|
||||
|
||||
|
@ -213,26 +220,71 @@ def validate_keystore_password(password: str) -> List:
|
|||
return failures
|
||||
|
||||
|
||||
def unlock_required(func):
|
||||
"""Method decorator"""
|
||||
def wrapped(keyring=None, *args, **kwargs):
|
||||
if not keyring.is_unlocked:
|
||||
raise Keystore.Locked(f"{keyring.id} is locked. Unlock with .unlock")
|
||||
return func(keyring, *args, **kwargs)
|
||||
return wrapped
|
||||
def validate_keystore_filename(path: Path) -> None:
|
||||
base_name = path.name.rstrip('.' + _SUFFIX)
|
||||
parts = base_name.split(_DELIMITER)
|
||||
|
||||
try:
|
||||
created, keystore_id = parts
|
||||
except ValueError:
|
||||
raise Keystore.Invalid(f'{path} is not a valid keystore filename')
|
||||
|
||||
validators = (
|
||||
bool(len(keystore_id) == _ID_SIZE),
|
||||
all(char in string.hexdigits for char in keystore_id)
|
||||
)
|
||||
|
||||
valid_path = all(validators)
|
||||
if not valid_path:
|
||||
raise Keystore.Invalid(f'{path} is not a valid keystore filename')
|
||||
|
||||
|
||||
def _parse_path(path: Path) -> Tuple[int, str]:
|
||||
|
||||
# validate keystore file
|
||||
path = Path(path)
|
||||
if not path.exists():
|
||||
raise Keystore.NotFound(f"Keystore '{path}' does not exist.")
|
||||
if not path.is_file():
|
||||
raise ValueError('Keystore path must be a file.')
|
||||
if not path.match(f'*{_DELIMITER}*.{_SUFFIX}'):
|
||||
Keystore.Invalid(f'{path} is not a valid keystore filename')
|
||||
|
||||
# dissect keystore filename
|
||||
validate_keystore_filename(path)
|
||||
base_name = path.name.rstrip('.'+_SUFFIX)
|
||||
parts = base_name.split(_DELIMITER)
|
||||
created, keystore_id = parts
|
||||
return created, keystore_id
|
||||
|
||||
|
||||
def _derive_hosting_power(host: str,
|
||||
private_key: UmbralPrivateKey,
|
||||
keyring_dir: Path
|
||||
) -> TLSHostingPower:
|
||||
if not host:
|
||||
raise ValueError('Host is required to derive a TLSHostingPower')
|
||||
public_key = bytes(private_key.pubkey).hex()
|
||||
certificate_filepath = keyring_dir / f'{public_key}.pem'
|
||||
keypair = HostingKeypair(host=host,
|
||||
private_key=private_key,
|
||||
generate_certificate=False,
|
||||
certificate_filepath=str(certificate_filepath))
|
||||
power = TLSHostingPower(keypair=keypair, host=host)
|
||||
return power
|
||||
|
||||
|
||||
def generate_tls_certificate(host: str, pseudonym: str, path: Path) -> Path:
|
||||
private_key, cert = _generate_tls_keys(host=host, checksum_address=pseudonym, curve=_TLS_CURVE)
|
||||
certificate_filepath = _write_tls_certificate(full_filepath=path, certificate=cert)
|
||||
return certificate_filepath
|
||||
|
||||
|
||||
class Keystore:
|
||||
|
||||
log = Logger("keys")
|
||||
|
||||
_DEFAULT_PATH: Path = DEFAULT_CONFIG_ROOT / 'keystore'
|
||||
_ID_LENGTH = 32
|
||||
_DELIMITER = '-'
|
||||
_SUFFIX = 'priv'
|
||||
|
||||
__HKDF_INFO = {SigningPower: _VERIFYING_INFO,
|
||||
DecryptingPower: _DECRYPTING_INFO,
|
||||
DelegatingPower: _DELEGATING_INFO,
|
||||
TLSHostingPower: _TLS_INFO}
|
||||
|
||||
class Exists(FileExistsError):
|
||||
|
@ -252,55 +304,65 @@ class Keystore:
|
|||
|
||||
def __init__(self, keystore_path: Path):
|
||||
self.keystore_path = keystore_path
|
||||
created, stamp = self.__parse_path(keystore_path)
|
||||
self.__created = created
|
||||
self.__id = stamp
|
||||
self.__created, self.__id = _parse_path(keystore_path)
|
||||
self.__secret = KEYRING_LOCKED
|
||||
|
||||
def __del__(self):
|
||||
self.lock()
|
||||
|
||||
def __decrypt_keystore(self, path: Path, password: str) -> bool:
|
||||
payload = _read_keystore(path, deserializer=_deserialize_keystore)
|
||||
wrapping_key = _derive_wrapping_key(salt=payload['salt'], password=password)
|
||||
self.__secret = SecretBox(wrapping_key).decrypt(payload['key'])
|
||||
return True
|
||||
|
||||
def __parse_path(self, path: Path) -> Tuple[int, str]:
|
||||
path = Path(path)
|
||||
if not path.exists():
|
||||
raise Keystore.NotFound(f"Keystore '{path}' does not exist.")
|
||||
if not path.is_file():
|
||||
raise ValueError('Keystore path must be a file.')
|
||||
if not path.match(f'*{self._DELIMITER}*.{self._SUFFIX}'):
|
||||
Keystore.Invalid(f'{path} is not a valid keystore path')
|
||||
@staticmethod
|
||||
def __save(secret: bytes, password: str, keystore_dir: Optional[Path] = None) -> Path:
|
||||
failures = validate_keystore_password(password)
|
||||
if failures:
|
||||
# TODO: Ensure this scope is separable from the scope containing the password
|
||||
# to help avoid unintentional logging of the password.
|
||||
raise InvalidPassword(''.join(failures))
|
||||
|
||||
base_name = path.name.rstrip('.'+self._SUFFIX)
|
||||
try:
|
||||
created, stamp = base_name.split(self._DELIMITER)
|
||||
except ValueError:
|
||||
raise Keystore.Invalid(f'{path} is not a valid keystore path')
|
||||
# Derive verifying key (used as ID)
|
||||
verifying_key = _derive_umbral_key(material=secret, info=_VERIFYING_INFO)
|
||||
keystore_id = verifying_key.to_bytes().hex()[:_ID_SIZE]
|
||||
|
||||
return created, stamp
|
||||
# Generate paths
|
||||
keystore_dir = keystore_dir or _DEFAULT_PATH
|
||||
os.makedirs(abspath(keystore_dir), exist_ok=True, mode=0o700)
|
||||
keystore_path = generate_keystore_filepath(parent=keystore_dir, id=keystore_id)
|
||||
|
||||
def __derive_hosting_power(self,
|
||||
host: str,
|
||||
private_key: UmbralPrivateKey,
|
||||
keyring_dir: Path
|
||||
) -> TLSHostingPower:
|
||||
public_key = bytes(private_key.pubkey).hex()
|
||||
certificate_filepath = keyring_dir / f'{public_key}.pem'
|
||||
keypair = HostingKeypair(host=host,
|
||||
private_key=private_key,
|
||||
generate_certificate=False,
|
||||
certificate_filepath=str(certificate_filepath))
|
||||
power = TLSHostingPower(keypair=keypair, host=host)
|
||||
return power
|
||||
# Encrypt secret
|
||||
__salt = token_bytes(_SALT_SIZE)
|
||||
__wrapping_key = _derive_wrapping_key(salt=__salt, password=password)
|
||||
encrypted_secret = bytes(SecretBox(__wrapping_key).encrypt(secret))
|
||||
|
||||
# Create keystore file
|
||||
keystore_payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=__salt)
|
||||
_write_keystore(path=keystore_path, payload=keystore_payload, serializer=_serialize_keystore)
|
||||
|
||||
return keystore_path
|
||||
|
||||
#
|
||||
# Public API
|
||||
#
|
||||
|
||||
|
||||
@classmethod
|
||||
def restore(cls, words: str, password: str, keystore_dir: Optional[Path] = None) -> 'Keystore':
|
||||
__mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
|
||||
__secret = __mnemonic.to_entropy(words)
|
||||
path = Keystore.__save(secret=__secret, password=password, keystore_dir=keystore_dir)
|
||||
keystore = cls(keystore_path=path)
|
||||
return keystore
|
||||
|
||||
@classmethod
|
||||
def generate(cls, password: str, keystore_dir: Optional[Path] = None) -> 'Keystore':
|
||||
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
|
||||
__words = mnemonic.generate(strength=_ENTROPY_BITS)
|
||||
__secret = mnemonic.to_entropy(__words)
|
||||
path = Keystore.__save(secret=__secret, password=password, keystore_dir=keystore_dir)
|
||||
keystore = cls(keystore_path=path)
|
||||
return keystore
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self.__id
|
||||
|
@ -314,8 +376,6 @@ class Keystore:
|
|||
return self.is_unlocked
|
||||
|
||||
def unlock(self, password: str) -> bool:
|
||||
if self.is_unlocked:
|
||||
return self.is_unlocked
|
||||
try:
|
||||
self.__decrypt_keystore(path=self.keystore_path, password=password)
|
||||
except CryptoError:
|
||||
|
@ -323,77 +383,33 @@ class Keystore:
|
|||
raise self.AuthenticationFailed
|
||||
return self.is_unlocked
|
||||
|
||||
@unlock_required
|
||||
def derive_crypto_power(self,
|
||||
power_class: ClassVar,
|
||||
host: Optional[str] = None
|
||||
power_class: ClassVar[CryptoPowerUp],
|
||||
*power_args, **power_kwargs
|
||||
) -> Union[KeyPairBasedPower, DerivedKeyBasedPower]:
|
||||
|
||||
if not self.is_unlocked:
|
||||
raise Keystore.Locked(f"{self.id} is locked. Unlock with .unlock")
|
||||
try:
|
||||
info = self.__HKDF_INFO[power_class]
|
||||
except KeyError:
|
||||
failure_message = f"{power_class.__name__} is an invalid type for deriving a CryptoPower"
|
||||
raise TypeError(failure_message)
|
||||
else:
|
||||
__private = _derive_umbral_key(material=self.__secret, info=info)
|
||||
__private_key = _derive_umbral_key(material=self.__secret, info=info)
|
||||
|
||||
if power_class is TLSHostingPower: # TODO: something more elegant?
|
||||
if not host:
|
||||
raise ValueError('Host is required to derive a TLSHostingPower')
|
||||
power = self.__derive_hosting_power(host=host,
|
||||
private_key=__private,
|
||||
keyring_dir=self.keystore_dir)
|
||||
power = _derive_hosting_power(private_key=__private_key, *power_args, **power_kwargs)
|
||||
|
||||
elif issubclass(power_class, KeyPairBasedPower):
|
||||
keypair = power_class._keypair_class(__private)
|
||||
power = power_class(keypair=keypair)
|
||||
keypair = power_class._keypair_class(__private_key)
|
||||
power = power_class(keypair=keypair, *power_args, **power_kwargs)
|
||||
|
||||
elif issubclass(power_class, DerivedKeyBasedPower):
|
||||
keying_material = UmbralKeyingMaterial.from_bytes(key_bytes=__private.to_bytes())
|
||||
power = power_class(keying_material=keying_material)
|
||||
power = power_class(keying_material=__private_key.to_bytes())
|
||||
|
||||
else:
|
||||
failure_message = f"{power_class.__name__} is an invalid type for deriving a CryptoPower."
|
||||
raise ValueError(failure_message)
|
||||
|
||||
return power
|
||||
|
||||
@classmethod
|
||||
def generate(cls, password: str, keystore_dir: Path = None) -> 'Keystore':
|
||||
|
||||
keystore_dir = keystore_dir or cls._DEFAULT_PATH
|
||||
failures = validate_keystore_password(password)
|
||||
if failures:
|
||||
# TODO: Ensure this scope is separable from the scope containing the password
|
||||
raise InvalidPassword(", ".join(failures))
|
||||
|
||||
# Generate seed
|
||||
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
|
||||
words = mnemonic.generate(strength=_ENTROPY_BITS)
|
||||
secret = mnemonic.to_entropy(words)
|
||||
|
||||
# TODO: Interactive confirmation
|
||||
|
||||
# Derive verifying key (used as ID)
|
||||
verifying_key = _derive_umbral_key(material=secret, info=_VERIFYING_INFO)
|
||||
kid = verifying_key.to_bytes().hex()[:cls._ID_LENGTH]
|
||||
|
||||
# Wrap secret
|
||||
salt = token_bytes(_SALT_SIZE)
|
||||
wrapping_key = _derive_wrapping_key(salt=salt, password=password)
|
||||
encrypted_secret = bytes(SecretBox(wrapping_key).encrypt(secret))
|
||||
|
||||
# Create keystore file
|
||||
keystore_path = generate_keystore_filepath(parent=keystore_dir, id=kid)
|
||||
keystore_payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=salt)
|
||||
os.makedirs(abspath(keystore_dir), exist_ok=True, mode=0o700)
|
||||
_write_keystore(path=keystore_path,
|
||||
payload=keystore_payload,
|
||||
serializer=_serialize_keystore)
|
||||
|
||||
keystore = cls(keystore_path=keystore_path)
|
||||
return keystore
|
||||
|
||||
def generate_tls_certificate(self, host: str, pseudonym: str, path: Path) -> Path:
|
||||
private_key, cert = _generate_tls_keys(host=host, checksum_address=pseudonym, curve=_TLS_CURVE)
|
||||
certificate_filepath = _write_tls_certificate(full_filepath=path, certificate=cert)
|
||||
return certificate_filepath
|
||||
|
|
|
@ -19,6 +19,7 @@ from collections import defaultdict
|
|||
|
||||
import lmdb
|
||||
import pytest
|
||||
from eth_utils.crypto import keccak
|
||||
|
||||
from nucypher.characters.control.emitters import WebEmitter
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
|
@ -57,13 +58,10 @@ def __very_pretty_and_insecure_scrypt_do_not_use(request):
|
|||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
||||
original_derivation_function = Scrypt.derive
|
||||
|
||||
# One-Time Insecure Password
|
||||
insecure_password = bytes(INSECURE_DEVELOPMENT_PASSWORD, encoding='utf8')
|
||||
|
||||
# Patch Method
|
||||
def __insecure_derive(*args, **kwargs):
|
||||
def __insecure_derive(_scrypt, key_material: bytes):
|
||||
"""Temporarily replaces Scrypt.derive for mocking"""
|
||||
return insecure_password
|
||||
return keccak(key_material)
|
||||
|
||||
# Disable Scrypt KDF
|
||||
Scrypt.derive = __insecure_derive
|
||||
|
|
|
@ -15,12 +15,25 @@
|
|||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from constant_sorrow.constants import KEYRING_LOCKED
|
||||
from cryptography.hazmat.primitives.serialization.base import Encoding
|
||||
from mnemonic.mnemonic import Mnemonic
|
||||
from umbral.keys import UmbralKeyingMaterial
|
||||
|
||||
from nucypher.crypto.keystore import Keystore, InvalidPassword
|
||||
from nucypher.crypto.keystore import (
|
||||
Keystore,
|
||||
InvalidPassword,
|
||||
validate_keystore_filename,
|
||||
_ID_SIZE,
|
||||
_MNEMONIC_LANGUAGE, _derive_umbral_key, _DELEGATING_INFO
|
||||
)
|
||||
from nucypher.crypto.keystore import (
|
||||
_assemble_keystore,
|
||||
_serialize_keystore,
|
||||
|
@ -28,68 +41,84 @@ from nucypher.crypto.keystore import (
|
|||
_write_keystore,
|
||||
_read_keystore
|
||||
)
|
||||
from nucypher.crypto.powers import DecryptingPower
|
||||
from nucypher.crypto.powers import DecryptingPower, SigningPower, DelegatingPower
|
||||
from nucypher.network.server import TLSHostingPower
|
||||
from nucypher.utilities.networking import LOOPBACK_ADDRESS
|
||||
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
|
||||
|
||||
def test_invalid_keystore_path(tmp_path):
|
||||
path = Path()
|
||||
def test_invalid_keystore_path_parts(tmp_path, tmp_path_factory):
|
||||
|
||||
# Setup
|
||||
not_hex = 'h' + ''.join(random.choice(string.ascii_letters) for _ in range(_ID_SIZE))
|
||||
invalid_paths = (
|
||||
'nosuffix', # missing suffix
|
||||
'deadbeef.priv', # missing created epoch
|
||||
f'123-{not_hex[:3]}.priv', # too short
|
||||
f'123-{not_hex}.priv', # not hex
|
||||
)
|
||||
|
||||
# Test
|
||||
for invalid_path in invalid_paths:
|
||||
invalid_path = Path(invalid_path)
|
||||
with pytest.raises(Keystore.Invalid, match=f'{invalid_path} is not a valid keystore filename'):
|
||||
validate_keystore_filename(path=invalid_path)
|
||||
|
||||
|
||||
def test_invalid_keystore_file_type(tmp_path, tmp_path_factory):
|
||||
|
||||
# Not a file
|
||||
invalid_path = Path()
|
||||
with pytest.raises(ValueError, match="Keystore path must be a file."):
|
||||
_keystore = Keystore(path)
|
||||
|
||||
path = Path(tmp_path)
|
||||
_keystore = Keystore(invalid_path)
|
||||
invalid_path = Path(tmp_path)
|
||||
with pytest.raises(ValueError, match="Keystore path must be a file."):
|
||||
_keystore = Keystore(path)
|
||||
_keystore = Keystore(invalid_path)
|
||||
|
||||
path = Path('does-not-exist')
|
||||
with pytest.raises(Keystore.NotFound, match=f"Keystore '{str(path)}' does not exist."):
|
||||
_keystore = Keystore(path)
|
||||
# Not an existing file
|
||||
invalid_path = Path('does-not-exist')
|
||||
with pytest.raises(Keystore.NotFound, match=f"Keystore '{str(invalid_path)}' does not exist."):
|
||||
_keystore = Keystore(invalid_path)
|
||||
|
||||
|
||||
def test_keystore_defaults(tmp_path_factory):
|
||||
def test_keystore_instantiation_defaults(tmp_path_factory):
|
||||
|
||||
# Setup
|
||||
parent = Path(tmp_path_factory.mktemp('test-keystore-'))
|
||||
parent.touch(exist_ok=True)
|
||||
path = parent / '123-deadbeef.priv'
|
||||
keystore_id = ''.join(random.choice(string.hexdigits.lower()) for _ in range(_ID_SIZE))
|
||||
path = parent / f'123-{keystore_id}.priv'
|
||||
path.touch()
|
||||
|
||||
# Test
|
||||
keystore = Keystore(path)
|
||||
assert keystore.keystore_path == path
|
||||
assert keystore.id == 'deadbeef'
|
||||
assert not keystore.is_unlocked
|
||||
assert keystore.keystore_path == path # retains the correct keystore path
|
||||
assert keystore.id == keystore_id # accurately parses filename for ID
|
||||
assert not keystore.is_unlocked # defaults to locked
|
||||
assert keystore._Keystore__secret is KEYRING_LOCKED
|
||||
assert parent in keystore.keystore_path.parents # created in the correct directory
|
||||
|
||||
|
||||
def test_keystore_generation_defaults(tmp_path_factory):
|
||||
|
||||
# Setup
|
||||
parent = Path(tmp_path_factory.mktemp('test-keystore-'))
|
||||
parent.touch(exist_ok=True)
|
||||
|
||||
# Test
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=parent)
|
||||
assert not keystore.is_unlocked # defaults to locked
|
||||
assert keystore._Keystore__secret is KEYRING_LOCKED
|
||||
assert parent in keystore.keystore_path.parents # created in the correct directory
|
||||
|
||||
|
||||
def test_keyring_invalid_password(tmpdir):
|
||||
with pytest.raises(InvalidPassword):
|
||||
_keystore = Keystore.generate(keystore_dir=tmpdir, password='short')
|
||||
|
||||
|
||||
def test_keyring_lock_unlock(tmpdir):
|
||||
keystore = Keystore.generate(keystore_dir=tmpdir, password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
|
||||
# locked by default
|
||||
assert not keystore.is_unlocked
|
||||
assert keystore._Keystore__secret is KEYRING_LOCKED
|
||||
|
||||
# unlock
|
||||
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore.is_unlocked
|
||||
assert keystore._Keystore__secret != KEYRING_LOCKED
|
||||
assert isinstance(keystore._Keystore__secret, bytes)
|
||||
|
||||
# unlock when already unlocked
|
||||
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore.is_unlocked
|
||||
|
||||
# lock
|
||||
keystore.lock()
|
||||
assert not keystore.is_unlocked
|
||||
|
||||
# lock when already locked
|
||||
keystore.lock()
|
||||
assert not keystore.is_unlocked
|
||||
_keystore = Keystore.generate('short', keystore_dir=tmpdir)
|
||||
|
||||
|
||||
def test_keyring_derive_crypto_power_without_unlock(tmpdir):
|
||||
keystore = Keystore.generate(keystore_dir=tmpdir, password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
with pytest.raises(Keystore.Locked):
|
||||
keystore.derive_crypto_power(power_class=DecryptingPower)
|
||||
|
||||
|
@ -103,7 +132,41 @@ def test_keystore_serializer():
|
|||
assert deserialized_key_data['salt'] == salt
|
||||
|
||||
|
||||
def test_write_read_private_keyfile(temp_dir_path):
|
||||
def test_keyring_lock_unlock(tmpdir):
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
|
||||
# locked by default
|
||||
assert not keystore.is_unlocked
|
||||
assert keystore._Keystore__secret is KEYRING_LOCKED
|
||||
|
||||
# incorrect password
|
||||
with pytest.raises(Keystore.AuthenticationFailed):
|
||||
keystore.unlock('opensaysme')
|
||||
|
||||
# unlock
|
||||
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore.is_unlocked
|
||||
assert keystore._Keystore__secret != KEYRING_LOCKED
|
||||
assert isinstance(keystore._Keystore__secret, bytes)
|
||||
|
||||
# unlock when already unlocked
|
||||
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore.is_unlocked
|
||||
|
||||
# incorrect password when already unlocked
|
||||
with pytest.raises(Keystore.AuthenticationFailed):
|
||||
keystore.unlock('opensaysme')
|
||||
|
||||
# lock
|
||||
keystore.lock()
|
||||
assert not keystore.is_unlocked
|
||||
|
||||
# lock when already locked
|
||||
keystore.lock()
|
||||
assert not keystore.is_unlocked
|
||||
|
||||
|
||||
def test_write_keystore_file(temp_dir_path):
|
||||
temp_filepath = Path(temp_dir_path) / "test_private_key_serialization_file"
|
||||
encrypted_secret, salt = b'peanuts! Get your peanuts!', b'sea salt'
|
||||
payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=salt)
|
||||
|
@ -113,60 +176,96 @@ def test_write_read_private_keyfile(temp_dir_path):
|
|||
assert deserialized_payload_from_file['salt'] == salt
|
||||
|
||||
|
||||
#
|
||||
# def test_keyring_restoration(tmpdir):
|
||||
# keyring = _generate_keyring(tmpdir)
|
||||
# keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
#
|
||||
# account = keyring.account
|
||||
# checksum_address = keyring.checksum_address
|
||||
# certificate_filepath = keyring.certificate_filepath
|
||||
# encrypting_public_key_hex = keyring.encrypting_public_key.hex()
|
||||
# signing_public_key_hex = keyring.signing_public_key.hex()
|
||||
#
|
||||
# # tls power
|
||||
# tls_hosting_power = keyring.derive_crypto_power(power_class=TLSHostingPower, host=LOOPBACK_ADDRESS)
|
||||
# tls_hosting_power_public_key_numbers = tls_hosting_power.public_key().public_numbers()
|
||||
# tls_hosting_power_certificate_public_bytes = \
|
||||
# tls_hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM)
|
||||
# tls_hosting_power_certificate_filepath = tls_hosting_power.keypair.certificate_filepath
|
||||
#
|
||||
# # decrypting power
|
||||
# decrypting_power = keyring.derive_crypto_power(power_class=DecryptingPower)
|
||||
# decrypting_power_public_key_hex = decrypting_power.public_key().hex()
|
||||
# decrypting_power_fingerprint = decrypting_power.keypair.fingerprint()
|
||||
#
|
||||
# # signing power
|
||||
# signing_power = keyring.derive_crypto_power(power_class=SigningPower)
|
||||
# signing_power_public_key_hex = signing_power.public_key().hex()
|
||||
# signing_power_fingerprint = signing_power.keypair.fingerprint()
|
||||
#
|
||||
# # get rid of object, but not persistent data
|
||||
# del keyring
|
||||
#
|
||||
# restored_keyring = Keystore(keyring_root=tmpdir, account=account)
|
||||
# restored_keyring.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
#
|
||||
# assert restored_keyring.account == account
|
||||
# assert restored_keyring.checksum_address == checksum_address
|
||||
# assert restored_keyring.certificate_filepath == certificate_filepath
|
||||
# assert restored_keyring.encrypting_public_key.hex() == encrypting_public_key_hex
|
||||
# assert restored_keyring.signing_public_key.hex() == signing_public_key_hex
|
||||
#
|
||||
# # tls power
|
||||
# restored_tls_hosting_power = restored_keyring.derive_crypto_power(power_class=TLSHostingPower,
|
||||
# host=LOOPBACK_ADDRESS)
|
||||
# assert restored_tls_hosting_power.public_key().public_numbers() == tls_hosting_power_public_key_numbers
|
||||
# assert restored_tls_hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM) == \
|
||||
# tls_hosting_power_certificate_public_bytes
|
||||
# assert restored_tls_hosting_power.keypair.certificate_filepath == tls_hosting_power_certificate_filepath
|
||||
#
|
||||
# # decrypting power
|
||||
# restored_decrypting_power = restored_keyring.derive_crypto_power(power_class=DecryptingPower)
|
||||
# assert restored_decrypting_power.public_key().hex() == decrypting_power_public_key_hex
|
||||
# assert restored_decrypting_power.keypair.fingerprint() == decrypting_power_fingerprint
|
||||
#
|
||||
# # signing power
|
||||
# restored_signing_power = restored_keyring.derive_crypto_power(power_class=SigningPower)
|
||||
# assert restored_signing_power.public_key().hex() == signing_power_public_key_hex
|
||||
# assert restored_signing_power.keypair.fingerprint() == signing_power_fingerprint
|
||||
def test_decrypt_keystore(tmpdir, mocker):
|
||||
|
||||
# Setup
|
||||
spy = mocker.spy(Mnemonic, 'generate')
|
||||
|
||||
# Decrypt post-generation
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
|
||||
words = spy.spy_return
|
||||
secret = mnemonic.to_entropy(words)
|
||||
assert keystore._Keystore__secret == secret
|
||||
|
||||
# Decrypt from keystore file
|
||||
keystore_path = keystore.keystore_path
|
||||
del words
|
||||
del keystore
|
||||
keystore = Keystore(keystore_path=keystore_path)
|
||||
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore._Keystore__secret == secret
|
||||
|
||||
|
||||
def test_keystore_persistence(tmpdir):
|
||||
"""Regression test for keystore file persistence"""
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
path = keystore.keystore_path
|
||||
del keystore
|
||||
assert path.exists()
|
||||
|
||||
|
||||
def test_restore_keystore_from_mnemonic(tmpdir, mocker):
|
||||
|
||||
# Setup
|
||||
spy = mocker.spy(Mnemonic, 'generate')
|
||||
|
||||
# Decrypt post-generation
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
|
||||
words = spy.spy_return
|
||||
secret = mnemonic.to_entropy(words)
|
||||
keystore_path = keystore.keystore_path
|
||||
|
||||
# remove local and disk references, simulating a
|
||||
# lost keystore or forgotten password.
|
||||
del keystore
|
||||
os.remove(keystore_path)
|
||||
|
||||
# prove the keystore is lost or missing
|
||||
assert not keystore_path.exists()
|
||||
with pytest.raises(Keystore.NotFound):
|
||||
_keystore = Keystore(keystore_path=keystore_path)
|
||||
|
||||
# Restore with user-supplied words and a new password
|
||||
keystore = Keystore.restore(words=words, password='ANewHope')
|
||||
keystore.unlock(password='ANewHope')
|
||||
assert keystore._Keystore__secret == secret
|
||||
|
||||
|
||||
def test_derive_signing_power(tmpdir):
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
signing_power = keystore.derive_crypto_power(power_class=SigningPower)
|
||||
assert signing_power.public_key().hex()
|
||||
assert signing_power.keypair.fingerprint()
|
||||
|
||||
|
||||
def test_derive_decrypting_power(tmpdir):
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
decrypting_power = keystore.derive_crypto_power(power_class=DecryptingPower)
|
||||
assert decrypting_power.public_key().hex()
|
||||
assert decrypting_power.keypair.fingerprint()
|
||||
|
||||
|
||||
def test_derive_delegating_power(tmpdir):
|
||||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
delegating_power = keystore.derive_crypto_power(power_class=DelegatingPower)
|
||||
private_key = _derive_umbral_key(info=_DELEGATING_INFO, material=keystore._Keystore__secret)
|
||||
keying_material = UmbralKeyingMaterial.from_bytes(private_key.to_bytes()).to_bytes()
|
||||
assert delegating_power._DelegatingPower__umbral_keying_material.to_bytes() == keying_material
|
||||
assert delegating_power._get_privkey_from_label(label=b'some-label')
|
||||
|
||||
|
||||
# def test_derive_hosting_power(tmpdir):
|
||||
# keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
# keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
# hosting_power = keystore.derive_crypto_power(power_class=TLSHostingPower, host=LOOPBACK_ADDRESS)
|
||||
# hosting_power_public_key_numbers = hosting_power.public_key().public_numbers()
|
||||
# hosting_power_certificate_public_bytes = hosting_power.keypair.certificate.public_bytes(encoding=Encoding.PEM)
|
||||
# tls_hosting_power_certificate_filepath = hosting_power.keypair.certificate_filepath
|
||||
|
|
Loading…
Reference in New Issue