Extract Identity entities

pull/2115/head
Kieran R. Prasch 2020-06-23 19:03:49 -07:00
parent ca28cd0288
commit 1580c5f89c
No known key found for this signature in database
GPG Key ID: FDC3146ED25617D8
5 changed files with 311 additions and 301 deletions

View File

@ -182,7 +182,7 @@ class Alice(Character, BlockchainPolicyAuthor):
self.revocation_kits = dict()
def get_card(self) -> 'Card':
from nucypher.policy.collections import Card
from nucypher.policy.identity import Card
card = Card.from_character(self)
return card
@ -515,7 +515,7 @@ class Bob(Character):
return treasure_map
def get_card(self) -> 'Card':
from nucypher.policy.collections import Card
from nucypher.policy.identity import Card
card = Card.from_character(self)
return card

View File

@ -15,53 +15,33 @@ You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import json
from collections import OrderedDict
import base64
import hashlib
import maya
import os
from bytestring_splitter import BytestringKwargifier
from bytestring_splitter import (
BytestringSplitter,
BytestringSplittingError,
VariableLengthBytestring,
BytestringKwargifier
)
from constant_sorrow.constants import (
ALICE_CARD,
BOB_CARD,
URSULA_CARD
VariableLengthBytestring
)
from constant_sorrow.constants import CFRAG_NOT_RETAINED, NO_DECRYPTION_PERFORMED
from constant_sorrow.constants import NOT_SIGNED
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import hashes
from eth_utils import to_canonical_address, to_checksum_address
from hexbytes.main import HexBytes
from pathlib import Path
from typing import Dict
from typing import List, Union
from typing import Optional, Tuple, Callable, Type
from typing import Optional, Tuple
from umbral.config import default_params
from umbral.curvebn import CurveBN
from umbral.keys import UmbralPublicKey
from umbral.pre import Capsule
from nucypher.blockchain.eth.constants import ETH_ADDRESS_BYTE_LENGTH, ETH_HASH_BYTE_LENGTH
from nucypher.characters.lawful import Bob, Character, Alice
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.characters.lawful import Bob, Character
from nucypher.crypto.api import encrypt_and_sign, keccak_digest
from nucypher.crypto.api import verify_eip_191
from nucypher.crypto.constants import HRAC_LENGTH
from nucypher.crypto.kits import UmbralMessageKit
from nucypher.crypto.powers import DecryptingPower, SigningPower
from nucypher.crypto.signing import (
InvalidSignature,
Signature,
signature_splitter,
SignatureStamp
)
from nucypher.crypto.signing import InvalidSignature, Signature, signature_splitter, SignatureStamp
from nucypher.crypto.splitters import capsule_splitter, key_splitter
from nucypher.crypto.splitters import cfrag_splitter
from nucypher.crypto.utils import (
@ -297,276 +277,6 @@ class SignedTreasureMap(TreasureMap):
"Can't cast a DecentralizedTreasureMap to bytes until it has a blockchain signature (otherwise, is it really a 'DecentralizedTreasureMap'?")
return self._blockchain_signature + super().__bytes__()
class Card:
""""
A simple serializable representation of a character's public materials.
"""
_specification = dict(
character_flag=(bytes, 8),
verifying_key=(bytes, 33),
encrypting_key=(bytes, 33),
nickname=VariableLengthBytestring
)
__FLAGS = {
bytes(ALICE_CARD): Alice,
bytes(BOB_CARD): Bob,
# bytes(URSULA_CARD): Ursula # TODO: Consider an Ursula card
}
__FILE_EXTENSION = 'card'
CARD_DIR = Path(DEFAULT_CONFIG_ROOT) / 'cards'
class UnknownCard(Exception):
"""raised when a card cannot be found in storage"""
def __init__(self,
character_flag: Union[ALICE_CARD, BOB_CARD, URSULA_CARD],
verifying_key: UmbralPublicKey,
encrypting_key: Optional[UmbralPublicKey] = None,
card_dir: Path = CARD_DIR,
nickname: bytes = None):
self.card_dir = card_dir
if not self.card_dir.exists():
os.mkdir(str(self.card_dir))
self.__verifying_key = verifying_key # signing public key
self.__encrypting_key = encrypting_key # public key
self.__character_flag = character_flag
self.__character_class = self.__FLAGS[character_flag]
self.__nickname = nickname
self.__validate()
def __repr__(self) -> str:
name = self.nickname or f'{self.__character_class.__name__}'
short_key = bytes(self.__verifying_key).hex()[:6]
r = f'{self.__class__.__name__}({name}:{short_key}:{self.id.hex()[:6]})'
return r
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise TypeError(f'Cannot compare {self.__class__.__name__} and {other}')
return self.id == other.id
def __validate(self) -> bool:
# TODO: Validate umbral keys?
return True
@staticmethod
def __checksum(payload: bytes) -> HexBytes:
blake = hashlib.blake2b()
blake.update(payload)
digest = blake.digest().hex()
return HexBytes(digest)
#
# Serializers
#
def __bytes__(self) -> bytes:
payload = self.__to_bytes()
if self.nickname:
payload += VariableLengthBytestring(self.__nickname)
return payload
def __hex__(self) -> str:
return self.to_hex()
def __to_bytes(self) -> bytes:
card_bytes = bytes()
card_bytes += bytes(self.__character_flag)
card_bytes += bytes(self.__verifying_key)
if self.__encrypting_key:
card_bytes += bytes(self.__encrypting_key)
return card_bytes
@classmethod
def from_bytes(cls, card_bytes: bytes) -> 'Card':
return BytestringKwargifier(cls, **cls._specification)(card_bytes)
@classmethod
def from_hex(cls, hexdata: str):
return cls.from_bytes(bytes.fromhex(hexdata))
def to_hex(self) -> str:
return bytes(self).hex()
@classmethod
def from_base64(cls, b64data: str):
return cls.from_bytes(base64.urlsafe_b64decode(b64data))
def to_base64(self) -> str:
return base64.urlsafe_b64encode(bytes(self)).decode()
def to_qr_code(self):
import qrcode
from qrcode.main import QRCode
qr = QRCode(
version=1,
box_size=1,
border=4, # min spec is 4
error_correction=qrcode.constants.ERROR_CORRECT_L,
)
qr.add_data(bytes(self))
qr.print_ascii()
@classmethod
def from_dict(cls, card: Dict):
instance = cls(verifying_key=card['verifying_key'],
encrypting_key=card['encrypting_key'],
character_flag=card['character'])
return instance
def to_dict(self) -> Dict:
payload = dict(
verifying_key=self.verifying_key,
encrypting_key=self.encrypting_key,
character=self.__character_flag
)
return payload
@classmethod
def from_character(cls, character: Type[Character]) -> 'Card':
for flag, character_class in cls.__FLAGS.items():
if character_class is character.__class__:
break
else:
raise ValueError('Unknown character flag')
instance = cls(verifying_key=character.public_keys(power_up_class=SigningPower),
encrypting_key=character.public_keys(power_up_class=DecryptingPower),
character_flag=flag)
return instance
#
# Card API
#
@property
def verifying_key(self) -> UmbralPublicKey:
return self.__verifying_key
@property
def encrypting_key(self) -> UmbralPublicKey:
return self.__encrypting_key
@property
def id(self) -> HexBytes:
return self.__checksum(self.__to_bytes())
@property
def nickname(self) -> str:
if self.__nickname:
return self.__nickname.decode()
def set_nickname(self, nickname: str) -> None:
self.__nickname = nickname.encode()
@nickname.setter
def nickname(self, nickname: str):
self.set_nickname(nickname)
#
# Card Storage API
#
@property
def is_saved(self) -> bool:
filename = f'{self.id.hex()}.{self.__FILE_EXTENSION}'
filepath = self.card_dir / filename
exists = filepath.exists()
return exists
def save(self, encoder: Callable = base64.b64encode) -> Path:
filename = f'{self.id.hex()}.{self.__FILE_EXTENSION}'
filepath = self.card_dir / filename
with open(str(filepath), 'w') as file:
file.write(encoder(bytes(self)))
return Path(filepath)
@classmethod
def load(cls,
checksum: str,
card_dir: Path = CARD_DIR,
decoder: Callable = base64.b64decode
) -> 'Card':
filename = f'{checksum}.{cls.__FILE_EXTENSION}'
filepath = card_dir / filename
try:
with open(str(filepath), 'rb') as file:
card_bytes = decoder(file.read())
except FileNotFoundError:
raise cls.UnknownCard
instance = cls.from_bytes(card_bytes)
return instance
class PolicyCredential:
"""
A portable structure that contains information necessary for Alice or Bob
to utilize the policy on the network that the credential describes.
"""
def __init__(self, alice_verifying_key, label, expiration, policy_pubkey,
treasure_map=None):
self.alice_verifying_key = alice_verifying_key
self.label = label
self.expiration = expiration
self.policy_pubkey = policy_pubkey
self.treasure_map = treasure_map
def to_json(self):
"""
Serializes the PolicyCredential to JSON.
"""
cred_dict = {
'alice_verifying_key': bytes(self.alice_verifying_key).hex(),
'label': self.label.hex(),
'expiration': self.expiration.iso8601(),
'policy_pubkey': bytes(self.policy_pubkey).hex()
}
if self.treasure_map is not None:
cred_dict['treasure_map'] = bytes(self.treasure_map).hex()
return json.dumps(cred_dict)
@classmethod
def from_json(cls, data: str, federated=False):
"""
Deserializes the PolicyCredential from JSON.
"""
cred_json = json.loads(data)
alice_verifying_key = UmbralPublicKey.from_bytes(
cred_json['alice_verifying_key'],
decoder=bytes().fromhex)
label = bytes().fromhex(cred_json['label'])
expiration = maya.MayaDT.from_iso8601(cred_json['expiration'])
policy_pubkey = UmbralPublicKey.from_bytes(
cred_json['policy_pubkey'],
decoder=bytes().fromhex)
treasure_map = None
if 'treasure_map' in cred_json:
if federated: # I know know. TODO: WTF. 466 and just... you know... whatever.
_MapClass = TreasureMap
else:
_MapClass = SignedTreasureMap
treasure_map = _MapClass.from_bytes(
bytes().fromhex(cred_json['treasure_map']))
return cls(alice_verifying_key, label, expiration, policy_pubkey,
treasure_map)
def __eq__(self, other):
return ((self.alice_verifying_key == other.alice_verifying_key) and
(self.label == other.label) and
(self.expiration == other.expiration) and
(self.policy_pubkey == other.policy_pubkey))
class WorkOrder:
class PRETask:

298
nucypher/policy/identity.py Normal file
View File

@ -0,0 +1,298 @@
"""
This file is part of nucypher.
nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with nucypher. If not, see <https://www.gnu.org/licenses/>.
"""
import base64
import json
import hashlib
import os
from pathlib import Path
from typing import Union, Optional, Dict, Type, Callable
from hexbytes.main import HexBytes
from maya import __init__
from umbral.keys import UmbralPublicKey
from nucypher.characters.base import Character
from nucypher.characters.lawful import Alice, Bob
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.crypto.powers import SigningPower, DecryptingPower
from nucypher.policy.collections import TreasureMap
class Card:
""""
A simple serializable representation of a character's public materials.
"""
_specification = dict(
character_flag=(bytes, 8),
verifying_key=(bytes, 33),
encrypting_key=(bytes, 33),
nickname=VariableLengthBytestring
)
__FLAGS = {
bytes(ALICE_CARD): Alice,
bytes(BOB_CARD): Bob,
# bytes(URSULA_CARD): Ursula # TODO: Consider an Ursula card
}
__FILE_EXTENSION = 'card'
CARD_DIR = Path(DEFAULT_CONFIG_ROOT) / 'cards'
class UnknownCard(Exception):
"""raised when a card cannot be found in storage"""
def __init__(self,
character_flag: Union[ALICE_CARD, BOB_CARD, URSULA_CARD],
verifying_key: UmbralPublicKey,
encrypting_key: Optional[UmbralPublicKey] = None,
card_dir: Path = CARD_DIR,
nickname: bytes = None):
self.card_dir = card_dir
if not self.card_dir.exists():
os.mkdir(str(self.card_dir))
self.__verifying_key = verifying_key # signing public key
self.__encrypting_key = encrypting_key # public key
self.__character_flag = character_flag
self.__character_class = self.__FLAGS[character_flag]
self.__nickname = nickname
self.__validate()
def __repr__(self) -> str:
name = self.nickname or f'{self.__character_class.__name__}'
short_key = bytes(self.__verifying_key).hex()[:6]
r = f'{self.__class__.__name__}({name}:{short_key}:{self.id.hex()[:6]})'
return r
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise TypeError(f'Cannot compare {self.__class__.__name__} and {other}')
return self.id == other.id
def __validate(self) -> bool:
# TODO: Validate umbral keys?
return True
@staticmethod
def __checksum(payload: bytes) -> HexBytes:
blake = hashlib.blake2b()
blake.update(payload)
digest = blake.digest().hex()
return HexBytes(digest)
#
# Serializers
#
def __bytes__(self) -> bytes:
payload = self.__to_bytes()
if self.nickname:
payload += VariableLengthBytestring(self.__nickname)
return payload
def __hex__(self) -> str:
return self.to_hex()
def __to_bytes(self) -> bytes:
card_bytes = bytes()
card_bytes += bytes(self.__character_flag)
card_bytes += bytes(self.__verifying_key)
if self.__encrypting_key:
card_bytes += bytes(self.__encrypting_key)
return card_bytes
@classmethod
def from_bytes(cls, card_bytes: bytes) -> 'Card':
return BytestringKwargifier(cls, **cls._specification)(card_bytes)
@classmethod
def from_hex(cls, hexdata: str):
return cls.from_bytes(bytes.fromhex(hexdata))
def to_hex(self) -> str:
return bytes(self).hex()
@classmethod
def from_base64(cls, b64data: str):
return cls.from_bytes(base64.urlsafe_b64decode(b64data))
def to_base64(self) -> str:
return base64.urlsafe_b64encode(bytes(self)).decode()
def to_qr_code(self):
import qrcode
from qrcode.main import QRCode
qr = QRCode(
version=1,
box_size=1,
border=4, # min spec is 4
error_correction=qrcode.constants.ERROR_CORRECT_L,
)
qr.add_data(bytes(self))
qr.print_ascii()
@classmethod
def from_dict(cls, card: Dict):
instance = cls(verifying_key=card['verifying_key'],
encrypting_key=card['encrypting_key'],
character_flag=card['character'])
return instance
def to_dict(self) -> Dict:
payload = dict(
verifying_key=self.verifying_key,
encrypting_key=self.encrypting_key,
character=self.__character_flag
)
return payload
@classmethod
def from_character(cls, character: Type[Character]) -> 'Card':
for flag, character_class in cls.__FLAGS.items():
if character_class is character.__class__:
break
else:
raise ValueError('Unknown character flag')
instance = cls(verifying_key=character.public_keys(power_up_class=SigningPower),
encrypting_key=character.public_keys(power_up_class=DecryptingPower),
character_flag=flag)
return instance
#
# Card API
#
@property
def verifying_key(self) -> UmbralPublicKey:
return self.__verifying_key
@property
def encrypting_key(self) -> UmbralPublicKey:
return self.__encrypting_key
@property
def id(self) -> HexBytes:
return self.__checksum(self.__to_bytes())
@property
def nickname(self) -> str:
if self.__nickname:
return self.__nickname.decode()
def set_nickname(self, nickname: str) -> None:
self.__nickname = nickname.encode()
@nickname.setter
def nickname(self, nickname: str):
self.set_nickname(nickname)
#
# Card Storage API
#
@property
def is_saved(self) -> bool:
filename = f'{self.id.hex()}.{self.__FILE_EXTENSION}'
filepath = self.card_dir / filename
exists = filepath.exists()
return exists
def save(self, encoder: Callable = base64.b64encode) -> Path:
filename = f'{self.id.hex()}.{self.__FILE_EXTENSION}'
filepath = self.card_dir / filename
with open(str(filepath), 'w') as file:
file.write(encoder(bytes(self)))
return Path(filepath)
@classmethod
def load(cls,
checksum: str,
card_dir: Path = CARD_DIR,
decoder: Callable = base64.b64decode
) -> 'Card':
filename = f'{checksum}.{cls.__FILE_EXTENSION}'
filepath = card_dir / filename
try:
with open(str(filepath), 'rb') as file:
card_bytes = decoder(file.read())
except FileNotFoundError:
raise cls.UnknownCard
instance = cls.from_bytes(card_bytes)
return instance
class PolicyCredential:
"""
A portable structure that contains information necessary for Alice or Bob
to utilize the policy on the network that the credential describes.
"""
def __init__(self, alice_verifying_key, label, expiration, policy_pubkey,
treasure_map=None):
self.alice_verifying_key = alice_verifying_key
self.label = label
self.expiration = expiration
self.policy_pubkey = policy_pubkey
self.treasure_map = treasure_map
def to_json(self):
"""
Serializes the PolicyCredential to JSON.
"""
cred_dict = {
'alice_verifying_key': bytes(self.alice_verifying_key).hex(),
'label': self.label.hex(),
'expiration': self.expiration.iso8601(),
'policy_pubkey': bytes(self.policy_pubkey).hex()
}
if self.treasure_map is not None:
cred_dict['treasure_map'] = bytes(self.treasure_map).hex()
return json.dumps(cred_dict)
@classmethod
def from_json(cls, data: str):
"""
Deserializes the PolicyCredential from JSON.
"""
cred_json = json.loads(data)
alice_verifying_key = UmbralPublicKey.from_bytes(
cred_json['alice_verifying_key'],
decoder=bytes().fromhex)
label = bytes().fromhex(cred_json['label'])
expiration = maya.MayaDT.from_iso8601(cred_json['expiration'])
policy_pubkey = UmbralPublicKey.from_bytes(
cred_json['policy_pubkey'],
decoder=bytes().fromhex)
treasure_map = None
if 'treasure_map' in cred_json:
treasure_map = TreasureMap.from_bytes(
bytes().fromhex(cred_json['treasure_map']))
return cls(alice_verifying_key, label, expiration, policy_pubkey,
treasure_map)
def __eq__(self, other):
return ((self.alice_verifying_key == other.alice_verifying_key) and
(self.label == other.label) and
(self.expiration == other.expiration) and
(self.policy_pubkey == other.policy_pubkey))

View File

@ -438,7 +438,7 @@ class Policy(ABC):
Alice or Bob. By default, it will include the treasure_map for the
policy unless `with_treasure_map` is False.
"""
from nucypher.policy.collections import PolicyCredential
from nucypher.policy.identity import PolicyCredential
treasure_map = self.treasure_map
if not with_treasure_map:
treasure_map = None

View File

@ -18,11 +18,13 @@ along with nucypher. If not, see <https://www.gnu.org/licenses/>.
import datetime
import maya
import pytest
from umbral.kfrags import KFrag
from nucypher.crypto.api import keccak_digest
from nucypher.datastore.models import PolicyArrangement, TreasureMap as DatastoreTreasureMap
from nucypher.policy.collections import PolicyCredential, SignedTreasureMap as DecentralizedTreasureMap
from nucypher.datastore.models import PolicyArrangement
from nucypher.datastore.models import TreasureMap as DatastoreTreasureMap
from nucypher.policy.collections import PolicyCredential
from nucypher.policy.collections import SignedTreasureMap as DecentralizedTreasureMap
from nucypher.policy.identity import PolicyCredential
from tests.utils.middleware import MockRestMiddleware