mirror of https://github.com/ARMmbed/mbed-os.git
				
				
				
			Move greentea tests closure to library
							parent
							
								
									993ed3b975
								
							
						
					
					
						commit
						159410bea7
					
				| 
						 | 
				
			
			@ -0,0 +1,181 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2018-2019 Arm Limited and affiliates.
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
import time
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
 | 
			
		||||
DEFAULT_SYNC_DELAY = 4.0
 | 
			
		||||
 | 
			
		||||
MSG_VALUE_WATCHDOG_PRESENT = 1
 | 
			
		||||
MSG_VALUE_DUMMY = '0'
 | 
			
		||||
MSG_VALUE_RESET_REASON_GET = 'get'
 | 
			
		||||
MSG_VALUE_RESET_REASON_CLEAR = 'clear'
 | 
			
		||||
MSG_VALUE_DEVICE_RESET_NVIC = 'nvic'
 | 
			
		||||
MSG_VALUE_DEVICE_RESET_WATCHDOG = 'watchdog'
 | 
			
		||||
 | 
			
		||||
MSG_KEY_DEVICE_READY = 'ready'
 | 
			
		||||
MSG_KEY_RESET_REASON_RAW = 'reason_raw'
 | 
			
		||||
MSG_KEY_RESET_REASON = 'reason'
 | 
			
		||||
MSG_KEY_DEVICE_RESET = 'reset'
 | 
			
		||||
MSG_KEY_SYNC = '__sync'
 | 
			
		||||
MSG_KEY_RESET_COMPLETE = 'reset_complete'
 | 
			
		||||
 | 
			
		||||
RESET_REASONS = {
 | 
			
		||||
    'POWER_ON': '0',
 | 
			
		||||
    'PIN_RESET': '1',
 | 
			
		||||
    'BROWN_OUT': '2',
 | 
			
		||||
    'SOFTWARE': '3',
 | 
			
		||||
    'WATCHDOG': '4',
 | 
			
		||||
    'LOCKUP': '5',
 | 
			
		||||
    'WAKE_LOW_POWER': '6',
 | 
			
		||||
    'ACCESS_ERROR': '7',
 | 
			
		||||
    'BOOT_ERROR': '8',
 | 
			
		||||
    'MULTIPLE': '9',
 | 
			
		||||
    'PLATFORM': '10',
 | 
			
		||||
    'UNKNOWN': '11'
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def raise_if_different(expected, actual, text=''):
 | 
			
		||||
    """Raise a RuntimeError if actual is different than expected."""
 | 
			
		||||
    if expected != actual:
 | 
			
		||||
        raise RuntimeError('{}Got {!r}, expected {!r}'
 | 
			
		||||
                           .format(text, actual, expected))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ResetReasonTest(BaseHostTest):
 | 
			
		||||
    """Test for the Reset Reason HAL API.
 | 
			
		||||
 | 
			
		||||
    Given a device supporting a Reset Reason API.
 | 
			
		||||
    When the device is restarted using various methods.
 | 
			
		||||
    Then the device returns a correct reset reason for every restart.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(ResetReasonTest, self).__init__()
 | 
			
		||||
        self.device_reasons = None
 | 
			
		||||
        self.device_has_watchdog = None
 | 
			
		||||
        self.raw_reset_reasons = set()
 | 
			
		||||
        self.sync_delay = DEFAULT_SYNC_DELAY
 | 
			
		||||
        self.test_steps_sequence = self.test_steps()
 | 
			
		||||
        # Advance the coroutine to it's first yield statement.
 | 
			
		||||
        self.test_steps_sequence.send(None)
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        sync_delay = self.get_config_item('forced_reset_timeout')
 | 
			
		||||
        self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
 | 
			
		||||
        self.register_callback(MSG_KEY_RESET_REASON_RAW, self.cb_reset_reason_raw)
 | 
			
		||||
        self.register_callback(MSG_KEY_RESET_REASON, self.cb_reset_reason)
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_reset_reason)
 | 
			
		||||
        self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_reason)
 | 
			
		||||
 | 
			
		||||
    def cb_device_ready(self, key, value, timestamp):
 | 
			
		||||
        """Request a raw value of the reset_reason register.
 | 
			
		||||
 | 
			
		||||
        Additionally, save the device's reset_reason capabilities
 | 
			
		||||
        and the watchdog status on the first call.
 | 
			
		||||
        """
 | 
			
		||||
        if self.device_reasons is None:
 | 
			
		||||
            reasons, wdg_status = (int(i, base=16) for i in value.split(','))
 | 
			
		||||
            self.device_has_watchdog = (wdg_status == MSG_VALUE_WATCHDOG_PRESENT)
 | 
			
		||||
            self.device_reasons = [k for k, v in RESET_REASONS.items() if (reasons & 1 << int(v))]
 | 
			
		||||
        self.send_kv(MSG_KEY_RESET_REASON_RAW, MSG_VALUE_RESET_REASON_GET)
 | 
			
		||||
 | 
			
		||||
    def cb_reset_reason_raw(self, key, value, timestamp):
 | 
			
		||||
        """Verify that the raw reset_reason register value is unique.
 | 
			
		||||
 | 
			
		||||
        Fail the test suite if the raw reset_reason value is not unique.
 | 
			
		||||
        Request a platform independent reset_reason otherwise.
 | 
			
		||||
        """
 | 
			
		||||
        if value in self.raw_reset_reasons:
 | 
			
		||||
            self.log('TEST FAILED: The raw reset reason is not unique. '
 | 
			
		||||
                     '{!r} is already present in {!r}.'
 | 
			
		||||
                     .format(value, self.raw_reset_reasons))
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
        else:
 | 
			
		||||
            self.raw_reset_reasons.add(value)
 | 
			
		||||
        self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_GET)
 | 
			
		||||
 | 
			
		||||
    def cb_reset_reason(self, key, value, timestamp):
 | 
			
		||||
        """Feed the test_steps coroutine with reset_reason value.
 | 
			
		||||
 | 
			
		||||
        Pass the test suite if the coroutine yields True.
 | 
			
		||||
        Fail the test suite if the iterator stops or raises a RuntimeError.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.log('TEST FAILED: {}'.format(exc))
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    def test_steps(self):
 | 
			
		||||
        """Generate a sequence of test steps.
 | 
			
		||||
 | 
			
		||||
        This coroutine calls yield to wait for the input from the device
 | 
			
		||||
        (the reset_reason). If the device gives the wrong response, the
 | 
			
		||||
        generator raises a RuntimeError exception and fails the test.
 | 
			
		||||
        """
 | 
			
		||||
        # Ignore the first reason.
 | 
			
		||||
        __ignored_reset_reason = yield
 | 
			
		||||
        self.raw_reset_reasons.clear()
 | 
			
		||||
        self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
 | 
			
		||||
        __ignored_clear_ack = yield
 | 
			
		||||
 | 
			
		||||
        # Request a NVIC_SystemReset() call.
 | 
			
		||||
        expected_reason = 'SOFTWARE'
 | 
			
		||||
        if expected_reason not in self.device_reasons:
 | 
			
		||||
            self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
 | 
			
		||||
        else:
 | 
			
		||||
            # Request a NVIC_SystemReset() call.
 | 
			
		||||
            self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_NVIC)
 | 
			
		||||
            __ignored_reset_ack = yield
 | 
			
		||||
            time.sleep(self.sync_delay)
 | 
			
		||||
            self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
            reset_reason = yield
 | 
			
		||||
            raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
 | 
			
		||||
            self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
 | 
			
		||||
            __ignored_clear_ack = yield
 | 
			
		||||
 | 
			
		||||
        # Reset the device using DAP.
 | 
			
		||||
        expected_reason = 'PIN_RESET'
 | 
			
		||||
        if expected_reason not in self.device_reasons:
 | 
			
		||||
            self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
 | 
			
		||||
        else:
 | 
			
		||||
            self.reset()
 | 
			
		||||
            __ignored_reset_ack = yield  # 'reset_complete'
 | 
			
		||||
            time.sleep(self.sync_delay)
 | 
			
		||||
            self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
            reset_reason = yield
 | 
			
		||||
            raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
 | 
			
		||||
            self.send_kv(MSG_KEY_RESET_REASON, MSG_VALUE_RESET_REASON_CLEAR)
 | 
			
		||||
            __ignored_clear_ack = yield
 | 
			
		||||
 | 
			
		||||
        # Start a watchdog timer and wait for it to reset the device.
 | 
			
		||||
        expected_reason = 'WATCHDOG'
 | 
			
		||||
        if expected_reason not in self.device_reasons or not self.device_has_watchdog:
 | 
			
		||||
            self.log('Skipping the {} reset reason -- not supported.'.format(expected_reason))
 | 
			
		||||
        else:
 | 
			
		||||
            self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DEVICE_RESET_WATCHDOG)
 | 
			
		||||
            __ignored_reset_ack = yield
 | 
			
		||||
            time.sleep(self.sync_delay)
 | 
			
		||||
            self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
            reset_reason = yield
 | 
			
		||||
            raise_if_different(RESET_REASONS[expected_reason], reset_reason, 'Wrong reset reason. ')
 | 
			
		||||
 | 
			
		||||
        # The sequence is correct -- test passed.
 | 
			
		||||
        yield True
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,140 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2011-2020, Arm Limited and affiliates
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
import time
 | 
			
		||||
import calendar
 | 
			
		||||
import datetime
 | 
			
		||||
 | 
			
		||||
class RTC_time_calc_test(BaseHostTest):
 | 
			
		||||
    """
 | 
			
		||||
    This is the host part of the test to verify if:
 | 
			
		||||
    - _rtc_mktime function converts a calendar time into time since UNIX epoch as a time_t,
 | 
			
		||||
    - _rtc_localtime function converts a given time in seconds since epoch into calendar time.
 | 
			
		||||
    
 | 
			
		||||
    The same algoritm to generate next calendar time to be tested is used by both parts of the test.
 | 
			
		||||
    We will check if correct time since UNIX epoch is calculated for the first and the last day
 | 
			
		||||
    of each month and across valid years.
 | 
			
		||||
    
 | 
			
		||||
    Mbed part of the test sends calculated time since UNIX epoch.
 | 
			
		||||
    This part validates given value and responds to indicate pass or fail.
 | 
			
		||||
    Additionally it sends also encoded day of week and day of year which
 | 
			
		||||
    will be needed to verify _rtc_localtime.
 | 
			
		||||
    
 | 
			
		||||
    Support for both types of RTC devices is provided:
 | 
			
		||||
    - RTCs which handles all leap years in the mentioned year range correctly. Leap year is determined by checking if
 | 
			
		||||
      the year counter value is divisible by 400, 100, and 4. No problem here.
 | 
			
		||||
    - RTCs which handles leap years correctly up to 2100. The RTC does a simple bit comparison to see if the two
 | 
			
		||||
      lowest order bits of the year counter are zero. In this case 2100 year will be considered
 | 
			
		||||
      incorrectly as a leap year, so the last valid point in time will be 28.02.2100 23:59:59 and next day will be
 | 
			
		||||
      29.02.2100 (invalid). So after 28.02.2100 the day counter will be off by a day.
 | 
			
		||||
      
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    edge_date = datetime.datetime(2100, 2, 28, 0, 0, 0)
 | 
			
		||||
    
 | 
			
		||||
    # Test the following years:
 | 
			
		||||
    # - first - 1970
 | 
			
		||||
    # - example not leap year (not divisible by 4)
 | 
			
		||||
    # - example leap year (divisible by 4 and by 100 and by 400) 
 | 
			
		||||
    # - example leap year (divisible by 4 and not by 100) 
 | 
			
		||||
    # - example not leap year (divisible by 4 and by 100) 
 | 
			
		||||
    # - last fully supported  - 2105
 | 
			
		||||
    years = [1970, 1971, 2000, 2096, 2100, 2105]
 | 
			
		||||
    year_id = 0
 | 
			
		||||
             
 | 
			
		||||
             
 | 
			
		||||
 | 
			
		||||
    full_leap_year_support = False
 | 
			
		||||
 | 
			
		||||
    RTC_FULL_LEAP_YEAR_SUPPORT = 0
 | 
			
		||||
    RTC_PARTIAL_LEAP_YEAR_SUPPORT = 1
 | 
			
		||||
 | 
			
		||||
    def _set_leap_year_support(self, key, value, timestamp):
 | 
			
		||||
        if (int(value) == self.RTC_FULL_LEAP_YEAR_SUPPORT):
 | 
			
		||||
            self.full_leap_year_support = True
 | 
			
		||||
        else:
 | 
			
		||||
            self.full_leap_year_support = False
 | 
			
		||||
 | 
			
		||||
        self.first = True
 | 
			
		||||
        self.date = datetime.datetime(1970, 1, 1, 23, 0, 0)
 | 
			
		||||
        self.year_id = 0
 | 
			
		||||
 | 
			
		||||
    def _verify_timestamp(self, key, value, timestamp):
 | 
			
		||||
        # week day in python is counted from sunday(0) and on mbed side week day is counted from monday(0).
 | 
			
		||||
        # year day in python is counted from 1 and on mbed side year day is counted from 0.
 | 
			
		||||
        week_day = ((self.date.timetuple().tm_wday + 1) % 7)
 | 
			
		||||
        year_day = self.date.timetuple().tm_yday - 1
 | 
			
		||||
 | 
			
		||||
        # Fix for RTC which not have full leap year support.
 | 
			
		||||
        if (not self.full_leap_year_support):
 | 
			
		||||
            if self.date >= self.edge_date:
 | 
			
		||||
                # After 28.02.2100 we should be one day off - add this day and store original
 | 
			
		||||
                date_org = self.date
 | 
			
		||||
                self.date += datetime.timedelta(days = 1)
 | 
			
		||||
                
 | 
			
		||||
                # Adjust week day.
 | 
			
		||||
                week_day = ((self.date.timetuple().tm_wday + 1) % 7)
 | 
			
		||||
 | 
			
		||||
                # Adjust year day.
 | 
			
		||||
                if (self.date.year == 2100):
 | 
			
		||||
                    year_day = self.date.timetuple().tm_yday - 1
 | 
			
		||||
                else:
 | 
			
		||||
                    year_day = date_org.timetuple().tm_yday - 1
 | 
			
		||||
 | 
			
		||||
                # Last day in year
 | 
			
		||||
                if (self.date.month == 1 and self.date.day == 1):
 | 
			
		||||
                    if (self.date.year == 2101):
 | 
			
		||||
                        # Exception for year 2100 - ivalid handled by RTC without full leap year support
 | 
			
		||||
                        year_day = 365
 | 
			
		||||
                    else:
 | 
			
		||||
                        year_day = date_org.timetuple().tm_yday - 1
 | 
			
		||||
 | 
			
		||||
        t = (self.date.year , self.date.month, self.date.day, self.date.hour, self.date.minute, self.date.second, 0, 0, 0)
 | 
			
		||||
        
 | 
			
		||||
        expected_timestamp = calendar.timegm(t)
 | 
			
		||||
        actual_timestamp = int(value) & 0xffffffff # convert to unsigned int
 | 
			
		||||
 | 
			
		||||
        # encode week day and year day in the response
 | 
			
		||||
        response = (week_day << 16) | year_day
 | 
			
		||||
        
 | 
			
		||||
        if (actual_timestamp == expected_timestamp):
 | 
			
		||||
            # response contains encoded week day and year day
 | 
			
		||||
            self.send_kv("passed", str(response))
 | 
			
		||||
        else:
 | 
			
		||||
            self.send_kv("failed", 0)
 | 
			
		||||
            print("expected = %d, result = %d" %  (expected_timestamp , actual_timestamp))
 | 
			
		||||
 | 
			
		||||
        # calculate next date
 | 
			
		||||
        if (self.first):
 | 
			
		||||
            days_range = calendar.monthrange(self.date.year, self.date.month)
 | 
			
		||||
            self.date = self.date.replace(day = days_range[1], minute = 59, second = 59)
 | 
			
		||||
            self.first = not self.first
 | 
			
		||||
        else:
 | 
			
		||||
            self.date += datetime.timedelta(days = 1)
 | 
			
		||||
            if (self.date.month == 1):
 | 
			
		||||
                self.year_id += 1
 | 
			
		||||
                if (len(self.years) == self.year_id):
 | 
			
		||||
                    # All years were processed, no need to calc next date
 | 
			
		||||
                    return
 | 
			
		||||
                self.date = self.date.replace(year = self.years[self.year_id])
 | 
			
		||||
            self.date = self.date.replace(day = 1, minute = 0, second = 0)
 | 
			
		||||
            self.first = not self.first
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        self.register_callback('timestamp', self._verify_timestamp)
 | 
			
		||||
        self.register_callback('leap_year_setup', self._set_leap_year_support)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,145 @@
 | 
			
		|||
"""
 | 
			
		||||
mbed SDK
 | 
			
		||||
Copyright (c) 2017-2017 ARM Limited
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
from __future__ import print_function
 | 
			
		||||
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RtcResetTest(BaseHostTest):
 | 
			
		||||
    """This test checks that a device's RTC keeps count through a reset
 | 
			
		||||
    
 | 
			
		||||
    It does this by setting the RTC's time, triggering a reset,
 | 
			
		||||
    delaying and then reading the RTC's time again to ensure
 | 
			
		||||
    that the RTC is still counting.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    """Start of the RTC"""
 | 
			
		||||
    START_TIME = 1537789823 # GMT: Monday, 24 September 2018 11:50:23
 | 
			
		||||
    START_TIME_TOLERANCE = 10
 | 
			
		||||
    """Time to delay after sending reset"""
 | 
			
		||||
    DELAY_TIME = 5.0
 | 
			
		||||
    DELAY_TOLERANCE = 1.0
 | 
			
		||||
    VALUE_PLACEHOLDER = "0"
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        """Register callbacks required for the test"""
 | 
			
		||||
        self._error = False
 | 
			
		||||
        generator = self.rtc_reset_test()
 | 
			
		||||
        next(generator)
 | 
			
		||||
 | 
			
		||||
        def run_gen(key, value, time):
 | 
			
		||||
            """Run the generator, and fail testing if the iterator stops"""
 | 
			
		||||
            if self._error:
 | 
			
		||||
                return
 | 
			
		||||
            try:
 | 
			
		||||
                generator.send((key, value, time))
 | 
			
		||||
            except StopIteration:
 | 
			
		||||
                self._error = True
 | 
			
		||||
 | 
			
		||||
        for resp in ("start", "read", "ack"):
 | 
			
		||||
            self.register_callback(resp, run_gen)
 | 
			
		||||
 | 
			
		||||
    def teardown(self):
 | 
			
		||||
        """No work to do here"""
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def rtc_reset_test(self):
 | 
			
		||||
        """Generator for running the reset test
 | 
			
		||||
        
 | 
			
		||||
        This function calls yield to wait for the next event from
 | 
			
		||||
        the device. If the device gives the wrong response, then the
 | 
			
		||||
        generator terminates by returing which raises a StopIteration
 | 
			
		||||
        exception and fails the test.
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        # Wait for start token
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "start":
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Initialize, and set the time
 | 
			
		||||
        self.send_kv("init", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        
 | 
			
		||||
        # Wait for ack from the device
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "ack":
 | 
			
		||||
            return
 | 
			
		||||
        
 | 
			
		||||
        self.send_kv("write", str(self.START_TIME))
 | 
			
		||||
        
 | 
			
		||||
        # Wait for ack from the device
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "ack":
 | 
			
		||||
            return
 | 
			
		||||
        
 | 
			
		||||
        self.send_kv("read", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "read":
 | 
			
		||||
            return
 | 
			
		||||
        dev_time_start = int(value)
 | 
			
		||||
 | 
			
		||||
        # Unitialize, and reset
 | 
			
		||||
        self.send_kv("free", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        
 | 
			
		||||
        # Wait for ack from the device
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "ack":
 | 
			
		||||
            return
 | 
			
		||||
        
 | 
			
		||||
        self.send_kv("reset", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        
 | 
			
		||||
        # No ack after reset
 | 
			
		||||
        sleep(self.DELAY_TIME)
 | 
			
		||||
 | 
			
		||||
        # Restart the test, and send the sync token
 | 
			
		||||
        self.send_kv("__sync", "00000000-0000-000000000-000000000000")
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "start":
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # Initialize, and read the time
 | 
			
		||||
        self.send_kv("init", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        
 | 
			
		||||
        # Wait for ack from the device
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "ack":
 | 
			
		||||
            return
 | 
			
		||||
        
 | 
			
		||||
        self.send_kv("read", self.VALUE_PLACEHOLDER)
 | 
			
		||||
        key, value, time = yield
 | 
			
		||||
        if key != "read":
 | 
			
		||||
            return
 | 
			
		||||
        dev_time_end = int(value)
 | 
			
		||||
 | 
			
		||||
        # Check result
 | 
			
		||||
        elapsed = dev_time_end - dev_time_start
 | 
			
		||||
        start_time_valid = (self.START_TIME <= dev_time_start <
 | 
			
		||||
                            self.START_TIME + self.START_TIME_TOLERANCE)
 | 
			
		||||
        elapsed_time_valid = elapsed >= self.DELAY_TIME - self.DELAY_TOLERANCE
 | 
			
		||||
        passed = start_time_valid and elapsed_time_valid
 | 
			
		||||
        if not start_time_valid:
 | 
			
		||||
            self.log("FAIL: Expected start time of %i got %i" %
 | 
			
		||||
                     (self.START_TIME, dev_time_start))
 | 
			
		||||
        elif not passed:
 | 
			
		||||
            self.log("FAIL: Delayed for %fs but device "
 | 
			
		||||
                     "reported elapsed time of %fs" %
 | 
			
		||||
                     (self.DELAY_TIME, elapsed))
 | 
			
		||||
        self.send_kv("exit", "pass" if passed else "fail")
 | 
			
		||||
        yield    # No more events expected
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,74 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2018-2019 Arm Limited and affiliates.
 | 
			
		||||
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
import time
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
 | 
			
		||||
DEFAULT_SYNC_DELAY = 4.0
 | 
			
		||||
 | 
			
		||||
MSG_VALUE_DUMMY = '0'
 | 
			
		||||
MSG_KEY_DEVICE_READY = 'ready'
 | 
			
		||||
MSG_KEY_START_CASE = 'start_case'
 | 
			
		||||
MSG_KEY_DEVICE_RESET = 'reset_on_case_teardown'
 | 
			
		||||
MSG_KEY_SYNC = '__sync'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SyncOnReset(BaseHostTest):
 | 
			
		||||
    """Host side test that handles device reset during case teardown.
 | 
			
		||||
 | 
			
		||||
    Given a device that performs a reset during a test case teardown.
 | 
			
		||||
    When the device notifies the host about the reset.
 | 
			
		||||
    Then the host:
 | 
			
		||||
    * keeps track of the test case index of the current test suite,
 | 
			
		||||
    * performs a dev-host handshake,
 | 
			
		||||
    * advances the test suite to next test case.
 | 
			
		||||
 | 
			
		||||
    Note:
 | 
			
		||||
    Developed for a watchdog test, so that it can be run on devices that
 | 
			
		||||
    do not support watchdog timeout updates after the initial setup.
 | 
			
		||||
    As a solution, after testing watchdog with one set of settings, the
 | 
			
		||||
    device performs a reset and notifies the host with the test case number,
 | 
			
		||||
    so that the test suite may be advanced once the device boots again.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(SyncOnReset, self).__init__()
 | 
			
		||||
        self.test_case_num = 0
 | 
			
		||||
        self.sync_delay = DEFAULT_SYNC_DELAY
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        sync_delay = self.get_config_item('forced_reset_timeout')
 | 
			
		||||
        self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
 | 
			
		||||
 | 
			
		||||
    def cb_device_ready(self, key, value, timestamp):
 | 
			
		||||
        """Advance the device test suite to the next test case."""
 | 
			
		||||
        self.send_kv(MSG_KEY_START_CASE, self.test_case_num)
 | 
			
		||||
 | 
			
		||||
    def cb_device_reset(self, key, value, timestamp):
 | 
			
		||||
        """Wait for the device to boot and perform a handshake.
 | 
			
		||||
 | 
			
		||||
        Additionally, keep track of the last test case number.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            self.test_case_num = int(value)
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            pass
 | 
			
		||||
        self.test_case_num += 1
 | 
			
		||||
        time.sleep(self.sync_delay)
 | 
			
		||||
        self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,76 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2018 ARM Limited
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
import time
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector
 | 
			
		||||
 | 
			
		||||
DEFAULT_CYCLE_PERIOD = 1.0
 | 
			
		||||
 | 
			
		||||
MSG_VALUE_DUMMY = '0'
 | 
			
		||||
 | 
			
		||||
MSG_KEY_DEVICE_READY = 'ready'
 | 
			
		||||
MSG_KEY_DEVICE_RESET = 'reset'
 | 
			
		||||
MSG_KEY_SYNC = '__sync'
 | 
			
		||||
 | 
			
		||||
class SystemResetTest(BaseHostTest):
 | 
			
		||||
    """Test for the system_reset API.
 | 
			
		||||
 | 
			
		||||
    Given a device running code
 | 
			
		||||
    When the device is restarted using @a system_reset()
 | 
			
		||||
    Then the device is restarted
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(SystemResetTest, self).__init__()
 | 
			
		||||
        self.reset = False
 | 
			
		||||
        self.test_steps_sequence = self.test_steps()
 | 
			
		||||
        # Advance the coroutine to it's first yield statement.
 | 
			
		||||
        self.test_steps_sequence.send(None)
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
 | 
			
		||||
 | 
			
		||||
    def cb_device_ready(self, key, value, timestamp):
 | 
			
		||||
        """Acknowledge device rebooted correctly and feed the test execution
 | 
			
		||||
        """
 | 
			
		||||
        self.reset = True
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    def test_steps(self):
 | 
			
		||||
        """Reset the device and check the status
 | 
			
		||||
        """
 | 
			
		||||
        system_reset = yield
 | 
			
		||||
        self.reset = False
 | 
			
		||||
 | 
			
		||||
        wait_after_reset = self.get_config_item('forced_reset_timeout')
 | 
			
		||||
        wait_after_reset = wait_after_reset if wait_after_reset is not None else DEFAULT_CYCLE_PERIOD
 | 
			
		||||
 | 
			
		||||
        self.send_kv(MSG_KEY_DEVICE_RESET, MSG_VALUE_DUMMY)
 | 
			
		||||
        time.sleep(wait_after_reset)
 | 
			
		||||
        self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
 | 
			
		||||
        system_reset = yield
 | 
			
		||||
        if self.reset == False:
 | 
			
		||||
            raise RuntimeError('Platform did not reset as expected.')
 | 
			
		||||
 | 
			
		||||
        # The sequence is correct -- test passed.
 | 
			
		||||
        yield True
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,136 @@
 | 
			
		|||
"""
 | 
			
		||||
mbed SDK
 | 
			
		||||
Copyright (c) 2011-2013 ARM Limited
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TimingDriftSync(BaseHostTest):
 | 
			
		||||
    """
 | 
			
		||||
    This works as master-slave fashion
 | 
			
		||||
    1) Device says its booted up and ready to run the test, wait for host to respond
 | 
			
		||||
    2) Host sends the message to get the device current time i.e base time
 | 
			
		||||
 | 
			
		||||
    #
 | 
			
		||||
    # *
 | 
			
		||||
    #   *                   |
 | 
			
		||||
    #<---* DUT<- base_time  | - round_trip_base_time ------
 | 
			
		||||
    #   *                   |                              |
 | 
			
		||||
    # *                    -                               |
 | 
			
		||||
    #                      -                               |
 | 
			
		||||
    #                       |                              |
 | 
			
		||||
    #                       |                              |
 | 
			
		||||
    #                       | - measurement_stretch        | - nominal_time
 | 
			
		||||
    #                       |                              |
 | 
			
		||||
    #                       |                              |
 | 
			
		||||
    #                      -                               |
 | 
			
		||||
    # *                    -                               |
 | 
			
		||||
    #   *                   |                              |
 | 
			
		||||
    #<---* DUT <-final_time | - round_trip_final_time------
 | 
			
		||||
    #   *                   |
 | 
			
		||||
    # *                    -
 | 
			
		||||
    #
 | 
			
		||||
    #
 | 
			
		||||
    # As we increase the measurement_stretch, the error because of transport delay diminishes.
 | 
			
		||||
    # The values of measurement_stretch is propotional to round_trip_base_time(transport delays)
 | 
			
		||||
    # by factor time_measurement_multiplier.This multiplier is used is 80 to tolerate 2 sec of
 | 
			
		||||
    # transport delay and test time ~ 180 secs
 | 
			
		||||
    #
 | 
			
		||||
    # Failure in timing can occur if we are ticking too fast or we are ticking too slow, hence we have
 | 
			
		||||
    # min_range and max_range. if we cross on either side tests would be marked fail. The range is a function of
 | 
			
		||||
    # tolerance/acceptable drift currently its 5%.
 | 
			
		||||
    #
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    __result = None
 | 
			
		||||
    mega = 1000000.0
 | 
			
		||||
    max_measurement_time = 180
 | 
			
		||||
 | 
			
		||||
    # this value is obtained for measurements when there is 0 transport delay and we want accurancy of 5%
 | 
			
		||||
    time_measurement_multiplier = 80
 | 
			
		||||
 | 
			
		||||
    def _callback_timing_drift_check_start(self, key, value, timestamp):
 | 
			
		||||
        self.round_trip_base_start = timestamp
 | 
			
		||||
        self.send_kv("base_time", 0)
 | 
			
		||||
 | 
			
		||||
    def _callback_base_time(self, key, value, timestamp):
 | 
			
		||||
        self.round_trip_base_end = timestamp
 | 
			
		||||
        self.device_time_base = float(value)
 | 
			
		||||
        self.round_trip_base_time = self.round_trip_base_end - self.round_trip_base_start
 | 
			
		||||
 | 
			
		||||
        self.log("Device base time {}".format(value))
 | 
			
		||||
        measurement_stretch = (self.round_trip_base_time * self.time_measurement_multiplier) + 5
 | 
			
		||||
 | 
			
		||||
        if measurement_stretch > self.max_measurement_time:
 | 
			
		||||
            self.log("Time required {} to determine device timer is too high due to transport delay, skipping".format(measurement_stretch))
 | 
			
		||||
        else:
 | 
			
		||||
            self.log("sleeping for {} to measure drift accurately".format(measurement_stretch))
 | 
			
		||||
            time.sleep(measurement_stretch)
 | 
			
		||||
        self.round_trip_final_start = time.time()
 | 
			
		||||
        self.send_kv("final_time", 0)
 | 
			
		||||
 | 
			
		||||
    def _callback_final_time(self, key, value, timestamp):
 | 
			
		||||
        self.round_trip_final_end = timestamp
 | 
			
		||||
        self.device_time_final = float(value)
 | 
			
		||||
        self.round_trip_final_time = self.round_trip_final_end - self.round_trip_final_start
 | 
			
		||||
        self.log("Device final time {} ".format(value))
 | 
			
		||||
 | 
			
		||||
        # compute the test results and send to device
 | 
			
		||||
        results = "pass" if self.compute_parameter() else "fail"
 | 
			
		||||
        self.send_kv(results, "0")
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        self.register_callback('timing_drift_check_start', self._callback_timing_drift_check_start)
 | 
			
		||||
        self.register_callback('base_time', self._callback_base_time)
 | 
			
		||||
        self.register_callback('final_time', self._callback_final_time)
 | 
			
		||||
 | 
			
		||||
    def compute_parameter(self, failure_criteria=0.05):
 | 
			
		||||
        t_max = self.round_trip_final_end - self.round_trip_base_start
 | 
			
		||||
        t_min = self.round_trip_final_start - self.round_trip_base_end
 | 
			
		||||
        t_max_hi = t_max * (1 + failure_criteria)
 | 
			
		||||
        t_max_lo = t_max * (1 - failure_criteria)
 | 
			
		||||
        t_min_hi = t_min * (1 + failure_criteria)
 | 
			
		||||
        t_min_lo = t_min * (1 - failure_criteria)
 | 
			
		||||
        device_time = (self.device_time_final - self.device_time_base) / self.mega
 | 
			
		||||
 | 
			
		||||
        self.log("Compute host events")
 | 
			
		||||
        self.log("Transport delay 0: {}".format(self.round_trip_base_time))
 | 
			
		||||
        self.log("Transport delay 1: {}".format(self.round_trip_final_time))
 | 
			
		||||
        self.log("DUT base time : {}".format(self.device_time_base))
 | 
			
		||||
        self.log("DUT end time  : {}".format(self.device_time_final))
 | 
			
		||||
 | 
			
		||||
        self.log("min_pass : {} , max_pass : {} for {}%%".format(t_max_lo, t_min_hi, failure_criteria * 100))
 | 
			
		||||
        self.log("min_inconclusive : {} , max_inconclusive : {}".format(t_min_lo, t_max_hi))
 | 
			
		||||
        self.log("Time reported by device: {}".format(device_time))
 | 
			
		||||
 | 
			
		||||
        if t_max_lo <= device_time <= t_min_hi:
 | 
			
		||||
            self.log("Test passed !!!")
 | 
			
		||||
            self.__result = True
 | 
			
		||||
        elif t_min_lo <= device_time <= t_max_hi:
 | 
			
		||||
            self.log("Test inconclusive due to transport delay, retrying")
 | 
			
		||||
            self.__result = False
 | 
			
		||||
        else:
 | 
			
		||||
            self.log("Time outside of passing range. Timing drift seems to be present !!!")
 | 
			
		||||
            self.__result = False
 | 
			
		||||
        return self.__result
 | 
			
		||||
 | 
			
		||||
    def result(self):
 | 
			
		||||
        return self.__result
 | 
			
		||||
 | 
			
		||||
    def teardown(self):
 | 
			
		||||
        pass
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,147 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2018-2020, Arm Limited and affiliates
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
This script is the host script for trng test sequence, it send the 
 | 
			
		||||
step signaling sequence and receive and transmit data to the device after 
 | 
			
		||||
reset if necesarry (default loading and storing mechanism while reseting the device
 | 
			
		||||
is KVStore, in case KVStore isn't enabled we'll use current infrastructure,
 | 
			
		||||
for more details see main.cpp file)
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import time
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
from mbed_host_tests.host_tests_runner.host_test_default import DefaultTestSelector
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
DEFAULT_CYCLE_PERIOD      = 1.0
 | 
			
		||||
MSG_VALUE_DUMMY           = '0'
 | 
			
		||||
MSG_TRNG_READY            = 'ready'
 | 
			
		||||
MSG_TRNG_BUFFER           = 'buffer'
 | 
			
		||||
MSG_TRNG_TEST_STEP1       = 'check_step1'
 | 
			
		||||
MSG_TRNG_TEST_STEP2       = 'check_step2'
 | 
			
		||||
MSG_KEY_SYNC              = '__sync'
 | 
			
		||||
MSG_KEY_RESET_COMPLETE    = 'reset_complete'
 | 
			
		||||
MSG_KEY_TEST_SUITE_ENDED  = 'Test_suite_ended'
 | 
			
		||||
MSG_KEY_EXIT              = 'exit'
 | 
			
		||||
 | 
			
		||||
class TRNGResetTest(BaseHostTest):
 | 
			
		||||
    """Test for the TRNG API.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(TRNGResetTest, self).__init__()
 | 
			
		||||
        self.did_reset = False
 | 
			
		||||
        self.ready = False
 | 
			
		||||
        self.suite_ended = False
 | 
			
		||||
        self.buffer = 0
 | 
			
		||||
        self.test_steps_sequence = self.test_steps()
 | 
			
		||||
        # Advance the coroutine to it's first yield statement.
 | 
			
		||||
        self.test_steps_sequence.send(None)
 | 
			
		||||
 | 
			
		||||
    #define callback functions for msg handling
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        self.register_callback(MSG_TRNG_READY, self.cb_device_ready)
 | 
			
		||||
        self.register_callback(MSG_TRNG_BUFFER, self.cb_trng_buffer)
 | 
			
		||||
        self.register_callback(MSG_KEY_TEST_SUITE_ENDED, self.cb_device_test_suit_ended)
 | 
			
		||||
        self.register_callback(MSG_KEY_RESET_COMPLETE, self.cb_reset_complete)
 | 
			
		||||
 | 
			
		||||
    #receive sent data from device before reset
 | 
			
		||||
    def cb_trng_buffer(self, key, value, timestamp):
 | 
			
		||||
        """Acknowledge device rebooted correctly and feed the test execution
 | 
			
		||||
        """
 | 
			
		||||
        self.buffer = value
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    def cb_device_ready(self, key, value, timestamp):
 | 
			
		||||
        """Acknowledge device rebooted correctly and feed the test execution
 | 
			
		||||
        """
 | 
			
		||||
        self.ready = True
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    def cb_reset_complete(self, key, value, timestamp):
 | 
			
		||||
        """Acknowledge reset complete
 | 
			
		||||
        """
 | 
			
		||||
        self.did_reset = True
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    def cb_device_test_suit_ended(self, key, value, timestamp):
 | 
			
		||||
        """Acknowledge device finished a test step correctly and feed the test execution
 | 
			
		||||
        """
 | 
			
		||||
        self.suite_ended = True
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            if self.test_steps_sequence.send(value):
 | 
			
		||||
                self.notify_complete(True)
 | 
			
		||||
        except (StopIteration, RuntimeError) as exc:
 | 
			
		||||
            self.notify_complete(False)
 | 
			
		||||
 | 
			
		||||
    #define test steps and actions
 | 
			
		||||
    def test_steps(self):
 | 
			
		||||
        """Test step 1
 | 
			
		||||
        """
 | 
			
		||||
        wait_for_communication = yield
 | 
			
		||||
 | 
			
		||||
        self.ready = False
 | 
			
		||||
        self.did_reset = False
 | 
			
		||||
        self.suite_ended = False
 | 
			
		||||
        self.send_kv(MSG_TRNG_TEST_STEP1, MSG_VALUE_DUMMY)
 | 
			
		||||
        wait_for_communication = yield
 | 
			
		||||
        if self.buffer == 0:
 | 
			
		||||
            raise RuntimeError('Phase 1: No buffer received.')
 | 
			
		||||
 | 
			
		||||
        self.reset()
 | 
			
		||||
 | 
			
		||||
        """Test step 2 (After reset)
 | 
			
		||||
        """
 | 
			
		||||
        wait_for_communication = yield
 | 
			
		||||
        if self.did_reset == False:
 | 
			
		||||
            raise RuntimeError('Phase 1: Platform did not reset as expected.')
 | 
			
		||||
 | 
			
		||||
        self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
 | 
			
		||||
        wait_for_communication = yield
 | 
			
		||||
 | 
			
		||||
        if self.ready == False:
 | 
			
		||||
            raise RuntimeError('Phase 1: Platform not ready as expected.')
 | 
			
		||||
 | 
			
		||||
        self.send_kv(MSG_TRNG_TEST_STEP2, self.buffer)
 | 
			
		||||
 | 
			
		||||
        wait_for_communication = yield
 | 
			
		||||
 | 
			
		||||
        if self.suite_ended == False:
 | 
			
		||||
            raise RuntimeError('Test failed.')
 | 
			
		||||
 | 
			
		||||
        self.send_kv(MSG_KEY_EXIT, MSG_VALUE_DUMMY)
 | 
			
		||||
 | 
			
		||||
        # The sequence is correct -- test passed.
 | 
			
		||||
        yield
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,145 @@
 | 
			
		|||
"""
 | 
			
		||||
Copyright (c) 2018-2019 Arm Limited and affiliates.
 | 
			
		||||
SPDX-License-Identifier: Apache-2.0
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
"""
 | 
			
		||||
import collections
 | 
			
		||||
import threading
 | 
			
		||||
from mbed_host_tests import BaseHostTest
 | 
			
		||||
 | 
			
		||||
TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send'])
 | 
			
		||||
 | 
			
		||||
DEFAULT_SYNC_DELAY = 4.0
 | 
			
		||||
MAX_HB_PERIOD = 2.5  # [s] Max expected heartbeat period.
 | 
			
		||||
 | 
			
		||||
MSG_VALUE_DUMMY = '0'
 | 
			
		||||
CASE_DATA_INVALID = 0xffffffff
 | 
			
		||||
CASE_DATA_PHASE2_OK = 0xfffffffe
 | 
			
		||||
CASE_DATA_INSUFF_HB = 0x0
 | 
			
		||||
 | 
			
		||||
MSG_KEY_SYNC = '__sync'
 | 
			
		||||
MSG_KEY_DEVICE_READY = 'ready'
 | 
			
		||||
MSG_KEY_START_CASE = 'start_case'
 | 
			
		||||
MSG_KEY_DEVICE_RESET = 'dev_reset'
 | 
			
		||||
MSG_KEY_HEARTBEAT = 'hb'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class WatchdogReset(BaseHostTest):
 | 
			
		||||
    """Host side test that handles device reset.
 | 
			
		||||
 | 
			
		||||
    Given a device with a watchdog timer started.
 | 
			
		||||
    When the device notifies the host about an incoming reset.
 | 
			
		||||
    Then the host:
 | 
			
		||||
    * keeps track of the test case index of the current test suite,
 | 
			
		||||
    * performs a dev-host handshake.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(WatchdogReset, self).__init__()
 | 
			
		||||
        self.current_case = TestCaseData(0, CASE_DATA_INVALID)
 | 
			
		||||
        self.__handshake_timer = None
 | 
			
		||||
        self.sync_delay = DEFAULT_SYNC_DELAY
 | 
			
		||||
        self.drop_heartbeat_messages = True
 | 
			
		||||
        self.hb_timestamps_us = []
 | 
			
		||||
 | 
			
		||||
    def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None):
 | 
			
		||||
        """Start a new handshake timer."""
 | 
			
		||||
 | 
			
		||||
        def timer_handler():
 | 
			
		||||
            """Perform a dev-host handshake by sending a sync message."""
 | 
			
		||||
            if pre_sync_fun is not None:
 | 
			
		||||
                pre_sync_fun()
 | 
			
		||||
            self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY)
 | 
			
		||||
 | 
			
		||||
        self.__handshake_timer = threading.Timer(seconds, timer_handler)
 | 
			
		||||
        self.__handshake_timer.start()
 | 
			
		||||
 | 
			
		||||
    def handshake_timer_cancel(self):
 | 
			
		||||
        """Cancel the current handshake timer."""
 | 
			
		||||
        try:
 | 
			
		||||
            self.__handshake_timer.cancel()
 | 
			
		||||
        except AttributeError:
 | 
			
		||||
            pass
 | 
			
		||||
        finally:
 | 
			
		||||
            self.__handshake_timer = None
 | 
			
		||||
 | 
			
		||||
    def heartbeat_timeout_handler(self):
 | 
			
		||||
        """Handler for the heartbeat timeout.
 | 
			
		||||
 | 
			
		||||
        Compute the time span of the last heartbeat sequence.
 | 
			
		||||
        Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received.
 | 
			
		||||
        Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was
 | 
			
		||||
        received.
 | 
			
		||||
        """
 | 
			
		||||
        self.drop_heartbeat_messages = True
 | 
			
		||||
        dev_data = CASE_DATA_INVALID
 | 
			
		||||
        if len(self.hb_timestamps_us) == 1:
 | 
			
		||||
            dev_data = CASE_DATA_INSUFF_HB
 | 
			
		||||
            self.log('Not enough heartbeats received.')
 | 
			
		||||
        elif len(self.hb_timestamps_us) >= 2:
 | 
			
		||||
            dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0])))
 | 
			
		||||
            self.log('Heartbeat time span was {} ms.'.format(dev_data))
 | 
			
		||||
        self.current_case = TestCaseData(self.current_case.index, dev_data)
 | 
			
		||||
 | 
			
		||||
    def setup(self):
 | 
			
		||||
        sync_delay = self.get_config_item('forced_reset_timeout')
 | 
			
		||||
        self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready)
 | 
			
		||||
        self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset)
 | 
			
		||||
        self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat)
 | 
			
		||||
 | 
			
		||||
    def teardown(self):
 | 
			
		||||
        self.handshake_timer_cancel()
 | 
			
		||||
 | 
			
		||||
    def cb_device_ready(self, key, value, timestamp):
 | 
			
		||||
        """Advance the device test suite to a proper test case.
 | 
			
		||||
 | 
			
		||||
        Additionally, send test case data to the device.
 | 
			
		||||
        """
 | 
			
		||||
        self.handshake_timer_cancel()
 | 
			
		||||
        msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case)
 | 
			
		||||
        self.send_kv(MSG_KEY_START_CASE, msg_value)
 | 
			
		||||
        self.drop_heartbeat_messages = False
 | 
			
		||||
        self.hb_timestamps_us = []
 | 
			
		||||
 | 
			
		||||
    def cb_device_reset(self, key, value, timestamp):
 | 
			
		||||
        """Keep track of the test case number.
 | 
			
		||||
 | 
			
		||||
        Also set a new handshake timeout, so when the device gets
 | 
			
		||||
        restarted by the watchdog, the communication will be restored
 | 
			
		||||
        by the __handshake_timer.
 | 
			
		||||
        """
 | 
			
		||||
        self.handshake_timer_cancel()
 | 
			
		||||
        case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(','))
 | 
			
		||||
        self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK)
 | 
			
		||||
        self.handshake_timer_start(self.sync_delay + dev_reset_delay_ms / 1000.0)
 | 
			
		||||
 | 
			
		||||
    def cb_heartbeat(self, key, value, timestamp):
 | 
			
		||||
        """Save the timestamp of a heartbeat message.
 | 
			
		||||
 | 
			
		||||
        Additionally, keep track of the test case number.
 | 
			
		||||
 | 
			
		||||
        Also each heartbeat sets a new timeout, so when the device gets
 | 
			
		||||
        restarted by the watchdog, the communication will be restored
 | 
			
		||||
        by the __handshake_timer.
 | 
			
		||||
        """
 | 
			
		||||
        if self.drop_heartbeat_messages:
 | 
			
		||||
            return
 | 
			
		||||
        self.handshake_timer_cancel()
 | 
			
		||||
        case_num, timestamp_us = (int(i, base=16) for i in value.split(','))
 | 
			
		||||
        self.current_case = TestCaseData(case_num, CASE_DATA_INVALID)
 | 
			
		||||
        self.hb_timestamps_us.append(timestamp_us)
 | 
			
		||||
        self.handshake_timer_start(
 | 
			
		||||
            seconds=(MAX_HB_PERIOD + self.sync_delay),
 | 
			
		||||
            pre_sync_fun=self.heartbeat_timeout_handler)
 | 
			
		||||
| 
		 Before Width: | Height: | Size: 41 KiB After Width: | Height: | Size: 41 KiB  | 
		Loading…
	
		Reference in New Issue