pull/219/head^2
Przemek Wirkus 2014-03-19 11:55:00 +00:00
commit 8e21b1ac83
6 changed files with 1 additions and 439 deletions

View File

@ -1,165 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import json
from os.path import join, abspath, dirname, exists
import datetime
from time import time
ROOT = abspath(join(dirname(__file__), ".."))
sys.path.insert(0, ROOT)
from workspace_tools.build_api import build_project, build_mbed_libs
from workspace_tools.tests import TEST_MAP, GROUPS
from workspace_tools.client import request_test, get_muts
from workspace_tools.settings import *
from workspace_tools.paths import BUILD_DIR
from workspace_tools.utils import error
from workspace_tools.targets import TARGET_MAP
class TestServer:
def __init__(self):
self.muts = get_muts()
def available(self, target, peripherals=None):
if peripherals is not None:
peripherals = set(peripherals)
for id, mut in self.muts.iteritems():
# Target check
if mut["mcu"] != target: continue
# Peripherals check
if peripherals is not None:
if 'peripherals' not in mut: continue
if not peripherals.issubset(set(mut['peripherals'])): continue
return True
return False
def request(self, target, path, duration, test_id):
return request_test(target, path, test_id, duration)
def print_results(results, label):
print "=== %d %s" % (len(results), label)
for r in results:
print " [%s::%s]: %s" % (r['target'], r['toolchain'], TEST_MAP[r["test_id"]].get_description())
def store_data(path, data):
f = open(path, "w")
f.write(json.dumps(data))
def load_data(path):
f = open(path)
return json.loads(f.read())
if __name__ == "__main__":
start = time()
# Load test specification
if len(sys.argv) != 2: error("You should provide a test specification file")
test_spec_file = sys.argv[1]
if not exists(test_spec_file):
error('The selected test spec file does not exists: "%s"' % test_spec_file)
f = open(test_spec_file)
test_spec = json.load(f)
clean = test_spec.get('clean', False)
test_ids = test_spec.get('test_ids', [])
groups = test_spec.get('test_groups', [])
for group in groups:
tests = GROUPS.get(group, [])
if not tests:
print "WARNING: test group '%s' not found." % group
continue
for test in tests:
if not test in test_ids:
test_ids.append(test)
# Test Server
test_server = TestServer()
# Test Results
tests_results = []
test_time = datetime.datetime.utcnow()
test_report = {
"date": test_time.isoformat(),
"test_server": SERVER_ADDRESS,
"test_results": tests_results
}
successes = []
failures = []
for target, toolchains in test_spec['targets'].iteritems():
for toolchain in toolchains:
print '=== %s::%s ===' % (target, toolchain)
T = TARGET_MAP[target]
build_mbed_libs(T, toolchain)
build_dir = join(BUILD_DIR, "test", target, toolchain)
for test_id, test in TEST_MAP.iteritems():
if test_ids and test_id not in test_ids:
continue
if test.automated and test.is_supported(target, toolchain):
# Check if the server has the capability to serve our test request
if not test_server.available(target, test.peripherals):
print "The test server does not have such device: %s, %s" % (target, test.peripherals)
continue
test_result = {
'target': target,
'toolchain': toolchain,
'test_id': test_id,
}
path = build_project(test.source_dir, join(build_dir, test_id),
T, toolchain, test.dependencies, clean=clean)
test_result_cache = join(dirname(path), "test_result.json")
if not clean and exists(test_result_cache):
test_result = load_data(test_result_cache)
else:
# For an automated test the duration act as a timeout after
# which the test gets interrupted
report = test_server.request(target, path, test.duration, test_id)
test_result['result'] = report['result']
store_data(test_result_cache, test_result)
tests_results.append(test_result)
print str(test_result) + '\n'
if test_result['result'] == 'success':
successes.append(test_result)
else:
failures.append(test_result)
# Print a basic summary
print_results(successes, "Successes")
print_results(failures, "Failures")
# Save the report on a file
report_filename = "test_%s.json" % test_time.strftime("%Y-%m-%d_%H-%M-%S")
store_data(report_filename, test_report)
print "\nCompleted in (%d)sec" % (time() - start)

View File

@ -1,71 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from socket import *
from workspace_tools.settings import *
def get_server_connection():
sock = socket(AF_INET, SOCK_STREAM)
sock.connect((SERVER_ADDRESS, SERVER_PORT))
return sock
def get_muts():
# Get the data structure about the Mbed Under Test provided by this server
sock = get_server_connection()
sock.send(json.dumps({"info":"muts"}))
muts = sock.recv(1024)
return json.loads(muts)
def request_test(mcu, image_path, test_id, duration=10):
"""
The test_id needs to be provided only for an automated test the server will
load the correspondent Checker class and will return a json report at the
end of the test
"""
test_spec = {
"mcu": mcu,
"image": image_path,
"duration": duration,
"test_id": test_id,
}
sock = get_server_connection()
sock.send(json.dumps(test_spec))
report_buffer = []
try:
while True:
# Here we do not worry about efficiency the most important feature
# for the developer in front of the terminal is responsiveness
c = sock.recv(1)
if not c: break
report_buffer.append(c)
except KeyboardInterrupt, _:
print "\n[CTRL+c] closing connection"
sock.send("!")
print '\nTest completed'
sock.close()
try:
return json.loads(''.join(report_buffer))
except Exception, _:
return {"result": "error"}

View File

@ -1,6 +0,0 @@
{
"targets": {
"LPC1768": ["ARM", "GCC_ARM"],
"LPC11U24": ["uARM"]
}
}

View File

@ -77,7 +77,7 @@ if __name__ == '__main__':
for toolchain, target in [
('uvision', 'LPC1768'), ('uvision', 'LPC11U24'), ('uvision', 'KL25Z'), ('uvision', 'LPC1347'), ('uvision', 'LPC1114'), ('uvision', 'LPC4088'),
('uvision', 'NUCLEO_F103RB'), ('uvision', 'NUCLEO_L152RE'), ('uvision', 'NUCLEO_F401RE'), ('uvision', 'NUCLEO_F030R8'), ('uvision', 'LPC11U35_501'),
('uvision', 'NUCLEO_F103RB'), ('uvision', 'NUCLEO_L152RE'), ('uvision', 'NUCLEO_F401RE'), ('uvision', 'NUCLEO_F030R8'),
('codered', 'LPC1768'), ('codered', 'LPC4088'),('codered', 'LPC1114'),
('codered', 'LPC11U35_401'),

View File

@ -1,196 +0,0 @@
"""
mbed SDK
Copyright (c) 2011-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
from os.path import join, abspath, dirname, exists
from shutil import copy
from time import sleep, time
from SocketServer import BaseRequestHandler, TCPServer
from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue, Empty
import json
# Be sure that the tools directory is in the search path
ROOT = abspath(join(dirname(__file__), ".."))
sys.path.insert(0, ROOT)
from workspace_tools.utils import delete_dir_files
from workspace_tools.settings import *
from workspace_tools.tests import *
from workspace_tools.targets import TARGET_MAP
class ProcessObserver(Thread):
def __init__(self, proc):
Thread.__init__(self)
self.proc = proc
self.queue = Queue()
self.daemon = True
self.active = True
self.start()
def run(self):
while self.active:
c = self.proc.stdout.read(1)
self.queue.put(c)
def stop(self):
self.active = False
try:
self.proc.terminate()
except Exception, _:
pass
def run_host_test(client, name, disk, port, duration, extra_serial):
print "{%s}" % name,
cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration), "-e", extra_serial]
proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS)
obs = ProcessObserver(proc)
start = time()
line = ''
output = []
while (time() - start) < duration:
# Give the client a way to interrupt the test
try:
c = client.recv(1)
if c == '!':
break
except Exception, _:
pass
try:
c = obs.queue.get(block=True, timeout=1)
except Empty, _:
c = None
if c:
output.append(c)
# Give the mbed under test a way to communicate the end of the test
if c in ['\n', '\r']:
if '{end}' in line: break
line = ''
else:
line += c
# Stop test process
obs.stop()
# Parse Output
result = None
for line in ''.join(output).splitlines():
if '{success}' in line: result = "success"
if '{failure}' in line: result = "failure"
if '{error}' in line: result = "error"
if '{end}' in line: break
if result is None:
result = "{error}"
return result
class Tester(BaseRequestHandler):
def send_result(self, result):
report_string = json.dumps({'result': result})
self.request.send(report_string)
print "Result: %s" % report_string
def handle(self):
print "\nRequest",
test_spec = self.request.recv(1024)
print test_spec,
data = json.loads(test_spec)
# The MUTs list can be different from server to server, so we have to
# provide a way for the client to query the capabilities of this server
if 'info' in data:
if data['info'] == 'muts':
muts_info = json.dumps(MUTs)
print muts_info
self.request.send(muts_info)
return
test_id = data['test_id']
test = TEST_MAP[test_id]
image = data["image"]
duration = data.get("duration", 10)
# Find a suitable MUT:
mut = None
for id, m in MUTs.iteritems():
if m['mcu'] == data['mcu']:
mut = m
break
if mut is None:
print "No mbed available: %s" % data['mcu']
self.send_result("{error}")
return
disk = mut['disk']
port = mut['port']
extra_serial = mut.get('extra_serial', "")
target = TARGET_MAP[mut['mcu']]
# Program
# When the build and test system were separate, this was relative to a
# base network folder base path: join(NETWORK_BASE_PATH, )
image_path = image
if not exists(image_path):
print "Image file does not exist: %s" % image_path
self.send_result("{error}")
return
if not target.is_disk_virtual:
delete_dir_files(disk)
copy(image_path, disk)
# Copy Extra Files
if not target.is_disk_virtual and test.extra_files:
for f in test.extra_files:
copy(f, disk)
sleep(target.program_cycle_s())
# Host test
self.request.setblocking(0)
result = run_host_test(self.request, test.host_test, disk, port, duration, extra_serial)
self.send_result(result)
def run_test_server():
while True:
try:
server = TCPServer((SERVER_ADDRESS, SERVER_PORT), Tester)
break
except Exception, e:
print e
sleep(1)
print "[Test server]"
try:
server.serve_forever()
except KeyboardInterrupt, e:
print "\n[CTRL+c] shutdown server"
server.shutdown()
server.server_close()
if __name__ == '__main__':
run_test_server()