#!/usr/bin/env python3
#
# Read values from DHT11/22 to I2C converter
#
# Copyright (c) 2018 Michael Buesch
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

import sys
import time
import getopt
import smbus


class DHT_via_I2C(object):
    """Read a DHT11/22 sensor via I2C.

    Call update() to fetch a new measurement, then read the results from
    the tempCelsius, tempFahrenheit and humidityPercent properties.
    """

    # Device types.
    DHT11 = 0
    DHT22 = 1

    DEFAULT_BUS = 1
    DEFAULT_ADDR = 0x76

    # Number of bytes in one I2C frame:
    # 1 sequence counter + 5 DHT payload bytes + 1 CRC.
    _FRAME_LEN = 7

    def __init__(self, devType=DHT11, i2cBus=DEFAULT_BUS, i2cAddr=DEFAULT_ADDR):
        """Open the I2C bus.

        devType: Sensor type; DHT11 or DHT22.
        i2cBus: I2C bus number that is passed to smbus.SMBus().
        i2cAddr: I2C address that update() reads from.
        """
        self.__devType = devType
        # -1 can never equal a received sequence byte,
        # so the first valid frame is always accepted.
        self.__seqCount = -1
        self.__temp = 0.0
        self.__hum = 0.0
        self.__i2cAddr = i2cAddr
        self.__i2c = smbus.SMBus(i2cBus)

    def __crc(self, data):
        """Calculate the 8 bit CRC of the byte sequence 'data'.

        The CRC is calculated MSB first with the polynomial 0x07
        and a start value of 0.
        """
        crc = 0
        for d in data:
            crc ^= d
            for _ in range(8):
                if crc & 0x80:
                    crc = (crc << 1) & 0xFF
                    crc ^= 0x07
                else:
                    crc = (crc << 1) & 0xFF
        return crc & 0xFF

    def update(self):
        """Try to get new temperature and humidity values.
        Returns True, if new values have been fetched.
        Returns False, if the I2C read failed, if the frame is invalid
        or if the frame carries the same sequence count as the
        previously accepted one.
        """

        # Read the data from I2C.
        # The device type is passed as the command argument of the read.
        try:
            data = self.__i2c.read_i2c_block_data(
                self.__i2cAddr, self.__devType, self._FRAME_LEN)
        except OSError:
            return False

        # A short read would misalign the fields below or
        # raise an IndexError. Treat it as an invalid frame.
        if len(data) != self._FRAME_LEN:
            return False

        # Extract and check the I2C data.
        # Frame layout: [sequence counter, 5 DHT bytes, CRC].
        # The CRC covers everything except the CRC byte itself.
        seqCount = data[0]
        dhtData = data[1:-1]
        dataCRC = data[-1]
        calcCRC = self.__crc(data[0:-1])
        if dataCRC != calcCRC:
            return False
        if seqCount == self.__seqCount:
            # Same sequence count as last time: No new values.
            return False

        # Extract and check the DHT11/22 data.
        # Payload layout: [hum high, hum low, temp high, temp low, checksum].
        # The checksum is the low byte of the sum of the other four bytes.
        hum = (dhtData[0] << 8) | (dhtData[1])
        temp = (dhtData[2] << 8) | (dhtData[3])
        calcChecksum = sum(dhtData[0:-1]) & 0xFF
        dataChecksum = dhtData[-1]
        if dataChecksum != calcChecksum:
            return False

        # Convert the temperature and humidity values.
        if self.__devType == self.DHT11:
            # The 16 bit value is divided by 256:
            # The high byte is the integral part.
            divisor = 256.0
            tempFactor = 1.0
        else:
            # The 16 bit value is in tenths.
            # Bit 15 of the temperature is the sign flag.
            divisor = 10.0
            if temp & 0x8000:
                temp &= 0x7FFF
                tempFactor = -1.0
            else:
                tempFactor = 1.0
        self.__hum = float(hum) / divisor
        self.__temp = (float(temp) / divisor) * tempFactor

        # Only remember the sequence count of fully validated frames.
        self.__seqCount = seqCount
        return True

    @property
    def tempCelsius(self):
        """Get the measured temperature, in Celsius.
        """
        return self.__temp

    @property
    def tempFahrenheit(self):
        """Get the measured temperature, in Fahrenheit.
        """
        return self.tempCelsius * 1.8 + 32

    @property
    def humidityPercent(self):
        """Get the measured humidity, in percent.
        """
        return self.__hum


def usage():
    """Print the command line help text to stdout.
    """
    print("Usage: dhtget [OPTIONS]")
    print()
    print(" -1|--dht11 The connected device is a DHT11 (default).")
    print(" -2|--dht22 The connected device is a DHT22.")
    print(" -l|--loop COUNT Run in a loop.")
    print(" If count is negative, run an infinite loop.")
    print(" Default: Run once.")
    print(" -c|--csv Print output as CSV.")
    print(" -H|--no-header Do not print CSV header.")
    print(" -f|--fahrenheit Use Fahrenheit instead of Celsius.")
    print(" -b|--i2c-bus BUS Use I2C bus number BUS. Default: 1.")
    print(" -a|--i2c-addr ADDR Use I2C address ADDR. Default: 0x76.")
    print(" -h|--help Print this help text.")


def main(argv):
    """Command line entry point.

    argv: The full argument vector, including the program name.
    Returns the process exit code: 0 on success,
    1 on invalid options or if the I2C bus could not be opened.
    """
    opt_devType = DHT_via_I2C.DHT11
    opt_loop = 1
    opt_csv = False
    opt_fahrenheit = False
    opt_header = True
    opt_i2cBus = DHT_via_I2C.DEFAULT_BUS
    opt_i2cAddr = DHT_via_I2C.DEFAULT_ADDR

    try:
        opts, args = getopt.getopt(argv[1:],
            "h12l:cHfb:a:",
            [ "help", "dht11", "dht22", "loop=", "csv", "no-header",
              "fahrenheit", "i2c-bus=", "i2c-addr=", ])
    except getopt.GetoptError as e:
        print(str(e), file=sys.stderr)
        return 1
    for o, v in opts:
        if o in ("-h", "--help"):
            usage()
            return 0
        if o in ("-1", "--dht11"):
            opt_devType = DHT_via_I2C.DHT11
        if o in ("-2", "--dht22"):
            opt_devType = DHT_via_I2C.DHT22
        if o in ("-l", "--loop"):
            # Base 0: Accept decimal, 0x hex, 0o octal and 0b binary.
            try:
                opt_loop = int(v, 0)
            except ValueError:
                print("Invalid -l|--loop value.", file=sys.stderr)
                return 1
        if o in ("-c", "--csv"):
            opt_csv = True
        if o in ("-H", "--no-header"):
            opt_header = False
        if o in ("-f", "--fahrenheit"):
            opt_fahrenheit = True
        if o in ("-b", "--i2c-bus"):
            try:
                opt_i2cBus = int(v, 0)
            except ValueError:
                print("Invalid -b|--i2c-bus value.", file=sys.stderr)
                return 1
        if o in ("-a", "--i2c-addr"):
            try:
                opt_i2cAddr = int(v, 0)
            except ValueError:
                print("Invalid -a|--i2c-addr value.", file=sys.stderr)
                return 1

    try:
        dht = DHT_via_I2C(devType=opt_devType,
                          i2cBus=opt_i2cBus,
                          i2cAddr=opt_i2cAddr)
    except OSError as e:
        print("Failed to establish DHT-I2C communication:\n%s" % str(e),
              file=sys.stderr)
        return 1

    tempUnit = "F" if opt_fahrenheit else "*C"
    if opt_csv and opt_header:
        print("Temperature %s;Humidity %%" % tempUnit)

    # A negative count never reaches zero, so it loops forever.
    # NOTE(review): The nesting below was reconstructed from a
    # whitespace-collapsed source. The count is assumed to be decremented
    # only after a successful update, so that "run once" prints
    # one reading. Confirm against the upstream file.
    count = opt_loop
    stream = sys.stdout
    while count != 0:
        if dht.update():
            temp = dht.tempFahrenheit if opt_fahrenheit else dht.tempCelsius
            hum = dht.humidityPercent
            if opt_csv:
                stream.write("%.1f;%.1f\n" % (temp, hum))
            else:
                stream.write("Temperature: %.1f %s\n" % (
                             temp, tempUnit))
                stream.write("Humidity: %.1f %%\n" % hum)
            stream.flush()
            if count > 0:
                count -= 1
        time.sleep(0.1)
    return 0


# Only run the command line tool if executed as a script, so that
# importing this file to use DHT_via_I2C does not exit the interpreter.
if __name__ == "__main__":
    try:
        sys.exit(main(sys.argv))
    except KeyboardInterrupt:
        sys.exit(0)