Allow EventScanner to obtain different events as part of the same call to reduce overall rpc calls. Multiple events can be obtained by one call, instead of making one getLogs call per event.

EventScanner will no longer take specified filters since more than one event type can be obtained at a time. We don't utilize event specific filters currently anyway - and we probably won't need to.
remotes/origin/v7.4.x
derekpierre 2024-02-12 16:49:00 -05:00 committed by Derek Pierre
parent 0a01fb900b
commit 9f8dfffd11
3 changed files with 43 additions and 53 deletions

View File

@ -132,7 +132,6 @@ class ActiveRitualTracker:
state=self.state,
contract=self.contract,
events=self.events,
filters={"address": self.contract.address},
# How many maximum blocks at the time we request from JSON-RPC,
# and we are unlikely to exceed the response size limit of the JSON-RPC server
max_chunk_scan_size=self.MAX_CHUNK_SIZE,

View File

@ -19,9 +19,10 @@ from typing import Callable, Dict, Iterable, List, Optional, Tuple
import maya
from eth_abi.codec import ABICodec
from eth_utils import encode_hex
from eth_utils.abi import event_abi_to_log_topic
from web3 import Web3
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params
from web3.contract.contract import Contract
from web3.datastructures import AttributeDict
from web3.exceptions import BlockNotFound
@ -138,7 +139,6 @@ class EventScanner:
contract: Contract,
state: EventScannerState,
events: List,
filters: {},
min_chunk_scan_size: int = 10, # 12 s/block = 120 seconds period
max_chunk_scan_size: int = 10000,
max_request_retries: int = 30,
@ -160,7 +160,6 @@ class EventScanner:
self.web3 = web3
self.state = state
self.events = events
self.filters = filters
# Our JSON-RPC throttling parameters
self.min_scan_chunk_size = min_chunk_scan_size
@ -235,19 +234,17 @@ class EventScanner:
return block_timestamps[block_num]
all_processed = []
events = _fetch_events_for_contract(
self.web3,
self.contract,
self.events,
from_block=start_block,
to_block=end_block,
)
for event_type in self.events:
events = _fetch_events_for_all_contracts(
self.web3,
event_type,
self.filters,
from_block=start_block,
to_block=end_block,
)
for evt in events:
processed = self.process_event(event=evt, get_block_when=get_block_when)
all_processed.append(processed)
for evt in events:
processed = self.process_event(event=evt, get_block_when=get_block_when)
all_processed.append(processed)
end_block_timestamp = get_block_when(end_block)
return end_block, end_block_timestamp, all_processed
@ -363,12 +360,9 @@ class EventScanner:
return all_processed, total_chunks_scanned
def _fetch_events_for_all_contracts(
web3,
event,
argument_filters: dict,
from_block: int,
to_block: int) -> Iterable:
def _fetch_events_for_contract(
web3, contract, events, from_block: int, to_block: int
) -> Iterable:
"""Get events using eth_get_logs API.
This method is detached from any contract instance.
@ -380,10 +374,6 @@ def _fetch_events_for_all_contracts(
if from_block is None:
raise TypeError("Missing mandatory keyword argument to get_logs: fromBlock")
# Currently no way to poke this using a public Web3.py API.
# This will return raw underlying ABI JSON object for the event
abi = event._get_event_abi()
# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
@ -391,20 +381,21 @@ def _fetch_events_for_all_contracts(
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec
# Here we need to poke a bit into Web3 internals, as this
# functionality is not exposed by default.
# Construct JSON-RPC raw filter presentation based on human readable Python descriptions
# Namely, convert event names to their keccak signatures
# More information here:
# https://github.com/ethereum/web3.py/blob/e176ce0793dafdd0573acc8d4b76425b6eb604ca/web3/_utils/filters.py#L71
data_filter_set, event_filter_params = construct_event_filter_params(
abi,
codec,
address=argument_filters.get("address"),
argument_filters=argument_filters,
fromBlock=from_block,
toBlock=to_block
)
topics = set()
event_topics_to_abis = {}
for event_type in events:
event_abi = event_type._get_event_abi()
event_topic = encode_hex(event_abi_to_log_topic(event_abi)) # type: ignore
topics.add(event_topic)
event_topics_to_abis[event_topic] = event_abi
event_filter_params = {
"address": contract.address,
"topics": [list(topics)],
"fromBlock": from_block,
}
if to_block is not None:
event_filter_params["toBlock"] = to_block
logger.debug(
f"Querying eth_get_logs with the following parameters: {event_filter_params}"
@ -417,10 +408,20 @@ def _fetch_events_for_all_contracts(
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
# Convert raw JSON-RPC log result to human readable event by using ABI data
topics = log["topics"]
event_abi = event_topics_to_abis.get(
encode_hex(topics[0])
) # first topic is always event signature
if not event_abi:
# don't expect to get here since the topics were limited to the events specified
raise ValueError(
f"Unable to obtain event abi for received event with signature {topics[0]}"
)
# Convert raw JSON-RPC log result to human-readable event by using ABI data
# More information how processLog works here
# https://github.com/ethereum/web3.py/blob/fbaf1ad11b0c7fac09ba34baff2c256cffe0a148/web3/_utils/events.py#L200
evt = get_event_data(codec, abi, log)
evt = get_event_data(codec, event_abi, log)
# Note: This was originally yield,
# but deferring the timeout exception caused the throttle logic not to work
all_events.append(evt)

View File

@ -13,9 +13,7 @@ CHAIN_REORG_WINDOW = ActiveRitualTracker.CHAIN_REORG_SCAN_WINDOW
def test_estimate_next_chunk_size():
scanner = EventScanner(
web3=Mock(), contract=Mock(), state=Mock(), events=[], filters={}
)
scanner = EventScanner(web3=Mock(), contract=Mock(), state=Mock(), events=[])
# no prior events found
current_chunk_size = 20
@ -63,7 +61,6 @@ def test_suggested_scan_start_block():
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
@ -94,7 +91,6 @@ def test_suggested_scan_end_block():
contract=Mock(),
state=Mock(),
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
@ -112,7 +108,6 @@ def test_get_block_timestamp():
contract=Mock(),
state=Mock(),
events=[],
filters={},
)
now = time.time()
@ -132,7 +127,6 @@ def test_scan_invalid_start_end_block():
contract=Mock(),
state=Mock(),
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
@ -153,7 +147,6 @@ def test_scan_when_events_always_found(chunk_size):
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=chunk_size,
target_end_block=end_block,
@ -184,7 +177,6 @@ def test_scan_when_events_never_found(chunk_size):
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=chunk_size,
return_event_for_scan_chunk=False, # min chunk size not used (but scales up)
@ -223,7 +215,6 @@ def test_scan_when_events_never_found_super_large_chunk_sizes():
contract=Mock(),
state=state,
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
min_chunk_scan_size=min_chunk_size,
max_chunk_scan_size=max_chunk_size,
@ -295,7 +286,6 @@ def test_event_scanner_task():
contract=Mock(),
state=Mock(),
events=[],
filters={},
chain_reorg_rescan_window=CHAIN_REORG_WINDOW,
)
task = EventScannerTask(scanner.scan)