aboutsummaryrefslogtreecommitdiffstats
path: root/pyprofibus/phy_dummy.py
blob: c1448dc667650d86371d5bcd01fe6b8646114be2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
#
# PROFIBUS DP - Communication Processor PHY access library
#
# Copyright (c) 2016-2021 Michael Buesch <m@bues.ch>
#
# Licensed under the terms of the GNU General Public License version 2,
# or (at your option) any later version.
#

from __future__ import division, absolute_import, print_function, unicode_literals
from pyprofibus.compat import *

from pyprofibus.phy import *
from pyprofibus.fdl import *
from pyprofibus.dp import *
from pyprofibus.util import *

__all__ = [
	"CpPhyDummySlave",
]

class CpPhyDummySlave(CpPhy):
	"""Dummy slave PROFIBUS CP PHYsical layer
	"""

	__slots__ = (
		"__echoDX",
		"__echoDXSize",
		"__pollQueue",
	)

	def __init__(self, *args, **kwargs):
		super(CpPhyDummySlave, self).__init__(*args, **kwargs)
		self.__echoDX = kwargs.get("echoDX", True)
		self.__echoDXSize = kwargs.get("echoDXSize", None)
		self.__pollQueue = []

	def __msg(self, message):
		if self.debug:
			print("CpPhyDummySlave: %s" % message)

	def close(self):
		"""Close the PHY device.
		"""
		self.__pollQueue = []
		super(CpPhyDummySlave, self).close()

	def sendData(self, telegramData, srd):
		"""Send data to the physical line.
		"""
		telegramData = bytearray(telegramData)
		self.__msg("Sending %s  %s" % ("SRD" if srd else "SDN",
					       bytesToHex(telegramData)))
		self.__mockSend(telegramData, srd = srd)

	def pollData(self, timeout=0.0):
		"""Poll received data from the physical line.
		timeout => timeout in seconds.
			   0.0 = no timeout, return immediately.
			   negative = unlimited.
		"""
		try:
			telegramData = self.__pollQueue.pop(0)
		except IndexError as e:
			return None
		self.__msg("Receiving    %s" % bytesToHex(telegramData))
		return telegramData

	def setConfig(self, baudrate=CpPhy.BAUD_9600, *args, **kwargs):
		self.__msg("Baudrate = %d" % baudrate)
		self.__pollQueue = []
		super(CpPhyDummySlave, self).setConfig(baudrate=baudrate, *args, **kwargs)

	def __mockSend(self, telegramData, srd):
		if not srd:
			return
		try:
			fdl = FdlTelegram.fromRawData(telegramData)

			if (fdl.fc & FdlTelegram.FC_REQFUNC_MASK) == FdlTelegram.FC_FDL_STAT:
				telegram = FdlTelegram_FdlStat_Con(da = fdl.sa,
								   sa = fdl.da)
				self.__pollQueue.append(telegram.getRawData())
				return

			dp = DpTelegram.fromFdlTelegram(fdl, thisIsMaster = False)

			if DpTelegram_SlaveDiag_Req.checkType(dp):
				telegram = DpTelegram_SlaveDiag_Con(da = fdl.sa,
								    sa = fdl.da)
				telegram.b1 |= DpTelegram_SlaveDiag_Con.B1_ONE
				self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
				return
			if DpTelegram_SetPrm_Req.checkType(dp):
				telegram = FdlTelegram_ack()
				self.__pollQueue.append(telegram.getRawData())
				return
			if DpTelegram_ChkCfg_Req.checkType(dp):
				telegram = FdlTelegram_ack()
				self.__pollQueue.append(telegram.getRawData())
				return
			if DpTelegram_DataExchange_Req.checkType(dp):
				if self.__echoDX:
					du = bytearray([ d ^ 0xFF for d in dp.du ])
					if self.__echoDXSize is not None:
						if len(du) > self.__echoDXSize:
							du = du[ : self.__echoDXSize]
						if len(du) < self.__echoDXSize:
							du += bytearray(self.__echoDXSize - len(du))
					telegram = DpTelegram_DataExchange_Con(da = fdl.sa,
									       sa = fdl.da,
									       du = du)
					self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
				else:
					telegram = FdlTelegram_ack()
					self.__pollQueue.append(telegram.getRawData())
				return

			self.__msg("Dropping SRD telegram: %s" % str(fdl))
		except ProfibusError as e:
			text = "SRD mock-send error: %s" % str(e)
			self.__msg(text)
			raise PhyError(text)
bues.ch cgit interface