mirror of https://github.com/nucypher/nucypher.git
parent
393c699b5f
commit
1cde0b88e2
|
@ -61,14 +61,14 @@ class EncryptedFile(object):
|
|||
:rtype: List
|
||||
"""
|
||||
if not num_chunks:
|
||||
num_chunks = self.header['num_chunks']
|
||||
num_chunks = self.header[b'num_chunks']
|
||||
|
||||
chunks = []
|
||||
for chunk_num in range(num_chunks):
|
||||
nonce = (self.header['nonce']
|
||||
nonce = (self.header[b'nonce']
|
||||
+ chunk_num.to_bytes(NONCE_COUNTER_BYTE_SIZE,
|
||||
byteorder='big'))
|
||||
chunks.append(self._read_chunk(self.header['chunk_size'], nonce))
|
||||
chunks.append(self._read_chunk(self.header[b'chunk_size'], nonce))
|
||||
return chunks
|
||||
|
||||
def write(self, data):
|
||||
|
@ -85,23 +85,23 @@ class EncryptedFile(object):
|
|||
self.file_obj.seek(0, os.SEEK_END)
|
||||
|
||||
# Start off at the last chunk_num
|
||||
chunk_num = self.header['num_chunks']
|
||||
chunk_num = self.header[b'num_chunks']
|
||||
|
||||
buf_data = io.BytesIO(data)
|
||||
|
||||
chunks_written = 0
|
||||
plaintext = buf_data.read(self.header['chunk_size'])
|
||||
plaintext = buf_data.read(self.header[b'chunk_size'])
|
||||
while len(plaintext) > 0:
|
||||
nonce = (self.header['nonce']
|
||||
nonce = (self.header[b'nonce']
|
||||
+ chunk_num.to_bytes(NONCE_COUNTER_BYTE_SIZE,
|
||||
byteorder='big'))
|
||||
enc_data = self.cipher.encrypt(plaintext, nonce=nonce)
|
||||
self.file_obj.write(enc_data.ciphertext)
|
||||
chunks_written += 1
|
||||
|
||||
plaintext = buf_data.read(self.header['chunk_size'])
|
||||
plaintext = buf_data.read(self.header[b'chunk_size'])
|
||||
chunk_num += 1
|
||||
self.header_obj.update_header({'num_chunks': chunk_num})
|
||||
self.header_obj.update_header({b'num_chunks': chunk_num})
|
||||
return chunks_written
|
||||
|
||||
def close(self):
|
||||
|
|
|
@ -57,11 +57,11 @@ class Header(object):
|
|||
nonce = random(NONCE_RANDOM_PREFIX_SIZE)
|
||||
|
||||
return {
|
||||
'version': version,
|
||||
'nonce': nonce,
|
||||
'keys': keys,
|
||||
'chunk_size': chunk_size,
|
||||
'num_chunks': num_chunks,
|
||||
b'version': version,
|
||||
b'nonce': nonce,
|
||||
b'keys': keys,
|
||||
b'chunk_size': chunk_size,
|
||||
b'num_chunks': num_chunks,
|
||||
}
|
||||
|
||||
def _write_header(self, header_path):
|
||||
|
|
|
@ -24,9 +24,9 @@ class TestEncryptedFile(unittest.TestCase):
|
|||
self.header_obj = TestEncryptedFile.enc_file_obj.header_obj
|
||||
|
||||
def step1_update_header(self):
|
||||
updated_header = {'chunk_size': 10}
|
||||
updated_header = {b'chunk_size': 10}
|
||||
self.header_obj.update_header(header=updated_header)
|
||||
self.assertEqual(10, self.header['chunk_size'])
|
||||
self.assertEqual(10, self.header[b'chunk_size'])
|
||||
|
||||
def step2_write_data(self):
|
||||
# Writes the equivalent of three chunks per the updated header
|
||||
|
|
|
@ -20,12 +20,12 @@ class TestHeader(unittest.TestCase):
|
|||
|
||||
def step1_test_header_defaults(self):
|
||||
# Test dict values
|
||||
self.assertEqual(100, self.header['version'])
|
||||
self.assertEqual(20, len(self.header['nonce']))
|
||||
self.assertEqual(list, type(self.header['keys']))
|
||||
self.assertEqual(0, len(self.header['keys']))
|
||||
self.assertEqual(1000000, self.header['chunk_size'])
|
||||
self.assertEqual(0, self.header['num_chunks'])
|
||||
self.assertEqual(100, self.header[b'version'])
|
||||
self.assertEqual(20, len(self.header[b'nonce']))
|
||||
self.assertEqual(list, type(self.header[b'keys']))
|
||||
self.assertEqual(0, len(self.header[b'keys']))
|
||||
self.assertEqual(1000000, self.header[b'chunk_size'])
|
||||
self.assertEqual(0, self.header[b'num_chunks'])
|
||||
|
||||
# Test path
|
||||
self.assertEqual(b'test_header.nuc.header', self.header_obj.path)
|
||||
|
@ -35,19 +35,19 @@ class TestHeader(unittest.TestCase):
|
|||
|
||||
def step2_test_header_update(self):
|
||||
new_header = {
|
||||
'version': 200,
|
||||
'keys': [b'test'],
|
||||
'chunk_size': 999,
|
||||
b'version': 200,
|
||||
b'keys': [b'test'],
|
||||
b'chunk_size': 999,
|
||||
}
|
||||
self.header_obj.update_header(header=new_header)
|
||||
|
||||
self.assertEqual(200, self.header['version'])
|
||||
self.assertEqual(1, len(self.header['keys']))
|
||||
self.assertEqual(b'test', self.header['keys'][0])
|
||||
self.assertEqual(999, self.header['chunk_size'])
|
||||
self.assertEqual(200, self.header[b'version'])
|
||||
self.assertEqual(1, len(self.header[b'keys']))
|
||||
self.assertEqual(b'test', self.header[b'keys'][0])
|
||||
self.assertEqual(999, self.header[b'chunk_size'])
|
||||
|
||||
# Check that the non-updated num_chunks value didn't change
|
||||
self.assertEqual(0, self.header['num_chunks'])
|
||||
self.assertEqual(0, self.header[b'num_chunks'])
|
||||
|
||||
def step3_test_header_read(self):
|
||||
header = Header(b'test_header.nuc.header').header
|
||||
|
|
Loading…
Reference in New Issue