import threading
from dataclasses import dataclass


def hex2b(s: str) -> bytes:
    """Convert whitespace-separated hexadecimal tokens into bytes.

    Each token is parsed with ``int(token, 16)`` and becomes one byte of
    the result, e.g. ``"01 ff 0A"`` -> ``b"\\x01\\xff\\x0a"``. An empty or
    all-whitespace string yields ``b""``.

    Raises:
        ValueError: If a token is not valid hexadecimal, or its value does
            not fit in a single byte (0..255).
    """
    return bytes(int(x, 16) for x in s.split())


@dataclass
class SerialConfig:
    """Settings describing a serial port."""

    # Port identifier; the expected format is not visible in this file.
    port: str
    # Baud rate; defaults to 9600.
    baudrate: int = 9600


class SerialBridge:
    """Guards request handling for one serial configuration with a lock.

    At most one ``send_and_read_once`` call runs at a time; concurrent
    callers are rejected immediately rather than queued.
    """

    def __init__(self, cfg: SerialConfig):
        """Store *cfg* and create the lock that serializes requests."""
        self.cfg = cfg
        self._lock = threading.Lock()

    def busy(self) -> bool:
        """Return True while the internal lock is currently held."""
        return self._lock.locked()

    def send_and_read_once(self, payload: bytes) -> dict:
        """Handle a single request without waiting for the lock.

        Returns:
            ``{"busy": True, "error": "BUSY"}`` if another request holds
            the lock; otherwise ``{"ok": True, "request": <payload hex>}``.

        NOTE(review): the visible code performs no serial I/O and only
        echoes the payload as hex -- confirm whether the actual
        write/read is still to be added here.
        """
        # Non-blocking acquire: report BUSY immediately instead of queueing.
        if not self._lock.acquire(blocking=False):
            return {"busy": True, "error": "BUSY"}
        try:
            return {"ok": True, "request": payload.hex()}
        finally:
            # Always release, even if building the response raises.
            self._lock.release()