Add pont-bascule-api project

This commit is contained in:
2026-01-09 12:53:14 +00:00
parent c6d9d8cbf4
commit ee2594b9cf
4 changed files with 104 additions and 0 deletions

1
__init__.py Normal file
View File

@@ -0,0 +1 @@
# app package

59
main.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import time
import socket
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from .serial_bridge import SerialBridge, SerialConfig, hex2b
from .mock_bridge import MockBridge
TRAME_ESCLAVE = b"\x01\x0D\x0A"
TRAME_DSD = b"\x01\x10\x39\x39\x4D\x0D\x0A"
def env(name: str, default: str) -> str:
return os.getenv(name, default)
APP_MODE = env("APP_MODE", "mock").strip().lower()
app = FastAPI(title="Pont bascule API", version="1.2")
if APP_MODE == "serial":
cfg = SerialConfig(
port=env("SERIAL_PORT", "/dev/ttyUSB0"),
baudrate=int(env("SERIAL_BAUDRATE", "9600")),
)
bridge = SerialBridge(cfg)
else:
cfg = None
bridge = MockBridge()
class CustomReq(BaseModel):
hex: str
class EchoReq(BaseModel):
msg: str
@app.get("/health")
def health():
return {
"ok": True,
"mode": APP_MODE,
"busy": bridge.busy(),
"hostname": socket.gethostname(),
"timestamp": time.time(),
}
@app.get("/test/ping")
def test_ping():
return {"pong": True}
@app.post("/test/echo")
def test_echo(req: EchoReq):
return {"echo": req.msg}
@app.post("/send/esclave")
def send_esclave():
res = bridge.send_and_read_once(TRAME_ESCLAVE)
if res.get("busy"):
raise HTTPException(status_code=409, detail=res["error"])
return res

18
mock_bridge.py Normal file
View File

@@ -0,0 +1,18 @@
import threading
import time
class MockBridge:
def __init__(self):
self._lock = threading.Lock()
def busy(self):
return self._lock.locked()
def send_and_read_once(self, payload: bytes):
if not self._lock.acquire(blocking=False):
return {"busy": True, "error": "BUSY"}
try:
time.sleep(0.1)
return {"ok": True, "response": "OK"}
finally:
self._lock.release()

26
serial_bridge.py Normal file
View File

@@ -0,0 +1,26 @@
import threading
from dataclasses import dataclass
def hex2b(s: str) -> bytes:
return bytes(int(x, 16) for x in s.split())
@dataclass
class SerialConfig:
port: str
baudrate: int = 9600
class SerialBridge:
def __init__(self, cfg: SerialConfig):
self.cfg = cfg
self._lock = threading.Lock()
def busy(self):
return self._lock.locked()
def send_and_read_once(self, payload: bytes):
if not self._lock.acquire(blocking=False):
return {"busy": True, "error": "BUSY"}
try:
return {"ok": True, "request": payload.hex()}
finally:
self._lock.release()