From 7bfffdbbfe7b6ca42575c176083a2c9466692e6e Mon Sep 17 00:00:00 2001 From: Michael Buesch Date: Thu, 26 Apr 2012 19:06:16 +0200 Subject: Move hardware access routines to programmer implementation Signed-off-by: Michael Buesch --- libtoprammer/command_queue.py | 69 +++++++ libtoprammer/hardware_access_usb.py | 133 ++++++++++++ libtoprammer/main.py | 353 +++++++------------------------- libtoprammer/top2049/hardware_access.py | 183 +++++++++++++++++ libtoprammer/top_devices.py | 1 + toprammer | 13 +- toprammer-gui | 24 +-- 7 files changed, 477 insertions(+), 299 deletions(-) create mode 100644 libtoprammer/command_queue.py create mode 100644 libtoprammer/hardware_access_usb.py create mode 100644 libtoprammer/top2049/hardware_access.py diff --git a/libtoprammer/command_queue.py b/libtoprammer/command_queue.py new file mode 100644 index 0000000..233b8ac --- /dev/null +++ b/libtoprammer/command_queue.py @@ -0,0 +1,69 @@ +""" +# TOP2049 Open Source programming suite +# +# Generic command queue. +# +# Copyright (c) 2012 Michael Buesch +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" + +from util import * +import time + + +class CommandQueue(object): + "Generic hardware-command queue. Needs to be subclassed." + + def __init__(self, maxPacketBytes, synchronous=False): + self.maxPacketBytes = maxPacketBytes + self.synchronous = synchronous + self.commandQueue = [] + + def queueCommand(self, command): + """Queue a raw command for transmission.""" + assert(len(command) <= self.maxPacketBytes) + if self.synchronous: + self.send(command) + else: + self.commandQueue.append(command) + + def runCommandSync(self, command): + """Run a command synchronously. + This is slow. Don't use it without a very good reason.""" + self.flushCommands() + self.queueCommand(command) + self.flushCommands() + + def flushCommands(self, sleepSeconds=0): + """Flush the command queue.""" + command = b"" + for oneCommand in self.commandQueue: + assert(len(oneCommand) <= self.maxPacketBytes) + if len(command) + len(oneCommand) > self.maxPacketBytes: + self.send(command) + command = b"" + command += oneCommand + if command: + self.send(command) + self.commandQueue = [] + if sleepSeconds: + time.sleep(sleepSeconds) + + def send(self, data): + raise NotImplementedError # Reimplement in subclass. + + def receive(self, size): + raise NotImplementedError # Reimplement in subclass. diff --git a/libtoprammer/hardware_access_usb.py b/libtoprammer/hardware_access_usb.py new file mode 100644 index 0000000..682dee9 --- /dev/null +++ b/libtoprammer/hardware_access_usb.py @@ -0,0 +1,133 @@ +""" +# TOP2049 Open Source programming suite +# +# Lowlevel USB hardware access. +# +# Copyright (c) 2012 Michael Buesch +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" + +from util import * +from command_queue import * +try: + import usb +except (ImportError), e: + print "Python USB support module not found. Please install python-usb." + sys.exit(1) + + +class FoundUSBDev(object): + def __init__(self, usbdev, busNr, devNr): + self.usbdev = usbdev + self.busNr = busNr + self.devNr = devNr + +class HardwareAccessUSB(CommandQueue): + "Lowlevel USB hardware access" + + @classmethod + def scan(cls, checkCallback): + "Scan for devices. Returns a list of FoundUSBDev()." + devices = [] + for bus in usb.busses(): + for dev in bus.devices: + if not checkCallback(dev): + continue + try: + busNr = int(bus.dirname, 10) + devNr = int(dev.filename, 10) + except (ValueError), e: + continue + devices.append(FoundUSBDev(dev, busNr, devNr)) + return devices + + def __init__(self, usbdev, maxPacketBytes, noQueue, + doRawDump=False): + CommandQueue.__init__(self, + maxPacketBytes = maxPacketBytes, + synchronous = noQueue) + self.doRawDump = doRawDump + self.usbdev = usbdev + self.usbh = None + + self.__initUSB() + + def __initUSB(self): + try: + self.usbh = self.usbdev.open() + config = self.usbdev.configurations[0] + interface = config.interfaces[0][0] + + # Find the endpoints + self.bulkOut = None + self.bulkIn = None + for ep in interface.endpoints: + if not self.bulkIn and \ + ep.type == usb.ENDPOINT_TYPE_BULK and \ + (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN: + self.bulkIn = ep + if not self.bulkOut and \ + ep.type == usb.ENDPOINT_TYPE_BULK and \ + (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT: + self.bulkOut = ep + if not self.bulkIn or not self.bulkOut: + raise TOPException("Did not find all USB EPs") + + self.usbh.setConfiguration(config) + self.usbh.claimInterface(interface) + self.usbh.setAltInterface(interface) + self.usbh.clearHalt(self.bulkOut.address) + self.usbh.clearHalt(self.bulkIn.address) + except (usb.USBError), e: + self.usbh = None + raise TOPException("USB error: " + str(e)) + + def shutdown(self): + "Shutdown the USB connection" + try: + if self.usbh: + self.usbh.releaseInterface() + self.usbh = None + except (usb.USBError), e: + raise TOPException("USB error: " + str(e)) + + def send(self, data): + try: + assert(len(data) <= self.maxPacketBytes) + if self.doRawDump: + print("Sending command:") + dumpMem(data) + self.usbh.bulkWrite(self.bulkOut.address, data) + except (usb.USBError), e: + raise TOPException("USB bulk write error: " + str(e)) + + def receive(self, size): + """Receive 'size' bytes on the bulk-in ep.""" + # If there are blocked commands in the queue, send them now. + self.flushCommands() + try: + ep = self.bulkIn.address + data = b"".join(map(lambda b: int2byte(b), + self.usbh.bulkRead(ep, size))) + if len(data) != size: + raise TOPException("USB bulk read error: Could not read the " +\ + "requested number of bytes (req %d, got %d)" % (size, len(data))) + if self.doRawDump: + print("Received data:") + dumpMem(data) + except (usb.USBError), e: + raise TOPException("USB bulk read error: " + str(e)) + return data diff --git a/libtoprammer/main.py b/libtoprammer/main.py index c2970c7..3d8fdfe 100644 --- a/libtoprammer/main.py +++ b/libtoprammer/main.py @@ -37,64 +37,51 @@ from util import * import time import re -try: - import usb -except (ImportError), e: - print "Python USB support module not found. Please install python-usb." - sys.exit(1) +from hardware_access_usb import * from top_devices import * from chips import * from user_interface import * -class TOP: +class FoundDev(object): + def __init__(self, toptype, devIdentifier, busdata=None): + self.toptype = toptype + self.devIdentifier = devIdentifier + self.busdata = busdata + +class TOP(object): # Supported programmer types TYPE_TOP2049 = "TOP2049" - def __init__(self, busDev=None, verbose=0, + def __init__(self, devIdentifier=None, verbose=0, forceLevel=0, noqueue=False, usebroken=False, forceBitfileUpload=False, userInterface=ConsoleUserInterface()): - """busDev is a tuple (BUSID, DEVID) or None.""" self.verbose = verbose self.forceLevel = forceLevel self.forceBitfileUpload = forceBitfileUpload - self.noqueue = noqueue self.usebroken = usebroken self.userInterface = userInterface + self.hw = None self.chip = None - self.commandQueue = [] # Find the device - for bus in usb.busses(): - if busDev and bus.dirname != "%03d" % busDev[0]: - continue - for dev in bus.devices: - if busDev and dev.filename != "%03d" % busDev[1]: - continue - if self.__usbdev2toptype(dev): - break - if busDev: - raise TOPException( - "Device %03d.%03d is not a TOP device" %\ - (busDev[0], busDev[1])) - else: - continue - break - else: + devices = self.findDevices() + if devIdentifier: + devices = filter(lambda d: d.devIdentifier == devIdentifier, + devices) + if not devices: raise TOPException("TOP programmer device not found!") - self.usbbus = bus - self.usbdev = dev - self.usbh = None + foundDev = devices[0] # Select first - if self.noqueue: + if noqueue: self.printWarning("WARNING: Command queuing disabled. " +\ "Hardware access will be _really_ slow.") - self.initializeProgrammer() + self.initializeProgrammer(foundDev, noqueue) def getProgrammerType(self): "Returns the TYPE_TOPxxxx" @@ -171,8 +158,8 @@ class TOP: self.flushCommands() self.userInterface.debugMessage(message) - @staticmethod - def __usbdev2toptype(usbdev): + @classmethod + def __usbdev2toptype(cls, usbdev): "Returns the TOP type of the USB device. None, if this is not a TOP device." try: toptype = { @@ -182,78 +169,44 @@ class TOP: return None return toptype - @staticmethod - def findDevices(): - """Rescan the USB busses and return a list of tuples (busNr, devNr) - for the found device.""" - devices = [] - for bus in usb.busses(): - for dev in bus.devices: - toptype = TOP.__usbdev2toptype(dev) - if not toptype: - continue - try: - busNr = int(bus.dirname) - devNr = int(dev.filename) - except (ValueError), e: - pass - devices.append( (toptype, busNr, devNr) ) + @classmethod + def findDevices(cls): + """Rescan all busses for TOP devices. + Returns a list of FoundDev()""" + usbFound = HardwareAccessUSB.scan(cls.__usbdev2toptype) + devices = [ FoundDev(cls.__usbdev2toptype(d.usbdev), + "usb:%03d:%03d" % (d.busNr, d.devNr), + d) + for d in usbFound ] return devices - def __initializeUSB(self): - # Set up the USB interface - self.__shutdownUSB() - try: - self.usbh = self.usbdev.open() - config = self.usbdev.configurations[0] - interface = config.interfaces[0][0] - - # Find the endpoints - self.bulkOut = None - self.bulkIn = None - for ep in interface.endpoints: - if not self.bulkIn and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN: - self.bulkIn = ep - if not self.bulkOut and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT: - self.bulkOut = ep - if not self.bulkIn or not self.bulkOut: - raise TOPException("Did not find all USB EPs") - - self.usbh.setConfiguration(config) - self.usbh.claimInterface(interface) - self.usbh.setAltInterface(interface) - self.usbh.clearHalt(self.bulkOut.address) - self.usbh.clearHalt(self.bulkIn.address) - except (usb.USBError), e: - self.usbh = None - raise TOPException("USB error: " + str(e)) - - def __shutdownUSB(self): - try: - if self.usbh: - self.usbh.releaseInterface() - self.usbh = None - except (usb.USBError), e: - raise TOPException("USB error: " + str(e)) - - def initializeProgrammer(self): + def initializeProgrammer(self, foundDev, noQueue): "Initialize the hardware" - self.__initializeUSB() + self.shutdownProgrammer() + + if foundDev.toptype == self.TYPE_TOP2049: + self.hw = top2049.hardware_access.HardwareAccess( + foundUSBDev = foundDev.busdata, + noQueue = noQueue, + doRawDump = (self.verbose >= 3)) + self.vcc = top2049.vcc_layouts.VCCLayout(self) + self.vpp = top2049.vpp_layouts.VPPLayout(self) + self.gnd = top2049.gnd_layouts.GNDLayout(self) + else: + assert(0) + self.topType = foundDev.toptype versionRegex = ( (r"top2049\s+ver\s*(\d+\.\d+)", self.TYPE_TOP2049), ) - versionString = self.cmdRequestVersion() - for (regex, topType) in versionRegex: + versionString = self.hw.readVersionString() + for (regex, t) in versionRegex: + if t != self.topType: + continue m = re.match(regex, versionString, re.IGNORECASE) if m: - self.topType = topType self.topVersion = m.group(1) break else: @@ -261,49 +214,15 @@ class TOP: "' is not supported by Toprammer, yet") self.printInfo("Initializing the " + self.topType + " version " + self.topVersion) - # Initialize the programmer specific layouts - if self.topType == self.TYPE_TOP2049: - self.vcc = top2049.vcc_layouts.VCCLayout(self) - self.vpp = top2049.vpp_layouts.VPPLayout(self) - self.gnd = top2049.gnd_layouts.GNDLayout(self) - else: - assert(0) - - self.queueCommand(b"\x0D") - stat = self.cmdReadBufferReg32() - if stat != 0x00020C69: - self.printWarning("Init: Unexpected status (a): 0x%08X" % stat) - - self.cmdSetVPPVoltage(0) - self.cmdSetVPPVoltage(0) - self.queueCommand(b"\x0E\x20\x00\x00") - self.cmdDelay(0.01) - self.cmdSetVCCVoltage(0) - - self.cmdLoadGNDLayout(0) - self.cmdLoadVPPLayout(0) - self.cmdLoadVCCLayout(0) - - self.queueCommand(b"\x0E\x20\x00\x00") - self.cmdDelay(0.01) - self.queueCommand(b"\x0E\x25\x00\x00") - stat = self.cmdReadBufferReg32() - if stat != 0x0000686C: - self.printWarning("Init: Unexpected status (b): 0x%08X" % stat) - self.cmdEnableZifPullups(False) - self.flushCommands() + self.hw.hardwareInit() def shutdownProgrammer(self): - self.__shutdownUSB() + if self.hw: + self.hw.shutdown() + self.hw = None self.topType = None self.topVersion = None - def getProgrammerType(self): - return self.topType - - def getProgrammerVersion(self): - return self.topVersion - def __readBitfileID(self): self.cmdFPGARead(0xFD) self.cmdFPGARead(0xFE) @@ -331,17 +250,11 @@ class TOP: self.printDebug("Uploading bitfile %s..." % self.bitfile.getFilename()) - self.cmdFPGAInitiateConfig() - stat = self.cmdReadBufferReg8() - expected = 0x01 - if stat != expected: - raise TOPException("bit-upload: Failed to initiate " +\ - "config sequence (got 0x%02X, expected 0x%02X)" %\ - (stat, expected)) - + self.hw.FPGAInitiateConfig() data = self.bitfile.getPayload() - for i in range(0, len(data), 60): - self.cmdFPGAUploadConfig(data[i : i + 60]) + chunksz = self.hw.FPGAMaxConfigChunkSize() + for i in range(0, len(data), chunksz): + self.hw.FPGAUploadConfig(data[i : i + chunksz]) self.flushCommands() if requiredID and requiredRevision: @@ -462,42 +375,23 @@ class TOP: self.flushCommands() self.printDebug("Done writing the image.") - def __cmdDelay_4usec(self): - self.queueCommand(int2byte(0x00)) - - def __cmdDelay_10msec(self): - self.queueCommand(int2byte(0x1B)) - def cmdDelay(self, seconds): """Send a delay request to the device. Note that this causes the programmer to execute the delay. For a host-delay, use hostDelay()""" - assert(seconds < 0.5) - if seconds > 0.000255: - # Need to round up to ten milliseconds - millisecs = int(math.ceil(seconds * 1000)) - millisecs = roundup(millisecs, 10) - for i in range(0, millisecs // 10): - self.__cmdDelay_10msec() - else: - # Round up to 4 usec boundary - microsecs = int(math.ceil(seconds * 1000000)) - microsecs = roundup(microsecs, 4) - for i in range(0, microsecs // 4): - self.__cmdDelay_4usec() + self.hw.delay(seconds) def hostDelay(self, seconds): """Flush all commands and delay the host computer for 'seconds'""" - self.flushCommands() - time.sleep(seconds) + self.hw.flushCommands(seconds) def getOscillatorHz(self): """Returns the FPGA oscillator frequency, in Hz. The oscillator is connected to the FPGA clk pin.""" - return 24000000 + self.hw.getOscillatorHz() def getBufferRegSize(self): """Returns the size (in bytes) of the buffer register.""" - return 64 + return self.hw.getBufferRegSize() def cmdReadBufferReg(self, nrBytes=-1): """Read the buffer register. Returns nrBytes (default all bytes).""" @@ -506,9 +400,8 @@ class TOP: nrBytes = regSize assert(nrBytes <= regSize) if not nrBytes: - return "" - self.queueCommand(int2byte(0x07)) - return self.receive(regSize)[0:nrBytes] + return b"" + return self.hw.readBufferReg(nrBytes) def cmdReadBufferReg8(self): """Read a 8bit value from the buffer register.""" @@ -543,139 +436,37 @@ class TOP: (byte2int(stat[4]) << 32) | (byte2int(stat[5]) << 40) return stat - def cmdRequestVersion(self): - """Returns the device ID and versioning string.""" - self.queueCommand(b"\x0E\x11\x00\x00") - data = self.cmdReadBufferReg(16) - return data.strip() - - def cmdFPGAInitiateConfig(self): - """Initiate a configuration sequence on the FPGA.""" - self.queueCommand(b"\x0E\x21\x00\x00") - - def cmdFPGAUploadConfig(self, data): - """Upload configuration data into the FPGA.""" - assert(len(data) <= 60) - cmd = b"\x0E\x22\x00\x00" + data - cmd += b"\x00" * (64 - len(cmd)) # padding - self.queueCommand(cmd) - - def __makeFPGAAddr(self, address): - # Set the "address OK" bit - return address | 0x10 - def cmdFPGARead(self, address): """Read a byte from the FPGA at address into the buffer register.""" - address = self.__makeFPGAAddr(address) - if address == self.__makeFPGAAddr(0): # Fast tracked - self.queueCommand(int2byte(0x01)) - return - self.queueCommand(int2byte(0x0B) + int2byte(address)) + self.hw.FPGARead(address) def cmdFPGAWrite(self, address, byte): """Write a byte to an FPGA address.""" - address = self.__makeFPGAAddr(address) - if address == self.__makeFPGAAddr(0): # Fast tracked - self.queueCommand(int2byte(0x10) + int2byte(byte)) - return - self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte)) + return self.hw.FPGAWrite(address, byte) def cmdLoadGNDLayout(self, layout): - """Load the GND configuration into the H/L shiftregisters.""" - cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + """Load the GND configuration into the programmer.""" + self.hw.loadGNDLayout(layout) def cmdSetVPPVoltage(self, voltage): """Set the VPP voltage. voltage is a floating point voltage number.""" - centivolt = int(voltage * 10) - cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) + self.hw.setVPPVoltage(voltage) def cmdLoadVPPLayout(self, layout): - """Load the VPP configuration into the shift registers.""" - cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + """Load the VPP configuration into the programmer.""" + self.hw.loadVPPLayout(layout) def cmdSetVCCVoltage(self, voltage): """Set the VCC voltage. voltage is a floating point voltage number.""" - centivolt = int(voltage * 10) - cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) + self.hw.setVCCVoltage(voltage) def cmdLoadVCCLayout(self, layout): """Load the VCC configuration into the shift registers.""" - cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + self.hw.loadVCCLayout(layout) def cmdEnableZifPullups(self, enable): """Enable the ZIF socket signal pullups.""" - param = 0 - if enable: - param = 1 - cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0) - self.queueCommand(cmd) - - def __doSend(self, command): - try: - assert(len(command) <= 64) - if self.verbose >= 3: - print "Sending command:" - dumpMem(command) - ep = self.bulkOut.address - self.usbh.bulkWrite(ep, command) - except (usb.USBError), e: - raise TOPException("USB bulk write error: " + str(e)) - - def queueCommand(self, command): - """Queue a raw command for transmission.""" - assert(len(command) <= 64) - if self.noqueue: - self.__doSend(command) - else: - self.commandQueue.append(command) - - def runCommandSync(self, command): - """Run a command synchronously. - Warning: This is slow. Don't use it unless there's a very good reason.""" - self.flushCommands() - self.queueCommand(command) - self.flushCommands() - - def receive(self, size): - """Receive 'size' bytes on the bulk-in ep.""" - # If there are blocked commands in the queue, send them now. - self.flushCommands() - try: - ep = self.bulkIn.address - data = b"".join(map(lambda b: int2byte(b), - self.usbh.bulkRead(ep, size))) - if len(data) != size: - raise TOPException("USB bulk read error: Could not read the " +\ - "requested number of bytes (req %d, got %d)" % (size, len(data))) - if self.verbose >= 3: - print "Received data:" - dumpMem(data) - except (usb.USBError), e: - raise TOPException("USB bulk read error: " + str(e)) - return data + self.hw.enableZifPullups(enable) def flushCommands(self): - """Flush the command queue.""" - command = b"" - for oneCommand in self.commandQueue: - assert(len(oneCommand) <= 64) - if len(command) + len(oneCommand) > 64: - self.__doSend(command) - command = b"" - command += oneCommand - if command: - self.__doSend(command) - self.commandQueue = [] + self.hw.flushCommands() diff --git a/libtoprammer/top2049/hardware_access.py b/libtoprammer/top2049/hardware_access.py new file mode 100644 index 0000000..b43ee7c --- /dev/null +++ b/libtoprammer/top2049/hardware_access.py @@ -0,0 +1,183 @@ +""" +# TOP2049 Open Source programming suite +# +# TOP2049 Lowlevel hardware access +# +# Copyright (c) 2012 Michael Buesch +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +""" + +from libtoprammer.util import * +from libtoprammer.hardware_access_usb import * + + +class HardwareAccess(HardwareAccessUSB): + "TOP2049 hardware access" + + ADDR_OK_BIT = 4 + + def __init__(self, foundUSBDev, + noQueue=False, doRawDump=False): + HardwareAccessUSB.__init__(self, + usbdev = foundUSBDev.usbdev, + maxPacketBytes = 64, + noQueue = noQueue, + doRawDump = doRawDump) + + def getOscillatorHz(self): + return 24000000 + + def getBufferRegSize(self): + return 64 + + def readBufferReg(self, nrBytes): + self.queueCommand(int2byte(0x07)) + return self.receive(self.getBufferRegSize())[:nrBytes] + + def hardwareInit(self): + self.queueCommand(b"\x0D") + if self.readBufferReg(4) != b"\x69\x0C\x02\x00": + self.printWarning("Init: Unexpected status (a)") + + self.setVPPVoltage(0) + self.setVPPVoltage(0) + self.queueCommand(b"\x0E\x20\x00\x00") + self.delay(0.01) + self.setVCCVoltage(0) + + self.loadGNDLayout(0) + self.loadVPPLayout(0) + self.loadVCCLayout(0) + + self.queueCommand(b"\x0E\x20\x00\x00") + self.delay(0.01) + self.queueCommand(b"\x0E\x25\x00\x00") + if self.readBufferReg(4) != b"\x6C\x68\x00\x00": + self.printWarning("Init: Unexpected status (b)") + self.enableZifPullups(False) + self.flushCommands() + + def readVersionString(self): + """Returns the device ID and versioning string.""" + self.queueCommand(b"\x0E\x11\x00\x00") + data = self.readBufferReg(16) + return data.strip() + + def FPGAMaxConfigChunkSize(self): + """Maximum config chunk size.""" + return 60 + + def FPGAInitiateConfig(self): + """Initiate a configuration sequence on the FPGA.""" + self.queueCommand(b"\x0E\x21\x00\x00") + stat = byte2int(self.readBufferReg(1)) + expected = 0x01 + if stat != expected: + raise TOPException("bit-upload: Failed to initiate " +\ + "config sequence (got 0x%02X, expected 0x%02X)" %\ + (stat, expected)) + + def FPGAUploadConfig(self, data): + """Upload configuration data into the FPGA.""" + assert(len(data) <= self.FPGAMaxConfigChunkSize()) + cmd = b"\x0E\x22\x00\x00" + data + cmd += b"\x00" * (64 - len(cmd)) # padding + self.queueCommand(cmd) + + def makeFPGAAddr(self, address): + # Set the "address OK" bit + return address | (1 << self.ADDR_OK_BIT) + + def FPGARead(self, address): + address = self.makeFPGAAddr(address) + if address == self.makeFPGAAddr(0): # Fast tracked + self.queueCommand(int2byte(0x01)) + return + self.queueCommand(int2byte(0x0B) + int2byte(address)) + + def FPGAWrite(self, address, byte): + address = self.makeFPGAAddr(address) + if address == self.makeFPGAAddr(0): # Fast tracked + self.queueCommand(int2byte(0x10) + int2byte(byte)) + return + self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte)) + + def loadGNDLayout(self, layout): + # Load the GND configuration into the H/L shiftregisters. + cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0) + self.queueCommand(cmd) + self.delay(0.01) + self.flushCommands(0.15) + + def setVPPVoltage(self, voltage): + # Set the VPP voltage. voltage is a floating point voltage number. + centivolt = int(voltage * 10) + cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0) + self.queueCommand(cmd) + self.delay(0.01) + + def loadVPPLayout(self, layout): + # Load the VPP configuration into the shift registers. + cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0) + self.queueCommand(cmd) + self.delay(0.01) + self.flushCommands(0.15) + + def setVCCVoltage(self, voltage): + # Set the VCC voltage. + centivolt = int(voltage * 10) + cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0) + self.queueCommand(cmd) + self.delay(0.01) + + def loadVCCLayout(self, layout): + # Load the VCC configuration into the shift registers. + cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0) + self.queueCommand(cmd) + self.delay(0.01) + self.flushCommands(0.15) + + def enableZifPullups(self, enable): + # Enable the ZIF socket signal pullups. + param = 0 + if enable: + param = 1 + cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0) + self.queueCommand(cmd) + + def __delay_4usec(self): + self.queueCommand(int2byte(0x00)) + + def __delay_10msec(self): + self.queueCommand(int2byte(0x1B)) + + def delay(self, seconds): + if seconds >= 0.5: + # Perform long delays on the host + self.flushCommands(seconds) + return + if seconds > 0.000255: + # Need to round up to ten milliseconds + millisecs = int(math.ceil(seconds * 1000)) + millisecs = roundup(millisecs, 10) + for i in range(0, millisecs // 10): + self.__delay_10msec() + else: + # Round up to 4 usec boundary + microsecs = int(math.ceil(seconds * 1000000)) + microsecs = roundup(microsecs, 4) + for i in range(0, microsecs // 4): + self.__delay_4usec() diff --git a/libtoprammer/top_devices.py b/libtoprammer/top_devices.py index 39526dd..91ac1ab 100644 --- a/libtoprammer/top_devices.py +++ b/libtoprammer/top_devices.py @@ -1,6 +1,7 @@ # Import programmer specific stuff # TOP2049 specific stuff +import top2049.hardware_access import top2049.vcc_layouts import top2049.vpp_layouts import top2049.gnd_layouts diff --git a/toprammer b/toprammer index dab17fb..77beb0a 100755 --- a/toprammer +++ b/toprammer @@ -61,7 +61,8 @@ def usage(): print "Other options:" print " -t|--list Print a list of supported chips and exit." print " Use -V|--verbose to control the list verbosity (1-4)" - print " -d|--device BUS.DEV Use the programmer at BUS.DEV" + print " -d|--device DEVID Use a specific programmer. For USB:" + print " usb:BUSNR.DEVNR" print " First found programmer is used, if not given." print " -V|--verbose LEVEL Set the verbosity level:" print " 0 => show warnings" @@ -149,10 +150,12 @@ def main(argv): opt_action = "print-list" if o in ("-d", "--device"): try: - v = v.split(".") - opt_device = (int(v[0]), int(v[1])) + v = v.replace('.', ':').split(':') + opt_device = "%s:%03d:%03d" %\ + (v[0], int(v[1], 10), + int(v[2], 10)) except (IndexError, ValueError), e: - print "-d|--device invalid BUS.DEV id." + print("-d|--device invalid DEVID.") return 1 if o in ("-V", "--verbose"): opt_verbose = int(v) @@ -228,7 +231,7 @@ def main(argv): verbose=opt_verbose, showBroken=True) return 0 - top = TOP(busDev = opt_device, + top = TOP(devIdentifier = opt_device, verbose = opt_verbose, forceLevel = opt_forceLevel, noqueue = opt_noqueue, usebroken = opt_usebroken, forceBitfileUpload = opt_forceBitfileUpload) diff --git a/toprammer-gui b/toprammer-gui index e599918..49cecdf 100755 --- a/toprammer-gui +++ b/toprammer-gui @@ -685,12 +685,9 @@ class HwThread(QThread): self.mutex.unlock() self.wait() - def setDevice(self, busNr=None, devNr=None): + def setDevice(self, devIdentifier=None): assert(self.top is None and self.task is None) - busDev = None - if busNr is not None and devNr is not None: - busDev = (busNr, devNr) - self.param_busDev = busDev + self.param_devIdentifier = devIdentifier def setTopParameters(self, verbose=2, forceLevel=0, usebroken=True, forceBitfileUpload=False): @@ -792,7 +789,7 @@ class HwThread(QThread): if not self.top: # Initialize Hardware access self.__blockCancellation() - self.top = TOP(busDev = self.param_busDev, + self.top = TOP(devIdentifier = self.param_devIdentifier, verbose = self.param_verbose, forceLevel = self.param_forceLevel, usebroken = self.param_usebroken, @@ -2022,16 +2019,17 @@ class ProgrammerSelectDialog(QDialog): def rescan(self): self.deviceList.clear() - for (toptype, busNr, devNr) in TOP.findDevices(): - text = "%s (usb:%03d:%03d)" % (toptype, busNr, devNr) + for foundDev in TOP.findDevices(): + text = "%s (%s)" % (foundDev.toptype, + foundDev.devIdentifier) item = QListWidgetItem(text, self.deviceList) - item.setData(Qt.UserRole, (busNr, devNr)) + item.setData(Qt.UserRole, foundDev.devIdentifier) self.deviceList.setCurrentRow(0) - def getUSBIDs(self): + def getIdentifier(self): item = self.deviceList.currentItem() if not item: - return (None, None) + return None return item.data(Qt.UserRole) def selectionChanged(self, unused): @@ -2186,9 +2184,9 @@ class MainWindow(QMainWindow): dlg = ProgrammerSelectDialog(self) if dlg.exec_() != QDialog.Accepted: return - (busNr, devNr) = dlg.getUSBIDs() + devIdentifier = dlg.getIdentifier() self.runOperationSync(HwThread.TASK_SHUTDOWN) - self.hwThread.setDevice(busNr, devNr) + self.hwThread.setDevice(devIdentifier) if self.currentChipDescription: self.runOperation(self.OP_INITCHIP, HwThread.TASK_INITCHIP, self.currentChipDescription.chipID) -- cgit v1.2.3