Ensure that prometheus collectors do not hold references to contract agents.

Update cached from_block for EventMetricsCollectors before running potentially long running execution task to improve concurrency handling.
pull/2785/head
derekpierre 2021-08-26 12:23:59 -04:00
parent bcd6071e3d
commit 0d3388f7d7
2 changed files with 56 additions and 38 deletions

View File

@ -33,7 +33,7 @@ from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
from nucypher.blockchain.eth.registry import BaseContractRegistry
from nucypher.datastore.queries import get_policy_arrangements, get_work_orders
from typing import Dict, Union
from typing import Dict, Union, Type
ContractAgents = Union[StakingEscrowAgent, WorkLockAgent, PolicyManagerAgent]
@ -291,13 +291,16 @@ class EventMetricsCollector(BaseMetricsCollector):
event_name: str,
event_args_config: Dict[str, tuple],
argument_filters: Dict[str, str],
contract_agent: ContractAgents):
contract_agent_class: Type[ContractAgents],
contract_registry: BaseContractRegistry):
super().__init__()
self.event_name = event_name
self.contract_agent = contract_agent
self.contract_agent_class = contract_agent_class
self.contract_registry = contract_registry
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
# this way we don't have to deal with 'latest' at all
self.filter_current_from_block = self.contract_agent.blockchain.client.block_number
self.filter_current_from_block = contract_agent.blockchain.client.block_number
self.filter_arguments = argument_filters
self.event_args_config = event_args_config
@ -309,14 +312,19 @@ class EventMetricsCollector(BaseMetricsCollector):
self.metrics[metric_key] = metric_class(metric_name, metric_doc, registry=registry)
def _collect_internal(self) -> None:
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
from_block = self.filter_current_from_block
to_block = self.contract_agent.blockchain.client.block_number
to_block = contract_agent.blockchain.client.block_number
if from_block >= to_block:
# we've already checked the latest block and waiting for a new block
# nothing to see here
return
events_throttler = ContractEventsThrottler(agent=self.contract_agent,
# update last block checked for the next round - from/to block range is inclusive
# increment before potentially long running execution to improve concurrency handling
self.filter_current_from_block = to_block + 1
events_throttler = ContractEventsThrottler(agent=contract_agent,
event_name=self.event_name,
from_block=from_block,
to_block=to_block,
@ -324,9 +332,6 @@ class EventMetricsCollector(BaseMetricsCollector):
for event_record in events_throttler:
self._event_occurred(event_record.raw_event)
# update last block checked for the next round - from/to block range is inclusive
self.filter_current_from_block = to_block + 1
def _event_occurred(self, event) -> None:
for arg_name in self.event_args_config:
metric_key = self._get_arg_metric_key(arg_name)
@ -347,25 +352,25 @@ class CommitmentMadeEventMetricsCollector(EventMetricsCollector):
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
missing_commitments = self.contract_agent.get_missing_commitments(checksum_address=self.staker_address)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
missing_commitments = contract_agent.get_missing_commitments(checksum_address=self.staker_address)
if missing_commitments == 0:
# has either already committed to this period or the next period
# use local event filter for initial data
last_committed_period = self.contract_agent.get_last_committed_period(staker_address=self.staker_address)
last_committed_period = contract_agent.get_last_committed_period(staker_address=self.staker_address)
arg_filters = {'staker': self.staker_address, 'period': last_committed_period}
latest_block = self.contract_agent.blockchain.client.block_number
previous_period = self.contract_agent.get_current_period() - 1 # just in case
latest_block = contract_agent.blockchain.client.block_number
previous_period = contract_agent.get_current_period() - 1 # just in case
# we estimate the block number for the previous period to start search from since either
# - commitment made during previous period for current period, OR
# - commitment made during current period for next period
block_number_for_previous_period = estimate_block_number_for_period(
period=previous_period,
seconds_per_period=self.contract_agent.staking_parameters()[1],
seconds_per_period=contract_agent.staking_parameters()[1],
latest_block=latest_block)
events_throttler = ContractEventsThrottler(agent=self.contract_agent,
events_throttler = ContractEventsThrottler(agent=contract_agent,
event_name=self.event_name,
from_block=block_number_for_previous_period,
to_block=latest_block,
@ -386,9 +391,9 @@ class ReStakeEventMetricsCollector(EventMetricsCollector):
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
metric_key = self._get_arg_metric_key("reStake")
self.metrics[metric_key].set(self.contract_agent.is_restaking(self.staker_address))
self.metrics[metric_key].set(contract_agent.is_restaking(self.staker_address))
class WindDownEventMetricsCollector(EventMetricsCollector):
@ -399,9 +404,9 @@ class WindDownEventMetricsCollector(EventMetricsCollector):
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
metric_key = self._get_arg_metric_key("windDown")
self.metrics[metric_key].set(self.contract_agent.is_winding_down(self.staker_address))
self.metrics[metric_key].set(contract_agent.is_winding_down(self.staker_address))
class WorkerBondedEventMetricsCollector(EventMetricsCollector):
@ -418,18 +423,20 @@ class WorkerBondedEventMetricsCollector(EventMetricsCollector):
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
self.metrics["current_worker_is_me_gauge"] = Gauge(f'{metrics_prefix}_current_worker_is_me',
'Current worker is me',
registry=registry)
# set initial value
self.metrics["current_worker_is_me_gauge"].set(
self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
def _event_occurred(self, event) -> None:
super()._event_occurred(event)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
self.metrics["current_worker_is_me_gauge"].set(
self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
class WorkLockRefundEventMetricsCollector(EventMetricsCollector):
@ -447,4 +454,5 @@ class WorkLockRefundEventMetricsCollector(EventMetricsCollector):
def _event_occurred(self, event) -> None:
super()._event_occurred(event)
self.metrics["worklock_deposited_eth_gauge"].set(self.contract_agent.get_deposited_eth(self.staker_address))
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
self.metrics["worklock_deposited_eth_gauge"].set(contract_agent.get_deposited_eth(self.staker_address))

View File

@ -50,7 +50,7 @@ from typing import List
from twisted.internet import reactor, task
from twisted.web.resource import Resource
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent
from nucypher.blockchain.eth.agents import StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent
class PrometheusMetricsConfig:
@ -229,7 +229,6 @@ def create_metrics_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[Met
def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
"""Create collectors for staking-related events."""
collectors: List[MetricsCollector] = []
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=ursula.registry)
staker_address = ursula.checksum_address
@ -242,7 +241,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
"period": (Gauge, f'{metrics_prefix}_activity_confirmed_period', 'Commitment made for period')
},
staker_address=staker_address,
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
# Minted
collectors.append(EventMetricsCollector(
@ -253,7 +254,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
"block_number": (Gauge, f'{metrics_prefix}_mined_block_number', 'Minted block number')
},
argument_filters={'staker': staker_address},
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
# Slashed
collectors.append(EventMetricsCollector(
@ -265,7 +268,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
'Slashed penalty block number')
},
argument_filters={'staker': staker_address},
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
# RestakeSet
collectors.append(ReStakeEventMetricsCollector(
@ -273,7 +278,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
"reStake": (Gauge, f'{metrics_prefix}_restaking', 'Restake set')
},
staker_address=staker_address,
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
# WindDownSet
collectors.append(WindDownEventMetricsCollector(
@ -281,7 +288,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
"windDown": (Gauge, f'{metrics_prefix}_wind_down', 'is windDown')
},
staker_address=staker_address,
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
# WorkerBonded
collectors.append(WorkerBondedEventMetricsCollector(
@ -291,7 +300,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
},
staker_address=staker_address,
worker_address=ursula.worker_address,
contract_agent=staking_agent))
contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry
))
return collectors
@ -299,7 +310,6 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
"""Create collectors for worklock-related events."""
collectors: List[MetricsCollector] = []
worklock_agent = ContractAgency.get_agent(WorkLockAgent, registry=ursula.registry)
staker_address = ursula.checksum_address
# Refund
@ -309,7 +319,8 @@ def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: s
'Refunded ETH'),
},
staker_address=staker_address,
contract_agent=worklock_agent,
contract_agent_class=WorkLockAgent,
contract_registry=ursula.registry
))
return collectors
@ -317,16 +328,15 @@ def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: s
def create_policy_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
"""Create collectors for policy-related events."""
collectors: List[MetricsCollector] = []
policy_manager_agent = ContractAgency.get_agent(PolicyManagerAgent, registry=ursula.registry)
# Withdrawn
collectors.append(EventMetricsCollector(
collectors: List[MetricsCollector] = [EventMetricsCollector(
event_name='Withdrawn',
event_args_config={
"value": (Gauge, f'{metrics_prefix}_policy_withdrawn_reward', 'Policy reward')
},
argument_filters={"recipient": ursula.checksum_address},
contract_agent=policy_manager_agent))
contract_agent_class=PolicyManagerAgent,
contract_registry=ursula.registry
)]
return collectors