Fix TRNG test to use reset from python script and not from code

pull/7057/head
David Saada 2018-07-26 12:07:09 +03:00 committed by Yossi Levy
parent 9b0b63169c
commit b2b14ca26f
2 changed files with 89 additions and 70 deletions

View File

@ -24,16 +24,18 @@ for more details see main.cpp file)
import time import time
from mbed_host_tests import BaseHostTest from mbed_host_tests import BaseHostTest
from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector
from time import sleep
DEFAULT_CYCLE_PERIOD = 1.0 DEFAULT_CYCLE_PERIOD = 1.0
MSG_VALUE_DUMMY = '0' MSG_VALUE_DUMMY = '0'
MSG_TRNG_READY = 'ready' MSG_TRNG_READY = 'ready'
MSG_TRNG_BUFFER = 'buffer' MSG_TRNG_BUFFER = 'buffer'
MSG_TRNG_FINISH = 'finish'
MSG_TRNG_TEST_STEP1 = 'check_step1' MSG_TRNG_TEST_STEP1 = 'check_step1'
MSG_TRNG_TEST_STEP2 = 'check_step2' MSG_TRNG_TEST_STEP2 = 'check_step2'
MSG_KEY_SYNC = '__sync' MSG_KEY_SYNC = '__sync'
MSG_KEY_TEST_SUITE_ENDED = 'Test suite ended' MSG_KEY_RESET_COMPLETE = 'reset_complete'
MSG_KEY_TEST_SUITE_ENDED = 'Test_suite_ended'
MSG_KEY_EXIT = 'exit'
class TRNGResetTest(BaseHostTest): class TRNGResetTest(BaseHostTest):
"""Test for the TRNG API. """Test for the TRNG API.
@ -41,12 +43,10 @@ class TRNGResetTest(BaseHostTest):
def __init__(self): def __init__(self):
super(TRNGResetTest, self).__init__() super(TRNGResetTest, self).__init__()
self.reset = False self.did_reset = False
self.finish = False self.ready = False
self.suite_ended = False self.suite_ended = False
self.buffer = 0 self.buffer = 0
cycle_s = self.get_config_item('program_cycle_s')
self.program_cycle_s = cycle_s if cycle_s is not None else DEFAULT_CYCLE_PERIOD
self.test_steps_sequence = self.test_steps() self.test_steps_sequence = self.test_steps()
# Advance the coroutine to it's first yield statement. # Advance the coroutine to it's first yield statement.
self.test_steps_sequence.send(None) self.test_steps_sequence.send(None)
@ -55,8 +55,8 @@ class TRNGResetTest(BaseHostTest):
def setup(self): def setup(self):
self.register_callback(MSG_TRNG_READY, self.cb_device_ready) self.register_callback(MSG_TRNG_READY, self.cb_device_ready)
self.register_callback(MSG_TRNG_BUFFER, self.cb_trng_buffer) self.register_callback(MSG_TRNG_BUFFER, self.cb_trng_buffer)
self.register_callback(MSG_TRNG_FINISH, self.cb_device_finish)
self.register_callback(MSG_KEY_TEST_SUITE_ENDED, self.cb_device_test_suit_ended) self.register_callback(MSG_KEY_TEST_SUITE_ENDED, self.cb_device_test_suit_ended)
self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_complete)
#receive sent data from device before reset #receive sent data from device before reset
def cb_trng_buffer(self, key, value, timestamp): def cb_trng_buffer(self, key, value, timestamp):
@ -64,10 +64,16 @@ class TRNGResetTest(BaseHostTest):
""" """
self.buffer = value self.buffer = value
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
def cb_device_ready(self, key, value, timestamp): def cb_device_ready(self, key, value, timestamp):
"""Acknowledge device rebooted correctly and feed the test execution """Acknowledge device rebooted correctly and feed the test execution
""" """
self.reset = True self.ready = True
try: try:
if self.test_steps_sequence.send(value): if self.test_steps_sequence.send(value):
@ -75,10 +81,10 @@ class TRNGResetTest(BaseHostTest):
except (StopIteration, RuntimeError) as exc: except (StopIteration, RuntimeError) as exc:
self.notify_complete(False) self.notify_complete(False)
def cb_device_finish(self, key, value, timestamp): def cb_reset_complete(self, key, value, timestamp):
"""Acknowledge device finished a test step correctly and feed the test execution """Acknowledge reset complete
""" """
self.finish = True self.did_reset = True
try: try:
if self.test_steps_sequence.send(value): if self.test_steps_sequence.send(value):
@ -103,31 +109,37 @@ class TRNGResetTest(BaseHostTest):
""" """
wait_for_communication = yield wait_for_communication = yield
self.reset = False self.ready = False
self.did_reset = False
self.suite_ended = False
self.send_kv(MSG_TRNG_TEST_STEP1, MSG_VALUE_DUMMY) self.send_kv(MSG_TRNG_TEST_STEP1, MSG_VALUE_DUMMY)
time.sleep(self.program_cycle_s) wait_for_communication = yield
if self.buffer == 0:
raise RuntimeError('Phase 1: No buffer received.')
self.reset()
"""Test step 2 (After reset)
"""
wait_for_communication = yield
if self.did_reset == False:
raise RuntimeError('Phase 1: Platform did not reset as expected.')
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY) self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
wait_for_communication = yield wait_for_communication = yield
if self.reset == False: if self.ready == False:
raise RuntimeError('Phase 1: Platform did not reset as expected.') raise RuntimeError('Phase 1: Platform not ready as expected.')
"""Test step 2 (After reset)
"""
self.finish = False
self.send_kv(MSG_TRNG_TEST_STEP2, self.buffer) self.send_kv(MSG_TRNG_TEST_STEP2, self.buffer)
time.sleep(self.program_cycle_s)
wait_for_communication = yield
if self.finish == False:
raise RuntimeError('Test failed.')
wait_for_communication = yield wait_for_communication = yield
if self.suite_ended == False: if self.suite_ended == False:
raise RuntimeError('Test failed.') raise RuntimeError('Test failed.')
self.send_kv(MSG_KEY_EXIT, MSG_VALUE_DUMMY)
# The sequence is correct -- test passed. # The sequence is correct -- test passed.
yield True yield

View File

@ -59,6 +59,7 @@
#define MSG_TRNG_READY "ready" #define MSG_TRNG_READY "ready"
#define MSG_TRNG_BUFFER "buffer" #define MSG_TRNG_BUFFER "buffer"
#define MSG_TRNG_EXIT "exit"
#define MSG_TRNG_TEST_STEP1 "check_step1" #define MSG_TRNG_TEST_STEP1 "check_step1"
#define MSG_TRNG_TEST_STEP2 "check_step2" #define MSG_TRNG_TEST_STEP2 "check_step2"
@ -102,11 +103,19 @@ static int fill_buffer_trng(uint8_t *buffer, trng_t *trng_obj, size_t trng_len)
static void compress_and_compare(char *key, char *value) static void compress_and_compare(char *key, char *value)
{ {
trng_t trng_obj; trng_t trng_obj;
uint8_t out_comp_buf[(BUFFER_LEN * 5) + 32] = {0}, buffer[BUFFER_LEN] = {0}; uint8_t *out_comp_buf, *buffer;
uint8_t input_buf[BUFFER_LEN * 4] = {0}, temp_buf[BUFFER_LEN * 2] = {0}; uint8_t *input_buf, *temp_buf;
size_t comp_sz = 0; size_t comp_sz = 0;
unsigned int result = 0; unsigned int result = 0;
#define OUT_COMP_BUF_SIZE ((BUFFER_LEN * 5) + 32)
#define TEMP_BUF_SIZE (BUFFER_LEN * 2)
out_comp_buf = new uint8_t[OUT_COMP_BUF_SIZE];
buffer = new uint8_t[BUFFER_LEN];
input_buf = new uint8_t[BUFFER_LEN * 4];
temp_buf = new uint8_t[BUFFER_LEN * 2];
#if NVSTORE_RESET #if NVSTORE_RESET
NVStore& nvstore = NVStore::get_instance(); NVStore& nvstore = NVStore::get_instance();
#endif #endif
@ -115,7 +124,7 @@ static void compress_and_compare(char *key, char *value)
if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) { if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) {
#if NVSTORE_RESET #if NVSTORE_RESET
uint16_t actual = 0; uint16_t actual = 0;
result = nvstore.get(NVKEY, sizeof(buffer), buffer, actual); result = nvstore.get(NVKEY, BUFFER_LEN, buffer, actual);
TEST_ASSERT_EQUAL(RESULT_SUCCESS, result); TEST_ASSERT_EQUAL(RESULT_SUCCESS, result);
#else #else
/*Using base64 to decode data sent from host*/ /*Using base64 to decode data sent from host*/
@ -130,70 +139,72 @@ static void compress_and_compare(char *key, char *value)
&charsProcessed); &charsProcessed);
TEST_ASSERT_EQUAL(0, result); TEST_ASSERT_EQUAL(0, result);
#endif #endif
memcpy(input_buf, buffer, sizeof(buffer)); memcpy(input_buf, buffer, BUFFER_LEN);
} }
if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) { if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) {
/*Fill buffer with trng values*/ /*Fill buffer with trng values*/
result = fill_buffer_trng(buffer, &trng_obj, sizeof(buffer)); result = fill_buffer_trng(buffer, &trng_obj, BUFFER_LEN);
TEST_ASSERT_EQUAL(0, result); TEST_ASSERT_EQUAL(0, result);
memcpy(input_buf, buffer, sizeof(buffer)); memcpy(input_buf, buffer, BUFFER_LEN);
} }
/*pithy_Compress will try to compress the random data, if it succeeded it means the data is not really random*/ /*pithy_Compress will try to compress the random data, if it succeeded it means the data is not really random*/
else if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) { else if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) {
comp_sz = pithy_Compress((char *)buffer, comp_sz = pithy_Compress((char *)buffer,
sizeof(buffer), BUFFER_LEN,
(char *)out_comp_buf, (char *)out_comp_buf,
sizeof(out_comp_buf), OUT_COMP_BUF_SIZE,
9); 9);
TEST_ASSERT_MESSAGE(comp_sz > sizeof(buffer), TEST_ASSERT_MESSAGE(comp_sz > BUFFER_LEN,
"TRNG_TEST_STEP1: trng_get_bytes was able to compress thus not random"); "TRNG_TEST_STEP1: trng_get_bytes was able to compress thus not random");
/*pithy_Compress will try to compress the random data with a different buffer sizem*/ /*pithy_Compress will try to compress the random data with a different buffer sizem*/
result = fill_buffer_trng(temp_buf, &trng_obj, sizeof(temp_buf)); result = fill_buffer_trng(temp_buf, &trng_obj, TEMP_BUF_SIZE);
TEST_ASSERT_EQUAL(0, result); TEST_ASSERT_EQUAL(0, result);
comp_sz = pithy_Compress((char *)temp_buf, comp_sz = pithy_Compress((char *)temp_buf,
sizeof(temp_buf), TEMP_BUF_SIZE,
(char *)out_comp_buf, (char *)out_comp_buf,
sizeof(out_comp_buf), OUT_COMP_BUF_SIZE,
9); 9);
TEST_ASSERT_MESSAGE(comp_sz > sizeof(temp_buf), TEST_ASSERT_MESSAGE(comp_sz > TEMP_BUF_SIZE,
"TRNG_TEST_STEP2: trng_get_bytes was able to compress thus not random"); "TRNG_TEST_STEP2: trng_get_bytes was able to compress thus not random");
memcpy(input_buf + sizeof(buffer), temp_buf, sizeof(temp_buf)); memcpy(input_buf + BUFFER_LEN, temp_buf, TEMP_BUF_SIZE);
/*pithy_Compress will try to compress the random data from before reset concatenated with new random data*/ /*pithy_Compress will try to compress the random data from before reset concatenated with new random data*/
comp_sz = pithy_Compress((char *)input_buf, comp_sz = pithy_Compress((char *)input_buf,
sizeof(temp_buf) + sizeof(buffer), TEMP_BUF_SIZE + BUFFER_LEN,
(char *)out_comp_buf, (char *)out_comp_buf,
sizeof(out_comp_buf), OUT_COMP_BUF_SIZE,
9); 9);
TEST_ASSERT_MESSAGE(comp_sz > sizeof(temp_buf) + sizeof(buffer), TEST_ASSERT_MESSAGE(comp_sz > TEMP_BUF_SIZE + BUFFER_LEN,
"TRNG_TEST_STEP3: concatenated buffer after reset was able to compress thus not random"); "TRNG_TEST_STEP3: concatenated buffer after reset was able to compress thus not random");
greentea_send_kv(MSG_TRNG_TEST_SUITE_ENDED, MSG_VALUE_DUMMY);
} }
/*At the end of step 1 store trng buffer and reset the device*/ /*At the end of step 1 store trng buffer and reset the device*/
if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) { if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) {
int result = 0; int result = 0;
#if NVSTORE_RESET #if NVSTORE_RESET
result = nvstore.set(NVKEY, sizeof(buffer), buffer); result = nvstore.set(NVKEY, BUFFER_LEN, buffer);
TEST_ASSERT_EQUAL(RESULT_SUCCESS, result); TEST_ASSERT_EQUAL(RESULT_SUCCESS, result);
#else #else
/*Using base64 to encode data sending from host*/ /*Using base64 to encode data sending from host*/
result = trng_EncodeBase64(buffer, result = trng_EncodeBase64(buffer,
BUFFER_LEN, BUFFER_LEN,
(char *)out_comp_buf, (char *)out_comp_buf,
sizeof(out_comp_buf)); OUT_COMP_BUF_SIZE);
TEST_ASSERT_EQUAL(RESULT_SUCCESS, result); TEST_ASSERT_EQUAL(RESULT_SUCCESS, result);
greentea_send_kv(MSG_TRNG_BUFFER, (const char *)out_comp_buf); greentea_send_kv(MSG_TRNG_BUFFER, (const char *)out_comp_buf);
#endif #endif
system_reset();
TEST_ASSERT_MESSAGE(false, "system_reset() did not reset the device as expected.");
} }
return; delete[] out_comp_buf;
delete[] buffer;
delete[] input_buf;
delete[] temp_buf;
} }
/*This method call first and second steps, it directs by the key received from the host*/ /*This method call first and second steps, it directs by the key received from the host*/
@ -201,34 +212,31 @@ void trng_test()
{ {
greentea_send_kv(MSG_TRNG_READY, MSG_VALUE_DUMMY); greentea_send_kv(MSG_TRNG_READY, MSG_VALUE_DUMMY);
static char key[MSG_KEY_LEN + 1] = { }; char key[MSG_KEY_LEN + 1] = { };
static char value[MSG_VALUE_LEN + 1] = { }; char *value = new char[MSG_VALUE_LEN + 1];
memset(key, 0, MSG_KEY_LEN + 1); do {
memset(value, 0, MSG_VALUE_LEN + 1); memset(key, 0, MSG_KEY_LEN + 1);
memset(value, 0, MSG_VALUE_LEN + 1);
greentea_parse_kv(key, value, MSG_KEY_LEN, MSG_VALUE_LEN); greentea_parse_kv(key, value, MSG_KEY_LEN, MSG_VALUE_LEN);
if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) { if (strcmp(key, MSG_TRNG_TEST_STEP1) == 0) {
/*create trng data buffer and try to compress it, store it for later checks*/ /*create trng data buffer and try to compress it, store it for later checks*/
compress_and_compare(key, value); compress_and_compare(key, value);
return trng_test(); }
}
if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) { if (strcmp(key, MSG_TRNG_TEST_STEP2) == 0) {
/*create another trng data buffer and concatenate it to the stored trng data buffer /*create another trng data buffer and concatenate it to the stored trng data buffer
try to compress them both*/ try to compress them both*/
compress_and_compare(key, value); compress_and_compare(key, value);
} }
} } while (strcmp(key, MSG_TRNG_EXIT) != 0);
utest::v1::status_t greentea_failure_handler(const Case *const source, const failure_t reason) delete[] value;
{
greentea_case_failure_abort_handler(source, reason);
return STATUS_CONTINUE;
} }
Case cases[] = { Case cases[] = {
Case("TRNG: trng_test", trng_test, greentea_failure_handler), Case("TRNG: trng_test", trng_test),
}; };
utest::v1::status_t greentea_test_setup(const size_t number_of_cases) utest::v1::status_t greentea_test_setup(const size_t number_of_cases)
@ -242,7 +250,6 @@ Specification specification(greentea_test_setup, cases, greentea_test_teardown_h
int main() int main()
{ {
bool ret = !Harness::run(specification); bool ret = !Harness::run(specification);
greentea_send_kv(MSG_TRNG_TEST_SUITE_ENDED, MSG_VALUE_DUMMY);
return ret; return ret;
} }