mirror of https://github.com/nucypher/nucypher.git
Fixes for the new nucypher-core/umbral-pre API
parent
3731ae7226
commit
aa779c6557
|
@ -108,8 +108,12 @@ label = label.encode()
|
|||
# even before creating any associated policy.
|
||||
policy_pubkey = alicia.get_policy_encrypting_key_from_label(label)
|
||||
|
||||
print("The policy public key for "
|
||||
"label '{}' is {}".format(label.decode("utf-8"), bytes(policy_pubkey).hex()))
|
||||
print(
|
||||
"The policy public key for "
|
||||
"label '{}' is {}".format(
|
||||
label.decode("utf-8"), policy_pubkey.to_compressed_bytes().hex()
|
||||
)
|
||||
)
|
||||
|
||||
# Data Sources can produce encrypted data for access policies
|
||||
# that **don't exist yet**.
|
||||
|
@ -155,7 +159,7 @@ print("Done!")
|
|||
# For the demo, we need a way to share with Bob some additional info
|
||||
# about the policy, so we store it in a JSON file
|
||||
policy_info = {
|
||||
"policy_pubkey": bytes(policy.public_key).hex(),
|
||||
"policy_pubkey": policy.public_key.to_compressed_bytes().hex(),
|
||||
"alice_sig_pubkey": bytes(alicia.stamp).hex(),
|
||||
"label": label.decode("utf-8"),
|
||||
"treasure_map": base64.b64encode(bytes(policy.treasure_map)).decode()
|
||||
|
|
|
@ -47,8 +47,12 @@ print("Doctor = ", doctor)
|
|||
with open("policy-metadata.json", 'r') as f:
|
||||
policy_data = json.load(f)
|
||||
|
||||
policy_pubkey = PublicKey.from_bytes(bytes.fromhex(policy_data["policy_pubkey"]))
|
||||
alices_sig_pubkey = PublicKey.from_bytes(bytes.fromhex(policy_data["alice_sig_pubkey"]))
|
||||
policy_pubkey = PublicKey.from_compressed_bytes(
|
||||
bytes.fromhex(policy_data["policy_pubkey"])
|
||||
)
|
||||
alices_sig_pubkey = PublicKey.from_compressed_bytes(
|
||||
bytes.fromhex(policy_data["alice_sig_pubkey"])
|
||||
)
|
||||
label = policy_data["label"].encode()
|
||||
treasure_map = EncryptedTreasureMap.from_bytes(base64.b64decode(policy_data["treasure_map"].encode()))
|
||||
|
||||
|
|
|
@ -12,8 +12,8 @@ def generate_doctor_keys():
|
|||
sig_privkey = SecretKey.random()
|
||||
|
||||
doctor_privkeys = {
|
||||
'enc': enc_privkey.to_secret_bytes().hex(),
|
||||
'sig': sig_privkey.to_secret_bytes().hex(),
|
||||
"enc": enc_privkey.to_be_bytes().hex(),
|
||||
"sig": sig_privkey.to_be_bytes().hex(),
|
||||
}
|
||||
|
||||
with open(DOCTOR_PRIVATE_JSON, 'w') as f:
|
||||
|
@ -22,14 +22,14 @@ def generate_doctor_keys():
|
|||
enc_pubkey = enc_privkey.public_key()
|
||||
sig_pubkey = sig_privkey.public_key()
|
||||
doctor_pubkeys = {
|
||||
'enc': bytes(enc_pubkey).hex(),
|
||||
'sig': bytes(sig_pubkey).hex()
|
||||
"enc": enc_pubkey.to_compressed_bytes().hex(),
|
||||
"sig": sig_pubkey.to_compressed_bytes().hex(),
|
||||
}
|
||||
with open(DOCTOR_PUBLIC_JSON, 'w') as f:
|
||||
json.dump(doctor_pubkeys, f)
|
||||
|
||||
|
||||
def _get_keys(file, key_class):
|
||||
def _get_keys(file, public=False):
|
||||
if not file.exists():
|
||||
generate_doctor_keys()
|
||||
|
||||
|
@ -37,13 +37,18 @@ def _get_keys(file, key_class):
|
|||
stored_keys = json.load(f)
|
||||
keys = dict()
|
||||
for key_type, key_str in stored_keys.items():
|
||||
keys[key_type] = key_class.from_bytes(bytes.fromhex(key_str))
|
||||
data = bytes.fromhex(key_str)
|
||||
if public:
|
||||
key = PublicKey.from_compressed_bytes(data)
|
||||
else:
|
||||
key = SecretKey.from_be_bytes(data)
|
||||
keys[key_type] = key
|
||||
return keys
|
||||
|
||||
|
||||
def get_doctor_pubkeys():
|
||||
return _get_keys(DOCTOR_PUBLIC_JSON, PublicKey)
|
||||
return _get_keys(DOCTOR_PUBLIC_JSON, public=True)
|
||||
|
||||
|
||||
def get_doctor_privkeys():
|
||||
return _get_keys(DOCTOR_PRIVATE_JSON, SecretKey)
|
||||
return _get_keys(DOCTOR_PRIVATE_JSON, public=False)
|
||||
|
|
|
@ -237,7 +237,7 @@ class Character(Learner):
|
|||
|
||||
for power_up, public_key in powers_and_material.items():
|
||||
try:
|
||||
umbral_key = PublicKey.from_bytes(public_key)
|
||||
umbral_key = PublicKey.from_compressed_bytes(public_key)
|
||||
except TypeError:
|
||||
umbral_key = public_key
|
||||
|
||||
|
|
|
@ -41,7 +41,12 @@ from nucypher_core import (
|
|||
ReencryptionResponse,
|
||||
TreasureMap,
|
||||
)
|
||||
from nucypher_core.umbral import PublicKey, VerifiedKeyFrag, reencrypt
|
||||
from nucypher_core.umbral import (
|
||||
PublicKey,
|
||||
RecoverableSignature,
|
||||
VerifiedKeyFrag,
|
||||
reencrypt,
|
||||
)
|
||||
from twisted.internet import reactor
|
||||
from twisted.logger import Logger
|
||||
from web3.types import TxReceipt
|
||||
|
@ -453,7 +458,6 @@ class Bob(Character):
|
|||
|
||||
if not publisher_verifying_key:
|
||||
publisher_verifying_key = alice_verifying_key
|
||||
publisher_verifying_key = PublicKey.from_bytes(bytes(publisher_verifying_key))
|
||||
|
||||
# A small optimization to avoid multiple treasure map decryptions.
|
||||
map_hash = hash(bytes(encrypted_treasure_map))
|
||||
|
@ -858,7 +862,12 @@ class Ursula(Teacher, Character, Operator):
|
|||
# so we can cache the result of this method.
|
||||
# TODO: should this be a method of Teacher?
|
||||
timestamp = maya.now()
|
||||
operator_signature = self.operator_signature
|
||||
|
||||
# TODO: federated mode is gone, but for some reason a node may still not have
|
||||
# an operator signature created. Fill in a dummy value for now.
|
||||
operator_signature = self.operator_signature or (b"0" * 64 + b"\x00")
|
||||
|
||||
operator_signature = RecoverableSignature.from_be_bytes(operator_signature)
|
||||
payload = NodeMetadataPayload(staking_provider_address=Address(self.canonical_address),
|
||||
domain=self.domain,
|
||||
timestamp_epoch=timestamp.epoch,
|
||||
|
@ -1105,7 +1114,9 @@ class Enrico:
|
|||
def __init__(self, policy_encrypting_key: PublicKey):
|
||||
self.signing_power = SigningPower()
|
||||
self._policy_pubkey = policy_encrypting_key
|
||||
self.log = Logger(f'{self.__class__.__name__}-{bytes(self.signing_power.public_key()).hex()[:6]}')
|
||||
self.log = Logger(
|
||||
f"{self.__class__.__name__}-{self.signing_power.public_key().to_compressed_bytes().hex()[:6]}"
|
||||
)
|
||||
self.log.info(self.banner.format(policy_encrypting_key))
|
||||
|
||||
def encrypt_message(
|
||||
|
|
|
@ -85,8 +85,10 @@ class Vladimir(Ursula):
|
|||
|
||||
# Use our own verifying key
|
||||
if substitute_verifying_key:
|
||||
metadata_bytes = metadata_bytes.replace(bytes(metadata.payload.verifying_key),
|
||||
bytes(vladimir.stamp.as_umbral_pubkey()))
|
||||
metadata_bytes = metadata_bytes.replace(
|
||||
metadata.payload.verifying_key.to_compressed_bytes(),
|
||||
vladimir.stamp.as_umbral_pubkey().to_compressed_bytes(),
|
||||
)
|
||||
|
||||
fake_metadata = NodeMetadata.from_bytes(metadata_bytes)
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ class UmbralPublicKeyHex(click.ParamType):
|
|||
def convert(self, value, param, ctx):
|
||||
if self.validate:
|
||||
try:
|
||||
_key = PublicKey.from_bytes(bytes.fromhex(value))
|
||||
_key = PublicKey.from_compressed_bytes(bytes.fromhex(value))
|
||||
except (InternalError, ValueError):
|
||||
self.fail(f"'{value}' is not a valid nucypher public key.")
|
||||
return value
|
||||
|
|
|
@ -68,7 +68,7 @@ class Keypair(object):
|
|||
|
||||
:return: Hexdigest fingerprint of key (keccak-256) in bytes
|
||||
"""
|
||||
return sha3.keccak_256(bytes(self.pubkey)).hexdigest().encode()
|
||||
return sha3.keccak_256(self.pubkey.to_compressed_bytes()).hexdigest().encode()
|
||||
|
||||
|
||||
class DecryptingKeypair(Keypair):
|
||||
|
@ -159,7 +159,7 @@ class HostingKeypair(Keypair):
|
|||
message = "If you don't supply a TLS certificate, one will be generated for you." \
|
||||
"But for that, you need to pass a host and checksum address."
|
||||
raise TypeError(message)
|
||||
certificate, private_key = generate_self_signed_certificate(host=host, private_key=private_key)
|
||||
certificate, private_key = generate_self_signed_certificate(host=host)
|
||||
super().__init__(private_key=private_key)
|
||||
|
||||
else:
|
||||
|
|
|
@ -200,9 +200,16 @@ def _parse_path(path: Path) -> Tuple[int, str]:
|
|||
return created, keystore_id
|
||||
|
||||
|
||||
def _derive_hosting_power(host: str, private_key: SecretKey) -> TLSHostingPower:
|
||||
certificate, private_key = generate_self_signed_certificate(host=host, private_key=private_key)
|
||||
keypair = HostingKeypair(host=host, private_key=private_key, certificate=certificate, generate_certificate=False)
|
||||
def _derive_hosting_power(host: str, secret_seed: bytes) -> TLSHostingPower:
|
||||
certificate, private_key = generate_self_signed_certificate(
|
||||
host=host, secret_seed=secret_seed
|
||||
)
|
||||
keypair = HostingKeypair(
|
||||
host=host,
|
||||
private_key=private_key,
|
||||
certificate=certificate,
|
||||
generate_certificate=False,
|
||||
)
|
||||
power = TLSHostingPower(keypair=keypair, host=host)
|
||||
return power
|
||||
|
||||
|
@ -266,8 +273,12 @@ class Keystore:
|
|||
raise InvalidPassword(''.join(failures))
|
||||
|
||||
# Derive verifying key (for use as ID)
|
||||
signing_key = SecretKeyFactory.from_secure_randomness(secret).make_key(_SIGNING_INFO)
|
||||
keystore_id = bytes(signing_key.public_key()).hex()[:Keystore._ID_SIZE]
|
||||
signing_key = SecretKeyFactory.from_secure_randomness(secret).make_key(
|
||||
_SIGNING_INFO
|
||||
)
|
||||
keystore_id = (
|
||||
signing_key.public_key().to_compressed_bytes().hex()[: Keystore._ID_SIZE]
|
||||
)
|
||||
|
||||
# Generate paths
|
||||
keystore_dir = keystore_dir or Keystore._DEFAULT_DIR
|
||||
|
@ -312,9 +323,13 @@ class Keystore:
|
|||
emitter = StdoutEmitter()
|
||||
emitter.message(f'WARNING: Key importing assumes that you have already secured your secret '
|
||||
f'and can recover it. No mnemonic will be generated.\n', color='yellow')
|
||||
if len(key_material) != SecretKey.serialized_size():
|
||||
raise ValueError(f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.')
|
||||
path = Keystore.__save(secret=key_material, password=password, keystore_dir=keystore_dir)
|
||||
if len(key_material) != SecretKeyFactory.seed_size():
|
||||
raise ValueError(
|
||||
f"Entropy bytes bust be exactly {SecretKeyFactory.seed_size()}."
|
||||
)
|
||||
path = Keystore.__save(
|
||||
secret=key_material, password=password, keystore_dir=keystore_dir
|
||||
)
|
||||
keystore = cls(keystore_path=path)
|
||||
return keystore
|
||||
|
||||
|
@ -396,13 +411,15 @@ class Keystore:
|
|||
failure_message = f"{power_class.__name__} is an invalid type for deriving a CryptoPower"
|
||||
raise TypeError(failure_message)
|
||||
else:
|
||||
__private_key = SecretKeyFactory.from_secure_randomness(self.__secret).make_key(info)
|
||||
__skf = SecretKeyFactory.from_secure_randomness(self.__secret)
|
||||
|
||||
if power_class is TLSHostingPower: # TODO: something more elegant?
|
||||
power = _derive_hosting_power(private_key=__private_key, *power_args, **power_kwargs)
|
||||
power = _derive_hosting_power(
|
||||
secret_seed=__skf.make_secret(info), *power_args, **power_kwargs
|
||||
)
|
||||
|
||||
elif issubclass(power_class, KeyPairBasedPower):
|
||||
keypair = power_class._keypair_class(__private_key)
|
||||
keypair = power_class._keypair_class(__skf.make_key(info))
|
||||
power = power_class(keypair=keypair, *power_args, **power_kwargs)
|
||||
|
||||
elif issubclass(power_class, DerivedKeyBasedPower):
|
||||
|
|
|
@ -193,7 +193,7 @@ class KeyPairBasedPower(CryptoPowerUp):
|
|||
public_key = public_key.as_umbral_pubkey()
|
||||
except AttributeError:
|
||||
try:
|
||||
public_key = PublicKey.from_bytes(public_key)
|
||||
public_key = PublicKey.from_compressed_bytes(public_key)
|
||||
except TypeError:
|
||||
public_key = public_key
|
||||
self.keypair = self._keypair_class(
|
||||
|
|
|
@ -11,7 +11,7 @@ class SignatureStamp(object):
|
|||
|
||||
def __init__(self, verifying_key, signer: Signer = None) -> None:
|
||||
self.__signer = signer
|
||||
self._as_bytes = bytes(verifying_key)
|
||||
self._as_bytes = verifying_key.to_compressed_bytes()
|
||||
self._as_umbral_pubkey = verifying_key
|
||||
|
||||
def __bytes__(self):
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
import datetime
|
||||
from ipaddress import IPv4Address
|
||||
from pathlib import Path
|
||||
from typing import ClassVar, Tuple
|
||||
from typing import ClassVar, Optional, Tuple
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
@ -30,14 +30,14 @@ def _read_tls_certificate(filepath: Path) -> Certificate:
|
|||
raise FileNotFoundError("No SSL certificate found at {}".format(filepath))
|
||||
|
||||
|
||||
def generate_self_signed_certificate(host: str,
|
||||
private_key: SecretKey = None,
|
||||
days_valid: int = 365,
|
||||
curve: ClassVar[EllipticCurve] = _TLS_CURVE,
|
||||
) -> Tuple[Certificate, _EllipticCurvePrivateKey]:
|
||||
|
||||
if private_key:
|
||||
private_bn = int.from_bytes(private_key.to_secret_bytes(), 'big')
|
||||
def generate_self_signed_certificate(
|
||||
host: str,
|
||||
secret_seed: Optional[bytes] = None,
|
||||
days_valid: int = 365,
|
||||
curve: ClassVar[EllipticCurve] = _TLS_CURVE,
|
||||
) -> Tuple[Certificate, _EllipticCurvePrivateKey]:
|
||||
if secret_seed:
|
||||
private_bn = int.from_bytes(secret_seed[: _TLS_CURVE.key_size // 8], "big")
|
||||
private_key = ec.derive_private_key(private_value=private_bn, curve=curve())
|
||||
else:
|
||||
private_key = ec.generate_private_key(curve(), default_backend())
|
||||
|
|
|
@ -21,7 +21,7 @@ SYSTEM_RAND = SystemRandom()
|
|||
def canonical_address_from_umbral_key(public_key: Union[PublicKey, SignatureStamp]) -> bytes:
|
||||
if isinstance(public_key, SignatureStamp):
|
||||
public_key = public_key.as_umbral_pubkey()
|
||||
pubkey_compressed_bytes = bytes(public_key)
|
||||
pubkey_compressed_bytes = public_key.to_compressed_bytes()
|
||||
eth_pubkey = EthKeyAPI.PublicKey.from_compressed_bytes(pubkey_compressed_bytes)
|
||||
canonical_address = eth_pubkey.to_canonical_address()
|
||||
return canonical_address
|
||||
|
|
|
@ -11,13 +11,12 @@ from eth_account.messages import HexBytes, SignableMessage, encode_defunct
|
|||
from eth_keys import KeyAPI as EthKeyAPI
|
||||
from eth_tester.exceptions import TransactionFailed
|
||||
from eth_utils import to_canonical_address, to_checksum_address, to_normalized_address
|
||||
|
||||
from nucypher_core.umbral import SecretKey, PublicKey, Signer, Signature
|
||||
from nucypher_core.umbral import PublicKey, SecretKey, Signature, Signer
|
||||
|
||||
from nucypher.crypto.utils import (
|
||||
canonical_address_from_umbral_key,
|
||||
keccak_digest,
|
||||
verify_eip_191
|
||||
verify_eip_191,
|
||||
)
|
||||
|
||||
ALGORITHM_KECCAK256 = 0
|
||||
|
@ -39,7 +38,7 @@ def get_signature_recovery_value(message: bytes,
|
|||
:return: The compressed byte-serialized representation of the recovered public key
|
||||
"""
|
||||
|
||||
signature = bytes(signature)
|
||||
signature = signature.to_be_bytes()
|
||||
ecdsa_signature_size = 64 # two curve scalars
|
||||
if len(signature) != ecdsa_signature_size:
|
||||
raise ValueError(f"The signature size should be {ecdsa_signature_size} B.")
|
||||
|
@ -70,7 +69,9 @@ def pubkey_as_uncompressed_bytes(umbral_pubkey):
|
|||
"""
|
||||
Returns the public key as uncompressed bytes (without the prefix, so 64 bytes long)
|
||||
"""
|
||||
return EthKeyAPI.PublicKey.from_compressed_bytes(bytes(umbral_pubkey)).to_bytes()
|
||||
return EthKeyAPI.PublicKey.from_compressed_bytes(
|
||||
umbral_pubkey.to_compressed_bytes()
|
||||
).to_bytes()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
|
@ -100,7 +101,7 @@ def test_recover(testerchain, signature_verifier):
|
|||
# If we don't have recovery id while signing then we should try to recover public key with different v
|
||||
# Only the correct v will match the correct public key
|
||||
v = get_signature_recovery_value(message, signature, umbral_pubkey)
|
||||
recoverable_signature = bytes(signature) + v
|
||||
recoverable_signature = signature.to_be_bytes() + v
|
||||
|
||||
# Check recovery method in the contract
|
||||
assert signer_address == to_normalized_address(
|
||||
|
@ -112,12 +113,12 @@ def test_recover(testerchain, signature_verifier):
|
|||
signature_verifier.functions.recover(message_hash, recoverable_signature).call())
|
||||
|
||||
# Only number 0,1,27,28 are supported for v
|
||||
recoverable_signature = bytes(signature) + bytes([2])
|
||||
recoverable_signature = signature.to_be_bytes() + bytes([2])
|
||||
with pytest.raises((TransactionFailed, ValueError)):
|
||||
signature_verifier.functions.recover(message_hash, recoverable_signature).call()
|
||||
|
||||
# Signature must include r, s and v
|
||||
recoverable_signature = bytes(signature)
|
||||
recoverable_signature = signature.to_be_bytes()
|
||||
with pytest.raises((TransactionFailed, ValueError)):
|
||||
signature_verifier.functions.recover(message_hash, recoverable_signature).call()
|
||||
|
||||
|
@ -160,7 +161,7 @@ def test_verify(testerchain, signature_verifier):
|
|||
|
||||
# Get recovery id (v) before using contract
|
||||
v = get_signature_recovery_value(message, signature, umbral_pubkey)
|
||||
recoverable_signature = bytes(signature) + v
|
||||
recoverable_signature = signature.to_be_bytes() + v
|
||||
|
||||
# Verify signature
|
||||
assert signature_verifier.functions.verify(message,
|
||||
|
@ -191,8 +192,9 @@ def test_verify_eip191(testerchain, signature_verifier):
|
|||
|
||||
# Produce EIP191 signature (version E)
|
||||
signable_message = encode_defunct(primitive=message)
|
||||
signature = Account.sign_message(signable_message=signable_message,
|
||||
private_key=umbral_privkey.to_secret_bytes())
|
||||
signature = Account.sign_message(
|
||||
signable_message=signable_message, private_key=umbral_privkey.to_be_bytes()
|
||||
)
|
||||
signature = bytes(signature.signature)
|
||||
|
||||
# Off-chain verify, just in case
|
||||
|
@ -229,11 +231,12 @@ def test_verify_eip191(testerchain, signature_verifier):
|
|||
|
||||
# Produce EIP191 signature (version 0)
|
||||
validator = to_canonical_address(signature_verifier.address)
|
||||
signable_message = SignableMessage(version=HexBytes(version_0),
|
||||
header=HexBytes(validator),
|
||||
body=HexBytes(message))
|
||||
signature = Account.sign_message(signable_message=signable_message,
|
||||
private_key=umbral_privkey.to_secret_bytes())
|
||||
signable_message = SignableMessage(
|
||||
version=HexBytes(version_0), header=HexBytes(validator), body=HexBytes(message)
|
||||
)
|
||||
signature = Account.sign_message(
|
||||
signable_message=signable_message, private_key=umbral_privkey.to_be_bytes()
|
||||
)
|
||||
signature = bytes(signature.signature)
|
||||
|
||||
# Off-chain verify, just in case
|
||||
|
|
|
@ -51,7 +51,7 @@ def test_capsule(testerchain, deserializer, fragments):
|
|||
|
||||
# Check real capsule
|
||||
capsule, _cfrag = fragments
|
||||
capsule_bytes = bytes(capsule)
|
||||
capsule_bytes = capsule.to_bytes_simple()
|
||||
result = deserializer.functions.toCapsule(capsule_bytes).call()
|
||||
assert b''.join(result) == capsule_bytes
|
||||
|
||||
|
@ -72,7 +72,7 @@ def test_cfrag(testerchain, deserializer, fragments):
|
|||
|
||||
# Check real cfrag
|
||||
_capsule, cfrag = fragments
|
||||
cfrag_bytes = bytes(cfrag)
|
||||
cfrag_bytes = cfrag.unverify().to_bytes_simple()
|
||||
result_frag = deserializer.functions.toCapsuleFrag(cfrag_bytes).call()
|
||||
result_proof = deserializer.functions.toCorrectnessProofFromCapsuleFrag(cfrag_bytes).call()
|
||||
assert cfrag_bytes == b''.join(result_frag) + b''.join(result_proof)
|
||||
|
|
|
@ -123,8 +123,10 @@ def collect(alice: Alice,
|
|||
print(f'GRANT FAIL\n{e}')
|
||||
else:
|
||||
success += 1
|
||||
policies[bytes(policy.public_key).hex()] = policy # track
|
||||
print(f"PEK:{bytes(policy.public_key).hex()} | {policy.hrac}")
|
||||
policies[policy.public_key.to_compressed_bytes().hex()] = policy # track
|
||||
print(
|
||||
f"PEK:{policy.public_key.to_compressed_bytes().hex()} | {policy.hrac}"
|
||||
)
|
||||
|
||||
# timeit
|
||||
end = maya.now()
|
||||
|
|
|
@ -37,7 +37,7 @@ class NotAPublicKey:
|
|||
_serial_bytes_length = 5
|
||||
_serial = 10000
|
||||
|
||||
_umbral_pubkey_from_bytes = PublicKey.from_bytes
|
||||
_umbral_pubkey_from_bytes = PublicKey.from_compressed_bytes
|
||||
|
||||
def _tick():
|
||||
for serial in good_serials:
|
||||
|
|
|
@ -26,10 +26,16 @@ def test_keypair_with_umbral_keys():
|
|||
|
||||
new_keypair_from_priv = keypairs.Keypair(umbral_privkey)
|
||||
assert new_keypair_from_priv._privkey == umbral_privkey
|
||||
assert bytes(new_keypair_from_priv.pubkey) == bytes(umbral_pubkey)
|
||||
assert (
|
||||
new_keypair_from_priv.pubkey.to_compressed_bytes()
|
||||
== umbral_pubkey.to_compressed_bytes()
|
||||
)
|
||||
|
||||
new_keypair_from_pub = keypairs.Keypair(public_key=umbral_pubkey)
|
||||
assert bytes(new_keypair_from_pub.pubkey) == bytes(umbral_pubkey)
|
||||
assert (
|
||||
new_keypair_from_pub.pubkey.to_compressed_bytes()
|
||||
== umbral_pubkey.to_compressed_bytes()
|
||||
)
|
||||
assert new_keypair_from_pub._privkey == PUBLIC_ONLY
|
||||
|
||||
|
||||
|
@ -37,8 +43,8 @@ def test_keypair_serialization():
|
|||
umbral_pubkey = SecretKey.random().public_key()
|
||||
new_keypair = keypairs.Keypair(public_key=umbral_pubkey)
|
||||
|
||||
pubkey_bytes = bytes(new_keypair.pubkey)
|
||||
assert pubkey_bytes == bytes(umbral_pubkey)
|
||||
pubkey_bytes = new_keypair.pubkey.to_compressed_bytes()
|
||||
assert pubkey_bytes == umbral_pubkey.to_compressed_bytes()
|
||||
|
||||
|
||||
def test_keypair_fingerprint():
|
||||
|
@ -48,7 +54,9 @@ def test_keypair_fingerprint():
|
|||
fingerprint = new_keypair.fingerprint()
|
||||
assert fingerprint is not None
|
||||
|
||||
umbral_fingerprint = sha3.keccak_256(bytes(umbral_pubkey)).hexdigest().encode()
|
||||
umbral_fingerprint = (
|
||||
sha3.keccak_256(umbral_pubkey.to_compressed_bytes()).hexdigest().encode()
|
||||
)
|
||||
assert fingerprint == umbral_fingerprint
|
||||
|
||||
|
||||
|
|
|
@ -232,26 +232,37 @@ def test_restore_keystore_from_mnemonic(tmpdir, mocker):
|
|||
|
||||
|
||||
def test_import_custom_keystore(tmpdir):
|
||||
# Too short - 32 bytes is required
|
||||
custom_secret = b"tooshort"
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match=f"Entropy bytes bust be exactly {SecretKeyFactory.seed_size()}.",
|
||||
):
|
||||
_keystore = Keystore.import_secure(
|
||||
key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir,
|
||||
)
|
||||
|
||||
# Too short - 32 bytes is required
|
||||
custom_secret = b'tooshort'
|
||||
with pytest.raises(ValueError, match=f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.'):
|
||||
_keystore = Keystore.import_secure(key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir)
|
||||
|
||||
# Too short - 32 bytes is required
|
||||
custom_secret = b'thisisabunchofbytesthatisabittoolong'
|
||||
with pytest.raises(ValueError, match=f'Entropy bytes bust be exactly {SecretKey.serialized_size()}.'):
|
||||
_keystore = Keystore.import_secure(key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir)
|
||||
custom_secret = b"thisisabunchofbytesthatisabittoolong"
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match=f"Entropy bytes bust be exactly {SecretKeyFactory.seed_size()}.",
|
||||
):
|
||||
_keystore = Keystore.import_secure(
|
||||
key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir,
|
||||
)
|
||||
|
||||
# Import private key
|
||||
custom_secret = os.urandom(SecretKey.serialized_size()) # insecure but works
|
||||
keystore = Keystore.import_secure(key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir)
|
||||
custom_secret = os.urandom(SecretKeyFactory.seed_size()) # insecure but works
|
||||
keystore = Keystore.import_secure(
|
||||
key_material=custom_secret,
|
||||
password=INSECURE_DEVELOPMENT_PASSWORD,
|
||||
keystore_dir=tmpdir,
|
||||
)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
assert keystore._Keystore__secret == custom_secret
|
||||
keystore.lock()
|
||||
|
@ -269,7 +280,7 @@ def test_derive_signing_power(tmpdir):
|
|||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
signing_power = keystore.derive_crypto_power(power_class=SigningPower)
|
||||
assert bytes(signing_power.public_key()).hex()
|
||||
assert signing_power.public_key().to_compressed_bytes().hex()
|
||||
assert signing_power.keypair.fingerprint()
|
||||
|
||||
|
||||
|
@ -277,7 +288,7 @@ def test_derive_decrypting_power(tmpdir):
|
|||
keystore = Keystore.generate(INSECURE_DEVELOPMENT_PASSWORD, keystore_dir=tmpdir)
|
||||
keystore.unlock(password=INSECURE_DEVELOPMENT_PASSWORD)
|
||||
decrypting_power = keystore.derive_crypto_power(power_class=DecryptingPower)
|
||||
assert bytes(decrypting_power.public_key()).hex()
|
||||
assert decrypting_power.public_key().to_compressed_bytes().hex()
|
||||
assert decrypting_power.keypair.fingerprint()
|
||||
|
||||
|
||||
|
@ -287,8 +298,11 @@ def test_derive_delegating_power(tmpdir):
|
|||
delegating_power = keystore.derive_crypto_power(power_class=DelegatingPower)
|
||||
parent_skf = SecretKeyFactory.from_secure_randomness(keystore._Keystore__secret)
|
||||
child_skf = parent_skf.make_factory(_DELEGATING_INFO)
|
||||
assert delegating_power._DelegatingPower__secret_key_factory.to_secret_bytes() == child_skf.to_secret_bytes()
|
||||
assert delegating_power._get_privkey_from_label(label=b'some-label')
|
||||
ref_pk = child_skf.make_key(b"some-label").public_key()
|
||||
assert (
|
||||
delegating_power._get_privkey_from_label(label=b"some-label").public_key()
|
||||
== ref_pk
|
||||
)
|
||||
|
||||
|
||||
def test_derive_hosting_power(tmpdir):
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
|
||||
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from eth_utils import to_checksum_address
|
||||
from nucypher_core import Address, NodeMetadata, NodeMetadataPayload
|
||||
from nucypher_core.umbral import SecretKey, Signer
|
||||
from nucypher_core.umbral import RecoverableSignature, SecretKey, Signer
|
||||
|
||||
from nucypher.acumen.perception import FleetSensor
|
||||
from nucypher.characters.lawful import Ursula
|
||||
|
@ -16,12 +17,12 @@ from nucypher.network.middleware import NucypherMiddlewareClient
|
|||
from nucypher.network.nodes import TEACHER_NODES
|
||||
from nucypher.network.protocols import InterfaceInfo
|
||||
from nucypher.utilities.networking import (
|
||||
CENTRALIZED_IP_ORACLE_URL,
|
||||
UnknownIPAddress,
|
||||
determine_external_ip_address,
|
||||
get_external_ip_from_centralized_source,
|
||||
get_external_ip_from_default_teacher,
|
||||
get_external_ip_from_known_nodes,
|
||||
CENTRALIZED_IP_ORACLE_URL,
|
||||
UnknownIPAddress
|
||||
)
|
||||
from tests.constants import MOCK_IP_ADDRESS
|
||||
|
||||
|
@ -63,7 +64,9 @@ class Dummy: # Teacher
|
|||
signer = Signer(SecretKey.random())
|
||||
|
||||
# A dummy signature with the recovery byte
|
||||
dummy_signature = bytes(signer.sign(b'whatever')) + b'\x00'
|
||||
dummy_signature = RecoverableSignature.from_be_bytes(
|
||||
signer.sign(b"whatever").to_be_bytes() + b"\x00"
|
||||
)
|
||||
|
||||
payload = NodeMetadataPayload(staking_provider_address=Address(self.canonical_address),
|
||||
domain=':dummy:',
|
||||
|
|
Loading…
Reference in New Issue