#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # AWL simulator - Commandline testing interface # # Copyright 2012-2016 Michael Buesch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # from __future__ import division, absolute_import, print_function, unicode_literals import sys import os import getopt import traceback import signal from awlsim_loader.common import * from awlsim_loader.core import * from awlsim_loader.coreclient import * from awlsim_loader.fupcompiler import * import awlsim_loader.cython_helper as cython_helper class TestAwlSimClient(AwlSimClient): def handle_CPUDUMP(self, dumpText): emitCpuDump(dumpText) class ConsoleSSHTunnel(SSHTunnel): def sshMessage(self, message, isDebug): if opt_loglevel > Logging.LOG_INFO: isDebug = False super(ConsoleSSHTunnel, self).sshMessage(message, isDebug) def usage(): print("awlsim version %s" % VERSION_STRING) print("") print("Usage: awlsim-test [OPTIONS] ") print("") print("Options:") print(" -Y|--cycle-limit SEC Cycle time limit, in seconds (default 5.0)") print(" -M|--max-runtime SEC CPU will be stopped after SEC seconds (default: off)") print(" -2|--twoaccu Force 2-accu mode") print(" -4|--fouraccu Force 4-accu mode") print(" -D|--no-cpu-dump Do not show CPU status while running") print(" -x|--extended-insns Enable extended instructions") print(" -t|--obtemp 1/0 
Enable/disable writing of OB-temp variables (Default: off)") print(" -T|--clock-mem ADDR Force clock memory address (Default: off)") print(" -m|--mnemonics auto Force mnemonics type: en, de, auto") print(" -P|--profile 0 Set profiling level (Default: 0)") print(" -L|--loglevel LVL Set the log level:") print(" 0: Log nothing") print(" 1: Log errors") print(" 2: Log errors and warnings") print(" 3: Log errors, warnings and info messages (default)") print(" 4: Verbose logging") print(" 5: Extremely verbose logging") print("") print("Server backend related options:") print(" -c|--connect Connect to server backend") print(" -C|--connect-to IP:PORT Connect to server backend") print(" -b|--spawn-backend Spawn a new backend server and connect to it") if not isWinStandalone: print(" -i|--interpreter EXE Set the backend interpreter executable") print("") print("Loading hardware modules:") print(" -H|--hardware NAME:PARAM=VAL:PARAM=VAL...") print("Print module information:") print(" -I|--hardware-info NAME") print("") print(" Where NAME is the name of the hardware module.") print(" PARAM=VAL are optional hardware specific parameters.") print("") print("Other options:") print(" --list-sfc Print a list of all supported SFCs") print(" --list-sfc-verbose Verbose SFC list") print(" --list-sfb Print a list of all supported SFBs") print(" --list-sfb-verbose Verbose SFB list") def printSysblockInfo(blockTable, prefix, withExtended, withInterface): for block in sorted(dictValues(blockTable), key = lambda b: b.name[0]): if block.broken: continue number, name, desc = block.name if number < 0 and not withExtended: continue if desc: desc = " (%s)" % desc else: desc = "" print(" %s %d \"%s\"%s" % (prefix, number, name, desc)) if withInterface: for ftype in (BlockInterfaceField.FTYPE_IN, BlockInterfaceField.FTYPE_OUT, BlockInterfaceField.FTYPE_INOUT): try: fields = block.interfaceFields[ftype] except KeyError: continue for field in fields: field.fieldType = ftype print(" %s" % str(field)) 
def writeStdout(message):
    """Write message to stdout and flush it.

    Nothing is written if the current log level is below LOG_INFO.
    """
    if Logging.loglevel >= Logging.LOG_INFO:
        sys.stdout.write(message)
        sys.stdout.flush()


# CPU time (cpu.now) from which on cpuDumpCallback() emits the next dump.
nextScreenUpdate = 0.0
# The most recently emitted (padded) CPU dump. Printed again on shutdown.
lastDump = ""


def clearConsole():
    """Clear the console on POSIX and Windows. No-op on other systems."""
    # Make cursor visible, clear console and
    # move cursor to homeposition.
    if osIsPosix:
        writeStdout("\x1B[?25h\x1B[2J\x1B[H")
    elif osIsWindows:
        os.system("cls")


def emitCpuDump(dump):
    """Pad the dump text, remember it in lastDump and print it."""
    global lastDump

    # Pad lines to 78 characters and terminate each one with '|'.
    # Lines longer than 78 characters are not truncated.
    dump = '\n'.join(line + (78 - len(line)) * ' ' + '|'
                     for line in dump.splitlines())
    lastDump = dump
    if osIsPosix:
        # Only move the cursor home and overwrite the previous dump.
        writeStdout("\x1B[H" + dump)
    else:
        clearConsole()
        writeStdout(dump)


def cpuDumpCallback(cpu):
    """Emit a CPU dump, at most once per 0.1 units of cpu.now."""
    global nextScreenUpdate

    if cpu.now >= nextScreenUpdate:
        nextScreenUpdate = cpu.now + 0.1
        emitCpuDump(str(cpu))


def assignCpuSpecs(cpuSpecs, projectCpuSpecs):
    """Copy the project CPU specs into cpuSpecs and apply overrides.

    The mnemonics, accu count and clock memory byte given on the command
    line take precedence over the project settings, if they were set.
    """
    cpuSpecs.assignFrom(projectCpuSpecs)
    if opt_mnemonics is not None:
        cpuSpecs.setConfiguredMnemonics(opt_mnemonics)
    if opt_nrAccus is not None:
        cpuSpecs.setNrAccus(opt_nrAccus)
    if opt_clockMem is not None:
        cpuSpecs.setClockMemByte(opt_clockMem)


def run(inputFile):
    """Load inputFile and run it on an in-process simulator core.

    The CPU cycle loop runs until an exception ends it (simulator error,
    maintenance request or keyboard interrupt).

    Returns ExitCodes.EXIT_ERR_SIM on AwlParserError or AwlSimError,
    otherwise ExitCodes.EXIT_OK.
    """
    # Imported here so that this function does not depend on itertools
    # being re-exported by one of the star-imports of this file.
    import itertools

    s = None
    try:
        if cython_helper.shouldUseCython():
            writeStdout("*** Using accelerated CYTHON core "
                        "(AWLSIM_CYTHON environment variable is set)\n")

        project = Project.fromProjectOrRawAwlFile(inputFile)

        writeStdout("Parsing code...\n")
        generatedAwlSrcs = []

        # Get mnemonics type
        mnemonics = project.getCpuSpecs().getConfiguredMnemonics()
        if opt_mnemonics is not None:
            mnemonics = opt_mnemonics

        # Parse FUP sources
        for fupSrc in project.getFupSources():
            generatedAwlSrcs.append(FupCompiler().compile(
                fupSource=fupSrc, mnemonics=mnemonics))

        # Parse KOP sources
        for kopSrc in project.getKopSources():
            pass  # TODO

        # Parse AWL sources
        parseTrees = []
        for awlSrc in itertools.chain(project.getAwlSources(),
                                      generatedAwlSrcs):
            p = AwlParser()
            p.parseSource(awlSrc)
            parseTrees.append(p.getParseTree())

        # Parse symbol tables
        symTables = []
        for symTabSrc in project.getSymTabSources():
            tab = SymTabParser.parseSource(symTabSrc,
                                           autodetectFormat=True,
                                           mnemonics=mnemonics)
            symTables.append(tab)

        writeStdout("Initializing core...\n")
        s = AwlSim(profileLevel=opt_profile)
        s.reset()

        # Load hardware modules
        def loadMod(name, parameters):
            writeStdout("Loading hardware module '%s'...\n" % name)
            hwClass = s.loadHardwareModule(name)
            s.registerHardwareClass(hwClass=hwClass,
                                    parameters=parameters)
        for modDesc in project.getHwmodSettings().getLoadedModules():
            loadMod(modDesc.getModuleName(),
                    modDesc.getParameters())
        for name, parameters in opt_hwmods:
            loadMod(name, parameters)

        # Configure the CPU
        cpu = s.getCPU()
        assignCpuSpecs(cpu.getSpecs(), project.getCpuSpecs())
        cpu.enableObTempPresets(project.getObTempPresetsEn() or opt_obtemp)
        cpu.enableExtendedInsns(project.getExtInsnsEn() or opt_extInsns)
        if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO:
            cpu.setBlockExitCallback(cpuDumpCallback, cpu)
        cpu.setCycleTimeLimit(opt_cycletime)
        cpu.setRunTimeLimit(opt_maxRuntime)

        # Download the program
        writeStdout("Initializing CPU...\n")
        for symTable in symTables:
            s.loadSymbolTable(symTable)
        for libSel in project.getLibSelections():
            s.loadLibraryBlock(libSel)
        for parseTree in parseTrees:
            s.load(parseTree)

        # Run the program
        s.startup()
        writeStdout("[Initialization finished - CPU is executing user code]\n")
        try:
            if not opt_noCpuDump:
                clearConsole()
            while 1:
                s.runCycle()
        finally:
            # Leave the last CPU dump on the screen.
            if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO:
                clearConsole()
                writeStdout(lastDump + '\n')
    except (AwlParserError, AwlSimError) as e:
        printError(e.getReport())
        return ExitCodes.EXIT_ERR_SIM
    except KeyboardInterrupt:
        pass
    except MaintenanceRequest as e:
        if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN,
                             MaintenanceRequest.TYPE_STOP,
                             MaintenanceRequest.TYPE_RTTIMEOUT):
            writeStdout("Shutting down, as requested (%s)...\n" % str(e))
        else:
            writeStdout("Received unknown maintenance request "
                        "(%d: %s)...\n" % (e.requestType, str(e)))
    finally:
        if s:
            s.unregisterAllHardware()
            ps = s.getProfileStats()
            if ps:
                writeStdout("\n\nProfile stats (level %d) follow:\n" %
                            opt_profile)
                writeStdout(ps)
                writeStdout("\n")
    return ExitCodes.EXIT_OK


def runWithServerBackend(inputFile):
    """Load inputFile and run it on a core server backend.

    Depending on the command line options, a backend server is spawned
    or an existing one is connected to, optionally through an SSH tunnel
    if the project link settings request one. The client event loop runs
    until an exception ends it.

    Returns ExitCodes.EXIT_ERR_INTERP if the Cython core is requested,
    ExitCodes.EXIT_ERR_SIM on AwlSimError, otherwise ExitCodes.EXIT_OK.
    """
    client = None
    tunnel = None
    try:
        if cython_helper.shouldUseCython():
            printError("The accelerated CYTHON core currently is incompatible "
                       "with the backend server. Please remove the "
                       "AWLSIM_CYTHON environment variable.")
            return ExitCodes.EXIT_ERR_INTERP

        project = Project.fromProjectOrRawAwlFile(inputFile)
        linkSettings = project.getCoreLinkSettings()

        if opt_spawnBackend:
            host = AwlSimServer.DEFAULT_HOST
            port = AwlSimServer.DEFAULT_PORT
        else:
            host = linkSettings.getConnectHost()
            port = linkSettings.getConnectPort()
        # An explicit --connect-to overrides the values chosen above.
        if opt_connectTo:
            host, port = opt_connectTo

        # Establish SSH tunnel, if requested.
        if linkSettings.getTunnel() == linkSettings.TUNNEL_SSH:
            writeStdout("Establishing SSH tunnel...\n")
            localPort = linkSettings.getTunnelLocalPort()
            if localPort == linkSettings.TUNNEL_LOCPORT_AUTO:
                localPort = None
            tunnel = ConsoleSSHTunnel(
                remoteHost=host,
                remotePort=port,
                localPort=localPort,
                sshUser=linkSettings.getSSHUser(),
                sshPort=linkSettings.getSSHPort(),
                sshExecutable=linkSettings.getSSHExecutable(),
            )
            # From here on, connect to the endpoint returned by the tunnel.
            host, port = tunnel.connect()

        # Connect to the server
        client = TestAwlSimClient()
        if opt_spawnBackend:
            client.spawnServer(interpreter=opt_interpreter,
                               listenHost=host,
                               listenPort=port)
        writeStdout("Connecting to core server...\n")
        client.connectToServer(host=host, port=port)

        writeStdout("Initializing core...\n")
        client.setLoglevel(opt_loglevel)
        client.setRunState(False)
        client.reset()

        # Load hardware modules
        client.loadHardwareModules(project.getHwmodSettings().getLoadedModules())
        for name, parameters in opt_hwmods:
            client.loadHardwareModule(HwmodDescriptor(name, parameters))

        # Configure the core
        if opt_noCpuDump:
            client.setPeriodicDumpInterval(0)
        else:
            client.setPeriodicDumpInterval(300)
        client.enableOBTempPresets(project.getObTempPresetsEn() or opt_obtemp)
        client.enableExtendedInsns(project.getExtInsnsEn() or opt_extInsns)
        client.setCycleTimeLimit(opt_cycletime)
        client.setRunTimeLimit(opt_maxRuntime)
        specs = client.getCpuSpecs()
        assignCpuSpecs(specs, project.getCpuSpecs())
        client.setCpuSpecs(specs)

        # Fire up the core
        writeStdout("Initializing CPU...\n")
        # The CPU specs, temp presets, extended instructions and hardware
        # modules have been configured above and are not loaded again.
        client.loadProject(project, loadCpuSpecs=False,
                           loadTempPresets=False,
                           loadExtInsns=False,
                           loadHwMods=False)
        client.setRunState(True)

        # Run the client-side event loop
        writeStdout("[Initialization finished - Remote-CPU is executing user code]\n")
        try:
            if not opt_noCpuDump:
                clearConsole()
            while True:
                client.processMessages(None)
        finally:
            # Leave the last CPU dump on the screen.
            if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO:
                clearConsole()
                writeStdout(lastDump + '\n')
    except AwlSimError as e:
        printError(e.getReport())
        return ExitCodes.EXIT_ERR_SIM
    except MaintenanceRequest as e:
        if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN,
                             MaintenanceRequest.TYPE_STOP,
                             MaintenanceRequest.TYPE_RTTIMEOUT):
            writeStdout("Shutting down, as requested (%s)...\n" % str(e))
        else:
            writeStdout("Received unknown maintenance request "
                        "(%d: %s)...\n" % (e.requestType, str(e)))
    except KeyboardInterrupt:
        pass
    finally:
        if tunnel:
            tunnel.shutdown()
        if client:
            client.shutdown()
    return ExitCodes.EXIT_OK


def __signalHandler(sig, frame):
    """Signal handler: log the signal and turn SIGTERM into SIGINT."""
    printInfo("Received signal %d" % sig)
    if sig == signal.SIGTERM:
        # Raise SIGINT. It will shut down everything.
        os.kill(os.getpid(), signal.SIGINT)


def main():
    """Parse the command line and run the requested action.

    Sets the module level opt_* option variables, which are read by the
    other functions of this file.

    Returns an ExitCodes value. Invalid option arguments terminate the
    process via sys.exit(1).
    """
    global opt_cycletime
    global opt_maxRuntime
    global opt_noCpuDump
    global opt_nrAccus
    global opt_extInsns
    global opt_obtemp
    global opt_clockMem
    global opt_mnemonics
    global opt_hwmods
    global opt_hwinfos
    global opt_profile
    global opt_loglevel
    global opt_connect
    global opt_connectTo
    global opt_spawnBackend
    global opt_interpreter

    opt_cycletime = 5.0
    opt_maxRuntime = -1.0
    opt_noCpuDump = False
    opt_nrAccus = None
    opt_extInsns = False
    opt_obtemp = False
    opt_clockMem = None
    opt_mnemonics = None
    opt_hwmods = []
    opt_hwinfos = []
    opt_profile = 0
    opt_loglevel = Logging.LOG_INFO
    opt_connect = None
    opt_connectTo = False
    opt_spawnBackend = False
    opt_interpreter = None

    try:
        # NOTE: -q|--quiet is accepted by the parser, but no handler
        # below acts on it.
        (opts, args) = getopt.getopt(
            sys.argv[1:],
            "hY:M:24qDxt:T:m:H:I:P:L:cC:bi:",
            ["help", "cycle-time=", "max-runtime=", "twoaccu", "fouraccu",
             "quiet", "no-cpu-dump", "extended-insns",
             "obtemp=", "clock-mem=", "mnemonics=",
             "hardware=", "hardware-info=", "profile=",
             "loglevel=",
             "connect", "connect-to=", "spawn-backend", "interpreter=",
             "list-sfc", "list-sfc-verbose",
             "list-sfb", "list-sfb-verbose"])
    except getopt.GetoptError as e:
        printError(str(e))
        usage()
        return ExitCodes.EXIT_ERR_CMDLINE

    for (o, v) in opts:
        if o in ("-h", "--help"):
            usage()
            return ExitCodes.EXIT_OK
        if o in ("-Y", "--cycle-time"):
            try:
                opt_cycletime = float(v)
            except ValueError:
                printError("-Y|--cycle-time: Invalid time format")
                sys.exit(1)
        if o in ("-M", "--max-runtime"):
            try:
                opt_maxRuntime = float(v)
            except ValueError:
                printError("-M|--max-runtime: Invalid time format")
                sys.exit(1)
        if o in ("-2", "--twoaccu"):
            opt_nrAccus = 2
        if o in ("-4", "--fouraccu"):
            opt_nrAccus = 4
        if o in ("-D", "--no-cpu-dump"):
            opt_noCpuDump = True
        if o in ("-x", "--extended-insns"):
            opt_extInsns = True
        if o in ("-t", "--obtemp"):
            opt_obtemp = str2bool(v)
        if o in ("-T", "--clock-mem"):
            try:
                opt_clockMem = int(v)
                if opt_clockMem < -1 or opt_clockMem > 0xFFFF:
                    raise ValueError
            except ValueError:
                printError("-T|--clock-mem: Invalid byte address")
                # Abort, so that an out-of-range address that was
                # already stored above is never used.
                sys.exit(1)
        if o in ("-m", "--mnemonics"):
            opt_mnemonics = v.lower()
            if opt_mnemonics not in ("en", "de", "auto"):
                printError("-m|--mnemonics: Invalid mnemonics type")
                sys.exit(1)
        if o in ("-H", "--hardware"):
            try:
                # Format: NAME:PARAM=VAL:PARAM=VAL...
                elems = v.split(':')
                name = elems[0]
                if not name:
                    raise ValueError
                params = {}
                for pstr in elems[1:]:
                    if not pstr:
                        # Tolerate empty elements (e.g. trailing colon).
                        continue
                    # Split at the first '='.
                    pname, sep, pval = pstr.partition('=')
                    if not sep or not pname or not pval:
                        raise ValueError
                    params[pname] = pval
                opt_hwmods.append((name, params))
            except (ValueError, IndexError):
                printError("-H|--hardware: Invalid module name or parameters")
                sys.exit(1)
        if o in ("-I", "--hardware-info"):
            # Only the module name is used. Parameters are ignored.
            opt_hwinfos.append(v.split(':')[0])
        if o in ("-P", "--profile"):
            try:
                opt_profile = int(v)
            except ValueError:
                printError("-P|--profile: Invalid profile level")
                sys.exit(1)
        if o in ("-L", "--loglevel"):
            try:
                opt_loglevel = int(v)
            except ValueError:
                printError("-L|--loglevel: Invalid log level")
                sys.exit(1)
        if o in ("-c", "--connect"):
            opt_connect = True
        if o in ("-C", "--connect-to"):
            try:
                # Split at the last colon into host and port.
                idx = v.rfind(":")
                if idx <= 0:
                    raise ValueError
                opt_connectTo = (v[:idx], int(v[idx+1:]))
            except ValueError:
                printError("-C|--connect-to: Invalid host/port")
                sys.exit(1)
        if o in ("-b", "--spawn-backend"):
            opt_spawnBackend = True
        if o in ("-i", "--interpreter"):
            if isWinStandalone:
                printError("-i|--interpreter not supported on win-standalone")
                sys.exit(1)
            opt_interpreter = v
        if o in ("--list-sfc", "--list-sfc-verbose"):
            print("The supported system functions (SFCs) are:")
            printSysblockInfo(SFC_table, "SFC", opt_extInsns,
                              o.endswith("verbose"))
            return ExitCodes.EXIT_OK
        if o in ("--list-sfb", "--list-sfb-verbose"):
            print("The supported system function blocks (SFBs) are:")
            printSysblockInfo(SFB_table, "SFB", opt_extInsns,
                              o.endswith("verbose"))
            return ExitCodes.EXIT_OK

    # Exactly one input file is required, unless only hardware module
    # information was requested.
    if len(args) != 1 and not opt_hwinfos:
        usage()
        return ExitCodes.EXIT_ERR_CMDLINE
    inputFile = args[0] if args else None

    Logging.setLoglevel(opt_loglevel)

    # Translate the mnemonics option string to the CPU specs constant.
    opt_mnemonics = {
        None: None,
        "en": S7CPUSpecs.MNEMONICS_EN,
        "de": S7CPUSpecs.MNEMONICS_DE,
        "auto": S7CPUSpecs.MNEMONICS_AUTO,
    }[opt_mnemonics]

    try:
        if opt_hwinfos:
            # Just print the hardware-infos and exit.
            for name in opt_hwinfos:
                cls = AwlSim.loadHardwareModule(name)
                print(cls.getModuleInfo())
            return ExitCodes.EXIT_OK
    except (AwlParserError, AwlSimError) as e:
        printError(e.getReport())
        return ExitCodes.EXIT_ERR_SIM

    signal.signal(signal.SIGTERM, __signalHandler)

    if opt_interpreter and not opt_spawnBackend:
        printError("Selected an --interpreter, but no "
                   "--spawn-backend was requested.")
        return ExitCodes.EXIT_ERR_CMDLINE

    if opt_spawnBackend or opt_connect or opt_connectTo:
        return runWithServerBackend(inputFile)
    return run(inputFile)


if __name__ == "__main__":
    sys.exit(main())