# Source: pont-bascule-connector/app/main.py (Python; listed as 115 lines, 3.0 KiB)

import os
import time
import socket
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from .serial_bridge import SerialBridge, SerialConfig
from .mock_bridge import MockBridge
# FastAPI application object; the @app.get / @app.post decorators in this module
# register their routes on it.
app = FastAPI(title="Pont bascule API", version="1.3")
# Fixed request frames handed to the bridge by POST /send/esclave and POST /send/dsd.
# Both start with byte 0x01 and end with CR LF (0x0D 0x0A).
# NOTE(review): the protocol meaning of "esclave" / "DSD" is not visible here — see the
# scale indicator's documentation.
TRAME_ESCLAVE = bytes.fromhex("01 0D 0A")
TRAME_DSD = bytes.fromhex("01 10 39 39 4D 0D 0A")
def env(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is not set.

    A variable that is set to an empty string is returned as ``""``; the
    default only applies when the variable is absent.
    """
    return os.environ.get(name, default)
# Load a .env file (python-dotenv) into the process environment at import time, before
# the env() lookups further down read their settings.
load_dotenv()
def hex2b(s: str) -> bytes:
    """Convert a textual list of hexadecimal byte values into ``bytes``.

    Tokens are separated by whitespace and/or commas, and every ``0x``
    occurrence is removed before parsing, so ``"0x01, 0x0D 0A"`` and
    ``"01 0D 0A"`` both yield the three bytes 01 0D 0A. Each token stands for
    exactly one byte; an empty or blank string yields ``b""``.

    Args:
        s: The text to decode.

    Returns:
        One byte per token, in input order.

    Raises:
        ValueError: If a token is not valid hexadecimal, or its value is
            outside 00-FF (for example an unseparated dump like ``"010D0A"``).
            The message names the offending token.
    """
    cleaned = s.strip().replace("0x", "").replace(",", " ")
    values = []
    # str.split() without arguments never yields empty tokens, so no filtering is needed.
    for token in cleaned.split():
        try:
            value = int(token, 16)
        except ValueError:
            raise ValueError(f"{token!r} is not a hexadecimal byte value") from None
        if not 0 <= value <= 0xFF:
            # Checked explicitly: bytes() would only report a generic range error
            # without saying which token caused it.
            raise ValueError(
                f"{token!r} is outside the byte range 00-FF "
                "(separate bytes with spaces or commas)"
            )
        values.append(value)
    return bytes(values)
def serial_port_connected(port: str):
    """Check whether *port* is among the serial devices pyserial currently lists.

    Returns a ``(connected, error)`` pair:

    * ``(True, None)`` or ``(False, None)`` when the device list was obtained;
    * ``(None, message)`` when importing pyserial or listing the devices raised,
      with ``message`` being the text of that exception.
    """
    try:
        # Imported here, inside the try, so a missing pyserial is reported in the
        # returned pair instead of failing when this module is imported.
        from serial.tools import list_ports

        known_devices = frozenset(info.device for info in list_ports.comports())
        found = port in known_devices
    except Exception as exc:  # best-effort probe: report the failure, never raise
        return None, str(exc)
    return found, None
# Backend selection. APP_MODE becomes "mock" only when the variable says so (after
# trimming and lower-casing); every other value, including unknown ones, means "serial".
APP_MODE = "mock" if env("APP_MODE", "serial").strip().lower() == "mock" else "serial"

if APP_MODE == "serial":
    # Serial settings come from the environment, with the defaults written here.
    # int()/float() raise ValueError at import time if a value is malformed.
    cfg = SerialConfig(
        port=env("SERIAL_PORT", "/dev/ttyUSB0"),
        baudrate=int(env("SERIAL_BAUDRATE", "9600")),
        timeout_s=float(env("SERIAL_TIMEOUT_S", "1.0")),
        open_delay_s=float(env("SERIAL_OPEN_DELAY_S", "2.0")),  # like the Tkinter app (original note)
        post_write_delay_s=float(env("SERIAL_POST_WRITE_DELAY_S", "0.5")),
    )
    bridge = SerialBridge(cfg)
else:
    # Mock mode: there is no serial configuration, so cfg stays None.
    cfg = None
    bridge = MockBridge()
# Request body of POST /send/custom. Documented with comments rather than a class
# docstring so the JSON schema generated for the model is unchanged.
class CustomReq(BaseModel):
    # Hexadecimal byte values as text; send_custom() decodes it with hex2b().
    hex: str
@app.get("/health")
def health():
    # Status snapshot of the connector. (Plain comments, not a docstring, so the OpenAPI
    # description generated for this route is unchanged.)
    #
    # Response keys:
    #   ok             False when no port is configured or the port is known to be absent;
    #                  an inconclusive probe (port_connected is None) does not clear it.
    #   port           the configured port name, or a status message when it is
    #                  unconfigured / not connected.
    #   port_connected True / False from the device listing, None when not probed or
    #                  when the probe failed (see port_error).
    #
    # NOTE(review): in mock mode cfg is None, so this always answers ok=False with
    # "Port COM pas configure" — confirm that this is intended.

    # Only probe the OS when there is a serial configuration with a port name.
    if cfg and cfg.port:
        port_connected, port_error = serial_port_connected(cfg.port)
    else:
        port_connected, port_error = None, None

    unconfigured = cfg is None or not cfg.port
    if unconfigured:
        port_field = "Port COM pas configure"
    elif port_connected is False:
        port_field = "Port COM pas connecte"
    else:
        port_field = cfg.port if cfg else None

    return {
        "ok": not (unconfigured or port_connected is False),
        "mode": APP_MODE,
        "busy": bridge.busy(),
        "hostname": socket.gethostname(),
        "timestamp": time.time(),
        "port": port_field,
        "baudrate": cfg.baudrate if cfg else None,
        "port_connected": port_connected,
        "port_error": port_error,
    }
@app.get("/last")
def last():
    # Return whatever bridge.last() reports, unchanged.
    # NOTE(review): presumably the most recent exchange kept by the bridge — confirm in
    # the SerialBridge / MockBridge implementations.
    return bridge.last()
@app.post("/send/esclave")
def send_esclave():
    # Hand the fixed TRAME_ESCLAVE frame to bridge.send_and_read_once() and return its
    # result. A result flagged "busy" becomes HTTP 409 carrying the result's "error" text.
    outcome = bridge.send_and_read_once(TRAME_ESCLAVE)
    if not outcome.get("busy"):
        return outcome
    raise HTTPException(status_code=409, detail=outcome["error"])
@app.post("/send/dsd")
def send_dsd():
    # Hand the fixed TRAME_DSD frame to bridge.send_and_read_once() and return its
    # result. A result flagged "busy" becomes HTTP 409 carrying the result's "error" text.
    outcome = bridge.send_and_read_once(TRAME_DSD)
    if not outcome.get("busy"):
        return outcome
    raise HTTPException(status_code=409, detail=outcome["error"])
@app.post("/send/custom")
def send_custom(req: CustomReq):
    # Decode req.hex with hex2b() and hand the resulting bytes to
    # bridge.send_and_read_once(), returning its result.
    #   - text that hex2b() cannot decode -> HTTP 400 "Invalid hex: <reason>"
    #   - result flagged "busy"           -> HTTP 409 with the result's "error" text
    try:
        frame = hex2b(req.hex)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Invalid hex: {exc}")

    outcome = bridge.send_and_read_once(frame)
    if not outcome.get("busy"):
        return outcome
    raise HTTPException(status_code=409, detail=outcome["error"])