# Source code for pyTelops.connection

"""Connection diagnostics and download tuning for Telops cameras."""

from __future__ import annotations

import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class TrialResult:
    """One measured (packet_size, bitrate, socket_buffer) configuration.

    Attributes
    ----------
    packet_size : int
        Packet size passed to ``buffer_download`` for this trial.
    bitrate_mbps : float
        Bitrate passed to ``buffer_download`` for this trial.
    socket_buffer : int
        Socket-buffer value the trial is labelled with. ``tune_connection``
        records it on the result but does not pass it to ``buffer_download``.
    throughput_mbps : float
        Throughput taken from ``cam.last_download_stats`` after the download.
    pct_incomplete : float
        Incomplete frames as a percentage of the frames requested for the
        trial (``100 * n_incomplete / test_frames``).
    """

    packet_size: int
    bitrate_mbps: float
    socket_buffer: int
    throughput_mbps: float
    pct_incomplete: float


@dataclass
class ProbeInfo:
    """Read-only facts gathered before/without running downloads.

    Every field defaults to ``None``, meaning "not determined".

    Attributes
    ----------
    max_packet_size : int or None
        Value returned by ``cam._probe_max_packet_size(16260)``; left as
        ``None`` when that probe raises.
    link_speed_mbps : int or None
        ``speed`` reported by ``psutil.net_if_stats()`` for the selected
        adapter. Only filled in when NIC info is requested.
    adapter_name : str or None
        Name of the selected host adapter: the one carrying the camera's
        local IP, otherwise the first adapter that is up and reports a speed.
    is_usb_adapter : bool or None
        Whether the adapter's name or description matches the USB-adapter
        keywords.
    so_rcvbuf_max : int or None
        Not populated by any code visible in this module; presumably the
        host's maximum ``SO_RCVBUF`` - TODO confirm.
    """

    max_packet_size: int | None = None
    link_speed_mbps: int | None = None
    adapter_name: str | None = None
    is_usb_adapter: bool | None = None
    so_rcvbuf_max: int | None = None


@dataclass
class ConnectionReport:
    """Result of :func:`tune_connection`.

    Attributes
    ----------
    recommended : dict
        Download keyword arguments of the top-ranked trial
        (``packet_size``, ``bitrate_mbps``, ``socket_buffer``), or an empty
        dict when no trial is available.
    sweep : list of TrialResult
        Measured trials, best first.
    probe : ProbeInfo or None
        Facts collected by the read-only probe phase.
    warnings : list of str
        Human-readable notes collected while probing and sweeping.
    """

    recommended: dict
    sweep: list[TrialResult] = field(default_factory=list)
    probe: ProbeInfo | None = None
    warnings: list[str] = field(default_factory=list)

    @classmethod
    def from_trials(cls, trials, probe, warnings):
        """Build a report from raw trials, recommending the top-ranked one.

        Parameters
        ----------
        trials : iterable of TrialResult
            Measured trials in any order; they are ranked here.
        probe : ProbeInfo or None
            Probe facts to attach to the report.
        warnings : iterable of str
            Warnings to attach; copied into a new list.

        Returns
        -------
        ConnectionReport
            ``recommended`` is empty when *trials* is empty.
        """
        ordered = _rank_trials(trials)
        if ordered:
            winner = ordered[0]
            settings = {
                "packet_size": winner.packet_size,
                "bitrate_mbps": winner.bitrate_mbps,
                "socket_buffer": winner.socket_buffer,
            }
        else:
            settings = {}
        return cls(
            recommended=settings,
            sweep=ordered,
            probe=probe,
            warnings=list(warnings),
        )

    def apply(self, cam):
        """Store the recommended kwargs on *cam* for later buffer_download calls.

        Does not change any system or camera setting; just records the
        recommendation as ``cam.recommended_download_kwargs``.

        Returns
        -------
        dict
            This report's ``recommended`` dict (the camera receives a copy).
        """
        snapshot = dict(self.recommended)
        cam.recommended_download_kwargs = snapshot
        return self.recommended


def _rank_trials(trials): """Rank stability-first: zero-drop beats lossy; ties broken by throughput.""" return sorted( trials, key=lambda t: (t.pct_incomplete > 0, t.pct_incomplete, -t.throughput_mbps), )
def tune_connection(
    cam,
    probe_only=False,
    candidate_packet_sizes=None,
    candidate_bitrates=None,
    socket_buffers=None,
    test_frames=300,
    read_nic_info=False,
):
    """Probe the link and sweep download settings to find a stable+fast config.

    Phase 1 probes the path (read-only). With *probe_only*, returns after the
    probe. Phase 2 runs real downloads across candidate settings and ranks them
    stability-first. No system or camera settings are persisted or mutated.

    Parameters
    ----------
    cam : Camera
        A connected Telops camera with frames already in its memory buffer
        (record a sequence before sweeping).
    probe_only : bool, optional
        Stop after the read-only probe; run no downloads. Default ``False``.
    candidate_packet_sizes, candidate_bitrates, socket_buffers : list or None
        Values to sweep. ``None`` picks sensible defaults (packet sizes capped
        at the probed path maximum). Any iterable is accepted; it is copied
        into a list because each one is walked more than once.
    test_frames : int, optional
        Frames per trial download. Default 300.
    read_nic_info : bool, optional
        Read (never change) host NIC facts to annotate the report. Default
        ``False``. Requires ``psutil``.

    Returns
    -------
    ConnectionReport

    Notes
    -----
    If the probed path maximum is smaller than every default packet size, the
    probed maximum itself is swept (and a warning is recorded) instead of
    running an empty sweep.
    """
    warnings = list(_preflight_warnings(cam))

    probe = ProbeInfo()
    try:
        probe.max_packet_size = cam._probe_max_packet_size(16260)
    except Exception:  # noqa: BLE001 - probe is best-effort
        probe.max_packet_size = None
        logging.getLogger(__name__).debug(
            "max packet size probe failed", exc_info=True
        )

    if read_nic_info:
        _read_nic_info(cam, probe, warnings)

    if probe_only:
        return ConnectionReport(recommended={}, sweep=[], probe=probe, warnings=warnings)

    if candidate_packet_sizes is None:
        ceiling = probe.max_packet_size or 1500
        candidate_packet_sizes = [s for s in (1500, 4000, 8000, 16260) if s <= ceiling]
        if not candidate_packet_sizes:
            # A path maximum below 1500 would otherwise leave nothing to sweep
            # and silently produce an empty report.
            candidate_packet_sizes = [ceiling]
            warnings.append(
                f"Probed path maximum packet size ({ceiling}) is below the smallest "
                f"default candidate (1500); sweeping {ceiling} only."
            )
    else:
        candidate_packet_sizes = list(candidate_packet_sizes)

    # Materialise the remaining candidates: the nested loops below restart
    # them for every outer value, which would exhaust a one-shot iterator.
    if candidate_bitrates is None:
        candidate_bitrates = [1000, 700, 500, 300]
    else:
        candidate_bitrates = list(candidate_bitrates)

    if socket_buffers is None:
        socket_buffers = [0]
    else:
        socket_buffers = list(socket_buffers)

    if any(socket_buffers):
        warnings.append(
            "socket_buffers values are recorded but not yet applied "
            "(SO_RCVBUF tuning is not wired into buffer_download); "
            "sweeping multiple values produces trials that differ only by label."
        )

    trials = []
    for ps in candidate_packet_sizes:
        for br in candidate_bitrates:
            for sb in socket_buffers:
                try:
                    cam.buffer_download(
                        n_frames=test_frames,
                        packet_size=ps,
                        bitrate_mbps=br,
                        convert=False,
                        strip_header=False,
                        verbose=False,
                        # Tolerate every frame dropping and never retry, so the
                        # trial measures the raw loss of this configuration.
                        max_dropped_frames=test_frames,
                        retries=0,
                    )
                except Exception as exc:  # noqa: BLE001
                    warnings.append(f"trial ps={ps} br={br} failed: {exc}")
                    continue

                s = cam.last_download_stats
                if s is None:
                    continue

                # max(1, ...) guards the division when test_frames is 0.
                pct = 100.0 * s.n_incomplete / max(1, test_frames)
                trials.append(
                    TrialResult(
                        packet_size=ps,
                        bitrate_mbps=br,
                        socket_buffer=sb,
                        throughput_mbps=s.throughput_mbps,
                        pct_incomplete=pct,
                    )
                )

    return ConnectionReport.from_trials(trials, probe=probe, warnings=warnings)


_VPN_NAME_KEYS = ("tailscale", "zerotier", "wireguard", "vpn", "wg") _USB_ADAPTER_KEYS = ("usb", "ax88", "asix", "rtl8153", "rtl8156", "hub") def _link_local_warning(interfaces): """Return a warning when the camera's link-local route is ambiguous. Parameters ---------- interfaces : list of (str, str) ``(name, ipv4)`` for each host interface. Notes ----- A direct camera link is itself link-local (169.254.x.x), so the presence of a single link-local adapter is normal and is NOT flagged. A warning is returned only when the route is genuinely ambiguous: more than one link-local adapter is up, or a VPN-like adapter is present alongside one. Returns ``None`` otherwise. """ link_local = [(n, ip) for n, ip in interfaces if ip.startswith("169.254.")] if not link_local: return None vpn = [n for n, _ in interfaces if any(k in n.lower() for k in _VPN_NAME_KEYS)] if len(link_local) > 1 or vpn: names = ", ".join(sorted({n for n, _ in link_local} | set(vpn))) return ( f"Multiple link-local / VPN adapters are up ({names}); a VPN adapter " f"can take over the camera route. If discovery or downloads misbehave, " f"stop it or pass an explicit local_ip for the camera NIC." ) return None def _is_usb_adapter(name, description=""): """Return ``True`` if a NIC looks like a USB-to-Ethernet adapter. Checks the connection *name* and the adapter *description* (the latter is where Windows records "ASIX USB to Gigabit Ethernet"; the connection name is just "Ethernet 7"). """ text = f"{name} {description or ''}".lower() return any(k in text for k in _USB_ADAPTER_KEYS) def _host_interfaces(): """Return ``[(name, ipv4)]`` for host interfaces (best-effort, psutil).""" try: import socket import psutil out = [] for name, addrs in psutil.net_if_addrs().items(): for a in addrs: if a.family == socket.AF_INET: out.append((name, a.address)) return out except Exception: # noqa: BLE001 return [] def _adapter_descriptions(): """Return ``{connection_name: interface_description}`` on Windows. 
psutil keys interfaces by connection name ("Ethernet 7"), not the adapter description ("ASIX USB to Gigabit Ethernet"), so USB detection needs this extra lookup. Best-effort: returns ``{}`` on non-Windows or any failure. """ import sys if not sys.platform.startswith("win"): return {} try: import json import subprocess proc = subprocess.run( [ "powershell", "-NoProfile", "-Command", "Get-NetAdapter | Select-Object Name,InterfaceDescription | ConvertTo-Json", ], capture_output=True, text=True, timeout=5, ) data = json.loads(proc.stdout or "[]") if isinstance(data, dict): data = [data] return {d.get("Name", ""): d.get("InterfaceDescription", "") for d in data} except Exception: # noqa: BLE001 return {} def _preflight_warnings(cam): """Yield human-readable warnings for common environment problems.""" warning = _link_local_warning(_host_interfaces()) if warning: yield warning def _read_nic_info(cam, probe, warnings): """Read-only NIC facts. Never changes system state. Best-effort. Targets the NIC carrying the camera's ``local_ip`` (so the USB/speed facts describe the camera link, not an unrelated WiFi adapter), and detects USB adapters by their description, which is where Windows records the vendor. """ try: import socket import psutil except Exception: # noqa: BLE001 warnings.append("read_nic_info=True but psutil is not installed; skipping NIC facts.") return try: local_ip = getattr(cam, "_local_ip", "") or "" stats = psutil.net_if_stats() addrs = psutil.net_if_addrs() descriptions = _adapter_descriptions() # Prefer the NIC carrying the camera's local_ip; else the first up NIC # that reports a link speed. 
target = None for name, alist in addrs.items(): if any(a.family == socket.AF_INET and a.address == local_ip for a in alist): target = name break if target is None: for name, st in stats.items(): if st.isup and getattr(st, "speed", 0): target = name break if target is None: return st = stats.get(target) desc = descriptions.get(target, "") probe.adapter_name = target probe.link_speed_mbps = getattr(st, "speed", None) if st else None probe.is_usb_adapter = _is_usb_adapter(target, desc) if probe.is_usb_adapter: label = f"NIC '{target}'" + (f" ({desc})" if desc else "") warnings.append( f"{label} looks like a USB adapter; issue #8 documents ~half " f"throughput and rare unrecoverable drops on these." ) except Exception: # noqa: BLE001 pass