import time
import pandas as pd
from brainflow.board_shim import (
BoardIds,
BoardShim,
BrainFlowInputParams,
BrainFlowError,
)
from timeflux.core.node import Node
class BrainFlow(Node):
    """Driver for BrainFlow.

    This plugin provides a unified interface for all boards supported by BrainFlow.

    Attributes:
        o (Port): Default output, provides DataFrame.

    Args:
        board (string|int): The board ID.
            Allowed values: numeric ID or name (e.g. ``synthetic``, ``cyton_wifi``,
            ``brainbit``, etc.).
        channels (list): The EEG channel labels.
            If not set, incrementing numbers will be used. If the list does not
            contain exactly one label per EEG channel, a warning is logged and
            the default labels are used instead.
        command (string): Send a command to the board.
            Use it carefully and only if you understand what you are doing.
        debug (boolean): Print debug messages.
        **kwargs: The parameters specific for each board.
            Allowed arguments: ``serial_port``, ``mac_address``, ``ip_address``,
            ``ip_port``, ``ip_protocol``, ``serial_number``, ``other_info``.

    Raises:
        ValueError: If ``board`` is not a known board name or numeric ID, or if
            a numeric board parameter cannot be converted to a number.

    .. seealso::
        List of `supported boards <https://brainflow.readthedocs.io/en/stable/SupportedBoards.html>`_.

    Example:
        .. literalinclude:: /../examples/synthetic.yaml
           :language: yaml

    """

    def __init__(self, board, channels=None, command=None, debug=False, **kwargs):
        # Get board id
        self._board_id = self._resolve_board_id(board)

        # Enable or disable debug mode
        if debug:
            BoardShim.enable_dev_board_logger()
        else:
            BoardShim.disable_board_logger()

        # Set board parameters
        params = self._build_params(kwargs)

        # Set private variables.
        # The label list starts out as plain row numbers; rows that are not
        # renamed below keep their number as a column name.
        self._channels = list(range(0, BoardShim.get_num_rows(self._board_id)))
        self._timestamp_channel = BoardShim.get_timestamp_channel(self._board_id)
        self._counter_channel = BoardShim.get_package_num_channel(self._board_id)
        self._meta = {"rate": BoardShim.get_sampling_rate(self._board_id)}

        # Set channel labels. Later assignments overwrite earlier ones, so the
        # order (auxiliary, then EEG, then counter/timestamp) is significant.
        self._rename_channels("accel", ("x", "y", "z"))
        self._rename_channels("gyro", ("x", "y", "z"))
        self._rename_channels("analog")
        self._rename_channels("other")
        eeg_channels = BoardShim.get_eeg_channels(self._board_id)
        num_eeg_channels = len(eeg_channels)
        if channels is not None:
            if not isinstance(channels, list) or len(channels) != num_eeg_channels:
                self.logger.warning(
                    f"`channels` must contain {num_eeg_channels} elements"
                )
                channels = None
        if channels is None:
            channels = [f"eeg_{channel}" for channel in range(1, num_eeg_channels + 1)]
        for channel, label in zip(eeg_channels, channels):
            self._channels[channel] = label
        self._channels[self._counter_channel] = "counter"
        self._channels[self._timestamp_channel] = "timestamp"

        # Initialize board and start streaming
        self._board = BoardShim(self._board_id, params)
        self._board.prepare_session()
        try:
            if command:
                self._board.config_board(command)
            self._board.start_stream()
        except BrainFlowError:
            # The session is already prepared at this point: release it so a
            # failed start does not leave it open, then let the error surface.
            self._board.release_session()
            raise

    def update(self):
        """Fetch the samples buffered by the board and publish them on ``o``."""
        data = self._board.get_board_data()
        # NOTE(review): BrainFlow presumably returns an empty array rather than
        # None when nothing is buffered -- confirm whether empty frames should
        # be skipped here.
        if data is not None:
            # TODO: check self._counter_channel and warn if we missed a packet
            # The timestamp row is interpreted as seconds and becomes the index.
            indices = pd.to_datetime(data[self._timestamp_channel], unit="s")
            # Transpose so that each board row becomes a labelled column.
            self.o.set(data.T, indices, self._channels, self._meta)

    def terminate(self):
        """Stop streaming and release the board session."""
        try:
            self._board.stop_stream()
        finally:
            # Always release the session, even if stopping the stream failed.
            self._board.release_session()

    @staticmethod
    def _resolve_board_id(board):
        """Return the numeric BrainFlow board ID for a board name or number.

        Args:
            board (string|int): Board name (case-insensitive, without the
                ``_BOARD`` suffix) or numeric ID.

        Returns:
            int: The validated board ID.

        Raises:
            ValueError: If the name or the ID does not match a ``BoardIds`` member.
        """
        if isinstance(board, str):
            # Board name
            try:
                return getattr(BoardIds, board.upper() + "_BOARD").value
            except AttributeError:
                raise ValueError(f"Invalid board name: `{board}`") from None
        # Assume this is a numeric ID
        try:
            BoardIds(board)  # Attempt to access by value
        except ValueError:
            raise ValueError(f"Invalid board ID: `{board}`") from None
        return board

    @staticmethod
    def _build_params(options):
        """Create the BrainFlow input parameters from the board-specific options.

        Fields whose default value in ``BrainFlowInputParams`` is numeric are
        converted to that numeric type, so that they keep the type of their
        default. Every other value is converted to a string.

        Args:
            options (dict): Parameter names mapped to user-provided values.

        Returns:
            BrainFlowInputParams: The populated parameters object.

        Raises:
            ValueError: If a value for a numeric field cannot be converted.
        """
        params = BrainFlowInputParams()
        for key, value in options.items():
            default = getattr(params, key, None)
            if isinstance(default, (int, float)) and not isinstance(default, bool):
                try:
                    value = type(default)(value)
                except (TypeError, ValueError):
                    raise ValueError(
                        f"Invalid value for `{key}`: `{value}`"
                    ) from None
            else:
                value = str(value)
            setattr(params, key, value)
        return params

    def _rename_channels(self, type, indices=None):
        """Label the rows of a given channel type as ``<type>_<suffix>``.

        The row numbers are obtained from ``BoardShim.get_<type>_channels``.
        If BrainFlow raises an error for this lookup, the labels are left
        untouched.

        Args:
            type (string): Channel type, e.g. ``accel``, ``gyro``, ``analog``.
            indices (tuple): Optional suffixes, paired with the channels in
                order; pairing stops at the shorter of the two sequences.
                If not set, the channels are numbered starting from 1.
        """
        try:
            channels = getattr(BoardShim, f"get_{type}_channels")(self._board_id)
        except BrainFlowError:
            return
        if indices:
            items = zip(indices, channels)
        else:
            items = enumerate(channels, start=1)
        for index, channel in items:
            self._channels[channel] = f"{type}_{index}"