"""HTTP API that forwards fixed or custom frames to a serial bridge."""

import os
import socket
import time

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .serial_bridge import SerialBridge, SerialConfig

app = FastAPI(title="Pont bascule API", version="1.3")

# Fixed frames sent by the /send/esclave and /send/dsd endpoints.
TRAME_ESCLAVE = b"\x01\x0D\x0A"
TRAME_DSD = b"\x01\x10\x39\x39\x4D\x0D\x0A"


def env(name: str, default: str) -> str:
    """Return the environment variable *name*, or *default* when it is unset."""
    return os.getenv(name, default)


def hex2b(s: str) -> bytes:
    """Convert a textual list of hexadecimal byte values into ``bytes``.

    Tokens are separated by whitespace and/or commas. Every occurrence of
    the substring ``"0x"`` is removed before splitting, so ``"0x01, 0x0D"``
    and ``"01 0D"`` produce the same result. An empty or blank string yields
    ``b""``.

    Raises:
        ValueError: if a token is not valid hexadecimal or does not fit in
            a single byte (00-FF).
    """
    cleaned = s.strip().replace("0x", "").replace(",", " ")
    values = []
    for token in cleaned.split():
        value = int(token, 16)  # raises ValueError on non-hex text
        if not 0 <= value <= 0xFF:
            # Name the offending token instead of relying on the generic
            # range error raised by bytes().
            raise ValueError(f"byte value out of range (00-FF): {token!r}")
        values.append(value)
    return bytes(values)


# Serial settings are read once, at import time, from the environment.
cfg = SerialConfig(
    port=env("SERIAL_PORT", "/dev/ttyUSB0"),
    baudrate=int(env("SERIAL_BAUDRATE", "9600")),
    timeout_s=float(env("SERIAL_TIMEOUT_S", "1.0")),
    open_delay_s=float(env("SERIAL_OPEN_DELAY_S", "2.0")),  # same as the Tkinter version
    post_write_delay_s=float(env("SERIAL_POST_WRITE_DELAY_S", "0.5")),
)

bridge = SerialBridge(cfg)


class CustomReq(BaseModel):
    """Request body for /send/custom: the frame to send, as hex text."""

    hex: str


def _raise_if_busy(res):
    """Return *res* unchanged, or raise HTTP 409 when it is flagged busy.

    *res* is the mapping returned by ``bridge.send_and_read_once``; a truthy
    ``"busy"`` entry is translated into a 409 whose detail is the result's
    ``"error"`` entry (with a generic fallback if that entry is absent).
    """
    if res.get("busy"):
        raise HTTPException(
            status_code=409,
            detail=res.get("error", "Serial bridge is busy"),
        )
    return res


@app.get("/health")
def health():
    """Report service status along with the configured port and baudrate."""
    return {
        "ok": True,
        "mode": "serial",
        "busy": bridge.busy(),
        "hostname": socket.gethostname(),
        "timestamp": time.time(),
        "port": cfg.port,
        "baudrate": cfg.baudrate,
    }


@app.get("/last")
def last():
    """Return whatever the bridge reports from ``bridge.last()``."""
    return bridge.last()


@app.post("/send/esclave")
def send_esclave():
    """Send the fixed TRAME_ESCLAVE frame and return the bridge result.

    Raises:
        HTTPException: 409 when the bridge result is flagged busy.
    """
    return _raise_if_busy(bridge.send_and_read_once(TRAME_ESCLAVE))


@app.post("/send/dsd")
def send_dsd():
    """Send the fixed TRAME_DSD frame and return the bridge result.

    Raises:
        HTTPException: 409 when the bridge result is flagged busy.
    """
    return _raise_if_busy(bridge.send_and_read_once(TRAME_DSD))


@app.post("/send/custom")
def send_custom(req: CustomReq):
    """Send a caller-supplied frame, given as hex text, and return the result.

    Raises:
        HTTPException: 400 when ``req.hex`` cannot be parsed by ``hex2b``;
            409 when the bridge result is flagged busy.
    """
    try:
        payload = hex2b(req.hex)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid hex: {e}") from e
    return _raise_if_busy(bridge.send_and_read_once(payload))