summaryrefslogtreecommitdiffstats
path: root/libtoprammer/main.py
diff options
context:
space:
mode:
authorMichael Buesch <m@bues.ch>2012-04-26 19:06:16 +0200
committerMichael Buesch <m@bues.ch>2012-04-26 19:06:16 +0200
commit7bfffdbbfe7b6ca42575c176083a2c9466692e6e (patch)
tree873b2f1af0dba68bd23007460a54e96af39e45c6 /libtoprammer/main.py
parentb20b4a6fd7826c7a377a2268675ba915aabba904 (diff)
downloadtoprammer-7bfffdbbfe7b6ca42575c176083a2c9466692e6e.tar.xz
toprammer-7bfffdbbfe7b6ca42575c176083a2c9466692e6e.zip
Move hardware access routines to programmer implementation
Signed-off-by: Michael Buesch <m@bues.ch>
Diffstat (limited to 'libtoprammer/main.py')
-rw-r--r--libtoprammer/main.py353
1 files changed, 72 insertions, 281 deletions
diff --git a/libtoprammer/main.py b/libtoprammer/main.py
index c2970c7..3d8fdfe 100644
--- a/libtoprammer/main.py
+++ b/libtoprammer/main.py
@@ -37,64 +37,51 @@ from util import *
import time
import re
-try:
- import usb
-except (ImportError), e:
- print "Python USB support module not found. Please install python-usb."
- sys.exit(1)
+from hardware_access_usb import *
from top_devices import *
from chips import *
from user_interface import *
-class TOP:
+class FoundDev(object):
+ def __init__(self, toptype, devIdentifier, busdata=None):
+ self.toptype = toptype
+ self.devIdentifier = devIdentifier
+ self.busdata = busdata
+
+class TOP(object):
# Supported programmer types
TYPE_TOP2049 = "TOP2049"
- def __init__(self, busDev=None, verbose=0,
+ def __init__(self, devIdentifier=None, verbose=0,
forceLevel=0, noqueue=False, usebroken=False,
forceBitfileUpload=False,
userInterface=ConsoleUserInterface()):
- """busDev is a tuple (BUSID, DEVID) or None."""
self.verbose = verbose
self.forceLevel = forceLevel
self.forceBitfileUpload = forceBitfileUpload
- self.noqueue = noqueue
self.usebroken = usebroken
self.userInterface = userInterface
+ self.hw = None
self.chip = None
- self.commandQueue = []
# Find the device
- for bus in usb.busses():
- if busDev and bus.dirname != "%03d" % busDev[0]:
- continue
- for dev in bus.devices:
- if busDev and dev.filename != "%03d" % busDev[1]:
- continue
- if self.__usbdev2toptype(dev):
- break
- if busDev:
- raise TOPException(
- "Device %03d.%03d is not a TOP device" %\
- (busDev[0], busDev[1]))
- else:
- continue
- break
- else:
+ devices = self.findDevices()
+ if devIdentifier:
+ devices = filter(lambda d: d.devIdentifier == devIdentifier,
+ devices)
+ if not devices:
raise TOPException("TOP programmer device not found!")
- self.usbbus = bus
- self.usbdev = dev
- self.usbh = None
+ foundDev = devices[0] # Select first
- if self.noqueue:
+ if noqueue:
self.printWarning("WARNING: Command queuing disabled. " +\
"Hardware access will be _really_ slow.")
- self.initializeProgrammer()
+ self.initializeProgrammer(foundDev, noqueue)
def getProgrammerType(self):
"Returns the TYPE_TOPxxxx"
@@ -171,8 +158,8 @@ class TOP:
self.flushCommands()
self.userInterface.debugMessage(message)
- @staticmethod
- def __usbdev2toptype(usbdev):
+ @classmethod
+ def __usbdev2toptype(cls, usbdev):
"Returns the TOP type of the USB device. None, if this is not a TOP device."
try:
toptype = {
@@ -182,78 +169,44 @@ class TOP:
return None
return toptype
- @staticmethod
- def findDevices():
- """Rescan the USB busses and return a list of tuples (busNr, devNr)
- for the found device."""
- devices = []
- for bus in usb.busses():
- for dev in bus.devices:
- toptype = TOP.__usbdev2toptype(dev)
- if not toptype:
- continue
- try:
- busNr = int(bus.dirname)
- devNr = int(dev.filename)
- except (ValueError), e:
- pass
- devices.append( (toptype, busNr, devNr) )
+ @classmethod
+ def findDevices(cls):
+ """Rescan all busses for TOP devices.
+ Returns a list of FoundDev()"""
+ usbFound = HardwareAccessUSB.scan(cls.__usbdev2toptype)
+ devices = [ FoundDev(cls.__usbdev2toptype(d.usbdev),
+ "usb:%03d:%03d" % (d.busNr, d.devNr),
+ d)
+ for d in usbFound ]
return devices
- def __initializeUSB(self):
- # Set up the USB interface
- self.__shutdownUSB()
- try:
- self.usbh = self.usbdev.open()
- config = self.usbdev.configurations[0]
- interface = config.interfaces[0][0]
-
- # Find the endpoints
- self.bulkOut = None
- self.bulkIn = None
- for ep in interface.endpoints:
- if not self.bulkIn and \
- ep.type == usb.ENDPOINT_TYPE_BULK and \
- (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN:
- self.bulkIn = ep
- if not self.bulkOut and \
- ep.type == usb.ENDPOINT_TYPE_BULK and \
- (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT:
- self.bulkOut = ep
- if not self.bulkIn or not self.bulkOut:
- raise TOPException("Did not find all USB EPs")
-
- self.usbh.setConfiguration(config)
- self.usbh.claimInterface(interface)
- self.usbh.setAltInterface(interface)
- self.usbh.clearHalt(self.bulkOut.address)
- self.usbh.clearHalt(self.bulkIn.address)
- except (usb.USBError), e:
- self.usbh = None
- raise TOPException("USB error: " + str(e))
-
- def __shutdownUSB(self):
- try:
- if self.usbh:
- self.usbh.releaseInterface()
- self.usbh = None
- except (usb.USBError), e:
- raise TOPException("USB error: " + str(e))
-
- def initializeProgrammer(self):
+ def initializeProgrammer(self, foundDev, noQueue):
"Initialize the hardware"
- self.__initializeUSB()
+ self.shutdownProgrammer()
+
+ if foundDev.toptype == self.TYPE_TOP2049:
+ self.hw = top2049.hardware_access.HardwareAccess(
+ foundUSBDev = foundDev.busdata,
+ noQueue = noQueue,
+ doRawDump = (self.verbose >= 3))
+ self.vcc = top2049.vcc_layouts.VCCLayout(self)
+ self.vpp = top2049.vpp_layouts.VPPLayout(self)
+ self.gnd = top2049.gnd_layouts.GNDLayout(self)
+ else:
+ assert(0)
+ self.topType = foundDev.toptype
versionRegex = (
(r"top2049\s+ver\s*(\d+\.\d+)", self.TYPE_TOP2049),
)
- versionString = self.cmdRequestVersion()
- for (regex, topType) in versionRegex:
+ versionString = self.hw.readVersionString()
+ for (regex, t) in versionRegex:
+ if t != self.topType:
+ continue
m = re.match(regex, versionString, re.IGNORECASE)
if m:
- self.topType = topType
self.topVersion = m.group(1)
break
else:
@@ -261,49 +214,15 @@ class TOP:
"' is not supported by Toprammer, yet")
self.printInfo("Initializing the " + self.topType + " version " + self.topVersion)
- # Initialize the programmer specific layouts
- if self.topType == self.TYPE_TOP2049:
- self.vcc = top2049.vcc_layouts.VCCLayout(self)
- self.vpp = top2049.vpp_layouts.VPPLayout(self)
- self.gnd = top2049.gnd_layouts.GNDLayout(self)
- else:
- assert(0)
-
- self.queueCommand(b"\x0D")
- stat = self.cmdReadBufferReg32()
- if stat != 0x00020C69:
- self.printWarning("Init: Unexpected status (a): 0x%08X" % stat)
-
- self.cmdSetVPPVoltage(0)
- self.cmdSetVPPVoltage(0)
- self.queueCommand(b"\x0E\x20\x00\x00")
- self.cmdDelay(0.01)
- self.cmdSetVCCVoltage(0)
-
- self.cmdLoadGNDLayout(0)
- self.cmdLoadVPPLayout(0)
- self.cmdLoadVCCLayout(0)
-
- self.queueCommand(b"\x0E\x20\x00\x00")
- self.cmdDelay(0.01)
- self.queueCommand(b"\x0E\x25\x00\x00")
- stat = self.cmdReadBufferReg32()
- if stat != 0x0000686C:
- self.printWarning("Init: Unexpected status (b): 0x%08X" % stat)
- self.cmdEnableZifPullups(False)
- self.flushCommands()
+ self.hw.hardwareInit()
def shutdownProgrammer(self):
- self.__shutdownUSB()
+ if self.hw:
+ self.hw.shutdown()
+ self.hw = None
self.topType = None
self.topVersion = None
- def getProgrammerType(self):
- return self.topType
-
- def getProgrammerVersion(self):
- return self.topVersion
-
def __readBitfileID(self):
self.cmdFPGARead(0xFD)
self.cmdFPGARead(0xFE)
@@ -331,17 +250,11 @@ class TOP:
self.printDebug("Uploading bitfile %s..." % self.bitfile.getFilename())
- self.cmdFPGAInitiateConfig()
- stat = self.cmdReadBufferReg8()
- expected = 0x01
- if stat != expected:
- raise TOPException("bit-upload: Failed to initiate " +\
- "config sequence (got 0x%02X, expected 0x%02X)" %\
- (stat, expected))
-
+ self.hw.FPGAInitiateConfig()
data = self.bitfile.getPayload()
- for i in range(0, len(data), 60):
- self.cmdFPGAUploadConfig(data[i : i + 60])
+ chunksz = self.hw.FPGAMaxConfigChunkSize()
+ for i in range(0, len(data), chunksz):
+ self.hw.FPGAUploadConfig(data[i : i + chunksz])
self.flushCommands()
if requiredID and requiredRevision:
@@ -462,42 +375,23 @@ class TOP:
self.flushCommands()
self.printDebug("Done writing the image.")
- def __cmdDelay_4usec(self):
- self.queueCommand(int2byte(0x00))
-
- def __cmdDelay_10msec(self):
- self.queueCommand(int2byte(0x1B))
-
def cmdDelay(self, seconds):
"""Send a delay request to the device. Note that this causes the
programmer to execute the delay. For a host-delay, use hostDelay()"""
- assert(seconds < 0.5)
- if seconds > 0.000255:
- # Need to round up to ten milliseconds
- millisecs = int(math.ceil(seconds * 1000))
- millisecs = roundup(millisecs, 10)
- for i in range(0, millisecs // 10):
- self.__cmdDelay_10msec()
- else:
- # Round up to 4 usec boundary
- microsecs = int(math.ceil(seconds * 1000000))
- microsecs = roundup(microsecs, 4)
- for i in range(0, microsecs // 4):
- self.__cmdDelay_4usec()
+ self.hw.delay(seconds)
def hostDelay(self, seconds):
"""Flush all commands and delay the host computer for 'seconds'"""
- self.flushCommands()
- time.sleep(seconds)
+ self.hw.flushCommands(seconds)
def getOscillatorHz(self):
"""Returns the FPGA oscillator frequency, in Hz.
The oscillator is connected to the FPGA clk pin."""
- return 24000000
+ self.hw.getOscillatorHz()
def getBufferRegSize(self):
"""Returns the size (in bytes) of the buffer register."""
- return 64
+ return self.hw.getBufferRegSize()
def cmdReadBufferReg(self, nrBytes=-1):
"""Read the buffer register. Returns nrBytes (default all bytes)."""
@@ -506,9 +400,8 @@ class TOP:
nrBytes = regSize
assert(nrBytes <= regSize)
if not nrBytes:
- return ""
- self.queueCommand(int2byte(0x07))
- return self.receive(regSize)[0:nrBytes]
+ return b""
+ return self.hw.readBufferReg(nrBytes)
def cmdReadBufferReg8(self):
"""Read a 8bit value from the buffer register."""
@@ -543,139 +436,37 @@ class TOP:
(byte2int(stat[4]) << 32) | (byte2int(stat[5]) << 40)
return stat
- def cmdRequestVersion(self):
- """Returns the device ID and versioning string."""
- self.queueCommand(b"\x0E\x11\x00\x00")
- data = self.cmdReadBufferReg(16)
- return data.strip()
-
- def cmdFPGAInitiateConfig(self):
- """Initiate a configuration sequence on the FPGA."""
- self.queueCommand(b"\x0E\x21\x00\x00")
-
- def cmdFPGAUploadConfig(self, data):
- """Upload configuration data into the FPGA."""
- assert(len(data) <= 60)
- cmd = b"\x0E\x22\x00\x00" + data
- cmd += b"\x00" * (64 - len(cmd)) # padding
- self.queueCommand(cmd)
-
- def __makeFPGAAddr(self, address):
- # Set the "address OK" bit
- return address | 0x10
-
def cmdFPGARead(self, address):
"""Read a byte from the FPGA at address into the buffer register."""
- address = self.__makeFPGAAddr(address)
- if address == self.__makeFPGAAddr(0): # Fast tracked
- self.queueCommand(int2byte(0x01))
- return
- self.queueCommand(int2byte(0x0B) + int2byte(address))
+ self.hw.FPGARead(address)
def cmdFPGAWrite(self, address, byte):
"""Write a byte to an FPGA address."""
- address = self.__makeFPGAAddr(address)
- if address == self.__makeFPGAAddr(0): # Fast tracked
- self.queueCommand(int2byte(0x10) + int2byte(byte))
- return
- self.queueCommand(int2byte(0x0A) + int2byte(address) + int2byte(byte))
+ return self.hw.FPGAWrite(address, byte)
def cmdLoadGNDLayout(self, layout):
- """Load the GND configuration into the H/L shiftregisters."""
- cmd = int2byte(0x0E) + int2byte(0x16) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ """Load the GND configuration into the programmer."""
+ self.hw.loadGNDLayout(layout)
def cmdSetVPPVoltage(self, voltage):
"""Set the VPP voltage. voltage is a floating point voltage number."""
- centivolt = int(voltage * 10)
- cmd = int2byte(0x0E) + int2byte(0x12) + int2byte(centivolt) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
+ self.hw.setVPPVoltage(voltage)
def cmdLoadVPPLayout(self, layout):
- """Load the VPP configuration into the shift registers."""
- cmd = int2byte(0x0E) + int2byte(0x14) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ """Load the VPP configuration into the programmer."""
+ self.hw.loadVPPLayout(layout)
def cmdSetVCCVoltage(self, voltage):
"""Set the VCC voltage. voltage is a floating point voltage number."""
- centivolt = int(voltage * 10)
- cmd = int2byte(0x0E) + int2byte(0x13) + int2byte(centivolt) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
+ self.hw.setVCCVoltage(voltage)
def cmdLoadVCCLayout(self, layout):
"""Load the VCC configuration into the shift registers."""
- cmd = int2byte(0x0E) + int2byte(0x15) + int2byte(layout) + int2byte(0)
- self.queueCommand(cmd)
- self.cmdDelay(0.01)
- self.hostDelay(0.15)
+ self.hw.loadVCCLayout(layout)
def cmdEnableZifPullups(self, enable):
"""Enable the ZIF socket signal pullups."""
- param = 0
- if enable:
- param = 1
- cmd = int2byte(0x0E) + int2byte(0x28) + int2byte(param) + int2byte(0)
- self.queueCommand(cmd)
-
- def __doSend(self, command):
- try:
- assert(len(command) <= 64)
- if self.verbose >= 3:
- print "Sending command:"
- dumpMem(command)
- ep = self.bulkOut.address
- self.usbh.bulkWrite(ep, command)
- except (usb.USBError), e:
- raise TOPException("USB bulk write error: " + str(e))
-
- def queueCommand(self, command):
- """Queue a raw command for transmission."""
- assert(len(command) <= 64)
- if self.noqueue:
- self.__doSend(command)
- else:
- self.commandQueue.append(command)
-
- def runCommandSync(self, command):
- """Run a command synchronously.
- Warning: This is slow. Don't use it unless there's a very good reason."""
- self.flushCommands()
- self.queueCommand(command)
- self.flushCommands()
-
- def receive(self, size):
- """Receive 'size' bytes on the bulk-in ep."""
- # If there are blocked commands in the queue, send them now.
- self.flushCommands()
- try:
- ep = self.bulkIn.address
- data = b"".join(map(lambda b: int2byte(b),
- self.usbh.bulkRead(ep, size)))
- if len(data) != size:
- raise TOPException("USB bulk read error: Could not read the " +\
- "requested number of bytes (req %d, got %d)" % (size, len(data)))
- if self.verbose >= 3:
- print "Received data:"
- dumpMem(data)
- except (usb.USBError), e:
- raise TOPException("USB bulk read error: " + str(e))
- return data
+ self.hw.enableZifPullups(enable)
def flushCommands(self):
- """Flush the command queue."""
- command = b""
- for oneCommand in self.commandQueue:
- assert(len(oneCommand) <= 64)
- if len(command) + len(oneCommand) > 64:
- self.__doSend(command)
- command = b""
- command += oneCommand
- if command:
- self.__doSend(command)
- self.commandQueue = []
+ self.hw.flushCommands()
bues.ch cgit interface