Merge pull request #3489 from derekpierre/starvation-protection

Ritual tx starvation protection
v7.3.x
Derek Pierre 2024-04-24 19:14:50 -04:00 committed by GitHub
commit 5c873acb3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 74 additions and 24 deletions

View File

@ -0,0 +1 @@
Prevent ritual tx starvation when broadcast failures occur by proactively removing problematic tx from the ATxM queue, and resubmitting a fresh tx.

View File

@ -385,6 +385,13 @@ class Operator(BaseActor):
f"{tx_type} async tx {tx.id} for DKG ritual# {phase_id.ritual_id} "
f"failed to broadcast {e}; the same tx will be retried"
)
# either multiple retries already completed for recoverable error,
# or simply a non-recoverable error - remove and resubmit
# (analogous action to a node restart of old)
self.coordinator_agent.blockchain.tx_machine.remove_queued_transaction(tx)
# submit a new one
resubmit_tx()
def on_fault(tx: FaultedTx):
# fault means that tx was removed from atxm

View File

@ -170,24 +170,34 @@ def perform_round_1_with_fault_tolerance(
assert len(testerchain.tx_machine.queued) == 1
# broadcast failure callback called
# tx is explicitly removed and resubmitted by callback
original_async_tx.on_broadcast_failure(original_async_tx, Exception())
publish_transcript_call_count += 1 # on_fault should trigger resubmission
assert (
publish_transcript_spy.call_count == publish_transcript_call_count
), "no change"
), "updated call"
assert (
publish_aggregated_transcript_spy.call_count == 0
), "phase 2 method never called"
resubmitted_after_broadcast_failure_async_tx = (
ursula_experiencing_problems.dkg_storage.get_ritual_phase_async_tx(phase_id)
)
assert (
resubmitted_after_broadcast_failure_async_tx is not original_async_tx
), "cache updated with resubmitted tx"
assert len(testerchain.tx_machine.queued) == 1
# on_fault callback called - this should cause a resubmission of tx because
# tx was removed from atxm after faulting
testerchain.tx_machine.remove_queued_transaction(
original_async_tx
resubmitted_after_broadcast_failure_async_tx
) # simulate removal from atxm
assert len(testerchain.tx_machine.queued) == 0
original_async_tx.fault = Fault.ERROR
original_async_tx.error = None
original_async_tx.on_fault(original_async_tx)
resubmitted_after_broadcast_failure_async_tx.fault = Fault.ERROR
resubmitted_after_broadcast_failure_async_tx.error = None
resubmitted_after_broadcast_failure_async_tx.on_fault(
resubmitted_after_broadcast_failure_async_tx
)
publish_transcript_call_count += 1 # on_fault should trigger resubmission
assert (
publish_transcript_spy.call_count == publish_transcript_call_count
@ -200,7 +210,8 @@ def perform_round_1_with_fault_tolerance(
ursula_experiencing_problems.dkg_storage.get_ritual_phase_async_tx(phase_id)
)
assert (
resubmitted_after_fault_async_tx is not original_async_tx
resubmitted_after_fault_async_tx
is not resubmitted_after_broadcast_failure_async_tx
), "cache updated with resubmitted tx"
assert len(testerchain.tx_machine.queued) == 1
@ -346,23 +357,35 @@ def perform_round_2_with_fault_tolerance(
assert len(testerchain.tx_machine.queued) == 1
# broadcast failure callback called
# tx is explicitly removed and resubmitted by callback
original_async_tx.on_broadcast_failure(original_async_tx, Exception())
publish_aggregated_transcript_call_count += (
1 # on_fault should trigger resubmission
)
assert (
publish_aggregated_transcript_spy.call_count
== publish_aggregated_transcript_call_count
), "no change"
), "updated call"
assert publish_transcript_spy.call_count == 0, "phase 1 method never called"
resubmitted_after_broadcast_failure_async_tx = (
ursula_experiencing_problems.dkg_storage.get_ritual_phase_async_tx(phase_id)
)
assert (
resubmitted_after_broadcast_failure_async_tx is not original_async_tx
), "cache updated with resubmitted tx"
assert len(testerchain.tx_machine.queued) == 1
# on_fault callback called - this should cause a resubmission of tx because
# tx was removed from atxm after faulting
testerchain.tx_machine.remove_queued_transaction(
original_async_tx
resubmitted_after_broadcast_failure_async_tx
) # simulate removal from atxm
assert len(testerchain.tx_machine.queued) == 0
original_async_tx.fault = Fault.ERROR
original_async_tx.error = None
original_async_tx.on_fault(original_async_tx)
resubmitted_after_broadcast_failure_async_tx.fault = Fault.ERROR
resubmitted_after_broadcast_failure_async_tx.error = None
resubmitted_after_broadcast_failure_async_tx.on_fault(
resubmitted_after_broadcast_failure_async_tx
)
publish_aggregated_transcript_call_count += (
1 # on_fault should trigger resubmission
)
@ -376,7 +399,8 @@ def perform_round_2_with_fault_tolerance(
ursula_experiencing_problems.dkg_storage.get_ritual_phase_async_tx(phase_id)
)
assert (
resubmitted_after_fault_async_tx is not original_async_tx
resubmitted_after_fault_async_tx
is not resubmitted_after_broadcast_failure_async_tx
), "cache updated with resubmitted tx"
assert len(testerchain.tx_machine.queued) == 1

View File

@ -302,13 +302,6 @@ def test_async_tx_hooks_phase_1(ursula, mocker):
resubmit_call_count = 0
# broadcast failure - just logging
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
assert mock_publish_transcript.call_count == 0
assert (
mock_publish_aggregated_transcript.call_count == 0
), "phase 2 publish never called"
# broadcast - just logging
mock_tx.txhash = MockBlockchain.FAKE_TX_HASH
async_tx_hooks.on_broadcast(mock_tx)
@ -329,6 +322,15 @@ def test_async_tx_hooks_phase_1(ursula, mocker):
#
mocker.patch.object(ursula, "_is_phase_1_action_required", return_value=True)
# broadcast failure
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
resubmit_call_count += 1
assert mock_publish_transcript.call_count == resubmit_call_count, "tx resubmitted"
mock_publish_transcript.assert_called_with(ritual_id, transcript)
assert (
mock_publish_aggregated_transcript.call_count == 0
), "phase 2 publish never called"
# fault
mock_tx.fault = Fault.ERROR
mock_tx.error = "fault error"
@ -372,6 +374,11 @@ def test_async_tx_hooks_phase_1(ursula, mocker):
mocker.patch.object(ursula, "_is_phase_1_action_required", return_value=False)
current_call_count = mock_publish_transcript.call_count
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
assert (
mock_publish_transcript.call_count == current_call_count
), "no action needed, so not called"
async_tx_hooks.on_fault(mock_tx)
assert (
mock_publish_transcript.call_count == current_call_count
@ -413,11 +420,6 @@ def test_async_tx_hooks_phase_2(ursula, mocker, aggregated_transcript, dkg_publi
resubmit_call_count = 0
# broadcast failure - just logging
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
assert mock_publish_transcript.call_count == 0, "phase 1 publish never called"
assert mock_publish_aggregated_transcript.call_count == 0
# broadcast - just logging
mock_tx.txhash = MockBlockchain.FAKE_TX_HASH
async_tx_hooks.on_broadcast(mock_tx)
@ -438,6 +440,17 @@ def test_async_tx_hooks_phase_2(ursula, mocker, aggregated_transcript, dkg_publi
#
mocker.patch.object(ursula, "_is_phase_2_action_required", return_value=True)
# broadcast failure
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
resubmit_call_count += 1
assert (
mock_publish_aggregated_transcript.call_count == resubmit_call_count
), "tx resubmitted"
mock_publish_aggregated_transcript.assert_called_with(
ritual_id, aggregated_transcript, public_key
)
assert mock_publish_transcript.call_count == 0, "phase 1 publish never called"
# fault
mock_tx.fault = Fault.TIMEOUT
mock_tx.error = "fault error"
@ -483,6 +496,11 @@ def test_async_tx_hooks_phase_2(ursula, mocker, aggregated_transcript, dkg_publi
mocker.patch.object(ursula, "_is_phase_2_action_required", return_value=False)
current_call_count = mock_publish_transcript.call_count
async_tx_hooks.on_broadcast_failure(mock_tx, Exception("test"))
assert (
mock_publish_transcript.call_count == current_call_count
), "no action needed, so not called"
async_tx_hooks.on_fault(mock_tx)
assert (
mock_publish_transcript.call_count == current_call_count