1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
#!/usr/bin/env python3
#
# Read values from DHT11/22 to I2C converter
#
# Copyright (c) 2018 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import sys
import time
import getopt
import smbus
class DHT_via_I2C(object):
"""Read a DHT11/22 sensor via I2C.
"""
# Device types.
DHT11 = 0
DHT22 = 1
DEFAULT_BUS = 1
DEFAULT_ADDR = 0x76
def __init__(self,
devType=DHT11,
i2cBus=DEFAULT_BUS,
i2cAddr=DEFAULT_ADDR):
self.__devType = devType
self.__seqCount = -1
self.__temp = 0.0
self.__hum = 0.0
self.__i2cAddr = i2cAddr
self.__i2c = smbus.SMBus(i2cBus)
def __crc(self, data):
crc = 0
for d in data:
crc ^= d
for i in range(8):
if crc & 0x80:
crc = (crc << 1) & 0xFF
crc ^= 0x07
else:
crc = (crc << 1) & 0xFF
return crc & 0xFF
def update(self):
"""Try to get new temperature and humidity values.
Returns True, if new values have been fetched.
"""
# Read the data from I2C.
try:
data = self.__i2c.read_i2c_block_data(
self.__i2cAddr,
self.__devType, 7)
except OSError as e:
return False
# Extract and check the I2C data.
seqCount = data[0]
dhtData = data[1:-1]
dataCRC = data[-1]
calcCRC = self.__crc(data[0:-1])
if dataCRC != calcCRC:
return False
if seqCount == self.__seqCount:
return False
# Extract and check the DHT11/22 data.
hum = (dhtData[0] << 8) | (dhtData[1])
temp = (dhtData[2] << 8) | (dhtData[3])
calcChecksum = sum(dhtData[0:-1]) & 0xFF
dataChecksum = dhtData[-1]
if dataChecksum != calcChecksum:
return False
# Convert the temperature and humidity values.
if self.__devType == self.DHT11:
divisor = 256.0
tempFactor = 1.0
else:
divisor = 10.0
if temp & 0x8000:
temp &= 0x7FFF
tempFactor = -1.0
else:
tempFactor = 1.0
self.__hum = float(hum) / divisor
self.__temp = (float(temp) / divisor) * tempFactor
self.__seqCount = seqCount
return True
@property
def tempCelsius(self):
"""Get the measured temperature, in Celcius.
"""
return self.__temp
@property
def tempFahrenheit(self):
"""Get the measured temperature, in Fahrenheit.
"""
return self.tempCelsius * 1.8 + 32
@property
def humidityPercent(self):
"""Get the measured humidity, in percent.
"""
return self.__hum
def usage():
print("Usage: dhtget [OPTIONS]")
print()
print(" -1|--dht11 The connected device is a DHT11 (default).")
print(" -2|--dht22 The connected device is a DHT22.")
print(" -l|--loop COUNT Run in a loop.")
print(" If count is negative, run an infinite loop.")
print(" Default: Run once.")
print(" -c|--csv Print output as CSV.")
print(" -H|--no-header Do not print CSV header.")
print(" -f|--fahrenheit Use Fahrenheit instead of Celsius.")
print(" -b|--i2c-bus BUS Use I2C bus number BUS. Default: 1.")
print(" -a|--i2c-addr ADDR Use I2C address ADDR. Default: 0x76.")
print(" -h|--help Print this help text.")
def main(argv):
opt_devType = DHT_via_I2C.DHT11
opt_loop = 1
opt_csv = False
opt_fahrenheit = False
opt_header = True
opt_i2cBus = DHT_via_I2C.DEFAULT_BUS
opt_i2cAddr = DHT_via_I2C.DEFAULT_ADDR
try:
opts, args = getopt.getopt(argv[1:],
"h12l:cHfb:a:",
[ "help",
"dht11", "dht22",
"loop=",
"csv", "no-header",
"fahrenheit",
"i2c-bus=",
"i2c-addr=", ])
except getopt.GetoptError as e:
print(str(e), file=sys.stderr)
return 1
for o, v in opts:
if o in ("-h", "--help"):
usage()
return 0
if o in ("-1", "--dht11"):
opt_devType = DHT_via_I2C.DHT11
if o in ("-2", "--dht22"):
opt_devType = DHT_via_I2C.DHT22
if o in ("-l", "--loop"):
try:
opt_loop = int(v, 0)
except ValueError as e:
print("Invalid -l|--loop value.", file=sys.stderr)
return 1
if o in ("-c", "--csv"):
opt_csv = True
if o in ("-H", "--no-header"):
opt_header = False
if o in ("-f", "--fahrenheit"):
opt_fahrenheit = True
if o in ("-b", "--i2c-bus"):
try:
opt_i2cBus = int(v, 0)
except ValueError as e:
print("Invalid -b|--i2c-bus value.", file=sys.stderr)
return 1
if o in ("-a", "--i2c-addr"):
try:
opt_i2cAddr = int(v, 0)
except ValueError as e:
print("Invalid -a|--i2c-addr value.", file=sys.stderr)
return 1
try:
dht = DHT_via_I2C(devType=opt_devType,
i2cBus=opt_i2cBus,
i2cAddr=opt_i2cAddr)
except OSError as e:
print("Failed to establish DHT-I2C communication:\n%s" % str(e),
file=sys.stderr)
return 1
tempUnit = "F" if opt_fahrenheit else "*C"
if opt_csv and opt_header:
print("Temperature %s;Humidity %%" % tempUnit)
count = opt_loop
stream = sys.stdout
while count != 0:
if dht.update():
temp = dht.tempFahrenheit if opt_fahrenheit else dht.tempCelsius
hum = dht.humidityPercent
if opt_csv:
stream.write("%.1f;%.1f\n" % (temp, hum))
else:
stream.write("Temperature: %.1f %s\n" % (
temp, tempUnit))
stream.write("Humidity: %.1f %%\n" % hum)
stream.flush()
if count > 0:
count -= 1
time.sleep(0.1)
return 0
try:
sys.exit(main(sys.argv))
except KeyboardInterrupt as e:
sys.exit(0)
|