"""A set of Port helpers."""
import json
import pandas as pd
from timeflux.helpers.clock import now
def make_event(label, data=None, serialize=True):
    """Create a single-row event DataFrame.

    Args:
        label (str): The event label.
        data (dict): The optional data dictionary. When omitted (or ``None``),
            a fresh empty dictionary is used.
        serialize (bool): Whether to JSON serialize the data or not.

    Returns:
        DataFrame: One row with the columns ``label`` and ``data``, indexed by
        the value returned by ``now()`` (presumably the current timestamp;
        see ``timeflux.helpers.clock``).
    """
    if data is None:
        # Build a new dict on every call: a shared mutable default would leak
        # into each returned frame when `serialize` is False.
        data = {}
    if serialize:
        data = json.dumps(data)
    return pd.DataFrame([[label, data]], index=[now()], columns=["label", "data"])
def match_events(port, label):
    """Select the events of a port whose label equals the given one.

    Args:
        port (Port): The event port. Its ``data`` is only read when
            ``port.ready()`` is truthy.
        label (str): The value compared for equality against the ``label``
            column.

    Returns:
        DataFrame: The matching rows, or `None` if the port is not ready or
        no row matches.
    """
    # Guard clause: nothing to inspect until the port reports data.
    if not port.ready():
        return None
    events = port.data
    selected = events[events["label"] == label]
    # An empty selection is reported as None rather than an empty frame.
    return None if selected.empty else selected
def traverse(dictionary, keys, default=None):
    """Find a deep value in a nested dictionary.

    Args:
        dictionary (dict): The (possibly nested) dictionary to search.
        keys (tuple|str): The hierarchical list of keys. A single string is
            treated as one key, not as a sequence of characters.
        default: The default value if not found.

    Returns:
        The value, or `default` if `keys` is `None` or the path cannot be
        followed to the end.
    """
    if keys is None:
        return default
    if isinstance(keys, str):
        keys = (keys,)
    current = dictionary
    for key in keys:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            # KeyError: missing key; IndexError: list index out of range;
            # TypeError: the intermediate value is not subscriptable (or not
            # by this kind of key). All of them mean "not found".
            return default
    return current