import os
import socket
import time

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

from .mock_bridge import MockBridge
from .serial_bridge import SerialBridge, SerialConfig, hex2b

# Raw frames written to the bridge.
# TRAME_ESCLAVE is SOH (0x01) followed by CR LF.
TRAME_ESCLAVE = b"\x01\x0D\x0A"
# TRAME_DSD is SOH, DLE (0x10), the ASCII text "99M", then CR LF.
TRAME_DSD = b"\x01\x10\x39\x39\x4D\x0D\x0A"


def env(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is unset."""
    return os.getenv(name, default)


# Normalised (trimmed, lower-cased) operating mode. Only the exact value
# "serial" selects the serial bridge; any other value, including a typo,
# falls back to the mock bridge.
APP_MODE = env("APP_MODE", "mock").strip().lower()

app = FastAPI(title="Pont bascule API", version="1.2")

if APP_MODE == "serial":
    # int() raises ValueError at import time if SERIAL_BAUDRATE is not numeric.
    cfg = SerialConfig(
        port=env("SERIAL_PORT", "/dev/ttyUSB0"),
        baudrate=int(env("SERIAL_BAUDRATE", "9600")),
    )
    bridge = SerialBridge(cfg)
else:
    cfg = None
    bridge = MockBridge()


class CustomReq(BaseModel):
    """Request body with a single string field ``hex``."""

    # NOTE(review): presumably a hex-encoded frame decoded with hex2b;
    # the consumer is not visible in this part of the file - confirm.
    hex: str


class EchoReq(BaseModel):
    """Request body for ``POST /test/echo``."""

    # Text returned unchanged by the echo endpoint.
    msg: str


@app.get("/health")
def health():
    """Report the service mode, bridge busy flag, hostname and current time."""
    return {
        "ok": True,
        "mode": APP_MODE,
        "busy": bridge.busy(),
        "hostname": socket.gethostname(),
        # Seconds since the epoch, as returned by time.time().
        "timestamp": time.time(),
    }


@app.get("/test/ping")
def test_ping():
    """Return a constant payload; does not touch the bridge."""
    return {"pong": True}


@app.post("/test/echo")
def test_echo(req: EchoReq):
    """Return the ``msg`` field of the request unchanged."""
    return {"echo": req.msg}


@app.post("/send/esclave")
def send_esclave():
    """Send ``TRAME_ESCLAVE`` through the bridge and return its result.

    Raises:
        HTTPException: status 409 when the bridge result has a truthy
            ``"busy"`` entry; the detail is the result's ``"error"`` entry,
            or a generic message when that entry is missing.
    """
    res = bridge.send_and_read_once(TRAME_ESCLAVE)
    if res.get("busy"):
        # Use .get() here as well: a busy result without an "error" key would
        # otherwise raise KeyError and turn the intended 409 into a 500.
        raise HTTPException(
            status_code=409,
            detail=res.get("error", "bridge busy"),
        )
    return res