# Source file: nucypher/tests/unit/test_event_scanner.py
# File-viewer metadata: 295 lines, 9.3 KiB, Python

import math
import time
from datetime import datetime
from typing import Tuple
from unittest.mock import MagicMock, Mock
import pytest
from nucypher.blockchain.eth.trackers.dkg import ActiveRitualTracker, EventScannerTask
from nucypher.utilities.events import EventScanner, EventScannerState, JSONifiedState
# Short alias for the tracker's chain-reorg scan window; the tests below pass it
# to the scanner as ``chain_reorg_rescan_window``.
CHAIN_REORG_WINDOW = ActiveRitualTracker.CHAIN_REORG_SCAN_WINDOW
def test_estimate_next_chunk_size():
    """Exercise ``estimate_next_chunk_size`` with and without events found.

    With no events the estimate is expected to be the current size multiplied
    by ``chunk_size_increase``, capped at ``max_scan_chunk_size``; with any
    events found it is expected to be ``min_scan_chunk_size``.
    """
    scanner = EventScanner(web3=Mock(), contract=Mock(), state=Mock(), events=[])

    def grow_until_max(size):
        # Keep estimating with zero events until the maximum is reached,
        # checking every intermediate estimate; returns the final size.
        while size < scanner.max_scan_chunk_size:
            estimate = scanner.estimate_next_chunk_size(
                current_chunk_size=size, event_found_count=0
            )
            assert estimate == min(
                scanner.max_scan_chunk_size,
                (size * scanner.chunk_size_increase),
            )
            size = estimate
        return size

    # no prior events found: scale up from an arbitrary starting size
    chunk_size = grow_until_max(20)

    # already at the maximum: the estimate must stay there
    estimate = scanner.estimate_next_chunk_size(
        current_chunk_size=chunk_size, event_found_count=0
    )
    assert estimate == scanner.max_scan_chunk_size
    chunk_size = estimate

    # event(s) found: the estimate is the minimum chunk size
    for found_count in range(1, 10):
        estimate = scanner.estimate_next_chunk_size(
            current_chunk_size=chunk_size, event_found_count=found_count
        )
        assert estimate == scanner.min_scan_chunk_size
        chunk_size = estimate

    # events no longer found: scale back up to the maximum
    grow_until_max(chunk_size)
def test_suggested_scan_start_block():
    """Check the suggested start block for various last-scanned blocks.

    The expected value is the last scanned block minus the chain reorg
    window, never lower than block 1.
    """
    scan_state = Mock(spec=EventScannerState)
    scanner = EventScanner(
        web3=Mock(),
        contract=Mock(),
        state=scan_state,
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
    )

    # mimic start: nothing has been scanned yet
    scan_state.get_last_scanned_block.return_value = 0
    assert scanner.get_suggested_scan_start_block() == 1  # first block

    # we've progressed less than the chain reorg window
    scan_state.get_last_scanned_block.return_value = CHAIN_REORG_WINDOW - 4
    assert scanner.get_suggested_scan_start_block() == 1  # still first block

    # we've progressed further
    for scanned_up_to in (19, 100, 242341, 151552423):
        scan_state.get_last_scanned_block.return_value = scanned_up_to
        expected_start = max(1, scanned_up_to - CHAIN_REORG_WINDOW)
        assert scanner.get_suggested_scan_start_block() == expected_start
def test_suggested_scan_end_block():
    """Check that the suggested end block is one less than ``web3.eth.block_number``."""
    mocked_web3 = MagicMock()
    scanner = EventScanner(
        web3=mocked_web3,
        contract=Mock(),
        state=Mock(),
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
    )

    for latest_block in (1, 10, 231, 12319021):
        mocked_web3.eth.block_number = latest_block
        expected_end = latest_block - 1
        assert scanner.get_suggested_scan_end_block() == expected_end
def test_get_block_timestamp():
    """Check that ``get_block_timestamp`` returns the block time as naive UTC.

    ``web3.eth.get_block`` is mocked to return a dict carrying only a
    ``timestamp`` entry; the scanner's result is compared with the naive UTC
    ``datetime`` for that same epoch value.
    """
    # Imported locally so this test does not rely on extra module-level imports.
    from datetime import timezone

    def naive_utc(epoch_seconds):
        # Same value as the deprecated (Python 3.12+) datetime.utcfromtimestamp():
        # convert in UTC, then drop tzinfo so the result stays naive.
        aware = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
        return aware.replace(tzinfo=None)

    web3 = MagicMock()
    scanner = EventScanner(
        web3=web3,
        contract=Mock(),
        state=Mock(),
        events=[],
    )

    now = time.time()
    web3.eth.get_block.return_value = {"timestamp": now}
    assert scanner.get_block_timestamp(block_num=0) == naive_utc(now)

    other_time = time.time() - 1231231
    web3.eth.get_block.return_value = {"timestamp": other_time}
    assert scanner.get_block_timestamp(block_num=21) == naive_utc(other_time)
def test_scan_invalid_start_end_block():
    """Check that ``scan`` raises ``ValueError`` for an inverted block range."""
    scanner = EventScanner(
        web3=Mock(),
        contract=Mock(),
        state=Mock(),
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
    )

    # invalid range: start_block (11) is greater than end_block (10)
    with pytest.raises(ValueError):
        scanner.scan(start_block=11, end_block=10)
@pytest.mark.parametrize("chunk_size", [1, 3, 5, 7, 10])
def test_scan_when_events_always_found(chunk_size):
    """Scan blocks 0-100 with a scanner stub that reports an event for every chunk.

    The recorded ``scan_chunk`` calls must match the sequence produced by
    ``generate_expected_scan_calls_results``.
    """
    first_block, last_block = 0, 100

    scan_state = JSONifiedState(persistent=False)
    scan_state.reset()  # TODO why is this needed if persistent is False

    scanner = MyEventScanner(
        web3=Mock(),
        contract=Mock(),
        state=scan_state,
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
        min_chunk_scan_size=chunk_size,
        target_end_block=last_block,
    )
    anticipated_calls = generate_expected_scan_calls_results(
        scanner, first_block, last_block
    )

    _, chunks_scanned = scanner.scan(first_block, last_block)

    assert chunks_scanned == len(anticipated_calls)
    assert scanner.scan_chunk_calls_made == anticipated_calls
    assert scanner.get_last_scanned_block() == last_block

    # check value for next scan
    next_start = last_block - CHAIN_REORG_WINDOW
    assert scanner.get_suggested_scan_start_block() == next_start
@pytest.mark.parametrize("chunk_size", [2, 6, 7, 11, 15, 30])
def test_scan_when_events_never_found(chunk_size):
    """Scan blocks 0-999 with a scanner stub that never reports an event.

    Besides matching the expected call sequence, the number of chunk calls
    must not exceed what scanning at the minimum chunk size would need.
    """
    first_block, last_block = 0, 999

    scan_state = JSONifiedState(persistent=False)
    scan_state.reset()  # TODO why is this needed if persistent is False

    scanner = MyEventScanner(
        web3=Mock(),
        contract=Mock(),
        state=scan_state,
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
        min_chunk_scan_size=chunk_size,
        return_event_for_scan_chunk=False,  # min chunk size not used (but scales up)
        target_end_block=last_block,
    )
    anticipated_calls = generate_expected_scan_calls_results(
        scanner, first_block, last_block
    )

    processed_events, chunks_scanned = scanner.scan(first_block, last_block)

    assert chunks_scanned == len(anticipated_calls)
    assert len(processed_events) == 0  # no events processed
    assert scanner.scan_chunk_calls_made == anticipated_calls

    # upper bound on calls: the count needed if every chunk were the minimum size
    max_calls_allowed = math.ceil((last_block - first_block) / chunk_size)
    assert len(scanner.scan_chunk_calls_made) <= max_calls_allowed

    assert scanner.get_last_scanned_block() == last_block

    # check value for next scan
    next_start = last_block - CHAIN_REORG_WINDOW
    assert scanner.get_suggested_scan_start_block() == next_start
def test_scan_when_events_never_found_super_large_chunk_sizes():
    """Scan blocks 0-1320000 with no events, using chunk sizes from 200 up to 10000."""
    first_block, last_block = 0, 1320000
    smallest_chunk, largest_chunk = 200, 10000

    scan_state = JSONifiedState(persistent=False)
    scan_state.reset()  # TODO why is this needed if persistent is False

    scanner = MyEventScanner(
        web3=Mock(),
        contract=Mock(),
        state=scan_state,
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
        min_chunk_scan_size=smallest_chunk,
        max_chunk_scan_size=largest_chunk,
        return_event_for_scan_chunk=False,  # min chunk size not used (but scales up)
        target_end_block=last_block,
    )
    anticipated_calls = generate_expected_scan_calls_results(
        scanner, first_block, last_block
    )

    processed_events, chunks_scanned = scanner.scan(first_block, last_block)

    assert chunks_scanned == len(anticipated_calls)
    assert len(processed_events) == 0  # no events processed
    assert scanner.scan_chunk_calls_made == anticipated_calls
    assert scanner.get_last_scanned_block() == last_block

    # check value for next scan
    next_start = last_block - CHAIN_REORG_WINDOW
    assert scanner.get_suggested_scan_start_block() == next_start
def generate_expected_scan_calls_results(scanner, start_block, end_block):
    """Build the list of ``(start, end)`` chunk ranges a scan is expected to request.

    The first chunk uses ``scanner.min_scan_chunk_size``. When the scanner is
    configured to return no events (``return_chunk_scan_event`` is falsy) the
    chunk size is multiplied by ``scanner.chunk_size_increase`` after every
    chunk, capped at ``scanner.max_scan_chunk_size``; otherwise it stays at
    the minimum. Each chunk ends at ``end_block`` at the latest, and the next
    chunk starts one block after the previous chunk's end.

    At least one range is always produced, even if ``start_block`` is
    already past ``end_block``.
    """
    ranges = []
    chunk_size = scanner.min_scan_chunk_size
    cursor = start_block
    while True:
        chunk_end = min(cursor + chunk_size, end_block)
        ranges.append((cursor, chunk_end))
        cursor = chunk_end + 1  # next block

        if not scanner.return_chunk_scan_event:
            # no events found in this chunk, so the chunk size scales up
            scaled_size = chunk_size * scanner.chunk_size_increase
            chunk_size = min(scanner.max_scan_chunk_size, scaled_size)

        if cursor > end_block:
            return ranges
class MyEventScanner(EventScanner):
    """``EventScanner`` test double whose ``scan_chunk`` records calls instead of scanning.

    Every ``scan_chunk`` call is appended to ``chunk_calls_made`` as a
    ``(start_block, end_block)`` tuple. Depending on
    ``return_event_for_scan_chunk`` each chunk reports either one placeholder
    event or no events.
    """

    def __init__(
        self,
        target_end_block: int,
        return_event_for_scan_chunk: bool = True,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        # whether each scanned chunk reports a (placeholder) event
        self.return_chunk_scan_event = return_event_for_scan_chunk
        # (start_block, end_block) of every scan_chunk call, in call order
        self.chunk_calls_made = []
        # no chunk may end past this block
        self.target_end_block = target_end_block

    @property
    def scan_chunk_calls_made(self):
        """The recorded ``(start_block, end_block)`` tuples, in call order."""
        return self.chunk_calls_made

    def scan_chunk(self, start_block, end_block) -> Tuple[int, datetime, list]:
        """Record the requested range and return a canned chunk result.

        Asserts that the range is ordered and does not pass
        ``target_end_block``. Returns ``end_block``, the current local time
        and the list of events for the chunk.
        """
        assert start_block <= end_block <= self.target_end_block
        self.chunk_calls_made.append((start_block, end_block))

        if self.return_chunk_scan_event:
            events = ["event"]
        else:
            events = []
        return end_block, datetime.now(), events  # results
def test_event_scanner_task():
    """Check that ``EventScannerTask`` keeps the given callable and the default interval."""
    event_scanner = EventScanner(
        web3=Mock(),
        contract=Mock(),
        state=Mock(),
        events=[],
        chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
    )

    scanner_task = EventScannerTask(event_scanner.scan)

    assert scanner_task.interval == EventScannerTask.INTERVAL
    assert scanner_task.scanner == event_scanner.scan