timeflux.nodes.epoch


Epoching nodes

class timeflux.nodes.epoch.Samples(trigger, length=0.6, rate=None, buffer=5)[source]

Bases: timeflux.core.node.Node

Fixed-size epoching.

This node produces equal-length epochs from the default input stream. These epochs are triggered from the events stream. Each epoch contains contextual metadata, making this node ideal in front of the ml node to train a model. Non-monotonic data, late data, late events, jittered data and jumbled events are all handled reasonably well. Multiple epochs are automatically assigned to dynamic output ports. For convenience, the first epoch is bound to the default output, so you can avoid enumerating all output ports if you expect only one epoch.

Variables:
  • i (Port) – Default data input, expects DataFrame.

  • i_events (Port) – Event input, expects DataFrame.

  • o (Port) – Default output, provides DataFrame and meta.

  • o_* (Port) – Dynamic outputs, provide DataFrame and meta.

Parameters:
  • trigger (string) – The marker name.

  • length (float) – The length of the epoch, in seconds.

  • rate (float) – The rate of the input stream. If None (the default), it will be taken from the metadata.

  • buffer (float) – The length of the buffer, in seconds (default: 5).

Instantiate the node.

update()[source]

Update the input and output ports.
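A minimal graph using this node might look as follows. This is a sketch, not taken from the official documentation: the node ids and parameter values are illustrative. As with the Epoch example below, nothing is displayed unless an events stream is also connected.

```yaml
graphs:
  - id: example
    nodes:
    - id: random
      module: timeflux.nodes.random
      class: Random
    - id: samples
      module: timeflux.nodes.epoch
      class: Samples
      params:
        trigger: test
        length: 0.6
        rate: 10
    - id: display
      module: timeflux.nodes.debug
      class: Display
    edges:
    - source: random
      target: samples
    - source: samples
      target: display
    rate: 10
```

To actually trigger epochs, an events source would be connected to the node's events port (in Timeflux's `node:port` edge syntax, something like `target: samples:events`).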

class timeflux.nodes.epoch.Epoch(event_trigger, before=0.2, after=0.6)[source]

Bases: timeflux.core.node.Node

Event-triggered epoching.

This node continuously buffers a small amount of data (of a duration of before seconds) from the default input stream. When it detects a marker matching the event_trigger in the label column of the event input stream, it starts accumulating data for after seconds. It then sends the epoched data to an output stream, and sets the metadata to a dictionary containing the triggering marker and optional event data. Multiple, overlapping epochs are allowed. Each concurrent epoch is assigned its own Port. For convenience, the first epoch is bound to the default output, so you can avoid enumerating all output ports if you expect only one epoch.

Variables:
  • i (Port) – Default data input, expects DataFrame.

  • i_events (Port) – Event input, expects DataFrame.

  • o (Port) – Default output, provides DataFrame and meta.

  • o_* (Port) – Dynamic outputs, provide DataFrame and meta.

Parameters:
  • event_trigger (string) – The marker name.

  • before (float) – Length before onset, in seconds.

  • after (float) – Length after onset, in seconds.

Example

graphs:

  # Nothing is displayed because the epoch does not have any event input
  - id: example
    nodes:
    - id: random
      module: timeflux.nodes.random
      class: Random
    - id: epoch
      module: timeflux.nodes.epoch
      class: Epoch
      params:
        event_trigger: test
    - id: display
      module: timeflux.nodes.debug
      class: Display
    edges:
    - source: random
      target: epoch
    - source: epoch
      target: display
    rate: 10

Instantiate the node.

update()[source]

Update the input and output ports.
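The events stream is a DataFrame indexed by timestamp, with the marker name in a label column (as described above). A minimal sketch of an event row that would match `event_trigger: test` in the example graph; the data column carrying JSON-encoded event data is an assumption for illustration, not mandated by this node:

```python
import pandas as pd

# One event row: the index is the event onset timestamp, and "label"
# must match the node's event_trigger parameter ("test" here).
# The "data" column (optional contextual payload) is illustrative.
events = pd.DataFrame(
    {"label": ["test"], "data": ['{"target": "left"}']},
    index=[pd.Timestamp("2023-01-01 00:00:00.500")],
)
print(events)
```

When such a row arrives on i_events, the node emits an epoch spanning from before seconds ahead of the onset to after seconds past it.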

class timeflux.nodes.epoch.Trim(samples=0)[source]

Bases: timeflux.core.node.Node

Trim data so epochs are of equal length.

Because real-time data is often jittered, the Epoch node is not always able to provide dataframes of equal dimensions. This can be problematic if the data is further processed by the Pipeline node, for example. This simple node takes care of trimming the extra samples. It should be placed just after an Epoch node.

Variables:
  • i_* (Port) – Epoched data input, expects DataFrame.

  • o_* (Port) – Trimmed epochs, provides DataFrame and meta.

Parameters:

samples (int) – The maximum number of samples per epoch. If 0, the size of the first epoch is used.

Instantiate the node.

update()[source]

Update the input and output ports.
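A graph fragment sketching the recommended placement, directly after an Epoch node (node ids and parameter values are illustrative, not from the official documentation):

```yaml
# Graph fragment: trim jittered epochs to a fixed length
nodes:
- id: epoch
  module: timeflux.nodes.epoch
  class: Epoch
  params:
    event_trigger: test
- id: trim
  module: timeflux.nodes.epoch
  class: Trim
  params:
    samples: 0    # 0: use the size of the first epoch
edges:
- source: epoch
  target: trim
```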

class timeflux.nodes.epoch.ToXArray(reporting='warn', output='DataArray', context_key=None)[source]

Bases: timeflux.core.node.Node

Convert multiple epochs to a DataArray.

This node iterates over input ports with valid epochs, concatenates them on the first axis, and creates an xarray with dimensions (‘epoch’, ‘time’, ‘space’), where epoch corresponds to the input ports, time to the ports’ data index, and space to the ports’ data columns. A port is considered valid if its meta has an ‘epoch’ key and its data has the expected number of samples. If some epochs have an invalid length (which happens when the data is jittered), the node issues a warning, raises an error, or ignores them, depending on the reporting parameter.

Variables:
  • i_* (Port) – Dynamic inputs, expects DataFrame and meta.

  • o (Port) – Default output, provides DataArray and meta.

Parameters:
  • reporting (string|None) – How this node handles epochs with an invalid length: warn issues a warning with warnings.warn(), error raises an exception, and None silently ignores them.

  • output (DataArray|Dataset) – Type of output to return.

  • context_key (string|None) – If output type is Dataset, key to define the target of the event. If None, the whole context is considered.

Instantiate the node.

update()[source]

Update the input and output ports.
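A graph fragment placing ToXArray after a Trim stage might look like this. Again a sketch: the node ids are illustrative, and the upstream `trim` node is assumed to exist.

```yaml
# Graph fragment: assumes a preceding Trim node with id "trim"
nodes:
- id: to_xarray
  module: timeflux.nodes.epoch
  class: ToXArray
  params:
    reporting: warn    # warn on epochs with an invalid length
    output: DataArray
edges:
- source: trim
  target: to_xarray
```

Placing Trim upstream ensures every epoch has the expected number of samples, so concatenation along the epoch axis succeeds without warnings.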