mirror of https://github.com/nucypher/nucypher.git
Integrate internal commitment tx tracking and conssistency check; Breakout work tracking taks into multiple sub-functions
parent
5410aecc7c
commit
0522777c33
|
@ -384,6 +384,9 @@ class EthereumClient:
|
|||
def get_transaction(self, transaction_hash) -> dict:
|
||||
return self.w3.eth.getTransaction(transaction_hash)
|
||||
|
||||
def get_transaction_receipt(self, transaction_hash) -> Union[dict, None]:
|
||||
return self.w3.eth.getTransactionReceipt(transaction_hash)
|
||||
|
||||
def get_transaction_count(self, account: str, pending: bool) -> int:
|
||||
block_identifier = 'pending' if pending else 'latest'
|
||||
return self.w3.eth.getTransactionCount(account, block_identifier)
|
||||
|
|
|
@ -627,85 +627,89 @@ class WorkTracker:
|
|||
def pending(self) -> Dict[int, HexBytes]:
|
||||
return self.__pending.copy()
|
||||
|
||||
def __track_pending_commitments(self) -> bool:
|
||||
|
||||
worker_address = self.worker.transacting_power.account # FIXME: Why is worker_address private in Worker?
|
||||
def __tracking_consistency_check(self) -> bool:
|
||||
worker_address = self.worker.worker_address
|
||||
tx_count_pending = self.client.get_transaction_count(account=worker_address, pending=True)
|
||||
tx_count_latest = self.client.get_transaction_count(account=worker_address, pending=False)
|
||||
|
||||
txs_in_mempool = tx_count_pending - tx_count_latest
|
||||
|
||||
if txs_in_mempool > len(self.__pending): # We're not tracking all pending TXs
|
||||
tx_tracker_is_ok = False
|
||||
elif txs_in_mempool < len(self.__pending): # Our tracking is somehow outdated
|
||||
tx_tracker_is_ok = False # TODO: Not sure what to do in this case, but let's do this for the moment
|
||||
else:
|
||||
tx_tracker_is_ok = True
|
||||
return tx_tracker_is_ok
|
||||
|
||||
if not tx_tracker_is_ok:
|
||||
# If we detect there's a mismatch between the number of tracked and existing pending transactions,
|
||||
# we create a special pending TX that accounts for this.
|
||||
self.__pending[0] = UNTRACKED_PENDING_TRANSACTION
|
||||
return True
|
||||
|
||||
if not self.__pending:
|
||||
return False
|
||||
|
||||
def __track_pending_commitments(self) -> bool:
|
||||
# TODO: Keep a purpose-built persistent log of worker transaction history
|
||||
unmined_transactions = list()
|
||||
pending_transactions = self.pending.items() # note: this must be performed non-mutatively
|
||||
|
||||
for tx_firing_block_number, txhash in sorted(pending_transactions):
|
||||
try:
|
||||
confirmed_tx = self.client.get_transaction(transaction_hash=txhash)
|
||||
confirmed_tx_receipt = self.client.get_transaction_receipt(transaction_hash=txhash)
|
||||
except TransactionNotFound:
|
||||
unmined_transactions.append(txhash) # mark as unmined - Keep tracking it for now
|
||||
continue
|
||||
else:
|
||||
confirmation_block_number = confirmed_tx['blockNumber']
|
||||
confirmation_block_number = confirmed_tx_receipt['blockNumber']
|
||||
confirmations = confirmation_block_number - tx_firing_block_number
|
||||
self.log.info(f'Commitment transaction {txhash.hex()[:10]} confirmed: {confirmations} confirmations')
|
||||
del self.__pending[tx_firing_block_number]
|
||||
if unmined_transactions:
|
||||
pluralize = "s" if len(unmined_transactions) > 1 else ""
|
||||
self.log.info(f'{len(unmined_transactions)} pending commitment transaction{pluralize} detected.')
|
||||
|
||||
tx_tracker_is_ok = self.__tracking_consistency_check
|
||||
if not tx_tracker_is_ok:
|
||||
# If we detect there's a mismatch between the number of internally tracked and
|
||||
# pending block transactions, create a special pending TX that accounts for this.
|
||||
self.__pending[0] = UNTRACKED_PENDING_TRANSACTION
|
||||
return True
|
||||
if not self.__pending:
|
||||
return False
|
||||
return bool(unmined_transactions)
|
||||
|
||||
def __handle_replacement_commitment(self, current_block_number: int) -> None:
|
||||
tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0]
|
||||
self.log.info(f'Waiting for pending commitment transaction to be mined ({txhash}).')
|
||||
|
||||
# If the transaction is still not mined after a max confirmation time
|
||||
# (based on current gas strategy) issue a replacement transaction.
|
||||
wait_time_in_blocks = current_block_number - tx_firing_block_number
|
||||
wait_time_in_seconds = wait_time_in_blocks * AVERAGE_BLOCK_TIME_IN_SECONDS
|
||||
if wait_time_in_seconds > self.max_confirmation_time:
|
||||
if txhash is UNTRACKED_PENDING_TRANSACTION:
|
||||
# TODO: Detect if this untracked pending transaction is a commitment transaction at all.
|
||||
message = f"We've an untracked pending transaction. Issuing a replacement transaction."
|
||||
else:
|
||||
message = f"We've waited for {wait_time_in_seconds}, but max time is {self.max_confirmation_time}" \
|
||||
f" for {self.gas_strategy} gas strategy. Issuing a replacement transaction."
|
||||
self.log.info(message)
|
||||
replacement_txhash = self.__fire_commitment()
|
||||
self.__pending[current_block_number] = replacement_txhash # track this transaction
|
||||
del self.__pending[tx_firing_block_number] # Assume our original TX is stuck
|
||||
|
||||
# while there are known pending transactions, remain in fast interval mode
|
||||
self._tracking_task.interval = self.INTERVAL_FLOOR
|
||||
return
|
||||
|
||||
def _do_work(self) -> None:
|
||||
"""Async working task for Ursula"""
|
||||
|
||||
# TODO: Move this to another async task?
|
||||
# TODO: Split into multiple async tasks?
|
||||
|
||||
# Call once here, and inject later for temporal consistency
|
||||
current_block_number = self.client.block_number
|
||||
|
||||
# self-tracking
|
||||
unmined_transactions = self.__track_pending_commitments()
|
||||
if unmined_transactions:
|
||||
tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0]
|
||||
self.log.info(f'Waiting for pending commitment transaction to be mined ({txhash}).')
|
||||
|
||||
# If the transaction is still not mined after a max confirmation time (based on current gas strategy),
|
||||
# issue a replacement transaction.
|
||||
wait_time_in_blocks = current_block_number - tx_firing_block_number
|
||||
wait_time_in_seconds = wait_time_in_blocks * AVERAGE_BLOCK_TIME_IN_SECONDS
|
||||
if wait_time_in_seconds > self.max_confirmation_time:
|
||||
if txhash is UNTRACKED_PENDING_TRANSACTION:
|
||||
message = f"We've an untracked pending transaction. Issuing a replacement transaction."
|
||||
else:
|
||||
message = f"We've waited for {wait_time_in_seconds}, but max time is {self.max_confirmation_time}" \
|
||||
f" for {self.gas_strategy} gas strategy. Issuing a replacement transaction."
|
||||
self.log.info(message)
|
||||
replacement_txhash = self.__fire_commitment()
|
||||
self.__pending[current_block_number] = replacement_txhash # track this transaction
|
||||
del self.__pending[tx_firing_block_number] # Assume our original TX is stuck
|
||||
|
||||
# while there are known pending transactions, remain in fast interval mode
|
||||
self._tracking_task.interval = self.INTERVAL_FLOOR
|
||||
return
|
||||
return self.__handle_replacement_commitment(current_block_number=current_block_number)
|
||||
|
||||
# Randomize the next task interval over time, within bounds.
|
||||
self._tracking_task.interval = self.random_interval()
|
||||
|
||||
# TODO: #1515 Shut down at end of terminal stake
|
||||
# Update on-chain status
|
||||
self.log.info(f"Checking for new period. Current period is {self.__current_period}")
|
||||
onchain_period = self.staking_agent.get_current_period() # < -- Read from contract
|
||||
|
@ -736,7 +740,7 @@ class WorkTracker:
|
|||
self.__pending[current_block_number] = txhash # track this transaction
|
||||
|
||||
def __fire_commitment(self):
|
||||
# Make a Commitment
|
||||
"""Makes an initial/replacement worker commitment transaction"""
|
||||
transacting_power = self.worker.transacting_power
|
||||
with transacting_power:
|
||||
txhash = self.worker.commit_to_next_period(fire_and_forget=True) # < --- blockchain WRITE
|
||||
|
|
Loading…
Reference in New Issue