import os import time import socket from fastapi import FastAPI, HTTPException from pydantic import BaseModel from dotenv import load_dotenv from .serial_bridge import SerialBridge, SerialConfig from .mock_bridge import MockBridge app = FastAPI(title="Pont bascule API", version="1.3") TRAME_ESCLAVE = b"\x01\x0D\x0A" TRAME_DSD = b"\x01\x10\x39\x39\x4D\x0D\x0A" def env(name: str, default: str) -> str: return os.getenv(name, default) load_dotenv() def hex2b(s: str) -> bytes: s = s.strip().replace("0x", "").replace(",", " ") parts = [p for p in s.split() if p] return bytes(int(p, 16) for p in parts) def serial_port_connected(port: str): try: from serial.tools import list_ports ports = {p.device for p in list_ports.comports()} return port in ports, None except Exception as e: return None, str(e) APP_MODE = env("APP_MODE", "serial").strip().lower() if APP_MODE not in ("serial", "mock"): APP_MODE = "serial" if APP_MODE == "mock": cfg = None bridge = MockBridge() else: cfg = SerialConfig( port=env("SERIAL_PORT", "/dev/ttyUSB0"), baudrate=int(env("SERIAL_BAUDRATE", "9600")), timeout_s=float(env("SERIAL_TIMEOUT_S", "1.0")), open_delay_s=float(env("SERIAL_OPEN_DELAY_S", "2.0")), # ✅ comme Tkinter post_write_delay_s=float(env("SERIAL_POST_WRITE_DELAY_S", "0.5")), # ✅ ) bridge = SerialBridge(cfg) class CustomReq(BaseModel): hex: str @app.get("/health") def health(): port_connected = None port_error = None port_status = None if cfg and cfg.port: port_connected, port_error = serial_port_connected(cfg.port) if port_connected is False: port_status = "Port COM pas connecte" if cfg is None or not cfg.port: port_status = "Port COM pas configure" return { "ok": False if port_connected is False or (cfg is None or not cfg.port) else True, "mode": APP_MODE, "busy": bridge.busy(), "hostname": socket.gethostname(), "timestamp": time.time(), "port": port_status if port_status else (cfg.port if cfg else None), "baudrate": cfg.baudrate if cfg else None, "port_connected": port_connected, "port_error": port_error, } @app.get("/last") def last(): return bridge.last() @app.post("/send/esclave") def send_esclave(): res = bridge.send_and_read_once(TRAME_ESCLAVE) if res.get("busy"): raise HTTPException(status_code=409, detail=res["error"]) return res @app.post("/send/dsd") def send_dsd(): res = bridge.send_and_read_once(TRAME_DSD) if res.get("busy"): raise HTTPException(status_code=409, detail=res["error"]) return res @app.post("/send/custom") def send_custom(req: CustomReq): try: payload = hex2b(req.hex) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid hex: {e}") res = bridge.send_and_read_once(payload) if res.get("busy"): raise HTTPException(status_code=409, detail=res["error"]) return res