mirror of https://github.com/nucypher/nucypher.git
Add tests for sqlite keystore
commit
2fc5596d5b
|
@ -126,7 +126,7 @@ class KeyStore(object):
|
|||
self.session.commit()
|
||||
return new_workorder
|
||||
|
||||
def get_workorder(self, hrac: bytes) -> Workorder:
|
||||
def get_workorders(self, hrac: bytes) -> Workorder:
|
||||
"""
|
||||
Returns a list of Workorders by HRAC.
|
||||
"""
|
||||
|
@ -135,7 +135,7 @@ class KeyStore(object):
|
|||
raise NotFound("No Workorders with {} HRAC found.".format(hrac))
|
||||
return workorders
|
||||
|
||||
def del_workorder(self, hrac: bytes):
|
||||
def del_workorders(self, hrac: bytes):
|
||||
"""
|
||||
Deletes a Workorder from the Keystore.
|
||||
"""
|
||||
|
|
|
@ -1,97 +1,89 @@
|
|||
import unittest
|
||||
import sha3
|
||||
from sqlalchemy import create_engine
|
||||
import pytest
|
||||
|
||||
from nkms.keystore.db import Base
|
||||
from datetime import datetime
|
||||
from nkms.keystore import keystore, keypairs
|
||||
from nkms.crypto import api as API
|
||||
from umbral.keys import UmbralPrivateKey
|
||||
|
||||
|
||||
class TestKeyStore(unittest.TestCase):
|
||||
def setUp(self):
|
||||
engine = create_engine('sqlite:///:memory:')
|
||||
Base.metadata.create_all(engine)
|
||||
def test_key_sqlite_keystore(test_keystore):
|
||||
keypair = keypairs.SigningKeypair(generate_keys_if_needed=True)
|
||||
|
||||
self.ks = keystore.KeyStore(engine)
|
||||
# Test add pubkey
|
||||
new_key = test_keystore.add_key(keypair)
|
||||
|
||||
def test_ecies_keypair_generation(self):
|
||||
keypair = self.ks.generate_encrypting_keypair()
|
||||
self.assertEqual(keypairs.EncryptingKeypair, type(keypair))
|
||||
self.assertEqual(bytes, type(keypair.privkey))
|
||||
self.assertEqual(bytes, type(keypair.pubkey))
|
||||
# Test get pubkey
|
||||
query_key = test_keystore.get_key(keypair.fingerprint)
|
||||
assert new_key == query_key
|
||||
|
||||
def test_ecdsa_keypair_generation(self):
|
||||
# TODO: Make this test actually do something instead of just checking types.
|
||||
keypair = self.ks.generate_signing_keypair()
|
||||
self.assertEqual(keypairs.SigningKeypair, type(keypair))
|
||||
# Test del pubkey
|
||||
test_keystore.del_key(keypair.fingerprint)
|
||||
with pytest.raises(keystore.NotFound):
|
||||
del_key = test_keystore.get_key(keypair.fingerprint)
|
||||
|
||||
def test_key_sqlite_keystore(self):
|
||||
keypair = self.ks.generate_encrypting_keypair()
|
||||
self.assertEqual(keypairs.EncryptingKeypair, type(keypair))
|
||||
self.assertEqual(bytes, type(keypair.privkey))
|
||||
self.assertEqual(bytes, type(keypair.pubkey))
|
||||
def test_policy_contract_sqlite_keystore(test_keystore):
|
||||
alice_keypair_sig = keypairs.SigningKeypair(generate_keys_if_needed=True)
|
||||
alice_keypair_enc = keypairs.EncryptingKeypair(generate_keys_if_needed=True)
|
||||
bob_keypair_sig = keypairs.SigningKeypair(generate_keys_if_needed=True)
|
||||
|
||||
# Test add pubkey
|
||||
fingerprint_pub = self.ks.add_key(keypair, store_pub=True)
|
||||
self.assertEqual(bytes, type(fingerprint_pub))
|
||||
self.assertEqual(64, len(fingerprint_pub))
|
||||
hrac = b'test'
|
||||
|
||||
key_hash = sha3.keccak_256(keypair.pubkey).hexdigest().encode()
|
||||
self.assertEqual(key_hash, fingerprint_pub)
|
||||
# Test add PolicyContract
|
||||
new_contract = test_keystore.add_policy_contract(
|
||||
datetime.utcnow(), b'test', hrac, alice_keypair_sig,
|
||||
alice_keypair_enc, bob_keypair_sig, b'test'
|
||||
)
|
||||
|
||||
# Test add privkey
|
||||
fingerprint_priv = self.ks.add_key(keypair, store_pub=False)
|
||||
self.assertEqual(bytes, type(fingerprint_priv))
|
||||
self.assertEqual(64, len(fingerprint_priv))
|
||||
# Test get PolicyContract
|
||||
query_contract = test_keystore.get_policy_contract(hrac)
|
||||
assert new_contract == query_contract
|
||||
|
||||
key_hash = sha3.keccak_256(keypair.privkey).hexdigest().encode()
|
||||
self.assertEqual(key_hash, fingerprint_priv)
|
||||
# Test del PolicyContract
|
||||
test_keystore.del_policy_contract(hrac)
|
||||
with pytest.raises(keystore.NotFound):
|
||||
del_key = test_keystore.get_policy_contract(hrac)
|
||||
|
||||
# Test get pubkey
|
||||
keypair_pub = self.ks.get_key(fingerprint_pub)
|
||||
self.assertEqual(keypairs.EncryptingKeypair, type(keypair_pub))
|
||||
self.assertTrue(keypair_pub.public_only)
|
||||
self.assertEqual(keypair.pubkey, keypair_pub.pubkey)
|
||||
|
||||
# Test get privkey
|
||||
keypair_priv = self.ks.get_key(fingerprint_priv)
|
||||
self.assertEqual(keypairs.EncryptingKeypair, type(keypair_priv))
|
||||
self.assertFalse(keypair_priv.public_only)
|
||||
self.assertEqual(keypair.privkey, keypair_priv.privkey)
|
||||
self.assertIsNotNone(keypair_priv.pubkey)
|
||||
self.assertEqual(keypair.pubkey, keypair_priv.pubkey)
|
||||
def test_workorder_sqlite_keystore(test_keystore):
|
||||
bob_keypair_sig1 = keypairs.SigningKeypair(generate_keys_if_needed=True)
|
||||
bob_keypair_sig2 = keypairs.SigningKeypair(generate_keys_if_neeeded=True)
|
||||
|
||||
# Test del pubkey
|
||||
self.ks.del_key(fingerprint_pub)
|
||||
with self.assertRaises(keystore.KeyNotFound):
|
||||
key = self.ks.get_key(fingerprint_pub)
|
||||
hrac = b'test'
|
||||
|
||||
# Test del privkey
|
||||
self.ks.del_key(fingerprint_priv)
|
||||
with self.assertRaises(keystore.KeyNotFound):
|
||||
key = self.ks.get_key(fingerprint_priv)
|
||||
# Test add workorder
|
||||
new_workorder1 = test_keystore.add_workorder(bob_keypair_sig1, b'test', hrac)
|
||||
new_workorder2 = test_keystore.add_workorder(bob_keypair_sig2, b'test', hrac)
|
||||
|
||||
def test_keyfrag_sqlite(self):
|
||||
kfrag_component_length = 32
|
||||
rand_sig = API.secure_random(65)
|
||||
rand_id = b'\x00' + API.secure_random(kfrag_component_length)
|
||||
rand_key = b'\x00' + API.secure_random(kfrag_component_length)
|
||||
rand_hrac = API.secure_random(32)
|
||||
# Test get workorder
|
||||
query_workorders = test_keystore.get_workorders(hrac)
|
||||
assert set([new_workorder1, new_workorder2]).issubset(query_workorders)
|
||||
|
||||
kfrag = KFrag(rand_id+rand_key)
|
||||
self.ks.add_kfrag(rand_hrac, kfrag, sig=rand_sig)
|
||||
# Test del workorder
|
||||
test_keystore.del_workorders(hrac)
|
||||
with pytest.raises(keystore.NotFound):
|
||||
del_key = test_keystore.get_workorders(hrac)
|
||||
|
||||
# Check that kfrag was added
|
||||
kfrag_from_datastore, signature = self.ks.get_kfrag(rand_hrac, get_sig=True)
|
||||
self.assertEqual(rand_sig, signature)
|
||||
def test_keyfrag_sqlite(self):
|
||||
kfrag_component_length = 32
|
||||
rand_sig = API.secure_random(65)
|
||||
rand_id = b'\x00' + API.secure_random(kfrag_component_length)
|
||||
rand_key = b'\x00' + API.secure_random(kfrag_component_length)
|
||||
rand_hrac = API.secure_random(32)
|
||||
|
||||
# De/serialization happens here, by dint of the slicing interface, which casts the kfrag to bytes.
|
||||
# The +1 is to account for the metabyte.
|
||||
self.assertEqual(kfrag_from_datastore[:kfrag_component_length + 1], rand_id)
|
||||
self.assertEqual(kfrag_from_datastore[kfrag_component_length + 1:], rand_key)
|
||||
self.assertEqual(kfrag_from_datastore, kfrag)
|
||||
kfrag = KFrag(rand_id+rand_key)
|
||||
self.ks.add_kfrag(rand_hrac, kfrag, sig=rand_sig)
|
||||
|
||||
# Check that kfrag gets deleted
|
||||
self.ks.del_kfrag(rand_hrac)
|
||||
with self.assertRaises(keystore.KeyNotFound):
|
||||
key = self.ks.get_key(rand_hrac)
|
||||
# Check that kfrag was added
|
||||
kfrag_from_datastore, signature = self.ks.get_kfrag(rand_hrac, get_sig=True)
|
||||
self.assertEqual(rand_sig, signature)
|
||||
|
||||
# De/serialization happens here, by dint of the slicing interface, which casts the kfrag to bytes.
|
||||
# The +1 is to account for the metabyte.
|
||||
self.assertEqual(kfrag_from_datastore[:kfrag_component_length + 1], rand_id)
|
||||
self.assertEqual(kfrag_from_datastore[kfrag_component_length + 1:], rand_key)
|
||||
self.assertEqual(kfrag_from_datastore, kfrag)
|
||||
|
||||
# Check that kfrag gets deleted
|
||||
self.ks.del_kfrag(rand_hrac)
|
||||
with self.assertRaises(keystore.KeyNotFound):
|
||||
key = self.ks.get_key(rand_hrac)
|
||||
|
|
Loading…
Reference in New Issue