Refactor JSON functionality into its own package.

JsonRequestCall is the base abstract class for all HTTPS JSON-based requests.
JsonApiCall subclasses JSONRequestCall for use with the JsonApiCondition.
Update imports and references accordingly.
pull/3573/head
derekpierre 2024-12-16 09:14:42 -05:00
parent 254b0c54fb
commit 2a112005ff
No known key found for this signature in database
9 changed files with 298 additions and 276 deletions

View File

@ -74,9 +74,7 @@ class RPCCall(ExecutionCall):
"null": "Undefined method name",
},
)
parameters = fields.List(
fields.Field, attribute="parameters", required=False, allow_none=True
)
parameters = fields.List(fields.Field, required=False, allow_none=True)
@validates("method")
def validate_method(self, value):

View File

@ -45,3 +45,7 @@ class ConditionEvaluationFailed(Exception):
class RPCExecutionFailed(ConditionEvaluationFailed):
"""Raised when an exception is raised from an RPC call."""
class JsonRequestException(ConditionEvaluationFailed):
"""Raised when an exception is raised from a JSON request."""

View File

@ -0,0 +1,132 @@
from http import HTTPMethod
from typing import Any, Optional, Tuple, override
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Url
from nucypher.policy.conditions.context import is_context_variable
from nucypher.policy.conditions.json.base import JSONPathField, JsonRequestCall
from nucypher.policy.conditions.json.utils import process_result_for_condition_eval
from nucypher.policy.conditions.lingo import (
ConditionType,
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
from nucypher.utilities.logging import Logger
class JsonApiCall(JsonRequestCall):
TIMEOUT = 5 # seconds
class Schema(JsonRequestCall.Schema):
endpoint = Url(required=True, relative=False, schemes=["https"])
parameters = fields.Dict(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@post_load
def make(self, data, **kwargs):
return JsonApiCall(**data)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
endpoint: str,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
super().__init__(
http_method=HTTPMethod.GET,
parameters=parameters,
query=query,
authorization_token=authorization_token,
)
self.logger = Logger(__name__)
@override
def execute(self, **context) -> Any:
return super()._execute(endpoint=self.endpoint, **context)
class JsonApiCondition(ExecutionCallAccessControlCondition):
"""
A JSON API condition is a condition that can be evaluated by reading from a JSON
HTTPS endpoint. The response must return an HTTP 200 with valid JSON in the response body.
The response will be deserialized as JSON and parsed using jsonpath.
"""
EXECUTION_CALL_TYPE = JsonApiCall
CONDITION_TYPE = ConditionType.JSONAPI.value
class Schema(ExecutionCallAccessControlCondition.Schema, JsonApiCall.Schema):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.JSONAPI.value), required=True
)
@post_load
def make(self, data, **kwargs):
return JsonApiCondition(**data)
def __init__(
self,
endpoint: str,
return_value_test: ReturnValueTest,
query: Optional[str] = None,
parameters: Optional[dict] = None,
authorization_token: Optional[str] = None,
condition_type: str = ConditionType.JSONAPI.value,
name: Optional[str] = None,
):
super().__init__(
endpoint=endpoint,
return_value_test=return_value_test,
query=query,
parameters=parameters,
authorization_token=authorization_token,
condition_type=condition_type,
name=name,
)
@property
def endpoint(self):
return self.execution_call.endpoint
@property
def query(self):
return self.execution_call.query
@property
def parameters(self):
return self.execution_call.parameters
@property
def timeout(self):
return self.execution_call.timeout
@property
def authorization_token(self):
return self.execution_call.authorization_token
def verify(self, **context) -> Tuple[bool, Any]:
"""
Verifies the offchain condition is met by performing a read operation on the endpoint
and evaluating the return value test with the result. Parses the endpoint's JSON response using
JSONPath.
"""
result = self.execution_call.execute(**context)
result_for_eval = process_result_for_condition_eval(result)
resolved_return_value_test = self.return_value_test.with_resolved_context(
**context
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result

View File

@ -0,0 +1,133 @@
import json
from abc import ABC
from http import HTTPMethod, HTTPStatus
from typing import Any, Optional
import requests
from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError
from jsonpath_ng.ext import parse
from marshmallow.fields import Field
from nucypher.policy.conditions.base import ExecutionCall
from nucypher.policy.conditions.context import (
resolve_any_context_variables,
string_contains_context_variable,
)
from nucypher.policy.conditions.exceptions import JsonRequestException
from nucypher.utilities.logging import Logger
class JsonRequestCall(ExecutionCall, ABC):
TIMEOUT = 5 # seconds
def __init__(
self,
http_method: HTTPMethod,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.http_method = http_method
self.parameters = parameters or {}
self.query = query
self.authorization_token = authorization_token
self.timeout = self.TIMEOUT
self.logger = Logger(__name__)
super().__init__()
def _execute(self, endpoint: str, **context) -> Any:
data = self._fetch(endpoint, **context)
result = self._query_response(data, **context)
return result
def _fetch(self, endpoint: str, **context) -> Any:
resolved_endpoint = resolve_any_context_variables(endpoint, **context)
resolved_parameters = resolve_any_context_variables(self.parameters, **context)
headers = {"Content-Type": "application/json"}
if self.authorization_token:
resolved_authorization_token = resolve_any_context_variables(
self.authorization_token, **context
)
headers = {"Authorization": f"Bearer {resolved_authorization_token}"}
try:
if self.http_method == HTTPMethod.GET:
response = requests.get(
resolved_endpoint,
params=resolved_parameters,
timeout=self.timeout,
headers=headers,
)
elif self.http_method == HTTPMethod.POST:
response = requests.post(
resolved_endpoint,
data=json.dumps(resolved_parameters),
timeout=self.timeout,
headers=headers,
)
else:
raise RuntimeError(f"Unsupported http method: {self.http_method}")
response.raise_for_status()
if response.status_code != HTTPStatus.OK:
raise JsonRequestException(
f"Failed to fetch from endpoint {resolved_endpoint}: {response.status_code}"
)
except requests.exceptions.RequestException as request_error:
raise JsonRequestException(
f"Failed to fetch from endpoint {resolved_endpoint}: {request_error}"
)
try:
data = response.json()
return data
except (requests.exceptions.RequestException, ValueError) as json_error:
raise JsonRequestException(
f"Failed to extract JSON response from {resolved_endpoint}: {json_error}"
)
def _query_response(self, response_json: Any, **context) -> Any:
if not self.query:
return response_json # primitive value
resolved_query = resolve_any_context_variables(self.query, **context)
try:
expression = parse(resolved_query)
matches = expression.find(response_json)
if not matches:
message = f"No matches found for the JSONPath query: {resolved_query}"
raise JsonRequestException(message)
except JsonRequestException as jsonpath_err:
self.logger.error(f"JSONPath error occurred: {jsonpath_err}")
raise JsonRequestException(
f"JSONPath error: {jsonpath_err}"
) from jsonpath_err
if len(matches) > 1:
message = f"Ambiguous JSONPath query - multiple matches found for: {resolved_query}"
self.logger.info(message)
raise JsonRequestException(message)
result = matches[0].value
return result
class JSONPathField(Field):
default_error_messages = {
"invalidType": "Expression of type {value} is not valid for JSONPath",
"invalid": "'{value}' is not a valid JSONPath expression",
}
def _deserialize(self, value, attr, data, **kwargs):
if not isinstance(value, str):
raise self.make_error("invalidType", value=type(value))
try:
if not string_contains_context_variable(value):
parse(value)
except (JsonPathLexerError, JsonPathParserError):
raise self.make_error("invalid", value=value)
return value

View File

@ -0,0 +1,17 @@
from typing import Any
def process_result_for_condition_eval(result: Any):
# strings that are not already quoted will cause a problem for literal_eval
if not isinstance(result, str):
return result
# check if already quoted; if not, quote it
if not (
(result.startswith("'") and result.endswith("'"))
or (result.startswith('"') and result.endswith('"'))
):
quote_type_to_use = '"' if "'" in result else "'"
result = f"{quote_type_to_use}{result}{quote_type_to_use}"
return result

View File

@ -697,7 +697,7 @@ class ConditionLingo(_Serializable):
conditions expression framework.
"""
from nucypher.policy.conditions.evm import ContractCondition, RPCCondition
from nucypher.policy.conditions.offchain import JsonApiCondition
from nucypher.policy.conditions.json.api import JsonApiCondition
from nucypher.policy.conditions.time import TimeCondition
# version logical adjustments can be made here as required

View File

@ -1,255 +0,0 @@
from typing import Any, Optional, Tuple
import requests
from jsonpath_ng.exceptions import JsonPathLexerError, JsonPathParserError
from jsonpath_ng.ext import parse
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Field, Url
from nucypher.policy.conditions.base import ExecutionCall
from nucypher.policy.conditions.context import (
is_context_variable,
resolve_any_context_variables,
string_contains_context_variable,
)
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
InvalidCondition,
)
from nucypher.policy.conditions.lingo import (
ConditionType,
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
from nucypher.utilities.logging import Logger
class JSONPathField(Field):
default_error_messages = {
"invalidType": "Expression of type {value} is not valid for JSONPath",
"invalid": "'{value}' is not a valid JSONPath expression",
}
def _deserialize(self, value, attr, data, **kwargs):
if not isinstance(value, str):
raise self.make_error("invalidType", value=type(value))
try:
if not string_contains_context_variable(value):
parse(value)
except (JsonPathLexerError, JsonPathParserError):
raise self.make_error("invalid", value=value)
return value
class JsonApiCall(ExecutionCall):
TIMEOUT = 5 # seconds
class Schema(ExecutionCall.Schema):
endpoint = Url(required=True, relative=False, schemes=["https"])
parameters = fields.Dict(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@post_load
def make(self, data, **kwargs):
return JsonApiCall(**data)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
endpoint: str,
parameters: Optional[dict] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
self.parameters = parameters or {}
self.query = query
self.authorization_token = authorization_token
self.timeout = self.TIMEOUT
self.logger = Logger(__name__)
super().__init__()
def execute(self, **context) -> Any:
response = self._fetch(**context)
data = self._deserialize_response(response)
result = self._query_response(data, **context)
return result
def _fetch(self, **context) -> requests.Response:
"""Fetches data from the endpoint."""
resolved_endpoint = resolve_any_context_variables(self.endpoint, **context)
resolved_parameters = resolve_any_context_variables(self.parameters, **context)
headers = None
if self.authorization_token:
resolved_authorization_token = resolve_any_context_variables(
self.authorization_token, **context
)
headers = {"Authorization": f"Bearer {resolved_authorization_token}"}
try:
response = requests.get(
resolved_endpoint,
params=resolved_parameters,
timeout=self.timeout,
headers=headers,
)
response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
self.logger.error(f"HTTP error occurred: {http_error}")
raise ConditionEvaluationFailed(
f"Failed to fetch endpoint {resolved_endpoint}: {http_error}"
)
except requests.exceptions.RequestException as request_error:
self.logger.error(f"Request exception occurred: {request_error}")
raise InvalidCondition(
f"Failed to fetch endpoint {resolved_endpoint}: {request_error}"
)
if response.status_code != 200:
self.logger.error(
f"Failed to fetch endpoint {resolved_endpoint}: {response.status_code}"
)
raise ConditionEvaluationFailed(
f"Failed to fetch endpoint {resolved_endpoint}: {response.status_code}"
)
return response
def _deserialize_response(self, response: requests.Response) -> Any:
"""Deserializes the JSON response from the endpoint."""
try:
data = response.json()
except (requests.exceptions.RequestException, ValueError) as json_error:
self.logger.error(f"JSON parsing error occurred: {json_error}")
raise ConditionEvaluationFailed(
f"Failed to parse JSON response: {json_error}"
)
return data
def _query_response(self, data: Any, **context) -> Any:
if not self.query:
return data # primitive value
resolved_query = resolve_any_context_variables(self.query, **context)
try:
expression = parse(resolved_query)
matches = expression.find(data)
if not matches:
message = f"No matches found for the JSONPath query: {resolved_query}"
self.logger.info(message)
raise ConditionEvaluationFailed(message)
except (JsonPathLexerError, JsonPathParserError) as jsonpath_err:
self.logger.error(f"JSONPath error occurred: {jsonpath_err}")
raise ConditionEvaluationFailed(f"JSONPath error: {jsonpath_err}")
if len(matches) > 1:
message = f"Ambiguous JSONPath query - Multiple matches found for: {resolved_query}"
self.logger.info(message)
raise ConditionEvaluationFailed(message)
result = matches[0].value
return result
class JsonApiCondition(ExecutionCallAccessControlCondition):
"""
A JSON API condition is a condition that can be evaluated by reading from a JSON
HTTPS endpoint. The response must return an HTTP 200 with valid JSON in the response body.
The response will be deserialized as JSON and parsed using jsonpath.
"""
EXECUTION_CALL_TYPE = JsonApiCall
CONDITION_TYPE = ConditionType.JSONAPI.value
class Schema(ExecutionCallAccessControlCondition.Schema, JsonApiCall.Schema):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.JSONAPI.value), required=True
)
@post_load
def make(self, data, **kwargs):
return JsonApiCondition(**data)
def __init__(
self,
endpoint: str,
return_value_test: ReturnValueTest,
query: Optional[str] = None,
parameters: Optional[dict] = None,
authorization_token: Optional[str] = None,
condition_type: str = ConditionType.JSONAPI.value,
name: Optional[str] = None,
):
super().__init__(
endpoint=endpoint,
return_value_test=return_value_test,
query=query,
parameters=parameters,
authorization_token=authorization_token,
condition_type=condition_type,
name=name,
)
@property
def endpoint(self):
return self.execution_call.endpoint
@property
def query(self):
return self.execution_call.query
@property
def parameters(self):
return self.execution_call.parameters
@property
def timeout(self):
return self.execution_call.timeout
@property
def authorization_token(self):
return self.execution_call.authorization_token
@staticmethod
def _process_result_for_eval(result: Any):
# strings that are not already quoted will cause a problem for literal_eval
if not isinstance(result, str):
return result
# check if already quoted; if not, quote it
if not (
(result.startswith("'") and result.endswith("'"))
or (result.startswith('"') and result.endswith('"'))
):
quote_type_to_use = '"' if "'" in result else "'"
result = f"{quote_type_to_use}{result}{quote_type_to_use}"
return result
def verify(self, **context) -> Tuple[bool, Any]:
"""
Verifies the offchain condition is met by performing a read operation on the endpoint
and evaluating the return value test with the result. Parses the endpoint's JSON response using
JSONPath.
"""
result = self.execution_call.execute(**context)
result_for_eval = self._process_result_for_eval(result)
resolved_return_value_test = self.return_value_test.with_resolved_context(
**context
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result

View File

@ -5,14 +5,12 @@ import requests
from marshmallow import ValidationError
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
InvalidCondition,
JsonRequestException,
)
from nucypher.policy.conditions.json.api import JsonApiCondition
from nucypher.policy.conditions.json.base import JSONPathField
from nucypher.policy.conditions.lingo import ConditionLingo, ReturnValueTest
from nucypher.policy.conditions.offchain import (
JsonApiCondition,
JSONPathField,
)
def test_jsonpath_field_valid():
@ -97,9 +95,8 @@ def test_json_api_condition_fetch(mocker):
query="$.store.book[0].title",
return_value_test=ReturnValueTest("==", "'Test Title'"),
)
response = condition.execution_call._fetch()
assert response.status_code == 200
assert response.json() == {"store": {"book": [{"title": "Test Title"}]}}
data = condition.execution_call._fetch(condition.endpoint)
assert data == {"store": {"book": [{"title": "Test Title"}]}}
def test_json_api_condition_fetch_failure(mocker):
@ -112,8 +109,8 @@ def test_json_api_condition_fetch_failure(mocker):
query="$.store.book[0].price",
return_value_test=ReturnValueTest("==", 1),
)
with pytest.raises(InvalidCondition, match="Failed to fetch endpoint"):
condition.execution_call._fetch()
with pytest.raises(JsonRequestException, match="Failed to fetch from endpoint"):
condition.execution_call._fetch(condition.endpoint)
def test_json_api_condition_verify(mocker):
@ -179,9 +176,7 @@ def test_json_api_condition_verify_invalid_json(mocker):
query="$.store.book[0].price",
return_value_test=ReturnValueTest("==", 2),
)
with pytest.raises(
ConditionEvaluationFailed, match="Failed to parse JSON response"
):
with pytest.raises(JsonRequestException, match="Failed to extract JSON response"):
condition.verify()
@ -200,9 +195,7 @@ def test_non_json_response(mocker):
return_value_test=ReturnValueTest("==", 18),
)
with pytest.raises(
ConditionEvaluationFailed, match="Failed to parse JSON response"
):
with pytest.raises(JsonRequestException, match="Failed to extract JSON response"):
condition.verify()
@ -382,5 +375,5 @@ def test_ambiguous_json_path_multiple_results(mocker):
return_value_test=ReturnValueTest("==", 1),
)
with pytest.raises(ConditionEvaluationFailed, match="Ambiguous JSONPath query"):
with pytest.raises(JsonRequestException, match="Ambiguous JSONPath query"):
condition.verify()