Enforce KeyFrag verification before reencryption

pull/267/head
Bogdan Opanchuk 2021-04-19 17:53:32 -07:00
parent b03c83eb93
commit 65d32fd63e
11 changed files with 146 additions and 64 deletions

View File

@ -39,6 +39,9 @@ Intermediate objects
:special-members: __eq__, __hash__
:show-inheritance:
.. autoclass:: VerifiedKeyFrag()
:special-members: __eq__, __hash__
.. autoclass:: CapsuleFrag()
:members:
:special-members: __eq__, __hash__
@ -63,6 +66,9 @@ Utilities
.. autoclass:: umbral.GenericError
:show-inheritance:
.. autoclass:: umbral.VerificationError
:show-inheritance:
.. autoclass:: umbral.serializable.Serializable
:members: from_bytes
:special-members: __bytes__

View File

@ -17,6 +17,17 @@ def bobs_keys():
return sk, pk
@pytest.fixture
def verification_keys(alices_keys, bobs_keys):
delegating_sk, signing_sk = alices_keys
_receiving_sk, receiving_pk = bobs_keys
verifying_pk = PublicKey.from_secret_key(signing_sk)
delegating_pk = PublicKey.from_secret_key(delegating_sk)
return verifying_pk, delegating_pk, receiving_pk
@pytest.fixture
def kfrags(alices_keys, bobs_keys):
delegating_sk, signing_sk = alices_keys

View File

@ -116,9 +116,9 @@ def _verify_kfrags(umbral, kfrags_bytes, verifying_pk_bytes, delegating_pk_bytes
verifying_pk = umbral.PublicKey.from_bytes(verifying_pk_bytes)
delegating_pk = umbral.PublicKey.from_bytes(delegating_pk_bytes)
receiving_pk = umbral.PublicKey.from_bytes(receiving_pk_bytes)
assert all(kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk) for kfrag in kfrags)
return [kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk) for kfrag in kfrags]
def test_kfrags(implementations):
@ -142,10 +142,12 @@ def test_kfrags(implementations):
_verify_kfrags(umbral2, kfrags_bytes, verifying_pk_bytes, delegating_pk_bytes, receiving_pk_bytes)
def _reencrypt(umbral, capsule_bytes, kfrags_bytes, threshold, metadata):
def _reencrypt(umbral, verifying_pk_bytes, delegating_pk_bytes, receiving_pk_bytes,
capsule_bytes, kfrags_bytes, threshold, metadata):
capsule = umbral.Capsule.from_bytes(bytes(capsule_bytes))
kfrags = [umbral.KeyFrag.from_bytes(kfrag_bytes) for kfrag_bytes in kfrags_bytes]
cfrags = [umbral.reencrypt(capsule, kfrag, metadata=metadata) for kfrag in kfrags[:threshold]]
verified_kfrags = _verify_kfrags(umbral, kfrags_bytes,
verifying_pk_bytes, delegating_pk_bytes, receiving_pk_bytes)
cfrags = [umbral.reencrypt(capsule, kfrag, metadata=metadata) for kfrag in verified_kfrags[:threshold]]
return [bytes(cfrag) for cfrag in cfrags]
@ -200,7 +202,8 @@ def test_reencrypt(implementations):
# On client 2
cfrags_bytes = _reencrypt(umbral2, capsule_bytes, kfrags_bytes, threshold, metadata)
cfrags_bytes = _reencrypt(umbral2, verifying_pk_bytes, delegating_pk_bytes, receiving_pk_bytes,
capsule_bytes, kfrags_bytes, threshold, metadata)
# On client 1

View File

@ -1,54 +1,48 @@
import pytest
from umbral import KeyFrag, PublicKey, Signer, generate_kfrags
from umbral import KeyFrag, PublicKey, Signer, generate_kfrags, VerificationError
from umbral.key_frag import KeyFragID
from umbral.curve_scalar import CurveScalar
def test_kfrag_serialization(alices_keys, bobs_keys, kfrags):
def test_kfrag_serialization(verification_keys, kfrags):
delegating_sk, signing_sk = alices_keys
_receiving_sk, receiving_pk = bobs_keys
verifying_pk = PublicKey.from_secret_key(signing_sk)
delegating_pk = PublicKey.from_secret_key(delegating_sk)
verifying_pk, delegating_pk, receiving_pk = verification_keys
for kfrag in kfrags:
kfrag_bytes = bytes(kfrag)
new_kfrag = KeyFrag.from_bytes(kfrag_bytes)
assert new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
new_kfrag = new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
assert new_kfrag == kfrag
def test_kfrag_verification(alices_keys, bobs_keys, kfrags):
def test_kfrag_verification(verification_keys, kfrags):
delegating_sk, signing_sk = alices_keys
_receiving_sk, receiving_pk = bobs_keys
verifying_pk = PublicKey.from_secret_key(signing_sk)
delegating_pk = PublicKey.from_secret_key(delegating_sk)
verifying_pk, delegating_pk, receiving_pk = verification_keys
# Wrong signature
kfrag = kfrags[0]
kfrag.id = KeyFragID.random()
kfrag.kfrag.id = KeyFragID.random()
kfrag_bytes = bytes(kfrag)
new_kfrag = KeyFrag.from_bytes(kfrag_bytes)
assert not new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
with pytest.raises(VerificationError):
new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
# Wrong key
kfrag = kfrags[1]
kfrag.key = CurveScalar.random_nonzero()
kfrag.kfrag.key = CurveScalar.random_nonzero()
kfrag_bytes = bytes(kfrag)
new_kfrag = KeyFrag.from_bytes(kfrag_bytes)
assert not new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
with pytest.raises(VerificationError):
new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
@pytest.mark.parametrize('sign_delegating_key',
@ -84,23 +78,41 @@ def test_kfrag_signing(alices_keys, bobs_keys, sign_delegating_key, sign_receivi
receiving_key_ok = (not sign_receiving_key) or pass_receiving_key
should_verify = delegating_key_ok and receiving_key_ok
result = kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk if pass_delegating_key else None,
receiving_pk=receiving_pk if pass_receiving_key else None)
verification_passed = True
try:
kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk if pass_delegating_key else None,
receiving_pk=receiving_pk if pass_receiving_key else None)
except VerificationError:
verification_passed = False
assert result == should_verify
assert verification_passed == should_verify
def test_kfrag_is_hashable(kfrags):
def test_kfrag_is_hashable(verification_keys, kfrags):
verifying_pk, delegating_pk, receiving_pk = verification_keys
assert hash(kfrags[0]) != hash(kfrags[1])
new_kfrag = KeyFrag.from_bytes(bytes(kfrags[0]))
assert hash(new_kfrag) == hash(kfrags[0])
# Not verified yet
assert hash(new_kfrag) != hash(kfrags[0])
verified_kfrag = new_kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
assert hash(verified_kfrag) == hash(kfrags[0])
def test_kfrag_str(kfrags):
s = str(kfrags[0])
assert "VerifiedKeyFrag" in s
s = str(KeyFrag.from_bytes(bytes(kfrags[0])))
assert "VerifiedKeyFrag" not in s
assert "KeyFrag" in s

View File

@ -5,6 +5,7 @@ from umbral import (
PublicKey,
Signer,
GenericError,
KeyFrag,
encrypt,
generate_kfrags,
decrypt_original,
@ -78,10 +79,6 @@ def test_simple_api(num_kfrags, threshold):
# Bob requests re-encryption to some set of M ursulas
cfrags = list()
for kfrag in kfrags[:threshold]:
# Ursula checks that the received kfrag is valid
assert kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
# Re-encryption by an Ursula
cfrag = reencrypt(capsule, kfrag)
@ -105,3 +102,9 @@ def test_simple_api(num_kfrags, threshold):
)
assert plaintext_reenc == plaintext
def test_reencrypt_unverified_kfrag(capsule, kfrags):
kfrag = KeyFrag.from_bytes(bytes(kfrags[0]))
with pytest.raises(TypeError):
reencrypt(capsule, kfrag)

View File

@ -150,12 +150,11 @@ def test_cfrags():
metadata = bytes.fromhex(vector_suite['metadata'])
for kfrag, cfrag in kfrags_n_cfrags:
assert kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk), \
'Invalid KeyFrag {}'.format(bytes(kfrag.to_bytes).hex())
verified_kfrag = kfrag.verify(verifying_pk=verifying_pk,
delegating_pk=delegating_pk,
receiving_pk=receiving_pk)
new_cfrag = reencrypt(capsule, kfrag, metadata=metadata)
new_cfrag = reencrypt(capsule, verified_kfrag, metadata=metadata)
assert new_cfrag.point_e1 == cfrag.point_e1
assert new_cfrag.point_v1 == cfrag.point_v1
assert new_cfrag.kfrag_id == cfrag.kfrag_id

View File

@ -4,8 +4,8 @@ from .__about__ import (
from .capsule import Capsule
from .capsule_frag import CapsuleFrag
from .errors import GenericError
from .key_frag import KeyFrag, generate_kfrags
from .errors import GenericError, VerificationError
from .key_frag import KeyFrag, VerifiedKeyFrag, generate_kfrags
from .keys import SecretKey, PublicKey, SecretKeyFactory
from .pre import encrypt, decrypt_original, decrypt_reencrypted, reencrypt
from .signing import Signature, Signer
@ -26,8 +26,10 @@ __all__ = [
"Signer",
"Capsule",
"KeyFrag",
"VerifiedKeyFrag",
"CapsuleFrag",
"GenericError",
"VerificationError",
"encrypt",
"decrypt_original",
"generate_kfrags",

View File

@ -3,3 +3,9 @@ class GenericError(Exception):
An interal Umbral error, see the message for details.
"""
pass
class VerificationError(GenericError):
"""
Integrity of the data cannot be verified, see the message for details.
"""

View File

@ -3,6 +3,7 @@ from typing import Tuple, List, Optional
from .curve_point import CurvePoint
from .curve_scalar import CurveScalar
from .errors import VerificationError
from .hashing import hash_to_shared_secret, kfrag_signature_message, hash_to_polynomial_arg
from .keys import PublicKey, SecretKey
from .params import PARAMETERS
@ -200,7 +201,7 @@ class KeyFrag(Serializable):
verifying_pk: PublicKey,
delegating_pk: Optional[PublicKey] = None,
receiving_pk: Optional[PublicKey] = None,
) -> bool:
) -> 'VerifiedKeyFrag':
"""
Verifies the validity of this fragment.
@ -215,16 +216,16 @@ class KeyFrag(Serializable):
commitment = self.proof.commitment
precursor = self.precursor
# We check that the commitment is well-formed
if commitment != u * key:
return False
raise VerificationError("Invalid kfrag commitment")
# A shortcut, perhaps not necessary
delegating_key_missing = self.proof.delegating_key_signed and not bool(delegating_pk)
receiving_key_missing = self.proof.receiving_key_signed and not bool(receiving_pk)
if self.proof.delegating_key_signed and not bool(delegating_pk):
raise VerificationError("A signature of a delegating key was included in this kfrag, "
"but the key is not provided")
if delegating_key_missing or receiving_key_missing:
return False
if self.proof.receiving_key_signed and not bool(receiving_pk):
raise VerificationError("A signature of a receiving key was included in this kfrag, "
"but the key is not provided")
delegating_pk = delegating_pk if self.proof.delegating_key_signed else None
receiving_pk = receiving_pk if self.proof.receiving_key_signed else None
@ -233,7 +234,33 @@ class KeyFrag(Serializable):
precursor=precursor,
maybe_delegating_pk=delegating_pk,
maybe_receiving_pk=receiving_pk)
return self.proof.signature_for_proxy.verify(verifying_pk, kfrag_message)
if not self.proof.signature_for_proxy.verify(verifying_pk, kfrag_message):
raise VerificationError("Failed to verify the kfrag signature")
return VerifiedKeyFrag(self)
class VerifiedKeyFrag:
"""
Verified kfrag, good for reencryption.
Can be cast to ``bytes``, but cannot be deserialized from bytes directly.
It can only be obtained from :py:meth:`KeyFrag.verify`.
"""
def __init__(self, kfrag: KeyFrag):
self.kfrag = kfrag
def __bytes__(self):
return bytes(self.kfrag)
def __eq__(self, other):
return self.kfrag == other.kfrag
def __hash__(self):
return hash((self.__class__, bytes(self)))
def __str__(self):
return f"{self.__class__.__name__}:{bytes(self).hex()[:16]}"
class KeyFragBase:
@ -291,7 +318,7 @@ def generate_kfrags(delegating_sk: SecretKey,
num_kfrags: int,
sign_delegating_key: bool = True,
sign_receiving_key: bool = True,
) -> List[KeyFrag]:
) -> List[VerifiedKeyFrag]:
"""
Generates ``num_kfrags`` key fragments to pass to proxies for re-encryption.
At least ``threshold`` of them will be needed for decryption.
@ -305,4 +332,8 @@ def generate_kfrags(delegating_sk: SecretKey,
if num_kfrags < threshold:
raise ValueError(f"Creating less kfrags ({num_kfrags}) than threshold ({threshold}) makes them useless")
return [KeyFrag.from_base(base, sign_delegating_key, sign_receiving_key) for _ in range(num_kfrags)]
kfrags = [KeyFrag.from_base(base, sign_delegating_key, sign_receiving_key)
for _ in range(num_kfrags)]
# Make them verified - we know they're good.
return [VerifiedKeyFrag(kfrag) for kfrag in kfrags]

View File

@ -4,7 +4,7 @@ from .capsule import Capsule
from .capsule_frag import CapsuleFrag
from .dem import DEM
from .keys import PublicKey, SecretKey
from .key_frag import KeyFrag
from .key_frag import VerifiedKeyFrag, KeyFrag
def encrypt(pk: PublicKey, plaintext: bytes) -> Tuple[Capsule, bytes]:
@ -30,7 +30,10 @@ def decrypt_original(sk: SecretKey, capsule: Capsule, ciphertext: bytes) -> byte
return dem.decrypt(ciphertext, authenticated_data=bytes(capsule))
def reencrypt(capsule: Capsule, kfrag: KeyFrag, metadata: Optional[bytes] = None) -> CapsuleFrag:
def reencrypt(capsule: Capsule,
kfrag: VerifiedKeyFrag,
metadata: Optional[bytes] = None
) -> CapsuleFrag:
"""
Creates a capsule fragment using the given key fragment.
Capsule fragments can later be used to decrypt the ciphertext.
@ -38,7 +41,12 @@ def reencrypt(capsule: Capsule, kfrag: KeyFrag, metadata: Optional[bytes] = None
If `metadata` is provided, it will have to be used for verification in
:py:meth:`CapsuleFrag.verify`.
"""
return CapsuleFrag.reencrypted(capsule, kfrag, metadata)
# We could let duck typing do its work,
# but it's better to make a common error more understandable.
if isinstance(kfrag, KeyFrag) and not isinstance(kfrag, VerifiedKeyFrag):
raise TypeError("KeyFrag must be verified before reencryption")
return CapsuleFrag.reencrypted(capsule, kfrag.kfrag, metadata)
def decrypt_reencrypted(decrypting_sk: SecretKey,

View File

@ -1,7 +1,7 @@
import json
import os
from umbral import SecretKey, PublicKey, Signer, encrypt, generate_kfrags, reencrypt
from umbral import SecretKey, PublicKey, Signer, KeyFrag, encrypt, generate_kfrags, reencrypt
from umbral.curve_scalar import CurveScalar
from umbral.curve_point import CurvePoint
from umbral.hashing import Hash, unsafe_hash_to_point
@ -207,7 +207,8 @@ create_test_vector_file(vector_suite, 'vectors_unsafe_hash_to_point.json', gener
vectors = list()
for kfrag in kfrags:
assert kfrag.verify(verifying_pk, delegating_pk, receiving_pk)
kfrag = KeyFrag.from_bytes(bytes(kfrag))
kfrag.verify(verifying_pk, delegating_pk, receiving_pk)
json_input = {'kfrag': hexlify(kfrag)}