nucypher/tests/unit/test_event_scanner.py

291 lines
9.0 KiB
Python

import time
from datetime import datetime
from typing import Tuple
from unittest.mock import MagicMock, Mock
import pytest
from nucypher.utilities.events import EventScanner, EventScannerState, JSONifiedState
CHAIN_REORG_WINDOW = 10
def test_estimate_next_chunk_size():
scanner = EventScanner(
web3=Mock(), contract=Mock(), state=Mock(), events=[], filters={}
)
# no prior events found
current_chunk_size = 20
while current_chunk_size < scanner.max_scan_chunk_size:
next_chunk_size = scanner.estimate_next_chunk_size(
current_chunk_size=current_chunk_size, event_found_count=0
)
assert next_chunk_size == min(
scanner.max_scan_chunk_size,
(current_chunk_size * scanner.chunk_size_increase),
)
current_chunk_size = next_chunk_size
next_chunk_size = scanner.estimate_next_chunk_size(
current_chunk_size=current_chunk_size, event_found_count=0
)
assert next_chunk_size == scanner.max_scan_chunk_size
current_chunk_size = next_chunk_size
# event(s) found
for i in range(1, 10):
next_chunk_size = scanner.estimate_next_chunk_size(
current_chunk_size=current_chunk_size, event_found_count=i
)
assert next_chunk_size == scanner.min_scan_chunk_size
current_chunk_size = next_chunk_size
# events no longer found again
while current_chunk_size < scanner.max_scan_chunk_size:
next_chunk_size = scanner.estimate_next_chunk_size(
current_chunk_size=current_chunk_size, event_found_count=0
)
assert next_chunk_size == min(
scanner.max_scan_chunk_size,
(current_chunk_size * scanner.chunk_size_increase),
)
current_chunk_size = next_chunk_size
def test_suggested_scan_start_block():
state = Mock(spec=EventScannerState)
scanner = EventScanner(
web3=Mock(),
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
# mimic start
last_scanned_block = 0
state.get_last_scanned_block.return_value = last_scanned_block
assert scanner.get_suggested_scan_start_block() == 1 # first block
# we've progressed less than change reorg
last_scanned_block = CHAIN_REORG_WINDOW - 4
state.get_last_scanned_block.return_value = last_scanned_block
assert scanner.get_suggested_scan_start_block() == 1 # still first block
# we've progressed further
last_scanned_blocks = [19, 100, 242341, 151552423]
for last_scanned_block in last_scanned_blocks:
state.get_last_scanned_block.return_value = last_scanned_block
assert scanner.get_suggested_scan_start_block() == (
last_scanned_block - CHAIN_REORG_WINDOW
)
def test_suggested_scan_end_block():
web3 = MagicMock()
scanner = EventScanner(
web3=web3,
contract=Mock(),
state=Mock(),
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
block_nums = [1, 10, 231, 12319021]
for block_num in block_nums:
web3.eth.block_number = block_num
assert scanner.get_suggested_scan_end_block() == (block_num - 1)
def test_get_block_timestamp():
web3 = MagicMock()
scanner = EventScanner(
web3=web3,
contract=Mock(),
state=Mock(),
events=[],
filters={},
)
now = time.time()
web3.eth.get_block.return_value = {"timestamp": now}
assert scanner.get_block_timestamp(block_num=0) == datetime.utcfromtimestamp(now)
other_time = time.time() - 1231231
web3.eth.get_block.return_value = {"timestamp": other_time}
assert scanner.get_block_timestamp(block_num=21) == datetime.utcfromtimestamp(
other_time
)
def test_scan_invalid_start_end_block():
scanner = EventScanner(
web3=Mock(),
contract=Mock(),
state=Mock(),
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
with pytest.raises(ValueError):
# invalid: end_block > start_block
scanner.scan(start_block=11, end_block=10)
@pytest.mark.parametrize("chunk_size", [1, 3, 5, 7, 10])
def test_scan_when_events_always_found(chunk_size):
state = JSONifiedState(persistent=False)
state.reset() # TODO why is this needed if persistent is False
start_block = 0
end_block = 100
scanner = MyEventScanner(
web3=Mock(),
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=chunk_size,
target_end_block=end_block,
)
expected_calls = generate_expected_scan_calls_results(
scanner, start_block, end_block
)
all_processed, total_chunks_scanned = scanner.scan(
start_block, end_block, start_chunk_size=chunk_size
)
assert total_chunks_scanned == len(expected_calls)
assert scanner.scan_chunk_calls_made == expected_calls
assert scanner.get_last_scanned_block() == end_block
# check value for next scan
assert scanner.get_suggested_scan_start_block() == (end_block - CHAIN_REORG_WINDOW)
@pytest.mark.parametrize("chunk_size", [2, 6, 7, 11, 15])
def test_scan_when_events_never_found(chunk_size):
state = JSONifiedState(persistent=False)
state.reset() # TODO why is this needed if persistent is False
start_block = 0
end_block = 999
scanner = MyEventScanner(
web3=Mock(),
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=chunk_size,
return_event_for_scan_chunk=False, # min chunk size not used (but scales up)
target_end_block=end_block,
)
expected_calls = generate_expected_scan_calls_results(
scanner, start_block, end_block
)
all_processed, total_chunks_scanned = scanner.scan(
start_block, end_block, start_chunk_size=chunk_size
)
assert total_chunks_scanned == len(expected_calls)
assert len(all_processed) == 0 # no events processed
assert scanner.scan_chunk_calls_made == expected_calls
assert scanner.get_last_scanned_block() == end_block
# check value for next scan
assert scanner.get_suggested_scan_start_block() == (end_block - CHAIN_REORG_WINDOW)
def test_scan_when_events_never_found_super_large_chunk_sizes():
state = JSONifiedState(persistent=False)
state.reset() # TODO why is this needed if persistent is False
start_block = 0
end_block = 1320000
min_chunk_size = 200
max_chunk_size = 10000
scanner = MyEventScanner(
web3=Mock(),
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=min_chunk_size,
max_chunk_scan_size=max_chunk_size,
return_event_for_scan_chunk=False, # min chunk size not used (but scales up)
target_end_block=end_block,
)
expected_calls = generate_expected_scan_calls_results(
scanner, start_block, end_block
)
all_processed, total_chunks_scanned = scanner.scan(
start_block, end_block, start_chunk_size=min_chunk_size
)
assert total_chunks_scanned == len(expected_calls)
assert len(all_processed) == 0 # no events processed
assert scanner.scan_chunk_calls_made == expected_calls
assert scanner.get_last_scanned_block() == end_block
# check value for next scan
assert scanner.get_suggested_scan_start_block() == (end_block - CHAIN_REORG_WINDOW)
def generate_expected_scan_calls_results(scanner, start_block, end_block):
expected_calls = []
current_chunk_size = scanner.min_scan_chunk_size
while True:
chunk_end_block = min(start_block + current_chunk_size, end_block)
expected_calls.append((start_block, chunk_end_block))
start_block = chunk_end_block + 1 # next block
if not scanner.return_chunk_scan_event:
current_chunk_size = min(
scanner.max_scan_chunk_size,
current_chunk_size * scanner.chunk_size_increase,
)
if start_block > end_block:
break
return expected_calls
class MyEventScanner(EventScanner):
def __init__(
self,
target_end_block: int,
return_event_for_scan_chunk: bool = True,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.target_end_block = target_end_block
self.chunk_calls_made = []
self.return_chunk_scan_event = return_event_for_scan_chunk
def scan_chunk(self, start_block, end_block) -> Tuple[int, datetime, list]:
assert start_block <= end_block
assert end_block <= self.target_end_block
self.chunk_calls_made.append((start_block, end_block))
event = ["event"] if self.return_chunk_scan_event else []
return end_block, datetime.now(), event # results
@property
def scan_chunk_calls_made(self):
return self.chunk_calls_made