#!/usr/bin/env python """ # Copyright (C) 2008-2009 Michael Buesch # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ import getopt import sys try: from serial.serialposix import * except ImportError: print "ERROR: pyserial module not available." print "On Debian Linux please do: apt-get install python-serial" sys.exit(1) from PyQt4.QtCore import * from PyQt4.QtGui import * # Serial communication port configuration CONFIG_BAUDRATE = 115200 CONFIG_BYTESIZE = 8 CONFIG_PARITY = PARITY_NONE CONFIG_STOPBITS = 2 # The size of one message MSG_SIZE = 6 MSG_PAYLOAD_SIZE = 4 # Message IDs MSG_ID_MASK = 0x7F MSG_INVALID = 0 MSG_ERROR = 1 MSG_LOGMESSAGE = 2 MSG_PING = 3 MSG_PONG = 4 MSG_GET_CURRENT_PRESSURE = 5 MSG_CURRENT_PRESSURE = 6 MSG_GET_DESIRED_PRESSURE = 7 MSG_DESIRED_PRESSURE = 8 MSG_SET_DESIRED_PRESSURE = 9 MSG_GET_HYSTERESIS = 10 MSG_HYSTERESIS = 11 MSG_SET_HYSTERESIS = 12 MSG_GET_CONFIG_FLAGS = 13 MSG_CONFIG_FLAGS = 14 MSG_SET_CONFIG_FLAGS = 15 MSG_SET_VALVE = 16 MSG_RESTARTED = 17 MSG_SHUTDOWN = 18 MSG_TURNON = 19 # Message flags MSG_FLAG_REQ_ERRCODE = 0x80 # Message error codes MSG_ERR_NONE = 0 # No error MSG_ERR_CHKSUM = 1 # Checksum error MSG_ERR_NOCMD = 2 # Unknown command MSG_ERR_BUSY = 3 # Busy MSG_ERR_INVAL = 4 # Invalid argument MSG_ERR_NOREPLY = -1 # internal. Not sent over wire. # Config flags CFG_FLAG_AUTOADJUST_ENABLE = 0 def usage(): print "Pressure control - remote configuration" print "" print "Copyright (C) 2008-2009 Michael Buesch " print "Licensed under the GNU/GPL version 3" print "" print "Usage: pctl-remote [OPTIONS] /dev/ttyS0" print "" print "-h|--help Print this help text" print "-p|--noping Don't initially ping the device" print "-f|--nofetch Don't initially fetch the device state" print "-k|--noka Don't send keep-alive pings" print "-l|--log FILE Log status information to FILE" def parseArgs(): global opt_ttyfile global opt_noping global opt_nofetch global opt_noka global opt_logfile if len(sys.argv) < 2: usage() sys.exit(1) opt_ttyfile = sys.argv[-1] opt_noping = 0 opt_nofetch = 0 opt_noka = 0 opt_logfile = None try: (opts, args) = getopt.getopt(sys.argv[1:-1], "hpfkl:", [ "help", "noping", "nofetch", "noka", "log=", ]) except getopt.GetoptError: usage() sys.exit(1) for (o, v) in opts: if o in ("-h", "--help"): usage() sys.exit(0) if o in ("-p", "--noping"): opt_noping = 1 if o in ("-f", "--nofetch"): opt_nofetch = 1 if o in ("-k", "--noka"): opt_noka = 1 if o in ("-l", "--log"): opt_logfile = v class Log(QObject): def __init__(self, logfile): QObject.__init__(self) self.fd = None if not logfile: return try: self.fd = file(logfile, "w+b") except IOError, e: print "Failed to open logfile %s: %s" % (logfile, e.strerror) sys.exit(1) self.write("X/Y,X/Y lower threshold,X/Y upper threshold,"+\ "Z,Z lower threshold,Z upper threshold,\n") def write(self, message): if not self.fd: return self.fd.write(message) self.fd.flush() def logPressure(self, xy, xy_desired, xy_hyst, z, z_desired, z_hyst): xy_lower = xy_desired - xy_hyst xy_upper = xy_desired + xy_hyst z_lower = z_desired - z_hyst z_upper = z_desired + z_hyst self.write("%d,%d,%d,%d,%d,%d,\n" %\ (xy, xy_lower, xy_upper, z, z_lower, z_upper)) class RemoteProtocol(QObject): def __init__(self, ttyfile): QObject.__init__(self) global remote try: remote = self self.serial = Serial(ttyfile, CONFIG_BAUDRATE, CONFIG_BYTESIZE, CONFIG_PARITY, CONFIG_STOPBITS) self.__setResetLine(False) QThread.msleep(100) self.serial.flushInput() self.devRestarted = False self.pollTimer = QTimer(self) self.connect(self.pollTimer, SIGNAL("timeout()"), self.__poll) self.pollTimer.start(50) self.keepAliveTimer = QTimer(self) self.connect(self.keepAliveTimer, SIGNAL("timeout()"), self.__keepAlive) if not opt_noka: self.keepAliveTimer.start(1000) if not opt_noping: reply = self.sendMessageSyncReply(MSG_PING, 0, "", MSG_PONG) if reply: mainwnd.centralWidget().log.hostLog( "PING->PONG success. Device is alife.\n") else: mainwnd.centralWidget().log.hostLog( "Communication with device failed. "+\ "No reply to PING request.\n") except (SerialException, OSError, IOError), e: print e sys.exit(1) def __setResetLine(self, resetOn): # RTS is connected to the microcontroller reset line. # If RTS is logic high, the reset line is pulled low # and the microcontroller is put into reset. # If RTS is logic low, the microcontroller is in # normal operation. try: self.serial.setRTS(bool(resetOn)) except (SerialException, OSError, IOError), e: mainwnd.statusBar().showMessage("Failed to toggle reset. %s" % e) def rebootDevice(self): # Reboot the microcontroller self.__setResetLine(True) QThread.msleep(50) self.__setResetLine(False) QThread.msleep(50) def stopKeepAliveTimer(self): self.keepAliveTimer.stop() def __keepAlive(self): reply = self.sendMessageSyncReply(MSG_PING, 0, "", MSG_PONG) if not reply: mainwnd.centralWidget().log.hostLog(self.tr( "Keep-alife: Device did not reply to ping "+\ "request. Rebooting device...\n")) self.rebootDevice() if not mainwnd.centralWidget().fetchState(): mainwnd.centralWidget().log.hostLog(self.tr( "Keep-alife: Failed to fetch configuration\n")) def __poll(self): try: if self.serial.inWaiting() >= MSG_SIZE: self.parseMessage(self.serial.read(MSG_SIZE)) except (SerialException, OSError, IOError), e: mainwnd.statusBar().showMessage("Failed to poll message. %s" % e) return if self.devRestarted: if mainwnd.centralWidget().fetchState(): mainwnd.centralWidget().log.hostLog(self.tr("Device rebooted\n")) mainwnd.centralWidget().turnOnDevice() self.devRestarted = False def checksumMessage(self, msg): calc_crc = self.__crc8_update_buffer(0xFF, msg[0:-1]) calc_crc ^= 0xFF want_crc = ord(msg[-1]) if calc_crc != want_crc: text = self.tr("ERROR: message CRC mismatch\n") mainwnd.centralWidget().log.hostLog(text) try: self.serial.flushInput() except (SerialException, OSError, IOError), e: pass return False return True def parseMessage(self, msg): if not self.checksumMessage(msg): return id = ord(msg[0]) & MSG_ID_MASK if (id == MSG_LOGMESSAGE): str = self.getPayload(msg).rstrip('\0') mainwnd.centralWidget().log.devLog(str) if (id == MSG_CURRENT_PRESSURE): mainwnd.centralWidget().parseCurrentPressureMsg(msg) if (id == MSG_RESTARTED): self.devRestarted = True def getPayload(self, msg): return msg[1:-1] def sendMessage(self, id, flags, payload): """Send a message""" assert(len(payload) <= MSG_PAYLOAD_SIZE) # Create the header msg = "%c" % (id | flags) # Add the payload msg += payload # Pad the payload up to the constant size msg += '\0' * (MSG_PAYLOAD_SIZE - len(payload)) # Calculate the CRC crc = self.__crc8_update_buffer(0xFF, msg) crc ^= 0xFF # Add the CRC to the message msg += "%c" % crc # Send the message assert(len(msg) == MSG_SIZE) try: self.serial.write(msg) except (SerialException, OSError, IOError), e: mainwnd.statusBar().showMessage("Failed to send message. %s" % e) def sendMessageSyncReply(self, id, flags, payload, replyId): """Send a message and synchronously wait for the reply.""" self.pollTimer.stop() self.sendMessage(id, flags, payload) timeout = QDateTime.currentDateTime().addMSecs(500) try: while True: if QDateTime.currentDateTime() >= timeout: msg = None break if self.serial.inWaiting() < MSG_SIZE: QThread.msleep(1) continue msg = self.serial.read(MSG_SIZE) if not self.checksumMessage(msg): continue msgid = ord(msg[0]) if msgid == replyId: break # This is not a reply to our message. self.parseMessage(msg) except (SerialException, OSError, IOError), e: mainwnd.statusBar().showMessage("Failed to fetch reply. %s" % e) msg = "\0" * MSG_SIZE self.pollTimer.start() return msg def sendMessageSyncError(self, id, flags, payload): """Sends a message and synchronously waits for the MSG_ERROR reply.""" flags |= MSG_FLAG_REQ_ERRCODE reply = self.sendMessageSyncReply(id, flags, payload, MSG_ERROR) if not reply: return MSG_ERR_NOREPLY return ord(self.getPayload(reply)[0]) def configFlagsFetch(self): reply = self.sendMessageSyncReply(MSG_GET_CONFIG_FLAGS, 0, "", MSG_CONFIG_FLAGS) if not reply: return None reply = remote.getPayload(reply) xy = ord(reply[0]) z = ord(reply[1]) return (xy, z) def configFlagsSet(self, island, flags): data = "%c%c" % (island, flags) err = self.sendMessageSyncError(MSG_SET_CONFIG_FLAGS, 0, data) return err def setValve(self, islandId, valveNr, state): data = "%c%c%c" % (islandId, valveNr, (state != 0)) i = 5 # Retry a few times while i != 0: err = self.sendMessageSyncError(MSG_SET_VALVE, 0, data) if err == MSG_ERR_NONE: break i -= 1 return err def __crc8_update_buffer(self, crc, buf): for c in buf: crc ^= ord(c) for i in range(0, 8): if crc & 1: crc = (crc >> 1) ^ 0x8C else: crc >>= 1 return crc & 0xFF class StatusBar(QStatusBar): def showMessage(self, msg): print msg QStatusBar.showMessage(self, msg, 3000) class LogBrowser(QTextEdit): def __init__(self, parent=None): QTextEdit.__init__(self, parent) self.msgs = [] self.curDevMsg = "" self.setReadOnly(1) self.hostLog(self.tr("Pressure Control logging started\n")); def __commit(self): MSG_LIMIT = 100 if len(self.msgs) > MSG_LIMIT: self.msgs = self.msgs[1:] assert(len(self.msgs) == MSG_LIMIT) self.setPlainText("".join(self.msgs)) # Scroll to the end of the log vScroll = self.verticalScrollBar() vScroll.setValue(vScroll.maximum()) def __dateString(self): date = QDateTime.currentDateTime() return str(date.toString("[hh:mm:ss] ")) def devLog(self, text): text = str(text) # to python string if not text: return if not self.curDevMsg: text = self.__dateString() + "Dev: " + text self.curDevMsg += text if text[-1] in "\r\n": self.msgs.append(self.curDevMsg) self.curDevMsg = "" self.__commit() def hostLog(self, text): text = str(text) # to python string text = self.__dateString() + "Host: " + text self.msgs.append(text) self.__commit() class PressureGauge(QWidget): def __init__(self, name, min, max, units, parent): QWidget.__init__(self, parent) self.min = min self.max = max self.units = units self.setLayout(QVBoxLayout()) self.title = QLabel(name, self) self.title.setAlignment(Qt.AlignHCenter) self.layout().addWidget(self.title) self.dial = QDial(self) self.dial.setNotchesVisible(1) self.dial.setEnabled(0) self.dial.setSingleStep(100) self.dial.setPageStep(1000) self.dial.setNotchTarget(2) self.dial.setMinimum(int(min * 1000)) self.dial.setMaximum(int(max * 1000)) self.dial.setFixedSize(100, 100) self.layout().addWidget(self.dial) self.num = QLabel(self) self.num.setAlignment(Qt.AlignHCenter) self.layout().addWidget(self.num) self.layout().addStretch() self.setValue(0) def setValue(self, value): if (value < self.min): value = self.min if (value > self.max): value = self.max self.num.setText("%.2f %s" % (float(value), self.units)) self.dial.setValue(int(value * 1000)) class ValveIslandWidget(QGroupBox): def __init__(self, name, islandId, parent): QGroupBox.__init__(self, name, parent) self.islandId = islandId self.setLayout(QGridLayout()) self.gauge = PressureGauge("Current pressure", 0, 10, "Bar", self) self.layout().addWidget(self.gauge, 0, 0, 3, 1) h = QHBoxLayout() h.addStretch() self.autoCheckbox = QCheckBox(self.tr("Automatically adjust pressure"), self) self.connect(self.autoCheckbox, SIGNAL("stateChanged(int)"), self.autoadjustChanged) h.addWidget(self.autoCheckbox) self.layout().addLayout(h, 0, 1) h = QHBoxLayout() h.addStretch() label = QLabel(self.tr("Desired pressure:"), self) h.addWidget(label) self.pressureSpin = QDoubleSpinBox(self) self.pressureSpin.setMinimum(0.5) self.pressureSpin.setMaximum(8) self.pressureSpin.setSingleStep(0.1) self.pressureSpin.setSuffix(self.tr(" Bar")) self.connect(self.pressureSpin, SIGNAL("valueChanged(double)"), self.desiredPressureChanged) h.addWidget(self.pressureSpin) self.layout().addLayout(h, 1, 1) h = QHBoxLayout() h.addStretch() label = QLabel(self.tr("Hysteresis:"), self) h.addWidget(label) self.hystSpin = QDoubleSpinBox(self) self.hystSpin.setMinimum(0.05) self.hystSpin.setMaximum(8) self.hystSpin.setSingleStep(0.05) self.hystSpin.setSuffix(self.tr(" Bar")) self.connect(self.hystSpin, SIGNAL("valueChanged(double)"), self.desiredHysteresisChanged) h.addWidget(self.hystSpin) self.layout().addLayout(h, 2, 1) h = QHBoxLayout() self.inButton = QPushButton(self.tr("IN-Valve"), self) self.connect(self.inButton, SIGNAL("pressed()"), self.inValvePressed) self.connect(self.inButton, SIGNAL("released()"), self.inValveReleased) h.addWidget(self.inButton) self.outButton = QPushButton(self.tr("OUT-Valve"), self) h.addWidget(self.outButton) self.connect(self.outButton, SIGNAL("pressed()"), self.outValvePressed) self.connect(self.outButton, SIGNAL("released()"), self.outValveReleased) self.layout().addLayout(h, 3, 0, 1, 2) self.autoadjustChanged(Qt.Unchecked) def desiredPressureChanged(self, value): if not self.parent().initialized: return mainwnd.centralWidget().pokeUiTimer() mbar = int(value * 1000) data = "%c%c%c" % (self.islandId, (mbar & 0xFF), ((mbar >> 8) & 0xFF)) err = remote.sendMessageSyncError(MSG_SET_DESIRED_PRESSURE, 0, data) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to change pressure. Error=%u\n" % err)) def getDesiredPressure(self): return int(self.pressureSpin.value() * 1000) def desiredHysteresisChanged(self, value): if not self.parent().initialized: return mainwnd.centralWidget().pokeUiTimer() mbar = int(value * 1000) data = "%c%c%c" % (self.islandId, (mbar & 0xFF), ((mbar >> 8) & 0xFF)) err = remote.sendMessageSyncError(MSG_SET_HYSTERESIS, 0, data) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to change hysteresis. Error=%u\n" % err)) def getHysteresis(self): return int(self.hystSpin.value() * 1000) def autoadjustChanged(self, state): self.inButton.setEnabled(state == Qt.Unchecked) self.outButton.setEnabled(state == Qt.Unchecked) if not self.parent().initialized: return mainwnd.centralWidget().pokeUiTimer() flags = remote.configFlagsFetch() if flags == None: self.parent().log.hostLog(self.tr("Failed to fetch config flags\n")) return flags = flags[self.islandId] if state == Qt.Checked: flags |= (1 << CFG_FLAG_AUTOADJUST_ENABLE) else: flags &= ~(1 << CFG_FLAG_AUTOADJUST_ENABLE) err = remote.configFlagsSet(self.islandId, flags) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to set config flags\n")) def inValvePressed(self): mainwnd.centralWidget().stopUiTimer() err = remote.setValve(self.islandId, 0, 1) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to switch valve 0 ON\n")) def inValveReleased(self): mainwnd.centralWidget().startUiTimer() err = remote.setValve(self.islandId, 0, 0) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to switch valve 0 OFF\n")) def outValvePressed(self): mainwnd.centralWidget().stopUiTimer() err = remote.setValve(self.islandId, 1, 1) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to switch valve 1 ON\n")) def outValveReleased(self): mainwnd.centralWidget().startUiTimer() err = remote.setValve(self.islandId, 1, 0) if err != MSG_ERR_NONE: self.parent().log.hostLog(self.tr("Failed to switch valve 1 OFF\n")) class MainWidget(QWidget): def __init__(self, parent=None): QWidget.__init__(self, parent) self.initialized = False self.setLayout(QVBoxLayout()) self.uiLock = QCheckBox(self.tr("User interface enabled"), self) self.uiLock.setCheckState(Qt.Unchecked) self.connect(self.uiLock, SIGNAL("stateChanged(int)"), self.__uiLockChanged) self.layout().addWidget(self.uiLock) self.uiLockTimer = QTimer(self) self.connect(self.uiLockTimer, SIGNAL("timeout()"), self.__uiLockTimerExpired) self.xy = ValveIslandWidget("X/Y joints", 0, self) self.layout().addWidget(self.xy) self.z = ValveIslandWidget("Z joint", 1, self) self.layout().addWidget(self.z) self.log = LogBrowser(self) self.layout().addWidget(self.log) self.__uiEnable(False) def __uiEnable(self, enabled): self.xy.setEnabled(enabled) self.z.setEnabled(enabled) def stopUiTimer(self): self.uiLockTimer.stop() def startUiTimer(self): self.uiLockTimer.start(10000) def pokeUiTimer(self): self.stopUiTimer() self.startUiTimer() def __uiLockChanged(self, state): enabled = (state != Qt.Unchecked) self.__uiEnable(enabled) if enabled: self.pokeUiTimer() else: self.stopUiTimer() def __uiLockTimerExpired(self): self.uiLock.setCheckState(Qt.Unchecked) def initializeState(self): if not opt_nofetch: if not self.fetchState(): self.log.hostLog( "Failed to fetch active configuration "+\ "from device.\n") self.turnOnDevice() self.initialized = True def turnOnDevice(self): error = remote.sendMessageSyncError(MSG_TURNON, 0, "") if error: self.log.hostLog("Failed to turn on device\n") else: self.log.hostLog("Device turned on\n") def shutdown(self): remote.stopKeepAliveTimer() if self.initialized: error = remote.sendMessageSyncError(MSG_SHUTDOWN, 0, "") if error != MSG_ERR_NONE: QMessageBox.critical(self, "Pressure Control", "Failed to shutdown device") def fetchState(self): # Get the current pressure reply = remote.sendMessageSyncReply(MSG_GET_CURRENT_PRESSURE, 0, "", MSG_CURRENT_PRESSURE) if not reply: print "Failed to fetch current pressure. No reply." return False self.parseCurrentPressureMsg(reply) # Get the desired pressure reply = remote.sendMessageSyncReply(MSG_GET_DESIRED_PRESSURE, 0, "", MSG_DESIRED_PRESSURE) if not reply: print "Failed to fetch desired pressure. No reply." return False reply = remote.getPayload(reply) xy_mbar = ord(reply[0]) | (ord(reply[1]) << 8) z_mbar = ord(reply[2]) | (ord(reply[3]) << 8) self.xy.pressureSpin.setValue(float(xy_mbar) / 1000) self.z.pressureSpin.setValue(float(z_mbar) / 1000) # Get the hysteresis reply = remote.sendMessageSyncReply(MSG_GET_HYSTERESIS, 0, "", MSG_HYSTERESIS) if not reply: print "Failed to fetch hysteresis. No reply." return False reply = remote.getPayload(reply) xy_mbar = ord(reply[0]) | (ord(reply[1]) << 8) z_mbar = ord(reply[2]) | (ord(reply[3]) << 8) self.xy.hystSpin.setValue(float(xy_mbar) / 1000) self.z.hystSpin.setValue(float(z_mbar) / 1000) # Get the config flags flags = remote.configFlagsFetch() if flags == None: print "Failed to fetch config flags. No reply." return False if flags[0] & (1 << CFG_FLAG_AUTOADJUST_ENABLE): self.xy.autoCheckbox.setCheckState(Qt.Checked) if flags[1] & (1 << CFG_FLAG_AUTOADJUST_ENABLE): self.z.autoCheckbox.setCheckState(Qt.Checked) return True def parseCurrentPressureMsg(self, msg): msg = remote.getPayload(msg) xy_mbar = ord(msg[0]) | (ord(msg[1]) << 8) z_mbar = ord(msg[2]) | (ord(msg[3]) << 8) self.xy.gauge.setValue(float(xy_mbar) / 1000) self.z.gauge.setValue(float(z_mbar) / 1000) log.logPressure(xy_mbar, self.xy.getDesiredPressure(), self.xy.getHysteresis(), z_mbar, self.z.getDesiredPressure(), self.z.getHysteresis()) class MainWindow(QMainWindow): def __init__(self, parent=None): QMainWindow.__init__(self, parent) self.setWindowTitle(self.tr("Pneumatic pressure control")) mb = QMenuBar(self) ctlmen = QMenu(self.tr("Control"), mb) ctlmen.addAction(self.tr("Exit"), self.close) mb.addMenu(ctlmen) helpmen = QMenu(self.tr("Help"), mb) helpmen.addAction(self.tr("About"), self.about) mb.addMenu(helpmen) self.setMenuBar(mb) self.setStatusBar(StatusBar()) self.setCentralWidget(MainWidget()) self.resize(400, 350) def initializeState(self): self.centralWidget().initializeState() def shutdown(self): self.centralWidget().shutdown() def about(self): QMessageBox.information(self, self.tr("About"), self.tr("Pneumatic pressure control\n" "Copyright (c) 2008-2009 Michael Buesch")) def main(): global remote global mainwnd global app global log mainwnd = None app = QApplication(sys.argv) parseArgs() log = Log(opt_logfile) mainwnd = MainWindow() remote = RemoteProtocol(opt_ttyfile) mainwnd.initializeState() mainwnd.show() result = app.exec_() mainwnd.shutdown() exit(result) if __name__ == "__main__": main()