"""timeflux.nodes.hdf5: HDF5 nodes"""
import pandas as pd
import timeflux.helpers.clock as clock
import sys
import os
import time
from timeflux.core.exceptions import WorkerInterrupt, WorkerLoadError
from timeflux.core.node import Node
# Ignore the "object name is not a valid Python identifier" message
import warnings
from tables.exceptions import NaturalNameWarning
warnings.simplefilter("ignore", NaturalNameWarning)
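
# A typical round trip, as a sketch (the file name and key below are
# hypothetical): a `Save` node appends incoming chunks under the `/eeg/raw`
# key, and a `Replay` node later streams them back:
#
#     recorder = Save(filename="session.hdf5", path="/tmp")
#     player = Replay("/tmp/session.hdf5", keys=["/eeg/raw"])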


class Replay(Node):
    """Replay a HDF5 file."""

    def __init__(self, filename, keys, speed=1, timespan=None, resync=True, start=0):
"""
Initialize.
Parameters
----------
filename : string
The path to the HDF5 file.
keys: list
The list of keys to replay.
speed: float
The speed at which the data must be replayed. 1 means real-time.
Default: 1
timespan: float
The timespan of each chunk, in seconds.
If not None, will take precedence over the `speed` parameter
Default: None
resync: boolean
If False, timestamps will not be resync'ed to current time
Default: True
start: float
Start directly at the given time offset, in seconds
Default: 0
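
        Examples
        --------
        A minimal sketch, assuming a store ``recording.hdf5`` that contains a
        table under the hypothetical key ``/eeg/raw``::

            node = Replay("recording.hdf5", keys=["/eeg/raw"], speed=2)
            node.update()  # emits one chunk on the dynamic `o_eeg_raw` port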
"""
# Load store
try:
self._store = pd.HDFStore(self._find_path(filename), mode="r")
except IOError as e:
raise WorkerInterrupt(e)
# Init
self._sources = {}
self._start = pd.Timestamp.max
self._stop = pd.Timestamp.min
self._speed = speed
self._timespan = None if not timespan else pd.Timedelta(f"{timespan}s")
self._resync = resync
for key in keys:
try:
# Check format
if not self._store.get_storer(key).is_table:
self.logger.warning("%s: Fixed format. Will be skipped.", key)
continue
# Get first index
first = self._store.select(key, start=0, stop=1).index[0]
# Get last index
nrows = self._store.get_storer(key).nrows
last = self._store.select(key, start=nrows - 1, stop=nrows).index[0]
# Check index type
                if not isinstance(first, pd.Timestamp):
self.logger.warning("%s: Invalid index. Will be skipped.", key)
continue
# Find lowest and highest indices across stores
if first < self._start:
self._start = first
if last > self._stop:
self._stop = last
# Extract meta
                attrs = self._store.get_node(key)._v_attrs
                meta = attrs["meta"] if "meta" in attrs else {}
# Set output port name, port will be created dynamically
name = "o" + key.replace("/", "_")
# Update sources
self._sources[key] = {
"start": first,
"stop": last,
"nrows": nrows,
"name": name,
"meta": meta,
}
except KeyError:
self.logger.warning("%s: Key not found.", key)
# Current time
now = clock.now()
# Starting timestamp
self._start += pd.Timedelta(f"{start}s")
# Time offset
self._offset = pd.Timestamp(now) - self._start
# Current query time
self._current = self._start
# Last update
self._last = now

    def update(self):
        if self._current > self._stop:
            raise WorkerInterrupt("No more data.")
        # Compute the time window for this chunk. These local names are
        # referenced in the `where` query string below, which pandas resolves
        # from the local scope.
        low = self._current
        if self._timespan:
            high = low + self._timespan
        else:
            now = clock.now()
            elapsed = now - self._last
            high = low + elapsed * self._speed
            self._last = now
        for key, source in self._sources.items():
            # Select rows within the current time window
            data = self._store.select(key, "index >= low & index < high")
            # Add offset
            if self._resync:
                data.index += self._offset
            # Update port
            getattr(self, source["name"]).data = data
            getattr(self, source["name"]).meta = source["meta"]
        self._current = high

    def terminate(self):
        self._store.close()

    def _find_path(self, path):
        # Return an absolute path, searching `sys.path` when `path` is relative
        path = os.path.normpath(path)
        if os.path.isabs(path):
            if os.path.isfile(path):
                return path
        else:
            for base in sys.path:
                full_path = os.path.join(base, path)
                if os.path.isfile(full_path):
                    return full_path
        raise WorkerLoadError(f"File `{path}` could not be found in the search path.")


class Save(Node):
    """Save to HDF5."""

    def __init__(
        self, filename=None, path="/tmp", complib="zlib", complevel=3, min_itemsize=None
    ):
"""
Initialize.
Parameters
----------
filename: string
Name of the file (inside the path set by parameter). If not set,
an auto-generated filename is used.
path : string
The directory where the HDF5 file will be written.
Default: "/tmp"
complib : string
The compression lib to be used.
see: https://www.pytables.org/usersguide/libref/helper_classes.html
Default: "zlib"
complevel : int
The compression level. A value of 0 disables compression.
Default: 3
see: https://www.pytables.org/usersguide/libref/helper_classes.html
min_itemsize : int
The string columns size
Default: None
see: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.HDFStore.append.html
see: http://pandas.pydata.org/pandas-docs/stable/io.html#string-columns
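
        Examples
        --------
        A minimal sketch; the port name and data below are made up. A chunk
        set on the dynamic `i_eeg_raw` input port is appended under the
        `/eeg/raw` key::

            import pandas as pd
            node = Save(filename="test.hdf5", path="/tmp")
            index = pd.date_range("2024-01-01", periods=2, freq="10ms")
            node.i_eeg_raw.data = pd.DataFrame({"Fp1": [1.0, 2.0]}, index=index)
            node.update()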
"""
os.makedirs(path, exist_ok=True)
if filename is None:
filename = os.path.join(
path, time.strftime("%Y%m%d-%H%M%S.hdf5", time.gmtime())
)
else:
filename = os.path.join(path, filename)
self.logger.info("Saving to %s", filename)
self._store = pd.HDFStore(filename, complib=complib, complevel=complevel)
self.min_itemsize = min_itemsize

    def update(self):
        if self.ports is not None:
            for name, port in self.ports.items():
                if not name.startswith("i"):
                    continue
                # Derive the HDF5 key from the port name: `i_eeg_raw` -> `/eeg/raw`
                key = "/" + name[2:].replace("_", "/")
if port.data is not None:
if isinstance(port.data, pd.DataFrame):
port.data.index.freq = None
self._store.append(key, port.data, min_itemsize=self.min_itemsize)
if port.meta is not None and port.meta:
# Note: not none and not an empty dict, because this operation
# overwrites previous metadata and an empty dict would
# just remove any previous change
                    node = self._store.get_node(key)
                    if node:
                        node._v_attrs["meta"] = port.meta

    def terminate(self):
        try:
            self._store.close()
        except Exception:
            # Just in case
            pass