Removes Alice, Bob, and Card CLI

pull/2985/head
Kieran Prasch 2022-10-18 20:05:32 +02:00
parent 723cc00afc
commit ec07eee6d2
11 changed files with 0 additions and 2328 deletions

View File

@ -1,562 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import click
from nucypher.control.emitters import StdoutEmitter
from nucypher.characters.control.interfaces import AliceInterface
from nucypher.cli.actions.auth import get_nucypher_password
from nucypher.cli.actions.collect import collect_bob_public_keys, collect_policy_parameters
from nucypher.cli.actions.configure import (
destroy_configuration,
handle_missing_configuration_file,
get_or_update_configuration
)
from nucypher.cli.actions.confirm import confirm_staged_grant
from nucypher.cli.actions.select import select_client_account, select_config_file
from nucypher.cli.actions.validate import validate_grant_command
from nucypher.cli.commands.deploy import option_gas_strategy
from nucypher.cli.config import group_general_config
from nucypher.cli.options import (
group_options,
option_config_file,
option_config_root,
option_controller_port,
option_dev,
option_discovery_port,
option_dry_run,
option_federated_only,
option_force,
option_hw_wallet,
option_light,
option_middleware,
option_min_stake,
option_network,
option_shares,
option_poa,
option_eth_provider_uri,
option_registry_filepath,
option_signer_uri,
option_teacher_uri,
option_threshold,
option_lonely,
option_max_gas_price,
option_key_material, option_payment_method, option_payment_network, option_payment_provider
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.painting.policies import paint_single_card
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS
from nucypher.cli.utils import make_cli_character, setup_emitter
from nucypher.config.characters import AliceConfiguration
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.crypto.keystore import Keystore
from nucypher.network.middleware import RestMiddleware
from nucypher.policy.identity import Card
option_pay_with = click.option('--pay-with', help="Run with a specified account", type=EIP55_CHECKSUM_ADDRESS)
option_duration = click.option('--duration', help="Policy payment periods", type=click.INT)
class AliceConfigOptions:
__option_name__ = 'config_options'
def __init__(self,
dev: bool,
network: str,
eth_provider_uri: str,
federated_only: bool,
discovery_port: int,
pay_with: str,
registry_filepath: Path,
middleware: RestMiddleware,
gas_strategy: str,
max_gas_price: int, # gwei
signer_uri: str,
lonely: bool,
payment_method: str,
payment_provider: str,
payment_network: str
):
self.dev = dev
self.domain = network
self.eth_provider_uri = eth_provider_uri
self.signer_uri = signer_uri
self.gas_strategy = gas_strategy
self.max_gas_price = max_gas_price
self.federated_only = federated_only
self.pay_with = pay_with
self.discovery_port = discovery_port
self.registry_filepath = registry_filepath
self.middleware = middleware
self.lonely = lonely
self.payment_method = payment_method
self.payment_provider = payment_provider
self.payment_network = payment_network
def create_config(self, emitter, config_file):
if self.dev:
# Can be None as well, meaning it is unset - no error in this case
if self.federated_only is False:
raise click.BadOptionUsage(
option_name="--federated-only",
message=click.style("--federated-only cannot be explicitly set to False when --dev is set", fg="red"))
return AliceConfiguration(
emitter=emitter,
dev_mode=True,
network_middleware=self.middleware,
domain=TEMPORARY_DOMAIN,
eth_provider_uri=self.eth_provider_uri,
signer_uri=self.signer_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
federated_only=True,
lonely=self.lonely,
payment_method=self.payment_method,
payment_provider=self.payment_provider,
payment_network=self.payment_network
)
else:
if not config_file:
config_file = select_config_file(emitter=emitter,
checksum_address=self.pay_with,
config_class=AliceConfiguration)
try:
return AliceConfiguration.from_configuration_file(
emitter=emitter,
dev_mode=False,
network_middleware=self.middleware,
domain=self.domain,
eth_provider_uri=self.eth_provider_uri,
signer_uri=self.signer_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
filepath=config_file,
rest_port=self.discovery_port,
checksum_address=self.pay_with,
registry_filepath=self.registry_filepath,
lonely=self.lonely,
payment_method=self.payment_method,
payment_provider=self.payment_provider,
payment_network=self.payment_network
)
except FileNotFoundError:
return handle_missing_configuration_file(
character_config_class=AliceConfiguration,
config_file=config_file
)
group_config_options = group_options(
AliceConfigOptions,
dev=option_dev,
network=option_network(),
eth_provider_uri=option_eth_provider_uri(),
signer_uri=option_signer_uri,
gas_strategy=option_gas_strategy,
max_gas_price=option_max_gas_price,
federated_only=option_federated_only,
discovery_port=option_discovery_port(),
pay_with=option_pay_with,
registry_filepath=option_registry_filepath,
middleware=option_middleware,
lonely=option_lonely,
payment_provider=option_payment_provider,
payment_network=option_payment_network,
payment_method=option_payment_method,
)
class AliceFullConfigOptions:
__option_name__ = 'full_config_options'
def __init__(self, config_options, poa: bool, light: bool, threshold: int, shares: int, duration: int):
self.config_options = config_options
self.poa = poa
self.light = light
self.threshold = threshold
self.shares = shares
self.duration = duration
def generate_config(self, emitter: StdoutEmitter, config_root: Path, key_material: str) -> AliceConfiguration:
opts = self.config_options
if opts.dev:
raise click.BadArgumentUsage("Cannot create a persistent development character")
if not opts.eth_provider_uri and not opts.federated_only:
raise click.BadOptionUsage(
option_name='--eth-provider',
message=click.style("--eth-provider is required to create a new decentralized alice.", fg="red"))
pay_with = opts.pay_with
if not pay_with and not opts.federated_only:
pay_with = select_client_account(emitter=emitter,
eth_provider_uri=opts.eth_provider_uri,
signer_uri=opts.signer_uri,
show_eth_balance=True,
network=opts.domain)
return AliceConfiguration.generate(
password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
config_root=config_root,
checksum_address=pay_with,
domain=opts.domain,
federated_only=opts.federated_only,
eth_provider_uri=opts.eth_provider_uri,
signer_uri=opts.signer_uri,
registry_filepath=opts.registry_filepath,
poa=self.poa,
light=self.light,
threshold=self.threshold,
shares=self.shares,
duration=self.duration,
payment_provider=opts.payment_provider,
payment_network=opts.payment_network,
payment_method=opts.payment_method,
)
def get_updates(self) -> dict:
opts = self.config_options
payload = dict(checksum_address=opts.pay_with,
domain=opts.domain,
federated_only=opts.federated_only,
eth_provider_uri=opts.eth_provider_uri,
signer_uri=opts.signer_uri,
registry_filepath=opts.registry_filepath,
poa=self.poa,
light=self.light,
threshold=self.threshold,
shares=self.shares,
duration=self.duration,
payment_provider=opts.payment_provider,
payment_network=opts.payment_network,
payment_method=opts.payment_method,
)
# Depends on defaults being set on Configuration classes, filtrates None values
updates = {k: v for k, v in payload.items() if v is not None}
return updates
group_full_config_options = group_options(
AliceFullConfigOptions,
config_options=group_config_options,
poa=option_poa,
light=option_light,
threshold=option_threshold,
shares=option_shares,
duration=option_duration
)
class AliceCharacterOptions:
__option_name__ = 'character_options'
def __init__(self, config_options: AliceConfigOptions, hw_wallet: bool, teacher_uri: str, min_stake: int):
self.config_options = config_options
self.hw_wallet = hw_wallet
self.teacher_uri = teacher_uri
self.min_stake = min_stake
def create_character(self, emitter, config_file, json_ipc, load_seednodes=True):
config = self.config_options.create_config(emitter, config_file)
try:
ALICE = make_cli_character(character_config=config,
emitter=emitter,
unlock_keystore=not config.dev_mode,
unlock_signer=not config.federated_only,
teacher_uri=self.teacher_uri,
min_stake=self.min_stake,
start_learning_now=load_seednodes,
lonely=self.config_options.lonely,
json_ipc=json_ipc)
return ALICE
except Keystore.AuthenticationFailed as e:
emitter.echo(str(e), color='red', bold=True)
click.get_current_context().exit(1)
group_character_options = group_options(
AliceCharacterOptions,
config_options=group_config_options,
hw_wallet=option_hw_wallet,
teacher_uri=option_teacher_uri,
min_stake=option_min_stake,
)
@click.group()
def alice():
""""Alice the Policy Authority" management commands."""
@alice.command()
@group_full_config_options
@option_config_root
@group_general_config
@option_key_material
def init(general_config, full_config_options, config_root, key_material):
"""Create a brand new persistent Alice."""
emitter = setup_emitter(general_config)
if not config_root:
config_root = general_config.config_root
new_alice_config = full_config_options.generate_config(emitter=emitter,
config_root=config_root,
key_material=key_material)
filepath = new_alice_config.to_configuration_file()
paint_new_installation_help(emitter, new_configuration=new_alice_config, filepath=filepath)
@alice.command()
@option_config_file
@group_general_config
@group_full_config_options
def config(general_config, config_file, full_config_options):
"""View and optionally update existing Alice's configuration."""
emitter = setup_emitter(general_config)
if not config_file:
config_file = select_config_file(emitter=emitter,
checksum_address=full_config_options.config_options.pay_with,
config_class=AliceConfiguration)
updates = full_config_options.get_updates()
get_or_update_configuration(emitter=emitter,
config_class=AliceConfiguration,
filepath=config_file,
updates=updates)
@alice.command()
@group_config_options
@option_config_file
@option_force
@group_general_config
def destroy(general_config, config_options, config_file, force):
"""Delete existing Alice's configuration."""
emitter = setup_emitter(general_config)
alice_config = config_options.create_config(emitter, config_file)
destroy_configuration(emitter, character_config=alice_config, force=force)
@alice.command()
@option_config_file
@option_controller_port(default=AliceConfiguration.DEFAULT_CONTROLLER_PORT)
@option_dry_run
@group_general_config
@group_character_options
def run(general_config, character_options, config_file, controller_port, dry_run):
"""Start Alice's web controller."""
# Setup
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter, config_file, general_config.json_ipc)
try:
# RPC
if general_config.json_ipc:
rpc_controller = ALICE.make_rpc_controller()
_transport = rpc_controller.make_control_transport()
rpc_controller.start()
return
# HTTP
else:
emitter.message(f"Alice Verifying Key {bytes(ALICE.stamp).hex()}", color="green", bold=True)
controller = ALICE.make_web_controller(crash_on_error=general_config.debug)
ALICE.log.info('Starting HTTP Character Web Controller')
emitter.message(f'Running HTTP Alice Controller at http://localhost:{controller_port}')
return controller.start(port=controller_port, dry_run=dry_run)
# Handle Crash
except Exception as e:
ALICE.log.critical(str(e))
emitter.message(f"{e.__class__.__name__} {e}", color='red', bold=True)
if general_config.debug:
raise # Crash :-(
@alice.command("public-keys")
@AliceInterface.connect_cli('public_keys')
@group_character_options
@option_config_file
@group_general_config
def public_keys(general_config, character_options, config_file):
"""Obtain Alice's public verification and encryption keys."""
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter, config_file, general_config.json_ipc, load_seednodes=False)
response = ALICE.controller.public_keys()
return response
@alice.command()
@group_character_options
@option_config_file
@group_general_config
@click.option('--nickname', help="Human-readable nickname / alias for a card", type=click.STRING, required=False)
def make_card(general_config, character_options, config_file, nickname):
"""Create a character card file for public key sharing"""
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter, config_file,
json_ipc=general_config.json_ipc,
load_seednodes=False)
card = Card.from_character(ALICE)
if nickname:
card.nickname = nickname
card.save(overwrite=True)
emitter.message(f"Saved new character card to {card.filepath}", color='green')
paint_single_card(card=card, emitter=emitter)
@alice.command('derive-policy-pubkey')
@AliceInterface.connect_cli('derive_policy_encrypting_key')
@group_character_options
@option_config_file
@group_general_config
def derive_policy_pubkey(general_config, label, character_options, config_file):
"""Get a policy public key from a policy label."""
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter,
config_file,
json_ipc=general_config.json_ipc,
load_seednodes=False)
return ALICE.controller.derive_policy_encrypting_key(label=label)
@alice.command()
@AliceInterface.connect_cli('grant')
@option_config_file
@group_general_config
@group_character_options
@option_force
@click.option('--bob', type=click.STRING, help="The card id or nickname of a stored Bob card.")
def grant(general_config,
bob,
bob_encrypting_key,
bob_verifying_key,
label,
value,
rate,
expiration,
threshold,
shares,
character_options,
config_file,
force):
"""Create and enact an access policy for Bob."""
# Setup
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(
emitter=emitter,
config_file=config_file,
json_ipc=general_config.json_ipc
)
validate_grant_command(
emitter=emitter,
alice=ALICE,
force=force,
bob=bob,
label=label,
rate=rate,
value=value,
expiration=expiration,
bob_encrypting_key=bob_encrypting_key,
bob_verifying_key=bob_verifying_key
)
# Collect
bob_public_keys = collect_bob_public_keys(
emitter=emitter,
force=force,
card_identifier=bob,
bob_encrypting_key=bob_encrypting_key,
bob_verifying_key=bob_verifying_key
)
policy = collect_policy_parameters(
emitter=emitter,
alice=ALICE,
force=force,
bob_identifier=bob_public_keys.verifying_key[:8],
label=label,
threshold=threshold,
shares=shares,
rate=rate,
value=value,
expiration=expiration
)
grant_request = {
'bob_encrypting_key': bob_public_keys.encrypting_key,
'bob_verifying_key': bob_public_keys.verifying_key,
'label': policy.label,
'threshold': policy.threshold,
'shares': policy.shares,
'expiration': policy.expiration,
}
if not ALICE.federated_only:
# These values can be 0
if policy.value is not None:
grant_request['value'] = policy.value
elif policy.rate is not None:
grant_request['rate'] = policy.rate # in wei
# Grant
if not force and not general_config.json_ipc:
confirm_staged_grant(emitter=emitter,
grant_request=grant_request,
federated=ALICE.federated_only,
seconds_per_period=(None if ALICE.federated_only else ALICE.economics.seconds_per_period))
emitter.echo(f'Granting Access to {bob_public_keys.verifying_key[:8]}', color='yellow')
return ALICE.controller.grant(request=grant_request)
@alice.command()
@AliceInterface.connect_cli('revoke')
@group_character_options
@option_config_file
@group_general_config
def revoke(general_config, bob_verifying_key, label, character_options, config_file):
"""Revoke a policy."""
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter, config_file, general_config.json_ipc)
revoke_request = {'label': label, 'bob_verifying_key': bob_verifying_key}
return ALICE.controller.revoke(request=revoke_request)
@alice.command()
@AliceInterface.connect_cli('decrypt')
@group_character_options
@option_config_file
@group_general_config
def decrypt(general_config, label, message_kit, character_options, config_file):
"""Decrypt data encrypted under an Alice's policy public key."""
emitter = setup_emitter(general_config)
ALICE = character_options.create_character(emitter, config_file, general_config.json_ipc, load_seednodes=False)
request_data = {'label': label, 'message_kit': message_kit}
response = ALICE.controller.decrypt(request=request_data)
return response

View File

@ -1,420 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from base64 import b64decode
from pathlib import Path
import click
from nucypher.control.emitters import StdoutEmitter
from nucypher.characters.control.interfaces import BobInterface
from nucypher.characters.lawful import Alice
from nucypher.cli.actions.auth import get_nucypher_password
from nucypher.cli.actions.configure import (
destroy_configuration,
get_or_update_configuration,
handle_missing_configuration_file
)
from nucypher.cli.actions.select import select_client_account, select_config_file
from nucypher.cli.commands.deploy import option_gas_strategy
from nucypher.cli.config import group_general_config
from nucypher.cli.options import (
group_options,
option_alice_verifying_key,
option_checksum_address,
option_config_file,
option_config_root,
option_controller_port,
option_dev,
option_discovery_port,
option_dry_run,
option_federated_only,
option_force,
option_middleware,
option_min_stake,
option_network,
option_eth_provider_uri,
option_registry_filepath,
option_signer_uri,
option_teacher_uri,
option_lonely,
option_max_gas_price,
option_key_material
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.painting.policies import paint_single_card
from nucypher.cli.utils import make_cli_character, setup_emitter
from nucypher.config.characters import BobConfiguration
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.crypto.powers import DecryptingPower
from nucypher.network.middleware import RestMiddleware
from nucypher.policy.identity import Card
class BobConfigOptions:
__option_name__ = 'config_options'
def __init__(self,
eth_provider_uri: str,
network: str,
registry_filepath: Path,
checksum_address: str,
discovery_port: int,
dev: bool,
middleware: RestMiddleware,
federated_only: bool,
gas_strategy: str,
max_gas_price: int,
signer_uri: str,
lonely: bool
):
self.eth_provider_uri = eth_provider_uri
self.signer_uri = signer_uri
self.gas_strategy = gas_strategy
self.max_gas_price = max_gas_price
self.domain = network
self.registry_filepath = registry_filepath
self.checksum_address = checksum_address
self.discovery_port = discovery_port
self.dev = dev
self.middleware = middleware
self.federated_only = federated_only
self.lonely = lonely
def create_config(self, emitter: StdoutEmitter, config_file: Path) -> BobConfiguration:
if self.dev:
return BobConfiguration(
emitter=emitter,
dev_mode=True,
domain=TEMPORARY_DOMAIN,
eth_provider_uri=self.eth_provider_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
signer_uri=self.signer_uri,
federated_only=True,
checksum_address=self.checksum_address,
network_middleware=self.middleware,
lonely=self.lonely
)
else:
if not config_file:
config_file = select_config_file(emitter=emitter,
checksum_address=self.checksum_address,
config_class=BobConfiguration)
try:
return BobConfiguration.from_configuration_file(
emitter=emitter,
filepath=config_file,
domain=self.domain,
checksum_address=self.checksum_address,
rest_port=self.discovery_port,
eth_provider_uri=self.eth_provider_uri,
signer_uri=self.signer_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
registry_filepath=self.registry_filepath,
network_middleware=self.middleware,
lonely=self.lonely
)
except FileNotFoundError:
handle_missing_configuration_file(character_config_class=BobConfiguration,
config_file=config_file)
def generate_config(self, emitter: StdoutEmitter, config_root: Path, key_material: str) -> BobConfiguration:
checksum_address = self.checksum_address
if not checksum_address and not self.federated_only:
checksum_address = select_client_account(emitter=emitter,
signer_uri=self.signer_uri,
eth_provider_uri=self.eth_provider_uri) # TODO: See #1888
return BobConfiguration.generate(
password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
config_root=config_root,
checksum_address=checksum_address,
domain=self.domain,
federated_only=self.federated_only,
registry_filepath=self.registry_filepath,
eth_provider_uri=self.eth_provider_uri,
signer_uri=self.signer_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
lonely=self.lonely
)
def get_updates(self) -> dict:
payload = dict(checksum_address=self.checksum_address,
domain=self.domain,
federated_only=self.federated_only,
registry_filepath=self.registry_filepath,
eth_provider_uri=self.eth_provider_uri,
signer_uri=self.signer_uri,
gas_strategy=self.gas_strategy,
max_gas_price=self.max_gas_price,
lonely=self.lonely
)
# Depends on defaults being set on Configuration classes, filtrates None values
updates = {k: v for k, v in payload.items() if v is not None}
return updates
group_config_options = group_options(
BobConfigOptions,
eth_provider_uri=option_eth_provider_uri(),
gas_strategy=option_gas_strategy,
max_gas_price=option_max_gas_price,
signer_uri=option_signer_uri,
network=option_network(),
registry_filepath=option_registry_filepath,
checksum_address=option_checksum_address,
discovery_port=option_discovery_port(),
dev=option_dev,
middleware=option_middleware,
federated_only=option_federated_only,
lonely=option_lonely,
)
class BobCharacterOptions:
__option_name__ = 'character_options'
def __init__(self, config_options: BobConfigOptions, teacher_uri: str, min_stake: int):
self.config_options = config_options
self.teacher_uri = teacher_uri
self.min_stake = min_stake
def create_character(self, emitter, config_file, json_ipc):
config = self.config_options.create_config(emitter, config_file)
BOB = make_cli_character(character_config=config,
emitter=emitter,
unlock_keystore=not self.config_options.dev,
unlock_signer=not config.federated_only and config.signer_uri,
teacher_uri=self.teacher_uri,
min_stake=self.min_stake,
json_ipc=json_ipc)
return BOB
group_character_options = group_options(
BobCharacterOptions,
config_options=group_config_options,
teacher_uri=option_teacher_uri,
min_stake=option_min_stake
)
@click.group()
def bob():
""""Bob management commands."""
@bob.command()
@group_config_options
@option_federated_only
@option_config_root
@group_general_config
@option_key_material
def init(general_config, config_options, config_root, key_material):
"""Create a brand new persistent Bob."""
emitter = setup_emitter(general_config)
if not config_root:
config_root = general_config.config_root
new_bob_config = config_options.generate_config(emitter=emitter,
config_root=config_root,
key_material=key_material)
filepath = new_bob_config.to_configuration_file()
paint_new_installation_help(emitter, new_configuration=new_bob_config, filepath=filepath)
@bob.command()
@group_character_options
@option_config_file
@option_controller_port(default=BobConfiguration.DEFAULT_CONTROLLER_PORT)
@option_dry_run
@group_general_config
def run(general_config, character_options, config_file, controller_port, dry_run):
"""Start Bob's controller."""
# Setup
emitter = setup_emitter(general_config)
BOB = character_options.create_character(emitter=emitter,
config_file=config_file,
json_ipc=general_config.json_ipc)
# RPC
if general_config.json_ipc:
rpc_controller = BOB.make_rpc_controller()
_transport = rpc_controller.make_control_transport()
rpc_controller.start()
return
# Echo Public Keys
emitter.message(f"Bob Verifying Key {bytes(BOB.stamp).hex()}", color='green', bold=True)
bob_encrypting_key = bytes(BOB.public_keys(DecryptingPower)).hex()
emitter.message(f"Bob Encrypting Key {bob_encrypting_key}", color="blue", bold=True)
# Start Controller
controller = BOB.make_web_controller(crash_on_error=general_config.debug)
BOB.log.info('Starting HTTP Character Web Controller')
return controller.start(port=controller_port, dry_run=dry_run)
@bob.command()
@option_config_file
@group_config_options
@group_general_config
def config(general_config, config_options, config_file):
"""View and optionally update existing Bob's configuration."""
emitter = setup_emitter(general_config)
if not config_file:
config_file = select_config_file(emitter=emitter,
checksum_address=config_options.checksum_address,
config_class=BobConfiguration)
updates = config_options.get_updates()
get_or_update_configuration(emitter=emitter,
config_class=BobConfiguration,
filepath=config_file,
updates=updates)
@bob.command()
@group_config_options
@option_config_file
@option_force
@group_general_config
def destroy(general_config, config_options, config_file, force):
"""Delete existing Bob's configuration."""
emitter = setup_emitter(general_config)
if config_options.dev:
message = "'nucypher bob destroy' cannot be used in --dev mode"
raise click.BadOptionUsage(option_name='--dev', message=click.style(message, fg="red"))
bob_config = config_options.create_config(emitter, config_file)
destroy_configuration(emitter, character_config=bob_config, force=force)
@bob.command(name='public-keys')
@group_character_options
@option_config_file
@BobInterface.connect_cli('public_keys')
@group_general_config
def public_keys(general_config, character_options, config_file):
"""Obtain Bob's public verification and encryption keys."""
emitter = setup_emitter(general_config)
BOB = character_options.create_character(emitter, config_file, json_ipc=general_config.json_ipc)
response = BOB.controller.public_keys()
return response
@bob.command()
@group_character_options
@option_config_file
@group_general_config
@click.option('--nickname', help="Human-readable nickname / alias for a card", type=click.STRING, required=False)
def make_card(general_config, character_options, config_file, nickname):
emitter = setup_emitter(general_config)
BOB = character_options.create_character(emitter, config_file, json_ipc=False)
card = Card.from_character(BOB)
if nickname:
card.nickname = nickname
card.save(overwrite=True)
emitter.message(f"Saved new character card to {card.filepath}", color='green')
paint_single_card(card=card, emitter=emitter)
@bob.command()
@group_character_options
@option_config_file
@group_general_config
@option_force
@BobInterface.connect_cli('retrieve_and_decrypt', exclude={'alice_verifying_key'}) # indicate that alice_verifying_key should be excluded as an option
@option_alice_verifying_key(required=False) # alice verifying key overridden to be not required, to allow Alice card to be possibly specified instead (#2115)
@click.option('--alice', type=click.STRING, help="The card id or nickname of a stored Alice card.")
@click.option('--ipfs', help="Download an encrypted message from IPFS at the specified gateway URI")
@click.option('--decode', help="Decode base64 plaintext messages", is_flag=True)
def retrieve_and_decrypt(general_config,
character_options,
config_file,
alice_verifying_key,
treasure_map,
message_kit,
ipfs,
alice,
decode,
force):
"""Obtain plaintext from encrypted data, if access was granted."""
# 'message_kit' is a required and a "multiple" value click option - the option name was kept singular so that
# it makes sense when specifying many of them i.e. `--message-kit <message_kit_1> --message-kit <message_kit_2> ...`
message_kits = list(message_kit)
# Setup
emitter = setup_emitter(general_config)
BOB = character_options.create_character(emitter, config_file, json_ipc=general_config.json_ipc)
if not (bool(alice_verifying_key) ^ bool(alice)):
message = f"Pass either '--alice_verifying_key' or '--alice'; " \
f"got {'both' if alice_verifying_key else 'neither'}"
raise click.BadOptionUsage(option_name='--alice_verifying_key, --alice', message=click.style(message, fg="red"))
if not alice_verifying_key:
if alice: # from storage
card = Card.load(identifier=alice)
if card.character is not Alice:
emitter.error('Grantee card is not an Alice.')
raise click.Abort
alice_verifying_key = bytes(card.verifying_key).hex()
emitter.message(f'{card.nickname or ("Alice #"+card.id.hex())}\n'
f'Verifying Key | {bytes(card.verifying_key).hex()}',
color='green')
if not force:
click.confirm('Is this the correct Granter (Alice)?', abort=True)
if ipfs:
# '--message_kit' option was repurposed to specify ipfs cids (#2098)
cids = []
for cid in message_kits:
cids.append(cid)
# populate message_kits list with actual message_kits
message_kits = []
import ipfshttpclient
# TODO: #2108
emitter.message(f"Connecting to IPFS Gateway {ipfs}")
ipfs_client = ipfshttpclient.connect(ipfs)
for cid in cids:
raw_message_kit = ipfs_client.cat(cid) # cat the contents at the hash reference
emitter.message(f"Downloaded message kit from IPFS (CID {cid})", color='green')
message_kit = raw_message_kit.decode() # cast to utf-8
message_kits.append(message_kit)
# Request
bob_request_data = {
'alice_verifying_key': alice_verifying_key,
'message_kits': message_kits,
'encrypted_treasure_map': treasure_map
}
response = BOB.controller.retrieve_and_decrypt(request=bob_request_data)
if decode:
messages = list([b64decode(r).decode() for r in response['cleartexts']])
emitter.echo('----------Messages----------')
for message in messages:
emitter.echo(message)
return response

View File

@ -1,148 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import click
import os
import shutil
from nucypher.control.emitters import StdoutEmitter
from nucypher.cli.actions.select import select_card
from nucypher.cli.options import option_force
from nucypher.cli.painting.policies import paint_single_card, paint_cards
from nucypher.cli.types import EXISTING_READABLE_FILE, UMBRAL_PUBLIC_KEY_HEX
from nucypher.policy.identity import Card
id_option = click.option('--id', 'card_id', help="A single card's checksum or ID", type=click.STRING, required=False)
type_option = click.option('--type', 'character_flag', help="Type of card: (A)lice or (B)ob.", type=click.STRING, required=False)
verifying_key_option = click.option('--verifying-key', help="Alice's or Bob's verifying key as hex", type=click.STRING, required=False)
encrypting_key_option = click.option('--encrypting-key', help="An encrypting key as hex", type=click.STRING, required=False)
nickname_option = click.option('--nickname', help="Human-readable nickname / alias for a card", type=click.STRING, required=False)
@click.group()
def contacts():
"""Lightweight contacts utility to store public keys of known ("character cards") Alices and Bobs."""
@contacts.command()
@click.argument('query')
@click.option('--qrcode', help="Display the QR code representing a card to the console", is_flag=True, default=None)
def show(query, qrcode):
"""
Lookup and view existing character card
QUERY can be either the card id or nickname.
"""
emitter = StdoutEmitter()
try:
card = select_card(emitter=emitter, card_identifier=query)
except Card.UnknownCard as e:
return emitter.error(str(e))
paint_single_card(emitter=emitter, card=card, qrcode=qrcode)
@contacts.command('list')
def _list():
"""Show all character cards"""
emitter = StdoutEmitter()
if not Card.CARD_DIR.is_dir():
Card.CARD_DIR.mkdir()
card_filepaths = list(Card.CARD_DIR.iterdir())
if not card_filepaths:
emitter.error(f'No cards found at {Card.CARD_DIR}. '
f"To create one run 'nucypher {contacts.name} {create.name}'.")
cards = list()
for filename in card_filepaths:
card = Card.load(filepath=Card.CARD_DIR / filename)
cards.append(card)
paint_cards(emitter=emitter, cards=cards, as_table=True)
@contacts.command()
@type_option
@encrypting_key_option
@verifying_key_option
@nickname_option
@option_force
def create(character_flag, verifying_key, encrypting_key, nickname, force):
"""Store a new character card"""
emitter = StdoutEmitter()
# Validate
if not all((character_flag, verifying_key, encrypting_key)) and force:
emitter.error(f'--verifying-key, --encrypting-key, and --type are required with --force enabled.')
# Card type
from constant_sorrow.constants import ALICE, BOB
flags = {'a': ALICE, 'b': BOB}
if not character_flag:
choice = click.prompt('Enter Card Type - (A)lice or (B)ob', type=click.Choice(['a', 'b'], case_sensitive=False))
character_flag = flags[choice]
else:
character_flag = flags[character_flag]
# Verifying Key
if not verifying_key:
verifying_key = click.prompt('Enter Verifying Key', type=UMBRAL_PUBLIC_KEY_HEX)
verifying_key = bytes.fromhex(verifying_key) # TODO: Move / Validate
# Encrypting Key
if character_flag is BOB:
if not encrypting_key:
encrypting_key = click.prompt('Enter Encrypting Key', type=UMBRAL_PUBLIC_KEY_HEX)
encrypting_key = bytes.fromhex(encrypting_key) # TODO: Move / Validate
# Init
new_card = Card(character_flag=character_flag,
verifying_key=verifying_key,
encrypting_key=encrypting_key,
nickname=nickname)
# Nickname
if not force and not nickname:
card_id_hex = new_card.id.hex()
nickname = click.prompt('Enter nickname for card', default=card_id_hex)
if nickname != card_id_hex: # not the default
nickname = nickname.strip()
new_card.nickname = nickname
# Save
new_card.save()
emitter.message(f'Saved new card {new_card}', color='green')
paint_single_card(emitter=emitter, card=new_card)
@contacts.command()
@id_option
@option_force
def delete(force, card_id):
"""Delete an existing character card."""
emitter = StdoutEmitter()
card = select_card(emitter=emitter, card_identifier=card_id)
if not force:
click.confirm(f'Are you sure you want to delete {card}?', abort=True)
card.delete()
emitter.message(f'Deleted card for {card.id.hex()}.', color='red')
@contacts.command()
@click.option('--filepath', help="System filepath of stored card to import", type=EXISTING_READABLE_FILE)
def import_card(filepath):
"""Import a character card from a card file"""
emitter = StdoutEmitter()
shutil.copy(filepath, Card.CARD_DIR)
# paint_single_card(card=card)
emitter.message(f'Successfully imported card.', color="green")

View File

@ -1,43 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from tabulate import tabulate
from typing import List
from nucypher.characters.lawful import Bob
from nucypher.policy.identity import Card
def paint_single_card(emitter, card: Card, qrcode: bool = False) -> None:
emitter.echo('*'*90, color='cyan')
emitter.message(f'{(card.nickname or str(card.character.__name__)).capitalize()}\'s Card (ID {card.id.hex()})', bold=True)
emitter.echo(f'Verifying Key - {bytes(card.verifying_key).hex()}')
if card.character is Bob:
emitter.echo(f'Encrypting Key - {bytes(card.encrypting_key).hex()}')
if qrcode:
card.to_qr_code()
emitter.echo('*'*90, color='cyan')
def paint_cards(emitter, cards: List[Card], as_table: bool = True) -> None:
if as_table:
rows = [card.describe() for card in cards]
emitter.echo(tabulate(rows, headers='keys', showindex='always', tablefmt="presto"))
else:
for card in cards:
paint_single_card(emitter=emitter, card=card)

View File

@ -1,347 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import base64
import hashlib
import json
from pathlib import Path
from typing import Callable, Dict, Optional, Union
import constant_sorrow
from bytestring_splitter import BytestringKwargifier, VariableLengthBytestring
from constant_sorrow.constants import ALICE, BOB, NO_SIGNATURE
from hexbytes.main import HexBytes
from nucypher_core.umbral import PublicKey
from nucypher.characters.base import Character
from nucypher.characters.lawful import Alice, Bob
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.crypto.powers import DecryptingPower, SigningPower
class Card:
""""
A simple serializable representation of a character's public materials.
"""
_alice_specification = dict(
character_flag=(bytes, 8),
verifying_key=(PublicKey, PublicKey.serialized_size()),
nickname=(bytes, VariableLengthBytestring),
)
_bob_specification = dict(
character_flag=(bytes, 8),
verifying_key=(PublicKey, PublicKey.serialized_size()),
encrypting_key=(PublicKey, PublicKey.serialized_size()),
nickname=(bytes, VariableLengthBytestring),
)
__CARD_TYPES = {
bytes(ALICE): Alice,
bytes(BOB): Bob,
}
__ID_LENGTH = 10 # TODO: Review this size (bytes of hex len?)
__MAX_NICKNAME_SIZE = 32
__BASE_PAYLOAD_SIZE = sum(length[1] for length in _bob_specification.values() if isinstance(length[1], int))
__MAX_CARD_LENGTH = __BASE_PAYLOAD_SIZE + __MAX_NICKNAME_SIZE + 2
__FILE_EXTENSION = 'card'
__DELIMITER = '.' # delimits nickname from ID
TRUNCATE = 16
CARD_DIR = Path(DEFAULT_CONFIG_ROOT) / 'cards'
NO_SIGNATURE.bool_value(False)
class InvalidCard(Exception):
"""Raised when an invalid, corrupted, or otherwise unsable card is encountered"""
class UnknownCard(Exception):
"""Raised when a card cannot be found in storage"""
class UnsignedCard(Exception):
"""Raised when a card serialization cannot be handled due to the lack of a signature"""
def __init__(self,
character_flag: Union[ALICE, BOB],
verifying_key: Union[PublicKey, bytes],
encrypting_key: Optional[Union[PublicKey, bytes]] = None,
nickname: Optional[Union[bytes, str]] = None):
try:
self.__character_class = self.__CARD_TYPES[bytes(character_flag)]
except KeyError:
raise ValueError(f'Unsupported card type {str(character_flag)}')
self.__character_flag = character_flag
if isinstance(verifying_key, bytes):
verifying_key = PublicKey.from_bytes(verifying_key)
self.__verifying_key = verifying_key # signing public key
if isinstance(encrypting_key, bytes):
encrypting_key = PublicKey.from_bytes(encrypting_key)
self.__encrypting_key = encrypting_key # public key
if isinstance(nickname, str):
nickname = nickname.encode()
self.__nickname = nickname or None
self.__validate()
def __repr__(self) -> str:
name = self.nickname or f'{self.__character_class.__name__}'
short_key = bytes(self.__verifying_key).hex()[:6]
r = f'{self.__class__.__name__}({name}:{short_key}:{self.id.hex()[:6]})'
return r
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise TypeError(f'Cannot compare {self.__class__.__name__} and {other}')
return self.id == other.id
def __validate(self) -> bool:
if self.__nickname and (len(self.__nickname) > self.__MAX_NICKNAME_SIZE):
raise self.InvalidCard(f'Nickname exceeds maximum length of {self.__MAX_NICKNAME_SIZE}')
return True
@classmethod
def __hash(cls, payload: bytes) -> HexBytes:
blake = hashlib.blake2b()
blake.update(payload)
digest = blake.digest().hex()
truncated_digest = digest[:cls.__ID_LENGTH]
return HexBytes(truncated_digest)
@property
def character(self):
return self.__CARD_TYPES[bytes(self.__character_flag)]
#
# Serializers
#
def __bytes__(self) -> bytes:
self.__validate()
payload = self.__payload
if self.nickname:
payload += VariableLengthBytestring(self.__nickname)
else:
payload += VariableLengthBytestring(b'')
return payload
def __hex__(self) -> str:
return self.to_hex()
@property
def __payload(self) -> bytes:
elements = [
self.__character_flag,
self.__verifying_key,
]
if self.character is Bob:
elements.append(self.__encrypting_key)
payload = b''.join(bytes(e) for e in elements)
return payload
@classmethod
def from_bytes(cls, card_bytes: bytes) -> 'Card':
if len(card_bytes) > cls.__MAX_CARD_LENGTH:
raise cls.InvalidCard(f'Card exceeds maximum size (max is {cls.__MAX_CARD_LENGTH} bytes card is {len(card_bytes)} bytes). '
f'Verify the card filepath and contents.')
character_flag = card_bytes[:8]
if character_flag == bytes(ALICE):
specification = cls._alice_specification
elif character_flag == bytes(BOB):
specification = cls._bob_specification
else:
raise RuntimeError(f'Unknown character card header ({character_flag}).')
return BytestringKwargifier(cls, **specification)(card_bytes)
@classmethod
def from_hex(cls, hexdata: str):
return cls.from_bytes(bytes.fromhex(hexdata))
def to_hex(self) -> str:
return bytes(self).hex()
@classmethod
def from_base64(cls, b64data: str):
return cls.from_bytes(base64.urlsafe_b64decode(b64data))
def to_base64(self) -> str:
return base64.urlsafe_b64encode(bytes(self)).decode()
def to_qr_code(self):
import qrcode
from qrcode.main import QRCode
qr = QRCode(
version=1,
box_size=1,
border=4, # min spec is 4
error_correction=qrcode.constants.ERROR_CORRECT_L,
)
qr.add_data(bytes(self))
qr.print_ascii()
@classmethod
def from_dict(cls, card: Dict):
instance = cls(nickname=card.get('nickname'),
verifying_key=card['verifying_key'],
encrypting_key=card['encrypting_key'],
character_flag=card['character'])
return instance
def to_dict(self) -> Dict:
payload = dict(
nickname=self.__nickname,
verifying_key=self.verifying_key,
encrypting_key=self.encrypting_key,
character=self.__character_flag
)
return payload
def describe(self, truncate: int = TRUNCATE) -> Dict:
description = dict(
nickname=self.__nickname,
id=self.id.hex(),
verifying_key=bytes(self.verifying_key).hex()[:truncate],
character=self.character.__name__
)
if self.character is Bob:
description['encrypting_key'] = bytes(self.encrypting_key).hex()[:truncate]
return description
def to_json(self, as_string: bool = True) -> Union[dict, str]:
payload = dict(
nickname=self.__nickname.decode(),
verifying_key=bytes(self.verifying_key).hex(),
encrypting_key=bytes(self.encrypting_key).hex(),
character=self.character.__name__
)
if as_string:
payload = json.dumps(payload)
return payload
@classmethod
def from_character(cls, character: Character, nickname: Optional[str] = None) -> 'Card':
flag = getattr(constant_sorrow.constants, character.__class__.__name__.upper())
instance = cls(verifying_key=character.public_keys(power_up_class=SigningPower),
encrypting_key=character.public_keys(power_up_class=DecryptingPower),
character_flag=bytes(flag),
nickname=nickname)
return instance
#
# Card API
#
@property
def verifying_key(self) -> PublicKey:
return self.__verifying_key
@property
def encrypting_key(self) -> PublicKey:
return self.__encrypting_key
@property
def id(self) -> HexBytes:
return self.__hash(self.__payload)
@property
def nickname(self) -> str:
if self.__nickname:
return self.__nickname.decode()
def set_nickname(self, nickname: str) -> None:
nickname = nickname.replace(' ', '_')
if len(nickname.encode()) > self.__MAX_NICKNAME_SIZE:
raise ValueError(f'New nickname exceeds maximum size ({self.__MAX_NICKNAME_SIZE} bytes)')
self.__nickname = nickname.encode()
@nickname.setter
def nickname(self, nickname: str) -> None:
self.set_nickname(nickname)
#
# Card Storage API
#
@property
def filepath(self) -> Path:
identifier = f'{self.nickname}{self.__DELIMITER}{self.id.hex()}' if self.__nickname else self.id.hex()
filename = f'{identifier}.{self.__FILE_EXTENSION}'
filepath = self.CARD_DIR / filename
return filepath
@property
def is_saved(self) -> bool:
exists = self.filepath.exists()
return exists
def save(self, encoder: Callable = base64.b64encode, overwrite: bool = False) -> Path:
if not self.CARD_DIR.is_dir():
self.CARD_DIR.mkdir()
if self.is_saved and not overwrite:
raise FileExistsError('Card exists. Pass overwrite=True to allow this operation.')
with open(self.filepath, 'wb') as file:
file.write(encoder(bytes(self)))
return Path(self.filepath)
@classmethod
def lookup(cls, identifier: str, card_dir: Optional[Path] = CARD_DIR) -> Path:
"""Resolve a card ID or nickname into a Path object"""
try:
nickname, _id = identifier.split(cls.__DELIMITER)
except ValueError:
nickname = identifier
filenames = [f for f in Card.CARD_DIR.iterdir() if nickname.lower() in f.name.lower()]
if not filenames:
raise cls.UnknownCard(f'Unknown card nickname or ID "{nickname}".')
elif len(filenames) == 1:
filename = filenames[0]
else:
raise ValueError(f'Ambiguous card nickname: {nickname}. Try using card ID instead.')
filepath = card_dir / filename
return filepath
@classmethod
def load(cls,
filepath: Optional[Path] = None,
identifier: str = None,
card_dir: Optional[Path] = None,
decoder: Callable = base64.b64decode
) -> 'Card':
if not card_dir:
card_dir = cls.CARD_DIR
if filepath and identifier:
raise ValueError(f'Pass either filepath or identifier, not both.')
if not filepath:
filepath = cls.lookup(identifier=identifier, card_dir=card_dir)
try:
with open(filepath, 'rb') as file:
card_bytes = decoder(file.read())
except FileNotFoundError:
raise cls.UnknownCard
instance = cls.from_bytes(card_bytes)
return instance
def delete(self) -> None:
self.filepath.unlink()

View File

@ -1,191 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from unittest import mock
from unittest.mock import PropertyMock
from nucypher.cli.commands.alice import AliceConfigOptions
from nucypher.cli.literature import COLLECT_NUCYPHER_PASSWORD, SUCCESSFUL_DESTRUCTION
from nucypher.cli.main import nucypher_cli
from nucypher.config.base import CharacterConfiguration
from nucypher.config.characters import AliceConfiguration
from nucypher.config.constants import NUCYPHER_ENVVAR_KEYSTORE_PASSWORD, TEMPORARY_DOMAIN
from nucypher.config.storages import LocalFileBasedNodeStorage
from nucypher.crypto.keystore import Keystore
from nucypher.policy.identity import Card
from tests.constants import (
FAKE_PASSWORD_CONFIRMED,
INSECURE_DEVELOPMENT_PASSWORD,
MOCK_CUSTOM_INSTALLATION_PATH
)
@mock.patch('nucypher.config.characters.AliceConfiguration.default_filepath', return_value='/non/existent/file')
def test_missing_configuration_file(default_filepath_mock, click_runner, test_registry_source_manager):
cmd_args = ('alice', 'run', '--network', TEMPORARY_DOMAIN)
env = {NUCYPHER_ENVVAR_KEYSTORE_PASSWORD: INSECURE_DEVELOPMENT_PASSWORD}
result = click_runner.invoke(nucypher_cli, cmd_args, catch_exceptions=False, env=env)
assert result.exit_code != 0
assert default_filepath_mock.called
assert "nucypher alice init" in result.output
def test_initialize_alice_defaults(click_runner, mocker, custom_filepath, monkeypatch, blockchain_ursulas, tmpdir):
monkeypatch.delenv(NUCYPHER_ENVVAR_KEYSTORE_PASSWORD, raising=False)
# Mock out filesystem writes
mocker.patch.object(AliceConfiguration, 'initialize', autospec=True)
mocker.patch.object(AliceConfiguration, 'to_configuration_file', autospec=True)
mocker.patch.object(LocalFileBasedNodeStorage, 'all', return_value=blockchain_ursulas)
# Mock Keystore init
keystore = Keystore.generate(keystore_dir=tmpdir, password=INSECURE_DEVELOPMENT_PASSWORD)
mocker.patch.object(CharacterConfiguration, 'keystore', return_value=keystore, new_callable=PropertyMock)
# Use default alice init args
init_args = ('alice', 'init',
'--network', TEMPORARY_DOMAIN,
'--config-root', str(custom_filepath.absolute()),
'--federated-only')
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
# REST Host
assert "nucypher alice run" in result.output
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_alice_control_starts_with_mocked_keystore(click_runner, mocker, monkeypatch, custom_filepath):
monkeypatch.delenv(NUCYPHER_ENVVAR_KEYSTORE_PASSWORD, raising=False)
class MockKeystore:
is_unlocked = False
keystore_dir = custom_filepath / 'keystore'
keystore_path = custom_filepath / 'keystore' / 'path.json'
def derive_crypto_power(self, power_class, *args, **kwargs):
return power_class()
@classmethod
def unlock(cls, password, *args, **kwargs):
assert password == INSECURE_DEVELOPMENT_PASSWORD
cls.is_unlocked = True
good_enough_config = AliceConfiguration(dev_mode=True, federated_only=True, keystore=MockKeystore())
mocker.patch.object(AliceConfigOptions, "create_config", return_value=good_enough_config)
init_args = ('alice', 'run', '-x', '--lonely', '--network', TEMPORARY_DOMAIN)
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.output
def test_initialize_alice_with_custom_configuration_root(custom_filepath, click_runner, monkeypatch):
monkeypatch.delenv(NUCYPHER_ENVVAR_KEYSTORE_PASSWORD, raising=False)
# Use a custom local filepath for configuration
init_args = ('alice', 'init',
'--network', TEMPORARY_DOMAIN,
'--federated-only',
'--config-root', str(custom_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
# CLI Output
assert str(MOCK_CUSTOM_INSTALLATION_PATH) in result.output, "Configuration not in system temporary directory"
assert "nucypher alice run" in result.output, 'Help message is missing suggested command'
assert 'IPv4' not in result.output
# Files and Directories
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'Keystore does not exist'
# TODO: Only using in-memory node storage for now
# assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
assert not (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_alice_control_starts_with_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
run_args = ('alice', 'run', '--dry-run', '--lonely', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, run_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
def test_alice_make_card(click_runner, custom_filepath, mocker):
mock_save_card = mocker.patch.object(Card, 'save')
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
command = ('alice', 'make-card', '--nickname', 'flora', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, command, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
mock_save_card.assert_called_once()
assert "Saved new character card " in result.output
def test_alice_cannot_init_with_dev_flag(click_runner):
init_args = ('alice', 'init', '--network', TEMPORARY_DOMAIN, '--federated-only', '--dev')
result = click_runner.invoke(nucypher_cli, init_args, catch_exceptions=False)
assert result.exit_code == 2
assert 'Cannot create a persistent development character' in result.output, \
'Missing or invalid error message was produced.'
def test_alice_derive_policy_pubkey(click_runner):
label = 'random_label'
derive_key_args = ('alice', 'derive-policy-pubkey', '--label', label, '--dev')
result = click_runner.invoke(nucypher_cli, derive_key_args, catch_exceptions=False)
assert result.exit_code == 0
assert "policy_encrypting_key" in result.output
assert "label" in result.output
assert label in result.output
def test_alice_public_keys(click_runner):
derive_key_args = ('alice', 'public-keys', '--dev')
result = click_runner.invoke(nucypher_cli, derive_key_args, catch_exceptions=False)
assert result.exit_code == 0
assert "alice_verifying_key" in result.output
def test_alice_view_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
view_args = ('alice', 'config', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, view_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0
assert "checksum_address" in result.output
assert "domain" in result.output
assert TEMPORARY_DOMAIN in result.output
assert str(custom_filepath) in result.output
def test_alice_destroy(click_runner, custom_filepath):
"""Should be the last test since it deletes the configuration file"""
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
destroy_args = ('alice', 'destroy', '--config-file', str(custom_config_filepath.absolute()), '--force')
result = click_runner.invoke(nucypher_cli, destroy_args, catch_exceptions=False)
assert result.exit_code == 0
assert SUCCESSFUL_DESTRUCTION in result.output
assert not custom_config_filepath.exists(), "Alice config file was deleted"

View File

@ -1,263 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
from base64 import b64encode
from pathlib import Path
from unittest import mock
from nucypher.cli.commands.bob import BobCharacterOptions
from nucypher.cli.literature import SUCCESSFUL_DESTRUCTION, COLLECT_NUCYPHER_PASSWORD
from nucypher.cli.main import nucypher_cli
from nucypher.config.characters import BobConfiguration
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.control.emitters import JSONRPCStdoutEmitter
from nucypher.crypto.powers import SigningPower
from nucypher.policy.identity import Card
from nucypher.utilities.logging import GlobalLoggerSettings, Logger
from tests.constants import (
FAKE_PASSWORD_CONFIRMED,
INSECURE_DEVELOPMENT_PASSWORD,
MOCK_CUSTOM_INSTALLATION_PATH
)
log = Logger()
@mock.patch('nucypher.config.characters.BobConfiguration.default_filepath', return_value='/non/existent/file')
def test_missing_configuration_file(default_filepath_mock, click_runner):
cmd_args = ('bob', 'run')
result = click_runner.invoke(nucypher_cli, cmd_args, catch_exceptions=False)
assert result.exit_code != 0
assert default_filepath_mock.called
assert "nucypher bob init" in result.output
def test_initialize_bob_with_custom_configuration_root(click_runner, custom_filepath: Path):
# Use a custom local filepath for configuration
init_args = ('bob', 'init',
'--network', TEMPORARY_DOMAIN,
'--federated-only',
'--config-root', str(custom_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0, result.exception
# CLI Output
assert str(MOCK_CUSTOM_INSTALLATION_PATH) in result.output, "Configuration not in system temporary directory"
assert "nucypher bob run" in result.output, 'Help message is missing suggested command'
assert 'IPv4' not in result.output
# Files and Directories
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'Keystore does not exist'
# TODO: Only using in-memory node storage for now
# assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
assert not (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_bob_control_starts_with_preexisting_configuration(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
init_args = ('bob', 'run', '--dry-run', '--lonely', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
assert "Bob Verifying Key" in result.output
assert "Bob Encrypting Key" in result.output
def test_bob_make_card(click_runner, custom_filepath: Path, mocker):
mock_save_card = mocker.patch.object(Card, 'save')
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
command = ('bob', 'make-card', '--nickname', 'anders', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, command, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
assert "Saved new character card " in result.output
mock_save_card.assert_called_once()
def test_bob_view_with_preexisting_configuration(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
view_args = ('bob', 'config', '--config-file', str(custom_config_filepath.absolute()))
result = click_runner.invoke(nucypher_cli, view_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
assert "checksum_address" in result.output
assert "domain" in result.output
assert TEMPORARY_DOMAIN in result.output
assert str(custom_filepath) in result.output
def test_bob_public_keys(click_runner):
derive_key_args = ('bob', 'public-keys', '--lonely', '--dev')
result = click_runner.invoke(nucypher_cli, derive_key_args, catch_exceptions=False)
assert result.exit_code == 0
assert "bob_encrypting_key" in result.output
assert "bob_verifying_key" in result.output
def test_bob_retrieve_and_decrypt(click_runner,
capsule_side_channel,
enacted_federated_policy,
federated_ursulas,
custom_filepath_2: Path,
federated_alice,
federated_bob,
mocker):
teacher = list(federated_ursulas)[0]
first_message, _ = capsule_side_channel.reset(plaintext_passthrough=True)
message_kits_b64 = [b64encode(bytes(message_kit)).decode() for message_kit in
[first_message, capsule_side_channel(), capsule_side_channel(), capsule_side_channel()]
]
bob_config_root = custom_filepath_2
bob_configuration_file_location = bob_config_root / BobConfiguration.generate_filename()
# I already have a Bob.
# Need to init so that the config file is made, even though we won't use this Bob.
bob_init_args = ('bob', 'init',
'--network', TEMPORARY_DOMAIN,
'--config-root', str(bob_config_root.absolute()),
'--federated-only')
envvars = {'NUCYPHER_KEYSTORE_PASSWORD': INSECURE_DEVELOPMENT_PASSWORD}
log.info("Init'ing a normal Bob; we'll substitute the Policy Bob in shortly.")
bob_init_response = click_runner.invoke(nucypher_cli, bob_init_args, catch_exceptions=False, env=envvars)
assert bob_init_response.exit_code == 0, bob_init_response.output
teacher_uri = teacher.seed_node_metadata(as_teacher_uri=True)
bob_config_file = str(bob_configuration_file_location.absolute())
policy_encrypting_key_hex = bytes(enacted_federated_policy.public_key).hex()
alice_verifying_key_hex = bytes(federated_alice.public_keys(SigningPower)).hex()
encrypted_treasure_map_b64 = b64encode(bytes(enacted_federated_policy.treasure_map)).decode()
# Retrieve without --alice_verifying_key or --alice specified - tests override of schema definition for CLI
retrieve_args = ('bob', 'retrieve-and-decrypt',
'--mock-networking',
'--json-ipc',
'--teacher', teacher_uri,
'--config-file', bob_config_file,
'--message-kit', message_kits_b64[0],
'--treasure-map', encrypted_treasure_map_b64,
)
retrieve_response = click_runner.invoke(nucypher_cli,
retrieve_args,
catch_exceptions=False,
env=envvars)
assert retrieve_response.exit_code != 0, "no alice_verifying_key specified"
assert "Pass either '--alice_verifying_key' or '--alice'; got neither" in retrieve_response.output, retrieve_response.output
# Retrieve with both --alice_verifying_key and --alice specified - should not be allowed
retrieve_args = ('bob', 'retrieve-and-decrypt',
'--mock-networking',
'--json-ipc',
'--teacher', teacher_uri,
'--config-file', bob_config_file,
'--message-kit', message_kits_b64[0],
'--alice-verifying-key', alice_verifying_key_hex,
'--alice', 'rando-card-nickname',
'--treasure-map', encrypted_treasure_map_b64,
)
retrieve_response = click_runner.invoke(nucypher_cli,
retrieve_args,
catch_exceptions=False,
env=envvars)
assert retrieve_response.exit_code != 0, "both alice_verifying_key and alice can't be specified"
assert "Pass either '--alice_verifying_key' or '--alice'; got both" in retrieve_response.output, retrieve_response.output
#
# Perform actual retrieve and decrypts
#
def substitute_bob(*args, **kwargs):
log.info("Substituting the Bob used in the CLI runtime.")
this_fuckin_guy = federated_bob
this_fuckin_guy.controller.emitter = JSONRPCStdoutEmitter()
return this_fuckin_guy
with mocker.patch.object(BobCharacterOptions, 'create_character', side_effect=substitute_bob):
#
# Retrieve one message kit
#
retrieve_args = ('bob', 'retrieve-and-decrypt',
'--mock-networking',
'--json-ipc',
'--teacher', teacher_uri,
'--config-file', bob_config_file,
'--message-kit', message_kits_b64[0],
'--alice-verifying-key', alice_verifying_key_hex,
'--treasure-map', encrypted_treasure_map_b64,
)
with GlobalLoggerSettings.pause_all_logging_while():
retrieve_response = click_runner.invoke(nucypher_cli,
retrieve_args,
catch_exceptions=False,
env=envvars)
log.info(f"Retrieval response: {retrieve_response.output}")
assert retrieve_response.exit_code == 0, retrieve_response.output
retrieve_response = json.loads(retrieve_response.output)
cleartexts = retrieve_response['result']['cleartexts']
assert len(cleartexts) == 1
assert cleartexts[0].encode() == capsule_side_channel.plaintexts[0]
#
# Retrieve and decrypt multiple message kits
#
retrieve_args = ('bob', 'retrieve-and-decrypt',
'--mock-networking',
'--json-ipc',
'--teacher', teacher_uri,
'--config-file', bob_config_file,
# use multiple message kits
'--message-kit', message_kits_b64[0],
'--message-kit', message_kits_b64[1],
'--message-kit', message_kits_b64[2],
'--message-kit', message_kits_b64[3],
'--alice-verifying-key', alice_verifying_key_hex,
'--treasure-map', encrypted_treasure_map_b64
)
with GlobalLoggerSettings.pause_all_logging_while():
retrieve_response = click_runner.invoke(nucypher_cli, retrieve_args, catch_exceptions=False, env=envvars)
log.info(f"Retrieval response: {retrieve_response.output}")
assert retrieve_response.exit_code == 0, retrieve_response.output
retrieve_response = json.loads(retrieve_response.output)
cleartexts = retrieve_response['result']['cleartexts']
assert len(cleartexts) == len(message_kits_b64)
for index, cleartext in enumerate(cleartexts):
assert cleartext.encode() == capsule_side_channel.plaintexts[index]
# NOTE: Should be the last test in this module since it deletes the configuration file
def test_bob_destroy(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
destroy_args = ('bob', 'destroy', '--config-file', str(custom_config_filepath.absolute()), '--force')
result = click_runner.invoke(nucypher_cli, destroy_args, catch_exceptions=False)
assert result.exit_code == 0, result.exception
assert SUCCESSFUL_DESTRUCTION in result.output
assert not custom_config_filepath.exists(), "Bob config file was deleted"

View File

@ -1,199 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import os
import pytest
import tempfile
from pathlib import Path
from nucypher_core.umbral import SecretKey
from nucypher.cli.main import nucypher_cli
from nucypher.policy.identity import Card
@pytest.fixture(scope='module', autouse=True)
def patch_card_directory(session_mocker):
custom_filepath = '/tmp/nucypher-test-cards-'
tmpdir = tempfile.TemporaryDirectory(prefix=custom_filepath)
tmpdir.cleanup()
session_mocker.patch.object(Card, 'CARD_DIR', return_value=Path(tmpdir.name),
new_callable=session_mocker.PropertyMock)
yield
tmpdir.cleanup()
@pytest.fixture(scope='module')
def alice_verifying_key():
return bytes(SecretKey.random().public_key()).hex()
@pytest.fixture(scope='module')
def bob_nickname():
return 'edward'.capitalize()
@pytest.fixture(scope='module')
def alice_nickname():
return 'alice'.capitalize()
@pytest.fixture(scope='module')
def bob_verifying_key():
return bytes(SecretKey.random().public_key()).hex()
@pytest.fixture(scope='module')
def bob_encrypting_key():
return bytes(SecretKey.random().public_key()).hex()
def test_card_directory_autocreation(click_runner, mocker):
mocked_is_dir = mocker.patch('pathlib.Path.is_dir', return_value=False)
mocked_mkdir = mocker.patch('pathlib.Path.mkdir')
mocked_listdir = mocker.patch('pathlib.Path.iterdir', return_value=[])
command = ('contacts', 'list')
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
mocked_is_dir.assert_called_once()
mocked_mkdir.assert_called_once()
mocked_listdir.assert_called_once()
def test_list_cards_with_none_created(click_runner, certificates_tempdir):
command = ('contacts', 'list')
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert f'No cards found at {Card.CARD_DIR}.' in result.output
def test_create_alice_card_interactive(click_runner, alice_verifying_key, alice_nickname, mocker):
command = ('contacts', 'create')
user_input = (
'a', # Alice
alice_verifying_key, # Public key
alice_nickname # Nickname
)
user_input = '\n'.join(user_input)
assert len(list(Card.CARD_DIR.iterdir())) == 0
# Let's play pretend: this alice does not have the card directory (yet)
mocker.patch('pathlib.Path.is_dir', return_value=False)
mocked_mkdir = mocker.patch('pathlib.Path.mkdir')
result = click_runner.invoke(nucypher_cli, command, input=user_input, catch_exceptions=False)
# The path was created.
mocked_mkdir.assert_called_once()
assert result.exit_code == 0, result.output
assert 'Enter Verifying Key' in result.output
assert 'Saved new card' in result.output
assert len(list(Card.CARD_DIR.iterdir())) == 1
def test_create_alice_card_inline(click_runner, alice_verifying_key, alice_nickname):
command = ('contacts', 'create',
'--type', 'a',
'--verifying-key', bytes(SecretKey.random().public_key()).hex(),
'--nickname', 'philippa')
assert len(list(Card.CARD_DIR.iterdir())) == 1
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert 'Saved new card' in result.output
assert len(list(Card.CARD_DIR.iterdir())) == 2
def test_create_bob_card_interactive(click_runner, bob_nickname, bob_encrypting_key, bob_verifying_key):
command = ('contacts', 'create')
user_input = (
'b', # Bob
bob_encrypting_key, # Public key 1
bob_verifying_key, # Public key 2
bob_nickname # Nickname
)
user_input = '\n'.join(user_input)
assert len(list(Card.CARD_DIR.iterdir())) == 2
result = click_runner.invoke(nucypher_cli, command, input=user_input, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert 'Enter Verifying Key' in result.output
assert 'Enter Encrypting Key' in result.output
assert 'Saved new card' in result.output
assert len(list(Card.CARD_DIR.iterdir())) == 3
def test_create_bob_card_inline(click_runner, alice_verifying_key, alice_nickname):
command = ('contacts', 'create',
'--type', 'b',
'--verifying-key', bytes(SecretKey.random().public_key()).hex(),
'--encrypting-key', bytes(SecretKey.random().public_key()).hex(),
'--nickname', 'hans')
assert len(list(Card.CARD_DIR.iterdir())) == 3
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert 'Saved new card' in result.output
assert len(list(Card.CARD_DIR.iterdir())) == 4
def test_show_unknown_card(click_runner, alice_nickname, alice_verifying_key):
command = ('contacts', 'show', 'idontknowwhothatis')
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert 'Unknown card nickname or ID' in result.output
def test_show_alice_card(click_runner, alice_nickname, alice_verifying_key):
command = ('contacts', 'show', alice_nickname)
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert alice_nickname in result.output
assert alice_verifying_key in result.output
def test_show_bob_card(click_runner, bob_nickname, bob_encrypting_key, bob_verifying_key):
command = ('contacts', 'show', bob_nickname)
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert bob_nickname in result.output
assert bob_encrypting_key in result.output
assert bob_verifying_key in result.output
def test_list_card(click_runner, bob_nickname, bob_encrypting_key,
bob_verifying_key, alice_nickname, alice_verifying_key):
command = ('contacts', 'list')
assert len(list(Card.CARD_DIR.iterdir())) == 4
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert bob_nickname in result.output
assert bob_encrypting_key[:Card.TRUNCATE] in result.output
assert bob_verifying_key[:Card.TRUNCATE] in result.output
assert alice_nickname in result.output
assert alice_verifying_key[:Card.TRUNCATE] in result.output
def test_delete_card(click_runner, bob_nickname):
command = ('contacts', 'delete', '--id', bob_nickname, '--force')
assert len(list(Card.CARD_DIR.iterdir())) == 4
result = click_runner.invoke(nucypher_cli, command, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert 'Deleted card' in result.output
assert len(list(Card.CARD_DIR.iterdir())) == 3

View File

@ -1,44 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest_twisted as pt
from nucypher.config.storages import LocalFileBasedNodeStorage
from tests.acceptance.cli.lifecycle import run_entire_cli_lifecycle
@pt.inlineCallbacks
def test_decentralized_cli_lifecycle(click_runner,
testerchain,
random_policy_label,
blockchain_ursulas,
custom_filepath,
custom_filepath_2,
agency_local_registry,
mocker):
# For the purposes of this test, assume that all peers are already known and stored.
mocker.patch.object(LocalFileBasedNodeStorage, 'all', return_value=blockchain_ursulas)
yield run_entire_cli_lifecycle(click_runner,
random_policy_label,
blockchain_ursulas,
custom_filepath,
custom_filepath_2,
agency_local_registry.filepath,
testerchain=testerchain)

View File

@ -1,32 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest_twisted as pt
from tests.acceptance.cli.lifecycle import run_entire_cli_lifecycle
@pt.inlineCallbacks
def test_federated_cli_lifecycle(click_runner,
random_policy_label,
federated_ursulas,
custom_filepath,
custom_filepath_2):
yield run_entire_cli_lifecycle(click_runner,
random_policy_label,
federated_ursulas,
custom_filepath,
custom_filepath_2)

View File

@ -1,79 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from nucypher.characters.lawful import Alice, Bob
from nucypher.crypto.powers import DecryptingPower, SigningPower
from nucypher.policy.identity import Card
from tests.utils.middleware import MockRestMiddleware
@pytest.mark.parametrize('character_class', (Bob, Alice))
def test_character_card(character_class, capsys):
character = character_class(federated_only=True,
start_learning_now=False,
network_middleware=MockRestMiddleware())
character_card = character.get_card()
same_card = Card.from_character(character)
assert character_card == same_card
with pytest.raises(TypeError):
# only cards can be compared to other cards
_ = character_card == same_card.verifying_key
# Bob's Keys
assert character_card.verifying_key == character.public_keys(SigningPower)
assert character_card.encrypting_key == character.public_keys(DecryptingPower)
# Card Serialization
# bytes
card_bytes = bytes(character_card)
assert Card.from_bytes(card_bytes) == character_card == same_card
# hex
hex_bob = character_card.to_hex()
assert Card.from_hex(hex_bob) == character_card == same_card
# base64
base64_bob = character_card.to_base64()
assert Card.from_base64(base64_bob) == character_card == same_card
# qr code echo
character_card.to_qr_code()
captured = capsys.readouterr()
qr_code_padding = '\xa0' * 21 # min length for qr code version 1
assert captured.out.startswith(qr_code_padding)
assert captured.out.endswith(f'{qr_code_padding}\n')
# filepath without nickname
assert character_card.id.hex() in str(character_card.filepath)
# nicknames
original_checksum = character_card.id
nickname = 'Wilson the Great'
expected_nickname = nickname.replace(' ', '_')
character_card.set_nickname(nickname)
restored = Card.from_bytes(bytes(character_card))
restored_checksum = restored.id
assert restored.nickname == expected_nickname
assert original_checksum == restored_checksum == same_card.id
# filepath with nickname
assert f'{expected_nickname}.{character_card.id.hex()}' in str(character_card.filepath)