Merge pull request #42 from tuxxy/header-object

[WIP] Add Header object and refactors EncryptedFile and Client
pull/54/head
Tux 2017-09-25 11:01:26 -06:00 committed by GitHub
commit 06a986c0a4
11 changed files with 380 additions and 238 deletions

View File

@ -1,7 +1,8 @@
import sha3
import msgpack
from nacl import utils
from nkms.network import dummy
from nkms.crypto.keyring import KeyRing
from nkms.crypto.storage import EncryptedFile, Header
from nkms.crypto import (default_algorithm, pre_from_algorithm,
symmetric_from_algorithm)
from io import BytesIO
@ -37,23 +38,9 @@ class Client(object):
self._pre = pre_from_algorithm(default_algorithm)
self._symm = symmetric_from_algorithm(default_algorithm)
# TODO: Check for existing keypair before generation
# TODO: Load existing keys into the KeyRing
# TODO: Save newly generated keypair
self._priv_key = self._pre.gen_priv(dtype='bytes')
self._pub_key = self._pre.priv2pub(self._priv_key)
def _derive_path_key(self, path, is_pub=True):
"""
Derives a public key for the specific path.
:param bytes path: Path to generate key for.
:param bool is_pub: Is the derived key a public key?
:return: Derived key
:rtype: bytes
"""
key = sha3.keccak_256(self._priv_key + path).digest()
return self._pre.priv2pub(key) if is_pub else key
self.keyring = KeyRing()
def _split_path(self, path):
"""
@ -71,43 +58,6 @@ class Client(object):
dirs = path.split(b'/')
return [b'/'.join(dirs[:i + 1]) for i in range(len(dirs))]
def _build_header(self, enc_keys, version=100):
"""
Creates a NuCypher header for the encrypted file.
:param enc_keys: List of encrypted keys in bytes
:param version: Version number of Cryptographic API (default: 0.1.0.0)
:return: Complete header msgpack encoded and length of raw header
:rtype: Tuple of the header and the header length e.g: (<header>, 1200)
"""
if version < 1000:
vers_bytes = version.to_bytes(4, byteorder='big')
num_keys_bytes = len(enc_keys).to_bytes(4, byteorder='big')
keys = b''.join(enc_keys)
header = msgpack.dumps(vers_bytes + num_keys_bytes + keys)
return (header, len(header))
def _read_header(self, header):
"""
Reads a NuCypher header.
:param header: Msgpack encoded header to read
:return: Version number, and list of encrypted keys
:rtype: Tuple of an int and a list e.g: (100, [...])
"""
header = BytesIO(msgpack.loads(header))
vers_bytes = header.read(4)
version = int.from_bytes(vers_bytes, byteorder='big')
# Handle pre-alpha versions
if version < 1000:
num_keys_bytes = header.read(4)
num_keys = int.from_bytes(num_keys_bytes, byteorder='big')
enc_keys = [header.read(Client.KEY_LENGTH) for _ in range(num_keys)]
return (version, enc_keys)
def encrypt_key(self, key, pubkey=None, path=None, algorithm=None):
"""
Encrypt (symmetric) key material with our public key or the public key
@ -140,7 +90,7 @@ class Client(object):
enc_keys = []
subpaths = self._split_path(path)
for subpath in subpaths:
path_pubkey = self._derive_path_key(subpath)
path_pubkey = self.keyring.derive_path_key(subpath)
enc_keys.append(self.encrypt_key(key, pubkey=path_pubkey))
return enc_keys
elif not path:
@ -157,9 +107,9 @@ class Client(object):
:rtype: bytes
"""
if path is not None:
priv_key = self._derive_path_key(path, is_pub=False)
priv_key = self.keyring.derive_path_key(path, is_pub=False)
else:
priv_key = self._priv_key
priv_key = self.keyring.enc_privkey
return self._pre.decrypt(priv_key, enc_key)
def grant(self, pubkey, path=None, policy=None):
@ -192,36 +142,14 @@ class Client(object):
def list_permissions(self, pubkey=None, path=None):
pass
def encrypt_bulk(self, data, key, algorithm=None):
def open(self, file_path, header_path, pubkey=None, path=None):
"""
Encrypt bulk of the data with a symmetric cipher
Returns an EncryptedFile object from a file_path and header_path.
:param bytes data: Data to encrypt
:param bytes key: Symmetric key
:param str algorithm: Algorithm to use or None for default
:return: Encrypted data
:rtype: bytes
:param bytes file_path: Path of the encrypted file
:param bytes
"""
# TODO Handle algorithm
# Nonce is generated implicitly within cipher.encrypt as random data
cipher = self._symm(key)
return cipher.encrypt(data)
def decrypt_bulk(self, edata, key, algorithm=None):
"""
Decrypt bulk of the data with a symmetric cipher
:param bytes edata: Data to decrypt
:param bytes key: Symmetric key
:param str algorithm: Algorithm to use or None for default
:return: Plaintext data
:rtype: bytes
"""
# TODO Handle algorithm
cipher = self._symm(key)
return cipher.decrypt(edata)
pass
def open(self, pubkey=None, path=None, mode='rb', fd=None, algorithm=None):
"""
@ -242,11 +170,9 @@ class Client(object):
If pubkey is not set, we're working on our own files.
"""
file_path = fd or path
try:
with open(file_path, mode=mode) as f:
enc_data = f.read()
except Exception as E:
raise E
with open(file_path, mode=mode) as f:
enc_data = f.read()
return self.decrypt(enc_data, path=path)
def remove(self, pubkey=None, path=None):
@ -256,11 +182,12 @@ class Client(object):
"""
pass
def encrypt(self, data, path=None, algorithm=None):
def encrypt(self, data, key, path=None, algorithm=None):
"""
Encrypts data in a form ready to ship to the storage layer.
:param bytes data: Data to encrypt
:param bytes key: Data encryption key to use when encrypting
:param tuple(str) path: Path to the data (to be able to share
sub-paths). If None, encrypted with just our pubkey.
If contains only 1 element or is a string, this is just used as a
@ -271,9 +198,7 @@ class Client(object):
:return: Encrypted data
:rtype: bytes
"""
# Generate a secure key and encrypt the data
data_key = utils.random(32)
ciphertext = msgpack.dumps(self.encrypt_bulk(data, data_key))
ciphertext = msgpack.dumps(self.keyring.encrypt(data, data_key))
# Derive keys and encrypt them
# TODO: https://github.com/nucypher/nucypher-kms/issues/33
@ -281,13 +206,6 @@ class Client(object):
enc_keys = self.encrypt_key(data_key, path=path)
else:
enc_keys = [self.encrypt_key(data_key, path=path)]
# Build the header
header, header_length = self._build_header(enc_keys)
# Format for storage
header_length_bytes = header_length.to_bytes(4, byteorder='big')
storage_data = header_length_bytes + header + ciphertext
return storage_data
def decrypt(self, edata, path=None, owner=None):

View File

@ -39,6 +39,18 @@ class EncryptingKeypair(object):
"""
return self.pre.decrypt(self.priv_key, enc_data)
def rekey(self, pubkey):
"""
Generates a re-encryption key for the specified pubkey.
:param bytes pubkey: The public key of the recipient
:rtype: bytes
:return: Re-encryption key for the specified pubkey
"""
return self.pre.rekey(self.priv_key, pubkey)
class SigningKeypair(object):
def __init__(self, privkey_bytes=None):

View File

@ -1,6 +1,8 @@
import sha3
from nacl.utils import random
from nkms.crypto.keypairs import SigningKeypair, EncryptingKeypair
from nkms.crypto import (default_algorithm, pre_from_algorithm,
symmetric_from_algorithm)
class KeyRing(object):
@ -15,6 +17,23 @@ class KeyRing(object):
"""
self.sig_keypair = SigningKeypair(sig_privkey)
self.enc_keypair = EncryptingKeypair(enc_privkey)
self.pre = pre_from_algorithm(default_algorithm)
@property
def sig_pubkey(self):
return self.sig_keypair.pub_key
@property
def sig_privkey(self):
return self.sig_keypair.priv_key
@property
def enc_pubkey(self):
return self.enc_keypair.pub_key
@property
def enc_privkey(self):
return self.enc_keypair.priv_key
def sign(self, message):
"""
@ -80,3 +99,16 @@ class KeyRing(object):
:return: Secure random generated bytestring of <length> bytes
"""
return random(length)
def derive_path_key(self, path, is_pub=True):
"""
Derives a key for the specific path.
:param bytes path: Path to generate the key for
:param bool is_pub: Is the derived key a public key?
:rtype: bytes
:return: Derived key
"""
key = sha3.keccak_256(self.enc_privkey + path).digest()
return self.pre.priv2pub(key) if is_pub else key

View File

@ -1 +1,2 @@
from .encrypted_file import EncryptedFile
from .header import Header

View File

@ -0,0 +1,8 @@
# Number of random bytes to prefix before the counter
NONCE_RANDOM_PREFIX_SIZE = 20
# Size of the counter in bytes (4 = int)
NONCE_COUNTER_BYTE_SIZE = 4
# Length of padding from NaCl
PADDING_LENGTH = 16

View File

@ -1,114 +1,41 @@
import msgpack
import os
import io
from nacl.utils import random
import os
from nkms.storage.header import Header
from nkms.storage.constants import NONCE_COUNTER_BYTE_SIZE, PADDING_LENGTH
from nkms.crypto import default_algorithm, symmetric_from_algorithm
class EncryptedFile(object):
def __init__(self, key, path, mode='rb'):
def __init__(self, key, path, header_path):
"""
Creates an EncryptedFile object that allows the user to encrypt or
decrypt data into a file defined at `path`.
An EncryptedFile object actually is composed of two files:
1) The ciphertext -- This is the chunked and encrypted ciphertext
2) The header -- This contains the metadata of the ciphertext that
tells us how to decrypt it, or add more data.
:param bytes key: Symmetric key to use for encryption/decryption
:param string/bytes path: Path of file to open
:param string mode: Mode to use when opening file, default is 'rb'
:param bytes path: Path of ciphertext file to open
:param bytes header_path: Path of header file
"""
self.path = path
self.mode = mode
cipher = symmetric_from_algorithm(default_algorithm)
self.cipher = cipher(key)
def _build_header(self, version=100, nonce=None, keys=None,
chunk_size=1000000, num_chunks=0, msg_len=0):
"""
Builds a header and returns the msgpack encoded form of it.
# Opens the header file and parses it, if it exists. If not, creates it
self.header_path = header_path
self.header_obj = Header(self.header_path)
:param int version: Version of the NuCypher header
:param bytes nonce: Nonce to write to header, default is random(20)
:param list keys: Keys to write to header
:param int chunk_size: Size of each chunk in bytes, default is 1MB
:param int num_chunks: Number of chunks in ciphertext, default is 0
:param int msg_len: Length of the encrypted ciphertext in total
self.path = path
:return: (header_length, encoded_header)
:rtype: Tuple(int, bytes)
"""
if not nonce:
nonce = random(20)
# Always seek the beginning of the file on first open
self.file_obj = open(self.path, mode='a+b')
self.file_obj.seek(0)
self.header = {
'version': version,
'nonce': nonce,
'keys': keys,
'chunk_size': chunk_size,
'num_chunks': num_chunks,
'msg_len': msg_len,
}
try:
encoded_header = msgpack.dumps(self.header)
except ValueError as e:
raise e
self.header_length = len(encoded_header)
return (self.header_length, encoded_header)
def _encode_header(self):
"""
Returns a msgpack encoded header and the length of it in bytes ready to
be written to the file_obj.
:return: (encoded_header, header_length_bytes)
:rtype: Tuple(bytes, bytes)
"""
header_length_bytes = self.header_length.to_bytes(4, byteorder='big')
encoded_header = msgpack.dumps(self.header)
return (encoded_header, header_length_bytes)
def _update_header(self, header):
"""
Updates the self.header with the key/values in header, then updates
the header length.
:param dict header: Dict to update self.header with
:return: (header_length, encoded_header)
:rtype: Tuple(int, bytes)
"""
self.header.update(header)
try:
encoded_header = msgpack.dumps(self.header)
except ValueError as e:
raise e
self.header_length = len(encoded_header)
return (self.header_length, encoded_header)
def _read_header(self):
"""
Reads the header from the self.file_obj.
"""
try:
# Read last four bytes (header length) of file.
self.file_obj.seek(-4, os.SEEK_END)
# The first four bytes of the file are the header length
self.header_length = int.from_bytes(
self.file_obj.read(4), byteorder='big')
# Seek to the beginning of the header and read it
self.file_obj.seek(-(self.header_length + 4), os.SEEK_END)
self.header = msgpack.loads(self.file_obj.read(self.header_length))
except ValueError as e:
self.file_obj.seek(0)
raise e
else:
# Seek to the end of the ciphertext
self.file_obj.seek(-(self.header_length + 4), os.SEEK_END)
@property
def header(self):
return self.header_obj.header
def _read_chunk(self, chunk_size, nonce):
"""
@ -120,51 +47,28 @@ class EncryptedFile(object):
:return: Decrypted/Authenticated chunk
:rtype: Bytes
"""
ciphertext = self.file_obj.read(chunk_size)
ciphertext = self.file_obj.read(chunk_size + PADDING_LENGTH)
return self.cipher.decrypt(ciphertext, nonce=nonce)
def open_new(self, keys, chunk_size=1000000, nonce=None):
"""
Opens a new EncryptedFile and creates a header for it ready for
writing encrypted data.
:param list keys: Encrypted keys to put in the header.
:param int chunk_size: Size of encrypted chunks in bytes, default is 1MB
:param bytes nonce: 20 byte Nonce to use for encryption
"""
self.file_obj = open(self.path, mode=self.mode)
self._build_header(nonce=nonce, keys=keys, chunk_size=chunk_size)
def open(self, is_new=False):
"""
Opens a file for Encryption/Decryption.
:param bool is_new: Is the file new (and empty)?
"""
# TODO: Error if self.file_obj is already defined
self.file_obj = open(self.path, mode=self.mode)
# file_obj is now ready for reading/writing encrypted data
if not is_new:
self._read_header()
def read(self, num_chunks=0):
"""
Reads num_chunks of encrypted ciphertext and decrypt/authenticate it.
:param int num_chunks: Number of chunks to read. Default is all chunks
:param int num_chunks: Number of chunks to read. When set to 0, it will
read the all the chunks and decrypt them.
:return: List of decrypted/authenticated ciphertext chunks
:rtype: List
"""
if num_chunks == 0:
num_chunks = self.header['chunks']
if not num_chunks:
num_chunks = self.header[b'num_chunks']
chunks = []
for chunk_num in range(num_chunks):
nonce = (self.header['nonce']
+ chunk_num.to_bytes(4, byteorder='big'))
chunks.append(self._read_chunk(self.header['chunk_size'], nonce))
nonce = (self.header[b'nonce']
+ chunk_num.to_bytes(NONCE_COUNTER_BYTE_SIZE,
byteorder='big'))
chunks.append(self._read_chunk(self.header[b'chunk_size'], nonce))
return chunks
def write(self, data):
@ -177,24 +81,32 @@ class EncryptedFile(object):
:return: Number of chunks written
:rtype: int
"""
# Always start off at the last chunk_num
chunk_num = self.header['num_chunks']
# Always start writing at the end of the file, never overwrite.
self.file_obj.seek(0, os.SEEK_END)
# Start off at the last chunk_num
chunk_num = self.header[b'num_chunks']
buf_data = io.BytesIO(data)
plaintext = buf_data.read(self.header['chunk_size'])
chunks_written = 0
plaintext = buf_data.read(self.header[b'chunk_size'])
while len(plaintext) > 0:
nonce = (self.header['nonce']
+ chunk_num.to_bytes(4, byteorder='big'))
enc_msg = self.cipher.encrypt(plaintext, nonce=nonce)
self.file_obj.write(enc_msg.ciphertext)
plaintext = buf_data.read(self.header['chunk_size'])
nonce = (self.header[b'nonce']
+ chunk_num.to_bytes(NONCE_COUNTER_BYTE_SIZE,
byteorder='big'))
enc_data = self.cipher.encrypt(plaintext, nonce=nonce)
self.file_obj.write(enc_data.ciphertext)
chunks_written += 1
plaintext = buf_data.read(self.header[b'chunk_size'])
chunk_num += 1
self._update_header({'num_chunks': chunk_num})
self.header_obj.update_header({b'num_chunks': chunk_num})
return chunks_written
def close(self):
"""
Writes the header to the file_obj and closes it. Called after the user
is finished writing data to the file_obj.
Writes the header to the filesystem and closes the file_obj.
"""
header, header_length = self._encode_header()
self.file_obj.write(header + header_length)
self.header_obj.update_header()
self.file_obj.close()

89
nkms/storage/header.py Normal file
View File

@ -0,0 +1,89 @@
import msgpack
import pathlib
from nacl.utils import random
from nkms.storage.constants import NONCE_RANDOM_PREFIX_SIZE
class Header(object):
def __init__(self, header_path, header={}):
"""
Initializes a header object that contains metadata about a storage
object (ie: EncryptedFile)
:param bytes header_path: Path to the file containing the header
:param dict header: Header params to use when building the header
"""
self.path = header_path
header_file = pathlib.Path(self.path.decode())
if header_file.is_file():
self.header = self._read_header(self.path)
else:
self.header = self._build_header(**header)
self._write_header(self.path)
def _read_header(self, header_path):
"""
Reads the header file located at `header_path` and loads it from its
msgpack format into the self.header dict.
:param bytes/string header_path: The path to the header file
:return: The loaded dict from the header file
:rtype: Dict
"""
with open(header_path, mode='rb') as f:
# TODO: Use custom Exception (invalid or corrupt header)
try:
header = msgpack.loads(f.read())
except ValueError as e:
raise e
return header
def _build_header(self, version=100, nonce=None, keys=[],
chunk_size=1000000, num_chunks=0):
"""
Builds a header and sets the header dict in the `Header` object.
:param int version: Version of the NuCypher header
:param bytes nonce: Nonce to write to header, default is random(20)
:param list keys: Keys to write to header
:param int chunk_size: Size of each chunk in bytes, default is 1MB
:param int num_chunks: Number of chunks in ciphertext
:return: dict of header
:rtype: Dict
"""
if not nonce:
nonce = random(NONCE_RANDOM_PREFIX_SIZE)
return {
b'version': version,
b'nonce': nonce,
b'keys': keys,
b'chunk_size': chunk_size,
b'num_chunks': num_chunks,
}
def _write_header(self, header_path):
"""
Writes the msgpack dumped self.header dict to the file located at
`header_path`.
:param string/bytes header_path: The path to write the msgpack dumped
header to
"""
with open(header_path, mode='wb') as f:
try:
f.write(msgpack.dumps(self.header))
except ValueError as e:
raise e
def update_header(self, header={}):
"""
Updates the self.header dict with the dict in header and writes it to
the header file.
:param dict header: Values to use in the dict.update call
"""
self.header.update(header)
self._write_header(self.path)

View File

@ -28,17 +28,17 @@ class TestKeyRing(unittest.TestCase):
self.assertTrue(32, len(sig[2])) # Check s
is_valid = self.keyring_b.verify(self.msg, signature,
pubkey=self.keyring_a.sig_keypair.pub_key)
pubkey=self.keyring_a.sig_pubkey)
self.assertTrue(is_valid)
def test_encryption(self):
ciphertext = self.keyring_a.encrypt(self.msg,
pubkey=self.keyring_b.enc_keypair.pub_key)
pubkey=self.keyring_b.enc_pubkey)
self.assertNotEqual(self.msg, ciphertext)
def test_decryption(self):
ciphertext = self.keyring_a.encrypt(self.msg,
pubkey=self.keyring_b.enc_keypair.pub_key)
pubkey=self.keyring_b.enc_pubkey)
self.assertNotEqual(self.msg, ciphertext)
plaintext = self.keyring_b.decrypt(ciphertext)

View File

@ -0,0 +1,8 @@
import unittest
from nkms.storage import constants
class TestConstants(unittest.TestCase):
def test_constants(self):
self.assertEqual(4, constants.NONCE_COUNTER_BYTE_SIZE)
self.assertEqual(20, constants.NONCE_RANDOM_PREFIX_SIZE)

View File

@ -0,0 +1,90 @@
import unittest
import os
from nkms.storage import EncryptedFile
from nacl.utils import random
class TestEncryptedFile(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.key = random(32)
cls.path = b'test.nuc'
cls.header_path = b'test.nuc.header'
cls.data = random(30)
cls.enc_file_obj = EncryptedFile(cls.key, cls.path, cls.header_path)
@classmethod
def tearDownClass(cls):
os.remove(b'test.nuc')
os.remove(b'test.nuc.header')
def setUp(self):
self.enc_file = TestEncryptedFile.enc_file_obj
self.header = TestEncryptedFile.enc_file_obj.header
self.header_obj = TestEncryptedFile.enc_file_obj.header_obj
def step1_update_header(self):
updated_header = {b'chunk_size': 10}
self.header_obj.update_header(header=updated_header)
self.assertEqual(10, self.header[b'chunk_size'])
def step2_write_data(self):
# Writes the equivalent of three chunks per the updated header
chunks_written = self.enc_file.write(TestEncryptedFile.data)
self.assertEqual(3, chunks_written)
self.enc_file.close()
with open('test.nuc', 'rb') as f:
file_data = f.read()
self.assertFalse(TestEncryptedFile.data in file_data)
def step3_read_chunk(self):
enc_file = EncryptedFile(TestEncryptedFile.key, TestEncryptedFile.path,
TestEncryptedFile.header_path)
chunks = enc_file.read(num_chunks=1)
self.assertEqual(1, len(chunks))
self.assertTrue(chunks[0] in TestEncryptedFile.data)
enc_file.close()
def step4_read_all_chunks(self):
enc_file = EncryptedFile(TestEncryptedFile.key, TestEncryptedFile.path,
TestEncryptedFile.header_path)
chunks = enc_file.read()
self.assertEqual(3, len(chunks))
self.assertTrue(chunks[0] in TestEncryptedFile.data)
self.assertTrue(chunks[1] in TestEncryptedFile.data)
self.assertTrue(chunks[2] in TestEncryptedFile.data)
enc_file.close()
def step5_append_data_and_read(self):
enc_file = EncryptedFile(TestEncryptedFile.key, TestEncryptedFile.path,
TestEncryptedFile.header_path)
data = random(20)
written_chunks = enc_file.write(data)
self.assertEqual(2, written_chunks)
enc_file.close()
# After closing the object, we create another to read the data
enc_file = EncryptedFile(TestEncryptedFile.key, TestEncryptedFile.path,
TestEncryptedFile.header_path)
chunks = enc_file.read()
self.assertEqual(5, len(chunks))
self.assertTrue(chunks[0] in TestEncryptedFile.data)
self.assertTrue(chunks[1] in TestEncryptedFile.data)
self.assertTrue(chunks[2] in TestEncryptedFile.data)
self.assertTrue(chunks[3] in data)
self.assertTrue(chunks[4] in data)
enc_file.close()
def _steps(self):
for attr in sorted(dir(self)):
if not attr.startswith('step'):
continue
yield attr
def test_encrypted_file(self):
for _s in self._steps():
try:
getattr(self, _s)()
except Exception as e:
self.fail('{} failed({})'.format(_s, e))

View File

@ -0,0 +1,72 @@
import unittest
import pathlib
import msgpack
import os
from nkms.storage import Header
class TestHeader(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.header = Header(b'test_header.nuc.header')
@classmethod
def tearDownClass(cls):
os.remove(b'test_header.nuc.header')
def setUp(self):
self.header_obj = TestHeader.header
self.header = TestHeader.header.header
def step1_test_header_defaults(self):
# Test dict values
self.assertEqual(100, self.header[b'version'])
self.assertEqual(20, len(self.header[b'nonce']))
self.assertEqual(list, type(self.header[b'keys']))
self.assertEqual(0, len(self.header[b'keys']))
self.assertEqual(1000000, self.header[b'chunk_size'])
self.assertEqual(0, self.header[b'num_chunks'])
# Test path
self.assertEqual(b'test_header.nuc.header', self.header_obj.path)
# Test that the header exists on the filesystem
self.assertTrue(pathlib.Path(self.header_obj.path.decode()).is_file())
def step2_test_header_update(self):
new_header = {
b'version': 200,
b'keys': [b'test'],
b'chunk_size': 999,
}
self.header_obj.update_header(header=new_header)
self.assertEqual(200, self.header[b'version'])
self.assertEqual(1, len(self.header[b'keys']))
self.assertEqual(b'test', self.header[b'keys'][0])
self.assertEqual(999, self.header[b'chunk_size'])
# Check that the non-updated num_chunks value didn't change
self.assertEqual(0, self.header[b'num_chunks'])
def step3_test_header_read(self):
header = Header(b'test_header.nuc.header').header
self.assertEqual(200, header[b'version'])
self.assertEqual(1, len(header[b'keys']))
self.assertEqual(b'test', header[b'keys'][0])
self.assertEqual(999, header[b'chunk_size'])
self.assertEqual(0, header[b'num_chunks'])
def _steps(self):
for attr in sorted(dir(self)):
if not attr.startswith('step'):
continue
yield attr
def test_header(self):
for _s in self._steps():
try:
getattr(self, _s)()
except Exception as e:
self.fail('{} failed({})'.format(_s, e))