import sys from os.path import join, abspath, dirname, exists from shutil import copy from time import sleep, time from SocketServer import BaseRequestHandler, TCPServer from subprocess import Popen, PIPE from threading import Thread from Queue import Queue, Empty import json # Be sure that the tools directory is in the search path ROOT = abspath(join(dirname(__file__), "..")) sys.path.append(ROOT) from workspace_tools.utils import delete_dir_files from workspace_tools.settings import * from workspace_tools.tests import * class ProcessObserver(Thread): def __init__(self, proc): Thread.__init__(self) self.proc = proc self.queue = Queue() self.daemon = True self.active = True self.start() def run(self): while self.active: c = self.proc.stdout.read(1) self.queue.put(c) def stop(self): self.active = False try: self.proc.terminate() except Exception, _: pass def run_host_test(client, name, disk, port, duration): print "{%s}" % name, cmd = ["python", "%s.py" % name, '-p', port, '-d', disk, '-t', str(duration)] proc = Popen(cmd, stdout=PIPE, cwd=HOST_TESTS) obs = ProcessObserver(proc) start = time() line = '' output = [] while (time() - start) < duration: # Give the client a way to interrupt the test try: c = client.recv(1) if c == '!': break except Exception, _: pass try: c = obs.queue.get(block=True, timeout=1) except Empty, _: c = None if c: output.append(c) # Give the mbed under test a way to communicate the end of the test if c in ['\n', '\r']: if '{end}' in line: break line = '' else: line += c # Stop test process obs.stop() # Parse Output result = None for line in ''.join(output).splitlines(): if '{success}' in line: result = "success" if '{failure}' in line: result = "failure" if '{error}' in line: result = "error" if '{end}' in line: break if result is None: result = "{error}" return result class Tester(BaseRequestHandler): def send_result(self, result): report_string = json.dumps({'result': result}) self.request.send(report_string) print "Result: %s" % report_string def handle(self): print "\nRequest", test_spec = self.request.recv(1024) print test_spec, data = json.loads(test_spec) # The MUTs list can be different from server to server, so we have to # provide a way for the client to query the capabilities of this server if 'info' in data: if data['info'] == 'muts': muts_info = json.dumps(MUTs) print muts_info self.request.send(muts_info) return test_id = data['test_id'] test = TEST_MAP[test_id] image = data["image"] duration = data.get("duration", 10) # Find a suitable MUT: mut = None for id, m in MUTs.iteritems(): if m['mcu'] == data['mcu']: mut = m break if mut is None: print "No mbed available: %s" % data['mcu'] self.send_result("{error}") return disk = mut['disk'] port = mut['port'] # Program # When the build and test system were separate, this was relative to a # base network folder base path: join(NETWORK_BASE_PATH, ) image_path = image if not exists(image_path): print "Image file does not exist: %s" % image_path self.send_result("{error}") return delete_dir_files(disk) copy(image_path, disk) # Copy Extra Files if test.extra_files: for f in test.extra_files: copy(f, disk) sleep(1) # Host test self.request.setblocking(0) result = run_host_test(self.request, test.host_test, disk, port, duration) self.send_result(result) def run_test_server(): while True: try: server = TCPServer((SERVER_ADDRESS, SERVER_PORT), Tester) break except Exception, e: print e sleep(1) print "[Test server]" try: server.serve_forever() except KeyboardInterrupt, e: print "\n[CTRL+c] shutdown server" server.shutdown() server.server_close() if __name__ == '__main__': run_test_server()