# -*- coding: utf-8 -*- # # AWL simulator - CPU # # Copyright 2012-2013 Michael Buesch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import time import datetime import random from awlsim.cpuspecs import * from awlsim.parser import * from awlsim.symbolparser import * from awlsim.datatypes import * from awlsim.instructions.all_insns import * from awlsim.operators import * from awlsim.insntrans import * from awlsim.optrans import * from awlsim.blocks import * from awlsim.datablocks import * from awlsim.statusword import * from awlsim.labels import * from awlsim.timers import * from awlsim.counters import * from awlsim.callstack import * from awlsim.obtemp import * from awlsim.util import * from awlsim.system_sfc import * from awlsim.system_sfb import * class ParenStackElem(object): "Parenthesis stack element" def __init__(self, cpu, insnType, statusWord): self.cpu = cpu self.insnType = insnType self.NER = statusWord.NER self.VKE = statusWord.VKE self.OR = statusWord.OR def __repr__(self): mnemonics = self.cpu.specs.getMnemonics() type2name = { S7CPUSpecs.MNEMONICS_EN : AwlInsn.type2name_english, S7CPUSpecs.MNEMONICS_DE : AwlInsn.type2name_german, }[mnemonics] return '(insn="%s" VKE=%s OR=%d)' %\ (type2name[self.insnType], self.VKE, self.OR) class McrStackElem(object): "MCR stack element" def __init__(self, statusWord): self.VKE = statusWord.VKE def __bool__(self): return bool(self.VKE) __nonzero__ = __bool__ class S7CPU(object): "STEP 7 CPU" def __init__(self, sim): self.sim = sim self.specs = S7CPUSpecs(self) self.setCycleTimeLimit(5.0) self.setCycleExitCallback(None) self.setBlockExitCallback(None) self.setPostInsnCallback(None) self.setPeripheralReadCallback(None) self.setPeripheralWriteCallback(None) self.setScreenUpdateCallback(None) self.reset() self.enableExtendedInsns(False) self.enableObTempPresets(False) def enableObTempPresets(self, en=True): self.__obTempPresetsEnabled = bool(en) def obTempPresetsEnabled(self): return self.__obTempPresetsEnabled def enableExtendedInsns(self, en=True): self.__extendedInsnsEnabled = bool(en) def extendedInsnsEnabled(self): return self.__extendedInsnsEnabled def setCycleTimeLimit(self, newLimit): self.cycleTimeLimit = float(newLimit) def __detectMnemonics(self, parseTree): specs = self.getSpecs() if specs.getConfiguredMnemonics() != S7CPUSpecs.MNEMONICS_AUTO: return codeBlocks = list(parseTree.obs.values()) codeBlocks.extend(parseTree.fbs.values()) codeBlocks.extend(parseTree.fcs.values()) errorCounts = { S7CPUSpecs.MNEMONICS_EN : 0, S7CPUSpecs.MNEMONICS_DE : 0, } detected = None for mnemonics in (S7CPUSpecs.MNEMONICS_EN, S7CPUSpecs.MNEMONICS_DE): for block in codeBlocks: for rawInsn in block.insns: ret = AwlInsnTranslator.name2type(rawInsn.getName(), mnemonics) if ret is None: errorCounts[mnemonics] += 1 try: optrans = AwlOpTranslator(None, mnemonics) optrans.translateFromRawInsn(rawInsn) except AwlSimError: errorCounts[mnemonics] += 1 if errorCounts[mnemonics] == 0: # No error. Use these mnemonics. detected = mnemonics if detected is None: # Select the mnemonics with the lower error count. if errorCounts[S7CPUSpecs.MNEMONICS_EN] <= errorCounts[S7CPUSpecs.MNEMONICS_DE]: detected = S7CPUSpecs.MNEMONICS_EN else: detected = S7CPUSpecs.MNEMONICS_DE if specs.getMnemonics() != S7CPUSpecs.MNEMONICS_AUTO: # Autodetected mnemonics were already set before if specs.getMnemonics() != detected: raise AwlSimError("Cannot mix multiple AWL files with "\ "distinct mnemonics. This error may be caused by "\ "incorrect autodetection. "\ "Force mnemonics to EN or DE to avoid this error.") specs.setDetectedMnemonics(detected) def __translateInsn(self, rawInsn, ip): ex = None try: insn = AwlInsnTranslator.fromRawInsn(self, rawInsn) insn.setIP(ip) except AwlSimError as e: if e.getRawInsn() is None: e.setRawInsn(rawInsn) raise e return insn def __translateInsns(self, rawInsns): insns = [] # Translate raw instructions to simulator instructions for ip, rawInsn in enumerate(rawInsns): insns.append(self.__translateInsn(rawInsn, ip)) # If the last instruction is not BE or BEA, add an implicit BE if not insns or insns[-1].type not in (AwlInsn.TYPE_BE, AwlInsn.TYPE_BEA): insns.append(AwlInsn_BE(cpu = self, rawInsn = None)) return insns def __translateInterfaceField(self, rawVar): dtype = AwlDataType.makeByName(rawVar.typeTokens) if rawVar.valueTokens is None: initialValue = None else: initialValue = dtype.parseMatchingImmediate(rawVar.valueTokens) field = BlockInterface.Field(name = rawVar.name, dataType = dtype, initialValue = initialValue) return field def __translateCodeBlock(self, rawBlock, blockClass): insns = self.__translateInsns(rawBlock.insns) block = blockClass(insns, rawBlock.index) for rawVar in rawBlock.vars_in: block.interface.addField_IN(self.__translateInterfaceField(rawVar)) for rawVar in rawBlock.vars_out: block.interface.addField_OUT(self.__translateInterfaceField(rawVar)) if rawBlock.retTypeTokens: dtype = AwlDataType.makeByName(rawBlock.retTypeTokens) if dtype.type != AwlDataType.TYPE_VOID: field = BlockInterface.Field(name = "RET_VAL", dataType = dtype) block.interface.addField_OUT(field) for rawVar in rawBlock.vars_inout: block.interface.addField_INOUT(self.__translateInterfaceField(rawVar)) for rawVar in rawBlock.vars_static: block.interface.addField_STAT(self.__translateInterfaceField(rawVar)) for rawVar in rawBlock.vars_temp: block.interface.addField_TEMP(self.__translateInterfaceField(rawVar)) block.interface.buildDataStructure() return block def __translateGlobalDB(self, rawDB): db = DB(rawDB.index, None) # Create the data structure fields for f in rawDB.fields: if not f.typeTokens: raise AwlSimError( "DB %d assigns field '%s', " "but does not declare it." %\ (rawDB.index, f.name)) if f.valueTokens is None: raise AwlSimError( "DB %d declares field '%s', " "but does not initialize." %\ (rawDB.index, f.name)) dtype = AwlDataType.makeByName(f.typeTokens) db.struct.addFieldNaturallyAligned(f.name, dtype) # Allocate the data structure fields db.allocate() # Initialize the data structure fields for f in rawDB.fields: dtype = AwlDataType.makeByName(f.typeTokens) value = dtype.parseMatchingImmediate(f.valueTokens) db.structInstance.setFieldDataByName(f.name, value) return db def __translateInstanceDB(self, rawDB): fbStr = "SFB" if rawDB.fb.isSFB else "FB" try: if rawDB.fb.isSFB: fb = self.sfbs[rawDB.fb.fbNumber] else: fb = self.fbs[rawDB.fb.fbNumber] except KeyError: raise AwlSimError("Instance DB %d references %s %d, " "but %s %d does not exist." %\ (rawDB.index, fbStr, rawDB.fb.fbNumber, fbStr, rawDB.fb.fbNumber)) db = DB(rawDB.index, fb) interface = fb.interface # Sanity checks for f in rawDB.fields: if f.typeTokens: raise AwlSimError("DB %d is an " "instance DB, but it also " "declares a data structure." %\ rawDB.index) # Allocate the data structure fields db.allocate() # Initialize the data structure fields for f in rawDB.fields: dtype = interface.getFieldByName(f.name).dataType value = dtype.parseMatchingImmediate(f.valueTokens) db.structInstance.setFieldDataByName(f.name, value) return db def __translateDB(self, rawDB): if rawDB.index < 0: raise AwlSimError("DB number %d is invalid" % rawDB.index) if rawDB.isInstanceDB(): return self.__translateInstanceDB(rawDB) return self.__translateGlobalDB(rawDB) def __allocateFCBounceDB(self, fc, isSFC): if isSFC: # Use negative FC number with an offset as bounce-DB number if fc.index >= 0: dbNumber = -abs(fc.index) - (1 << 32) else: dbNumber = -abs(fc.index) - (1 << 33) else: # Use negative FC number as bounce-DB number dbNumber = -abs(fc.index) db = DB(dbNumber, fc) db.allocate() return db # Translate classic symbols ("abc") def __resolveClassicSym(self, block, insn, oper): if oper.type == AwlOperator.SYMBOLIC: symbol = self.symbolTable.findByName(oper.value) if not symbol: raise AwlSimError("Symbol \"%s\" not found in " "symbol table." % oper.value, insn = insn) oper = symbol.operator return oper # Translate symbolic OB/FB/FC/DB block name def __resolveBlockName(self, blockTypeId, blockName): if isString(blockName): symbol = self.symbolTable.findByName(blockName) if not symbol: raise AwlSimError("Symbolic block name \"%s\" " "not found in symbol table." % blockName) if symbol.type.type != blockTypeId: raise AwlSimError("Symbolic block name \"%s\" " "has an invalid type." % blockName) return symbol.operator.value.byteOffset return blockName # Translate local symbols (#abc) def __resolveNamedLocalSym(self, block, insn, oper): if oper.type == AwlOperator.NAMED_LOCAL: newOper = block.interface.getOperatorForFieldName(oper.value, False) newOper.setInsn(oper.insn) return newOper return oper # Translate local symbol pointers (P##abc) def __resolveNamedLocalPointer(self, block, insn, oper): if oper.type == AwlOperator.NAMED_LOCAL_PTR: newOper = block.interface.getOperatorForFieldName(oper.value, True) newOper.setInsn(oper.insn) return newOper return oper def __resolveSymbols_block(self, block): for insn in block.insns: for i in range(len(insn.ops)): insn.ops[i] = self.__resolveClassicSym(block, insn, insn.ops[i]) insn.ops[i] = self.__resolveNamedLocalSym(block, insn, insn.ops[i]) if insn.ops[i].type == AwlOperator.INDIRECT: insn.ops[i].offsetOper = \ self.__resolveClassicSym(block, insn, insn.ops[i].offsetOper) insn.ops[i].offsetOper = \ self.__resolveNamedLocalSym(block, insn, insn.ops[i].offsetOper) insn.ops[i] = self.__resolveNamedLocalPointer(block, insn, insn.ops[i]) for i in range(len(insn.params)): insn.params[i].rvalueOp = self.__resolveClassicSym(block, insn, insn.params[i].rvalueOp) insn.params[i].rvalueOp = self.__resolveNamedLocalSym(block, insn, insn.params[i].rvalueOp) if insn.params[i].rvalueOp.type == AwlOperator.INDIRECT: insn.params[i].rvalueOp.offsetOper =\ self.__resolveClassicSym(block, insn, insn.params[i].rvalueOp.offsetOper) insn.params[i].rvalueOp.offsetOper =\ self.__resolveNamedLocalSym(block, insn, insn.params[i].rvalueOp.offsetOper) insn.params[i].rvalueOp = self.__resolveNamedLocalPointer(block, insn, insn.params[i].rvalueOp) def __resolveSymbols(self): for ob in self.obs.values(): self.__resolveSymbols_block(ob) for fb in self.fbs.values(): self.__resolveSymbols_block(fb) for fc in self.fcs.values(): self.__resolveSymbols_block(fc) # Run static error checks for code block def __staticSanityChecks_block(self, block): for insn in block.insns: insn.staticSanityChecks() # Run static error checks def __staticSanityChecks(self): try: self.obs[1] except KeyError: raise AwlSimError("No OB1 defined") for ob in self.obs.values(): self.__staticSanityChecks_block(ob) for fb in self.fbs.values(): self.__staticSanityChecks_block(fb) for fc in self.fcs.values(): self.__staticSanityChecks_block(fc) def load(self, parseTree): # Mnemonics autodetection self.__detectMnemonics(parseTree) # Translate OBs for obNumber, rawOB in parseTree.obs.items(): obNumber = self.__resolveBlockName(AwlDataType.TYPE_OB_X, obNumber) if obNumber in self.obs: raise AwlSimError("Multiple definitions of "\ "OB %d" % obNumber) rawOB.index = obNumber ob = self.__translateCodeBlock(rawOB, OB) self.obs[obNumber] = ob # Create the TEMP-preset handler table try: presetHandlerClass = OBTempPresets_table[obNumber] except KeyError: presetHandlerClass = OBTempPresets_dummy self.obTempPresetHandlers[obNumber] = presetHandlerClass(self) # Translate FBs for fbNumber, rawFB in parseTree.fbs.items(): fbNumber = self.__resolveBlockName(AwlDataType.TYPE_FB_X, fbNumber) if fbNumber in self.fbs: raise AwlSimError("Multiple definitions of "\ "FB %d" % fbNumber) rawFB.index = fbNumber fb = self.__translateCodeBlock(rawFB, FB) self.fbs[fbNumber] = fb # Translate FCs for fcNumber, rawFC in parseTree.fcs.items(): fcNumber = self.__resolveBlockName(AwlDataType.TYPE_FC_X, fcNumber) if fcNumber in self.fcs: raise AwlSimError("Multiple definitions of "\ "FC %d" % fcNumber) rawFC.index = fcNumber fc = self.__translateCodeBlock(rawFC, FC) self.fcs[fcNumber] = fc bounceDB = self.__allocateFCBounceDB(fc, False) self.dbs[bounceDB.index] = bounceDB if not self.sfbs: # Create the SFB tables for sfbNumber in SFB_table.keys(): if sfbNumber < 0 and not self.__extendedInsnsEnabled: continue sfb = SFB_table[sfbNumber](self) sfb.interface.buildDataStructure() self.sfbs[sfbNumber] = sfb if not self.sfcs: # Create the SFC tables for sfcNumber in SFC_table.keys(): if sfcNumber < 0 and not self.__extendedInsnsEnabled: continue sfc = SFC_table[sfcNumber](self) sfc.interface.buildDataStructure() self.sfcs[sfcNumber] = sfc bounceDB = self.__allocateFCBounceDB(sfc, True) self.dbs[bounceDB.index] = bounceDB # Translate DBs for dbNumber, rawDB in parseTree.dbs.items(): dbNumber = self.__resolveBlockName(AwlDataType.TYPE_DB_X, dbNumber) if dbNumber in self.dbs: raise AwlSimError("Multiple definitions of "\ "DB %d" % dbNumber) rawDB.index = dbNumber db = self.__translateDB(rawDB) self.dbs[dbNumber] = db def loadSymbolTable(self, symbolTable): self.symbolTable.merge(symbolTable) def reallocate(self, force=False): if force or (self.specs.nrAccus == 4) != self.is4accu: self.accu1, self.accu2 = Accu(), Accu() if self.specs.nrAccus == 2: self.accu3, self.accu4 = None, None elif self.specs.nrAccus == 4: self.accu3, self.accu4 = Accu(), Accu() else: assert(0) if force or self.specs.nrTimers != len(self.timers): self.timers = [ Timer(self, i) for i in range(self.specs.nrTimers) ] if force or self.specs.nrCounters != len(self.counters): self.counters = [ Counter(self, i) for i in range(self.specs.nrCounters) ] if force or self.specs.nrFlags != len(self.flags): self.flags = ByteArray(self.specs.nrFlags) if force or self.specs.nrInputs != len(self.inputs): self.inputs = ByteArray(self.specs.nrInputs) if force or self.specs.nrOutputs != len(self.outputs): self.outputs = ByteArray(self.specs.nrOutputs) CallStackElem.resetCache() def reset(self): self.dbs = { # DBs 0 : DB(0, permissions = 0), # read/write-protected system-DB } self.obs = { # OBs } self.obTempPresetHandlers = { # OB TEMP-preset handlers } self.fcs = { # User FCs } self.fbs = { # User FBs } self.sfcs = { # System SFCs } self.sfbs = { # System SFBs } self.symbolTable = SymbolTable() self.reallocate(force=True) self.ar1 = Adressregister() self.ar2 = Adressregister() self.dbRegister = self.dbs[0] self.diRegister = self.dbs[0] self.callStack = [ ] self.callStackTop = None self.setMcrActive(False) self.mcrStack = [ ] self.statusWord = S7StatusWord() self.relativeJump = 1 # Stats self.__insnCount = 0 self.__insnCountMod = 64 self.__cycleCount = 0 self.insnPerSecond = 0.0 self.avgInsnPerCycle = 0.0 self.cycleStartTime = 0.0 self.minCycleTime = 86400.0 self.maxCycleTime = 0.0 self.avgCycleTime = 0.0 self.startupTime = 0.0 self.__speedMeasureStartTime = 0 self.__speedMeasureStartInsnCount = 0 self.__speedMeasureStartCycleCount = 0 self.updateTimestamp() def setCycleExitCallback(self, cb, data=None): self.cbCycleExit = cb self.cbCycleExitData = data def setBlockExitCallback(self, cb, data=None): self.cbBlockExit = cb self.cbBlockExitData = data def setPostInsnCallback(self, cb, data=None): self.cbPostInsn = cb self.cbPostInsnData = data def setPeripheralReadCallback(self, cb, data=None): self.cbPeripheralRead = cb self.cbPeripheralReadData = data def setPeripheralWriteCallback(self, cb, data=None): self.cbPeripheralWrite = cb self.cbPeripheralWriteData = data def setScreenUpdateCallback(self, cb, data=None): self.cbScreenUpdate = cb self.cbScreenUpdateData = data def requestScreenUpdate(self): if self.cbScreenUpdate: self.cbScreenUpdate(self.cbScreenUpdateData) @property def is4accu(self): return self.accu4 is not None # Get the active parenthesis stack @property def parenStack(self): return self.callStackTop.parenStack def __runOB(self, block): # Update timekeeping self.updateTimestamp() self.cycleStartTime = self.now # Initialize CPU state self.callStack = [ CallStackElem(self, block) ] self.dbRegister = self.diRegister = self.dbs[0] cse = self.callStackTop = self.callStack[-1] if self.__obTempPresetsEnabled: # Populate the TEMP region self.obTempPresetHandlers[block.index].generate(cse.localdata) # Run the user program cycle while self.callStack: while cse.ip < len(cse.insns): insn, self.relativeJump = cse.insns[cse.ip], 1 insn.run() if self.cbPostInsn: self.cbPostInsn(self.cbPostInsnData) cse.ip += self.relativeJump cse, self.__insnCount = self.callStackTop,\ (self.__insnCount + 1) & 0x00FFFFFF if self.__insnCount % self.__insnCountMod == 0: self.updateTimestamp() self.__runTimeCheck() self.__insnCountMod = 64 if self.cbBlockExit: self.cbBlockExit(self.cbBlockExitData) prevCse = self.callStack.pop() if self.callStack: cse = self.callStackTop = self.callStack[-1] prevCse.handleBlockExit() prevCse.destroy() if self.cbCycleExit: self.cbCycleExit(self.cbCycleExitData) # Run startup code def startup(self): # Resolve symbolic instructions and operators self.__resolveSymbols() # Run some static sanity checks on the code self.__staticSanityChecks() self.updateTimestamp() self.__speedMeasureStartTime = self.now self.__speedMeasureStartInsnCount = 0 self.__speedMeasureStartCycleCount = 0 self.startupTime = self.now # Run startup OB for obNumber in (100, 101, 102): ob = self.obs.get(obNumber) if ob is not None: self.__runOB(ob) break # Run one cycle of the user program def runCycle(self): # Run the actual OB1 code self.__runOB(self.obs[1]) # Update timekeeping and statistics self.updateTimestamp() self.__cycleCount = (self.__cycleCount + 1) & 0x00FFFFFF # Evaluate speed measurement elapsedTime = self.now - self.__speedMeasureStartTime if elapsedTime >= 1.0: # Calculate instruction and cycle counts. cycleCount = (self.__cycleCount - self.__speedMeasureStartCycleCount) &\ 0x00FFFFFF insnCount = (self.__insnCount - self.__speedMeasureStartInsnCount) &\ 0x00FFFFFF # Calculate instruction statistics. self.insnPerSecond = insnCount / elapsedTime self.avgInsnPerCycle = insnCount / cycleCount # Get the average cycle time over the measurement period. cycleTime = elapsedTime / cycleCount # Store overall-average cycle time and maximum cycle time. self.maxCycleTime = max(self.maxCycleTime, cycleTime) self.minCycleTime = min(self.minCycleTime, cycleTime) self.avgCycleTime = (self.avgCycleTime + cycleTime) / 2 # Reset the counters self.__speedMeasureStartTime = self.now self.__speedMeasureStartInsnCount = self.__insnCount self.__speedMeasureStartCycleCount = self.__cycleCount def __updateTimestamp_perf(self): self.now = time.perf_counter() def __updateTimestamp_time(self): self.now = time.time() # Construct updateTimestamp() method. # updateTimestamp() updates self.now, which is a # floating point count of seconds. if hasattr(time, "perf_counter"): updateTimestamp = __updateTimestamp_perf else: updateTimestamp = __updateTimestamp_time __dateAndTimeWeekdayMap = { 0 : 2, # monday 1 : 3, # tuesday 2 : 4, # wednesday 3 : 5, # thursday 4 : 6, # friday 5 : 7, # saturday 6 : 1, # sunday } # Make a DATE_AND_TIME for the current wall-time and # store it in byteArray, which is a list of GenericByte objects. # If byteArray is smaller than 8 bytes, an IndexError is raised. def makeCurrentDateAndTime(self, byteArray, offset): dt = datetime.datetime.now() year, month, day, hour, minute, second, msec =\ dt.year, dt.month, dt.day, dt.hour, \ dt.minute, dt.second, dt.microsecond // 1000 byteArray[offset] = (year % 10) | (((year // 10) % 10) << 4) byteArray[offset + 1] = (month % 10) | (((month // 10) % 10) << 4) byteArray[offset + 2] = (day % 10) | (((day // 10) % 10) << 4) byteArray[offset + 3] = (hour % 10) | (((hour // 10) % 10) << 4) byteArray[offset + 4] = (minute % 10) | (((minute // 10) % 10) << 4) byteArray[offset + 5] = (second % 10) | (((second // 10) % 10) << 4) byteArray[offset + 6] = ((msec // 10) % 10) | (((msec // 100) % 10) << 4) byteArray[offset + 7] = ((msec % 10) << 4) |\ self.__dateAndTimeWeekdayMap[dt.weekday()] def __runTimeCheck(self): if self.now - self.cycleStartTime > self.cycleTimeLimit: raise AwlSimError("Cycle time exceed %.3f seconds" %\ self.cycleTimeLimit) def getCurrentIP(self): try: return self.callStackTop.ip except IndexError as e: return None def getCurrentInsn(self): try: cse = self.callStackTop if not cse: return None return cse.insns[cse.ip] except IndexError as e: return None def labelIdxToRelJump(self, labelIndex): cse = self.callStackTop label = cse.labels[labelIndex] referencedInsn = label.getInsn() referencedIp = referencedInsn.getIP() assert(referencedIp < len(cse.insns)) return referencedIp - cse.ip def jumpToLabel(self, labelIndex): self.relativeJump = self.labelIdxToRelJump(labelIndex) def jumpRelative(self, insnOffset): self.relativeJump = insnOffset def __call_FC(self, blockOper, dbOper, parameters): fc = self.fcs[blockOper.value.byteOffset] bounceDB = self.dbs[-abs(fc.index)] # Get bounce-DB return CallStackElem(self, fc, bounceDB, parameters) def __call_FB(self, blockOper, dbOper, parameters): fb = self.fbs[blockOper.value.byteOffset] db = self.dbs[dbOper.value.byteOffset] cse = CallStackElem(self, fb, db, parameters) self.dbRegister, self.diRegister = self.diRegister, db return cse def __call_SFC(self, blockOper, dbOper, parameters): sfc = self.sfcs[blockOper.value.byteOffset] # Get bounce-DB if sfc.index >= 0: dbNumber = -abs(sfc.index) - (1 << 32) else: dbNumber = -abs(sfc.index) - (1 << 33) bounceDB = self.dbs[dbNumber] return CallStackElem(self, sfc, bounceDB, parameters) def __call_SFB(self, blockOper, dbOper, parameters): sfb = self.sfbs[blockOper.value.byteOffset] db = self.dbs[dbOper.value.byteOffset] cse = CallStackElem(self, sfb, db, parameters) self.dbRegister, self.diRegister = self.diRegister, db return cse __callHelpers = { AwlOperator.BLKREF_FC : __call_FC, AwlOperator.BLKREF_FB : __call_FB, AwlOperator.BLKREF_SFC : __call_SFC, AwlOperator.BLKREF_SFB : __call_SFB, } def run_CALL(self, blockOper, dbOper=None, parameters=()): try: callHelper = self.__callHelpers[blockOper.type] except KeyError: raise AwlSimError("Invalid CALL operand") newCse = callHelper(self, blockOper, dbOper, parameters) if newCse: self.callStack.append(newCse) self.callStackTop = newCse def run_BE(self): s = self.statusWord s.OS, s.OR, s.STA, s.NER = 0, 0, 1, 0 # Jump beyond end of block cse = self.callStackTop self.relativeJump = len(cse.insns) - cse.ip def run_AUF(self, dbOper): dbOper = dbOper.resolve() try: db = self.dbs[dbOper.value.byteOffset] except KeyError: raise AwlSimError("Datablock %i does not exist" %\ dbOper.value.byteOffset) if dbOper.type == AwlOperator.BLKREF_DB: self.dbRegister = db elif dbOper.type == AwlOperator.BLKREF_DI: self.diRegister = db else: raise AwlSimError("Invalid DB reference in AUF") def run_TDB(self): # Swap global and instance DB self.diRegister, self.dbRegister = self.dbRegister, self.diRegister def getStatusWord(self): return self.statusWord def getAccu(self, index): if index < 1 or index > self.specs.nrAccus: raise AwlSimError("Invalid ACCU offset") return (self.accu1, self.accu2, self.accu3, self.accu4)[index - 1] def getAR(self, index): if index < 1 or index > 2: raise AwlSimError("Invalid AR offset") return (self.ar1, self.ar2)[index - 1] def getTimer(self, index): try: return self.timers[index] except IndexError as e: raise AwlSimError("Fetched invalid timer %d" % index) def getCounter(self, index): try: return self.counters[index] except IndexError as e: raise AwlSimError("Fetched invalid counter %d" % index) def getSpecs(self): return self.specs def setMcrActive(self, active): self.mcrActive = active def mcrIsOn(self): return (not self.mcrActive or all(self.mcrStack)) def mcrStackAppend(self, statusWord): self.mcrStack.append(McrStackElem(statusWord)) if len(self.mcrStack) > 8: raise AwlSimError("MCR stack overflow") def mcrStackPop(self): try: return self.mcrStack.pop() except IndexError: raise AwlSimError("MCR stack underflow") def parenStackAppend(self, insnType, statusWord): self.parenStack.append(ParenStackElem(self, insnType, statusWord)) if len(self.parenStack) > 7: raise AwlSimError("Parenthesis stack overflow") # Fetch a range in the 'output' memory area. # 'byteOffset' is the byte offset into the output area. # 'byteCount' is the number if bytes to fetch. # Returns a bytearray. def fetchOutputRange(self, byteOffset, byteCount): return self.outputs[byteOffset : byteOffset + byteCount] # Store a range in the 'input' memory area. # 'byteOffset' is the byte offset into the input area. # 'data' is a bytearray. def storeInputRange(self, byteOffset, data): self.inputs[byteOffset : byteOffset + len(data)] = data def fetch(self, operator, enforceWidth=()): if operator.type == AwlOperator.INDIRECT: return self.fetch(operator.resolve(False), enforceWidth) try: fetchMethod = self.fetchTypeMethods[operator.type] except KeyError: raise AwlSimError("Invalid fetch request: %s" %\ AwlOperator.type2str[operator.type]) # Check width of fetch operation if operator.width not in enforceWidth and enforceWidth: width = operator.width # Special handling for T and Z if operator.type in (AwlOperator.MEM_T, AwlOperator.MEM_Z): if operator.insn.type in (AwlInsn.TYPE_L, AwlInsn.TYPE_LC): width = 32 else: width = 1 if width not in enforceWidth: raise AwlSimError("Data fetch of %d bits, " "but only %s bits are allowed." %\ (width, listToHumanStr(enforceWidth))) return fetchMethod(self, operator) def fetchIMM(self, operator): return operator.value def fetchDBLG(self, operator): return self.dbRegister.struct.getSize() def fetchDBNO(self, operator): return self.dbRegister.index def fetchDILG(self, operator): return self.diRegister.struct.getSize() def fetchDINO(self, operator): return self.diRegister.index def fetchSTW(self, operator): if operator.width == 1: return self.statusWord.getByBitNumber(operator.value.bitOffset) elif operator.width == 16: return self.statusWord.getWord() else: assert(0) def fetchSTW_Z(self, operator): return (self.statusWord.A0 ^ 1) & (self.statusWord.A1 ^ 1) def fetchSTW_NZ(self, operator): return self.statusWord.A0 | self.statusWord.A1 def fetchSTW_POS(self, operator): return (self.statusWord.A0 ^ 1) & self.statusWord.A1 def fetchSTW_NEG(self, operator): return self.statusWord.A0 & (self.statusWord.A1 ^ 1) def fetchSTW_POSZ(self, operator): return self.statusWord.A0 ^ 1 def fetchSTW_NEGZ(self, operator): return self.statusWord.A1 ^ 1 def fetchSTW_UO(self, operator): return self.statusWord.A0 & self.statusWord.A1 def fetchE(self, operator): return self.inputs.fetch(operator.value, operator.width) def fetchA(self, operator): return self.outputs.fetch(operator.value, operator.width) def fetchM(self, operator): return self.flags.fetch(operator.value, operator.width) def fetchL(self, operator): return self.callStackTop.localdata.fetch(operator.value, operator.width) def fetchVL(self, operator): try: cse = self.callStack[-2] except IndexError: raise AwlSimError("Fetch of parent localstack, " "but no parent present.") return cse.localdata.fetch(operator.value, operator.width) def fetchDB(self, operator): if operator.value.dbNumber is not None: # This is a fully qualified access (DBx.DBx X) # Open the data block first. self.run_AUF(AwlOperator(AwlOperator.BLKREF_DB, 16, AwlOffset(operator.value.dbNumber), operator.insn)) if not self.dbRegister: raise AwlSimError("Fetch from global DB, " "but no DB is opened") return self.dbRegister.fetch(operator) def fetchDI(self, operator): if not self.diRegister: raise AwlSimError("Fetch from instance DI, " "but no DI is opened") return self.diRegister.fetch(operator) def fetchINTERF_DB(self, operator): cse = self.callStackTop if not cse.interfaceDB: raise AwlSimError("Fetch from block interface, but " "no interface is declared.") return cse.interfaceDB.fetch(operator) def fetchPE(self, operator): value = None if self.cbPeripheralRead: value = self.cbPeripheralRead(self.cbPeripheralReadData, operator.width, operator.value.byteOffset) if value is None: raise AwlSimError("There is no hardware to handle " "the direct peripheral fetch. " "(width=%d, offset=%d)" %\ (operator.width, operator.value.byteOffset)) self.inputs.store(operator.value, operator.width, value) return self.inputs.fetch(operator.value, operator.width) def fetchT(self, operator): timer = self.getTimer(operator.value.byteOffset) if operator.insn.type == AwlInsn.TYPE_L: return timer.getTimevalBin() elif operator.insn.type == AwlInsn.TYPE_LC: return timer.getTimevalS5T() return timer.get() def fetchZ(self, operator): counter = self.getCounter(operator.value.byteOffset) if operator.insn.type == AwlInsn.TYPE_L: return counter.getValueBin() elif operator.insn.type == AwlInsn.TYPE_LC: return counter.getValueBCD() return counter.get() def fetchVirtACCU(self, operator): return self.getAccu(operator.value.byteOffset).get() def fetchVirtAR(self, operator): return self.getAR(operator.value.byteOffset).get() def fetchVirtDBR(self, operator): if operator.value.byteOffset == 1: if self.dbRegister: return self.dbRegister.index elif operator.value.byteOffset == 2: if self.diRegister: return self.diRegister.index else: raise AwlSimError("Invalid __DBR %d. " "Must be 1 for DB-register or " "2 for DI-register." %\ operator.value.byteOffset) return 0 fetchTypeMethods = { AwlOperator.IMM : fetchIMM, AwlOperator.IMM_REAL : fetchIMM, AwlOperator.IMM_S5T : fetchIMM, AwlOperator.IMM_PTR : fetchIMM, AwlOperator.MEM_E : fetchE, AwlOperator.MEM_A : fetchA, AwlOperator.MEM_M : fetchM, AwlOperator.MEM_L : fetchL, AwlOperator.MEM_VL : fetchVL, AwlOperator.MEM_DB : fetchDB, AwlOperator.MEM_DI : fetchDI, AwlOperator.MEM_T : fetchT, AwlOperator.MEM_Z : fetchZ, AwlOperator.MEM_PE : fetchPE, AwlOperator.MEM_DBLG : fetchDBLG, AwlOperator.MEM_DBNO : fetchDBNO, AwlOperator.MEM_DILG : fetchDILG, AwlOperator.MEM_DINO : fetchDINO, AwlOperator.MEM_STW : fetchSTW, AwlOperator.MEM_STW_Z : fetchSTW_Z, AwlOperator.MEM_STW_NZ : fetchSTW_NZ, AwlOperator.MEM_STW_POS : fetchSTW_POS, AwlOperator.MEM_STW_NEG : fetchSTW_NEG, AwlOperator.MEM_STW_POSZ : fetchSTW_POSZ, AwlOperator.MEM_STW_NEGZ : fetchSTW_NEGZ, AwlOperator.MEM_STW_UO : fetchSTW_UO, AwlOperator.INTERF_DB : fetchINTERF_DB, AwlOperator.VIRT_ACCU : fetchVirtACCU, AwlOperator.VIRT_AR : fetchVirtAR, AwlOperator.VIRT_DBR : fetchVirtDBR, } def store(self, operator, value, enforceWidth=()): if operator.type == AwlOperator.INDIRECT: self.store(operator.resolve(True), value, enforceWidth) return try: storeMethod = self.storeTypeMethods[operator.type] except KeyError: raise AwlSimError("Invalid store request") # Check width of store operation if operator.width not in enforceWidth and enforceWidth: raise AwlSimError("Data store of %d bits, " "but only %s bits are allowed." %\ (operator.width, listToHumanStr(enforceWidth))) storeMethod(self, operator, value) def storeE(self, operator, value): self.inputs.store(operator.value, operator.width, value) def storeA(self, operator, value): self.outputs.store(operator.value, operator.width, value) def storeM(self, operator, value): self.flags.store(operator.value, operator.width, value) def storeL(self, operator, value): self.callStackTop.localdata.store(operator.value, operator.width, value) def storeVL(self, operator, value): try: cse = self.callStack[-2] except IndexError: raise AwlSimError("Store to parent localstack, " "but no parent present.") cse.localdata.store(operator.value, operator.width, value) def storeDB(self, operator, value): if operator.value.dbNumber is None: db = self.dbRegister if not db: raise AwlSimError("Store to global DB, " "but no DB is opened") else: try: db = self.dbs[operator.value.dbNumber] except KeyError: raise AwlSimError("Store to DB %d, but DB " "does not exist" % operator.value.dbNumber) db.store(operator, value) def storeDI(self, operator, value): if not self.diRegister: raise AwlSimError("Store to instance DI, " "but no DI is opened") self.diRegister.store(operator, value) def storeINTERF_DB(self, operator, value): cse = self.callStackTop if not cse.interfaceDB: raise AwlSimError("Store to block interface, but " "no interface is declared.") cse.interfaceDB.store(operator, value) def storePA(self, operator, value): self.outputs.store(operator.value, operator.width, value) ok = False if self.cbPeripheralWrite: ok = self.cbPeripheralWrite(self.cbPeripheralWriteData, operator.width, operator.value.byteOffset, value) if not ok: raise AwlSimError("There is no hardware to handle " "the direct peripheral store. " "(width=%d, offset=%d, value=0x%X)" %\ (operator.width, operator.value.byteOffset, value)) def storeSTW(self, operator, value): if operator.width == 1: raise AwlSimError("Cannot store to individual STW bits") elif operator.width == 16: self.statusWord.setWord(value) else: assert(0) storeTypeMethods = { AwlOperator.MEM_E : storeE, AwlOperator.MEM_A : storeA, AwlOperator.MEM_M : storeM, AwlOperator.MEM_L : storeL, AwlOperator.MEM_VL : storeVL, AwlOperator.MEM_DB : storeDB, AwlOperator.MEM_DI : storeDI, AwlOperator.MEM_PA : storePA, AwlOperator.MEM_STW : storeSTW, AwlOperator.INTERF_DB : storeINTERF_DB, } def __dumpMem(self, prefix, memArray, maxLen): ret, line, first, count, i = [], [], True, 0, 0 while i < maxLen: line.append("%02X" % memArray[i]) count += 1 if count >= 16: if not first: prefix = ' ' * len(prefix) first = False ret.append(prefix + ' '.join(line)) line, count = [], 0 i += 1 assert(count == 0) return '\n'.join(ret) def __repr__(self): if not self.callStack: return "" self.updateTimestamp() ret = [] ret.append("=== S7-CPU dump === (t: %.01fs)" %\ (self.now - self.startupTime)) ret.append(" STW: " + str(self.statusWord)) if self.is4accu: accus = [ accu.toHex() for accu in (self.accu1, self.accu2, self.accu3, self.accu4) ] else: accus = [ accu.toHex() for accu in (self.accu1, self.accu2) ] ret.append(" Accu: " + " ".join(accus)) ars = [ "%s (%s)" % (ar.toHex(), ar.toPointerString()) for ar in (self.ar1, self.ar2) ] ret.append(" AR: " + " ".join(ars)) ret.append(self.__dumpMem(" M: ", self.flags, min(64, self.specs.nrFlags))) ret.append(self.__dumpMem(" PAE: ", self.inputs, min(64, self.specs.nrInputs))) ret.append(self.__dumpMem(" PAA: ", self.outputs, min(64, self.specs.nrOutputs))) pstack = str(self.parenStack) if self.parenStack else "Empty" ret.append(" PStack: " + pstack) ret.append(" DB: %s" % str(self.dbRegister)) ret.append(" DI: %s" % str(self.diRegister)) if self.callStack: elems = [ str(cse) for cse in self.callStack ] elems = " => ".join(elems) ret.append(" Calls: depth:%d %s" %\ (len(self.callStack), elems)) cse = self.callStack[-1] ret.append(self.__dumpMem(" L: ", cse.localdata, min(16, self.specs.nrLocalbytes))) else: ret.append(" Calls: None") curInsn = self.getCurrentInsn() ret.append(" Stmt: IP:%s %s" %\ (str(self.getCurrentIP()), str(curInsn) if curInsn else "")) ret.append(" Speed: %d stmt/s %.01f stmt/cycle" %\ (int(round(self.insnPerSecond)), self.avgInsnPerCycle)) ret.append(" CycleT: avg:%.06fs min:%.06fs max:%.06fs" %\ (self.avgCycleTime, self.minCycleTime, self.maxCycleTime)) return '\n'.join(ret)