Add tests/fixes for compound conditions.

pull/3140/head
derekpierre 2023-06-10 12:54:09 -04:00
parent 610f7436f0
commit 4214a9b009
5 changed files with 243 additions and 40 deletions

View File

@ -164,6 +164,8 @@ def _make_rest_app(this_node, log: Logger) -> Flask:
"No conditions present for ciphertext - invalid for CBD functionality",
status=HTTPStatus.FORBIDDEN,
)
# TODO what if this fails i.e. ValidationError with the schema
lingo = ConditionLingo.from_dict(
json.loads(conditions_data)
) # str -> list -> ConditionLingo

View File

@ -50,7 +50,9 @@ class _ConditionsField(fields.Dict):
class CompoundAccessControlCondition(AccessControlCondition):
OPERATORS = ("and", "or")
AND_OPERATOR = "and"
OR_OPERATOR = "or"
OPERATORS = (AND_OPERATOR, OR_OPERATOR)
class Schema(CamelCaseSchema):
SKIP_VALUES = (None,)
@ -62,9 +64,7 @@ class CompoundAccessControlCondition(AccessControlCondition):
@post_load
def make(self, data, **kwargs):
return CompoundAccessControlCondition(
operator=data["operator"], operands=data["operands"]
)
return CompoundAccessControlCondition(**data)
def __init__(
self, operator: str, operands: List[Condition], name: Optional[str] = None
@ -83,42 +83,36 @@ class CompoundAccessControlCondition(AccessControlCondition):
self.id = md5(bytes(self)).hexdigest()[:6]
def __repr__(self):
return f"Operator={self.operator} (NumOperands={len(self.operands)} | id={self.id})"
return f"Operator={self.operator} (NumOperands={len(self.operands)}), id={self.id})"
def verify(self, *args, **kwargs) -> Tuple[bool, Any]:
values = []
overall_result = True
overall_result = True if self.operator == self.AND_OPERATOR else False
for condition in self.operands:
current_result, current_value = condition.verify(*args, **kwargs)
values.append(current_value)
# TODO: Additional protection and/or sanitation here
# [True/False, <Operator>, True/False] -> 'True/False and/or True/False'
eval_string = f"{overall_result} {self.operator} {current_result}"
# TODO: Additional protection and/or sanitation here
overall_result = eval(eval_string)
# TODO need to test this short-circuit
if self.operator == "and" and overall_result is False:
# short-circuit checks
# short-circuit checks
if self.operator == self.AND_OPERATOR and overall_result is False:
return False, values
if self.operator == self.OR_OPERATOR and overall_result is True:
return True, values
return True, values
return overall_result, values
class OR(CompoundAccessControlCondition):
def __init__(self, operands: List[Condition], operator: Optional[str] = "or"):
if operator != "or":
raise InvalidLogicalOperator(
f"'or' operator must be used with {self.__class__.__name__}"
)
super().__init__(operator=operator, operands=operands)
class OrCompoundCondition(CompoundAccessControlCondition):
def __init__(self, operands: List[Condition]):
super().__init__(operator=self.OR_OPERATOR, operands=operands)
class AND(CompoundAccessControlCondition):
def __init__(self, operands: List[Condition], operator: Optional[str] = "and"):
if operator != "and":
raise InvalidLogicalOperator(
f"'and' operator must be used with {self.__class__.__name__}"
)
super().__init__(operator=operator, operands=operands)
class AndCompoundCondition(CompoundAccessControlCondition):
def __init__(self, operands: List[Condition]):
super().__init__(operator=self.AND_OPERATOR, operands=operands)
class ReturnValueTest:

View File

@ -8,7 +8,12 @@ from nucypher.blockchain.eth.agents import (
)
from nucypher.policy.conditions.context import USER_ADDRESS_CONTEXT
from nucypher.policy.conditions.evm import ContractCondition
from nucypher.policy.conditions.lingo import AND, OR, ConditionLingo, ReturnValueTest
from nucypher.policy.conditions.lingo import (
AndCompoundCondition,
ConditionLingo,
OrCompoundCondition,
ReturnValueTest,
)
from tests.constants import TESTERCHAIN_CHAIN_ID
@ -34,14 +39,14 @@ def compound_lingo(
):
"""depends on contract deployments"""
lingo = ConditionLingo(
condition=AND(
condition=OrCompoundCondition(
operands=[
erc721_evm_condition_balanceof,
OR(
time_condition,
AndCompoundCondition(
operands=[
erc20_evm_condition_balanceof,
time_condition,
rpc_condition,
erc20_evm_condition_balanceof,
]
),
]

View File

@ -4,7 +4,12 @@ import pytest
from nucypher.policy.conditions.context import USER_ADDRESS_CONTEXT
from nucypher.policy.conditions.evm import ContractCondition
from nucypher.policy.conditions.lingo import ReturnValueTest, ConditionLingo, OR, AND
from nucypher.policy.conditions.lingo import (
AndCompoundCondition,
ConditionLingo,
OrCompoundCondition,
ReturnValueTest,
)
from tests.constants import TESTERCHAIN_CHAIN_ID
@ -14,16 +19,11 @@ def compound_lingo(
):
"""does not depend on contract deployments"""
lingo = ConditionLingo(
condition=AND(
condition=OrCompoundCondition(
operands=[
erc20_evm_condition,
OR(
operands=[
erc721_evm_condition,
time_condition,
rpc_condition,
]
),
erc721_evm_condition,
time_condition,
AndCompoundCondition(operands=[rpc_condition, erc20_evm_condition]),
]
)
)

View File

@ -0,0 +1,202 @@
from unittest.mock import Mock
import pytest
from nucypher.policy.conditions.base import AccessControlCondition
from nucypher.policy.conditions.lingo import AndCompoundCondition, OrCompoundCondition
@pytest.fixture(scope="function")
def mock_conditions():
condition_1 = Mock(spec=AccessControlCondition)
condition_1.verify.return_value = (True, 1)
condition_1.to_dict.return_value = {
"value": 1
} # needed for "id" value calc for CompoundAccessControlCondition
condition_2 = Mock(spec=AccessControlCondition)
condition_2.verify.return_value = (True, 2)
condition_2.to_dict.return_value = {"value": 2}
condition_3 = Mock(spec=AccessControlCondition)
condition_3.verify.return_value = (True, 3)
condition_3.to_dict.return_value = {"value": 3}
condition_4 = Mock(spec=AccessControlCondition)
condition_4.verify.return_value = (True, 4)
condition_4.to_dict.return_value = {"value": 4}
return condition_1, condition_2, condition_3, condition_4
def test_and_condition_and_short_circuit(mock_conditions):
condition_1, condition_2, condition_3, condition_4 = mock_conditions
and_condition = AndCompoundCondition(
operands=[
condition_1,
condition_2,
condition_3,
condition_4,
]
)
# ensure that all conditions evaluated when all return True
result, value = and_condition.verify()
assert result is True
assert len(value) == 4, "all conditions evaluated"
assert value == [1, 2, 3, 4]
# ensure that short circuit happens when 1st condition is false
condition_1.verify.return_value = (False, 1)
result, value = and_condition.verify()
assert result is False
assert len(value) == 1, "only one condition evaluated"
assert value == [1]
# short circuit occurs for 3rd entry
condition_1.verify.return_value = (True, 1)
condition_3.verify.return_value = (False, 3)
result, value = and_condition.verify()
assert result is False
assert len(value) == 3, "3-of-4 conditions evaluated"
assert value == [1, 2, 3]
def test_or_condition_and_short_circuit(mock_conditions):
condition_1, condition_2, condition_3, condition_4 = mock_conditions
or_condition = OrCompoundCondition(
operands=[
condition_1,
condition_2,
condition_3,
condition_4,
]
)
# ensure that only first condition evaluated when first is True
condition_1.verify.return_value = (True, 1) # short circuit here
result, value = or_condition.verify()
assert result is True
assert len(value) == 1, "only first condition needs to be evaluated"
assert value == [1]
# ensure first True condition is returned
condition_1.verify.return_value = (False, 1)
condition_2.verify.return_value = (False, 2)
condition_3.verify.return_value = (True, 3) # short circuit here
result, value = or_condition.verify()
assert result is True
assert len(value) == 3, "third condition causes short circuit"
assert value == [1, 2, 3]
# no short circuit occurs when all are False
condition_1.verify.return_value = (False, 1)
condition_2.verify.return_value = (False, 2)
condition_3.verify.return_value = (False, 3)
condition_4.verify.return_value = (False, 4)
result, value = or_condition.verify()
assert result is False
assert len(value) == 4, "all conditions evaluated"
assert value == [1, 2, 3, 4]
def test_compound_condition(mock_conditions):
condition_1, condition_2, condition_3, condition_4 = mock_conditions
compound_condition = AndCompoundCondition(
operands=[
OrCompoundCondition(
operands=[
condition_1,
condition_2,
condition_3,
]
),
condition_4,
]
)
# all conditions are True
result, value = compound_condition.verify()
assert result is True
assert len(value) == 2, "or_condition and condition_4"
assert value == [[1], 4]
# or condition is False
condition_1.verify.return_value = (False, 1)
condition_2.verify.return_value = (False, 2)
condition_3.verify.return_value = (False, 3)
result, value = compound_condition.verify()
assert result is False
assert len(value) == 1, "or_condition"
assert value == [
[1, 2, 3]
] # or condition does not short circuit, but and is short-circuited because or is False
# or condition is True but condition 4 is False
condition_1.verify.return_value = (True, 1)
condition_4.verify.return_value = (False, 4)
result, value = compound_condition.verify()
assert result is False
assert len(value) == 2, "or_condition and condition_4"
assert value == [
[1],
4,
] # or condition short-circuited because condition_1 was True
# condition_4 is now true
condition_4.verify.return_value = (True, 4)
result, value = compound_condition.verify()
assert result is True
assert len(value) == 2, "or_condition and condition_4"
assert value == [
[1],
4,
] # or condition short-circuited because condition_1 was True
def test_nested_compound_condition(mock_conditions):
condition_1, condition_2, condition_3, condition_4 = mock_conditions
nested_compound_condition = AndCompoundCondition(
operands=[
OrCompoundCondition(
operands=[
condition_1,
AndCompoundCondition(
operands=[
condition_2,
condition_3,
]
),
]
),
condition_4,
]
)
# all conditions are True
result, value = nested_compound_condition.verify()
assert result is True
assert len(value) == 2, "or_condition and condition_4"
assert value == [[1], 4] # or short-circuited since condition_1 is True
# condition_1 is now false so nested and condition must be evaluated
condition_1.verify.return_value = (False, 1)
result, value = nested_compound_condition.verify()
assert result is True
assert len(value) == 2, "or_condition and condition_4"
assert value == [[1, [2, 3]], 4] # nested and was evaluated and evaluated to True
# condition_4 is False so result flips to True
condition_4.verify.return_value = (False, 4)
result, value = nested_compound_condition.verify()
assert result is False
assert len(value) == 2, "or_condition and condition_4"
assert value == [[1, [2, 3]], 4]