Merge pull request #74 from tuxxy/crypto-api

[WIP] Implement a low-level, frozen ECIES PRE Cryptographic API w/ serialization
pull/76/head
Tux 2017-10-10 21:20:25 -06:00 committed by GitHub
commit 453ce8d1b3
13 changed files with 592 additions and 341 deletions

View File

@ -1,7 +1,6 @@
from kademlia.network import Server
from nkms.crypto.constants import NOT_SIGNED
from nkms.crypto import crypto as Crypto
from nkms.crypto.hash import signature_hash
from nkms.crypto import api
from nkms.crypto.powers import CryptoPower, SigningKeypair
from nkms.network.server import NuCypherDHTServer, NuCypherSeedOnlyDHTServer
@ -112,8 +111,9 @@ class Character(object):
"""
actor = self._lookup_actor(actor_whom_sender_claims_to_be)
signature_pub_key = actor.seal.as_tuple() # TODO: and again, maybe in the real world this looks in KeyStore.
msg_digest = b"".join(signature_hash(m) for m in messages) # This does work.
return Crypto.verify(signature, msg_digest, signature_pub_key)
msg_digest = b"".join(api.keccak_digest(m) for m in messages) # This does work.
sig = api.ecdsa_load_sig(signature)
return api.ecdsa_verify(*sig, msg_digest, signature_pub_key)
def _lookup_actor(self, actor: "Character"):
try:

View File

@ -1,41 +1,203 @@
import os
from typing import Tuple, Union, List
import msgpack
import sha3
from nacl.secret import SecretBox
from py_ecc.secp256k1.secp256k1 import ecdsa_raw_recover
from npre import elliptic_curve
from random import SystemRandom
from npre import umbral
from npre import elliptic_curve
from nacl.secret import SecretBox
from typing import Tuple, Union, List
from py_ecc.secp256k1 import N, privtopub, ecdsa_raw_recover, ecdsa_raw_sign
PRE = umbral.PRE()
SYSTEM_RAND = SystemRandom()
def priv_bytes2ec(
privkey: bytes
) -> elliptic_curve.ec_element:
def secure_random(
num_bytes: int
) -> bytes:
"""
Turns a private key, in bytes, into an elliptic_curve.ec_element.
Returns an amount `num_bytes` of data from the OS's random device.
If a randomness source isn't found, returns a `NotImplementedError`.
In this case, a secure random source most likely doesn't exist and
randomness will have to found elsewhere.
:param privkey: Private key to turn into an elliptic_curve.ec_element.
:param num_bytes: Number of bytes to return.
:return: elliptic_curve.ec_element
:return: bytes
"""
return elliptic_curve.deserialize(PRE.ecgroup, b'\x00' + privkey)
# TODO: Should we just use os.urandom or avoid the import w/ this?
return SYSTEM_RAND.getrandbits(num_bytes*8).to_bytes(num_bytes,
byteorder='big')
def pub_bytes2ec(
pubkey: bytes,
) -> elliptic_curve.ec_element:
def secure_random_range(
min: int,
max: int
) -> int:
"""
Turns a public key, in bytes, into an elliptic_curve.ec_element.
Returns a number from a secure random source betwee the range of
`min` and `max` - 1.
:param pubkey: Public key to turn into an elliptic_curve.ec_element.
:param min: Minimum number in the range
:param max: Maximum number in the range
:return: elliptic_curve.ec_element
:return: int
"""
return elliptic_curve.deserialize(PRE.ecgroup, b'\x01' + pubkey)
return SYSTEM_RAND.randrange(min, max)
def keccak_digest(
*messages: bytes
) -> bytes:
"""
Accepts an iterable containing bytes and digests it returning a
Keccak digest of 32 bytes (keccak_256).
:param bytes *messages: Data to hash
:rtype: bytes
:return: bytestring of digested data
"""
hash = sha3.keccak_256()
for message in messages:
hash.update(message)
return hash.digest()
def ecdsa_pub2bytes(
pubkey: Tuple[int]
) -> bytes:
"""
Takes an ECDSA public key and converts to bytes.
:param pubkey: Tuple[int] of Public Key
:return: bytestring of Public key
"""
x = pubkey[0].to_bytes(32, byteorder='big')
y = pubkey[1].to_bytes(32, byteorder='big')
return x+y
def ecdsa_bytes2pub(
pubkey: bytes
) -> Tuple[int]:
"""
Takes a byte encoded ECDSA public key and converts to a Tuple of x, and y
:param pubkey: Byte encoded public key
:return: Tuple[int] of Public Key
"""
x = int.from_bytes(pubkey[:32], byteorder='big')
y = int.from_bytes(pubkey[32:], byteorder='big')
return (x, y)
def ecdsa_gen_priv() -> bytes:
"""
Generates an ECDSA Private Key.
:return: Byte encoded ECDSA privkey
"""
privkey = secure_random_range(1, N)
return privkey.to_bytes(32, byteorder='big')
def ecdsa_priv2pub(
privkey: bytes,
to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
"""
Returns the public component of an ECDSA private key.
:param privkey: Private key as an int or bytestring
:param to_bytes: Serialize to bytes or not?
:return: Byte encoded or Tuple[int] ECDSA pubkey
"""
pubkey = privtopub(privkey)
if to_bytes:
return ecdsa_pub2bytes(pubkey)
return pubkey
def ecdsa_gen_sig(
v: int,
r: int,
s: int
) -> bytes:
"""
Generates an ECDSA signature, in bytes.
:param v: v of sig
:param r: r of sig
:param s: s of sig
:return: bytestring of v, r, and s
"""
v = v.to_bytes(1, byteorder='big')
r = r.to_bytes(32, byteorder='big')
s = s.to_bytes(32, byteorder='big')
return v+r+s
def ecdsa_load_sig(
signature: bytes
) -> Tuple[int]:
"""
Loads an ECDSA signature, from a bytestring, to a tuple.
:param signature: Signature, as a bytestring, of v, r, and s.
:return: Tuple(v, r, s)
"""
v = int.from_bytes(signature[:1], byteorder='big')
r = int.from_bytes(signature[1:33], byteorder='big')
s = int.from_bytes(signature[33:], byteorder='big')
return (v, r, s)
def ecdsa_sign(
msghash: bytes,
privkey: bytes
) -> Tuple[int]:
"""
Accepts a hashed message and signs it with the private key given.
:param msghash: Hashed message to sign
:param privkey: Private key to sign with
:return: Tuple(v, r, s)
"""
v, r, s = ecdsa_raw_sign(msghash, privkey)
return (v, r, s)
def ecdsa_verify(
v: int,
r: int,
s: int,
msghash: bytes,
pubkey: Union[bytes, Tuple[int]]
) -> bool:
"""
Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.
:param v: V of sig
:param r: R of sig
:param s: S of sig
:param bytes msghash: The hashed message to verify
:param bytes pubkey: Pubkey to validate signature for
:rtype: Boolean
:return: Is the signature valid or not?
"""
if bytes == type(pubkey):
pubkey = ecdsa_bytes2pub(pubkey)
verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
# TODO: Should this equality test be done better?
return verify_sig == pubkey
def symm_encrypt(
@ -70,6 +232,32 @@ def symm_decrypt(
return cipher.decrypt(ciphertext)
def priv_bytes2ec(
privkey: bytes
) -> elliptic_curve.ec_element:
"""
Turns a private key, in bytes, into an elliptic_curve.ec_element.
:param privkey: Private key to turn into an elliptic_curve.ec_element.
:return: elliptic_curve.ec_element
"""
return elliptic_curve.deserialize(PRE.ecgroup, b'\x00' + privkey)
def pub_bytes2ec(
pubkey: bytes,
) -> elliptic_curve.ec_element:
"""
Turns a public key, in bytes, into an elliptic_curve.ec_element.
:param pubkey: Public key to turn into an elliptic_curve.ec_element.
:return: elliptic_curve.ec_element
"""
return elliptic_curve.deserialize(PRE.ecgroup, b'\x01' + pubkey)
def ecies_gen_priv(
to_bytes: bool = True
) -> Union[bytes, elliptic_curve.ec_element]:
@ -221,64 +409,3 @@ def ecies_reencrypt(
if type(enc_key) == bytes:
enc_key = umbral.EncryptedKey(priv_bytes2ec(enc_key), None)
return PRE.reencrypt(rekey, enc_key)
def vrs_msgpack_dump(v, r, s):
v_bytes = v.to_bytes(1, byteorder='big')
r_bytes = r.to_bytes(32, byteorder='big')
s_bytes = s.to_bytes(32, byteorder='big')
return msgpack.dumps((v_bytes, r_bytes, s_bytes))
def vrs_msgpack_load(msgpack_vrs):
sig = msgpack.loads(msgpack_vrs)
v = int.from_bytes(sig[0], byteorder='big')
r = int.from_bytes(sig[1], byteorder='big')
s = int.from_bytes(sig[2], byteorder='big')
return (v, r, s)
def digest(*messages):
"""
Accepts an iterable containing bytes and digests it.
:param bytes *args: Data to hash
:rtype: bytes
:return: bytestring of digested data
"""
hash = sha3.keccak_256()
for message in messages:
hash.update(message)
return hash.digest()
def verify(signature, msghash, pubkey=None):
"""
Takes a msgpacked signature and verifies the message.
:param bytes msghash: The hashed message to verify
:param bytes signature: The msgpacked signature (v, r, and s)
:param bytes pubkey: Pubkey to validate signature for
Default is the keypair's pub_key.
:rtype: Boolean
:return: Is the signature valid or not?
"""
sig = vrs_msgpack_load(signature)
# Generate the public key from the signature and validate
# TODO: Look into fixed processing time functions for comparison
verify_sig = ecdsa_raw_recover(msghash, sig)
return verify_sig == pubkey
def secure_random(num_bytes: int) -> bytes:
"""
Returns an amount `num_bytes` of data from the OS's random device.
If a randomness source isn't found, returns a `NotImplementedError`.
In this case, a secure random source most likely doesn't exist and
randomness will have to found elsewhere.
:param num_bytes: Number of bytes to return.
:return: bytes
"""
return os.urandom(num_bytes)

View File

@ -1,11 +0,0 @@
import sha3
def content_hash(*hash_inputs):
hash = sha3.keccak_256()
for hash_input in hash_inputs:
hash.update(hash_input)
return hash.digest()
signature_hash = content_hash

View File

@ -3,8 +3,7 @@ from typing import Iterable
from py_ecc.secp256k1 import N, privtopub, ecdsa_raw_sign, ecdsa_raw_recover
from nkms.crypto import crypto as Crypto
from nkms.crypto.hash import signature_hash
from nkms.crypto import api
from nkms.crypto.keypairs import EncryptingKeypair
@ -68,7 +67,7 @@ class CryptoPower(object):
sig_keypair = self._power_ups[SigningKeypair]
except KeyError:
raise NoSigningPower
msg_digest = b"".join(signature_hash(m) for m in messages)
msg_digest = b"".join(api.keccak_digest(m) for m in messages)
return sig_keypair.sign(msg_digest)
@ -115,8 +114,8 @@ class SigningKeypair(CryptoPowerUp):
:rtype: Bytestring
:return: Msgpacked bytestring of v, r, and s (the signature)
"""
v, r, s = ecdsa_raw_sign(msghash, self.priv_key)
return Crypto.vrs_msgpack_dump(v, r, s)
v, r, s = api.ecdsa_sign(msghash, self.priv_key)
return api.ecdsa_gen_sig(v, r, s)
def public_key(self):
return self.pub_key

View File

@ -1,5 +1,5 @@
import msgpack
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
from npre.bbs98 import PRE as BasePRE
@ -24,7 +24,7 @@ class PRE(BasePRE):
return super(PRE, self).priv2pub(priv)
def rekey(self, priv1, pub2):
priv_to = Crypto.secure_random(self.KEY_SIZE)
priv_to = api.secure_random(self.KEY_SIZE)
rk = super(PRE, self).rekey(
convert_priv(priv1), convert_priv(priv_to), dtype=bytes)
epriv_to = self.encrypt(pub2, priv_to)

View File

@ -1,8 +1,7 @@
import msgpack
from nkms.characters import Alice
from nkms.crypto import crypto as Crypto
from nkms.crypto.hash import content_hash
from nkms.crypto import api
from nkms.policy.constants import UNKNOWN_KFRAG
@ -94,7 +93,7 @@ class PolicyGroup(object):
@property
def id(self):
if not self._id:
self._id = content_hash(self.uri, self.pubkey_enc_bob)
self._id = api.keccak_digest(self.uri, self.pubkey_enc_bob)
return self._id
@ -124,7 +123,7 @@ class Policy(object):
"""
self.kfrag = kfrag
self.deterministic_id_portion = deterministic_id_portion
self.random_id_portion = Crypto.secure_random(32) # TOOD: Where do we actually want this to live?
self.random_id_portion = api.secure_random(32) # TOOD: Where do we actually want this to live?
self.challenge_size = challenge_size
self.treasure_map = []
@ -140,10 +139,10 @@ class Policy(object):
def set_id(self):
if self.deterministic_id_portion:
self._id = "{}-{}".format(content_hash(*[str(d).encode() for d in self.deterministic_id_portion], self.random_id_portion),
content_hash(self.random_id_portion))
self._id = "{}-{}".format(api.keccak_digest(*[str(d).encode() for d in self.deterministic_id_portion], self.random_id_portion),
api.keccak_digest(self.random_id_portion))
else:
self._id = content_hash(self.random_id_portion)
self._id = api.keccak_digest(self.random_id_portion)
@ -158,9 +157,9 @@ class Policy(object):
def hash(self, pubkey_sig_alice, hash_input):
self.hashed_part = content_hash(hash_input)
self.hashed_part = api.keccak_digest(hash_input)
hash_input_for_id = str(pubkey_sig_alice).encode() + str(self.hashed_part).encode()
self._id = content_hash(hash_input_for_id)
self._id = api.keccak_digest(hash_input_for_id)
return self._id
def generate_challenge_pack(self):

View File

@ -1,7 +1,7 @@
import pytest
from nkms.characters import Alice, Ursula, Character
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
from nkms.crypto.powers import CryptoPower, SigningKeypair, NoSigningPower, NoEncryptingPower
@ -39,7 +39,8 @@ def test_actor_with_signing_power_can_sign():
signature = seal_of_the_signer(message)
# ...or to get the signer's public key for verification purposes.
verification = Crypto.verify(signature, Crypto.digest(message), seal_of_the_signer.as_tuple())
sig = api.ecdsa_load_sig(signature)
verification = api.ecdsa_verify(*sig, api.keccak_digest(message), seal_of_the_signer.as_tuple())
assert verification is True

329
tests/crypto/test_api.py Normal file
View File

@ -0,0 +1,329 @@
import unittest
import random
import sha3
from nacl.utils import EncryptedMessage
from npre import umbral
from npre import elliptic_curve as ec
from nkms.crypto import api
class TestCrypto(unittest.TestCase):
def setUp(self):
self.pre = umbral.PRE()
self.privkey_a = self.pre.gen_priv()
self.privkey_a_bytes = ec.serialize(self.privkey_a)[1:]
self.privkey_b = self.pre.gen_priv()
self.privkey_b_bytes = ec.serialize(self.privkey_b)[1:]
self.pubkey_a = self.pre.priv2pub(self.privkey_a)
self.pubkey_b = self.pre.priv2pub(self.privkey_b)
self.pubkey_a_bytes = ec.serialize(self.pubkey_a)[1:]
self.pubkey_b_bytes = ec.serialize(self.pubkey_b)[1:]
def test_secure_random(self):
rand1 = api.secure_random(10)
rand2 = api.secure_random(10)
self.assertNotEqual(rand1, rand2)
self.assertEqual(bytes, type(rand1))
self.assertEqual(bytes, type(rand2))
self.assertEqual(10, len(rand1))
self.assertEqual(10, len(rand2))
def test_secure_random_range(self):
output = [api.secure_random_range(1, 3) for _ in range(20)]
# Test that highest output can be max-1
self.assertNotIn(3, output)
# Test that min is present
output = [api.secure_random_range(1, 2) for _ in range(20)]
self.assertNotIn(2, output)
self.assertIn(1, output)
def test_keccak_digest(self):
data = b'this is a test'
digest1 = sha3.keccak_256(data).digest()
digest2 = api.keccak_digest(data)
self.assertEqual(digest1, digest2)
# Test iterables
data = data.split()
digest1 = sha3.keccak_256(b''.join(data)).digest()
digest2 = api.keccak_digest(*data)
self.assertEqual(digest1, digest2)
def test_ecdsa_pub2bytes(self):
privkey = api.ecdsa_gen_priv()
self.assertEqual(32, len(privkey))
pubkey = api.ecdsa_priv2pub(privkey, to_bytes=False)
self.assertEqual(tuple, type(pubkey))
self.assertEqual(2, len(pubkey))
self.assertEqual(int, type(pubkey[0]))
self.assertEqual(int, type(pubkey[1]))
pubkey = api.ecdsa_pub2bytes(pubkey)
self.assertEqual(bytes, type(pubkey))
def test_ecdsa_bytes2pub(self):
privkey = api.ecdsa_gen_priv()
self.assertEqual(32, len(privkey))
pubkey_tuple = api.ecdsa_priv2pub(privkey, to_bytes=False)
self.assertEqual(tuple, type(pubkey_tuple))
self.assertEqual(2, len(pubkey_tuple))
self.assertEqual(int, type(pubkey_tuple[0]))
self.assertEqual(int, type(pubkey_tuple[1]))
pubkey_bytes = api.ecdsa_priv2pub(privkey)
self.assertEqual(bytes, type(pubkey_bytes))
pubkey = api.ecdsa_bytes2pub(pubkey_bytes)
self.assertEqual(tuple, type(pubkey))
self.assertEqual(2, len(pubkey))
self.assertEqual(int, type(pubkey[0]))
self.assertEqual(int, type(pubkey[1]))
self.assertEqual(pubkey_tuple, pubkey)
def test_ecdsa_gen_priv(self):
privkey = api.ecdsa_gen_priv()
self.assertEqual(bytes, type(privkey))
self.assertEqual(32, len(privkey))
def test_ecdsa_priv2pub(self):
privkey = api.ecdsa_gen_priv()
self.assertEqual(bytes, type(privkey))
self.assertEqual(32, len(privkey))
# Test Serialization
pubkey = api.ecdsa_priv2pub(privkey)
self.assertEqual(bytes, type(pubkey))
self.assertEqual(64, len(pubkey))
# Test no serialization
pubkey = api.ecdsa_priv2pub(privkey, to_bytes=False)
self.assertEqual(tuple, type(pubkey))
self.assertEqual(2, len(pubkey))
self.assertEqual(int, type(pubkey[0]))
self.assertEqual(int, type(pubkey[1]))
def test_ecdsa_gen_sig(self):
v, r, s = 1, 2, 3
sig = api.ecdsa_gen_sig(v, r, s)
self.assertEqual(bytes, type(sig))
def test_ecdsa_load_sig(self):
v = 1
r = int.from_bytes(api.secure_random(32), byteorder='big')
s = int.from_bytes(api.secure_random(32), byteorder='big')
sig = api.ecdsa_gen_sig(v, r, s)
self.assertEqual(bytes, type(sig))
self.assertEqual(65, len(sig))
loaded_sig = api.ecdsa_load_sig(sig)
self.assertEqual(tuple, type(loaded_sig))
self.assertEqual(3, len(loaded_sig))
self.assertEqual((v, r, s), loaded_sig)
def test_ecdsa_sign(self):
msghash = api.secure_random(32)
privkey = api.ecdsa_gen_priv()
vrs = api.ecdsa_sign(msghash, privkey)
self.assertEqual(tuple, type(vrs))
self.assertEqual(3, len(vrs))
def test_ecdsa_verify(self):
msghash = api.secure_random(32)
privkey = api.ecdsa_gen_priv()
pubkey = api.ecdsa_priv2pub(privkey, to_bytes=False)
vrs = api.ecdsa_sign(msghash, privkey)
self.assertEqual(tuple, type(vrs))
self.assertEqual(3, len(vrs))
is_verified = api.ecdsa_verify(*vrs, msghash, pubkey)
self.assertEqual(bool, type(is_verified))
self.assertTrue(is_verified)
def test_symm_encrypt(self):
key = api.secure_random(32)
plaintext = b'this is a test'
ciphertext = api.symm_encrypt(key, plaintext)
self.assertEqual(EncryptedMessage, type(ciphertext))
self.assertNotEqual(plaintext, ciphertext)
def test_symm_decrypt(self):
key = api.secure_random(32)
plaintext = b'this is a test'
ciphertext = api.symm_encrypt(key, plaintext)
self.assertEqual(EncryptedMessage, type(ciphertext))
self.assertNotEqual(plaintext, ciphertext)
dec_text = api.symm_decrypt(key, ciphertext)
self.assertEqual(bytes, type(dec_text))
self.assertNotEqual(ciphertext, dec_text)
self.assertEqual(plaintext, dec_text)
def test_priv_bytes2ec(self):
privkey_bytes = ec.serialize(self.privkey_a)[1:]
self.assertEqual(bytes, type(privkey_bytes))
self.assertEqual(32, len(privkey_bytes))
privkey = api.priv_bytes2ec(privkey_bytes)
self.assertEqual(ec.ec_element, type(privkey))
self.assertEqual(self.privkey_a, privkey)
def test_pub_bytes2ec(self):
pubkey = self.pre.priv2pub(self.privkey_a)
self.assertEqual(ec.ec_element, type(pubkey))
pubkey_bytes = ec.serialize(pubkey)[1:]
self.assertEqual(bytes, type(pubkey_bytes))
self.assertEqual(33, len(pubkey_bytes))
pubkey_ec = api.pub_bytes2ec(pubkey_bytes)
self.assertEqual(ec.ec_element, type(pubkey_ec))
self.assertEqual(pubkey_ec, pubkey)
def test_ecies_gen_priv(self):
# Check serialiation first
privkey = api.ecies_gen_priv()
self.assertEqual(bytes, type(privkey))
self.assertEqual(32, len(privkey))
# Check no serialization
privkey = api.ecies_gen_priv(to_bytes=False)
self.assertEqual(ec.ec_element, type(privkey))
def test_ecies_priv2pub(self):
# Check serialization first
pubkey = api.ecies_priv2pub(self.privkey_a)
self.assertEqual(bytes, type(pubkey))
self.assertEqual(33, len(pubkey))
# Check no serialization
pubkey = api.ecies_priv2pub(self.privkey_a_bytes, to_bytes=False)
self.assertEqual(ec.ec_element, type(pubkey))
def test_ecies_encapsulate(self):
# Check from ec.element
key, enc_key = api.ecies_encapsulate(self.pubkey_a)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
# Check from bytes
key, enc_key = api.ecies_encapsulate(self.pubkey_a_bytes)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(32, len(key))
def test_ecies_decapsulate(self):
# Check from ec.element
key, enc_key = api.ecies_encapsulate(self.pubkey_a)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
dec_key = api.ecies_decapsulate(self.privkey_a, enc_key)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(key, dec_key)
# Check from bytes
key, enc_key = api.ecies_encapsulate(self.pubkey_a_bytes)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
dec_key = api.ecies_decapsulate(self.privkey_a, enc_key)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(key, dec_key)
def test_ecies_rekey(self):
# Check serialization first
rekey = api.ecies_rekey(self.privkey_a, self.privkey_b)
self.assertEqual(bytes, type(rekey))
self.assertEqual(32, len(rekey))
# Check no serialization
rekey = api.ecies_rekey(self.privkey_a_bytes, self.privkey_b_bytes,
to_bytes=False)
self.assertEqual(umbral.RekeyFrag, type(rekey))
self.assertEqual(ec.ec_element, type(rekey.key))
def test_ecies_split_rekey(self):
# Check w/o conversion
frags = api.ecies_split_rekey(self.privkey_a, self.privkey_b, 3, 4)
self.assertEqual(list, type(frags))
self.assertEqual(4, len(frags))
# Check with conversion
frags = api.ecies_split_rekey(self.privkey_a_bytes,
self.privkey_b_bytes, 3, 4)
self.assertEqual(list, type(frags))
self.assertEqual(4, len(frags))
def test_ecies_combine(self):
eph_priv = self.pre.gen_priv()
eph_pub = self.pre.priv2pub(eph_priv)
plain_key, enc_key = api.ecies_encapsulate(eph_pub)
self.assertNotEqual(plain_key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(plain_key))
self.assertEqual(32, len(plain_key))
rk_frags = api.ecies_split_rekey(eph_priv, self.privkey_b, 6, 10)
self.assertEqual(list, type(rk_frags))
self.assertEqual(10, len(rk_frags))
rk_selected = random.sample(rk_frags, 6)
shares = [api.ecies_reencrypt(rk_frag, enc_key) for rk_frag in rk_selected]
self.assertEqual(list, type(shares))
self.assertEqual(6, len(shares))
[self.assertEqual(umbral.EncryptedKey, type(share)) for share in shares]
e_b = api.ecies_combine(shares)
self.assertEqual(umbral.EncryptedKey, type(e_b))
dec_key = api.ecies_decapsulate(self.privkey_b, e_b)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(plain_key, dec_key)
def test_ecies_reencrypt(self):
eph_priv = self.pre.gen_priv()
eph_pub = self.pre.priv2pub(eph_priv)
plain_key, enc_key = api.ecies_encapsulate(eph_pub)
self.assertNotEqual(plain_key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(plain_key))
self.assertEqual(32, len(plain_key))
rk_eb = api.ecies_rekey(eph_priv, self.privkey_b,
to_bytes=False)
self.assertEqual(umbral.RekeyFrag, type(rk_eb))
self.assertEqual(ec.ec_element, type(rk_eb.key))
reenc_key = api.ecies_reencrypt(rk_eb, enc_key)
dec_key = api.ecies_decapsulate(self.privkey_b, reenc_key)
self.assertEqual(plain_key, dec_key)

View File

@ -1,194 +0,0 @@
import unittest
import random
from nacl.utils import EncryptedMessage
from npre import umbral
from npre import elliptic_curve as ec
from nkms.crypto import crypto as Crypto
class TestCrypto(unittest.TestCase):
def setUp(self):
self.pre = umbral.PRE()
self.privkey_a = self.pre.gen_priv()
self.privkey_a_bytes = ec.serialize(self.privkey_a)[1:]
self.privkey_b = self.pre.gen_priv()
self.privkey_b_bytes = ec.serialize(self.privkey_b)[1:]
self.pubkey_a = self.pre.priv2pub(self.privkey_a)
self.pubkey_b = self.pre.priv2pub(self.privkey_b)
self.pubkey_a_bytes = ec.serialize(self.pubkey_a)[1:]
self.pubkey_b_bytes = ec.serialize(self.pubkey_b)[1:]
def test_priv_bytes2ec(self):
privkey_bytes = ec.serialize(self.privkey_a)[1:]
self.assertEqual(bytes, type(privkey_bytes))
self.assertEqual(32, len(privkey_bytes))
privkey = Crypto.priv_bytes2ec(privkey_bytes)
self.assertEqual(ec.ec_element, type(privkey))
self.assertEqual(self.privkey_a, privkey)
def test_pub_bytes2ec(self):
pubkey = self.pre.priv2pub(self.privkey_a)
self.assertEqual(ec.ec_element, type(pubkey))
pubkey_bytes = ec.serialize(pubkey)[1:]
self.assertEqual(bytes, type(pubkey_bytes))
self.assertEqual(33, len(pubkey_bytes))
pubkey_ec = Crypto.pub_bytes2ec(pubkey_bytes)
self.assertEqual(ec.ec_element, type(pubkey_ec))
self.assertEqual(pubkey_ec, pubkey)
def test_symm_encrypt(self):
key = random._urandom(32)
plaintext = b'this is a test'
ciphertext = Crypto.symm_encrypt(key, plaintext)
self.assertEqual(EncryptedMessage, type(ciphertext))
self.assertNotEqual(plaintext, ciphertext)
def test_symm_decrypt(self):
key = random._urandom(32)
plaintext = b'this is a test'
ciphertext = Crypto.symm_encrypt(key, plaintext)
self.assertEqual(EncryptedMessage, type(ciphertext))
self.assertNotEqual(plaintext, ciphertext)
dec_text = Crypto.symm_decrypt(key, ciphertext)
self.assertEqual(bytes, type(dec_text))
self.assertNotEqual(ciphertext, dec_text)
self.assertEqual(plaintext, dec_text)
def test_ecies_gen_priv(self):
# Check serialiation first
privkey = Crypto.ecies_gen_priv()
self.assertEqual(bytes, type(privkey))
self.assertEqual(32, len(privkey))
# Check no serialization
privkey = Crypto.ecies_gen_priv(to_bytes=False)
self.assertEqual(ec.ec_element, type(privkey))
def test_ecies_priv2pub(self):
# Check serialization first
pubkey = Crypto.ecies_priv2pub(self.privkey_a)
self.assertEqual(bytes, type(pubkey))
self.assertEqual(33, len(pubkey))
# Check no serialization
pubkey = Crypto.ecies_priv2pub(self.privkey_a_bytes, to_bytes=False)
self.assertEqual(ec.ec_element, type(pubkey))
def test_ecies_encapsulate(self):
# Check from ec.element
key, enc_key = Crypto.ecies_encapsulate(self.pubkey_a)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
# Check from bytes
key, enc_key = Crypto.ecies_encapsulate(self.pubkey_a_bytes)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(32, len(key))
def test_ecies_decapsulate(self):
# Check from ec.element
key, enc_key = Crypto.ecies_encapsulate(self.pubkey_a)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
dec_key = Crypto.ecies_decapsulate(self.privkey_a, enc_key)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(key, dec_key)
# Check from bytes
key, enc_key = Crypto.ecies_encapsulate(self.pubkey_a_bytes)
self.assertNotEqual(key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(key))
self.assertEqual(32, len(key))
dec_key = Crypto.ecies_decapsulate(self.privkey_a, enc_key)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(key, dec_key)
def test_ecies_rekey(self):
# Check serialization first
rekey = Crypto.ecies_rekey(self.privkey_a, self.privkey_b)
self.assertEqual(bytes, type(rekey))
self.assertEqual(32, len(rekey))
# Check no serialization
rekey = Crypto.ecies_rekey(self.privkey_a_bytes, self.privkey_b_bytes,
to_bytes=False)
self.assertEqual(umbral.RekeyFrag, type(rekey))
self.assertEqual(ec.ec_element, type(rekey.key))
def test_ecies_split_rekey(self):
# Check w/o conversion
frags = Crypto.ecies_split_rekey(self.privkey_a, self.privkey_b, 3, 4)
self.assertEqual(list, type(frags))
self.assertEqual(4, len(frags))
# Check with conversion
frags = Crypto.ecies_split_rekey(self.privkey_a_bytes,
self.privkey_b_bytes, 3, 4)
self.assertEqual(list, type(frags))
self.assertEqual(4, len(frags))
def test_ecies_combine(self):
eph_priv = self.pre.gen_priv()
eph_pub = self.pre.priv2pub(eph_priv)
plain_key, enc_key = Crypto.ecies_encapsulate(eph_pub)
self.assertNotEqual(plain_key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(plain_key))
self.assertEqual(32, len(plain_key))
rk_frags = Crypto.ecies_split_rekey(eph_priv, self.privkey_b, 6, 10)
self.assertEqual(list, type(rk_frags))
self.assertEqual(10, len(rk_frags))
rk_selected = random.sample(rk_frags, 6)
shares = [Crypto.ecies_reencrypt(rk_frag, enc_key) for rk_frag in rk_selected]
self.assertEqual(list, type(shares))
self.assertEqual(6, len(shares))
[self.assertEqual(umbral.EncryptedKey, type(share)) for share in shares]
e_b = Crypto.ecies_combine(shares)
self.assertEqual(umbral.EncryptedKey, type(e_b))
dec_key = Crypto.ecies_decapsulate(self.privkey_b, e_b)
self.assertEqual(bytes, type(dec_key))
self.assertEqual(32, len(dec_key))
self.assertEqual(plain_key, dec_key)
def test_ecies_reencrypt(self):
eph_priv = self.pre.gen_priv()
eph_pub = self.pre.priv2pub(eph_priv)
plain_key, enc_key = Crypto.ecies_encapsulate(eph_pub)
self.assertNotEqual(plain_key, enc_key)
self.assertEqual(umbral.EncryptedKey, type(enc_key))
self.assertEqual(bytes, type(plain_key))
self.assertEqual(32, len(plain_key))
rk_eb = Crypto.ecies_rekey(eph_priv, self.privkey_b,
to_bytes=False)
self.assertEqual(umbral.RekeyFrag, type(rk_eb))
self.assertEqual(ec.ec_element, type(rk_eb.key))
reenc_key = Crypto.ecies_reencrypt(rk_eb, enc_key)
dec_key = Crypto.ecies_decapsulate(self.privkey_b, reenc_key)
self.assertEqual(plain_key, dec_key)

View File

@ -3,7 +3,7 @@ import sha3
import msgpack
import random
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
from nkms.crypto.keypairs import EncryptingKeypair
from nkms.crypto.powers import SigningKeypair
@ -18,27 +18,28 @@ class TestSigningKeypair(unittest.TestCase):
msg_digest = sha3.keccak_256(self.msg).digest()
signature = self.keypair_a.sign(msg_digest)
sig = msgpack.loads(signature)
self.assertTrue(1, len(sig[0])) # Check v
self.assertTrue(32, len(sig[1])) # Check r
self.assertTrue(32, len(sig[2])) # Check s
sig = api.ecdsa_load_sig(signature)
self.assertEqual(tuple, type(sig))
self.assertEqual(int, type(sig[0])) # Check v
self.assertEqual(int, type(sig[1])) # Check r
self.assertEqual(int, type(sig[2])) # Check s
def test_verification(self):
msg_digest = sha3.keccak_256(self.msg).digest()
signature = self.keypair_a.sign(msg_digest)
sig = msgpack.loads(signature)
self.assertTrue(1, len(sig[0])) # Check v
self.assertTrue(32, len(sig[1])) # Check r
self.assertTrue(32, len(sig[2])) # Check s
sig = api.ecdsa_load_sig(signature)
self.assertEqual(int, type(sig[0])) # Check v
self.assertEqual(int, type(sig[1])) # Check r
self.assertEqual(int, type(sig[2])) # Check s
verify_sig = Crypto.verify(signature, msg_digest,
pubkey=self.keypair_a.pub_key)
verify_sig = api.ecdsa_verify(*sig, msg_digest,
self.keypair_a.pub_key)
self.assertTrue(verify_sig)
def test_digest(self):
digest_a = Crypto.digest(b'foo', b'bar')
digest_b = Crypto.digest(b'foobar')
digest_a = api.keccak_digest(b'foo', b'bar')
digest_b = api.keccak_digest(b'foobar')
self.assertEqual(digest_a, digest_b)

View File

@ -4,7 +4,7 @@ import unittest
import msgpack
import npre.elliptic_curve as ec
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
from nkms.crypto.keystore import KeyStore
from nkms.crypto.powers import CryptoPower, SigningKeypair
@ -20,21 +20,21 @@ class TestKeyRing(unittest.TestCase):
def test_signing(self):
signature = self.power_of_signing.sign(self.msg)
sig = msgpack.loads(signature)
self.assertTrue(1, len(sig[0])) # Check v
self.assertTrue(32, len(sig[1])) # Check r
self.assertTrue(32, len(sig[2])) # Check s
sig = api.ecdsa_load_sig(signature)
self.assertTrue(int, type(sig[0])) # Check v
self.assertTrue(int, type(sig[1])) # Check r
self.assertTrue(int, type(sig[2])) # Check s
def test_verification(self):
signature = self.power_of_signing.sign(self.msg)
sig = msgpack.loads(signature)
self.assertTrue(1, len(sig[0])) # Check v
self.assertTrue(32, len(sig[1])) # Check r
self.assertTrue(32, len(sig[2])) # Check s
sig = api.ecdsa_load_sig(signature)
self.assertTrue(int, type(sig[0])) # Check v
self.assertTrue(int, type(sig[1])) # Check r
self.assertTrue(int, type(sig[2])) # Check s
msghash = Crypto.digest(self.msg)
is_valid = Crypto.verify(signature, msghash,
msghash = api.keccak_digest(self.msg)
is_valid = api.ecdsa_verify(*sig, msghash,
pubkey=self.power_of_signing.pubkey_sig_tuple())
self.assertTrue(is_valid)
@ -103,14 +103,14 @@ class TestKeyRing(unittest.TestCase):
self.assertTrue(dec_key == raw_key)
def test_symm_encryption(self):
key = Crypto.secure_random(32)
key = api.secure_random(32)
self.assertEqual(32, len(key))
ciphertext = self.keyring_a.symm_encrypt(key, self.msg)
self.assertTrue(self.msg not in ciphertext)
def test_symm_decryption(self):
key = Crypto.secure_random(32)
key = api.secure_random(32)
self.assertEqual(32, len(key))
ciphertext = self.keyring_a.symm_encrypt(key, self.msg)
@ -192,5 +192,5 @@ class TestKeyRing(unittest.TestCase):
def test_secure_random(self):
length = random.randrange(1, 100)
rand_bytes = Crypto.secure_random(length)
rand_bytes = api.secure_random(length)
self.assertEqual(length, len(rand_bytes))

View File

@ -1,12 +1,12 @@
from nkms.crypto import default_algorithm
from nkms.crypto import symmetric_from_algorithm
from nkms.crypto import pre_from_algorithm
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
def test_symmetric():
Cipher = symmetric_from_algorithm(default_algorithm)
key = Crypto.secure_random(Cipher.KEY_SIZE)
key = api.secure_random(Cipher.KEY_SIZE)
cipher = Cipher(key)
data = b'Hello world' * 10

View File

@ -5,7 +5,7 @@ import pytest
from nkms import crypto
from nkms.characters import Ursula, Alice, Character
from nkms.crypto import crypto as Crypto
from nkms.crypto import api
from nkms.crypto.encrypting_keypair import EncryptingKeypair
from nkms.crypto.keystore import KeyStore
from nkms.policy.constants import NON_PAYMENT
@ -58,7 +58,7 @@ def test_complete_treasure_map_flow():
treasure_map = TreasureMap()
for i in range(50):
treasure_map.nodes.append(Crypto.secure_random(50))
treasure_map.nodes.append(api.secure_random(50))
encrypted_treasure_map = bob_encrypting_keypair.encrypt(treasure_map.packed_payload())
signature = "THIS IS A SIGNATURE"