Merge pull request #2742 from KPrasch/byok

Bring your own key material
pull/2786/head
KPrasch 2021-08-23 11:47:11 -07:00 committed by GitHub
commit b3572c7b65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 100 additions and 22 deletions

View File

@ -0,0 +1 @@
Allow importing of secret key material for power derivations.

View File

@ -55,16 +55,15 @@ from nucypher.cli.options import (
option_teacher_uri,
option_threshold,
option_lonely,
option_max_gas_price
option_max_gas_price,
option_key_material
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.painting.policies import paint_single_card
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS
from nucypher.cli.utils import make_cli_character, setup_emitter
from nucypher.config.characters import AliceConfiguration
from nucypher.config.constants import (
TEMPORARY_DOMAIN,
)
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.crypto.keystore import Keystore
from nucypher.network.middleware import RestMiddleware
from nucypher.policy.identity import Card
@ -185,7 +184,7 @@ class AliceFullConfigOptions:
self.shares = shares
self.payment_periods = payment_periods
def generate_config(self, emitter: StdoutEmitter, config_root: Path) -> AliceConfiguration:
def generate_config(self, emitter: StdoutEmitter, config_root: Path, key_material: str) -> AliceConfiguration:
opts = self.config_options
@ -207,6 +206,7 @@ class AliceFullConfigOptions:
return AliceConfiguration.generate(
password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
config_root=config_root,
checksum_address=pay_with,
domain=opts.domain,
@ -295,12 +295,15 @@ def alice():
@group_full_config_options
@option_config_root
@group_general_config
def init(general_config, full_config_options, config_root):
@option_key_material
def init(general_config, full_config_options, config_root, key_material):
"""Create a brand new persistent Alice."""
emitter = setup_emitter(general_config)
if not config_root:
config_root = general_config.config_root
new_alice_config = full_config_options.generate_config(emitter, config_root)
new_alice_config = full_config_options.generate_config(emitter=emitter,
config_root=config_root,
key_material=key_material)
filepath = new_alice_config.to_configuration_file()
paint_new_installation_help(emitter, new_configuration=new_alice_config, filepath=filepath)

View File

@ -14,6 +14,8 @@
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from base64 import b64decode
from pathlib import Path
@ -50,7 +52,8 @@ from nucypher.cli.options import (
option_signer_uri,
option_teacher_uri,
option_lonely,
option_max_gas_price
option_max_gas_price,
option_key_material
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.painting.policies import paint_single_card
@ -133,7 +136,7 @@ class BobConfigOptions:
handle_missing_configuration_file(character_config_class=BobConfiguration,
config_file=config_file)
def generate_config(self, emitter: StdoutEmitter, config_root: Path) -> BobConfiguration:
def generate_config(self, emitter: StdoutEmitter, config_root: Path, key_material: str) -> BobConfiguration:
checksum_address = self.checksum_address
if not checksum_address and not self.federated_only:
@ -143,6 +146,7 @@ class BobConfigOptions:
return BobConfiguration.generate(
password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
config_root=config_root,
checksum_address=checksum_address,
domain=self.domain,
@ -227,12 +231,15 @@ def bob():
@option_federated_only
@option_config_root
@group_general_config
def init(general_config, config_options, config_root):
@option_key_material
def init(general_config, config_options, config_root, key_material):
"""Create a brand new persistent Bob."""
emitter = setup_emitter(general_config)
if not config_root:
config_root = general_config.config_root
new_bob_config = config_options.generate_config(emitter, config_root)
new_bob_config = config_options.generate_config(emitter=emitter,
config_root=config_root,
key_material=key_material)
filepath = new_bob_config.to_configuration_file()
paint_new_installation_help(emitter, new_configuration=new_bob_config, filepath=filepath)

View File

@ -53,7 +53,8 @@ from nucypher.cli.options import (
option_signer_uri,
option_teacher_uri,
option_lonely,
option_max_gas_price
option_max_gas_price,
option_key_material
)
from nucypher.cli.painting.help import paint_new_installation_help
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS, NETWORK_PORT, WORKER_IP
@ -161,7 +162,7 @@ class UrsulaConfigOptions:
# TODO: Exit codes (not only for this, but for other exceptions)
return click.get_current_context().exit(1)
def generate_config(self, emitter, config_root, force):
def generate_config(self, emitter, config_root, force, key_material):
if self.dev:
raise RuntimeError('Persistent configurations cannot be created in development mode.')
@ -180,6 +181,7 @@ class UrsulaConfigOptions:
self.rest_host = collect_worker_ip_address(emitter, network=self.domain, force=force)
return UrsulaConfiguration.generate(password=get_nucypher_password(emitter=emitter, confirm=True),
key_material=bytes.fromhex(key_material) if key_material else None,
config_root=config_root,
rest_host=self.rest_host,
rest_port=self.rest_port,
@ -295,7 +297,8 @@ def ursula():
@option_force
@option_config_root
@group_general_config
def init(general_config, config_options, force, config_root):
@option_key_material
def init(general_config, config_options, force, config_root, key_material):
"""Create a new Ursula node configuration."""
emitter = setup_emitter(general_config, config_options.worker_address)
_pre_launch_warnings(emitter, dev=None, force=force)
@ -305,7 +308,10 @@ def init(general_config, config_options, force, config_root):
raise click.BadOptionUsage('--provider', message="--provider is required to initialize a new ursula.")
if not config_options.federated_only and not config_options.domain:
config_options.domain = select_network(emitter)
ursula_config = config_options.generate_config(emitter, config_root, force)
ursula_config = config_options.generate_config(emitter=emitter,
config_root=config_root,
force=force,
key_material=key_material)
filepath = ursula_config.to_configuration_file()
paint_new_installation_help(emitter, new_configuration=ursula_config, filepath=filepath)

View File

@ -49,6 +49,7 @@ option_federated_only = click.option('--federated-only/--decentralized', '-F', h
option_force = click.option('--force', help="Don't ask for confirmation", is_flag=True)
option_gas_price = click.option('--gas-price', help="Set a static gas price (in GWEI)", type=GWEI)
option_gas_strategy = click.option('--gas-strategy', help="Operate with a specified gas price strategy", type=click.STRING) # TODO: GAS_STRATEGY_CHOICES
option_key_material = click.option('--key-material', help="A pre-secured hex-encoded secret to use for private key derivations", type=click.STRING)
option_max_gas_price = click.option('--max-gas-price', help="Maximum acceptable gas price (in GWEI)", type=GWEI)
option_hw_wallet = click.option('--hw-wallet/--no-hw-wallet')
option_light = click.option('--light', help="Indicate that node is light", is_flag=True, default=None)

View File

@ -579,10 +579,10 @@ class CharacterConfiguration(BaseConfiguration):
return super().update(filepath=self.config_file_location, **kwargs)
@classmethod
def generate(cls, password: str, *args, **kwargs):
def generate(cls, password: str, key_material: Optional[bytes] = None, *args, **kwargs):
"""Shortcut: Hook-up a new initial installation and configuration."""
node_config = cls(dev_mode=False, *args, **kwargs)
node_config.initialize(password=password)
node_config.initialize(key_material=key_material, password=password)
return node_config
def cleanup(self) -> None:
@ -769,7 +769,7 @@ class CharacterConfiguration(BaseConfiguration):
power_ups.append(power_up)
return power_ups
def initialize(self, password: str) -> Path:
def initialize(self, password: str, key_material: Optional[bytes] = None) -> str:
"""Initialize a new configuration and write installation files to disk."""
# Development
@ -780,7 +780,9 @@ class CharacterConfiguration(BaseConfiguration):
# Persistent
else:
self._ensure_config_root_exists()
self.write_keystore(password=password, interactive=self.MNEMONIC_KEYSTORE)
self.write_keystore(key_material=key_material,
password=password,
interactive=self.MNEMONIC_KEYSTORE)
self._cache_runtime_filepaths()
self.node_storage.initialize()
@ -792,10 +794,17 @@ class CharacterConfiguration(BaseConfiguration):
# Success
message = "Created nucypher installation files at {}".format(self.config_root)
self.log.debug(message)
return self.config_root
return Path(self.config_root)
def write_keystore(self, password: str, interactive: bool = True) -> Keystore:
self.__keystore = Keystore.generate(password=password, keystore_dir=self.keystore_dir, interactive=interactive)
def write_keystore(self, password: str, key_material: Optional[bytes] = None, interactive: bool = True) -> Keystore:
if key_material:
self.__keystore = Keystore.import_secure(key_material=key_material,
password=password,
keystore_dir=self.keystore_dir)
else:
self.__keystore = Keystore.generate(password=password,
keystore_dir=self.keystore_dir,
interactive=interactive)
return self.keystore
@classmethod

View File

@ -320,6 +320,22 @@ class Keystore:
instance = cls(keystore_path=filepath)
return instance
@classmethod
def import_secure(cls, key_material: bytes, password: str, keystore_dir: Optional[Path] = None) -> 'Keystore':
"""
Generate a Keystore using a a custom pre-secured entropy blob.
This method of keystore creation does not generate a mnemonic phrase - it is assumed
that the provided blob is recoverable and secure.
"""
emitter = StdoutEmitter()
emitter.message(f'WARNING: Key importing assumes that you have already secured your secret '
f'and can recover it. No mnemonic will be generated.\n', color='yellow')
if len(key_material) != SecretKey.serialized_size():
raise ValueError(f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.')
path = Keystore.__save(secret=key_material, password=password, keystore_dir=keystore_dir)
keystore = cls(keystore_path=path)
return keystore
@classmethod
def restore(cls, words: str, password: str, keystore_dir: Optional[Path] = None) -> 'Keystore':
"""Restore a keystore from seed words"""

View File

@ -41,6 +41,7 @@ from nucypher.crypto.keystore import (
_read_keystore
)
from nucypher.crypto.powers import DecryptingPower, SigningPower, DelegatingPower
from nucypher.crypto.umbral_adapter import SecretKey
from nucypher.crypto.umbral_adapter import (
secret_key_factory_from_seed,
secret_key_factory_from_secret_key_factory
@ -241,6 +242,40 @@ def test_restore_keystore_from_mnemonic(tmpdir, mocker):
assert keystore._Keystore__secret == secret
def test_import_custom_keystore(tmpdir):
# Too short - 32 bytes is required
custom_secret = b'tooshort'
with pytest.raises(ValueError, match=f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.'):
_keystore = Keystore.import_secure(key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir)
# Too short - 32 bytes is required
custom_secret = b'thisisabunchofbytesthatisabittoolong'
with pytest.raises(ValueError, match=f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.'):
_keystore = Keystore.import_secure(key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir)
# Import private key
custom_secret = os.urandom(SecretKey.serialized_size()) # insecure but works
keystore = Keystore.import_secure(key_material=custom_secret,
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
assert keystore._Keystore__secret == custom_secret
keystore.lock()
path = keystore.keystore_path
del keystore
# Restore custom secret from encrypted keystore file
keystore = Keystore(keystore_path=path)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
assert keystore._Keystore__secret == custom_secret
def test_derive_signing_power(tmpdir):
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)