Source code for timeflux.helpers.background

""" Run a Python method in a lightweight background process.

Example:

    .. code-block:: python

        from timeflux.helpers.background import Task
        from my.module import MyClass

        task = Task(MyClass(), 'my_method', my_arg=42).start()
        while not task.done:
            status = task.status()
        print(status)

"""

import sys
import time
import logging
import traceback
import zmq
from subprocess import Popen


[docs]class Runner: """Background base class. Provides common methods. .. warning:: Do not use directly! """ def __init__(self): self.logger = logging.getLogger(__name__) def _send(self, data): try: self._socket.send_pyobj(data, copy=False) except zmq.ZMQError as e: self.logger.error(e) def _receive(self, blocking=True): flag = 0 if blocking else zmq.NOBLOCK try: return self._socket.recv_pyobj(flag) except zmq.ZMQError: pass # No data
[docs]class Task(Runner): """Background task. Launch a 0MQ PAIR server, start a client and dispatch the task. Attributes: done (bool): Indicates if the task is complete. Args: instance (object): A picklable class instance. method (string): The method name to call from the instance. *args: Arbitrary variable arguments to be passed to the method. **kwargs: Arbitrary keyword arguments to be passed to the method. """ def __init__(self, instance, method, *args, **kwargs): super().__init__() context = zmq.Context() self._socket = context.socket(zmq.PAIR) self._port = self._socket.bind_to_random_port("tcp://127.0.0.1") self.done = False self.instance = instance self.method = method self.args = args self.kwargs = kwargs
[docs] def start(self): """Run the task.""" self._process = Popen(["python", "-m", __name__, str(self._port)]) self._send( { "instance": self.instance, "method": self.method, "args": self.args, "kwargs": self.kwargs, } ) return self
[docs] def stop(self): """Terminate the task.""" self._process.kill() self.done = True
[docs] def status(self): """Get the task status. Returns: `None` if the task is not complete or a dict containing the following keys. - ``success``: A boolean indicating if the task ran successfully. - ``instance``: The (possibly modified) instance. - ``result``: The result of the method call, if `success` is `True`. - ``exception``: The exception, if `success` is `False`. - ``traceback``: The traceback, if `success` is `False`. - ``time``: The time it took to run the task. """ response = self._receive(False) if response is not None: self.done = True return response
[docs]class Worker(Runner): """Background worker. Connects to the server and executes the task. .. warning:: Do not use directly! """ def __init__(self, port): super().__init__() self.logger = logging.getLogger(__name__) context = zmq.Context() self._socket = context.socket(zmq.PAIR) self._socket.connect(f"tcp://127.0.0.1:{port}")
[docs] def execute(self): """Get the task from the socket and run it.""" response = {} start = time.perf_counter() try: data = self._receive() result = getattr(data["instance"], data["method"])( *data["args"], **data["kwargs"] ) response["instance"] = data["instance"] response["result"] = result response["success"] = True except Exception as e: response["exception"] = e response["traceback"] = traceback.format_tb(e.__traceback__) response["success"] = False response["time"] = time.perf_counter() - start self._send(response)
if __name__ == "__main__": if len(sys.argv) == 1: sys.exit() port = sys.argv[1] Worker(port).execute()