Coordinator agent vertical integration to Ursula

pull/3091/head
Kieran Prasch 2023-04-03 10:07:02 -07:00
parent d5ae5ac559
commit 3c48127fc9
7 changed files with 193 additions and 233 deletions

View File

@ -5,8 +5,9 @@ import maya
import time
from constant_sorrow.constants import FULL
from eth_typing import ChecksumAddress
from ferveo_py import ExternalValidator
from hexbytes import HexBytes
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, Union, List
from web3 import Web3
from web3.types import TxReceipt
@ -35,14 +36,12 @@ from nucypher.blockchain.eth.token import NU
from nucypher.blockchain.eth.trackers.dkg import RitualTracker
from nucypher.blockchain.eth.trackers.pre import WorkTracker
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.crypto.ferveo import dkg
from nucypher.crypto.ferveo.dkg import (
AggregatedTranscript,
DecryptionShare,
Transcript,
)
from nucypher.crypto.ferveo.mock import FerveoError, Keypair
from nucypher.crypto.powers import CryptoPower, TransactingPower
from nucypher.crypto.powers import CryptoPower, TransactingPower, RitualisticPower
from nucypher.network.trackers import OperatorBondedTracker
from nucypher.policy.conditions.lingo import ConditionLingo
from nucypher.policy.payment import ContractPayment
@ -453,7 +452,13 @@ class Ritualist(BaseActor):
contract=self.coordinator_agent.contract
)
self.dkg_storage = {"transcripts": {}, "aggregated_transcripts": {}}
self.dkg_storage = {
"transcripts": {},
"aggregated_transcripts": {},
"public_keys": {},
"generator_inverses": {},
}
self.ritual_power = crypto_power.power_ups(RitualisticPower)
def get_ritual(self, ritual_id: int) -> CoordinatorAgent.Ritual:
try:
@ -466,11 +471,11 @@ class Ritualist(BaseActor):
self.dkg_storage["transcripts"][ritual_id] = bytes(transcript)
def store_aggregated_transcript(
self, ritual_id: int, aggregated_transcript: AggregatedTranscript
self, ritual_id: int, aggregated_transcript: AggregatedTranscript, public_key: bytes, generator_inverse
) -> None:
self.dkg_storage["aggregated_transcripts"][ritual_id] = bytes(
aggregated_transcript
)
self.dkg_storage["aggregated_transcripts"][ritual_id] = bytes(aggregated_transcript)
self.dkg_storage["public_keys"][ritual_id] = public_key
self.dkg_storage["generator_inverses"][ritual_id] = generator_inverse
def get_aggregated_transcript(self, ritual_id: int) -> AggregatedTranscript:
data = self.dkg_storage["aggregated_transcripts"][ritual_id]
@ -482,7 +487,48 @@ class Ritualist(BaseActor):
transcript = Transcript.from_bytes(data)
return transcript
def perform_round_1(self, ritual_id: int, timestamp: int, *args, **kwargs):
def get_public_key(self, ritual_id: int) -> bytes:
return self.dkg_storage["public_keys"][ritual_id]
def recruit_validators(
self,
ritual: CoordinatorAgent.Ritual,
timeout: int = 60
) -> List[Tuple[ExternalValidator, Transcript]]:
sorted_cohort = list(sorted(ritual.transcripts, key=lambda x: int(x[0], 16)))
validator_checksum_addresses = [n[0] for n in sorted_cohort]
if timeout > 0:
nodes_to_discover = list(set(validator_checksum_addresses) - {self.checksum_address})
self.block_until_specific_nodes_are_known(
addresses=nodes_to_discover,
timeout=timeout,
allow_missing=0
)
validators = list()
for node_checksum_address, transcript_bytes in sorted_cohort:
if self.checksum_address == node_checksum_address:
# Local
external_validator = ExternalValidator(
address=self.checksum_address,
public_key=self.ritual_power.public_key()
)
else:
# Remote
try:
remote_ritualist = self.known_nodes[node_checksum_address]
except KeyError:
raise self.ActorError(f"Unknown node {node_checksum_address}")
public_key = remote_ritualist.public_keys(RitualisticPower)
self.log.debug(f"Ferveo public key for {node_checksum_address} is {bytes(public_key).hex()[:-8:-1]}")
external_validator = ExternalValidator(address=node_checksum_address, public_key=public_key)
transcript = Transcript.from_bytes(transcript_bytes) if transcript_bytes else bytes()
validators.append((external_validator, transcript))
return validators
def perform_round_1(self, ritual_id: int, timestamp: int):
ritual = self.get_ritual(ritual_id)
status = self.coordinator_agent.get_ritual_status(ritual_id=ritual_id)
if status != CoordinatorAgent.Ritual.Status.AWAITING_TRANSCRIPTS:
@ -490,7 +536,7 @@ class Ritualist(BaseActor):
f"ritual #{ritual.id} is not waiting for transcripts."
)
node_index = self.ritual_tracker.get_node_index(
ritual_id=ritual_id, node=self.transacting_power.account
ritual_id=ritual_id, node=self.checksum_address
)
if ritual.participants[node_index].transcript:
raise self.RitualError(
@ -500,34 +546,38 @@ class Ritualist(BaseActor):
f"performing round 1 of DKG ritual #{ritual_id} from blocktime {timestamp}"
)
try:
transcript = dkg.generate_transcript(
ritual_id=ritual_id,
checksum_address=self.transacting_power.account,
shares=len(ritual.nodes),
threshold=len(ritual.nodes) // 2 + 1, # TODO: Make this configurable and reachable via contract
nodes=ritual.nodes,
) # TODO: Error handling
except FerveoError:
raise self.ActorError(
f"error generating DKG transcript for ritual #{ritual_id}"
)
nodes, transcripts = list(zip(*self.recruit_validators(ritual)))
if any(transcripts):
self.log.debug(f"ritual #{ritual_id} already has published transcripts")
self.store_transcript(ritual_id=ritual_id, transcript=transcript)
transcript = self.ritual_power.generate_transcript(
nodes=nodes,
threshold=(ritual.shares // 2) + 1,
shares=ritual.shares,
checksum_address=self.checksum_address,
ritual_id=ritual_id
)
self.store_transcript(
ritual_id=ritual_id,
transcript=transcript,
)
receipt = self.coordinator_agent.post_transcript(
node_index=self.ritual_tracker.get_node_index(
ritual_id=ritual_id, node=self.transacting_power.account
ritual_id=ritual_id, node=self.checksum_address
),
ritual_id=ritual_id,
transcript=bytes(transcript),
transacting_power=self.transacting_power,
)
self.log.debug(f"{self.transacting_power.account[:8]} completed round 1 of DKG ritual #{ritual_id}")
total = ritual.total_transcripts + 1
self.log.debug(f"{self.transacting_power.account[:8]} submitted a transcript for "
f"DKG ritual #{ritual_id} ({total}/{len(ritual.nodes)})")
return receipt
def perform_round_2(self, ritual_id: int, timestamp: int, *args, **kwargs):
def perform_round_2(self, ritual_id: int, timestamp: int):
ritual = self.get_ritual(ritual_id)
status = self.coordinator_agent.get_ritual_status(ritual_id=ritual_id)
if status != CoordinatorAgent.Ritual.Status.AWAITING_AGGREGATIONS:
@ -538,64 +588,78 @@ class Ritualist(BaseActor):
f"{self.transacting_power.account[:8]} performing round 2 of DKG ritual #{ritual_id} from blocktime {timestamp}"
)
try:
aggregated_transcript, public_key = dkg.aggregate_transcripts(
ritual_id=ritual_id,
checksum_address=self.transacting_power.account,
shares=len(ritual.nodes),
threshold=len(ritual.nodes) // 2 + 1, # TODO: Make this configurable and reachable via contract
nodes=ritual.nodes,
transcripts=ritual.transcripts,
) # TODO: better error handling
except FerveoError:
nodes, transcripts = list(zip(*self.recruit_validators(ritual)))
if not all(transcripts):
raise self.ActorError(
f"{self.transacting_power.account[:8]} encountered error aggregating DKG transcript fr ritual #{ritual_id}"
f"ritual #{ritual_id} is missing transcripts from {len([t for t in transcripts if not t])} nodes."
)
aggregated_transcript, dkg_public_key, generator_inverse = self.ritual_power.aggregate_transcripts(
nodes=nodes,
threshold=(ritual.shares // 2) + 1,
shares=ritual.shares,
checksum_address=self.checksum_address,
ritual_id=ritual_id,
transcripts=transcripts
)
self.store_aggregated_transcript(
ritual_id=ritual_id,
aggregated_transcript=aggregated_transcript
aggregated_transcript=aggregated_transcript,
public_key=dkg_public_key,
generator_inverse=generator_inverse,
)
receipt = self.coordinator_agent.post_aggregation(
ritual_id=ritual_id,
node_index=self.ritual_tracker.get_node_index(
ritual_id=ritual_id,
node=self.transacting_power.account
node=self.checksum_address
),
aggregated_transcript=aggregated_transcript,
transacting_power=self.transacting_power
)
self.log.debug(f"{self.transacting_power.account[:8]} completed round 2 of DKG ritual #{ritual_id}")
total = ritual.total_aggregations + 1
self.log.debug(f"{self.transacting_power.account[:8]} aggregated a transcript for "
f"DKG ritual #{ritual_id} ({total}/{len(ritual.nodes)})")
if total == len(ritual.nodes):
self.log.debug(f"DKG ritual #{ritual_id} is ready to be finalized")
return receipt
def derive_decryption_share(
self, ritual_id: int, ciphertext: bytes, conditions: ConditionLingo
self,
ritual_id: int,
ciphertext: bytes,
conditions: ConditionLingo
) -> DecryptionShare:
ritual = self.get_ritual(ritual_id)
status = self.coordinator_agent.get_ritual_status(ritual_id=ritual_id)
if status != CoordinatorAgent.Ritual.Status.FINALIZED:
raise self.ActorError(f"ritual #{ritual.id} is not finalized.")
aggregated_transcript = self.get_aggregated_transcript(ritual_id)
try:
decryption_share = dkg.derive_decryption_share(
ritual_id=ritual_id,
checksum_address=self.checksum_address,
shares=len(ritual.nodes),
threshold=len(ritual.nodes) // 2 + 1, # TODO: Make this configurable and reachable via contract
nodes=ritual.nodes,
aggregated_transcript=aggregated_transcript,
keypair=Keypair.random(), # self.crypto_power(RitualPower), # TODO: Use the right keypair
ciphertext=bytes(ciphertext),
aad=bytes(conditions),
)
except FerveoError:
# TODO: better error handling
aggregated_transcript = self.get_aggregated_transcript(ritual_id)
ritual.aggregated_transcript
nodes, transcripts = list(zip(*self.recruit_validators(ritual)))
if not all(transcripts):
raise self.ActorError(
f"{self.transacting_power.account[:8]} encountered an error deriving decryption share for ritual #{ritual_id}"
f"ritual #{ritual_id} is missing transcripts"
)
decryption_share = self.ritual_power.derive_decryption_share(
nodes=nodes,
threshold=(ritual.shares // 2) + 1,
shares=ritual.shares,
checksum_address=self.checksum_address,
ritual_id=ritual_id,
aggregated_transcript=aggregated_transcript,
ciphertext=ciphertext,
conditions=conditions
)
return decryption_share

View File

@ -1,23 +1,21 @@
from dataclasses import dataclass, field
from enum import Enum
import os
import random
import sys
from bisect import bisect_right
from itertools import accumulate
from typing import Dict, Iterable, List, Tuple, Type, Any, Optional, cast, NamedTuple
from dataclasses import dataclass, field
from constant_sorrow.constants import ( # type: ignore
import sys
from constant_sorrow.constants import (
CONTRACT_ATTRIBUTE, # type: ignore
CONTRACT_CALL,
TRANSACTION,
CONTRACT_ATTRIBUTE
)
from eth_typing.evm import ChecksumAddress
from eth_utils.address import to_checksum_address
from ferveo_py import AggregatedTranscript
from itertools import accumulate
from typing import Dict, Iterable, List, Tuple, Type, Any, Optional, cast, NamedTuple
from web3.contract import Contract, ContractFunction
from web3.types import Wei, Timestamp, TxReceipt, TxParams
from web3.types import Timestamp, TxParams, TxReceipt, Wei
from nucypher.blockchain.eth.constants import (
ADJUDICATOR_CONTRACT_NAME,
@ -25,31 +23,23 @@ from nucypher.blockchain.eth.constants import (
ETH_ADDRESS_BYTE_LENGTH,
NUCYPHER_TOKEN_CONTRACT_NAME,
NULL_ADDRESS,
PRE_APPLICATION_CONTRACT_NAME,
SUBSCRIPTION_MANAGER_CONTRACT_NAME,
PRE_APPLICATION_CONTRACT_NAME
)
from nucypher.blockchain.eth.decorators import contract_api
from nucypher.blockchain.eth.events import ContractEvents
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory
from nucypher.blockchain.eth.registry import BaseContractRegistry
from nucypher.config.constants import (
NUCYPHER_ENVVAR_STAKING_PROVIDERS_PAGINATION_SIZE,
NUCYPHER_ENVVAR_STAKING_PROVIDERS_PAGINATION_SIZE_LIGHT_NODE,
NUCYPHER_ENVVAR_STAKING_PROVIDERS_PAGINATION_SIZE
)
from nucypher.crypto.powers import TransactingPower
from nucypher.crypto.utils import sha256_digest
from nucypher.types import (
Agent,
NuNits,
StakingProviderInfo,
TuNits
)
from nucypher.types import Agent, NuNits, StakingProviderInfo, TuNits
from nucypher.utilities.logging import Logger # type: ignore
from ferveo import (AggregatedTranscript, DecryptionShare, Dkg, Keypair, PublicKey, Transcript)
class EthereumContractAgent:
"""
Base class for ethereum contract wrapper types that interact with blockchain contract instances
@ -564,21 +554,21 @@ class CoordinatorAgent(EthereumContractAgent):
FINALIZED = 5
@dataclass
class Performance:
class Participant:
node: ChecksumAddress
aggregated: bool
transcript: bytes
aggregated: bool = False
transcript: bytes = bytes()
id: int
initiator: ChecksumAddress
dkg_size: int
init_timestamp: int
total_transcripts: int
total_aggregations: int
public_key: bytes
aggregated_transcript_hash: bytes
aggregation_mismatch: bool
aggregated_transcript: bytes
total_transcripts: int = 0
total_aggregations: int = 0
public_key: bytes = bytes()
aggregated_transcript_hash: bytes = bytes()
aggregation_mismatch: bool = False
aggregated_transcript: bytes = bytes()
participants: List = field(default_factory=list)
@property
@ -586,21 +576,10 @@ class CoordinatorAgent(EthereumContractAgent):
return [p.node for p in self.participants]
@property
def transcripts(self) -> List[bytes]:
def transcripts(self) -> List[Tuple[ChecksumAddress, bytes]]:
transcripts = list()
for p in self.participants:
if p.aggregated:
raise RuntimeError(f"{p.node[:8]} transcript is already aggregated")
transcripts.append(p.transcript)
return transcripts
@property
def aggregated_transcripts(self) -> List[bytes]:
transcripts = list()
for p in self.participants:
if not p.aggregated:
raise RuntimeError(f"{p.node[:8]} transcript not aggregated")
transcripts.append(p.transcript)
transcripts.append((p.node, p.transcript))
return transcripts
@property
@ -633,11 +612,11 @@ class CoordinatorAgent(EthereumContractAgent):
return result
@contract_api(CONTRACT_CALL)
def get_participants(self, ritual_id: int) -> List[Ritual.Performance]:
def get_participants(self, ritual_id: int) -> List[Ritual.Participant]:
result = self.contract.functions.getParticipants(ritual_id).call()
participants = list()
for r in result:
performance = self.Ritual.Performance(
performance = self.Ritual.Participant(
node=ChecksumAddress(r[0]), aggregated=r[1], transcript=bytes(r[2])
)
participants.append(performance)

View File

@ -1,9 +1,9 @@
import os
import time
from typing import Callable, List, Tuple, Type, Union, Optional
from typing import Callable, List, Optional, Tuple, Type, Union
from eth_typing import ChecksumAddress
from ferveo import (
from ferveo_py import (
AggregatedTranscript,
DecryptionShare,
Dkg,
@ -27,6 +27,7 @@ class EventActuator(EventScanner):
"""Act on events that are found by the scanner."""
def __init__(self, hooks: List[Callable], clear: bool = True, *args, **kwargs):
self.log = Logger("EventActuator")
if clear and os.path.exists(JSONifiedState.STATE_FILENAME):
os.remove(JSONifiedState.STATE_FILENAME)
self.hooks = hooks
@ -34,7 +35,11 @@ class EventActuator(EventScanner):
def process_event(self, event, get_block_when):
for hook in self.hooks:
hook(event, get_block_when)
try:
hook(event, get_block_when)
except Exception as e:
self.log.warn("Error during event hook: {}".format(e))
raise
super().process_event(event, get_block_when)
@ -132,10 +137,14 @@ class RitualTracker:
"""Start the event scanner task."""
return self.task.start()
def stop(self):
"""Stop the event scanner task."""
return self.task.stop()
def __action_required(self, event_type: Type[ContractEvent], block_number: int, ritual_id: int):
"""Check if an action is required for a given event."""
if (event_type, ritual_id) in self.active_tasks:
self.log.debug(f"Already tracking {event_type} for ritual {ritual_id} from block #{block_number}")
# self.log.debug(f"Already tracking {event_type} for ritual {ritual_id} from block #{block_number}")
return False
return True
@ -145,7 +154,7 @@ class RitualTracker:
event_type = getattr(self.contract.events, event.event)
if hasattr(args, "nodes"):
# Filter out events that are not for me
if self.ritualist.transacting_power.account not in args.nodes:
if self.ritualist.checksum_address not in args.nodes:
self.log.debug(f"Event {name} is not for me, skipping")
return None, event_type
if not self.__action_required(event_type, event.blockNumber, args.ritualId):
@ -181,7 +190,7 @@ class RitualTracker:
def __scan(self, start_block, end_block, account):
# Run the scan
self.log.debug(f"({account[:8]}) Scanning events from blocks {start_block} - {end_block}")
# self.log.debug(f"({account[:8]}) Scanning events from blocks {start_block} - {end_block}")
start = time.time()
result, total_chunks_scanned = self.scanner.scan(start_block, end_block)
if self.persistent:
@ -207,7 +216,7 @@ class RitualTracker:
self.__scan(start_block, end_block, self.ritualist.transacting_power.account)
def get_node_index(self, ritual_id: int, node: ChecksumAddress) -> int:
return [p.node for p in self.rituals[ritual_id].participants].index(node)
return self.rituals[ritual_id].nodes.index(node)
def add_ritual(self, ritual):
self.rituals[ritual.id] = ritual

View File

@ -1,22 +1,7 @@
import contextlib
import json
import time
from pathlib import Path
from queue import Queue
from typing import (
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import maya
import time
from constant_sorrow import constants
from constant_sorrow.constants import (
INVALIDATED,
@ -41,14 +26,23 @@ from nucypher_core import (
ReencryptionResponse,
TreasureMap,
)
from nucypher_core.umbral import (
PublicKey,
RecoverableSignature,
VerifiedKeyFrag,
reencrypt,
)
from nucypher_core.umbral import PublicKey, VerifiedKeyFrag, reencrypt, RecoverableSignature
from pathlib import Path
from queue import Queue
from twisted.internet import reactor
from twisted.logger import Logger
from typing import (
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
from web3.types import TxReceipt
import nucypher
@ -74,7 +68,7 @@ from nucypher.crypto.powers import (
PowerUpError,
SigningPower,
TLSHostingPower,
TransactingPower,
TransactingPower, RitualisticPower,
)
from nucypher.network.exceptions import NodeSeemsToBeDown
from nucypher.network.middleware import RestMiddleware
@ -82,7 +76,7 @@ from nucypher.network.nodes import TEACHER_NODES, NodeSprout, Teacher
from nucypher.network.protocols import parse_node_uri
from nucypher.network.retrieval import RetrievalClient
from nucypher.network.server import ProxyRESTServer, make_rest_app
from nucypher.network.trackers import AvailabilityTracker, OperatorBondedTracker
from nucypher.network.trackers import AvailabilityTracker
from nucypher.policy.conditions.types import LingoList
from nucypher.policy.conditions.utils import validate_condition_lingo
from nucypher.policy.kits import PolicyMessageKit
@ -529,6 +523,7 @@ class Ursula(Teacher, Character, Operator, Ritualist):
_default_crypto_powerups = [
SigningPower,
DecryptingPower,
RitualisticPower,
# TLSHostingPower # Still considered a default for Ursula, but needs the host context
]
@ -889,6 +884,7 @@ class Ursula(Teacher, Character, Operator, Ritualist):
operator_signature=operator_signature,
verifying_key=self.public_keys(SigningPower),
encrypting_key=self.public_keys(DecryptingPower),
ferveo_public_key=bytes(self.public_keys(RitualisticPower)), # TODO: use type
certificate_der=self.certificate.public_bytes(Encoding.DER),
host=self.rest_interface.host,
port=self.rest_interface.port)

View File

@ -1,94 +0,0 @@
import os
from dataclasses import dataclass
from eth_typing import ChecksumAddress
from typing import List
class FerveoError(Exception):
pass
@dataclass
class PublicKey:
public_key: bytes
@dataclass
class Keypair:
public_key: PublicKey
secret_key: bytes
@staticmethod
def random():
return Keypair(PublicKey(os.urandom(32)), os.urandom(32))
@dataclass
class DecryptionShare:
share: bytes
def __bytes__(self):
return self.share
def validate(self, *args, **kwargs) -> bool:
return True
@dataclass
class Transcript:
transcript: bytes
@classmethod
def from_bytes(cls, transcript: bytes):
return cls(transcript=transcript)
def __bytes__(self):
return self.transcript
def validate(self, *args, **kwargs) -> bool:
return True
@dataclass
class AggregatedTranscript:
transcript: bytes
@classmethod
def from_bytes(cls, transcript: bytes):
return cls(transcript=transcript)
def __bytes__(self):
return self.transcript
@staticmethod
def validate(self, *args, **kwargs) -> bool:
return True
@staticmethod
def create_decryption_share(*args, **kwargs) -> DecryptionShare:
return DecryptionShare(share=os.urandom(32))
@dataclass
class Dkg:
tau: int
shares_num: int
security_threshold: int
validators: List[ChecksumAddress]
me: ChecksumAddress
@staticmethod
def generate_transcript(*args, **kwargs) -> Transcript:
return Transcript(transcript=os.urandom(32))
@staticmethod
def aggregate_transcripts(*args, **kwargs) -> AggregatedTranscript:
return AggregatedTranscript(transcript=os.urandom(32))
@staticmethod
def validate(self, *args, **kwargs) -> bool:
return True
@property
def final_key(self) -> PublicKey:
return PublicKey(public_key=os.urandom(32))

View File

@ -128,6 +128,10 @@ class NodeSprout:
def encrypting_key(self):
return self._metadata_payload.encrypting_key
@property
def ferveo_public_key(self):
return self._metadata_payload.ferveo_public_key
@property
def operator_signature_from_metadata(self):
return self._metadata_payload.operator_signature or NOT_SIGNED
@ -150,6 +154,7 @@ class NodeSprout:
crypto_power = CryptoPower()
crypto_power.consume_power_up(SigningPower(public_key=self._metadata_payload.verifying_key))
crypto_power.consume_power_up(DecryptingPower(public_key=self._metadata_payload.encrypting_key))
crypto_power.consume_power_up(RitualPower(public_key=self._metadata_payload.ferveo_public_key))
return Ursula(is_me=False,
crypto_power=crypto_power,

View File

@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
@ -13,25 +14,25 @@ class SimpleTask(ABC):
def __init__(self):
self.log = Logger(self.__class__.__name__)
self.__task = LoopingCall(self.run)
self._task = LoopingCall(self.run)
# self.__task.clock = self.CLOCK
@property
def running(self) -> bool:
"""Determine whether the task is already running."""
return self.__task.running
return self._task.running
def start(self, now: bool = False):
"""Start task."""
if not self.running:
d = self.__task.start(interval=self.INTERVAL, now=now)
d = self._task.start(interval=self.INTERVAL, now=now)
d.addErrback(self.handle_errors)
return d
# return d
def stop(self):
"""Stop task."""
if self.running:
self.__task.stop()
self._task.stop()
@abstractmethod
def run(self):