Add the ability for a moving average window to be used with the collector.

pull/3562/head
derekpierre 2024-11-05 14:29:04 -05:00
parent bc8f8f1b84
commit fb4152d8e2
No known key found for this signature in database
1 changed files with 11 additions and 17 deletions

View File

@ -1,3 +1,4 @@
import collections
import time
from collections import defaultdict
from threading import Lock
@ -11,8 +12,7 @@ class NodeLatencyStatsCollector:
Thread-safe utility that tracks latency statistics related to P2P connections with other nodes.
"""
CURRENT_AVERAGE = "current_avg"
COUNT = "count"
MAX_MOVING_AVERAGE_WINDOW = 5
MAX_LATENCY = float(2**16) # just need a large number for sorting
class NodeLatencyContextManager:
@ -38,28 +38,19 @@ class NodeLatencyStatsCollector:
execution_time = end_time - self.start_time
self._stats_collector._update_stats(self.staker_address, execution_time)
def __init__(self):
# staker_address -> { "total_time": <float>, "count": <integer> }
def __init__(self, max_moving_average_window: int = MAX_MOVING_AVERAGE_WINDOW):
self._node_stats = defaultdict(
lambda: {self.CURRENT_AVERAGE: 0.0, self.COUNT: 0}
lambda: collections.deque([], maxlen=max_moving_average_window)
)
self._lock = Lock()
def _update_stats(self, staking_address: ChecksumAddress, latest_time_taken: float):
with self._lock:
old_avg = self._node_stats[staking_address][self.CURRENT_AVERAGE]
old_count = self._node_stats[staking_address][self.COUNT]
updated_count = old_count + 1
updated_avg = ((old_avg * old_count) + latest_time_taken) / updated_count
self._node_stats[staking_address][self.CURRENT_AVERAGE] = updated_avg
self._node_stats[staking_address][self.COUNT] = updated_count
self._node_stats[staking_address].append(latest_time_taken)
def reset_stats(self, staking_address: ChecksumAddress):
with self._lock:
self._node_stats[staking_address][self.CURRENT_AVERAGE] = 0
self._node_stats[staking_address][self.COUNT] = 0
self._node_stats[staking_address].clear()
def get_latency_tracker(
self, staker_address: ChecksumAddress
@ -70,9 +61,12 @@ class NodeLatencyStatsCollector:
def get_average_latency_time(self, staking_address: ChecksumAddress) -> float:
with self._lock:
current_avg = self._node_stats[staking_address][self.CURRENT_AVERAGE]
readings = list(self._node_stats[staking_address])
num_readings = len(readings)
# just need a large number > 0
return self.MAX_LATENCY if current_avg == 0 else current_avg
return (
self.MAX_LATENCY if num_readings == 0 else sum(readings) / num_readings
)
def order_addresses_by_latency(
self, staking_addresses: List[ChecksumAddress]