Update Operator to handle the various callbacks used by async transactions for improved fault tolerance when publishing a transcript or aggregated transcript.

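For fault tolerance, the callbacks for error cases may resubmit a new tx: a faulted or reverted tx is replaced with a fresh submission, while broadcast failures and insufficient-funds errors are only logged because the tx remains in the queue.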
pull/3476/head
derekpierre 2024-04-11 12:37:12 -04:00
parent 4c6b8afcc0
commit c5adbfde98
No known key found for this signature in database
1 changed files with 74 additions and 4 deletions

View File

@ -7,7 +7,8 @@ from decimal import Decimal
from typing import DefaultDict, Dict, List, Optional, Set, Union
import maya
from atxm.tx import AsyncTx
from atxm.exceptions import InsufficientFunds
from atxm.tx import AsyncTx, FaultedTx, FinalizedTx, FutureTx, PendingTx
from eth_typing import ChecksumAddress
from nucypher_core import (
EncryptedThresholdDecryptionRequest,
@ -40,7 +41,10 @@ from nucypher.blockchain.eth.clients import PUBLIC_CHAINS
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.decorators import validate_checksum_address
from nucypher.blockchain.eth.domains import TACoDomain
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
from nucypher.blockchain.eth.interfaces import (
BlockchainInterface,
BlockchainInterfaceFactory,
)
from nucypher.blockchain.eth.models import PHASE1, PHASE2, Coordinator
from nucypher.blockchain.eth.registry import ContractRegistry
from nucypher.blockchain.eth.signers import Signer
@ -347,13 +351,74 @@ class Operator(BaseActor):
return result
def _setup_async_hooks(self, phase_id: PhaseId, *args):
    """Build the async tx callbacks for a DKG phase transaction.

    :param phase_id: identifies the ritual (``ritual_id``) and the phase
        (``phase``) that the transaction belongs to.
    :param args: positional arguments forwarded unchanged to
        ``publish_transcript`` (PHASE1) or ``publish_aggregated_transcript``
        (any other phase) whenever a replacement tx is submitted.
    :return: a ``BlockchainInterface.AsyncTxHooks`` wired with the
        broadcast-failure, fault, finalized and insufficient-funds handlers
        defined below.
    """
    # Label used as the prefix of every log message emitted by the hooks.
    if phase_id.phase == PHASE1:
        tx_type = "POST_TRANSCRIPT"
    else:
        tx_type = "POST_AGGREGATE"

    def resubmit_tx():
        # Pick the publish method that matches this phase and replay it
        # with the arguments captured when the hooks were created.
        publish = (
            self.publish_transcript
            if phase_id.phase == PHASE1
            else self.publish_aggregated_transcript
        )
        async_tx = publish(*args)
        self.log.debug(
            f"{self.transacting_power.account[:8]} resubmitted a new async tx {async_tx.id} "
            f"for DKG ritual #{phase_id.ritual_id}"
        )

    def on_broadcast_failure(tx: FutureTx, e: Exception):
        # Error case, but the tx stays in the queue: only log, no resubmit.
        self.log.warn(
            f"{tx_type} async tx {tx.id} for DKG ritual# {phase_id.ritual_id} "
            f"failed to broadcast {e}; the same tx will be retried"
        )

    def on_fault(tx: FaultedTx):
        # A fault means the tx was dropped from the queue, so replace it.
        error = f"({tx.error})" if tx.error else ""
        self.log.warn(
            f"{tx_type} async tx {tx.id} for DKG ritual# {phase_id.ritual_id} "
            f"failed with fault {tx.fault.name}{error}; resubmitting a new one"
        )
        resubmit_tx()

    def on_finalized(tx: FinalizedTx):
        # Finalized means the tx left the queue; nothing to do on success.
        if tx.successful:
            return

        # The tx was reverted: log it and submit a replacement.
        self.log.warn(
            f"{tx_type} async tx {tx.id} for DKG ritual# {phase_id.ritual_id} "
            f"was reverted; resubmitting a new one"
        )
        resubmit_tx()

    def on_insufficient_funds(tx: Union[FutureTx, PendingTx], e: InsufficientFunds):
        # Error case, but the tx stays in the queue: only log, no resubmit.
        self.log.error(
            f"{tx_type} async tx {tx.id} for DKG ritual# {phase_id.ritual_id} "
            f"cannot be executed because {self.transacting_power.account[:8]} "
            f"has insufficient funds {e}"
        )

    return BlockchainInterface.AsyncTxHooks(
        on_broadcast_failure=on_broadcast_failure,
        on_fault=on_fault,
        on_finalized=on_finalized,
        on_insufficient_funds=on_insufficient_funds,
    )
def publish_transcript(self, ritual_id: int, transcript: Transcript) -> AsyncTx:
    """Submit the PHASE1 transcript for a ritual as an async transaction.

    :param ritual_id: id of the DKG ritual the transcript belongs to.
    :param transcript: the transcript passed to
        ``coordinator_agent.post_transcript``.
    :return: the ``AsyncTx`` returned by ``coordinator_agent.post_transcript``;
        it is also stored in ``ritual_tracker.active_rituals`` under the
        ritual's PHASE1 identifier.
    """
    # Built once: the same identifier keys both the hooks and the tracker entry.
    identifier = PhaseId(ritual_id, PHASE1)
    # The hooks capture (ritual_id, transcript) so a replacement tx can be
    # submitted with the same arguments.
    async_tx_hooks = self._setup_async_hooks(identifier, ritual_id, transcript)
    async_tx = self.coordinator_agent.post_transcript(
        ritual_id=ritual_id,
        transcript=transcript,
        transacting_power=self.transacting_power,
        async_tx_hooks=async_tx_hooks,
    )
    self.ritual_tracker.active_rituals[identifier] = async_tx
    return async_tx
@ -368,14 +433,19 @@ class Operator(BaseActor):
participant_public_key = self.threshold_request_power.get_pubkey_from_ritual_id(
ritual_id
)
identifier = PhaseId(ritual_id=ritual_id, phase=PHASE2)
async_tx_hooks = self._setup_async_hooks(
identifier, ritual_id, aggregated_transcript, public_key
)
async_tx = self.coordinator_agent.post_aggregation(
ritual_id=ritual_id,
aggregated_transcript=aggregated_transcript,
public_key=public_key,
participant_public_key=participant_public_key,
transacting_power=self.transacting_power,
async_tx_hooks=async_tx_hooks,
)
identifier = PhaseId(ritual_id=ritual_id, phase=PHASE2)
self.ritual_tracker.active_rituals[identifier] = async_tx
return async_tx