summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Buesch <m@bues.ch>2012-04-26 19:06:16 +0200
committerMichael Buesch <m@bues.ch>2012-04-26 19:06:16 +0200
commit7bfffdbbfe7b6ca42575c176083a2c9466692e6e (patch)
tree873b2f1af0dba68bd23007460a54e96af39e45c6
parentb20b4a6fd7826c7a377a2268675ba915aabba904 (diff)
downloadtoprammer-7bfffdbbfe7b6ca42575c176083a2c9466692e6e.tar.xz
toprammer-7bfffdbbfe7b6ca42575c176083a2c9466692e6e.zip
Move hardware access routines to programmer implementation
Signed-off-by: Michael Buesch <m@bues.ch>
-rw-r--r--libtoprammer/command_queue.py69
-rw-r--r--libtoprammer/hardware_access_usb.py133
-rw-r--r--libtoprammer/main.py353
-rw-r--r--libtoprammer/top2049/hardware_access.py183
-rw-r--r--libtoprammer/top_devices.py1
-rwxr-xr-xtoprammer13
-rwxr-xr-xtoprammer-gui24
7 files changed, 477 insertions, 299 deletions
diff --git a/libtoprammer/command_queue.py b/libtoprammer/command_queue.py
new file mode 100644
index 0000000..233b8ac
--- /dev/null
+++ b/libtoprammer/command_queue.py
@@ -0,0 +1,69 @@
+"""
+# TOP2049 Open Source programming suite
+#
+# Generic command queue.
+#
+# Copyright (c) 2012 Michael Buesch <m@bues.ch>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+
+from util import *
+import time
+
+
+class CommandQueue(object):
+ "Generic hardware-command queue. Needs to be subclassed."
+
+ def __init__(self, maxPacketBytes, synchronous=False):
+ self.maxPacketBytes = maxPacketBytes
+ self.synchronous = synchronous
+ self.commandQueue = []
+
+ def queueCommand(self, command):
+ """Queue a raw command for transmission."""
+ assert(len(command) <= self.maxPacketBytes)
+ if self.synchronous:
+ self.send(command)
+ else:
+ self.commandQueue.append(command)
+
+ def runCommandSync(self, command):
+ """Run a command synchronously.
+ This is slow. Don't use it without a very good reason."""
+ self.flushCommands()
+ self.queueCommand(command)
+ self.flushCommands()
+
+ def flushCommands(self, sleepSeconds=0):
+ """Flush the command queue."""
+ command = b""
+ for oneCommand in self.commandQueue:
+ assert(len(oneCommand) <= self.maxPacketBytes)
+ if len(command) + len(oneCommand) > self.maxPacketBytes:
+ self.send(command)
+ command = b""
+ command += oneCommand
+ if command:
+ self.send(command)
+ self.commandQueue = []
+ if sleepSeconds:
+ time.sleep(sleepSeconds)
+
+ def send(self, data):
+ raise NotImplementedError # Reimplement in subclass.
+
+ def receive(self, size):
+ raise NotImplementedError # Reimplement in subclass.
diff --git a/libtoprammer/hardware_access_usb.py b/libtoprammer/hardware_access_usb.py
new file mode 100644
index 0000000..682dee9
--- /dev/null
+++ b/libtoprammer/hardware_access_usb.py
@@ -0,0 +1,133 @@
+"""
+# TOP2049 Open Source programming suite
+#
+# Lowlevel USB hardware access.
+#
+# Copyright (c) 2012 Michael Buesch <m@bues.ch>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+
+from util import *
+from command_queue import *
+try:
+ import usb
+except (ImportError), e:
+ print "Python USB support module not found. Please install python-usb."
+ sys.exit(1)
+
+
+class FoundUSBDev(object):
+ def __init__(self, usbdev, busNr, devNr):
+ self.usbdev = usbdev
+ self.busNr = busNr
+ self.devNr = devNr
+
+class HardwareAccessUSB(CommandQueue):
+ "Lowlevel USB hardware access"
+
+ @classmethod
+ def scan(cls, checkCallback):
+ "Scan for devices. Returns a list of FoundUSBDev()."
+ devices = []
+ for bus in usb.busses():
+ for dev in bus.devices:
+ if not checkCallback(dev):
+ continue
+ try:
+ busNr = int(bus.dirname, 10)
+ devNr = int(dev.filename, 10)
+ except (ValueError), e:
+ continue
+ devices.append(FoundUSBDev(dev, busNr, devNr))
+ return devices
+
+ def __init__(self, usbdev, maxPacketBytes, noQueue,
+ doRawDump=False):
+ CommandQueue.__init__(self,
+ maxPacketBytes = maxPacketBytes,
+ synchronous = noQueue)
+ self.doRawDump = doRawDump
+ self.usbdev = usbdev
+ self.usbh = None
+
+ self.__initUSB()
+
+ def __initUSB(self):
+ try:
+ self.usbh = self.usbdev.open()
+ config = self.usbdev.configurations[0]
+ interface = config.interfaces[0][0]
+
+ # Find the endpoints
+ self.bulkOut = None
+ self.bulkIn = None
+ for ep in interface.endpoints:
+ if not self.bulkIn and \
+ ep.type == usb.ENDPOINT_TYPE_BULK and \
+ (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN:
+ self.bulkIn = ep
+ if not self.bulkOut and \
+ ep.type == usb.ENDPOINT_TYPE_BULK and \
+ (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT:
+ self.bulkOut = ep
+ if not self.bulkIn or not self.bulkOut:
+ raise TOPException("Did not find all USB EPs")
+
+ self.usbh.setConfiguration(config)
+ self.usbh.claimInterface(interface)
+ self.usbh.setAltInterface(interface)
+ self.usbh.clearHalt(self.bulkOut.address)
+ self.usbh.clearHalt(self.bulkIn.address)
+ except (usb.USBError), e:
+ self.usbh = None
+ raise TOPException("USB error: " + str(e))
+
+ def shutdown(self):
+ "Shutdown the USB connection"
+ try:
+ if self.usbh:
+ self.usbh.releaseInterface()
+ self.usbh = None
+ except (usb.USBError), e:
+ raise TOPException("USB error: " + str(e))
+
+ def send(self, data):
+ try:
+ assert(len(data) <= self.maxPacketBytes)
+ if self.doRawDump:
+ print("Sending command:")
+ dumpMem(data)
+ self.usbh.bulkWrite(self.bulkOut.address, data)
+ except (usb.USBError), e:
+ raise TOPException("USB bulk write error: " + str(e))
+
+ def receive(self, size):
+ """Receive 'size' bytes on the bulk-in ep."""
+ # If there are blocked commands in the queue, send them now.
+ self.flushCommands()
+ try:
+ ep = self.bulkIn.address
+ data = b"".join(map(lambda b: int2byte(b),
+ self.usbh.bulkRead(ep, size)))
+ if len(data) != size:
+ raise TOPException("USB bulk read error: Could not read the " +\
+ "requested number of bytes (req %d, got %d)" % (size, len(data)))
+ if self.doRawDump:
+ print("Received data:")
+ dumpMem(data)
+ except (usb.USBError), e:
+ raise TOPException("USB bulk read error: " + str(e))
+ return data
diff --git a/libtoprammer/main.py b/libtoprammer/main.py
index c2970c7..3d8fdfe 100644
--- a/libtoprammer/main.py
+++ b/libtoprammer/main.py
@@ -37,64 +37,51 @@ from util import *
import time
import re
-try:
- import usb
-except (ImportError), e:
- print "Python USB support module not found. Please install python-usb."
- sys.exit(1)
+from hardware_access_usb import *
from top_devices import *
from chips import *
from user_interface import *
-class TOP:
+class FoundDev(object):
+ def __init__(self, toptype, devIdentifier, busdata=None):
+ self.toptype = toptype
+ self.devIdentifier = devIdentifier
+ self.busdata = busdata
+
+class TOP(object):
# Supported programmer types
TYPE_TOP2049 = "TOP2049"
- def __init__(self, busDev=None, verbose=0,
+ def __init__(self, devIdentifier=None, verbose=0,
forceLevel=0, noqueue=False, usebroken=False,
forceBitfileUpload=False,
userInterface=ConsoleUserInterface()):
- """busDev is a tuple (BUSID, DEVID) or None."""
self.verbose = verbose
self.forceLevel = forceLevel
self.forceBitfileUpload = forceBitfileUpload
- self.noqueue = noqueue
self.usebroken = usebroken
self.userInterface = userInterface
+ self.hw = None
self.chip = None
- self.commandQueue = []
# Find the device
- for bus in usb.busses():
- if busDev and bus.dirname != "%03d" % busDev[0]:
- continue
- for dev in bus.devices:
- if busDev and dev.filename != "%03d" % busDev[1]:
- continue
- if self.__usbdev2toptype(dev):
- break
- if busDev:
- raise TOPException(
- "Device %03d.%03d is not a TOP device" %\
- (busDev[0], busDev[1]))
- else:
- continue
- break
- else:
+ devices = self.findDevices()
+ if devIdentifier:
+ devices = filter(lambda d: d.devIdentifier == devIdentifier,
+ devices)
+ if not devices:
raise TOPException("TOP programmer device not found!")
- self.usbbus = bus
- self.usbdev = dev
- self.usbh = None
+ foundDev = devices[0] # Select first
- if self.noqueue:
+ if noqueue:
self.printWarning("WARNING: Command queuing disabled. " +\
"Hardware access will be _really_ slow.")
- self.initializeProgrammer()
+ self.initializeProgrammer(foundDev, noqueue)
def getProgrammerType(self):
"Returns the TYPE_TOPxxxx"
@@ -171,8 +158,8 @@ class TOP:
self.flushCommands()
self.userInterface.debugMessage(message)
- @staticmethod
- def __usbdev2toptype(usbdev):
+ @classmethod
+ def __usbdev2toptype(cls, usbdev):
"Returns the TOP type of the USB device. None, if this is not a TOP device."
try:
toptype = {
@@ -182,78 +169,44 @@ class TOP:
return None
return toptype
- @staticmethod
- def findDevices():
- """Rescan the USB busses and return a list of tuples (busNr, devNr)
- for the found device."""
- devices = []
- for bus in usb.busses():
- for dev in bus.devices:
- toptype = TOP.__usbdev2toptype(dev)
- if not toptype:
- continue
- try:
- busNr = int(bus.dirname)
- devNr = int(dev.filename)
- except (ValueError), e:
- pass
- devices.append( (toptype, busNr, devNr) )
+ @classmethod
+ def findDevices(cls):
+ """Rescan all busses for TOP devices.
+ Returns a list of FoundDev()"""
+ usbFound = HardwareAccessUSB.scan(cls.__usbdev2toptype)
+ devices = [ FoundDev(cls.__usbdev2toptype(d.usbdev),
+ "usb:%03d:%03d" % (d.busNr, d.devNr),
+ d)
+ for d in usbFound ]
return devices
- def __initializeUSB(self):
- # Set up the USB interface
- self.__shutdownUSB()
- try:
- self.usbh = self.usbdev.open()
- config = self.usbdev.configurations[0]
- interface = config.interfaces[0][0]
-
- # Find the endpoints
- self.bulkOut = None
- self.bulkIn = None
- for ep in interface.endpoints:
- if not self.bulkIn and \
- ep.type == usb.ENDPOINT_TYPE_BULK and \
- (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN:
- self.bulkIn = ep
- if not self.bulkOut and \
- ep.type == usb.ENDPOINT_TYPE_BULK and \
- (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT:
- self.bulkOut = ep
- if not self.bulkIn or not self.bulkOut:
- raise TOPException("Did not find all USB EPs")
-
- self.usbh.setConfiguration(config)
- self.usbh.claimInterface(interface)
- self.usbh.setAltInterface(interface)
- self.usbh.clearHalt(self.bulkOut.address)
- self.usbh.clearHalt(self.bulkIn.address)
- except (usb.USBError), e:
- self.usbh = None
- raise TOPException("USB error: " + str(e))
-
- def __shutdownUSB(self):
- try:
- if self.usbh:
- self.usbh.releaseInterface()
- self.usbh = None
- except (usb.USBError), e:
- raise TOPException("USB error: " + str(e))
-
- def initializeProgrammer(self):
+ def initializeProgrammer(self, foundDev, noQueue):
"Initialize the hardware"
- self.__initializeUSB()
+ self.shutdownProgrammer()
+
+ if foundDev.toptype == self.TYPE_TOP2049:
+ self.hw = top2049.hardware_access.HardwareAccess(
+ foundUSBDev = foundDev.busdata,
+ noQueue = noQueue,
+ doRawDump = (self.verbose >= 3))
+ self.vcc = top2049.vcc_layouts.VCCLayout(self)
+ self.vpp = top2049.vpp_layouts.VPPLayout(self)
+ self.gnd = top2049.gnd_layouts.GNDLayout(self)
+ else:
+ assert(0)
+ self.topType = foundDev.toptype
versionRegex = (
(r"top2049\s+ver\s*(\d+\.\d+)", self.TYPE_TOP2049),
)
- versionString = self.cmdRequestVersion()
- for (regex, topType) in versionRegex:
+ versionString = self.hw.readVersionString()
+ for (regex, t) in versionRegex:
+ if t != self.topType:
+ continue
m = re.match(regex, versionString, re.IGNORECASE)
if m:
- self.topType = topType
self.topVersion = m.group(1)
break
else:
@@ -261,49 +214,15 @@ class TOP:
"' is not supported by Toprammer, yet")
self.printInfo("Initializing the " + self.topType + " version " + self.topVersion)
- # Initialize the programmer specific layouts
- if self.topType == self.TYPE_TOP2049:
- self.vcc = top2049.vcc_layouts.VCCLayout(self)
- self.vpp = top2049.vpp_layouts.VPPLayout(self)
- self.gnd = top2049.gnd_layouts.GNDLayout(self)
- else:
- assert(0)
-
- self.queueCommand(b"\x0D")
- stat = self.cmdReadBufferReg32()
- if stat != 0x00020C69:
- self.printWarning("Init: Unexpected status (a): 0x%08X" % stat)
-
- self.cmdSetVPPVoltage(0)
- self.cmdSetVPPVoltage(0)
- self.queueCommand(b"\x0E\x20\x00\x00")
- self.cmdDelay(0.01)
- self.cmdSetVCCVoltage(0)
-
- self.cmdLoadGNDLayout(0)
- self.cmdLoadVPPLayout(0)
- self.cmdLoadVCCLayout(0)
-
- self.queueCommand(b"\x0E\x20\x00\x00")
- self.cmdDelay(0.01)
- self.queueCommand(b"\x0E\x25\x00\x00")
- stat = self.cmdReadBufferReg32()
- if stat != 0x0000686C:
- self.printWarning("Init: Unexpected status (b): 0x%08X" % stat)
- self.cmdEnableZifPullups(False)
- self.flushCommands()
+ self.hw.hardwareInit()
def shutdownProgrammer(self):
- self.__shutdownUSB()
+ if self.hw:
+ self.hw.shutdown()
+ self.hw = None
self.topType = None
self.topVersion = None
- def getProgrammerType(self):
- return self.topType
-
- def getProgrammerVersion(self):
- return self.topVersion
-
def __readBitfileID(self):
self.cmdFPGARead(0xFD)
self.cmdFPGARead(0xFE)
@@ -331,17 +250,11 @@ class TOP:
self.printDebug("Uploading bitfile %s..." % self.bitfile.getFilename())
- self.cmdFPGAInitiateConfig()
- stat = self.cmdReadBufferReg8()
- expected = 0x01
- if stat != expected:
- raise TOPException("bit-upload: Failed to initiate " +\
- "config sequence (got 0x%02X, expected 0x%02X)" %\
- (stat, expected))
-
+ self.hw.FPGAInitiateConfig()
data = self.bitfile.getPayload()
- for i in range(0, len(data), 60):
- self.cmdFPGAUploadConfig(data[i : i + 60])
+ chunksz = self.hw.FPGAMaxConfigChunkSize()
+ for i in range(0, len(data), chunksz):
+ self.hw.FPGAUploadConfig(data[i : i + chunksz])
self.flushCommands()
if requiredID and requiredRevision:
@@ -462,42 +375,23 @@ class TOP:
self.flushCommands()
self.printDebug("Done writing the image.")
- def __cmdDelay_4usec(self):
- self.queueCommand(int2byte(0x00))
-
- def __cmdDelay_10msec(self):
- self.queueCommand(int2byte(0x1B))
-
def cmdDelay(self, seconds):
"""Send a delay request to the device. Note that this causes the
programmer to execute the delay. For a host-delay, use hostDelay()"""
- assert(seconds < 0.5)
- if seconds > 0.000255:
- # Need to round up to ten milliseconds
- millisecs = int(math.ceil(seconds * 1000))
- millisecs = roundup(millisecs, 10)
- for i in range(0, millisecs // 10):
- self.__cmdDelay_10msec()
- else:
- # Round up to 4 usec boundary
- microsecs = int(math.ceil(seconds * 1000000))
- microsecs = roundup(microsecs, 4)
- for i in range(0, microsecs // 4):
- self.__cmdDelay_4usec()
+ self.hw.delay(seconds)
def hostDelay(self, seconds):
"""Flush all commands and delay the host computer for 'seconds'"""
- self.flushCommands()
- time.sleep(seconds)
+ self.hw.flushCommands(seconds)
def getOscillatorHz(self):
"""Returns the FPGA oscillator frequency, in Hz.
The oscillator is connected to the FPGA clk pin."""
- return 24000000
+ self.hw.getOscillatorHz()
def getBufferRegSize(self):
"""Returns the size (in bytes) of the buffer register."""
- return 64
+ return self.hw.getBufferRegSize()
def cmdReadBufferReg(self, nrBytes=-1):
"""Read the buffer register. Returns nrBytes (default all bytes)."""
@@ -506,9 +400,8 @@ class TOP:
nrBytes = regSize
assert(nrBytes <= regSize)
if not nrBytes:
- return ""
- self.queueCommand(int2byte(0x07))
- return self.receive(regSize)[0:nrBytes]
+ return b""
+ return self.hw.readBufferReg(nrBytes)
def cmdReadBufferReg8(self):
"""Read a 8bit value from the buffer register."""
@@ -543,139 +436,37 @@ class TOP:
(byte2int(stat[4]) << 32) | (byte2int(stat[5]) << 40)
return stat
- def cmdRequestVersion(self):
- """Returns the device ID and versioning string."""
- self.queueCommand(b"\x0E\x11\x00\x00")
- data = self.cmdReadBufferReg(16)
- return data.strip()
-
- def cmdFPGAInitiateConfig(self):
- """Initiate a configuration sequence on the FPGA."""
- self.queueCommand(b"\x0E\x21\x00\x00")
-
- def cmdFPGAUploadConfig(self, data):
- """Upload configuration data into the FPGA."""
- assert(len(data) <= 60)
- cmd = b"\x0E\x22\x00\x00" + data
- cmd += b"\x00" * (64 - len(cmd)) # padding
- self.queueCommand(cmd)
-
- def __makeFPGAAddr(self, address):
- # Set the "address OK" bit
- return address | 0x10
-
def cmdFPGARead(self, address):
"""Read a byte from the FPGA at address into the buffer register."""
- address = self.__makeFPGAAddr(address)
- if address == self.__makeFPGAAddr(0): # Fast tracked
- self.queueCommand(int2byte(0x01))
- return
- self.queueCommand(int2byte(0x0B) + int2byte(address))
+ self.hw.FPGARead(address)
def cmdFPGAWrite(self, address, byte):
"""Write a byte to an FPGA address."""
- address = self.__makeFPGAAddr(address)
- if address == self.__makeFPGAAddr(0): # Fast tracked
- self.queueCommand(int2byte(0x10) + int2byte(byte))
- return
- self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte))
+ return self.hw.FPGAWrite(address, byte)
def cmdLoadGNDLayout(self, layout):
- """Load the GND configuration into the H/L shiftregisters."""
- cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ """Load the GND configuration into the programmer."""
+ self.hw.loadGNDLayout(layout)
def cmdSetVPPVoltage(self, voltage):
"""Set the VPP voltage. voltage is a floating point voltage number."""
- centivolt = int(voltage * 10)
- cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
+ self.hw.setVPPVoltage(voltage)
def cmdLoadVPPLayout(self, layout):
- """Load the VPP configuration into the shift registers."""
- cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ """Load the VPP configuration into the programmer."""
+ self.hw.loadVPPLayout(layout)
def cmdSetVCCVoltage(self, voltage):
"""Set the VCC voltage. voltage is a floating point voltage number."""
- centivolt = int(voltage * 10)
- cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
+ self.hw.setVCCVoltage(voltage)
def cmdLoadVCCLayout(self, layout):
"""Load the VCC configuration into the shift registers."""
- cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ self.hw.loadVCCLayout(layout)
def cmdEnableZifPullups(self, enable):
"""Enable the ZIF socket signal pullups."""
- param = 0
- if enable:
- param = 1
- cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0)
- self.queueCommand(cmd)
-
- def __doSend(self, command):
- try:
- assert(len(command) <= 64)
- if self.verbose >= 3:
- print "Sending command:"
- dumpMem(command)
- ep = self.bulkOut.address
- self.usbh.bulkWrite(ep, command)
- except (usb.USBError), e:
- raise TOPException("USB bulk write error: " + str(e))
-
- def queueCommand(self, command):
- """Queue a raw command for transmission."""
- assert(len(command) <= 64)
- if self.noqueue:
- self.__doSend(command)
- else:
- self.commandQueue.append(command)
-
- def runCommandSync(self, command):
- """Run a command synchronously.
- Warning: This is slow. Don't use it unless there's a very good reason."""
- self.flushCommands()
- self.queueCommand(command)
- self.flushCommands()
-
- def receive(self, size):
- """Receive 'size' bytes on the bulk-in ep."""
- # If there are blocked commands in the queue, send them now.
- self.flushCommands()
- try:
- ep = self.bulkIn.address
- data = b"".join(map(lambda b: int2byte(b),
- self.usbh.bulkRead(ep, size)))
- if len(data) != size:
- raise TOPException("USB bulk read error: Could not read the " +\
- "requested number of bytes (req %d, got %d)" % (size, len(data)))
- if self.verbose >= 3:
- print "Received data:"
- dumpMem(data)
- except (usb.USBError), e:
- raise TOPException("USB bulk read error: " + str(e))
- return data
+ self.hw.enableZifPullups(enable)
def flushCommands(self):
- """Flush the command queue."""
- command = b""
- for oneCommand in self.commandQueue:
- assert(len(oneCommand) <= 64)
- if len(command) + len(oneCommand) > 64:
- self.__doSend(command)
- command = b""
- command += oneCommand
- if command:
- self.__doSend(command)
- self.commandQueue = []
+ self.hw.flushCommands()
diff --git a/libtoprammer/top2049/hardware_access.py b/libtoprammer/top2049/hardware_access.py
new file mode 100644
index 0000000..b43ee7c
--- /dev/null
+++ b/libtoprammer/top2049/hardware_access.py
@@ -0,0 +1,183 @@
+"""
+# TOP2049 Open Source programming suite
+#
+# TOP2049 Lowlevel hardware access
+#
+# Copyright (c) 2012 Michael Buesch <m@bues.ch>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""
+
+from libtoprammer.util import *
+from libtoprammer.hardware_access_usb import *
+
+
+class HardwareAccess(HardwareAccessUSB):
+ "TOP2049 hardware access"
+
+ ADDR_OK_BIT = 4
+
+ def __init__(self, foundUSBDev,
+ noQueue=False, doRawDump=False):
+ HardwareAccessUSB.__init__(self,
+ usbdev = foundUSBDev.usbdev,
+ maxPacketBytes = 64,
+ noQueue = noQueue,
+ doRawDump = doRawDump)
+
+ def getOscillatorHz(self):
+ return 24000000
+
+ def getBufferRegSize(self):
+ return 64
+
+ def readBufferReg(self, nrBytes):
+ self.queueCommand(int2byte(0x07))
+ return self.receive(self.getBufferRegSize())[:nrBytes]
+
+ def hardwareInit(self):
+ self.queueCommand(b"\x0D")
+ if self.readBufferReg(4) != b"\x69\x0C\x02\x00":
+ self.printWarning("Init: Unexpected status (a)")
+
+ self.setVPPVoltage(0)
+ self.setVPPVoltage(0)
+ self.queueCommand(b"\x0E\x20\x00\x00")
+ self.delay(0.01)
+ self.setVCCVoltage(0)
+
+ self.loadGNDLayout(0)
+ self.loadVPPLayout(0)
+ self.loadVCCLayout(0)
+
+ self.queueCommand(b"\x0E\x20\x00\x00")
+ self.delay(0.01)
+ self.queueCommand(b"\x0E\x25\x00\x00")
+ if self.readBufferReg(4) != b"\x6C\x68\x00\x00":
+ self.printWarning("Init: Unexpected status (b)")
+ self.enableZifPullups(False)
+ self.flushCommands()
+
+ def readVersionString(self):
+ """Returns the device ID and versioning string."""
+ self.queueCommand(b"\x0E\x11\x00\x00")
+ data = self.readBufferReg(16)
+ return data.strip()
+
+ def FPGAMaxConfigChunkSize(self):
+ """Maximum config chunk size."""
+ return 60
+
+ def FPGAInitiateConfig(self):
+ """Initiate a configuration sequence on the FPGA."""
+ self.queueCommand(b"\x0E\x21\x00\x00")
+ stat = byte2int(self.readBufferReg(1))
+ expected = 0x01
+ if stat != expected:
+ raise TOPException("bit-upload: Failed to initiate " +\
+ "config sequence (got 0x%02X, expected 0x%02X)" %\
+ (stat, expected))
+
+ def FPGAUploadConfig(self, data):
+ """Upload configuration data into the FPGA."""
+ assert(len(data) <= self.FPGAMaxConfigChunkSize())
+ cmd = b"\x0E\x22\x00\x00" + data
+ cmd += b"\x00" * (64 - len(cmd)) # padding
+ self.queueCommand(cmd)
+
+ def makeFPGAAddr(self, address):
+ # Set the "address OK" bit
+ return address | (1 << self.ADDR_OK_BIT)
+
+ def FPGARead(self, address):
+ address = self.makeFPGAAddr(address)
+ if address == self.makeFPGAAddr(0): # Fast tracked
+ self.queueCommand(int2byte(0x01))
+ return
+ self.queueCommand(int2byte(0x0B) + int2byte(address))
+
+ def FPGAWrite(self, address, byte):
+ address = self.makeFPGAAddr(address)
+ if address == self.makeFPGAAddr(0): # Fast tracked
+ self.queueCommand(int2byte(0x10) + int2byte(byte))
+ return
+ self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte))
+
+ def loadGNDLayout(self, layout):
+ # Load the GND configuration into the H/L shiftregisters.
+ cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0)
+ self.queueCommand(cmd)
+ self.delay(0.01)
+ self.flushCommands(0.15)
+
+ def setVPPVoltage(self, voltage):
+ # Set the VPP voltage. voltage is a floating point voltage number.
+ centivolt = int(voltage * 10)
+ cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0)
+ self.queueCommand(cmd)
+ self.delay(0.01)
+
+ def loadVPPLayout(self, layout):
+ # Load the VPP configuration into the shift registers.
+ cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0)
+ self.queueCommand(cmd)
+ self.delay(0.01)
+ self.flushCommands(0.15)
+
+ def setVCCVoltage(self, voltage):
+ # Set the VCC voltage.
+ centivolt = int(voltage * 10)
+ cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0)
+ self.queueCommand(cmd)
+ self.delay(0.01)
+
+ def loadVCCLayout(self, layout):
+ # Load the VCC configuration into the shift registers.
+ cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0)
+ self.queueCommand(cmd)
+ self.delay(0.01)
+ self.flushCommands(0.15)
+
+ def enableZifPullups(self, enable):
+ # Enable the ZIF socket signal pullups.
+ param = 0
+ if enable:
+ param = 1
+ cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0)
+ self.queueCommand(cmd)
+
+ def __delay_4usec(self):
+ self.queueCommand(int2byte(0x00))
+
+ def __delay_10msec(self):
+ self.queueCommand(int2byte(0x1B))
+
+ def delay(self, seconds):
+ if seconds >= 0.5:
+ # Perform long delays on the host
+ self.flushCommands(seconds)
+ return
+ if seconds > 0.000255:
+ # Need to round up to ten milliseconds
+ millisecs = int(math.ceil(seconds * 1000))
+ millisecs = roundup(millisecs, 10)
+ for i in range(0, millisecs // 10):
+ self.__delay_10msec()
+ else:
+ # Round up to 4 usec boundary
+ microsecs = int(math.ceil(seconds * 1000000))
+ microsecs = roundup(microsecs, 4)
+ for i in range(0, microsecs // 4):
+ self.__delay_4usec()
diff --git a/libtoprammer/top_devices.py b/libtoprammer/top_devices.py
index 39526dd..91ac1ab 100644
--- a/libtoprammer/top_devices.py
+++ b/libtoprammer/top_devices.py
@@ -1,6 +1,7 @@
# Import programmer specific stuff
# TOP2049 specific stuff
+import top2049.hardware_access
import top2049.vcc_layouts
import top2049.vpp_layouts
import top2049.gnd_layouts
diff --git a/toprammer b/toprammer
index dab17fb..77beb0a 100755
--- a/toprammer
+++ b/toprammer
@@ -61,7 +61,8 @@ def usage():
print "Other options:"
print " -t|--list Print a list of supported chips and exit."
print " Use -V|--verbose to control the list verbosity (1-4)"
- print " -d|--device BUS.DEV Use the programmer at BUS.DEV"
+ print " -d|--device DEVID Use a specific programmer. For USB:"
+ print " usb:BUSNR.DEVNR"
print " First found programmer is used, if not given."
print " -V|--verbose LEVEL Set the verbosity level:"
print " 0 => show warnings"
@@ -149,10 +150,12 @@ def main(argv):
opt_action = "print-list"
if o in ("-d", "--device"):
try:
- v = v.split(".")
- opt_device = (int(v[0]), int(v[1]))
+ v = v.replace('.', ':').split(':')
+ opt_device = "%s:%03d:%03d" %\
+ (v[0], int(v[1], 10),
+ int(v[2], 10))
except (IndexError, ValueError), e:
- print "-d|--device invalid BUS.DEV id."
+ print("-d|--device invalid DEVID.")
return 1
if o in ("-V", "--verbose"):
opt_verbose = int(v)
@@ -228,7 +231,7 @@ def main(argv):
verbose=opt_verbose, showBroken=True)
return 0
- top = TOP(busDev = opt_device,
+ top = TOP(devIdentifier = opt_device,
verbose = opt_verbose, forceLevel = opt_forceLevel,
noqueue = opt_noqueue, usebroken = opt_usebroken,
forceBitfileUpload = opt_forceBitfileUpload)
diff --git a/toprammer-gui b/toprammer-gui
index e599918..49cecdf 100755
--- a/toprammer-gui
+++ b/toprammer-gui
@@ -685,12 +685,9 @@ class HwThread(QThread):
self.mutex.unlock()
self.wait()
- def setDevice(self, busNr=None, devNr=None):
+ def setDevice(self, devIdentifier=None):
assert(self.top is None and self.task is None)
- busDev = None
- if busNr is not None and devNr is not None:
- busDev = (busNr, devNr)
- self.param_busDev = busDev
+ self.param_devIdentifier = devIdentifier
def setTopParameters(self, verbose=2, forceLevel=0,
usebroken=True, forceBitfileUpload=False):
@@ -792,7 +789,7 @@ class HwThread(QThread):
if not self.top:
# Initialize Hardware access
self.__blockCancellation()
- self.top = TOP(busDev = self.param_busDev,
+ self.top = TOP(devIdentifier = self.param_devIdentifier,
verbose = self.param_verbose,
forceLevel = self.param_forceLevel,
usebroken = self.param_usebroken,
@@ -2022,16 +2019,17 @@ class ProgrammerSelectDialog(QDialog):
def rescan(self):
self.deviceList.clear()
- for (toptype, busNr, devNr) in TOP.findDevices():
- text = "%s (usb:%03d:%03d)" % (toptype, busNr, devNr)
+ for foundDev in TOP.findDevices():
+ text = "%s (%s)" % (foundDev.toptype,
+ foundDev.devIdentifier)
item = QListWidgetItem(text, self.deviceList)
- item.setData(Qt.UserRole, (busNr, devNr))
+ item.setData(Qt.UserRole, foundDev.devIdentifier)
self.deviceList.setCurrentRow(0)
- def getUSBIDs(self):
+ def getIdentifier(self):
item = self.deviceList.currentItem()
if not item:
- return (None, None)
+ return None
return item.data(Qt.UserRole)
def selectionChanged(self, unused):
@@ -2186,9 +2184,9 @@ class MainWindow(QMainWindow):
dlg = ProgrammerSelectDialog(self)
if dlg.exec_() != QDialog.Accepted:
return
- (busNr, devNr) = dlg.getUSBIDs()
+ devIdentifier = dlg.getIdentifier()
self.runOperationSync(HwThread.TASK_SHUTDOWN)
- self.hwThread.setDevice(busNr, devNr)
+ self.hwThread.setDevice(devIdentifier)
if self.currentChipDescription:
self.runOperation(self.OP_INITCHIP, HwThread.TASK_INITCHIP,
self.currentChipDescription.chipID)
bues.ch cgit interface