Source code for timeflux.core.sync

"""timeflux.core.sync: time synchronisation based on NTP"""

import socket
import logging
import re
import time
import numpy as np
from scipy import stats


[docs]class Server: _buffer_size = 128 def __init__(self, host="", port=12300, now=time.perf_counter): self.logger = logging.getLogger(__name__) self._host = host self._port = port self._sock = None self.now = now
[docs] def start(self): self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.settimeout(None) # blocking mode self._sock.bind((self._host, self._port)) self.logger.info("Sync server listening on %s:%d", self._host, self._port) while True: try: data, address = self._sock.recvfrom(self._buffer_size) t1 = self.now() t2 = self.now() data += b",%.6f,%.9f" % (t1, t2) l = self._sock.sendto(data, address) except: pass
[docs] def stop(self): self._sock.close()
[docs]class Client: _buffer_size = 128 def __init__( self, host="localhost", port=12300, rounds=600, timeout=1, now=time.perf_counter ): self.logger = logging.getLogger(__name__) self._host = host self._port = port self._rounds = rounds self._timeout = timeout self.now = now self._sock = None self.offset_local = time.time() - now() self.offset_remote = None
[docs] def sync(self): self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.settimeout(self._timeout) t = [[], [], [], []] reg = re.compile(b"(.*),(.*),(.*)") self.logger.info("Syncing") i = 0 while i < self._rounds: try: t0 = b"%.9f" % self.now() self._sock.sendto(t0, (self._host, self._port)) data, address = self._sock.recvfrom(self._buffer_size) t3 = self.now() r = reg.findall(data)[0] if r[0] == t0: # make sure we have a matching UDP packet r = np.float64(r) for j in range(3): t[j].append(r[j]) t[3].append(np.float64(t3)) i += 1 except socket.timeout: continue progress = "Progress: %.2f%%" % (i * 100 / self._rounds) print(progress, end="\r", flush=True) self._sock.close() t = np.array(t) offset = ((t[1] - t[0]) + (t[2] - t[3])) / 2 delay = (t[3] - t[0]) - (t[2] - t[1]) _, offset, _, _ = stats.theilslopes(offset, delay) self.offset_remote = offset self.logger.info("Offset: %f", offset) return offset
[docs] def stop(self): self._sock.close()