aboutsummaryrefslogtreecommitdiffstats
path: root/pyprofibus/phy_fpga.py
blob: ed55c305883b925e8014982aee7dbeba6315b7f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
#
# PROFIBUS DP - Communication Processor PHY access library
#
# Copyright (c) 2019 Michael Buesch <m@bues.ch>
#
# Licensed under the terms of the GNU General Public License version 2,
# or (at your option) any later version.
#

from __future__ import division, absolute_import, print_function, unicode_literals
from pyprofibus.compat import *

from pyprofibus.phy import *
from pyprofibus.fdl import *
from pyprofibus.dp import *
from pyprofibus.phy_fpga_driver import *

from collections import deque


__all__ = [
	"CpPhyFPGA",
]


class CpPhyFPGA(CpPhy):
	"""FPGA based PROFIBUS CP PHYsical layer
	"""

	PFX = "PHY-fpga: "

	def __init__(self, spiBus, spiCS, spiSpeedHz, *args, **kwargs):
		super(CpPhyFPGA, self).__init__(*args, **kwargs)
		self.__rxDeque = deque()
		self.__driver = None
		self.__spiBus = spiBus
		self.__spiCS = spiCS
		self.__spiSpeedHz = spiSpeedHz

	def close(self):
		"""Close the PHY device.
		"""
		if self.__driver is not None:
			try:
				self.__driver.shutdown()
			except FpgaPhyError as e:
				pass
			self.__rxDeque.clear()
			self.__driver = None
		super(CpPhyFPGA, self).close()

	def __tryRestartDriver(self, exception):
		try:
			self._debugMsg("Driver exception: %s" % str(exception))
			if self.__driver is not None:
				self.__driver.restart()
		except FpgaPhyError as e:
			self._debugMsg("Error recovery restart failed: %s" % (
				str(e)))

	def sendData(self, telegramData, srd):
		"""Send data to the physical line.
		"""
		if self.__driver is None:
			return

		if self.debug:
			self._debugMsg("TX   %s" % bytesToHex(telegramData))

		try:
			self.__driver.telegramSend(telegramData)
		except FpgaPhyError as e:
			self.__tryRestartDriver(e)

	def pollData(self, timeout=0.0):
		"""Poll received data from the physical line.
		timeout => timeout in seconds.
			   0 = no timeout, return immediately.
			   negative = unlimited.
		"""
		if self.__driver is None:
			return None

		telegramData = None
		try:
			if self.__rxDeque:
				telegramData = self.__rxDeque.popleft()
			else:
				timeoutStamp = monotonic_time() + timeout#TODO
				telegramDataList = self.__driver.telegramReceive()
				count = len(telegramDataList)
				if count >= 1:
					telegramData = telegramDataList[0]
					if count >= 2:
						self.__rxDeque.extend(telegramDataList[1:])
		except FpgaPhyError as e:
			self.__tryRestartDriver(e)
			telegramData = None

		if self.debug and telegramData:
			self._debugMsg("RX   %s" % bytesToHex(telegramData))
		return telegramData

	def setConfig(self, baudrate=CpPhy.BAUD_9600, *args, **kwargs):
		super(CpPhyFPGA, self).setConfig(baudrate=baudrate, *args, **kwargs)
		self.close()
		try:
			self.__driver = FpgaPhyDriver(spiDev=self.__spiBus,
						      spiChipSelect=self.__spiCS,
						      spiSpeedHz=self.__spiSpeedHz)
			self.__driver.setBaudRate(baudrate)
		except FpgaPhyError as e:
			raise PhyError(self.PFX + ("Failed to setup driver:\n%s" % str(e)))
bues.ch cgit interface