Add USB mass storage test

pull/9444/head
Maciej Bocianski 2019-01-21 15:44:20 +01:00
parent 0cd9d24d08
commit 7ff689be20
5 changed files with 854 additions and 1 deletions

View File

@ -0,0 +1,240 @@
"""
Copyright (c) 2019, Arm Limited and affiliates.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from mbed_host_tests import BaseHostTest
import time
import psutil
import tempfile
import uuid
import os
import platform
import subprocess
import sys
system_name = platform.system()
if system_name == "Windows":
import wmi
class PyusbMSDTest(BaseHostTest):
"""Host side test for USB MSD class."""
__result = None
MOUNT_WAIT_TIME = 25 # in [s]
initial_disk_list = None
msd_disk = None
serial_number = None
def _callback_device_ready(self, key, value, timestamp):
"""Send a unique USB SN to the device.
DUT uses this SN every time it connects to host as a USB device.
"""
self.serial_number = uuid.uuid4().hex # 32 hex digit string
self.send_kv("serial_number", self.serial_number)
def _callback_check_file_exist(self, key, value, timestamp):
"""Check if file exist.
"""
folder_name, file_name, file_content = value.split(' ')
msd_disk = MSDUtils.disk_path(self.serial_number)
file_path = os.path.join(msd_disk, folder_name, file_name)
try:
file = open(file_path, 'r')
line = file.readline()
file.close()
time.sleep(2) # wait for msd communication done
if line == file_content:
self.send_kv("exist", "0")
return
self.report_error("file content invalid")
except IOError as err:
self.log('{} !!!'.format(err))
self.send_kv("non-exist", "0")
def _callback_delete_files(self, key, value, timestamp):
"""Delete test file.
"""
dir_name, file_name = value.split(' ')
msd_disk = MSDUtils.disk_path(self.serial_number)
try:
os.remove(os.path.join(msd_disk, dir_name, file_name))
except:
self.report_error("delete files")
return
time.sleep(2) # wait for msd communication done
self.report_success()
def _callback_check_if_mounted(self, key, value, timestamp):
"""Check if disk was mounted.
"""
wait_time = self.MOUNT_WAIT_TIME
while wait_time != 0:
msd_disk = MSDUtils.disk_path(self.serial_number)
if msd_disk is not None:
# MSD disk found
time.sleep(2) # wait for msd communication done
self.report_success()
return
wait_time -= 1
time.sleep(1) # wait 1s and try again
self.report_error("mount check")
def _callback_check_if_not_mounted(self, key, value, timestamp):
"""Check if disk was unmouted.
"""
wait_time = self.MOUNT_WAIT_TIME
while wait_time != 0:
msd_disk = MSDUtils.disk_path(self.serial_number)
if msd_disk is None:
#self.msd_disk = None
time.sleep(2) # wait for msd communication done
self.report_success()
return
wait_time -= 1
time.sleep(1) # wait 1s and try again
self.report_error("unmount check")
def _callback_get_mounted_fs_size(self, key, value, timestamp):
"""Record visible filesystem size.
"""
stats = psutil.disk_usage(MSDUtils.disk_path(self.serial_number))
self.send_kv("{}".format(stats.total), "0")
def _callback_unmount(self, key, value, timestamp):
"""Disk unmount.
"""
if MSDUtils.unmount(serial=self.serial_number):
self.report_success()
else:
self.report_error("unmount")
def setup(self):
self.register_callback("get_serial_number", self._callback_device_ready)
self.register_callback('check_if_mounted', self._callback_check_if_mounted)
self.register_callback('check_if_not_mounted', self._callback_check_if_not_mounted)
self.register_callback('get_mounted_fs_size', self._callback_get_mounted_fs_size)
self.register_callback('check_file_exist', self._callback_check_file_exist)
self.register_callback('delete_files', self._callback_delete_files)
self.register_callback('unmount', self._callback_unmount)
def report_success(self):
self.send_kv("passed", "0")
def report_error(self, msg):
self.log('{} failed !!!'.format(msg))
self.send_kv("failed", "0")
def result(self):
return self.__result
def teardown(self):
pass
class MSDUtils(object):
@staticmethod
def disk_path(serial):
system_name = platform.system()
if system_name == "Windows":
return MSDUtils._disk_path_windows(serial)
elif system_name == "Linux":
return MSDUtils._disk_path_linux(serial)
elif system_name == "Darwin":
return MSDUtils._disk_path_mac(serial)
return None
@staticmethod
def unmount(serial):
system_name = platform.system()
if system_name == "Windows":
return MSDUtils._unmount_windows(serial)
elif system_name == "Linux":
return MSDUtils._unmount_linux(serial)
elif system_name == "Darwin":
return MSDUtils._unmount_mac(serial)
return False
@staticmethod
def _disk_path_windows(serial):
serial_decoded = serial.encode("ascii")
c = wmi.WMI()
for physical_disk in c.Win32_DiskDrive():
if serial_decoded == physical_disk.SerialNumber:
for partition in physical_disk.associators("Win32_DiskDriveToDiskPartition"):
for logical_disk in partition.associators("Win32_LogicalDiskToPartition"):
return logical_disk.Caption
return None
@staticmethod
def _disk_path_linux(serial):
output = subprocess.check_output(['lsblk', '-dnoserial,mountpoint']).split('\n')
for line in output:
serial_and_mount_point = line.split()
if len(serial_and_mount_point) == 2:
if serial_and_mount_point[0] == str(serial):
return serial_and_mount_point[1]
return None
@staticmethod
def _disk_path_mac(serial):
# TODO:
# add implementation
return None
@staticmethod
def _unmount_windows(serial):
disk_path = MSDUtils._disk_path_windows(serial)
tmp_file = tempfile.NamedTemporaryFile(suffix='.ps1', delete=False)
try:
# create unmount script
tmp_file.write('$disk_leter=$args[0]\n')
tmp_file.write('$driveEject = New-Object -comObject Shell.Application\n')
tmp_file.write('$driveEject.Namespace(17).ParseName($disk_leter).InvokeVerb("Eject")\n')
# close to allow open by other process
tmp_file.close()
try_count = 10
while try_count:
p = subprocess.Popen(["powershell.exe", tmp_file.name + " " + disk_path], stdout=sys.stdout)
p.communicate()
try_count -= 1
if MSDUtils._disk_path_windows(serial) is None:
return True
time.sleep(1)
finally:
os.remove(tmp_file.name)
return False
@staticmethod
def _unmount_linux(serial):
disk_path = MSDUtils._disk_path_linux(serial)
os.system("umount " + disk_path)
return MSDUtils._disk_path_linux(serial) is None
@staticmethod
def _unmount_mac(serial):
disk_path = MSDUtils._disk_path_mac(serial)
os.system("diskutil unmount " + disk_path)
disks = set(MSDUtils._disks_mac())
return MSDUtils._disk_path_mac(serial) is None

View File

@ -0,0 +1,12 @@
# USB mass storage test user guide
To run the tests-usb_device-msd test device with at least *70kB* of RAM is required.
Test creates 64kB `HeapBlockDevice` as block device and mounts FAT32 filesystem on it.
64kB block device is the smallest one that can mount FAT32 filesystem.
Test can be easily extended to use any block device available in Mbed
Test run command:
```bash
mbed test -t COMPILER -m TARGET -n tests-usb_device-msd
```

View File

@ -0,0 +1,133 @@
/*
* Copyright (c) 2019, Arm Limited and affiliates.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef Test_USBMSD_H
#define Test_USBMSD_H
#include "USBMSD.h"
#define USB_DEV_SN_LEN (32) // 32 hex digit UUID
#define USB_DEV_SN_DESC_SIZE (USB_DEV_SN_LEN * 2 + 2)
/**
* Convert a C style ASCII to a USB string descriptor
*
* @param usb_desc output buffer for the USB string descriptor
* @param str ASCII string
* @param n size of usb_desc buffer, even number
* @returns number of bytes returned in usb_desc or -1 on failure
*/
int ascii2usb_string_desc(uint8_t *usb_desc, const char *str, size_t n)
{
if (str == NULL || usb_desc == NULL || n < 4) {
return -1;
}
if (n % 2 != 0) {
return -1;
}
size_t s, d;
// set bString (@ offset 2 onwards) as a UNICODE UTF-16LE string
memset(usb_desc, 0, n);
for (s = 0, d = 2; str[s] != '\0' && d < n; s++, d += 2) {
usb_desc[d] = str[s];
}
// set bLength @ offset 0
usb_desc[0] = d;
// set bDescriptorType @ offset 1
usb_desc[1] = STRING_DESCRIPTOR;
return d;
}
class TestUSBMSD: public USBMSD {
public:
TestUSBMSD(BlockDevice *bd, bool connect_blocking = true, uint16_t vendor_id = 0x0703, uint16_t product_id = 0x0104,
uint16_t product_release = 0x0001)
: USBMSD(bd, connect_blocking, vendor_id, product_id, product_release)
{
}
virtual ~TestUSBMSD()
{
}
uint32_t get_read_counter()
{
return read_counter;
}
uint32_t get_program_counter()
{
return program_counter;
}
void reset_counters()
{
read_counter = program_counter = erase_counter = 0;
}
static void setup_serial_number()
{
char _key[128] = { 0 };
char _value[128] = { 0 };
greentea_send_kv("get_serial_number", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("serial_number", _key);
usb_dev_sn[USB_DEV_SN_LEN] = '\0';
memcpy(usb_dev_sn, _value, USB_DEV_SN_LEN);
ascii2usb_string_desc(_serial_num_descriptor, usb_dev_sn, USB_DEV_SN_DESC_SIZE);
}
virtual const uint8_t *string_iserial_desc()
{
return (const uint8_t *)_serial_num_descriptor;
}
static volatile uint32_t read_counter;
static volatile uint32_t program_counter;
static volatile uint32_t erase_counter;
protected:
virtual int disk_read(uint8_t *data, uint64_t block, uint8_t count)
{
read_counter++;
return USBMSD::disk_read(data, block, count);
}
virtual int disk_write(const uint8_t *data, uint64_t block, uint8_t count)
{
erase_counter++;
program_counter++;
return USBMSD::disk_write(data, block, count);
}
private:
static uint8_t _serial_num_descriptor[USB_DEV_SN_DESC_SIZE];
static char usb_dev_sn[USB_DEV_SN_LEN + 1];
};
uint8_t TestUSBMSD::_serial_num_descriptor[USB_DEV_SN_DESC_SIZE] = { 0 };
char TestUSBMSD::usb_dev_sn[USB_DEV_SN_LEN + 1] = { 0 };
volatile uint32_t TestUSBMSD::read_counter = 0;
volatile uint32_t TestUSBMSD::program_counter = 0;
volatile uint32_t TestUSBMSD::erase_counter = 0;
#endif // Test_USBMSD_H

View File

@ -0,0 +1,468 @@
/*
* Copyright (c) 2019, Arm Limited and affiliates.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h> /* srand, rand */
#include "greentea-client/test_env.h"
#include "unity/unity.h"
#include "utest/utest.h"
#include "mbed.h"
#include "USBMSD.h"
#include "TestUSBMSD.h"
#include "HeapBlockDevice.h"
#include "FATFileSystem.h"
#if !defined(DEVICE_USBDEVICE) || !DEVICE_USBDEVICE
#error [NOT_SUPPORTED] USB Device not supported for this target
#endif
#ifdef MIN
#undef MIN
#endif
#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
#define DEFAULT_BLOCK_SIZE 512
#define HEAP_BLOCK_DEVICE_SIZE (128 * DEFAULT_BLOCK_SIZE)
#define MIN_HEAP_SIZE (HEAP_BLOCK_DEVICE_SIZE + 6144)
/* TODO:
*
* Test if slave(DUT) can force host to refresh mounted fs. (not supported in USBMSD yet)
*
*/
#define TEST_DIR "usb_msd_test_data"
#define TEST_FILE "usb_msd_test_file"
#define TEST_STRING "usb_msd_test_string"
using namespace utest::v1;
uint32_t prev_read_counter = 0;
uint32_t prev_program_counter = 0;
extern uint32_t mbed_heap_size;
static char _key[256] = { 0 };
static char _value[128] = { 0 };
static volatile bool msd_process_done = false;
FATFileSystem heap_fs("heap_fs");
Semaphore media_remove_event(0, 1);
/** Creates heap block device
*
*/
BlockDevice *get_heap_block_device()
{
// create 64kB heap block device
if (mbed_heap_size >= MIN_HEAP_SIZE) {
static HeapBlockDevice bd(128 * DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
bd.init();
return &bd;
} else {
return NULL;
}
}
uint64_t get_fs_mount_size(FileSystem *fs)
{
struct statvfs stat;
fs->statvfs(fs->getName(), &stat);
uint64_t size = stat.f_bsize * stat.f_blocks;
return size;
}
/**
* Create test data
*
* @param fs_root filesystem path
*/
static bool test_files_create(const char *fs_root, const char *test_file = TEST_FILE, const char *test_string = TEST_STRING)
{
char path[128];
sprintf(path, "/%s/%s", fs_root, TEST_DIR);
int ret = mkdir(path, 0777);
if (ret != 0 && errno != EEXIST) {
utest_printf("mkdir failed!!! errno: %d\n", errno);
return false;
}
sprintf(path, "/%s/%s/%s", fs_root, TEST_DIR, test_file);
FILE *f = fopen(path, "w");
if (f == NULL) {
utest_printf("fopen failed!!! errno: %d\n", errno);
return false;
}
fprintf(f, test_string);
fflush(f);
fclose(f);
return true;
}
/**
* Remove test data
*
* @param fs_root filesystem path
*/
static void test_files_remove(const char *fs_root)
{
DIR *dir;
struct dirent *dp;
char path[512];
sprintf(path, "/%s/%s", fs_root, TEST_DIR);
dir = opendir(path);
if (dir == NULL) {
return;
}
while ((dp = readdir(dir)) != NULL) {
sprintf(path, "/%s/%s/%s", fs_root, TEST_DIR, dp->d_name);
remove(path);
}
sprintf(path, "/%s/%s", fs_root, TEST_DIR);
remove(path);
}
/**
* Check if test data exist
*
* @param fs_root filesystem path
* @return true if data exist
*/
static bool test_files_exist(const char *fs_root, const char *test_file = TEST_FILE, const char *test_string = TEST_STRING)
{
char path[128];
char str[512] = { 0 };
sprintf(path, "/%s/%s/%s", fs_root, TEST_DIR, test_file);
FILE *f = fopen(path, "r");
if (f != NULL) {
fscanf(f, "%s", str);
if (strcmp(test_string, str) == 0) {
return true;
}
}
return false;
}
/**
* Mounts a filesystem to a block device
*
* @param bd block device
* @param fs filesystem
* @return true if success, false otherwise
*/
static bool prepare_storage(BlockDevice *bd, FileSystem *fs)
{
const char *fs_root = fs->getName();
int err = fs->mount(bd);
if (err) {
utest_printf("%s filesystem mount failed\ntry to reformat device... ", fs->getName());
err = fs->reformat(bd);
if (err) {
utest_printf("failed !!!\n");
return false;
} else {
utest_printf("succeed\n");
}
}
// remove old test data
test_files_remove(fs_root);
return true;
}
void run_processing(Semaphore *sem)
{
sem->release();
}
void msd_process(USBMSD *msd)
{
Semaphore proc;
msd->attach(callback(run_processing, &proc));
while (!msd_process_done) {
proc.wait(100);
msd->process();
if (msd->media_removed()) {
media_remove_event.release();
}
}
msd->attach(NULL);
}
// wait until msd negotiation is done (no r/w disk operation for at least 1s)
// max wait time is 15s
#define WAIT_MSD_COMMUNICATION_DONE() \
for (int x = 0; x < 15; x++) { \
prev_read_counter = usb.get_read_counter();\
prev_program_counter = usb.get_program_counter();\
ThisThread::sleep_for(1000);\
if ((usb.get_read_counter() == prev_read_counter) && \
(usb.get_program_counter() == prev_program_counter)) {\
break;\
}\
}
#define TEST_ASSERT_EQUAL_STRING_LOOP(expected, actual, loop_index) \
if (strcmp(expected, actual) != 0) { \
char str[128]; \
sprintf(str, "expected %s was %s (loop index: %lu)", expected, actual, loop_index); \
TEST_ASSERT_MESSAGE(false, str); \
}
#define TEST_ASSERT_EQUAL_LOOP(expected, actual, loop_index) \
if (expected != actual) { \
char str[128]; \
sprintf(str, "expected %d was %d (loop index: %lu)", expected, actual, loop_index); \
TEST_ASSERT_MESSAGE(false, str); \
}
/** Initialize storages
*
* Given the DUT USB mass storage device
* When DUT has enought heap memory
* Then initialize heap block device for tests
* When DUT has any falsh block device
* Then initialize it for tests
*/
void storage_init()
{
if (mbed_heap_size >= MIN_HEAP_SIZE) {
FATFileSystem::format(get_heap_block_device());
bool result = prepare_storage(get_heap_block_device(), &heap_fs);
TEST_ASSERT_MESSAGE(result, "heap storage initialisation failed");
} else {
utest_printf("Not enough heap memory for HeapBlockDevice creation. Heap block device init skipped!!!\n");
}
}
/** Test mass storage device mount and unmount
*
* Given the DUT USB mass storage device connected to the host
* When DUT call @USBMSD::connect
* Then host detects mass storage device is mounted as removable disk drive and it reports valid filesystem size
* When DUT call @USBMSD::disconnect
* Then host detects mass storage device is unmounted (ejected)
*
* Given the DUT USB mass storage device connected to the host and mounted
* When host unmounts (ejects) mass storage device
* Then DUT detects media remove event
*/
template <uint32_t N>
void mount_unmount_test(BlockDevice *bd, FileSystem *fs)
{
Thread msd_thread(osPriorityHigh);
TestUSBMSD usb(bd, false);
msd_process_done = false;
msd_thread.start(callback(msd_process, &usb));
for (uint32_t i = 1; i <= N; i++) {
// mount
usb.connect();
WAIT_MSD_COMMUNICATION_DONE();
// check if device is mounted on host side
greentea_send_kv("check_if_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING_LOOP("passed", _key, i);
greentea_send_kv("get_mounted_fs_size", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
uint64_t ret_size = atoll(_key);
TEST_ASSERT_EQUAL_UINT64(get_fs_mount_size(fs), ret_size);
// unmount
usb.disconnect();
// check if device is detached on host side
greentea_send_kv("check_if_not_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING_LOOP("passed", _key, i);
}
for (uint32_t i = 1; i <= N; i++) {
// mount
usb.connect();
WAIT_MSD_COMMUNICATION_DONE();
// check if device is mounted on host side
greentea_send_kv("check_if_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING_LOOP("passed", _key, i);
greentea_send_kv("get_mounted_fs_size", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
uint64_t ret_size = atoll(_key);
TEST_ASSERT_EQUAL_UINT64(get_fs_mount_size(fs), ret_size);
// unmount msd device on host side
greentea_send_kv("unmount", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING_LOOP("passed", _key, i);
// wait for unmount event (set 10s timeout)
media_remove_event.wait(10000);
if (!usb.media_removed()) {
TEST_ASSERT_EQUAL_LOOP(true, usb.media_removed(), i);
}
// unmount since media_removed doesn't disconnects device side
usb.disconnect();
// check if device is detached on host side
greentea_send_kv("check_if_not_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING_LOOP("passed", _key, i);
}
// mount
usb.connect();
WAIT_MSD_COMMUNICATION_DONE();
// check if device is mounted on host side
greentea_send_kv("check_if_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
// unmount
usb.disconnect();
// check if device is detached on host side
greentea_send_kv("check_if_not_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
msd_process_done = true; // terminate msd_thread
msd_thread.join();
}
/** Test mass storage device mount and unmount together with underlying file system operations
*
* Given the DUT USB mass storage device connected to the host
* When DUT call @USBMSD::connect
* Then host detects that mass storage device is mounted as removable disk drive and test files are present
* When DUT call @USBMSD::disconnect
* Then host detects mass storage device is unmounted (ejected)
*
* Given the DUT USB mass storage device connected to the host and already mounted
* When host unmounts (ejects) mass storage device
* Then DUT detects media remove event
*/
void mount_unmount_and_data_test(BlockDevice *bd, FileSystem *fs)
{
const char *fs_root = fs->getName();
Thread msd_thread(osPriorityHigh);
TestUSBMSD usb(bd, false);
msd_process_done = false;
msd_thread.start(callback(msd_process, &usb));
// mount
usb.connect();
WAIT_MSD_COMMUNICATION_DONE();
// check if device is mounted on host side
greentea_send_kv("check_if_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
greentea_send_kv("check_file_exist", TEST_DIR " " TEST_FILE " " TEST_STRING);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("non-exist", _key);
usb.disconnect();
// check if device is detached on host side
greentea_send_kv("check_if_not_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
test_files_create(fs_root);
TEST_ASSERT(test_files_exist(fs_root));
usb.connect();
WAIT_MSD_COMMUNICATION_DONE();
// check if device is mounted on host side
greentea_send_kv("check_if_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
greentea_send_kv("check_file_exist", TEST_DIR " " TEST_FILE " " TEST_STRING);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("exist", _key);
greentea_send_kv("delete_files", TEST_DIR " " TEST_FILE);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
do {
wait_ms(1);
} while (test_files_exist(fs_root));
TEST_ASSERT_EQUAL(false, test_files_exist(fs_root));
usb.disconnect();
// check if device is detached on host side
greentea_send_kv("check_if_not_mounted", 0);
greentea_parse_kv(_key, _value, sizeof(_key), sizeof(_value));
TEST_ASSERT_EQUAL_STRING("passed", _key);
msd_process_done = true; // terminate msd_thread
msd_thread.join();
test_files_remove(fs_root);
}
void heap_block_device_mount_unmount_test()
{
if (mbed_heap_size < MIN_HEAP_SIZE) {
TEST_SKIP_MESSAGE("Not enough heap memory for HeapBlockDevice creation");
return;
}
mount_unmount_test<3>(get_heap_block_device(), &heap_fs);
}
void heap_block_device_mount_unmount_and_data_test()
{
if (mbed_heap_size < MIN_HEAP_SIZE) {
TEST_SKIP_MESSAGE("Not enough heap memory for HeapBlockDevice creation");
return;
}
mount_unmount_and_data_test(get_heap_block_device(), &heap_fs);
}
Case cases[] = {
Case("storage initialization", storage_init),
Case("mount/unmount test - Heap block device", heap_block_device_mount_unmount_test),
Case("mount/unmount and data test - Heap block device", heap_block_device_mount_unmount_and_data_test),
};
utest::v1::status_t greentea_test_setup(const size_t number_of_cases)
{
GREENTEA_SETUP(300, "pyusb_msd");
utest::v1::status_t status = greentea_test_setup_handler(number_of_cases);
TestUSBMSD::setup_serial_number();
return status;
}
Specification specification(greentea_test_setup, cases, greentea_test_teardown_handler);
int main()
{
Harness::run(specification);
}

View File

@ -219,7 +219,7 @@ bool USBMSD::media_removed()
return _media_removed;
}
int USBMSD::disk_read(uint8_t* data, uint64_t block, uint8_t count)
int USBMSD::disk_read(uint8_t *data, uint64_t block, uint8_t count)
{
bd_addr_t addr = block * _bd->get_erase_size();
bd_size_t size = count * _bd->get_erase_size();