Merge pull request #3016 from derekpierre/testing-blitz

Testing blitz to improve code coverage.
pull/3024/head
KPrasch 2022-11-17 11:40:25 +00:00 committed by GitHub
commit 275751f340
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 223 additions and 92 deletions

6
.gitignore vendored
View File

@ -10,7 +10,6 @@ __pycache__
/MANIFEST
/.tox
/.idea
.coverage
_temp_test_datastore
.mypy_cache
.pytest_cache/
@ -51,3 +50,8 @@ docs/source/contracts_api/
tests/unit/.hypothesis/
.hypothesis/
# Coverage
.coverage
coverage.*
htmlcov/*

View File

@ -1,15 +1,12 @@
import hashlib
import json
import shutil
import tempfile
from abc import ABC, abstractmethod
from json import JSONDecodeError
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple, Type, Union
import requests
from constant_sorrow.constants import REGISTRY_COMMITTED
from nucypher.blockchain.eth import CONTRACT_REGISTRY_BASE
from nucypher.blockchain.eth.networks import NetworksInventory
@ -404,32 +401,6 @@ class LocalContractRegistry(BaseContractRegistry):
return payload
class TemporaryContractRegistry(LocalContractRegistry):
def __init__(self, *args, **kwargs) -> None:
_, self.temp_filepath = tempfile.mkstemp()
super().__init__(filepath=self.temp_filepath, *args, **kwargs)
def clear(self):
self.log.info("Cleared temporary registry at {}".format(self.filepath))
with open(self.filepath, 'w') as registry_file:
registry_file.write('')
def commit(self, filepath) -> Path:
"""writes the current state of the registry to a file"""
self.log.info("Committing temporary registry to {}".format(filepath))
self._swap_registry(filepath) # I'll allow it
if filepath.exists():
self.log.debug("Removing registry {}".format(filepath))
self.clear() # clear prior sim runs
_ = shutil.copy(self.temp_filepath, filepath)
self.temp_filepath = REGISTRY_COMMITTED # just in case
self.log.info("Wrote temporary registry to filesystem {}".format(filepath))
return filepath
class InMemoryContractRegistry(BaseContractRegistry):
def __init__(self, *args, **kwargs):

View File

@ -7,10 +7,9 @@ from typing import ClassVar, Dict, List, Optional
from constant_sorrow.constants import (
NO_BLOCKCHAIN_CONNECTION,
NO_CONTROL_PROTOCOL,
NO_NICKNAME,
NO_SIGNING_POWER,
STRANGER
STRANGER,
)
from eth_keys import KeyAPI as EthKeyAPI
from eth_utils import to_canonical_address
@ -18,7 +17,10 @@ from nucypher_core import MessageKit
from nucypher_core.umbral import PublicKey
from nucypher.acumen.nicknames import Nickname
from nucypher.blockchain.eth.registry import BaseContractRegistry, InMemoryContractRegistry
from nucypher.blockchain.eth.registry import (
BaseContractRegistry,
InMemoryContractRegistry,
)
from nucypher.blockchain.eth.signers.base import Signer
from nucypher.crypto.keystore import Keystore
from nucypher.crypto.powers import (
@ -26,12 +28,9 @@ from nucypher.crypto.powers import (
CryptoPowerUp,
DecryptingPower,
NoSigningPower,
SigningPower
)
from nucypher.crypto.signing import (
SignatureStamp,
StrangerStamp,
SigningPower,
)
from nucypher.crypto.signing import SignatureStamp, StrangerStamp
from nucypher.network.middleware import RestMiddleware
from nucypher.network.nodes import Learner

View File

@ -14,27 +14,12 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import Certificate
from cryptography.x509.oid import NameOID
from nucypher_core.umbral import SecretKey
_TLS_CERTIFICATE_ENCODING = Encoding.PEM
_TLS_CURVE = ec.SECP384R1
def _write_tls_certificate(certificate: Certificate,
full_filepath: Path,
force: bool = False,
) -> Path:
cert_already_exists = full_filepath.is_file()
if force is False and cert_already_exists:
raise FileExistsError('A TLS certificate already exists at {}.'.format(full_filepath.resolve()))
with open(full_filepath, 'wb') as certificate_file:
public_pem_bytes = certificate.public_bytes(_TLS_CERTIFICATE_ENCODING)
certificate_file.write(public_pem_bytes)
return full_filepath
def _read_tls_certificate(filepath: Path) -> Certificate:
"""Deserialize an X509 certificate from a filepath"""
try:

View File

@ -1,8 +1,9 @@
import json
import re
from http import HTTPStatus
from typing import Dict, NamedTuple, Optional, Tuple, Type, Union
from typing import Dict, NamedTuple, Optional, Type, Union
from marshmallow import Schema, post_dump
from web3.providers import BaseProvider
@ -33,6 +34,11 @@ def to_camelcase(s):
return next(parts) + "".join(i.title() for i in parts)
def camel_case_to_snake(data: str) -> str:
data = re.sub(r"(?<!^)(?=[A-Z])", "_", data).lower()
return data
class CamelCaseSchema(Schema):
"""Schema that uses camel-case for its external representation
and snake-case for its internal representation.
@ -92,7 +98,7 @@ def _deserialize_condition_lingo(data: Union[str, Dict[str, str]]) -> Union['Ope
def evaluate_conditions(
lingo: "ConditionLingo",
providers: Optional[Dict[str, BaseProvider]] = None,
providers: Optional[Dict[int, BaseProvider]] = None,
context: Optional[Dict[Union[str, int], Union[str, int]]] = None,
log: Logger = __LOGGER,
) -> Optional[EvalError]:

View File

@ -95,7 +95,7 @@ def get_context_value(context_variable: str, **context) -> Any:
value = context.get(context_variable)
if value is None:
raise RequiredContextVariable(
f'"No value provided for unrecognized context variable "{context_variable}"'
f'No value provided for unrecognized context variable "{context_variable}"'
)
else:
value = func(**context) # required inputs here

View File

@ -1,6 +1,5 @@
import re
from typing import Any, Dict, List, Optional, Tuple
from eth_typing import ChecksumAddress
@ -12,7 +11,7 @@ from web3.providers import BaseProvider
from web3.types import ABIFunction
from nucypher.policy.conditions import STANDARD_ABI_CONTRACT_TYPES, STANDARD_ABIS
from nucypher.policy.conditions._utils import CamelCaseSchema
from nucypher.policy.conditions._utils import CamelCaseSchema, camel_case_to_snake
from nucypher.policy.conditions.base import ReencryptionCondition
from nucypher.policy.conditions.context import get_context_value, is_context_variable
from nucypher.policy.conditions.exceptions import (
@ -60,17 +59,9 @@ def _resolve_abi(
except ValueError as e:
raise InvalidCondition(str(e))
if not function_abi:
raise InvalidCondition(f"No function ABI supplied for '{method}'")
return ABIFunction(function_abi)
def camel_case_to_snake(data: str) -> str:
data = re.sub(r'(?<!^)(?=[A-Z])', '_', data).lower()
return data
def _resolve_any_context_variables(
parameters: List[Any], return_value_test: ReturnValueTest, **context
):

View File

@ -8,29 +8,6 @@ from web3._utils.contracts import encode_abi
from web3.contract import ContractConstructor
def to_bytes32(value=None, hexstr=None) -> bytes:
return Web3.toBytes(primitive=value, hexstr=hexstr).rjust(32, b'\0')
def to_32byte_hex(value=None, hexstr=None) -> str:
return Web3.toHex(to_bytes32(value=value, hexstr=hexstr))
def get_mapping_entry_location(key: bytes, mapping_location: int) -> int:
if not(isinstance(key, bytes) and len(key) == 32):
raise ValueError("Mapping key must be a 32-long bytestring")
# See https://solidity.readthedocs.io/en/latest/internals/layout_in_storage.html#mappings-and-dynamic-arrays
entry_location = Web3.toInt(Web3.keccak(key + mapping_location.to_bytes(32, "big")))
return entry_location
def get_array_data_location(array_location: int) -> int:
# See https://solidity.readthedocs.io/en/latest/internals/layout_in_storage.html#mappings-and-dynamic-arrays
data_location = Web3.toInt(Web3.keccak(to_bytes32(array_location)))
return data_location
def encode_constructor_arguments(web3: Web3,
constructor_function: ContractConstructor,
*constructor_args, **constructor_kwargs) -> HexStr:
@ -52,7 +29,7 @@ def connect_web3_provider(eth_provider_uri: str) -> None:
"""
Convenience function for connecting to an ethereum provider now.
This may be used to optimize the startup time of some applications by
establishing the connection eagarly.
establishing the connection eagerly.
"""
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory

View File

@ -1,3 +1,5 @@
import pytest
from nucypher.policy.conditions.lingo import ConditionLingo
CONDITIONS = [
@ -13,7 +15,21 @@ CONDITIONS = [
]
def test_compound_condition_timelock():
def test_invalid_condition():
with pytest.raises(Exception):
ConditionLingo.from_list([{}])
with pytest.raises(Exception):
ConditionLingo.from_list([{"dont_mind_me": "nothing_to_see_here"}])
def test_condition_lingo_to_from_list():
clingo = ConditionLingo.from_list(CONDITIONS)
clingo_list = clingo.to_list()
assert clingo_list == CONDITIONS
def test_compound_condition():
clingo = ConditionLingo.from_list(CONDITIONS)
assert clingo.eval()

View File

@ -2,7 +2,7 @@ import json
from nucypher.policy.conditions._utils import _deserialize_condition_lingo
from nucypher.policy.conditions.evm import ContractCondition
from nucypher.policy.conditions.lingo import ConditionLingo
from nucypher.policy.conditions.lingo import ConditionLingo, Operator
def test_evm_condition_function_abi(t_staking_data):
@ -40,6 +40,7 @@ def test_type_resolution_from_json(
def test_conditions_lingo_serialization(lingo):
# json
json_serialized_lingo = json.dumps([l.to_dict() for l in lingo.conditions])
lingo_json = lingo.to_json()
restored_lingo = ConditionLingo.from_json(data=lingo_json)
@ -47,8 +48,28 @@ def test_conditions_lingo_serialization(lingo):
restored_lingo_json = restored_lingo.to_json()
assert restored_lingo_json == json_serialized_lingo
# base64
lingo_b64 = restored_lingo.to_base64()
restored_lingo = ConditionLingo.from_base64(lingo_b64)
# after all the serialization and transformation the content must remain identical
assert restored_lingo.to_json() == lingo_json
def test_reencryption_condition_to_from_bytes(lingo):
# bytes
for l in lingo.conditions:
if isinstance(l, Operator):
# operators don't have byte representations
continue
condition_bytes = bytes(l)
condition = l.__class__.from_bytes(condition_bytes)
assert condition.to_json() == l.to_json()
def test_reencryption_condition_to_from_dict(lingo):
# bytes
for l in lingo.conditions:
condition_bytes = l.to_dict()
condition = l.__class__.from_dict(condition_bytes)
assert condition.to_json() == l.to_json()

View File

@ -0,0 +1,126 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from dataclasses import dataclass
from http import HTTPStatus
from typing import List, Optional, Tuple, Type
from unittest.mock import Mock
import pytest
from marshmallow import fields
from web3.providers import BaseProvider
from nucypher.policy.conditions._utils import (
CamelCaseSchema,
camel_case_to_snake,
evaluate_conditions,
to_camelcase,
)
from nucypher.policy.conditions.exceptions import *
FAILURE_CASE_EXCEPTION_CODE_MATCHING = [
# (exception, constructor parameters, expected status code)
(ReturnValueEvaluationError, None, HTTPStatus.BAD_REQUEST),
(InvalidCondition, None, HTTPStatus.BAD_REQUEST),
(RequiredContextVariable, None, HTTPStatus.BAD_REQUEST),
(InvalidContextVariableData, None, HTTPStatus.BAD_REQUEST),
(ContextVariableVerificationFailed, None, HTTPStatus.FORBIDDEN),
(NoConnectionToChain, [1], HTTPStatus.NOT_IMPLEMENTED),
(ConditionEvaluationFailed, None, HTTPStatus.BAD_REQUEST),
(Exception, None, HTTPStatus.INTERNAL_SERVER_ERROR),
]
@pytest.mark.parametrize("failure_case", FAILURE_CASE_EXCEPTION_CODE_MATCHING)
def test_evaluate_condition_exception_cases(
failure_case: Tuple[Type[Exception], Optional[List], int]
):
exception_class, exception_constructor_params, expected_status_code = failure_case
exception_constructor_params = exception_constructor_params or []
condition_lingo = Mock()
condition_lingo.eval.side_effect = exception_class(*exception_constructor_params)
eval_error = evaluate_conditions(
lingo=condition_lingo
) # provider and context default to empty dicts
assert eval_error
assert eval_error.status_code == expected_status_code
def test_evaluate_condition_eval_returns_false():
condition_lingo = Mock()
condition_lingo.eval.return_value = False
eval_error = evaluate_conditions(
lingo=condition_lingo,
providers={1: Mock(spec=BaseProvider)}, # fake provider
context={"key": "value"}, # fake context
)
assert eval_error
assert eval_error.status_code == HTTPStatus.FORBIDDEN
def test_evaluate_condition_eval_returns_true():
condition_lingo = Mock()
condition_lingo.eval.return_value = True
eval_error = evaluate_conditions(
lingo=condition_lingo,
providers={
1: Mock(spec=BaseProvider),
2: Mock(spec=BaseProvider),
}, # multiple fake provider
context={"key1": "value1", "key2": "value2"}, # multiple values in fake context
)
assert eval_error is None
@pytest.mark.parametrize(
"test_case",
(
("nounderscores", "nounderscores"),
("one_underscore", "oneUnderscore"),
("two_under_scores", "twoUnderScores"),
),
)
def test_to_from_camel_case(test_case: Tuple[str, str]):
# test to_camelcase()
snake_case, camel_case = test_case
result = to_camelcase(snake_case)
assert result == camel_case
# test camel_case_to_snake()
result = camel_case_to_snake(camel_case)
assert result == snake_case
def test_camel_case_schema():
# test CamelCaseSchema
@dataclass
class Function:
field_name_with_underscores: str
class FunctionSchema(CamelCaseSchema):
field_name_with_underscores = fields.Str()
value = "field_name_value"
function = Function(field_name_with_underscores=value)
schema = FunctionSchema()
output = schema.dump(function)
assert output == {"fieldNameWithUnderscores": f"{value}"}
reloaded_function = schema.load(output)
assert reloaded_function == {"field_name_with_underscores": f"{value}"}

View File

@ -66,3 +66,38 @@ def test_operator_bonded_but_becomes_unbonded(mocker, get_random_checksum_addres
finally:
ursula.stop.assert_called_once_with(halt_reactor=True) # stop entire reactor
tracker.stop()
def test_operator_handle_errors(mocker, get_random_checksum_address):
ursula = mocker.Mock()
tracker = OperatorBondedTracker(ursula=ursula)
f = mocker.Mock()
f.getTraceback.return_value = "traceback"
f.raiseException.side_effect = OperatorBondedTracker.OperatorNoLongerBonded()
# inconsequential exception so no exception raised
f.check.return_value = False
tracker.handle_errors(failure=f) # no exception
# exception that is cared about, so exception raised
f.check.return_value = True
with pytest.raises(OperatorBondedTracker.OperatorNoLongerBonded):
tracker.handle_errors(failure=f)
@pytest.mark.parametrize(
"traceback",
(
"just some text",
"text with {",
"test with }",
"test with {a} pair of curly braces" "test with {more} curly } braces {",
),
)
def test_operator_bonded_clean_traceback(traceback, mocker):
f = mocker.Mock()
f.getTraceback.return_value = traceback
result = OperatorBondedTracker.clean_traceback(f)
assert "{" not in result
assert "}" not in result