aboutsummaryrefslogtreecommitdiffstats
path: root/pyprofibus/gsd/interp.py
blob: 5378261f24416ec221a86df7db1679016c231b0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# -*- coding: utf-8 -*-
#
# PROFIBUS - GSD file interpreter
#
# Copyright (c) 2016-2021 Michael Buesch <m@bues.ch>
#
# Licensed under the terms of the GNU General Public License version 2,
# or (at your option) any later version.
#

from __future__ import division, absolute_import, print_function, unicode_literals
from pyprofibus.compat import *

from pyprofibus.gsd.parser import GsdParser, GsdError
from pyprofibus.dp import DpCfgDataElement

import difflib

# Public API of this module: A star-import exports the GsdInterp class only.
__all__ = [
	"GsdInterp",
]

class GsdInterp(GsdParser):
	"""GSD file/data interpreter.
	"""

	def __init__(self, text, filename=None, debug=False):
		"""Parse the GSD and set up the initial module configuration.
		text, filename, debug: Passed on unchanged to GsdParser.
		All preset modules are configured right away.
		If the station is not modular, all other modules
		are configured, too.
		"""
		super(GsdInterp, self).__init__(text, filename, debug)
		# The modules that are currently plugged into the station.
		self.__configMods = []
		self.__addPresetModules(onlyFixed=False)
		if self.isModular():
			# Modular station: Only the preset modules are pre-configured.
			return
		self.__addAllModules()

	def __interpErr(self, errorText):
		"""Raise a GsdError with the message errorText.
		The message is prefixed with the GSD file name,
		or with "<data>", if there is no file name.
		"""
		gsdName = self.getFileName() or "<data>"
		raise GsdError("GSD '%s': %s" % (gsdName, errorText))

	def __interpWarn(self, errorText):
		"""Print a warning with the message errorText.
		Nothing is printed, unless debugging is enabled.
		The message is prefixed with the GSD file name,
		or with "<data>", if there is no file name.
		"""
		if self.debugEnabled():
			gsdName = self.getFileName() or "<data>"
			print("Warning in GSD '%s': %s" % (gsdName, errorText))

	@staticmethod
	def __findInSequence(sequence, findName, getItemName):
		"""Find the item in sequence that matches findName best.
		sequence: The items to search.
		findName: The name to search for.
		getItemName: Callable that returns the name of an item.
		The following match stages are tried in this order:
		exact match, case insensitive match, case insensitive
		match at the start of the name, fuzzy match.
		The first three stages only succeed, if exactly one item matches.
		Returns the item, if found. None otherwise.
		"""
		if not sequence:
			return None
		wantedName = findName.lower().strip()

		def normalizedName(item):
			return getItemName(item).lower().strip()

		# The strict stages, from the most to the least specific one.
		# An ambiguous stage (more than one hit) falls through
		# to the next one.
		stages = (
			# Exact match.
			lambda item: findName == getItemName(item),
			# Exact match (case insensitive).
			lambda item: wantedName == normalizedName(item),
			# Match at the start (case insensitive).
			lambda item: normalizedName(item).startswith(wantedName),
		)
		for isMatch in stages:
			hits = [ item for item in sequence if isMatch(item) ]
			if len(hits) == 1:
				return hits[0]

		# Fuzzy match. Pick the closest name only.
		closest = difflib.get_close_matches(
				findName,
				[ getItemName(item) for item in sequence ],
				n = 1)
		if closest:
			# Map the name back to the first item that carries it.
			for item in sequence:
				if getItemName(item) == closest[0]:
					return item
		return None

	def findModule(self, name):
		"""Look up a module of this GSD by its name.
		The name does not have to match exactly. See __findInSequence.
		Returns the module from the "Module" field, if found.
		None otherwise.
		"""
		def getModuleName(module):
			return module.name

		modules = self.getField("Module")
		return self.__findInSequence(modules, name, getModuleName)

	def __addPresetModules(self, onlyFixed=False):
		"""Append all preset modules to the configured modules.
		onlyFixed: If True, the preset modules are only added,
			   if the GSD field FixPresetModules is set.
		"""
		fixPreset = self.getField("FixPresetModules", False)
		if onlyFixed and not fixPreset:
			# The preset modules are not fixed. Nothing to add.
			return
		self.__configMods.extend(
			mod for mod in self.getField("Module", [])
			if mod.getField("Preset", False))

	def __addAllModules(self):
		"""Configure all non-preset modules as plugged into the device.
		Preset modules are skipped here.
		They are handled by __addPresetModules.
		"""
		self.__configMods.extend(
			mod for mod in self.getField("Module", [])
			if not mod.getField("Preset", False))

	def clearConfiguredModules(self):
		"""Unplug all configured modules.
		Preset modules are unplugged as well, unless the GSD
		declares them as fixed (FixPresetModules).
		Fixed preset modules are plugged in again right away.
		"""
		# Start over with a fresh list.
		self.__configMods = list()
		self.__addPresetModules(onlyFixed=True)

	def setConfiguredModule(self, moduleName, index=-1):
		"""Plug a module into the device or unplug a module.
		moduleName: The name of the module to plug. See findModule.
			    If None, the module at index is removed instead.
		index: The position in the list of configured modules.
		       If the index is negative or beyond the end of the list,
		       the module is appended.
		Raises GsdError, if index points to a fixed preset module,
		if the module is not found in the GSD or if the index
		of the module to be removed is out of range.
		"""
		mods = self.__configMods
		indexInRange = (0 <= index < len(mods))

		# Fixed preset modules must neither be replaced nor removed.
		if indexInRange and\
		   self.getField("FixPresetModules", False) and\
		   mods[index].getField("Preset", False):
			self.__interpErr("Not modifying fixed preset module "
				"at index %d." % index)

		if moduleName is not None:
			# Plug the module.
			mod = self.findModule(moduleName)
			if not mod:
				self.__interpErr("Module '%s' not found in GSD." % (
					moduleName))
			if indexInRange:
				mods[index] = mod
			else:
				mods.append(mod)
		else:
			# Unplug the module.
			if indexInRange:
				mods.pop(index)
			else:
				self.__interpErr("Module index %d out of range." % (
					index))

	def isModular(self):
		"""Check whether this is a modular device.
		Returns the value of the GSD field Modular_Station,
		or False, if the field is not present.
		"""
		modular = self.getField("Modular_Station", False)
		return modular

	def isDPV1(self):
		"""Check whether this is a DPV1 slave.
		Returns the value of the GSD field DPV1_Slave,
		or False, if the field is not present.
		"""
		dpv1 = self.getField("DPV1_Slave", False)
		return dpv1

	def getCfgDataElements(self):
		"""Get the config data elements (DpCfgDataElement)
		for this station with the configured modules.
		Returns a list with one element per configured module.
		The first config byte of the module and the remaining
		config bytes are passed as separate arguments
		to DpCfgDataElement.
		"""
		return [ DpCfgDataElement(mod.configBytes[0],
					  mod.configBytes[1:])
			 for mod in self.__configMods ]

	def getUserPrmData(self, dp1PrmMask = None, dp1PrmSet = None):
		"""Get the User_Prm_Data
		for this station with the configured modules.
		dp1PrmMask/Set: Optional mask/set override for the DPV1 prm.
				Both must be given together and each of them
				must have a length of 3.
		Returns the data as bytes.
		Raises GsdError, if an override is given for a DPV1 slave,
		but the data is shorter than 3 bytes.
		"""
		def merge(baseData, extData, offset):
			# Write extData into baseData, starting at offset.
			if not extData:
				# Nothing to write.
				return
			# Zero-fill the gap, if offset is beyond the current end.
			# Without this the slice assignment below would append
			# extData directly at the end of baseData and thus
			# place it at the wrong offset.
			gap = offset - len(baseData)
			if gap > 0:
				baseData.extend(bytearray(gap))
			baseData[offset : len(extData) + offset] = extData
		def trunc(data, length, fieldname, extend = True):
			# Force data to the given length (in place).
			# A length of None leaves data untouched.
			if length is not None:
				if extend:
					# Pad with zero bytes up to length.
					data.extend(bytearray(max(length - len(data), 0)))
				if len(data) > length:
					self.__interpWarn("User_Prm_Data "
						"truncated by %s" % fieldname)
					data[:] = data[0:length]
		# Get the global data.
		data = bytearray(self.getField("User_Prm_Data", b""))
		trunc(data, self.getField("User_Prm_Data_Len"),
		      "User_Prm_Data_Len")
		for dataConst in self.getField("Ext_User_Prm_Data_Const", []):
			merge(data, dataConst.dataBytes, dataConst.offset)
		# Append the module parameter data.
		for mod in self.__configMods:
			# The offsets of the module constants are
			# relative to the start of the module data.
			modData = bytearray()
			for dataConst in mod.getField("Ext_User_Prm_Data_Const", []):
				merge(modData, dataConst.dataBytes, dataConst.offset)
			trunc(modData, mod.getField("Ext_Module_Prm_Data_Len"),
			      "Ext_Module_Prm_Data_Len")
			# Add to global data.
			data += modData
		if self.isDPV1():
			# NOTE: These asserts are stripped, if Python runs with -O.
			assert((dp1PrmMask is None and dp1PrmSet is None) or\
			       (dp1PrmMask is not None and dp1PrmSet is not None))
			if dp1PrmMask is not None:
				assert(len(dp1PrmMask) == 3 and len(dp1PrmSet) == 3)
				if len(data) < 3:
					self.__interpErr("DPv1 User_Prm_Data is "
						"shorter than 3 bytes.")
				# Apply the DPv1 prm override.
				# Only the bits selected by the mask are replaced.
				for i in range(3):
					data[i] = (data[i] & ~dp1PrmMask[i]) |\
						  (dp1PrmSet[i] & dp1PrmMask[i])
		elif dp1PrmMask is not None:
			self.__interpWarn("DPv1 User_Prm_Data override ignored")
		# Enforce the upper limit only. Do not pad here.
		trunc(data, self.getField("Max_User_Prm_Data_Len"),
		      "Max_User_Prm_Data_Len", False)
		return bytes(data)

	def getIdentNumber(self):
		"""Get the value of the GSD field Ident_Number.
		Raises GsdError, if the GSD does not have an ident number.
		"""
		identNumber = self.getField("Ident_Number")
		if identNumber is not None:
			return identNumber
		self.__interpErr("No Ident_Number in GSD.")

	def getMaxTSDR(self, baudrate):
		"""Get the max-tSDR for the given baud rate.
		baudrate: One of the baud rates listed in the table below.
		Returns the value of the matching MaxTsdr_... GSD field.
		Might return None.
		Raises GsdError, if the baud rate is not in the table.
		"""
		# Baud rate -> name of the GSD field.
		fieldNames = {
			9600 : "MaxTsdr_9.6",
			19200 : "MaxTsdr_19.2",
			45450 : "MaxTsdr_45.45",
			93750 : "MaxTsdr_93.75",
			187500 : "MaxTsdr_187.5",
			500000 : "MaxTsdr_500",
			1500000 : "MaxTsdr_1.5M",
			3000000 : "MaxTsdr_3M",
			6000000 : "MaxTsdr_6M",
			12000000 : "MaxTsdr_12M",
		}
		fieldname = fieldNames.get(baudrate)
		if fieldname is None:
			self.__interpErr("getMaxTSDR: Invalid baud rate.")
		return self.getField(fieldname, None)

	def __str__(self):
		"""Get a human readable multi line summary of the GSD.
		The summary contains the file name (if any), the device
		identification, the order number (if any) and the names
		of all modules that are not preset.
		"""
		lines = []

		fileName = self.getFileName()
		if fileName:
			lines.append("File:              %s\n" % fileName)

		vendor = self.getField("Vendor_Name", "")
		model = self.getField("Model_Name", "")
		rev = self.getField("Revision", "")
		ident = self.getField("Ident_Number")
		# A missing ident number is shown as a dash.
		if ident is None:
			identText = "-"
		else:
			identText = "0x%04X" % ident
		lines.append("Device:            %s; %s; %s; Ident %s\n" % (
			vendor, model, rev, identText))

		order = self.getField("OrderNumber")
		if order:
			lines.append("Order number:      %s\n" % order)

		# Preset modules are not listed as available.
		lines.extend(
			"Available module:  \"%s\"\n" % module.name
			for module in self.getField("Module", [])
			if not module.getField("Preset"))

		return "".join(lines)
bues.ch cgit interface