From 7bfffdbbfe7b6ca42575c176083a2c9466692e6e Mon Sep 17 00:00:00 2001 From: Michael Buesch Date: Thu, 26 Apr 2012 19:06:16 +0200 Subject: Move hardware access routines to programmer implementation Signed-off-by: Michael Buesch --- libtoprammer/main.py | 353 +++++++++++---------------------------------------- 1 file changed, 72 insertions(+), 281 deletions(-) (limited to 'libtoprammer/main.py') diff --git a/libtoprammer/main.py b/libtoprammer/main.py index c2970c7..3d8fdfe 100644 --- a/libtoprammer/main.py +++ b/libtoprammer/main.py @@ -37,64 +37,51 @@ from util import * import time import re -try: - import usb -except (ImportError), e: - print "Python USB support module not found. Please install python-usb." - sys.exit(1) +from hardware_access_usb import * from top_devices import * from chips import * from user_interface import * -class TOP: +class FoundDev(object): + def __init__(self, toptype, devIdentifier, busdata=None): + self.toptype = toptype + self.devIdentifier = devIdentifier + self.busdata = busdata + +class TOP(object): # Supported programmer types TYPE_TOP2049 = "TOP2049" - def __init__(self, busDev=None, verbose=0, + def __init__(self, devIdentifier=None, verbose=0, forceLevel=0, noqueue=False, usebroken=False, forceBitfileUpload=False, userInterface=ConsoleUserInterface()): - """busDev is a tuple (BUSID, DEVID) or None.""" self.verbose = verbose self.forceLevel = forceLevel self.forceBitfileUpload = forceBitfileUpload - self.noqueue = noqueue self.usebroken = usebroken self.userInterface = userInterface + self.hw = None self.chip = None - self.commandQueue = [] # Find the device - for bus in usb.busses(): - if busDev and bus.dirname != "%03d" % busDev[0]: - continue - for dev in bus.devices: - if busDev and dev.filename != "%03d" % busDev[1]: - continue - if self.__usbdev2toptype(dev): - break - if busDev: - raise TOPException( - "Device %03d.%03d is not a TOP device" %\ - (busDev[0], busDev[1])) - else: - continue - break - else: + devices = self.findDevices() + if devIdentifier: + devices = filter(lambda d: d.devIdentifier == devIdentifier, + devices) + if not devices: raise TOPException("TOP programmer device not found!") - self.usbbus = bus - self.usbdev = dev - self.usbh = None + foundDev = devices[0] # Select first - if self.noqueue: + if noqueue: self.printWarning("WARNING: Command queuing disabled. " +\ "Hardware access will be _really_ slow.") - self.initializeProgrammer() + self.initializeProgrammer(foundDev, noqueue) def getProgrammerType(self): "Returns the TYPE_TOPxxxx" @@ -171,8 +158,8 @@ class TOP: self.flushCommands() self.userInterface.debugMessage(message) - @staticmethod - def __usbdev2toptype(usbdev): + @classmethod + def __usbdev2toptype(cls, usbdev): "Returns the TOP type of the USB device. None, if this is not a TOP device." try: toptype = { @@ -182,78 +169,44 @@ class TOP: return None return toptype - @staticmethod - def findDevices(): - """Rescan the USB busses and return a list of tuples (busNr, devNr) - for the found device.""" - devices = [] - for bus in usb.busses(): - for dev in bus.devices: - toptype = TOP.__usbdev2toptype(dev) - if not toptype: - continue - try: - busNr = int(bus.dirname) - devNr = int(dev.filename) - except (ValueError), e: - pass - devices.append( (toptype, busNr, devNr) ) + @classmethod + def findDevices(cls): + """Rescan all busses for TOP devices. + Returns a list of FoundDev()""" + usbFound = HardwareAccessUSB.scan(cls.__usbdev2toptype) + devices = [ FoundDev(cls.__usbdev2toptype(d.usbdev), + "usb:%03d:%03d" % (d.busNr, d.devNr), + d) + for d in usbFound ] return devices - def __initializeUSB(self): - # Set up the USB interface - self.__shutdownUSB() - try: - self.usbh = self.usbdev.open() - config = self.usbdev.configurations[0] - interface = config.interfaces[0][0] - - # Find the endpoints - self.bulkOut = None - self.bulkIn = None - for ep in interface.endpoints: - if not self.bulkIn and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN: - self.bulkIn = ep - if not self.bulkOut and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT: - self.bulkOut = ep - if not self.bulkIn or not self.bulkOut: - raise TOPException("Did not find all USB EPs") - - self.usbh.setConfiguration(config) - self.usbh.claimInterface(interface) - self.usbh.setAltInterface(interface) - self.usbh.clearHalt(self.bulkOut.address) - self.usbh.clearHalt(self.bulkIn.address) - except (usb.USBError), e: - self.usbh = None - raise TOPException("USB error: " + str(e)) - - def __shutdownUSB(self): - try: - if self.usbh: - self.usbh.releaseInterface() - self.usbh = None - except (usb.USBError), e: - raise TOPException("USB error: " + str(e)) - - def initializeProgrammer(self): + def initializeProgrammer(self, foundDev, noQueue): "Initialize the hardware" - self.__initializeUSB() + self.shutdownProgrammer() + + if foundDev.toptype == self.TYPE_TOP2049: + self.hw = top2049.hardware_access.HardwareAccess( + foundUSBDev = foundDev.busdata, + noQueue = noQueue, + doRawDump = (self.verbose >= 3)) + self.vcc = top2049.vcc_layouts.VCCLayout(self) + self.vpp = top2049.vpp_layouts.VPPLayout(self) + self.gnd = top2049.gnd_layouts.GNDLayout(self) + else: + assert(0) + self.topType = foundDev.toptype versionRegex = ( (r"top2049\s+ver\s*(\d+\.\d+)", self.TYPE_TOP2049), ) - versionString = self.cmdRequestVersion() - for (regex, topType) in versionRegex: + versionString = self.hw.readVersionString() + for (regex, t) in versionRegex: + if t != self.topType: + continue m = re.match(regex, versionString, re.IGNORECASE) if m: - self.topType = topType self.topVersion = m.group(1) break else: @@ -261,49 +214,15 @@ class TOP: "' is not supported by Toprammer, yet") self.printInfo("Initializing the " + self.topType + " version " + self.topVersion) - # Initialize the programmer specific layouts - if self.topType == self.TYPE_TOP2049: - self.vcc = top2049.vcc_layouts.VCCLayout(self) - self.vpp = top2049.vpp_layouts.VPPLayout(self) - self.gnd = top2049.gnd_layouts.GNDLayout(self) - else: - assert(0) - - self.queueCommand(b"\x0D") - stat = self.cmdReadBufferReg32() - if stat != 0x00020C69: - self.printWarning("Init: Unexpected status (a): 0x%08X" % stat) - - self.cmdSetVPPVoltage(0) - self.cmdSetVPPVoltage(0) - self.queueCommand(b"\x0E\x20\x00\x00") - self.cmdDelay(0.01) - self.cmdSetVCCVoltage(0) - - self.cmdLoadGNDLayout(0) - self.cmdLoadVPPLayout(0) - self.cmdLoadVCCLayout(0) - - self.queueCommand(b"\x0E\x20\x00\x00") - self.cmdDelay(0.01) - self.queueCommand(b"\x0E\x25\x00\x00") - stat = self.cmdReadBufferReg32() - if stat != 0x0000686C: - self.printWarning("Init: Unexpected status (b): 0x%08X" % stat) - self.cmdEnableZifPullups(False) - self.flushCommands() + self.hw.hardwareInit() def shutdownProgrammer(self): - self.__shutdownUSB() + if self.hw: + self.hw.shutdown() + self.hw = None self.topType = None self.topVersion = None - def getProgrammerType(self): - return self.topType - - def getProgrammerVersion(self): - return self.topVersion - def __readBitfileID(self): self.cmdFPGARead(0xFD) self.cmdFPGARead(0xFE) @@ -331,17 +250,11 @@ class TOP: self.printDebug("Uploading bitfile %s..." % self.bitfile.getFilename()) - self.cmdFPGAInitiateConfig() - stat = self.cmdReadBufferReg8() - expected = 0x01 - if stat != expected: - raise TOPException("bit-upload: Failed to initiate " +\ - "config sequence (got 0x%02X, expected 0x%02X)" %\ - (stat, expected)) - + self.hw.FPGAInitiateConfig() data = self.bitfile.getPayload() - for i in range(0, len(data), 60): - self.cmdFPGAUploadConfig(data[i : i + 60]) + chunksz = self.hw.FPGAMaxConfigChunkSize() + for i in range(0, len(data), chunksz): + self.hw.FPGAUploadConfig(data[i : i + chunksz]) self.flushCommands() if requiredID and requiredRevision: @@ -462,42 +375,23 @@ class TOP: self.flushCommands() self.printDebug("Done writing the image.") - def __cmdDelay_4usec(self): - self.queueCommand(int2byte(0x00)) - - def __cmdDelay_10msec(self): - self.queueCommand(int2byte(0x1B)) - def cmdDelay(self, seconds): """Send a delay request to the device. Note that this causes the programmer to execute the delay. For a host-delay, use hostDelay()""" - assert(seconds < 0.5) - if seconds > 0.000255: - # Need to round up to ten milliseconds - millisecs = int(math.ceil(seconds * 1000)) - millisecs = roundup(millisecs, 10) - for i in range(0, millisecs // 10): - self.__cmdDelay_10msec() - else: - # Round up to 4 usec boundary - microsecs = int(math.ceil(seconds * 1000000)) - microsecs = roundup(microsecs, 4) - for i in range(0, microsecs // 4): - self.__cmdDelay_4usec() + self.hw.delay(seconds) def hostDelay(self, seconds): """Flush all commands and delay the host computer for 'seconds'""" - self.flushCommands() - time.sleep(seconds) + self.hw.flushCommands(seconds) def getOscillatorHz(self): """Returns the FPGA oscillator frequency, in Hz. The oscillator is connected to the FPGA clk pin.""" - return 24000000 + self.hw.getOscillatorHz() def getBufferRegSize(self): """Returns the size (in bytes) of the buffer register.""" - return 64 + return self.hw.getBufferRegSize() def cmdReadBufferReg(self, nrBytes=-1): """Read the buffer register. Returns nrBytes (default all bytes).""" @@ -506,9 +400,8 @@ class TOP: nrBytes = regSize assert(nrBytes <= regSize) if not nrBytes: - return "" - self.queueCommand(int2byte(0x07)) - return self.receive(regSize)[0:nrBytes] + return b"" + return self.hw.readBufferReg(nrBytes) def cmdReadBufferReg8(self): """Read a 8bit value from the buffer register.""" @@ -543,139 +436,37 @@ class TOP: (byte2int(stat[4]) << 32) | (byte2int(stat[5]) << 40) return stat - def cmdRequestVersion(self): - """Returns the device ID and versioning string.""" - self.queueCommand(b"\x0E\x11\x00\x00") - data = self.cmdReadBufferReg(16) - return data.strip() - - def cmdFPGAInitiateConfig(self): - """Initiate a configuration sequence on the FPGA.""" - self.queueCommand(b"\x0E\x21\x00\x00") - - def cmdFPGAUploadConfig(self, data): - """Upload configuration data into the FPGA.""" - assert(len(data) <= 60) - cmd = b"\x0E\x22\x00\x00" + data - cmd += b"\x00" * (64 - len(cmd)) # padding - self.queueCommand(cmd) - - def __makeFPGAAddr(self, address): - # Set the "address OK" bit - return address | 0x10 - def cmdFPGARead(self, address): """Read a byte from the FPGA at address into the buffer register.""" - address = self.__makeFPGAAddr(address) - if address == self.__makeFPGAAddr(0): # Fast tracked - self.queueCommand(int2byte(0x01)) - return - self.queueCommand(int2byte(0x0B) + int2byte(address)) + self.hw.FPGARead(address) def cmdFPGAWrite(self, address, byte): """Write a byte to an FPGA address.""" - address = self.__makeFPGAAddr(address) - if address == self.__makeFPGAAddr(0): # Fast tracked - self.queueCommand(int2byte(0x10) + int2byte(byte)) - return - self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte)) + return self.hw.FPGAWrite(address, byte) def cmdLoadGNDLayout(self, layout): - """Load the GND configuration into the H/L shiftregisters.""" - cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + """Load the GND configuration into the programmer.""" + self.hw.loadGNDLayout(layout) def cmdSetVPPVoltage(self, voltage): """Set the VPP voltage. voltage is a floating point voltage number.""" - centivolt = int(voltage * 10) - cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) + self.hw.setVPPVoltage(voltage) def cmdLoadVPPLayout(self, layout): - """Load the VPP configuration into the shift registers.""" - cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + """Load the VPP configuration into the programmer.""" + self.hw.loadVPPLayout(layout) def cmdSetVCCVoltage(self, voltage): """Set the VCC voltage. voltage is a floating point voltage number.""" - centivolt = int(voltage * 10) - cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) + self.hw.setVCCVoltage(voltage) def cmdLoadVCCLayout(self, layout): """Load the VCC configuration into the shift registers.""" - cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0) - self.queueCommand(cmd) - self.cmdDelay(0.01) - self.hostDelay(0.15) + self.hw.loadVCCLayout(layout) def cmdEnableZifPullups(self, enable): """Enable the ZIF socket signal pullups.""" - param = 0 - if enable: - param = 1 - cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0) - self.queueCommand(cmd) - - def __doSend(self, command): - try: - assert(len(command) <= 64) - if self.verbose >= 3: - print "Sending command:" - dumpMem(command) - ep = self.bulkOut.address - self.usbh.bulkWrite(ep, command) - except (usb.USBError), e: - raise TOPException("USB bulk write error: " + str(e)) - - def queueCommand(self, command): - """Queue a raw command for transmission.""" - assert(len(command) <= 64) - if self.noqueue: - self.__doSend(command) - else: - self.commandQueue.append(command) - - def runCommandSync(self, command): - """Run a command synchronously. - Warning: This is slow. Don't use it unless there's a very good reason.""" - self.flushCommands() - self.queueCommand(command) - self.flushCommands() - - def receive(self, size): - """Receive 'size' bytes on the bulk-in ep.""" - # If there are blocked commands in the queue, send them now. - self.flushCommands() - try: - ep = self.bulkIn.address - data = b"".join(map(lambda b: int2byte(b), - self.usbh.bulkRead(ep, size))) - if len(data) != size: - raise TOPException("USB bulk read error: Could not read the " +\ - "requested number of bytes (req %d, got %d)" % (size, len(data))) - if self.verbose >= 3: - print "Received data:" - dumpMem(data) - except (usb.USBError), e: - raise TOPException("USB bulk read error: " + str(e)) - return data + self.hw.enableZifPullups(enable) def flushCommands(self): - """Flush the command queue.""" - command = b"" - for oneCommand in self.commandQueue: - assert(len(oneCommand) <= 64) - if len(command) + len(oneCommand) > 64: - self.__doSend(command) - command = b"" - command += oneCommand - if command: - self.__doSend(command) - self.commandQueue = [] + self.hw.flushCommands() -- cgit v1.2.3