""" Copyright (c) 2018-2019 Arm Limited and affiliates. SPDX-License-Identifier: Apache-2.0 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import collections import threading from mbed_host_tests import BaseHostTest TestCaseData = collections.namedtuple('TestCaseData', ['index', 'data_to_send']) DEFAULT_SYNC_DELAY = 4.0 MAX_HB_PERIOD = 2.5 # [s] Max expected heartbeat period. MSG_VALUE_DUMMY = '0' CASE_DATA_INVALID = 0xffffffff CASE_DATA_PHASE2_OK = 0xfffffffe CASE_DATA_INSUFF_HB = 0x0 MSG_KEY_SYNC = '__sync' MSG_KEY_DEVICE_READY = 'ready' MSG_KEY_START_CASE = 'start_case' MSG_KEY_DEVICE_RESET = 'dev_reset' MSG_KEY_HEARTBEAT = 'hb' class WatchdogReset(BaseHostTest): """Host side test that handles device reset. Given a device with a watchdog timer started. When the device notifies the host about an incoming reset. Then the host: * keeps track of the test case index of the current test suite, * performs a dev-host handshake. """ def __init__(self): super(WatchdogReset, self).__init__() self.current_case = TestCaseData(0, CASE_DATA_INVALID) self.__handshake_timer = None self.sync_delay = DEFAULT_SYNC_DELAY self.drop_heartbeat_messages = True self.hb_timestamps_us = [] def handshake_timer_start(self, seconds=1.0, pre_sync_fun=None): """Start a new handshake timer.""" def timer_handler(): """Perform a dev-host handshake by sending a sync message.""" if pre_sync_fun is not None: pre_sync_fun() self.send_kv(MSG_KEY_SYNC, MSG_VALUE_DUMMY) self.__handshake_timer = threading.Timer(seconds, timer_handler) self.__handshake_timer.start() def handshake_timer_cancel(self): """Cancel the current handshake timer.""" try: self.__handshake_timer.cancel() except AttributeError: pass finally: self.__handshake_timer = None def heartbeat_timeout_handler(self): """Handler for the heartbeat timeout. Compute the time span of the last heartbeat sequence. Set self.current_case.data_to_send to CASE_DATA_INVALID if no heartbeat was received. Set self.current_case.data_to_send to CASE_DATA_INSUFF_HB if only one heartbeat was received. """ self.drop_heartbeat_messages = True dev_data = CASE_DATA_INVALID if len(self.hb_timestamps_us) == 1: dev_data = CASE_DATA_INSUFF_HB self.log('Not enough heartbeats received.') elif len(self.hb_timestamps_us) >= 2: dev_data = int(round(0.001 * (self.hb_timestamps_us[-1] - self.hb_timestamps_us[0]))) self.log('Heartbeat time span was {} ms.'.format(dev_data)) self.current_case = TestCaseData(self.current_case.index, dev_data) def setup(self): sync_delay = self.get_config_item('forced_reset_timeout') self.sync_delay = sync_delay if sync_delay is not None else DEFAULT_SYNC_DELAY self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready) self.register_callback(MSG_KEY_DEVICE_RESET, self.cb_device_reset) self.register_callback(MSG_KEY_HEARTBEAT, self.cb_heartbeat) def teardown(self): self.handshake_timer_cancel() def cb_device_ready(self, key, value, timestamp): """Advance the device test suite to a proper test case. Additionally, send test case data to the device. """ self.handshake_timer_cancel() msg_value = '{0.index:02x},{0.data_to_send:08x}'.format(self.current_case) self.send_kv(MSG_KEY_START_CASE, msg_value) self.drop_heartbeat_messages = False self.hb_timestamps_us = [] def cb_device_reset(self, key, value, timestamp): """Keep track of the test case number. Also set a new handshake timeout, so when the device gets restarted by the watchdog, the communication will be restored by the __handshake_timer. """ self.handshake_timer_cancel() case_num, dev_reset_delay_ms = (int(i, base=16) for i in value.split(',')) self.current_case = TestCaseData(case_num, CASE_DATA_PHASE2_OK) self.handshake_timer_start(self.sync_delay + dev_reset_delay_ms / 1000.0) def cb_heartbeat(self, key, value, timestamp): """Save the timestamp of a heartbeat message. Additionally, keep track of the test case number. Also each heartbeat sets a new timeout, so when the device gets restarted by the watchdog, the communication will be restored by the __handshake_timer. """ if self.drop_heartbeat_messages: return self.handshake_timer_cancel() case_num, timestamp_us = (int(i, base=16) for i in value.split(',')) self.current_case = TestCaseData(case_num, CASE_DATA_INVALID) self.hb_timestamps_us.append(timestamp_us) self.handshake_timer_start( seconds=(MAX_HB_PERIOD + self.sync_delay), pre_sync_fun=self.heartbeat_timeout_handler)