Tests: HAL API: Watchdog: Add time accuracy tests

pull/10657/head
Filip Jagodzinski 2018-01-15 10:35:45 +01:00 committed by Filip Jagodzinski
parent f1e744d4a0
commit c4d9300f9d
3 changed files with 249 additions and 6 deletions

View File

@ -21,15 +21,18 @@ from mbed_host_tests import BaseHostTest
TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send'])
DEFAULT_CYCLE_PERIOD = 4.0
MAX_HB_PERIOD = 2.5 # [s] Max expected heartbeat period.
MSG_VALUE_DUMMY = '0'
CASE_DATA_INVALID = 0xffffffff
CASE_DATA_PHASE2_OK = 0xfffffffe
CASE_DATA_INSUFF_HB = 0x0
MSG_KEY_SYNC = '__sync'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_START_CASE = 'start_case'
MSG_KEY_DEVICE_RESET = 'dev_reset'
MSG_KEY_HEARTBEAT = 'hb'
class WatchdogReset(BaseHostTest):
@ -48,14 +51,19 @@ class WatchdogReset(BaseHostTest):
self.__handshake_timer = None
cycle_s = self.get_config_item('program_cycle_s')
self.program_cycle_s = cycle_s if cycle_s is not None else DEFAULT_CYCLE_PERIOD
self.drop_heartbeat_messages = True
self.hb_timestamps_us = []
def handshake_timer_start(self, seconds=1.0):
"""Start a new handshake timer.
def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None):
"""Start a new handshake timer."""
Perform a dev-host handshake by sending a sync message.
"""
self.__handshake_timer = threading.Timer(seconds, self.send_kv,
[MSG_KEY_SYNC, MSG_VALUE_DUMMY])
def timer_handler():
"""Perform a dev-host handshake by sending a sync message."""
if pre_sync_fun is not None:
pre_sync_fun()
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
self.__handshake_timer = threading.Timer(seconds, timer_handler)
self.__handshake_timer.start()
def handshake_timer_cancel(self):
@ -67,9 +75,28 @@ class WatchdogReset(BaseHostTest):
finally:
self.__handshake_timer = None
def heartbeat_timeout_handler(self):
"""Handler for the heartbeat timeout.
Compute the time span of the last heartbeat sequence.
Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received.
Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was
received.
"""
self.drop_heartbeat_messages = True
dev_data = CASE_DATA_INVALID
if len(self.hb_timestamps_us) == 1:
dev_data = CASE_DATA_INSUFF_HB
self.log('Not enough heartbeats received.')
elif len(self.hb_timestamps_us) >= 2:
dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0])))
self.log('Heartbeat time span was {} ms.'.format(dev_data))
self.current_case = TestCaseData(self.current_case.index, dev_data)
def setup(self):
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat)
def teardown(self):
self.handshake_timer_cancel()
@ -82,6 +109,8 @@ class WatchdogReset(BaseHostTest):
self.handshake_timer_cancel()
msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case)
self.send_kv(MSG_KEY_START_CASE, msg_value)
self.drop_heartbeat_messages = False
self.hb_timestamps_us = []
def cb_device_reset(self, key, value, timestamp):
"""Keep track of the test case number.
@ -94,3 +123,22 @@ class WatchdogReset(BaseHostTest):
case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK)
self.handshake_timer_start(self.program_cycle_s + dev_reset_delay_ms / 1000.0)
def cb_heartbeat(self, key, value, timestamp):
"""Save the timestamp of a heartbeat message.
Additionally, keep track of the test case number.
Also each heartbeat sets a new timeout, so when the device gets
restarted by the watchdog, the communication will be restored
by the __handshake_timer.
"""
if self.drop_heartbeat_messages:
return
self.handshake_timer_cancel()
case_num, timestamp_us = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_INVALID)
self.hb_timestamps_us.append(timestamp_us)
self.handshake_timer_start(
seconds=(MAX_HB_PERIOD + self.program_cycle_s),
pre_sync_fun=self.heartbeat_timeout_handler)

View File

@ -0,0 +1,148 @@
/* mbed Microcontroller Library
* Copyright (c) 2018 ARM Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if !DEVICE_WATCHDOG
#error [NOT_SUPPORTED] Watchdog not supported for this target
#endif
#include "greentea-client/test_env.h"
#include "utest/utest.h"
#include "unity/unity.h"
#include "hal/watchdog_api.h"
#include "watchdog_timing_tests.h"
#define MSG_VALUE_DUMMY "0"
#define CASE_DATA_INVALID 0xffffffffUL
#define MSG_VALUE_LEN 24
#define MSG_KEY_LEN 24
#define MSG_KEY_DEVICE_READY "ready"
#define MSG_KEY_START_CASE "start_case"
#define MSG_KEY_HEARTBEAT "hb"
using utest::v1::Case;
using utest::v1::Specification;
using utest::v1::Harness;
struct testcase_data {
int index;
int start_index;
uint32_t received_data;
};
testcase_data current_case;
template<uint32_t timeout_ms, uint32_t delta_ms>
void test_timing()
{
// Phase 2. -- verify the test results.
// Verify the heartbeat time span sent by host is within given delta.
if (current_case.received_data != CASE_DATA_INVALID) {
TEST_ASSERT_UINT32_WITHIN(delta_ms, timeout_ms, current_case.received_data);
current_case.received_data = CASE_DATA_INVALID;
return;
}
// Phase 1. -- run the test code.
// Send heartbeat messages to host until the watchdeg resets the device.
const ticker_data_t * const us_ticker = get_us_ticker_data();
us_timestamp_t current_ts, next_ts, expected_reset_ts, divider, ts_increment;
char msg_value[12];
watchdog_config_t config = { timeout_ms };
TEST_ASSERT_EQUAL(WATCHDOG_STATUS_OK, hal_watchdog_init(&config));
next_ts = ticker_read_us(us_ticker);
expected_reset_ts = next_ts + 1000ULL * timeout_ms;
divider = 0x2ULL;
while (1) {
current_ts = ticker_read_us(us_ticker);
if (current_ts < next_ts) {
continue;
}
int str_len = snprintf(msg_value, sizeof msg_value, "%02x,%08lx", current_case.start_index + current_case.index,
(uint32_t) current_ts);
if (str_len != (sizeof msg_value) - 1) {
utest_printf("Failed to compose a value string to be sent to host.");
return;
}
greentea_send_kv(MSG_KEY_HEARTBEAT, msg_value);
// The closer to expected reset, the smaller heartbeat time difference.
// This should reduce measurement error present for heartbeat with
// equal periods.
ts_increment = (1000ULL * timeout_ms / divider);
next_ts += ts_increment;
if (current_ts <= expected_reset_ts) {
divider <<= 1;
} else if (divider > 0x2ULL) {
divider >>= 1;
}
}
}
utest::v1::status_t case_setup(const Case * const source, const size_t index_of_case)
{
current_case.index = index_of_case;
return utest::v1::greentea_case_setup_handler(source, index_of_case);
}
int testsuite_setup(const size_t number_of_cases)
{
GREENTEA_SETUP(90, "watchdog_reset");
utest::v1::status_t status = utest::v1::greentea_test_setup_handler(number_of_cases);
if (status != utest::v1::STATUS_CONTINUE) {
return status;
}
char key[MSG_KEY_LEN + 1] = { };
char value[MSG_VALUE_LEN + 1] = { };
greentea_send_kv(MSG_KEY_DEVICE_READY, MSG_VALUE_DUMMY);
greentea_parse_kv(key, value, MSG_KEY_LEN, MSG_VALUE_LEN);
if (strcmp(key, MSG_KEY_START_CASE) != 0) {
utest_printf("Invalid message key.\n");
return utest::v1::STATUS_ABORT;
}
int num_args = sscanf(value, "%02x,%08lx", &(current_case.start_index), &(current_case.received_data));
if (num_args == 0 || num_args == EOF) {
utest_printf("Invalid data received from host\n");
return utest::v1::STATUS_ABORT;
}
utest_printf("This test suite is composed of %i test cases. Starting at index %i.\n", number_of_cases,
current_case.start_index);
return current_case.start_index;
}
Case cases[] = {
Case("Timing, 200 ms", case_setup, test_timing<200UL, 55UL>),
Case("Timing, 500 ms", case_setup, test_timing<500UL, 65UL>),
Case("Timing, 1000 ms", case_setup, test_timing<1000UL, 130UL>),
Case("Timing, 3000 ms", case_setup, test_timing<3000UL, 190UL>),
};
Specification specification((utest::v1::test_setup_handler_t) testsuite_setup, cases);
int main()
{
// Harness will start with a test case index provided by host script.
return !Harness::run(specification);
}

View File

@ -0,0 +1,47 @@
/* mbed Microcontroller Library
* Copyright (c) 2018 ARM Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @addtogroup hal_watchdog_tests
* @{
*/
#ifndef MBED_HAL_WATCHDOG_TIMING_TESTS_H
#define MBED_HAL_WATCHDOG_TIMING_TESTS_H
#if DEVICE_WATCHDOG
/** Test watchdog timing accuracy
*
* Phase 1.
* Given a watchdog timer started with a timeout value of X ms
* When given time of X ms elapses
* Then the device is restarted by the watchdog
*
* Phase 2.
* Given a device restarted by the watchdog timer
* When the device receives time measurement from the host
* Then time measured by host equals X ms
*/
template<uint32_t timeout_ms, uint32_t delta_ms>
void test_timing();
#endif
#endif
/** @}*/