mirror of https://github.com/nucypher/nucypher.git
commit
170541b189
|
@ -0,0 +1 @@
|
|||
Minor memory improvement when collecting staker/worker metrics for prometheus.
|
|
@ -98,8 +98,8 @@ class ContractEventsThrottler:
|
|||
self.to_block = to_block if to_block is not None else agent.blockchain.client.block_number
|
||||
# validity check of block range
|
||||
if self.to_block < self.from_block:
|
||||
raise ValueError(f"Invalid events block range: to_block {self.to_block} must be greater than or equal "
|
||||
f"to from_block {self.from_block}")
|
||||
raise ValueError(f"Invalid events block range: to_block ({self.to_block}) must be ≥ "
|
||||
f"from_block ({self.from_block})")
|
||||
|
||||
self.max_blocks_per_call = max_blocks_per_call
|
||||
self.argument_filters = argument_filters
|
||||
|
|
|
@ -33,7 +33,7 @@ from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
|
|||
from nucypher.blockchain.eth.registry import BaseContractRegistry
|
||||
from nucypher.datastore.queries import get_policy_arrangements, get_reencryption_requests
|
||||
|
||||
from typing import Dict, Union
|
||||
from typing import Dict, Union, Type
|
||||
|
||||
ContractAgents = Union[StakingEscrowAgent, WorkLockAgent, PolicyManagerAgent]
|
||||
|
||||
|
@ -293,13 +293,16 @@ class EventMetricsCollector(BaseMetricsCollector):
|
|||
event_name: str,
|
||||
event_args_config: Dict[str, tuple],
|
||||
argument_filters: Dict[str, str],
|
||||
contract_agent: ContractAgents):
|
||||
contract_agent_class: Type[ContractAgents],
|
||||
contract_registry: BaseContractRegistry):
|
||||
super().__init__()
|
||||
self.event_name = event_name
|
||||
self.contract_agent = contract_agent
|
||||
self.contract_agent_class = contract_agent_class
|
||||
self.contract_registry = contract_registry
|
||||
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
# this way we don't have to deal with 'latest' at all
|
||||
self.filter_current_from_block = self.contract_agent.blockchain.client.block_number
|
||||
self.filter_current_from_block = contract_agent.blockchain.client.block_number
|
||||
self.filter_arguments = argument_filters
|
||||
self.event_args_config = event_args_config
|
||||
|
||||
|
@ -311,14 +314,19 @@ class EventMetricsCollector(BaseMetricsCollector):
|
|||
self.metrics[metric_key] = metric_class(metric_name, metric_doc, registry=registry)
|
||||
|
||||
def _collect_internal(self) -> None:
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
from_block = self.filter_current_from_block
|
||||
to_block = self.contract_agent.blockchain.client.block_number
|
||||
to_block = contract_agent.blockchain.client.block_number
|
||||
if from_block >= to_block:
|
||||
# we've already checked the latest block and waiting for a new block
|
||||
# nothing to see here
|
||||
return
|
||||
|
||||
events_throttler = ContractEventsThrottler(agent=self.contract_agent,
|
||||
# update last block checked for the next round - from/to block range is inclusive
|
||||
# increment before potentially long running execution to improve concurrency handling
|
||||
self.filter_current_from_block = to_block + 1
|
||||
|
||||
events_throttler = ContractEventsThrottler(agent=contract_agent,
|
||||
event_name=self.event_name,
|
||||
from_block=from_block,
|
||||
to_block=to_block,
|
||||
|
@ -326,9 +334,6 @@ class EventMetricsCollector(BaseMetricsCollector):
|
|||
for event_record in events_throttler:
|
||||
self._event_occurred(event_record.raw_event)
|
||||
|
||||
# update last block checked for the next round - from/to block range is inclusive
|
||||
self.filter_current_from_block = to_block + 1
|
||||
|
||||
def _event_occurred(self, event) -> None:
|
||||
for arg_name in self.event_args_config:
|
||||
metric_key = self._get_arg_metric_key(arg_name)
|
||||
|
@ -349,25 +354,25 @@ class CommitmentMadeEventMetricsCollector(EventMetricsCollector):
|
|||
|
||||
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
|
||||
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
|
||||
|
||||
missing_commitments = self.contract_agent.get_missing_commitments(checksum_address=self.staker_address)
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
missing_commitments = contract_agent.get_missing_commitments(checksum_address=self.staker_address)
|
||||
if missing_commitments == 0:
|
||||
# has either already committed to this period or the next period
|
||||
|
||||
# use local event filter for initial data
|
||||
last_committed_period = self.contract_agent.get_last_committed_period(staker_address=self.staker_address)
|
||||
last_committed_period = contract_agent.get_last_committed_period(staker_address=self.staker_address)
|
||||
arg_filters = {'staker': self.staker_address, 'period': last_committed_period}
|
||||
latest_block = self.contract_agent.blockchain.client.block_number
|
||||
previous_period = self.contract_agent.get_current_period() - 1 # just in case
|
||||
latest_block = contract_agent.blockchain.client.block_number
|
||||
previous_period = contract_agent.get_current_period() - 1 # just in case
|
||||
# we estimate the block number for the previous period to start search from since either
|
||||
# - commitment made during previous period for current period, OR
|
||||
# - commitment made during current period for next period
|
||||
block_number_for_previous_period = estimate_block_number_for_period(
|
||||
period=previous_period,
|
||||
seconds_per_period=self.contract_agent.staking_parameters()[1],
|
||||
seconds_per_period=contract_agent.staking_parameters()[1],
|
||||
latest_block=latest_block)
|
||||
|
||||
events_throttler = ContractEventsThrottler(agent=self.contract_agent,
|
||||
events_throttler = ContractEventsThrottler(agent=contract_agent,
|
||||
event_name=self.event_name,
|
||||
from_block=block_number_for_previous_period,
|
||||
to_block=latest_block,
|
||||
|
@ -388,9 +393,9 @@ class ReStakeEventMetricsCollector(EventMetricsCollector):
|
|||
|
||||
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
|
||||
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
|
||||
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
metric_key = self._get_arg_metric_key("reStake")
|
||||
self.metrics[metric_key].set(self.contract_agent.is_restaking(self.staker_address))
|
||||
self.metrics[metric_key].set(contract_agent.is_restaking(self.staker_address))
|
||||
|
||||
|
||||
class WindDownEventMetricsCollector(EventMetricsCollector):
|
||||
|
@ -401,9 +406,9 @@ class WindDownEventMetricsCollector(EventMetricsCollector):
|
|||
|
||||
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
|
||||
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
|
||||
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
metric_key = self._get_arg_metric_key("windDown")
|
||||
self.metrics[metric_key].set(self.contract_agent.is_winding_down(self.staker_address))
|
||||
self.metrics[metric_key].set(contract_agent.is_winding_down(self.staker_address))
|
||||
|
||||
|
||||
class WorkerBondedEventMetricsCollector(EventMetricsCollector):
|
||||
|
@ -420,18 +425,20 @@ class WorkerBondedEventMetricsCollector(EventMetricsCollector):
|
|||
|
||||
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
|
||||
super().initialize(metrics_prefix=metrics_prefix, registry=registry)
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
self.metrics["current_worker_is_me_gauge"] = Gauge(f'{metrics_prefix}_current_worker_is_me',
|
||||
'Current worker is me',
|
||||
registry=registry)
|
||||
|
||||
# set initial value
|
||||
self.metrics["current_worker_is_me_gauge"].set(
|
||||
self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
|
||||
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
|
||||
|
||||
def _event_occurred(self, event) -> None:
|
||||
super()._event_occurred(event)
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
self.metrics["current_worker_is_me_gauge"].set(
|
||||
self.contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
|
||||
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address)
|
||||
|
||||
|
||||
class WorkLockRefundEventMetricsCollector(EventMetricsCollector):
|
||||
|
@ -449,4 +456,5 @@ class WorkLockRefundEventMetricsCollector(EventMetricsCollector):
|
|||
|
||||
def _event_occurred(self, event) -> None:
|
||||
super()._event_occurred(event)
|
||||
self.metrics["worklock_deposited_eth_gauge"].set(self.contract_agent.get_deposited_eth(self.staker_address))
|
||||
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
|
||||
self.metrics["worklock_deposited_eth_gauge"].set(contract_agent.get_deposited_eth(self.staker_address))
|
||||
|
|
|
@ -50,7 +50,7 @@ from typing import List
|
|||
from twisted.internet import reactor, task
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent
|
||||
from nucypher.blockchain.eth.agents import StakingEscrowAgent, PolicyManagerAgent, WorkLockAgent
|
||||
|
||||
|
||||
class PrometheusMetricsConfig:
|
||||
|
@ -229,7 +229,6 @@ def create_metrics_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[Met
|
|||
def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
|
||||
"""Create collectors for staking-related events."""
|
||||
collectors: List[MetricsCollector] = []
|
||||
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=ursula.registry)
|
||||
|
||||
staker_address = ursula.checksum_address
|
||||
|
||||
|
@ -242,7 +241,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
"period": (Gauge, f'{metrics_prefix}_activity_confirmed_period', 'Commitment made for period')
|
||||
},
|
||||
staker_address=staker_address,
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
# Minted
|
||||
collectors.append(EventMetricsCollector(
|
||||
|
@ -253,7 +254,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
"block_number": (Gauge, f'{metrics_prefix}_mined_block_number', 'Minted block number')
|
||||
},
|
||||
argument_filters={'staker': staker_address},
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
# Slashed
|
||||
collectors.append(EventMetricsCollector(
|
||||
|
@ -265,7 +268,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
'Slashed penalty block number')
|
||||
},
|
||||
argument_filters={'staker': staker_address},
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
# RestakeSet
|
||||
collectors.append(ReStakeEventMetricsCollector(
|
||||
|
@ -273,7 +278,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
"reStake": (Gauge, f'{metrics_prefix}_restaking', 'Restake set')
|
||||
},
|
||||
staker_address=staker_address,
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
# WindDownSet
|
||||
collectors.append(WindDownEventMetricsCollector(
|
||||
|
@ -281,7 +288,9 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
"windDown": (Gauge, f'{metrics_prefix}_wind_down', 'is windDown')
|
||||
},
|
||||
staker_address=staker_address,
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
# WorkerBonded
|
||||
collectors.append(WorkerBondedEventMetricsCollector(
|
||||
|
@ -291,42 +300,40 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
|
|||
},
|
||||
staker_address=staker_address,
|
||||
worker_address=ursula.worker_address,
|
||||
contract_agent=staking_agent))
|
||||
contract_agent_class=StakingEscrowAgent,
|
||||
contract_registry=ursula.registry
|
||||
))
|
||||
|
||||
return collectors
|
||||
|
||||
|
||||
def create_worklock_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
|
||||
"""Create collectors for worklock-related events."""
|
||||
collectors: List[MetricsCollector] = []
|
||||
worklock_agent = ContractAgency.get_agent(WorkLockAgent, registry=ursula.registry)
|
||||
staker_address = ursula.checksum_address
|
||||
|
||||
# Refund
|
||||
collectors.append(WorkLockRefundEventMetricsCollector(
|
||||
collectors: List[MetricsCollector] = [WorkLockRefundEventMetricsCollector(
|
||||
event_args_config={
|
||||
"refundETH": (Gauge, f'{metrics_prefix}_worklock_refund_refundETH',
|
||||
'Refunded ETH'),
|
||||
},
|
||||
staker_address=staker_address,
|
||||
contract_agent=worklock_agent,
|
||||
))
|
||||
staker_address=ursula.checksum_address,
|
||||
contract_agent_class=WorkLockAgent,
|
||||
contract_registry=ursula.registry
|
||||
)]
|
||||
|
||||
return collectors
|
||||
|
||||
|
||||
def create_policy_events_metric_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[MetricsCollector]:
|
||||
"""Create collectors for policy-related events."""
|
||||
collectors: List[MetricsCollector] = []
|
||||
policy_manager_agent = ContractAgency.get_agent(PolicyManagerAgent, registry=ursula.registry)
|
||||
|
||||
# Withdrawn
|
||||
collectors.append(EventMetricsCollector(
|
||||
collectors: List[MetricsCollector] = [EventMetricsCollector(
|
||||
event_name='Withdrawn',
|
||||
event_args_config={
|
||||
"value": (Gauge, f'{metrics_prefix}_policy_withdrawn_reward', 'Policy reward')
|
||||
},
|
||||
argument_filters={"recipient": ursula.checksum_address},
|
||||
contract_agent=policy_manager_agent))
|
||||
contract_agent_class=PolicyManagerAgent,
|
||||
contract_registry=ursula.registry
|
||||
)]
|
||||
|
||||
return collectors
|
||||
|
|
Loading…
Reference in New Issue