From 30389ab2bb476dcde8e2c10ce5f976a0d1a03650 Mon Sep 17 00:00:00 2001 From: Michael Buesch Date: Thu, 26 Nov 2015 19:11:19 +0100 Subject: Port to latest PyUSB Signed-off-by: Michael Buesch --- libtoprammer/hardware_access_usb.py | 75 ++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 42 deletions(-) (limited to 'libtoprammer/hardware_access_usb.py') diff --git a/libtoprammer/hardware_access_usb.py b/libtoprammer/hardware_access_usb.py index cd44e0a..774c572 100644 --- a/libtoprammer/hardware_access_usb.py +++ b/libtoprammer/hardware_access_usb.py @@ -23,17 +23,17 @@ from util import * from command_queue import * try: - import usb + import usb.core + import usb.util except (ImportError), e: - print "Python USB support module not found. Please install python-usb." + print("Python USB (PyUSB) support module not found.\n" + "Please install python-usb.") sys.exit(1) class FoundUSBDev(object): - def __init__(self, usbdev, busNr, devNr): + def __init__(self, usbdev): self.usbdev = usbdev - self.busNr = busNr - self.devNr = devNr class HardwareAccessUSB(CommandQueue): "Lowlevel USB hardware access" @@ -43,17 +43,9 @@ class HardwareAccessUSB(CommandQueue): @classmethod def scan(cls, checkCallback): "Scan for devices. Returns a list of FoundUSBDev()." - devices = [] - for bus in usb.busses(): - for dev in bus.devices: - if not checkCallback(dev): - continue - try: - busNr = int(bus.dirname, 10) - devNr = int(dev.filename, 10) - except (ValueError), e: - continue - devices.append(FoundUSBDev(dev, busNr, devNr)) + devices = list(usb.core.find(find_all = True, + custom_match = checkCallback)) + devices = [ FoundUSBDev(dev) for dev in devices ] return devices def __init__(self, usbdev, maxPacketBytes, noQueue, @@ -63,47 +55,41 @@ class HardwareAccessUSB(CommandQueue): synchronous = noQueue) self.doRawDump = doRawDump self.usbdev = usbdev - self.usbh = None self.__initUSB() def __initUSB(self): try: - self.usbh = self.usbdev.open() - config = self.usbdev.configurations[0] - interface = config.interfaces[0][0] + config = self.usbdev.configurations()[0] + interface = config.interfaces()[0] # Find the endpoints self.bulkOut = None self.bulkIn = None - for ep in interface.endpoints: + for ep in interface.endpoints(): if not self.bulkIn and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_IN: + usb.util.endpoint_type(ep.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK and \ + usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN: self.bulkIn = ep if not self.bulkOut and \ - ep.type == usb.ENDPOINT_TYPE_BULK and \ - (ep.address & (usb.ENDPOINT_IN | usb.ENDPOINT_OUT)) == usb.ENDPOINT_OUT: + usb.util.endpoint_type(ep.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK and \ + usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT: self.bulkOut = ep if not self.bulkIn or not self.bulkOut: raise TOPException("Did not find all USB EPs") - self.usbh.setConfiguration(config) - self.usbh.claimInterface(interface) - self.usbh.setAltInterface(interface) - self.usbh.clearHalt(self.bulkOut.address) - self.usbh.clearHalt(self.bulkIn.address) - except (usb.USBError), e: - self.usbh = None + self.usbdev.set_configuration(config) + self.bulkIn.clear_halt() + self.bulkOut.clear_halt() + + except (usb.core.USBError), e: raise TOPException("USB error: " + str(e)) def shutdown(self): "Shutdown the USB connection" try: - if self.usbh: - self.usbh.releaseInterface() - self.usbh = None - except (usb.USBError), e: + usb.util.dispose_resources(self.usbdev) + except (usb.core.USBError), e: raise TOPException("USB error: " + str(e)) def send(self, data): @@ -112,9 +98,14 @@ class HardwareAccessUSB(CommandQueue): if self.doRawDump: print("Sending command:") dumpMem(data) - self.usbh.bulkWrite(self.bulkOut.address, data, - self.TIMEOUT_MSEC) - except (usb.USBError), e: + nrWritten = self.usbdev.write( + self.bulkOut.bEndpointAddress, + data, + self.TIMEOUT_MSEC) + if nrWritten != len(data): + raise TOPException("USB bulk write error: " + "short write") + except (usb.core.USBError), e: raise TOPException("USB bulk write error: " + str(e)) def receive(self, size): @@ -122,9 +113,9 @@ class HardwareAccessUSB(CommandQueue): # If there are blocked commands in the queue, send them now. self.flushCommands() try: - ep = self.bulkIn.address + ep = self.bulkIn.bEndpointAddress data = b"".join([ int2byte(b) for b in - self.usbh.bulkRead(ep, size, + self.usbdev.read(ep, size, self.TIMEOUT_MSEC) ]) if len(data) != size: raise TOPException("USB bulk read error: Could not read the " +\ @@ -132,6 +123,6 @@ class HardwareAccessUSB(CommandQueue): if self.doRawDump: print("Received data:") dumpMem(data) - except (usb.USBError), e: + except (usb.core.USBError), e: raise TOPException("USB bulk read error: " + str(e)) return data -- cgit v1.2.3