Implements protected key caching methods for keyring.

pull/216/head
Kieran Prasch 2018-04-10 14:41:44 -07:00
parent 5e3f1d94f9
commit ec88557dfa
2 changed files with 87 additions and 88 deletions

View File

@ -16,6 +16,7 @@ from web3.auto import w3
from nkms.config import utils
from nkms.config.configs import _DEFAULT_CONFIGURATION_DIR, KMSConfigurationError
from nkms.config.utils import _parse_keyfile, _save_private_keyfile
from nkms.crypto.powers import SigningPower, EncryptingPower, CryptoPower
from nkms.keystore.keypairs import SigningKeypair, EncryptingKeypair
@ -85,7 +86,7 @@ def _encrypt_umbral_key(wrapping_key: bytes, umbral_key: UmbralPrivateKey) -> di
# TODO: Handle decryption failures
def _decrypt_key(wrapping_key: bytes, nonce: bytes, enc_key_material: bytes) -> UmbralPrivateKey:
def _decrypt_umbral_key(wrapping_key: bytes, nonce: bytes, enc_key_material: bytes) -> UmbralPrivateKey:
"""
Decrypts an encrypted key with nacl's XSalsa20-Poly1305 algorithm (SecretBox).
Returns a decrypted key as an UmbralPrivateKey.
@ -112,53 +113,6 @@ def _generate_signing_keys() -> tuple:
return privkey, pubkey
def _parse_keyfile(keypath: str):
"""Parses a keyfile and returns key metadata as a dict."""
with open(keypath, 'r') as keyfile:
try:
key_metadata = json.loads(keyfile)
except json.JSONDecodeError:
raise KMSConfigurationError("Invalid data in keyfile {}".format(keypath))
else:
return key_metadata
def _save_private_keyfile(keypath: str, key_data: dict) -> str:
"""
Creates a permissioned keyfile and save it to the local filesystem.
The file must be created in this call, and will fail if the path exists.
Returns the filepath string used to write the keyfile.
Note: getting and setting the umask is not thread-safe!
See linux open docs: http://man7.org/linux/man-pages/man2/open.2.html
---------------------------------------------------------------------
O_CREAT - If pathname does not exist, create it as a regular file.
O_EXCL - Ensure that this call creates the file: if this flag is
specified in conjunction with O_CREAT, and pathname already
exists, then open() fails with the error EEXIST.
---------------------------------------------------------------------
"""
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Write, Create, Non-Existing
mode = stat.S_IRUSR | stat.S_IWUSR # 0o600
try:
keyfile_descriptor = os.open(path=keypath, flags=flags, mode=mode)
finally:
os.umask(0) # Set the umask to 0 after opening
# Write and destroy file descriptor reference
with os.fdopen(keyfile_descriptor, 'w') as keyfile:
keyfile.write(json.dumps(key_data))
output_path = keyfile.name
del keyfile_descriptor
return output_path
def _generate_transacting_keys(passphrase: str) -> dict:
"""Create a new wallet address and private "transacting" key from the provided passphrase"""
entropy = os.urandom(32) # max out entropy for keccak256
@ -167,37 +121,6 @@ def _generate_transacting_keys(passphrase: str) -> dict:
return encrypted_wallet_data
# TODO: Make these one function
def _get_decrypting_key(self, master_key: bytes = None) -> UmbralPrivateKey:
"""Returns plaintext version of decrypting key."""
key_data = _parse_keyfile(self.__private_key_dir)
# TODO: Prompt user for password?
if master_key is None:
return
wrap_key = _derive_wrapping_key_from_master_key(key_data['wrap_salt'], master_key)
plain_key = _decrypt_key(wrap_key, key_data['nonce'], key_data['enc_key'])
return plain_key
def _get_signing_key(self, master_key: bytes = None) -> UmbralPrivateKey:
"""Returns plaintext version of private signature ("decrypting") key."""
key_data = _parse_keyfile(self.__signing_keypath)
# TODO: Prompt user for password?
if master_key is None:
return
wrap_key = _derive_wrapping_key_from_master_key(key_data['wrap_salt'], master_key)
plain_key = _decrypt_key(wrap_key, key_data['nonce'], key_data['enc_key'])
return plain_key
class KMSKeyring:
"""
Warning: This class handles private keys!
@ -225,6 +148,12 @@ class KMSKeyring:
'transacting': os.path.join(__default_private_key_dir, 'wallet.json')
}
class KeyringError(Exception):
pass
class KeyringLocked(KeyringError):
pass
def __init__(self, root_key_path: str=None, signing_key_path: str=None, transacting_key_path: str=None):
"""
Generates a KMSKeyring instance with the provided key paths,
@ -251,7 +180,29 @@ class KMSKeyring:
def __del__(self):
self.lock()
def __decrypt_keyfile(self, key_path: str) -> UmbralPrivateKey:
"""Returns plaintext version of decrypting key."""
# Checks for cached key
if self.__derived_master_key is None:
message = 'The keyring cannot be used when it is locked. Call .unlock first.'
raise self.KeyringLocked(message)
key_data = _parse_keyfile(key_path)
wrap_key = _derive_wrapping_key_from_master_key(key_data['wrap_salt'], self.__derived_master_key)
plain_umbral_key = _decrypt_umbral_key(wrap_key, key_data['nonce'], key_data['enc_key'])
return plain_umbral_key
def unlock(self, passphrase: bytes) -> None:
if self.__derived_master_key is not None:
raise Exception('Keyring already unlocked')
derived_key = _derive_master_key_from_passphrase(passphrase=passphrase)
self.__derived_master_key = derived_key
def lock(self) -> None:
"""Make efforts to remove references to the cached key data"""
self.__derived_master_key = None
self.__transacting_private_key = None
@ -261,23 +212,23 @@ class KMSKeyring:
a either a SigningPower or EncryptingPower with the coinciding
private key.
TODO: Derive a key from the root_key.
TODO: TransactingPower
"""
if power_class is SigningPower:
umbral_privkey = _get_signing_key(self.__derived_master_key)
keypair = SigningKeypair(umbral_privkey)
key_path = self.__signing_keypath
elif power_class is EncryptingPower:
# TODO: Derive a key from the root_key.
umbral_privkey = _get_decrypting_key(self.__derived_master_key)
keypair = EncryptingKeypair(umbral_privkey)
key_path = self.__root_keypath
else:
failure_message = "{} is an invalid type for deriving a CryptoPower.".format(type(power_class))
raise ValueError(failure_message)
new_power = power_class(keypair=keypair)
return new_power
umbral_privkey = self.__decrypt_keyfile(key_path)
keypair = power_class._keypair_class(umbral_privkey)
new_cryptopower = power_class(keypair=keypair)
return new_cryptopower
@classmethod
def generate(cls, passphrase: str, encryption: bool=True, transacting: bool=True, output_path: str=None) -> 'KMSKeyring':

View File

@ -1,7 +1,55 @@
import json
import os
from nkms.config.keys import KMSKeyring
from .configs import _DEFAULT_CONFIGURATION_DIR
from .configs import _DEFAULT_CONFIGURATION_DIR, KMSConfigurationError
def _save_private_keyfile(keypath: str, key_data: dict) -> str:
"""
Creates a permissioned keyfile and save it to the local filesystem.
The file must be created in this call, and will fail if the path exists.
Returns the filepath string used to write the keyfile.
Note: getting and setting the umask is not thread-safe!
See linux open docs: http://man7.org/linux/man-pages/man2/open.2.html
---------------------------------------------------------------------
O_CREAT - If pathname does not exist, create it as a regular file.
O_EXCL - Ensure that this call creates the file: if this flag is
specified in conjunction with O_CREAT, and pathname already
exists, then open() fails with the error EEXIST.
---------------------------------------------------------------------
"""
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Write, Create, Non-Existing
mode = stat.S_IRUSR | stat.S_IWUSR # 0o600
try:
keyfile_descriptor = os.open(path=keypath, flags=flags, mode=mode)
finally:
os.umask(0) # Set the umask to 0 after opening
# Write and destroy file descriptor reference
with os.fdopen(keyfile_descriptor, 'w') as keyfile:
keyfile.write(json.dumps(key_data))
output_path = keyfile.name
del keyfile_descriptor
return output_path
def _parse_keyfile(keypath: str):
"""Parses a keyfile and returns key metadata as a dict."""
with open(keypath, 'r') as keyfile:
try:
key_metadata = json.loads(keyfile)
except json.JSONDecodeError:
raise KMSConfigurationError("Invalid data in keyfile {}".format(keypath))
else:
return key_metadata
def generate_confg_dir(path: str=None,) -> None: