Merge pull request #13489 from rajkan01/hal_directory_restructuring

Refactor hal directory
pull/13531/head
Martin Kojtal 2020-09-02 12:10:42 +01:00 committed by GitHub
commit 9073a48667
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
147 changed files with 1050 additions and 5 deletions

View File

@ -17,6 +17,8 @@
^storage/filesystem/littlefsv2/littlefs/
^features/unsupported/
^hal/storage_abstraction
^hal/tests/TESTS/mbed_hal/trng/pithy
^hal/tests/TESTS/mbed_hal/trng/pithy
^platform/cxxsupport
^platform/FEATURE_EXPERIMENTAL_API/FEATURE_PSA/TARGET_MBED_PSA_SRV
^platform/FEATURE_EXPERIMENTAL_API/FEATURE_PSA/TARGET_TFM
@ -25,8 +27,6 @@
^rtos/source/TARGET_CORTEX/rtx4
^rtos/source/TARGET_CORTEX/rtx5
^targets
^TESTS/mbed_hal/trng/pithy
^TESTS/mbed_hal/trng/pithy
^tools
^UNITTESTS
^storage/blockdevice/tests/UNITTESTS

View File

@ -126,6 +126,7 @@ set(unittest-includes-base
"${PROJECT_SOURCE_DIR}/../drivers/include/drivers"
"${PROJECT_SOURCE_DIR}/../drivers/include/drivers/internal"
"${PROJECT_SOURCE_DIR}/../hal"
"${PROJECT_SOURCE_DIR}/../hal/include"
"${PROJECT_SOURCE_DIR}/../events/include"
"${PROJECT_SOURCE_DIR}/../events/include/events/internal"
"${PROJECT_SOURCE_DIR}/../events/source"

View File

@ -17,7 +17,7 @@
#include "stdlib.h"
#include "us_ticker_api.h"
#include "hal/us_ticker_api.h"
const ticker_data_t *get_us_ticker_data(void)
{

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "watchdog_api.h"
#include "hal/watchdog_api.h"
#if DEVICE_WATCHDOG

View File

@ -47,6 +47,6 @@ typedef enum {
#ifdef __cplusplus
}
#endif
#include "pinmap.h"
#include "hal/pinmap.h"
#endif

View File

@ -0,0 +1,181 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from mbed_host_tests import BaseHostTest
DEFAULT_SYNC_DELAY = 4.0
MSG_VALUE_WATCHDOG_PRESENT = 1
MSG_VALUE_DUMMY = '0'
MSG_VALUE_RESET_REASON_GET = 'get'
MSG_VALUE_RESET_REASON_CLEAR = 'clear'
MSG_VALUE_DEVICE_RESET_NVIC = 'nvic'
MSG_VALUE_DEVICE_RESET_WATCHDOG = 'watchdog'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_RESET_REASON_RAW = 'reason_raw'
MSG_KEY_RESET_REASON = 'reason'
MSG_KEY_DEVICE_RESET = 'reset'
MSG_KEY_SYNC = '__sync'
MSG_KEY_RESET_COMPLETE = 'reset_complete'
RESET_REASONS = {
'POWER_ON': '0',
'PIN_RESET': '1',
'BROWN_OUT': '2',
'SOFTWARE': '3',
'WATCHDOG': '4',
'LOCKUP': '5',
'WAKE_LOW_POWER': '6',
'ACCESS_ERROR': '7',
'BOOT_ERROR': '8',
'MULTIPLE': '9',
'PLATFORM': '10',
'UNKNOWN': '11'
}
def raise_if_different(expected, actual, text=''):
"""Raise a RuntimeError if actual is different than expected."""
if expected != actual:
raise RuntimeError('{}Got {!r}, expected {!r}'
.format(text, actual, expected))
class ResetReasonTest(BaseHostTest):
"""Test for the Reset Reason HAL API.
Given a device supporting a Reset Reason API.
When the device is restarted using various methods.
Then the device returns a correct reset reason for every restart.
"""
def __init__(self):
super(ResetReasonTest, self).__init__()
self.device_reasons = None
self.device_has_watchdog = None
self.raw_reset_reasons = set()
self.sync_delay = DEFAULT_SYNC_DELAY
self.test_steps_sequence = self.test_steps()
# Advance the coroutine to it's first yield statement.
self.test_steps_sequence.send(None)
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_RESET_REASON_RAW, self.cb_reset_reason_raw)
self.register_callback(MSG_KEY_RESET_REASON, self.cb_reset_reason)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_reset_reason)
self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_reason)
def cb_device_ready(self, key, value, timestamp):
"""Request a raw value of the reset_reason register.
Additionally, save the device's reset_reason capabilities
and the watchdog status on the first call.
"""
if self.device_reasons is None:
reasons, wdg_status = (int(i, base=16) for i in value.split(','))
self.device_has_watchdog = (wdg_status == MSG_VALUE_WATCHDOG_PRESENT)
self.device_reasons = [k for k, v in RESET_REASONS.items() if (reasons & 1 << int(v))]
self.send_kv(MSG_KEY_RESET_REASON_RAW, MSG_VALUE_RESET_REASON_GET)
def cb_reset_reason_raw(self, key, value, timestamp):
"""Verify that the raw reset_reason register value is unique.
Fail the test suite if the raw reset_reason value is not unique.
Request a platform independent reset_reason otherwise.
"""
if value in self.raw_reset_reasons:
self.log('TEST FAILED: The raw reset reason is not unique. '
'{!r} is already present in {!r}.'
.format(value, self.raw_reset_reasons))
self.notify_complete(False)
else:
self.raw_reset_reasons.add(value)
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_GET)
def cb_reset_reason(self, key, value, timestamp):
"""Feed the test_steps coroutine with reset_reason value.
Pass the test suite if the coroutine yields True.
Fail the test suite if the iterator stops or raises a RuntimeError.
"""
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.log('TEST FAILED: {}'.format(exc))
self.notify_complete(False)
def test_steps(self):
"""Generate a sequence of test steps.
This coroutine calls yield to wait for the input from the device
(the reset_reason). If the device gives the wrong response, the
generator raises a RuntimeError exception and fails the test.
"""
# Ignore the first reason.
__ignored_reset_reason = yield
self.raw_reset_reasons.clear()
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Request a NVIC_SystemReset() call.
expected_reason = 'SOFTWARE'
if expected_reason not in self.device_reasons:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
# Request a NVIC_SystemReset() call.
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_NVIC)
__ignored_reset_ack = yield
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Reset the device using DAP.
expected_reason = 'PIN_RESET'
if expected_reason not in self.device_reasons:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
self.reset()
__ignored_reset_ack = yield # 'reset_complete'
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Start a watchdog timer and wait for it to reset the device.
expected_reason = 'WATCHDOG'
if expected_reason not in self.device_reasons or not self.device_has_watchdog:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_WATCHDOG)
__ignored_reset_ack = yield
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
# The sequence is correct -- test passed.
yield True

View File

@ -0,0 +1,140 @@
"""
Copyright (c) 2011-2020, Arm Limited and affiliates
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from mbed_host_tests import BaseHostTest
import time
import calendar
import datetime
class RTC_time_calc_test(BaseHostTest):
"""
This is the host part of the test to verify if:
- _rtc_mktime function converts a calendar time into time since UNIX epoch as a time_t,
- _rtc_localtime function converts a given time in seconds since epoch into calendar time.
The same algoritm to generate next calendar time to be tested is used by both parts of the test.
We will check if correct time since UNIX epoch is calculated for the first and the last day
of each month and across valid years.
Mbed part of the test sends calculated time since UNIX epoch.
This part validates given value and responds to indicate pass or fail.
Additionally it sends also encoded day of week and day of year which
will be needed to verify _rtc_localtime.
Support for both types of RTC devices is provided:
- RTCs which handles all leap years in the mentioned year range correctly. Leap year is determined by checking if
the year counter value is divisible by 400, 100, and 4. No problem here.
- RTCs which handles leap years correctly up to 2100. The RTC does a simple bit comparison to see if the two
lowest order bits of the year counter are zero. In this case 2100 year will be considered
incorrectly as a leap year, so the last valid point in time will be 28.02.2100 23:59:59 and next day will be
29.02.2100 (invalid). So after 28.02.2100 the day counter will be off by a day.
"""
edge_date = datetime.datetime(2100, 2, 28, 0, 0, 0)
# Test the following years:
# - first - 1970
# - example not leap year (not divisible by 4)
# - example leap year (divisible by 4 and by 100 and by 400)
# - example leap year (divisible by 4 and not by 100)
# - example not leap year (divisible by 4 and by 100)
# - last fully supported - 2105
years = [1970, 1971, 2000, 2096, 2100, 2105]
year_id = 0
full_leap_year_support = False
RTC_FULL_LEAP_YEAR_SUPPORT = 0
RTC_PARTIAL_LEAP_YEAR_SUPPORT = 1
def _set_leap_year_support(self, key, value, timestamp):
if (int(value) == self.RTC_FULL_LEAP_YEAR_SUPPORT):
self.full_leap_year_support = True
else:
self.full_leap_year_support = False
self.first = True
self.date = datetime.datetime(1970, 1, 1, 23, 0, 0)
self.year_id = 0
def _verify_timestamp(self, key, value, timestamp):
# week day in python is counted from sunday(0) and on mbed side week day is counted from monday(0).
# year day in python is counted from 1 and on mbed side year day is counted from 0.
week_day = ((self.date.timetuple().tm_wday + 1) % 7)
year_day = self.date.timetuple().tm_yday - 1
# Fix for RTC which not have full leap year support.
if (not self.full_leap_year_support):
if self.date >= self.edge_date:
# After 28.02.2100 we should be one day off - add this day and store original
date_org = self.date
self.date += datetime.timedelta(days = 1)
# Adjust week day.
week_day = ((self.date.timetuple().tm_wday + 1) % 7)
# Adjust year day.
if (self.date.year == 2100):
year_day = self.date.timetuple().tm_yday - 1
else:
year_day = date_org.timetuple().tm_yday - 1
# Last day in year
if (self.date.month == 1 and self.date.day == 1):
if (self.date.year == 2101):
# Exception for year 2100 - ivalid handled by RTC without full leap year support
year_day = 365
else:
year_day = date_org.timetuple().tm_yday - 1
t = (self.date.year , self.date.month, self.date.day, self.date.hour, self.date.minute, self.date.second, 0, 0, 0)
expected_timestamp = calendar.timegm(t)
actual_timestamp = int(value) & 0xffffffff # convert to unsigned int
# encode week day and year day in the response
response = (week_day << 16) | year_day
if (actual_timestamp == expected_timestamp):
# response contains encoded week day and year day
self.send_kv("passed", str(response))
else:
self.send_kv("failed", 0)
print("expected = %d, result = %d" % (expected_timestamp , actual_timestamp))
# calculate next date
if (self.first):
days_range = calendar.monthrange(self.date.year, self.date.month)
self.date = self.date.replace(day = days_range[1], minute = 59, second = 59)
self.first = not self.first
else:
self.date += datetime.timedelta(days = 1)
if (self.date.month == 1):
self.year_id += 1
if (len(self.years) == self.year_id):
# All years were processed, no need to calc next date
return
self.date = self.date.replace(year = self.years[self.year_id])
self.date = self.date.replace(day = 1, minute = 0, second = 0)
self.first = not self.first
def setup(self):
self.register_callback('timestamp', self._verify_timestamp)
self.register_callback('leap_year_setup', self._set_leap_year_support)

View File

@ -0,0 +1,145 @@
"""
mbed SDK
Copyright (c) 2017-2017 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
from mbed_host_tests import BaseHostTest
from time import sleep
class RtcResetTest(BaseHostTest):
"""This test checks that a device's RTC keeps count through a reset
It does this by setting the RTC's time, triggering a reset,
delaying and then reading the RTC's time again to ensure
that the RTC is still counting.
"""
"""Start of the RTC"""
START_TIME = 1537789823 # GMT: Monday, 24 September 2018 11:50:23
START_TIME_TOLERANCE = 10
"""Time to delay after sending reset"""
DELAY_TIME = 5.0
DELAY_TOLERANCE = 1.0
VALUE_PLACEHOLDER = "0"
def setup(self):
"""Register callbacks required for the test"""
self._error = False
generator = self.rtc_reset_test()
next(generator)
def run_gen(key, value, time):
"""Run the generator, and fail testing if the iterator stops"""
if self._error:
return
try:
generator.send((key, value, time))
except StopIteration:
self._error = True
for resp in ("start", "read", "ack"):
self.register_callback(resp, run_gen)
def teardown(self):
"""No work to do here"""
pass
def rtc_reset_test(self):
"""Generator for running the reset test
This function calls yield to wait for the next event from
the device. If the device gives the wrong response, then the
generator terminates by returing which raises a StopIteration
exception and fails the test.
"""
# Wait for start token
key, value, time = yield
if key != "start":
return
# Initialize, and set the time
self.send_kv("init", self.VALUE_PLACEHOLDER)
# Wait for ack from the device
key, value, time = yield
if key != "ack":
return
self.send_kv("write", str(self.START_TIME))
# Wait for ack from the device
key, value, time = yield
if key != "ack":
return
self.send_kv("read", self.VALUE_PLACEHOLDER)
key, value, time = yield
if key != "read":
return
dev_time_start = int(value)
# Unitialize, and reset
self.send_kv("free", self.VALUE_PLACEHOLDER)
# Wait for ack from the device
key, value, time = yield
if key != "ack":
return
self.send_kv("reset", self.VALUE_PLACEHOLDER)
# No ack after reset
sleep(self.DELAY_TIME)
# Restart the test, and send the sync token
self.send_kv("__sync", "00000000-0000-000000000-000000000000")
key, value, time = yield
if key != "start":
return
# Initialize, and read the time
self.send_kv("init", self.VALUE_PLACEHOLDER)
# Wait for ack from the device
key, value, time = yield
if key != "ack":
return
self.send_kv("read", self.VALUE_PLACEHOLDER)
key, value, time = yield
if key != "read":
return
dev_time_end = int(value)
# Check result
elapsed = dev_time_end - dev_time_start
start_time_valid = (self.START_TIME <= dev_time_start <
self.START_TIME + self.START_TIME_TOLERANCE)
elapsed_time_valid = elapsed >= self.DELAY_TIME - self.DELAY_TOLERANCE
passed = start_time_valid and elapsed_time_valid
if not start_time_valid:
self.log("FAIL: Expected start time of %i got %i" %
(self.START_TIME, dev_time_start))
elif not passed:
self.log("FAIL: Delayed for %fs but device "
"reported elapsed time of %fs" %
(self.DELAY_TIME, elapsed))
self.send_kv("exit", "pass" if passed else "fail")
yield # No more events expected

View File

@ -0,0 +1,74 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from mbed_host_tests import BaseHostTest
DEFAULT_SYNC_DELAY = 4.0
MSG_VALUE_DUMMY = '0'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_START_CASE = 'start_case'
MSG_KEY_DEVICE_RESET = 'reset_on_case_teardown'
MSG_KEY_SYNC = '__sync'
class SyncOnReset(BaseHostTest):
"""Host side test that handles device reset during case teardown.
Given a device that performs a reset during a test case teardown.
When the device notifies the host about the reset.
Then the host:
* keeps track of the test case index of the current test suite,
* performs a dev-host handshake,
* advances the test suite to next test case.
Note:
Developed for a watchdog test, so that it can be run on devices that
do not support watchdog timeout updates after the initial setup.
As a solution, after testing watchdog with one set of settings, the
device performs a reset and notifies the host with the test case number,
so that the test suite may be advanced once the device boots again.
"""
def __init__(self):
super(SyncOnReset, self).__init__()
self.test_case_num = 0
self.sync_delay = DEFAULT_SYNC_DELAY
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
def cb_device_ready(self, key, value, timestamp):
"""Advance the device test suite to the next test case."""
self.send_kv(MSG_KEY_START_CASE, self.test_case_num)
def cb_device_reset(self, key, value, timestamp):
"""Wait for the device to boot and perform a handshake.
Additionally, keep track of the last test case number.
"""
try:
self.test_case_num = int(value)
except ValueError:
pass
self.test_case_num += 1
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)

View File

@ -0,0 +1,76 @@
"""
Copyright (c) 2018 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from mbed_host_tests import BaseHostTest
from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector
DEFAULT_CYCLE_PERIOD = 1.0
MSG_VALUE_DUMMY = '0'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_DEVICE_RESET = 'reset'
MSG_KEY_SYNC = '__sync'
class SystemResetTest(BaseHostTest):
"""Test for the system_reset API.
Given a device running code
When the device is restarted using @a system_reset()
Then the device is restarted
"""
def __init__(self):
super(SystemResetTest, self).__init__()
self.reset = False
self.test_steps_sequence = self.test_steps()
# Advance the coroutine to it's first yield statement.
self.test_steps_sequence.send(None)
def setup(self):
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
def cb_device_ready(self, key, value, timestamp):
"""Acknowledge device rebooted correctly and feed the test execution
"""
self.reset = True
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
def test_steps(self):
"""Reset the device and check the status
"""
system_reset = yield
self.reset = False
wait_after_reset = self.get_config_item('forced_reset_timeout')
wait_after_reset = wait_after_reset if wait_after_reset is not None else DEFAULT_CYCLE_PERIOD
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DUMMY)
time.sleep(wait_after_reset)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
system_reset = yield
if self.reset == False:
raise RuntimeError('Platform did not reset as expected.')
# The sequence is correct -- test passed.
yield True

View File

@ -0,0 +1,136 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from mbed_host_tests import BaseHostTest
import time
class TimingDriftSync(BaseHostTest):
"""
This works as master-slave fashion
1) Device says its booted up and ready to run the test, wait for host to respond
2) Host sends the message to get the device current time i.e base time
#
# *
# * |
#<---* DUT<- base_time | - round_trip_base_time ------
# * | |
# * - |
# - |
# | |
# | |
# | - measurement_stretch | - nominal_time
# | |
# | |
# - |
# * - |
# * | |
#<---* DUT <-final_time | - round_trip_final_time------
# * |
# * -
#
#
# As we increase the measurement_stretch, the error because of transport delay diminishes.
# The values of measurement_stretch is propotional to round_trip_base_time(transport delays)
# by factor time_measurement_multiplier.This multiplier is used is 80 to tolerate 2 sec of
# transport delay and test time ~ 180 secs
#
# Failure in timing can occur if we are ticking too fast or we are ticking too slow, hence we have
# min_range and max_range. if we cross on either side tests would be marked fail. The range is a function of
# tolerance/acceptable drift currently its 5%.
#
"""
__result = None
mega = 1000000.0
max_measurement_time = 180
# this value is obtained for measurements when there is 0 transport delay and we want accurancy of 5%
time_measurement_multiplier = 80
def _callback_timing_drift_check_start(self, key, value, timestamp):
self.round_trip_base_start = timestamp
self.send_kv("base_time", 0)
def _callback_base_time(self, key, value, timestamp):
self.round_trip_base_end = timestamp
self.device_time_base = float(value)
self.round_trip_base_time = self.round_trip_base_end - self.round_trip_base_start
self.log("Device base time {}".format(value))
measurement_stretch = (self.round_trip_base_time * self.time_measurement_multiplier) + 5
if measurement_stretch > self.max_measurement_time:
self.log("Time required {} to determine device timer is too high due to transport delay, skipping".format(measurement_stretch))
else:
self.log("sleeping for {} to measure drift accurately".format(measurement_stretch))
time.sleep(measurement_stretch)
self.round_trip_final_start = time.time()
self.send_kv("final_time", 0)
def _callback_final_time(self, key, value, timestamp):
self.round_trip_final_end = timestamp
self.device_time_final = float(value)
self.round_trip_final_time = self.round_trip_final_end - self.round_trip_final_start
self.log("Device final time {} ".format(value))
# compute the test results and send to device
results = "pass" if self.compute_parameter() else "fail"
self.send_kv(results, "0")
def setup(self):
self.register_callback('timing_drift_check_start', self._callback_timing_drift_check_start)
self.register_callback('base_time', self._callback_base_time)
self.register_callback('final_time', self._callback_final_time)
def compute_parameter(self, failure_criteria=0.05):
t_max = self.round_trip_final_end - self.round_trip_base_start
t_min = self.round_trip_final_start - self.round_trip_base_end
t_max_hi = t_max * (1 + failure_criteria)
t_max_lo = t_max * (1 - failure_criteria)
t_min_hi = t_min * (1 + failure_criteria)
t_min_lo = t_min * (1 - failure_criteria)
device_time = (self.device_time_final - self.device_time_base) / self.mega
self.log("Compute host events")
self.log("Transport delay 0: {}".format(self.round_trip_base_time))
self.log("Transport delay 1: {}".format(self.round_trip_final_time))
self.log("DUT base time : {}".format(self.device_time_base))
self.log("DUT end time : {}".format(self.device_time_final))
self.log("min_pass : {} , max_pass : {} for {}%%".format(t_max_lo, t_min_hi, failure_criteria * 100))
self.log("min_inconclusive : {} , max_inconclusive : {}".format(t_min_lo, t_max_hi))
self.log("Time reported by device: {}".format(device_time))
if t_max_lo <= device_time <= t_min_hi:
self.log("Test passed !!!")
self.__result = True
elif t_min_lo <= device_time <= t_max_hi:
self.log("Test inconclusive due to transport delay, retrying")
self.__result = False
else:
self.log("Time outside of passing range. Timing drift seems to be present !!!")
self.__result = False
return self.__result
def result(self):
return self.__result
def teardown(self):
pass

View File

@ -0,0 +1,147 @@
"""
Copyright (c) 2018-2020, Arm Limited and affiliates
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
This script is the host script for trng test sequence, it send the
step signaling sequence and receive and transmit data to the device after
reset if necesarry (default loading and storing mechanism while reseting the device
is KVStore, in case KVStore isn't enabled we'll use current infrastructure,
for more details see main.cpp file)
"""
import time
from mbed_host_tests import BaseHostTest
from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector
from time import sleep
DEFAULT_CYCLE_PERIOD = 1.0
MSG_VALUE_DUMMY = '0'
MSG_TRNG_READY = 'ready'
MSG_TRNG_BUFFER = 'buffer'
MSG_TRNG_TEST_STEP1 = 'check_step1'
MSG_TRNG_TEST_STEP2 = 'check_step2'
MSG_KEY_SYNC = '__sync'
MSG_KEY_RESET_COMPLETE = 'reset_complete'
MSG_KEY_TEST_SUITE_ENDED = 'Test_suite_ended'
MSG_KEY_EXIT = 'exit'
class TRNGResetTest(BaseHostTest):
"""Test for the TRNG API.
"""
def __init__(self):
super(TRNGResetTest, self).__init__()
self.did_reset = False
self.ready = False
self.suite_ended = False
self.buffer = 0
self.test_steps_sequence = self.test_steps()
# Advance the coroutine to it's first yield statement.
self.test_steps_sequence.send(None)
#define callback functions for msg handling
def setup(self):
self.register_callback(MSG_TRNG_READY, self.cb_device_ready)
self.register_callback(MSG_TRNG_BUFFER, self.cb_trng_buffer)
self.register_callback(MSG_KEY_TEST_SUITE_ENDED, self.cb_device_test_suit_ended)
self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_complete)
#receive sent data from device before reset
def cb_trng_buffer(self, key, value, timestamp):
"""Acknowledge device rebooted correctly and feed the test execution
"""
self.buffer = value
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
def cb_device_ready(self, key, value, timestamp):
"""Acknowledge device rebooted correctly and feed the test execution
"""
self.ready = True
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
def cb_reset_complete(self, key, value, timestamp):
"""Acknowledge reset complete
"""
self.did_reset = True
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
def cb_device_test_suit_ended(self, key, value, timestamp):
"""Acknowledge device finished a test step correctly and feed the test execution
"""
self.suite_ended = True
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.notify_complete(False)
#define test steps and actions
def test_steps(self):
"""Test step 1
"""
wait_for_communication = yield
self.ready = False
self.did_reset = False
self.suite_ended = False
self.send_kv(MSG_TRNG_TEST_STEP1, MSG_VALUE_DUMMY)
wait_for_communication = yield
if self.buffer == 0:
raise RuntimeError('Phase 1: No buffer received.')
self.reset()
"""Test step 2 (After reset)
"""
wait_for_communication = yield
if self.did_reset == False:
raise RuntimeError('Phase 1: Platform did not reset as expected.')
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
wait_for_communication = yield
if self.ready == False:
raise RuntimeError('Phase 1: Platform not ready as expected.')
self.send_kv(MSG_TRNG_TEST_STEP2, self.buffer)
wait_for_communication = yield
if self.suite_ended == False:
raise RuntimeError('Test failed.')
self.send_kv(MSG_KEY_EXIT, MSG_VALUE_DUMMY)
# The sequence is correct -- test passed.
yield

View File

@ -0,0 +1,145 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import threading
from mbed_host_tests import BaseHostTest
TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send'])
DEFAULT_SYNC_DELAY = 4.0
MAX_HB_PERIOD = 2.5 # [s] Max expected heartbeat period.
MSG_VALUE_DUMMY = '0'
CASE_DATA_INVALID = 0xffffffff
CASE_DATA_PHASE2_OK = 0xfffffffe
CASE_DATA_INSUFF_HB = 0x0
MSG_KEY_SYNC = '__sync'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_START_CASE = 'start_case'
MSG_KEY_DEVICE_RESET = 'dev_reset'
MSG_KEY_HEARTBEAT = 'hb'
class WatchdogReset(BaseHostTest):
"""Host side test that handles device reset.
Given a device with a watchdog timer started.
When the device notifies the host about an incoming reset.
Then the host:
* keeps track of the test case index of the current test suite,
* performs a dev-host handshake.
"""
def __init__(self):
super(WatchdogReset, self).__init__()
self.current_case = TestCaseData(0, CASE_DATA_INVALID)
self.__handshake_timer = None
self.sync_delay = DEFAULT_SYNC_DELAY
self.drop_heartbeat_messages = True
self.hb_timestamps_us = []
def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None):
"""Start a new handshake timer."""
def timer_handler():
"""Perform a dev-host handshake by sending a sync message."""
if pre_sync_fun is not None:
pre_sync_fun()
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
self.__handshake_timer = threading.Timer(seconds, timer_handler)
self.__handshake_timer.start()
def handshake_timer_cancel(self):
"""Cancel the current handshake timer."""
try:
self.__handshake_timer.cancel()
except AttributeError:
pass
finally:
self.__handshake_timer = None
def heartbeat_timeout_handler(self):
"""Handler for the heartbeat timeout.
Compute the time span of the last heartbeat sequence.
Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received.
Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was
received.
"""
self.drop_heartbeat_messages = True
dev_data = CASE_DATA_INVALID
if len(self.hb_timestamps_us) == 1:
dev_data = CASE_DATA_INSUFF_HB
self.log('Not enough heartbeats received.')
elif len(self.hb_timestamps_us) >= 2:
dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0])))
self.log('Heartbeat time span was {} ms.'.format(dev_data))
self.current_case = TestCaseData(self.current_case.index, dev_data)
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat)
def teardown(self):
self.handshake_timer_cancel()
def cb_device_ready(self, key, value, timestamp):
"""Advance the device test suite to a proper test case.
Additionally, send test case data to the device.
"""
self.handshake_timer_cancel()
msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case)
self.send_kv(MSG_KEY_START_CASE, msg_value)
self.drop_heartbeat_messages = False
self.hb_timestamps_us = []
def cb_device_reset(self, key, value, timestamp):
"""Keep track of the test case number.
Also set a new handshake timeout, so when the device gets
restarted by the watchdog, the communication will be restored
by the __handshake_timer.
"""
self.handshake_timer_cancel()
case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK)
self.handshake_timer_start(self.sync_delay + dev_reset_delay_ms / 1000.0)
def cb_heartbeat(self, key, value, timestamp):
"""Save the timestamp of a heartbeat message.
Additionally, keep track of the test case number.
Also each heartbeat sets a new timeout, so when the device gets
restarted by the watchdog, the communication will be restored
by the __handshake_timer.
"""
if self.drop_heartbeat_messages:
return
self.handshake_timer_cancel()
case_num, timestamp_us = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_INVALID)
self.hb_timestamps_us.append(timestamp_us)
self.handshake_timer_start(
seconds=(MAX_HB_PERIOD + self.sync_delay),
pre_sync_fun=self.heartbeat_timeout_handler)

Some files were not shown because too many files have changed in this diff Show More