Use pathlib

pull/2692/head
Piotr Roslaniec 2021-05-11 19:55:13 +02:00
parent 47d281a30e
commit 6a270c1713
66 changed files with 464 additions and 394 deletions

View File

@ -20,6 +20,7 @@ import sys
import json
import os
import shutil
from pathlib import Path
import maya
@ -38,7 +39,7 @@ from nucypher.utilities.logging import GlobalLoggerSettings
GlobalLoggerSettings.start_console_logging()
TEMP_ALICE_DIR = os.path.join('/', 'tmp', 'heartbeat-demo-alice')
TEMP_ALICE_DIR = Path('/', 'tmp', 'heartbeat-demo-alice')
# if your ursulas are NOT running on your current host,
@ -72,7 +73,7 @@ ursula = Ursula.from_seed_and_stake_info(seed_uri=SEEDNODE_URI,
minimum_stake=0)
alice_config = AliceConfiguration(
config_root=os.path.join(TEMP_ALICE_DIR),
config_root=TEMP_ALICE_DIR,
domain=TEMPORARY_DOMAIN,
known_nodes={ursula},
start_learning_now=False,

View File

@ -20,6 +20,7 @@
import base64
import json
import os
from pathlib import Path
import click
@ -49,7 +50,7 @@ def mario_box_cli(plaintext_dir, alice_config, label, outfile):
with click.progressbar(paths) as bar:
for path in bar:
filepath = os.path.join(plaintext_dir, path)
filepath = Path(plaintext_dir, path)
with open(filepath, 'rb') as file:
plaintext = file.read()
encoded_plaintext = base64.b64encode(plaintext)

View File

@ -43,7 +43,7 @@ def spin_up_federated_ursulas(quantity: int = FLEET_POPULATION):
sage_dir = str(USER_CACHE / 'sage.db')
ursulas = []
if not os.path.exists(sage_dir):
if not sage_dir.exists():
os.makedirs(sage_dir)
sage = ursula_maker(rest_port=ports[0], db_filepath=sage_dir)

View File

@ -15,15 +15,13 @@ You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
from pathlib import Path
from typing import List
import random
from os.path import abspath, dirname, join
_HERE = abspath(dirname(__file__))
with open(join(_HERE, 'web_colors.json')) as f:
_HERE = Path(__file__)
with open(_HERE.parent / 'web_colors.json') as f:
_COLORS = json.load(f)['colors']
_SYMBOLS = {

View File

@ -325,7 +325,7 @@ class ContractAdministrator(BaseActor):
def save_deployment_receipts(self, receipts: dict, filename_prefix: str = 'deployment') -> str:
config_root = DEFAULT_CONFIG_ROOT # We force the use of the default here.
filename = f'{filename_prefix}-receipts-{self.deployer_address[:6]}-{maya.now().epoch}.json'
filepath = os.path.join(config_root, filename)
filepath = config_root / filename
os.makedirs(config_root, exist_ok=True)
with open(filepath, 'w') as file:
data = dict()

View File

@ -19,6 +19,8 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import math
import os
import pprint
from pathlib import Path
from eth.typing import TransactionDict
from typing import Callable, NamedTuple, Tuple, Union, Optional
from typing import List
@ -381,7 +383,7 @@ class BlockchainInterface:
# auto-detect for file based ipc
if not provider_scheme:
if os.path.exists(provider_uri):
if Path(provider_uri).is_file():
# file is available - assume ipc/file scheme
provider_scheme = 'file'
self.log.info(f"Auto-detected provider scheme as 'file://' for provider {provider_uri}")

View File

@ -16,10 +16,11 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
from json import JSONDecodeError
from os.path import abspath, dirname
import hashlib
import os
from pathlib import Path
import requests
import shutil
import tempfile
@ -345,7 +346,7 @@ class LocalContractRegistry(BaseContractRegistry):
REGISTRY_TYPE = 'contract'
def __init__(self, filepath: str, *args, **kwargs):
def __init__(self, filepath: Path, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__filepath = filepath
self.log.info(f"Using {self.REGISTRY_TYPE} registry {filepath}")
@ -355,8 +356,8 @@ class LocalContractRegistry(BaseContractRegistry):
return r
@property
def filepath(self) -> str:
return str(self.__filepath)
def filepath(self) -> Path:
return self.__filepath
def _swap_registry(self, filepath: str) -> bool:
self.__filepath = filepath
@ -398,7 +399,7 @@ class LocalContractRegistry(BaseContractRegistry):
it will _overwrite_ everything in it.
"""
# Ensure parent path exists
os.makedirs(abspath(dirname(self.__filepath)), exist_ok=True)
os.makedirs(self.__filepath.parent, exist_ok=True)
with open(self.__filepath, 'w') as registry_file:
registry_file.seek(0)
@ -432,12 +433,12 @@ class TemporaryContractRegistry(LocalContractRegistry):
with open(self.filepath, 'w') as registry_file:
registry_file.write('')
def commit(self, filepath) -> str:
def commit(self, filepath) -> Path:
"""writes the current state of the registry to a file"""
self.log.info("Committing temporary registry to {}".format(filepath))
self._swap_registry(filepath) # I'll allow it
if os.path.exists(filepath):
if filepath.exists():
self.log.debug("Removing registry {}".format(filepath))
self.clear() # clear prior sim runs
@ -474,12 +475,12 @@ class InMemoryContractRegistry(BaseContractRegistry):
raise
return registry_data
def commit(self, filepath: str = None, overwrite: bool = False) -> str:
def commit(self, filepath: Path = None, overwrite: bool = False) -> Path:
"""writes the current state of the registry to a file"""
if not filepath:
filepath = os.path.join(DEFAULT_CONFIG_ROOT, self.REGISTRY_NAME)
filepath = DEFAULT_CONFIG_ROOT / self.REGISTRY_NAME
self.log.info("Committing in-memory registry to disk.")
if os.path.exists(filepath) and not overwrite:
if filepath.exists() and not overwrite:
existing_registry = LocalContractRegistry(filepath=filepath)
raise self.CantOverwriteRegistry(f"Registry #{existing_registry.id[:16]} exists at {filepath} "
f"while writing Registry #{self.id[:16]}). "

View File

@ -16,6 +16,7 @@
"""
import json
from pathlib import Path
import eth_utils
import math
@ -90,7 +91,7 @@ class Felix(Character, NucypherTokenActor):
pass
def __init__(self,
db_filepath: str,
db_filepath: Path,
rest_host: str,
rest_port: int,
client_password: str = None,
@ -112,7 +113,7 @@ class Felix(Character, NucypherTokenActor):
# Database
self.db_filepath = db_filepath
self.db = NO_DATABASE_AVAILABLE
self.db_engine = create_engine(f'sqlite:///{self.db_filepath}', convert_unicode=True)
self.db_engine = create_engine(f'sqlite:///{self.db_filepath.absolute()}', convert_unicode=True)
# Blockchain
self.transacting_power = TransactingPower(password=client_password,

View File

@ -17,6 +17,8 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import contextlib
from pathlib import Path
import json
import random
import time
@ -1099,9 +1101,9 @@ class Ursula(Teacher, Character, Worker):
is_me: bool = True,
certificate: Certificate = None,
certificate_filepath: str = None,
certificate_filepath: Path = None,
db_filepath: str = None,
db_filepath: Path = None,
interface_signature=None,
timestamp=None,
availability_check: bool = False, # TODO: Remove from init

View File

@ -17,6 +17,7 @@
import glob
import json
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import Optional, Type, List
import click
@ -50,25 +51,26 @@ def forget(emitter: StdoutEmitter, configuration: CharacterConfiguration) -> Non
emitter.message(SUCCESSFUL_FORGET_NODES, color='red')
def get_config_filepaths(config_class: Type[CharacterConfiguration], config_root: str = None) -> List:
def get_config_filepaths(config_class: Type[CharacterConfiguration], config_root: Path = None) -> List:
#
# Scrape disk for configuration files
#
config_root = config_root or DEFAULT_CONFIG_ROOT
default_config_file = glob.glob(config_class.default_filepath(config_root=config_root))
default_config_file = glob.glob(str(config_class.default_filepath(config_root=config_root)))
# updated glob pattern for secondary configuration files accommodates for:
# 1. configuration files with "0x..." checksum address as suffix - including older ursula config files
# 2. newer (ursula) configuration files which use signing_pub_key[:8] as hex as the suffix
glob_pattern = f'{config_root}/{config_class.NAME}-[0-9a-fA-F]*.{config_class._CONFIG_FILE_EXTENSION}'
glob_pattern = f'{str(config_root)}/{config_class.NAME}-[0-9a-fA-F]*.{config_class._CONFIG_FILE_EXTENSION}'
secondary_config_files = sorted(glob.glob(glob_pattern)) # sort list to make order deterministic
config_files = [*default_config_file, *secondary_config_files]
config_files = [Path(f) for f in config_files]
return config_files
def get_or_update_configuration(emitter: StdoutEmitter,
filepath: str,
filepath: Path,
config_class: Type[CharacterConfiguration],
updates: Optional[dict] = None) -> None:
"""
@ -104,7 +106,7 @@ def destroy_configuration(emitter: StdoutEmitter,
def handle_missing_configuration_file(character_config_class: Type[CharacterConfiguration],
init_command_hint: str = None,
config_file: str = None) -> None:
config_file: Path = None) -> None:
"""Display a message explaining there is no configuration file to use and abort the current operation."""
config_file_location = config_file or character_config_class.default_filepath()
init_command = init_command_hint or f"{character_config_class.NAME} init"
@ -112,12 +114,12 @@ def handle_missing_configuration_file(character_config_class: Type[CharacterConf
if name == StakeHolderConfiguration.NAME.capitalize():
init_command = 'stake init-stakeholder'
message = MISSING_CONFIGURATION_FILE.format(name=name, init_command=init_command)
raise click.FileError(filename=config_file_location, hint=message)
raise click.FileError(filename=str(config_file_location), hint=message)
def handle_invalid_configuration_file(emitter: StdoutEmitter,
config_class: Type[CharacterConfiguration],
filepath: str) -> None:
filepath: Path) -> None:
"""
Attempt to deserialize a config file that is not a valid nucypher character configuration
as a means of user-friendly debugging. :-) I hope this helps!

View File

@ -208,9 +208,9 @@ def select_network(emitter: StdoutEmitter) -> str:
def select_config_file(emitter: StdoutEmitter,
config_class: Type[CharacterConfiguration],
config_root: str = None,
config_root: Path = None,
checksum_address: str = None,
) -> str:
) -> Path:
"""
Selects a nucypher character configuration file from the disk automatically or interactively.

View File

@ -14,7 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import click
@ -185,7 +185,7 @@ class AliceFullConfigOptions:
self.n = n
self.payment_periods = payment_periods
def generate_config(self, emitter: StdoutEmitter, config_root: str) -> AliceConfiguration:
def generate_config(self, emitter: StdoutEmitter, config_root: Path) -> AliceConfiguration:
opts = self.config_options

View File

@ -15,6 +15,7 @@
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from base64 import b64decode
from pathlib import Path
import click
@ -68,7 +69,7 @@ class BobConfigOptions:
def __init__(self,
provider_uri: str,
network: str,
registry_filepath: str,
registry_filepath: Path,
checksum_address: str,
discovery_port: int,
dev: bool,

View File

@ -17,6 +17,7 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import json
from pathlib import Path
import click
from constant_sorrow import constants
@ -89,7 +90,7 @@ from nucypher.cli.painting.deployment import (
from nucypher.cli.painting.help import echo_solidity_version
from nucypher.cli.painting.multisig import paint_multisig_proposed_transaction
from nucypher.cli.painting.transactions import paint_receipt_summary
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS, EXISTING_READABLE_FILE, WEI
from nucypher.cli.types import EIP55_CHECKSUM_ADDRESS, EXISTING_READABLE_FILE, PathPath, WEI
from nucypher.cli.utils import (
deployer_pre_launch_warnings,
ensure_config_root,
@ -100,7 +101,8 @@ from nucypher.crypto.powers import TransactingPower
option_deployer_address = click.option('--deployer-address', help="Deployer's checksum address", type=EIP55_CHECKSUM_ADDRESS)
option_registry_infile = click.option('--registry-infile', help="Input path for contract registry file", type=EXISTING_READABLE_FILE)
option_registry_outfile = click.option('--registry-outfile', help="Output path for contract registry file", type=click.Path(file_okay=True))
option_registry_outfile = click.option('--registry-outfile', help="Output path for contract registry file",
type=PathPath(file_okay=True))
option_target_address = click.option('--target-address', help="Address of the target contract", type=EIP55_CHECKSUM_ADDRESS)
option_gas = click.option('--gas', help="Operate with a specified gas per-transaction limit", type=click.IntRange(min=1))
option_ignore_deployed = click.option('--ignore-deployed', help="Ignore already deployed contracts if exist.", is_flag=True)

View File

@ -14,7 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import click
import os
@ -73,9 +73,9 @@ class FelixConfigOptions:
provider_uri,
signer_uri,
host,
db_filepath,
db_filepath: Path,
checksum_address,
registry_filepath,
registry_filepath: Path,
poa,
port):
@ -114,7 +114,7 @@ class FelixConfigOptions:
config_file=config_file
)
def generate_config(self, config_root, discovery_port):
def generate_config(self, config_root: Path, discovery_port):
return FelixConfiguration.generate(
password=get_nucypher_password(emitter=StdoutEmitter(), confirm=True),
config_root=config_root,
@ -238,7 +238,7 @@ def createdb(general_config, character_options, config_file, force):
"""Create Felix DB."""
emitter = setup_emitter(general_config, character_options.config_options.checksum_address)
FELIX = character_options.create_character(emitter, config_file, general_config.debug)
if os.path.isfile(FELIX.db_filepath):
if FELIX.db_filepath.is_file():
if not force:
click.confirm(CONFIRM_OVERWRITE_DATABASE, abort=True)
os.remove(FELIX.db_filepath)

View File

@ -15,6 +15,7 @@
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from decimal import Decimal
from pathlib import Path
import click
from web3 import Web3
@ -127,7 +128,7 @@ from nucypher.cli.painting.transactions import paint_receipt_summary
from nucypher.cli.types import (
EIP55_CHECKSUM_ADDRESS,
GWEI,
DecimalRange
DecimalRange, PathPath
)
from nucypher.cli.utils import setup_emitter, retrieve_events
from nucypher.config.characters import StakeHolderConfiguration
@ -139,7 +140,7 @@ option_csv = click.option('--csv', help="Write event data to a CSV file using a
is_flag=True)
option_csv_file = click.option('--csv-file',
help="Write event data to the CSV file at specified filepath",
type=click.Path(dir_okay=False))
type=PathPath(dir_okay=False))
option_value = click.option('--value', help="Token value of stake", type=DecimalRange(min=0))
option_lock_periods = click.option('--lock-periods', help="Duration of stake in periods.", type=click.INT)
option_worker_address = click.option('--worker-address', help="Address to bond as an Ursula-Worker", type=EIP55_CHECKSUM_ADDRESS)

View File

@ -16,6 +16,7 @@
"""
import os
from pathlib import Path
import click
@ -41,6 +42,7 @@ from nucypher.cli.options import (
)
from nucypher.cli.painting.staking import paint_fee_rate_range, paint_stakes
from nucypher.cli.painting.status import paint_contract_status, paint_locked_tokens_status, paint_stakers
from nucypher.cli.types import PathPath
from nucypher.cli.utils import (
connect_to_blockchain,
get_registry,
@ -85,7 +87,7 @@ option_csv = click.option('--csv',
is_flag=True)
option_csv_file = click.option('--csv-file',
help="Write event data to the CSV file at specified filepath",
type=click.Path(dir_okay=False))
type=PathPath(dir_okay=False))
option_event_filters = click.option('--event-filter', '-f', 'event_filters',
help="Event filter of the form <name>=<value>",
multiple=True,

View File

@ -14,7 +14,7 @@
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import click
@ -76,9 +76,9 @@ class UrsulaConfigOptions:
federated_only: bool,
rest_host: str,
rest_port: int,
db_filepath: str,
db_filepath: Path,
network: str,
registry_filepath: str,
registry_filepath: Path,
dev: bool,
poa: bool,
light: bool,

View File

@ -17,6 +17,7 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
from collections import namedtuple
from pathlib import Path
import click
import functools
@ -28,7 +29,7 @@ from nucypher.cli.types import (
GWEI,
NETWORK_PORT,
NuCypherNetworkName,
WEI,
PathPath, WEI,
STAKED_TOKENS_RANGE,
MIN_ALLOWED_LOCKED_TOKENS
)
@ -38,7 +39,7 @@ from nucypher.utilities.logging import Logger
option_checksum_address = click.option('--checksum-address', help="Run with a specified account", type=EIP55_CHECKSUM_ADDRESS)
option_config_file = click.option('--config-file', help="Path to configuration file", type=EXISTING_READABLE_FILE)
option_config_root = click.option('--config-root', help="Custom configuration directory", type=click.Path())
option_config_root = click.option('--config-root', help="Custom configuration directory", type=PathPath())
option_dev = click.option('--dev', '-d', help="Enable development mode", is_flag=True)
option_db_filepath = click.option('--db-filepath', help="The database filepath to connect to", type=click.STRING)
option_dry_run = click.option('--dry-run', '-x', help="Execute normally without actually starting the node", is_flag=True)

View File

@ -45,14 +45,14 @@ def echo_solidity_version(ctx, param, value):
def echo_config_root_path(ctx, param, value):
if not value or ctx.resilient_parsing:
return
click.secho(str(DEFAULT_CONFIG_ROOT.resolve()))
click.secho(str(DEFAULT_CONFIG_ROOT.resolve())))
ctx.exit()
def echo_logging_root_path(ctx, param, value):
if not value or ctx.resilient_parsing:
return
click.secho(str(USER_LOG_DIR.resolve()))
click.secho(str(USER_LOG_DIR.resolve())))
ctx.exit()

View File

@ -14,6 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import click
from cryptography.exceptions import InternalError
@ -133,6 +134,15 @@ class UmbralPublicKeyHex(click.ParamType):
return value
# TODO: Remove after click==8.x.x is released
# Reference: https://github.com/pallets/click/issues/405
# Reference: https://github.com/pallets/click/pull/1804
class PathPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
# Ethereum
EIP55_CHECKSUM_ADDRESS = ChecksumAddress()
WEI = click.IntRange(min=1, clamp=False) # TODO: Better validation for ether and wei values?
@ -143,8 +153,8 @@ MIN_ALLOWED_LOCKED_TOKENS = Decimal(__min_allowed_locked)
STAKED_TOKENS_RANGE = DecimalRange(min=__min_allowed_locked)
# Filesystem
EXISTING_WRITABLE_DIRECTORY = click.Path(exists=True, dir_okay=True, file_okay=False, writable=True)
EXISTING_READABLE_FILE = click.Path(exists=True, dir_okay=False, file_okay=True, readable=True)
EXISTING_WRITABLE_DIRECTORY = PathPath(exists=True, dir_okay=True, file_okay=False, writable=True)
EXISTING_READABLE_FILE = PathPath(exists=True, dir_okay=False, file_okay=True, readable=True)
# Network
NETWORK_PORT = click.IntRange(min=0, max=65535, clamp=False)

View File

@ -144,7 +144,7 @@ def establish_deployer_registry(emitter,
# Establish a contract registry from disk if specified
filepath = registry_infile
default_registry_filepath = os.path.join(DEFAULT_CONFIG_ROOT, BaseContractRegistry.REGISTRY_NAME)
default_registry_filepath = DEFAULT_CONFIG_ROOT / BaseContractRegistry.REGISTRY_NAME
if registry_outfile:
# mutative usage of existing registry
registry_infile = registry_infile or default_registry_filepath
@ -157,7 +157,7 @@ def establish_deployer_registry(emitter,
if dev:
# TODO: Need a way to detect a geth --dev registry filepath here. (then deprecate the --dev flag)
filepath = os.path.join(DEFAULT_CONFIG_ROOT, BaseContractRegistry.DEVELOPMENT_REGISTRY_NAME)
filepath = DEFAULT_CONFIG_ROOT / BaseContractRegistry.DEVELOPMENT_REGISTRY_NAME
registry_filepath = filepath or default_registry_filepath
@ -225,10 +225,10 @@ def get_env_bool(var_name: str, default: bool) -> bool:
return default
def ensure_config_root(config_root: str) -> None:
def ensure_config_root(config_root: Path) -> None:
"""Ensure config root exists, because we need a default place to put output files."""
config_root = config_root or DEFAULT_CONFIG_ROOT
if not os.path.exists(config_root):
if not config_root.exists():
os.makedirs(config_root)

View File

@ -21,7 +21,7 @@ import os
import re
from abc import ABC, abstractmethod
from decimal import Decimal
from pathlib import Path
from pathlib import Path, PosixPath
from tempfile import TemporaryDirectory
from typing import Union, Callable, Optional, List
@ -133,8 +133,8 @@ class BaseConfiguration(ABC):
pass
def __init__(self,
config_root: str = None,
filepath: str = None,
config_root: Path = None,
filepath: Path = None,
*args, **kwargs):
if self.NAME is NotImplemented:
@ -143,7 +143,7 @@ class BaseConfiguration(ABC):
self.config_root = config_root or self.DEFAULT_CONFIG_ROOT
if not filepath:
filepath = os.path.join(self.config_root, self.generate_filename())
filepath = self.config_root / self.generate_filename()
self.filepath = filepath
super().__init__()
@ -188,17 +188,17 @@ class BaseConfiguration(ABC):
return filename
@classmethod
def default_filepath(cls, config_root: str = None) -> str:
def default_filepath(cls, config_root: Path = None) -> Path:
"""
Generates the default configuration filepath for the class.
:return: The generated filepath string
"""
filename = cls.generate_filename()
default_path = os.path.join(config_root or cls.DEFAULT_CONFIG_ROOT, filename)
default_path = Path(config_root or cls.DEFAULT_CONFIG_ROOT) / filename
return default_path
def generate_filepath(self, filepath: str = None, modifier: str = None, override: bool = False) -> str:
def generate_filepath(self, filepath: Path = None, modifier: str = None, override: bool = False) -> Path:
"""
Generates a filepath for saving to writing to a configuration file.
@ -217,12 +217,12 @@ class BaseConfiguration(ABC):
"""
if not filepath:
filename = self.generate_filename()
filepath = os.path.join(self.config_root, filename)
if os.path.exists(filepath) and not override:
filepath = self.config_root / filename
if filepath.exists() and not override:
if not modifier:
raise FileExistsError(f"{filepath} exists and no filename modifier supplied.")
filename = self.generate_filename(modifier=modifier)
filepath = os.path.join(self.config_root, filename)
filepath = self.config_root / filename
self.filepath = filepath
return filepath
@ -233,14 +233,14 @@ class BaseConfiguration(ABC):
:return: None.
"""
if not os.path.exists(self.config_root):
if not self.config_root.exists():
try:
os.mkdir(self.config_root, mode=0o755)
except FileNotFoundError:
os.makedirs(self.config_root, mode=0o755)
@classmethod
def peek(cls, filepath: str, field: str) -> Union[str, None]:
def peek(cls, filepath: Path, field: str) -> Union[str, None]:
payload = cls._read_configuration_file(filepath=filepath)
try:
result = payload[field]
@ -248,30 +248,30 @@ class BaseConfiguration(ABC):
raise cls.ConfigurationError(f"Cannot peek; No such configuration field '{field}', options are {list(payload.keys())}")
return result
def to_configuration_file(self, filepath: str = None, modifier: str = None, override: bool = False) -> str:
def to_configuration_file(self, filepath: Path = None, modifier: str = None, override: bool = False) -> Path:
filepath = self.generate_filepath(filepath=filepath, modifier=modifier, override=override)
self._ensure_config_root_exists()
filepath = self._write_configuration_file(filepath=filepath, override=override)
return filepath
@classmethod
def from_configuration_file(cls, filepath: str = None, **overrides) -> 'BaseConfiguration':
def from_configuration_file(cls, filepath: Path = None, **overrides) -> 'BaseConfiguration':
filepath = filepath or cls.default_filepath()
payload = cls._read_configuration_file(filepath=filepath)
instance = cls(filepath=filepath, **payload, **overrides)
return instance
@classmethod
def _read_configuration_file(cls, filepath: str) -> dict:
def _read_configuration_file(cls, filepath: Path) -> dict:
"""Reads `filepath` and returns the deserialized JSON payload dict."""
with open(filepath, 'r') as file:
raw_contents = file.read()
payload = cls.deserialize(raw_contents, payload_label=filepath)
payload = cls.deserialize(raw_contents, payload_label=str(filepath))
return payload
def _write_configuration_file(self, filepath: str, override: bool = False) -> str:
def _write_configuration_file(self, filepath: Path, override: bool = False) -> Path:
"""Writes to `filepath` and returns the written filepath. Raises `FileExistsError` if the file exists."""
if os.path.exists(str(filepath)) and not override:
if filepath.exists() and not override:
raise FileExistsError(f"{filepath} exists and no filename modifier supplied.")
with open(filepath, 'w') as file:
file.write(self.serialize())
@ -279,7 +279,16 @@ class BaseConfiguration(ABC):
def serialize(self, serializer=json.dumps) -> str:
"""Returns the JSON serialized output of `static_payload`"""
def _stringify_paths(d: dict):
for key, value in d.items():
if isinstance(value, PosixPath):
d[key] = str(value)
if isinstance(value, dict):
d[key] = _stringify_paths(value)
return d
payload = self.static_payload()
payload = _stringify_paths(payload)
payload['version'] = self.VERSION
serialized_payload = serializer(payload, indent=self.INDENTATION)
return serialized_payload
@ -293,9 +302,19 @@ class BaseConfiguration(ABC):
label = f"'{payload_label}' " if payload_label else ""
raise cls.OldVersion(f"Configuration {label}is the wrong version "
f"Expected version {cls.VERSION}; Got version {version}")
# TODO: Move to implementation of `deserialize` in their respective classes
for key in ['keyring_root', 'db_filepath']:
if key in deserialized_payload:
deserialized_payload[key] = Path(deserialized_payload[key])
for key in ['root_dir', 'metadata_dir', 'certificates_dir']:
if key in payload:
deserialized_payload['node_storage'][key] = Path(deserialized_payload['node_storage'][key])
return deserialized_payload
def update(self, filepath: str = None, **updates) -> None:
def update(self, filepath: Path = None, **updates) -> None:
for field, value in updates.items():
try:
getattr(self, field)
@ -345,8 +364,8 @@ class CharacterConfiguration(BaseConfiguration):
# Base
emitter=None,
config_root: str = None,
filepath: str = None,
config_root: Path = None,
filepath: Path = None,
# Mode
dev_mode: bool = False,
@ -388,7 +407,7 @@ class CharacterConfiguration(BaseConfiguration):
# Registry
registry: BaseContractRegistry = None,
registry_filepath: str = None,
registry_filepath: Path = None,
# Deployed Workers
worker_data: dict = None
@ -414,7 +433,8 @@ class CharacterConfiguration(BaseConfiguration):
# Contract Registry
if registry and registry_filepath:
if registry.filepath != registry_filepath:
error = f"Inconsistent registry filepaths for '{registry.filepath}' and '{registry_filepath}'."
error = f"Inconsistent registry filepaths for '{registry.filepath.absolute()}'" \
f" and '{registry_filepath.absolute()}'."
raise ValueError(error)
else:
self.log.warn(f"Registry and registry filepath were both passed.")
@ -533,7 +553,7 @@ class CharacterConfiguration(BaseConfiguration):
self.__keystore = keystore
@classmethod
def checksum_address_from_filepath(cls, filepath: str) -> str:
def checksum_address_from_filepath(cls, filepath: Path) -> str:
pattern = re.compile(r'''
(^\w+)-
(0x{1} # Then, 0x the start of the string, exactly once
@ -541,7 +561,7 @@ class CharacterConfiguration(BaseConfiguration):
''',
re.VERBOSE)
filename = os.path.basename(filepath)
filename = filepath.name
match = pattern.match(filename)
if match:
@ -615,20 +635,23 @@ class CharacterConfiguration(BaseConfiguration):
return character
@classmethod
def assemble(cls, filepath: str = None, **overrides) -> dict:
def assemble(cls, filepath: Path = None, **overrides) -> dict:
"""
Warning: This method allows mutation and may result in an inconsistent configuration.
"""
payload = cls._read_configuration_file(filepath=filepath)
node_storage = cls.load_node_storage(storage_payload=payload['node_storage'],
federated_only=payload['federated_only'])
domain = payload['domain']
max_gas_price = payload.get('max_gas_price') # gwei
if max_gas_price:
max_gas_price = Decimal(max_gas_price)
# Assemble
payload.update(dict(node_storage=node_storage, domain=domain, max_gas_price=max_gas_price))
payload.update(dict(node_storage=node_storage, max_gas_price=max_gas_price))
for key in ['keyring_root', 'db_filepath']:
if key in payload:
payload[key] = Path(payload[key])
# Filter out None values from **overrides to detect, well, overrides...
# Acts as a shim for optional CLI flags.
overrides = {k: v for k, v in overrides.items() if v is not None}
@ -637,7 +660,7 @@ class CharacterConfiguration(BaseConfiguration):
@classmethod
def from_configuration_file(cls,
filepath: str = None,
filepath: Path = None,
**overrides # < ---- Inlet for CLI Flags
) -> 'CharacterConfiguration':
"""Initialize a CharacterConfiguration from a JSON file."""
@ -649,13 +672,13 @@ class CharacterConfiguration(BaseConfiguration):
def validate(self) -> bool:
# Top-level
if not os.path.exists(self.config_root):
if not self.config_root.exists():
raise self.ConfigurationError(f'No configuration directory found at {self.config_root}.')
# Sub-paths
filepaths = self.runtime_filepaths
for field, path in filepaths.items():
if path and not os.path.exists(path):
if path and not path.exists():
message = 'Missing configuration file or directory: {}.'
if 'registry' in path:
message += ' Did you mean to pass --federated-only?'
@ -722,7 +745,7 @@ class CharacterConfiguration(BaseConfiguration):
return payload
def generate_filepath(self, filepath: str = None, modifier: str = None, override: bool = False) -> str:
def generate_filepath(self, filepath: Path = None, modifier: str = None, override: bool = False) -> Path:
modifier = modifier or self.checksum_address
filepath = super().generate_filepath(filepath=filepath, modifier=modifier, override=override)
return filepath
@ -735,11 +758,11 @@ class CharacterConfiguration(BaseConfiguration):
return filepaths
@classmethod
def generate_runtime_filepaths(cls, config_root: str) -> dict:
def generate_runtime_filepaths(cls, config_root: Path) -> dict:
"""Dynamically generate paths based on configuration root directory"""
filepaths = dict(config_root=config_root,
config_file_location=os.path.join(config_root, cls.generate_filename()),
keystore_dir=os.path.join(config_root, 'keystore'))
config_file_location=config_root / cls.generate_filename(),
keystore_dir=config_root / 'keystore')
return filepaths
def _cache_runtime_filepaths(self) -> None:
@ -757,13 +780,13 @@ class CharacterConfiguration(BaseConfiguration):
power_ups.append(power_up)
return power_ups
def initialize(self, password: str) -> str:
def initialize(self, password: str) -> Path:
"""Initialize a new configuration and write installation files to disk."""
# Development
if self.dev_mode:
self.__temp_dir = TemporaryDirectory(prefix=self.TEMP_CONFIGURATION_DIR_PREFIX)
self.config_root = self.__temp_dir.name
self.config_root = Path(self.__temp_dir.name)
# Persistent
else:

View File

@ -17,6 +17,7 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from constant_sorrow.constants import UNINITIALIZED_CONFIGURATION
@ -55,7 +56,7 @@ class UrsulaConfiguration(CharacterConfiguration):
rest_host: str = None,
worker_address: str = None,
dev_mode: bool = False,
db_filepath: str = None,
db_filepath: Path = None,
rest_port: int = None,
certificate: Certificate = None,
availability_check: bool = None,
@ -80,7 +81,7 @@ class UrsulaConfiguration(CharacterConfiguration):
super().__init__(dev_mode=dev_mode, *args, **kwargs)
@classmethod
def checksum_address_from_filepath(cls, filepath: str) -> str:
def checksum_address_from_filepath(cls, filepath: Path) -> str:
"""
Extracts worker address by "peeking" inside the ursula configuration file.
"""
@ -93,13 +94,13 @@ class UrsulaConfiguration(CharacterConfiguration):
raise RuntimeError(f"Invalid checksum address detected in configuration file at '{filepath}'.")
return checksum_address
def generate_runtime_filepaths(self, config_root: str) -> dict:
def generate_runtime_filepaths(self, config_root: Path) -> dict:
base_filepaths = super().generate_runtime_filepaths(config_root=config_root)
filepaths = dict(db_filepath=os.path.join(config_root, self.DEFAULT_DB_NAME))
filepaths = dict(db_filepath=config_root / self.DEFAULT_DB_NAME)
base_filepaths.update(filepaths)
return base_filepaths
def generate_filepath(self, modifier: str = None, *args, **kwargs) -> str:
def generate_filepath(self, modifier: str = None, *args, **kwargs) -> Path:
filepath = super().generate_filepath(modifier=modifier or self.keystore.id[:8], *args, **kwargs)
return filepath
@ -108,7 +109,7 @@ class UrsulaConfiguration(CharacterConfiguration):
worker_address=self.worker_address,
rest_host=self.rest_host,
rest_port=self.rest_port,
db_filepath=self.db_filepath,
db_filepath=self.db_filepath.absolute(),
availability_check=self.availability_check,
)
return {**super().static_payload(), **payload}
@ -138,7 +139,7 @@ class UrsulaConfiguration(CharacterConfiguration):
return ursula
def destroy(self) -> None:
if os.path.isfile(self.db_filepath):
if self.db_filepath.is_file():
os.remove(self.db_filepath)
super().destroy()
@ -241,7 +242,7 @@ class FelixConfiguration(CharacterConfiguration):
NAME = CHARACTER_CLASS.__name__.lower()
DEFAULT_DB_NAME = '{}.db'.format(NAME)
DEFAULT_DB_FILEPATH = os.path.join(DEFAULT_CONFIG_ROOT, DEFAULT_DB_NAME)
DEFAULT_DB_FILEPATH = DEFAULT_CONFIG_ROOT / DEFAULT_DB_NAME
DEFAULT_REST_PORT = 6151
DEFAULT_LEARNER_PORT = 9151
DEFAULT_REST_HOST = LOOPBACK_ADDRESS
@ -262,13 +263,13 @@ class FelixConfiguration(CharacterConfiguration):
self.rest_host = rest_host or self.DEFAULT_REST_HOST
self.tls_curve = tls_curve or self.__DEFAULT_TLS_CURVE
self.certificate = certificate
self.db_filepath = db_filepath or os.path.join(self.config_root, self.DEFAULT_DB_NAME)
self.db_filepath = db_filepath or self.config_root / self.DEFAULT_DB_NAME
def static_payload(self) -> dict:
payload = dict(
rest_host=self.rest_host,
rest_port=self.rest_port,
db_filepath=self.db_filepath,
db_filepath=self.db_filepath.absolute(),
signer_uri=self.signer_uri
)
return {**super().static_payload(), **payload}
@ -313,7 +314,7 @@ class StakeHolderConfiguration(CharacterConfiguration):
pass
@classmethod
def assemble(cls, filepath: str = None, **overrides) -> dict:
def assemble(cls, filepath: Path = None, **overrides) -> dict:
payload = cls._read_configuration_file(filepath=filepath)
# Filter out None values from **overrides to detect, well, overrides...
# Acts as a shim for optional CLI flags.
@ -322,19 +323,19 @@ class StakeHolderConfiguration(CharacterConfiguration):
return payload
@classmethod
def generate_runtime_filepaths(cls, config_root: str) -> dict:
def generate_runtime_filepaths(cls, config_root: Path) -> dict:
"""Dynamically generate paths based on configuration root directory"""
filepaths = dict(config_root=config_root,
config_file_location=os.path.join(config_root, cls.generate_filename()))
config_file_location=config_root / cls.generate_filename())
return filepaths
def initialize(self, password: str = None) -> str:
def initialize(self, password: str = None) -> Path:
"""Initialize a new configuration and write installation files to disk."""
# Development
if self.dev_mode:
self.__temp_dir = TemporaryDirectory(prefix=self.TEMP_CONFIGURATION_DIR_PREFIX)
self.config_root = self.__temp_dir.name
self.config_root = Path(self.__temp_dir.name)
# Persistent
else:
@ -358,5 +359,5 @@ class StakeHolderConfiguration(CharacterConfiguration):
node_config.initialize()
return node_config
def to_configuration_file(self, override: bool = True, *args, **kwargs) -> str:
def to_configuration_file(self, override: bool = True, *args, **kwargs) -> Path:
return super().to_configuration_file(override=True, *args, **kwargs)

View File

View File

@ -93,7 +93,7 @@ class NodeStorage(ABC):
def _write_tls_certificate(self,
port: int, # used to avoid duplicate certs with the same IP
certificate: Certificate,
force: bool = True) -> str:
force: bool = True) -> Path:
# Read
x509 = OpenSSL.crypto.X509.from_cryptography(certificate)
@ -103,12 +103,12 @@ class NodeStorage(ABC):
host = common_name_on_certificate
certificate_filepath = self.generate_certificate_filepath(host=host, port=port)
certificate_already_exists = os.path.isfile(certificate_filepath)
certificate_already_exists = certificate_filepath.is_file()
if force is False and certificate_already_exists:
raise FileExistsError('A TLS certificate already exists at {}.'.format(certificate_filepath))
# Write
os.makedirs(os.path.dirname(certificate_filepath), exist_ok=True)
os.makedirs(certificate_filepath.parent, exist_ok=True)
with open(certificate_filepath, 'wb') as certificate_file:
public_pem_bytes = certificate.public_bytes(self.TLS_CERTIFICATE_ENCODING)
certificate_file.write(public_pem_bytes)
@ -121,12 +121,12 @@ class NodeStorage(ABC):
raise NotImplementedError
@abstractmethod
def store_node_metadata(self, node, filepath: str = None) -> str:
def store_node_metadata(self, node, filepath: Path = None) -> str:
"""Save a single node's metadata and tls certificate"""
raise NotImplementedError
@abstractmethod
def generate_certificate_filepath(self, host: str, port: int) -> str:
def generate_certificate_filepath(self, host: str, port: int) -> Path:
raise NotImplementedError
@abstractmethod
@ -164,14 +164,14 @@ class ForgetfulNodeStorage(NodeStorage):
_name = ':memory:'
__base_prefix = "nucypher-tmp-certs-"
def __init__(self, parent_dir: str = None, *args, **kwargs) -> None:
def __init__(self, parent_dir: Path = None, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.__metadata = dict()
# Certificates
self.__certificates = dict()
self.__temporary_certificates = list()
self._temp_certificates_dir = tempfile.mkdtemp(prefix=self.__base_prefix, dir=parent_dir)
self._temp_certificates_dir = Path(tempfile.mkdtemp(prefix=self.__base_prefix, dir=parent_dir))
@property
def source(self) -> str:
@ -212,13 +212,13 @@ class ForgetfulNodeStorage(NodeStorage):
filepath = self._write_tls_certificate(certificate=certificate, port=port)
return filepath
def store_node_metadata(self, node, filepath: str = None) -> bytes:
def store_node_metadata(self, node, filepath: Path = None) -> bytes:
self.__metadata[node.stamp] = node
return self.__metadata[node.stamp]
def generate_certificate_filepath(self, host: str, port: int) -> str:
def generate_certificate_filepath(self, host: str, port: int) -> Path:
filename = f'{host}:{port}.pem'
filepath = os.path.join(self._temp_certificates_dir, filename)
filepath = self._temp_certificates_dir / filename
return filepath
def clear(self, metadata: bool = True, certificates: bool = True) -> None:
@ -255,10 +255,10 @@ class LocalFileBasedNodeStorage(NodeStorage):
"""Node metadata is corrupt or not possible to parse"""
def __init__(self,
config_root: str = None,
storage_root: str = None,
metadata_dir: str = None,
certificates_dir: str = None,
config_root: Path = None,
storage_root: Path = None,
metadata_dir: Path = None,
certificates_dir: Path = None,
*args, **kwargs
) -> None:
@ -271,7 +271,7 @@ class LocalFileBasedNodeStorage(NodeStorage):
self._cache_storage_filepaths(config_root=config_root)
@property
def source(self) -> str:
def source(self) -> Path:
"""Human readable source string"""
return self.root_dir
@ -282,14 +282,14 @@ class LocalFileBasedNodeStorage(NodeStorage):
return encoded_node
@staticmethod
def _generate_storage_filepaths(config_root: str = None,
storage_root: str = None,
metadata_dir: str = None,
certificates_dir: str = None):
def _generate_storage_filepaths(config_root: Path = None,
storage_root: Path = None,
metadata_dir: Path = None,
certificates_dir: Path = None):
storage_root = storage_root or os.path.join(config_root or DEFAULT_CONFIG_ROOT, 'known_nodes')
metadata_dir = metadata_dir or os.path.join(storage_root, 'metadata')
certificates_dir = certificates_dir or os.path.join(storage_root, 'certificates')
storage_root = storage_root or (config_root or DEFAULT_CONFIG_ROOT / 'known_nodes')
metadata_dir = metadata_dir or storage_root / 'metadata'
certificates_dir = certificates_dir or storage_root / 'certificates'
payload = {'storage_root': storage_root,
'metadata_dir': metadata_dir,
@ -297,7 +297,7 @@ class LocalFileBasedNodeStorage(NodeStorage):
return payload
def _cache_storage_filepaths(self, config_root: str = None):
def _cache_storage_filepaths(self, config_root: Path = None):
filepaths = self._generate_storage_filepaths(config_root=config_root,
storage_root=self.root_dir,
metadata_dir=self.metadata_dir,
@ -314,23 +314,20 @@ class LocalFileBasedNodeStorage(NodeStorage):
def __get_certificate_filename(self, host: str, port: int) -> str:
return f'{host}:{port}.{Encoding.PEM.name.lower()}'
def __get_certificate_filepath(self, certificate_filename: str) -> str:
return os.path.join(self.certificates_dir, certificate_filename)
def __get_certificate_filepath(self, certificate_filename: str) -> Path:
return self.certificates_dir / certificate_filename
def generate_certificate_filepath(self, host: str, port: int) -> str:
def generate_certificate_filepath(self, host: str, port: int) -> Path:
certificate_filename = self.__get_certificate_filename(host=host, port=port)
certificate_filepath = self.__get_certificate_filepath(certificate_filename=certificate_filename)
return certificate_filepath
@validate_checksum_address
def __read_node_tls_certificate(self, filepath: str) -> Certificate:
def __read_node_tls_certificate(self, filepath: Path = None) -> Certificate:
"""Deserialize an X509 certificate from a filepath"""
try:
with open(filepath, 'rb') as certificate_file:
certificate = x509.load_pem_x509_certificate(certificate_file.read(), backend=default_backend())
# Sanity check:
# Validate the checksum address inside the cert as a consistency check against
# nodes that may have been altered on the disk somehow.
return certificate
except FileNotFoundError:
raise FileNotFoundError("No SSL certificate found at {}".format(filepath))
@ -339,11 +336,10 @@ class LocalFileBasedNodeStorage(NodeStorage):
# Metadata
#
def __generate_metadata_filepath(self, stamp: Union[SignatureStamp, str], metadata_dir: str = None) -> str:
def __generate_metadata_filepath(self, stamp: Union[SignatureStamp, str], metadata_dir: str = None) -> Path:
if isinstance(stamp, SignatureStamp):
stamp = bytes(stamp).hex()
metadata_path = os.path.join(metadata_dir or self.metadata_dir,
self.__METADATA_FILENAME_TEMPLATE.format(stamp))
metadata_path = metadata_dir or self.metadata_dir / self.__METADATA_FILENAME_TEMPLATE.format(stamp)
return metadata_path
def __read_metadata(self, filepath: str):
@ -362,8 +358,8 @@ class LocalFileBasedNodeStorage(NodeStorage):
return node
def __write_metadata(self, filepath: str, node):
os.makedirs(os.path.dirname(filepath), exist_ok=True)
def __write_metadata(self, filepath: Path, node):
os.makedirs(filepath.parent, exist_ok=True)
with open(filepath, "wb") as f:
f.write(self.encode_node_bytes(bytes(node)))
self.log.info("Wrote new node metadata to filesystem {}".format(filepath))
@ -379,7 +375,7 @@ class LocalFileBasedNodeStorage(NodeStorage):
known_certificates = set()
if certificates_only:
for filename in filenames:
certificate = self.__read_node_tls_certificate(os.path.join(self.certificates_dir, filename))
certificate = self.__read_node_tls_certificate(self.certificates_dir / filename)
known_certificates.add(certificate)
return known_certificates
@ -387,7 +383,7 @@ class LocalFileBasedNodeStorage(NodeStorage):
known_nodes = set()
invalid_metadata = []
for filename in filenames:
metadata_path = os.path.join(self.metadata_dir, filename)
metadata_path = self.metadata_dir / filename
try:
node = self.__read_metadata(filepath=metadata_path)
except self.NodeStorageError:
@ -420,15 +416,15 @@ class LocalFileBasedNodeStorage(NodeStorage):
def clear(self, metadata: bool = True, certificates: bool = True) -> None:
"""Forget all stored nodes and certificates"""
def __destroy_dir_contents(path) -> None:
def __destroy_dir_contents(path: Path) -> None:
try:
paths_to_remove = os.listdir(path)
except FileNotFoundError:
return
else:
for file in paths_to_remove:
file_path = os.path.join(path, file)
if os.path.isfile(file_path):
file_path = path / file
if file_path.is_file():
os.unlink(file_path)
if metadata is True:
@ -441,9 +437,9 @@ class LocalFileBasedNodeStorage(NodeStorage):
def payload(self) -> dict:
payload = {
'storage_type': self._name,
'storage_root': self.root_dir,
'metadata_dir': self.metadata_dir,
'certificates_dir': self.certificates_dir
'storage_root': str(self.root_dir),
'metadata_dir': str(self.metadata_dir),
'certificates_dir': str(self.certificates_dir)
}
return payload
@ -453,7 +449,9 @@ class LocalFileBasedNodeStorage(NodeStorage):
if not storage_type == cls._name:
raise cls.NodeStorageError("Wrong storage type. got {}".format(storage_type))
del payload['storage_type']
for key in ['root_dir', 'metadata_dir', 'certificates_dir']:
if key in payload:
payload[key] = Path(payload[key])
return cls(*args, **payload, **kwargs)
def initialize(self):
@ -492,9 +490,9 @@ class TemporaryFileBasedNodeStorage(LocalFileBasedNodeStorage):
self.root_dir = self.__temp_root_dir
# Metadata
self.__temp_metadata_dir = str(Path(self.__temp_root_dir) / "metadata")
self.__temp_metadata_dir = Path(self.__temp_root_dir) / "metadata"
self.metadata_dir = self.__temp_metadata_dir
# Certificates
self.__temp_certificates_dir = str(Path(self.__temp_root_dir) / "certs")
self.__temp_certificates_dir = Path(self.__temp_root_dir) / "certs"
self.certificates_dir = self.__temp_certificates_dir

View File

@ -14,6 +14,8 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import lmdb
from contextlib import contextmanager, suppress
from functools import partial
@ -83,7 +85,7 @@ class Datastore:
# We can set this arbitrarily high (1TB) to prevent any run-time crashes.
LMDB_MAP_SIZE = 1_000_000_000_000
def __init__(self, db_path: str) -> None:
def __init__(self, db_path: Path) -> None:
"""
Initializes a Datastore object by path.

View File

@ -19,9 +19,10 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import os
import uuid
import weakref
from pathlib import Path
from datetime import datetime, timedelta
from typing import Tuple
from constant_sorrow import constants
from constant_sorrow.constants import FLEET_STATES_MATCH, RELAX, NOT_STAKING
from flask import Flask, Response, jsonify, request
@ -44,10 +45,10 @@ from nucypher.network.exceptions import NodeSeemsToBeDown
from nucypher.network.protocols import InterfaceInfo
from nucypher.utilities.logging import Logger
HERE = BASE_DIR = os.path.abspath(os.path.dirname(__file__))
TEMPLATES_DIR = os.path.join(HERE, "templates")
HERE = BASE_DIR = Path(__file__).parent
TEMPLATES_DIR = HERE / "templates"
status_template = Template(filename=os.path.join(TEMPLATES_DIR, "basic_status.mako")).get_def('main')
status_template = Template(filename=str(TEMPLATES_DIR / "basic_status.mako")).get_def('main')
class ProxyRESTServer:
@ -76,7 +77,7 @@ class ProxyRESTServer:
def make_rest_app(
db_filepath: str,
db_filepath: Path,
this_node,
domain,
log: Logger = Logger("http-application-layer")

View File

@ -217,10 +217,10 @@ class BaseCloudNodeConfigurator:
return
# where we save our state data so we can remember the resources we created for future use
self.config_path = os.path.join(self.network_config_path, self.namespace, self.config_filename)
self.config_dir = os.path.dirname(self.config_path)
self.config_path = self.network_config_path / self.namespace / self.config_filename
self.config_dir = self.config_path.parent
if os.path.exists(self.config_path):
if self.config_path.exists():
self.config = json.load(open(self.config_path))
self.namespace_network = self.config['namespace']
else:
@ -262,8 +262,8 @@ class BaseCloudNodeConfigurator:
def _write_config(self):
configdir = os.path.dirname(self.config_path)
os.makedirs(configdir, exist_ok=True)
config_dir = self.config_path.parent
os.makedirs(config_dir, exist_ok=True)
with open(self.config_path, 'w') as outfile:
json.dump(self.config, outfile, indent=4)
@ -388,7 +388,7 @@ class BaseCloudNodeConfigurator:
@property
def _inventory_template(self):
template_path = os.path.join(os.path.dirname(__file__), 'templates', 'cloud_deploy_ansible_inventory.mako')
template_path = Path(__file__).parent / 'templates' / 'cloud_deploy_ansible_inventory.mako'
return Template(filename=template_path)
def deploy_nucypher_on_existing_nodes(self, node_names, wipe_nucypher=False):
@ -856,14 +856,14 @@ class AWSNodeConfigurator(BaseCloudNodeConfigurator):
def _create_keypair(self):
new_keypair_data = self.ec2Client.create_key_pair(KeyName=f'{self.namespace_network}')
outpath = Path(DEFAULT_CONFIG_ROOT).joinpath(NODE_CONFIG_STORAGE_KEY, f'{self.namespace_network}.awskeypair')
os.makedirs(os.path.dirname(outpath), exist_ok=True)
with open(outpath, 'w') as outfile:
out_path = DEFAULT_CONFIG_ROOT / NODE_CONFIG_STORAGE_KEY / f'{self.namespace_network}.awskeypair'
os.makedirs(out_path.parent, exist_ok=True)
with open(out_path, 'w') as outfile:
outfile.write(new_keypair_data['KeyMaterial'])
# set local keypair permissions https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html
os.chmod(outpath, 0o400)
self.emitter.echo(f"a new aws keypair was saved to {outpath}, keep it safe.", color='yellow')
return new_keypair_data['KeyName'], outpath
os.chmod(out_path, 0o400)
self.emitter.echo(f"a new aws keypair was saved to {out_path}, keep it safe.", color='yellow')
return new_keypair_data['KeyName'], out_path
def _delete_keypair(self):
# only use self.namespace here to avoid accidental deletions of pre-existing keypairs
@ -1072,7 +1072,7 @@ class GenericConfigurator(BaseCloudNodeConfigurator):
provider_name = 'generic'
def _write_config(self):
if not os.path.exists(self.config_path) and not self.action in self.NAMESSPACE_CREATE_ACTIONS:
if not self.config_path.exists() and self.action not in self.NAMESSPACE_CREATE_ACTIONS:
raise AttributeError(f"Namespace/config '{self.namespace}' does not exist. Show existing namespaces: `nucypher cloudworkers list-namespaces` or create a namespace: `nucypher cloudworkers create`")
super()._write_config()

View File

@ -35,7 +35,7 @@ from nucypher.config.constants import DEFAULT_CONFIG_ROOT
# """
#
# if not filepath:
# filepath = os.path.join(DEFAULT_CONFIG_ROOT, 'static-nodes.json')
# filepath = DEFAULT_CONFIG_ROOT / 'static-nodes.json'
# try:
# with open(filepath, 'r') as file:
# static_nodes = json.load(file)

View File

@ -99,7 +99,7 @@ class PostDevelopCommand(develop):
#
def read_requirements(path):
with open(os.path.join(BASE_DIR, path)) as f:
with open(BASE_DIR / path) as f:
_pipenv_flags, *lines = f.read().split('\n')
# TODO remove when will be no more git dependencies in requirements.txt

View File

@ -32,7 +32,7 @@ from tests.utils.blockchain import TesterBlockchain as _TesterBlockchain
@pytest.mark.usefixtures('testerchain')
def test_rapid_deployment(token_economics, test_registry, tmpdir, get_random_checksum_address):
def test_rapid_deployment(token_economics, test_registry, temp_dir_path, get_random_checksum_address):
blockchain = _TesterBlockchain(eth_airdrop=False, test_accounts=4)
@ -68,7 +68,7 @@ def test_rapid_deployment(token_economics, test_registry, tmpdir, get_random_che
random_allocation = {'checksum_address': checksum_address, 'amount': amount, 'lock_periods': duration}
allocation_data.append(random_allocation)
filepath = tmpdir / "allocations.json"
filepath = temp_dir_path / "allocations.json"
with open(filepath, 'w') as f:
json.dump(allocation_data, f)

View File

@ -37,8 +37,8 @@ def test_software_stakeholder_configuration(testerchain,
path = stakeholder_config_file_location
# Save the stakeholder JSON config
stakeholder_configuration.to_configuration_file(filepath=str(path))
with open(str(path), 'r') as file:
stakeholder_configuration.to_configuration_file(filepath=path)
with open(path, 'r') as file:
# Ensure file contents are serializable
contents = file.read()

View File

@ -18,6 +18,7 @@
import json
import os
from pathlib import Path
from unittest.mock import patch, PropertyMock
import pytest
@ -37,8 +38,8 @@ CONTRACTS_TO_UPGRADE = ('StakingEscrow', 'PolicyManager', 'Adjudicator', 'Stakin
@pytest.fixture(scope="module")
def registry_filepath(temp_dir_path):
return os.path.join(temp_dir_path, 'nucypher-test-autodeploy.json')
def registry_filepath(temp_dir_path: Path):
return temp_dir_path / 'nucypher-test-autodeploy.json'
def test_deploy_single_contract(click_runner, tempfile_path):

View File

@ -17,7 +17,7 @@
import os
import pytest
from pathlib import Path
from nucypher.blockchain.eth.agents import (
AdjudicatorAgent,
@ -42,8 +42,8 @@ from tests.constants import (
YES_ENTER
)
ALTERNATE_REGISTRY_FILEPATH = '/tmp/nucypher-test-registry-alternate.json'
ALTERNATE_REGISTRY_FILEPATH_2 = '/tmp/nucypher-test-registry-alternate-2.json'
ALTERNATE_REGISTRY_FILEPATH = Path('/tmp/nucypher-test-registry-alternate.json')
ALTERNATE_REGISTRY_FILEPATH_2 = Path('/tmp/nucypher-test-registry-alternate-2.json')
def test_nucypher_deploy_inspect_no_deployments(click_runner, testerchain, new_local_registry):
@ -196,9 +196,9 @@ def test_transfer_ownership_staking_interface_router(click_runner, testerchain,
def test_bare_contract_deployment_to_alternate_registry(click_runner, agency_local_registry):
if os.path.exists(ALTERNATE_REGISTRY_FILEPATH):
if ALTERNATE_REGISTRY_FILEPATH.exists():
os.remove(ALTERNATE_REGISTRY_FILEPATH)
assert not os.path.exists(ALTERNATE_REGISTRY_FILEPATH)
assert not ALTERNATE_REGISTRY_FILEPATH.exists()
command = ('contracts',
'--contract-name', StakingEscrowDeployer.contract_name,
@ -215,8 +215,8 @@ def test_bare_contract_deployment_to_alternate_registry(click_runner, agency_loc
assert result.exit_code == 0
# Verify alternate registry output
assert os.path.exists(agency_local_registry.filepath)
assert os.path.exists(ALTERNATE_REGISTRY_FILEPATH)
assert agency_local_registry.filepath.exists()
assert ALTERNATE_REGISTRY_FILEPATH.exists()
new_registry = LocalContractRegistry(filepath=ALTERNATE_REGISTRY_FILEPATH)
assert agency_local_registry != new_registry
@ -230,7 +230,7 @@ def test_bare_contract_deployment_to_alternate_registry(click_runner, agency_loc
def test_manual_proxy_retargeting(monkeypatch, testerchain, click_runner, token_economics):
# A local, alternate filepath registry exists
assert os.path.exists(ALTERNATE_REGISTRY_FILEPATH)
assert ALTERNATE_REGISTRY_FILEPATH.exists()
local_registry = LocalContractRegistry(filepath=ALTERNATE_REGISTRY_FILEPATH)
deployer = StakingEscrowDeployer(registry=local_registry,
economics=token_economics)
@ -267,9 +267,9 @@ def test_manual_proxy_retargeting(monkeypatch, testerchain, click_runner, token_
def test_manual_deployment_of_idle_network(click_runner):
if os.path.exists(ALTERNATE_REGISTRY_FILEPATH_2):
if ALTERNATE_REGISTRY_FILEPATH_2.exists():
os.remove(ALTERNATE_REGISTRY_FILEPATH_2)
assert not os.path.exists(ALTERNATE_REGISTRY_FILEPATH_2)
assert not ALTERNATE_REGISTRY_FILEPATH_2.exists()
registry = LocalContractRegistry(filepath=ALTERNATE_REGISTRY_FILEPATH_2)
registry.write(InMemoryContractRegistry().read()) # TODO: Manual deployments from scratch require an existing but empty registry (i.e., a json file just with "[]")
@ -286,7 +286,7 @@ def test_manual_deployment_of_idle_network(click_runner):
result = click_runner.invoke(deploy, command, input=user_input, catch_exceptions=False)
assert result.exit_code == 0, result.output
assert os.path.exists(ALTERNATE_REGISTRY_FILEPATH_2)
assert ALTERNATE_REGISTRY_FILEPATH_2.exists()
new_registry = LocalContractRegistry(filepath=ALTERNATE_REGISTRY_FILEPATH_2)
deployed_contracts = [NUCYPHER_TOKEN_CONTRACT_NAME]

View File

@ -40,7 +40,7 @@ def multisig_owners(testerchain):
@pytest.fixture(scope="module")
def multisig_parameters_filepath(multisig_owners, temp_dir_path):
filepath = os.path.join(temp_dir_path, 'multisig_params.json')
filepath = temp_dir_path / 'multisig_params.json'
multisig_parameters = {
'threshold': MULTISIG_THRESHOLD,
@ -51,7 +51,7 @@ def multisig_parameters_filepath(multisig_owners, temp_dir_path):
file.write(json.dumps(multisig_parameters))
yield filepath
if os.path.isfile(filepath):
if filepath.exists():
os.remove(filepath)

View File

@ -22,6 +22,7 @@ import shutil
from base64 import b64decode
from collections import namedtuple
from json import JSONDecodeError
from pathlib import Path
import maya
import pytest
@ -138,7 +139,7 @@ def run_entire_cli_lifecycle(click_runner,
GlobalLoggerSettings.stop_console_logging()
# Alice uses her configuration file to run the character "view" command
alice_configuration_file_location = os.path.join(alice_config_root, AliceConfiguration.generate_filename())
alice_configuration_file_location = Path(alice_config_root, AliceConfiguration.generate_filename())
alice_view_args = ('alice', 'public-keys',
'--json-ipc',
'--config-file', alice_configuration_file_location)
@ -178,7 +179,7 @@ def run_entire_cli_lifecycle(click_runner,
assert bob_init_response.exit_code == 0
# Alice uses her configuration file to run the character "view" command
bob_configuration_file_location = os.path.join(bob_config_root, BobConfiguration.generate_filename())
bob_configuration_file_location = Path(bob_config_root, BobConfiguration.generate_filename())
bob_view_args = ('bob', 'public-keys',
'--json-ipc',
'--mock-networking', # TODO: It's absurd for this public-keys command to connect at all. 1710

View File

@ -16,6 +16,7 @@
"""
import os
from pathlib import Path
from unittest import mock
from unittest.mock import PropertyMock
@ -97,6 +98,8 @@ def test_alice_control_starts_with_mocked_keystore(click_runner, mocker, monkeyp
def test_initialize_alice_with_custom_configuration_root(custom_filepath, click_runner, monkeypatch):
custom_filepath = Path(custom_filepath)
monkeypatch.delenv(NUCYPHER_ENVVAR_KEYSTORE_PASSWORD, raising=False)
# Use a custom local filepath for configuration
@ -114,12 +117,12 @@ def test_initialize_alice_with_custom_configuration_root(custom_filepath, click_
assert 'IPv4' not in result.output
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'keystore')), 'KEYSTORE does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'known_nodes')), 'known_nodes directory does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'Keystore does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = os.path.join(custom_filepath, AliceConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
@ -127,16 +130,18 @@ def test_initialize_alice_with_custom_configuration_root(custom_filepath, click_
def test_alice_control_starts_with_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = os.path.join(custom_filepath, AliceConfiguration.generate_filename())
run_args = ('alice', 'run', '--dry-run', '--lonely', '--config-file', custom_config_filepath)
custom_filepath = Path(custom_filepath)
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
run_args = ('alice', 'run', '--dry-run', '--lonely', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, run_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
def test_alice_make_card(click_runner, custom_filepath, mocker):
custom_filepath = Path(custom_filepath)
mock_save_card = mocker.patch.object(Card, 'save')
custom_config_filepath = os.path.join(custom_filepath, AliceConfiguration.generate_filename())
command = ('alice', 'make-card', '--nickname', 'flora', '--config-file', custom_config_filepath)
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
command = ('alice', 'make-card', '--nickname', 'flora', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, command, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
mock_save_card.assert_called_once()
@ -169,8 +174,9 @@ def test_alice_public_keys(click_runner):
def test_alice_view_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = os.path.join(custom_filepath, AliceConfiguration.generate_filename())
view_args = ('alice', 'config', '--config-file', custom_config_filepath)
custom_filepath = Path(custom_filepath)
custom_config_filepath = custom_filepath / AliceConfiguration.generate_filename()
view_args = ('alice', 'config', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, view_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0
assert "checksum_address" in result.output

View File

@ -17,6 +17,7 @@
import json
from base64 import b64encode
from pathlib import Path
from unittest import mock
import os
@ -51,7 +52,7 @@ def test_missing_configuration_file(default_filepath_mock, click_runner):
assert "nucypher bob init" in result.output
def test_initialize_bob_with_custom_configuration_root(custom_filepath, click_runner):
def test_initialize_bob_with_custom_configuration_root(click_runner, custom_filepath: Path):
# Use a custom local filepath for configuration
init_args = ('bob', 'init',
'--network', TEMPORARY_DOMAIN,
@ -66,40 +67,40 @@ def test_initialize_bob_with_custom_configuration_root(custom_filepath, click_ru
assert 'IPv4' not in result.output
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'keystore')), 'KEYSTORE does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'known_nodes')), 'known_nodes directory does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'Keystore does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = os.path.join(custom_filepath, BobConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_bob_control_starts_with_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = os.path.join(custom_filepath, BobConfiguration.generate_filename())
init_args = ('bob', 'run', '--dry-run', '--lonely', '--config-file', custom_config_filepath)
def test_bob_control_starts_with_preexisting_configuration(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
init_args = ('bob', 'run', '--dry-run', '--lonely', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, init_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
assert "Bob Verifying Key" in result.output
assert "Bob Encrypting Key" in result.output
def test_bob_make_card(click_runner, custom_filepath, mocker):
def test_bob_make_card(click_runner, custom_filepath: Path, mocker):
mock_save_card = mocker.patch.object(Card, 'save')
custom_config_filepath = os.path.join(custom_filepath, BobConfiguration.generate_filename())
command = ('bob', 'make-card', '--nickname', 'anders', '--config-file', custom_config_filepath)
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
command = ('bob', 'make-card', '--nickname', 'anders', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, command, input=FAKE_PASSWORD_CONFIRMED, catch_exceptions=False)
assert result.exit_code == 0
assert "Saved new character card " in result.output
mock_save_card.assert_called_once()
def test_bob_view_with_preexisting_configuration(click_runner, custom_filepath):
custom_config_filepath = os.path.join(custom_filepath, BobConfiguration.generate_filename())
view_args = ('bob', 'config', '--config-file', custom_config_filepath)
def test_bob_view_with_preexisting_configuration(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
view_args = ('bob', 'config', '--config-file', str(custom_config_filepath))
result = click_runner.invoke(nucypher_cli, view_args, input=FAKE_PASSWORD_CONFIRMED)
assert result.exit_code == 0, result.exception
assert "checksum_address" in result.output
@ -121,7 +122,7 @@ def test_bob_retrieves_twice_via_cli(click_runner,
capsule_side_channel,
enacted_federated_policy,
federated_ursulas,
custom_filepath_2,
custom_filepath_2: Path,
federated_alice,
federated_bob,
mocker):
@ -132,7 +133,7 @@ def test_bob_retrieves_twice_via_cli(click_runner,
three_message_kits = [capsule_side_channel(), capsule_side_channel(), capsule_side_channel()]
bob_config_root = custom_filepath_2
bob_configuration_file_location = os.path.join(bob_config_root, BobConfiguration.generate_filename())
bob_configuration_file_location = bob_config_root / BobConfiguration.generate_filename()
label = enacted_federated_policy.label
# I already have a Bob.
@ -200,10 +201,10 @@ def test_bob_retrieves_twice_via_cli(click_runner,
# NOTE: Should be the last test in this module since it deletes the configuration file
def test_bob_destroy(click_runner, custom_filepath):
custom_config_filepath = os.path.join(custom_filepath, BobConfiguration.generate_filename())
destroy_args = ('bob', 'destroy', '--config-file', custom_config_filepath, '--force')
def test_bob_destroy(click_runner, custom_filepath: Path):
custom_config_filepath = custom_filepath / BobConfiguration.generate_filename()
destroy_args = ('bob', 'destroy', '--config-file', str(custom_config_filepath), '--force')
result = click_runner.invoke(nucypher_cli, destroy_args, catch_exceptions=False)
assert result.exit_code == 0, result.exception
assert SUCCESSFUL_DESTRUCTION in result.output
assert not os.path.exists(custom_config_filepath), "Bob config file was deleted"
assert not custom_config_filepath.exists(), "Bob config file was deleted"

View File

@ -18,6 +18,8 @@
import json
import os
from pathlib import Path
import pytest
from nucypher.blockchain.eth.registry import InMemoryContractRegistry
@ -40,14 +42,14 @@ ENV = {NUCYPHER_ENVVAR_KEYSTORE_PASSWORD: INSECURE_DEVELOPMENT_PASSWORD}
@pytest.mark.parametrize('config_class', CONFIG_CLASSES)
def test_initialize_via_cli(config_class, custom_filepath, click_runner, monkeypatch):
def test_initialize_via_cli(config_class, custom_filepath: Path, click_runner, monkeypatch):
command = config_class.CHARACTER_CLASS.__name__.lower()
# Use a custom local filepath for configuration
init_args = (command, 'init',
'--network', TEMPORARY_DOMAIN,
'--federated-only',
'--config-root', custom_filepath)
'--config-root', str(custom_filepath))
if config_class == UrsulaConfiguration:
init_args += ('--rest-host', MOCK_IP_ADDRESS)
@ -63,19 +65,19 @@ def test_initialize_via_cli(config_class, custom_filepath, click_runner, monkeyp
assert str(MOCK_CUSTOM_INSTALLATION_PATH) in result.output, "Configuration not in system temporary directory"
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'keystore')), 'KEYSTORE does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'known_nodes')), 'known_nodes directory does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'Keystore does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
@pytest.mark.parametrize('config_class', CONFIG_CLASSES)
def test_reconfigure_via_cli(click_runner, custom_filepath, config_class, monkeypatch, test_registry):
def test_reconfigure_via_cli(click_runner, custom_filepath: Path, config_class, monkeypatch, test_registry):
def fake_get_latest_registry(*args, **kwargs):
return test_registry
monkeypatch.setattr(InMemoryContractRegistry, 'from_latest_publication', fake_get_latest_registry)
custom_config_filepath = os.path.join(custom_filepath, config_class.generate_filename())
custom_config_filepath = custom_filepath / config_class.generate_filename()
view_args = (config_class.CHARACTER_CLASS.__name__.lower(), 'config',
'--config-file', custom_config_filepath,

View File

@ -75,7 +75,7 @@ def test_run_felix(click_runner, testerchain, agency_local_registry):
result = click_runner.invoke(nucypher_cli, init_args, catch_exceptions=False, env=envvars)
assert result.exit_code == 0
configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH_2, FelixConfiguration.generate_filename())
configuration_file_location = MOCK_CUSTOM_INSTALLATION_PATH_2 / FelixConfiguration.generate_filename()
# Felix Creates a Database
db_args = ('felix', 'createdb',
@ -180,4 +180,4 @@ def test_run_felix(click_runner, testerchain, agency_local_registry):
result = click_runner.invoke(nucypher_cli, destroy_args, catch_exceptions=False, env=envvars)
assert result.exit_code == 0
assert SUCCESSFUL_DESTRUCTION in result.output
assert not os.path.exists(configuration_file_location), "Felix configuration file was deleted"
assert not configuration_file_location.exists(), "Felix configuration file was deleted"

View File

@ -43,10 +43,10 @@ from tests.constants import (
@pytest.fixture(scope='function')
def custom_filepath():
_path = str(MOCK_CUSTOM_INSTALLATION_PATH)
_path = MOCK_CUSTOM_INSTALLATION_PATH
shutil.rmtree(_path, ignore_errors=True)
assert not Path(_path).exists()
yield Path(_path)
assert not _path.exists()
yield _path
shutil.rmtree(_path, ignore_errors=True)
@ -96,7 +96,7 @@ def test_coexisting_configurations(click_runner,
assert not known_nodes_dir.exists()
# Not the configuration root...
assert not os.path.isdir(custom_filepath)
assert not custom_filepath.is_dir()
# ... nothing
None
@ -123,8 +123,8 @@ def test_coexisting_configurations(click_runner,
assert result.exit_code == 0
# All configuration files still exist.
assert os.path.isdir(custom_filepath)
assert os.path.isfile(felix_file_location)
assert custom_filepath.is_dir()
assert felix_file_location.is_file()
# Use a custom local filepath to init a persistent Alice
alice_init_args = ('alice', 'init',
@ -138,8 +138,8 @@ def test_coexisting_configurations(click_runner,
assert result.exit_code == 0
# All configuration files still exist.
assert os.path.isfile(felix_file_location)
assert os.path.isfile(alice_file_location)
assert felix_file_location.is_file()
assert alice_file_location.is_file()
# Use the same local filepath to init a persistent Ursula
init_args = ('ursula', 'init',
@ -154,9 +154,9 @@ def test_coexisting_configurations(click_runner,
assert result.exit_code == 0, result.output
# All configuration files still exist.
assert os.path.isfile(felix_file_location)
assert os.path.isfile(alice_file_location)
assert os.path.isfile(ursula_file_location)
assert felix_file_location.is_file()
assert alice_file_location.is_file()
assert ursula_file_location.is_file()
key_spy = mocker.spy(Keystore, 'generate')
@ -174,14 +174,14 @@ def test_coexisting_configurations(click_runner,
assert result.exit_code == 0
# All configuration files still exist.
assert os.path.isfile(felix_file_location)
assert os.path.isfile(alice_file_location)
assert felix_file_location.is_file()
assert alice_file_location.is_file()
kid = key_spy.spy_return.id[:8]
another_ursula_configuration_file_location = custom_filepath / UrsulaConfiguration.generate_filename(modifier=kid)
assert os.path.isfile(another_ursula_configuration_file_location)
assert another_ursula_configuration_file_location.is_file()
assert os.path.isfile(ursula_file_location)
assert ursula_file_location.is_file()
#
# Run
@ -204,10 +204,10 @@ def test_coexisting_configurations(click_runner,
Worker.READY_TIMEOUT = None
# All configuration files still exist.
assert os.path.isfile(felix_file_location)
assert os.path.isfile(alice_file_location)
assert os.path.isfile(another_ursula_configuration_file_location)
assert os.path.isfile(ursula_file_location)
assert felix_file_location.is_file()
assert alice_file_location.is_file()
assert another_ursula_configuration_file_location.is_file()
assert ursula_file_location.is_file()
# Check that the proper Ursula console is attached
assert another_ursula in result.output
@ -221,23 +221,23 @@ def test_coexisting_configurations(click_runner,
'--config-file', another_ursula_configuration_file_location)
result = click_runner.invoke(nucypher_cli, another_ursula_destruction_args, catch_exceptions=False, env=envvars)
assert result.exit_code == 0
assert not os.path.isfile(another_ursula_configuration_file_location)
assert not another_ursula_configuration_file_location.is_file()
ursula_destruction_args = ('ursula', 'destroy', '--config-file', ursula_file_location)
result = click_runner.invoke(nucypher_cli, ursula_destruction_args, input='Y', catch_exceptions=False, env=envvars)
assert result.exit_code == 0
assert 'y/N' in result.output
assert not os.path.isfile(ursula_file_location)
assert not ursula_file_location.is_file()
alice_destruction_args = ('alice', 'destroy', '--force', '--config-file', alice_file_location)
result = click_runner.invoke(nucypher_cli, alice_destruction_args, catch_exceptions=False, env=envvars)
assert result.exit_code == 0
assert not os.path.isfile(alice_file_location)
assert not alice_file_location.is_file()
felix_destruction_args = ('felix', 'destroy', '--force', '--config-file', felix_file_location)
result = click_runner.invoke(nucypher_cli, felix_destruction_args, catch_exceptions=False, env=envvars)
assert result.exit_code == 0
assert not os.path.isfile(felix_file_location)
assert not felix_file_location.is_file()
def test_corrupted_configuration(click_runner,

View File

@ -144,7 +144,7 @@ def test_nucypher_status_locked_tokens(click_runner, testerchain, agency_local_r
assert re.search(f"Min: {all_locked} - Max: {all_locked}", result.output, re.MULTILINE)
def test_nucypher_status_events(click_runner, testerchain, agency_local_registry, stakers, tmpdir):
def test_nucypher_status_events(click_runner, testerchain, agency_local_registry, stakers, temp_dir_path):
# All workers make a commitment
staking_agent = ContractAgency.get_agent(StakingEscrowAgent, registry=agency_local_registry)
starting_block_number = testerchain.get_block_number()
@ -188,7 +188,7 @@ def test_nucypher_status_events(click_runner, testerchain, agency_local_registry
#
# CSV output
#
csv_file = Path(tmpdir) / 'status_events_output.csv'
csv_file = temp_dir_path / 'status_events_output.csv'
csv_status_command = ('events',
'--provider', TEST_PROVIDER_URI,
'--network', TEMPORARY_DOMAIN,

View File

@ -19,6 +19,8 @@ import json
from json import JSONDecodeError
import os
from pathlib import Path
from unittest.mock import PropertyMock
import pytest
@ -61,7 +63,7 @@ def test_initialize_ursula_defaults(click_runner, mocker, tmpdir):
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_initialize_custom_configuration_root(custom_filepath, click_runner):
def test_initialize_custom_configuration_root(click_runner, custom_filepath: Path):
deploy_port = select_test_port()
# Use a custom local filepath for configuration
@ -80,21 +82,21 @@ def test_initialize_custom_configuration_root(custom_filepath, click_runner):
assert 'IPv4' not in result.output
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'keystore')), 'KEYSTORE does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'known_nodes')), 'known_nodes directory does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'KEYSTORE does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Auth
assert COLLECT_NUCYPHER_PASSWORD in result.output, 'WARNING: User was not prompted for password'
assert 'Repeat for confirmation:' in result.output, 'User was not prompted to confirm password'
def test_configuration_file_contents(custom_filepath, nominal_federated_configuration_fields):
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
def test_configuration_file_contents(custom_filepath: Path, nominal_federated_configuration_fields):
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Check the contents of the configuration file
with open(custom_config_filepath, 'r') as config_file:
@ -113,14 +115,14 @@ def test_configuration_file_contents(custom_filepath, nominal_federated_configur
# assert os.path.exists(path), '{} does not exist'.format(path)
assert user_data_dir not in path, '{} includes default appdir path {}'.format(field, user_data_dir)
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
def test_ursula_view_configuration(custom_filepath, click_runner, nominal_federated_configuration_fields):
def test_ursula_view_configuration(custom_filepath: Path, click_runner, nominal_federated_configuration_fields):
# Ensure the configuration file still exists
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
view_args = ('ursula', 'config', '--config-file', custom_config_filepath)
@ -135,14 +137,14 @@ def test_ursula_view_configuration(custom_filepath, click_runner, nominal_federa
assert field in result.output, "Missing field '{}' from configuration file."
# Make sure nothing crazy is happening...
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
def test_run_federated_ursula_from_config_file(custom_filepath, click_runner):
def test_run_federated_ursula_from_config_file(custom_filepath: Path, click_runner):
# Ensure the configuration file still exists
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Run Ursula
run_args = ('ursula', 'run',
@ -177,13 +179,12 @@ def test_ursula_save_metadata(click_runner, custom_filepath):
# Should be the last test since it deletes the configuration file
def test_ursula_destroy_configuration(custom_filepath, click_runner):
preexisting_live_configuration = os.path.isdir(DEFAULT_CONFIG_ROOT)
preexisting_live_configuration_file = os.path.isfile(os.path.join(DEFAULT_CONFIG_ROOT,
UrsulaConfiguration.generate_filename()))
preexisting_live_configuration = DEFAULT_CONFIG_ROOT.is_dir()
preexisting_live_configuration_file = (DEFAULT_CONFIG_ROOT / UrsulaConfiguration.generate_filename()).is_file()
# Ensure the configuration file still exists
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
# Run the destroy command
destruction_args = ('ursula', 'destroy', '--config-file', custom_config_filepath)
@ -193,7 +194,7 @@ def test_ursula_destroy_configuration(custom_filepath, click_runner):
env={NUCYPHER_ENVVAR_KEYSTORE_PASSWORD: INSECURE_DEVELOPMENT_PASSWORD})
# CLI Output
assert not os.path.isfile(custom_config_filepath), 'Configuration file still exists'
assert not custom_config_filepath.is_file(), 'Configuration file still exists'
assert '? [y/N]:' in result.output, 'WARNING: User was not asked to destroy files'
assert str(custom_filepath) in result.output, 'WARNING: Configuration path not in output. Deleting the wrong path?'
assert SUCCESSFUL_DESTRUCTION in result.output, '"Destroyed" not in output'
@ -201,14 +202,14 @@ def test_ursula_destroy_configuration(custom_filepath, click_runner):
assert result.exit_code == 0, 'Destruction did not succeed'
# Ensure the files are deleted from the filesystem
assert not os.path.isfile(custom_config_filepath), 'Files still exist' # ... shes's gone...
assert os.path.isdir(custom_filepath), 'Nucypher files no longer exist' # ... but not NuCypher ...
assert not custom_config_filepath.is_file(), 'Files still exist' # ... shes's gone...
assert custom_filepath.is_dir(), 'Nucypher files no longer exist' # ... but not NuCypher ...
# If this test started off with a live configuration, ensure it still exists
if preexisting_live_configuration:
configuration_still_exists = os.path.isdir(DEFAULT_CONFIG_ROOT)
configuration_still_exists = DEFAULT_CONFIG_ROOT.is_dir()
assert configuration_still_exists
if preexisting_live_configuration_file:
file_still_exists = os.path.isfile(os.path.join(DEFAULT_CONFIG_ROOT, UrsulaConfiguration.generate_filename()))
file_still_exists = (DEFAULT_CONFIG_ROOT / UrsulaConfiguration.generate_filename()).is_file()
assert file_still_exists, 'WARNING: Test command deleted live non-test files'

View File

@ -176,7 +176,7 @@ def test_persistent_node_storage_integration(click_runner,
alice, ursula, another_ursula, felix, staker, *all_yall = testerchain.unassigned_accounts
filename = UrsulaConfiguration.generate_filename()
another_ursula_configuration_file_location = os.path.join(custom_filepath, filename)
another_ursula_configuration_file_location = custom_filepath / filename
init_args = ('ursula', 'init',
'--provider', TEST_PROVIDER_URI,
@ -254,7 +254,7 @@ def test_ursula_run_ip_checkup(testerchain, custom_filepath, click_runner, mocke
# Setup
teacher = blockchain_ursulas.pop()
filename = UrsulaConfiguration.generate_filename()
another_ursula_configuration_file_location = os.path.join(custom_filepath, filename)
another_ursula_configuration_file_location = custom_filepath / filename
# manual teacher
run_args = ('ursula', 'run',

View File

@ -74,10 +74,10 @@ def test_new_stakeholder(click_runner,
assert result.exit_code == 0
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
custom_config_filepath = os.path.join(custom_filepath, StakeHolderConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / StakeHolderConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
with open(custom_config_filepath, 'r') as config_file:
raw_config_data = config_file.read()
@ -376,12 +376,12 @@ def test_ursula_init(click_runner,
assert result.exit_code == 0
# Files and Directories
assert os.path.isdir(custom_filepath), 'Configuration file does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'keystore')), 'KEYSTORE does not exist'
assert os.path.isdir(os.path.join(custom_filepath, 'known_nodes')), 'known_nodes directory does not exist'
assert custom_filepath.is_dir(), 'Configuration file does not exist'
assert (custom_filepath / 'keystore').is_dir(), 'KEYSTORE does not exist'
assert (custom_filepath / 'known_nodes').is_dir(), 'known_nodes directory does not exist'
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
assert os.path.isfile(custom_config_filepath), 'Configuration file does not exist'
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
assert custom_config_filepath.is_file(), 'Configuration file does not exist'
with open(custom_config_filepath, 'r') as config_file:
raw_config_data = config_file.read()
@ -397,7 +397,7 @@ def test_ursula_run(click_runner,
custom_filepath,
testerchain):
custom_config_filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
custom_config_filepath = custom_filepath / UrsulaConfiguration.generate_filename()
# Now start running your Ursula!
init_args = ('ursula', 'run',

View File

@ -17,6 +17,8 @@
import os
import tempfile
from pathlib import Path
import pytest_twisted
import requests
from cryptography.hazmat.primitives import serialization
@ -63,8 +65,8 @@ def test_ursula_serves_statics(ursula_federated_test_config):
try:
with open("test-cert", "wb") as f:
f.write(cert_bytes)
os.makedirs(os.path.join(STATICS_DIR), exist_ok=True)
with open(os.path.join(STATICS_DIR, 'test-never-make-a-file-with-this-name.js'), 'w+') as fout:
os.makedirs(Path(STATICS_DIR), exist_ok=True)
with open(Path(STATICS_DIR, 'test-never-make-a-file-with-this-name.js'), 'w+') as fout:
fout.write("console.log('I am Javascript')\n")
fout.close()
yield threads.deferToThread(check_static_service, node, "test-cert")

View File

@ -52,14 +52,14 @@ BlockchainDeployerInterface.GAS_STRATEGIES = {**BlockchainDeployerInterface.GAS_
'free': free_gas_price_strategy}
def download_github_dir(source_link: str, target_folder: str):
def download_github_dir(source_link: str, target_folder: Path):
response = requests.get(source_link)
if response.status_code != 200:
error = f"Failed to call api {source_link} with status code {response.status_code}"
raise RuntimeError(error)
for content in response.json():
path = os.path.join(target_folder, content["name"])
path = target_folder / content["name"]
if content["type"] == "dir":
os.mkdir(path)
download_github_dir(content["url"], path)

View File

@ -119,7 +119,7 @@ test_logger = Logger("test-logger")
@pytest.fixture(scope="function")
def tempfile_path():
fd, path = tempfile.mkstemp()
yield path
yield Path(path)
os.close(fd)
os.remove(path)
@ -127,7 +127,7 @@ def tempfile_path():
@pytest.fixture(scope="module")
def temp_dir_path():
temp_dir = tempfile.TemporaryDirectory(prefix='nucypher-test-')
yield temp_dir.name
yield Path(temp_dir.name)
temp_dir.cleanup()
@ -141,7 +141,7 @@ def test_datastore():
def certificates_tempdir():
custom_filepath = '/tmp/nucypher-test-certificates-'
cert_tmpdir = tempfile.TemporaryDirectory(prefix=custom_filepath)
yield cert_tmpdir.name
yield Path(cert_tmpdir.name)
cert_tmpdir.cleanup()
@ -637,7 +637,7 @@ def agency_local_registry(testerchain, agency, test_registry):
registry = LocalContractRegistry(filepath=MOCK_REGISTRY_FILEPATH)
registry.write(test_registry.read())
yield registry
if os.path.exists(MOCK_REGISTRY_FILEPATH):
if MOCK_REGISTRY_FILEPATH.exists():
os.remove(MOCK_REGISTRY_FILEPATH)
@ -785,11 +785,11 @@ def mock_ursula_reencrypts():
@pytest.fixture(scope='session')
def stakeholder_config_file_location():
path = os.path.join('/', 'tmp', 'nucypher-test-stakeholder.json')
if os.path.exists(path):
path = Path('/', 'tmp', 'nucypher-test-stakeholder.json')
if path.exists():
os.remove(path)
yield path
if os.path.exists(path):
if path.exists():
os.remove(path)
@ -799,7 +799,7 @@ def software_stakeholder(testerchain, agency, stakeholder_config_file_location,
# Setup
path = stakeholder_config_file_location
if os.path.exists(path):
if path.exists():
os.remove(path)
# 0xaAa482c790b4301bE18D75A0D1B11B2ACBEF798B
@ -835,7 +835,7 @@ def software_stakeholder(testerchain, agency, stakeholder_config_file_location,
# Teardown
yield stakeholder
if os.path.exists(path):
if path.exists():
os.remove(path)
@ -1029,6 +1029,7 @@ def nominal_federated_configuration_fields():
del config
# TODO: Not used?
@pytest.fixture(scope='module')
def mock_allocation_infile(testerchain, token_economics, get_random_checksum_address):
accounts = [get_random_checksum_address() for _ in range(10)]
@ -1044,18 +1045,18 @@ def mock_allocation_infile(testerchain, token_economics, get_random_checksum_add
file.write(json.dumps(allocation_data))
yield MOCK_ALLOCATION_INFILE
if os.path.isfile(MOCK_ALLOCATION_INFILE):
if MOCK_ALLOCATION_INFILE.is_file():
os.remove(MOCK_ALLOCATION_INFILE)
@pytest.fixture(scope='function')
def new_local_registry():
filename = f'{BASE_TEMP_PREFIX}mock-empty-registry-{datetime.now().strftime(DATETIME_FORMAT)}.json'
registry_filepath = os.path.join(BASE_TEMP_DIR, filename)
registry_filepath = BASE_TEMP_DIR / filename
registry = LocalContractRegistry(filepath=registry_filepath)
registry.write(InMemoryContractRegistry().read())
yield registry
if os.path.exists(registry_filepath):
if registry_filepath.exists():
os.remove(registry_filepath)
@ -1082,16 +1083,14 @@ def custom_filepath_2():
@pytest.fixture(scope='module')
def worker_configuration_file_location(custom_filepath):
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
UrsulaConfiguration.generate_filename())
def worker_configuration_file_location(custom_filepath) -> Path:
_configuration_file_location = MOCK_CUSTOM_INSTALLATION_PATH / UrsulaConfiguration.generate_filename()
return _configuration_file_location
@pytest.fixture(scope='module')
def stakeholder_configuration_file_location(custom_filepath):
_configuration_file_location = os.path.join(MOCK_CUSTOM_INSTALLATION_PATH,
StakeHolderConfiguration.generate_filename())
def stakeholder_configuration_file_location(custom_filepath) -> Path:
_configuration_file_location = MOCK_CUSTOM_INSTALLATION_PATH / StakeHolderConfiguration.generate_filename()
return _configuration_file_location

View File

@ -110,15 +110,15 @@ def test_invalid_keystore(tmp_path):
Signer.from_signer_uri(uri=f'keystore:{bad_address}', testnet=True)
def test_signer_reads_keystore_from_disk(mock_account, mock_key, tmpdir):
def test_signer_reads_keystore_from_disk(mock_account, mock_key, temp_dir_path):
# Test reading a keyfile from the disk via KeystoreSigner since
# it is mocked for the rest of this test module
fake_ethereum = Path(tmpdir) / '.fake-ethereum'
fake_ethereum = temp_dir_path / '.fake-ethereum'
try:
fake_ethereum.mkdir()
tmp_keystore = Path(tmpdir) / '.fake-ethereum' / 'keystore'
tmp_keystore = temp_dir_path / '.fake-ethereum' / 'keystore'
tmp_keystore.mkdir()
mock_keyfile_path = tmp_keystore / MOCK_KEYFILE_NAME

View File

@ -152,8 +152,7 @@ def test_unlock_nucypher_keystore(mocker,
test_emitter,
capsys,
alice_blockchain_test_config,
patch_keystore,
tmpdir):
patch_keystore):
# Setup
# Do not test "real" unlocking here, just the plumbing

View File

@ -29,17 +29,17 @@ from nucypher.cli.literature import NO_CONFIGURATIONS_ON_DISK, DEFAULT_TO_LONE_C
def test_select_config_file_with_no_config_files(test_emitter,
capsys,
alice_blockchain_test_config,
tmpdir):
temp_dir_path):
# Setup
config_class = alice_blockchain_test_config
# Prove there are no config files on the disk.
assert not os.listdir(tmpdir)
assert not os.listdir(temp_dir_path)
with pytest.raises(click.Abort):
select_config_file(emitter=test_emitter,
config_class=config_class,
config_root=tmpdir)
config_root=temp_dir_path)
# Ensure we notified the user accurately.
captured = capsys.readouterr()
@ -51,12 +51,12 @@ def test_select_config_file_with_no_config_files(test_emitter,
def test_auto_select_config_file(test_emitter,
capsys,
alice_blockchain_test_config,
tmpdir,
temp_dir_path,
mock_stdin):
"""Only one configuration was found, so it was chosen automatically"""
config_class = alice_blockchain_test_config
config_path = Path(tmpdir) / config_class.generate_filename()
config_path = temp_dir_path / config_class.generate_filename()
# Make one configuration
config_class.to_configuration_file(filepath=config_path)
@ -64,10 +64,10 @@ def test_auto_select_config_file(test_emitter,
result = select_config_file(emitter=test_emitter,
config_class=config_class,
config_root=tmpdir)
config_root=temp_dir_path)
# ... ensure the correct account was selected
assert result == str(config_path)
assert result == config_path
# ... the user was *not* prompted
# If they were, `mock_stdin` would complain.
@ -81,7 +81,7 @@ def test_auto_select_config_file(test_emitter,
def test_interactive_select_config_file(test_emitter,
capsys,
alice_blockchain_test_config,
tmpdir,
temp_dir_path,
mock_stdin,
mock_accounts,
patch_keystore):
@ -93,19 +93,19 @@ def test_interactive_select_config_file(test_emitter,
config_class = config.__class__
# Make one configuration...
config_path = Path(tmpdir) / config_class.generate_filename()
config_path = temp_dir_path / config_class.generate_filename()
config.to_configuration_file(filepath=config_path)
assert config_path.exists()
select_config_file(emitter=test_emitter,
config_class=config_class,
config_root=tmpdir)
config_root=temp_dir_path)
# ... and then a bunch more
accounts = list(mock_accounts.items())
filenames = dict()
for filename, account in accounts:
config.checksum_address = account.address
config_path = Path(tmpdir) / config.generate_filename(modifier=account.address)
config_path = temp_dir_path / config.generate_filename(modifier=account.address)
path = config.to_configuration_file(filepath=config_path, modifier=account.address)
filenames[path] = account.address
assert config_path.exists()
@ -113,7 +113,7 @@ def test_interactive_select_config_file(test_emitter,
mock_stdin.line(str(user_input))
result = select_config_file(emitter=test_emitter,
config_class=config_class,
config_root=tmpdir)
config_root=temp_dir_path)
captured = capsys.readouterr()
for filename, account in accounts:

View File

@ -46,13 +46,13 @@ def mock_account_password_keystore(tmp_path_factory):
def test_ursula_init_with_local_keystore_signer(click_runner,
tmp_path,
temp_dir_path,
mocker,
mock_testerchain,
mock_account_password_keystore,
test_registry_source_manager):
custom_filepath = tmp_path
custom_config_filepath = tmp_path / UrsulaConfiguration.generate_filename()
custom_filepath = temp_dir_path
custom_config_filepath = temp_dir_path / UrsulaConfiguration.generate_filename()
worker_account, password, mock_keystore_path = mock_account_password_keystore
mock_signer_uri = f'keystore://{mock_keystore_path}'

View File

@ -18,6 +18,8 @@
import json
import os
from pathlib import Path
import pytest
import shutil
@ -32,32 +34,32 @@ configuration_name = 'something'
expected_extension = 'json'
configuration_value = 're-emerging llamas'
modifier = '1'
manual_expected_default_filepath = os.path.join('/', 'tmp', 'something.json')
manual_expected_modified_filepath = os.path.join('/', 'tmp', 'something-1.json')
manual_expected_default_filepath = Path('/', 'tmp', 'something.json')
manual_expected_modified_filepath = Path('/', 'tmp', 'something-1.json')
@pytest.fixture(scope='function', autouse=True)
def expected_configuration_filepaths():
# Setup
if os.path.exists(manual_expected_default_filepath):
if manual_expected_default_filepath.exists():
os.remove(manual_expected_default_filepath)
if os.path.exists(manual_expected_modified_filepath):
if manual_expected_modified_filepath.exists():
os.remove(manual_expected_modified_filepath)
yield manual_expected_default_filepath, manual_expected_modified_filepath
# Teardown
if os.path.exists(manual_expected_default_filepath):
if manual_expected_default_filepath.exists():
os.remove(manual_expected_default_filepath)
if os.path.exists(manual_expected_modified_filepath):
if manual_expected_modified_filepath.exists():
os.remove(manual_expected_modified_filepath)
class RestorableTestItem(BaseConfiguration):
NAME = 'something'
DEFAULT_CONFIG_ROOT = '/tmp'
DEFAULT_CONFIG_ROOT = Path('/tmp')
VERSION = 1
def __init__(self, item: str, *args, **kwargs):
@ -133,7 +135,7 @@ def test_configuration_filepath_utilities():
expected_default_filename = f'{RestorableTestItem.NAME}.{RestorableTestItem._CONFIG_FILE_EXTENSION}'
assert RestorableTestItem.generate_filename() == expected_default_filename
expected_default_filepath = os.path.join(RestorableTestItem.DEFAULT_CONFIG_ROOT, expected_default_filename)
expected_default_filepath = RestorableTestItem.DEFAULT_CONFIG_ROOT / expected_default_filename
assert expected_default_filepath == manual_expected_default_filepath
assert RestorableTestItem.default_filepath() == expected_default_filepath
@ -149,7 +151,7 @@ def test_configuration_filepath_utilities():
modified_filename = restorable_item.generate_filename(modifier=modifier)
assert modified_filename == expected_modified_filename
expected_modified_filepath = os.path.join(RestorableTestItem.DEFAULT_CONFIG_ROOT, expected_modified_filename)
expected_modified_filepath = RestorableTestItem.DEFAULT_CONFIG_ROOT / expected_modified_filename
modified_filepath = restorable_item.generate_filepath(override=False, modifier=modifier)
assert modified_filepath == expected_modified_filepath
@ -164,11 +166,11 @@ def test_configuration_preservation():
restorable_item = RestorableTestItem(item=configuration_value)
expected_default_filename = f'{RestorableTestItem.NAME}.{RestorableTestItem._CONFIG_FILE_EXTENSION}'
expected_default_filepath = os.path.join(RestorableTestItem.DEFAULT_CONFIG_ROOT, expected_default_filename)
expected_default_filepath = RestorableTestItem.DEFAULT_CONFIG_ROOT / expected_default_filename
# Serialize
serialized_item = restorable_item.serialize()
serialized_payload = json.dumps(restorable_item.static_payload(), indent=BaseConfiguration.INDENTATION)
assert restorable_item.serialize()
assert restorable_item.static_payload()
# Write to JSON file
filepath = restorable_item.to_configuration_file()

View File

@ -16,9 +16,11 @@
"""
import os
from pathlib import Path
from unittest.mock import Mock
import pytest
import tempfile
import pytest
from constant_sorrow.constants import CERTIFICATE_NOT_SAVED, NO_KEYSTORE_ATTACHED
from nucypher.blockchain.eth.actors import StakeHolder
@ -107,18 +109,18 @@ def test_federated_development_character_configurations(character, configuration
@pytest.mark.parametrize('configuration_class', all_configurations)
def test_default_character_configuration_preservation(configuration_class, testerchain, test_registry_source_manager, mocker, tmpdir):
configuration_class.DEFAULT_CONFIG_ROOT = '/tmp'
configuration_class.DEFAULT_CONFIG_ROOT = Path('/tmp')
fake_address = '0xdeadbeef'
network = TEMPORARY_DOMAIN
expected_filename = f'{configuration_class.NAME}.{configuration_class._CONFIG_FILE_EXTENSION}'
generated_filename = configuration_class.generate_filename()
assert generated_filename == expected_filename
expected_filepath = os.path.join('/', 'tmp', generated_filename)
expected_filepath = Path('/', 'tmp', generated_filename)
if os.path.exists(expected_filepath):
if expected_filepath.exists():
os.remove(expected_filepath)
assert not os.path.exists(expected_filepath)
assert not expected_filepath.exists()
if configuration_class == StakeHolderConfiguration:
# special case for defaults
@ -142,7 +144,7 @@ def test_default_character_configuration_preservation(configuration_class, teste
written_filepath = character_config.to_configuration_file()
assert written_filepath == expected_filepath
assert os.path.exists(written_filepath)
assert written_filepath.exists()
try:
# Read
@ -154,10 +156,10 @@ def test_default_character_configuration_preservation(configuration_class, teste
assert character_config.serialize() == restored_configuration.serialize()
# File still exists after reading
assert os.path.exists(written_filepath)
assert written_filepath.exists()
finally:
if os.path.exists(expected_filepath):
if expected_filepath.exists():
os.remove(expected_filepath)
@ -178,7 +180,7 @@ def test_ursula_development_configuration(federated_only=True):
# A Temporary Ursula
port = ursula_one.rest_information()[0].port
assert port == UrsulaConfiguration.DEFAULT_DEVELOPMENT_REST_PORT
assert tempfile.gettempdir() in ursula_one.datastore.db_path
assert tempfile.gettempdir() in str(ursula_one.datastore.db_path)
assert ursula_one.certificate_filepath is CERTIFICATE_NOT_SAVED
assert isinstance(ursula_one.node_storage, ForgetfulNodeStorage)
assert ':memory:' in ursula_one.node_storage._name

View File

@ -17,7 +17,6 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import datetime
import maya
import os
from nucypher.characters.lawful import Bob
from nucypher.config.characters import AliceConfiguration
@ -27,9 +26,9 @@ from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
from tests.utils.middleware import MockRestMiddleware
def test_alices_powers_are_persistent(federated_ursulas, tmpdir):
def test_alices_powers_are_persistent(federated_ursulas, temp_dir_path):
# Create a non-learning AliceConfiguration
config_root = os.path.join(tmpdir, 'nucypher-custom-alice-config')
config_root = temp_dir_path / 'nucypher-custom-alice-config'
alice_config = AliceConfiguration(
config_root=config_root,
network_middleware=MockRestMiddleware(),

View File

@ -36,11 +36,11 @@ from tests.constants import INSECURE_DEVELOPMENT_PASSWORD
from tests.utils.matchers import IsType
def test_generate_alice_keystore(tmpdir):
def test_generate_alice_keystore(temp_dir_path):
keystore = Keystore.generate(
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir
keystore_dir=temp_dir_path
)
with pytest.raises(Keystore.Locked):
@ -68,10 +68,10 @@ def test_generate_alice_keystore(tmpdir):
assert delegating_pubkey == another_delegating_pubkey
def test_characters_use_keystore(tmpdir):
def test_characters_use_keystore(temp_dir_path):
keystore = Keystore.generate(
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir
keystore_dir=temp_dir_path
)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
alice = Alice(federated_only=True, start_learning_now=False, keystore=keystore)
@ -87,10 +87,10 @@ def test_characters_use_keystore(tmpdir):
@pytest.mark.skip('Do we really though?')
def test_tls_hosting_certificate_remains_the_same(tmpdir, mocker):
def test_tls_hosting_certificate_remains_the_same(temp_dir_path, mocker):
keystore = Keystore.generate(
password=INSECURE_DEVELOPMENT_PASSWORD,
keystore_dir=tmpdir
keystore_dir=temp_dir_path
)
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)

View File

@ -108,7 +108,7 @@ class TestTemporaryFileBasedNodeStorage(BaseTestNodeStorageBackends):
some_node, another_node, *other = os.listdir(self.storage_backend.metadata_dir)
# Let's break the metadata (but not the version)
metadata_path = os.path.join(self.storage_backend.metadata_dir, some_node)
metadata_path = self.storage_backend.metadata_dir / some_node
with open(metadata_path, 'wb') as file:
file.write(Learner.LEARNER_VERSION.to_bytes(4, 'big') + b'invalid')
@ -118,7 +118,7 @@ class TestTemporaryFileBasedNodeStorage(BaseTestNodeStorageBackends):
certificate_only=False)
# Let's break the metadata, by putting a completely wrong version
metadata_path = os.path.join(self.storage_backend.metadata_dir, another_node)
metadata_path = self.storage_backend.metadata_dir / another_node
with open(metadata_path, 'wb') as file:
file.write(b'meh') # Versions are expected to be 4 bytes, but this is 3 bytes

View File

@ -14,6 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import lmdb
import os
@ -213,8 +214,8 @@ def worker_address(worker_account):
@pytest.fixture(scope='module')
def custom_config_filepath(custom_filepath):
filepath = os.path.join(custom_filepath, UrsulaConfiguration.generate_filename())
def custom_config_filepath(custom_filepath: Path):
filepath = custom_filepath / UrsulaConfiguration.generate_filename()
return filepath

View File

@ -20,11 +20,12 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import json
from os.path import abspath, dirname
import io
import os
import re
from pathlib import Path
import tabulate
import time
from twisted.logger import ILogObserver, globalLogPublisher, jsonFileLogObserver
@ -69,7 +70,7 @@ class AnalyzeGas:
# Logging
LOG_NAME = 'estimate-gas'
LOG_FILENAME = '{}.log.json'.format(LOG_NAME)
OUTPUT_DIR = os.path.join(abspath(dirname(__file__)), 'results')
OUTPUT_DIR = Path(__file__) / 'results'
JSON_OUTPUT_FILENAME = '{}.json'.format(LOG_NAME)
_PATTERN = re.compile(r'''
@ -87,7 +88,7 @@ class AnalyzeGas:
self.log = Logger(self.__class__.__name__)
self.gas_estimations = dict()
if not os.path.isdir(self.OUTPUT_DIR):
if not self.OUTPUT_DIR.is_dir():
os.mkdir(self.OUTPUT_DIR)
@provider(ILogObserver)
@ -115,14 +116,14 @@ class AnalyzeGas:
epoch_time = str(int(time.time()))
timestamped_filename = '{}-{}'.format(epoch_time, self.JSON_OUTPUT_FILENAME)
filepath = os.path.join(self.OUTPUT_DIR, timestamped_filename)
filepath = self.OUTPUT_DIR / timestamped_filename
with open(filepath, 'w') as file:
file.write(json.dumps(self.gas_estimations, indent=4))
def start_collection(self) -> None:
print("Starting Data Collection...")
json_filepath = os.path.join(self.OUTPUT_DIR, AnalyzeGas.LOG_FILENAME)
json_filepath = self.OUTPUT_DIR / AnalyzeGas.LOG_FILENAME
json_io = io.open(json_filepath, "w")
json_observer = jsonFileLogObserver(json_io)
globalLogPublisher.addObserver(json_observer)

View File

@ -72,7 +72,7 @@ DEFAULT_SEEDNODE_URIS: List[str] = [
*TEACHER_NODES[DOMAIN],
]
INSECURE_PASSWORD: str = "METRICS_INSECURE_DEVELOPMENT_PASSWORD"
TEMP_ALICE_DIR: str = Path('/', 'tmp', 'grant-metrics')
TEMP_ALICE_DIR: Path = Path('/', 'tmp', 'grant-metrics')
# Policy Parameters
M: int = 1
@ -166,7 +166,7 @@ def make_alice(known_nodes: Optional[Set[Ursula]] = None):
provider_uri=ETHEREUM_PROVIDER_URI,
checksum_address=ALICE_ADDRESS,
signer_uri=f'keystore://{SIGNER_URI}',
config_root=os.path.join(TEMP_ALICE_DIR),
config_root=TEMP_ALICE_DIR,
domain=DOMAIN,
known_nodes=known_nodes,
start_learning_now=False,

View File

@ -17,17 +17,19 @@
from bisect import bisect_left
from contextlib import contextmanager
from pathlib import Path
import lmdb
from threading import Lock
from constant_sorrow.constants import MOCK_DB
def mock_lmdb_open(db_path, map_size=10485760):
def mock_lmdb_open(db_path: Path, map_size=10485760):
if db_path == MOCK_DB:
return MockEnvironment()
else:
return lmdb.Environment(db_path, map_size=map_size)
return lmdb.Environment(str(db_path), map_size=map_size)
class MockEnvironment:

View File

@ -14,7 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import maya
import os
@ -22,7 +22,7 @@ import pytest
class NucypherPytestRunner:
TEST_PATH = os.path.join('tests', 'cli')
TEST_PATH = Path('tests') / 'cli'
PYTEST_ARGS = ['--verbose', TEST_PATH]
def pytest_sessionstart(self):

View File

View File

@ -14,6 +14,7 @@ GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
from pathlib import Path
import pytest
@ -28,7 +29,7 @@ def test_contract_registry(tempfile_path):
BaseContractRegistry(filepath='test')
with pytest.raises(BaseContractRegistry.RegistryError):
bad_registry = LocalContractRegistry(filepath='/fake/file/path/registry.json')
bad_registry = LocalContractRegistry(filepath=Path('/fake/file/path/registry.json'))
bad_registry.search(contract_address='0xdeadbeef')
# Tests everything is as it should be when initially created

View File

@ -215,7 +215,7 @@ def test_geth_web3_client():
def test_autodetect_provider_type_file(tempfile_path):
interface = ProviderTypeTestClient(provider_uri=tempfile_path, # existing file for test
interface = ProviderTypeTestClient(provider_uri=str(tempfile_path), # existing file for test
expected_provider_class=IPCProvider,
actual_provider_to_attach=MockGethProvider())
interface.connect()