From ee2594b9cf5e609a3d18ca34c2c101390981ad61 Mon Sep 17 00:00:00 2001 From: THOLOT DECHENE Matthieu Date: Fri, 9 Jan 2026 12:53:14 +0000 Subject: [PATCH] Add pont-bascule-api project --- __init__.py | 1 + main.py | 59 ++++++++++++++++++++++++++++++++++++++++++++++++ mock_bridge.py | 18 +++++++++++++++ serial_bridge.py | 26 +++++++++++++++++++++ 4 files changed, 104 insertions(+) create mode 100644 __init__.py create mode 100644 main.py create mode 100644 mock_bridge.py create mode 100644 serial_bridge.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..b9d56a4 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +# app package diff --git a/main.py b/main.py new file mode 100644 index 0000000..9c05c5b --- /dev/null +++ b/main.py @@ -0,0 +1,59 @@ +import os +import time +import socket +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel + +from .serial_bridge import SerialBridge, SerialConfig, hex2b +from .mock_bridge import MockBridge + +TRAME_ESCLAVE = b"\x01\x0D\x0A" +TRAME_DSD = b"\x01\x10\x39\x39\x4D\x0D\x0A" + +def env(name: str, default: str) -> str: + return os.getenv(name, default) + +APP_MODE = env("APP_MODE", "mock").strip().lower() + +app = FastAPI(title="Pont bascule API", version="1.2") + +if APP_MODE == "serial": + cfg = SerialConfig( + port=env("SERIAL_PORT", "/dev/ttyUSB0"), + baudrate=int(env("SERIAL_BAUDRATE", "9600")), + ) + bridge = SerialBridge(cfg) +else: + cfg = None + bridge = MockBridge() + +class CustomReq(BaseModel): + hex: str + +class EchoReq(BaseModel): + msg: str + +@app.get("/health") +def health(): + return { + "ok": True, + "mode": APP_MODE, + "busy": bridge.busy(), + "hostname": socket.gethostname(), + "timestamp": time.time(), + } + +@app.get("/test/ping") +def test_ping(): + return {"pong": True} + +@app.post("/test/echo") +def test_echo(req: EchoReq): + return {"echo": req.msg} + +@app.post("/send/esclave") +def send_esclave(): + res = bridge.send_and_read_once(TRAME_ESCLAVE) + if res.get("busy"): + raise HTTPException(status_code=409, detail=res["error"]) + return res diff --git a/mock_bridge.py b/mock_bridge.py new file mode 100644 index 0000000..79ba3cc --- /dev/null +++ b/mock_bridge.py @@ -0,0 +1,18 @@ +import threading +import time + +class MockBridge: + def __init__(self): + self._lock = threading.Lock() + + def busy(self): + return self._lock.locked() + + def send_and_read_once(self, payload: bytes): + if not self._lock.acquire(blocking=False): + return {"busy": True, "error": "BUSY"} + try: + time.sleep(0.1) + return {"ok": True, "response": "OK"} + finally: + self._lock.release() diff --git a/serial_bridge.py b/serial_bridge.py new file mode 100644 index 0000000..7cc1715 --- /dev/null +++ b/serial_bridge.py @@ -0,0 +1,26 @@ +import threading +from dataclasses import dataclass + +def hex2b(s: str) -> bytes: + return bytes(int(x, 16) for x in s.split()) + +@dataclass +class SerialConfig: + port: str + baudrate: int = 9600 + +class SerialBridge: + def __init__(self, cfg: SerialConfig): + self.cfg = cfg + self._lock = threading.Lock() + + def busy(self): + return self._lock.locked() + + def send_and_read_once(self, payload: bytes): + if not self._lock.acquire(blocking=False): + return {"busy": True, "error": "BUSY"} + try: + return {"ok": True, "request": payload.hex()} + finally: + self._lock.release()