Merge pull request #78 from jMyles/character-crypto

Character-level encryption, decryption, signing, and verification.  Alice, Bob, and Ursula classes working.
pull/81/head
Justin Holmes 2017-10-17 10:52:24 -07:00 committed by GitHub
commit 7693b108ee
12 changed files with 199 additions and 645 deletions

View File

@ -1,7 +1,8 @@
from kademlia.network import Server
from nkms.crypto.constants import NOT_SIGNED
from nkms.crypto import api
from nkms.crypto.powers import CryptoPower, SigningKeypair
from nkms.crypto.constants import NOT_SIGNED, NO_DECRYPTION_PERFORMED
from nkms.crypto.powers import CryptoPower, SigningKeypair, EncryptingPower
from nkms.network.server import NuCypherDHTServer, NuCypherSeedOnlyDHTServer
@ -44,15 +45,19 @@ class Character(object):
"""
Can be called to sign something or used to express the signing public key as bytes.
"""
__call__ = self._crypto_power.sign
def __call__(seal_instance, *messages_to_sign):
return self._crypto_power.sign(*messages_to_sign)
def _as_tuple(seal):
return self._crypto_power.pubkey_sig_tuple()
def as_bytes(seal_instance):
def __iter__(seal):
yield from seal._as_tuple()
def __bytes__(seal):
return self._crypto_power.pubkey_sig_bytes()
def as_tuple(self_instance):
return self._crypto_power.pubkey_sig_tuple()
def __eq__(seal, other):
return other == seal._as_tuple() or other == bytes(seal)
self.seal = Seal()
@ -67,6 +72,10 @@ class Character(object):
else:
raise RuntimeError("Server hasn't been attached.")
@property
def name(self):
return self.__class__.__name__
def learn_about_actor(self, actor):
self._actor_mapping[actor.id()] = actor
@ -83,9 +92,9 @@ class Character(object):
:return: A tuple, (ciphertext, signature). If sign==False, then signature will be NOT_SIGNED.
"""
actor = self._lookup_actor(recipient)
pubkey_sign_id = actor.seal() # I don't even like this. I prefer .seal(), which
ciphertext = self._crypto_power.encrypt_for(pubkey_sign_id, cleartext)
ciphertext = self._crypto_power.encrypt_for(actor.public_key(EncryptingPower),
cleartext)
if sign:
if sign_cleartext:
signature = self.seal(cleartext)
@ -96,24 +105,34 @@ class Character(object):
return ciphertext, signature
def verify_from(self, actor_whom_sender_claims_to_be: str, signature: bytes,
*messages: bytes, decrypt=False,
signature_is_on_cleartext=True) -> tuple:
def verify_from(self, actor_whom_sender_claims_to_be: "Character", signature: bytes,
message: bytes, decrypt=False,
signature_is_on_cleartext=False) -> tuple:
"""
Inverse of encrypt_for.
:param actor_that_sender_claims_to_be: The str representation of the actor on this KeyStore
that the sender is claiming to be.
:param message:
:param decrypt:
:param signature_is_on_cleartext:
:return:
:param actor_that_sender_claims_to_be: A Character instance representing the actor whom the sender claims to be. We check the public key owned by this Character instance to verify.
:param messages: The messages to be verified.
:param decrypt: Whether or not to decrypt the messages.
:param signature_is_on_cleartext: True if we expect the signature to be on the cleartext. Otherwise, we presume that the ciphertext is what is signed.
:return: (Whether or not the signature is valid, the decrypted plaintext or NO_DECRYPTION_PERFORMED)
"""
cleartext = NO_DECRYPTION_PERFORMED
if signature_is_on_cleartext:
if decrypt:
cleartext = self._crypto_power.decrypt(message)
msg_digest = api.keccak_digest(cleartext)
else:
raise ValueError(
"Can't look for a signature on the cleartext if we're not decrypting.")
else:
msg_digest = api.keccak_digest(message)
actor = self._lookup_actor(actor_whom_sender_claims_to_be)
signature_pub_key = actor.seal.as_tuple() # TODO: and again, maybe in the real world this looks in KeyStore.
msg_digest = b"".join(api.keccak_digest(m) for m in messages) # This does work.
signature_pub_key = actor.seal
sig = api.ecdsa_load_sig(signature)
return api.ecdsa_verify(*sig, msg_digest, signature_pub_key)
return api.ecdsa_verify(*sig, msg_digest, signature_pub_key), cleartext
def _lookup_actor(self, actor: "Character"):
try:
@ -124,25 +143,27 @@ class Character(object):
def id(self):
return "whatever actor id ends up being - {}".format(id(self))
class Ursula(Character):
_server_class = NuCypherDHTServer
_default_crypto_powerups = [SigningKeypair]
def public_key(self, key_class):
try:
return self._crypto_power.public_keys[key_class]
except KeyError:
raise # TODO: Does it make sense to have a specialized exception here? Probably.
class Alice(Character):
_server_class = NuCypherSeedOnlyDHTServer
_default_crypto_powerups = [SigningKeypair]
_default_crypto_powerups = [SigningKeypair, EncryptingPower]
def find_best_ursula(self):
# TODO: Right now this just finds the nearest node and returns its ip and port. Make it do something useful.
return self.server.bootstrappableNeighbors()[0]
def generate_re_encryption_keys(self,
pubkey_enc_bob,
bob,
m,
n):
# TODO: Make this actually work.
pubkey_enc_bob = bob.seal # ??? We need Bob's enc key, not sig.
kfrags = [
'sfasdfsd9',
'dfasd09fi',
@ -150,3 +171,12 @@ class Alice(Character):
]
return kfrags
class Bob(Character):
_default_crypto_powerups = [SigningKeypair, EncryptingPower]
class Ursula(Character):
_server_class = NuCypherDHTServer
_default_crypto_powerups = [SigningKeypair, EncryptingPower]

View File

@ -1,11 +1,12 @@
import sha3
from random import SystemRandom
from npre import umbral
from npre import elliptic_curve
from nacl.secret import SecretBox
from typing import Tuple, Union, List
import sha3
from nacl.secret import SecretBox
from py_ecc.secp256k1 import N, privtopub, ecdsa_raw_recover, ecdsa_raw_sign
from npre import elliptic_curve
from npre import umbral
PRE = umbral.PRE()
SYSTEM_RAND = SystemRandom()

View File

@ -1 +1,2 @@
NOT_SIGNED = 445
NOT_SIGNED = 445
NO_DECRYPTION_PERFORMED = 455

View File

@ -1,217 +0,0 @@
import npre.elliptic_curve as ec
import sha3
from nacl.secret import SecretBox
from nacl.utils import random
from nkms.crypto.powers import EncryptingKeypair
from npre import umbral
class KeyStore(object):
"""
A vault of cryptographic keys owned by and stored on behalf of a discrete actor.
"""
def __init__(self, enc_privkey=None):
"""
Initializes a KeyStore object. Uses the private keys to initialize
their respective objects, if provided. If not, it will generate new
keypairs.
:param bytes sig_privkey: Private key in bytes of ECDSA signing keypair
:param bytes enc_privkey: Private key in bytes of encrypting keypair
"""
self.enc_keypair = EncryptingKeypair(enc_privkey)
self.pre = umbral.PRE()
self._key_mapping = {}
@property
def enc_pubkey(self):
return self.enc_keypair.pub_key
@property
def enc_privkey(self):
return self.enc_keypair.priv_key
def _split_path(self, path):
"""
Splits the file path provided and provides subpaths to each directory.
:param bytes path: Path to file
:return: Subpath(s) from path
:rtype: List of bytes
"""
# Hacky workaround: b'/'.split(b'/') == b['', b'']
if path == b'/':
return [b'']
dirs = path.split(b'/')
return [b'/'.join(dirs[:i + 1]) for i in range(len(dirs))]
def _derive_path_key(self, path, is_pub=True):
"""
Derives a key for the specific path.
:param bytes path: Path to generate the key for
:param bool is_pub: Is the derived key a public key?
:rtype: bytes
:return: Derived key
"""
priv_key = ec.serialize(self.enc_privkey)[1:]
key = sha3.keccak_256(priv_key + path).digest()
key_int = int.from_bytes(key, byteorder='big')
return self.pre.priv2pub(key_int) if is_pub else key
def encrypt(self, plaintext, path=None):
"""
Encrypts `plaintext` with a subkey made from the path, if provided.
:param bytes plaintext: Plaintext to encrypt
:param bytes path: Path to derive keys for the encrypted plaintext
:rtype: List(Tuple(enc_path_symm_key, enc_symm_key), ..)
:return: List of encrypted keys for each subpath in path, if provided
"""
if path:
subpaths = self._split_path(path)
path_keys = [self._derive_path_key(subpath) for subpath in subpaths]
enc_keys = []
for key in path_keys:
path_symm_key, enc_symm_key = self.generate_key(pubkey=key)
enc_keys.append(
(self.symm_encrypt(path_symm_key, plaintext), enc_symm_key))
return enc_keys
def decrypt(self, enc_path_key, enc_symm_key):
"""
Decrypts the encrypted path key `enc_path_key` with the `enc_symm_key`.
This works by decapsulating `enc_symm_key` and using it to decrypt
`enc_path_key`.
:param bytes enc_path_key: Encrypted key used to encrypt the data key
:param EncryptedKey enc_symm_key: Encrypted key used to encrypt path key
:rtype: bytes
:return: Plaintext of the enc_path_key
"""
dec_symm_key = self.decrypt_key(enc_symm_key)
return self.symm_decrypt(dec_symm_key, enc_path_key)
def generate_key(self, pubkey=None):
"""
Generates a raw symmetric key and its encrypted counterpart.
:param ec.Element pubkey: Pubkey to generate key for
:rtype: Tuple(bytes, EncryptedKey)
:return: Tuple containing raw encrypted key and the encrypted key
"""
symm_key, enc_symm_key = self.enc_keypair.generate_key(pubkey=pubkey)
return (symm_key, enc_symm_key)
def decrypt_key(self, enc_key, privkey=None):
"""
Decrypts an ECIES encrypted symmetric key.
:param EncryptedKey enc_key: ECIES encrypted key in bytes
:param bytes privkey: The privkey to decrypt with
:rtype: bytes
:return: Bytestring of the decrypted symmetric key
"""
return self.enc_keypair.decrypt_key(enc_key, privkey)
def rekey(self, privkey_a, pubkey_b):
"""
Generates a re-encryption key in interactive mode.
:param bytes privkey_a: Alive's private key
:param bytes pubkey_b: Bob's public key
:rtype: bytes
:return: Bytestring of a re-encryption key
"""
# Generate an ephemeral keypair
priv_e = self.enc_keypair.pre.gen_priv()
priv_e_bytes = ec.serialize(priv_e)[1:]
# Encrypt ephemeral key with an ECIES generated key
symm_key_bob, enc_symm_key_bob = self.enc_keypair.generate_key(
pubkey=pubkey_b)
enc_priv_e = self.symm_encrypt(symm_key_bob, priv_e_bytes)
reenc_key = self.enc_keypair.rekey(privkey_a, priv_e)
return (reenc_key, enc_symm_key_bob, enc_priv_e)
def reencrypt(self, reenc_key, ciphertext):
"""
Re-encrypts the provided ciphertext for the recipient of the generated
re-encryption key provided.
:param bytes reenc_key: The re-encryption key from the proxy to Bob
:param bytes ciphertext: The ciphertext to re-encrypt to Bob
:rtype: bytes
:return: Re-encrypted ciphertext
"""
return self.enc_keypair.reencrypt(reenc_key, ciphertext)
def gen_split_rekey(self, privkey_a, privkey_b, min_shares, num_shares):
"""
Generates secret shares that can be used to reconstruct data given
`min_shares` have been acquired.
:param bytes privkey_a: Alice's private key
:param bytes privkey_b: Bob's private key (or an ephemeral privkey)
:param int min_shares: Threshold shares needed to reconstruct secret
:param int num_shares: Total number of shares to create
:rtype: List(RekeyFrag)
:return: List of `num_shares` RekeyFrags
"""
return self.enc_keypair.split_rekey(privkey_a, privkey_b,
min_shares, num_shares)
def build_secret(self, shares):
"""
Reconstructs a secret from the given shares.
:param list shares: List of secret share fragments
:rtype: EncryptedKey
:return: EncrypedKey from `shares`
"""
# TODO: What to do if not enough shares, or invalid?
return self.enc_keypair.combine(shares)
def symm_encrypt(self, key, plaintext):
"""
Encrypts the plaintext using SecretBox symmetric encryption.
:param bytes key: Key to encrypt with
:param bytes plaintext: Plaintext to encrypt
:rtype: bytes
:return: Ciphertext from SecretBox symmetric encryption
"""
cipher = SecretBox(key)
return cipher.encrypt(plaintext)
def symm_decrypt(self, key, ciphertext):
"""
Decrypts the ciphertext using SecretBox symmetric decryption.
:param bytes key: Key to decrypt with
:param bytes ciphertext: Ciphertext from SecretBox encryption
:rtype: bytes
:return: Plaintext from SecretBox decryption
"""
cipher = SecretBox(key)
return cipher.decrypt(ciphertext)

View File

@ -1,11 +1,8 @@
from random import SystemRandom
from typing import Iterable, Union, List, Tuple
from py_ecc.secp256k1 import N, privtopub
from typing import Iterable, List, Tuple
from nkms.crypto import api as API
from nkms.keystore import keypairs
from npre import umbral
from nkms.keystore.keypairs import SigningKeypair as RealSigingKeypair, EncryptingKeypair
class PowerUpError(TypeError):
@ -48,23 +45,21 @@ class CryptoPower(object):
def pubkey_sig_bytes(self):
try:
return self._power_ups[
SigningKeypair].pubkey_bytes() # TODO: Turn this into an ID lookup on a KeyStore.
SigningKeypair].pub_key # TODO: Turn this into an ID lookup on a KeyStore.
except KeyError:
raise NoSigningPower
def pubkey_sig_tuple(self):
try:
return self._power_ups[
SigningKeypair].pub_key # TODO: Turn this into an ID lookup on a KeyStore.
return API.ecdsa_bytes2pub(self._power_ups[
SigningKeypair].pub_key) # TODO: Turn this into an ID lookup on a KeyStore.
except KeyError:
raise NoSigningPower
def sign(self, *messages):
"""
Signs a message and returns a signature with the keccak hash.
:param Iterable messages: Messages to sign in an iterable of bytes
:rtype: bytestring
:return: Signature of message
"""
@ -76,10 +71,18 @@ class CryptoPower(object):
return sig_keypair.sign(msg_digest)
def encrypt_for(self, pubkey_sign_id, cleartext):
def decrypt(self, ciphertext):
try:
enc_keypair = self._power_ups[EncryptingKeypair]
# TODO: Actually encrypt.
encrypting_power = self._power_ups[EncryptingPower]
return encrypting_power.decrypt(ciphertext)
except KeyError:
raise NoEncryptingPower
def encrypt_for(self, pubkey, cleartext):
try:
encrypting_power = self._power_ups[EncryptingPower]
ciphertext = encrypting_power.encrypt(cleartext, bytes(pubkey))
return ciphertext
except KeyError:
raise NoEncryptingPower
@ -94,19 +97,12 @@ class CryptoPowerUp(object):
class SigningKeypair(CryptoPowerUp):
confers_public_key = True
def __init__(self, privkey_bytes=None):
self.secure_rand = SystemRandom()
if privkey_bytes:
self.priv_key = privkey_bytes
else:
# Key generation is random([1, N - 1])
priv_number = self.secure_rand.randrange(1, N)
self.priv_key = priv_number.to_bytes(32, byteorder='big')
# Get the public component
self.pub_key = privtopub(self.priv_key)
def __init__(self, keypair=None): # TODO: Pretty much move this __init__ to SigningPower
def pubkey_bytes(self):
return b''.join(i.to_bytes(32, 'big') for i in self.pub_key)
self.real_keypair = keypair or RealSigingKeypair() # Total throwaway line - this will not be "real_keypair" because this will be in SigningPower
self.real_keypair.gen_privkey()
self.priv_key, self.pub_key = self.real_keypair.privkey, self.real_keypair.pubkey
def sign(self, msghash):
"""
@ -127,15 +123,17 @@ class SigningKeypair(CryptoPowerUp):
class EncryptingPower(CryptoPowerUp):
confers_public_key = True
KEYSIZE = 32
def __init__(self, keypair: keypairs.EncryptingKeypair):
def __init__(self, keypair: keypairs.EncryptingKeypair = None):
"""
Initalizes an EncryptingPower object for CryptoPower.
"""
self.keypair = keypair
self.priv_key = keypair.privkey
self.pub_key = keypair.pubkey
self.keypair = keypair or EncryptingKeypair()
self.priv_key = self.keypair.privkey
self.pub_key = self.keypair.pubkey
def _split_path(self, path: bytes) -> List[bytes]:
"""
@ -153,8 +151,8 @@ class EncryptingPower(CryptoPowerUp):
return [b'/'.join(dirs[:i + 1]) for i in range(len(dirs))]
def _derive_path_key(
self,
path: bytes,
self,
path: bytes,
) -> bytes:
"""
Derives a key for the specific path.
@ -269,3 +267,6 @@ class EncryptingPower(CryptoPowerUp):
dec_key = API.ecies_decapsulate(privkey, enc_key)
return API.symm_decrypt(dec_key, ciphertext)
def public_key(self):
return self.keypair.pubkey

View File

@ -1,19 +1,36 @@
from typing import Tuple
from nacl.secret import SecretBox
from nkms.crypto import api as API
from npre import umbral
from npre import elliptic_curve as ec
class EncryptingKeypair(object):
class Keypair(object):
public_only = False
def __init__(self, privkey: bytes = None, pubkey: bytes = None):
if privkey and pubkey:
self.privkey, self.pubkey = privkey, pubkey
elif not privkey and not pubkey:
# Neither key is provided; we'll generate.
self.gen_privkey(create_pubkey=True)
elif privkey and not pubkey:
# We have the privkey; use it to generate the pubkey.
self.privkey = privkey
self.pubkey = API.privtopub(privkey)
elif pubkey and not privkey:
# We have only the pubkey; this is a public-only pair.
self.public_only = True
class EncryptingKeypair(Keypair):
"""
An EncryptingKeypair that uses ECIES.
"""
def __init__(self, privkey: bytes = None, pubkey: bytes = None):
"""
Initializes an EncryptingKeypair object.
"""
self.privkey = privkey
self.pubkey = pubkey
# TODO: Generate KeyID as a keccak_digest of the pubkey.
def gen_privkey(self, create_pubkey: bool = True):
"""
Generates an ECIES secp256k1 private key.
@ -27,6 +44,36 @@ class EncryptingKeypair(object):
if create_pubkey:
self.pubkey = API.ecies_priv2pub(self.privkey)
def decrypt(self,
edata: Tuple[bytes, bytes],
privkey: bytes = None) -> bytes:
"""
Decrypt data encrypted by ECIES
edata = (ekey, edata)
ekey is needed to reconstruct a DH secret
edata encrypted by the block cipher
privkey is optional private key if we want to use something else
than what keypair uses
"""
if isinstance(edata[0], tuple) and isinstance(edata[1], tuple):
# In case it was re-encrypted data
return self.decrypt_reencrypted(edata)
ekey, edata = edata
# When it comes to decrypt(), ekey[1] is always None
# we could use that and save 2 bytes,
# but it makes the code less readable
ekey = umbral.EncryptedKey(
ekey=ec.deserialize(API.PRE.ecgroup, ekey[0]), re_id=ekey[1])
if privkey is None:
privkey = self._priv_key
else:
privkey = ec.deserialize(API.PRE.ecgroup, privkey)
key = self.pre.decapsulate(privkey, ekey)
cipher = SecretBox(key)
return cipher.decrypt(edata)
class SigningKeypair(object):
"""

View File

@ -1,6 +1,6 @@
import msgpack
from nkms.characters import Alice
from nkms.characters import Alice, Bob
from nkms.crypto import api
from nkms.policy.constants import UNKNOWN_KFRAG
@ -46,7 +46,7 @@ class PolicyManagerForAlice(PolicyManager):
return ursulas_and_results
def create_policy_group(self,
pubkey_enc_bob: tuple,
bob: Bob,
uri: bytes,
m: int,
n: int,
@ -56,18 +56,18 @@ class PolicyManagerForAlice(PolicyManager):
Alice dictates a new group of policies.
"""
re_enc_keys = self.owner.generate_re_encryption_keys(
pubkey_enc_bob,
bob,
m,
n)
policies = []
for kfrag_id, rekey in enumerate(re_enc_keys):
policy = Policy.from_alice(
rekey,
self.owner.seal.as_bytes(),
self.owner.seal,
)
policies.append(policy)
return PolicyGroup(uri, pubkey_enc_bob, policies)
return PolicyGroup(uri, bob, policies)
class PolicyGroup(object):
@ -77,9 +77,9 @@ class PolicyGroup(object):
_id = None
def __init__(self, uri, pubkey_enc_bob, policies=None):
def __init__(self, uri: str, bob: Bob, policies=None):
self.policies = policies or []
self.pubkey_enc_bob = pubkey_enc_bob
self.bob = bob
self.uri = uri
def transmit(self, networky_stuff):
@ -93,7 +93,7 @@ class PolicyGroup(object):
@property
def id(self):
if not self._id:
self._id = api.keccak_digest(self.uri, self.pubkey_enc_bob)
self._id = api.keccak_digest(self.uri, bytes(self.bob.seal))
return self._id
@ -122,7 +122,7 @@ class Policy(object):
:param challenge_size: The number of challenges to create in the ChallengePack.
"""
self.kfrag = kfrag
self.deterministic_id_portion = deterministic_id_portion
self.deterministic_id_portion = bytes(deterministic_id_portion)
self.random_id_portion = api.secure_random(32) # TOOD: Where do we actually want this to live?
self.challenge_size = challenge_size
self.treasure_map = []
@ -148,9 +148,9 @@ class Policy(object):
@staticmethod
def from_alice(kfrag,
pubkey_sig_alice,
alice_seal,
):
policy = Policy(kfrag, deterministic_id_portion=pubkey_sig_alice)
policy = Policy(kfrag, deterministic_id_portion=alice_seal)
policy.generate_challenge_pack()
return policy

View File

@ -3,13 +3,15 @@ import pytest
from nkms.characters import Alice, Ursula, Character
from nkms.crypto import api
from nkms.crypto.constants import NOT_SIGNED
from nkms.crypto.powers import CryptoPower, SigningKeypair, NoSigningPower, NoEncryptingPower, \
EncryptingKeypair
"""
SIGNING
"""
from nkms.crypto.constants import NO_DECRYPTION_PERFORMED
from nkms.crypto.powers import CryptoPower, SigningKeypair, NoSigningPower, NoEncryptingPower, \
EncryptingPower
def test_actor_without_signing_power_cannot_sign():
"""
@ -23,11 +25,11 @@ def test_actor_without_signing_power_cannot_sign():
with pytest.raises(NoSigningPower) as e_info:
non_signer.seal("something")
# ...or as a way to show the public key.
# ...or as a way to cast the public key to bytes or tuple.
with pytest.raises(NoSigningPower) as e_info:
non_signer.seal.as_bytes()
bytes(non_signer.seal)
with pytest.raises(NoSigningPower) as e_info:
non_signer.seal.as_tuple()
tuple(non_signer.seal)
def test_actor_with_signing_power_can_sign():
@ -46,7 +48,7 @@ def test_actor_with_signing_power_can_sign():
# ...or to get the signer's public key for verification purposes.
sig = api.ecdsa_load_sig(signature)
verification = api.ecdsa_verify(*sig, api.keccak_digest(message), seal_of_the_signer.as_tuple())
verification = api.ecdsa_verify(*sig, api.keccak_digest(message), seal_of_the_signer)
assert verification is True
@ -70,8 +72,13 @@ def test_anybody_can_verify():
signature = alice.seal(message)
# Our everyman can verify it.
verification = somebody.verify_from(alice, signature, message)
verification, cleartext = somebody.verify_from(alice, signature, message, decrypt=False)
assert verification is True
assert cleartext is NO_DECRYPTION_PERFORMED
# If we pass the signature and message backwards, we get TypeError.
# with pytest.raises(TypeError):
# verification = somebody.verify_from(alice, message, signature)
"""
@ -106,7 +113,7 @@ def test_character_with_encrypting_power_can_encrypt():
"""
Now, a Character *with* EncryptingKeyPair can encrypt.
"""
can_sign_and_encrypt = Character(crypto_power_ups=[SigningKeypair, EncryptingKeypair])
can_sign_and_encrypt = Character(crypto_power_ups=[SigningKeypair, EncryptingPower])
ursula = Ursula()
can_sign_and_encrypt.learn_about_actor(ursula)
@ -116,4 +123,4 @@ def test_character_with_encrypting_power_can_encrypt():
ciphertext, signature = can_sign_and_encrypt.encrypt_for(ursula, cleartext, sign=False)
assert signature == NOT_SIGNED
assert ciphertext is not None # annnd fail.
assert ciphertext is not None # annnd fail.

View File

@ -327,3 +327,7 @@ class TestCrypto(unittest.TestCase):
reenc_key = api.ecies_reencrypt(rk_eb, enc_key)
dec_key = api.ecies_decapsulate(self.privkey_b, reenc_key)
self.assertEqual(plain_key, dec_key)
def test_alpha_is_resolved(self):
with self.assertRaises(ImportError):
from nkms.crypto import _alpha

View File

@ -1,101 +0,0 @@
import random
import unittest
import sha3
from nkms.crypto import api
from nkms.crypto.powers import SigningKeypair, EncryptingKeypair
class TestSigningKeypair(unittest.TestCase):
def setUp(self):
self.keypair_a = SigningKeypair()
self.keypair_b = SigningKeypair()
self.msg = b'this is a test'
def test_signing(self):
msg_digest = sha3.keccak_256(self.msg).digest()
signature = self.keypair_a.sign(msg_digest)
sig = api.ecdsa_load_sig(signature)
self.assertEqual(tuple, type(sig))
self.assertEqual(int, type(sig[0])) # Check v
self.assertEqual(int, type(sig[1])) # Check r
self.assertEqual(int, type(sig[2])) # Check s
def test_verification(self):
msg_digest = sha3.keccak_256(self.msg).digest()
signature = self.keypair_a.sign(msg_digest)
sig = api.ecdsa_load_sig(signature)
self.assertEqual(int, type(sig[0])) # Check v
self.assertEqual(int, type(sig[1])) # Check r
self.assertEqual(int, type(sig[2])) # Check s
verify_sig = api.ecdsa_verify(*sig, msg_digest,
self.keypair_a.pub_key)
self.assertTrue(verify_sig)
def test_digest(self):
digest_a = api.keccak_digest(b'foo', b'bar')
digest_b = api.keccak_digest(b'foobar')
self.assertEqual(digest_a, digest_b)
class TestEncryptingKeypair(unittest.TestCase):
def setUp(self):
self.send_keypair = EncryptingKeypair()
self.recv_keypair = EncryptingKeypair()
self.msg = b'this is a test'
def test_key_gen(self):
raw_symm, enc_symm = self.send_keypair.generate_key()
self.assertEqual(32, len(raw_symm))
self.assertTrue(raw_symm != enc_symm)
def test_key_decryption(self):
raw_symm, enc_symm = self.send_keypair.generate_key()
self.assertEqual(32, len(raw_symm))
self.assertTrue(raw_symm != enc_symm)
dec_symm_key = self.send_keypair.decrypt_key(enc_symm)
self.assertEqual(32, len(dec_symm_key))
self.assertTrue(raw_symm == dec_symm_key)
def test_reencryption(self):
raw_symm, enc_symm = self.send_keypair.generate_key()
self.assertEqual(32, len(raw_symm))
self.assertTrue(raw_symm != enc_symm)
rekey_ab = self.send_keypair.rekey(self.send_keypair.priv_key,
self.recv_keypair.priv_key)
reenc_key = self.send_keypair.reencrypt(rekey_ab, enc_symm)
self.assertTrue(reenc_key != enc_symm)
dec_key = self.recv_keypair.decrypt_key(reenc_key)
self.assertEqual(32, len(dec_key))
self.assertTrue(dec_key == raw_symm)
def test_split_rekey(self):
raw_symm, enc_symm = self.send_keypair.generate_key()
self.assertEqual(32, len(raw_symm))
self.assertTrue(raw_symm != enc_symm)
enc_shares = self.send_keypair.split_rekey(self.send_keypair.priv_key,
self.recv_keypair.priv_key,
4, 10)
self.assertEqual(10, len(enc_shares))
rand_shares = random.sample(enc_shares, 4)
self.assertEqual(4, len(rand_shares))
frags = [self.send_keypair.reencrypt(rk, enc_symm) for rk in rand_shares]
self.assertEqual(4, len(frags))
enc_key = self.recv_keypair.combine(frags)
self.assertTrue(raw_symm != enc_key)
self.assertTrue(enc_symm != enc_key)
dec_key = self.recv_keypair.decrypt_key(enc_key)
self.assertTrue(dec_key == raw_symm)

View File

@ -1,196 +0,0 @@
import random
import unittest
import msgpack
import npre.elliptic_curve as ec
from nkms.crypto import api
from nkms.keystore.keystore import KeyStore
from nkms.crypto.powers import CryptoPower, SigningKeypair
class TestKeyRing(unittest.TestCase):
def setUp(self):
self.power_of_signing = CryptoPower(power_ups=[SigningKeypair])
self.keyring_a = KeyStore()
self.keyring_b = KeyStore()
self.msg = b'this is a test'
def test_signing(self):
signature = self.power_of_signing.sign(self.msg)
sig = api.ecdsa_load_sig(signature)
self.assertTrue(int, type(sig[0])) # Check v
self.assertTrue(int, type(sig[1])) # Check r
self.assertTrue(int, type(sig[2])) # Check s
def test_verification(self):
signature = self.power_of_signing.sign(self.msg)
sig = api.ecdsa_load_sig(signature)
self.assertTrue(int, type(sig[0])) # Check v
self.assertTrue(int, type(sig[1])) # Check r
self.assertTrue(int, type(sig[2])) # Check s
msghash = api.keccak_digest(self.msg)
is_valid = api.ecdsa_verify(*sig, msghash,
pubkey=self.power_of_signing.pubkey_sig_tuple())
self.assertTrue(is_valid)
def test_key_generation(self):
raw_key, enc_key = self.keyring_a.generate_key()
self.assertEqual(32, len(raw_key))
self.assertTrue(raw_key != enc_key)
def test_key_decryption(self):
raw_key, enc_key = self.keyring_a.generate_key()
self.assertEqual(32, len(raw_key))
self.assertTrue(raw_key != enc_key)
dec_key = self.keyring_a.decrypt_key(enc_key)
self.assertTrue(32, len(dec_key))
self.assertTrue(raw_key == dec_key)
def test_rekey_and_reencryption(self):
# Generate random key for Alice's data
symm_key_alice, enc_symm_key_alice = self.keyring_a.generate_key()
# Generate the re-encryption key and the ephemeral key data
reenc_key, enc_symm_key_bob, enc_priv_e = self.keyring_a.rekey(
self.keyring_a.enc_privkey,
self.keyring_b.enc_pubkey)
# Re-encrypt Alice's symm key
enc_symm_key_ab = self.keyring_a.reencrypt(reenc_key,
enc_symm_key_alice)
# Decapsulate the ECIES encrypted key
dec_symm_key_bob = self.keyring_b.decrypt_key(enc_symm_key_bob)
self.assertEqual(32, len(dec_symm_key_bob))
# Decrypt the ephemeral private key
dec_priv_e = self.keyring_b.symm_decrypt(dec_symm_key_bob, enc_priv_e)
self.assertEqual(32, len(dec_priv_e))
dec_priv_e = int.from_bytes(dec_priv_e, byteorder='big')
dec_key = self.keyring_b.decrypt_key(enc_symm_key_ab,
privkey=dec_priv_e)
self.assertEqual(dec_key, symm_key_alice)
def test_split_key_sharing(self):
raw_key, enc_key = self.keyring_a.generate_key()
self.assertTrue(32, len(raw_key))
shares = self.keyring_a.gen_split_rekey(self.keyring_a.enc_privkey,
self.keyring_b.enc_privkey,
4, 10)
self.assertEqual(10, len(shares))
rand_shares = random.sample(shares, 4)
self.assertEqual(4, len(rand_shares))
frags = [self.keyring_a.reencrypt(rk, enc_key) for rk in rand_shares]
self.assertEqual(4, len(frags))
split_enc_key = self.keyring_b.build_secret(frags)
self.assertTrue(raw_key != split_enc_key)
self.assertTrue(enc_key != split_enc_key)
dec_key = self.keyring_b.decrypt_key(split_enc_key)
self.assertEqual(32, len(dec_key))
self.assertTrue(dec_key == raw_key)
def test_symm_encryption(self):
key = api.secure_random(32)
self.assertEqual(32, len(key))
ciphertext = self.keyring_a.symm_encrypt(key, self.msg)
self.assertTrue(self.msg not in ciphertext)
def test_symm_decryption(self):
key = api.secure_random(32)
self.assertEqual(32, len(key))
ciphertext = self.keyring_a.symm_encrypt(key, self.msg)
self.assertTrue(self.msg not in ciphertext)
plaintext = self.keyring_a.symm_decrypt(key, ciphertext)
self.assertTrue(self.msg == plaintext)
def test_split_path(self):
subpaths = self.keyring_a._split_path(b'/foo/bar')
self.assertEqual(3, len(subpaths))
self.assertTrue(b'' in subpaths)
self.assertTrue(b'/foo' in subpaths)
self.assertTrue(b'/foo/bar' in subpaths)
subpaths = self.keyring_a._split_path(b'foobar')
self.assertEqual(1, len(subpaths))
self.assertTrue(b'foobar' in subpaths)
subpaths = self.keyring_a._split_path(b'')
self.assertEqual(1, len(subpaths))
self.assertTrue(b'' in subpaths)
def test_derive_path_key(self):
return
path = b'/foo/bar'
path_priv_key = self.keyring_a._derive_path_key(path, is_pub=False)
self.assertEqual(32, len(path_priv_key))
path_pub_key = self.keyring_a._derive_path_key(path)
self.assertEqual(32, len(path_pub_key))
path_priv_key_int = int.from_bytes(path_priv_key, byteorder='big')
verify_path_key = self.keyring_a.pre.priv2pub(path_priv_key_int)
# TODO: Figure out why this returns 34 chars
verify_path_key = ec.serialize(verify_path_key)[2:]
self.assertEqual(32, len(verify_path_key))
self.assertEqual(path_priv_key, path_priv_key)
def test_encrypt_decrypt_reencrypt(self):
plaintext = b'test'
path = b'/'
enc_keys = self.keyring_a.encrypt(plaintext, path=path)
self.assertEqual(1, len(enc_keys))
self.assertEqual(2, len(enc_keys[0]))
path_priv_a = self.keyring_a._derive_path_key(b'', is_pub=False)
path_priv_a = int.from_bytes(path_priv_a, byteorder='big')
rk_ab, enc_symm_key_bob, enc_priv_e = self.keyring_a.rekey(
path_priv_a, self.keyring_b.enc_pubkey)
enc_path_key, enc_path_symm_key = enc_keys[0]
reenc_path_symm_key = self.keyring_a.reencrypt(rk_ab, enc_path_symm_key)
priv_e = self.keyring_b.decrypt(enc_priv_e, enc_symm_key_bob)
priv_e = int.from_bytes(priv_e, byteorder='big')
keyring_e = KeyStore(enc_privkey=priv_e)
dec_key = keyring_e.decrypt(enc_path_key, reenc_path_symm_key)
self.assertEqual(plaintext, dec_key)
def test_encrypt_decrypt(self):
plaintext = b'test'
path = b'/'
enc_keys = self.keyring_a.encrypt(plaintext, path=path)
self.assertEqual(1, len(enc_keys))
self.assertEqual(2, len(enc_keys[0]))
path_priv_a = self.keyring_a._derive_path_key(b'', is_pub=False)
path_priv_a = int.from_bytes(path_priv_a, byteorder='big')
keyring_a_path = KeyStore(enc_privkey=path_priv_a)
dec_key = keyring_a_path.decrypt(*enc_keys[0])
self.assertEqual(plaintext, dec_key)
def test_secure_random(self):
length = random.randrange(1, 100)
rand_bytes = api.secure_random(length)
self.assertEqual(length, len(rand_bytes))

View File

@ -3,30 +3,12 @@ import datetime
import pytest
from nkms import crypto
from nkms.characters import Ursula, Alice, Character
from nkms.characters import Ursula, Alice, Character, Bob
from nkms.crypto import api
from nkms.crypto.encrypting_keypair import EncryptingKeypair
from nkms.keystore.keystore import KeyStore
from nkms.policy.constants import NON_PAYMENT
from nkms.policy.models import PolicyManagerForAlice, PolicyOffer, TreasureMap, PolicyGroup
class MockEncryptingPair(object):
def encrypt(self, cleartext):
pass
def decrypt(selfs, ciphertext):
pass
class MockUrsula(object):
def encrypt_for(self, payload):
# TODO: Make this a testable result
import random
return random.getrandbits(32)
class MockPolicyOfferResponse(object):
was_accepted = True
@ -36,23 +18,16 @@ class MockNetworkyStuff(object):
return MockPolicyOfferResponse()
def find_ursula(self, id, hashed_part):
return MockUrsula()
return Ursula()
class MockTreasureMap(TreasureMap):
pass
def test_complete_treasure_map_flow():
def test_treasure_map_from_alice_to_ursula_to_bob():
"""
Shows that Alice can share a TreasureMap with Ursula and that Bob can receive and decrypt it.
"""
keyring_alice = KeyStore()
bob_encrypting_keypair = EncryptingKeypair()
keyring_ursula = KeyStore()
alice, ursula, event_loop = test_alice_finds_ursula()
bob = Bob()
alice.learn_about_actor(bob)
bob.learn_about_actor(alice)
_discovered_ursula_ip, discovered_ursula_port = alice.find_best_ursula()
@ -60,25 +35,27 @@ def test_complete_treasure_map_flow():
for i in range(50):
treasure_map.nodes.append(api.secure_random(50))
encrypted_treasure_map = bob_encrypting_keypair.encrypt(treasure_map.packed_payload())
signature = "THIS IS A SIGNATURE"
encrypted_treasure_map, signature = alice.encrypt_for(bob, treasure_map.packed_payload())
# For example, a hashed path.
resource_id = b"as098duasdlkj213098asf"
policy_group = PolicyGroup(resource_id, bob_encrypting_keypair.pub_key)
policy_group = PolicyGroup(resource_id, bob)
setter = alice.server.set(policy_group.id, encrypted_treasure_map)
event_loop.run_until_complete(setter)
treasure_map_as_set_on_network = list(ursula.server.storage.items())[0][1]
treasure_map_as_decrypted_by_bob = bob_encrypting_keypair.decrypt(
treasure_map_as_set_on_network)
verified, treasure_map_as_decrypted_by_bob = bob.verify_from(alice, signature,
treasure_map_as_set_on_network,
decrypt=True,
signature_is_on_cleartext=True,
)
assert treasure_map_as_decrypted_by_bob == treasure_map.packed_payload()
assert verified is True
def test_alice_has_ursulas_public_key_and_uses_it_to_encode_policy_payload():
alice = Alice()
keychain_bob = KeyStore()
keychain_ursula = KeyStore()
bob = Bob()
# For example, a hashed path.
resource_id = b"as098duasdlkj213098asf"
@ -94,7 +71,7 @@ def test_alice_has_ursulas_public_key_and_uses_it_to_encode_policy_payload():
policy_manager = PolicyManagerForAlice(alice)
policy_group = policy_manager.create_policy_group(
keychain_bob.enc_keypair.pub_key,
bob,
resource_id,
m=20,
n=50,