Merge pull request #3571 from derekpierre/non-evm-rpc

JsonRpcCondition (JSON RPC 2.0 specification)
pull/3570/merge
Derek Pierre 2025-01-06 08:41:19 -05:00 committed by GitHub
commit ee26e9ecad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 913 additions and 298 deletions

View File

@ -0,0 +1 @@
Support for conditions based on APIs provided by off-chain JSON RPC 2.0 endpoints.

View File

@ -74,9 +74,7 @@ class RPCCall(ExecutionCall):
"null": "Undefined method name",
},
)
parameters = fields.List(
fields.Field, attribute="parameters", required=False, allow_none=True
)
parameters = fields.List(fields.Field, required=False, allow_none=True)
@validates("method")
def validate_method(self, value):

View File

@ -45,3 +45,7 @@ class ConditionEvaluationFailed(Exception):
class RPCExecutionFailed(ConditionEvaluationFailed):
"""Raised when an exception is raised from an RPC call."""
class JsonRequestException(ConditionEvaluationFailed):
"""Raised when an exception is raised from a JSON request."""

View File

@ -0,0 +1,121 @@
from typing import Any, Optional
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Url
from typing_extensions import override
from nucypher.policy.conditions.context import is_context_variable
from nucypher.policy.conditions.json.base import (
BaseJsonRequestCondition,
HTTPMethod,
JSONPathField,
JsonRequestCall,
)
from nucypher.policy.conditions.lingo import (
ConditionType,
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
from nucypher.utilities.logging import Logger
class JsonApiCall(JsonRequestCall):
TIMEOUT = 5 # seconds
class Schema(JsonRequestCall.Schema):
endpoint = Url(required=True, relative=False, schemes=["https"])
parameters = fields.Dict(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@post_load
def make(self, data, **kwargs):
return JsonApiCall(**data)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
endpoint: str,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
super().__init__(
http_method=HTTPMethod.GET,
parameters=parameters,
query=query,
authorization_token=authorization_token,
)
self.logger = Logger(__name__)
@override
def execute(self, **context) -> Any:
return super()._execute(endpoint=self.endpoint, **context)
class JsonApiCondition(BaseJsonRequestCondition):
"""
A JSON API condition is a condition that can be evaluated by performing a GET on a JSON
HTTPS endpoint. The response must return an HTTP 200 with valid JSON in the response body.
The response will be deserialized as JSON and parsed using jsonpath.
"""
EXECUTION_CALL_TYPE = JsonApiCall
CONDITION_TYPE = ConditionType.JSONAPI.value
class Schema(ExecutionCallAccessControlCondition.Schema, JsonApiCall.Schema):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.JSONAPI.value), required=True
)
@post_load
def make(self, data, **kwargs):
return JsonApiCondition(**data)
def __init__(
self,
endpoint: str,
return_value_test: ReturnValueTest,
query: Optional[str] = None,
parameters: Optional[dict] = None,
authorization_token: Optional[str] = None,
condition_type: Optional[str] = ConditionType.JSONAPI.value,
name: Optional[str] = None,
):
super().__init__(
endpoint=endpoint,
return_value_test=return_value_test,
query=query,
parameters=parameters,
authorization_token=authorization_token,
condition_type=condition_type,
name=name,
)
@property
def endpoint(self):
return self.execution_call.endpoint
@property
def query(self):
return self.execution_call.query
@property
def parameters(self):
return self.execution_call.parameters
@property
def timeout(self):
return self.execution_call.timeout
@property
def authorization_token(self):
return self.execution_call.authorization_token

View File

@ -0,0 +1,159 @@
from abc import ABC
from enum import Enum
from http import HTTPStatus
from typing import Any, Optional, Tuple
import requests
from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError
from jsonpath_ng.ext import parse
from marshmallow.fields import Field
from nucypher.policy.conditions.base import ExecutionCall
from nucypher.policy.conditions.context import (
resolve_any_context_variables,
string_contains_context_variable,
)
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
JsonRequestException,
)
from nucypher.policy.conditions.json.utils import process_result_for_condition_eval
from nucypher.policy.conditions.lingo import ExecutionCallAccessControlCondition
from nucypher.utilities.logging import Logger
class HTTPMethod(Enum):
GET = "GET"
POST = "POST"
class JsonRequestCall(ExecutionCall, ABC):
TIMEOUT = 5 # seconds
def __init__(
self,
http_method: HTTPMethod,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.http_method = http_method
self.parameters = parameters or {}
self.query = query
self.authorization_token = authorization_token
self.timeout = self.TIMEOUT
self.logger = Logger(__name__)
super().__init__()
def _execute(self, endpoint: str, **context) -> Any:
data = self._fetch(endpoint, **context)
result = self._query_response(data, **context)
return result
def _fetch(self, endpoint: str, **context) -> Any:
resolved_endpoint = resolve_any_context_variables(endpoint, **context)
resolved_parameters = resolve_any_context_variables(self.parameters, **context)
headers = {"Content-Type": "application/json"}
if self.authorization_token:
resolved_authorization_token = resolve_any_context_variables(
self.authorization_token, **context
)
headers["Authorization"] = f"Bearer {resolved_authorization_token}"
try:
if self.http_method == HTTPMethod.GET:
response = requests.get(
resolved_endpoint,
params=resolved_parameters,
timeout=self.timeout,
headers=headers,
)
else:
# POST
response = requests.post(
resolved_endpoint,
json=resolved_parameters,
timeout=self.timeout,
headers=headers,
)
response.raise_for_status()
if response.status_code != HTTPStatus.OK:
raise JsonRequestException(
f"Failed to fetch from endpoint {resolved_endpoint}: {response.status_code}"
)
except requests.exceptions.RequestException as request_error:
raise JsonRequestException(
f"Failed to fetch from endpoint {resolved_endpoint}: {request_error}"
)
try:
data = response.json()
return data
except (requests.exceptions.RequestException, ValueError) as json_error:
raise JsonRequestException(
f"Failed to extract JSON response from {resolved_endpoint}: {json_error}"
)
def _query_response(self, response_json: Any, **context) -> Any:
if not self.query:
return response_json # primitive value
resolved_query = resolve_any_context_variables(self.query, **context)
try:
expression = parse(resolved_query)
matches = expression.find(response_json)
if not matches:
message = f"No matches found for the JSONPath query: {resolved_query}"
self.logger.info(message)
raise ConditionEvaluationFailed(message)
except (JsonPathLexerError, JsonPathParserError) as jsonpath_err:
self.logger.error(f"JSONPath error occurred: {jsonpath_err}")
raise ConditionEvaluationFailed(
f"JSONPath error: {jsonpath_err}"
) from jsonpath_err
if len(matches) > 1:
message = f"Ambiguous JSONPath query - multiple matches found for: {resolved_query}"
self.logger.info(message)
raise JsonRequestException(message)
result = matches[0].value
return result
class JSONPathField(Field):
default_error_messages = {
"invalidType": "Expression of type {value} is not valid for JSONPath",
"invalid": "'{value}' is not a valid JSONPath expression",
}
def _deserialize(self, value, attr, data, **kwargs):
if not isinstance(value, str):
raise self.make_error("invalidType", value=type(value))
try:
if not string_contains_context_variable(value):
parse(value)
except (JsonPathLexerError, JsonPathParserError) as e:
raise self.make_error("invalid", value=value) from e
return value
class BaseJsonRequestCondition(ExecutionCallAccessControlCondition, ABC):
def verify(self, **context) -> Tuple[bool, Any]:
"""
Verifies the JSON condition.
"""
result = self.execution_call.execute(**context)
result_for_eval = process_result_for_condition_eval(result)
resolved_return_value_test = self.return_value_test.with_resolved_context(
**context
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result

View File

@ -0,0 +1,178 @@
from abc import ABC
from typing import Any, Optional
from uuid import uuid4
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Url
from typing_extensions import override
from nucypher.policy.conditions.context import is_context_variable
from nucypher.policy.conditions.exceptions import (
JsonRequestException,
)
from nucypher.policy.conditions.json.base import (
BaseJsonRequestCondition,
HTTPMethod,
JSONPathField,
JsonRequestCall,
)
from nucypher.policy.conditions.lingo import (
ConditionType,
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
class BaseJsonRPCCall(JsonRequestCall, ABC):
class Schema(JsonRequestCall.Schema):
method = fields.Str(required=True)
params = fields.Field(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
method: str,
params: Optional[Any] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.method = method
self.params = params or []
parameters = {
"jsonrpc": "2.0",
"method": self.method,
"params": self.params,
"id": str(uuid4()), # any random id will do
}
super().__init__(
http_method=HTTPMethod.POST,
parameters=parameters,
query=query,
authorization_token=authorization_token,
)
@override
def _execute(self, endpoint, **context):
data = self._fetch(endpoint, **context)
# response contains a value for either "result" or "error"
error = data.get("error")
if error:
raise JsonRequestException(
f"JSON RPC Request failed with error in response: code={error.get('code')}, msg={error.get('message')}"
)
# obtain result first then perform query
result = data.get("result")
if not result:
raise JsonRequestException(
f"Malformed JSON RPC response, no 'result' field - data={data}"
)
query_result = self._query_response(result, **context)
return query_result
class JsonEndpointRPCCall(BaseJsonRPCCall):
class Schema(BaseJsonRPCCall.Schema):
endpoint = Url(required=True, relative=False, schemes=["https"])
@post_load
def make(self, data, **kwargs):
return JsonEndpointRPCCall(**data)
def __init__(
self,
endpoint: str,
method: str,
params: Optional[Any] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
super().__init__(
method=method,
params=params,
query=query,
authorization_token=authorization_token,
)
@override
def execute(self, **context) -> Any:
return super()._execute(endpoint=self.endpoint, **context)
class JsonRpcCondition(BaseJsonRequestCondition):
"""
A JSON RPC condition is a condition that can be evaluated by performing a POST on a JSON
HTTPS endpoint. The response must return an HTTP 200 with valid JSON RPC 2.0 response.
The response will be deserialized as JSON and parsed using jsonpath.
"""
EXECUTION_CALL_TYPE = JsonEndpointRPCCall
CONDITION_TYPE = ConditionType.JSONRPC.value
class Schema(
ExecutionCallAccessControlCondition.Schema, JsonEndpointRPCCall.Schema
):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.JSONRPC.value), required=True
)
@post_load
def make(self, data, **kwargs):
return JsonRpcCondition(**data)
def __init__(
self,
endpoint: str,
method: str,
return_value_test: ReturnValueTest,
params: Optional[Any] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
condition_type: Optional[str] = ConditionType.JSONRPC.value,
name: Optional[str] = None,
):
super().__init__(
endpoint=endpoint,
method=method,
return_value_test=return_value_test,
params=params,
query=query,
authorization_token=authorization_token,
condition_type=condition_type,
name=name,
)
@property
def endpoint(self):
return self.execution_call.endpoint
@property
def method(self):
return self.execution_call.method
@property
def params(self):
return self.execution_call.params
@property
def query(self):
return self.execution_call.query
@property
def authorization_token(self):
return self.execution_call.authorization_token
@property
def timeout(self):
return self.execution_call.timeout

View File

@ -0,0 +1,17 @@
from typing import Any
def process_result_for_condition_eval(result: Any):
# strings that are not already quoted will cause a problem for literal_eval
if not isinstance(result, str):
return result
# check if already quoted; if not, quote it
if not (
(result.startswith("'") and result.endswith("'"))
or (result.startswith('"') and result.endswith('"'))
):
quote_type_to_use = '"' if "'" in result else "'"
result = f"{quote_type_to_use}{result}{quote_type_to_use}"
return result

View File

@ -69,6 +69,7 @@ class ConditionType(Enum):
CONTRACT = "contract"
RPC = "rpc"
JSONAPI = "json-api"
JSONRPC = "json-rpc"
COMPOUND = "compound"
SEQUENTIAL = "sequential"
IF_THEN_ELSE = "if-then-else"
@ -697,7 +698,8 @@ class ConditionLingo(_Serializable):
conditions expression framework.
"""
from nucypher.policy.conditions.evm import ContractCondition, RPCCondition
from nucypher.policy.conditions.offchain import JsonApiCondition
from nucypher.policy.conditions.json.api import JsonApiCondition
from nucypher.policy.conditions.json.rpc import JsonRpcCondition
from nucypher.policy.conditions.time import TimeCondition
# version logical adjustments can be made here as required
@ -709,6 +711,7 @@ class ConditionLingo(_Serializable):
RPCCondition,
CompoundAccessControlCondition,
JsonApiCondition,
JsonRpcCondition,
SequentialAccessControlCondition,
IfThenElseCondition,
):

View File

@ -1,255 +0,0 @@
from typing import Any, Optional, Tuple
import requests
from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError
from jsonpath_ng.ext import parse
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Field, Url
from nucypher.policy.conditions.base import ExecutionCall
from nucypher.policy.conditions.context import (
is_context_variable,
resolve_any_context_variables,
string_contains_context_variable,
)
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
InvalidCondition,
)
from nucypher.policy.conditions.lingo import (
ConditionType,
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
from nucypher.utilities.logging import Logger
class JSONPathField(Field):
default_error_messages = {
"invalidType": "Expression of type {value} is not valid for JSONPath",
"invalid": "'{value}' is not a valid JSONPath expression",
}
def _deserialize(self, value, attr, data, **kwargs):
if not isinstance(value, str):
raise self.make_error("invalidType", value=type(value))
try:
if not string_contains_context_variable(value):
parse(value)
except (JsonPathLexerError, JsonPathParserError):
raise self.make_error("invalid", value=value)
return value
class JsonApiCall(ExecutionCall):
TIMEOUT = 5 # seconds
class Schema(ExecutionCall.Schema):
endpoint = Url(required=True, relative=False, schemes=["https"])
parameters = fields.Dict(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@post_load
def make(self, data, **kwargs):
return JsonApiCall(**data)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
endpoint: str,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
self.parameters = parameters or {}
self.query = query
self.authorization_token = authorization_token
self.timeout = self.TIMEOUT
self.logger = Logger(__name__)
super().__init__()
def execute(self, **context) -> Any:
response = self._fetch(**context)
data = self._deserialize_response(response)
result = self._query_response(data, **context)
return result
def _fetch(self, **context) -> requests.Response:
"""Fetches data from the endpoint."""
resolved_endpoint = resolve_any_context_variables(self.endpoint, **context)
resolved_parameters = resolve_any_context_variables(self.parameters, **context)
headers = None
if self.authorization_token:
resolved_authorization_token = resolve_any_context_variables(
self.authorization_token, **context
)
headers = {"Authorization": f"Bearer {resolved_authorization_token}"}
try:
response = requests.get(
resolved_endpoint,
params=resolved_parameters,
timeout=self.timeout,
headers=headers,
)
response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
self.logger.error(f"HTTP error occurred: {http_error}")
raise ConditionEvaluationFailed(
f"Failed to fetch endpoint {resolved_endpoint}: {http_error}"
)
except requests.exceptions.RequestException as request_error:
self.logger.error(f"Request exception occurred: {request_error}")
raise InvalidCondition(
f"Failed to fetch endpoint {resolved_endpoint}: {request_error}"
)
if response.status_code != 200:
self.logger.error(
f"Failed to fetch endpoint {resolved_endpoint}: {response.status_code}"
)
raise ConditionEvaluationFailed(
f"Failed to fetch endpoint {resolved_endpoint}: {response.status_code}"
)
return response
def _deserialize_response(self, response: requests.Response) -> Any:
"""Deserializes the JSON response from the endpoint."""
try:
data = response.json()
except (requests.exceptions.RequestException, ValueError) as json_error:
self.logger.error(f"JSON parsing error occurred: {json_error}")
raise ConditionEvaluationFailed(
f"Failed to parse JSON response: {json_error}"
)
return data
def _query_response(self, data: Any, **context) -> Any:
if not self.query:
return data # primitive value
resolved_query = resolve_any_context_variables(self.query, **context)
try:
expression = parse(resolved_query)
matches = expression.find(data)
if not matches:
message = f"No matches found for the JSONPath query: {resolved_query}"
self.logger.info(message)
raise ConditionEvaluationFailed(message)
except (JsonPathLexerError, JsonPathParserError) as jsonpath_err:
self.logger.error(f"JSONPath error occurred: {jsonpath_err}")
raise ConditionEvaluationFailed(f"JSONPath error: {jsonpath_err}")
if len(matches) > 1:
message = f"Ambiguous JSONPath query - Multiple matches found for: {resolved_query}"
self.logger.info(message)
raise ConditionEvaluationFailed(message)
result = matches[0].value
return result
class JsonApiCondition(ExecutionCallAccessControlCondition):
"""
A JSON API condition is a condition that can be evaluated by reading from a JSON
HTTPS endpoint. The response must return an HTTP 200 with valid JSON in the response body.
The response will be deserialized as JSON and parsed using jsonpath.
"""
EXECUTION_CALL_TYPE = JsonApiCall
CONDITION_TYPE = ConditionType.JSONAPI.value
class Schema(ExecutionCallAccessControlCondition.Schema, JsonApiCall.Schema):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.JSONAPI.value), required=True
)
@post_load
def make(self, data, **kwargs):
return JsonApiCondition(**data)
def __init__(
self,
endpoint: str,
return_value_test: ReturnValueTest,
query: Optional[str] = None,
parameters: Optional[dict] = None,
authorization_token: Optional[str] = None,
condition_type: str = ConditionType.JSONAPI.value,
name: Optional[str] = None,
):
super().__init__(
endpoint=endpoint,
return_value_test=return_value_test,
query=query,
parameters=parameters,
authorization_token=authorization_token,
condition_type=condition_type,
name=name,
)
@property
def endpoint(self):
return self.execution_call.endpoint
@property
def query(self):
return self.execution_call.query
@property
def parameters(self):
return self.execution_call.parameters
@property
def timeout(self):
return self.execution_call.timeout
@property
def authorization_token(self):
return self.execution_call.authorization_token
@staticmethod
def _process_result_for_eval(result: Any):
# strings that are not already quoted will cause a problem for literal_eval
if not isinstance(result, str):
return result
# check if already quoted; if not, quote it
if not (
(result.startswith("'") and result.endswith("'"))
or (result.startswith('"') and result.endswith('"'))
):
quote_type_to_use = '"' if "'" in result else "'"
result = f"{quote_type_to_use}{result}{quote_type_to_use}"
return result
def verify(self, **context) -> Tuple[bool, Any]:
"""
Verifies the offchain condition is met by performing a read operation on the endpoint
and evaluating the return value test with the result. Parses the endpoint's JSON response using
JSONPath.
"""
result = self.execution_call.execute(**context)
result_for_eval = self._process_result_for_eval(result)
resolved_return_value_test = self.return_value_test.with_resolved_context(
**context
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result

View File

@ -26,6 +26,7 @@ from nucypher.policy.conditions.exceptions import (
RequiredContextVariable,
RPCExecutionFailed,
)
from nucypher.policy.conditions.json.rpc import JsonRpcCondition
from nucypher.policy.conditions.lingo import (
ConditionLingo,
ConditionType,
@ -893,3 +894,35 @@ def test_contract_condition_using_overloaded_function(
)
with pytest.raises(RPCExecutionFailed):
_ = condition.verify(providers=condition_providers, **context)
@pytest.mark.xfail(reason="This test uses a public rpc endpoint")
def test_json_rpc_condition_non_evm_prototyping_example():
condition = JsonRpcCondition(
endpoint="https://api.mainnet-beta.solana.com",
method="getBlockTime",
params=[308103883],
return_value_test=ReturnValueTest(">=", 1734461499),
)
success, _ = condition.verify()
assert success
condition = JsonRpcCondition(
endpoint="https://api.mainnet-beta.solana.com",
method="getBalance",
params=["83astBRguLMdt2h5U1Tpdq5tjFoJ6noeGwaY3mDLVcri"],
query="$.value",
return_value_test=ReturnValueTest(">=", 0),
)
success, _ = condition.verify()
assert success
condition = JsonRpcCondition(
endpoint="https://bitcoin.drpc.org",
method="getblock",
params=["00000000000000000001ed4d40e6b602d7f09b9d47d5e046d52339cc6673a486"],
query="$.time",
return_value_test=ReturnValueTest(">=", 1734461294),
)
success, _ = condition.verify()
assert success

View File

@ -81,6 +81,18 @@ def lingo_with_all_condition_types(get_random_checksum_address):
"value": 2,
},
}
json_rpc_condition = {
# JSON RPC
"conditionType": "json-rpc",
"endpoint": "https://math.example.com/",
"method": "subtract",
"params": [42, 23],
"query": "$.mathresult",
"returnValueTest": {
"comparator": "==",
"value": 19,
},
}
sequential_condition = {
"conditionType": ConditionType.SEQUENTIAL.value,
"conditionVariables": [
@ -106,7 +118,7 @@ def lingo_with_all_condition_types(get_random_checksum_address):
"conditionType": ConditionType.IF_THEN_ELSE.value,
"ifCondition": rpc_condition,
"thenCondition": json_api_condition,
"elseCondition": time_condition,
"elseCondition": json_rpc_condition,
}
return {
"version": ConditionLingo.VERSION,

View File

@ -2,34 +2,13 @@ import json
import pytest
import requests
from marshmallow import ValidationError
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
InvalidCondition,
JsonRequestException,
)
from nucypher.policy.conditions.json.api import JsonApiCondition
from nucypher.policy.conditions.lingo import ConditionLingo, ReturnValueTest
from nucypher.policy.conditions.offchain import (
JsonApiCondition,
JSONPathField,
)
def test_jsonpath_field_valid():
field = JSONPathField()
valid_jsonpath = "$.store.book[0].price"
result = field.deserialize(valid_jsonpath)
assert result == valid_jsonpath
def test_jsonpath_field_invalid():
field = JSONPathField()
invalid_jsonpath = "invalid jsonpath"
with pytest.raises(
ValidationError,
match=f"'{invalid_jsonpath}' is not a valid JSONPath expression",
):
field.deserialize(invalid_jsonpath)
def test_json_api_condition_initialization():
@ -41,6 +20,7 @@ def test_json_api_condition_initialization():
assert condition.endpoint == "https://api.example.com/data"
assert condition.query == "$.store.book[0].price"
assert condition.return_value_test.eval(0)
assert condition.timeout == JsonApiCondition.EXECUTION_CALL_TYPE.TIMEOUT
def test_json_api_condition_invalid_type():
@ -55,7 +35,7 @@ def test_json_api_condition_invalid_type():
)
def test_https_enforcement():
def test_json_api_https_enforcement():
with pytest.raises(InvalidCondition, match="Not a valid URL"):
_ = JsonApiCondition(
endpoint="http://api.example.com/data",
@ -64,7 +44,7 @@ def test_https_enforcement():
)
def test_invalid_authorization_token():
def test_json_api_invalid_authorization_token():
with pytest.raises(InvalidCondition, match="Invalid value for authorization token"):
_ = JsonApiCondition(
endpoint="https://api.example.com/data",
@ -97,9 +77,8 @@ def test_json_api_condition_fetch(mocker):
query="$.store.book[0].title",
return_value_test=ReturnValueTest("==", "'Test Title'"),
)
response = condition.execution_call._fetch()
assert response.status_code == 200
assert response.json() == {"store": {"book": [{"title": "Test Title"}]}}
data = condition.execution_call._fetch(condition.endpoint)
assert data == {"store": {"book": [{"title": "Test Title"}]}}
def test_json_api_condition_fetch_failure(mocker):
@ -112,8 +91,8 @@ def test_json_api_condition_fetch_failure(mocker):
query="$.store.book[0].price",
return_value_test=ReturnValueTest("==", 1),
)
with pytest.raises(InvalidCondition, match="Failed to fetch endpoint"):
condition.execution_call._fetch()
with pytest.raises(JsonRequestException, match="Failed to fetch from endpoint"):
condition.execution_call._fetch(condition.endpoint)
def test_json_api_condition_verify(mocker):
@ -179,13 +158,27 @@ def test_json_api_condition_verify_invalid_json(mocker):
query="$.store.book[0].price",
return_value_test=ReturnValueTest("==", 2),
)
with pytest.raises(
ConditionEvaluationFailed, match="Failed to parse JSON response"
):
with pytest.raises(JsonRequestException, match="Failed to extract JSON response"):
condition.verify()
def test_non_json_response(mocker):
def test_json_api_non_200_status(mocker):
# Mock the requests.get method to return a response with non-JSON content
mock_response = mocker.Mock()
mock_response.status_code = 400
mocker.patch("requests.get", return_value=mock_response)
condition = JsonApiCondition(
endpoint="https://api.example.com/data",
query="$.store.book[0].price",
return_value_test=ReturnValueTest("==", 18),
)
with pytest.raises(JsonRequestException, match="Failed to fetch from endpoint"):
condition.verify()
def test_json_api_non_json_response(mocker):
# Mock the requests.get method to return a response with non-JSON content
mock_response = mocker.Mock()
mock_response.status_code = 200
@ -200,9 +193,7 @@ def test_non_json_response(mocker):
return_value_test=ReturnValueTest("==", 18),
)
with pytest.raises(
ConditionEvaluationFailed, match="Failed to parse JSON response"
):
with pytest.raises(JsonRequestException, match="Failed to extract JSON response"):
condition.verify()
@ -382,5 +373,5 @@ def test_ambiguous_json_path_multiple_results(mocker):
return_value_test=ReturnValueTest("==", 1),
)
with pytest.raises(ConditionEvaluationFailed, match="Ambiguous JSONPath query"):
with pytest.raises(JsonRequestException, match="Ambiguous JSONPath query"):
condition.verify()

View File

@ -0,0 +1,27 @@
import pytest
from marshmallow import ValidationError
from nucypher.policy.conditions.json.base import JSONPathField
def test_jsonpath_field_valid():
field = JSONPathField()
valid_jsonpath = "$.store.book[0].price"
result = field.deserialize(valid_jsonpath)
assert result == valid_jsonpath
@pytest.mark.parametrize(
"invalid_jsonpath",
[
"invalid jsonpath",
"}{[]$%",
12,
12.25,
True,
],
)
def test_jsonpath_field_invalid(invalid_jsonpath):
field = JSONPathField()
with pytest.raises(ValidationError):
field.deserialize(invalid_jsonpath)

View File

@ -0,0 +1,326 @@
import json
import uuid
import pytest
import requests
from nucypher.policy.conditions.exceptions import (
InvalidCondition,
JsonRequestException,
)
from nucypher.policy.conditions.json.rpc import JsonRpcCondition
from nucypher.policy.conditions.lingo import (
ConditionLingo,
ConditionType,
ReturnValueTest,
)
UUID4_STR = "b192fdd2-1529-4fe9-a671-e5386453aa9c"
@pytest.fixture(scope="function", autouse=True)
def mock_uuid4(mocker):
u = uuid.UUID(UUID4_STR, version=4)
mocker.patch("nucypher.policy.conditions.json.rpc.uuid4", return_value=u)
def test_json_rpc_condition_initialization():
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
query="$.mathresult",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
assert condition.endpoint == "https://math.example.com/"
assert condition.method == "subtract"
assert condition.query == "$.mathresult"
assert condition.return_value_test.eval(19)
assert condition.timeout == JsonRpcCondition.EXECUTION_CALL_TYPE.TIMEOUT
def test_json_rpc_condition_invalid_type():
with pytest.raises(
InvalidCondition,
match=f"'condition_type' field - Must be equal to {ConditionType.JSONRPC.value}",
):
_ = JsonRpcCondition(
condition_type=ConditionType.JSONAPI.value,
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
def test_https_enforcement():
with pytest.raises(InvalidCondition, match="Not a valid URL"):
_ = JsonRpcCondition(
endpoint="http://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
def test_invalid_authorization_token():
with pytest.raises(InvalidCondition, match="Invalid value for authorization token"):
_ = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
authorization_token="github_pat_123456789",
)
def test_json_rpc_condition_verify(mocker):
mock_response = mocker.Mock(status_code=200)
mock_response.json.return_value = {"jsonrpc": "2.0", "result": 19, "id": 1}
mocked_method = mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
success, result = condition.verify()
assert success is True
assert result == 19
assert mocked_method.call_count == 1
assert mocked_method.call_args.kwargs["json"] == {
"jsonrpc": "2.0",
"id": UUID4_STR,
"method": condition.method,
"params": condition.params,
}
def test_json_rpc_condition_verify_params_as_dict(mocker):
mock_response = mocker.Mock(status_code=200)
mock_response.json.return_value = {"jsonrpc": "2.0", "result": 19, "id": 1}
mocked_method = mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params={
"value1": 42,
"value2": 23,
},
return_value_test=ReturnValueTest("==", 19),
)
success, result = condition.verify()
assert success is True
assert result == 19
assert mocked_method.call_count == 1
assert mocked_method.call_args.kwargs["json"] == {
"jsonrpc": "2.0",
"id": UUID4_STR,
"method": condition.method,
"params": condition.params,
}
def test_json_rpc_non_200_status(mocker):
# Mock the requests.get method to return a response with non-JSON content
mock_response = mocker.Mock()
mock_response.status_code = 400
mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
with pytest.raises(JsonRequestException, match="Failed to fetch from endpoint"):
condition.verify()
def test_json_rpc_condition_verify_error(mocker):
mock_response = mocker.Mock(status_code=200)
error = {
"jsonrpc": "2.0",
"error": {"code": -32601, "message": "Method not found"},
"id": "1",
}
mock_response.json.return_value = error
mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="foobar",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
with pytest.raises(
JsonRequestException,
match=f"code={error['error']['code']}, msg={error['error']['message']}",
):
condition.verify()
def test_json_rpc_condition_verify_invalid_json(mocker):
mock_response = mocker.Mock(status_code=200)
mock_response.json.side_effect = requests.exceptions.RequestException("Error")
mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
)
with pytest.raises(JsonRequestException, match="Failed to extract JSON response"):
condition.verify()
def test_json_rpc_condition_evaluation_with_auth_token(mocker):
mock_response = mocker.Mock(status_code=200)
mock_response.json.return_value = {"jsonrpc": "2.0", "result": 19, "id": 1}
mocked_method = mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
return_value_test=ReturnValueTest("==", 19),
authorization_token=":authToken",
)
assert condition.authorization_token == ":authToken"
auth_token = "1234567890"
context = {":authToken": f"{auth_token}"}
success, result = condition.verify(**context)
assert success is True
assert result == 19
assert mocked_method.call_count == 1
assert mocked_method.call_args.kwargs["json"] == {
"jsonrpc": "2.0",
"id": UUID4_STR,
"method": condition.method,
"params": condition.params,
}
assert mocked_method.call_count == 1
assert (
mocked_method.call_args.kwargs["headers"]["Authorization"]
== f"Bearer {auth_token}"
)
def test_json_rpc_condition_evaluation_with_various_context_variables(mocker):
mocked_post = mocker.patch(
"requests.post",
return_value=mocker.Mock(
status_code=200, json=lambda: {"result": {"mathresult": 19}}
),
)
condition = JsonRpcCondition(
endpoint="https://math.example.com/:version/simple",
method=":methodContextVar",
params=[":value1", 23],
query="$.:queryKey",
authorization_token=":authToken",
return_value_test=ReturnValueTest("==", ":expectedResult"),
)
auth_token = "1234567890"
context = {
":version": "v3",
":methodContextVar": "subtract", # TODO, should we allow this?
":value1": 42,
":queryKey": "mathresult",
":authToken": f"{auth_token}",
":expectedResult": 19,
}
assert condition.verify(**context) == (True, 19)
assert mocked_post.call_count == 1
call_args = mocked_post.call_args
assert call_args.args == (f"https://math.example.com/{context[':version']}/simple",)
assert call_args.kwargs["headers"]["Authorization"] == f"Bearer {auth_token}"
assert call_args.kwargs["json"] == {
"jsonrpc": "2.0",
"id": UUID4_STR,
"method": context[":methodContextVar"],
"params": [42, 23],
}
def test_json_rpc_condition_from_lingo_expression():
lingo_dict = {
"conditionType": ConditionType.JSONRPC.value,
"endpoint": "https://math.example.com/",
"method": "subtract",
"params": [42, 23],
"query": "$.mathresult",
"returnValueTest": {
"comparator": "==",
"value": 19,
},
}
cls = ConditionLingo.resolve_condition_class(lingo_dict, version=1)
assert cls == JsonRpcCondition
lingo_json = json.dumps(lingo_dict)
condition = JsonRpcCondition.from_json(lingo_json)
assert isinstance(condition, JsonRpcCondition)
assert condition.to_dict() == lingo_dict
def test_json_rpc_condition_from_lingo_expression_with_authorization():
lingo_dict = {
"conditionType": ConditionType.JSONRPC.value,
"endpoint": "https://example.com/",
"method": "subtract",
"params": {
"param1": 42,
"param2": "rando_param",
"param3": 1.25,
"param4": True,
},
"query": "$.mathresult",
"authorizationToken": ":authorizationToken",
"returnValueTest": {
"comparator": "==",
"value": 19,
},
}
cls = ConditionLingo.resolve_condition_class(lingo_dict, version=1)
assert cls == JsonRpcCondition
lingo_json = json.dumps(lingo_dict)
condition = JsonRpcCondition.from_json(lingo_json)
assert isinstance(condition, JsonRpcCondition)
assert condition.to_dict() == lingo_dict
def test_ambiguous_json_path_multiple_results(mocker):
mock_response = mocker.Mock(status_code=200)
mock_response.json.return_value = {
"result": {"mathresult": [{"answer": 19}, {"answer": -19}]}
}
mocker.patch("requests.post", return_value=mock_response)
condition = JsonRpcCondition(
endpoint="https://math.example.com/",
method="subtract",
params=[42, 23],
query="$.mathresult[*].answer",
return_value_test=ReturnValueTest("==", 19),
)
with pytest.raises(JsonRequestException, match="Ambiguous JSONPath query"):
condition.verify()