Adjust prometheus collectors

pull/2861/head
Kieran Prasch 2022-02-02 11:31:26 -08:00
parent 9692335b8b
commit b5a2733f7c
2 changed files with 26 additions and 65 deletions

View File

@ -134,13 +134,10 @@ class UrsulaInfoMetricsCollector(BaseMetricsCollector):
self.metrics["work_orders_gauge"].set(len(reencryption_requests)) self.metrics["work_orders_gauge"].set(len(reencryption_requests))
if not self.ursula.federated_only: if not self.ursula.federated_only:
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=self.ursula.registry) staking_agent = ContractAgency.get_agent(PREApplicationAgent, registry=self.ursula.registry)
locked = staking_agent.get_locked_tokens(staker_address=self.ursula.checksum_address, periods=1) authorized = staking_agent.get_authorized_stake(staking_provider=self.ursula.checksum_address)
missing_commitments = staking_agent.get_missing_commitments(
checksum_address=self.ursula.checksum_address)
decentralized_payload = {'provider': str(self.ursula.provider_uri), decentralized_payload = {'provider': str(self.ursula.provider_uri),
'active_stake': str(locked), 'active_stake': str(authorized)}
'missing_commitments': str(missing_commitments)}
base_payload.update(decentralized_payload) base_payload.update(decentralized_payload)
# TODO: Arrangements are deprecated and Policies are no longer trackable by arrangement storage. # TODO: Arrangements are deprecated and Policies are no longer trackable by arrangement storage.
@ -190,9 +187,6 @@ class StakerMetricsCollector(BaseMetricsCollector):
'All tokens that belong to the staker, including ' 'All tokens that belong to the staker, including '
'locked, unlocked and rewards', 'locked, unlocked and rewards',
registry=registry), registry=registry),
"missing_commitments_gauge": Gauge(f'{metrics_prefix}_missing_commitments',
'Currently missed commitments',
registry=registry),
} }
def _collect_internal(self) -> None: def _collect_internal(self) -> None:
@ -220,33 +214,29 @@ class StakerMetricsCollector(BaseMetricsCollector):
self.metrics["unlocked_tokens_gauge"].set(unlocked_tokens) self.metrics["unlocked_tokens_gauge"].set(unlocked_tokens)
self.metrics["owned_tokens_gauge"].set(owned_tokens) self.metrics["owned_tokens_gauge"].set(owned_tokens)
# missed commitments
missing_commitments = staking_agent.get_missing_commitments(checksum_address=self.staker_address)
self.metrics["missing_commitments_gauge"].set(missing_commitments)
class OperatorMetricsCollector(BaseMetricsCollector):
class WorkerMetricsCollector(BaseMetricsCollector): """Collector for Operator specific metrics."""
"""Collector for Worker specific metrics.""" def __init__(self, domain: str, operator_address: ChecksumAddress, contract_registry: BaseContractRegistry):
def __init__(self, domain: str, worker_address: ChecksumAddress, contract_registry: BaseContractRegistry):
super().__init__() super().__init__()
self.domain = domain self.domain = domain
self.worker_address = worker_address self.operator_address = operator_address
self.contract_registry = contract_registry self.contract_registry = contract_registry
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
self.metrics = { self.metrics = {
"worker_eth_balance_gauge": Gauge(f'{metrics_prefix}_worker_eth_balance', "worker_eth_balance_gauge": Gauge(f'{metrics_prefix}_worker_eth_balance',
'Worker Ethereum balance', 'Operator Ethereum balance',
registry=registry), registry=registry),
"worker_token_balance_gauge": Gauge(f'{metrics_prefix}_worker_token_balance', "worker_token_balance_gauge": Gauge(f'{metrics_prefix}_worker_token_balance',
'Worker NuNit balance', 'Operator NuNit balance',
registry=registry), registry=registry),
} }
def _collect_internal(self) -> None: def _collect_internal(self) -> None:
nucypher_worker_token_actor = NucypherTokenActor(registry=self.contract_registry, nucypher_worker_token_actor = NucypherTokenActor(registry=self.contract_registry,
domain=self.domain, domain=self.domain,
checksum_address=self.worker_address) checksum_address=self.operator_address)
self.metrics["worker_eth_balance_gauge"].set(nucypher_worker_token_actor.eth_balance) self.metrics["worker_eth_balance_gauge"].set(nucypher_worker_token_actor.eth_balance)
self.metrics["worker_token_balance_gauge"].set(int(nucypher_worker_token_actor.token_balance)) self.metrics["worker_token_balance_gauge"].set(int(nucypher_worker_token_actor.token_balance))
@ -355,35 +345,6 @@ class CommitmentMadeEventMetricsCollector(EventMetricsCollector):
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry) super().initialize(metrics_prefix=metrics_prefix, registry=registry)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
missing_commitments = contract_agent.get_missing_commitments(checksum_address=self.staker_address)
if missing_commitments == 0:
# has either already committed to this period or the next period
# use local event filter for initial data
last_committed_period = contract_agent.get_last_committed_period(staker_address=self.staker_address)
arg_filters = {'staker': self.staker_address, 'period': last_committed_period}
latest_block = contract_agent.blockchain.client.block_number
previous_period = contract_agent.get_current_period() - 1 # just in case
# we estimate the block number for the previous period to start search from since either
# - commitment made during previous period for current period, OR
# - commitment made during current period for next period
block_number_for_previous_period = estimate_block_number_for_period(
period=previous_period,
seconds_per_period=contract_agent.staking_parameters()[1],
latest_block=latest_block)
events_throttler = ContractEventsThrottler(agent=contract_agent,
event_name=self.event_name,
from_block=block_number_for_previous_period,
to_block=latest_block,
**arg_filters)
for event_record in events_throttler:
self._event_occurred(event_record.raw_event)
# update last block checked since we just looked for this event up to and including latest block
# block range is inclusive, hence the increment
self.filter_current_from_block = latest_block + 1
class ReStakeEventMetricsCollector(EventMetricsCollector): class ReStakeEventMetricsCollector(EventMetricsCollector):
@ -412,17 +373,17 @@ class WindDownEventMetricsCollector(EventMetricsCollector):
self.metrics[metric_key].set(contract_agent.is_winding_down(self.staker_address)) self.metrics[metric_key].set(contract_agent.is_winding_down(self.staker_address))
class WorkerBondedEventMetricsCollector(EventMetricsCollector): class OperatorBondedEventMetricsCollector(EventMetricsCollector):
"""Collector for WorkerBonded event.""" """Collector for OperatorBonded event."""
def __init__(self, def __init__(self,
staker_address: ChecksumAddress, staker_address: ChecksumAddress,
worker_address: ChecksumAddress, operator_address: ChecksumAddress,
event_name: str = 'WorkerBonded', event_name: str = 'OperatorBonded',
*args, *args,
**kwargs): **kwargs):
super().__init__(event_name=event_name, argument_filters={'staker': staker_address}, *args, **kwargs) super().__init__(event_name=event_name, argument_filters={'staker': staker_address}, *args, **kwargs)
self.staker_address = staker_address self.staker_address = staker_address
self.worker_address = worker_address self.operator_address = operator_address
def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None: def initialize(self, metrics_prefix: str, registry: CollectorRegistry) -> None:
super().initialize(metrics_prefix=metrics_prefix, registry=registry) super().initialize(metrics_prefix=metrics_prefix, registry=registry)
@ -433,13 +394,13 @@ class WorkerBondedEventMetricsCollector(EventMetricsCollector):
# set initial value # set initial value
self.metrics["current_worker_is_me_gauge"].set( self.metrics["current_worker_is_me_gauge"].set(
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) contract_agent.get_worker_from_staker(self.staker_address) == self.operator_address)
def _event_occurred(self, event) -> None: def _event_occurred(self, event) -> None:
super()._event_occurred(event) super()._event_occurred(event)
contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry) contract_agent = ContractAgency.get_agent(self.contract_agent_class, registry=self.contract_registry)
self.metrics["current_worker_is_me_gauge"].set( self.metrics["current_worker_is_me_gauge"].set(
contract_agent.get_worker_from_staker(self.staker_address) == self.worker_address) contract_agent.get_worker_from_staker(self.staker_address) == self.operator_address)
class WorkLockRefundEventMetricsCollector(EventMetricsCollector): class WorkLockRefundEventMetricsCollector(EventMetricsCollector):

View File

@ -36,12 +36,12 @@ from nucypher.utilities.prometheus.collector import (
UrsulaInfoMetricsCollector, UrsulaInfoMetricsCollector,
BlockchainMetricsCollector, BlockchainMetricsCollector,
StakerMetricsCollector, StakerMetricsCollector,
WorkerMetricsCollector, OperatorMetricsCollector,
WorkLockMetricsCollector, WorkLockMetricsCollector,
EventMetricsCollector, EventMetricsCollector,
ReStakeEventMetricsCollector, ReStakeEventMetricsCollector,
WindDownEventMetricsCollector, WindDownEventMetricsCollector,
WorkerBondedEventMetricsCollector, OperatorBondedEventMetricsCollector,
CommitmentMadeEventMetricsCollector, CommitmentMadeEventMetricsCollector,
WorkLockRefundEventMetricsCollector) WorkLockRefundEventMetricsCollector)
@ -191,9 +191,9 @@ def create_metrics_collectors(ursula: 'Ursula', metrics_prefix: str) -> List[Met
staker_address=ursula.checksum_address, staker_address=ursula.checksum_address,
contract_registry=ursula.registry)) contract_registry=ursula.registry))
# Worker prometheus # Operator prometheus
collectors.append(WorkerMetricsCollector(domain=ursula.domain, collectors.append(OperatorMetricsCollector(domain=ursula.domain,
worker_address=ursula.worker_address, operator_address=ursula.operator_address,
contract_registry=ursula.registry)) contract_registry=ursula.registry))
# #
@ -292,14 +292,14 @@ def create_staking_events_metric_collectors(ursula: 'Ursula', metrics_prefix: st
contract_registry=ursula.registry contract_registry=ursula.registry
)) ))
# WorkerBonded # OperatorBonded
collectors.append(WorkerBondedEventMetricsCollector( collectors.append(OperatorBondedEventMetricsCollector(
event_args_config={ event_args_config={
"startPeriod": (Gauge, f'{metrics_prefix}_worker_set_start_period', 'New worker was bonded'), "startPeriod": (Gauge, f'{metrics_prefix}_worker_set_start_period', 'New worker was bonded'),
"block_number": (Gauge, f'{metrics_prefix}_worker_set_block_number', 'WorkerBonded block number') "block_number": (Gauge, f'{metrics_prefix}_worker_set_block_number', 'OperatorBonded block number')
}, },
staker_address=staker_address, staker_address=staker_address,
worker_address=ursula.worker_address, operator_address=ursula.operator_address,
contract_agent_class=StakingEscrowAgent, contract_agent_class=StakingEscrowAgent,
contract_registry=ursula.registry contract_registry=ursula.registry
)) ))