mbed-os/workspace_tools/debugger/pyOCD/gdbserver/gdbserver.py

594 lines
18 KiB
Python

"""
mbed CMSIS-DAP debugger
Copyright (c) 2006-2013 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging, threading, socket
from pyOCD.target.cortex_m import CORE_REGISTER
from pyOCD.target.target import TARGET_HALTED
from struct import unpack
from time import sleep
import sys
from gdb_socket import GDBSocket
from gdb_websocket import GDBWebSocket
"""
import os
parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0,parentdir)
"""
SIGINT = (2)
SIGSEGV = (11)
SIGILL = (4)
SIGSTOP = (17)
SIGTRAP = (5)
SIGBUS = (10)
FAULT = {0: "17", #SIGSTOP
1: "17",
2: "02", #SIGINT
3: "11", #SIGSEGV
4: "11",
5: "10", #SIGBUS
6: "04", #SIGILL
7: "17",
8: "17",
9: "17",
10: "17",
11: "17",
12: "17",
13: "17",
14: "17",
15: "17",
}
class GDBServer(threading.Thread):
"""
This class start a GDB server listening a gdb connection on a specific port.
It implements the RSP (Remote Serial Protocol).
"""
def __init__(self, board, port_urlWSS):
threading.Thread.__init__(self)
self.board = board
self.target = board.target
self.flash = board.flash
self.abstract_socket = None
self.wss_server = None
self.port = 0
if isinstance(port_urlWSS, str) == True:
self.wss_server = port_urlWSS
else:
self.port = port_urlWSS
self.packet_size = 2048
self.flashData = ""
self.conn = None
self.lock = threading.Lock()
self.shutdown_event = threading.Event()
self.detach_event = threading.Event()
self.quit = False
if self.wss_server == None:
self.abstract_socket = GDBSocket(self.port, self.packet_size)
else:
self.abstract_socket = GDBWebSocket(self.wss_server)
self.start()
def restart(self):
if self.isAlive():
self.detach_event.set()
def stop(self):
if self.isAlive():
self.shutdown_event.set()
while self.isAlive():
pass
logging.info("GDB server thread killed")
self.board.uninit()
def setBoard(self, board, stop = True):
self.lock.acquire()
if stop:
self.restart()
self.board = board
self.target = board.target
self.flash = board.flash
self.lock.release()
return
def run(self):
while True:
new_command = False
data = []
logging.info('GDB server started')
self.shutdown_event.clear()
self.detach_event.clear()
while not self.shutdown_event.isSet() and not self.detach_event.isSet():
connected = self.abstract_socket.connect()
if connected != None:
break
if self.shutdown_event.isSet():
return
if self.detach_event.isSet():
continue
logging.info("One client connected!")
while True:
if self.shutdown_event.isSet():
return
if self.detach_event.isSet():
continue
# read command
while True:
if (new_command == True):
new_command = False
break
try:
if self.shutdown_event.isSet() or self.detach_event.isSet():
break
self.abstract_socket.setBlocking(0)
data = self.abstract_socket.read()
if data.index("$") >= 0 and data.index("#") >= 0:
break
except (ValueError, socket.error):
pass
if self.shutdown_event.isSet():
return
if self.detach_event.isSet():
continue
self.abstract_socket.setBlocking(1)
data = data[data.index("$"):]
self.lock.acquire()
if len(data) != 0:
# decode and prepare resp
[resp, ack, detach] = self.handleMsg(data)
if resp is not None:
# ack
if ack:
resp = "+" + resp
# send resp
self.abstract_socket.write(resp)
# wait a '+' from the client
try:
data = self.abstract_socket.read()
if data[0] != '+':
logging.debug('gdb client has not ack!')
else:
logging.debug('gdb client has ack!')
if data.index("$") >= 0 and data.index("#") >= 0:
new_command = True
except:
pass
if detach:
self.abstract_socket.close()
self.lock.release()
break
self.lock.release()
def handleMsg(self, msg):
if msg[0] != '$':
logging.debug('msg ignored: first char != $')
return None, 0, 0
#logging.debug('-->>>>>>>>>>>> GDB rsp packet: %s', msg)
# query command
if msg[1] == 'q':
return self.handleQuery(msg[2:]), 1, 0
elif msg[1] == 'H':
return self.createRSPPacket(''), 1, 0
elif msg[1] == '?':
return self.lastSignal(), 1, 0
elif msg[1] == 'g':
return self.getRegister(), 1, 0
elif msg[1] == 'p':
return self.readRegister(msg[2:]), 1, 0
elif msg[1] == 'P':
return self.writeRegister(msg[2:]), 1, 0
elif msg[1] == 'm':
return self.getMemory(msg[2:]), 1, 0
elif msg[1] == 'X':
return self.writeMemory(msg[2:]), 1, 0
elif msg[1] == 'v':
return self.flashOp(msg[2:]), 1, 0
# we don't send immediately the response for C and S commands
elif msg[1] == 'C' or msg[1] == 'c':
return self.resume()
elif msg[1] == 'S' or msg[1] == 's':
return self.step()
elif msg[1] == 'Z' or msg[1] == 'z':
return self.breakpoint(msg[1:]), 1, 0
elif msg[1] == 'D':
return self.detach(msg[1:]), 1, 1
elif msg[1] == 'k':
return self.kill(), 1, 1
else:
logging.error("Unknown RSP packet: %s", msg)
return None
def detach(self, data):
resp = "OK"
return self.createRSPPacket(resp)
def kill(self):
return self.createRSPPacket("")
def breakpoint(self, data):
# handle Z1/z1 commands
addr = int(data.split(',')[1], 16)
if data[1] == '1':
if data[0] == 'Z':
if self.target.setBreakpoint(addr) == False:
resp = "ENN"
return self.createRSPPacket(resp)
else:
self.target.removeBreakpoint(addr)
resp = "OK"
return self.createRSPPacket(resp)
return None
def resume(self):
self.ack()
self.target.resume()
self.abstract_socket.setBlocking(0)
val = ''
while True:
sleep(0.01)
try:
data = self.abstract_socket.read()
if (data[0] == '\x03'):
self.target.halt()
val = 'S05'
logging.debug("receive CTRL-C")
break
except:
pass
if self.target.getState() == TARGET_HALTED:
logging.debug("state halted")
val = 'S05'
break
self.target.halt()
ipsr = self.target.readCoreRegister('xpsr')
logging.debug("GDB resume xpsr: 0x%X", ipsr)
if (ipsr & 0x1f) == 3:
val = "S" + FAULT[3]
break
self.target.resume()
self.abstract_socket.setBlocking(1)
return self.createRSPPacket(val), 0, 0
def step(self):
self.ack()
self.target.step()
return self.createRSPPacket("S05"), 0, 0
def halt(self):
self.ack()
self.target.halt()
return self.createRSPPacket("S05"), 0, 0
def flashOp(self, data):
ops = data.split(':')[0]
#logging.debug("flash op: %s", ops)
if ops == 'FlashErase':
self.flash.init()
self.flash.eraseAll()
return self.createRSPPacket("OK")
elif ops == 'FlashWrite':
logging.debug("flash write addr: 0x%s", data.split(':')[1])
# search for second ':' (beginning of data encoded in the message)
second_colon = 0
idx_begin = 0
while second_colon != 2:
if data[idx_begin] == ':':
second_colon += 1
idx_begin += 1
self.flashData += data[idx_begin:len(data) - 3]
return self.createRSPPacket("OK")
# we need to flash everything
elif 'FlashDone' in ops :
flashPtr = 0
unescaped_data = self.unescape(self.flashData)
bytes_to_be_written = len(unescaped_data)
"""
bin = open(os.path.join(parentdir, 'res', 'bad_bin.txt'), "w+")
i = 0
while (i < bytes_to_be_written):
bin.write(str(unescaped_data[i:i+16]) + "\n")
i += 16
"""
logging.info("flashing %d bytes", bytes_to_be_written)
while len(unescaped_data) > 0:
size_to_write = min(self.flash.page_size, len(unescaped_data))
self.flash.programPage(flashPtr, unescaped_data[:size_to_write])
flashPtr += size_to_write
unescaped_data = unescaped_data[size_to_write:]
# print progress bar
sys.stdout.write('\r')
i = int((float(flashPtr)/float(bytes_to_be_written))*20.0)
# the exact output you're looking for:
sys.stdout.write("[%-20s] %d%%" % ('='*i, 5*i))
sys.stdout.flush()
sys.stdout.write("\n\r")
self.flashData = ""
"""
bin.close()
"""
# reset and stop on reset handler
self.target.resetStopOnReset()
return self.createRSPPacket("OK")
elif 'Cont' in ops:
if 'Cont?' in ops:
return self.createRSPPacket("vCont;c;s;t")
return None
def unescape(self, data):
data_idx = 0
# unpack the data into binary array
str_unpack = str(len(data)) + 'B'
data = unpack(str_unpack, data)
data = list(data)
# check for escaped characters
while data_idx < len(data):
if data[data_idx] == 0x7d:
data.pop(data_idx)
data[data_idx] = data[data_idx] ^ 0x20
data_idx += 1
return data
def getMemory(self, data):
split = data.split(',')
addr = int(split[0], 16)
length = split[1]
length = int(length[:len(length)-3],16)
val = ''
mem = self.target.readBlockMemoryUnaligned8(addr, length)
for x in mem:
if x >= 0x10:
val += hex(x)[2:4]
else:
val += '0' + hex(x)[2:3]
return self.createRSPPacket(val)
def writeMemory(self, data):
split = data.split(',')
addr = int(split[0], 16)
length = int(split[1].split(':')[0], 16)
idx_begin = 0
for i in range(len(data)):
if data[i] == ':':
idx_begin += 1
break
idx_begin += 1
data = data[idx_begin:len(data) - 3]
data = self.unescape(data)
if length > 0:
self.target.writeBlockMemoryUnaligned8(addr, data)
return self.createRSPPacket("OK")
def readRegister(self, data):
num = int(data.split('#')[0], 16)
reg = self.target.readCoreRegister(num)
logging.debug("GDB: read reg %d: 0x%X", num, reg)
val = self.intToHexGDB(reg)
return self.createRSPPacket(val)
def writeRegister(self, data):
num = int(data.split('=')[0], 16)
val = data.split('=')[1].split('#')[0]
val = val[6:8] + val[4:6] + val[2:4] + val[0:2]
logging.debug("GDB: write reg %d: 0x%X", num, int(val, 16))
self.target.writeCoreRegister(num, int(val, 16))
return self.createRSPPacket("OK")
def intToHexGDB(self, val):
val = hex(int(val))[2:]
size = len(val)
r = ''
for i in range(8-size):
r += '0'
r += str(val)
resp = ''
for i in range(4):
resp += r[8 - 2*i - 2: 8 - 2*i]
return resp
def getRegister(self):
resp = ''
for i in range(len(CORE_REGISTER)):
reg = self.target.readCoreRegister(i)
resp += self.intToHexGDB(reg)
logging.debug("GDB reg: %s = 0x%X", i, reg)
return self.createRSPPacket(resp)
def lastSignal(self):
fault = self.target.readCoreRegister('xpsr') & 0xff
fault = FAULT[fault]
logging.debug("GDB lastSignal: %s", fault)
return self.createRSPPacket('S' + fault)
def handleQuery(self, msg):
query = msg.split(':')
logging.debug('GDB received query: %s', query)
if query is None:
logging.error('GDB received query packet malformed')
return None
if query[0] == 'Supported':
resp = "qXfer:memory-map:read+;qXfer:features:read+;PacketSize="
resp += hex(self.packet_size)[2:]
return self.createRSPPacket(resp)
elif query[0] == 'Xfer':
if query[1] == 'features' and query[2] == 'read' and \
query[3] == 'target.xml':
data = query[4].split(',')
resp = self.handleQueryXML('read_feature', int(data[0], 16), int(data[1].split('#')[0], 16))
return self.createRSPPacket(resp)
elif query[1] == 'memory-map' and query[2] == 'read':
data = query[4].split(',')
resp = self.handleQueryXML('momery_map', int(data[0], 16), int(data[1].split('#')[0], 16))
return self.createRSPPacket(resp)
else:
return None
elif query[0] == 'C#b4':
return self.createRSPPacket("")
elif query[0].find('Attached') != -1:
return self.createRSPPacket("1")
elif query[0].find('TStatus') != -1:
return self.createRSPPacket("")
elif query[0].find('Tf') != -1:
return self.createRSPPacket("")
elif 'Offsets' in query[0]:
resp = "Text=0;Data=0;Bss=0"
return self.createRSPPacket(resp)
elif 'Symbol' in query[0]:
resp = "OK"
return self.createRSPPacket(resp)
else:
return None
def handleQueryXML(self, query, offset, size):
logging.debug('GDB query %s: offset: %s, size: %s', query, offset, size)
xml = ''
if query == 'momery_map':
xml = self.flash.memoryMapXML
elif query == 'read_feature':
xml = self.target.targetXML
size_xml = len(xml)
prefix = 'm'
if offset > size_xml:
logging.error('GDB: offset target.xml > size!')
return
if size > (self.packet_size - 4):
size = self.packet_size - 4
nbBytesAvailable = size_xml - offset
if size > nbBytesAvailable:
prefix = 'l'
size = nbBytesAvailable
resp = prefix + xml[offset:offset + size]
return resp
def createRSPPacket(self, data):
resp = '$' + data + '#'
c = 0
checksum = 0
for c in data:
checksum += ord(c)
checksum = checksum % 256
checksum = hex(checksum)
if int(checksum[2:], 16) < 0x10:
resp += '0'
resp += checksum[2:]
#logging.debug('--<<<<<<<<<<<< GDB rsp packet: %s', resp)
return resp
def ack(self):
self.abstract_socket.write("+")