Increase robustness of prometheus metrics task by implementing a PrometheusMetricsTracker.

pull/3405/head
derekpierre 2024-01-18 16:17:28 -05:00 committed by KPrasch
parent 720f133693
commit 3a191041de
4 changed files with 42 additions and 11 deletions

View File

@ -0,0 +1,24 @@
from typing import List
from nucypher.utilities.prometheus.collector import MetricsCollector
from nucypher.utilities.task import SimpleTask
class PrometheusMetricsTracker(SimpleTask):
def __init__(self, collectors: List[MetricsCollector], interval: float):
self.metrics_collectors = collectors
super().__init__(interval=interval)
def run(self):
for collector in self.metrics_collectors:
collector.collect()
def handle_errors(self, *args, **kwargs):
self.log.warn(
"Error during prometheus metrics collection: {}".format(
args[0].getTraceback()
)
)
if not self._task.running:
self.log.warn("Restarting prometheus metrics task!")
self.start(now=False) # take a breather

View File

@ -890,6 +890,8 @@ class Ursula(Teacher, Character, Operator):
certificate_filepath=certificate_filepath,
)
self._prometheus_metrics_tracker = None
def _substantiate_stamp(self):
transacting_power = self.transacting_power
signature = transacting_power.sign_message(message=bytes(self.stamp))
@ -1009,7 +1011,9 @@ class Ursula(Teacher, Character, Operator):
emitter.message("✓ Start Operator Bonded Tracker", color="green")
if prometheus_config:
start_prometheus_exporter(ursula=self, prometheus_config=prometheus_config)
self._prometheus_metrics_tracker = start_prometheus_exporter(
ursula=self, prometheus_config=prometheus_config
)
if emitter:
emitter.message(
f"✓ Prometheus Exporter http://{self.rest_interface.host}:"
@ -1061,6 +1065,8 @@ class Ursula(Teacher, Character, Operator):
self.stop_learning_loop()
self._operator_bonded_tracker.stop()
self.ritual_tracker.stop()
if self._prometheus_metrics_tracker:
self._prometheus_metrics_tracker.stop()
if halt_reactor:
reactor.stop()

View File

@ -6,10 +6,11 @@ from prometheus_client.core import Timestamp
from prometheus_client.registry import REGISTRY, CollectorRegistry
from prometheus_client.twisted import MetricsResource
from prometheus_client.utils import floatToGoString
from twisted.internet import reactor, task
from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import Site
from nucypher.blockchain.eth.trackers.prometheus import PrometheusMetricsTracker
from nucypher.characters import lawful
from nucypher.utilities.prometheus.collector import (
BlockchainMetricsCollector,
@ -121,7 +122,7 @@ def start_prometheus_exporter(
ursula: "lawful.Ursula",
prometheus_config: PrometheusMetricsConfig,
registry: CollectorRegistry = REGISTRY,
) -> None:
) -> PrometheusMetricsTracker:
"""Configure, collect, and serve prometheus metrics."""
# Disabling default collector metrics
@ -134,13 +135,10 @@ def start_prometheus_exporter(
for collector in metrics_collectors:
collector.initialize(registry=registry)
# Scheduling
metrics_task = task.LoopingCall(
collect_prometheus_metrics, metrics_collectors=metrics_collectors
)
metrics_task.start(
interval=prometheus_config.collection_interval, now=prometheus_config.start_now
metrics_tracker = PrometheusMetricsTracker(
collectors=metrics_collectors, interval=prometheus_config.collection_interval
)
metrics_tracker.start(now=prometheus_config.start_now)
# WSGI Service
root = Resource()
@ -151,6 +149,8 @@ def start_prometheus_exporter(
prometheus_config.port, factory, interface=prometheus_config.listen_address
)
return metrics_tracker
def create_metrics_collectors(ursula: "lawful.Ursula") -> List[MetricsCollector]:
"""Create collectors used to obtain metrics."""

View File

@ -12,7 +12,8 @@ class SimpleTask(ABC):
INTERVAL = 60 # 60s default
CLOCK = reactor
def __init__(self):
def __init__(self, interval: float = INTERVAL):
self.interval = interval
self.log = Logger(self.__class__.__name__)
self._task = LoopingCall(self.run)
# self.__task.clock = self.CLOCK
@ -25,7 +26,7 @@ class SimpleTask(ABC):
def start(self, now: bool = False):
"""Start task."""
if not self.running:
d = self._task.start(interval=self.INTERVAL, now=now)
d = self._task.start(interval=self.interval, now=now)
d.addErrback(self.handle_errors)
# return d