Removes discrete staker modules

pull/2867/head
Kieran Prasch 2022-02-09 12:00:39 -08:00 committed by Kieran R. Prasch
parent 2edc71fe96
commit d2852c9e71
13 changed files with 0 additions and 6254 deletions

View File

@ -1,49 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import click
from nucypher.blockchain.eth.actors import StakeHolder
from nucypher.blockchain.eth.token import Stake
from nucypher.cli.literature import PERIOD_ADVANCED_WARNING, SUCCESSFUL_STAKE_REMOVAL, CONFIRM_REMOVE_SUBSTAKE
from nucypher.cli.painting.staking import paint_stakes
from nucypher.cli.painting.transactions import paint_receipt_summary
def remove_inactive_substake(emitter,
stakeholder: StakeHolder,
action_period: int,
stake: Stake,
chain_name: str,
force: bool
) -> None:
# Non-interactive: Consistency check to prevent the above agreement from going stale.
last_second_current_period = stakeholder.staker.staking_agent.get_current_period()
if action_period != last_second_current_period:
emitter.echo(PERIOD_ADVANCED_WARNING, color='red')
raise click.Abort
if not force:
click.confirm(CONFIRM_REMOVE_SUBSTAKE.format(stake_index=stake.index), abort=True)
# Execute
receipt = stakeholder.staker.remove_inactive_stake(stake=stake)
# Report
emitter.echo(SUCCESSFUL_STAKE_REMOVAL, color='green', verbosity=1)
paint_receipt_summary(emitter=emitter, receipt=receipt, chain_name=chain_name)
paint_stakes(emitter=emitter, staker=stakeholder.staker)

File diff suppressed because it is too large Load Diff

View File

@ -1,302 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import maya
import tabulate
from typing import List
from web3.main import Web3
from nucypher.blockchain.eth.agents import ContractAgency, NucypherTokenAgent
from nucypher.blockchain.eth.constants import STAKING_ESCROW_CONTRACT_NAME, NULL_ADDRESS
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
from nucypher.blockchain.eth.token import NU, Stake
from nucypher.blockchain.eth.utils import datetime_at_period, estimate_block_number_for_period, prettify_eth_amount
from nucypher.control.emitters import StdoutEmitter
from nucypher.cli.literature import (
POST_STAKING_ADVICE,
TOKEN_REWARD_CURRENT,
TOKEN_REWARD_NOT_FOUND,
TOKEN_REWARD_PAST,
TOKEN_REWARD_PAST_HEADER
)
from nucypher.cli.painting.transactions import paint_receipt_summary
STAKE_TABLE_COLUMNS = ('Slot', 'Value', 'Remaining', 'Enactment', 'Termination', 'Boost', 'Status')
STAKER_TABLE_COLUMNS = ('Status', 'Restaking', 'Winding Down', 'Snapshots', 'Unclaimed Fees', 'Min fee rate')
REWARDS_TABLE_COLUMNS = ('Date', 'Block Number', 'Period', 'Value (NU)')
TOKEN_DECIMAL_PLACE = 5
def paint_all_stakes(emitter: StdoutEmitter,
stakeholder: 'StakeHolder',
paint_unlocked: bool = False) -> None:
stakers = stakeholder.get_stakers()
if not stakers:
emitter.echo("No staking accounts found.")
total_stakers = 0
for staker in stakers:
if not staker.stakes:
# This staker has no active stakes.
# TODO: Something with non-staking accounts?
continue
paint_stakes(emitter=emitter, staker=staker, paint_unlocked=paint_unlocked, stakeholder=stakeholder)
total_stakers += 1
if not total_stakers:
emitter.echo("No Stakes found", color='red')
def paint_stakes(emitter: StdoutEmitter,
staker: 'Staker',
stakes: List[Stake] = None,
paint_unlocked: bool = False,
stakeholder=None) -> None:
stakes = stakes or staker.sorted_stakes()
fees = staker.policy_agent.get_fee_amount(staker.checksum_address)
pretty_fees = prettify_eth_amount(fees)
last_committed = staker.staking_agent.get_last_committed_period(staker.checksum_address)
missing = staker.missing_commitments
min_fee_rate = prettify_eth_amount(staker.min_fee_rate)
if missing == -1:
missing_info = "Never Made a Commitment (New Stake)"
else:
missing_info = f'Missing {missing} commitments{"s" if missing > 1 else ""}' if missing else f'Committed #{last_committed}'
staker_data = [missing_info,
"Yes" if staker.is_restaking else "No",
"Yes" if bool(staker.is_winding_down) else "No",
"Yes" if bool(staker.is_taking_snapshots) else "No",
pretty_fees,
min_fee_rate]
line_width = 54
if staker.registry.source: # TODO: #1580 - Registry source might be Falsy in tests.
network_snippet = f"\nNetwork {staker.registry.source.network.capitalize()} "
snippet_with_line = network_snippet + ''*(line_width-len(network_snippet)+1)
emitter.echo(snippet_with_line, bold=True)
emitter.echo(f"Staker {staker.checksum_address} ════", bold=True, color='red' if missing else 'green')
worker = staker.worker_address if staker.worker_address != NULL_ADDRESS else 'not bonded'
emitter.echo(f"Worker {worker} ════", color='red' if staker.worker_address == NULL_ADDRESS else None)
if stakeholder and stakeholder.worker_data:
worker_data = stakeholder.worker_data.get(staker.checksum_address)
if worker_data:
emitter.echo(f"\t public address: {worker_data['publicaddress']}")
if worker_data.get('nucypher version'):
emitter.echo(f"\t NuCypher Version: {worker_data['nucypher version']}")
if worker_data.get('blockchain_provider'):
emitter.echo(f"\t Blockchain Provider: {worker_data['blockchain_provider']}")
emitter.echo(tabulate.tabulate(zip(STAKER_TABLE_COLUMNS, staker_data), floatfmt="fancy_grid"))
rows, inactive_substakes = list(), list()
for index, stake in enumerate(stakes):
is_inactive = False
if stake.status().is_child(Stake.Status.INACTIVE):
inactive_substakes.append(index)
is_inactive = True
if stake.status().is_child(Stake.Status.UNLOCKED) and not paint_unlocked:
# This stake is unlocked.
continue
stake_description = stake.describe()
if is_inactive:
# stake is inactive - update display values since they don't make much sense to display
stake_description['remaining'] = 'N/A'
stake_description['last_period'] = 'N/A'
stake_description['boost'] = 'N/A'
rows.append(list(stake_description.values()))
if not rows:
emitter.echo(f"There are no locked stakes\n")
emitter.echo(tabulate.tabulate(rows, headers=STAKE_TABLE_COLUMNS, tablefmt="fancy_grid")) # newline
if not paint_unlocked and inactive_substakes:
emitter.echo(f"Note that some sub-stakes are inactive: {inactive_substakes}\n"
f"Run `nucypher stake list --all` to show all sub-stakes.\n"
f"Run `nucypher stake remove-inactive --all` to remove inactive sub-stakes; removal of inactive "
f"sub-stakes will reduce commitment gas costs.", color='yellow')
# TODO - it would be nice to provide remove-inactive hint when painting_unlocked - however, this same function
# is used by remove-inactive command is run, and it is redundant to be shown then
def prettify_stake(stake, index: int = None) -> str:
start_datetime = stake.start_datetime.local_datetime().strftime("%b %d %H:%M %Z")
expiration_datetime = stake.unlock_datetime.local_datetime().strftime("%b %d %H:%M %Z")
duration = stake.duration
pretty_periods = f'{duration} periods {"." if len(str(duration)) == 2 else ""}'
pretty = f'| {index if index is not None else "-"} ' \
f'| {stake.staker_address[:6]} ' \
f'| {stake.index} ' \
f'| {str(stake.value)} ' \
f'| {pretty_periods} ' \
f'| {start_datetime} - {expiration_datetime} ' \
return pretty
def paint_staged_stake_division(emitter,
blockchain,
stakeholder,
original_stake,
target_value,
extension):
new_end_period = original_stake.final_locked_period + extension
new_duration = new_end_period - original_stake.first_locked_period + 1
staking_address = original_stake.staker_address
division_message = f"""
Staking address: {staking_address}
~ Original Stake: {prettify_stake(stake=original_stake, index=None)}
"""
paint_staged_stake(emitter=emitter,
blockchain=blockchain,
stakeholder=stakeholder,
staking_address=staking_address,
stake_value=target_value,
lock_periods=new_duration,
start_period=original_stake.first_locked_period,
unlock_period=new_end_period + 1,
division_message=division_message)
def paint_staged_stake(emitter,
blockchain,
stakeholder,
staking_address,
stake_value,
lock_periods,
start_period,
unlock_period,
division_message: str = None):
economics = stakeholder.staker.economics
start_datetime = datetime_at_period(period=start_period,
seconds_per_period=economics.seconds_per_period,
start_of_period=True)
unlock_datetime = datetime_at_period(period=unlock_period,
seconds_per_period=economics.seconds_per_period,
start_of_period=True)
locked_days = (lock_periods * economics.hours_per_period) // 24
start_datetime_pretty = start_datetime.local_datetime().strftime("%b %d %Y %H:%M %Z")
unlock_datetime_pretty = unlock_datetime.local_datetime().strftime("%b %d %Y %H:%M %Z")
if division_message:
emitter.echo(f"\n{'' * 30} ORIGINAL STAKE {'' * 28}", bold=True)
emitter.echo(division_message)
emitter.echo(f"\n{'' * 30} STAGED STAKE {'' * 30}", bold=True)
emitter.echo(f"""
Staking address: {staking_address}
~ Chain -> ID # {blockchain.client.chain_id} | {blockchain.client.chain_name}
~ Value -> {stake_value} ({int(stake_value)} NuNits)
~ Duration -> {locked_days} Days ({lock_periods} Periods)
~ Enactment -> {start_datetime_pretty} (period #{start_period})
~ Expiration -> {unlock_datetime_pretty} (period #{unlock_period})
""")
# TODO: periods != Days - Do we inform the user here?
emitter.echo(''*73, bold=True)
def paint_staking_confirmation(emitter, staker, receipt):
emitter.echo("\nStake initialization transaction was successful.", color='green')
emitter.echo(f'\nTransaction details:')
paint_receipt_summary(emitter=emitter, receipt=receipt, transaction_type="deposit stake")
emitter.echo(f'\n{STAKING_ESCROW_CONTRACT_NAME} address: {staker.staking_agent.contract_address}', color='blue')
emitter.echo(POST_STAKING_ADVICE, color='green')
def paint_staking_accounts(emitter, signer, registry, domain):
from nucypher.blockchain.eth.actors import Staker
rows = list()
blockchain = BlockchainInterfaceFactory.get_interface()
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=registry)
for account in signer.accounts:
eth = str(Web3.fromWei(blockchain.client.get_balance(account), 'ether')) + " ETH"
nu = str(NU.from_units(token_agent.get_balance(account)))
staker = Staker(checksum_address=account,
domain=domain,
registry=registry)
staker.refresh_stakes()
is_staking = 'Yes' if bool(staker.stakes) else 'No'
rows.append((is_staking, account, eth, nu))
headers = ('Staking', 'Account', 'ETH', 'NU')
emitter.echo(tabulate.tabulate(rows, showindex=True, headers=headers, tablefmt="fancy_grid"))
def paint_staking_rewards(stakeholder, blockchain, emitter, past_periods, staking_address, staking_agent):
if not past_periods:
reward_amount = stakeholder.staker.calculate_staking_reward()
emitter.echo(message=TOKEN_REWARD_CURRENT.format(reward_amount=round(reward_amount, TOKEN_DECIMAL_PLACE)))
return
economics = stakeholder.staker.economics
seconds_per_period = economics.seconds_per_period
current_period = staking_agent.get_current_period()
from_period = current_period - past_periods
latest_block = blockchain.client.block_number
from_block = estimate_block_number_for_period(period=from_period,
seconds_per_period=seconds_per_period,
latest_block=latest_block)
argument_filters = {'staker': staking_address}
event_type = staking_agent.contract.events['Minted']
entries = event_type.getLogs(fromBlock=from_block,
toBlock='latest',
argument_filters=argument_filters)
rows = []
rewards_total = NU(0, 'NU')
for event_record in entries:
event_block_number = int(event_record['blockNumber'])
event_period = event_record['args']['period']
event_reward = NU(event_record['args']['value'], 'NuNit')
timestamp = blockchain.client.get_block(event_block_number).timestamp
event_date = maya.MayaDT(epoch=timestamp).local_datetime().strftime("%b %d %Y")
rows.append([
event_date,
event_block_number,
int(event_period),
round(event_reward, TOKEN_DECIMAL_PLACE),
])
rewards_total += event_reward
if not rows:
emitter.echo(TOKEN_REWARD_NOT_FOUND)
return
periods_as_days = economics.days_per_period * past_periods
emitter.echo(message=TOKEN_REWARD_PAST_HEADER.format(periods=past_periods, days=periods_as_days))
emitter.echo(tabulate.tabulate(rows, headers=REWARDS_TABLE_COLUMNS, tablefmt="fancy_grid"))
emitter.echo(message=TOKEN_REWARD_PAST.format(reward_amount=round(rewards_total, TOKEN_DECIMAL_PLACE)))

View File

@ -1,387 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import maya
import pytest
from eth_tester.exceptions import TransactionFailed
from nucypher.blockchain.eth.agents import NucypherTokenAgent, StakingEscrowAgent, ContractAgency
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.blockchain.eth.token import NU, Stake
from nucypher.blockchain.eth.utils import datetime_at_period
from nucypher.crypto.powers import TransactingPower
from tests.constants import FEE_RATE_RANGE, DEVELOPMENT_TOKEN_AIRDROP_AMOUNT
from tests.utils.blockchain import token_airdrop
from tests.utils.ursula import make_decentralized_ursulas
@pytest.mark.skip()
def test_staker_locking_tokens(testerchain, agency, staker, application_economics, test_registry):
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=test_registry)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert NU(application_economics.min_authorization, 'NuNit') < staker.token_balance, "Insufficient staker balance"
# Make sure staking handles existing token allowance
staker.token_agent.approve_transfer(1000000000, staking_agent.contract_address, staker.transacting_power)
staker.initialize_stake(amount=NU(application_economics.min_authorization, 'NuNit'),
# Lock the minimum amount of tokens
lock_periods=application_economics.min_operator_seconds)
# Verify that the escrow is "approved" to receive tokens
allowance = token_agent.contract.functions.allowance(
staker.checksum_address,
staking_agent.contract_address).call()
assert 0 == allowance
# Staking starts after one period
locked_tokens = staker.locked_tokens()
assert 0 == locked_tokens
locked_tokens = staker.locked_tokens(periods=1)
assert application_economics.min_authorization == locked_tokens
@pytest.mark.skip()
@pytest.mark.usefixtures("agency")
def test_staker_divides_stake(staker, application_economics):
stake_value = NU(application_economics.min_authorization * 5, 'NuNit')
new_stake_value = NU(application_economics.min_authorization * 2, 'NuNit')
stake_index = 0
duration = int(application_economics.min_operator_seconds)
staker.initialize_stake(amount=stake_value, lock_periods=duration)
stake = staker.stakes[stake_index + 1]
# Can't use additional periods and expiration together
with pytest.raises(ValueError):
staker.divide_stake(target_value=new_stake_value, stake=stake, additional_periods=2, expiration=maya.now())
staker.divide_stake(target_value=new_stake_value, stake=stake, additional_periods=2)
current_period = staker.staking_agent.get_current_period()
expected_old_stake = (current_period + 1, current_period + duration, stake_value - new_stake_value)
expected_new_stake = (current_period + 1, current_period + duration + 2, new_stake_value)
assert 3 == len(staker.stakes), 'A new stake was not added to this stakers stakes'
assert expected_old_stake == staker.stakes[stake_index + 1].to_stake_info(), 'Old stake values are invalid'
assert expected_new_stake == staker.stakes[stake_index + 2].to_stake_info(), 'New stake values are invalid'
# Provided stake must be part of current stakes
new_stake_value = NU.from_units(application_economics.min_authorization)
with pytest.raises(ValueError):
staker.divide_stake(target_value=new_stake_value, stake=stake, additional_periods=2)
stake = staker.stakes[stake_index + 1]
stake.index = len(staker.stakes)
with pytest.raises(ValueError):
staker.divide_stake(target_value=new_stake_value, stake=stake, additional_periods=2)
yet_another_stake_value = NU(application_economics.min_authorization, 'NuNit')
stake = staker.stakes[stake_index + 2]
# New expiration date must extend stake duration
origin_stake = stake
new_expiration = datetime_at_period(period=origin_stake.final_locked_period,
seconds_per_period=application_economics.seconds_per_period,
start_of_period=True)
with pytest.raises(ValueError):
staker.divide_stake(target_value=yet_another_stake_value, stake=stake, expiration=new_expiration)
new_expiration = datetime_at_period(period=origin_stake.final_locked_period + 2,
seconds_per_period=application_economics.seconds_per_period,
start_of_period=True)
staker.divide_stake(target_value=yet_another_stake_value, stake=stake, expiration=new_expiration)
expected_new_stake = (current_period + 1, current_period + duration + 2, new_stake_value)
expected_yet_another_stake = Stake(first_locked_period=current_period + 1,
final_locked_period=current_period + duration + 4,
value=yet_another_stake_value,
checksum_address=staker.checksum_address,
index=3,
staking_agent=staker.application_agent,
economics=application_economics)
assert 4 == len(staker.stakes), 'A new stake was not added after two stake divisions'
assert expected_old_stake == staker.stakes[
stake_index + 1].to_stake_info(), 'Old stake values are invalid after two stake divisions'
assert expected_new_stake == staker.stakes[
stake_index + 2].to_stake_info(), 'New stake values are invalid after two stake divisions'
assert expected_yet_another_stake.value == staker.stakes[stake_index + 3].value, 'Third stake values are invalid'
@pytest.mark.skip()
@pytest.mark.usefixtures("agency")
def test_staker_prolongs_stake(staker, application_economics):
stake_index = 0
origin_stake = staker.stakes[stake_index]
# Can't use additional periods and expiration together
new_expiration = datetime_at_period(period=origin_stake.final_locked_period + 3,
seconds_per_period=application_economics.seconds_per_period,
start_of_period=True)
with pytest.raises(ValueError):
staker.prolong_stake(stake=origin_stake, additional_periods=3, expiration=new_expiration)
staker.prolong_stake(stake=origin_stake, additional_periods=3)
stake = staker.stakes[stake_index]
assert stake.first_locked_period == origin_stake.first_locked_period
assert stake.final_locked_period == origin_stake.final_locked_period + 3
assert stake.value == origin_stake.value
# Provided stake must be part of current stakes
with pytest.raises(ValueError):
staker.prolong_stake(stake=origin_stake, additional_periods=2)
stake.index = len(staker.stakes)
with pytest.raises(ValueError):
staker.prolong_stake(stake=stake, additional_periods=2)
stake.index = stake_index
# New expiration date must extend stake duration
origin_stake = stake
new_expiration = datetime_at_period(period=origin_stake.final_locked_period,
seconds_per_period=application_economics.seconds_per_period,
start_of_period=True)
with pytest.raises(ValueError):
staker.prolong_stake(stake=origin_stake, expiration=new_expiration)
new_expiration = origin_stake.unlock_datetime
staker.prolong_stake(stake=origin_stake, expiration=new_expiration)
stake = staker.stakes[stake_index]
assert stake.first_locked_period == origin_stake.first_locked_period
assert stake.final_locked_period == origin_stake.final_locked_period + 1
assert stake.value == origin_stake.value
@pytest.mark.skip()
@pytest.mark.usefixtures("agency")
def test_staker_increases_stake(staker, application_economics):
stake_index = 0
origin_stake = staker.stakes[stake_index]
additional_amount = NU.from_units(application_economics.min_authorization // 100)
with pytest.raises(ValueError):
staker.increase_stake(stake=origin_stake)
# Can't use amount and entire balance flag together
with pytest.raises(ValueError):
staker.increase_stake(stake=origin_stake, amount=additional_amount, entire_balance=True)
staker.increase_stake(stake=origin_stake, amount=additional_amount)
stake = staker.stakes[stake_index]
assert stake.first_locked_period == origin_stake.first_locked_period
assert stake.final_locked_period == origin_stake.final_locked_period
assert stake.value == origin_stake.value + additional_amount
# Provided stake must be part of current stakes
with pytest.raises(ValueError):
staker.increase_stake(stake=origin_stake, amount=additional_amount)
stake.index = len(staker.stakes)
with pytest.raises(ValueError):
staker.increase_stake(stake=stake, amount=additional_amount)
stake.index = stake_index
# Try to increase again using entire balance
origin_stake = stake
balance = staker.token_balance
staker.increase_stake(stake=stake, entire_balance=True)
stake = staker.stakes[stake_index]
assert stake.first_locked_period == origin_stake.first_locked_period
assert stake.final_locked_period == origin_stake.final_locked_period
assert stake.value == origin_stake.value + balance
@pytest.mark.skip()
def test_staker_merges_stakes(agency, staker):
stake_index_1 = 0
stake_index_2 = 3
origin_stake_1 = staker.stakes[stake_index_1]
origin_stake_2 = staker.stakes[stake_index_2]
assert origin_stake_2.final_locked_period == origin_stake_1.final_locked_period
staker.merge_stakes(stake_1=origin_stake_1, stake_2=origin_stake_2)
stake = staker.stakes[stake_index_1]
assert stake.final_locked_period == origin_stake_1.final_locked_period
assert stake.value == origin_stake_1.value + origin_stake_2.value
# Provided stakes must be part of current stakes
with pytest.raises(ValueError):
staker.merge_stakes(stake_1=origin_stake_1, stake_2=stake)
with pytest.raises(ValueError):
staker.merge_stakes(stake_1=stake, stake_2=origin_stake_2)
stake.index = len(staker.stakes)
with pytest.raises(ValueError):
staker.merge_stakes(stake_1=stake, stake_2=staker.stakes[1])
with pytest.raises(ValueError):
staker.merge_stakes(stake_1=staker.stakes[1], stake_2=stake)
@pytest.mark.skip()
def test_remove_inactive_stake(agency, staker):
stake_index = 3
staker.refresh_stakes()
original_stakes = list(staker.stakes)
unused_stake = original_stakes[stake_index]
assert unused_stake.final_locked_period == 1
staker.remove_inactive_stake(stake=unused_stake)
stakes = staker.stakes
assert stakes == original_stakes[:-1]
@pytest.mark.skip()
def test_staker_manages_restaking(testerchain, test_registry, staker):
# Enable Restaking
receipt = staker.enable_restaking()
assert receipt['status'] == 1
receipt = staker.disable_restaking()
assert receipt['status'] == 1
@pytest.mark.skip()
def test_staker_collects_staking_reward(testerchain,
test_registry,
staker,
blockchain_ursulas,
agency,
application_economics,
ursula_decentralized_test_config):
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=test_registry)
tpower = TransactingPower(account=testerchain.etherbase_account,
signer=Web3Signer(testerchain.client))
# Give more tokens to staker
token_airdrop(token_agent=token_agent,
transacting_power=tpower,
addresses=[staker.checksum_address],
amount=DEVELOPMENT_TOKEN_AIRDROP_AMOUNT)
staker.initialize_stake(amount=NU(application_economics.min_authorization, 'NuNit'), # Lock the minimum amount of tokens
lock_periods=int(application_economics.min_operator_seconds)) # ... for the fewest number of periods
# Get an unused address for a new worker
worker_address = testerchain.unassigned_accounts[-1]
staker.bond_worker(worker_address=worker_address)
# Create this worker and bond it with the staker
ursula = make_decentralized_ursulas(ursula_config=ursula_decentralized_test_config,
stakers_addresses=[staker.checksum_address],
workers_addresses=[worker_address],
registry=test_registry,
commit_now=False).pop()
# ...mint few tokens...
for _ in range(2):
ursula.commit_to_next_period()
testerchain.time_travel(periods=1)
# Check mintable periods
assert staker.mintable_periods() == 1
ursula.commit_to_next_period()
# ...wait more...
assert staker.mintable_periods() == 0
testerchain.time_travel(periods=2)
assert staker.mintable_periods() == 2
# Capture the current token balance of the staker
initial_balance = staker.token_balance
assert token_agent.get_balance(staker.checksum_address) == initial_balance
# Profit!
staked = staker.non_withdrawable_stake()
owned = staker.owned_tokens()
staker.collect_staking_reward()
assert staker.owned_tokens() == staked
final_balance = staker.token_balance
assert final_balance == initial_balance + owned - staked
@pytest.mark.skip()
def test_staker_manages_winding_down(testerchain,
test_registry,
staker,
application_economics,
ursula_decentralized_test_config):
# Get worker
ursula = make_decentralized_ursulas(ursula_config=ursula_decentralized_test_config,
stakers_addresses=[staker.checksum_address],
workers_addresses=[staker.operator_address],
registry=test_registry).pop()
# Enable winding down
testerchain.time_travel(periods=1)
base_duration = application_economics.min_operator_seconds + 4
receipt = staker.enable_winding_down()
assert receipt['status'] == 1
assert staker.locked_tokens(base_duration) != 0
assert staker.locked_tokens(base_duration + 1) == 0
ursula.commit_to_next_period()
assert staker.locked_tokens(base_duration) != 0
assert staker.locked_tokens(base_duration + 1) == 0
# Disable winding down
testerchain.time_travel(periods=1)
receipt = staker.disable_winding_down()
assert receipt['status'] == 1
assert staker.locked_tokens(base_duration - 1) != 0
assert staker.locked_tokens(base_duration) == 0
ursula.commit_to_next_period()
assert staker.locked_tokens(base_duration - 1) != 0
assert staker.locked_tokens(base_duration) == 0
@pytest.mark.skip()
def test_staker_manages_snapshots(testerchain,
test_registry,
staker,
application_economics,
ursula_decentralized_test_config):
# Disable taking snapshots
testerchain.time_travel(periods=1)
assert staker.is_taking_snapshots
receipt = staker.disable_snapshots()
assert receipt['status'] == 1
assert not staker.is_taking_snapshots
# Enable taking snapshots
receipt = staker.enable_snapshots()
assert receipt['status'] == 1
assert staker.is_taking_snapshots
@pytest.mark.skip()
def test_set_min_fee_rate(testerchain, test_registry, staker):
# Check before set
_minimum, default, maximum = FEE_RATE_RANGE
assert staker.min_fee_rate == default
# New value must be within range
with pytest.raises((TransactionFailed, ValueError)):
staker.set_min_fee_rate(maximum + 1)
receipt = staker.set_min_fee_rate(maximum - 1)
assert receipt['status'] == 1
assert staker.min_fee_rate == maximum - 1

View File

@ -1,467 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import os
import pytest
from eth_utils.address import is_address, to_checksum_address
from nucypher.crypto.powers import TransactingPower
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, NucypherTokenAgent
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.registry import BaseContractRegistry
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.types import StakerInfo
def test_unknown_contract(testerchain, test_registry):
with pytest.raises(BaseContractRegistry.UnknownContract) as exception:
_staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert exception.value.args[0] == StakingEscrowAgent.contract_name
@pytest.mark.skip()
def test_deposit_tokens(testerchain, agency, application_economics, test_registry):
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=test_registry)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
locked_tokens = application_economics.min_authorization * 5
staker_account = testerchain.unassigned_accounts[0]
balance = token_agent.get_balance(address=staker_account)
assert balance == 0
# The staker receives an initial amount of tokens
tpower = TransactingPower(account=testerchain.etherbase_account,
signer=Web3Signer(testerchain.client))
_txhash = token_agent.transfer(amount=application_economics.min_authorization * 10,
target_address=staker_account,
transacting_power=tpower)
#
# Deposit: The staker deposits tokens in the StakingEscrow contract.
# Previously, she needs to approve this transfer on the token contract.
#
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
_receipt = token_agent.approve_transfer(amount=application_economics.min_authorization * 10, # Approve
spender_address=staking_agent.contract_address,
transacting_power=staker_power)
receipt = staking_agent.deposit_tokens(amount=locked_tokens,
lock_periods=application_economics.min_operator_seconds,
transacting_power=staker_power,
staker_address=staker_account)
# Check the receipt for the contract address success code
assert receipt['status'] == 1, "Transaction Rejected"
assert receipt['logs'][2]['address'] == staking_agent.contract_address
testerchain.time_travel(periods=1)
balance = token_agent.get_balance(address=staker_account)
assert balance == locked_tokens
assert staking_agent.get_locked_tokens(staker_address=staker_account) == locked_tokens
@pytest.mark.skip()
def test_locked_tokens(testerchain, agency, application_economics, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account = testerchain.unassigned_accounts[0]
locked_amount = staking_agent.get_locked_tokens(staker_address=staker_account)
assert application_economics.maximum_allowed_locked >= locked_amount >= application_economics.min_authorization
@pytest.mark.skip()
def test_get_all_stakes(testerchain, agency, application_economics, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account = testerchain.unassigned_accounts[0]
all_stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
assert len(all_stakes) == 1
stake_info = all_stakes[0]
assert len(stake_info) == 3
start_period, end_period, value = stake_info
assert end_period > start_period
assert application_economics.maximum_allowed_locked > value > application_economics.min_authorization
@pytest.mark.skip()
def test_stakers_and_workers_relationships(testerchain, agency, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
# The staker hasn't bond a worker yet
assert NULL_ADDRESS == staking_agent.get_worker_from_staker(staker_address=staker_account)
tpower = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
_txhash = staking_agent.bond_worker(transacting_power=tpower, operator_address=worker_account)
# We can check the staker-worker relation from both sides
assert worker_account == staking_agent.get_worker_from_staker(staker_address=staker_account)
assert staker_account == staking_agent.get_staker_from_worker(operator_address=worker_account)
# No staker-worker relationship
random_address = to_checksum_address(os.urandom(20))
assert NULL_ADDRESS == staking_agent.get_worker_from_staker(staker_address=random_address)
assert NULL_ADDRESS == staking_agent.get_staker_from_worker(operator_address=random_address)
@pytest.mark.skip()
def test_get_staker_population(agency, staking_providers, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
# Apart from all the stakers in the fixture, we also added a new staker above
assert staking_agent.get_staker_population() == len(staking_providers) + 1
@pytest.mark.skip()
def test_get_swarm(agency, blockchain_ursulas, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
swarm = staking_agent.swarm()
swarm_addresses = list(swarm)
assert len(swarm_addresses) == len(blockchain_ursulas) + 1
# Grab a staker address from the swarm
staker_addr = swarm_addresses[0]
assert isinstance(staker_addr, str)
assert is_address(staker_addr)
@pytest.mark.skip()
@pytest.mark.usefixtures("blockchain_ursulas")
def test_sample_stakers(agency, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
stakers_population = staking_agent.get_staker_population()
with pytest.raises(StakingEscrowAgent.NotEnoughStakers):
staking_agent.get_stakers_reservoir(periods=1).draw(stakers_population + 1) # One more than we have deployed
stakers = staking_agent.get_stakers_reservoir(periods=5).draw(3)
assert len(stakers) == 3 # Three...
assert len(set(stakers)) == 3 # ...unique addresses
# Same but with pagination
stakers = staking_agent.get_stakers_reservoir(periods=5, pagination_size=1).draw(3)
assert len(stakers) == 3
assert len(set(stakers)) == 3
light = staking_agent.blockchain.is_light
staking_agent.blockchain.is_light = not light
stakers = staking_agent.get_stakers_reservoir(periods=5).draw(3)
assert len(stakers) == 3
assert len(set(stakers)) == 3
staking_agent.blockchain.is_light = light
@pytest.mark.skip()
def test_get_current_period(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
start_period = staking_agent.get_current_period()
testerchain.time_travel(periods=1)
end_period = staking_agent.get_current_period()
assert end_period > start_period
@pytest.mark.skip()
def test_commit_to_next_period(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
tpower = TransactingPower(account=worker_account, signer=Web3Signer(testerchain.client))
txhash = staking_agent.commit_to_next_period(transacting_power=tpower)
receipt = testerchain.wait_for_receipt(txhash)
assert receipt['status'] == 1, "Transaction Rejected"
assert receipt['logs'][0]['address'] == staking_agent.contract_address
@pytest.mark.skip()
def test_get_staker_info(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
info: StakerInfo = staking_agent.get_staker_info(staker_address=staker_account)
assert info.value > 0
assert info.worker == worker_account
@pytest.mark.skip()
def test_divide_stake(agency, testerchain, application_economics, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
agent = staking_agent
staker_account = testerchain.unassigned_accounts[0]
locked_tokens = application_economics.min_authorization * 2
# Deposit
tpower = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
_txhash = agent.deposit_tokens(amount=locked_tokens,
lock_periods=application_economics.min_operator_seconds,
transacting_power=tpower,
staker_address=staker_account)
stakes = list(agent.get_all_stakes(staker_address=staker_account))
stakes_length = len(stakes)
origin_stake = stakes[-1]
receipt = agent.divide_stake(transacting_power=tpower,
stake_index=1,
target_value=application_economics.min_authorization,
periods=1)
assert receipt['status'] == 1, "Transaction Rejected"
assert receipt['logs'][0]['address'] == agent.contract_address
stakes = list(agent.get_all_stakes(staker_address=staker_account))
assert len(stakes) == stakes_length + 1
assert stakes[-2].locked_value == origin_stake.locked_value - application_economics.min_authorization
assert stakes[-2].last_period == origin_stake.last_period
assert stakes[-1].locked_value == application_economics.min_authorization
assert stakes[-1].last_period == origin_stake.last_period + 1
@pytest.mark.skip()
def test_prolong_stake(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
original_termination = stakes[0].last_period
tpower = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
receipt = staking_agent.prolong_stake(transacting_power=tpower, stake_index=0, periods=1)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
new_termination = stakes[0].last_period
assert new_termination == original_termination + 1
@pytest.mark.skip()
def test_deposit_and_increase(agency, testerchain, test_registry, application_economics):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
original_stake = stakes[0]
locked_tokens = staking_agent.get_locked_tokens(staker_account, 1)
amount = application_economics.min_authorization // 2
tpower = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
receipt = staking_agent.deposit_and_increase(transacting_power=tpower,
stake_index=0,
amount=amount)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
new_stake = stakes[0]
assert new_stake.locked_value == original_stake.locked_value + amount
assert staking_agent.get_locked_tokens(staker_account, 1) == locked_tokens + amount
@pytest.mark.skip()
def test_disable_restaking(agency, testerchain, test_registry):
staker_account, worker_account, *other = testerchain.unassigned_accounts
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
tpower = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
assert staking_agent.is_restaking(staker_account)
receipt = staking_agent.set_restaking(transacting_power=tpower, value=False)
assert receipt['status'] == 1, "Transaction Rejected"
assert not staking_agent.is_restaking(staker_account)
@pytest.mark.skip()
def test_collect_staking_reward(agency, testerchain, test_registry):
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=test_registry)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
# Commit to next period
testerchain.time_travel(periods=1)
tpower = TransactingPower(account=worker_account, signer=Web3Signer(testerchain.client))
staking_agent.commit_to_next_period(transacting_power=tpower)
testerchain.time_travel(periods=2)
# Mint
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
_receipt = staking_agent.mint(transacting_power=staker_power)
old_balance = token_agent.get_balance(address=staker_account)
owned_tokens = staking_agent.owned_tokens(staker_address=staker_account)
staked = staking_agent.non_withdrawable_stake(staker_address=staker_account)
receipt = staking_agent.collect_staking_reward(transacting_power=staker_power)
assert receipt['status'] == 1, "Transaction Rejected"
assert receipt['logs'][-1]['address'] == staking_agent.contract_address
new_balance = token_agent.get_balance(address=staker_account) # not the shoes
assert new_balance == old_balance + owned_tokens - staked
assert staking_agent.owned_tokens(staker_address=staker_account) == staked
@pytest.mark.skip()
def test_winding_down(agency, testerchain, test_registry, application_economics):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
duration = application_economics.min_operator_seconds + 1
worker_power = TransactingPower(account=worker_account, signer=Web3Signer(testerchain.client))
def check_last_period():
assert staking_agent.get_locked_tokens(staker_account, duration) != 0, "Sub-stake is already unlocked"
assert staking_agent.get_locked_tokens(staker_account, duration + 1) == 0, "Sub-stake is still locked"
assert not staking_agent.is_winding_down(staker_account)
check_last_period()
staking_agent.commit_to_next_period(transacting_power=worker_power)
check_last_period()
# Examine the last periods of sub-stakes
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
testerchain.time_travel(periods=1)
check_last_period()
receipt = staking_agent.set_winding_down(transacting_power=staker_power, value=True)
assert receipt['status'] == 1
assert staking_agent.is_winding_down(staker_account)
check_last_period()
staking_agent.commit_to_next_period(transacting_power=worker_power)
check_last_period()
testerchain.time_travel(periods=1)
duration -= 1
check_last_period()
receipt = staking_agent.set_winding_down(transacting_power=staker_power, value=False)
assert receipt['status'] == 1
assert not staking_agent.is_winding_down(staker_account)
check_last_period()
staking_agent.commit_to_next_period(transacting_power=worker_power)
check_last_period()
@pytest.mark.skip()
def test_lock_and_create(agency, testerchain, test_registry, application_economics):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
stakes_length = len(stakes)
current_locked_tokens = staking_agent.get_locked_tokens(staker_account, 0)
next_locked_tokens = staking_agent.get_locked_tokens(staker_account, 1)
amount = application_economics.min_authorization
receipt = staking_agent.lock_and_create(transacting_power=staker_power,
lock_periods=application_economics.min_operator_seconds,
amount=amount)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
assert len(stakes) == stakes_length + 1
new_stake = stakes[-1]
current_period = staking_agent.get_current_period()
assert new_stake.last_period == current_period + application_economics.min_operator_seconds
assert new_stake.first_period == current_period + 1
assert new_stake.locked_value == amount
assert staking_agent.get_locked_tokens(staker_account, 1) == next_locked_tokens + amount
assert staking_agent.get_locked_tokens(staker_account, 0) == current_locked_tokens
@pytest.mark.skip()
def test_lock_and_increase(agency, testerchain, test_registry, application_economics):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account, worker_account, *other = testerchain.unassigned_accounts
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
original_stake = stakes[0]
current_locked_tokens = staking_agent.get_locked_tokens(staker_account, 0)
next_locked_tokens = staking_agent.get_locked_tokens(staker_account, 1)
amount = staking_agent.calculate_staking_reward(staker_address=staker_account)
receipt = staking_agent.lock_and_increase(transacting_power=staker_power,
stake_index=0,
amount=amount)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
new_stake = stakes[0]
assert new_stake.locked_value == original_stake.locked_value + amount
assert staking_agent.get_locked_tokens(staker_account, 1) == next_locked_tokens + amount
assert staking_agent.get_locked_tokens(staker_account, 0) == current_locked_tokens
@pytest.mark.skip()
def test_merge(agency, testerchain, test_registry, application_economics):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account = testerchain.unassigned_accounts[0]
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
original_stake_1 = stakes[0]
original_stake_2 = stakes[2]
assert original_stake_1.last_period == original_stake_2.last_period
current_locked_tokens = staking_agent.get_locked_tokens(staker_account, 0)
next_locked_tokens = staking_agent.get_locked_tokens(staker_account, 1)
receipt = staking_agent.merge_stakes(transacting_power=staker_power,
stake_index_1=0,
stake_index_2=2)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
new_stake = stakes[0]
assert new_stake.locked_value == original_stake_1.locked_value + original_stake_2.locked_value
assert staking_agent.get_locked_tokens(staker_account, 1) == next_locked_tokens
assert staking_agent.get_locked_tokens(staker_account, 0) == current_locked_tokens
@pytest.mark.skip()
def test_remove_inactive_stake(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
staker_account = testerchain.unassigned_accounts[0]
staker_power = TransactingPower(account=staker_account, signer=Web3Signer(testerchain.client))
testerchain.time_travel(periods=1)
staking_agent.mint(transacting_power=staker_power)
current_period = staking_agent.get_current_period()
original_stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
assert original_stakes[2].last_period == current_period - 1
current_locked_tokens = staking_agent.get_locked_tokens(staker_account, 0)
next_locked_tokens = staking_agent.get_locked_tokens(staker_account, 1)
receipt = staking_agent.remove_inactive_stake(transacting_power=staker_power, stake_index=2)
assert receipt['status'] == 1
# Ensure stake was extended by one period.
stakes = list(staking_agent.get_all_stakes(staker_address=staker_account))
assert len(stakes) == len(original_stakes) - 1
assert stakes[0] == original_stakes[0]
assert stakes[1] == original_stakes[1]
assert stakes[2] == original_stakes[4]
assert stakes[3] == original_stakes[3]
assert staking_agent.get_locked_tokens(staker_account, 1) == next_locked_tokens
assert staking_agent.get_locked_tokens(staker_account, 0) == current_locked_tokens

View File

@ -1,182 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from constant_sorrow import constants
from constant_sorrow.constants import BARE
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent
from nucypher.blockchain.eth.deployers import (DispatcherDeployer, StakingEscrowDeployer)
def test_staking_escrow_deployment(staking_escrow_deployer, deployment_progress, transacting_power):
deployment_receipts = staking_escrow_deployer.deploy(progress=deployment_progress,
deployment_mode=constants.FULL,
transacting_power=transacting_power)
# deployment steps must match expected number of steps
assert deployment_progress.num_steps == len(staking_escrow_deployer.deployment_steps) == len(deployment_receipts) == 2
for step in staking_escrow_deployer.deployment_steps:
assert deployment_receipts[step]['status'] == 1
def test_make_agent(staking_escrow_deployer, test_registry):
# Create a StakingEscrowAgent instance
staking_agent = staking_escrow_deployer.make_agent()
# Retrieve the StakingEscrowAgent singleton
same_staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert staking_agent == same_staking_agent
# Compare the contract address for equality
assert staking_agent.contract_address == same_staking_agent.contract_address
def test_staking_escrow_has_dispatcher(staking_escrow_deployer, testerchain, test_registry, transacting_power):
# Let's get the "bare" StakingEscrow contract (i.e., unwrapped, no dispatcher)
existing_bare_contract = testerchain.get_contract_by_name(registry=test_registry,
contract_name=staking_escrow_deployer.contract_name,
proxy_name=DispatcherDeployer.contract_name,
use_proxy_address=False)
# This contract shouldn't be accessible directly through the deployer or the agent
assert staking_escrow_deployer.contract_address != existing_bare_contract.address
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert staking_agent.contract_address != existing_bare_contract
# The wrapped contract, on the other hand, points to the bare one.
target = staking_escrow_deployer.contract.functions.target().call()
assert target == existing_bare_contract.address
def test_upgrade(testerchain, test_registry, application_economics, transacting_power, threshold_staking):
deployer = StakingEscrowDeployer(staking_interface=threshold_staking.address,
registry=test_registry,
economics=application_economics)
receipts = deployer.upgrade(ignore_deployed=True, confirmations=0, transacting_power=transacting_power)
for title, receipt in receipts.items():
assert receipt['status'] == 1
def test_rollback(testerchain, test_registry, transacting_power, threshold_staking):
deployer = StakingEscrowDeployer(staking_interface=threshold_staking.address, registry=test_registry)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
current_target = staking_agent.contract.functions.target().call()
# Let's do one more upgrade
receipts = deployer.upgrade(ignore_deployed=True, confirmations=0, transacting_power=transacting_power)
for title, receipt in receipts.items():
assert receipt['status'] == 1
old_target = current_target
current_target = staking_agent.contract.functions.target().call()
assert current_target != old_target
# It's time to rollback.
receipt = deployer.rollback(transacting_power=transacting_power)
assert receipt['status'] == 1
new_target = staking_agent.contract.functions.target().call()
assert new_target != current_target
assert new_target == old_target
def test_deploy_bare_upgradeable_contract_deployment(testerchain,
test_registry,
application_economics,
transacting_power,
threshold_staking):
deployer = StakingEscrowDeployer(staking_interface=threshold_staking.address,
registry=test_registry,
economics=application_economics)
enrolled_names = list(test_registry.enrolled_names)
old_number_of_enrollments = enrolled_names.count(StakingEscrowDeployer.contract_name)
old_number_of_proxy_enrollments = enrolled_names.count(StakingEscrowDeployer._proxy_deployer.contract_name)
receipts = deployer.deploy(deployment_mode=BARE, ignore_deployed=True, transacting_power=transacting_power)
for title, receipt in receipts.items():
assert receipt['status'] == 1
enrolled_names = list(test_registry.enrolled_names)
new_number_of_enrollments = enrolled_names.count(StakingEscrowDeployer.contract_name)
new_number_of_proxy_enrollments = enrolled_names.count(StakingEscrowDeployer._proxy_deployer.contract_name)
# The principal contract was deployed.
assert new_number_of_enrollments == (old_number_of_enrollments + 1)
# The Dispatcher was not deployed.
assert new_number_of_proxy_enrollments == old_number_of_proxy_enrollments
def test_deployer_version_management(testerchain, test_registry, application_economics):
deployer = StakingEscrowDeployer(registry=test_registry, economics=application_economics)
untargeted_deployment = deployer.get_latest_enrollment()
latest_targeted_deployment = deployer.get_principal_contract()
proxy_deployer = deployer.get_proxy_deployer()
proxy_target = proxy_deployer.target_contract.address
assert latest_targeted_deployment.address == proxy_target
assert untargeted_deployment.address != latest_targeted_deployment.address
def test_manual_proxy_retargeting(testerchain, test_registry, application_economics, transacting_power):
deployer = StakingEscrowDeployer(registry=test_registry, economics=application_economics)
# Get Proxy-Direct
proxy_deployer = deployer.get_proxy_deployer()
# Re-Deploy Staking Escrow
old_target = proxy_deployer.target_contract.address
# Get the latest un-targeted contract from the registry
latest_deployment = deployer.get_latest_enrollment()
# Build retarget transaction (just for informational purposes)
transaction = deployer.retarget(transacting_power=transacting_power,
target_address=latest_deployment.address,
just_build_transaction=True,
confirmations=0)
assert transaction['to'] == proxy_deployer.contract.address
upgrade_function, _params = proxy_deployer.contract.decode_function_input(transaction['data']) # TODO: this only tests for ethtester
assert upgrade_function.fn_name == proxy_deployer.contract.functions.upgrade.fn_name
# Retarget, for real
receipt = deployer.retarget(transacting_power=transacting_power,
target_address=latest_deployment.address,
confirmations=0)
assert receipt['status'] == 1
# Check proxy targets
new_target = proxy_deployer.contract.functions.target().call()
assert old_target != new_target
assert new_target == latest_deployment.address
# Check address consistency
new_bare_contract = deployer.get_principal_contract()
assert new_bare_contract.address == latest_deployment.address == new_target

View File

@ -1,117 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from web3 import Web3
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent, NucypherTokenAgent
from nucypher.blockchain.eth.token import NU, Stake
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
@pytest.mark.skip()
def test_stake(testerchain, application_economics, agency, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
class FakeUrsula:
token_agent = ContractAgency.get_agent(NucypherTokenAgent, registry=test_registry)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
burner_wallet = Web3().eth.account.create(INSECURE_DEVELOPMENT_PASSWORD)
checksum_address = burner_wallet.address
staking_agent = staking_agent
token_agent = token_agent
blockchain = testerchain
ursula = FakeUrsula()
stake = Stake(checksum_address=ursula.checksum_address,
first_locked_period=1,
final_locked_period=100,
value=NU(100, 'NU'),
index=0,
staking_agent=staking_agent,
economics=application_economics)
assert stake.value, 'NU' == NU(100, 'NU')
assert isinstance(stake.time_remaining(), int) # seconds
slang_remaining = stake.time_remaining(slang=True) # words
assert isinstance(slang_remaining, str)
@pytest.mark.skip()
def test_stake_equality(application_economics, get_random_checksum_address, mocker):
address = get_random_checksum_address()
a_different_address = get_random_checksum_address()
mock_agent = mocker.Mock(contract_address=a_different_address)
stake = Stake(checksum_address=address,
first_locked_period=1,
final_locked_period=2,
value=NU(100, 'NU'),
index=0,
staking_agent=mock_agent,
economics=application_economics)
assert stake == stake
duck_stake = mocker.Mock(index=0,
value=NU(100, 'NU'),
first_locked_period=1,
final_locked_period=2,
staker_address=address,
staking_agent=mock_agent)
assert stake == duck_stake
a_different_stake = Stake(checksum_address=address,
first_locked_period=0,
final_locked_period=2,
value=NU(100, 'NU'),
index=1,
staking_agent=mock_agent,
economics=application_economics)
assert stake != a_different_stake
undercover_agent = mocker.Mock(contract_address=address)
another_different_stake = Stake(checksum_address=a_different_address,
first_locked_period=1,
final_locked_period=2,
value=NU(100, 'NU'),
index=0,
staking_agent=undercover_agent,
economics=application_economics)
assert stake != another_different_stake
@pytest.mark.skip()
def test_stake_integration(staking_providers):
staker = list(staking_providers)[1]
stakes = staker.stakes
assert stakes
stake = stakes[0]
stake.sync()
blockchain_stakes = staker.application_agent.get_all_stakes(staker_address=staker.checksum_address)
stake_info = (stake.first_locked_period, stake.final_locked_period, int(stake.value))
published_stake_info = list(blockchain_stakes)[0]
assert stake_info == published_stake_info
assert stake_info == stake.to_stake_info()
assert stake.status() == Stake.Status.DIVISIBLE

View File

@ -1,153 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
import pytest
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.crypto.powers import TransactingPower
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.blockchain.eth.actors import Operator
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent
from nucypher.config.characters import StakeHolderConfiguration
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
@pytest.mark.skip()
def test_software_stakeholder_configuration(testerchain,
test_registry,
stakeholder_configuration,
stakeholder_config_file_location):
path = stakeholder_config_file_location
# Save the stakeholder JSON config
stakeholder_configuration.to_configuration_file(filepath=path)
with open(path, 'r') as file:
# Ensure file contents are serializable
contents = file.read()
first_config_contents = json.loads(contents)
# Destroy this stake holder, leaving only the configuration file behind
del stakeholder_configuration
# Restore StakeHolder instance from JSON config
stakeholder_config = StakeHolderConfiguration.from_configuration_file(filepath=path)
the_same_stakeholder = stakeholder_config.produce()
# Save the JSON config again
stakeholder_config.to_configuration_file(filepath=path, override=True)
with open(stakeholder_config.filepath, 'r') as file:
contents = file.read()
second_config_contents = json.loads(contents)
# Ensure the stakeholder was accurately restored from JSON config
assert first_config_contents == second_config_contents
@pytest.mark.skip()
def test_initialize_stake_with_existing_account(testerchain,
software_stakeholder,
stake_value,
application_economics,
test_registry):
assert len(software_stakeholder.staker.stakes) == 0
# No Stakes
with pytest.raises(IndexError):
_stake = software_stakeholder.staker.stakes[0]
# Really... there are no stakes.
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert staking_agent.get_staker_population() == 0
# Stake, deriving a new account with a password,
# sending tokens and ethers from the funding account
# to the staker's account, then initializing a new stake.
software_stakeholder.staker.initialize_stake(amount=stake_value,
lock_periods=application_economics.min_operator_seconds)
stake = software_stakeholder.staker.stakes[0]
# Wait for stake to begin
testerchain.time_travel(periods=1)
# Ensure the stakeholder is tracking the new staker and stake.
assert len(software_stakeholder.staker.stakes) == 1
# Ensure common stake perspective between stakeholder and stake
assert stake.value == stake_value
assert stake.duration == application_economics.min_operator_seconds
stakes = list(staking_agent.get_all_stakes(staker_address=stake.staker_address))
assert len(stakes) == 1
@pytest.mark.skip()
def test_divide_stake(software_stakeholder, application_economics, test_registry):
stake = software_stakeholder.staker.stakes[0]
target_value = application_economics.min_authorization
pre_divide_stake_value = stake.value
software_stakeholder.staker.divide_stake(stake=stake, additional_periods=10, target_value=target_value)
original_stake = software_stakeholder.staker.stakes[0]
new_stake = software_stakeholder.staker.stakes[-1]
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
stakes = list(staking_agent.get_all_stakes(staker_address=stake.staker_address))
assert len(stakes) == 2
assert new_stake.value == target_value
assert original_stake.value == (pre_divide_stake_value - target_value)
@pytest.mark.skip()
def test_bond_worker(software_stakeholder, manual_operator, test_registry):
software_stakeholder.staker.bond_worker(operator_address=manual_operator)
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
assert staking_agent.get_worker_from_staker(staker_address=software_stakeholder.checksum_address) == manual_operator
@pytest.mark.skip()
def test_collect_inflation_rewards(software_stakeholder, manual_operator, testerchain, test_registry):
# Get stake
stake = software_stakeholder.staker.stakes[1]
# Make bonded Operator
tpower = TransactingPower(account=manual_operator, signer=Web3Signer(testerchain.client))
tpower.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
worker = Operator(is_me=True,
transacting_power=tpower,
domain=TEMPORARY_DOMAIN,
operator_address=manual_operator,
registry=test_registry)
# Wait out stake lock periods, manually make a commitment once per period.
for period in range(stake.periods_remaining-1):
worker.commit_to_next_period()
testerchain.time_travel(periods=1)
# Collect the staking reward in NU.
result = software_stakeholder.staker.collect_staking_reward()
# TODO: Make Assertions reasonable for this layer.
# Consider recycling logic from test_collect_reward_integration CLI test.
assert result

View File

@ -1,853 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
import os
import random
import tempfile
from pathlib import Path
from unittest import mock
import maya
import pytest
from web3 import Web3
from nucypher.blockchain.eth.actors import Staker
from nucypher.blockchain.eth.agents import ContractAgency, StakingEscrowAgent
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.blockchain.eth.token import NU, Stake
from nucypher.blockchain.eth.utils import prettify_eth_amount
from nucypher.characters.lawful import Enrico, Ursula
from nucypher.cli.literature import SUCCESSFUL_MINTING
from nucypher.cli.main import nucypher_cli
from nucypher.config.characters import StakeHolderConfiguration, UrsulaConfiguration
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.policy.payment import SubscriptionManagerPayment
from nucypher.utilities.logging import Logger
from nucypher.utilities.networking import LOOPBACK_ADDRESS
from tests.constants import (
FAKE_PASSWORD_CONFIRMED,
FEE_RATE_RANGE,
INSECURE_DEVELOPMENT_PASSWORD,
MOCK_IP_ADDRESS,
TEST_ETH_PROVIDER_URI,
YES_ENTER
)
from tests.utils.middleware import MockRestMiddleware
from tests.utils.ursula import MOCK_KNOWN_URSULAS_CACHE, select_test_port
@mock.patch('nucypher.config.characters.StakeHolderConfiguration.default_filepath', return_value=Path('/non/existent/file'))
def test_missing_configuration_file(default_filepath_mock, click_runner):
cmd_args = ('stake', 'list')
result = click_runner.invoke(nucypher_cli, cmd_args, catch_exceptions=False)
assert result.exit_code != 0
assert default_filepath_mock.called
assert "nucypher stake init-stakeholder" in result.output
@pytest.mark.skip()
def test_new_stakeholder(click_runner,
custom_filepath,
agency_local_registry,
testerchain):
init_args = ('stake', 'init-stakeholder',
'--config-root', str(custom_filepath.absolute()),
'--eth-provider', TEST_ETH_PROVIDER_URI,
'--network', TEMPORARY_DOMAIN,
'--registry-filepath', str(agency_local_registry.filepath.absolute()))
result = click_runner.invoke(nucypher_cli, init_args, catch_exceptions=False)
assert result.exit_code == 0
# Files and Directories
assert custom_filepath.is_dir(), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / StakeHolderConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
with open(custom_config_filepath, 'r') as config_file:
raw_config_data = config_file.read()
config_data = json.loads(raw_config_data)
assert config_data['eth_provider_uri'] == TEST_ETH_PROVIDER_URI
@pytest.mark.skip()
def test_stake_init(click_runner,
stakeholder_configuration_file_location,
stake_value,
application_economics,
testerchain,
agency_local_registry,
manual_staker):
# Staker address has not stakes
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=agency_local_registry)
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
assert not stakes
stake_args = ('stake', 'create',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--value', stake_value.to_tokens(),
'--lock-periods', application_economics.min_operator_seconds,
'--force')
# TODO: This test is writing to the default system directory and ignoring updates to the passed filepath
user_input = f'0\n' + f'{INSECURE_DEVELOPMENT_PASSWORD}\n' + YES_ENTER
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
# Test integration with BaseConfiguration
with open(stakeholder_configuration_file_location, 'r') as config_file:
_config_data = json.loads(config_file.read())
# Verify the stake is on-chain
# Test integration with Agency
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
assert len(stakes) == 1
# Test integration with NU
start_period, end_period, value = stakes[0]
assert NU(int(value), 'NuNit') == stake_value
assert (end_period - start_period) == application_economics.min_operator_seconds - 1
# Test integration with Stake
stake = Stake.from_stake_info(index=0,
checksum_address=manual_staker,
stake_info=stakes[0],
staking_agent=staking_agent,
economics=application_economics)
assert stake.value == stake_value
assert stake.duration == application_economics.min_operator_seconds
@pytest.mark.skip()
def test_stake_list(click_runner,
stakeholder_configuration_file_location,
stake_value,
agency_local_registry,
testerchain):
stake_args = ('stake', 'list',
'--config-file', str(stakeholder_configuration_file_location.absolute()))
user_input = INSECURE_DEVELOPMENT_PASSWORD
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
assert str(stake_value) in result.output
_minimum, default, _maximum = FEE_RATE_RANGE
assert f"{default} wei" in result.output
@pytest.mark.skip()
def test_staker_divide_stakes(click_runner,
stakeholder_configuration_file_location,
application_economics,
manual_staker,
testerchain,
agency_local_registry):
divide_args = ('stake', 'divide',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--force',
'--staking-address', manual_staker,
'--index', 0,
'--value', NU(application_economics.min_authorization, 'NuNit').to_tokens(),
'--lock-periods', 10)
result = click_runner.invoke(nucypher_cli,
divide_args,
catch_exceptions=False,
env=dict(NUCYPHER_KEYSTORE_PASSWORD=INSECURE_DEVELOPMENT_PASSWORD))
assert result.exit_code == 0
stake_args = ('stake', 'list', '--config-file', str(stakeholder_configuration_file_location.absolute()))
user_input = INSECURE_DEVELOPMENT_PASSWORD
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
assert str(NU(application_economics.min_authorization, 'NuNit').to_tokens()) in result.output
@pytest.mark.skip()
def test_stake_prolong(click_runner,
testerchain,
agency_local_registry,
manual_staker,
manual_worker,
stakeholder_configuration_file_location):
prolong_args = ('stake', 'prolong',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--index', 0,
'--lock-periods', 1,
'--staking-address', manual_staker,
'--force')
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
staker.refresh_stakes()
stake = staker.stakes[0]
old_termination = stake.final_locked_period
user_input = INSECURE_DEVELOPMENT_PASSWORD
result = click_runner.invoke(nucypher_cli,
prolong_args,
input=user_input,
catch_exceptions=False)
assert result.exit_code == 0
# Ensure Integration with Stakes
stake.sync()
new_termination = stake.final_locked_period
assert new_termination == old_termination + 1
@pytest.mark.skip()
def test_stake_increase(click_runner,
stakeholder_configuration_file_location,
application_economics,
testerchain,
agency_local_registry,
manual_staker):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=agency_local_registry)
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
stakes_length = len(stakes)
assert stakes_length > 0
selection = 0
new_value = NU.from_units(application_economics.min_authorization // 10)
origin_stake = stakes[selection]
stake_args = ('stake', 'increase',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--value', new_value.to_tokens(),
'--index', selection,
'--force')
user_input = f'{INSECURE_DEVELOPMENT_PASSWORD}\n'
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
# Verify the stake is on-chain
# Test integration with Agency
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
assert len(stakes) == stakes_length
# Test integration with NU
_start_period, end_period, value = stakes[selection]
assert NU(int(value), 'NuNit') == origin_stake.locked_value + new_value
assert end_period == origin_stake.last_period
@pytest.mark.skip()
def test_merge_stakes(click_runner,
stakeholder_configuration_file_location,
application_economics,
testerchain,
agency_local_registry,
manual_staker,
stake_value):
# Prepare new stake
stake_args = ('stake', 'create',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--value', stake_value.to_tokens(),
'--lock-periods', application_economics.min_operator_seconds + 1,
'--force')
user_input = f'0\n' + f'{INSECURE_DEVELOPMENT_PASSWORD}\n' + YES_ENTER
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=agency_local_registry)
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
stakes_length = len(stakes)
assert stakes_length > 0
selection_1 = 0
selection_2 = 2
origin_stake_1 = stakes[selection_1]
origin_stake_2 = stakes[selection_2]
assert origin_stake_1.last_period == origin_stake_2.last_period
stake_args = ('stake', 'merge',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--index-1', selection_1,
'--index-2', selection_2,
'--force')
user_input = f'{INSECURE_DEVELOPMENT_PASSWORD}\n'
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
# Verify the tx is on-chain
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
assert len(stakes) == stakes_length
assert stakes[selection_1].locked_value == origin_stake_1.locked_value + origin_stake_2.locked_value
assert stakes[selection_2].last_period == 1
@pytest.mark.skip()
def test_remove_inactive(click_runner,
stakeholder_configuration_file_location,
application_economics,
testerchain,
agency_local_registry,
manual_staker,
stake_value):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=agency_local_registry)
original_stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
selection = 2
assert original_stakes[selection].last_period == 1
stake_args = ('stake', 'remove-inactive',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--index', selection,
'--force')
user_input = f'0\n' + f'{INSECURE_DEVELOPMENT_PASSWORD}\n' + YES_ENTER
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
stakes = list(staking_agent.get_all_stakes(staker_address=manual_staker))
assert len(stakes) == len(original_stakes) - 1
@pytest.mark.skip()
def test_stake_bond_worker(click_runner,
testerchain,
agency_local_registry,
manual_staker,
manual_worker,
stakeholder_configuration_file_location):
init_args = ('stake', 'bond-worker',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--worker-address', manual_worker,
'--force')
user_input = INSECURE_DEVELOPMENT_PASSWORD
result = click_runner.invoke(nucypher_cli,
init_args,
input=user_input,
catch_exceptions=False)
assert result.exit_code == 0
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.worker_address == manual_worker
@pytest.mark.skip()
def test_ursula_init(click_runner,
custom_filepath,
agency_local_registry,
manual_staker,
manual_worker,
testerchain):
deploy_port = select_test_port()
init_args = ('ursula', 'init',
'--network', TEMPORARY_DOMAIN,
'--payment-network', TEMPORARY_DOMAIN,
'--worker-address', manual_worker,
'--config-root', str(custom_filepath.absolute()),
'--eth-provider', TEST_ETH_PROVIDER_URI,
'--registry-filepath', str(agency_local_registry.filepath.absolute()),
'--rest-host', MOCK_IP_ADDRESS,
'--rest-port', deploy_port)
result = click_runner.invoke(nucypher_cli,
init_args,
input=FAKE_PASSWORD_CONFIRMED,
catch_exceptions=False)
assert result.exit_code == 0
# Files and Directories
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'KEYSTORE does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
with open(custom_config_filepath, 'r') as config_file:
raw_config_data = config_file.read()
config_data = json.loads(raw_config_data)
assert config_data['eth_provider_uri'] == TEST_ETH_PROVIDER_URI
assert config_data['worker_address'] == manual_worker
assert TEMPORARY_DOMAIN == config_data['domain']
@pytest.mark.skip()
def test_ursula_run(click_runner,
manual_worker,
manual_staker,
custom_filepath,
testerchain):
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
# Now start running your Ursula!
init_args = ('ursula', 'run',
'--dry-run',
'--config-file', str(custom_config_filepath.absolute()))
user_input = f'{INSECURE_DEVELOPMENT_PASSWORD}\n' * 2
result = click_runner.invoke(nucypher_cli,
init_args,
input=user_input,
catch_exceptions=False)
assert result.exit_code == 0
@pytest.mark.skip()
def test_stake_restake(click_runner,
manual_staker,
custom_filepath,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location):
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.is_restaking
restake_args = ('stake', 'restake',
'--disable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
restake_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert not staker.is_restaking
assert "Successfully disabled" in result.output
disable_args = ('stake', 'restake',
'--enable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
disable_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert staker.is_restaking
assert "Successfully enabled" in result.output
# Disable again
disable_args = ('stake', 'restake',
'--disable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
disable_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
@pytest.mark.skip()
def test_stake_winddown(click_runner,
manual_staker,
custom_filepath,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location):
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert not staker.is_winding_down
restake_args = ('stake', 'winddown',
'--enable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
restake_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert staker.is_winding_down
assert "Successfully enabled" in result.output
disable_args = ('stake', 'winddown',
'--disable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
disable_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert not staker.is_winding_down
assert "Successfully disabled" in result.output
@pytest.mark.skip()
def test_stake_snapshots(click_runner,
manual_staker,
custom_filepath,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location):
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.is_taking_snapshots
restake_args = ('stake', 'snapshots',
'--disable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
restake_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert not staker.is_taking_snapshots
assert "Successfully disabled" in result.output
disable_args = ('stake', 'snapshots',
'--enable',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
disable_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert staker.is_taking_snapshots
assert "Successfully enabled" in result.output
@pytest.mark.skip()
def test_collect_rewards_integration(click_runner,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location,
blockchain_alice,
blockchain_bob,
random_policy_label,
manual_staker,
manual_worker,
application_economics,
policy_value):
half_stake_time = 2 * application_economics.min_operator_seconds # Test setup
logger = Logger("Test-CLI") # Enter the Teacher's Logger, and
current_period = 0 # State the initial period for incrementing
staker_address = manual_staker
worker_address = manual_worker
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=staker_address,
registry=agency_local_registry)
staker.refresh_stakes()
# The staker is staking.
assert staker.is_staking
assert staker.stakes
assert staker.worker_address == worker_address
# TODO: Test for SubscriptionManager?
payment_method = SubscriptionManagerPayment(provider=TEST_ETH_PROVIDER_URI, network=TEMPORARY_DOMAIN)
ursula_port = select_test_port()
ursula = Ursula(is_me=True,
checksum_address=staker_address,
signer=Web3Signer(testerchain.client),
worker_address=worker_address,
registry=agency_local_registry,
rest_host=LOOPBACK_ADDRESS,
rest_port=ursula_port,
eth_provider_uri=TEST_ETH_PROVIDER_URI,
network_middleware=MockRestMiddleware(),
db_filepath=tempfile.mkdtemp(),
domain=TEMPORARY_DOMAIN,
payment_method=payment_method)
MOCK_KNOWN_URSULAS_CACHE[ursula_port] = ursula
assert ursula.worker_address == worker_address
assert ursula.checksum_address == staker_address
# Make a commitment for half the first stake duration
testerchain.time_travel(periods=1)
for _ in range(half_stake_time):
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
ursula.commit_to_next_period()
testerchain.time_travel(periods=1)
current_period += 1
# Alice creates a policy and grants Bob access
blockchain_alice.selection_buffer = 1
threshold, shares = 1, 1
duration_in_periods = 3
days = (duration_in_periods - 1) * (application_economics.hours_per_period // 24)
now = testerchain.w3.eth.getBlock('latest').timestamp
expiration = maya.MayaDT(now).add(days=days)
blockchain_policy = blockchain_alice.grant(bob=blockchain_bob,
label=random_policy_label,
threshold=threshold,
shares=shares,
value=policy_value,
expiration=expiration,
payment_method=payment_method,
ursulas={ursula})
# Ensure that the handpicked Ursula was selected for the policy
treasure_map = blockchain_bob._decrypt_treasure_map(blockchain_policy.treasure_map,
blockchain_policy.publisher_verifying_key)
assert ursula.canonical_address in treasure_map.destinations
# Bob learns about the new staker and joins the policy
blockchain_bob.start_learning_loop()
blockchain_bob.remember_node(node=ursula)
# Enrico Encrypts (of course)
enrico = Enrico(policy_encrypting_key=blockchain_policy.public_key,
network_middleware=MockRestMiddleware())
verifying_key = blockchain_alice.stamp.as_umbral_pubkey()
for index in range(half_stake_time - 5):
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
ursula.commit_to_next_period()
# Encrypt
random_data = os.urandom(random.randrange(20, 100))
message_kit = enrico.encrypt_message(plaintext=random_data)
# Decrypt
cleartexts = blockchain_bob.retrieve_and_decrypt([message_kit],
alice_verifying_key=verifying_key,
encrypted_treasure_map=blockchain_policy.treasure_map)
assert random_data == cleartexts[0]
# Ursula Staying online and the clock advancing
testerchain.time_travel(periods=1)
current_period += 1
# Finish the passage of time for the first Stake
for _ in range(5): # plus the extended periods from stake division
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
ursula.commit_to_next_period()
testerchain.time_travel(periods=1)
current_period += 1
#
# WHERES THE MONEY URSULA?? - Collecting Rewards
#
# The address the client wants Ursula to send rewards to
burner_wallet = testerchain.w3.eth.account.create(INSECURE_DEVELOPMENT_PASSWORD)
# The rewards wallet is initially empty, because it is freshly created
assert testerchain.client.get_balance(burner_wallet.address) == 0
# Rewards will be unlocked after the
# final committed period has passed (+1).
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
testerchain.time_travel(periods=1)
current_period += 1
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
# At least half of the tokens are unlocked (restaking was enabled for some prior periods)
assert staker.locked_tokens() >= application_economics.min_authorization
# Collect Policy Fee
collection_args = ('stake', 'rewards', 'withdraw',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--fees',
'--no-tokens',
'--staking-address', staker_address,
'--withdraw-address', burner_wallet.address)
result = click_runner.invoke(nucypher_cli,
collection_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
# Policy Fee
collected_policy_fee = testerchain.client.get_balance(burner_wallet.address)
assert collected_policy_fee # TODO: staker CLI is pending removal
# Finish the passage of time... once and for all
# Extended periods from stake division
for _ in range(9):
ursula.commit_to_next_period()
current_period += 1
logger.debug(f">>>>>>>>>>> TEST PERIOD {current_period} <<<<<<<<<<<<<<<<")
testerchain.time_travel(periods=1)
#
# Collect Staking Reward
#
balance_before_collecting = staker.token_agent.get_balance(address=staker_address)
collection_args = ('stake', 'rewards', 'withdraw',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--no-fees',
'--tokens',
'--staking-address', staker_address,
'--force')
result = click_runner.invoke(nucypher_cli,
collection_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
# The staker has withdrawn her staking rewards
assert staker.token_agent.get_balance(address=staker_address) > balance_before_collecting
@pytest.mark.skip()
def test_stake_unbond_worker(click_runner,
testerchain,
manual_staker,
manual_worker,
agency_local_registry,
stakeholder_configuration_file_location):
testerchain.time_travel(periods=1)
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.worker_address == manual_worker
init_args = ('stake', 'unbond-worker',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force'
)
user_input = f'{INSECURE_DEVELOPMENT_PASSWORD}'
result = click_runner.invoke(nucypher_cli,
init_args,
input=user_input,
catch_exceptions=False)
assert result.exit_code == 0
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.worker_address == NULL_ADDRESS
@pytest.mark.skip()
def test_set_min_rate(click_runner,
manual_staker,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location):
_minimum, _default, maximum = FEE_RATE_RANGE
min_rate = maximum - 1
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.raw_min_fee_rate == 0
min_rate_in_gwei = Web3.fromWei(min_rate, 'gwei')
restake_args = ('stake', 'set-min-rate',
'--min-rate', min_rate_in_gwei,
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
restake_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert staker.raw_min_fee_rate == min_rate
assert "successfully set" in result.output
stake_args = ('stake', 'list',
'--config-file', str(stakeholder_configuration_file_location.absolute()))
user_input = INSECURE_DEVELOPMENT_PASSWORD
result = click_runner.invoke(nucypher_cli, stake_args, input=user_input, catch_exceptions=False)
assert result.exit_code == 0
assert f"{prettify_eth_amount(min_rate)}" in result.output
@pytest.mark.skip()
def test_mint(click_runner,
manual_staker,
testerchain,
agency_local_registry,
stakeholder_configuration_file_location):
testerchain.time_travel(periods=2)
staker = Staker(domain=TEMPORARY_DOMAIN,
checksum_address=manual_staker,
registry=agency_local_registry)
assert staker.mintable_periods() > 0
owned_tokens = staker.owned_tokens()
mint_args = ('stake', 'mint',
'--config-file', str(stakeholder_configuration_file_location.absolute()),
'--staking-address', manual_staker,
'--force')
result = click_runner.invoke(nucypher_cli,
mint_args,
input=INSECURE_DEVELOPMENT_PASSWORD,
catch_exceptions=False)
assert result.exit_code == 0
assert staker.owned_tokens() > owned_tokens
assert staker.mintable_periods() == 0
assert SUCCESSFUL_MINTING in result.output

View File

@ -1,308 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import pytest
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.token import NU, Stake, validate_divide, validate_prolong, validate_increase, \
validate_merge
from nucypher.types import StakerInfo
def test_child_status():
for status in Stake.Status:
assert status.is_child(status)
# Check relations for inactive status
assert Stake.Status.INACTIVE.is_child(Stake.Status.UNLOCKED)
assert not Stake.Status.INACTIVE.is_child(Stake.Status.LOCKED)
assert not Stake.Status.INACTIVE.is_child(Stake.Status.EDITABLE)
assert not Stake.Status.INACTIVE.is_child(Stake.Status.DIVISIBLE)
# Check relations for unlocked status
assert not Stake.Status.UNLOCKED.is_child(Stake.Status.INACTIVE)
assert not Stake.Status.UNLOCKED.is_child(Stake.Status.LOCKED)
assert not Stake.Status.UNLOCKED.is_child(Stake.Status.EDITABLE)
assert not Stake.Status.UNLOCKED.is_child(Stake.Status.DIVISIBLE)
# Check relations for locked status
assert not Stake.Status.LOCKED.is_child(Stake.Status.INACTIVE)
assert not Stake.Status.LOCKED.is_child(Stake.Status.UNLOCKED)
assert not Stake.Status.LOCKED.is_child(Stake.Status.EDITABLE)
assert not Stake.Status.LOCKED.is_child(Stake.Status.DIVISIBLE)
# Check relations for editable status
assert not Stake.Status.EDITABLE.is_child(Stake.Status.INACTIVE)
assert not Stake.Status.EDITABLE.is_child(Stake.Status.UNLOCKED)
assert Stake.Status.EDITABLE.is_child(Stake.Status.LOCKED)
assert not Stake.Status.EDITABLE.is_child(Stake.Status.DIVISIBLE)
# Check relations for divisible status
assert not Stake.Status.DIVISIBLE.is_child(Stake.Status.INACTIVE)
assert not Stake.Status.DIVISIBLE.is_child(Stake.Status.UNLOCKED)
assert Stake.Status.DIVISIBLE.is_child(Stake.Status.LOCKED)
assert Stake.Status.DIVISIBLE.is_child(Stake.Status.EDITABLE)
@pytest.mark.skip('reuse me')
def test_stake_status(mock_testerchain, application_economics, mock_staking_agent):
address = mock_testerchain.etherbase_account
current_period = 3
staker_info = StakerInfo(current_committed_period=current_period-1,
next_committed_period=current_period,
value=0,
last_committed_period=0,
lock_restake_until_period=False,
completed_work=0,
worker_start_period=0,
worker=NULL_ADDRESS,
flags=bytes())
mock_staking_agent.get_current_period.return_value = current_period
mock_staking_agent.get_staker_info.return_value = staker_info
def make_sub_stake(value, first_locked_period, final_locked_period):
return Stake(checksum_address=address,
first_locked_period=first_locked_period,
final_locked_period=final_locked_period,
value=value,
index=0,
staking_agent=mock_staking_agent,
economics=application_economics)
# Prepare unlocked sub-stake
nu = NU.from_units(2 * application_economics.min_authorization - 1)
stake = make_sub_stake(nu, current_period - 2, current_period - 1)
assert stake.status() == Stake.Status.UNLOCKED
# Prepare inactive sub-stake
# Update staker info and create new state
stake = make_sub_stake(nu, current_period - 2, current_period - 1)
staker_info = staker_info._replace(current_committed_period=current_period,
next_committed_period=current_period + 1)
mock_staking_agent.get_staker_info.return_value = staker_info
assert stake.status() == Stake.Status.INACTIVE
# Prepare locked sub-stake
stake = make_sub_stake(nu, current_period - 2, current_period)
assert stake.status() == Stake.Status.LOCKED
# Prepare editable sub-stake
stake = make_sub_stake(nu, current_period - 2, current_period + 1)
assert stake.status() == Stake.Status.EDITABLE
# Prepare divisible sub-stake
nu = NU.from_units(2 * application_economics.min_authorization)
stake = make_sub_stake(nu, current_period - 2, current_period + 1)
assert stake.status() == Stake.Status.DIVISIBLE
@pytest.mark.skip('remove me')
def test_stake_sync(mock_testerchain, application_economics, mock_staking_agent):
address = mock_testerchain.etherbase_account
current_period = 3
staker_info = StakerInfo(current_committed_period=current_period-1,
next_committed_period=current_period,
value=0,
last_committed_period=0,
lock_restake_until_period=False,
completed_work=0,
worker_start_period=0,
worker=NULL_ADDRESS,
flags=bytes())
mock_staking_agent.get_current_period.return_value = current_period
mock_staking_agent.get_staker_info.return_value = staker_info
# Prepare sub-stake
nu = NU.from_units(2 * application_economics.min_authorization - 1)
stake = Stake(checksum_address=address,
first_locked_period=current_period - 2,
final_locked_period=current_period + 1,
value=nu,
index=0,
staking_agent=mock_staking_agent,
economics=application_economics)
assert stake.status() == Stake.Status.EDITABLE
# Update locked value and sync
sub_stake_info = stake.to_stake_info()
nunits = 2 * application_economics.min_authorization
sub_stake_info = sub_stake_info._replace(locked_value=nunits)
mock_staking_agent.get_substake_info.return_value = sub_stake_info
stake.sync()
assert stake.status() == Stake.Status.DIVISIBLE
assert stake.value == NU.from_units(nunits)
# Update current period and sync
mock_staking_agent.get_current_period.return_value = current_period + 1
sub_stake_info = sub_stake_info._replace(locked_value=nunits)
mock_staking_agent.get_substake_info.return_value = sub_stake_info
stake.sync()
assert stake.status() == Stake.Status.LOCKED
assert stake.final_locked_period == current_period + 1
# Update final period and sync
sub_stake_info = sub_stake_info._replace(last_period=current_period)
mock_staking_agent.get_substake_info.return_value = sub_stake_info
stake.sync()
assert stake.status() == Stake.Status.UNLOCKED
assert stake.final_locked_period == current_period
# Update first period and sync
sub_stake_info = sub_stake_info._replace(first_period=current_period)
mock_staking_agent.get_substake_info.return_value = sub_stake_info
with pytest.raises(Stake.StakingError):
stake.sync()
@pytest.mark.skip('remove me')
def test_stake_validation(mock_testerchain, application_economics, mock_staking_agent):
address = mock_testerchain.etherbase_account
# Validate stake initialization
with pytest.raises(Stake.StakingError):
Stake.initialize_stake(staking_agent=mock_staking_agent,
checksum_address=address,
economics=application_economics,
amount=application_economics.min_authorization - 1,
lock_periods=application_economics.min_operator_seconds)
with pytest.raises(Stake.StakingError):
Stake.initialize_stake(staking_agent=mock_staking_agent,
checksum_address=address,
economics=application_economics,
amount=application_economics.min_authorization,
lock_periods=application_economics.min_operator_seconds - 1)
with pytest.raises(Stake.StakingError):
Stake.initialize_stake(staking_agent=mock_staking_agent,
checksum_address=address,
economics=application_economics,
amount=application_economics.maximum_allowed_locked + 1,
lock_periods=application_economics.min_operator_seconds)
mock_staking_agent.get_locked_tokens.return_value = 0
Stake.initialize_stake(staking_agent=mock_staking_agent,
checksum_address=address,
economics=application_economics,
amount=application_economics.maximum_allowed_locked,
lock_periods=application_economics.min_operator_seconds)
# Validate divide method
current_period = 10
mock_staking_agent.get_current_period.return_value = 10
def make_sub_stake(value, first_locked_period, final_locked_period, index=0):
return Stake(checksum_address=address,
first_locked_period=first_locked_period,
final_locked_period=final_locked_period,
value=value,
index=index,
staking_agent=mock_staking_agent,
economics=application_economics)
nu = NU.from_units(2 * application_economics.min_authorization - 1)
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period + 1,
value=nu)
with pytest.raises(Stake.StakingError):
validate_divide(stake=stake, target_value=application_economics.min_authorization, additional_periods=1)
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period,
value=nu + 1)
with pytest.raises(Stake.StakingError):
validate_divide(stake=stake, target_value=application_economics.min_authorization, additional_periods=1)
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period + 1,
value=nu + 1)
with pytest.raises(Stake.StakingError):
validate_divide(stake=stake, target_value=application_economics.min_authorization - 1, additional_periods=1)
validate_divide(stake=stake, target_value=application_economics.min_authorization, additional_periods=1)
# Validate prolong method
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period,
value=nu)
with pytest.raises(Stake.StakingError):
validate_prolong(stake=stake, additional_periods=1)
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period + 2,
value=nu)
with pytest.raises(Stake.StakingError):
validate_prolong(stake=stake, additional_periods=1)
with pytest.raises(Stake.StakingError):
validate_prolong(stake=stake, additional_periods=application_economics.min_operator_seconds - 3)
validate_prolong(stake=stake, additional_periods=application_economics.min_operator_seconds - 2)
# Validate increase method
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period,
value=nu)
with pytest.raises(Stake.StakingError):
validate_increase(stake=stake, amount=nu)
stake = make_sub_stake(first_locked_period=current_period - 2,
final_locked_period=current_period + 1,
value=nu)
stake.staking_agent.get_locked_tokens.return_value = nu
with pytest.raises(Stake.StakingError):
validate_increase(stake=stake, amount=NU.from_units(application_economics.maximum_allowed_locked - int(nu) + 1))
validate_increase(stake=stake, amount=NU.from_units(application_economics.maximum_allowed_locked - int(nu)))
# Validate merge method
stake_1 = make_sub_stake(first_locked_period=current_period - 1,
final_locked_period=current_period,
value=nu,
index=0)
stake_2 = make_sub_stake(first_locked_period=current_period - 1,
final_locked_period=current_period,
value=nu,
index=1)
with pytest.raises(Stake.StakingError):
validate_merge(stake_1=stake_1, stake_2=stake_2)
stake_1 = make_sub_stake(first_locked_period=current_period - 1,
final_locked_period=current_period + 1,
value=nu,
index=2)
stake_2 = make_sub_stake(first_locked_period=current_period - 1,
final_locked_period=current_period + 2,
value=nu,
index=3)
with pytest.raises(Stake.StakingError):
validate_merge(stake_1=stake_1, stake_2=stake_2)
with pytest.raises(Stake.StakingError):
validate_merge(stake_1=stake_1, stake_2=stake_1)
stake_2 = make_sub_stake(first_locked_period=current_period - 3,
final_locked_period=current_period + 1,
value=nu,
index=4)
validate_merge(stake_1=stake_1, stake_2=stake_2)

View File

@ -1,52 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from nucypher.blockchain.eth.actors import StakeHolder
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.cli.actions.select import select_client_account_for_staking
from nucypher.config.constants import TEMPORARY_DOMAIN
def test_select_client_account_for_staking_cli_action(test_emitter,
test_registry,
test_registry_source_manager,
mock_stdin,
mock_testerchain,
capsys,
mock_staking_agent):
"""Fine-grained assertions about the return value of interactive client account selection"""
mock_staking_agent.get_all_stakes.return_value = []
selected_index = 0
selected_account = mock_testerchain.client.accounts[selected_index]
stakeholder = StakeHolder(registry=test_registry,
domain=TEMPORARY_DOMAIN,
signer=Web3Signer(mock_testerchain.client))
client_account, staking_address = select_client_account_for_staking(emitter=test_emitter,
stakeholder=stakeholder,
staking_address=selected_account)
assert client_account == staking_address == selected_account
mock_stdin.line(str(selected_index))
client_account, staking_address = select_client_account_for_staking(emitter=test_emitter,
stakeholder=stakeholder,
staking_address=None)
assert client_account == staking_address == selected_account
assert mock_stdin.empty()

View File

@ -1,381 +0,0 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import click
import pytest
from typing import Callable, List
from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
from nucypher.blockchain.eth.actors import StakeHolder
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.signers.software import Web3Signer
from nucypher.blockchain.eth.token import Stake
from nucypher.cli.actions.select import select_stake
from nucypher.cli.literature import NO_STAKES_FOUND, ONLY_DISPLAYING_DIVISIBLE_STAKES_NOTE
from nucypher.cli.painting.staking import STAKER_TABLE_COLUMNS, STAKE_TABLE_COLUMNS
from nucypher.config.constants import TEMPORARY_DOMAIN
from nucypher.types import SubStakeInfo, StakerInfo
def make_sub_stakes(current_period, token_economics, sub_stakes_functions: List[Callable]) -> List[SubStakeInfo]:
sub_stakes = []
for function in sub_stakes_functions:
sub_stakes.extend(function(current_period, token_economics))
return sub_stakes
def empty_sub_stakes(_current_period, _token_economics) -> List[SubStakeInfo]:
return []
def inactive_sub_stakes(current_period, token_economics) -> List[SubStakeInfo]:
stakes = [SubStakeInfo(first_period=1,
last_period=current_period - 2,
locked_value=token_economics.min_authorization),
SubStakeInfo(first_period=current_period - 4,
last_period=current_period - 3,
locked_value=2 * token_economics.min_authorization + 1)]
return stakes
def unlocked_sub_stakes(current_period, token_economics) -> List[SubStakeInfo]:
stakes = [SubStakeInfo(first_period=1,
last_period=current_period - 1,
locked_value=token_economics.min_authorization),
SubStakeInfo(first_period=current_period - 3,
last_period=current_period - 1,
locked_value=2 * token_economics.min_authorization + 1)]
return stakes
def not_editable_sub_stakes(current_period, token_economics) -> List[SubStakeInfo]:
stakes = [SubStakeInfo(first_period=1,
last_period=current_period,
locked_value=token_economics.min_authorization),
SubStakeInfo(first_period=current_period - 3,
last_period=current_period,
locked_value=2 * token_economics.min_authorization + 1)]
return stakes
def non_divisible_sub_stakes(current_period, token_economics) -> List[SubStakeInfo]:
stakes = [SubStakeInfo(first_period=1,
last_period=current_period + 1,
locked_value=token_economics.min_authorization),
SubStakeInfo(first_period=current_period - 3,
last_period=current_period + 2,
locked_value=2 * token_economics.min_authorization - 1),
SubStakeInfo(first_period=current_period - 1,
last_period=current_period + 2,
locked_value=token_economics.min_authorization + 1)]
return stakes
def divisible_sub_stakes(current_period, token_economics) -> List[SubStakeInfo]:
stakes = [SubStakeInfo(first_period=1,
last_period=current_period + 1,
locked_value=2 * token_economics.min_authorization),
SubStakeInfo(first_period=current_period - 3,
last_period=current_period + 2,
locked_value=2 * token_economics.min_authorization + 1)]
return stakes
@pytest.fixture()
def current_period(mock_staking_agent):
current_period = 10
return current_period
@pytest.fixture()
def stakeholder(current_period, mock_staking_agent, test_registry, mock_testerchain):
mock_staking_agent.get_current_period.return_value = current_period
staker_info = StakerInfo(current_committed_period=current_period-1,
next_committed_period=current_period,
value=0,
last_committed_period=0,
lock_restake_until_period=False,
completed_work=0,
worker_start_period=0,
worker=NULL_ADDRESS,
flags=bytes())
mock_staking_agent.get_staker_info.return_value = staker_info
return StakeHolder(registry=test_registry,
domain=TEMPORARY_DOMAIN,
signer=Web3Signer(mock_testerchain.client))
def assert_stake_table_painted(output: str) -> None:
for column_name in (*STAKER_TABLE_COLUMNS, *STAKE_TABLE_COLUMNS):
assert column_name in output
def assert_stake_table_not_painted(output: str) -> None:
for column_name in (*STAKER_TABLE_COLUMNS, *STAKE_TABLE_COLUMNS):
assert column_name not in output
@pytest.mark.skip()
@pytest.mark.parametrize('sub_stakes_functions', [
[empty_sub_stakes],
[inactive_sub_stakes],
[unlocked_sub_stakes],
[not_editable_sub_stakes],
[inactive_sub_stakes, unlocked_sub_stakes, not_editable_sub_stakes]
])
def test_handle_selection_with_with_no_editable_stakes(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics,
sub_stakes_functions):
mock_stakes = make_sub_stakes(current_period, application_economics, sub_stakes_functions)
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
# Test
with pytest.raises(click.Abort):
select_stake(emitter=test_emitter, staker=stakeholder.staker)
# Examine
captured = capsys.readouterr()
assert NO_STAKES_FOUND in captured.out
assert_stake_table_not_painted(output=captured.out)
assert mock_stdin.empty()
@pytest.mark.skip()
@pytest.mark.parametrize('sub_stakes_functions', [
[non_divisible_sub_stakes],
[divisible_sub_stakes],
[inactive_sub_stakes, non_divisible_sub_stakes],
[unlocked_sub_stakes, non_divisible_sub_stakes],
[not_editable_sub_stakes, non_divisible_sub_stakes],
[unlocked_sub_stakes, divisible_sub_stakes],
[not_editable_sub_stakes, divisible_sub_stakes],
[inactive_sub_stakes, divisible_sub_stakes],
[inactive_sub_stakes, not_editable_sub_stakes, non_divisible_sub_stakes, unlocked_sub_stakes, divisible_sub_stakes]
])
def test_select_editable_stake(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics,
sub_stakes_functions):
mock_stakes = make_sub_stakes(current_period, application_economics, sub_stakes_functions)
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
selection = len(mock_stakes) - 1
expected_stake = Stake.from_stake_info(stake_info=mock_stakes[selection],
staking_agent=mock_staking_agent, # stakinator
index=selection,
checksum_address=stakeholder.checksum_address,
economics=application_economics)
# User's selection
mock_stdin.line(str(selection))
selected_stake = select_stake(emitter=test_emitter, staker=stakeholder.staker)
# Check stake accuracy
assert isinstance(selected_stake, Stake)
assert selected_stake == expected_stake
# Examine the output
captured = capsys.readouterr()
assert NO_STAKES_FOUND not in captured.out
assert ONLY_DISPLAYING_DIVISIBLE_STAKES_NOTE not in captured.out
assert_stake_table_painted(output=captured.out)
assert mock_stdin.empty()
@pytest.mark.skip()
def test_handle_selection_with_no_divisible_stakes(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics):
# Setup
mock_stakes = make_sub_stakes(current_period, application_economics, [non_divisible_sub_stakes])
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
# FAILURE: Divisible only with no divisible stakes on chain
with pytest.raises(click.Abort):
select_stake(emitter=test_emitter, staker=stakeholder.staker, stakes_status=Stake.Status.DIVISIBLE)
# Divisible warning was displayed, but having
# no divisible stakes cases an expected failure
captured = capsys.readouterr()
assert NO_STAKES_FOUND in captured.out
assert ONLY_DISPLAYING_DIVISIBLE_STAKES_NOTE in captured.out
assert_stake_table_not_painted(output=captured.out)
assert mock_stdin.empty()
@pytest.mark.skip()
@pytest.mark.parametrize('sub_stakes_functions', [
[divisible_sub_stakes],
[inactive_sub_stakes, divisible_sub_stakes],
[unlocked_sub_stakes, divisible_sub_stakes],
[not_editable_sub_stakes, divisible_sub_stakes],
[non_divisible_sub_stakes, divisible_sub_stakes],
[inactive_sub_stakes, not_editable_sub_stakes, non_divisible_sub_stakes, unlocked_sub_stakes, divisible_sub_stakes]
])
def test_select_divisible_stake(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics,
sub_stakes_functions):
# Setup
mock_stakes = make_sub_stakes(current_period, application_economics, sub_stakes_functions)
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
selection = len(mock_stakes) - 1
expected_stake = Stake.from_stake_info(stake_info=mock_stakes[selection],
staking_agent=mock_staking_agent, # stakinator
index=selection,
checksum_address=stakeholder.checksum_address,
economics=application_economics)
# SUCCESS: Display all divisible-only stakes and make a selection
mock_stdin.line(str(selection))
selected_stake = select_stake(emitter=test_emitter, staker=stakeholder.staker, stakes_status=Stake.Status.DIVISIBLE)
assert isinstance(selected_stake, Stake)
assert selected_stake == expected_stake
# Examine the output
captured = capsys.readouterr()
assert NO_STAKES_FOUND not in captured.out
assert ONLY_DISPLAYING_DIVISIBLE_STAKES_NOTE in captured.out
assert_stake_table_painted(output=captured.out)
assert mock_stdin.empty()
@pytest.mark.skip()
@pytest.mark.parametrize('sub_stakes_functions', [
[not_editable_sub_stakes],
[inactive_sub_stakes, not_editable_sub_stakes],
[unlocked_sub_stakes, not_editable_sub_stakes],
[divisible_sub_stakes, not_editable_sub_stakes],
[non_divisible_sub_stakes, not_editable_sub_stakes],
[inactive_sub_stakes, non_divisible_sub_stakes, unlocked_sub_stakes, divisible_sub_stakes, not_editable_sub_stakes]
])
def test_select_using_filter_function(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics,
sub_stakes_functions):
# Setup
mock_stakes = make_sub_stakes(current_period, application_economics, sub_stakes_functions)
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
selection = len(mock_stakes) - 1
expected_stake = Stake.from_stake_info(stake_info=mock_stakes[selection],
staking_agent=mock_staking_agent, # stakinator
index=selection,
checksum_address=stakeholder.checksum_address,
economics=application_economics)
# SUCCESS: Display all editable-only stakes with specified final period
mock_stdin.line(str(selection))
selected_stake = select_stake(emitter=test_emitter,
staker=stakeholder.staker,
stakes_status=Stake.Status.LOCKED,
filter_function=lambda stake: stake.final_locked_period == current_period)
assert isinstance(selected_stake, Stake)
assert selected_stake == expected_stake
# Examine the output
captured = capsys.readouterr()
assert NO_STAKES_FOUND not in captured.out
assert_stake_table_painted(output=captured.out)
assert mock_stdin.empty()
@pytest.mark.skip()
@pytest.mark.parametrize('sub_stakes_functions', [
[inactive_sub_stakes],
[unlocked_sub_stakes],
[divisible_sub_stakes],
[non_divisible_sub_stakes],
[inactive_sub_stakes, non_divisible_sub_stakes, unlocked_sub_stakes, divisible_sub_stakes]
])
def test_no_stakes_with_filter_function(test_emitter,
stakeholder,
mock_staking_agent,
mock_testerchain,
mock_stdin, # used to assert user hasn't been prompted
capsys,
current_period,
application_economics,
sub_stakes_functions):
# Setup
mock_stakes = make_sub_stakes(current_period, application_economics, sub_stakes_functions)
mock_staking_agent.get_all_stakes.return_value = mock_stakes
staker = mock_testerchain.unassigned_accounts[0]
stakeholder.assimilate(staker, password=INSECURE_DEVELOPMENT_PASSWORD)
# FAILURE: no stakes with specified final period
with pytest.raises(click.Abort):
select_stake(emitter=test_emitter,
staker=stakeholder.staker,
stakes_status=Stake.Status.LOCKED,
filter_function=lambda stake: stake.final_locked_period == current_period)
# Divisible warning was displayed, but having
# no divisible stakes causes an expected failure
captured = capsys.readouterr()
assert NO_STAKES_FOUND in captured.out
assert_stake_table_not_painted(output=captured.out)
assert mock_stdin.empty()

File diff suppressed because it is too large Load Diff