"""Machine Learning"""
import importlib
import numpy as np
import pandas as pd
import json
from joblib import load
from jsonschema import validate
from sklearn.pipeline import make_pipeline
from timeflux.core.node import Node
from timeflux.core.exceptions import ValidationError, WorkerInterrupt
from timeflux.helpers.background import Task
from timeflux.helpers.port import make_event, match_events, get_meta
from timeflux.helpers.clock import now, min_time, max_time
# Node statuses, ordered so that they can be compared with `<`:
# IDLE (0) -> ACCUMULATING (1) -> FITTING (2) -> READY (3)
IDLE, ACCUMULATING, FITTING, READY = range(4)
class Pipeline(Node):
    """Fit, transform and predict.

    Training on continuous data is always unsupervised.
    Training on epoched data can either be supervised or unsupervised.

    If fit is `False`, input events are ignored, and initial training is not performed.
    Automatically set to False if mode is either 'fit_predict' or 'fit_transform'.
    Automatically set to True if mode is either 'predict', 'predict_proba' or 'predict_log_proba'.
    Always set to False when a pre-computed ``model`` is loaded.

    Attributes:
        i (Port): Continuous data input, expects DataFrame.
        i_* (Port): Epoched data input, expects DataFrame.
        i_training (Port): Continuous training data input, expects DataFrame.
        i_training_* (Port): Epoched training data input, expects DataFrame.
        i_events (Port): Event input, expects DataFrame.
        o (Port): Continuous data output, provides DataFrame.
        o_* (Port): Epoched data output, provides DataFrame.
        o_events (Port): Event output, provides DataFrame.

    Args:
        steps (list): Pipeline steps, as a list of dicts with 'module' and 'class'
            keys and an optional 'args' dict (ignored if 'model' is set).
        fit (bool): Whether the pipeline must be trained on accumulated data
            before it runs (see above).
        mode ('predict'|'predict_proba'|'predict_log_proba'|'transform'|'fit_predict'|'fit_transform'):
            Name of the pipeline method called on incoming data. Modes containing
            'predict' emit events on ``o_events``; the others emit data.
        meta_label (str|tuple|None): Key(s) passed to ``get_meta`` to read the
            label of each epoch. Reset to None when a model is loaded and the
            mode does not start with 'fit'.
        event_start_accumulation (str): Event label marking the start of the
            training window.
        event_stop_accumulation (str): Event label marking the end of the
            training window.
        event_start_training (str): Event label that triggers fitting.
        event_reset (str|None): Event label that discards training data and
            restarts the node; None disables resetting.
        buffer_size (str): Amount of data kept before the start event arrives,
            as a ``pandas.Timedelta`` string.
        passthrough (bool): Forward the inputs to the outputs until the model
            is ready.
        resample (bool): When a transform changes the number of samples,
            rebuild timestamps at a fixed rate instead of spacing them linearly
            between the first and last input timestamps.
        resample_direction ('right'|'left'|'both'): Anchor the resampled
            timestamps at the first input timestamp ('right'), at the last one
            ('left'), or around the middle one ('both').
        resample_rate (None|float): Output rate in Hz; if None, the rate is
            inferred from the input timestamps.
        warmup (str): Load a .npy or .npz file and bootstrap the model with initial data
        model (str): Load a pre-computed model, persisted with joblib
        cv: Cross-validation - NOT IMPLEMENTED
    """

    def __init__(
        self,
        steps=None,
        fit=True,
        mode="predict",
        meta_label=("epoch", "context", "target"),
        event_start_accumulation="accumulation_starts",
        event_stop_accumulation="accumulation_stops",
        event_start_training="training_starts",
        event_reset=None,
        buffer_size="5s",
        passthrough=False,
        resample=False,
        resample_direction="right",
        resample_rate=None,
        warmup=None,
        model=None,
        cv=None,
    ):
        # TODO: validation
        # TODO: cross-validation
        # TODO: provide more context for errors
        self.fit = fit
        self.mode = mode
        self.meta_label = meta_label
        self.event_start_accumulation = event_start_accumulation
        self.event_stop_accumulation = event_stop_accumulation
        self.event_start_training = event_start_training
        self.event_reset = event_reset
        self.passthrough = passthrough
        self.resample = resample
        self.resample_direction = resample_direction
        self.resample_rate = resample_rate
        self.warmup = warmup
        self.model = model
        self._buffer_size = pd.Timedelta(buffer_size)
        if model:
            self._load_pipeline(model)
        elif steps:
            self._make_pipeline(steps)
        else:
            raise ValueError("You must pass either a 'steps' or 'model' argument")
        self._reset()

    def update(self):
        """Run one cycle: handle events, accumulate, fit, predict/transform, send."""
        # Let's get ready
        self._clear()

        # Reset
        if self.event_reset:
            matches = match_events(self.i_events, self.event_reset)
            if matches is not None:
                self.logger.debug("Reset")
                if self._status == FITTING:
                    self._task.stop()
                self._reset()

        # Are we dealing with continuous data or epochs?
        if self._dimensions is None:
            port_name = "i_training" if self.fit else "i"
            if getattr(self, port_name).ready():
                self._dimensions = 2
            else:
                for name, _, _ in self.iterate(port_name + "_*"):
                    # "i_*" also matches the event and training ports (see
                    # _receive), which must not be mistaken for epochs
                    if self.fit or ("training" not in name and "events" not in name):
                        self._dimensions = 3
                        break

        # Set the accumulation boundaries
        if self._accumulation_start is None:
            matches = match_events(self.i_events, self.event_start_accumulation)
            if matches is not None:
                self._accumulation_start = matches.index.values[0]
                self._status = ACCUMULATING
                self.logger.debug("Start accumulation")
        if self._accumulation_stop is None:
            matches = match_events(self.i_events, self.event_stop_accumulation)
            if matches is not None:
                self._accumulation_stop = matches.index.values[0]
                self.logger.debug("Stop accumulation")

        # Always buffer a few seconds, in case the start event is coming late
        if self._status == IDLE:
            start = (now() - self._buffer_size).to_datetime64()
            stop = max_time()
            self._accumulate(start, stop)

        # Accumulate between boundaries
        if self._status == ACCUMULATING:
            start = self._accumulation_start
            stop = (
                self._accumulation_stop
                if self._accumulation_stop is not None
                else max_time()
            )
            self._accumulate(start, stop)

        # Should we start fitting the model?
        if self._status < FITTING:
            if match_events(self.i_events, self.event_start_training) is not None:
                self._status = FITTING
                self.logger.debug("Start training")
                self._warmup()
                self._task = Task(
                    self._pipeline, "fit", self._X_train, self._y_train
                ).start()

        # Is the model ready?
        if self._status == FITTING:
            status = self._task.status()
            if status:
                if status["success"]:
                    self._pipeline = status["instance"]
                    self._status = READY
                    self.logger.debug(f"Model fitted in {status['time']} seconds")
                    self.o_events.data = make_event("ready")
                else:
                    exception = status["exception"]
                    # Exceptions raised without arguments have an empty `args`
                    reason = exception.args[0] if exception.args else repr(exception)
                    self.logger.error(f"An error occured while fitting: {reason}")
                    self.logger.debug(
                        "\nTraceback (most recent call last):\n"
                        + "".join(status["traceback"])
                    )
                    raise WorkerInterrupt()

        # Run the pipeline
        if self._status == READY:
            self._receive()
            if self._X is not None:
                args = [self._X]
                if self.mode.startswith("fit"):
                    args.append(self._y)
                # TODO: optionally loop through epochs instead of sending them all at once
                self._out = getattr(self._pipeline, self.mode)(*args)

        # Set output streams
        self._send()

    def terminate(self):
        """Kill the background fitting task, if any."""
        if self._task is not None:
            self._task.stop()

    def _reset(self):
        """Discard training data and derive `fit`/`meta_label`/status from the settings."""
        self._X_train = None
        self._y_train = None
        self._X_train_indices = np.array([], dtype=np.datetime64)
        self._accumulation_start = None
        self._accumulation_stop = None
        self._dimensions = None
        self._shape = ()
        self._task = None
        if self.mode.startswith("fit"):
            self.fit = False
        elif self.mode.startswith("predict"):
            self.fit = True
        if self.model is not None:
            # A pre-computed model is never retrained
            self.fit = False
            if not self.mode.startswith("fit"):
                # Labels are neither used for training nor passed to the model
                self.meta_label = None
        if self.fit:
            self._status = IDLE
        else:
            self._status = READY

    def _clear(self):
        """Forget the inputs and outputs of the previous cycle."""
        self._X = None
        self._y = None
        self._X_indices = []
        self._X_columns = []
        self._X_meta = None
        self._out = None

    def _make_pipeline(self, steps):
        """Validate `steps` and build a scikit-learn pipeline from them.

        Raises:
            ValidationError: if `steps` does not match the schema, or a step's
                module, class or constructor arguments are invalid.
        """
        schema = {
            "type": "array",
            "minItems": 1,
            "items": {
                "type": "object",
                "properties": {
                    "module": {"type": "string"},
                    "class": {"type": "string"},
                    "args": {"type": "object"},
                },
                "required": ["module", "class"],
            },
        }
        try:
            validate(instance=steps, schema=schema)
        except Exception as error:
            raise ValidationError(
                "steps", getattr(error, "message", str(error))
            ) from error
        pipeline = []
        for step in steps:
            args = step.get("args", {})
            # Separate try blocks so that errors raised *inside* a step's
            # constructor are not misreported as import/lookup failures
            try:
                module = importlib.import_module(step["module"])
            except ImportError as error:
                raise ValidationError(
                    "steps", f"could not import '{step['module']}'"
                ) from error
            try:
                cls = getattr(module, step["class"])
            except AttributeError as error:
                raise ValidationError(
                    "steps", f"could not find class '{step['class']}'"
                ) from error
            try:
                instance = cls(**args)
            except TypeError as error:
                raise ValidationError(
                    "steps",
                    f"could not instantiate class '{step['class']}' with the given params",
                ) from error
            pipeline.append(instance)
        # TODO: memory and verbose args
        self._pipeline = make_pipeline(*pipeline, memory=None, verbose=False)

    def _load_pipeline(self, path):
        """Load a persisted model with joblib.

        NOTE(review): joblib.load unpickles the file; only load trusted models.
        """
        try:
            self._pipeline = load(path)
        except Exception as error:
            self.logger.error("Could not load model")
            raise WorkerInterrupt() from error

    def _warmup(self):
        """Prepend the data (and labels, if any) of the warmup file to the training set."""
        if not self.warmup:
            return
        try:
            archive = np.load(self.warmup)
            if isinstance(archive, np.ndarray):
                # .npy files load as a bare ndarray: unlabeled data only
                self._prepend_warmup({"X": archive})
            else:
                # .npz files load as an archive holding an open file handle
                with archive:
                    self._prepend_warmup(archive)
        except OSError:
            self.logger.error("Warmup file does not exist or cannot be read")
            raise WorkerInterrupt()
        except ValueError:
            self.logger.error("Warmup and training data dimensions do not match")
            raise WorkerInterrupt()

    def _prepend_warmup(self, data):
        """Insert `data["X"]` / `data["y"]` before the accumulated training data."""
        if "X" in data:
            if self._X_train is None:
                self._X_train = data["X"]
            else:
                self._X_train = np.vstack((data["X"], self._X_train))
        else:
            self.logger.warning("Warmup data is missing")
        if "y" in data:
            if self._y_train is None:
                self._y_train = data["y"]
            else:
                self._y_train = np.append(data["y"], self._y_train)
        else:
            self.logger.info("Warmup labels are missing")  # OK if unsupervised

    def _accumulate(self, start, stop):
        """Append training samples timestamped in [start, stop) and trim older ones."""
        # Do nothing if no fitting required
        if not self.fit:
            return

        # Set defaults
        indices = np.array([], dtype=np.datetime64)

        # Accumulate continuous data
        if self._dimensions == 2:
            if self.i_training.ready():
                data = self.i_training.data
                mask = (data.index >= start) & (data.index < stop)
                data = data[mask]
                if not data.empty:
                    if self._X_train is None:
                        self._X_train = data.values
                        self._shape = self._X_train.shape[1]
                        indices = data.index.values
                    else:
                        if data.shape[1] == self._shape:
                            self._X_train = np.vstack((self._X_train, data.values))
                            indices = data.index.values
                        else:
                            self.logger.warning("Invalid shape")

        # Accumulate epoched data
        if self._dimensions == 3:
            for _, _, port in self.iterate("i_training_*"):
                if port.ready():
                    # An epoch is kept or dropped as a whole, based on its first timestamp
                    index = port.data.index.values[0]
                    if start <= index < stop:
                        data = port.data.values
                        label = get_meta(port, self.meta_label)
                        if self._shape and (data.shape != self._shape):
                            self.logger.warning("Invalid shape")
                            continue
                        if self.meta_label is not None and label is None:
                            self.logger.warning("Invalid label")
                            continue
                        if self._X_train is None:
                            self._X_train = np.array([data])
                            self._shape = self._X_train.shape[1:]
                        else:
                            self._X_train = np.vstack((self._X_train, [data]))
                        indices = np.append(indices, index)
                        if label is not None:
                            if self._y_train is None:
                                self._y_train = np.array([label])
                            else:
                                self._y_train = np.append(self._y_train, [label])

        # Store indices
        if indices.size != 0:
            self._X_train_indices = np.append(self._X_train_indices, indices)

        # Trim
        if self._X_train is not None:
            mask = (self._X_train_indices >= start) & (self._X_train_indices < stop)
            self._X_train = self._X_train[mask]
            self._X_train_indices = self._X_train_indices[mask]
            if self._y_train is not None:
                self._y_train = self._y_train[mask]

    def _receive(self):
        """Collect the data to run through the pipeline during this cycle."""
        # Continuous data
        if self._dimensions == 2:
            if self.i.ready():
                if not self._X_columns:
                    self._X_columns = list(self.i.data.columns)
                if self._shape and (self.i.data.shape[1] != self._shape):
                    self.logger.warning("Invalid shape")
                else:
                    self._X = self.i.data.values
                    self._X_indices = self.i.data.index.values
                    self._X_meta = self.i.meta

        # Epochs
        if self._dimensions == 3:
            for name, _, port in self.iterate("i_*"):
                if port.ready() and "training" not in name and "events" not in name:
                    data = port.data.values
                    meta = port.meta
                    indices = port.data.index.values
                    label = get_meta(port, self.meta_label)
                    if not self._X_columns:
                        self._X_columns = list(port.data.columns)
                    if self._shape and (data.shape != self._shape):
                        self.logger.warning("Invalid shape")
                        continue
                    if not self.fit and self.meta_label is not None and label is None:
                        self.logger.warning("Invalid label")
                        continue
                    if self._X is None:
                        self._X = []
                    if self._y is None and label is not None:
                        self._y = []
                    if self._X_meta is None:
                        self._X_meta = []
                    self._X.append(data)
                    self._X_indices.append(indices)
                    self._X_meta.append(meta)
                    if label is not None:
                        self._y.append(label)

    def _send(self):
        """Set the output ports: passthrough data, prediction events or transformed data."""
        # Passthrough
        if self._status < READY and self.passthrough:
            inputs = []
            for _, suffix, port in self.iterate("i*"):
                if not suffix.startswith("_training") and not suffix.startswith(
                    "_events"
                ):
                    inputs.append((suffix, port))
            for suffix, src_port in inputs:
                dst_port = getattr(self, "o" + suffix)
                dst_port.data = src_port.data
                dst_port.meta = src_port.meta

        # Model
        if self._out is None:
            return
        if "predict" in self.mode:
            # Send events
            if len(self._X_indices) != len(self._out):
                self.logger.warning(
                    "Number of predictions inconsistent with input length"
                )
                return
            # TODO: skip JSON serialization?
            data = [
                [self.mode, json.dumps({"result": self._np_to_native(result)})]
                for result in self._out
            ]
            if self._dimensions == 2:
                times = self._X_indices
                # Copy, so that adding "classes" does not mutate the input
                # port's meta (port.meta should always be an object)
                meta = dict(self._X_meta) if self._X_meta else {}
            else:
                # Keep the first timestamp of each epoch (epochs may differ in length)
                times = np.array([indices[0] for indices in self._X_indices])
                meta = {"epochs": self._X_meta}
            if hasattr(self._pipeline, "classes_"):
                meta["classes"] = list(self._pipeline.classes_)
            names = ["label", "data"]
            rows = pd.DataFrame(data, index=times, columns=names)
            if self.o_events.ready():
                # Make sure we don't overwrite other events
                self.o_events.data = pd.concat([self.o_events.data, rows])
            else:
                self.o_events.data = rows
            self.o_events.meta = meta
        else:
            # Send data
            if self._dimensions == 2:
                try:
                    self.o.data = self._reindex(
                        self._out, self._X_indices, self._X_columns
                    )
                    self.o.meta = self._X_meta
                except Exception as e:
                    self.logger.warning(getattr(e, "message", repr(e)))
            if self._dimensions == 3:
                if len(self._X_indices) == len(self._out):
                    for i, (data, times) in enumerate(
                        zip(self._out, self._X_indices)
                    ):
                        try:
                            port = getattr(self, "o_" + str(i))
                            port.data = self._reindex(data, times, self._X_columns)
                            port.meta = self._X_meta[i]
                        except Exception as e:
                            self.logger.warning(getattr(e, "message", repr(e)))
                else:
                    self.logger.warning(
                        "Number of transforms inconsistent with number of epochs"
                    )

    def _np_to_native(self, data):
        """Convert numpy scalars and objects to native types."""
        return getattr(data, "tolist", lambda: data)()

    def _reindex(self, data, times, columns):
        """Build a DataFrame from `data`, rebuilding timestamps if the length changed.

        Raises:
            ValueError: if resampling is requested without `resample_rate` and
                the input frequency cannot be inferred.
        """
        if len(data) != len(times):
            if self.resample:
                # Resample at a specific frequency
                kwargs = {"periods": len(data)}
                if self.resample_rate is None:
                    freq = pd.tseries.frequencies.to_offset(pd.infer_freq(times))
                    if freq is None:
                        # Otherwise date_range fails with an obscure message
                        raise ValueError(
                            "Could not infer the sampling frequency of the input"
                        )
                    kwargs["freq"] = freq
                else:
                    kwargs["freq"] = pd.DateOffset(seconds=1 / self.resample_rate)
                if self.resample_direction == "right":
                    kwargs["start"] = times[0]
                elif self.resample_direction == "left":
                    kwargs["end"] = times[-1]
                else:
                    # Align the middle output sample with the middle input sample
                    def middle(a):
                        return int(np.ceil(len(a) / 2)) - 1

                    kwargs["start"] = times[middle(times)] - (
                        middle(data) * kwargs["freq"]
                    )
                times = pd.date_range(**kwargs)
            else:
                # Linearly arange between first and last
                times = pd.date_range(start=times[0], end=times[-1], periods=len(data))
        return pd.DataFrame(data, times, columns)