Merge pull request #13356 from gpsimenos/gp-drivers-restruct

Drivers directory restructuring
pull/13408/head
Anna Bridge 2020-08-05 11:55:41 +01:00 committed by GitHub
commit 7418b9d3d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
108 changed files with 614 additions and 3 deletions

View File

@ -29,3 +29,4 @@
^UNITTESTS
^storage/blockdevice/tests/UNITTESTS
^storage/kvstore/tests/UNITTESTS
^drivers/tests/UNITTESTS

View File

@ -101,7 +101,7 @@ matrix:
! git grep '^#include\s["'"']mbed.h['"'"]$' -- '*.c' '*.h' '*.cpp' '*.hpp' \
':!*platform_mbed.h' ':!*TESTS/*' ':!TEST_APPS/' ':!UNITTESTS/' \
':!*tests/*' ':!*targets/*' ':!*TARGET_*' ':!*unsupported/*' \
':!*events/tests/*'
':!*events/tests/*' ':!*drivers/tests/*'
### Docs Tests ###

View File

@ -121,6 +121,9 @@ set(unittest-includes-base
"${PROJECT_SOURCE_DIR}/../storage/kvstore/kv_config"
"${PROJECT_SOURCE_DIR}/../storage/kvstore/kv_config/include"
"${PROJECT_SOURCE_DIR}/../drivers"
"${PROJECT_SOURCE_DIR}/../drivers/include"
"${PROJECT_SOURCE_DIR}/../drivers/include/drivers"
"${PROJECT_SOURCE_DIR}/../drivers/include/drivers/internal"
"${PROJECT_SOURCE_DIR}/../hal"
"${PROJECT_SOURCE_DIR}/../events/include"
"${PROJECT_SOURCE_DIR}/../events/include/events/internal"

View File

@ -0,0 +1 @@
UNITTESTS/*

View File

@ -0,0 +1,32 @@
"""
Copyright (c) 2011-2020, Arm Limited and affiliates
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import uuid
from mbed_host_tests import BaseHostTest
class Device_Echo(BaseHostTest):
def _callback_repeat(self, key, value, _):
self.send_kv(key, value)
def setup(self):
self.register_callback("echo", self._callback_repeat)
self.register_callback("echo_count", self._callback_repeat)
def teardown(self):
pass

View File

@ -0,0 +1,181 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from mbed_host_tests import BaseHostTest
DEFAULT_SYNC_DELAY = 4.0
MSG_VALUE_WATCHDOG_PRESENT = 1
MSG_VALUE_DUMMY = '0'
MSG_VALUE_RESET_REASON_GET = 'get'
MSG_VALUE_RESET_REASON_CLEAR = 'clear'
MSG_VALUE_DEVICE_RESET_NVIC = 'nvic'
MSG_VALUE_DEVICE_RESET_WATCHDOG = 'watchdog'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_RESET_REASON_RAW = 'reason_raw'
MSG_KEY_RESET_REASON = 'reason'
MSG_KEY_DEVICE_RESET = 'reset'
MSG_KEY_SYNC = '__sync'
MSG_KEY_RESET_COMPLETE = 'reset_complete'
RESET_REASONS = {
'POWER_ON': '0',
'PIN_RESET': '1',
'BROWN_OUT': '2',
'SOFTWARE': '3',
'WATCHDOG': '4',
'LOCKUP': '5',
'WAKE_LOW_POWER': '6',
'ACCESS_ERROR': '7',
'BOOT_ERROR': '8',
'MULTIPLE': '9',
'PLATFORM': '10',
'UNKNOWN': '11'
}
def raise_if_different(expected, actual, text=''):
"""Raise a RuntimeError if actual is different than expected."""
if expected != actual:
raise RuntimeError('{}Got {!r}, expected {!r}'
.format(text, actual, expected))
class ResetReasonTest(BaseHostTest):
"""Test for the Reset Reason HAL API.
Given a device supporting a Reset Reason API.
When the device is restarted using various methods.
Then the device returns a correct reset reason for every restart.
"""
def __init__(self):
super(ResetReasonTest, self).__init__()
self.device_reasons = None
self.device_has_watchdog = None
self.raw_reset_reasons = set()
self.sync_delay = DEFAULT_SYNC_DELAY
self.test_steps_sequence = self.test_steps()
# Advance the coroutine to it's first yield statement.
self.test_steps_sequence.send(None)
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_RESET_REASON_RAW, self.cb_reset_reason_raw)
self.register_callback(MSG_KEY_RESET_REASON, self.cb_reset_reason)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_reset_reason)
self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_reason)
def cb_device_ready(self, key, value, timestamp):
"""Request a raw value of the reset_reason register.
Additionally, save the device's reset_reason capabilities
and the watchdog status on the first call.
"""
if self.device_reasons is None:
reasons, wdg_status = (int(i, base=16) for i in value.split(','))
self.device_has_watchdog = (wdg_status == MSG_VALUE_WATCHDOG_PRESENT)
self.device_reasons = [k for k, v in RESET_REASONS.items() if (reasons & 1 << int(v))]
self.send_kv(MSG_KEY_RESET_REASON_RAW, MSG_VALUE_RESET_REASON_GET)
def cb_reset_reason_raw(self, key, value, timestamp):
"""Verify that the raw reset_reason register value is unique.
Fail the test suite if the raw reset_reason value is not unique.
Request a platform independent reset_reason otherwise.
"""
if value in self.raw_reset_reasons:
self.log('TEST FAILED: The raw reset reason is not unique. '
'{!r} is already present in {!r}.'
.format(value, self.raw_reset_reasons))
self.notify_complete(False)
else:
self.raw_reset_reasons.add(value)
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_GET)
def cb_reset_reason(self, key, value, timestamp):
"""Feed the test_steps coroutine with reset_reason value.
Pass the test suite if the coroutine yields True.
Fail the test suite if the iterator stops or raises a RuntimeError.
"""
try:
if self.test_steps_sequence.send(value):
self.notify_complete(True)
except (StopIteration, RuntimeError) as exc:
self.log('TEST FAILED: {}'.format(exc))
self.notify_complete(False)
def test_steps(self):
"""Generate a sequence of test steps.
This coroutine calls yield to wait for the input from the device
(the reset_reason). If the device gives the wrong response, the
generator raises a RuntimeError exception and fails the test.
"""
# Ignore the first reason.
__ignored_reset_reason = yield
self.raw_reset_reasons.clear()
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Request a NVIC_SystemReset() call.
expected_reason = 'SOFTWARE'
if expected_reason not in self.device_reasons:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
# Request a NVIC_SystemReset() call.
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_NVIC)
__ignored_reset_ack = yield
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Reset the device using DAP.
expected_reason = 'PIN_RESET'
if expected_reason not in self.device_reasons:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
self.reset()
__ignored_reset_ack = yield # 'reset_complete'
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
__ignored_clear_ack = yield
# Start a watchdog timer and wait for it to reset the device.
expected_reason = 'WATCHDOG'
if expected_reason not in self.device_reasons or not self.device_has_watchdog:
self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
else:
self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_WATCHDOG)
__ignored_reset_ack = yield
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
reset_reason = yield
raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
# The sequence is correct -- test passed.
yield True

View File

@ -0,0 +1,38 @@
"""
Copyright (c) 2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from mbed_host_tests import BaseHostTest
MSG_KEY_ECHO_MESSAGE = "echo_message"
class SerialComms(BaseHostTest):
"""Host side test that handles messages sent using serial classes."""
def __init__(self):
"""Initialize an object."""
super(SerialComms, self).__init__()
def setup(self):
"""Register call backs to handle message from the target."""
self.register_callback(MSG_KEY_ECHO_MESSAGE, self.cb_echo_message)
def cb_echo_message(self, key, value, timestamp):
"""Send back the key and value received."""
self.send_kv(key, value)

View File

@ -0,0 +1,74 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from mbed_host_tests import BaseHostTest
DEFAULT_SYNC_DELAY = 4.0
MSG_VALUE_DUMMY = '0'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_START_CASE = 'start_case'
MSG_KEY_DEVICE_RESET = 'reset_on_case_teardown'
MSG_KEY_SYNC = '__sync'
class SyncOnReset(BaseHostTest):
"""Host side test that handles device reset during case teardown.
Given a device that performs a reset during a test case teardown.
When the device notifies the host about the reset.
Then the host:
* keeps track of the test case index of the current test suite,
* performs a dev-host handshake,
* advances the test suite to next test case.
Note:
Developed for a watchdog test, so that it can be run on devices that
do not support watchdog timeout updates after the initial setup.
As a solution, after testing watchdog with one set of settings, the
device performs a reset and notifies the host with the test case number,
so that the test suite may be advanced once the device boots again.
"""
def __init__(self):
super(SyncOnReset, self).__init__()
self.test_case_num = 0
self.sync_delay = DEFAULT_SYNC_DELAY
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
def cb_device_ready(self, key, value, timestamp):
"""Advance the device test suite to the next test case."""
self.send_kv(MSG_KEY_START_CASE, self.test_case_num)
def cb_device_reset(self, key, value, timestamp):
"""Wait for the device to boot and perform a handshake.
Additionally, keep track of the last test case number.
"""
try:
self.test_case_num = int(value)
except ValueError:
pass
self.test_case_num += 1
time.sleep(self.sync_delay)
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)

View File

@ -0,0 +1,136 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from mbed_host_tests import BaseHostTest
import time
class TimingDriftSync(BaseHostTest):
"""
This works as master-slave fashion
1) Device says its booted up and ready to run the test, wait for host to respond
2) Host sends the message to get the device current time i.e base time
#
# *
# * |
#<---* DUT<- base_time | - round_trip_base_time ------
# * | |
# * - |
# - |
# | |
# | |
# | - measurement_stretch | - nominal_time
# | |
# | |
# - |
# * - |
# * | |
#<---* DUT <-final_time | - round_trip_final_time------
# * |
# * -
#
#
# As we increase the measurement_stretch, the error because of transport delay diminishes.
# The values of measurement_stretch is propotional to round_trip_base_time(transport delays)
# by factor time_measurement_multiplier.This multiplier is used is 80 to tolerate 2 sec of
# transport delay and test time ~ 180 secs
#
# Failure in timing can occur if we are ticking too fast or we are ticking too slow, hence we have
# min_range and max_range. if we cross on either side tests would be marked fail. The range is a function of
# tolerance/acceptable drift currently its 5%.
#
"""
__result = None
mega = 1000000.0
max_measurement_time = 180
# this value is obtained for measurements when there is 0 transport delay and we want accurancy of 5%
time_measurement_multiplier = 80
def _callback_timing_drift_check_start(self, key, value, timestamp):
self.round_trip_base_start = timestamp
self.send_kv("base_time", 0)
def _callback_base_time(self, key, value, timestamp):
self.round_trip_base_end = timestamp
self.device_time_base = float(value)
self.round_trip_base_time = self.round_trip_base_end - self.round_trip_base_start
self.log("Device base time {}".format(value))
measurement_stretch = (self.round_trip_base_time * self.time_measurement_multiplier) + 5
if measurement_stretch > self.max_measurement_time:
self.log("Time required {} to determine device timer is too high due to transport delay, skipping".format(measurement_stretch))
else:
self.log("sleeping for {} to measure drift accurately".format(measurement_stretch))
time.sleep(measurement_stretch)
self.round_trip_final_start = time.time()
self.send_kv("final_time", 0)
def _callback_final_time(self, key, value, timestamp):
self.round_trip_final_end = timestamp
self.device_time_final = float(value)
self.round_trip_final_time = self.round_trip_final_end - self.round_trip_final_start
self.log("Device final time {} ".format(value))
# compute the test results and send to device
results = "pass" if self.compute_parameter() else "fail"
self.send_kv(results, "0")
def setup(self):
self.register_callback('timing_drift_check_start', self._callback_timing_drift_check_start)
self.register_callback('base_time', self._callback_base_time)
self.register_callback('final_time', self._callback_final_time)
def compute_parameter(self, failure_criteria=0.05):
t_max = self.round_trip_final_end - self.round_trip_base_start
t_min = self.round_trip_final_start - self.round_trip_base_end
t_max_hi = t_max * (1 + failure_criteria)
t_max_lo = t_max * (1 - failure_criteria)
t_min_hi = t_min * (1 + failure_criteria)
t_min_lo = t_min * (1 - failure_criteria)
device_time = (self.device_time_final - self.device_time_base) / self.mega
self.log("Compute host events")
self.log("Transport delay 0: {}".format(self.round_trip_base_time))
self.log("Transport delay 1: {}".format(self.round_trip_final_time))
self.log("DUT base time : {}".format(self.device_time_base))
self.log("DUT end time : {}".format(self.device_time_final))
self.log("min_pass : {} , max_pass : {} for {}%%".format(t_max_lo, t_min_hi, failure_criteria * 100))
self.log("min_inconclusive : {} , max_inconclusive : {}".format(t_min_lo, t_max_hi))
self.log("Time reported by device: {}".format(device_time))
if t_max_lo <= device_time <= t_min_hi:
self.log("Test passed !!!")
self.__result = True
elif t_min_lo <= device_time <= t_max_hi:
self.log("Test inconclusive due to transport delay, retrying")
self.__result = False
else:
self.log("Time outside of passing range. Timing drift seems to be present !!!")
self.__result = False
return self.__result
def result(self):
return self.__result
def teardown(self):
pass

View File

@ -0,0 +1,145 @@
"""
Copyright (c) 2018-2019 Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import threading
from mbed_host_tests import BaseHostTest
TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send'])
DEFAULT_SYNC_DELAY = 4.0
MAX_HB_PERIOD = 2.5 # [s] Max expected heartbeat period.
MSG_VALUE_DUMMY = '0'
CASE_DATA_INVALID = 0xffffffff
CASE_DATA_PHASE2_OK = 0xfffffffe
CASE_DATA_INSUFF_HB = 0x0
MSG_KEY_SYNC = '__sync'
MSG_KEY_DEVICE_READY = 'ready'
MSG_KEY_START_CASE = 'start_case'
MSG_KEY_DEVICE_RESET = 'dev_reset'
MSG_KEY_HEARTBEAT = 'hb'
class WatchdogReset(BaseHostTest):
"""Host side test that handles device reset.
Given a device with a watchdog timer started.
When the device notifies the host about an incoming reset.
Then the host:
* keeps track of the test case index of the current test suite,
* performs a dev-host handshake.
"""
def __init__(self):
super(WatchdogReset, self).__init__()
self.current_case = TestCaseData(0, CASE_DATA_INVALID)
self.__handshake_timer = None
self.sync_delay = DEFAULT_SYNC_DELAY
self.drop_heartbeat_messages = True
self.hb_timestamps_us = []
def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None):
"""Start a new handshake timer."""
def timer_handler():
"""Perform a dev-host handshake by sending a sync message."""
if pre_sync_fun is not None:
pre_sync_fun()
self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
self.__handshake_timer = threading.Timer(seconds, timer_handler)
self.__handshake_timer.start()
def handshake_timer_cancel(self):
"""Cancel the current handshake timer."""
try:
self.__handshake_timer.cancel()
except AttributeError:
pass
finally:
self.__handshake_timer = None
def heartbeat_timeout_handler(self):
"""Handler for the heartbeat timeout.
Compute the time span of the last heartbeat sequence.
Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received.
Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was
received.
"""
self.drop_heartbeat_messages = True
dev_data = CASE_DATA_INVALID
if len(self.hb_timestamps_us) == 1:
dev_data = CASE_DATA_INSUFF_HB
self.log('Not enough heartbeats received.')
elif len(self.hb_timestamps_us) >= 2:
dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0])))
self.log('Heartbeat time span was {} ms.'.format(dev_data))
self.current_case = TestCaseData(self.current_case.index, dev_data)
def setup(self):
sync_delay = self.get_config_item('forced_reset_timeout')
self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat)
def teardown(self):
self.handshake_timer_cancel()
def cb_device_ready(self, key, value, timestamp):
"""Advance the device test suite to a proper test case.
Additionally, send test case data to the device.
"""
self.handshake_timer_cancel()
msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case)
self.send_kv(MSG_KEY_START_CASE, msg_value)
self.drop_heartbeat_messages = False
self.hb_timestamps_us = []
def cb_device_reset(self, key, value, timestamp):
"""Keep track of the test case number.
Also set a new handshake timeout, so when the device gets
restarted by the watchdog, the communication will be restored
by the __handshake_timer.
"""
self.handshake_timer_cancel()
case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK)
self.handshake_timer_start(self.sync_delay + dev_reset_delay_ms / 1000.0)
def cb_heartbeat(self, key, value, timestamp):
"""Save the timestamp of a heartbeat message.
Additionally, keep track of the test case number.
Also each heartbeat sets a new timeout, so when the device gets
restarted by the watchdog, the communication will be restored
by the __handshake_timer.
"""
if self.drop_heartbeat_messages:
return
self.handshake_timer_cancel()
case_num, timestamp_us = (int(i, base=16) for i in value.split(','))
self.current_case = TestCaseData(case_num, CASE_DATA_INVALID)
self.hb_timestamps_us.append(timestamp_us)
self.handshake_timer_start(
seconds=(MAX_HB_PERIOD + self.sync_delay),
pre_sync_fun=self.heartbeat_timeout_handler)

Some files were not shown because too many files have changed in this diff Show More