diff --git a/nucypher/blockchain/eth/clients.py b/nucypher/blockchain/eth/clients.py index 28eb298e6..e96fb81e3 100644 --- a/nucypher/blockchain/eth/clients.py +++ b/nucypher/blockchain/eth/clients.py @@ -384,6 +384,9 @@ class EthereumClient: def get_transaction(self, transaction_hash) -> dict: return self.w3.eth.getTransaction(transaction_hash) + def get_transaction_receipt(self, transaction_hash) -> Union[dict, None]: + return self.w3.eth.getTransactionReceipt(transaction_hash) + def get_transaction_count(self, account: str, pending: bool) -> int: block_identifier = 'pending' if pending else 'latest' return self.w3.eth.getTransactionCount(account, block_identifier) diff --git a/nucypher/blockchain/eth/token.py b/nucypher/blockchain/eth/token.py index 8d52a59bd..e4e86e914 100644 --- a/nucypher/blockchain/eth/token.py +++ b/nucypher/blockchain/eth/token.py @@ -627,85 +627,89 @@ class WorkTracker: def pending(self) -> Dict[int, HexBytes]: return self.__pending.copy() - def __track_pending_commitments(self) -> bool: - - worker_address = self.worker.transacting_power.account # FIXME: Why is worker_address private in Worker? + def __tracking_consistency_check(self) -> bool: + worker_address = self.worker.worker_address tx_count_pending = self.client.get_transaction_count(account=worker_address, pending=True) tx_count_latest = self.client.get_transaction_count(account=worker_address, pending=False) txs_in_mempool = tx_count_pending - tx_count_latest - if txs_in_mempool > len(self.__pending): # We're not tracking all pending TXs tx_tracker_is_ok = False elif txs_in_mempool < len(self.__pending): # Our tracking is somehow outdated tx_tracker_is_ok = False # TODO: Not sure what to do in this case, but let's do this for the moment else: tx_tracker_is_ok = True + return tx_tracker_is_ok - if not tx_tracker_is_ok: - # If we detect there's a mismatch between the number of tracked and existing pending transactions, - # we create a special pending TX that accounts for this. - self.__pending[0] = UNTRACKED_PENDING_TRANSACTION - return True - - if not self.__pending: - return False - + def __track_pending_commitments(self) -> bool: + # TODO: Keep a purpose-built persistent log of worker transaction history unmined_transactions = list() pending_transactions = self.pending.items() # note: this must be performed non-mutatively - for tx_firing_block_number, txhash in sorted(pending_transactions): try: - confirmed_tx = self.client.get_transaction(transaction_hash=txhash) + confirmed_tx_receipt = self.client.get_transaction_receipt(transaction_hash=txhash) except TransactionNotFound: unmined_transactions.append(txhash) # mark as unmined - Keep tracking it for now continue else: - confirmation_block_number = confirmed_tx['blockNumber'] + confirmation_block_number = confirmed_tx_receipt['blockNumber'] confirmations = confirmation_block_number - tx_firing_block_number self.log.info(f'Commitment transaction {txhash.hex()[:10]} confirmed: {confirmations} confirmations') del self.__pending[tx_firing_block_number] if unmined_transactions: pluralize = "s" if len(unmined_transactions) > 1 else "" self.log.info(f'{len(unmined_transactions)} pending commitment transaction{pluralize} detected.') + + tx_tracker_is_ok = self.__tracking_consistency_check + if not tx_tracker_is_ok: + # If we detect there's a mismatch between the number of internally tracked and + # pending block transactions, create a special pending TX that accounts for this. + self.__pending[0] = UNTRACKED_PENDING_TRANSACTION + return True + if not self.__pending: + return False return bool(unmined_transactions) + def __handle_replacement_commitment(self, current_block_number: int) -> None: + tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0] + self.log.info(f'Waiting for pending commitment transaction to be mined ({txhash}).') + + # If the transaction is still not mined after a max confirmation time + # (based on current gas strategy) issue a replacement transaction. + wait_time_in_blocks = current_block_number - tx_firing_block_number + wait_time_in_seconds = wait_time_in_blocks * AVERAGE_BLOCK_TIME_IN_SECONDS + if wait_time_in_seconds > self.max_confirmation_time: + if txhash is UNTRACKED_PENDING_TRANSACTION: + # TODO: Detect if this untracked pending transaction is a commitment transaction at all. + message = f"We've an untracked pending transaction. Issuing a replacement transaction." + else: + message = f"We've waited for {wait_time_in_seconds}, but max time is {self.max_confirmation_time}" \ + f" for {self.gas_strategy} gas strategy. Issuing a replacement transaction." + self.log.info(message) + replacement_txhash = self.__fire_commitment() + self.__pending[current_block_number] = replacement_txhash # track this transaction + del self.__pending[tx_firing_block_number] # Assume our original TX is stuck + + # while there are known pending transactions, remain in fast interval mode + self._tracking_task.interval = self.INTERVAL_FLOOR + return + def _do_work(self) -> None: """Async working task for Ursula""" - # TODO: Move this to another async task? + # TODO: Split into multiple async tasks? + # Call once here, and inject later for temporal consistency current_block_number = self.client.block_number # self-tracking unmined_transactions = self.__track_pending_commitments() if unmined_transactions: - tx_firing_block_number, txhash = list(sorted(self.pending.items()))[0] - self.log.info(f'Waiting for pending commitment transaction to be mined ({txhash}).') - - # If the transaction is still not mined after a max confirmation time (based on current gas strategy), - # issue a replacement transaction. - wait_time_in_blocks = current_block_number - tx_firing_block_number - wait_time_in_seconds = wait_time_in_blocks * AVERAGE_BLOCK_TIME_IN_SECONDS - if wait_time_in_seconds > self.max_confirmation_time: - if txhash is UNTRACKED_PENDING_TRANSACTION: - message = f"We've an untracked pending transaction. Issuing a replacement transaction." - else: - message = f"We've waited for {wait_time_in_seconds}, but max time is {self.max_confirmation_time}" \ - f" for {self.gas_strategy} gas strategy. Issuing a replacement transaction." - self.log.info(message) - replacement_txhash = self.__fire_commitment() - self.__pending[current_block_number] = replacement_txhash # track this transaction - del self.__pending[tx_firing_block_number] # Assume our original TX is stuck - - # while there are known pending transactions, remain in fast interval mode - self._tracking_task.interval = self.INTERVAL_FLOOR - return + return self.__handle_replacement_commitment(current_block_number=current_block_number) # Randomize the next task interval over time, within bounds. self._tracking_task.interval = self.random_interval() - # TODO: #1515 Shut down at end of terminal stake # Update on-chain status self.log.info(f"Checking for new period. Current period is {self.__current_period}") onchain_period = self.staking_agent.get_current_period() # < -- Read from contract @@ -736,7 +740,7 @@ class WorkTracker: self.__pending[current_block_number] = txhash # track this transaction def __fire_commitment(self): - # Make a Commitment + """Makes an initial/replacement worker commitment transaction""" transacting_power = self.worker.transacting_power with transacting_power: txhash = self.worker.commit_to_next_period(fire_and_forget=True) # < --- blockchain WRITE