# # Abstract sensor # Copyright (c) 2020 Michael Büsch # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import bme280 import dht import micropython import time from airhum import relhum2abshum from curve import Curve from micropython import const from movingavg import MovingAvg from util import lim class SensorError(Exception): pass class SensorWrapper(object): TYPE_TH = const(0) TYPE_THP = const(1) def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs self.dev = None def close(self): self.dev = None class SensorWrapperDHT(SensorWrapper): type = SensorWrapper.TYPE_TH sleep = 2.0 def setup(self): if not self.dev: try: dev = dht.DHT22(*self.args, **self.kwargs) dev.measure() self.dev = dev except Exception as e: return False return True def read(self): dev = self.dev try: dev.measure() t = lim(dev.temperature(), -40.0, 80.0) h = lim(dev.humidity() * 1e-2, 0.0, 1.0) except Exception as e: raise SensorError(str(e)) return t, h class SensorWrapperBME(SensorWrapper): type = SensorWrapper.TYPE_THP sleep = 0.0 def setup(self): if not self.dev: try: self.dev = bme280.BME280(*self.args, **self.kwargs) except bme280.BME280Error as e: return False return True def close(self): if self.dev: self.dev.close() super().close() def read(self): dev = self.dev try: return dev.readForced(filter=bme280.FILTER_2, tempOversampling=bme280.OVSMPL_4, humidityOversampling=bme280.OVSMPL_4, pressureOversampling=bme280.OVSMPL_4) except bme280.BME280Error as e: raise SensorError(str(e)) class Sensor(object): nullCurve = Curve() def __init__(self, name, wrapper, avgSize, tCurve=None, hCurve=None, pCurve=None): self.name = name self.wrapper = wrapper count = 2 + 1 if wrapper.type == SensorWrapper.TYPE_TH else 3 + 1 self.__movingAvgs = [ MovingAvg(avgSize) for _ in range(count) ] self.__tCurve = tCurve or self.nullCurve self.__hCurve = hCurve or self.nullCurve if wrapper.type == SensorWrapper.TYPE_THP: self.__pCurve = pCurve or self.nullCurve def close(self): self.wrapper.close() def run(self): wrapper = self.wrapper if not wrapper.setup(): return None if wrapper.sleep: time.sleep(wrapper.sleep) values = wrapper.read() values = tuple(self.__movingAvgs[i](value) for i, value in enumerate(values)) p = None if wrapper.type == SensorWrapper.TYPE_TH: t, h = values else: t, h, p = values t = self.__tCurve.intp(t, lambda x, y: x) h = self.__hCurve.intp(h, lambda x, y: x) if wrapper.type == SensorWrapper.TYPE_THP: p = self.__pCurve.intp(p, lambda x, y: x) a = self.__movingAvgs[-1](relhum2abshum(h, t)) return t, h, a, p # vim: ts=4 sw=4 expandtab