import time import threading from dataclasses import dataclass from typing import Dict, Any def b2hex(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b) @dataclass class SerialConfig: port: str baudrate: int = 9600 timeout_s: float = 1.0 open_delay_s: float = 2.0 # ✅ comme ton script post_write_delay_s: float = 0.5 # ✅ comme ton script class SerialBridge: """ Adapté au comportement du script Tkinter: - open port - wait 2s - write - wait 0.5s - read once (in_waiting) - ascii decode ignore + lock non bloquant => 1 appel à la fois """ def __init__(self, cfg: SerialConfig): self.cfg = cfg self._lock = threading.Lock() self._last: Dict[str, Any] = { "ts": None, "request_hex": None, "response_hex": None, "response_ascii": None, "error": None, "duration_ms": None, } def busy(self) -> bool: return self._lock.locked() def last(self) -> Dict[str, Any]: return { **self._last, "mode": "serial", "port": self.cfg.port, "baudrate": self.cfg.baudrate, "busy": self.busy(), } def send_and_read_once(self, payload: bytes) -> Dict[str, Any]: if not self._lock.acquire(blocking=False): return { "ok": False, "busy": True, "mode": "serial", "port": self.cfg.port, "baudrate": self.cfg.baudrate, "request_hex": b2hex(payload), "response_hex": None, "response_ascii": None, "duration_ms": 0, "error": "BUSY: une requête série est déjà en cours", } t0 = time.time() try: import serial # pyserial with serial.Serial(self.cfg.port, self.cfg.baudrate, timeout=self.cfg.timeout_s) as ser: time.sleep(self.cfg.open_delay_s) # ✅ 2s ser.write(payload) time.sleep(self.cfg.post_write_delay_s) # ✅ 0.5s n = getattr(ser, "in_waiting", 0) if n and n > 0: data = ser.read(n) else: data = b"" texte = data.decode("ascii", errors="ignore").strip() if data else None result = { "ok": bool(data), "busy": False, "mode": "serial", "port": self.cfg.port, "baudrate": self.cfg.baudrate, "request_hex": b2hex(payload), "response_hex": b2hex(data) if data else None, "response_ascii": texte, "duration_ms": int((time.time() - t0) * 1000), "error": None if data else "Pas de réponse reçue.", } self._last.update( { "ts": time.time(), "request_hex": result["request_hex"], "response_hex": result["response_hex"], "response_ascii": result["response_ascii"], "error": result["error"], "duration_ms": result["duration_ms"], } ) return result except Exception as e: return { "ok": False, "busy": False, "mode": "serial", "port": self.cfg.port, "baudrate": self.cfg.baudrate, "request_hex": b2hex(payload), "response_hex": None, "response_ascii": None, "duration_ms": int((time.time() - t0) * 1000), "error": str(e), } finally: self._lock.release()