From 9e8df7509fc240ce61f3f346bb96da1488f367a8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Tue, 22 Sep 2020 17:04:45 -0400 Subject: [PATCH] Initial work for retry functionality with Alchemy providers. --- nucypher/blockchain/eth/providers.py | 87 +++++++++++++++++++++++++++- tests/unit/test_web3_clients.py | 69 +++++++++++++++++++++- 2 files changed, 153 insertions(+), 3 deletions(-) diff --git a/nucypher/blockchain/eth/providers.py b/nucypher/blockchain/eth/providers.py index 5611dea3e..16553dba7 100644 --- a/nucypher/blockchain/eth/providers.py +++ b/nucypher/blockchain/eth/providers.py @@ -15,16 +15,21 @@ You should have received a copy of the GNU Affero General Public License along with nucypher. If not, see . """ import os +import time + from eth_tester import EthereumTester, PyEVMBackend from eth_tester.backends.mock.main import MockBackend -from typing import Union +from typing import Union, Callable from urllib.parse import urlparse from web3 import HTTPProvider, IPCProvider, WebsocketProvider from web3.exceptions import InfuraKeyNotFound +from web3.providers import BaseProvider from web3.providers.eth_tester.main import EthereumTesterProvider +from web3.types import RPCResponse, RPCError from nucypher.blockchain.eth.clients import NuCypherGethDevProcess from nucypher.exceptions import DevelopmentInstallationRequired +from nucypher.utilities.logging import Logger class ProviderError(Exception): @@ -41,11 +46,17 @@ def _get_IPC_provider(provider_uri): def _get_HTTP_provider(provider_uri): from nucypher.blockchain.eth.interfaces import BlockchainInterface + if 'alchemyapi.io' in provider_uri: + return AlchemyHTTPProvider(endpoint_uri=provider_uri, request_kwargs={'timeout': BlockchainInterface.TIMEOUT}) + return HTTPProvider(endpoint_uri=provider_uri, request_kwargs={'timeout': BlockchainInterface.TIMEOUT}) def _get_websocket_provider(provider_uri): from nucypher.blockchain.eth.interfaces import BlockchainInterface + if 'alchemyapi.io' in provider_uri: + return AlchemyWebsocketProvider(endpoint_uri=provider_uri, + websocket_kwargs={'timeout': BlockchainInterface.TIMEOUT}) return WebsocketProvider(endpoint_uri=provider_uri, websocket_kwargs={'timeout': BlockchainInterface.TIMEOUT}) @@ -140,3 +151,77 @@ def _get_test_geth_parity_provider(provider_uri): def _get_tester_ganache(provider_uri=None): endpoint_uri = provider_uri or 'http://localhost:7545' return HTTPProvider(endpoint_uri=endpoint_uri) + + +def make_rpc_request_with_retry(provider: BaseProvider, + should_retry: Callable[[RPCResponse], bool], + logger: Logger = None, + num_retries: int = 3, + exponential_backoff: bool = True, + *args, + **kwargs) -> RPCResponse: + response = provider.make_request(*args, **kwargs) + if should_retry(response): + # make additional retries with exponential back-off + retries = 1 + while True: + if exponential_backoff: + time.sleep(2 ** retries) # exponential back-off + + response = provider.make_request(*args, **kwargs) + if not should_retry(response): + if logger: + logger.debug(f'Retried alchemy request completed after {retries} request') + break + + if retries >= num_retries: + if logger: + logger.warn(f'Alchemy request retried {num_retries} times but was not completed') + break + + retries += 1 + + return response + + +# Alchemy specific code +def _alchemy_should_retry_request(response: RPCResponse) -> bool: + error = response.get('error') + if error: + # see see https://docs.alchemyapi.io/guides/rate-limits#test-rate-limits-retries + # either instance of RPCError or str + if isinstance(error, str) and 'retries' in error: + return True + else: # RPCError TypeDict + if error.get('code') == 429 or 'retries' in error.get('message'): + return True + + return False + + +class AlchemyHTTPProvider(HTTPProvider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.log = Logger(self.__class__.__name__) + + def make_request(self, *args, **kwargs) -> RPCResponse: + response = make_rpc_request_with_retry(provider=super(), + should_retry=_alchemy_should_retry_request, + logger=self.log, + *args, + **kwargs) + return response + + +class AlchemyWebsocketProvider(WebsocketProvider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.log = Logger(self.__class__.__name__) + + def make_request(self, *args, **kwargs) -> RPCResponse: + response = make_rpc_request_with_retry(provider=super(), + should_retry=_alchemy_should_retry_request, + logger=self.log, + *args, + **kwargs) + return response diff --git a/tests/unit/test_web3_clients.py b/tests/unit/test_web3_clients.py index 956a18f8e..b6650fc63 100644 --- a/tests/unit/test_web3_clients.py +++ b/tests/unit/test_web3_clients.py @@ -16,15 +16,17 @@ """ import datetime -from unittest.mock import PropertyMock +from unittest.mock import Mock, PropertyMock +import maya import pytest from web3 import HTTPProvider, IPCProvider, WebsocketProvider +from web3.types import RPCResponse, RPCError from nucypher.blockchain.eth.clients import (EthereumClient, GanacheClient, GethClient, InfuraClient, PUBLIC_CHAINS, ParityClient) from nucypher.blockchain.eth.interfaces import BlockchainInterface -from tests.mock.interfaces import MockEthereumClient +from nucypher.blockchain.eth.providers import make_rpc_request_with_retry, _alchemy_should_retry_request DEFAULT_GAS_PRICE = 42 GAS_PRICE_FROM_STRATEGY = 1234 @@ -346,3 +348,66 @@ def test_gas_prices(mocker, mock_ethereum_client): assert mock_ethereum_client.gas_price == DEFAULT_GAS_PRICE assert mock_ethereum_client.gas_price_for_transaction("there's no gas strategy") == DEFAULT_GAS_PRICE assert mock_ethereum_client.gas_price_for_transaction("2nd time is the charm") == GAS_PRICE_FROM_STRATEGY + + +def test_alchemy_rpc_request_with_retry(): + retries = 4 + + # Retry Case - RPCResponse fails due to limits, and retry required + retry_responses = [ + RPCResponse(error=RPCError(code=-32000, + message='Your app has exceeded its compute units per second capacity. If you have ' + 'retries enabled, you can safely ignore this message. If not, ' + 'check out https://docs.alchemyapi.io/guides/rate-limits')), + RPCResponse(error=RPCError(code=429, message='Too many concurrent requests')), + RPCResponse(error='Your app has exceeded its compute units per second capacity. If you have retries enabled, ' + 'you can safely ignore this message. If not, ' + 'check out https://docs.alchemyapi.io/guides/rate-limits') + ] + for test_response in retry_responses: + provider = Mock() + provider.make_request.return_value = test_response + retry_response = make_rpc_request_with_retry(provider, + should_retry=_alchemy_should_retry_request, + logger=None, + num_retries=retries, + exponential_backoff=False) # disable exponential backoff + assert retry_response == test_response + assert provider.make_request.call_count == (retries + 1) # one call, and then the number of retries + + +def test_alchemy_rpc_request_success_with_no_retry(): + # Success Case - retry not needed + provider = Mock() + successful_response = RPCResponse(id=0, result='0xa1c054') + provider.make_request.return_value = successful_response + retry_response = make_rpc_request_with_retry(provider, + should_retry=_alchemy_should_retry_request, + logger=None, + num_retries=10, + exponential_backoff=False) # disable exponential backoff + assert retry_response == successful_response + assert provider.make_request.call_count == 1 # first request was successful, no need for retries + + +# TODO - since this test does exponential backoff it takes >= 2^1 = 2s, should we only run on circleci? +def test_alchemy_rpc_request_with_retry_exponential_backoff(): + retries = 1 + provider = Mock() + + # Retry Case - RPCResponse fails due to limits, and retry required + test_response = RPCResponse(error=RPCError(code=429, message='Too many concurrent requests')) + provider.make_request.return_value = test_response + start = maya.now() + retry_response = make_rpc_request_with_retry(provider, + should_retry=_alchemy_should_retry_request, + logger=None, + num_retries=retries, + exponential_backoff=True) # enable exponential backoff + end = maya.now() + assert retry_response == test_response + assert provider.make_request.call_count == (retries + 1) # one call, and then the number of retries + + # check exponential backoff + delta = end - start + assert delta.total_seconds() >= 2**retries