diff --git a/nucypher/utilities/prometheus/collector.py b/nucypher/utilities/prometheus/collector.py index 8a7a16ccf..6bf311fcc 100644 --- a/nucypher/utilities/prometheus/collector.py +++ b/nucypher/utilities/prometheus/collector.py @@ -33,7 +33,7 @@ from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory from nucypher.blockchain.eth.registry import BaseContractRegistry from nucypher.datastore.queries import get_policy_arrangements, get_work_orders -from typing import Dict, Union +from typing import Dict, Union, Type ContractAgents = Union[StakingEscrowAgent, WorkLockAgent, PolicyManagerAgent] @@ -291,13 +291,16 @@ class EventMetricsCollector(BaseMetricsCollector): event_name: str, event_args_config: Dict[str, tuple], argument_filters: Dict[str, str], - contract_agent: ContractAgents): + contract_agent_class: Type[ContractAgents], + contract_registry: BaseContractRegistry): super().__init__() self.event_name = event_name - self.contract_agent = contract_agent + self.contract_agent_class = contract_agent_class + self.contract_registry = contract_registry + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) # this way we don't have to deal with 'latest' at all - self.filter_current_from_block = self.contract_agent.blockchain.client.block_number + self.filter_current_from_block = contract_agent.blockchain.client.block_number self.filter_arguments = argument_filters self.event_args_config = event_args_config @@ -309,14 +312,19 @@ class EventMetricsCollector(BaseMetricsCollector): self.metrics[metric_key] = metric_class(metric_name, metric_doc, registry=registry) def _collect_internal(self) -> None: + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) from_block = self.filter_current_from_block - to_block = self.contract_agent.blockchain.client.block_number + to_block = contract_agent.blockchain.client.block_number if from_block >= to_block: # we've already checked the latest block and waiting for a new block # nothing to see here return - events_throttler = ContractEventsThrottler(agent=self.contract_agent, + # update last block checked for the next round - from/to block range is inclusive + # increment before potentially long running execution to improve concurrency handling + self.filter_current_from_block = to_block + 1 + + events_throttler = ContractEventsThrottler(agent=contract_agent, event_name=self.event_name, from_block=from_block, to_block=to_block, @@ -324,9 +332,6 @@ class EventMetricsCollector(BaseMetricsCollector): for event_record in events_throttler: self._event_occurred(event_record.raw_event) - # update last block checked for the next round - from/to block range is inclusive - self.filter_current_from_block = to_block + 1 - def _event_occurred(self, event) -> None: for arg_name in self.event_args_config: metric_key = self._get_arg_metric_key(arg_name) @@ -347,25 +352,25 @@ class CommitmentMadeEventMetricsCollector(EventMetricsCollector): def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: super().initialize(metrics_prefix=metrics_prefix, registry=registry) - - missing_commitments = self.contract_agent.get_missing_commitments(checksum_address=self.staker_address) + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) + missing_commitments = contract_agent.get_missing_commitments(checksum_address=self.staker_address) if missing_commitments == 0: # has either already committed to this period or the next period # use local event filter for initial data - last_committed_period = self.contract_agent.get_last_committed_period(staker_address=self.staker_address) + last_committed_period = contract_agent.get_last_committed_period(staker_address=self.staker_address) arg_filters = {'staker': self.staker_address, 'period': last_committed_period} - latest_block = self.contract_agent.blockchain.client.block_number - previous_period = self.contract_agent.get_current_period() - 1 # just in case + latest_block = contract_agent.blockchain.client.block_number + previous_period = contract_agent.get_current_period() - 1 # just in case # we estimate the block number for the previous period to start search from since either # - commitment made during previous period for current period, OR # - commitment made during current period for next period block_number_for_previous_period = estimate_block_number_for_period( period=previous_period, - seconds_per_period=self.contract_agent.staking_parameters()[1], + seconds_per_period=contract_agent.staking_parameters()[1], latest_block=latest_block) - events_throttler = ContractEventsThrottler(agent=self.contract_agent, + events_throttler = ContractEventsThrottler(agent=contract_agent, event_name=self.event_name, from_block=block_number_for_previous_period, to_block=latest_block, @@ -386,9 +391,9 @@ class ReStakeEventMetricsCollector(EventMetricsCollector): def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: super().initialize(metrics_prefix=metrics_prefix, registry=registry) - + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) metric_key = self._get_arg_metric_key("reStake") - self.metrics[metric_key].set(self.contract_agent.is_restaking(self.staker_address)) + self.metrics[metric_key].set(contract_agent.is_restaking(self.staker_address)) class WindDownEventMetricsCollector(EventMetricsCollector): @@ -399,9 +404,9 @@ class WindDownEventMetricsCollector(EventMetricsCollector): def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: super().initialize(metrics_prefix=metrics_prefix, registry=registry) - + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) metric_key = self._get_arg_metric_key("windDown") - self.metrics[metric_key].set(self.contract_agent.is_winding_down(self.staker_address)) + self.metrics[metric_key].set(contract_agent.is_winding_down(self.staker_address)) class WorkerBondedEventMetricsCollector(EventMetricsCollector): @@ -418,18 +423,20 @@ class WorkerBondedEventMetricsCollector(EventMetricsCollector): def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: super().initialize(metrics_prefix=metrics_prefix, registry=registry) + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) self.metrics["current_worker_is_me_gauge"] = Gauge(f'{metrics_prefix}_current_worker_is_me', 'Current worker is me', registry=registry) # set initial value self.metrics["current_worker_is_me_gauge"].set( - self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) + contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) def _event_occurred(self, event) -> None: super()._event_occurred(event) + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) self.metrics["current_worker_is_me_gauge"].set( - self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) + contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) class WorkLockRefundEventMetricsCollector(EventMetricsCollector): @@ -447,4 +454,5 @@ class WorkLockRefundEventMetricsCollector(EventMetricsCollector): def _event_occurred(self, event) -> None: super()._event_occurred(event) - self.metrics["worklock_deposited_eth_gauge"].set(self.contract_agent.get_deposited_eth(self.staker_address)) + contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) + self.metrics["worklock_deposited_eth_gauge"].set(contract_agent.get_deposited_eth(self.staker_address)) diff --git a/nucypher/utilities/prometheus/metrics.py b/nucypher/utilities/prometheus/metrics.py index c012cb9d6..f2b4af277 100644 --- a/nucypher/utilities/prometheus/metrics.py +++ b/nucypher/utilities/prometheus/metrics.py @@ -50,7 +50,7 @@ from typing import List from twisted.internet import reactor, task from twisted.web.resource import Resource -from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent +from nucypher.blockchain.eth.agents import StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent class PrometheusMetricsConfig: @@ -229,7 +229,6 @@ def create_metrics_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[Met def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]: """Create collectors for staking-related events.""" collectors: List[MetricsCollector] = [] - staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=ursula.registry) staker_address = ursula.checksum_address @@ -242,7 +241,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st "period": (Gauge, f'{metrics_prefix}_activity_confirmed_period', 'Commitment made for period') }, staker_address=staker_address, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) # Minted collectors.append(EventMetricsCollector( @@ -253,7 +254,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st "block_number": (Gauge, f'{metrics_prefix}_mined_block_number', 'Minted block number') }, argument_filters={'staker': staker_address}, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) # Slashed collectors.append(EventMetricsCollector( @@ -265,7 +268,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st 'Slashed penalty block number') }, argument_filters={'staker': staker_address}, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) # RestakeSet collectors.append(ReStakeEventMetricsCollector( @@ -273,7 +278,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st "reStake": (Gauge, f'{metrics_prefix}_restaking', 'Restake set') }, staker_address=staker_address, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) # WindDownSet collectors.append(WindDownEventMetricsCollector( @@ -281,7 +288,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st "windDown": (Gauge, f'{metrics_prefix}_wind_down', 'is windDown') }, staker_address=staker_address, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) # WorkerBonded collectors.append(WorkerBondedEventMetricsCollector( @@ -291,7 +300,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st }, staker_address=staker_address, worker_address=ursula.worker_address, - contract_agent=staking_agent)) + contract_agent_class=StakingEscrowAgent, + contract_registry=ursula.registry + )) return collectors @@ -299,7 +310,6 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]: """Create collectors for worklock-related events.""" collectors: List[MetricsCollector] = [] - worklock_agent = ContractAgency.get_agent(WorkLockAgent, registry=ursula.registry) staker_address = ursula.checksum_address # Refund @@ -309,7 +319,8 @@ def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: s 'Refunded ETH'), }, staker_address=staker_address, - contract_agent=worklock_agent, + contract_agent_class=WorkLockAgent, + contract_registry=ursula.registry )) return collectors @@ -317,16 +328,15 @@ def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: s def create_policy_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]: """Create collectors for policy-related events.""" - collectors: List[MetricsCollector] = [] - policy_manager_agent = ContractAgency.get_agent(PolicyManagerAgent, registry=ursula.registry) - # Withdrawn - collectors.append(EventMetricsCollector( + collectors: List[MetricsCollector] = [EventMetricsCollector( event_name='Withdrawn', event_args_config={ "value": (Gauge, f'{metrics_prefix}_policy_withdrawn_reward', 'Policy reward') }, argument_filters={"recipient": ursula.checksum_address}, - contract_agent=policy_manager_agent)) + contract_agent_class=PolicyManagerAgent, + contract_registry=ursula.registry + )] return collectors