1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
from pyprofibus.compat import *
import time
PARITY_EVEN = "E"
PARITY_ODD = "O"
STOPBITS_ONE = 1
STOPBITS_TWO = 2
class SerialException(Exception):
pass
class Serial(object):
def __init__(self):
self.__isMicropython = isMicropython
self.port = "/dev/ttyS0"
self.__portNum = None
self.baudrate = 9600
self.bytesize = 8
self.parity = PARITY_EVEN
self.stopbits = STOPBITS_ONE
self.timeout = 0
self.xonxoff = False
self.rtscts = False
self.dsrdtr = False
self.__lowlevel = None
def open(self):
if self.__isMicropython:
port = self.port
for sub in ("/dev/ttyS", "/dev/ttyUSB", "/dev/ttyACM", "COM", "UART", ):
port = port.replace(sub, "")
try:
self.__portNum = int(port.strip())
except ValueError:
raise SerialException("Invalid port: %s" % self.port)
try:
import machine
self.__lowlevel = machine.UART(
self.__portNum,
self.baudrate,
self.bytesize,
0 if self.parity == PARITY_EVEN else 1,
1 if self.stopbits == STOPBITS_ONE else 2)
print("Opened machine.UART(%d)" % self.__portNum)
except Exception as e:
raise SerialException("UART%d: Failed to open:\n%s" % (
self.__portNum, str(e)))
return
raise NotImplementedError
def close(self):
if self.__isMicropython:
try:
if self.__lowlevel is not None:
self.__lowlevel.deinit()
self.__lowlevel = None
print("Closed machine.UART(%d)" % self.__portNum)
except Exception as e:
raise SerialException("UART%d: Failed to close:\n%s" % (
self.__portNum, str(e)))
return
raise NotImplementedError
def write(self, data):
if self.__isMicropython:
try:
self.__lowlevel.write(data)
except Exception as e:
raise SerialException("UART%d write(%d bytes) failed: %s" % (
self.__portNum, len(data), str(e)))
return
raise NotImplementedError
def read(self, size=1):
if self.__isMicropython:
try:
data = self.__lowlevel.read(size)
if data is None:
return b""
return data
except Exception as e:
raise SerialException("UART%d read(%d bytes) failed: %s" % (
self.__portNum, size, str(e)))
raise NotImplementedError
def flushInput(self):
if self.__isMicropython:
while self.__lowlevel.any():
self.__lowlevel.read()
return
raise NotImplementedError
def flushOutput(self):
if self.__isMicropython:
time.sleep(0.01)
return
raise NotImplementedError
|