Allow authorization_token to be passed along.

Some code clean up.
pull/3573/head
derekpierre 2024-12-16 10:44:35 -05:00
parent 4655d667e4
commit a629f9b9a1
No known key found for this signature in database
3 changed files with 31 additions and 128 deletions

View File

@ -83,7 +83,7 @@ class JsonApiCondition(ExecutionCallAccessControlCondition):
query: Optional[str] = None,
parameters: Optional[dict] = None,
authorization_token: Optional[str] = None,
condition_type: str = ConditionType.JSONAPI.value,
condition_type: Optional[str] = ConditionType.JSONAPI.value,
name: Optional[str] = None,
):
super().__init__(

View File

@ -2,12 +2,11 @@ from abc import ABC
from http import HTTPMethod
from typing import Any, Optional, Tuple, override
from marshmallow import fields, post_load, validate
from marshmallow import ValidationError, fields, post_load, validate, validates
from marshmallow.fields import Url
from marshmallow.validate import OneOf
from nucypher.policy.conditions.context import is_context_variable
from nucypher.policy.conditions.exceptions import (
ConditionEvaluationFailed,
JsonRequestException,
)
from nucypher.policy.conditions.json.base import JSONPathField, JsonRequestCall
@ -17,7 +16,6 @@ from nucypher.policy.conditions.lingo import (
ExecutionCallAccessControlCondition,
ReturnValueTest,
)
from nucypher.utilities.logging import Logger
class BaseJsonRPCCall(JsonRequestCall, ABC):
@ -25,12 +23,21 @@ class BaseJsonRPCCall(JsonRequestCall, ABC):
method = fields.Str(required=True)
params = fields.Field(required=False, allow_none=True)
query = JSONPathField(required=False, allow_none=True)
authorization_token = fields.Str(required=False, allow_none=True)
@validates("authorization_token")
def validate_auth_token(self, value):
if value and not is_context_variable(value):
raise ValidationError(
f"Invalid value for authorization token; expected a context variable, but got '{value}'"
)
def __init__(
self,
method: str,
params: Optional[Any] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.method = method
self.params = params or []
@ -45,6 +52,7 @@ class BaseJsonRPCCall(JsonRequestCall, ABC):
http_method=HTTPMethod.POST,
parameters=parameters,
query=query,
authorization_token=authorization_token,
)
@override
@ -78,9 +86,15 @@ class JsonEndpointRPCCall(BaseJsonRPCCall):
method: str,
params: Optional[Any] = None,
query: Optional[str] = None,
authorization_token: Optional[str] = None,
):
self.endpoint = endpoint
super().__init__(method=method, params=params, query=query)
super().__init__(
method=method,
params=params,
query=query,
authorization_token=authorization_token,
)
@override
def execute(self, **context) -> Any:
@ -109,16 +123,20 @@ class JsonRPRCCondition(ExecutionCallAccessControlCondition):
return_value_test: ReturnValueTest,
params: Optional[Any] = None,
query: Optional[str] = None,
condition_type: str = ConditionType.JSONRPC.value,
authorization_token: Optional[str] = None,
condition_type: Optional[str] = ConditionType.JSONRPC.value,
name: Optional[str] = None,
):
self.endpoint = endpoint
super().__init__(
endpoint=endpoint,
method=method,
return_value_test=return_value_test,
params=params,
query=query,
authorization_token=authorization_token,
condition_type=condition_type,
return_value_test=return_value_test,
name=name,
)
@property
@ -133,6 +151,10 @@ class JsonRPRCCondition(ExecutionCallAccessControlCondition):
def query(self):
return self.execution_call.query
@property
def authorization_token(self):
return self.execution_call.authorization_token
@property
def timeout(self):
return self.execution_call.timeout
@ -146,117 +168,3 @@ class JsonRPRCCondition(ExecutionCallAccessControlCondition):
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result
class NonEvmJsonRPCCall(BaseJsonRPCCall):
class Schema(BaseJsonRPCCall.Schema):
blockchain = fields.Str(required=True, validate=OneOf(["solana", "bitcoin"]))
@post_load
def make(self, data, **kwargs):
return NonEvmJsonRPCCall(**data)
def __init__(
self,
blockchain: str,
method: str,
params: Optional[Any] = None,
query: Optional[str] = None,
):
self.blockchain = blockchain
super().__init__(method=method, params=params, query=query)
@override
def execute(self, endpoint, **context) -> Any:
return self._execute(endpoint, **context)
class NonEvmJsonRPCCondition(ExecutionCallAccessControlCondition):
EXECUTION_CALL_TYPE = NonEvmJsonRPCCall
CONDITION_TYPE = ConditionType.NON_EVM_JSON_RPC.value
# TODO: this should be moved to `nucypher/chainlist`; here for now for POC
BLOCKCHAINS = {
"solana": [
"https://api.mainnet-beta.solana.com",
"https://solana.drpc.org",
],
"bitcoin": [
"https://docs-demo.btc.quiknode.pro/",
"https://bitcoin.drpc.org",
],
}
class Schema(ExecutionCallAccessControlCondition.Schema, NonEvmJsonRPCCall.Schema):
condition_type = fields.Str(
validate=validate.Equal(ConditionType.NON_EVM_JSON_RPC.value), required=True
)
@post_load
def make(self, data, **kwargs):
return NonEvmJsonRPCCondition(**data)
def __init__(
self,
blockchain: str,
method: str,
return_value_test: ReturnValueTest,
query: Optional[str] = None,
params: Optional[Any] = None,
condition_type: str = ConditionType.NON_EVM_JSON_RPC.value,
name: Optional[str] = None,
):
self.logger = Logger(__name__)
super().__init__(
blockchain=blockchain,
method=method,
return_value_test=return_value_test,
query=query,
params=params,
condition_type=condition_type,
name=name,
)
@property
def blockchain(self):
return self.execution_call.blockchain
@property
def method(self):
return self.execution_call.method
@property
def params(self):
return self.execution_call.params
@property
def query(self):
return self.execution_call.query
@property
def timeout(self):
return self.execution_call.timeout
def verify(self, **context) -> Tuple[bool, Any]:
blockchain_urls = self.BLOCKCHAINS[self.blockchain]
latest_error = ""
for url in blockchain_urls:
try:
result = self.execution_call.execute(endpoint=url, **context)
break
except JsonRequestException as e:
latest_error = f"Non-evm RPC call to {url} failed: {e}"
self.logger.warn(f"{latest_error}, attempting to try next endpoint.")
continue
else:
raise ConditionEvaluationFailed(
f"Unable to execute non-evm JSON RPC call using {blockchain_urls}; latest error - {latest_error}"
)
result_for_eval = process_result_for_condition_eval(result)
resolved_return_value_test = self.return_value_test.with_resolved_context(
**context
)
eval_result = resolved_return_value_test.eval(result_for_eval) # test
return eval_result, result

View File

@ -70,7 +70,6 @@ class ConditionType(Enum):
RPC = "rpc"
JSONAPI = "json-api"
JSONRPC = "json-rpc"
NON_EVM_JSON_RPC = "non-evm-json-rpc"
COMPOUND = "compound"
SEQUENTIAL = "sequential"
IF_THEN_ELSE = "if-then-else"
@ -700,10 +699,7 @@ class ConditionLingo(_Serializable):
"""
from nucypher.policy.conditions.evm import ContractCondition, RPCCondition
from nucypher.policy.conditions.json.api import JsonApiCondition
from nucypher.policy.conditions.json.rpc import (
JsonRPRCCondition,
NonEvmJsonRPCCondition,
)
from nucypher.policy.conditions.json.rpc import JsonRPRCCondition
from nucypher.policy.conditions.time import TimeCondition
# version logical adjustments can be made here as required
@ -716,7 +712,6 @@ class ConditionLingo(_Serializable):
CompoundAccessControlCondition,
JsonApiCondition,
JsonRPRCCondition,
NonEvmJsonRPCCondition,
SequentialAccessControlCondition,
IfThenElseCondition,
):