Merge pull request #2056 from fjarri/sampling

Simplify staker sampling
K Prasch 2020-08-13 13:51:48 -07:00 committed by GitHub
commit 697a6d3d02
No known key found for this signature in database
7 changed files with 212 additions and 143 deletions

View File

@ -44,7 +44,8 @@ from nucypher.blockchain.eth.agents import (
from nucypher.blockchain.eth.constants import NULL_ADDRESS
from nucypher.blockchain.eth.decorators import (
@ -1626,16 +1627,11 @@ class BlockchainPolicyAuthor(NucypherTokenActor):
payload = {**blockchain_payload, **policy_end_time}
return payload
def recruit(self, quantity: int, **options) -> List[str]:
def get_stakers_reservoir(self, **options) -> StakersReservoir:
Uses sampling logic to gather stakers from the blockchain and
caches the resulting node ethereum addresses.
:param quantity: Number of ursulas to sample from the blockchain.
Get a sampler object containing the currently registered stakers.
staker_addresses = self.staking_agent.sample(quantity=quantity, **options)
return staker_addresses
return self.staking_agent.get_stakers_reservoir(**options)
def create_policy(self, *args, **kwargs):

View File

@ -15,8 +15,9 @@ You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <>.
from bisect import bisect_right
from itertools import accumulate
import random
import math
import sys
from constant_sorrow.constants import ( # type: ignore
@ -732,73 +733,28 @@ class StakingEscrowAgent(EthereumContractAgent):
staker_address: ChecksumAddress = self.contract.functions.stakers(index).call()
yield staker_address
def sample(self,
quantity: int,
duration: int,
additional_ursulas: float = 1.5,
attempts: int = 5,
pagination_size: Optional[int] = None
) -> List[ChecksumAddress]:
Select n random Stakers, according to their stake distribution.
def get_stakers_reservoir(self,
duration: int,
without: Iterable[ChecksumAddress] = [],
pagination_size: Optional[int] = None) -> 'StakersReservoir':
The returned addresses are shuffled, so one can request more than needed and
throw away those which do not respond.
n_tokens, stakers_map = self.get_all_active_stakers(periods=duration,
See full diagram here:
filtered_out = 0
for address in without:
if address in stakers_map:
n_tokens -= stakers_map[address]
del stakers_map[address]
filtered_out += 1
This method implements the Probability Proportional to Size (PPS) sampling algorithm.
In few words, the algorithm places in a line all active stakes that have locked tokens for
at least `duration` periods; a staker is selected if an input point is within its stake.
For example:
self.log.debug(f"Got {len(stakers_map)} stakers with {n_tokens} total tokens "
f"({filtered_out} filtered out)")
Stakes: |----- S0 ----|--------- S1 ---------|-- S2 --|---- S3 ---|-S4-|----- S5 -----|
Points: ....R0.......................R1..................R2...............R3...........
In this case, Stakers 0, 1, 3 and 5 will be selected.
Only stakers which made a commitment to the current period (in the previous period) are used.
system_random = random.SystemRandom()
n_tokens, stakers_map = self.get_all_active_stakers(periods=duration, pagination_size=pagination_size)
if n_tokens == 0:
raise self.NotEnoughStakers('There are no locked tokens for duration {}.'.format(duration))
raise self.NotEnoughStakers(f'There are no locked tokens for duration {duration}.')
sample_size = quantity
for _ in range(attempts):
sample_size = math.ceil(sample_size * additional_ursulas)
points = sorted(system_random.randrange(n_tokens) for _ in range(sample_size))
self.log.debug(f"Sampling {sample_size} stakers with random points: {points}")
addresses = set()
stakers = list(stakers_map.items())
point_index = 0
sum_of_locked_tokens = 0
staker_index = 0
stakers_len = len(stakers)
while staker_index < stakers_len and point_index < sample_size:
current_staker = stakers[staker_index][0]
staker_tokens = stakers[staker_index][1]
next_sum_value = sum_of_locked_tokens + staker_tokens
point = points[point_index]
if sum_of_locked_tokens <= point < next_sum_value:
point_index += 1
staker_index += 1
sum_of_locked_tokens = next_sum_value
self.log.debug(f"Sampled {len(addresses)} stakers: {list(addresses)}")
if len(addresses) >= quantity:
return system_random.sample(addresses, quantity)
raise self.NotEnoughStakers('Selection failed after {} attempts'.format(attempts))
return StakersReservoir(stakers_map)
def get_completed_work(self, bidder_address: ChecksumAddress) -> Work:
@ -1656,3 +1612,69 @@ class ContractAgency:
agent_class: Type[EthereumContractAgent] = getattr(agents_module, agent_name)
agent: EthereumContractAgent = cls.get_agent(agent_class=agent_class, registry=registry, provider_uri=provider_uri)
return agent
class WeightedSampler:
Samples random elements with probabilities proportional to given weights.
def __init__(self, weighted_elements: Dict[Any, int]):
elements, weights = zip(*weighted_elements.items())
self.totals = list(accumulate(weights))
self.elements = elements
def sample_no_replacement(self, rng, quantity: int) -> list:
Samples ``quantity`` of elements from the internal array.
The probablity of an element to appear is proportional
to the weight provided to the constructor.
The elements will not repeat; every time an element is sampled its weight is set to 0.
(does not mutate the object and only applies to the current invocation of the method).
if quantity == 0:
return []
if quantity > len(self):
raise ValueError("Cannot sample more than the total amount of elements without replacement")
totals = self.totals.copy()
samples = []
for i in range(quantity):
position = rng.randint(0, totals[-1] - 1)
idx = bisect_right(totals, position)
# Adjust the totals so that they correspond
# to the weight of the element `idx` being set to 0.
prev_total = totals[idx - 1] if idx > 0 else 0
weight = totals[idx] - prev_total
for j in range(idx, len(totals)):
totals[j] -= weight
return samples
def __len__(self):
return len(self.totals)
class StakersReservoir:
def __init__(self, stakers_map: Dict[ChecksumAddress, int]):
self._sampler = WeightedSampler(stakers_map)
self._rng = random.SystemRandom()
def __len__(self):
return len(self._sampler)
def draw(self, quantity):
if quantity > len(self):
raise StakingEscrowAgent.NotEnoughStakers(f'Cannot sample {quantity} out of {len(self)} total stakers')
return self._sampler.sample_no_replacement(self._rng, quantity)
def draw_at_most(self, quantity):
return self.draw(min(quantity, len(self)))

View File

@ -34,7 +34,7 @@ from eth_utils import to_checksum_address
from requests.exceptions import SSLError
from twisted.internet import defer, reactor, task
from twisted.internet.threads import deferToThread
from typing import Set, Tuple, Union
from typing import Set, Tuple, Union, Iterable
from umbral.signing import Signature
import nucypher
@ -603,9 +603,10 @@ class Learner:
# TODO: Allow the user to set eagerness? 1712
def learn_about_specific_nodes(self, addresses: Set):
self._node_ids_to_learn_about_immediately.update(addresses) # hmmmm
def learn_about_specific_nodes(self, addresses: Iterable):
if len(addresses) > 0:
self._node_ids_to_learn_about_immediately.update(addresses) # hmmmm
# TODO: Dehydrate these next two methods. NRN

View File

@ -15,6 +15,7 @@ You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <>.
import time
import random
from collections import OrderedDict, deque
@ -22,7 +23,7 @@ import maya
from abc import ABC, abstractmethod
from bytestring_splitter import BytestringSplitter, VariableLengthBytestring
from constant_sorrow.constants import NOT_SIGNED, UNKNOWN_KFRAG
from typing import Generator, List, Set
from typing import Generator, List, Set, Optional
from umbral.keys import UmbralPublicKey
from umbral.kfrags import KFrag
@ -205,9 +206,6 @@ class Policy(ABC):
self.treasure_map = TreasureMap(m=m)
self.expiration = expiration
# Keep track of this stuff
self.selection_buffer = 1
self._accepted_arrangements = set() # type: Set[Arrangement]
self._rejected_arrangements = set() # type: Set[Arrangement]
self._spare_candidates = set() # type: Set[Ursula]
@ -384,7 +382,7 @@ class Policy(ABC):
def make_arrangements(self,
network_middleware: RestMiddleware,
handpicked_ursulas: Set[Ursula] = None,
handpicked_ursulas: Optional[Set[Ursula]] = None,
*args, **kwargs,
) -> None:
@ -411,17 +409,17 @@ class Policy(ABC):
raise NotImplementedError
def sample_essential(self, quantity: int, handpicked_ursulas: Set[Ursula] = None) -> Set[Ursula]:
def sample_essential(self, quantity: int, handpicked_ursulas: Set[Ursula]) -> Set[Ursula]:
raise NotImplementedError
def sample(self, handpicked_ursulas: Set[Ursula] = None) -> Set[Ursula]:
def sample(self, handpicked_ursulas: Optional[Set[Ursula]] = None) -> Set[Ursula]:
selected_ursulas = set(handpicked_ursulas) if handpicked_ursulas else set()
# Calculate the target sample quantity
target_sample_quantity = self.n - len(selected_ursulas)
if target_sample_quantity > 0:
sampled_ursulas = self.sample_essential(quantity=target_sample_quantity,
return selected_ursulas
@ -478,11 +476,11 @@ class FederatedPolicy(Policy):
"Pass them here as handpicked_ursulas.".format(self.n)
raise self.MoreKFragsThanArrangements(error) # TODO: NotEnoughUrsulas where in the exception tree is this?
def sample_essential(self, quantity: int, handpicked_ursulas: Set[Ursula] = None) -> Set[Ursula]:
def sample_essential(self, quantity: int, handpicked_ursulas: Set[Ursula]) -> Set[Ursula]:
known_nodes = self.alice.known_nodes
if handpicked_ursulas:
# Prevent re-sampling of handpicked ursulas.
known_nodes = set(known_nodes) - set(handpicked_ursulas)
known_nodes = set(known_nodes) - handpicked_ursulas
sampled_ursulas = set(random.sample(k=quantity, population=list(known_nodes)))
return sampled_ursulas
@ -532,7 +530,6 @@ class BlockchainPolicy(Policy):
super().__init__(alice=alice, expiration=expiration, *args, **kwargs)
self.selection_buffer = 1.5
def validate_fee_value(self) -> None:
@ -576,58 +573,69 @@ class BlockchainPolicy(Policy):
params = dict(rate=rate, value=value)
return params
def __find_ursulas(self,
ether_addresses: List[str],
target_quantity: int,
timeout: int = 10) -> set: # TODO #843: Make timeout configurable
def sample_essential(self,
quantity: int,
handpicked_ursulas: Set[Ursula],
learner_timeout: int = 1,
timeout: int = 10) -> Set[Ursula]: # TODO #843: Make timeout configurable
start_time = # marker for timeout calculation
selected_ursulas = set(handpicked_ursulas)
quantity_remaining = quantity
found_ursulas, unknown_addresses = set(), deque()
while len(found_ursulas) < target_quantity: # until there are enough Ursulas
# Need to sample some stakers
delta = - start_time # check for a timeout
if delta.total_seconds() >= timeout:
missing_nodes = ', '.join(a for a in unknown_addresses)
raise RuntimeError("Timed out after {} seconds; Cannot find {}.".format(timeout, missing_nodes))
# Select an ether_address: Prefer the selection pool, then unknowns queue
if ether_addresses:
ether_address = ether_addresses.pop()
ether_address = unknown_addresses.popleft()
# Check if this is a known node.
selected_ursula = self.alice.known_nodes[ether_address]
except KeyError:
# Unknown Node
self.alice.learn_about_specific_nodes({ether_address}) # enter address in learning loop
# Known Node
found_ursulas.add(selected_ursula) # We already knew, or just learned about this ursula
return found_ursulas
def sample_essential(self, quantity: int, handpicked_ursulas: Set[Ursula] = None) -> Set[Ursula]:
# TODO: Prevent re-sampling of handpicked ursulas.
selected_addresses = set()
sampled_addresses = self.alice.recruit(quantity=quantity,
except StakingEscrowAgent.NotEnoughStakers as e:
error = f"Cannot create policy with {quantity} arrangements: {e}"
handpicked_addresses = [ursula.checksum_address for ursula in handpicked_ursulas]
reservoir = self.alice.get_stakers_reservoir(duration=self.duration_periods,
if len(reservoir) < quantity_remaining:
error = f"Cannot create policy with {quantity} arrangements"
raise self.NotEnoughBlockchainUrsulas(error)
# Capture the selection and search the network for those Ursulas
found_ursulas = self.__find_ursulas(sampled_addresses, quantity)
return found_ursulas
to_check = set(reservoir.draw(quantity_remaining))
# Sample stakers in a loop and feed them to the learner to check
# until we have enough in `selected_ursulas`.
start_time =
new_to_check = to_check
while True:
# Check if the sampled addresses are already known.
# If we're lucky, we won't have to wait for the learner iteration to finish.
known = {x for x in to_check if x in self.alice.known_nodes}
to_check = to_check - known
known = random.sample(known, min(len(known), quantity_remaining)) # we only need so many
selected_ursulas.update([self.alice.known_nodes[address] for address in known])
quantity_remaining -= len(known)
if quantity_remaining == 0:
new_to_check = reservoir.draw_at_most(quantity_remaining)
# Feed newly sampled stakers to the learner
# TODO: would be nice to wait for the learner to finish an iteration here,
# because if it hasn't, we really have nothing to do.
delta = - start_time
if delta.total_seconds() >= timeout:
still_checking = ', '.join(to_check)
raise RuntimeError(f"Timed out after {timeout} seconds; "
f"need {quantity_remaining} more, still checking {still_checking}.")
found_ursulas = list(selected_ursulas)
# Randomize the output to avoid the largest stakers always being the first in the list
system_random = random.SystemRandom()
system_random.shuffle(found_ursulas) # inplace
return set(found_ursulas)
def publish_to_blockchain(self) -> dict:

View File

@ -35,7 +35,7 @@ def policy_meta(testerchain, agency, token_economics, blockchain_ursulas):
agent = policy_agent
_policy_id = os.urandom(16)
staker_addresses = list(staking_agent.sample(quantity=3, duration=1))
staker_addresses = list(staking_agent.get_stakers_reservoir(duration=1).draw(3))
number_of_periods = 10
now = testerchain.w3.eth.getBlock(block_identifier='latest').timestamp
_txhash = agent.create_policy(policy_id=_policy_id,
@ -56,7 +56,7 @@ def test_create_policy(testerchain, agency, token_economics, mock_transacting_po
mock_transacting_power_activation(account=testerchain.alice_account, password=INSECURE_DEVELOPMENT_PASSWORD)
policy_id = os.urandom(16)
node_addresses = list(staking_agent.sample(quantity=3, duration=1))
node_addresses = list(staking_agent.get_stakers_reservoir(duration=1).draw(3))
now = testerchain.w3.eth.getBlock(block_identifier='latest').timestamp
receipt = agent.create_policy(policy_id=policy_id,

View File

@ -16,11 +16,13 @@ along with nucypher. If not, see <>.
from collections import Counter
from itertools import permutations
import random
import pytest
from nucypher.blockchain.economics import BaseEconomics
from nucypher.blockchain.eth.agents import StakingEscrowAgent
from nucypher.blockchain.eth.agents import StakingEscrowAgent, WeightedSampler
from nucypher.blockchain.eth.constants import NULL_ADDRESS, STAKING_ESCROW_CONTRACT_NAME
@ -115,7 +117,8 @@ def test_sampling_distribution(testerchain, token, deploy_contract, token_econom
sampled, failed = 0, 0
while sampled < SAMPLES:
addresses = set(staking_agent.sample(quantity=quantity, additional_ursulas=1, duration=1))
reservoir = staking_agent.get_stakers_reservoir(duration=1)
addresses = set(reservoir.draw(quantity))
except staking_agent.NotEnoughStakers:
failed += 1
@ -134,3 +137,42 @@ def test_sampling_distribution(testerchain, token, deploy_contract, token_econom
assert abs_error < ERROR_TOLERANCE
# TODO: Test something wrt to % of failed
def probability_reference_no_replacement(weights, idxs):
The probability of drawing elements with (distinct) indices ``idxs`` (in given order),
given ``weights``. No replacement.
assert len(set(idxs)) == len(idxs)
all_weights = sum(weights)
p = 1
for idx in idxs:
p *= weights[idx] / all_weights
all_weights -= weights[idx]
return p
@pytest.mark.parametrize('sample_size', [1, 2, 3])
def test_weighted_sampler(sample_size):
weights = [1, 9, 100, 2, 18, 70]
elements = list(range(len(weights)))
rng = random.SystemRandom()
counter = Counter()
weighted_elements = {element: weight for element, weight in zip(elements, weights)}
samples = 100000
sampler = WeightedSampler(weighted_elements)
for i in range(samples):
sample_set = sampler.sample_no_replacement(rng, sample_size)
counter.update({tuple(sample_set): 1})
for idxs in permutations(elements, sample_size):
test_prob = counter[idxs] / samples
ref_prob = probability_reference_no_replacement(weights, idxs)
# A rough estimate to check probabilities.
# A little too forgiving for samples with smaller probabilities,
# but can go up to 0.5 on occasion.
assert abs(test_prob - ref_prob) * samples**0.5 < 1

View File

@ -152,19 +152,19 @@ def test_sample_stakers(agency):
stakers_population = staking_agent.get_staker_population()
with pytest.raises(StakingEscrowAgent.NotEnoughStakers):
staking_agent.sample(quantity=stakers_population + 1, duration=1) # One more than we have deployed
staking_agent.get_stakers_reservoir(duration=1).draw(stakers_population + 1) # One more than we have deployed
stakers = staking_agent.sample(quantity=3, duration=5)
stakers = staking_agent.get_stakers_reservoir(duration=5).draw(3)
assert len(stakers) == 3 # Three...
assert len(set(stakers)) == 3 # ...unique addresses
# Same but with pagination
stakers = staking_agent.sample(quantity=3, duration=5, pagination_size=1)
stakers = staking_agent.get_stakers_reservoir(duration=5, pagination_size=1).draw(3)
assert len(stakers) == 3
assert len(set(stakers)) == 3
light = staking_agent.blockchain.is_light
staking_agent.blockchain.is_light = not light
stakers = staking_agent.sample(quantity=3, duration=5)
stakers = staking_agent.get_stakers_reservoir(duration=5).draw(3)
assert len(stakers) == 3
assert len(set(stakers)) == 3
staking_agent.blockchain.is_light = light
@ -292,13 +292,13 @@ def test_lock_restaking(agency, testerchain, test_registry):
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=test_registry)
current_period = staking_agent.get_current_period()
terminal_period = current_period + 2
assert staking_agent.is_restaking(staker_account)
assert not staking_agent.is_restaking_locked(staker_account)
receipt = staking_agent.lock_restaking(staker_account, release_period=terminal_period)
assert receipt['status'] == 1, "Transaction Rejected"
assert staking_agent.is_restaking_locked(staker_account)
testerchain.time_travel(periods=2) # Wait for re-staking lock to be released.
assert not staking_agent.is_restaking_locked(staker_account)