mirror of https://github.com/nucypher/nucypher.git
Merge pull request #2389 from cygnusv/gaseoso
Track pending commitment transactions in WorkTrackerpull/2396/head
commit
3d0e54f328
|
@ -575,7 +575,10 @@ class StakingEscrowAgent(EthereumContractAgent):
|
|||
return self.bond_worker(staker_address=staker_address, worker_address=NULL_ADDRESS)
|
||||
|
||||
@contract_api(TRANSACTION)
|
||||
def commit_to_next_period(self, worker_address: ChecksumAddress, fire_and_forget: bool = True) -> TxReceipt: # TODO: make fire_and_forget required
|
||||
def commit_to_next_period(self,
|
||||
worker_address: ChecksumAddress,
|
||||
fire_and_forget: bool = True, # TODO: make fire_and_forget required? See #2385 too.
|
||||
) -> Union[TxReceipt, HexBytes]:
|
||||
"""
|
||||
For each period that the worker makes a commitment, the staker is rewarded.
|
||||
"""
|
||||
|
|
|
@ -384,6 +384,13 @@ class EthereumClient:
|
|||
def get_transaction(self, transaction_hash) -> dict:
|
||||
return self.w3.eth.getTransaction(transaction_hash)
|
||||
|
||||
def get_transaction_receipt(self, transaction_hash) -> Union[dict, None]:
|
||||
return self.w3.eth.getTransactionReceipt(transaction_hash)
|
||||
|
||||
def get_transaction_count(self, account: str, pending: bool) -> int:
|
||||
block_identifier = 'pending' if pending else 'latest'
|
||||
return self.w3.eth.getTransactionCount(account, block_identifier)
|
||||
|
||||
def send_transaction(self, transaction_dict: dict) -> str:
|
||||
return self.w3.eth.sendTransaction(transaction_dict)
|
||||
|
||||
|
|
|
@ -59,8 +59,8 @@ from nucypher.blockchain.eth.registry import BaseContractRegistry
|
|||
from nucypher.blockchain.eth.sol.compile import SolidityCompiler
|
||||
from nucypher.blockchain.eth.utils import get_transaction_name, prettify_eth_amount
|
||||
from nucypher.characters.control.emitters import JSONRPCStdoutEmitter, StdoutEmitter
|
||||
from nucypher.utilities.datafeeds import datafeed_fallback_gas_price_strategy
|
||||
from nucypher.utilities.ethereum import encode_constructor_arguments
|
||||
from nucypher.utilities.gas_strategies import datafeed_fallback_gas_price_strategy, WEB3_GAS_STRATEGIES
|
||||
from nucypher.utilities.logging import GlobalLoggerSettings, Logger
|
||||
|
||||
|
||||
|
@ -77,11 +77,7 @@ class BlockchainInterface:
|
|||
TIMEOUT = 600 # seconds # TODO: Correlate with the gas strategy - #2070
|
||||
|
||||
DEFAULT_GAS_STRATEGY = 'fast'
|
||||
GAS_STRATEGIES = {'glacial': time_based.glacial_gas_price_strategy, # 24h
|
||||
'slow': time_based.slow_gas_price_strategy, # 1h
|
||||
'medium': time_based.medium_gas_price_strategy, # 5m
|
||||
'fast': time_based.fast_gas_price_strategy # 60s
|
||||
}
|
||||
GAS_STRATEGIES = WEB3_GAS_STRATEGIES
|
||||
|
||||
process = NO_PROVIDER_PROCESS.bool_value(False)
|
||||
Web3 = Web3
|
||||
|
@ -235,7 +231,7 @@ class BlockchainInterface:
|
|||
self.client = NO_BLOCKCHAIN_CONNECTION # type: EthereumClient
|
||||
self.transacting_power = READ_ONLY_INTERFACE
|
||||
self.is_light = light
|
||||
self.gas_strategy = self.get_gas_strategy(gas_strategy)
|
||||
self.gas_strategy = gas_strategy
|
||||
|
||||
def __repr__(self):
|
||||
r = '{name}({uri})'.format(name=self.__class__.__name__, uri=self.provider_uri)
|
||||
|
@ -289,8 +285,9 @@ class BlockchainInterface:
|
|||
# so we use external gas price oracles, instead (see #2139)
|
||||
if isinstance(self.client, InfuraClient):
|
||||
gas_strategy = datafeed_fallback_gas_price_strategy
|
||||
self.gas_strategy = 'fast' # FIXME
|
||||
else:
|
||||
gas_strategy = self.gas_strategy
|
||||
gas_strategy = self.get_gas_strategy(self.gas_strategy)
|
||||
self.client.set_gas_strategy(gas_strategy=gas_strategy)
|
||||
gwei_gas_price = Web3.fromWei(self.client.gas_price_for_transaction(), 'gwei')
|
||||
self.log.debug(f"Currently, our gas strategy returns a gas price of {gwei_gas_price} gwei")
|
||||
|
@ -493,10 +490,12 @@ class BlockchainInterface:
|
|||
sender_address: str,
|
||||
payload: dict = None,
|
||||
transaction_gas_limit: int = None,
|
||||
use_pending_nonce: bool = True,
|
||||
) -> dict:
|
||||
|
||||
nonce = self.client.get_transaction_count(account=sender_address, pending=use_pending_nonce)
|
||||
base_payload = {'chainId': int(self.client.chain_id),
|
||||
'nonce': self.client.w3.eth.getTransactionCount(sender_address, 'pending'),
|
||||
'nonce': nonce,
|
||||
'from': sender_address}
|
||||
|
||||
# Aggregate
|
||||
|
@ -515,6 +514,7 @@ class BlockchainInterface:
|
|||
payload: dict = None,
|
||||
transaction_gas_limit: Optional[int] = None,
|
||||
gas_estimation_multiplier: Optional[float] = None,
|
||||
use_pending_nonce: Optional[bool] = None,
|
||||
) -> dict:
|
||||
|
||||
# Sanity checks for the gas estimation multiplier
|
||||
|
@ -527,7 +527,8 @@ class BlockchainInterface:
|
|||
|
||||
payload = self.build_payload(sender_address=sender_address,
|
||||
payload=payload,
|
||||
transaction_gas_limit=transaction_gas_limit)
|
||||
transaction_gas_limit=transaction_gas_limit,
|
||||
use_pending_nonce=use_pending_nonce)
|
||||
self.__log_transaction(transaction_dict=payload, contract_function=contract_function)
|
||||
try:
|
||||
transaction_dict = contract_function.buildTransaction(payload) # Gas estimation occurs here
|
||||
|
@ -653,17 +654,24 @@ class BlockchainInterface:
|
|||
transaction_gas_limit: Optional[int] = None,
|
||||
gas_estimation_multiplier: Optional[float] = None,
|
||||
confirmations: int = 0,
|
||||
fire_and_forget: bool = False # do not wait for receipt.
|
||||
fire_and_forget: bool = False, # do not wait for receipt. See #2385
|
||||
) -> dict:
|
||||
|
||||
if fire_and_forget and confirmations > 0:
|
||||
raise ValueError('Transaction Prevented: Cannot use confirmations and fire_and_forget options together.')
|
||||
if fire_and_forget:
|
||||
if confirmations > 0:
|
||||
raise ValueError("Transaction Prevented: "
|
||||
"Cannot use 'confirmations' and 'fire_and_forget' options together.")
|
||||
|
||||
use_pending_nonce = False # TODO: #2385
|
||||
else:
|
||||
use_pending_nonce = None # TODO: #2385
|
||||
|
||||
transaction = self.build_contract_transaction(contract_function=contract_function,
|
||||
sender_address=sender_address,
|
||||
payload=payload,
|
||||
transaction_gas_limit=transaction_gas_limit,
|
||||
gas_estimation_multiplier=gas_estimation_multiplier)
|
||||
gas_estimation_multiplier=gas_estimation_multiplier,
|
||||
use_pending_nonce=use_pending_nonce)
|
||||
|
||||
# Get transaction name
|
||||
try:
|
||||
|
|
|
@ -18,19 +18,28 @@ import random
|
|||
from _pydecimal import Decimal
|
||||
from collections import UserList
|
||||
from enum import Enum
|
||||
from typing import Callable, Dict, Union, List
|
||||
|
||||
import maya
|
||||
from constant_sorrow.constants import (EMPTY_STAKING_SLOT, NEW_STAKE, NOT_STAKING, NO_STAKING_RECEIPT,
|
||||
UNKNOWN_WORKER_STATUS)
|
||||
import random
|
||||
from constant_sorrow.constants import (
|
||||
EMPTY_STAKING_SLOT,
|
||||
NEW_STAKE,
|
||||
NOT_STAKING,
|
||||
UNTRACKED_PENDING_TRANSACTION
|
||||
)
|
||||
from eth_utils import currency, is_checksum_address
|
||||
from hexbytes.main import HexBytes
|
||||
from twisted.internet import reactor, task
|
||||
from typing import Callable, Dict, Union
|
||||
from web3.exceptions import TransactionNotFound
|
||||
|
||||
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent
|
||||
from nucypher.blockchain.eth.constants import AVERAGE_BLOCK_TIME_IN_SECONDS
|
||||
from nucypher.blockchain.eth.decorators import validate_checksum_address
|
||||
from nucypher.blockchain.eth.registry import BaseContractRegistry
|
||||
from nucypher.blockchain.eth.utils import datetime_at_period
|
||||
from nucypher.types import SubStakeInfo, NuNits, StakerInfo, Period
|
||||
from nucypher.utilities.gas_strategies import EXPECTED_CONFIRMATION_TIME_IN_SECONDS
|
||||
from nucypher.utilities.logging import Logger
|
||||
|
||||
|
||||
|
@ -529,16 +538,22 @@ class WorkTracker:
|
|||
INTERVAL_FLOOR = 60 * 15 # fifteen minutes
|
||||
INTERVAL_CEIL = 60 * 180 # three hours
|
||||
|
||||
ALLOWED_DEVIATION = 0.5 # i.e., up to +50% from the expected confirmation time
|
||||
|
||||
def __init__(self, worker, *args, **kwargs):
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
self.log = Logger('stake-tracker')
|
||||
self.worker = worker
|
||||
self.staking_agent = self.worker.staking_agent
|
||||
self.client = self.staking_agent.blockchain.client
|
||||
|
||||
self.gas_strategy = self.staking_agent.blockchain.gas_strategy
|
||||
|
||||
self._tracking_task = task.LoopingCall(self._do_work)
|
||||
self._tracking_task.clock = self.CLOCK
|
||||
|
||||
self.__pending = dict() # TODO: Prime with pending worker transactions
|
||||
self.__requirement = None
|
||||
self.__current_period = None
|
||||
self.__start_time = NOT_STAKING
|
||||
|
@ -553,6 +568,11 @@ class WorkTracker:
|
|||
def current_period(self):
|
||||
return self.__current_period
|
||||
|
||||
def max_confirmation_time(self) -> int:
|
||||
expected_time = EXPECTED_CONFIRMATION_TIME_IN_SECONDS[self.gas_strategy]
|
||||
result = expected_time * (1 + self.ALLOWED_DEVIATION)
|
||||
return result
|
||||
|
||||
def stop(self) -> None:
|
||||
if self._tracking_task.running:
|
||||
self._tracking_task.stop()
|
||||
|
@ -590,58 +610,152 @@ class WorkTracker:
|
|||
def handle_working_errors(self, *args, **kwargs) -> None:
|
||||
failure = args[0]
|
||||
if self._abort_on_error:
|
||||
self.log.critical('Unhandled error during node work tracking. {failure!r}',
|
||||
self.log.critical(f'Unhandled error during node work tracking. {failure!r}',
|
||||
failure=failure)
|
||||
reactor.callFromThread(self._crash_gracefully, failure=failure)
|
||||
else:
|
||||
self.log.warn('Unhandled error during work tracking: {failure.getTraceback()!r}',
|
||||
self.log.warn(f'Unhandled error during work tracking: {failure.getTraceback()!r}',
|
||||
failure=failure)
|
||||
|
||||
def __check_work_requirement(self) -> bool:
|
||||
def __work_requirement_is_satisfied(self) -> bool:
|
||||
# TODO: Check for stake expiration and exit
|
||||
if self.__requirement is None:
|
||||
return True
|
||||
try:
|
||||
r = self.__requirement()
|
||||
if not isinstance(r, bool):
|
||||
raise ValueError(f"'requirement' must return a boolean.")
|
||||
except TypeError:
|
||||
raise ValueError(f"'requirement' must be a callable.")
|
||||
r = self.__requirement()
|
||||
if not isinstance(r, bool):
|
||||
raise ValueError(f"'requirement' must return a boolean.")
|
||||
return r
|
||||
|
||||
@property
|
||||
def pending(self) -> Dict[int, HexBytes]:
|
||||
return self.__pending.copy()
|
||||
|
||||
def __tracking_consistency_check(self) -> bool:
|
||||
worker_address = self.worker.worker_address
|
||||
tx_count_pending = self.client.get_transaction_count(account=worker_address, pending=True)
|
||||
tx_count_latest = self.client.get_transaction_count(account=worker_address, pending=False)
|
||||
txs_in_mempool = tx_count_pending - tx_count_latest
|
||||
if len(self.__pending) == txs_in_mempool:
|
||||
return True # OK!
|
||||
if txs_in_mempool > len(self.__pending): # We're missing some pending TXs
|
||||
return False
|
||||
|
||||
# TODO: Not sure what to do in this case, but let's do this for the moment
|
||||
# Note that the block my have changed since the previous query
|
||||
# elif txs_in_mempool < len(self.__pending): # Our tracking is somehow outdated
|
||||
# return False
|
||||
|
||||
def __track_pending_commitments(self) -> bool:
|
||||
# TODO: Keep a purpose-built persistent log of worker transaction history
|
||||
|
||||
unmined_transactions = list()
|
||||
pending_transactions = self.pending.items() # note: this must be performed non-mutatively
|
||||
for tx_firing_block_number, txhash in sorted(pending_transactions):
|
||||
try:
|
||||
confirmed_tx_receipt = self.client.get_transaction_receipt(transaction_hash=txhash)
|
||||
except TransactionNotFound:
|
||||
unmined_transactions.append(txhash) # mark as unmined - Keep tracking it for now
|
||||
continue
|
||||
else:
|
||||
confirmation_block_number = confirmed_tx_receipt['blockNumber']
|
||||
confirmations = confirmation_block_number - tx_firing_block_number
|
||||
self.log.info(f'Commitment transaction {txhash.hex()[:10]} confirmed: {confirmations} confirmations')
|
||||
del self.__pending[tx_firing_block_number]
|
||||
|
||||
if unmined_transactions:
|
||||
pluralize = "s" if len(unmined_transactions) > 1 else ""
|
||||
self.log.info(f'{len(unmined_transactions)} pending commitment transaction{pluralize} detected.')
|
||||
|
||||
inconsistency = self.__tracking_consistency_check() is False
|
||||
if inconsistency:
|
||||
# If we detect there's a mismatch between the number of internally tracked and
|
||||
# pending block transactions, create a special pending TX that accounts for this.
|
||||
# TODO: Detect if this untracked pending transaction is a commitment transaction at all.
|
||||
self.__pending[0] = UNTRACKED_PENDING_TRANSACTION
|
||||
return True
|
||||
|
||||
return bool(self.__pending)
|
||||
|
||||
def __fire_replacement_commitment(self, current_block_number: int, tx_firing_block_number: int) -> None:
|
||||
replacement_txhash = self.__fire_commitment() # replace
|
||||
self.__pending[current_block_number] = replacement_txhash # track this transaction
|
||||
del self.__pending[tx_firing_block_number] # assume our original TX is stuck
|
||||
|
||||
def __handle_replacement_commitment(self, current_block_number: int) -> None:
|
||||
tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0]
|
||||
self.log.info(f'Waiting for pending commitment transaction to be mined ({txhash}).')
|
||||
|
||||
# If the transaction is still not mined after a max confirmation time
|
||||
# (based on current gas strategy) issue a replacement transaction.
|
||||
wait_time_in_blocks = current_block_number - tx_firing_block_number
|
||||
wait_time_in_seconds = wait_time_in_blocks * AVERAGE_BLOCK_TIME_IN_SECONDS
|
||||
if wait_time_in_seconds > self.max_confirmation_time():
|
||||
if txhash is UNTRACKED_PENDING_TRANSACTION:
|
||||
# TODO: Detect if this untracked pending transaction is a commitment transaction at all.
|
||||
message = f"We've an untracked pending transaction. Issuing a replacement transaction."
|
||||
else:
|
||||
message = f"We've waited for {wait_time_in_seconds}, but max time is {self.max_confirmation_time()}" \
|
||||
f" for {self.gas_strategy} gas strategy. Issuing a replacement transaction."
|
||||
self.log.info(message)
|
||||
self.__fire_replacement_commitment(current_block_number=current_block_number,
|
||||
tx_firing_block_number=tx_firing_block_number)
|
||||
|
||||
|
||||
def _do_work(self) -> None:
|
||||
"""
|
||||
Async working task for Ursula # TODO: Split into multiple async tasks
|
||||
"""
|
||||
|
||||
# Randomize the task interval over time, within bounds.
|
||||
self._tracking_task.interval = self.random_interval()
|
||||
# Call once here, and inject later for temporal consistency
|
||||
current_block_number = self.client.block_number
|
||||
|
||||
# Commitment tracking
|
||||
unmined_transactions = self.__track_pending_commitments()
|
||||
if unmined_transactions:
|
||||
self.__handle_replacement_commitment(current_block_number=current_block_number)
|
||||
# while there are known pending transactions, remain in fast interval mode
|
||||
self._tracking_task.interval = self.INTERVAL_FLOOR
|
||||
return # This cycle is finished.
|
||||
else:
|
||||
# Randomize the next task interval over time, within bounds.
|
||||
self._tracking_task.interval = self.random_interval()
|
||||
|
||||
# TODO: #1515 Shut down at end of terminal stake
|
||||
# Update on-chain status
|
||||
self.log.info(f"Checking for new period. Current period is {self.__current_period}")
|
||||
onchain_period = self.staking_agent.get_current_period() # < -- Read from contract
|
||||
if self.current_period != onchain_period:
|
||||
self.__current_period = onchain_period
|
||||
# self.worker.stakes.refresh() # TODO: #1517 Track stakes for fast access to terminal period.
|
||||
|
||||
# TODO: #1515 and #1517 - Shut down at end of terminal stake
|
||||
# This slows down tests substantially and adds additional
|
||||
# RPC calls, but might be acceptable in production
|
||||
# self.worker.stakes.refresh()
|
||||
|
||||
# Measure working interval
|
||||
interval = onchain_period - self.worker.last_committed_period
|
||||
if interval < 0:
|
||||
return # No need to commit to this period. Save the gas.
|
||||
if interval > 0:
|
||||
# TODO: #1516 Follow-up actions for downtime
|
||||
# TODO: #1516 Follow-up actions for missed commitments
|
||||
self.log.warn(f"MISSED COMMITMENTS - {interval} missed staking commitments detected.")
|
||||
|
||||
# Only perform work this round if the requirements are met
|
||||
if not self.__check_work_requirement():
|
||||
if not self.__work_requirement_is_satisfied():
|
||||
self.log.warn(f'COMMIT PREVENTED (callable: "{self.__requirement.__name__}") - '
|
||||
f'There are unmet commit requirements.')
|
||||
# TODO: Follow-up actions for downtime
|
||||
# TODO: Follow-up actions for failed requirements
|
||||
return
|
||||
|
||||
# Make a Commitment
|
||||
self.log.info("Made a commitment to period {}".format(self.current_period))
|
||||
txhash = self.__fire_commitment()
|
||||
self.__pending[current_block_number] = txhash
|
||||
|
||||
def __fire_commitment(self):
|
||||
"""Makes an initial/replacement worker commitment transaction"""
|
||||
transacting_power = self.worker.transacting_power
|
||||
with transacting_power:
|
||||
self.worker.commit_to_next_period(fire_and_forget=True) # < --- blockchain WRITE | Do not wait for receipt
|
||||
txhash = self.worker.commit_to_next_period(fire_and_forget=True) # < --- blockchain WRITE
|
||||
self.log.info(f"Making a commitment to period {self.current_period} - TxHash: {txhash}")
|
||||
return txhash
|
||||
|
||||
|
||||
class StakeList(UserList):
|
||||
|
|
|
@ -1363,7 +1363,8 @@ class Teacher:
|
|||
"fleet_state": node.fleet_state_checksum or 'unknown',
|
||||
"fleet_state_icon": fleet_icon,
|
||||
"domain": node.serving_domain,
|
||||
'version': nucypher.__version__}
|
||||
'version': nucypher.__version__
|
||||
}
|
||||
return payload
|
||||
|
||||
def abridged_node_details(self) -> dict:
|
||||
|
|
|
@ -99,22 +99,5 @@ class UpvestGasPriceDatafeed(EthereumGasPriceDatafeed):
|
|||
self.gas_prices = {k: int(Web3.toWei(v, 'gwei')) for k, v in self._raw_data['estimates'].items()}
|
||||
|
||||
|
||||
def datafeed_fallback_gas_price_strategy(web3: Web3, transaction_params: TxParams = None) -> Wei:
|
||||
feeds = (EtherchainGasPriceDatafeed, UpvestGasPriceDatafeed)
|
||||
|
||||
for gas_price_feed_class in feeds:
|
||||
try:
|
||||
gas_strategy = gas_price_feed_class.construct_gas_strategy()
|
||||
gas_price = gas_strategy(web3, transaction_params)
|
||||
except Datafeed.DatafeedError:
|
||||
continue
|
||||
else:
|
||||
return gas_price
|
||||
else:
|
||||
# Worst-case scenario, we get the price from the ETH node itself
|
||||
return rpc_gas_price_strategy(web3, transaction_params)
|
||||
|
||||
|
||||
|
||||
# TODO: We can implement here other datafeeds, like the ETH/USD (e.g., https://api.coinmarketcap.com/v1/ticker/ethereum/)
|
||||
# suggested in a comment in nucypher.blockchain.eth.interfaces.BlockchainInterface#sign_and_broadcast_transaction
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
"""
|
||||
This file is part of nucypher.
|
||||
|
||||
nucypher is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
nucypher is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
import datetime
|
||||
import functools
|
||||
from typing import Callable
|
||||
|
||||
from web3 import Web3
|
||||
from web3.exceptions import ValidationError
|
||||
from web3.gas_strategies import time_based
|
||||
from web3.gas_strategies.rpc import rpc_gas_price_strategy
|
||||
from web3.types import Wei, TxParams
|
||||
|
||||
from nucypher.utilities.datafeeds import Datafeed, EtherchainGasPriceDatafeed, UpvestGasPriceDatafeed
|
||||
|
||||
|
||||
class GasStrategyError(RuntimeError):
|
||||
"""
|
||||
Generic exception when retrieving a gas price using a gas strategy
|
||||
"""
|
||||
|
||||
#
|
||||
# Datafeed gas strategies
|
||||
#
|
||||
|
||||
|
||||
def datafeed_fallback_gas_price_strategy(web3: Web3, transaction_params: TxParams = None) -> Wei:
|
||||
feeds = (EtherchainGasPriceDatafeed, UpvestGasPriceDatafeed)
|
||||
|
||||
for gas_price_feed_class in feeds:
|
||||
try:
|
||||
gas_strategy = gas_price_feed_class.construct_gas_strategy()
|
||||
gas_price = gas_strategy(web3, transaction_params)
|
||||
except Datafeed.DatafeedError:
|
||||
continue
|
||||
else:
|
||||
return gas_price
|
||||
else:
|
||||
# Worst-case scenario, we get the price from the ETH node itself
|
||||
return rpc_gas_price_strategy(web3, transaction_params)
|
||||
|
||||
|
||||
#
|
||||
# Web3 gas strategies
|
||||
#
|
||||
|
||||
__RAW_WEB3_GAS_STRATEGIES = {
|
||||
'slow': time_based.slow_gas_price_strategy, # 1h
|
||||
'medium': time_based.medium_gas_price_strategy, # 5m
|
||||
'fast': time_based.fast_gas_price_strategy # 60s
|
||||
}
|
||||
|
||||
|
||||
def wrap_web3_gas_strategy(web3_gas_strategy: Callable):
|
||||
"""
|
||||
Enriches the web3 exceptions thrown by gas strategies
|
||||
"""
|
||||
@functools.wraps(web3_gas_strategy)
|
||||
def _wrapper(*args, **kwargs):
|
||||
try:
|
||||
return web3_gas_strategy(*args, **kwargs)
|
||||
except ValidationError as e:
|
||||
raise GasStrategyError("Calling the web3 gas strategy failed, probably due to an unsynced chain.") from e
|
||||
return _wrapper
|
||||
|
||||
|
||||
WEB3_GAS_STRATEGIES = {speed: wrap_web3_gas_strategy(strategy) for speed, strategy in __RAW_WEB3_GAS_STRATEGIES.items()}
|
||||
|
||||
EXPECTED_CONFIRMATION_TIME_IN_SECONDS = {
|
||||
'slow': int(datetime.timedelta(hours=1).total_seconds()),
|
||||
'medium': int(datetime.timedelta(minutes=5).total_seconds()),
|
||||
'fast': 60
|
||||
}
|
|
@ -15,10 +15,11 @@
|
|||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
import pytest_twisted
|
||||
from twisted.internet import threads
|
||||
from twisted.internet.task import Clock
|
||||
from web3.middleware.simulate_unmined_transaction import unmined_receipt_simulator_middleware
|
||||
|
||||
from nucypher.blockchain.eth.actors import Worker
|
||||
from nucypher.blockchain.eth.token import NU, WorkTracker
|
||||
|
@ -26,7 +27,6 @@ from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
|
|||
from tests.utils.ursula import make_decentralized_ursulas, start_pytest_ursula_services
|
||||
|
||||
|
||||
|
||||
@pytest_twisted.inlineCallbacks
|
||||
def test_worker_auto_commitments(mocker,
|
||||
testerchain,
|
||||
|
@ -53,6 +53,7 @@ def test_worker_auto_commitments(mocker,
|
|||
staker.bond_worker(worker_address=worker_address)
|
||||
|
||||
commit_spy = mocker.spy(Worker, 'commit_to_next_period')
|
||||
replacement_spy = mocker.spy(WorkTracker, '_WorkTracker__fire_replacement_commitment')
|
||||
|
||||
# Make the Worker
|
||||
ursula = make_decentralized_ursulas(ursula_config=ursula_decentralized_test_config,
|
||||
|
@ -64,25 +65,76 @@ def test_worker_auto_commitments(mocker,
|
|||
initial_period = staker.staking_agent.get_current_period()
|
||||
|
||||
def start():
|
||||
# Start running the worker
|
||||
print("Starting Worker for auto-commitment simulation")
|
||||
start_pytest_ursula_services(ursula=ursula)
|
||||
|
||||
def time_travel(_):
|
||||
def advance_one_period(_):
|
||||
print('Advancing one period')
|
||||
testerchain.time_travel(periods=1)
|
||||
clock.advance(WorkTracker.INTERVAL_CEIL + 1)
|
||||
return clock
|
||||
|
||||
def verify(clock):
|
||||
def pending_commitments(_):
|
||||
print('Starting unmined transaction simulation')
|
||||
testerchain.client.add_middleware(unmined_receipt_simulator_middleware)
|
||||
|
||||
def advance_one_cycle(_):
|
||||
print('Advancing one tracking iteration')
|
||||
clock.advance(ursula.work_tracker._tracking_task.interval + 1)
|
||||
|
||||
def advance_until_replacement_indicated(_):
|
||||
print("Advancing until replacement is indicated")
|
||||
testerchain.time_travel(periods=1)
|
||||
clock.advance(WorkTracker.INTERVAL_CEIL + 1)
|
||||
mocker.patch.object(WorkTracker, 'max_confirmation_time', return_value=1.0)
|
||||
clock.advance(ursula.work_tracker.max_confirmation_time() + 1)
|
||||
|
||||
def verify_unmined_commitment(_):
|
||||
print('Verifying worker has unmined commitment transaction')
|
||||
assert len(ursula.work_tracker.pending) == 1
|
||||
current_period = staker.staking_agent.get_current_period()
|
||||
assert commit_spy.call_count == current_period - initial_period + 1
|
||||
|
||||
def verify_replacement_commitment(_):
|
||||
print('Verifying worker has replaced commitment transaction')
|
||||
assert len(ursula.work_tracker.pending) == 1
|
||||
assert replacement_spy.call_count > 0
|
||||
|
||||
def verify_confirmed(_):
|
||||
print('Verifying worker made a commitments')
|
||||
# Verify that periods were committed on-chain automatically
|
||||
last_committed_period = staker.staking_agent.get_last_committed_period(staker_address=staker.checksum_address)
|
||||
current_period = staker.staking_agent.get_current_period()
|
||||
assert (last_committed_period - current_period) == 1
|
||||
assert commit_spy.call_count == current_period - initial_period + 1
|
||||
assert replacement_spy.call_count == 0
|
||||
|
||||
# Run the callbacks
|
||||
|
||||
# Behavioural Test, like a screenplay made of legos
|
||||
|
||||
# Ursula commits on startup
|
||||
d = threads.deferToThread(start)
|
||||
d.addCallback(verify)
|
||||
for i in range(5):
|
||||
d.addCallback(time_travel)
|
||||
d.addCallback(verify)
|
||||
d.addCallback(verify_confirmed)
|
||||
|
||||
# Ursula commits for 3 periods with no problem
|
||||
for i in range(3):
|
||||
d.addCallback(advance_one_period)
|
||||
d.addCallback(verify_confirmed)
|
||||
|
||||
# Introduce unmined transactions
|
||||
d.addCallback(pending_commitments)
|
||||
|
||||
# Ursula's commitment transaction gets stuck
|
||||
for i in range(4):
|
||||
d.addCallback(advance_one_cycle)
|
||||
d.addCallback(verify_unmined_commitment)
|
||||
|
||||
# Ursula recovers from this situation
|
||||
d.addCallback(advance_one_cycle)
|
||||
d.addCallback(verify_confirmed)
|
||||
|
||||
# but it happens again, resulting in a replacement transaction
|
||||
d.addCallback(advance_until_replacement_indicated)
|
||||
d.addCallback(advance_one_cycle)
|
||||
d.addCallback(verify_replacement_commitment)
|
||||
|
||||
yield d
|
||||
|
|
|
@ -535,6 +535,14 @@ def testerchain(_testerchain) -> TesterBlockchain:
|
|||
yield testerchain
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def _mock_testerchain() -> MockBlockchain:
|
||||
BlockchainInterfaceFactory._interfaces = dict()
|
||||
testerchain = _make_testerchain(mock_backend=True)
|
||||
BlockchainInterfaceFactory.register_interface(interface=testerchain)
|
||||
yield testerchain
|
||||
|
||||
|
||||
def _make_agency(testerchain,
|
||||
test_registry,
|
||||
token_economics) -> Tuple[NucypherTokenAgent, StakingEscrowAgent, PolicyManagerAgent]:
|
||||
|
|
|
@ -241,7 +241,7 @@ def test_bob_retrieves_too_late(federated_bob, federated_ursulas,
|
|||
urs._datastore_pruning_task.clock = clock
|
||||
urs._datastore_pruning_task.start(interval=Ursula._pruning_interval)
|
||||
|
||||
clock.advance(86400 * 7) # 1 week
|
||||
clock.advance(86400 * 8) # 1 week # TODO: this is supposed to be seven days, not eight
|
||||
|
||||
enrico = capsule_side_channel.enrico
|
||||
message_kit = capsule_side_channel()
|
||||
|
|
|
@ -33,7 +33,7 @@ from nucypher.blockchain.eth.agents import (
|
|||
StakingEscrowAgent,
|
||||
WorkLockAgent
|
||||
)
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface, BlockchainInterfaceFactory
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
from nucypher.blockchain.eth.registry import InMemoryContractRegistry
|
||||
from nucypher.blockchain.eth.signers import KeystoreSigner
|
||||
from nucypher.config.characters import UrsulaConfiguration, StakeHolderConfiguration
|
||||
|
@ -43,7 +43,7 @@ from tests.constants import (
|
|||
MOCK_PROVIDER_URI,
|
||||
NUMBER_OF_MOCK_KEYSTORE_ACCOUNTS
|
||||
)
|
||||
from tests.fixtures import _make_testerchain, make_token_economics
|
||||
from tests.fixtures import make_token_economics
|
||||
from tests.mock.agents import MockContractAgency, MockContractAgent
|
||||
from tests.mock.interfaces import MockBlockchain, mock_registry_source_manager
|
||||
from tests.mock.io import MockStdinWrapper
|
||||
|
@ -171,11 +171,8 @@ def mock_stdin(mocker):
|
|||
|
||||
|
||||
@pytest.fixture(scope='module', autouse=True)
|
||||
def mock_testerchain() -> MockBlockchain:
|
||||
BlockchainInterfaceFactory._interfaces = dict()
|
||||
testerchain = _make_testerchain(mock_backend=True)
|
||||
BlockchainInterfaceFactory.register_interface(interface=testerchain)
|
||||
yield testerchain
|
||||
def mock_testerchain(_mock_testerchain) -> MockBlockchain:
|
||||
yield _mock_testerchain
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
|
|
|
@ -27,10 +27,14 @@ from nucypher.blockchain.eth.agents import Agent, ContractAgency, EthereumContra
|
|||
from nucypher.blockchain.eth.constants import NULL_ADDRESS
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
|
||||
from tests.constants import MOCK_PROVIDER_URI
|
||||
from tests.utils.blockchain import free_gas_price_strategy
|
||||
from tests.mock.interfaces import MockBlockchain
|
||||
|
||||
MOCK_TESTERCHAIN = MockBlockchain()
|
||||
CACHED_MOCK_TESTERCHAIN = BlockchainInterfaceFactory.CachedInterface(interface=MOCK_TESTERCHAIN,
|
||||
sync=False,
|
||||
emitter=None)
|
||||
BlockchainInterfaceFactory._interfaces[MOCK_PROVIDER_URI] = CACHED_MOCK_TESTERCHAIN
|
||||
|
||||
MOCK_TESTERCHAIN = BlockchainInterfaceFactory.get_or_create_interface(provider_uri=MOCK_PROVIDER_URI,
|
||||
gas_strategy=free_gas_price_strategy)
|
||||
CURRENT_BLOCK = MOCK_TESTERCHAIN.w3.eth.getBlock('latest')
|
||||
|
||||
|
||||
|
|
|
@ -14,20 +14,27 @@
|
|||
You should have received a copy of the GNU Affero General Public License
|
||||
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from web3.gas_strategies import time_based
|
||||
|
||||
from constant_sorrow.constants import ALL_OF_THEM
|
||||
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
from nucypher.utilities.gas_strategies import WEB3_GAS_STRATEGIES
|
||||
from tests.mock.interfaces import MockBlockchain
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def mock_testerchain(_mock_testerchain) -> MockBlockchain:
|
||||
testerchain = _mock_testerchain
|
||||
yield testerchain
|
||||
|
||||
|
||||
def test_get_gas_strategy():
|
||||
|
||||
# Testing Web3's bundled time-based gas strategies
|
||||
bundled_gas_strategies = {'glacial': time_based.glacial_gas_price_strategy, # 24h
|
||||
'slow': time_based.slow_gas_price_strategy, # 1h
|
||||
'medium': time_based.medium_gas_price_strategy, # 5m
|
||||
'fast': time_based.fast_gas_price_strategy # 60s
|
||||
}
|
||||
for gas_strategy_name, expected_gas_strategy in bundled_gas_strategies.items():
|
||||
for gas_strategy_name, expected_gas_strategy in WEB3_GAS_STRATEGIES.items():
|
||||
gas_strategy = BlockchainInterface.get_gas_strategy(gas_strategy_name)
|
||||
assert expected_gas_strategy == gas_strategy
|
||||
assert callable(gas_strategy)
|
||||
|
@ -38,6 +45,81 @@ def test_get_gas_strategy():
|
|||
|
||||
# Passing None should retrieve the default gas strategy
|
||||
assert BlockchainInterface.DEFAULT_GAS_STRATEGY == 'fast'
|
||||
default = bundled_gas_strategies[BlockchainInterface.DEFAULT_GAS_STRATEGY]
|
||||
default = WEB3_GAS_STRATEGIES[BlockchainInterface.DEFAULT_GAS_STRATEGY]
|
||||
gas_strategy = BlockchainInterface.get_gas_strategy()
|
||||
assert default == gas_strategy
|
||||
|
||||
|
||||
def test_use_pending_nonce_when_building_payload(mock_testerchain, mocker):
|
||||
sender = mock_testerchain.unassigned_accounts[0]
|
||||
|
||||
# Mock transaction count retrieval
|
||||
transaction_count = dict(latest=0, pending=0)
|
||||
|
||||
def mock_get_transaction_count(sender, block_identifier) -> int:
|
||||
return transaction_count[block_identifier]
|
||||
|
||||
mock_testerchain.client.w3.eth.getTransactionCount = mocker.Mock(side_effect=mock_get_transaction_count)
|
||||
|
||||
def simulate_successful_transaction():
|
||||
transaction_count['pending'] += 1
|
||||
transaction_count['latest'] = transaction_count['pending']
|
||||
|
||||
def simulate_pending_transaction():
|
||||
transaction_count['pending'] += 1
|
||||
|
||||
def simulate_clearing_transactions(how_many: int = ALL_OF_THEM):
|
||||
if how_many == ALL_OF_THEM:
|
||||
transaction_count['latest'] = transaction_count['pending']
|
||||
else:
|
||||
transaction_count['latest'] += how_many
|
||||
|
||||
# Initially, the transaction count is 0, so the computed nonce is 0 in both modes
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=True)
|
||||
assert payload['nonce'] == 0
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 0
|
||||
|
||||
# Let's assume we have a successful TX, so next payload should get nonce == 1
|
||||
simulate_successful_transaction()
|
||||
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None)
|
||||
assert payload['nonce'] == 1
|
||||
|
||||
# Let's assume next TX has a low price and when we query the TX count, it's pending.
|
||||
simulate_pending_transaction()
|
||||
|
||||
# Default behavior gets the TX count including pending, so nonce should be 2
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None)
|
||||
assert payload['nonce'] == 2
|
||||
|
||||
# But if we ignore pending, nonce should still be 1
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 1
|
||||
|
||||
# Let's fire some pending TXs
|
||||
simulate_pending_transaction()
|
||||
simulate_pending_transaction()
|
||||
simulate_pending_transaction()
|
||||
simulate_pending_transaction()
|
||||
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=True)
|
||||
assert payload['nonce'] == 6
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 1
|
||||
|
||||
# One of them gets mined ...
|
||||
simulate_clearing_transactions(how_many=1)
|
||||
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=True)
|
||||
assert payload['nonce'] == 6
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 2
|
||||
|
||||
# If all TXs clear up, then nonce should be 6 in both modes
|
||||
simulate_clearing_transactions(how_many=ALL_OF_THEM)
|
||||
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=True)
|
||||
assert payload['nonce'] == 6
|
||||
payload = mock_testerchain.build_payload(sender_address=sender, payload=None, use_pending_nonce=False)
|
||||
assert payload['nonce'] == 6
|
||||
|
|
|
@ -24,9 +24,9 @@ from web3 import Web3
|
|||
from nucypher.utilities.datafeeds import (
|
||||
EtherchainGasPriceDatafeed,
|
||||
Datafeed,
|
||||
datafeed_fallback_gas_price_strategy,
|
||||
UpvestGasPriceDatafeed
|
||||
)
|
||||
from nucypher.utilities.gas_strategies import datafeed_fallback_gas_price_strategy
|
||||
|
||||
etherchain_json = {
|
||||
"safeLow": "99.0",
|
||||
|
@ -144,5 +144,5 @@ def test_datafeed_fallback_gas_price_strategy():
|
|||
side_effect=Datafeed.DatafeedError):
|
||||
with patch('nucypher.utilities.datafeeds.UpvestGasPriceDatafeed._probe_feed',
|
||||
side_effect=Datafeed.DatafeedError):
|
||||
with patch('nucypher.utilities.datafeeds.rpc_gas_price_strategy', side_effect=mock_gas_strategy):
|
||||
with patch('nucypher.utilities.gas_strategies.rpc_gas_price_strategy', side_effect=mock_gas_strategy):
|
||||
assert datafeed_fallback_gas_price_strategy("web3", "tx") == mocked_gas_price
|
||||
|
|
|
@ -32,6 +32,7 @@ from nucypher.blockchain.eth.sol.compile import SolidityCompiler
|
|||
from nucypher.blockchain.eth.token import NU
|
||||
from nucypher.blockchain.eth.utils import epoch_to_period
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
from nucypher.utilities.gas_strategies import EXPECTED_CONFIRMATION_TIME_IN_SECONDS
|
||||
from nucypher.utilities.logging import Logger
|
||||
|
||||
from tests.constants import (
|
||||
|
@ -110,6 +111,8 @@ class TesterBlockchain(BlockchainDeployerInterface):
|
|||
test_accounts = self._default_test_accounts
|
||||
self.free_transactions = free_transactions
|
||||
|
||||
EXPECTED_CONFIRMATION_TIME_IN_SECONDS['free'] = 5 # Just some upper-limit
|
||||
|
||||
if compiler:
|
||||
TesterBlockchain._compiler = compiler
|
||||
|
||||
|
|
Loading…
Reference in New Issue