Remove unused code.

pull/3016/head
derekpierre 2022-11-15 20:55:38 -05:00
parent dc488624e8
commit 6e8a5bdcdf
5 changed files with 9 additions and 77 deletions

View File

@ -1,15 +1,12 @@
import hashlib
import json
import shutil
import tempfile
from abc import ABC, abstractmethod
from json import JSONDecodeError
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple, Type, Union
import requests
from constant_sorrow.constants import REGISTRY_COMMITTED
from nucypher.blockchain.eth import CONTRACT_REGISTRY_BASE
from nucypher.blockchain.eth.networks import NetworksInventory
@ -404,32 +401,6 @@ class LocalContractRegistry(BaseContractRegistry):
return payload
class TemporaryContractRegistry(LocalContractRegistry):
def __init__(self, *args, **kwargs) -> None:
_, self.temp_filepath = tempfile.mkstemp()
super().__init__(filepath=self.temp_filepath, *args, **kwargs)
def clear(self):
self.log.info("Cleared temporary registry at {}".format(self.filepath))
with open(self.filepath, 'w') as registry_file:
registry_file.write('')
def commit(self, filepath) -> Path:
"""writes the current state of the registry to a file"""
self.log.info("Committing temporary registry to {}".format(filepath))
self._swap_registry(filepath) # I'll allow it
if filepath.exists():
self.log.debug("Removing registry {}".format(filepath))
self.clear() # clear prior sim runs
_ = shutil.copy(self.temp_filepath, filepath)
self.temp_filepath = REGISTRY_COMMITTED # just in case
self.log.info("Wrote temporary registry to filesystem {}".format(filepath))
return filepath
class InMemoryContractRegistry(BaseContractRegistry):
def __init__(self, *args, **kwargs):

View File

@ -7,10 +7,9 @@ from typing import ClassVar, Dict, List, Optional
from constant_sorrow.constants import (
NO_BLOCKCHAIN_CONNECTION,
NO_CONTROL_PROTOCOL,
NO_NICKNAME,
NO_SIGNING_POWER,
STRANGER
STRANGER,
)
from eth_keys import KeyAPI as EthKeyAPI
from eth_utils import to_canonical_address
@ -18,7 +17,10 @@ from nucypher_core import MessageKit
from nucypher_core.umbral import PublicKey
from nucypher.acumen.nicknames import Nickname
from nucypher.blockchain.eth.registry import BaseContractRegistry, InMemoryContractRegistry
from nucypher.blockchain.eth.registry import (
BaseContractRegistry,
InMemoryContractRegistry,
)
from nucypher.blockchain.eth.signers.base import Signer
from nucypher.crypto.keystore import Keystore
from nucypher.crypto.powers import (
@ -26,12 +28,9 @@ from nucypher.crypto.powers import (
CryptoPowerUp,
DecryptingPower,
NoSigningPower,
SigningPower
)
from nucypher.crypto.signing import (
SignatureStamp,
StrangerStamp,
SigningPower,
)
from nucypher.crypto.signing import SignatureStamp, StrangerStamp
from nucypher.network.middleware import RestMiddleware
from nucypher.network.nodes import Learner

View File

@ -14,27 +14,12 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import Certificate
from cryptography.x509.oid import NameOID
from nucypher_core.umbral import SecretKey
_TLS_CERTIFICATE_ENCODING = Encoding.PEM
_TLS_CURVE = ec.SECP384R1
def _write_tls_certificate(certificate: Certificate,
full_filepath: Path,
force: bool = False,
) -> Path:
cert_already_exists = full_filepath.is_file()
if force is False and cert_already_exists:
raise FileExistsError('A TLS certificate already exists at {}.'.format(full_filepath.resolve()))
with open(full_filepath, 'wb') as certificate_file:
public_pem_bytes = certificate.public_bytes(_TLS_CERTIFICATE_ENCODING)
certificate_file.write(public_pem_bytes)
return full_filepath
def _read_tls_certificate(filepath: Path) -> Certificate:
"""Deserialize an X509 certificate from a filepath"""
try:

View File

@ -95,7 +95,7 @@ def get_context_value(context_variable: str, **context) -> Any:
value = context.get(context_variable)
if value is None:
raise RequiredContextVariable(
f'"No value provided for unrecognized context variable "{context_variable}"'
f'No value provided for unrecognized context variable "{context_variable}"'
)
else:
value = func(**context) # required inputs here

View File

@ -8,29 +8,6 @@ from web3._utils.contracts import encode_abi
from web3.contract import ContractConstructor
def to_bytes32(value=None, hexstr=None) -> bytes:
return Web3.toBytes(primitive=value, hexstr=hexstr).rjust(32, b'\0')
def to_32byte_hex(value=None, hexstr=None) -> str:
return Web3.toHex(to_bytes32(value=value, hexstr=hexstr))
def get_mapping_entry_location(key: bytes, mapping_location: int) -> int:
if not(isinstance(key, bytes) and len(key) == 32):
raise ValueError("Mapping key must be a 32-long bytestring")
# See https://solidity.readthedocs.io/en/latest/internals/layout_in_storage.html#mappings-and-dynamic-arrays
entry_location = Web3.toInt(Web3.keccak(key + mapping_location.to_bytes(32, "big")))
return entry_location
def get_array_data_location(array_location: int) -> int:
# See https://solidity.readthedocs.io/en/latest/internals/layout_in_storage.html#mappings-and-dynamic-arrays
data_location = Web3.toInt(Web3.keccak(to_bytes32(array_location)))
return data_location
def encode_constructor_arguments(web3: Web3,
constructor_function: ContractConstructor,
*constructor_args, **constructor_kwargs) -> HexStr:
@ -52,7 +29,7 @@ def connect_web3_provider(eth_provider_uri: str) -> None:
"""
Convenience function for connecting to an ethereum provider now.
This may be used to optimize the startup time of some applications by
establishing the connection eagarly.
establishing the connection eagerly.
"""
from nucypher.blockchain.eth.interfaces import BlockchainInterfaceFactory