diff --git a/nucypher/blockchain/eth/trackers/dkg.py b/nucypher/blockchain/eth/trackers/dkg.py index 7721ec159..28aa06932 100644 --- a/nucypher/blockchain/eth/trackers/dkg.py +++ b/nucypher/blockchain/eth/trackers/dkg.py @@ -3,12 +3,14 @@ import os import time from typing import Callable, List, Optional, Tuple +import maya from twisted.internet import threads from web3.datastructures import AttributeDict from nucypher.blockchain.eth import actors from nucypher.blockchain.eth.agents import CoordinatorAgent from nucypher.policy.conditions.utils import camel_case_to_snake +from nucypher.utilities.cache import TTLCache from nucypher.utilities.events import EventScanner, JSONifiedState from nucypher.utilities.logging import Logger from nucypher.utilities.task import SimpleTask @@ -68,6 +70,9 @@ class ActiveRitualTracker: MAX_CHUNK_SIZE = 10000 + # how often to check/purge for expired cached values - 8hrs? + _PARTICIPATION_STATES_PURGE_INTERVAL = 60 * 60 * 8 + class ParticipationState: def __init__( self, @@ -123,7 +128,17 @@ class ActiveRitualTracker: ) self.task = EventScannerTask(scanner=self.scan) - self._participation_states = dict() # { ritual_id -> ParticipationState } + + ritual_timeout = self.coordinator_agent.get_timeout() + # what's the buffer for potentially receiving repeated events - one hour? + cache_ttl = ritual_timeout + (60 * 60) + + self._participation_states = TTLCache( + ttl=cache_ttl + ) # { ritual_id -> ParticipationState } + self._participation_states_next_purge_timestamp = maya.now().add( + seconds=self._PARTICIPATION_STATES_PURGE_INTERVAL + ) @property def provider(self): @@ -228,6 +243,15 @@ class ActiveRitualTracker: return None + def _purge_expired_participation_states_as_needed(self): + # let's check whether we should purge participation states before returning + now = maya.now() + if now > self._participation_states_next_purge_timestamp: + self._participation_states.purge_expired() + self._participation_states_next_purge_timestamp = now.add( + seconds=self._PARTICIPATION_STATES_PURGE_INTERVAL + ) + def _get_participation_state_values_from_contract( self, ritual_id: int ) -> Tuple[bool, bool, bool]: @@ -253,6 +277,8 @@ class ActiveRitualTracker: Returns the current participation state of the Ritualist as it pertains to the ritual associated with the provided event. """ + self._purge_expired_participation_states_as_needed() + event_type = getattr(self.contract.events, event.event) if event_type not in self.events: # should never happen since we specify the list of events we @@ -269,7 +295,7 @@ class ActiveRitualTracker: f"Unexpected event type: '{event_type}' has no ritual id as argument" ) - participation_state = self._participation_states.get(ritual_id) + participation_state = self._participation_states[ritual_id] if not participation_state: # not previously tracked; get current state and return # need to determine if participating in this ritual or not diff --git a/tests/integration/blockchain/test_ritual_tracker.py b/tests/integration/blockchain/test_ritual_tracker.py index 77e08a448..66120bfbd 100644 --- a/tests/integration/blockchain/test_ritual_tracker.py +++ b/tests/integration/blockchain/test_ritual_tracker.py @@ -21,6 +21,7 @@ def ritualist(ursulas, mock_coordinator_agent) -> Ritualist: ursula = ursulas[0] mocked_agent = Mock(spec=CoordinatorAgent) mocked_agent.contract = mock_coordinator_agent.contract + mocked_agent.get_timeout.return_value = 60 # 60s mocked_blockchain = Mock() mocked_agent.blockchain = mocked_blockchain mocked_w3 = Mock()