Use crypto.passwords module for handling password key derivations.

pull/2701/head
Kieran R. Prasch 2021-06-30 16:36:20 -07:00
parent 8a54bd7e6d
commit 7ed52647f3
4 changed files with 52 additions and 55 deletions

View File

@ -31,15 +31,18 @@ import click
from constant_sorrow.constants import KEYSTORE_LOCKED
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from mnemonic.mnemonic import Mnemonic
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
from nucypher.characters.control.emitters import StdoutEmitter
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.crypto.constants import BLAKE2B
from nucypher.crypto.keypairs import HostingKeypair
from nucypher.crypto.passwords import (
secret_box_decrypt,
secret_box_encrypt,
derive_key_material_from_password,
SecretBoxAuthenticationError
)
from nucypher.crypto.powers import (
DecryptingPower,
DerivedKeyBasedPower,
@ -55,7 +58,6 @@ from nucypher.network.server import TLSHostingPower
# HKDF
__HKDF_HASH_ALGORITHM = BLAKE2B
__INFO_BASE = b'NuCypher/'
_WRAPPING_INFO = __INFO_BASE + b'wrap'
_VERIFYING_INFO = __INFO_BASE + b'verify'
_DECRYPTING_INFO = __INFO_BASE + b'encrypt'
_DELEGATING_INFO = __INFO_BASE + b'delegate'
@ -63,7 +65,6 @@ _TLS_INFO = __INFO_BASE + b'tls'
# Wrapping key
_SALT_SIZE = 32
__WRAPPING_KEY_LENGTH = 32
# Mnemonic
_ENTROPY_BITS = 256
@ -101,18 +102,9 @@ def __hkdf(key_material: bytes,
return kdf.derive(key_material)
def _derive_wrapping_key(password: str, salt: bytes) -> bytes:
"""Derives a symmetric encryption key from password and salt."""
kdf = Scrypt(
salt=salt,
length=__WRAPPING_KEY_LENGTH,
n=2 ** 14,
r=8,
p=1,
backend=default_backend()
)
derived_key = kdf.derive(key_material=password.encode())
return derived_key
def _derive_keying_material(material: bytes, info: bytes) -> bytes:
material = __hkdf(key_material=material, info=info, size=SecretKeyFactory.serialized_size())
return material
def _derive_umbral_key(material: bytes, info: bytes) -> SecretKey:
@ -121,17 +113,13 @@ def _derive_umbral_key(material: bytes, info: bytes) -> SecretKey:
return __private
def _derive_keying_material(material: bytes, info: bytes) -> bytes:
material = __hkdf(key_material=material, info=info, size=SecretKeyFactory.serialized_size())
return material
def _assemble_keystore(encrypted_secret: bytes, salt: bytes) -> Dict[str, Union[str, bytes]]:
def _assemble_keystore(encrypted_secret: bytes, password_salt: bytes, wrapper_salt: bytes) -> Dict[str, Union[str, bytes]]:
encoded_key_data = {
'version': '2.0',
'created': str(time.time()),
'key': encrypted_secret,
'salt': salt
'password_salt': password_salt,
'wrapper_salt': wrapper_salt,
}
return encoded_key_data
@ -178,7 +166,7 @@ def _write_keystore(path: Path, payload: Dict[str, bytes], serializer: Callable)
def _serialize_keystore(payload: Dict) -> bytes:
for field in ('key', 'salt'):
for field in ('key', 'password_salt', 'wrapper_salt'):
payload[field] = bytes(payload[field]).hex()
try:
metadata = json.dumps(payload, indent=4)
@ -193,7 +181,7 @@ def _deserialize_keystore(payload: bytes):
payload = json.loads(payload)
except JSONDecodeError:
raise Keystore.Invalid("Invalid or corrupted key data")
for field in ('key', 'salt'):
for field in ('key', 'password_salt', 'wrapper_salt'):
payload[field] = bytes.fromhex(payload[field])
return payload
@ -304,9 +292,16 @@ class Keystore:
def __decrypt_keystore(self, path: Path, password: str) -> bool:
payload = _read_keystore(path, deserializer=_deserialize_keystore)
wrapping_key = _derive_wrapping_key(salt=payload['salt'], password=password)
self.__secret = SecretBox(wrapping_key).decrypt(payload['key'])
return True
__password_material = derive_key_material_from_password(password=password.encode(),
salt=payload['password_salt'])
try:
self.__secret = secret_box_decrypt(key_material=__password_material,
ciphertext=payload['key'],
salt=payload['wrapper_salt'])
return True
except SecretBoxAuthenticationError:
self.__secret = KEYSTORE_LOCKED
raise self.AuthenticationFailed
@staticmethod
def __save(secret: bytes, password: str, keystore_dir: Optional[Path] = None) -> Path:
@ -316,7 +311,7 @@ class Keystore:
# to help avoid unintentional logging of the password.
raise InvalidPassword(''.join(failures))
# Derive verifying key (used as ID)
# Derive verifying key (for use as ID)
verifying_key = _derive_umbral_key(material=secret, info=_VERIFYING_INFO)
keystore_id = bytes(verifying_key).hex()[:Keystore._ID_SIZE]
@ -326,12 +321,19 @@ class Keystore:
keystore_path = generate_keystore_filepath(parent=keystore_dir, id=keystore_id)
# Encrypt secret
__salt = token_bytes(_SALT_SIZE)
__wrapping_key = _derive_wrapping_key(salt=__salt, password=password)
encrypted_secret = bytes(SecretBox(__wrapping_key).encrypt(secret))
__password_salt = token_bytes(_SALT_SIZE)
__password_material = derive_key_material_from_password(password=password.encode(),
salt=__password_salt)
__wrapper_salt = token_bytes(_SALT_SIZE)
encrypted_secret = secret_box_encrypt(plaintext=secret,
key_material=__password_material,
salt=__wrapper_salt)
# Create keystore file
keystore_payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=__salt)
keystore_payload = _assemble_keystore(encrypted_secret=encrypted_secret,
password_salt=__password_salt,
wrapper_salt=__wrapper_salt)
_write_keystore(path=keystore_path, payload=keystore_payload, serializer=_serialize_keystore)
return keystore_path
@ -401,11 +403,7 @@ class Keystore:
self.__secret = KEYSTORE_LOCKED
def unlock(self, password: str) -> None:
try:
self.__decrypt_keystore(path=self.keystore_path, password=password)
except CryptoError:
self.__secret = KEYSTORE_LOCKED
raise self.AuthenticationFailed
self.__decrypt_keystore(path=self.keystore_path, password=password)
def derive_crypto_power(self,
power_class: ClassVar[CryptoPowerUp],

View File

@ -15,20 +15,16 @@ You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from typing import Optional
from cryptography.exceptions import InternalError
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from nacl.secret import SecretBox
from nacl.utils import random as nacl_random
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
from nucypher.crypto.constants import BLAKE2B
__MASTER_KEY_LENGTH = 32 # This will be passed to HKDF, but it is not picky about the length
__MASTER_KEY_LENGTH = 32 # This will be passed to HKDF, but it is not picky about the length
__WRAPPING_KEY_LENGTH = SecretBox.KEY_SIZE
__WRAPPING_KEY_INFO = b'NuCypher-KeyWrap'
__HKDF_HASH_ALGORITHM = BLAKE2B

View File

@ -20,7 +20,6 @@ import os
import pytest
from constant_sorrow.constants import NO_PASSWORD
from mnemonic.mnemonic import Mnemonic
from nacl.exceptions import CryptoError
from nucypher.blockchain.eth.decorators import InvalidChecksumAddress
from nucypher.characters.control.emitters import StdoutEmitter
@ -37,7 +36,9 @@ from nucypher.cli.literature import (
GENERIC_PASSWORD_PROMPT
)
from nucypher.config.base import CharacterConfiguration
from nucypher.crypto import passwords
from nucypher.crypto.keystore import Keystore
from nucypher.crypto.passwords import SecretBoxAuthenticationError
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
@ -106,7 +107,7 @@ def test_get_nucypher_password(mock_stdin, mock_account, confirm, capsys):
def test_unlock_nucypher_keystore_invalid_password(mocker, test_emitter, alice_blockchain_test_config, capsys, tmpdir):
# Setup
mocker.patch.object(Keystore, '_Keystore__decrypt_keystore', side_effect=CryptoError)
mocker.patch.object(passwords, 'secret_box_decrypt', side_effect=SecretBoxAuthenticationError)
mocker.patch.object(CharacterConfiguration,
'dev_mode',
return_value=False,

View File

@ -125,12 +125,13 @@ def test_keystore_derive_crypto_power_without_unlock(tmpdir):
def test_keystore_serializer():
encrypted_secret, salt = b'peanuts! Get your peanuts!', b'sea salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=salt)
encrypted_secret, psalt, wsalt = b'peanuts! Get your peanuts!', b'sea salt', b'bath salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, password_salt=psalt, wrapper_salt=wsalt)
serialized_payload = _serialize_keystore(payload)
deserialized_key_data = _deserialize_keystore(serialized_payload)
assert deserialized_key_data['key'] == encrypted_secret
assert deserialized_key_data['salt'] == salt
assert deserialized_key_data['password_salt'] == psalt
assert deserialized_key_data['wrapper_salt'] == wsalt
def test_keystore_lock_unlock(tmpdir):
@ -169,12 +170,13 @@ def test_keystore_lock_unlock(tmpdir):
def test_write_keystore_file(temp_dir_path):
temp_filepath = Path(temp_dir_path) / "test_private_key_serialization_file"
encrypted_secret, salt = b'peanuts! Get your peanuts!', b'sea salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, salt=salt)
encrypted_secret, psalt, wsalt = b'peanuts! Get your peanuts!', b'sea salt', b'bath_salt'
payload = _assemble_keystore(encrypted_secret=encrypted_secret, password_salt=psalt, wrapper_salt=wsalt)
_write_keystore(path=temp_filepath, payload=payload, serializer=_serialize_keystore)
deserialized_payload_from_file = _read_keystore(path=temp_filepath, deserializer=_deserialize_keystore)
assert deserialized_payload_from_file['key'] == encrypted_secret
assert deserialized_payload_from_file['salt'] == salt
assert deserialized_payload_from_file['password_salt'] == psalt
assert deserialized_payload_from_file['wrapper_salt'] == wsalt
def test_decrypt_keystore(tmpdir, mocker):