Update ActiveRitualTracker to use TTLCache for tracking participation states; periodically purge the cache of expired entries no longer needed to be tracked.

pull/3191/head
derekpierre 2023-07-20 11:16:36 -04:00
parent a8f2e49c76
commit 84f85b2049
2 changed files with 29 additions and 2 deletions

View File

@ -3,12 +3,14 @@ import os
import time
from typing import Callable, List, Optional, Tuple
import maya
from twisted.internet import threads
from web3.datastructures import AttributeDict
from nucypher.blockchain.eth import actors
from nucypher.blockchain.eth.agents import CoordinatorAgent
from nucypher.policy.conditions.utils import camel_case_to_snake
from nucypher.utilities.cache import TTLCache
from nucypher.utilities.events import EventScanner, JSONifiedState
from nucypher.utilities.logging import Logger
from nucypher.utilities.task import SimpleTask
@ -68,6 +70,9 @@ class ActiveRitualTracker:
MAX_CHUNK_SIZE = 10000
# how often to check/purge for expired cached values - 8hrs?
_PARTICIPATION_STATES_PURGE_INTERVAL = 60 * 60 * 8
class ParticipationState:
def __init__(
self,
@ -123,7 +128,17 @@ class ActiveRitualTracker:
)
self.task = EventScannerTask(scanner=self.scan)
self._participation_states = dict() # { ritual_id -> ParticipationState }
ritual_timeout = self.coordinator_agent.get_timeout()
# what's the buffer for potentially receiving repeated events - one hour?
cache_ttl = ritual_timeout + (60 * 60)
self._participation_states = TTLCache(
ttl=cache_ttl
) # { ritual_id -> ParticipationState }
self._participation_states_next_purge_timestamp = maya.now().add(
seconds=self._PARTICIPATION_STATES_PURGE_INTERVAL
)
@property
def provider(self):
@ -228,6 +243,15 @@ class ActiveRitualTracker:
return None
def _purge_expired_participation_states_as_needed(self):
# let's check whether we should purge participation states before returning
now = maya.now()
if now > self._participation_states_next_purge_timestamp:
self._participation_states.purge_expired()
self._participation_states_next_purge_timestamp = now.add(
seconds=self._PARTICIPATION_STATES_PURGE_INTERVAL
)
def _get_participation_state_values_from_contract(
self, ritual_id: int
) -> Tuple[bool, bool, bool]:
@ -253,6 +277,8 @@ class ActiveRitualTracker:
Returns the current participation state of the Ritualist as it pertains to
the ritual associated with the provided event.
"""
self._purge_expired_participation_states_as_needed()
event_type = getattr(self.contract.events, event.event)
if event_type not in self.events:
# should never happen since we specify the list of events we
@ -269,7 +295,7 @@ class ActiveRitualTracker:
f"Unexpected event type: '{event_type}' has no ritual id as argument"
)
participation_state = self._participation_states.get(ritual_id)
participation_state = self._participation_states[ritual_id]
if not participation_state:
# not previously tracked; get current state and return
# need to determine if participating in this ritual or not

View File

@ -21,6 +21,7 @@ def ritualist(ursulas, mock_coordinator_agent) -> Ritualist:
ursula = ursulas[0]
mocked_agent = Mock(spec=CoordinatorAgent)
mocked_agent.contract = mock_coordinator_agent.contract
mocked_agent.get_timeout.return_value = 60 # 60s
mocked_blockchain = Mock()
mocked_agent.blockchain = mocked_blockchain
mocked_w3 = Mock()