import struct
import time
import numpy as np
from bitalino import BITalino, ExceptionCode
from timeflux.core.exceptions import WorkerInterrupt
from timeflux.core.node import Node
import timeflux_bitalino.helpers.transfer as transfer
# Available transfer functions
TRANSFER = [f for f in dir(transfer) if not f.startswith("_")]
[docs]class Bitalino(Node):
"""BITalino driver.
This node connects to a BITalino device and streams data at a provided rate.
It is based on the original BITalino Python library, with some performance
improvements and careful timestamping.
Two output streams are provided. The default output is the data read from the
analog and digital channels. The ``o_offsets`` output provides continuous offsets
between the local time and the estimated device time. This enables drift correction
to be performed during post-processing, although no significant drift has been
observed during testing.
Attributes:
o (Port): BITalino data, provides DataFrame.
o_offsets (Port): Time offsets, provide DataFrame.
Args:
port (string): The serial port.
e.g. ``COM3`` on Windows; ``/dev/tty.bitalino-DevB`` on MacOS;
``/dev/ttyUSB0`` on GNU/Linux.
rate (int): The device rate in Hz.
Possible values: ``1``, ``10``, ``100``, ``1000``. Default: ``1000``.
channels (tupple): The analog channels to read from.
Default: ``('A1', 'A2', 'A3', 'A4', 'A5', 'A6')``.
sensors (dict): The map of attached sensors. If set, transfer functions will be applied.
e.g. ``{"A1": "ECG", "A3": "EMG"}``.
Default: ``None``.
Example:
.. literalinclude:: /../examples/bitalino.yaml
:language: yaml
Notes:
.. attention::
Make sure to set your graph rate to an high-enough value, otherwise the device
internal buffer may saturate, and data may be lost. A 30Hz graph rate is
recommended for a 1000Hz device rate.
"""
def __init__(
self,
port,
rate=1000,
channels=("A1", "A2", "A3", "A4", "A5", "A6"),
sensors=None,
):
# Check port
if not port.startswith("/dev/") and not port.startswith("COM"):
raise ValueError(f"Invalid serial port: {port}")
# Check rate
if rate not in (1, 10, 100, 1000):
raise ValueError(f"Invalid rate: {rate}")
# Check channels
if sensors:
channels += tuple(sensors.keys())
unique_channels = set(channels)
analog_channels = ["A1", "A2", "A3", "A4", "A5", "A6"]
channels = []
for channel_num, channel_name in enumerate(analog_channels):
if channel_name in unique_channels:
channels.append(channel_num)
# Set column names
# Sequence number and numeric channels are always present
self.columns = ["SEQ", "I1", "I2", "O1", "O2"]
# Add required analog channels
for channel in channels:
self.columns.append(analog_channels[channel])
# Map channels to transfer functions
self.functions = {}
for channel, sensor in sensors.items():
sensor = sensor.upper()
if channel in self.columns:
if sensor in TRANSFER:
resolution = None
if channel in ("A1", "A2", "A3", "A4"):
resolution = 10
if channel in ("A5", "A6"):
resolution = 6
column = f"{channel}_{sensor}"
self.columns.append(column)
self.functions[self.columns.index(channel)] = {
"function": sensor,
"resolution": resolution,
}
# Compute the sample size in bytes
self.channel_count = len(channels)
if self.channel_count <= 4:
self.sample_size = int(np.ceil((12.0 + 10.0 * self.channel_count) / 8.0))
else:
self.sample_size = int(
np.ceil((52.0 + 6.0 * (self.channel_count - 4)) / 8.0)
)
# Connect to BITalino
try:
self.device = BITalino(port)
except UnicodeDecodeError:
# This can happen after an internal buffer overflow.
# The solution seems to power off the device and repair.
raise WorkerInterrupt("Unstable state. Could not connect.")
except Exception as e:
raise WorkerInterrupt(e)
# Set battery threshold
# The red led will light up at 5-10%
self.device.battery(30)
# Read BITalino version
self.logger.info(self.device.version())
# Read state and show battery level
# http://forum.bitalino.com/viewtopic.php?t=448
state = self.device.state()
battery = round(1 + (state["battery"] - 511) * ((99 - 1) / (645 - 511)), 2)
self.logger.info("Battery: %.2f%%", battery)
# Start Acquisition
self.device.start(rate, channels)
# Initialize counters for timestamp indices and continuity checks
self.last_sample_counter = 15
self.time_device = np.datetime64(int(time.time() * 1e6), "us")
self.time_local = self.time_device
self.time_delta = np.timedelta64(int(1000 / rate), "ms")
# Set meta
self.meta = {"rate": rate}
[docs] def update(self):
# Send BITalino data
data, timestamps = self._read_all()
if self.functions:
converted = self._transfer(data[:, list(self.functions.keys())])
data = np.concatenate((data, converted), axis=1)
self.o.set(data, timestamps, self.columns, self.meta)
# Send time offsets
if len(timestamps) > 0:
offset = (self.time_local - self.time_device).astype(int)
self.o_offsets.set(
[[self.time_device, offset]],
[self.time_local],
["time_device", "time_offset"],
)
def _read_all(self):
"""Read all available data"""
# Make sure the device is in aquisition mode
if not self.device.started:
raise Exception(ExceptionCode.DEVICE_NOT_IN_ACQUISITION)
# We only support serial connections
if not self.device.serial:
raise Exception("Device must be opened in serial mode.")
# Check buffer size and limits
buffer_size = self.device.socket.in_waiting
if buffer_size == 1020:
# The device buffer can hold up to 1020 bytes
self.logger.warn(
"OS serial buffer saturated. Increase graph rate or decrease device rate."
)
# Compute the maximum number of samples we can get
sample_count = int(buffer_size / self.sample_size)
# Infer timestamps from sample count and rate
# Will fail dramatically if too much packets are lost
# Tests show that there is no significant drift during a 2-hour session
start = self.time_device
stop = start + self.time_delta * sample_count
self.time_device = stop
timestamps = np.arange(start, stop, self.time_delta)
self.time_local = np.datetime64(int(time.time() * 1e6), "us")
# Infer timestamps from local time and rate
# /!\ Not monotonic
# stop = np.datetime64(int(time.time() * 1e6), 'us')
# start = stop - (sample_count * self.time_delta)
# timestamps = np.arange(start, stop, self.time_delta)
# Read raw samples from device
raw = self.device.socket.read(sample_count * self.sample_size)
# Initialize the output matrix
data = np.full((sample_count, 5 + self.channel_count), np.nan)
# Parse the raw data
# http://bitalino.com/datasheets/REVOLUTION_MCU_Block_Datasheet.pdf
for sample_number in range(sample_count):
# Extract sample
start = sample_number * self.sample_size
stop = start + self.sample_size
sample = list(struct.unpack(self.sample_size * "B ", raw[start:stop]))
# Is the sample corrupted?
crc = sample[-1] & 0x0F
sample[-1] = sample[-1] & 0xF0
x = 0
for i in range(self.sample_size):
for bit in range(7, -1, -1):
x = x << 1
if x & 0x10:
x = x ^ 0x03
x = x ^ ((sample[i] >> bit) & 0x01)
if crc != x & 0x0F:
self.logger.warn("Checksum failed.")
continue
# Parse sample
data[sample_number, 0] = sample[-1] >> 4
data[sample_number, 1] = sample[-2] >> 7 & 0x01
data[sample_number, 2] = sample[-2] >> 6 & 0x01
data[sample_number, 3] = sample[-2] >> 5 & 0x01
data[sample_number, 4] = sample[-2] >> 4 & 0x01
if self.channel_count > 0:
data[sample_number, 5] = ((sample[-2] & 0x0F) << 6) | (sample[-3] >> 2)
if self.channel_count > 1:
data[sample_number, 6] = ((sample[-3] & 0x03) << 8) | sample[-4]
if self.channel_count > 2:
data[sample_number, 7] = (sample[-5] << 2) | (sample[-6] >> 6)
if self.channel_count > 3:
data[sample_number, 8] = ((sample[-6] & 0x3F) << 4) | (sample[-7] >> 4)
if self.channel_count > 4:
data[sample_number, 9] = ((sample[-7] & 0x0F) << 2) | (sample[-8] >> 6)
if self.channel_count > 5:
data[sample_number, 10] = sample[-8] & 0x3F
# Did we miss any sample?
# Check for discontinuity in the internal sample counter, encoded to 4 bits.
sample_counter = data[sample_number, 0]
if sample_counter == self.last_sample_counter + 1:
pass
elif sample_counter == 0 and self.last_sample_counter == 15:
pass
else:
self.logger.warn("Missed sample.")
self.last_sample_counter = sample_counter
return data, timestamps
def _transfer(self, data):
"""Convert signal to meaningful units"""
for index, converter in enumerate(self.functions.values()):
data[:, index] = getattr(transfer, converter["function"])(
data[:, index], converter["resolution"]
)
return data
[docs] def terminate(self):
self.device.stop()
self.device.close()