-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
549 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
""" | ||
Input steps for Adafruit digital I2C devices (e.g. SHT31, HTU21, BMP280, etc). | ||
""" | ||
|
||
# Standard library imports | ||
import importlib | ||
|
||
# Third party imports | ||
import board | ||
import busio | ||
|
||
# Local imports | ||
import brokkr.pipeline.baseinput | ||
|
||
|
||
DEFAULT_ADC_CHANNEL = 0 | ||
DEFAULT_ADC_MODULE = "adafruit_ads1x15.ads1115" | ||
DEFAULT_ADC_CLASS = "ADS1115" | ||
DEFAULT_ANALOG_MODULE = "adafruit_ads1x15.analog_in" | ||
DEFAULT_ANALOG_CLASS = "AnalogIn" | ||
|
||
|
||
class AdafruitADCInput(brokkr.pipeline.baseinput.PropertyInputStep): | ||
def __init__( | ||
self, | ||
sensor_module, | ||
sensor_class, | ||
adc_channel=None, | ||
adc_kwargs=None, | ||
analog_module=DEFAULT_ANALOG_MODULE, | ||
analog_class=DEFAULT_ANALOG_CLASS, | ||
i2c_kwargs=None, | ||
**property_input_kwargs): | ||
super().__init__( | ||
sensor_module=sensor_module, | ||
sensor_class=sensor_class, | ||
cache_sensor_object=False, | ||
**property_input_kwargs) | ||
self._i2c_kwargs = {} if i2c_kwargs is None else i2c_kwargs | ||
self._adc_kwargs = {} if adc_kwargs is None else adc_kwargs | ||
|
||
analog_object = importlib.import_module(analog_module) | ||
self._analog_class = getattr(analog_object, analog_class) | ||
|
||
if (adc_channel is None | ||
and self._adc_kwargs.get("positive_pin", None) is None): | ||
adc_channel = DEFAULT_ADC_CHANNEL | ||
if adc_channel is not None: | ||
self._adc_kwargs["positive_pin"] = adc_channel | ||
|
||
def read_sensor_data(self, sensor_object=None): | ||
with busio.I2C(board.SCL, board.SDA, **self._i2c_kwargs) as i2c: | ||
sensor_object = self.init_sensor_object(i2c) | ||
if sensor_object is None: | ||
return None | ||
channel_object = self._analog_class( | ||
sensor_object, **self._adc_kwargs) | ||
raw_data = super().read_sensor_data( | ||
sensor_object=channel_object) | ||
return raw_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
""" | ||
Input steps for Adafruit digital I2C devices (e.g. SHT31, HTU21, BMP280, etc). | ||
""" | ||
|
||
# Third party imports | ||
import board | ||
import busio | ||
|
||
# Local imports | ||
import brokkr.pipeline.baseinput | ||
|
||
|
||
class BaseAdafruitI2CInput(brokkr.pipeline.baseinput.PropertyInputStep): | ||
def __init__( | ||
self, | ||
i2c_kwargs=None, | ||
**property_input_kwargs): | ||
super().__init__(**property_input_kwargs) | ||
self._i2c_kwargs = {} if i2c_kwargs is None else i2c_kwargs | ||
|
||
|
||
class AdafruitI2CInput(BaseAdafruitI2CInput): | ||
def read_sensor_data(self, sensor_object=None): | ||
with busio.I2C(board.SCL, board.SDA, **self._i2c_kwargs) as i2c: | ||
sensor_object = self.init_sensor_object(i2c) | ||
if sensor_object is None: | ||
return None | ||
sensor_data = super().read_sensor_data(sensor_object=sensor_object) | ||
return sensor_data | ||
|
||
|
||
class AdafruitPersistantI2CInput(BaseAdafruitI2CInput): | ||
def __init__( | ||
self, | ||
**adafruit_i2c_input_kwargs): | ||
super().__init__(cache_sensor_object=True, **adafruit_i2c_input_kwargs) | ||
|
||
def init_sensor_object(self, *sensor_args, **sensor_kwargs): | ||
i2c = busio.I2C(board.SCL, board.SDA, **self._i2c_kwargs) | ||
sensor_object = super().init_sensor_object( | ||
i2c, *sensor_args, **sensor_kwargs) | ||
return sensor_object |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
""" | ||
Input steps for Adafruit Onewire devices (e.g. DHT11, DHT22). | ||
""" | ||
|
||
# Local imports | ||
import brokkr.pipeline.baseinput | ||
|
||
|
||
class AdafruitOnewireInput(brokkr.pipeline.baseinput.PropertyInputStep): | ||
def __init__(self, pin, sensor_kwargs=None, **property_input_kwargs): | ||
if sensor_kwargs is None: | ||
sensor_kwargs = {} | ||
sensor_kwargs["pin"] = pin | ||
|
||
super().__init__( | ||
sensor_kwargs=sensor_kwargs, | ||
cache_sensor_object=True, | ||
**property_input_kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
""" | ||
Input steps for digital GPIO devices. | ||
""" | ||
|
||
# Standard library imports | ||
import collections | ||
import time | ||
|
||
# Third party imports | ||
import gpiozero | ||
|
||
# Local imports | ||
import brokkr.pipeline.baseinput | ||
import brokkr.utils.misc | ||
|
||
|
||
class GPIOCounterDevice(brokkr.utils.misc.AutoReprMixin): | ||
def __init__(self, pin, max_counts=None, gpio_kwargs=None): | ||
self._gpio_kwargs = {} if gpio_kwargs is None else gpio_kwargs | ||
self._gpio_kwargs["pin"] = pin | ||
|
||
self._count_times = collections.deque(maxlen=max_counts) | ||
self._gpio_device = gpiozero.DigitalInputDevice(**gpio_kwargs) | ||
self._gpio_device.when_activated = self._count | ||
|
||
self.start_time = time.monotonic() | ||
|
||
def _count(self): | ||
"""Count one transition. Used as a callback.""" | ||
self._count_times.append(time.monotonic()) | ||
|
||
@property | ||
def time_elapsed_s(self): | ||
"""The time elapsed, in s, since the start time was last reset.""" | ||
return time.monotonic() - self.start_time | ||
|
||
def get_count(self, period_s=None, mean=False): | ||
if not period_s: | ||
count = len(self._count_times) | ||
else: | ||
# Tabulate the number of counts over a given period | ||
count = -1 | ||
for count, count_time in enumerate(reversed(self._count_times)): | ||
if count_time < (time.monotonic() - period_s): | ||
break | ||
else: | ||
count += 1 | ||
|
||
if mean: | ||
count = count / max([min([self.time_elapsed_s, period_s]), | ||
time.get_clock_info("time").resolution]) | ||
|
||
return count | ||
|
||
def reset(self): | ||
""" | ||
Reset the count to zero and start time to the current time. | ||
Returns | ||
------- | ||
None. | ||
""" | ||
self._count_times.clear() | ||
self.start_time = time.monotonic() | ||
|
||
|
||
class GPIOCounterInput(brokkr.pipeline.baseinput.ValueInputStep): | ||
def __init__( | ||
self, | ||
pin, | ||
max_counts=None, | ||
gpio_kwargs=None, | ||
reset_after_read=False, | ||
**value_input_kwargs): | ||
super().__init__(**value_input_kwargs) | ||
self._counter_device = GPIOCounterDevice( | ||
pin=pin, max_counts=max_counts, gpio_kwargs=gpio_kwargs) | ||
self._reset_after_read = reset_after_read | ||
|
||
def read_raw_data(self, input_data=None): | ||
raw_data = [ | ||
self._counter_device.get_count( | ||
period_s=getattr(data_type, "period_s", None), | ||
mean=getattr(data_type, "mean", False), | ||
) for data_type in self.data_types] | ||
if self._reset_after_read: | ||
self._counter_device.reset() | ||
return raw_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
""" | ||
Generalized input class for an I2C/SMBus device using the SMBus library. | ||
""" | ||
|
||
# Standard library imports | ||
import logging | ||
from pathlib import Path | ||
|
||
# Third party imports | ||
import smbus2 | ||
|
||
# Local imports | ||
import brokkr.pipeline.baseinput | ||
import brokkr.utils.misc | ||
|
||
|
||
MAX_I2C_BUS_N = 6 | ||
|
||
I2C_BLOCK_READ_FUNCTION = "read_i2c_block_data" | ||
DEFAULT_READ_FUNCTION = I2C_BLOCK_READ_FUNCTION | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class SMBusI2CDevice(brokkr.utils.misc.AutoReprMixin): | ||
def __init__(self, bus=None, force=None): | ||
self.force = force | ||
|
||
# Automatically try to find first I2C bus and use that | ||
if bus is None: | ||
for n_bus in range(0, MAX_I2C_BUS_N + 1): | ||
if Path(f"/dev/i2c-{n_bus}").exists(): | ||
bus = n_bus | ||
LOGGER.debug("Found I2C device at bus %s", bus) | ||
break | ||
else: | ||
raise RuntimeError("Could not find I2C any bus device") | ||
|
||
self.bus = bus | ||
|
||
def read(self, force=None, | ||
read_function=DEFAULT_READ_FUNCTION, **read_kwargs): | ||
if force is None: | ||
force = self.force | ||
LOGGER.debug("Reading I2C data with function %s at bus %r, kwargs %r", | ||
read_function, self.bus, read_kwargs) | ||
with smbus2.SMBus(self.bus, force=self.force) as i2c_bus: | ||
buffer = getattr(i2c_bus, read_function)( | ||
force=force, **read_kwargs) | ||
LOGGER.debug("Read I2C data %r", buffer) | ||
return buffer | ||
|
||
|
||
class SMBusI2CInput(brokkr.pipeline.baseinput.SensorInputStep): | ||
def __init__( | ||
self, | ||
bus=None, | ||
read_function=DEFAULT_READ_FUNCTION, | ||
init_kwargs=None, | ||
read_kwargs=None, | ||
include_all_data_each=True, | ||
**sensor_input_kwargs): | ||
init_kwargs = {} if init_kwargs is None else init_kwargs | ||
self._read_kwargs = {} if read_kwargs is None else read_kwargs | ||
self._read_function = read_function | ||
sensor_kwargs = {"bus": bus, **init_kwargs} | ||
|
||
super().__init__( | ||
sensor_class=SMBusI2CDevice, | ||
sensor_kwargs=sensor_kwargs, | ||
include_all_data_each=include_all_data_each, | ||
**sensor_input_kwargs) | ||
|
||
def read_sensor_data(self, sensor_object=None): | ||
sensor_object = self.get_sensor_object(sensor_object=sensor_object) | ||
if sensor_object is None: | ||
return None | ||
|
||
try: | ||
sensor_data = sensor_object.read( | ||
read_function=self._read_function, **self._read_kwargs) | ||
except Exception as e: | ||
self.logger.error( | ||
"%s reading data from I2C SMBus device with function %s " | ||
"of %s sensor object %s on step %s: %s", | ||
type(e).__name__, self._read_function, | ||
type(self), self.object_class, self.name, e) | ||
self.logger.info("Error details:", exc_info=True) | ||
sensor_data = None | ||
|
||
return sensor_data | ||
|
||
|
||
class SMBusI2CBlockInput(SMBusI2CInput): | ||
def __init__( | ||
self, | ||
i2c_addr, | ||
register=0, | ||
length=1, | ||
force=None, | ||
**smbus_input_kwargs): | ||
read_kwargs = { | ||
"i2c_addr": i2c_addr, | ||
"register": register, | ||
"length": length, | ||
"force": force, | ||
} | ||
|
||
super().__init__(read_function=I2C_BLOCK_READ_FUNCTION, | ||
read_kwargs=read_kwargs, **smbus_input_kwargs) |
Oops, something went wrong.