#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # AWL simulator - Commandline testing interface # # Copyright 2012-2018 Michael Buesch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # from __future__ import division, absolute_import, print_function, unicode_literals import sys import os import getopt import traceback import signal from awlsim_loader.common import * from awlsim_loader.core import * from awlsim_loader.coreclient import * from awlsim_loader.awlcompiler import * from awlsim_loader.fupcompiler import * import awlsim_loader.cython_helper as cython_helper class TestAwlSimClient(AwlSimClient): def handle_CPUDUMP(self, dumpText): emitCpuDump(dumpText) def handle_MEMORY(self, memAreas): for memArea in memAreas: if memArea.flags & (memArea.FLG_ERR_READ | memArea.FLG_ERR_WRITE): raise AwlSimError("awlsim-test: " "Failed to access memory: %s" % ( str(memArea))) def usage(): print("awlsim version %s" % VERSION_STRING) print("") print("Usage: awlsim-test [OPTIONS] ") print("") print("Options:") print(" -Y|--cycle-limit SEC Cycle time limit, in seconds (default 1.0)") print(" -M|--max-runtime SEC CPU will be stopped after SEC seconds (default: off)") print(" -2|--twoaccu Force 2-accu mode") print(" -4|--fouraccu Force 4-accu mode") print(" -D|--no-cpu-dump Do not show CPU status while running") print(" -x|--extended-insns Enable extended instructions") print(" -t|--obtemp 1/0 Enable/disable writing of OB-temp variables (Default: off)") print(" -T|--clock-mem ADDR Force clock memory address (Default: off)") print(" -m|--mnemonics auto Force mnemonics type: en, de, auto") print(" -O|--optimizers OPT Sets the optimization mode.") print(" OPT may be one of:") print(" default: Keep project settings (default)") print(" all: Enable all optimizers") print(" off: Disable all optimizers") print(" --insn-meas OUTFILE Detailed instruction timing measurements") print(" -L|--loglevel LVL Set the log level:") print(" 0: Log nothing") print(" 1: Log errors") print(" 2: Log errors and warnings") print(" 3: Log errors, warnings and info messages (default)") print(" 4: Verbose logging") print(" 5: Extremely verbose logging") print("") print("Server backend related options:") print(" -c|--connect Connect to server backend") print(" -C|--connect-to IP:PORT Connect to server backend") print(" -b|--spawn-backend Spawn a new backend server and connect to it") if not isWinStandalone: print(" -i|--interpreter EXE Set the backend interpreter executable") print(" -R|--mem-read AREA:OFFS:BITWIDTH Memory read access.") print(" -W|--mem-write AREA:OFFS:BITWIDTH:VAL Memory write access.") print("") print("Loading hardware modules:") print(" -H|--hardware NAME:PARAM=VAL:PARAM=VAL...") print("Print module information:") print(" -I|--hardware-info NAME") print("") print(" Where NAME is the name of the hardware module.") print(" PARAM=VAL are optional hardware specific parameters.") print("") print("Environment variables:") print(" AWLSIM_PROFILE =0 Disable profiling (default)") print(" =1 Enable core cycle profiling") print(" =2 Enable full core profiling (including startup)") print("") print(" AWLSIM_CYTHON =0 Do not attempt to use Cython core (default)") print(" =1 Attempt to use Cython core, but fall back to Python") print(" =2 Enforce Cython core") print("") print(" AWLSIM_AFFINITY =0,2,... Comma separated list of host CPU cores") print(" to run on. Default: all cores.") print(" AWLSIM_COVERAGE =DATAFILE Enable code coverage tracing.") def writeStdout(message): if Logging.loglevel >= Logging.LOG_INFO: sys.stdout.write(message) sys.stdout.flush() nextScreenUpdate = 0.0 lastDump = "" lastDumpNrLines = 0 emptyLine = " " * 79 def clearConsole(): # Make cursor visible, clear console and # move cursor to homeposition. if osIsPosix: writeStdout("\x1B[?25h\x1B[2J\x1B[H") elif osIsWindows: os.system("cls") def emitCpuDump(dump): global lastDump global lastDumpNrLines # Pad lines dumpLines = list(line + (78 - len(line)) * ' ' + '|' for line in dump.splitlines()) dumpNrLines = len(dumpLines) # Clear lines from previous dump. if dumpNrLines < lastDumpNrLines: dumpLines.extend([ emptyLine, ] * (lastDumpNrLines - dumpNrLines)) dump = "\n".join(dumpLines) lastDumpNrLines = dumpNrLines lastDump = dump if osIsPosix: # Clear console, move home and print dump. writeStdout("\x1B[2J\x1B[H" + dump) else: # Clear console, move home and print dump. clearConsole() writeStdout(dump) def cpuBlockExitCallback(cpu): global nextScreenUpdate if cpu.now >= nextScreenUpdate: nextScreenUpdate = cpu.now + 0.3 emitCpuDump(str(cpu)) def assignCpuSpecs(cpuSpecs, projectCpuSpecs): cpuSpecs.assignFrom(projectCpuSpecs) if opt_nrAccus is not None: cpuSpecs.setNrAccus(opt_nrAccus) def assignCpuConf(cpuConf, projectCpuConf): cpuConf.assignFrom(projectCpuConf) if opt_mnemonics is not None: cpuConf.setConfiguredMnemonics(opt_mnemonics) if opt_clockMem is not None: cpuConf.setClockMemByte(opt_clockMem) if opt_cycletime is not None: cpuConf.setCycleTimeLimitUs(int(round(opt_cycletime * 1000000.0))) if opt_maxRuntime is not None: cpuConf.setRunTimeLimitUs(int(round(opt_maxRuntime * 1000000.0))) if opt_obtemp is not None: cpuConf.setOBStartinfoEn(opt_obtemp) if opt_extInsns is not None: cpuConf.setExtInsnsEn(opt_extInsns) def readInputFile(inputFile): if inputFile == "-": if isPy2Compat: dataBytes = sys.stdin.read() else: dataBytes = sys.stdin.buffer.read() project = Project.fromProjectOrRawAwlData(dataBytes) else: project = Project.fromProjectOrRawAwlFile(inputFile) return project def run(inputFile): insnMeas = None s = None try: if cython_helper.shouldUseCython(): printInfo("*** Using accelerated CYTHON core " "(AWLSIM_CYTHON environment variable is set)") if opt_memReads or opt_memWrites: raise AwlSimError("awlsim-test --mem-read and --mem-write " "are not supported in non-server-mode.") project = readInputFile(inputFile) printInfo("Parsing code...") generatedAwlSrcs = [] # Get mnemonics type mnemonics = project.getCpuConf().getConfiguredMnemonics() if opt_mnemonics is not None: mnemonics = opt_mnemonics # Parse FUP sources optSettCont = None if opt_optimizers == "off": optSettCont = AwlOptimizerSettingsContainer(globalEnable=False) elif opt_optimizers == "all": optSettCont = AwlOptimizerSettingsContainer(globalEnable=True, allEnable=True) for fupSrc in project.getFupSources(): if not fupSrc.enabled: continue generatedAwlSrcs.append(FupCompiler().compile( fupSource=fupSrc, symTabSources=project.getSymTabSources(), mnemonics=mnemonics, optimizerSettingsContainer=optSettCont)) # Parse KOP sources for kopSrc in project.getKopSources(): if not kopSrc.enabled: continue pass#TODO # Parse AWL sources parseTrees = [] for awlSrc in itertools.chain(project.getAwlSources(), generatedAwlSrcs): if not awlSrc.enabled: continue p = AwlParser() p.parseSource(awlSrc) parseTrees.append(p.getParseTree()) # Parse symbol tables symTables = [] for symTabSrc in project.getSymTabSources(): if not symTabSrc.enabled: continue tab = SymTabParser.parseSource(symTabSrc, autodetectFormat = True, mnemonics = mnemonics) symTables.append(tab) printInfo("Initializing core...") s = AwlSim() s.reset() # Load hardware modules def loadMod(name, parameters): printInfo("Loading hardware module '%s'..." % name) hwClass = s.loadHardwareModule(name) s.registerHardwareClass(hwClass = hwClass, parameters = parameters) for modDesc in project.getHwmodSettings().getLoadedModules(): loadMod(modDesc.getModuleName(), modDesc.getParameters()) for name, parameters in opt_hwmods: loadMod(name, parameters) # Configure the CPU cpu = s.getCPU() assignCpuSpecs(cpu.getSpecs(), project.getCpuSpecs()) assignCpuConf(cpu.getConf(), project.getCpuConf()) if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO: cpu.setBlockExitCallback(cpuBlockExitCallback, cpu) # Download the program printInfo("Initializing CPU...") for symTable in symTables: s.loadSymbolTable(symTable) for libSel in project.getLibSelections(): s.loadLibraryBlock(libSel) for parseTree in parseTrees: s.load(parseTree) s.build() if opt_insnMeas: insnMeas = cpu.setupInsnMeas() # Run the program s.startup() printInfo("[Initialization finished - CPU is executing user code]") try: if not opt_noCpuDump: clearConsole() while 1: s.runCycle() finally: if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO: clearConsole() writeStdout(lastDump + '\n') except (AwlParserError, AwlSimError) as e: printError(e.getReport()) return ExitCodes.EXIT_ERR_SIM except KeyboardInterrupt as e: if insnMeas: if insnMeas.haveAnyMeasurements: if opt_insnMeas == "-": writeStdout(insnMeas.dump()) else: with open(opt_insnMeas, "wb") as fd: fd.write(insnMeas.dumpCSV().encode("UTF-8")) else: printError("Instruction timing measurement: Not enough samples.") return ExitCodes.EXIT_ERR_OTHER except MaintenanceRequest as e: if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN, MaintenanceRequest.TYPE_STOP, MaintenanceRequest.TYPE_RTTIMEOUT): printInfo("Shutting down, as requested (%s)..." % str(e)) else: printError("Received unknown maintenance request " "(%d: %s)..." % (e.requestType, str(e))) finally: if s: s.shutdown() return ExitCodes.EXIT_OK def runWithServerBackend(inputFile): client = None tunnel = None try: project = readInputFile(inputFile) linkSettings = project.getCoreLinkSettings() if opt_spawnBackend: host = AwlSimServer.DEFAULT_HOST port = range(AwlSimServer.DEFAULT_PORT, AwlSimServer.DEFAULT_PORT + 4096) else: host = linkSettings.getConnectHost() port = linkSettings.getConnectPort() if opt_connectTo: host, port = opt_connectTo # Establish SSH tunnel, if requested. if linkSettings.getTunnel() == linkSettings.TUNNEL_SSH and\ not opt_spawnBackend: printInfo("Establishing SSH tunnel...") localPort = linkSettings.getTunnelLocalPort() if localPort == linkSettings.TUNNEL_LOCPORT_AUTO: localPort = None tunnel = SSHTunnel( remoteHost=host, remotePort=port, localPort=localPort, sshUser=linkSettings.getSSHUser(), sshPort=linkSettings.getSSHPort(), sshExecutable=linkSettings.getSSHExecutable(), ) host, port = tunnel.connect() # Connect to the server client = TestAwlSimClient() if opt_spawnBackend: client.spawnServer(interpreter = opt_interpreter, listenHost = host, listenPort = port) port = client.serverProcessPort printInfo("Connecting to core server...") client.connectToServer(host=host, port=port, timeout=20.0) printInfo("Initializing core...") client.setLoglevel(opt_loglevel) client.setRunState(False) client.reset() # Load hardware modules client.loadHardwareModules(project.getHwmodSettings().getLoadedModules()) for name, parameters in opt_hwmods: client.loadHardwareModule(HwmodDescriptor(name, parameters)) # Configure the core if opt_noCpuDump: client.setPeriodicDumpInterval(0) else: client.setPeriodicDumpInterval(300) specs = client.getCpuSpecs() assignCpuSpecs(specs, project.getCpuSpecs()) client.setCpuSpecs(specs) conf = client.getCpuConf() assignCpuConf(conf, project.getCpuConf()) client.setCpuConf(conf) #TODO configure optimizers # Fire up the core printInfo("Initializing CPU...") client.loadProject(project, loadCpuSpecs=False, loadCpuConf=False, loadHwMods=False) client.setRunState(True) # Run the client-side event loop printInfo("[Initialization finished - Remote-CPU is executing user code]") try: if opt_memReads: client.setMemoryReadRequests(memAreas=opt_memReads, repetitionPeriod=0.001, sync=False) if not opt_noCpuDump: clearConsole() while True: client.processMessages(timeout=0.05) if opt_memWrites: client.writeMemory(memAreas=opt_memWrites, sync=True) finally: if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO: clearConsole() writeStdout(lastDump + '\n') except AwlSimError as e: printError(e.getReport()) return ExitCodes.EXIT_ERR_SIM except MaintenanceRequest as e: if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN, MaintenanceRequest.TYPE_STOP, MaintenanceRequest.TYPE_RTTIMEOUT): printInfo("Shutting down, as requested (%s)..." % str(e)) else: printError("Received unknown maintenance request " "(%d: %s)..." % (e.requestType, str(e))) except KeyboardInterrupt as e: pass finally: if client: client.shutdown() if tunnel: tunnel.shutdown() return ExitCodes.EXIT_OK def __signalHandler(sig, frame): printInfo("Received signal %d" % sig) if sig == signal.SIGTERM: # Raise SIGINT. It will shut down everything. os.kill(os.getpid(), signal.SIGINT) def parseMemoryArea(memAreaStr, withData): try: def dataToBytes(value, length): if not (0 <= value <= ((1 << length) - 1)): raise ValueError return WordPacker.toBytes(byteBuffer=bytearray(length // 8), bitWidth=length, value=value) memAreaStr = memAreaStr.split(":") start = index = length = 0 data = b'' memType = { "E" : MemoryArea.TYPE_E, "A" : MemoryArea.TYPE_A, "M" : MemoryArea.TYPE_M, "L" : MemoryArea.TYPE_L, "DB" : MemoryArea.TYPE_DB, "T" : MemoryArea.TYPE_T, "Z" : MemoryArea.TYPE_Z, "STW" : MemoryArea.TYPE_STW, }[memAreaStr[0].upper().strip()] if memType in { MemoryArea.TYPE_E, MemoryArea.TYPE_A, MemoryArea.TYPE_M, MemoryArea.TYPE_L, }: start = int(memAreaStr[1]) length = int(memAreaStr[2]) if (not (0 <= start <= 0xFFFF) or length not in (8, 16, 32)): raise ValueError if withData: data = dataToBytes(int(memAreaStr[3]), length) elif memType == MemoryArea.TYPE_DB: index = int(memAreaStr[1]) start = int(memAreaStr[2]) length = int(memAreaStr[3]) if (not (0 <= start <= 0xFFFF) or not (0 <= index <= 0xFFFF) or length not in (8, 16, 32)): raise ValueError if withData: data = dataToBytes(int(memAreaStr[4]), length) elif memType in { MemoryArea.TYPE_T, MemoryArea.TYPE_Z, }: index = int(memAreaStr[1]) length = 16 if not (0 <= index <= 0xFFFF): raise ValueError if withData: data = dataToBytes(int(memAreaStr[2]), 16) elif memType == MemoryArea.TYPE_STW: length = 16 if withData: data = dataToBytes(int(memAreaStr[1]), 16) else: assert(0) return MemoryArea(memType=memType, flags=0, index=index, start=start, length=length // 8, data=data) except (ValueError, IndexError, KeyError, AwlSimError) as e: pass return None def main(): global opt_cycletime global opt_maxRuntime global opt_noCpuDump global opt_nrAccus global opt_extInsns global opt_obtemp global opt_clockMem global opt_mnemonics global opt_optimizers global opt_insnMeas global opt_hwmods global opt_hwinfos global opt_loglevel global opt_connect global opt_connectTo global opt_spawnBackend global opt_interpreter global opt_memReads global opt_memWrites opt_cycletime = None opt_maxRuntime = None opt_noCpuDump = False opt_nrAccus = None opt_extInsns = None opt_obtemp = None opt_clockMem = None opt_mnemonics = None opt_optimizers = "default" opt_insnMeas = None opt_hwmods = [] opt_hwinfos = [] opt_loglevel = Logging.LOG_INFO opt_connect = None opt_connectTo = False opt_spawnBackend = False opt_interpreter = None opt_memReads = [] opt_memWrites = [] try: (opts, args) = getopt.getopt(sys.argv[1:], "hY:M:24qDxt:T:m:O:H:I:P:L:cC:bi:R:W:", [ "help", "cycle-limit=", "max-runtime=", "twoaccu", "fouraccu", "quiet", "no-cpu-dump", "extended-insns", "obtemp=", "clock-mem=", "mnemonics=", "optimizers=", "hardware=", "hardware-info=", "profile=", "loglevel=", "connect", "connect-to=", "spawn-backend", "interpreter=", "mem-read=", "mem-write=", "insn-meas=", ]) except getopt.GetoptError as e: printError(str(e)) usage() return ExitCodes.EXIT_ERR_CMDLINE for (o, v) in opts: if o in ("-h", "--help"): usage() return ExitCodes.EXIT_OK if o in ("-Y", "--cycle-limit"): try: opt_cycletime = float(v) except ValueError: printError("-Y|--cycle-limit: Invalid time format") sys.exit(1) if o in ("-M", "--max-runtime"): try: opt_maxRuntime = float(v) except ValueError: printError("-M|--max-runtime: Invalid time format") sys.exit(1) if o in ("-2", "--twoaccu"): opt_nrAccus = 2 if o in ("-4", "--fouraccu"): opt_nrAccus = 4 if o in ("-D", "--no-cpu-dump"): opt_noCpuDump = True if o in ("-x", "--extended-insns"): opt_extInsns = True if o in ("-t", "--obtemp"): opt_obtemp = str2bool(v) if o in ("-T", "--clock-mem"): try: opt_clockMem = int(v) if opt_clockMem < -1 or opt_clockMem > 0xFFFF: raise ValueError except ValueError: printError("-T|--clock-mem: Invalid byte address") sys.exit(1) if o in ("-m", "--mnemonics"): opt_mnemonics = v.lower() if opt_mnemonics not in ("en", "de", "auto"): printError("-m|--mnemonics: Invalid mnemonics type") sys.exit(1) if o in ("-O", "--optimizers"): try: modes = v.split(",") for mode in modes: mode = mode.lower() if mode in ("off", "all", "default"): opt_optimizers = mode else: printError("-O|--optimizers: Unknown optimizer: %s" % mode) sys.exit(1) except (ValueError, IndexError) as e: printError("-O|--optimizers: Invalid optimization mode") sys.exit(1) if o == "--insn-meas": opt_insnMeas = v if o in ("-H", "--hardware"): try: v = v.split(':') if not v: raise ValueError name = v[0] params = {} for pstr in v[1:]: if not pstr: continue i = pstr.find('=') if i < 0: raise ValueError pname = pstr[:i] pval = pstr[i+1:] if not pname or not pval: raise ValueError params[pname] = pval opt_hwmods.append( (name, params) ) except (ValueError, IndexError) as e: printError("-H|--hardware: Invalid module name or parameters") sys.exit(1) if o in ("-I", "--hardware-info"): opt_hwinfos.append(v.split(':')[0]) if o in ("-L", "--loglevel"): try: opt_loglevel = int(v) except ValueError: printError("-L|--loglevel: Invalid log level") sys.exit(1) if o in ("-c", "--connect"): opt_connect = True if o in ("-C", "--connect-to"): try: idx = v.rfind(":") if idx <= 0: raise ValueError opt_connectTo = (v[:idx], int(v[idx+1:])) except ValueError: printError("-c|--connect: Invalid host/port") sys.exit(1) if o in ("-b", "--spawn-backend"): opt_spawnBackend = True if o in ("-i", "--interpreter"): if isWinStandalone: printError("-i|--interpreter not supported on win-standalone") sys.exit(1) opt_interpreter = v if o in ("-R", "--mem-read"): memArea = parseMemoryArea(v, withData=False) if not memArea: printError("-R|--mem-read invalid arguments.") sys.exit(1) opt_memReads.append(memArea) if o in ("-W", "--mem-write"): memArea = parseMemoryArea(v, withData=True) if not memArea: printError("-W|--mem-write invalid arguments.") sys.exit(1) opt_memWrites.append(memArea) if len(args) != 1 and not opt_hwinfos: usage() return ExitCodes.EXIT_ERR_CMDLINE if args: inputFile = args[0] Logging.setLoglevel(opt_loglevel) opt_mnemonics = { None : None, "en" : S7CPUConfig.MNEMONICS_EN, "de" : S7CPUConfig.MNEMONICS_DE, "auto" : S7CPUConfig.MNEMONICS_AUTO, }[opt_mnemonics] try: if opt_hwinfos: # Just print the hardware-infos and exit. for name in opt_hwinfos: cls = AwlSim.loadHardwareModule(name) print(cls.getModuleInfo()) return ExitCodes.EXIT_OK except (AwlParserError, AwlSimError) as e: printError(e.getReport()) return ExitCodes.EXIT_ERR_SIM signal.signal(signal.SIGTERM, __signalHandler) if opt_interpreter and not opt_spawnBackend: printError("Selected an --interpreter, but no " "--spawn-backend was requested.") return ExitCodes.EXIT_ERR_CMDLINE if opt_spawnBackend or opt_connect or opt_connectTo: return runWithServerBackend(inputFile) return run(inputFile) if __name__ == "__main__": sys.exit(main())