Source code for timeflux_hackeeg.nodes.driver

# import sys
import numpy as np
import hackeeg
from hackeeg import ads1299
from hackeeg.driver import SPEEDS, GAINS, Status
from timeflux.core.node import Node
from timeflux.helpers.clock import now
from threading import Thread, Lock
from time import sleep, time


class HackEEG(Node):
    """HackEEG driver.

    Samples are acquired continuously in a background thread and cached; each
    call to :meth:`update` flushes the cache to the default output port.

    Attributes:
        o (Port): Default output, provides DataFrame.

    Args:
        port (string): The serial port.
            e.g. ``COM3`` on Windows; ``/dev/cu.usbmodem14601`` on MacOS;
            ``/dev/ttyUSB0`` on GNU/Linux.
        rate (int): The device rate in Hz.
            Allowed values: ``250``, ``500``, ``1024``, ``2048``, ``4096``,
            ``8192``, ``16384``. Default: ``250``.
        gain (int): The amplifier gain.
            Allowed values: ``1``, ``2``, ``4``, ``6``, ``8``, ``12``, ``24``.
            Default: ``24``.
        channels (int): The number of channels to enable. Default: ``8``.
        names (list): The channel names, used as output column names. Must be
            a list of exactly ``channels`` items; otherwise the channels are
            numbered from ``1`` to ``channels``. Default: ``None``.
        debug (bool): If ``True``, print debug information. Default: ``False``.

    Raises:
        ValueError: If ``rate`` or ``gain`` is not an allowed value.
        RuntimeError: If no valid sample can be read once streaming has started.

    Example:
        .. literalinclude:: /../examples/simple.yaml
           :language: yaml

    """

    def __init__(self, port, rate=250, gain=24, channels=8, names=None, debug=False):

        # Validate input
        if rate not in SPEEDS.keys():
            raise ValueError(
                f"`{rate}` is not a valid rate; valid rates are: {sorted(SPEEDS.keys())}"
            )
        if gain not in GAINS.keys():
            raise ValueError(
                f"`{gain}` is not a valid gain; valid gains are: {sorted(GAINS.keys())}"
            )

        # Set channel names (fall back to 1..channels if none or wrong length)
        if isinstance(names, list) and len(names) == channels:
            self.names = names
        else:
            self.names = list(range(1, channels + 1))

        # Setup board
        self._hackeeg = hackeeg.HackEEGBoard(port, baudrate=2000000, debug=debug)
        self._hackeeg.connect()
        self._hackeeg.stop_and_sdatac_messagepack()
        self._hackeeg.sdatac()
        self._hackeeg.blink_board_led()
        # Sampling rate
        self._hackeeg.wreg(ads1299.CONFIG1, SPEEDS[rate] | ads1299.CONFIG1_const)
        # Only enable the requested channels, with the requested gain
        self._hackeeg.disable_all_channels()
        for channel in range(1, channels + 1):
            self._hackeeg.wreg(
                ads1299.CHnSET + channel, ads1299.ELECTRODE_INPUT | GAINS[gain]
            )
        # NOTE(review): the second write overwrites the first one, so the SRB1
        # bit ends up cleared. Confirm which of the two modes is intended.
        self._hackeeg.wreg(ads1299.MISC1, ads1299.SRB1 | ads1299.MISC1_const)
        self._hackeeg.wreg(ads1299.MISC1, ads1299.MISC1_const)
        self._hackeeg.messagepack_mode()
        self._hackeeg.start()
        self._hackeeg.rdatac()

        # Compute time offset.
        # The sentinel is larger than any 32-bit device timestamp, so the
        # first valid read always initializes `_start` and `_offset`.
        self._start = 2**32
        row = self._read()
        if not row:
            # Without a first sample, neither the time offset nor the sample
            # counter can be initialized; fail with an explicit message.
            raise RuntimeError("Could not read a valid sample from the HackEEG board")

        # Remember sample count
        self._count = row[0]
        self._missed = 0

        # Set meta
        self.meta = {"rate": rate}

        # Launch background thread.
        # `Thread.start()` returns None, so keep a reference to the Thread
        # object itself: `terminate()` needs it to wait for the thread.
        self._reset()
        self._lock = Lock()
        self._running = True
        self._thread = Thread(target=self._loop)
        self._thread.start()

    def _reset(self):
        """Empty cache."""
        self._rows = []
        self._timestamps = []

    def _loop(self):
        """Acquire and cache data until `terminate()` clears the running flag."""
        while self._running:
            try:
                row = self._read()
                if row:
                    self._check(row[0])
                    timestamp = np.datetime64(int(row[1] + self._offset), "us")
                    # Explicit acquire/release is kept on purpose: the original
                    # author measured `with self._lock:` as about twice as slow.
                    self._lock.acquire()
                    try:
                        self._timestamps.append(timestamp)
                        self._rows.append(row[2])
                    finally:
                        self._lock.release()
            except Exception as error:
                # Best effort: a failed read must not kill the acquisition
                # thread, but leave a trace for debugging.
                self.logger.debug("Could not acquire sample: %s", error)

    def _check(self, count):
        """Report dropped samples.

        Missed samples are accumulated and reported once, when the next
        consecutive sample arrives.

        We don't even bother about a possible overflow of the sample counter
        as it would take about 3 days at 16K SPS before reaching the maximum
        value.

        Args:
            count (int): The sample number of the row that was just read.
        """
        missed = (count - self._count) - 1
        self._count = count
        if missed:
            self._missed += missed
        else:
            if self._missed:
                self.logger.warning(f"Missed {self._missed} samples")
                self._missed = 0

    def _read(self):
        """Read a line of data from the device.

        Returns:
            tuple|bool: ``(sample_number, timestamp, channel_data)`` if a valid
            data packet was received, ``False`` otherwise.
        """
        row = False
        result = self._hackeeg.read_rdatac_response()
        if result:
            code = result.get("C")
            data = result.get("channel_data")
            if code == Status.Ok and data:
                row = (
                    result.get("sample_number"),
                    result.get("timestamp"),
                    data,
                )
                # The timestamp is a 32-bit unsigned integer, with microseconds units.
                # It will overflow after about 71 minutes, so we need to reset the offset.
                if row[1] <= self._start:
                    self._start = row[1]
                    self._offset = time() * 1e6 - self._start
        return row

    def update(self):
        """Update the node output."""
        with self._lock:
            if self._rows:
                self.o.set(self._rows, self._timestamps, self.names, meta=self.meta)
                self._reset()

    def terminate(self):
        """Cleanup."""
        self._running = False
        if self._thread is not None:
            # Bounded wait, so that a read blocked on the serial port cannot
            # hang the shutdown forever.
            self._thread.join(timeout=1.0)
        if self._hackeeg:
            try:
                self._hackeeg.stop_and_sdatac_messagepack()
                self._hackeeg.blink_board_led()
            except Exception as error:
                # Best effort: the board may already be disconnected.
                self.logger.debug("Could not stop the board cleanly: %s", error)