Source code for timeflux.helpers.port

"""A set of Port helpers."""

import json
import pandas as pd
from timeflux.helpers.clock import now


def make_event(label, data=None, serialize=True):
    """Create an event DataFrame.

    Args:
        label (str): The event label.
        data (dict): The optional data dictionary. When omitted (or `None`),
            a fresh empty dictionary is used for each call.
        serialize (bool): Whether to JSON serialize the data or not.

    Returns:
        DataFrame: A single-row DataFrame with the columns ``label`` and
        ``data``, indexed by the value returned by ``now()``.
    """
    # A literal ``{}`` default would be one dict object shared by every call.
    # With ``serialize=False`` that very object is stored in each returned
    # DataFrame, so mutating one event's data would leak into later events.
    if data is None:
        data = {}
    if serialize:
        data = json.dumps(data)
    return pd.DataFrame([[label, data]], index=[now()], columns=["label", "data"])
def match_events(port, label):
    """Select the rows of an event port whose label equals the given string.

    Args:
        port (Port): The event port.
        label (str): The string to look for in the label column.

    Returns:
        DataFrame: The matched events, or `None` if the port is not ready
        or if no row matches.
    """
    # Nothing to search when the port reports that it is not ready.
    if not port.ready():
        return None
    events = port.data
    selected = events[events["label"] == label]
    # An empty selection is reported as `None` rather than an empty frame.
    return None if selected.empty else selected
def get_meta(port, keys, default=None):
    """Look up a nested value in a port's meta.

    Args:
        port (Port): The event port.
        keys (tuple|str): The hierarchical list of keys.
        default: The default value if not found.

    Returns:
        The value, or `default` if not found.
    """
    # The lookup itself is delegated to `traverse`, applied to the port's meta.
    meta = port.meta
    return traverse(meta, keys, default)
def traverse(dictionary, keys, default=None):
    """Find a deep value in a dictionary.

    Args:
        dictionary (dict): The (possibly nested) dictionary to search.
        keys (tuple|str): The hierarchical list of keys. A single string is
            treated as a one-element path.
        default: The default value if not found.

    Returns:
        The value, or `default` if not found.
    """
    if keys is None:
        return default
    # A bare string is one key, not a sequence of one-character keys.
    if isinstance(keys, str):
        keys = (keys,)
    node = dictionary
    for key in keys:
        try:
            node = node[key]
        except (KeyError, IndexError, TypeError):
            # KeyError: the key is missing from a mapping.
            # IndexError: the index is out of range for a sequence.
            # TypeError: the current level cannot be subscripted with this
            # key (e.g. `None`, a number, or a str indexed by a str).
            # In every case the path does not exist, so fall back to default.
            return default
    return node