Added some tests that provide a context for re-encryption.

pull/2960/head
derekpierre 2022-08-30 13:59:02 -04:00 committed by Kieran Prasch
parent 40e2c5c0ea
commit 15cb062940
9 changed files with 123 additions and 20 deletions

View File

@ -77,7 +77,8 @@ def test_retrieve_cfrags(blockchain_porter,
blockchain_porter_rpc_controller, blockchain_porter_rpc_controller,
random_blockchain_policy, random_blockchain_policy,
blockchain_bob, blockchain_bob,
blockchain_alice): blockchain_alice,
random_context):
method = 'retrieve_cfrags' method = 'retrieve_cfrags'
# Setup # Setup
@ -102,6 +103,19 @@ def test_retrieve_cfrags(blockchain_porter,
expected_results = blockchain_porter.retrieve_cfrags(**retrieve_args) expected_results = blockchain_porter.retrieve_cfrags(**retrieve_args)
assert len(retrieval_results) == len(expected_results) assert len(retrieval_results) == len(expected_results)
# Use context
retrieve_cfrags_params_with_context, _ = retrieval_request_setup(enacted_policy,
blockchain_bob,
blockchain_alice,
context=random_context,
encode_for_rest=True)
request_data = {'method': method, 'params': retrieve_cfrags_params_with_context}
response = blockchain_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# Failure - use encrypted treasure map # Failure - use encrypted treasure map
failure_retrieve_cfrags_params = dict(retrieve_cfrags_params) failure_retrieve_cfrags_params = dict(retrieve_cfrags_params)
failure_retrieve_cfrags_params['treasure_map'] = b64encode(os.urandom(32)).decode() failure_retrieve_cfrags_params['treasure_map'] = b64encode(os.urandom(32)).decode()

View File

@ -23,6 +23,7 @@ from urllib.parse import urlencode
from nucypher_core import RetrievalKit from nucypher_core import RetrievalKit
from nucypher.characters.lawful import Enrico from nucypher.characters.lawful import Enrico
from nucypher.control.specifications.fields import Base64JSON
from nucypher.crypto.powers import DecryptingPower from nucypher.crypto.powers import DecryptingPower
from nucypher.policy.kits import PolicyMessageKit, RetrievalResult from nucypher.policy.kits import PolicyMessageKit, RetrievalResult
from nucypher.utilities.porter.control.specifications.fields import RetrievalResultSchema, RetrievalKit as RetrievalKitField from nucypher.utilities.porter.control.specifications.fields import RetrievalResultSchema, RetrievalKit as RetrievalKitField
@ -91,7 +92,8 @@ def test_retrieve_cfrags(blockchain_porter,
blockchain_porter_web_controller, blockchain_porter_web_controller,
random_blockchain_policy, random_blockchain_policy,
blockchain_bob, blockchain_bob,
blockchain_alice): blockchain_alice,
random_context):
# Send bad data to assert error return # Send bad data to assert error return
response = blockchain_porter_web_controller.post('/retrieve_cfrags', data=json.dumps({'bad': 'input'})) response = blockchain_porter_web_controller.post('/retrieve_cfrags', data=json.dumps({'bad': 'input'}))
assert response.status_code == 400 assert response.status_code == 400
@ -168,6 +170,21 @@ def test_retrieve_cfrags(blockchain_porter,
assert retrieval_results assert retrieval_results
assert len(retrieval_results) == 2 assert len(retrieval_results) == 2
#
# Use context
#
context_field = Base64JSON()
multiple_retrieval_kits_params['context'] = context_field._serialize(random_context, attr=None, obj=None)
response = blockchain_porter_web_controller.post('/retrieve_cfrags', data=json.dumps(
multiple_retrieval_kits_params))
assert response.status_code == 200
response_data = json.loads(response.data)
retrieval_results = response_data['result']['retrieval_results']
assert retrieval_results
assert len(retrieval_results) == 2
# #
# Try same retrieval (with multiple retrieval kits) using query parameters # Try same retrieval (with multiple retrieval kits) using query parameters
# #

View File

@ -76,3 +76,22 @@ def test_retrieve_cfrags(blockchain_porter,
# use porter # use porter
result = blockchain_porter.retrieve_cfrags(**retrieval_args) result = blockchain_porter.retrieve_cfrags(**retrieval_args)
assert result assert result
def test_retrieve_cfrags_with_context(blockchain_porter,
random_blockchain_policy,
blockchain_bob,
blockchain_alice,
random_context):
# Setup
network_middleware = MockRestMiddleware()
# enact new random policy since idle_blockchain_policy/enacted_blockchain_policy already modified in previous tests
enacted_policy = random_blockchain_policy.enact(network_middleware=network_middleware)
retrieval_args, _ = retrieval_request_setup(enacted_policy,
blockchain_bob,
blockchain_alice,
context=random_context)
# use porter
result = blockchain_porter.retrieve_cfrags(**retrieval_args)
assert result

View File

@ -21,6 +21,7 @@ import os
import random import random
import shutil import shutil
import tempfile import tempfile
from base64 import b64encode
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
@ -1034,3 +1035,23 @@ def basic_auth_file(temp_dir_path):
f.write("admin:$apr1$hlEpWVoI$0qjykXrvdZ0yO2TnBggQO0\n") f.write("admin:$apr1$hlEpWVoI$0qjykXrvdZ0yO2TnBggQO0\n")
yield basic_auth yield basic_auth
basic_auth.unlink() basic_auth.unlink()
#
# Condition Context
#
@pytest.fixture(scope='module')
def random_context():
context = {
"domain": {"name": "tdec", "version": 1, "chainId": 1, "salt": "blahblahblah"},
"message": {
"address": "0x03e75d7dd38cce2e20ffee35ec914c57780a8e29",
"conditions": b64encode(
"random condition for reencryption".encode()
).decode(),
"blockNumber": 15440685,
"blockHash": "0x2220da8b777767df526acffd5375ebb340fc98e53c1040b25ad1a8119829e3bd",
},
}
return context

View File

@ -14,10 +14,8 @@
You should have received a copy of the GNU Affero General Public License You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>. along with nucypher. If not, see <https://www.gnu.org/licenses/>.
""" """
from base64 import b64decode
import pytest import pytest
from nucypher_core import HRAC, TreasureMap from nucypher_core import HRAC, TreasureMap
from nucypher.crypto.powers import DecryptingPower from nucypher.crypto.powers import DecryptingPower

View File

@ -75,7 +75,8 @@ def test_retrieve_cfrags(federated_porter,
enacted_federated_policy, enacted_federated_policy,
federated_bob, federated_bob,
federated_alice, federated_alice,
random_federated_treasure_map_data): random_federated_treasure_map_data,
random_context):
method = 'retrieve_cfrags' method = 'retrieve_cfrags'
# Setup # Setup
@ -97,6 +98,19 @@ def test_retrieve_cfrags(federated_porter,
expected_results = federated_porter.retrieve_cfrags(**retrieve_args) expected_results = federated_porter.retrieve_cfrags(**retrieve_args)
assert len(retrieval_results) == len(expected_results) assert len(retrieval_results) == len(expected_results)
# Use context
retrieve_cfrags_params_with_context, _ = retrieval_request_setup(enacted_federated_policy,
federated_bob,
federated_alice,
context=random_context,
encode_for_rest=True)
request_data = {'method': method, 'params': retrieve_cfrags_params_with_context}
response = federated_porter_rpc_controller.send(request_data)
assert response.success
retrieval_results = response.data['result']['retrieval_results']
assert retrieval_results
# Failure - use encrypted treasure map # Failure - use encrypted treasure map
failure_retrieve_cfrags_params = dict(retrieve_cfrags_params) failure_retrieve_cfrags_params = dict(retrieve_cfrags_params)
_, random_treasure_map = random_federated_treasure_map_data _, random_treasure_map = random_federated_treasure_map_data

View File

@ -23,6 +23,7 @@ from urllib.parse import urlencode
from nucypher_core import RetrievalKit from nucypher_core import RetrievalKit
from nucypher.characters.lawful import Enrico from nucypher.characters.lawful import Enrico
from nucypher.control.specifications.fields import Base64JSON
from nucypher.crypto.powers import DecryptingPower from nucypher.crypto.powers import DecryptingPower
from nucypher.policy.kits import PolicyMessageKit, RetrievalResult from nucypher.policy.kits import PolicyMessageKit, RetrievalResult
from nucypher.utilities.porter.control.specifications.fields import RetrievalResultSchema, RetrievalKit as RetrievalKitField from nucypher.utilities.porter.control.specifications.fields import RetrievalResultSchema, RetrievalKit as RetrievalKitField
@ -90,7 +91,8 @@ def test_retrieve_cfrags(federated_porter,
enacted_federated_policy, enacted_federated_policy,
federated_bob, federated_bob,
federated_alice, federated_alice,
random_federated_treasure_map_data): random_federated_treasure_map_data,
random_context):
# Send bad data to assert error return # Send bad data to assert error return
response = federated_porter_web_controller.post('/retrieve_cfrags', data=json.dumps({'bad': 'input'})) response = federated_porter_web_controller.post('/retrieve_cfrags', data=json.dumps({'bad': 'input'}))
assert response.status_code == 400 assert response.status_code == 400
@ -169,6 +171,19 @@ def test_retrieve_cfrags(federated_porter,
assert retrieval_results assert retrieval_results
assert len(retrieval_results) == 4 assert len(retrieval_results) == 4
#
# Use context
#
context_field = Base64JSON()
multiple_retrieval_kits_params['context'] = context_field._serialize(random_context, attr=None, obj=None)
response = federated_porter_web_controller.post('/retrieve_cfrags', data=json.dumps(multiple_retrieval_kits_params))
assert response.status_code == 200
response_data = json.loads(response.data)
retrieval_results = response_data['result']['retrieval_results']
assert retrieval_results
assert len(retrieval_results) == 4
# #
# Try same retrieval (with multiple retrieval kits) using query parameters # Try same retrieval (with multiple retrieval kits) using query parameters
# #

View File

@ -74,3 +74,18 @@ def test_retrieve_cfrags(federated_porter,
result = federated_porter.retrieve_cfrags(**retrieval_args) result = federated_porter.retrieve_cfrags(**retrieval_args)
assert result, "valid result returned" assert result, "valid result returned"
def test_retrieve_cfrags_with_context(federated_porter,
federated_bob,
federated_alice,
enacted_federated_policy,
random_context):
# Setup
retrieval_args, _ = retrieval_request_setup(enacted_federated_policy,
federated_bob,
federated_alice,
context=random_context)
result = federated_porter.retrieve_cfrags(**retrieval_args)
assert result, "valid result returned"

View File

@ -169,7 +169,8 @@ def test_alice_revoke():
def test_bob_retrieve_cfrags(federated_porter, def test_bob_retrieve_cfrags(federated_porter,
enacted_federated_policy, enacted_federated_policy,
federated_bob, federated_bob,
federated_alice): federated_alice,
random_context):
bob_retrieve_cfrags_schema = BobRetrieveCFrags() bob_retrieve_cfrags_schema = BobRetrieveCFrags()
# no args # no args
@ -184,23 +185,12 @@ def test_bob_retrieve_cfrags(federated_porter,
bob_retrieve_cfrags_schema.load(retrieval_args) bob_retrieve_cfrags_schema.load(retrieval_args)
# simple schema load w/ optional context # simple schema load w/ optional context
context = {
"domain": {"name": "tdec", "version": 1, "chainId": 1, "salt": "blahblahblah"},
"message": {
"address": "0x03e75d7dd38cce2e20ffee35ec914c57780a8e29",
"conditions": b64encode(
"random condition for reencryption".encode()
).decode(),
"blockNumber": 15440685,
"blockHash": "0x2220da8b777767df526acffd5375ebb340fc98e53c1040b25ad1a8119829e3bd",
},
}
retrieval_args, _ = retrieval_request_setup( retrieval_args, _ = retrieval_request_setup(
enacted_federated_policy, enacted_federated_policy,
federated_bob, federated_bob,
federated_alice, federated_alice,
encode_for_rest=True, encode_for_rest=True,
context=context, context=random_context,
) )
bob_retrieve_cfrags_schema.load(retrieval_args) bob_retrieve_cfrags_schema.load(retrieval_args)
@ -220,7 +210,7 @@ def test_bob_retrieve_cfrags(federated_porter,
federated_bob, federated_bob,
federated_alice, federated_alice,
encode_for_rest=False, encode_for_rest=False,
context=context, context=random_context,
) )
retrieval_results = federated_porter.retrieve_cfrags(**non_encoded_retrieval_args) retrieval_results = federated_porter.retrieve_cfrags(**non_encoded_retrieval_args)
expected_retrieval_results_json = [] expected_retrieval_results_json = []