mirror of https://github.com/nucypher/nucypher.git
parent
f1ad1426ff
commit
0c05699908
|
@ -22,7 +22,7 @@ from flask import Response
|
|||
from unittest.mock import patch
|
||||
|
||||
from nucypher.characters.lawful import Ursula
|
||||
from nucypher.crypto.umbral_adapter import PublicKey
|
||||
from nucypher.crypto.umbral_adapter import PublicKey, encrypt
|
||||
from nucypher.datastore.base import RecordField
|
||||
from nucypher.network.nodes import Teacher
|
||||
from tests.markers import skip_on_circleci
|
||||
|
@ -36,8 +36,6 @@ from tests.mock.performance_mocks import (
|
|||
mock_metadata_validation,
|
||||
mock_pubkey_from_bytes,
|
||||
mock_secret_source,
|
||||
mock_signature_bytes,
|
||||
mock_stamp_call,
|
||||
mock_verify_node
|
||||
)
|
||||
from tests.utils.middleware import SluggishLargeFleetMiddleware
|
||||
|
@ -77,7 +75,7 @@ def test_alice_can_learn_about_a_whole_bunch_of_ursulas(highperf_mocked_alice):
|
|||
actual_ursula.bytestring_of_known_nodes = lambda *args, **kwargs: _teacher_known_nodes_bytestring # TODO: Formalize this? #1537
|
||||
|
||||
with mock_cert_storage, mock_cert_loading, mock_verify_node, mock_message_verification, mock_metadata_validation:
|
||||
with mock_pubkey_from_bytes(), mock_stamp_call, mock_signature_bytes:
|
||||
with mock_pubkey_from_bytes():
|
||||
started = time.time()
|
||||
highperf_mocked_alice.block_until_number_of_known_nodes_is(4000, learn_on_this_thread=True)
|
||||
ended = time.time()
|
||||
|
@ -115,19 +113,26 @@ def test_alice_verifies_ursula_just_in_time(fleet_of_highperf_mocked_ursulas,
|
|||
def mock_receive_treasure_map():
|
||||
return Response(bytes(), status=201)
|
||||
|
||||
def mock_encrypt(public_key, plaintext):
|
||||
if not isinstance(public_key, PublicKey):
|
||||
public_key = public_key.i_want_to_be_a_real_boy()
|
||||
return encrypt(public_key, plaintext)
|
||||
|
||||
with NotARestApp.replace_route("receive_treasure_map", mock_receive_treasure_map):
|
||||
with NotARestApp.replace_route("set_policy", mock_set_policy):
|
||||
with patch('umbral.PublicKey.__eq__', lambda *args, **kwargs: True):
|
||||
with patch('umbral.PublicKey.from_bytes',
|
||||
with patch('nucypher.crypto.umbral_adapter.PublicKey.__eq__', lambda *args, **kwargs: True):
|
||||
with patch('nucypher.crypto.umbral_adapter.PublicKey.from_bytes',
|
||||
new=actual_random_key_instead):
|
||||
with patch("nucypher.datastore.models.PolicyArrangement._alice_verifying_key",
|
||||
new=not_public_key_record_field):
|
||||
with mock_cert_loading, mock_metadata_validation, mock_message_verification:
|
||||
with mock_secret_source():
|
||||
policy = highperf_mocked_alice.grant(
|
||||
highperf_mocked_bob, b"any label", m=20, n=30,
|
||||
expiration=maya.when('next week'),
|
||||
publish_treasure_map=False)
|
||||
with patch('nucypher.crypto.umbral_adapter.encrypt',
|
||||
new=mock_encrypt):
|
||||
policy = highperf_mocked_alice.grant(
|
||||
highperf_mocked_bob, b"any label", m=20, n=30,
|
||||
expiration=maya.when('next week'),
|
||||
publish_treasure_map=False)
|
||||
# TODO: Make some assertions about policy.
|
||||
total_verified = sum(node.verified_node for node in highperf_mocked_alice.known_nodes)
|
||||
# Alice may be able to verify more than `n`, but certainly not less,
|
||||
|
@ -170,7 +175,7 @@ def test_mass_treasure_map_placement(fleet_of_highperf_mocked_ursulas,
|
|||
|
||||
policy = _POLICY_PRESERVER.pop()
|
||||
|
||||
with patch('umbral.PublicKey.__eq__', lambda *args, **kwargs: True), mock_metadata_validation:
|
||||
with patch('nucypher.crypto.umbral_adapter.PublicKey.__eq__', lambda *args, **kwargs: True), mock_metadata_validation:
|
||||
|
||||
started = datetime.now()
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ class NotAPublicKey:
|
|||
self.serial = serial
|
||||
|
||||
def __bytes__(self):
|
||||
return b"\x03\ not a compress publickey:" + self.serial
|
||||
return b"\x03 not a compress publickey:" + self.serial
|
||||
|
||||
@classmethod
|
||||
def reset(cls):
|
||||
|
@ -81,13 +81,8 @@ class NotAPublicKey:
|
|||
def from_int(cls, serial):
|
||||
return cls(serial.to_bytes(cls._serial_bytes_length, byteorder="big"))
|
||||
|
||||
def to_bytes(self, *args, **kwargs):
|
||||
return b"this is not a public key... but it is 64 bytes.. so, ya know" + self.serial
|
||||
|
||||
def i_want_to_be_a_real_boy(self):
|
||||
_umbral_pubkey = self._umbral_pubkey_from_bytes(bytes(self))
|
||||
self.__dict__ = _umbral_pubkey.__dict__
|
||||
self.__class__ = _umbral_pubkey.__class__
|
||||
return self._umbral_pubkey_from_bytes(bytes(self))
|
||||
|
||||
def __eq__(self, other):
|
||||
return bytes(self) == bytes(other)
|
||||
|
@ -95,23 +90,25 @@ class NotAPublicKey:
|
|||
|
||||
class NotAPrivateKey:
|
||||
|
||||
fake_signature = Signature.from_bytes(
|
||||
b'@\xbfS&\x97\xb3\x9e\x9e\xd3\\j\x9f\x0e\x8fY\x0c\xbeS\x08d\x0b%s\xf6\x17\xe2\xb6\xcd\x95u\xaapON\xd9E\xb3\x10M\xe1\xf4u\x0bL\x99q\xd6\r\x8e_\xe5I\x1e\xe5\xa2\xcf\xe5\x8be_\x077Gz'
|
||||
)
|
||||
|
||||
def public_key(self):
|
||||
return NotAPublicKey()
|
||||
|
||||
def sign(self, *args, **kwargs):
|
||||
return b'0D\x02 @\xbfS&\x97\xb3\x9e\x9e\xd3\\j\x9f\x0e\x8fY\x0c\xbeS\x08d\x0b%s\xf6\x17\xe2\xb6\xcd\x95u\xaap\x02 ON\xd9E\xb3\x10M\xe1\xf4u\x0bL\x99q\xd6\r\x8e_\xe5I\x1e\xe5\xa2\xcf\xe5\x8be_\x077Gz'
|
||||
|
||||
@classmethod
|
||||
def stamp(cls, *args, **kwargs):
|
||||
return cls.fake_signature
|
||||
class NotASignature:
|
||||
|
||||
@classmethod
|
||||
def signature_bytes(cls, *args, **kwargs):
|
||||
return b'@\xbfS&\x97\xb3\x9e\x9e\xd3\\j\x9f\x0e\x8fY\x0c\xbeS\x08d\x0b%s\xf6\x17\xe2\xb6\xcd\x95u\xaapON\xd9E\xb3\x10M\xe1\xf4u\x0bL\x99q\xd6\r\x8e_\xe5I\x1e\xe5\xa2\xcf\xe5\x8be_\x077Gz'
|
||||
fake_signature_bytes = b'@\xbfS&\x97\xb3\x9e\x9e\xd3\\j\x9f\x0e\x8fY\x0c\xbeS\x08d\x0b%s\xf6\x17\xe2\xb6\xcd\x95u\xaapON\xd9E\xb3\x10M\xe1\xf4u\x0bL\x99q\xd6\r\x8e_\xe5I\x1e\xe5\xa2\xcf\xe5\x8be_\x077Gz'
|
||||
|
||||
def __bytes__(self):
|
||||
return self.fake_signature_bytes
|
||||
|
||||
|
||||
class NotASigner:
|
||||
|
||||
def __init__(self, secret_key):
|
||||
self._secret_key = secret_key
|
||||
|
||||
def sign(self, message):
|
||||
return NotASignature()
|
||||
|
||||
|
||||
class NotACert:
|
||||
|
@ -217,21 +214,18 @@ mock_metadata_validation = patch("nucypher.network.nodes.Teacher.validate_metada
|
|||
@contextmanager
|
||||
def mock_secret_source(*args, **kwargs):
|
||||
with patch("nucypher.crypto.keypairs.Keypair._private_key_source", new=lambda *args, **kwargs: NotAPrivateKey()):
|
||||
yield
|
||||
with patch("nucypher.crypto.keypairs.Signer", new=lambda *args, **kwargs: NotASigner(*args, **kwargs)):
|
||||
yield
|
||||
NotAPublicKey.reset()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def mock_pubkey_from_bytes(*args, **kwargs):
|
||||
with patch('umbral.PublicKey.from_bytes', NotAPublicKey.from_bytes):
|
||||
with patch('nucypher.crypto.umbral_adapter.PublicKey.from_bytes', NotAPublicKey.from_bytes):
|
||||
yield
|
||||
NotAPublicKey.reset()
|
||||
|
||||
|
||||
mock_stamp_call = patch('nucypher.crypto.signing.SignatureStamp.__call__', new=NotAPrivateKey.stamp)
|
||||
mock_signature_bytes = patch('umbral.Signature.__bytes__', new=NotAPrivateKey.signature_bytes)
|
||||
|
||||
|
||||
def _determine_good_serials(start, end):
|
||||
'''
|
||||
Figure out which serials are good to use in mocks because they won't result in non-viable public keys.
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue