mirror of https://github.com/ARMmbed/mbed-os.git
Duplicate host tests for drivers
parent
0a759aaa38
commit
bb5c2cf32a
|
@ -0,0 +1,32 @@
|
||||||
|
"""
|
||||||
|
Copyright (c) 2011-2020, Arm Limited and affiliates
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
|
||||||
|
class Device_Echo(BaseHostTest):
|
||||||
|
|
||||||
|
def _callback_repeat(self, key, value, _):
|
||||||
|
self.send_kv(key, value)
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.register_callback("echo", self._callback_repeat)
|
||||||
|
self.register_callback("echo_count", self._callback_repeat)
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
pass
|
|
@ -0,0 +1,181 @@
|
||||||
|
"""
|
||||||
|
Copyright (c) 2018-2019 Arm Limited and affiliates.
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
|
||||||
|
DEFAULT_SYNC_DELAY = 4.0
|
||||||
|
|
||||||
|
MSG_VALUE_WATCHDOG_PRESENT = 1
|
||||||
|
MSG_VALUE_DUMMY = '0'
|
||||||
|
MSG_VALUE_RESET_REASON_GET = 'get'
|
||||||
|
MSG_VALUE_RESET_REASON_CLEAR = 'clear'
|
||||||
|
MSG_VALUE_DEVICE_RESET_NVIC = 'nvic'
|
||||||
|
MSG_VALUE_DEVICE_RESET_WATCHDOG = 'watchdog'
|
||||||
|
|
||||||
|
MSG_KEY_DEVICE_READY = 'ready'
|
||||||
|
MSG_KEY_RESET_REASON_RAW = 'reason_raw'
|
||||||
|
MSG_KEY_RESET_REASON = 'reason'
|
||||||
|
MSG_KEY_DEVICE_RESET = 'reset'
|
||||||
|
MSG_KEY_SYNC = '__sync'
|
||||||
|
MSG_KEY_RESET_COMPLETE = 'reset_complete'
|
||||||
|
|
||||||
|
RESET_REASONS = {
|
||||||
|
'POWER_ON': '0',
|
||||||
|
'PIN_RESET': '1',
|
||||||
|
'BROWN_OUT': '2',
|
||||||
|
'SOFTWARE': '3',
|
||||||
|
'WATCHDOG': '4',
|
||||||
|
'LOCKUP': '5',
|
||||||
|
'WAKE_LOW_POWER': '6',
|
||||||
|
'ACCESS_ERROR': '7',
|
||||||
|
'BOOT_ERROR': '8',
|
||||||
|
'MULTIPLE': '9',
|
||||||
|
'PLATFORM': '10',
|
||||||
|
'UNKNOWN': '11'
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def raise_if_different(expected, actual, text=''):
|
||||||
|
"""Raise a RuntimeError if actual is different than expected."""
|
||||||
|
if expected != actual:
|
||||||
|
raise RuntimeError('{}Got {!r}, expected {!r}'
|
||||||
|
.format(text, actual, expected))
|
||||||
|
|
||||||
|
|
||||||
|
class ResetReasonTest(BaseHostTest):
|
||||||
|
"""Test for the Reset Reason HAL API.
|
||||||
|
|
||||||
|
Given a device supporting a Reset Reason API.
|
||||||
|
When the device is restarted using various methods.
|
||||||
|
Then the device returns a correct reset reason for every restart.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(ResetReasonTest, self).__init__()
|
||||||
|
self.device_reasons = None
|
||||||
|
self.device_has_watchdog = None
|
||||||
|
self.raw_reset_reasons = set()
|
||||||
|
self.sync_delay = DEFAULT_SYNC_DELAY
|
||||||
|
self.test_steps_sequence = self.test_steps()
|
||||||
|
# Advance the coroutine to it's first yield statement.
|
||||||
|
self.test_steps_sequence.send(None)
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
sync_delay = self.get_config_item('forced_reset_timeout')
|
||||||
|
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
|
||||||
|
self.register_callback(MSG_KEY_RESET_REASON_RAW, self.cb_reset_reason_raw)
|
||||||
|
self.register_callback(MSG_KEY_RESET_REASON, self.cb_reset_reason)
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_reset_reason)
|
||||||
|
self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_reason)
|
||||||
|
|
||||||
|
def cb_device_ready(self, key, value, timestamp):
|
||||||
|
"""Request a raw value of the reset_reason register.
|
||||||
|
|
||||||
|
Additionally, save the device's reset_reason capabilities
|
||||||
|
and the watchdog status on the first call.
|
||||||
|
"""
|
||||||
|
if self.device_reasons is None:
|
||||||
|
reasons, wdg_status = (int(i, base=16) for i in value.split(','))
|
||||||
|
self.device_has_watchdog = (wdg_status == MSG_VALUE_WATCHDOG_PRESENT)
|
||||||
|
self.device_reasons = [k for k, v in RESET_REASONS.items() if (reasons & 1 << int(v))]
|
||||||
|
self.send_kv(MSG_KEY_RESET_REASON_RAW, MSG_VALUE_RESET_REASON_GET)
|
||||||
|
|
||||||
|
def cb_reset_reason_raw(self, key, value, timestamp):
|
||||||
|
"""Verify that the raw reset_reason register value is unique.
|
||||||
|
|
||||||
|
Fail the test suite if the raw reset_reason value is not unique.
|
||||||
|
Request a platform independent reset_reason otherwise.
|
||||||
|
"""
|
||||||
|
if value in self.raw_reset_reasons:
|
||||||
|
self.log('TEST FAILED: The raw reset reason is not unique. '
|
||||||
|
'{!r} is already present in {!r}.'
|
||||||
|
.format(value, self.raw_reset_reasons))
|
||||||
|
self.notify_complete(False)
|
||||||
|
else:
|
||||||
|
self.raw_reset_reasons.add(value)
|
||||||
|
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_GET)
|
||||||
|
|
||||||
|
def cb_reset_reason(self, key, value, timestamp):
|
||||||
|
"""Feed the test_steps coroutine with reset_reason value.
|
||||||
|
|
||||||
|
Pass the test suite if the coroutine yields True.
|
||||||
|
Fail the test suite if the iterator stops or raises a RuntimeError.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if self.test_steps_sequence.send(value):
|
||||||
|
self.notify_complete(True)
|
||||||
|
except (StopIteration, RuntimeError) as exc:
|
||||||
|
self.log('TEST FAILED: {}'.format(exc))
|
||||||
|
self.notify_complete(False)
|
||||||
|
|
||||||
|
def test_steps(self):
|
||||||
|
"""Generate a sequence of test steps.
|
||||||
|
|
||||||
|
This coroutine calls yield to wait for the input from the device
|
||||||
|
(the reset_reason). If the device gives the wrong response, the
|
||||||
|
generator raises a RuntimeError exception and fails the test.
|
||||||
|
"""
|
||||||
|
# Ignore the first reason.
|
||||||
|
__ignored_reset_reason = yield
|
||||||
|
self.raw_reset_reasons.clear()
|
||||||
|
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
|
||||||
|
__ignored_clear_ack = yield
|
||||||
|
|
||||||
|
# Request a NVIC_SystemReset() call.
|
||||||
|
expected_reason = 'SOFTWARE'
|
||||||
|
if expected_reason not in self.device_reasons:
|
||||||
|
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
|
||||||
|
else:
|
||||||
|
# Request a NVIC_SystemReset() call.
|
||||||
|
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_NVIC)
|
||||||
|
__ignored_reset_ack = yield
|
||||||
|
time.sleep(self.sync_delay)
|
||||||
|
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
|
||||||
|
reset_reason = yield
|
||||||
|
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
|
||||||
|
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
|
||||||
|
__ignored_clear_ack = yield
|
||||||
|
|
||||||
|
# Reset the device using DAP.
|
||||||
|
expected_reason = 'PIN_RESET'
|
||||||
|
if expected_reason not in self.device_reasons:
|
||||||
|
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
|
||||||
|
else:
|
||||||
|
self.reset()
|
||||||
|
__ignored_reset_ack = yield # 'reset_complete'
|
||||||
|
time.sleep(self.sync_delay)
|
||||||
|
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
|
||||||
|
reset_reason = yield
|
||||||
|
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
|
||||||
|
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
|
||||||
|
__ignored_clear_ack = yield
|
||||||
|
|
||||||
|
# Start a watchdog timer and wait for it to reset the device.
|
||||||
|
expected_reason = 'WATCHDOG'
|
||||||
|
if expected_reason not in self.device_reasons or not self.device_has_watchdog:
|
||||||
|
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
|
||||||
|
else:
|
||||||
|
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_WATCHDOG)
|
||||||
|
__ignored_reset_ack = yield
|
||||||
|
time.sleep(self.sync_delay)
|
||||||
|
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
|
||||||
|
reset_reason = yield
|
||||||
|
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
|
||||||
|
|
||||||
|
# The sequence is correct -- test passed.
|
||||||
|
yield True
|
|
@ -0,0 +1,38 @@
|
||||||
|
"""
|
||||||
|
Copyright (c) 2019 Arm Limited and affiliates.
|
||||||
|
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
|
||||||
|
|
||||||
|
MSG_KEY_ECHO_MESSAGE = "echo_message"
|
||||||
|
|
||||||
|
|
||||||
|
class SerialComms(BaseHostTest):
|
||||||
|
"""Host side test that handles messages sent using serial classes."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize an object."""
|
||||||
|
super(SerialComms, self).__init__()
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
"""Register call backs to handle message from the target."""
|
||||||
|
self.register_callback(MSG_KEY_ECHO_MESSAGE, self.cb_echo_message)
|
||||||
|
|
||||||
|
def cb_echo_message(self, key, value, timestamp):
|
||||||
|
"""Send back the key and value received."""
|
||||||
|
self.send_kv(key, value)
|
|
@ -0,0 +1,74 @@
|
||||||
|
"""
|
||||||
|
Copyright (c) 2018-2019 Arm Limited and affiliates.
|
||||||
|
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
|
||||||
|
DEFAULT_SYNC_DELAY = 4.0
|
||||||
|
|
||||||
|
MSG_VALUE_DUMMY = '0'
|
||||||
|
MSG_KEY_DEVICE_READY = 'ready'
|
||||||
|
MSG_KEY_START_CASE = 'start_case'
|
||||||
|
MSG_KEY_DEVICE_RESET = 'reset_on_case_teardown'
|
||||||
|
MSG_KEY_SYNC = '__sync'
|
||||||
|
|
||||||
|
|
||||||
|
class SyncOnReset(BaseHostTest):
|
||||||
|
"""Host side test that handles device reset during case teardown.
|
||||||
|
|
||||||
|
Given a device that performs a reset during a test case teardown.
|
||||||
|
When the device notifies the host about the reset.
|
||||||
|
Then the host:
|
||||||
|
* keeps track of the test case index of the current test suite,
|
||||||
|
* performs a dev-host handshake,
|
||||||
|
* advances the test suite to next test case.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
Developed for a watchdog test, so that it can be run on devices that
|
||||||
|
do not support watchdog timeout updates after the initial setup.
|
||||||
|
As a solution, after testing watchdog with one set of settings, the
|
||||||
|
device performs a reset and notifies the host with the test case number,
|
||||||
|
so that the test suite may be advanced once the device boots again.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(SyncOnReset, self).__init__()
|
||||||
|
self.test_case_num = 0
|
||||||
|
self.sync_delay = DEFAULT_SYNC_DELAY
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
sync_delay = self.get_config_item('forced_reset_timeout')
|
||||||
|
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
|
||||||
|
|
||||||
|
def cb_device_ready(self, key, value, timestamp):
|
||||||
|
"""Advance the device test suite to the next test case."""
|
||||||
|
self.send_kv(MSG_KEY_START_CASE, self.test_case_num)
|
||||||
|
|
||||||
|
def cb_device_reset(self, key, value, timestamp):
|
||||||
|
"""Wait for the device to boot and perform a handshake.
|
||||||
|
|
||||||
|
Additionally, keep track of the last test case number.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.test_case_num = int(value)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
self.test_case_num += 1
|
||||||
|
time.sleep(self.sync_delay)
|
||||||
|
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
|
|
@ -0,0 +1,136 @@
|
||||||
|
"""
|
||||||
|
mbed SDK
|
||||||
|
Copyright (c) 2011-2013 ARM Limited
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class TimingDriftSync(BaseHostTest):
|
||||||
|
"""
|
||||||
|
This works as master-slave fashion
|
||||||
|
1) Device says its booted up and ready to run the test, wait for host to respond
|
||||||
|
2) Host sends the message to get the device current time i.e base time
|
||||||
|
|
||||||
|
#
|
||||||
|
# *
|
||||||
|
# * |
|
||||||
|
#<---* DUT<- base_time | - round_trip_base_time ------
|
||||||
|
# * | |
|
||||||
|
# * - |
|
||||||
|
# - |
|
||||||
|
# | |
|
||||||
|
# | |
|
||||||
|
# | - measurement_stretch | - nominal_time
|
||||||
|
# | |
|
||||||
|
# | |
|
||||||
|
# - |
|
||||||
|
# * - |
|
||||||
|
# * | |
|
||||||
|
#<---* DUT <-final_time | - round_trip_final_time------
|
||||||
|
# * |
|
||||||
|
# * -
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# As we increase the measurement_stretch, the error because of transport delay diminishes.
|
||||||
|
# The values of measurement_stretch is propotional to round_trip_base_time(transport delays)
|
||||||
|
# by factor time_measurement_multiplier.This multiplier is used is 80 to tolerate 2 sec of
|
||||||
|
# transport delay and test time ~ 180 secs
|
||||||
|
#
|
||||||
|
# Failure in timing can occur if we are ticking too fast or we are ticking too slow, hence we have
|
||||||
|
# min_range and max_range. if we cross on either side tests would be marked fail. The range is a function of
|
||||||
|
# tolerance/acceptable drift currently its 5%.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""
|
||||||
|
__result = None
|
||||||
|
mega = 1000000.0
|
||||||
|
max_measurement_time = 180
|
||||||
|
|
||||||
|
# this value is obtained for measurements when there is 0 transport delay and we want accurancy of 5%
|
||||||
|
time_measurement_multiplier = 80
|
||||||
|
|
||||||
|
def _callback_timing_drift_check_start(self, key, value, timestamp):
|
||||||
|
self.round_trip_base_start = timestamp
|
||||||
|
self.send_kv("base_time", 0)
|
||||||
|
|
||||||
|
def _callback_base_time(self, key, value, timestamp):
|
||||||
|
self.round_trip_base_end = timestamp
|
||||||
|
self.device_time_base = float(value)
|
||||||
|
self.round_trip_base_time = self.round_trip_base_end - self.round_trip_base_start
|
||||||
|
|
||||||
|
self.log("Device base time {}".format(value))
|
||||||
|
measurement_stretch = (self.round_trip_base_time * self.time_measurement_multiplier) + 5
|
||||||
|
|
||||||
|
if measurement_stretch > self.max_measurement_time:
|
||||||
|
self.log("Time required {} to determine device timer is too high due to transport delay, skipping".format(measurement_stretch))
|
||||||
|
else:
|
||||||
|
self.log("sleeping for {} to measure drift accurately".format(measurement_stretch))
|
||||||
|
time.sleep(measurement_stretch)
|
||||||
|
self.round_trip_final_start = time.time()
|
||||||
|
self.send_kv("final_time", 0)
|
||||||
|
|
||||||
|
def _callback_final_time(self, key, value, timestamp):
|
||||||
|
self.round_trip_final_end = timestamp
|
||||||
|
self.device_time_final = float(value)
|
||||||
|
self.round_trip_final_time = self.round_trip_final_end - self.round_trip_final_start
|
||||||
|
self.log("Device final time {} ".format(value))
|
||||||
|
|
||||||
|
# compute the test results and send to device
|
||||||
|
results = "pass" if self.compute_parameter() else "fail"
|
||||||
|
self.send_kv(results, "0")
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.register_callback('timing_drift_check_start', self._callback_timing_drift_check_start)
|
||||||
|
self.register_callback('base_time', self._callback_base_time)
|
||||||
|
self.register_callback('final_time', self._callback_final_time)
|
||||||
|
|
||||||
|
def compute_parameter(self, failure_criteria=0.05):
|
||||||
|
t_max = self.round_trip_final_end - self.round_trip_base_start
|
||||||
|
t_min = self.round_trip_final_start - self.round_trip_base_end
|
||||||
|
t_max_hi = t_max * (1 + failure_criteria)
|
||||||
|
t_max_lo = t_max * (1 - failure_criteria)
|
||||||
|
t_min_hi = t_min * (1 + failure_criteria)
|
||||||
|
t_min_lo = t_min * (1 - failure_criteria)
|
||||||
|
device_time = (self.device_time_final - self.device_time_base) / self.mega
|
||||||
|
|
||||||
|
self.log("Compute host events")
|
||||||
|
self.log("Transport delay 0: {}".format(self.round_trip_base_time))
|
||||||
|
self.log("Transport delay 1: {}".format(self.round_trip_final_time))
|
||||||
|
self.log("DUT base time : {}".format(self.device_time_base))
|
||||||
|
self.log("DUT end time : {}".format(self.device_time_final))
|
||||||
|
|
||||||
|
self.log("min_pass : {} , max_pass : {} for {}%%".format(t_max_lo, t_min_hi, failure_criteria * 100))
|
||||||
|
self.log("min_inconclusive : {} , max_inconclusive : {}".format(t_min_lo, t_max_hi))
|
||||||
|
self.log("Time reported by device: {}".format(device_time))
|
||||||
|
|
||||||
|
if t_max_lo <= device_time <= t_min_hi:
|
||||||
|
self.log("Test passed !!!")
|
||||||
|
self.__result = True
|
||||||
|
elif t_min_lo <= device_time <= t_max_hi:
|
||||||
|
self.log("Test inconclusive due to transport delay, retrying")
|
||||||
|
self.__result = False
|
||||||
|
else:
|
||||||
|
self.log("Time outside of passing range. Timing drift seems to be present !!!")
|
||||||
|
self.__result = False
|
||||||
|
return self.__result
|
||||||
|
|
||||||
|
def result(self):
|
||||||
|
return self.__result
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
pass
|
|
@ -0,0 +1,145 @@
|
||||||
|
"""
|
||||||
|
Copyright (c) 2018-2019 Arm Limited and affiliates.
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
import collections
|
||||||
|
import threading
|
||||||
|
from mbed_host_tests import BaseHostTest
|
||||||
|
|
||||||
|
TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send'])
|
||||||
|
|
||||||
|
DEFAULT_SYNC_DELAY = 4.0
|
||||||
|
MAX_HB_PERIOD = 2.5 # [s] Max expected heartbeat period.
|
||||||
|
|
||||||
|
MSG_VALUE_DUMMY = '0'
|
||||||
|
CASE_DATA_INVALID = 0xffffffff
|
||||||
|
CASE_DATA_PHASE2_OK = 0xfffffffe
|
||||||
|
CASE_DATA_INSUFF_HB = 0x0
|
||||||
|
|
||||||
|
MSG_KEY_SYNC = '__sync'
|
||||||
|
MSG_KEY_DEVICE_READY = 'ready'
|
||||||
|
MSG_KEY_START_CASE = 'start_case'
|
||||||
|
MSG_KEY_DEVICE_RESET = 'dev_reset'
|
||||||
|
MSG_KEY_HEARTBEAT = 'hb'
|
||||||
|
|
||||||
|
|
||||||
|
class WatchdogReset(BaseHostTest):
|
||||||
|
"""Host side test that handles device reset.
|
||||||
|
|
||||||
|
Given a device with a watchdog timer started.
|
||||||
|
When the device notifies the host about an incoming reset.
|
||||||
|
Then the host:
|
||||||
|
* keeps track of the test case index of the current test suite,
|
||||||
|
* performs a dev-host handshake.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(WatchdogReset, self).__init__()
|
||||||
|
self.current_case = TestCaseData(0, CASE_DATA_INVALID)
|
||||||
|
self.__handshake_timer = None
|
||||||
|
self.sync_delay = DEFAULT_SYNC_DELAY
|
||||||
|
self.drop_heartbeat_messages = True
|
||||||
|
self.hb_timestamps_us = []
|
||||||
|
|
||||||
|
def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None):
|
||||||
|
"""Start a new handshake timer."""
|
||||||
|
|
||||||
|
def timer_handler():
|
||||||
|
"""Perform a dev-host handshake by sending a sync message."""
|
||||||
|
if pre_sync_fun is not None:
|
||||||
|
pre_sync_fun()
|
||||||
|
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
|
||||||
|
|
||||||
|
self.__handshake_timer = threading.Timer(seconds, timer_handler)
|
||||||
|
self.__handshake_timer.start()
|
||||||
|
|
||||||
|
def handshake_timer_cancel(self):
|
||||||
|
"""Cancel the current handshake timer."""
|
||||||
|
try:
|
||||||
|
self.__handshake_timer.cancel()
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self.__handshake_timer = None
|
||||||
|
|
||||||
|
def heartbeat_timeout_handler(self):
|
||||||
|
"""Handler for the heartbeat timeout.
|
||||||
|
|
||||||
|
Compute the time span of the last heartbeat sequence.
|
||||||
|
Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received.
|
||||||
|
Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was
|
||||||
|
received.
|
||||||
|
"""
|
||||||
|
self.drop_heartbeat_messages = True
|
||||||
|
dev_data = CASE_DATA_INVALID
|
||||||
|
if len(self.hb_timestamps_us) == 1:
|
||||||
|
dev_data = CASE_DATA_INSUFF_HB
|
||||||
|
self.log('Not enough heartbeats received.')
|
||||||
|
elif len(self.hb_timestamps_us) >= 2:
|
||||||
|
dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0])))
|
||||||
|
self.log('Heartbeat time span was {} ms.'.format(dev_data))
|
||||||
|
self.current_case = TestCaseData(self.current_case.index, dev_data)
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
sync_delay = self.get_config_item('forced_reset_timeout')
|
||||||
|
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
|
||||||
|
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
|
||||||
|
self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat)
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
self.handshake_timer_cancel()
|
||||||
|
|
||||||
|
def cb_device_ready(self, key, value, timestamp):
|
||||||
|
"""Advance the device test suite to a proper test case.
|
||||||
|
|
||||||
|
Additionally, send test case data to the device.
|
||||||
|
"""
|
||||||
|
self.handshake_timer_cancel()
|
||||||
|
msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case)
|
||||||
|
self.send_kv(MSG_KEY_START_CASE, msg_value)
|
||||||
|
self.drop_heartbeat_messages = False
|
||||||
|
self.hb_timestamps_us = []
|
||||||
|
|
||||||
|
def cb_device_reset(self, key, value, timestamp):
|
||||||
|
"""Keep track of the test case number.
|
||||||
|
|
||||||
|
Also set a new handshake timeout, so when the device gets
|
||||||
|
restarted by the watchdog, the communication will be restored
|
||||||
|
by the __handshake_timer.
|
||||||
|
"""
|
||||||
|
self.handshake_timer_cancel()
|
||||||
|
case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(','))
|
||||||
|
self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK)
|
||||||
|
self.handshake_timer_start(self.sync_delay + dev_reset_delay_ms / 1000.0)
|
||||||
|
|
||||||
|
def cb_heartbeat(self, key, value, timestamp):
|
||||||
|
"""Save the timestamp of a heartbeat message.
|
||||||
|
|
||||||
|
Additionally, keep track of the test case number.
|
||||||
|
|
||||||
|
Also each heartbeat sets a new timeout, so when the device gets
|
||||||
|
restarted by the watchdog, the communication will be restored
|
||||||
|
by the __handshake_timer.
|
||||||
|
"""
|
||||||
|
if self.drop_heartbeat_messages:
|
||||||
|
return
|
||||||
|
self.handshake_timer_cancel()
|
||||||
|
case_num, timestamp_us = (int(i, base=16) for i in value.split(','))
|
||||||
|
self.current_case = TestCaseData(case_num, CASE_DATA_INVALID)
|
||||||
|
self.hb_timestamps_us.append(timestamp_us)
|
||||||
|
self.handshake_timer_start(
|
||||||
|
seconds=(MAX_HB_PERIOD + self.sync_delay),
|
||||||
|
pre_sync_fun=self.heartbeat_timeout_handler)
|
Loading…
Reference in New Issue