# cgit navigation: about | summary | refs | log | tree | commit | diff | stats
# path: root/stublibs/serial.py
# blob: 775610bc0dbbe05ccb0bcf67002b21892fc30553 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from pyprofibus.compat import *
import time

# Parity selectors for Serial.parity.
# Serial.open() compares the setting against PARITY_EVEN only.
PARITY_EVEN	= "E"
PARITY_ODD	= "O"
# Stop bit selectors for Serial.stopbits.
# Serial.open() compares the setting against STOPBITS_ONE only.
STOPBITS_ONE	= 1
STOPBITS_TWO	= 2

class SerialException(Exception):
	"""Error raised by Serial when a port operation fails."""

class Serial(object):
	"""Serial port object backed by MicroPython's machine.UART.

	The attribute and method names mirror the pyserial naming (presumably
	so that code written against pyserial can use this stub unchanged;
	confirm with the callers). Only the MicroPython backend is implemented.
	On any other platform open(), close(), write(), read(), flushInput()
	and flushOutput() raise NotImplementedError.

	Settings are plain attributes that are read by open():
	port, baudrate, bytesize, parity and stopbits are passed to the UART.
	timeout, xonxoff, rtscts and dsrdtr are stored, but not used here.
	"""

	def __init__(self):
		"""Initialize the default settings. The port is not opened here."""
		# Backend switch. isMicropython is not defined in this file; it is
		# expected to come from the star import of pyprofibus.compat.
		self.__isMicropython = isMicropython
		self.port = "/dev/ttyS0"
		# Numeric UART id. Parsed from self.port by open().
		self.__portNum = None
		self.baudrate = 9600
		self.bytesize = 8
		self.parity = PARITY_EVEN
		self.stopbits = STOPBITS_ONE
		self.timeout = 0
		self.xonxoff = False
		self.rtscts = False
		self.dsrdtr = False
		# The machine.UART instance. None while the port is closed.
		self.__lowlevel = None

	def __portLabel(self):
		"""Return a printable port name for error messages.

		This must not fail if open() has not parsed a port number, yet.
		Formatting None with %d would raise a TypeError and hide the
		actual error.
		"""
		if self.__portNum is None:
			return "UART(%s)" % str(self.port)
		return "UART%d" % self.__portNum

	def __uart(self):
		"""Return the opened low level UART.

		Raises SerialException, if the port is not open.
		"""
		if self.__lowlevel is None:
			raise SerialException("%s: Port is not open" % self.__portLabel())
		return self.__lowlevel

	def open(self):
		"""Open the UART that is selected by self.port.

		The known device name prefixes are removed from self.port.
		The remainder must be the decimal UART number.

		Raises SerialException, if self.port does not yield a number
		or if the UART could not be set up.
		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			port = self.port
			for sub in ("/dev/ttyS", "/dev/ttyUSB", "/dev/ttyACM", "COM", "UART", ):
				port = port.replace(sub, "")
			try:
				self.__portNum = int(port.strip())
			except ValueError:
				raise SerialException("Invalid port: %s" % self.port)
			try:
				import machine
				# Parity: 0 is passed for PARITY_EVEN, 1 for any other value.
				# Stop bits: 1 is passed for STOPBITS_ONE, 2 for any other value.
				self.__lowlevel = machine.UART(
					self.__portNum,
					self.baudrate,
					self.bytesize,
					0 if self.parity == PARITY_EVEN else 1,
					1 if self.stopbits == STOPBITS_ONE else 2)
				print("Opened machine.UART(%d)" % self.__portNum)
			except Exception as e:
				raise SerialException("UART%d: Failed to open:\n%s" % (
					self.__portNum, str(e)))
			return
		raise NotImplementedError

	def close(self):
		"""Shut down the UART. Does nothing, if the port is not open.

		Raises SerialException, if the UART shutdown failed.
		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			try:
				if self.__lowlevel is not None:
					self.__lowlevel.deinit()
					self.__lowlevel = None
					print("Closed machine.UART(%d)" % self.__portNum)
			except Exception as e:
				raise SerialException("%s: Failed to close:\n%s" % (
					self.__portLabel(), str(e)))
			return
		raise NotImplementedError

	def write(self, data):
		"""Write the bytes 'data' to the UART.

		Raises SerialException, if the port is not open
		or if the UART write failed.
		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			# Check outside of the try block, so that the 'not open'
			# error is not re-wrapped as a write failure.
			uart = self.__uart()
			try:
				uart.write(data)
			except Exception as e:
				raise SerialException("%s write(%d bytes) failed: %s" % (
					self.__portLabel(), len(data), str(e)))
			return
		raise NotImplementedError

	def read(self, size=1):
		"""Read up to 'size' bytes from the UART and return them.

		An empty bytes object is returned, if the UART returned None.

		Raises SerialException, if the port is not open
		or if the UART read failed.
		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			uart = self.__uart()
			try:
				data = uart.read(size)
				if data is None:
					return b""
				return data
			except Exception as e:
				raise SerialException("%s read(%d bytes) failed: %s" % (
					self.__portLabel(), size, str(e)))
		raise NotImplementedError

	def flushInput(self):
		"""Discard all data that is pending in the UART receive path.

		Raises SerialException, if the port is not open
		or if the UART access failed.
		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			uart = self.__uart()
			try:
				# Read and drop data for as long as the UART reports any.
				while uart.any():
					uart.read()
			except Exception as e:
				raise SerialException("%s flushInput() failed: %s" % (
					self.__portLabel(), str(e)))
			return
		raise NotImplementedError

	def flushOutput(self):
		"""Wait for 10 ms. The UART itself is not accessed.

		NOTE(review): Presumably the delay gives pending transmit data
		time to leave the UART. Confirm.

		Raises NotImplementedError, if not running on MicroPython.
		"""
		if self.__isMicropython:
			time.sleep(0.01)
			return
		raise NotImplementedError
# bues.ch cgit interface