mirror of https://github.com/nucypher/nucypher.git
Adapt PeriodTracker to WorkTracker; Geth client duration bug fix.
parent
107b6aa1c4
commit
22fc7d16b4
|
@ -52,7 +52,7 @@ from nucypher.blockchain.eth.deployers import (
|
|||
from nucypher.blockchain.eth.interfaces import BlockchainDeployerInterface, BlockchainInterfaceFactory
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
from nucypher.blockchain.eth.registry import AllocationRegistry, BaseContractRegistry
|
||||
from nucypher.blockchain.eth.token import NU, Stake, StakeList, PeriodTracker
|
||||
from nucypher.blockchain.eth.token import NU, Stake, StakeList, WorkTracker
|
||||
from nucypher.blockchain.eth.utils import datetime_to_period, calculate_period_duration, datetime_at_period
|
||||
from nucypher.characters.control.emitters import StdoutEmitter
|
||||
from nucypher.cli.painting import paint_contract_deployment
|
||||
|
@ -601,9 +601,9 @@ class Worker(NucypherTokenActor):
|
|||
|
||||
def __init__(self,
|
||||
is_me: bool,
|
||||
period_tracker: PeriodTracker = None,
|
||||
work_tracker: WorkTracker = None,
|
||||
worker_address: str = None,
|
||||
start_working_loop: bool = True,
|
||||
start_working_now: bool = True,
|
||||
confirm_now: bool = True,
|
||||
check_active_worker: bool = True,
|
||||
*args, **kwargs):
|
||||
|
@ -628,13 +628,8 @@ class Worker(NucypherTokenActor):
|
|||
if check_active_worker and not len(self.stakes):
|
||||
raise self.DetachedWorker(f"{self.__worker_address} is not bonded to {self.checksum_address}.")
|
||||
|
||||
self.period_tracker = period_tracker or PeriodTracker(registry=self.registry)
|
||||
self.period_tracker.add_action(self._confirm_period)
|
||||
self.stakes.start_tracking(self.period_tracker)
|
||||
|
||||
if confirm_now:
|
||||
self.confirm_activity()
|
||||
if start_working_loop:
|
||||
self.period_tracker = work_tracker or WorkTracker(worker=self)
|
||||
if start_working_now:
|
||||
self.period_tracker.start(act_now=False)
|
||||
|
||||
@property
|
||||
|
@ -649,27 +644,6 @@ class Worker(NucypherTokenActor):
|
|||
receipt = self.staking_agent.confirm_activity(worker_address=self.__worker_address)
|
||||
return receipt
|
||||
|
||||
@only_me
|
||||
def _confirm_period(self) -> None:
|
||||
interval = self.staking_agent.get_current_period() - self.last_active_period
|
||||
|
||||
# TODO: Check for stake expiration and exit
|
||||
if interval < 0:
|
||||
return # No need to confirm this period. Save the gas.
|
||||
|
||||
if interval > 0:
|
||||
# TODO: Follow-up actions for downtime
|
||||
self.log.warn(f"MISSED CONFIRMATIONS - {interval} missed staking confirmations detected.")
|
||||
|
||||
#
|
||||
# Confirm
|
||||
#
|
||||
|
||||
self.log.info("Confirmed activity for period {}".format(self.period_tracker.current_period))
|
||||
transacting_power = self.staking_agent.blockchain.transacting_power
|
||||
with transacting_power:
|
||||
self.confirm_activity() # < --- blockchain WRITE
|
||||
|
||||
|
||||
class BlockchainPolicyAuthor(NucypherTokenActor):
|
||||
"""Alice base class for blockchain operations, mocking up new policies!"""
|
||||
|
|
|
@ -165,7 +165,7 @@ class Web3Client:
|
|||
|
||||
def unlock_account(self, address, password, duration=None) -> bool:
|
||||
if not self.is_local:
|
||||
return self.unlock_account(address, password, duration=duration)
|
||||
return self.unlock_account(address, password)
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
|
@ -315,7 +315,7 @@ class GethClient(Web3Client):
|
|||
if password is None:
|
||||
debug_message += " with no password."
|
||||
self.log.debug(debug_message)
|
||||
return self.w3.geth.personal.unlockAccount(address, password, duration=duration)
|
||||
return self.w3.geth.personal.unlockAccount(address, password, duration)
|
||||
|
||||
def sign_transaction(self, transaction: dict) -> bytes:
|
||||
|
||||
|
@ -383,7 +383,7 @@ class EthereumTesterClient(Web3Client):
|
|||
else:
|
||||
return self.w3.provider.ethereum_tester.unlock_account(account=address,
|
||||
password=password,
|
||||
duration=duration)
|
||||
unlock_seconds=duration)
|
||||
|
||||
def sync(self, *args, **kwargs):
|
||||
return True
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
from _pydecimal import Decimal
|
||||
from collections import UserList
|
||||
from typing import Union, Tuple, Callable, List
|
||||
from typing import Union, Tuple, List
|
||||
|
||||
import maya
|
||||
from constant_sorrow.constants import (
|
||||
NEW_STAKE,
|
||||
NO_STAKING_RECEIPT,
|
||||
NOT_STAKING,
|
||||
UNKNOWN_STAKES,
|
||||
NO_STAKES,
|
||||
EMPTY_STAKING_SLOT,
|
||||
UNKNOWN_WORKER_STATUS
|
||||
)
|
||||
|
@ -450,45 +448,35 @@ class Stake:
|
|||
return stake
|
||||
|
||||
|
||||
class PeriodTracker:
|
||||
class WorkTracker:
|
||||
|
||||
CLOCK = reactor
|
||||
REFRESH_RATE = 60 * 60 # one hour.
|
||||
__actions = list() # type: List[Tuple[Callable, tuple]]
|
||||
REFRESH_RATE = 60 * 15 # Fifteen minutes
|
||||
|
||||
def __init__(self, registry: BaseContractRegistry, refresh_rate: int = None, *args, **kwargs):
|
||||
def __init__(self, worker, refresh_rate: int = None, *args, **kwargs):
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
self.log = Logger('stake-tracker')
|
||||
|
||||
self.staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=registry)
|
||||
self.worker = worker
|
||||
self.staking_agent = self.worker.staking_agent
|
||||
|
||||
self._refresh_rate = refresh_rate or self.REFRESH_RATE
|
||||
self._tracking_task = task.LoopingCall(self.__update)
|
||||
self._tracking_task = task.LoopingCall(self._do_work)
|
||||
self._tracking_task.clock = self.CLOCK
|
||||
|
||||
self.__current_period = None
|
||||
self.__start_time = NOT_STAKING
|
||||
self.__uptime_period = NOT_STAKING
|
||||
self._abort_on_stake_tracking_error = True
|
||||
self._abort_on_error = True
|
||||
|
||||
@property
|
||||
def current_period(self):
|
||||
return self.__current_period
|
||||
|
||||
def perform_actions(self) -> None:
|
||||
for action, args in self.__actions:
|
||||
self.log.info(f"Performing action: '{str(action.__name__)}'")
|
||||
action(*args)
|
||||
|
||||
def add_action(self, func: Callable, args=()) -> None:
|
||||
self.__actions.append((func, args))
|
||||
|
||||
def clear_actions(self) -> None:
|
||||
self.__actions.clear()
|
||||
|
||||
def stop(self) -> None:
|
||||
self._tracking_task.stop()
|
||||
self.log.info(f"STOPPED STAKE TRACKING")
|
||||
self.log.info(f"STOPPED WORK TRACKING")
|
||||
|
||||
def start(self, act_now: bool = False, force: bool = False) -> None:
|
||||
"""
|
||||
|
@ -505,11 +493,11 @@ class PeriodTracker:
|
|||
self.__current_period = self.__uptime_period
|
||||
|
||||
d = self._tracking_task.start(interval=self._refresh_rate)
|
||||
d.addErrback(self.handle_tracking_errors)
|
||||
self.log.info(f"STARTED PERIOD TRACKING with {len(self.__actions)} registered actions.")
|
||||
d.addErrback(self.handle_working_errors)
|
||||
self.log.info(f"STARTED WORK TRACKING")
|
||||
|
||||
if act_now:
|
||||
self.perform_actions()
|
||||
self._do_work()
|
||||
|
||||
def _crash_gracefully(self, failure=None) -> None:
|
||||
"""
|
||||
|
@ -519,20 +507,37 @@ class PeriodTracker:
|
|||
self._crashed = failure
|
||||
failure.raiseException()
|
||||
|
||||
def handle_tracking_errors(self, *args, **kwargs) -> None:
|
||||
def handle_working_errors(self, *args, **kwargs) -> None:
|
||||
failure = args[0]
|
||||
if self._abort_on_stake_tracking_error:
|
||||
self.log.critical(f"Unhandled error during node stake tracking. {failure}")
|
||||
if self._abort_on_error:
|
||||
self.log.critical(f"Unhandled error during node work tracking. {failure}")
|
||||
reactor.callFromThread(self._crash_gracefully, failure=failure)
|
||||
else:
|
||||
self.log.warn(f"Unhandled error during stake tracking: {failure.getTraceback()}")
|
||||
self.log.warn(f"Unhandled error during work tracking: {failure.getTraceback()}")
|
||||
|
||||
def __update(self) -> None:
|
||||
def _do_work(self) -> None:
|
||||
# TODO: Check for stake expiration and exit
|
||||
# TODO: Follow-up actions for downtime
|
||||
|
||||
# Update on-chain status
|
||||
self.log.info(f"Checking for new period. Current period is {self.__current_period}")
|
||||
onchain_period = self.staking_agent.get_current_period() # < -- Read from contract
|
||||
if self.__current_period != onchain_period:
|
||||
self.perform_actions()
|
||||
if self.current_period != onchain_period:
|
||||
self.__current_period = onchain_period
|
||||
# self.worker.stakes.refresh() # TODO: Track stakes
|
||||
|
||||
# Measure working interval
|
||||
interval = onchain_period - self.worker.last_active_period
|
||||
if interval < 0:
|
||||
return # No need to confirm this period. Save the gas.
|
||||
if interval > 0:
|
||||
self.log.warn(f"MISSED CONFIRMATIONS - {interval} missed staking confirmations detected.")
|
||||
|
||||
# Confirm Activity
|
||||
self.log.info("Confirmed activity for period {}".format(self.current_period))
|
||||
transacting_power = self.worker.transacting_power
|
||||
with transacting_power:
|
||||
self.worker.confirm_activity() # < --- blockchain WRITE
|
||||
|
||||
|
||||
class StakeList(UserList):
|
||||
|
@ -556,10 +561,6 @@ class StakeList(UserList):
|
|||
self.checksum_address = checksum_address
|
||||
self.__updated = None
|
||||
|
||||
def start_tracking(self, period_tracker: PeriodTracker) -> None:
|
||||
period_tracker.add_action(self.__read_stakes)
|
||||
self.log.info(f"STARTED STAKE TRACKING for {self.checksum_address}")
|
||||
|
||||
@property
|
||||
def updated(self) -> maya.MayaDT:
|
||||
return self.__updated
|
||||
|
|
|
@ -48,7 +48,7 @@ from nucypher.blockchain.eth.agents import StakingEscrowAgent, NucypherTokenAgen
|
|||
from nucypher.blockchain.eth.decorators import validate_checksum_address
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
|
||||
from nucypher.blockchain.eth.registry import BaseContractRegistry
|
||||
from nucypher.blockchain.eth.token import StakeList, PeriodTracker, NU
|
||||
from nucypher.blockchain.eth.token import StakeList, WorkTracker, NU
|
||||
from nucypher.characters.banners import ALICE_BANNER, BOB_BANNER, ENRICO_BANNER, URSULA_BANNER, STAKEHOLDER_BANNER
|
||||
from nucypher.characters.base import Character, Learner
|
||||
from nucypher.characters.control.controllers import (
|
||||
|
@ -795,7 +795,7 @@ class Ursula(Teacher, Character, Worker):
|
|||
decentralized_identity_evidence: bytes = constants.NOT_SIGNED,
|
||||
checksum_address: str = None, # Staker address
|
||||
worker_address: str = None,
|
||||
period_tracker: PeriodTracker = None,
|
||||
work_tracker: WorkTracker = None,
|
||||
client_password: str = None,
|
||||
|
||||
# Character
|
||||
|
@ -843,8 +843,8 @@ class Ursula(Teacher, Character, Worker):
|
|||
#
|
||||
if not federated_only:
|
||||
# Prepare a TransactingPower from worker node's transacting keys
|
||||
transacting_power = TransactingPower(account=worker_address, password=client_password, cache=True)
|
||||
self._crypto_power.consume_power_up(transacting_power)
|
||||
self.transacting_power = TransactingPower(account=worker_address, password=client_password, cache=True)
|
||||
self._crypto_power.consume_power_up(self.transacting_power)
|
||||
|
||||
# Use this power to substantiate the stamp
|
||||
self.substantiate_stamp()
|
||||
|
@ -856,7 +856,7 @@ class Ursula(Teacher, Character, Worker):
|
|||
registry=self.registry,
|
||||
checksum_address=checksum_address,
|
||||
worker_address=worker_address,
|
||||
period_tracker=period_tracker)
|
||||
work_tracker=work_tracker)
|
||||
|
||||
#
|
||||
# ProxyRESTServer and TLSHostingPower #
|
||||
|
|
|
@ -142,20 +142,20 @@ class TransactingPower(CryptoPowerUp):
|
|||
|
||||
def activate(self, password: str = None):
|
||||
"""Be Consumed"""
|
||||
self.unlock_account(password=password or self.__password)
|
||||
if not self.__cache:
|
||||
self.unlock_account(password=password)
|
||||
if self.__cache is False:
|
||||
self.__password = None
|
||||
self.blockchain.transacting_power = self
|
||||
|
||||
def lock_account(self):
|
||||
if self.device:
|
||||
# TODO: Force Disconnect Devices?
|
||||
pass
|
||||
pass # TODO: Force Disconnect Devices?
|
||||
else:
|
||||
_result = self.blockchain.client.lock_account(address=self.account)
|
||||
self.__unlocked = False
|
||||
|
||||
def unlock_account(self, password: str = None, duration: int = None):
|
||||
password = password or self.__password
|
||||
if self.device:
|
||||
unlocked = True
|
||||
else:
|
||||
|
|
|
@ -22,7 +22,7 @@ from nucypher.blockchain.eth.actors import Staker
|
|||
from nucypher.blockchain.eth.agents import StakingEscrowAgent
|
||||
from nucypher.blockchain.eth.interfaces import BlockchainInterface
|
||||
from nucypher.blockchain.eth.registry import BaseContractRegistry
|
||||
from nucypher.blockchain.eth.token import PeriodTracker
|
||||
from nucypher.blockchain.eth.token import WorkTracker
|
||||
from nucypher.characters.lawful import Ursula
|
||||
from nucypher.config.characters import UrsulaConfiguration
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
|
@ -80,15 +80,11 @@ def make_decentralized_ursulas(ursula_config: UrsulaConfiguration,
|
|||
stakers_and_workers = zip(stakers_addresses, workers_addresses)
|
||||
ursulas = list()
|
||||
|
||||
registry = ursula_config.registry
|
||||
period_tracker = PeriodTracker(registry=registry)
|
||||
|
||||
for port, (staker_address, worker_address) in enumerate(stakers_and_workers, start=starting_port):
|
||||
ursula = ursula_config.produce(checksum_address=staker_address,
|
||||
worker_address=worker_address,
|
||||
db_filepath=MOCK_URSULA_DB_FILEPATH,
|
||||
rest_port=port + 100,
|
||||
period_tracker=period_tracker,
|
||||
**ursula_overrides)
|
||||
if confirm_activity:
|
||||
ursula.confirm_activity()
|
||||
|
|
|
@ -3,7 +3,7 @@ import pytest_twisted
|
|||
from twisted.internet import threads
|
||||
from twisted.internet.task import Clock
|
||||
|
||||
from nucypher.blockchain.eth.token import NU, PeriodTracker
|
||||
from nucypher.blockchain.eth.token import NU, WorkTracker
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
from nucypher.utilities.sandbox.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from nucypher.utilities.sandbox.ursula import make_decentralized_ursulas, start_pytest_ursula_services
|
||||
|
@ -31,7 +31,7 @@ def test_worker_auto_confirmations(testerchain,
|
|||
|
||||
# Control time
|
||||
clock = Clock()
|
||||
PeriodTracker.CLOCK = clock
|
||||
WorkTracker.CLOCK = clock
|
||||
|
||||
# Bond the Worker and Staker
|
||||
staker.set_worker(worker_address=worker_address)
|
||||
|
@ -49,10 +49,8 @@ def test_worker_auto_confirmations(testerchain,
|
|||
ursula.period_tracker.start()
|
||||
|
||||
def time_travel(_):
|
||||
# Advance one period, and two hours, somehow separately
|
||||
testerchain.time_travel(periods=2)
|
||||
two_hours = (60*60) * 2
|
||||
clock.advance(two_hours)
|
||||
testerchain.time_travel(periods=1)
|
||||
clock.advance(WorkTracker.REFRESH_RATE+1)
|
||||
|
||||
def verify(_):
|
||||
# Verify that periods were confirmed on-chain automatically
|
||||
|
|
|
@ -109,7 +109,7 @@ def test_collect_inflation_rewards(software_stakeholder, manual_worker, testerch
|
|||
worker = Worker(is_me=True,
|
||||
worker_address=manual_worker,
|
||||
checksum_address=stake.staker_address,
|
||||
start_working_loop=False,
|
||||
start_working_now=False,
|
||||
registry=test_registry)
|
||||
|
||||
# Mock TransactingPower consumption (Worker-Ursula)
|
||||
|
|
|
@ -35,7 +35,8 @@ NucypherClickConfig.log_to_file = True
|
|||
WebEmitter._crash_on_error_default = True
|
||||
|
||||
# Dont re-lock account in background during activity confirmations
|
||||
TransactingPower.lock_account = lambda: True
|
||||
LOCK_FUNCTION = TransactingPower.lock_account
|
||||
TransactingPower.lock_account = lambda *a, **k: True
|
||||
|
||||
|
||||
##########################################
|
||||
|
|
|
@ -7,6 +7,9 @@ from nucypher.crypto.api import verify_eip_191
|
|||
from nucypher.crypto.powers import (PowerUpError)
|
||||
from nucypher.crypto.powers import TransactingPower
|
||||
from nucypher.utilities.sandbox.constants import INSECURE_DEVELOPMENT_PASSWORD
|
||||
from tests.conftest import LOCK_FUNCTION
|
||||
|
||||
TransactingPower.lock_account = LOCK_FUNCTION
|
||||
|
||||
|
||||
def test_transacting_power_sign_message(testerchain):
|
||||
|
|
|
@ -57,6 +57,7 @@ def test_blockchain_ursula_stamp_verification_tolerance(blockchain_ursulas):
|
|||
|
||||
@pytest.mark.skip("See Issue #1075") # TODO: Issue #1075
|
||||
def test_invalid_workers_tolerance(testerchain,
|
||||
test_registry,
|
||||
blockchain_ursulas,
|
||||
agency,
|
||||
idle_staker,
|
||||
|
@ -106,8 +107,8 @@ def test_invalid_workers_tolerance(testerchain,
|
|||
# The worker is valid and can be verified (even with the force option)
|
||||
worker.verify_node(force=True, network_middleware=MockRestMiddleware(), certificate_filepath="quietorl")
|
||||
# In particular, we know that it's bonded to a staker who is really staking.
|
||||
assert worker._worker_is_bonded_to_staker()
|
||||
assert worker._staker_is_really_staking()
|
||||
assert worker._worker_is_bonded_to_staker(registry=test_registry)
|
||||
assert worker._staker_is_really_staking(registry=test_registry)
|
||||
|
||||
# OK. Now we learn about this worker.
|
||||
lonely_blockchain_learner.remember_node(worker)
|
||||
|
|
Loading…
Reference in New Issue