#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # AWL simulator - Commandline interface # # Copyright 2012-2014 Michael Buesch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # from __future__ import division, absolute_import, print_function, unicode_literals import sys import os import getopt import traceback import signal from awlsim.common import * from awlsim.core import * from awlsim.coreclient import * class TextInterfaceAwlSimClient(AwlSimClient): def handle_CPUDUMP(self, dumpText): emitCpuDump(dumpText) def usage(): printInfo("awlsim version %d.%d" % (VERSION_MAJOR, VERSION_MINOR)) printInfo("") printInfo("%s [OPTIONS] " % sys.argv[0]) printInfo("") printInfo("Options:") printInfo(" -C|--cycle-limit SEC Cycle time limit, in seconds (default 5.0)") printInfo(" -M|--max-runtime SEC CPU will be stopped after SEC seconds (default: off)") printInfo(" -2|--twoaccu Force 2-accu mode") printInfo(" -4|--fouraccu Force 4-accu mode") printInfo(" -D|--no-cpu-dump Do not show CPU status while running") printInfo(" -q|--quiet Do not show any status messages") printInfo(" -x|--extended-insns Enable extended instructions") printInfo(" -t|--obtemp 1/0 Enable/disable writing of OB-temp variables (Default: off)") printInfo(" -T|--clock-mem ADDR Force clock memory address (Default: off)") printInfo(" -m|--mnemonics auto Force mnemonics type: en, de, auto") printInfo(" -P|--profile 0 Set profiling level (Default: 0)") printInfo("") printInfo("Server backend related options:") printInfo(" -c|--connect IP:PORT Connect to server backend") printInfo(" -b|--spawn-backend Spawn a new backend server and connect to it") if not isWinStandalone: printInfo(" -i|--interpreter EXE Set the backend interpreter executable") printInfo("") printInfo("Loading hardware modules:") printInfo(" -H|--hardware NAME:PARAM=VAL:PARAM=VAL...") printInfo("Print module information:") printInfo(" -I|--hardware-info NAME") printInfo("") printInfo(" Where NAME is the name of the hardware module.") printInfo(" PARAM=VAL are optional hardware specific parameters.") printInfo("") printInfo("Other options:") printInfo(" --list-sfc Print a list of all supported SFCs") printInfo(" --list-sfc-verbose Verbose SFC list") printInfo(" --list-sfb Print a list of all supported SFBs") printInfo(" --list-sfb-verbose Verbose SFB list") def printSysblockInfo(blockTable, prefix, withExtended, withInterface): for block in sorted(blockTable.values(), key = lambda b: b.name[0]): if block.broken: continue number, name, desc = block.name if number < 0 and not withExtended: continue if desc: desc = " (%s)" % desc else: desc = "" printInfo(" %s %d \"%s\"%s" % (prefix, number, name, desc)) if withInterface: for ftype in (BlockInterfaceField.FTYPE_IN, BlockInterfaceField.FTYPE_OUT, BlockInterfaceField.FTYPE_INOUT): try: fields = block.interfaceFields[ftype] except KeyError: continue for field in fields: field.fieldType = ftype printInfo(" %s" % str(field)) def writeStdout(message): if not opt_quiet: sys.stdout.write(message) sys.stdout.flush() nextScreenUpdate = 0.0 lastDump = "" def clearConsole(): # Make cursor visible, clear console and # move cursor to homeposition. if osIsPosix: writeStdout("\x1B[?25h\x1B[2J\x1B[H") elif osIsWindows: os.system("cls") def emitCpuDump(dump): # Pad lines dump = '\n'.join(line + (78 - len(line)) * ' ' + '|' for line in dump.splitlines()) global lastDump lastDump = dump if osIsPosix: writeStdout("\x1B[H" + dump) else: clearConsole() writeStdout(dump) def cpuDumpCallback(cpu): global nextScreenUpdate if cpu.now >= nextScreenUpdate: nextScreenUpdate = cpu.now + 0.1 emitCpuDump(str(cpu)) def makeLoglevel(): return Logging.LOG_ERROR if opt_quiet else Logging.LOG_INFO def assignCpuSpecs(cpuSpecs, projectCpuSpecs): cpuSpecs.assignFrom(projectCpuSpecs) if opt_mnemonics is not None: cpuSpecs.setConfiguredMnemonics(opt_mnemonics) if opt_nrAccus is not None: cpuSpecs.setNrAccus(opt_nrAccus) if opt_clockMem is not None: cpuSpecs.setClockMemByte(opt_clockMem) def readInputFile(inputFile): if Project.fileIsProject(inputFile): project = Project.fromFile(inputFile) else: # make a fake project awlSrc = AwlSource.fromFile(name = inputFile, filepath = inputFile) project = Project(projectFile = None, awlSources = [ awlSrc, ]) return project def run(inputFile): s = None try: import awlsim.cython_helper if awlsim.cython_helper.shouldUseCython(): writeStdout("*** Using accelerated CYTHON core " "(AWLSIMCYTHON environment variable is set)\n") project = readInputFile(inputFile) writeStdout("Parsing code...\n") parseTrees = [] for awlSrc in project.getAwlSources(): p = AwlParser() p.parseSource(awlSrc) parseTrees.append(p.getParseTree()) symTables = [] for symTabSrc in project.getSymTabSources(): mnemonics = project.cpuSpecs.getConfiguredMnemonics() if opt_mnemonics is not None: mnemonics = opt_mnemonics tab = SymTabParser.parseSource(symTabSrc, autodetectFormat = True, mnemonics = mnemonics) symTables.append(tab) writeStdout("Initializing core...\n") s = AwlSim(profileLevel = opt_profile) s.reset() # Load hardware modules for name, parameters in opt_hwmods: writeStdout("Loading hardware module '%s'...\n" % name) hwClass = s.loadHardwareModule(name) s.registerHardwareClass(hwClass = hwClass, parameters = parameters) cpu = s.getCPU() assignCpuSpecs(cpu.getSpecs(), project.cpuSpecs) cpu.enableObTempPresets(project.getObTempPresetsEn() or opt_obtemp) cpu.enableExtendedInsns(project.getExtInsnsEn() or opt_extInsns) if not opt_noCpuDump and not opt_quiet: cpu.setBlockExitCallback(cpuDumpCallback, cpu) cpu.setCycleTimeLimit(opt_cycletime) cpu.setRunTimeLimit(opt_maxRuntime) writeStdout("Initializing CPU...\n") for symTable in symTables: s.loadSymbolTable(symTable) for libSel in project.getLibSelections(): s.loadLibraryBlock(libSel) for parseTree in parseTrees: s.load(parseTree) s.startup() writeStdout("[Initialization finished - CPU is executing user code]\n") try: if not opt_noCpuDump: clearConsole() while 1: s.runCycle() finally: if not opt_noCpuDump and not opt_quiet: clearConsole() writeStdout(lastDump + '\n') except (AwlParserError, AwlSimError) as e: printError(e.getReport()) return 1 except KeyboardInterrupt as e: pass except MaintenanceRequest as e: if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN, MaintenanceRequest.TYPE_STOP, MaintenanceRequest.TYPE_RTTIMEOUT): writeStdout("Shutting down, as requested (%s)...\n" % str(e)) else: writeStdout("Received unknown maintenance request " "(%d: %s)...\n" % (e.requestType, str(e))) finally: if s: ps = s.getProfileStats() if ps: writeStdout("\n\nProfile stats (level %d) follow:\n" %\ opt_profile) writeStdout(ps) writeStdout("\n") s.shutdown() return 0 def runWithServerBackend(inputFile): client = None try: import awlsim.cython_helper if awlsim.cython_helper.shouldUseCython(): printError("The accelerated CYTHON core currently is incompatible " "with the backend server. Please remove the " "AWLSIMCYTHON environment variable.") return 1 project = readInputFile(inputFile) # Connect to the server client = TextInterfaceAwlSimClient() if opt_spawnBackend: host, port = AwlSimServer.DEFAULT_HOST, AwlSimServer.DEFAULT_PORT if opt_connect: host, port = opt_connect if isWinStandalone: client.spawnServer(serverExecutable = "awlsim-server-module.exe", listenHost = host, listenPort = port) else: client.spawnServer(interpreter = opt_interpreter, listenHost = host, listenPort = port) writeStdout("Connecting to core server...\n") if opt_connect: client.connectToServer(host = opt_connect[0], port = opt_connect[1]) else: client.connectToServer() writeStdout("Initializing core...\n") client.setLoglevel(makeLoglevel()) client.setRunState(False) client.reset() # Load hardware modules for name, parameters in opt_hwmods: client.loadHardwareModule(name, parameters) # Configure the core if opt_noCpuDump: client.setPeriodicDumpInterval(0) else: client.setPeriodicDumpInterval(300) client.enableOBTempPresets(project.getObTempPresetsEn() or opt_obtemp) client.enableExtendedInsns(project.getExtInsnsEn() or opt_extInsns) client.setCycleTimeLimit(opt_cycletime) client.setRunTimeLimit(opt_maxRuntime) specs = client.getCpuSpecs() assignCpuSpecs(specs, project.cpuSpecs) client.setCpuSpecs(specs) # Fire up the core writeStdout("Initializing CPU...\n") for symTabSrc in project.getSymTabSources(): client.loadSymbolTable(symTabSrc) for libSel in project.getLibSelections(): client.loadLibraryBlock(libSel) for awlSrc in project.getAwlSources(): client.loadCode(awlSrc) client.setRunState(True) # Run the client-side event loop writeStdout("[Initialization finished - Remote-CPU is executing user code]\n") try: if not opt_noCpuDump: clearConsole() while True: client.processMessages(None) finally: if not opt_noCpuDump and not opt_quiet: clearConsole() writeStdout(lastDump + '\n') except AwlSimError as e: printError(e.getReport()) return 1 except MaintenanceRequest as e: if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN, MaintenanceRequest.TYPE_STOP, MaintenanceRequest.TYPE_RTTIMEOUT): writeStdout("Shutting down, as requested (%s)...\n" % str(e)) else: writeStdout("Received unknown maintenance request " "(%d: %s)...\n" % (e.requestType, str(e))) except KeyboardInterrupt as e: pass finally: if client: client.shutdown() return 0 def __signalHandler(sig, frame): printInfo("Received signal %d" % sig) if sig == signal.SIGTERM: # Raise SIGINT. It will shut down everything. os.kill(os.getpid(), signal.SIGINT) def main(): global opt_cycletime global opt_maxRuntime global opt_quiet global opt_noCpuDump global opt_nrAccus global opt_extInsns global opt_obtemp global opt_clockMem global opt_mnemonics global opt_hwmods global opt_hwinfos global opt_profile global opt_connect global opt_spawnBackend global opt_interpreter opt_cycletime = 5.0 opt_maxRuntime = -1.0 opt_quiet = False opt_noCpuDump = False opt_nrAccus = None opt_extInsns = False opt_obtemp = False opt_clockMem = None opt_mnemonics = None opt_hwmods = [] opt_hwinfos = [] opt_profile = 0 opt_connect = None opt_spawnBackend = False opt_interpreter = None try: (opts, args) = getopt.getopt(sys.argv[1:], "hC:M:24qDxt:T:m:H:I:P:c:bi:", [ "help", "cycle-time=", "max-runtime=", "twoaccu", "fouraccu", "quiet", "no-cpu-dump", "extended-insns", "obtemp=", "clock-mem=", "mnemonics=", "hardware=", "hardware-info=", "profile=", "connect=", "spawn-backend", "interpreter=", "list-sfc", "list-sfc-verbose", "list-sfb", "list-sfb-verbose", ]) except getopt.GetoptError as e: printError(str(e)) usage() return 1 for (o, v) in opts: if o in ("-h", "--help"): usage() return 0 if o in ("-C", "--cycle-time"): try: opt_cycletime = float(v) except ValueError: printError("-C|--cycle-time: Invalid time format") sys.exit(1) if o in ("-M", "--max-runtime"): try: opt_maxRuntime = float(v) except ValueError: printError("-M|--max-runtime: Invalid time format") sys.exit(1) if o in ("-2", "--twoaccu"): opt_nrAccus = 2 if o in ("-4", "--fouraccu"): opt_nrAccus = 4 if o in ("-q", "--quiet"): opt_quiet = True if o in ("-D", "--no-cpu-dump"): opt_noCpuDump = True if o in ("-x", "--extended-insns"): opt_extInsns = True if o in ("-t", "--obtemp"): opt_obtemp = str2bool(v) if o in ("-T", "--clock-mem"): try: opt_clockMem = int(v) if opt_clockMem < -1 or opt_clockMem > 0xFFFF: raise ValueError except ValueError: printError("-T|--clock-mem: Invalid byte address") if o in ("-m", "--mnemonics"): opt_mnemonics = v.lower() if opt_mnemonics not in ("en", "de", "auto"): printError("-m|--mnemonics: Invalid mnemonics type") sys.exit(1) if o in ("-H", "--hardware"): try: v = v.split(':') if not v: raise ValueError name = v[0] params = {} for pstr in v[1:]: if not pstr: continue i = pstr.find('=') if i < 0: raise ValueError pname = pstr[:i] pval = pstr[i+1:] if not pname or not pval: raise ValueError params[pname] = pval opt_hwmods.append( (name, params) ) except (ValueError, IndexError) as e: printError("-H|--hardware: Invalid module name or parameters") sys.exit(1) if o in ("-I", "--hardware-info"): opt_hwinfos.append(v.split(':')[0]) if o in ("-P", "--profile"): try: opt_profile = int(v) except ValueError: printError("-P|--profile: Invalid profile level") if o in ("-c", "--connect"): try: idx = v.rfind(":") if idx <= 0: raise ValueError opt_connect = (v[:idx], int(v[idx+1:])) except ValueError: printError("-c|--connect: Invalid host/port") sys.exit(1) if o in ("-b", "--spawn-backend"): opt_spawnBackend = True if o in ("-i", "--interpreter"): if isWinStandalone: printError("-i|--interpreter not supported on win-standalone") sys.exit(1) opt_interpreter = v if o in ("--list-sfc", "--list-sfc-verbose"): printInfo("The supported system functions (SFCs) are:") printSysblockInfo(SFC_table, "SFC", opt_extInsns, o.endswith("verbose")) return 0 if o in ("--list-sfb", "--list-sfb-verbose"): printInfo("The supported system function blocks (SFBs) are:") printSysblockInfo(SFB_table, "SFB", opt_extInsns, o.endswith("verbose")) return 0 if len(args) != 1 and not opt_hwinfos: usage() return 1 if args: inputFile = args[0] Logging.setLoglevel(makeLoglevel()) opt_mnemonics = { None : None, "en" : S7CPUSpecs.MNEMONICS_EN, "de" : S7CPUSpecs.MNEMONICS_DE, "auto" : S7CPUSpecs.MNEMONICS_AUTO, }[opt_mnemonics] try: if opt_hwinfos: # Just print the hardware-infos and exit. for name in opt_hwinfos: cls = AwlSim.loadHardwareModule(name) printInfo(cls.getModuleInfo()) return 0 except (AwlParserError, AwlSimError) as e: printError(e.getReport()) return 1 signal.signal(signal.SIGTERM, __signalHandler) if opt_interpreter and not opt_spawnBackend: printError("Selected an --interpreter, but no " "--spawn-backend was requested.") return 1 if opt_spawnBackend or opt_connect: return runWithServerBackend(inputFile) return run(inputFile) if __name__ == "__main__": sys.exit(main())