Merge pull request #3538 from derekpierre/public-keys

Key Recovery and Auditing Tools
pull/3554/head
KPrasch 2024-08-20 11:41:37 +02:00 committed by derekpierre
commit d207b534bf
No known key found for this signature in database
12 changed files with 977 additions and 51 deletions

View File

@ -6,7 +6,7 @@ repos:
- id: tests
name: Run Nucypher Unit Tests
entry: scripts/hooks/run_unit_tests.sh
entry: scripts/run_unit_tests.sh
language: system
types: [python]
stages: [push] # required additional setup: pre-commit install && pre-commit install -t pre-push

View File

@ -0,0 +1 @@
Expands recovery CLI to include audit and keystore identification features

View File

@ -11,7 +11,6 @@ from nucypher.cli.literature import (
GENERIC_PASSWORD_PROMPT,
PASSWORD_COLLECTION_NOTICE,
)
from nucypher.config.base import CharacterConfiguration
from nucypher.config.constants import NUCYPHER_ENVVAR_KEYSTORE_PASSWORD
from nucypher.crypto.keystore import _WORD_COUNT, Keystore
from nucypher.utilities.emitters import StdoutEmitter
@ -36,7 +35,7 @@ def get_client_password(checksum_address: str, envvar: str = None, confirm: bool
return client_password
def unlock_signer_account(config: CharacterConfiguration, json_ipc: bool) -> None:
def unlock_signer_account(config, json_ipc: bool) -> None:
# TODO: Remove this block after deprecating 'operator_address'
from nucypher.config.characters import UrsulaConfiguration
@ -67,7 +66,11 @@ def get_nucypher_password(emitter, confirm: bool = False, envvar=NUCYPHER_ENVVAR
return keystore_password
def unlock_nucypher_keystore(emitter: StdoutEmitter, password: str, character_configuration: CharacterConfiguration) -> bool:
def unlock_nucypher_keystore(
emitter: StdoutEmitter,
password: str,
character_configuration,
) -> bool:
"""Unlocks a nucypher keystore and attaches it to the supplied configuration if successful."""
emitter.message(DECRYPTING_CHARACTER_KEYSTORE.format(name=character_configuration.NAME.capitalize()), color='yellow')
@ -80,15 +83,27 @@ def unlock_nucypher_keystore(emitter: StdoutEmitter, password: str, character_co
return True
def recover_keystore(emitter) -> None:
def collect_mnemonic(emitter: StdoutEmitter) -> str:
"""Collect nucypher mnemonic seed words interactively."""
while True:
__words = click.prompt("Enter nucypher keystore seed words")
word_count = len(__words.split())
if word_count != _WORD_COUNT:
emitter.message(
f"Invalid mnemonic - Number of words must be {str(_WORD_COUNT)}, but only got {word_count}"
)
continue
break
return __words
def recover_keystore(emitter) -> Keystore:
emitter.message('This procedure will recover your nucypher keystore from mnemonic seed words. '
'You will need to provide the entire mnemonic (space seperated) in the correct '
'order and choose a new password.', color='cyan')
click.confirm('Do you want to continue', abort=True)
__words = click.prompt("Enter nucypher keystore seed words")
word_count = len(__words.split())
if word_count != _WORD_COUNT:
emitter.message(f'Invalid mnemonic - Number of words must be {str(_WORD_COUNT)}, but only got {word_count}')
__words = collect_mnemonic(emitter)
__password = get_nucypher_password(emitter=emitter, confirm=True)
keystore = Keystore.restore(words=__words, password=__password)
emitter.message(f'Recovered nucypher keystore {keystore.id} to \n {keystore.keystore_path}', color='green')
return keystore

View File

@ -187,3 +187,14 @@ def perform_startup_ip_check(emitter: StdoutEmitter, ursula: Ursula, force: bool
raise click.Abort()
else:
emitter.message('✓ External IP matches configuration', 'green')
def update_config_keystore_path(keystore_path: Path, config_file: Path = None) -> None:
"""Update the ursula.json configuration file to use the provided keystore path."""
keystore_path = str(keystore_path.resolve())
with open(config_file, "r+") as f:
ursula_config = json.load(f)
ursula_config["keystore_path"] = keystore_path
f.seek(0)
json.dump(ursula_config, f, indent=2)
f.truncate() # rest of the file truncated

View File

@ -3,6 +3,7 @@ from pathlib import Path
import click
from nucypher.cli.actions.auth import (
collect_mnemonic,
get_client_password,
get_nucypher_password,
recover_keystore,
@ -13,6 +14,7 @@ from nucypher.cli.actions.configure import (
get_or_update_configuration,
handle_missing_configuration_file,
perform_startup_ip_check,
update_config_keystore_path,
)
from nucypher.cli.actions.migrate import migrate
from nucypher.cli.actions.select import (
@ -49,14 +51,22 @@ from nucypher.cli.options import (
option_teacher_uri,
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS, NETWORK_PORT, OPERATOR_IP
from nucypher.cli.types import (
EIP55_CHECKSUM_ADDRESS,
EXISTING_READABLE_FILE,
NETWORK_PORT,
OPERATOR_IP,
)
from nucypher.cli.utils import make_cli_character, setup_emitter
from nucypher.config.characters import UrsulaConfiguration
from nucypher.config.constants import (
DEFAULT_CONFIG_FILEPATH,
NUCYPHER_ENVVAR_OPERATOR_ETH_PASSWORD,
TEMPORARY_DOMAIN_NAME,
)
from nucypher.crypto.keystore import Keystore
from nucypher.crypto.powers import RitualisticPower
from nucypher.utilities.emitters import StdoutEmitter
from nucypher.utilities.prometheus.metrics import PrometheusMetricsConfig
@ -154,7 +164,7 @@ class UrsulaConfigOptions:
# TODO: Exit codes (not only for this, but for other exceptions)
return click.get_current_context().exit(1)
def generate_config(self, emitter, config_root, force, key_material):
def generate_config(self, emitter, config_root, force, key_material, with_mnemonic):
if self.dev:
raise RuntimeError(
@ -183,6 +193,7 @@ class UrsulaConfigOptions:
return UrsulaConfiguration.generate(
password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
with_mnemonic=with_mnemonic,
config_root=config_root,
rest_host=self.rest_host,
rest_port=self.rest_port,
@ -313,7 +324,14 @@ def ursula():
@option_config_root
@group_general_config
@option_key_material
def init(general_config, config_options, force, config_root, key_material):
@click.option(
"--with-mnemonic",
help="Initialize with a mnemonic phrase instead of generating a new keypair from scratch",
is_flag=True,
)
def init(
general_config, config_options, force, config_root, key_material, with_mnemonic
):
"""Create a new Ursula node configuration."""
emitter = setup_emitter(general_config, config_options.operator_address)
_pre_launch_warnings(emitter, dev=None, force=force)
@ -339,16 +357,14 @@ def init(general_config, config_options, force, config_root, key_material):
)
return click.get_current_context().exit(1)
click.clear()
emitter.echo(
"Hello Operator, welcome on board :-) \n\n"
"NOTE: Initializing a new Ursula node configuration is a one-time operation\n"
"for the lifetime of your node. This is a two-step process:\n\n"
"1. Creating a password to encrypt your operator keys\n"
"2. Securing a taco node seed phase\n\n"
"Please follow the prompts.",
color="cyan",
)
if key_material and with_mnemonic:
raise click.BadOptionUsage(
"--key-material",
message=click.style(
"--key-material is incompatible with --with-mnemonic",
fg="red",
),
)
if not config_options.eth_endpoint:
raise click.BadOptionUsage(
@ -365,13 +381,39 @@ def init(general_config, config_options, force, config_root, key_material):
fg="red",
),
)
click.clear()
if with_mnemonic:
emitter.echo(
"Hello Operator, welcome :-) \n\n"
"You are about to initialize a new Ursula node configuration using an existing mnemonic phrase.\n"
"Have your mnemonic phrase ready and ensure you are in a secure environment.\n"
"Please follow the prompts.",
color="cyan",
)
else:
emitter.echo(
"Hello Operator, welcome on board :-) \n\n"
"NOTE: Initializing a new Ursula node configuration is a one-time operation\n"
"for the lifetime of your node. This is a two-step process:\n\n"
"1. Creating a password to encrypt your operator keys\n"
"2. Securing a taco node seed phase\n\n"
"Please follow the prompts.",
color="cyan",
)
if not config_options.domain:
config_options.domain = select_domain(
emitter,
message="Select TACo Domain",
)
ursula_config = config_options.generate_config(
emitter=emitter, config_root=config_root, force=force, key_material=key_material
emitter=emitter,
config_root=config_root,
force=force,
key_material=key_material,
with_mnemonic=with_mnemonic,
)
filepath = ursula_config.to_configuration_file()
paint_new_installation_help(
@ -380,13 +422,107 @@ def init(general_config, config_options, force, config_root, key_material):
@ursula.command()
@group_config_options
@group_general_config
def recover(general_config, config_options):
# TODO: Combine with work in PR #2682
# TODO: Integrate regeneration of configuration files
emitter = setup_emitter(general_config, config_options.operator_address)
recover_keystore(emitter=emitter)
@option_config_file
@click.option(
"--keystore-filepath",
help="Path to keystore .priv file",
type=EXISTING_READABLE_FILE,
required=False,
)
@click.option(
"--view-mnemonic",
help="View mnemonic seed words",
is_flag=True,
)
def audit(config_file, keystore_filepath, view_mnemonic):
"""Audit a mnemonic phrase against a local keystore or view mnemonic seed words."""
emitter = StdoutEmitter()
if keystore_filepath and config_file:
raise click.BadOptionUsage(
"--keystore-filepath",
message=click.style(
"--keystore-filepath is incompatible with --config-file",
fg="red",
),
)
if keystore_filepath:
keystore = Keystore(keystore_filepath)
else:
config_file = config_file or DEFAULT_CONFIG_FILEPATH
if not config_file.exists():
emitter.error(
f"Ursula configuration file not found - {config_file.resolve()}"
)
raise click.Abort()
ursula_config = UrsulaConfiguration.from_configuration_file(
filepath=config_file
)
keystore = ursula_config.keystore
password = get_nucypher_password(emitter=emitter, confirm=False)
try:
keystore.unlock(password=password)
except Keystore.AuthenticationFailed:
emitter.error("Password is incorrect.")
raise click.Abort()
emitter.message("Password is correct.", color="green")
if view_mnemonic:
mnemonic = keystore.get_mnemonic()
emitter.message(f"\n{mnemonic}", color="cyan")
return
try:
correct = keystore.audit(words=collect_mnemonic(emitter), password=password)
except Keystore.InvalidMnemonic:
correct = False
if not correct:
emitter.message("Mnemonic is incorrect.", color="red")
raise click.Abort()
emitter.message("Mnemonic is correct.", color="green")
@ursula.command()
@option_config_file
@click.option(
"--keystore-filepath",
help="Path to keystore .priv file Ursula should use",
type=EXISTING_READABLE_FILE,
required=False,
)
def recover(config_file, keystore_filepath):
emitter = StdoutEmitter()
config_file = config_file or DEFAULT_CONFIG_FILEPATH
if not config_file.exists():
emitter.error(f"Ursula configuration file not found - {config_file.resolve()}")
raise click.Abort()
if keystore_filepath:
# use available file
keystore = Keystore(keystore_filepath)
# ensure that the password for the keystore file is known
password = get_nucypher_password(emitter=emitter, confirm=False)
try:
keystore.unlock(password=password)
except Keystore.AuthenticationFailed:
emitter.error("Password is incorrect.")
raise click.Abort()
else:
# recovery keystore using user-provided mnemonic
keystore = recover_keystore(emitter=emitter)
update_config_keystore_path(
keystore_path=keystore.keystore_path, config_file=config_file
)
emitter.message(
f"Updated {config_file} to use keystore filepath: {keystore.keystore_path.resolve()}",
color="green",
)
@ursula.command()
@ -402,6 +538,50 @@ def destroy(general_config, config_options, config_file, force):
destroy_configuration(emitter, character_config=ursula_config, force=force)
@ursula.command()
@option_config_file
@click.option(
"--keystore-filepath",
help="Path to keystore .priv file",
type=EXISTING_READABLE_FILE,
)
@click.option(
"--from-mnemonic",
help="View TACo public keys from mnemonic seed words",
is_flag=True,
)
def public_keys(config_file, keystore_filepath, from_mnemonic):
"""Display the public keys of a keystore."""
emitter = StdoutEmitter()
if sum(1 for i in (keystore_filepath, config_file, from_mnemonic) if i) > 1:
raise click.BadOptionUsage(
"--keystore-filepath",
message=click.style(
"Exactly one of --keystore-filepath, --config-file, or --from-mnemonic must be specified",
fg="red",
),
)
if from_mnemonic:
keystore = Keystore.from_mnemonic(collect_mnemonic(emitter))
else:
keystore_path_to_use = keystore_filepath
if not keystore_path_to_use:
config_file = config_file or DEFAULT_CONFIG_FILEPATH
ursula_config = UrsulaConfiguration.from_configuration_file(
filepath=config_file
)
keystore_path_to_use = ursula_config.keystore.keystore_path
keystore = Keystore(keystore_path_to_use)
keystore.unlock(get_nucypher_password(emitter=emitter, confirm=False))
ritualistic_power = keystore.derive_crypto_power(RitualisticPower)
ferveo_public_key = bytes(ritualistic_power.public_key()).hex()
emitter.message(f"\nFerveo Public Key: {ferveo_public_key}", color="cyan")
@ursula.command()
@group_character_options
@option_config_file
@ -508,7 +688,7 @@ def config(general_config, config_options, config_file, force, action):
emitter.error(
"--config-file <FILEPATH> is required to run a configuration file migration."
)
return click.Abort()
raise click.Abort()
config_file = select_config_file(
emitter=emitter,
checksum_address=config_options.operator_address,

View File

@ -48,9 +48,7 @@ Path to Logs: {USER_LOG_DIR}
- Never share secret keys with anyone!
- Backup your keystore! Character keys are required to interact with the protocol!
- Remember your password! Without the password, it's impossible to decrypt the key!
"""
- Remember your password! Without the password, it's impossible to decrypt the key!"""
)
if character_name == 'ursula':

View File

@ -24,6 +24,7 @@ from nucypher.blockchain.eth.registry import (
)
from nucypher.blockchain.eth.signers import Signer
from nucypher.characters.lawful import Ursula
from nucypher.cli.actions.auth import collect_mnemonic
from nucypher.config import constants
from nucypher.config.util import cast_paths_from
from nucypher.crypto.keystore import Keystore
@ -583,18 +584,29 @@ class CharacterConfiguration(BaseConfiguration):
Warning: This method allows mutation and may result in an inconsistent configuration.
"""
# config file should exist and we we override -> no need for modifier
# config file should exist and we override -> no need for modifier
return super().update(filepath=self.config_file_location, **kwargs)
@classmethod
def generate(
cls, password: str, key_material: Optional[bytes] = None, *args, **kwargs
cls,
password: str,
key_material: Optional[bytes] = None,
with_mnemonic: bool = False,
*args,
**kwargs,
):
"""
Generates local directories, private keys, and initial configuration for a new node.
"""
"""Generates local directories, private keys, and initial configuration for a new node."""
node_config = cls(dev_mode=False, *args, **kwargs)
node_config.initialize(key_material=key_material, password=password)
if key_material and with_mnemonic:
raise ValueError(
"Cannot provide key_material and with_mnemonic simultaneously"
)
node_config.initialize(
key_material=key_material,
with_mnemonic=with_mnemonic,
password=password,
)
node_config.keystore.unlock(password)
return node_config
@ -789,9 +801,19 @@ class CharacterConfiguration(BaseConfiguration):
power_ups.append(power_up)
return power_ups
def initialize(self, password: str, key_material: Optional[bytes] = None) -> Path:
def initialize(
self,
password: str,
key_material: Optional[bytes] = None,
with_mnemonic: bool = False,
) -> Path:
"""Initialize a new configuration and write installation files to disk."""
if key_material and with_mnemonic:
raise ValueError(
"Cannot provide key_material and with_mnemonic simultaneously"
)
# Development
if self.dev_mode:
self.__temp_dir = TemporaryDirectory(
@ -806,6 +828,7 @@ class CharacterConfiguration(BaseConfiguration):
key_material=key_material,
password=password,
interactive=self.MNEMONIC_KEYSTORE,
with_mnemonic=with_mnemonic,
)
self._cache_runtime_filepaths()
@ -824,13 +847,26 @@ class CharacterConfiguration(BaseConfiguration):
password: str,
key_material: Optional[bytes] = None,
interactive: bool = True,
with_mnemonic: bool = False,
) -> Keystore:
if key_material and with_mnemonic:
raise ValueError(
"Cannot provide key_material and with_mnemonic simultaneously"
)
if key_material:
self.__keystore = Keystore.import_secure(
key_material=key_material,
password=password,
keystore_dir=self.keystore_dir,
)
elif with_mnemonic:
self.__keystore = Keystore.restore(
password=password,
keystore_dir=self.keystore_dir,
words=collect_mnemonic(self.emitter),
)
else:
if interactive:
self.__keystore = Keystore.generate(

View File

@ -24,6 +24,7 @@ BASE_DIR = NUCYPHER_PACKAGE.parent.resolve()
# User Application Filepaths
APP_DIR = AppDirs(nucypher.__title__, nucypher.__author__)
DEFAULT_CONFIG_ROOT = Path(os.getenv('NUCYPHER_CONFIG_ROOT', default=APP_DIR.user_data_dir))
DEFAULT_CONFIG_FILEPATH = DEFAULT_CONFIG_ROOT / "ursula.json"
USER_LOG_DIR = Path(os.getenv('NUCYPHER_USER_LOG_DIR', default=APP_DIR.user_log_dir))
DEFAULT_LOG_FILENAME = "nucypher.log"
DEFAULT_JSON_LOG_FILENAME = "nucypher.json"

View File

@ -7,7 +7,7 @@ from json import JSONDecodeError
from os.path import abspath
from pathlib import Path
from secrets import token_bytes
from typing import Callable, ClassVar, Dict, List, Optional, Tuple, Union
from typing import Callable, Dict, List, Optional, Tuple, Type, Union
import click
from constant_sorrow.constants import KEYSTORE_LOCKED
@ -253,12 +253,34 @@ class Keystore:
class AuthenticationFailed(RuntimeError):
pass
def __init__(self, keystore_path: Path):
self.keystore_path = keystore_path
self.__created, self.__id = _parse_path(keystore_path)
class InvalidMnemonic(ValueError):
pass
def __init__(self, keystore_path: Path = None):
self.__secret = KEYSTORE_LOCKED
self.keystore_path = keystore_path
if self.keystore_path:
self.__created, self.__id = _parse_path(keystore_path)
@classmethod
def from_keystore_id(cls, filepath: Path) -> "Keystore":
instance = cls(keystore_path=filepath)
return instance
@classmethod
def from_mnemonic(cls, words: str) -> "Keystore":
__mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
__secret = bytes(__mnemonic.to_entropy(words))
keystore = cls()
keystore.__secret = __secret
return keystore
def __decrypt_keystore(self, path: Path, password: str) -> bool:
if not self.keystore_path:
raise Keystore.Invalid(
"Keystore path not set, initialize with a valid path."
)
payload = _read_keystore(path, deserializer=_deserialize_keystore)
__password_material = derive_key_material_from_password(password=password.encode(),
salt=payload['password_salt'])
@ -346,12 +368,32 @@ class Keystore:
@classmethod
def restore(cls, words: str, password: str, keystore_dir: Optional[Path] = None) -> 'Keystore':
"""Restore a keystore from seed words"""
__mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
__secret = bytes(__mnemonic.to_entropy(words))
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
__secret = bytes(mnemonic.to_entropy(words))
path = Keystore.__save(secret=__secret, password=password, keystore_dir=keystore_dir)
keystore = cls(keystore_path=path)
return keystore
def audit(self, words: str, password: str) -> bool:
"""Check if a mnemonic phrase can derive the secret key for the local keystore"""
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
try:
__expected_secret = bytes(mnemonic.to_entropy(words))
except ValueError as e:
raise self.InvalidMnemonic(str(e))
self.__decrypt_keystore(path=self.keystore_path, password=password)
return self.__secret == __expected_secret
def get_mnemonic(self) -> str:
"""Return the mnemonic phrase for the keystore"""
if self.__secret is KEYSTORE_LOCKED:
raise Keystore.Locked(
f"{self.id} is locked and must be unlocked before use."
)
mnemonic = Mnemonic(_MNEMONIC_LANGUAGE)
__words = mnemonic.to_mnemonic(self.__secret)
return __words
@classmethod
def generate(
cls, password: str,
@ -439,10 +481,9 @@ class Keystore:
def unlock(self, password: str) -> None:
self.__decrypt_keystore(path=self.keystore_path, password=password)
def derive_crypto_power(self,
power_class: ClassVar[CryptoPowerUp],
*power_args, **power_kwargs
) -> Union[KeyPairBasedPower, DerivedKeyBasedPower]:
def derive_crypto_power(
self, power_class: Type[CryptoPowerUp], *power_args, **power_kwargs
) -> Union[KeyPairBasedPower, DerivedKeyBasedPower]:
if not self.is_unlocked:
raise Keystore.Locked(f"{self.id} is locked and must be unlocked before use.")

View File

@ -0,0 +1,259 @@
import json
from mnemonic import Mnemonic
from nucypher.cli.main import nucypher_cli
from nucypher.crypto.keystore import Keystore
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
def setup_ursula_config_file(custom_filepath, ursula_test_config, config_filename):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
# update keystore path in config file
ursula_config_file = custom_filepath / config_filename
config_values = json.loads(ursula_test_config.serialize())
config_values["keystore_path"] = str(keystore.keystore_path.resolve())
with open(ursula_config_file, "w") as f:
json.dump(config_values, f)
return keystore, ursula_config_file
def test_ursula_audit_config_file_incorrect_password(
click_runner, ursula_test_config, custom_filepath
):
keystore, ursula_config_file = setup_ursula_config_file(
custom_filepath, ursula_test_config, "ursula-audit-incorrect-password.json"
)
audit_args = (
"ursula",
"audit",
"--config-file",
str(ursula_config_file.resolve()),
)
user_input = "yadda,yadda,yadda\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code != 0, result.output
assert "Password is incorrect" in result.output
assert "Mnemonic" not in result.output
def test_ursula_audit_config_file_incorrect_mnemonic(
click_runner, ursula_test_config, custom_filepath
):
keystore, ursula_config_file = setup_ursula_config_file(
custom_filepath, ursula_test_config, "ursula-audit-incorrect-mnemonic.json"
)
mnemonic = keystore.get_mnemonic()
audit_args = (
"ursula",
"audit",
"--config-file",
str(ursula_config_file.resolve()),
)
incorrect_mnemonic = Mnemonic("english").generate(256)
assert incorrect_mnemonic != mnemonic
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n" + f"{incorrect_mnemonic}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code != 0, result.output
assert "Password is correct" in result.output
assert "Mnemonic is incorrect" in result.output
def test_ursula_audit_specific_config_file(
click_runner,
custom_filepath,
ursula_test_config,
):
keystore, ursula_config_file = setup_ursula_config_file(
custom_filepath, ursula_test_config, "ursula-audit.json"
)
mnemonic = keystore.get_mnemonic()
# specific config file
audit_args = (
"ursula",
"audit",
"--config-file",
str(ursula_config_file.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n" + f"{mnemonic}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert "Password is correct" in result.output
assert "Mnemonic is correct" in result.output
def test_ursula_audit_default_config_file(
click_runner, mocker, ursula_test_config, custom_filepath
):
keystore, ursula_config_file = setup_ursula_config_file(
custom_filepath, ursula_test_config, "ursula-audit-default.json"
)
mocker.patch(
"nucypher.cli.commands.ursula.DEFAULT_CONFIG_FILEPATH", ursula_config_file
)
mnemonic = keystore.get_mnemonic()
# default config file
audit_args = (
"ursula",
"audit",
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n" + f"{mnemonic}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert "Password is correct" in result.output
assert "Mnemonic is correct" in result.output
def test_ursula_audit_view_mnemonic_config_file(
click_runner, ursula_test_config, custom_filepath
):
keystore, ursula_config_file = setup_ursula_config_file(
custom_filepath, ursula_test_config, "ursula-audit-view-mnemonic.json"
)
mnemonic = keystore.get_mnemonic()
# view mnemonic
audit_args = (
"ursula",
"audit",
"--config-file",
str(ursula_config_file.resolve()),
"--view-mnemonic",
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert "Password is correct" in result.output
assert mnemonic in result.output
def test_ursula_audit_keystore_file_incorrect_password(click_runner, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
audit_args = (
"ursula",
"audit",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
user_input = "yadda,yadda,yadda\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code != 0, result.output
assert "Password is incorrect" in result.output
assert "Mnemonic" not in result.output
def test_ursula_audit_keystore_file_incorrect_mnemonic(click_runner, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = keystore.get_mnemonic()
audit_args = (
"ursula",
"audit",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
incorrect_mnemonic = Mnemonic("english").generate(256)
assert incorrect_mnemonic != mnemonic
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n" + f"{incorrect_mnemonic}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code != 0, result.output
assert "Password is correct" in result.output
assert "Mnemonic is incorrect" in result.output
def test_ursula_audit_keystore_file(click_runner, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = keystore.get_mnemonic()
audit_args = (
"ursula",
"audit",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n" + f"{mnemonic}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert "Password is correct" in result.output
assert "Mnemonic is correct" in result.output
def test_ursula_audit_view_mnemonic_keystore_file(click_runner, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = keystore.get_mnemonic()
audit_args = (
"ursula",
"audit",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
"--view-mnemonic",
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, audit_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert "Password is correct" in result.output
assert mnemonic in result.output

View File

@ -0,0 +1,152 @@
import json
from nucypher.cli.main import nucypher_cli
from nucypher.crypto.keystore import Keystore
from nucypher.crypto.powers import RitualisticPower
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
def test_ursula_public_keys_invalid(click_runner, ursula_test_config, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
ursula_config_file = custom_filepath / "ursula-test.json"
ursula_test_config._write_configuration_file(filepath=ursula_config_file)
expected_error = "Exactly one of --keystore-filepath, --config-file, or --from-mnemonic must be specified"
# config-file and keystore-filepath
public_keys_args = (
"ursula",
"public-keys",
"--config-file",
str(ursula_config_file.resolve()),
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
result = click_runner.invoke(nucypher_cli, public_keys_args, catch_exceptions=False)
assert result.exit_code != 0
assert expected_error in result.output
# config-file and from-mnemonic
public_keys_args = (
"ursula",
"public-keys",
"--config-file",
str(ursula_config_file.resolve()),
"--from-mnemonic",
)
result = click_runner.invoke(nucypher_cli, public_keys_args, catch_exceptions=False)
assert result.exit_code != 0
assert expected_error in result.output
# keystore-filepath and from-mnemonic
public_keys_args = (
"ursula",
"public-keys",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
"--from-mnemonic",
)
result = click_runner.invoke(nucypher_cli, public_keys_args, catch_exceptions=False)
assert result.exit_code != 0
assert expected_error in result.output
# all 3 values - config-file, keystore-filepath, from-mnemonic
public_keys_args = (
"ursula",
"public-keys",
"--config-file",
str(ursula_config_file.resolve()),
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
"--from-mnemonic",
)
result = click_runner.invoke(nucypher_cli, public_keys_args, catch_exceptions=False)
assert result.exit_code != 0
assert expected_error in result.output
def test_ursula_public_keys_derived_ferveo_key(
click_runner, mocker, ursula_test_config, custom_filepath
):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
power = keystore.derive_crypto_power(RitualisticPower)
expected_public_key = power.public_key()
expected_public_key_hex = bytes(expected_public_key).hex()
public_keys_args = (
"ursula",
"public-keys",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, public_keys_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert expected_public_key_hex in result.output
# using config file produces same key
config_values = json.loads(ursula_test_config.serialize())
config_values["keystore_path"] = str(keystore.keystore_path.resolve())
updated_config_file = custom_filepath / "updated-ursula.json"
with open(updated_config_file, "w") as f:
json.dump(config_values, f)
public_keys_args = (
"ursula",
"public-keys",
"--config-file",
str(updated_config_file.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, public_keys_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert expected_public_key_hex in result.output
# using default config filepath - no values provided
mocker.patch(
"nucypher.cli.commands.ursula.DEFAULT_CONFIG_FILEPATH", updated_config_file
)
public_keys_args = (
"ursula",
"public-keys",
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, public_keys_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert expected_public_key_hex in result.output
# using mnemonic produces same key
words = keystore.get_mnemonic()
public_keys_args = (
"ursula",
"public-keys",
"--from-mnemonic",
)
user_input = f"{words}\n"
result = click_runner.invoke(
nucypher_cli, public_keys_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0, result.output
assert expected_public_key_hex in result.output

View File

@ -0,0 +1,232 @@
from pathlib import Path
from nucypher.cli.main import nucypher_cli
from nucypher.config.characters import UrsulaConfiguration
from nucypher.crypto.keystore import Keystore
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD, YES_ENTER
def test_ursula_recover_invalid(click_runner, ursula_test_config, custom_filepath):
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
ursula_config_file = custom_filepath / "ursula-invalid-test.json"
# no config-file - so default attempted
recover_args = (
"ursula",
"recover",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
result = click_runner.invoke(nucypher_cli, recover_args, catch_exceptions=False)
assert result.exit_code != 0
assert "Ursula configuration file not found" in result.output
# config-file does not exist
recover_args = (
"ursula",
"recover",
"--config-file",
str(ursula_config_file.resolve()),
)
result = click_runner.invoke(nucypher_cli, recover_args, catch_exceptions=False)
assert result.exit_code != 0
assert f"'{ursula_config_file.resolve()}' does not exist" in result.output
# create config file
ursula_test_config._write_configuration_file(filepath=ursula_config_file)
# keystore-filepath does not exist
non_existent_keystore_filepath = custom_filepath / "non_existent.priv"
recover_args = (
"ursula",
"recover",
"--config-file",
str(ursula_config_file.resolve()),
"--keystore-filepath",
str(non_existent_keystore_filepath.resolve()),
)
result = click_runner.invoke(nucypher_cli, recover_args, catch_exceptions=False)
assert result.exit_code != 0
assert (
f"'{non_existent_keystore_filepath.resolve()}' does not exist" in result.output
)
def test_ursula_recover_keystore_file(
click_runner, mocker, ursula_test_config, custom_filepath
):
#
# use specific config file
#
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
ursula_config_file = custom_filepath / "ursula-keystore-test.json"
ursula_test_config._write_configuration_file(filepath=ursula_config_file)
recover_args = (
"ursula",
"recover",
"--config-file",
str(ursula_config_file.resolve()),
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, recover_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0
assert str(keystore.keystore_path.resolve()) in result.output
updated_ursula_config = UrsulaConfiguration._read_configuration_file(
ursula_config_file
)
assert str(updated_ursula_config["keystore_path"].resolve()) == str(
keystore.keystore_path.resolve()
)
assert str(updated_ursula_config["keystore_path"].resolve()) in result.output
#
# use default config file
#
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
ursula_default_file = custom_filepath / "ursula-keystore-default.json"
ursula_test_config._write_configuration_file(filepath=ursula_default_file)
mocker.patch(
"nucypher.cli.commands.ursula.DEFAULT_CONFIG_FILEPATH", ursula_default_file
)
recover_args = (
"ursula",
"recover",
"--keystore-filepath",
str(keystore.keystore_path.resolve()),
)
user_input = f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
result = click_runner.invoke(
nucypher_cli, recover_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0
updated_default_ursula_config = UrsulaConfiguration._read_configuration_file(
ursula_default_file
)
assert (
str(updated_default_ursula_config["keystore_path"].resolve()) in result.output
)
assert str(updated_default_ursula_config["keystore_path"].resolve()) == str(
keystore.keystore_path.resolve()
)
def test_ursula_recover_mnemonic(
click_runner, mocker, ursula_test_config, custom_filepath
):
mocker.patch(
"nucypher.crypto.keystore.Keystore._DEFAULT_DIR", custom_filepath / "keystore"
)
#
# use specific config file
#
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = keystore.get_mnemonic()
ursula_config_file = custom_filepath / "ursula-mnemonic-test.json"
ursula_test_config._write_configuration_file(filepath=ursula_config_file)
# could be None
old_keystore_path = (
UrsulaConfiguration._read_configuration_file(ursula_config_file)[
"keystore_path"
]
or Path()
)
recover_args = (
"ursula",
"recover",
"--config-file",
str(ursula_config_file.resolve()),
)
user_input = (
YES_ENTER
+ f"{mnemonic}\n"
+ f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
+ f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
)
result = click_runner.invoke(
nucypher_cli, recover_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0
updated_ursula_config = UrsulaConfiguration._read_configuration_file(
ursula_config_file
)
assert str(updated_ursula_config["keystore_path"].resolve()) in result.output
assert str(updated_ursula_config["keystore_path"].resolve()) != str(
old_keystore_path.resolve()
)
#
# use default config file
#
keystore = Keystore.generate(
INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=custom_filepath
)
keystore.unlock(INSECURE_DEVELOPMENT_PASSWORD)
mnemonic = keystore.get_mnemonic()
ursula_default_file = custom_filepath / "ursula-mnemonic-default.json"
mocker.patch(
"nucypher.cli.commands.ursula.DEFAULT_CONFIG_FILEPATH", ursula_default_file
)
ursula_test_config._write_configuration_file(filepath=ursula_default_file)
# could be None
old_default_keystore_path = (
UrsulaConfiguration._read_configuration_file(ursula_default_file)[
"keystore_path"
]
or Path()
)
recover_args = (
"ursula",
"recover",
)
user_input = (
YES_ENTER
+ f"{mnemonic}\n"
+ f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
+ f"{INSECURE_DEVELOPMENT_PASSWORD}\n"
)
result = click_runner.invoke(
nucypher_cli, recover_args, input=user_input, catch_exceptions=False
)
assert result.exit_code == 0
updated_default_ursula_config = UrsulaConfiguration._read_configuration_file(
ursula_default_file
)
assert (
str(updated_default_ursula_config["keystore_path"].resolve()) in result.output
)
assert str(updated_default_ursula_config["keystore_path"].resolve()) != str(
old_default_keystore_path.resolve()
)