Removes Web and RPC controller entities

pull/2987/head
Kieran Prasch 2022-10-25 17:29:43 +01:00
parent 52af298af4
commit 07bf78d9ff
9 changed files with 0 additions and 1145 deletions

View File

@ -1,38 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from nucypher.control.controllers import CLIController
from nucypher.control.emitters import StdoutEmitter
class CharacterCLIController(CLIController):
_emitter_class = StdoutEmitter
def __init__(self,
interface: 'CharacterPublicInterface',
*args,
**kwargs):
super().__init__(interface=interface, *args, **kwargs)
def _perform_action(self, *args, **kwargs) -> dict:
try:
response_data = super()._perform_action(*args, **kwargs)
finally:
self.log.debug(f"Finished action '{kwargs['action']}', stopping {self.interface.implementer}")
self.interface.implementer.disenchant()
return response_data

View File

@ -1,184 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from typing import Union, List
import maya
from nucypher_core import MessageKit, HRAC, EncryptedTreasureMap
from nucypher_core.umbral import PublicKey
from nucypher.characters.base import Character
from nucypher.characters.control.specifications import alice, bob, enrico
from nucypher.control.interfaces import attach_schema, ControlInterface
from nucypher.crypto.powers import DecryptingPower, SigningPower
from nucypher.network.middleware import RestMiddleware
class CharacterPublicInterface(ControlInterface):
def __init__(self, character: Character = None, *args, **kwargs):
super().__init__(implementer=character, *args, **kwargs)
class AliceInterface(CharacterPublicInterface):
@attach_schema(alice.CreatePolicy)
def create_policy(self,
bob_encrypting_key: PublicKey,
bob_verifying_key: PublicKey,
label: bytes,
threshold: int,
shares: int,
expiration: maya.MayaDT,
value: int = None
) -> dict:
from nucypher.characters.lawful import Bob
bob = Bob.from_public_keys(encrypting_key=bob_encrypting_key,
verifying_key=bob_verifying_key)
new_policy = self.implementer.create_policy(
bob=bob,
label=label,
threshold=threshold,
shares=shares,
expiration=expiration,
value=value
)
response_data = {'label': new_policy.label, 'policy_encrypting_key': new_policy.public_key}
return response_data
@attach_schema(alice.DerivePolicyEncryptionKey)
def derive_policy_encrypting_key(self, label: bytes) -> dict:
policy_encrypting_key = self.implementer.get_policy_encrypting_key_from_label(label)
response_data = {'policy_encrypting_key': policy_encrypting_key, 'label': label}
return response_data
@attach_schema(alice.GrantPolicy)
def grant(self,
bob_encrypting_key: PublicKey,
bob_verifying_key: PublicKey,
label: bytes,
threshold: int,
shares: int,
expiration: maya.MayaDT,
value: int = None,
rate: int = None,
) -> dict:
from nucypher.characters.lawful import Bob
bob = Bob.from_public_keys(encrypting_key=bob_encrypting_key,
verifying_key=bob_verifying_key)
new_policy = self.implementer.grant(bob=bob,
label=label,
threshold=threshold,
shares=shares,
value=value,
rate=rate,
expiration=expiration)
response_data = {'treasure_map': new_policy.treasure_map,
'policy_encrypting_key': new_policy.public_key,
# For the users of this interface, Publisher is always the same as Alice,
# so we are only returning the Alice's key.
'alice_verifying_key': self.implementer.stamp.as_umbral_pubkey()}
return response_data
@attach_schema(alice.Revoke)
def revoke(self, label: bytes, bob_verifying_key: PublicKey) -> dict:
# TODO: Move deeper into characters
policy_hrac = HRAC(self.implementer.stamp.as_umbral_pubkey(), bob_verifying_key, label)
policy = self.implementer.active_policies[policy_hrac]
receipt, failed_revocations = self.implementer.revoke(policy)
if len(failed_revocations) > 0:
for node_id, attempt in failed_revocations.items():
revocation, fail_reason = attempt
if fail_reason == RestMiddleware.NotFound:
del (failed_revocations[node_id])
if len(failed_revocations) <= (policy.shares - policy.threshold + 1):
del (self.implementer.active_policies[policy_hrac])
response_data = {'failed_revocations': len(failed_revocations)}
return response_data
@attach_schema(alice.Decrypt)
def decrypt(self, label: bytes, message_kit: MessageKit) -> dict:
"""
Character control endpoint to allow Alice to decrypt her own data.
"""
plaintexts = self.implementer.decrypt_message_kit(
message_kit=message_kit,
label=label
)
response = {'cleartexts': plaintexts}
return response
@attach_schema(alice.PublicKeys)
def public_keys(self) -> dict:
"""
Character control endpoint for getting Alice's public keys.
"""
verifying_key = self.implementer.public_keys(SigningPower)
response_data = {'alice_verifying_key': verifying_key}
return response_data
class BobInterface(CharacterPublicInterface):
@attach_schema(bob.RetrieveAndDecrypt)
def retrieve_and_decrypt(self,
alice_verifying_key: PublicKey,
message_kits: List[MessageKit],
encrypted_treasure_map: EncryptedTreasureMap) -> dict:
"""
Character control endpoint for re-encrypting and decrypting policy data.
"""
plaintexts = self.implementer.retrieve_and_decrypt(message_kits,
alice_verifying_key=alice_verifying_key,
encrypted_treasure_map=encrypted_treasure_map)
response_data = {'cleartexts': plaintexts}
return response_data
@attach_schema(bob.PublicKeys)
def public_keys(self) -> dict:
"""
Character control endpoint for getting Bob's encrypting and signing public keys
"""
verifying_key = self.implementer.public_keys(SigningPower)
encrypting_key = self.implementer.public_keys(DecryptingPower)
response_data = {'bob_encrypting_key': encrypting_key, 'bob_verifying_key': verifying_key}
return response_data
class EnricoInterface(CharacterPublicInterface):
@attach_schema(enrico.EncryptMessage)
def encrypt_message(self, plaintext: Union[str, bytes]) -> dict:
"""
Character control endpoint for encrypting data for a policy and
receiving the messagekit (and signature) to give to Bob.
"""
message_kit = self.implementer.encrypt_message(plaintext=plaintext)
response_data = {'message_kit': message_kit}
return response_data

View File

@ -1,215 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from collections import deque
from pathlib import Path
import maya
import os
from twisted.internet import reactor
from twisted.internet.protocol import connectionDone
from twisted.internet.stdio import StandardIO
from twisted.protocols.basic import LineReceiver
from nucypher.utilities.logging import Logger
class UrsulaCommandProtocol(LineReceiver):
encoding = 'utf-8'
delimiter = os.linesep.encode(encoding=encoding)
def __init__(self, ursula, emitter):
super().__init__()
self.ursula = ursula
self.emitter = emitter
self.start_time = maya.now()
self.__history = deque(maxlen=10)
self.prompt = bytes('Ursula({}) >>> '.format(self.ursula.checksum_address[:9]), encoding='utf-8')
# Expose Ursula functional entry points
self.__commands = {
# Help
'?': self.paintHelp,
'help': self.paintHelp,
# Status
'status': self.paintStatus,
'known_nodes': self.paintKnownNodes,
'fleet_state': self.paintFleetState,
# Learning Control
'cycle_teacher': self.cycle_teacher,
'start_learning': self.start_learning,
'stop_learning': self.stop_learning,
# Process Control
'stop': self.stop,
}
self._hidden_commands = ('?',)
@property
def commands(self):
return self.__commands.keys()
def paintHelp(self):
"""
Display this help message.
"""
self.emitter.echo("\nUrsula Command Help\n===================\n")
for command, func in self.__commands.items():
if command not in self._hidden_commands:
try:
self.emitter.echo(f'{command}\n{"-"*len(command)}\n{func.__doc__.lstrip()}')
except AttributeError:
raise AttributeError("Ursula Command method is missing a docstring,"
" which is required for generating help text.")
def paintKnownNodes(self):
"""
Display a list of all known nucypher peers.
"""
from nucypher.cli.painting.nodes import paint_known_nodes
paint_known_nodes(emitter=self.emitter, ursula=self.ursula)
def paintStatus(self):
"""
Display the current status of the attached Ursula node.
"""
from nucypher.cli.painting.nodes import paint_node_status
paint_node_status(emitter=self.emitter, ursula=self.ursula, start_time=self.start_time)
def paintFleetState(self):
"""
Display information about the network-wide fleet state as the attached Ursula node sees it.
"""
from nucypher.cli.painting.nodes import build_fleet_state_status
self.emitter.echo(build_fleet_state_status(ursula=self.ursula))
def connectionMade(self):
self.emitter.echo("\nType 'help' or '?' for help")
self.transport.write(self.prompt)
def connectionLost(self, reason=connectionDone) -> None:
self.ursula.stop_learning_loop(reason=reason)
def lineReceived(self, line):
"""Ursula Console REPL"""
# Read
raw_line = line.decode(encoding=self.encoding)
line = raw_line.strip().lower()
# Evaluate
try:
self.__commands[line]()
# Print
except KeyError:
if line: # allow for empty string
self.emitter.echo("Invalid input")
self.__commands["?"]()
else:
self.__history.append(raw_line)
# Loop
self.transport.write(self.prompt)
def cycle_teacher(self):
"""
Manually direct the attached Ursula node to start learning from a different teacher.
"""
return self.ursula.cycle_teacher_node()
def start_learning(self):
"""
Manually start the attached Ursula's node learning protocol.
"""
return self.ursula.start_learning_loop()
def stop_learning(self):
"""
Manually stop the attached Ursula's node learning protocol.
"""
return self.ursula.stop_learning_loop()
def stop(self):
"""
Shutdown the attached running Ursula node.
"""
return reactor.stop()
class JSONRPCLineReceiver(LineReceiver):
encoding = 'utf-8'
delimiter = os.linesep.encode(encoding=encoding)
__ipc_endpoint = Path("/tmp/nucypher.ipc")
class IPCWriter(StandardIO):
pass
def __init__(self, rpc_controller, capture_output: bool = False):
super().__init__()
self.rpc_controller = rpc_controller
self.start_time = maya.now()
self.__captured_output = list()
self.capture_output = capture_output
self.__ipc_fd = None
self.__ipc_writer = None
self.log = Logger(f"JSON-RPC-{rpc_controller.app_name}") # TODO needs ID
@property
def captured_output(self):
return self.__captured_output
def connectionMade(self):
self.__ipc_fd = open(self.__ipc_endpoint, 'ab+')
self.__ipc_writer = self.__ipc_fd.write
# Hookup the IPC endpoint file
self.transport.write = self.__ipc_writer
self.log.info(f"JSON RPC-IPC endpoint opened at {self.__ipc_endpoint.absolute()}."
f" Listening for messages.") # TODO
def connectionLost(self, reason=connectionDone) -> None:
self.__ipc_fd.close()
self.__ipc_endpoint.unlink()
self.log.info("JSON RPC-IPC Endpoint Closed.") # TODO
def rawDataReceived(self, data):
pass
def lineReceived(self, line):
line = line.strip(self.delimiter)
if line:
self.rpc_controller.handle_request(control_request=line)

View File

@ -1,117 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
from collections import deque
import pytest
import sys
from nucypher.cli.processes import JSONRPCLineReceiver
class TransportTrap:
"""Temporarily diverts system standard output"""
def __init__(self):
self.___stdout = sys.stdout
self.buffer = deque()
def __enter__(self):
"""Diversion"""
sys.stdout = self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Return to normal"""
sys.stdout = self.___stdout
def read(self, lines: int = 1):
# Read from faked buffer
results = list()
for readop in range(lines):
results.append(self.buffer.popleft())
# return the popped values
if not lines > 1:
results = results[0]
return results
def write(self, data) -> int:
if data != '\n':
self.buffer.append(data)
size = len(data)
return size
def flush(self) -> None:
pass
@pytest.fixture(scope='module')
def rpc_protocol(federated_alice):
rpc_controller = federated_alice.make_rpc_controller()
protocol = JSONRPCLineReceiver(rpc_controller=rpc_controller, capture_output=True)
yield protocol
def test_alice_rpc_controller_creation(federated_alice):
rpc_controller = federated_alice.make_rpc_controller()
protocol = JSONRPCLineReceiver(rpc_controller=rpc_controller)
assert protocol.rpc_controller == federated_alice.controller
def test_rpc_invalid_input(rpc_protocol, federated_alice):
"""
Example test data fround here: https://www.jsonrpc.org/specification
"""
semi_valid_collection = dict(
# description = (input, error code)
# Semi-valid
number_only=(42, -32600),
empty_batch_request=([], -32600),
empty_request=({}, -32600),
bogus_input=({'bogus': 'input'}, -32600),
non_existent_method=({"jsonrpc": "2.0", "method": "llamas", "id": "9"}, -32601),
invalid_request=({"jsonrpc": "2.0", "method": 1, "params": "bar"}, -32600),
# Malformed
invalid_json=(b'{"jsonrpc": "2.0", "method": "foobar, "params": "bar", "baz]', -32700),
invalid_batch=(b'[{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}, '
b'{"jsonrpc": "2.0", "method"]', -32700)
)
with TransportTrap():
for description, payload in semi_valid_collection.items():
request, expected_error_code = payload
# Allow malformed input to passthrough
if not isinstance(request, bytes):
request = bytes(json.dumps(request), encoding='utf-8')
rpc_protocol.lineReceived(line=request)
stdout = sys.stdout.read(lines=1)
deserialized_response = json.loads(stdout)
assert 'jsonrpc' in deserialized_response
actual_error_code = int(deserialized_response['error']['code'])
assert (actual_error_code == expected_error_code), str(request)

View File

@ -1,121 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import sys
from contextlib import contextmanager
from io import StringIO
import pytest
from nucypher.cli.processes import UrsulaCommandProtocol
from nucypher.control.emitters import StdoutEmitter
@contextmanager
def capture_output():
new_out, new_err = StringIO(), StringIO()
old_out, old_err = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = new_out, new_err
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = old_out, old_err
@pytest.fixture(scope='module')
def ursula(federated_ursulas):
ursula = federated_ursulas.pop()
return ursula
@pytest.fixture(scope='module')
def protocol(ursula):
emitter = StdoutEmitter()
protocol = UrsulaCommandProtocol(ursula=ursula, emitter=emitter)
return protocol
def test_ursula_command_protocol_creation(ursula):
emitter = StdoutEmitter()
protocol = UrsulaCommandProtocol(ursula=ursula, emitter=emitter)
assert protocol.ursula == ursula
assert b'Ursula' in protocol.prompt
def test_ursula_command_help(protocol, ursula):
class FakeTransport:
"""This is a transport"""
mock_output = b''
@staticmethod
def write(data: bytes):
FakeTransport.mock_output += data
protocol.transport = FakeTransport
with capture_output() as (out, err):
protocol.lineReceived(line=b'bananas')
commands = protocol.commands
commands = list(set(commands) - set(protocol._hidden_commands))
# Ensure all commands are in the help text
result = out.getvalue()
assert "Invalid input" in result
for command in commands:
assert command in result, '{} is missing from help text'.format(command)
for command in protocol._hidden_commands:
assert command not in result, f'Hidden command {command} in help text'
# Try again with valid 'help' command
with capture_output() as (out, err):
protocol.lineReceived(line=b'help')
result = out.getvalue()
assert "Invalid input" not in result
for command in commands:
assert command in result, '{} is missing from help text'.format(command)
for command in protocol._hidden_commands:
assert command not in result, f'Hidden command {command} in help text'
# Blank lines are OK!
with capture_output() as (out, err):
protocol.lineReceived(line=b'')
assert protocol.prompt in FakeTransport.mock_output
def test_ursula_command_status(protocol, ursula):
with capture_output() as (out, err):
protocol.paintStatus()
result = out.getvalue()
assert ursula.checksum_address in result
assert '...' in result
assert 'Known Nodes' in result
def test_ursula_command_known_nodes(protocol, ursula):
with capture_output() as (out, err):
protocol.paintKnownNodes()
result = out.getvalue()
assert 'Known Nodes' in result
assert ursula.checksum_address not in result

View File

@ -1,124 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import os
from base64 import b64encode
import pytest
from nucypher.control.specifications.exceptions import InvalidInputData
from nucypher.network.nodes import Learner
from tests.utils.middleware import MockRestMiddleware
from tests.utils.policy import retrieval_request_setup, retrieval_params_decode_from_rest
def test_get_ursulas(blockchain_porter_rpc_controller, blockchain_ursulas):
method = 'get_ursulas'
expected_response_id = 0
quantity = 4
blockchain_ursulas_list = list(blockchain_ursulas)
include_ursulas = [blockchain_ursulas_list[0].checksum_address, blockchain_ursulas_list[1].checksum_address]
exclude_ursulas = [blockchain_ursulas_list[2].checksum_address, blockchain_ursulas_list[3].checksum_address]
get_ursulas_params = {
'quantity': quantity,
'include_ursulas': include_ursulas,
'exclude_ursulas': exclude_ursulas
}
#
# Success
#
request_data = {'method': method, 'params': get_ursulas_params}
response = blockchain_porter_rpc_controller.send(request_data)
expected_response_id += 1
assert response.success
# assert response.id == expected_response_id # FIXME
ursulas_info = response.data['result']['ursulas']
returned_ursula_addresses = {ursula_info['checksum_address'] for ursula_info in ursulas_info} # ensure no repeats
assert len(returned_ursula_addresses) == quantity
for address in include_ursulas:
assert address in returned_ursula_addresses
for address in exclude_ursulas:
assert address not in returned_ursula_addresses
# Confirm the same message send works again, with a unique ID
request_data = {'method': method, 'params': get_ursulas_params}
rpc_response = blockchain_porter_rpc_controller.send(request=request_data)
expected_response_id += 1
assert rpc_response.success
# assert rpc_response.id == expected_response_id # FIXME
#
# Failure case
#
failed_ursula_params = dict(get_ursulas_params)
failed_ursula_params['quantity'] = len(blockchain_ursulas_list) + 1 # too many to get
request_data = {'method': method, 'params': failed_ursula_params}
with pytest.raises(Learner.NotEnoughNodes):
blockchain_porter_rpc_controller.send(request_data)
def test_retrieve_cfrags(blockchain_porter,
blockchain_porter_rpc_controller,
random_blockchain_policy,
blockchain_bob,
blockchain_alice,
random_context):
method = 'retrieve_cfrags'
# Setup
network_middleware = MockRestMiddleware()
# enact new random policy since idle_blockchain_policy/enacted_blockchain_policy already modified in previous tests
enacted_policy = random_blockchain_policy.enact(network_middleware=network_middleware)
retrieve_cfrags_params, _ = retrieval_request_setup(enacted_policy,
blockchain_bob,
blockchain_alice,
encode_for_rest=True)
# Success
request_data = {'method': method, 'params': retrieve_cfrags_params}
response = blockchain_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# expected results - can only compare length of results, ursulas are randomized to obtain cfrags
retrieve_args = retrieval_params_decode_from_rest(retrieve_cfrags_params)
expected_results = blockchain_porter.retrieve_cfrags(**retrieve_args)
assert len(retrieval_results) == len(expected_results)
# Use context
retrieve_cfrags_params_with_context, _ = retrieval_request_setup(enacted_policy,
blockchain_bob,
blockchain_alice,
context=random_context,
encode_for_rest=True)
request_data = {'method': method, 'params': retrieve_cfrags_params_with_context}
response = blockchain_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# Failure - use encrypted treasure map
failure_retrieve_cfrags_params = dict(retrieve_cfrags_params)
failure_retrieve_cfrags_params['treasure_map'] = b64encode(os.urandom(32)).decode()
request_data = {'method': method, 'params': failure_retrieve_cfrags_params}
with pytest.raises(InvalidInputData):
blockchain_porter_rpc_controller.send(request_data)

View File

@ -1,137 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from base64 import b64encode
import datetime
import maya
import pytest
from nucypher.characters.lawful import Enrico
from nucypher.crypto.powers import DecryptingPower
@pytest.fixture(scope='module')
def alice_web_controller_test_client(federated_alice):
web_controller = federated_alice.make_web_controller(crash_on_error=True)
yield web_controller.test_client()
@pytest.fixture(scope='module')
def bob_web_controller_test_client(federated_bob):
web_controller = federated_bob.make_web_controller(crash_on_error=True)
yield web_controller.test_client()
@pytest.fixture(scope='module')
def enrico_web_controller_test_client(capsule_side_channel):
_message_kit = capsule_side_channel()
web_controller = capsule_side_channel.enrico.make_web_controller(crash_on_error=True)
yield web_controller.test_client()
@pytest.fixture(scope='module')
def enrico_web_controller_from_alice(federated_alice, random_policy_label):
enrico = Enrico.from_alice(federated_alice, random_policy_label)
web_controller = enrico.make_web_controller(crash_on_error=True)
yield web_controller.test_client()
#
# RPC
#
@pytest.fixture(scope='module')
def alice_rpc_test_client(federated_alice):
rpc_controller = federated_alice.make_rpc_controller(crash_on_error=True)
yield rpc_controller.test_client()
@pytest.fixture(scope='module')
def bob_rpc_controller(federated_bob):
rpc_controller = federated_bob.make_rpc_controller(crash_on_error=True)
yield rpc_controller.test_client()
@pytest.fixture(scope='module')
def enrico_rpc_controller_test_client(capsule_side_channel):
# Side Channel
_message_kit = capsule_side_channel()
# RPC Controler
rpc_controller = capsule_side_channel.enrico.make_rpc_controller(crash_on_error=True)
yield rpc_controller.test_client()
@pytest.fixture(scope='module')
def enrico_rpc_controller_from_alice(federated_alice, random_policy_label):
enrico = Enrico.from_alice(federated_alice, random_policy_label)
rpc_controller = enrico.make_rpc_controller(crash_on_error=True)
yield rpc_controller.test_client()
@pytest.fixture(scope='module')
def create_policy_control_request(federated_bob):
method_name = 'create_policy'
bob_pubkey_enc = federated_bob.public_keys(DecryptingPower)
params = {
'bob_encrypting_key': bytes(bob_pubkey_enc).hex(),
'bob_verifying_key': bytes(federated_bob.stamp).hex(),
'label': b64encode(bytes(b'test')).decode(),
'threshold': 2,
'shares': 3,
'expiration': (maya.now() + datetime.timedelta(days=3)).iso8601(),
}
return method_name, params
@pytest.fixture(scope='module')
def grant_control_request(federated_bob):
method_name = 'grant'
bob_pubkey_enc = federated_bob.public_keys(DecryptingPower)
params = {
'bob_encrypting_key': bytes(bob_pubkey_enc).hex(),
'bob_verifying_key': bytes(federated_bob.stamp).hex(),
'label': 'test',
'threshold': 2,
'shares': 3,
'expiration': (maya.now() + datetime.timedelta(days=3)).iso8601(),
}
return method_name, params
@pytest.fixture(scope='module')
def retrieve_control_request(federated_bob, enacted_federated_policy, capsule_side_channel):
method_name = 'retrieve_and_decrypt'
message_kit = capsule_side_channel()
params = {
'alice_verifying_key': bytes(enacted_federated_policy.publisher_verifying_key).hex(),
'message_kits': [b64encode(bytes(message_kit)).decode()],
'encrypted_treasure_map': b64encode(bytes(enacted_federated_policy.treasure_map)).decode()
}
return method_name, params
@pytest.fixture(scope='module')
def encrypt_control_request():
method_name = 'encrypt_message'
params = {
'message': b64encode(b"The admiration I had for your work has completely evaporated!").decode(),
}
return method_name, params

View File

@ -1,89 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from nucypher.characters.control.interfaces import BobInterface
from tests.utils.controllers import validate_json_rpc_response_data
def test_alice_rpc_character_control_create_policy(alice_rpc_test_client, create_policy_control_request):
alice_rpc_test_client.__class__.MESSAGE_ID = 0
method_name, params = create_policy_control_request
request_data = {'method': method_name, 'params': params}
rpc_response = alice_rpc_test_client.send(request=request_data)
assert rpc_response.success is True
assert rpc_response.id == 1
try:
bytes.fromhex(rpc_response.content['policy_encrypting_key'])
except (KeyError, ValueError):
pytest.fail("Invalid Policy Encrypting Key")
# Confirm the same message send works again, with a unique ID
request_data = {'method': method_name, 'params': params}
rpc_response = alice_rpc_test_client.send(request=request_data)
assert rpc_response.success is True
assert rpc_response.id == 2
# Send bad data to assert error returns (Request #3)
alice_rpc_test_client.crash_on_error = False
response = alice_rpc_test_client.send(request={'bogus': 'input'}, malformed=True)
assert response.error_code == -32600
# Send a bulk create policy request
bulk_request = list()
for i in range(50):
request_data = {'method': method_name, 'params': params}
bulk_request.append(request_data)
rpc_responses = alice_rpc_test_client.send(request=bulk_request)
for response_id, rpc_response in enumerate(rpc_responses, start=3):
assert rpc_response.success is True
assert rpc_response.id == response_id
def test_alice_rpc_character_control_derive_policy_encrypting_key(alice_rpc_test_client):
method_name = 'derive_policy_encrypting_key'
request_data = {'method': method_name, 'params': {'label': 'test'}}
response = alice_rpc_test_client.send(request_data)
assert response.success is True
assert 'jsonrpc' in response.data
def test_alice_rpc_character_control_grant(alice_rpc_test_client, grant_control_request):
method_name, params = grant_control_request
request_data = {'method': method_name, 'params': params}
response = alice_rpc_test_client.send(request_data)
assert 'jsonrpc' in response.data
def test_enrico_rpc_character_control_encrypt_message(enrico_rpc_controller_test_client, encrypt_control_request):
method_name, params = encrypt_control_request
request_data = {'method': method_name, 'params': params}
response = enrico_rpc_controller_test_client.send(request_data)
assert 'jsonrpc' in response.data
def test_bob_rpc_character_control_retrieve(bob_rpc_controller, retrieve_control_request):
method_name, params = retrieve_control_request
request_data = {'method': method_name, 'params': params}
response = bob_rpc_controller.send(request_data)
assert validate_json_rpc_response_data(response=response,
method_name=method_name,
interface=BobInterface)

View File

@ -1,120 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from base64 import b64encode
import pytest
from nucypher.control.specifications.exceptions import InvalidInputData
from nucypher.network.nodes import Learner
from tests.utils.policy import retrieval_request_setup, retrieval_params_decode_from_rest
def test_get_ursulas(federated_porter_rpc_controller, federated_ursulas):
method = 'get_ursulas'
quantity = 4
federated_ursulas_list = list(federated_ursulas)
include_ursulas = [federated_ursulas_list[0].checksum_address, federated_ursulas_list[1].checksum_address]
exclude_ursulas = [federated_ursulas_list[2].checksum_address, federated_ursulas_list[3].checksum_address]
get_ursulas_params = {
'quantity': quantity,
'include_ursulas': include_ursulas,
'exclude_ursulas': exclude_ursulas
}
#
# Success
#
request_data = {'method': method, 'params': get_ursulas_params}
response = federated_porter_rpc_controller.send(request_data)
expected_response_id = response.id
assert response.success
ursulas_info = response.data['result']['ursulas']
returned_ursula_addresses = {ursula_info['checksum_address'] for ursula_info in ursulas_info} # ensure no repeats
assert len(returned_ursula_addresses) == quantity
for address in include_ursulas:
assert address in returned_ursula_addresses
for address in exclude_ursulas:
assert address not in returned_ursula_addresses
# Confirm the same message send works again, with a unique ID
request_data = {'method': method, 'params': get_ursulas_params}
rpc_response = federated_porter_rpc_controller.send(request=request_data)
expected_response_id += 1
assert rpc_response.success
assert rpc_response.id == expected_response_id
#
# Failure case
#
failed_ursula_params = dict(get_ursulas_params)
failed_ursula_params['quantity'] = len(federated_ursulas_list) + 1 # too many to get
request_data = {'method': method, 'params': failed_ursula_params}
with pytest.raises(Learner.NotEnoughNodes):
federated_porter_rpc_controller.send(request_data)
def test_retrieve_cfrags(federated_porter,
federated_porter_rpc_controller,
enacted_federated_policy,
federated_bob,
federated_alice,
random_federated_treasure_map_data,
random_context):
method = 'retrieve_cfrags'
# Setup
retrieve_cfrags_params, _ = retrieval_request_setup(enacted_federated_policy,
federated_bob,
federated_alice,
encode_for_rest=True)
# Success
request_data = {'method': method, 'params': retrieve_cfrags_params}
response = federated_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# expected results - can only compare length of results, ursulas are randomized to obtain cfrags
retrieve_args = retrieval_params_decode_from_rest(retrieve_cfrags_params)
expected_results = federated_porter.retrieve_cfrags(**retrieve_args)
assert len(retrieval_results) == len(expected_results)
# Use context
retrieve_cfrags_params_with_context, _ = retrieval_request_setup(enacted_federated_policy,
federated_bob,
federated_alice,
context=random_context,
encode_for_rest=True)
request_data = {'method': method, 'params': retrieve_cfrags_params_with_context}
response = federated_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# Failure - use encrypted treasure map
failure_retrieve_cfrags_params = dict(retrieve_cfrags_params)
_, random_treasure_map = random_federated_treasure_map_data
failure_retrieve_cfrags_params['treasure_map'] = b64encode(bytes(random_treasure_map)).decode()
request_data = {'method': method, 'params': failure_retrieve_cfrags_params}
with pytest.raises(InvalidInputData):
federated_porter_rpc_controller.send(request_data)