""" Driver for the Arduino Temperature Sensors. The device is
a...
The :class:`ArduinoBoard` main class manages the interface
to the device and provides connection and communication
methods through USB. The driver implements an auxiliary
:class:`Channel` class to hold information about the
available sensors. A custom exception :class:`StateError`
is used for internal error management.
"""
# Imports
from serial import (
Serial,
EIGHTBITS
)
from typing import List
import configparser
# Third party
from lab_utils.custom_logging import getLogger
[docs]class StateError(BaseException):
""" Mock-up exception to deal with unexpected device status.
It is used to signal for instance that the device should be
connected but it is not at a certain execution point.
"""
pass
[docs]class Channel:
""" Simple container to hold channel information.
"""
sensor_number: int = None #: The channel ID number.
logging: bool = False #: Data from the sensor should be logged.
label: str = '' #: Label of the sensor, to be used when logging to a database.
data: float = None #: Latest temperature readout value.
status: bool = False #: Valid data
[docs] def __init__(self, sensor_number: int = None, label: str = None):
self.sensor_number = sensor_number
self.label = label
[docs]class ArduinoBoard(object): # noqa (ignore CamelCase convention)
""" Driver implementation for the ArduinoBoard.
"""
# Attributes
ETX = chr(3) #: End text (Ctrl-c), chr(3), \\x03
CR = chr(13) #: Carriage return, chr(13), \\r
LF = chr(10) #: Line feed, chr(10), \\n
ENQ = chr(5) #: Enquiry, chr(5), \\x05
ACK = chr(6) #: Acknowledge, chr(6), \\x06
NAK = chr(21) #: Negative acknowledge, chr(21), \\x15
# Serial port configuration
serial: Serial = None #: Serial port handler.
baud_rate: int = 9600 #: Baud rate for serial communication.
serial_port: str = '/dev/ArduinoTemperatureSensors' #: Physical address of the device file.
timeout: float = 1.0 #: Time-out for serial connection error.
warmup_time: float = 0.0 #: Waiting time before logging data to the database.
# Device setup
config_file: str = 'conf/arduinoTemperatureSensors.ini' #: Device configuration file
channel_info: List[Channel] = [] #: Channel information, loaded from the config file.
# Others
connected: bool = False #: Status flag.
number_of_channels: int = 10 #: Maximum number of channels
[docs] def __init__(self,
serial_port: str = None,
baud_rate: int = None,
connect: bool = False,
timeout: float = None,
config_file: str = None,
):
""" Initializes the :class:`ArduinoBoard` object. It calls
the :meth:`config` method to set up the device if a
:paramref:`~ArduinoBoard.__init__.config_file` is given. If
the :paramref:`~ArduinoBoard.__init__.connect` flag is set
to `True`, attempts the connection to the device.
Parameters
----------
serial_port : str, optional
Physical address of the device file, default is 'None'
timeout : float, optional
Serial communication time out, default is 'None'
baud_rate: int, optional
Baud rate for serial communication, default is 'None'
connect: bool, optional
If set, attempt connection to the device, default is `False`
config_file : str, optional
Configuration file, default is 'None'.
Raises
------
:class:`configparser.Error`
Configuration file error
:class:`~serial.SerialException`
The connection to the device has failed
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Initialize variables
self.connected = False
for ch in range(self.number_of_channels):
self.channel_info.append(Channel(sensor_number=ch+1))
# Load config file, if given
if config_file is not None:
self.config(config_file)
# Assign attributes, if given
# They override they configuration file
if baud_rate is not None:
self.baud_rate = baud_rate
if serial_port is not None:
self.serial_port = serial_port
if timeout is not None:
self.timeout = timeout
# Connect to the device
if connect:
self.connect()
[docs] def config(self, new_config_file: str = None):
""" Loads the Arduino Temperature Sensors configuration
from a file. If :paramref:`~ArduinoBoard.config.new_config_file`
is not given, the latest :attr:`config_file` is re-loaded;
if it is given and the file is successfully parsed,
:attr:`config_file` is updated to the new value.
Parameters
----------
new_config_file : str, optional
New configuration file to be loaded.
Raises
------
:class:`configparser.Error`
Configuration file error
"""
# Update configuration file, if given
if new_config_file is None:
new_config_file = self.config_file
# Initialize config parser and read file
getLogger().info("Loading configuration file %s", new_config_file)
config_parser = configparser.ConfigParser()
config_parser.read(new_config_file)
# Load serial port configuration
self.serial_port = config_parser.get(section='Connection', option='device')
self.baud_rate = config_parser.getint(section='Connection', option='baud_rate')
self.timeout = config_parser.getfloat(section='Connection', option='timeout')
self.warmup_time = config_parser.getfloat(section='Connection', option='warmup_time')
# Load channel information
for ch in range(self.number_of_channels):
sec_name = 'Sensor_{}'.format(ch+1)
log = False
lab = None
if config_parser.has_section(sec_name):
log = config_parser.getboolean(sec_name, 'logging')
lab = config_parser.get(sec_name, 'label')
getLogger().debug('Found sensor %d: %s, %s', ch+1, str(log), lab)
else:
getLogger().debug('%s not found', sec_name)
self.channel_info[ch].logging = log
self.channel_info[ch].label = lab
# If everything worked, update local config_file for future calls
self.config_file = new_config_file
[docs] def connect(self):
""" Connects to the Arduino device.
Raises
------
:class:`~serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
if self.connected:
raise StateError('device was already ON')
getLogger().info('Connecting to Arduino Temperature Sensors on port %s', self.serial_port)
self.serial = Serial(
port=self.serial_port,
baudrate=self.baud_rate,
bytesize=EIGHTBITS,
timeout=self.timeout,
)
self.connected = True
getLogger().info('Connection successful')
[docs] def disconnect(self):
""" Closes the connection to the Arduino Temperature Sensors.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Check the device is connected
if not self.connected:
getLogger().warning('Device is not ON')
raise StateError('Device is not ON')
getLogger().info('Closing connection to Arduino Temperature Sensors on port %s', self.serial_port)
self.connected = False
self.serial.close()
getLogger().info('Connection closed')
[docs] def flush(self):
""" Cleans the input buffer.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Check the device is connected
if not self.connected:
getLogger().warning('Device is not ON')
raise StateError('Device is not ON')
getLogger().debug('Cleaning input buffer')
self.serial.reset_input_buffer()
[docs] def clear_data(self):
""" Clears sensor data.
Sets the :attr:`~.Channel.status` of every :class:`Channel`
in :attr:`channel_info` to False.
"""
getLogger().debug('Clearing sensor data')
for ch in self.channel_info:
ch.status = False
[docs] def read_data(self):
""" Reads data from the device buffer.
Raises
------
:class:`serial.SerialException`
The connection to the device has failed.
:class:`IOError`
Communication error, probably message misspelt.
:class:`StateError`
Device was in the wrong state.
"""
# Check the device is connected
if not self.connected:
getLogger().warning('Device is not ON')
raise StateError('Device is not ON')
# Read buffer
getLogger().debug('Reading a line now')
attempts = 0
max_attempts = 10
buffer = None
while attempts < max_attempts:
buffer = self.serial.readline().decode().rstrip(chr(10)).rstrip(chr(13))
attempts += 1
getLogger().debug('Attempt {}: {}'.format(attempts, buffer))
# Check buffer length, sometimes empty lines are read
if len(buffer) < 3:
getLogger().debug('Empty line')
continue
# Check first and last characters
if buffer[0] != '#' or buffer[-1] != '#':
getLogger().warning('Readout data format error: {}'.format(buffer))
continue
# Data is valid, break loop
break
# Number of attempts
if attempts == max_attempts:
message = 'Could not read meaningful data from the device after {} attempts'.format(max_attempts)
getLogger().warning(message)
raise IOError(message)
# Remove first and last characters
buffer = buffer[1:-2]
getLogger().debug('Buffer: {}'.format(buffer))
# Split into list
data = buffer.split(sep=';')
getLogger().debug('Data: {}'.format(data))
# Parse data
for item in data:
getLogger().debug('Item: {}'.format(item))
channel, value = item.split(sep=' ')
try:
channel = int(channel)
if channel-1 not in range(self.number_of_channels):
getLogger().warning('Unknown channel {}'.format(channel))
self.channel_info[channel-1].data = float(value)
self.channel_info[channel-1].status = True
except ValueError as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
getLogger().debug('Channel: {} Value: {}'.format(channel, value))
# Clean the buffer
self.flush()