Source code for timeflux.nodes.zmq

"""timeflux.nodes.zmq: a simple 0MQ pub/sub broker"""

import time
import pandas
import zmq
from zmq.devices import ThreadProxy
from timeflux.core.node import Node
from timeflux.core.exceptions import WorkerInterrupt
from timeflux.core.io import Port
import timeflux.core.message


class Broker(Node):
    """Must run in its own graph."""

    def __init__(
        self, address_in="tcp://127.0.0.1:5559", address_out="tcp://127.0.0.1:5560"
    ):
        """
        Initialize frontend and backend.
        If used on a LAN, bind to tcp://*:5559 and tcp://*:5560 instead of localhost.
        """
        try:
            context = zmq.Context.instance()
            self._frontend = context.socket(zmq.XSUB)
            self._frontend.bind(address_in)
            self._backend = context.socket(zmq.XPUB)
            self._backend.bind(address_out)
        except zmq.ZMQError as e:
            self.logger.error(e)

    def update(self):
        """Start a blocking proxy."""
        zmq.proxy(self._frontend, self._backend)
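
To illustrate how the two ends are used, here is a minimal, hypothetical pyzmq sketch assuming a Broker is already running on the default addresses above: raw publishers connect to the XSUB frontend, subscribers connect to the XPUB backend, and zmq.proxy relays frames between them. The sleep is needed because of the slow joiner problem discussed in the Pub node below.

    import time
    import zmq

    context = zmq.Context.instance()

    # Publishers connect to the XSUB frontend...
    pub = context.socket(zmq.PUB)
    pub.connect("tcp://127.0.0.1:5559")

    # ...and subscribers connect to the XPUB backend.
    sub = context.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5560")
    sub.setsockopt(zmq.SUBSCRIBE, b"eeg")

    time.sleep(0.1)  # let the subscription propagate (slow joiner)
    pub.send_multipart([b"eeg", b"payload"])
    print(sub.recv_multipart())  # [b'eeg', b'payload']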

class BrokerMonitored(Node):
    """
    Run a monitored pub/sub proxy.

    Will shut itself down after `timeout` seconds if no data is received.
    Useful for unit testing and replays.
    """

    _last_event = time.time()

    def __init__(
        self,
        address_in="tcp://127.0.0.1:5559",
        address_out="tcp://127.0.0.1:5560",
        timeout=5,
    ):
        self._timeout = timeout
        try:
            # Capture
            address_monitor = "inproc://monitor"
            context = zmq.Context.instance()
            self._monitor = context.socket(zmq.PULL)
            self._monitor.bind(address_monitor)
            # Proxy
            proxy = ThreadProxy(zmq.XSUB, zmq.XPUB, zmq.PUSH)
            proxy.bind_in(address_in)
            proxy.bind_out(address_out)
            proxy.connect_mon(address_monitor)
            # proxy.setsockopt_mon(zmq.CONFLATE, True)  # Do not clutter the network
            proxy.start()
        except zmq.ZMQError as error:
            self.logger.error(error)

    def update(self):
        """Monitor the proxy and shut down on inactivity."""
        if self._timeout == 0:
            return  # a timeout of 0 disables the watchdog
        now = time.time()
        count = 0
        try:
            # Drain everything the capture socket accumulated since the last
            # update; each frame proves that data is still flowing.
            while True:
                self._monitor.recv_multipart(zmq.NOBLOCK, copy=False)
                self._last_event = now
                count += 1
        except zmq.ZMQError:
            # EAGAIN: nothing left to read
            if count > 0:
                self.logger.debug("Received %d messages", count)
            if (now - self._last_event) > self._timeout:
                raise WorkerInterrupt("No data after %d seconds" % self._timeout)
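
For reference, the monitoring relies on the standard ZeroMQ capture pattern that ThreadProxy wraps: every frame relayed by the proxy is duplicated onto a PUSH socket, which a PULL socket drains. A minimal single-threaded sketch of the same mechanism, using the blocking zmq.proxy with a capture argument (addresses are illustrative):

    import zmq

    context = zmq.Context.instance()

    frontend = context.socket(zmq.XSUB)
    frontend.bind("tcp://127.0.0.1:5559")
    backend = context.socket(zmq.XPUB)
    backend.bind("tcp://127.0.0.1:5560")

    # Over inproc, the PULL end must bind before the PUSH end connects.
    monitor = context.socket(zmq.PULL)
    monitor.bind("inproc://monitor")
    capture = context.socket(zmq.PUSH)
    capture.connect("inproc://monitor")

    # Blocks forever; every relayed frame is also sent to `capture`,
    # so draining `monitor` tells you whether data is still flowing.
    zmq.proxy(frontend, backend, capture)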

class BrokerLVC(Node):
    """A monitored pub/sub broker with last value caching."""

    def __init__(
        self,
        address_in="tcp://127.0.0.1:5559",
        address_out="tcp://127.0.0.1:5560",
        timeout=1000,
    ):
        self._timeout = timeout
        try:
            context = zmq.Context.instance()
            self._frontend = context.socket(zmq.SUB)
            self._frontend.setsockopt(zmq.SUBSCRIBE, b"")
            self._frontend.bind(address_in)
            self._backend = context.socket(zmq.XPUB)
            self._backend.setsockopt(zmq.XPUB_VERBOSE, True)
            self._backend.bind(address_out)
            self._poller = zmq.Poller()
            self._poller.register(self._frontend, zmq.POLLIN)
            self._poller.register(self._backend, zmq.POLLIN)
        except zmq.ZMQError as error:
            self.logger.error(error)

    def update(self):
        """Main poll loop."""
        cache = {}
        while True:
            events = dict(self._poller.poll(self._timeout))
            # Cache and forward any new topic data
            if self._frontend in events:
                message = self._frontend.recv_multipart()
                topic, current = message
                cache[topic] = current
                self._backend.send_multipart(message)
            # Handle subscriptions: when a new subscription arrives,
            # replay the cached value for that topic
            if self._backend in events:
                event = self._backend.recv()
                # The event is one byte (0=unsubscribe, 1=subscribe)
                # followed by the topic
                if event[0] == 1:
                    topic = event[1:]
                    if topic in cache:
                        self.logger.debug(
                            "Sending cached topic %s", topic.decode("utf-8")
                        )
                        self._backend.send_multipart([topic, cache[topic]])
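
The point of the cache is to fix the late joiner problem: a subscriber that connects after a value was published still receives the last value, because its subscription shows up as an event on the XPUB backend (XPUB_VERBOSE ensures that duplicate subscriptions are reported as well). A hypothetical sketch against a BrokerLVC running on the default addresses:

    import time
    import zmq

    context = zmq.Context.instance()

    # Publish one value, then nothing more.
    pub = context.socket(zmq.PUB)
    pub.connect("tcp://127.0.0.1:5559")
    time.sleep(0.1)  # let the connection settle before sending
    pub.send_multipart([b"temperature", b"21.5"])

    # A late subscriber still gets the cached frame: its subscription
    # reaches the XPUB socket, which replays the last value for the topic.
    sub = context.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5560")
    sub.setsockopt(zmq.SUBSCRIBE, b"temperature")
    print(sub.recv_multipart())  # [b'temperature', b'21.5']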

class Pub(Node):
    def __init__(
        self, topic, address="tcp://127.0.0.1:5559", serializer="pickle", wait=0
    ):
        """Create a publisher"""
        self._topic = topic.encode("utf-8")
        self._serializer = getattr(timeflux.core.message, serializer + "_serialize")
        try:
            context = zmq.Context.instance()
            self._socket = context.socket(zmq.PUB)
            self._socket.setsockopt(zmq.LINGER, 0)
            self._socket.connect(address)
        except zmq.ZMQError as e:
            self.logger.error(e)
        # Quick fix for the slow joiner syndrome
        # TODO: remove when Last Value Caching is implemented
        # Wait for subscribers to connect
        # http://zguide.zeromq.org/page%3aall#Getting-the-Message-Out
        # http://zguide.zeromq.org/page%3aall#Node-Coordination
        # http://zguide.zeromq.org/page%3aall#Last-Value-Caching
        # https://stackoverflow.com/questions/30864145/zmq-no-subscription-message-on-xpub-socket-for-multiple-subscribers-last-value
        time.sleep(wait)

    def update(self):
        for name, suffix, port in self.iterate("i*"):
            if port.ready() or port.meta:
                if not suffix:
                    topic = self._topic
                else:
                    topic = self._topic + suffix.encode("utf-8")
                try:
                    if not port.ready():
                        port.data = None  # make sure we do not send corrupted data
                    self._socket.send_serialized(
                        [topic, port.data, port.meta], self._serializer
                    )
                except zmq.ZMQError as e:
                    self.logger.error(e)
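
The serializer is resolved by name from timeflux.core.message (e.g. pickle_serialize). Its actual implementation is not shown here, but pyzmq's send_serialized/recv_serialized contract is simple: the callable turns a message into a list of byte frames, and back. A plausible, purely hypothetical pickle-based pair that keeps the topic as a separate first frame, so that ZeroMQ's prefix-based subscription matching keeps working:

    import pickle

    def pickle_serialize(message):
        # Hypothetical sketch: message is [topic, data, meta]; the topic
        # stays a raw frame because SUB sockets match on the first frame.
        topic, data, meta = message
        return [topic, pickle.dumps((data, meta))]

    def pickle_deserialize(frames):
        topic, payload = frames
        data, meta = pickle.loads(payload)
        return [topic, data, meta]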

class Sub(Node):
    def __init__(
        self, topics=[""], address="tcp://127.0.0.1:5560", deserializer="pickle"
    ):
        """Create a subscriber"""
        try:
            context = zmq.Context.instance()
            self._socket = context.socket(zmq.SUB)
            self._socket.connect(address)
            for topic in topics:
                # Validate before subscribing: topics are exposed as output
                # ports named "o_<topic>", so they must be identifiers
                if topic and not topic.isidentifier():
                    raise ValueError("Invalid topic name: %s" % topic)
                self._socket.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
            self._deserializer = getattr(
                timeflux.core.message, deserializer + "_deserialize"
            )
        except zmq.ZMQError as e:
            self.logger.error(e)

    def update(self):
        self._chunks = {}
        try:
            while True:
                [topic, data, meta] = self._socket.recv_serialized(
                    self._deserializer, zmq.NOBLOCK
                )
                if topic not in self._chunks:
                    self._chunks[topic] = {"data": [], "meta": {}}
                self._append_data(topic, data)
                self._append_meta(topic, meta)
        except zmq.ZMQError:
            pass  # No more data
        self._update_ports()

    def _append_data(self, topic, data):
        if data is not None:
            self._chunks[topic]["data"].append(data)

    def _append_meta(self, topic, meta):
        if meta:
            self._chunks[topic]["meta"].update(meta)

    def _update_ports(self):
        for topic in self._chunks.keys():
            if len(self._chunks[topic]["data"]) == 0:
                data = None
            elif len(self._chunks[topic]["data"]) == 1:
                data = self._chunks[topic]["data"][0]
            else:
                data = pandas.concat(self._chunks[topic]["data"])
            meta = self._chunks[topic]["meta"]
            self._update_port(topic, data, meta)

    def _update_port(self, topic, data, meta):
        getattr(self, "o_" + topic).data = data
        getattr(self, "o_" + topic).meta = meta
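
As a usage note, all messages received for a topic during one update are merged before being exposed: DataFrame payloads are concatenated along the index, and meta dictionaries are merged with later keys overwriting earlier ones. The result lands on a dynamic output port named after the topic, which is why Sub insists that topics be valid Python identifiers. A toy illustration of the merging (hypothetical values):

    import pandas

    chunks = [
        pandas.DataFrame({"value": [1.0]}, index=[0]),
        pandas.DataFrame({"value": [2.0]}, index=[1]),
    ]
    data = pandas.concat(chunks)  # what _update_ports assigns to the port

    meta = {}
    meta.update({"rate": 250})
    meta.update({"rate": 500, "label": "eeg"})  # later chunks win: rate == 500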