#!/usr/bin/env python """ # Copyright (C) 2008 Michael Buesch # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ import getopt import sys from serial.serialposix import * from PyQt4.QtCore import * from PyQt4.QtGui import * # Serial communication port configuration CONFIG_BAUDRATE = 9600 CONFIG_BYTESIZE = 8 CONFIG_PARITY = PARITY_EVEN CONFIG_STOPBITS = 2 # The size of one message MSG_SIZE = 38 MSG_PAYLOAD_SIZE = 32 # Message IDs MSG_INVALID = 0 MSG_ERROR = 1 MSG_LOGMESSAGE = 2 MSG_PING = 3 MSG_PONG = 4 MSG_GET_CURRENT_PRESSURE = 5 MSG_CURRENT_PRESSURE = 6 MSG_GET_DESIRED_PRESSURE = 7 MSG_DESIRED_PRESSURE = 8 MSG_SET_DESIRED_PRESSURE = 9 MSG_GET_HYSTERESIS = 10 MSG_HYSTERESIS = 11 MSG_SET_HYSTERESIS = 12 MSG_GET_CONFIG_FLAGS = 13 MSG_CONFIG_FLAGS = 14 MSG_SET_CONFIG_FLAGS = 15 # Message error codes MSG_ERR_NONE = 0 MSG_ERR_CHKSUM = 1 MSG_ERR_NOREPLY = -1 # internal. Not sent over wire. # Config flags CFG_FLAG_AUTOADJUST_ENABLE = 0 def usage(): print "Pressure control - remote configuration" print "" print "Copyright (C) 2008 Michael Buesch " print "Licensed under the GNU/GPL version 3" print "" print "Usage: pctl-remote [OPTIONS] /dev/ttyS0" print "" print "-h|--help Print this help text" def parseArgs(): global opt_ttyfile if len(sys.argv) < 2: usage() sys.exit(1) opt_ttyfile = sys.argv[-1] try: (opts, args) = getopt.getopt(sys.argv[1:-1], "h", [ "help" ]) except getopt.GetoptError: usage() sys.exit(1) for (o, v) in opts: if o in ("-h", "--help"): usage() sys.exit(0) class RemoteProtocol(QObject): def __init__(self, ttyfile): QObject.__init__(self) self.serial = Serial(ttyfile, CONFIG_BAUDRATE, CONFIG_BYTESIZE, CONFIG_PARITY, CONFIG_STOPBITS) self.pollTimer = QTimer(self) self.connect(self.pollTimer, SIGNAL("timeout()"), self.poll) self.pollTimer.start(50) reply = self.sendMessageSyncReply(MSG_PING, "", MSG_PONG) if not reply: print "Communication with device failed. No reply to PING request." sys.exit(1) mainwnd.centralWidget().log.addText("PING->PONG success. Device is alife.\n") def poll(self): if self.serial.inWaiting() < MSG_SIZE: return self.parseMessage(self.serial.read(MSG_SIZE)) def checksumMessage(self, msg): calc_crc = self.__crc16_update_buffer(0xFFFF, msg[0:-2]) calc_crc ^= 0xFFFF want_crc = (ord(msg[-2]) | (ord(msg[-1]) << 8)) if calc_crc != want_crc: text = self.tr("ERROR: message CRC mismatch\n") mainwnd.centralWidget().log.addText(text) self.serial.flushInput() return False return True def parseMessage(self, msg): if not self.checksumMessage(msg): return id = ord(msg[0]) if (id == MSG_LOGMESSAGE): str = self.getPayload(msg).rstrip('\0') mainwnd.centralWidget().log.addText(str) if (id == MSG_PING): sendMessage(MSG_PING, []) if (id == MSG_CURRENT_PRESSURE): mainwnd.centralWidget().parseCurrentPressureMsg(msg) def getPayload(self, msg): return msg[4:-2] def sendMessage(self, id, payload): """Send a message""" assert(len(payload) <= MSG_PAYLOAD_SIZE) # Create the header msg = "%c\0\0\0" % id # Add the payload msg += payload # Pad the payload up to the constant size i = MSG_PAYLOAD_SIZE - len(payload) while i: msg += '\0' i -= 1 # Calculate the CRC crc = self.__crc16_update_buffer(0xFFFF, msg) crc ^= 0xFFFF # Add the CRC to the message msg += "%c%c" % ((crc & 0xFF), ((crc >> 8) & 0xFF)) # Send the message assert(len(msg) == MSG_SIZE) self.serial.write(msg) def sendMessageSyncReply(self, id, payload, replyId): """Send a message and synchronously wait for the reply.""" self.pollTimer.stop() self.sendMessage(id, payload) timeout = QDateTime.currentDateTime().addSecs(2) while True: if QDateTime.currentDateTime() >= timeout: msg = None break if self.serial.inWaiting() < MSG_SIZE: app.processEvents() QThread.msleep(50) continue msg = self.serial.read(MSG_SIZE) if not self.checksumMessage(msg): continue msgid = ord(msg[0]) if msgid == replyId: break # This is not a reply to our message. self.parseMessage(msg) self.pollTimer.start() return msg def sendMessageSyncError(self, id, payload): """Sends a message and synchronously waits for the MSG_ERROR reply.""" reply = self.sendMessageSyncReply(id, payload, MSG_ERROR) if not reply: return MSG_ERR_NOREPLY return ord(self.getPayload(reply)[0]) def __crc16_update_buffer(self, crc, buf): for c in buf: crc ^= ord(c) for i in range(0, 8): if crc & 1: crc = (crc >> 1) ^ 0xA001 else: crc = (crc >> 1) return crc class StatusBar(QStatusBar): def showMessage(self, msg): QStatusBar.showMessage(self, msg, 10000) class LogBrowser(QTextEdit): def __init__(self, parent=None): QTextBrowser.__init__(self, parent) self.needTimeStamp = True self.setReadOnly(1) self.addText(self.tr("Pressure Control logging started\n")); def addText(self, text): if self.needTimeStamp: date = QDateTime.currentDateTime() text = date.toString("[hh:mm:ss] ") + text self.textCursor().setPosition(len(self.toPlainText())) self.insertPlainText(text) self.needTimeStamp = (text[-1] in QString("\r\n")) class MainWidget(QWidget): def __init__(self, parent=None): QWidget.__init__(self, parent) self.initialized = False layout = QVBoxLayout() h = QHBoxLayout() label = QLabel(self.tr("Current pressure:"), self) h.addWidget(label) self.curPressure = QLCDNumber(self) self.curPressure.setSegmentStyle(QLCDNumber.Flat) self.curPressure.display(5.1) h.addWidget(self.curPressure) label = QLabel(self.tr("Bar"), self) h.addWidget(label) h.addStretch() layout.addLayout(h) self.autoCheckbox = QCheckBox(self.tr("Automatically adjust pressure"), self) layout.addWidget(self.autoCheckbox) h = QHBoxLayout() label = QLabel(self.tr("Desired pressure:"), self) h.addStretch() h.addWidget(label) self.pressureSpin = QDoubleSpinBox(self) self.pressureSpin.setMinimum(1) self.pressureSpin.setMaximum(8) self.pressureSpin.setSingleStep(0.1) self.pressureSpin.setSuffix(self.tr(" Bar")) self.connect(self.pressureSpin, SIGNAL("valueChanged(double)"), self.desiredPressureChanged) h.addWidget(self.pressureSpin) layout.addLayout(h) h = QHBoxLayout() label = QLabel(self.tr("Hysteresis:"), self) h.addStretch() h.addWidget(label) self.hystSpin = QDoubleSpinBox(self) self.hystSpin.setMinimum(0.1) self.hystSpin.setMaximum(8) self.hystSpin.setSingleStep(0.1) self.hystSpin.setSuffix(self.tr(" Bar")) self.connect(self.hystSpin, SIGNAL("valueChanged(double)"), self.desiredHysteresisChanged) h.addWidget(self.hystSpin) layout.addLayout(h) self.log = LogBrowser(self) layout.addWidget(self.log) self.setLayout(layout) def initializeState(self): # Get the current pressure reply = remote.sendMessageSyncReply(MSG_GET_CURRENT_PRESSURE, "", MSG_CURRENT_PRESSURE) if not reply: print "Failed to fetch current pressure. No reply." sys.exit(1) self.parseCurrentPressureMsg(reply) # Get the desired pressure reply = remote.sendMessageSyncReply(MSG_GET_DESIRED_PRESSURE, "", MSG_DESIRED_PRESSURE) if not reply: print "Failed to fetch desired pressure. No reply." sys.exit(1) reply = remote.getPayload(reply) mbar = ord(reply[0]) | (ord(reply[1]) << 8) self.pressureSpin.setValue(float(mbar) / 1000) # Get the hysteresis reply = remote.sendMessageSyncReply(MSG_GET_HYSTERESIS, "", MSG_HYSTERESIS) if not reply: print "Failed to fetch hysteresis. No reply." sys.exit(1) reply = remote.getPayload(reply) mbar = ord(reply[0]) | (ord(reply[1]) << 8) self.hystSpin.setValue(float(mbar) / 1000) # Get the config flags reply = remote.sendMessageSyncReply(MSG_GET_CONFIG_FLAGS, "", MSG_CONFIG_FLAGS) if not reply: print "Failed to fetch config flags. No reply." sys.exit(1) reply = remote.getPayload(reply) flags = ord(reply[0]) | (ord(reply[1]) << 8) | \ (ord(reply[2]) << 16) | (ord(reply[3]) << 24) if flags & (1 << CFG_FLAG_AUTOADJUST_ENABLE): self.autoCheckbox.setCheckState(Qt.Checked) #TODO self.initialized = True def parseCurrentPressureMsg(self, msg): msg = remote.getPayload(msg) mbar = ord(msg[0]) | (ord(msg[1]) << 8) self.curPressure.display(float(mbar) / 1000) def desiredPressureChanged(self): if not self.initialized: return mbar = int(self.pressureSpin.value() * 1000) data = "%c%c" % ((mbar & 0xFF), ((mbar >> 8) & 0xFF)) err = remote.sendMessageSyncError(MSG_SET_DESIRED_PRESSURE, data) if err != MSG_ERR_NONE: self.log.addText(self.tr("Failed to change pressure. Error=%u\n" % err)) def desiredHysteresisChanged(self): if not self.initialized: return mbar = int(self.hystSpin.value() * 1000) data = "%c%c" % ((mbar & 0xFF), ((mbar >> 8) & 0xFF)) err = remote.sendMessageSyncError(MSG_SET_HYSTERESIS, data) if err != MSG_ERR_NONE: self.log.addText(self.tr("Failed to change hysteresis. Error=%u\n" % err)) class MainWindow(QMainWindow): def __init__(self, parent=None): QMainWindow.__init__(self, parent) self.setWindowTitle(self.tr("Pneumatic pressure control")) mb = QMenuBar(self) ctlmen = QMenu(self.tr("Control"), mb) ctlmen.addAction(self.tr("Exit"), self.close) mb.addMenu(ctlmen) helpmen = QMenu(self.tr("Help"), mb) helpmen.addAction(self.tr("About"), self.about) mb.addMenu(helpmen) self.setMenuBar(mb) self.setStatusBar(StatusBar()) self.setCentralWidget(MainWidget()) self.resize(400, 500) def initializeState(self): self.centralWidget().initializeState() def about(self): QMessageBox.information(self, self.tr("About"), self.tr("Pneumatic pressure control\n" "Copyright (c) 2008 Michael Buesch")) def main(): global remote global mainwnd global app mainwnd = None app = QApplication(sys.argv) parseArgs() mainwnd = MainWindow() remote = RemoteProtocol(opt_ttyfile) mainwnd.initializeState() mainwnd.show() exit(app.exec_()) if __name__ == "__main__": try: main() except SerialException, e: print "[Serial error] %s" % e.message