from fastapi import Request, Response, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse
-from itsdangerous import TimestampSigner
import os
import ipaddress
-import time
# Import local modules
+from backend.security import is_logged_in, require_login
from backend.db.hosts import (
get_hosts,
get_host,
update_host,
delete_host
)
-from backend.db.users import (
- verify_login
-)
+from backend.routes.health import router as health_router
+from backend.routes.login import router as login_router
# Import config variables
-from backend.config import SECRET_KEY
-from backend.config import HTTP_PORT
-from backend.config import LOGIN_MAX_ATTEMPTS
-from backend.config import LOGIN_WINDOW_SECONDS
-
-# IP → lista timestamp tentativi
-login_attempts = {}
+from backend.config import FRONTEND_DIR, HTTP_PORT
# Start FastAPI app
app = FastAPI()
+app.include_router(health_router)
+app.include_router(login_router)
# Allow frontend JS to call the API
app.add_middleware(
allow_headers=["Content-Type"],
)
-# Token signer for session management
-signer = TimestampSigner(SECRET_KEY)
-
-def require_login(request: Request):
- token = request.cookies.get("session")
- if not token:
- raise HTTPException(status_code=401, detail="Not authenticated")
- try:
- signer.unsign(token, max_age=86400)
- except:
- raise HTTPException(status_code=401, detail="Invalid session")
-
-def check_rate_limit(ip: str):
- now = time.time()
-
- attempts = login_attempts.get(ip, [])
- # tieni solo tentativi negli ultimi LOGIN_WINDOW_SECONDS secondi
- attempts = [t for t in attempts if now - t < LOGIN_WINDOW_SECONDS]
-
- if len(attempts) >= LOGIN_MAX_ATTEMPTS:
- raise HTTPException(status_code=429, detail="Too many login attempts")
-
- # registra nuovo tentativo
- attempts.append(now)
- login_attempts[ip] = attempts
-
# ---------------------------------------------------------
# FRONTEND PATHS (absolute paths inside Docker)
# ---------------------------------------------------------
-FRONTEND_DIR = "/app/frontend"
+# Protect html pages
+def html_protected(request: Request, filename: str):
+ if not is_logged_in(request):
+ return RedirectResponse("/login")
+ return FileResponse(os.path.join(FRONTEND_DIR, filename))
# Homepage
@app.get("/")
def home(request: Request):
- token = request.cookies.get("session")
- if not token:
- return RedirectResponse("/login")
- try:
- signer.unsign(token, max_age=86400) # 24h
- except:
- return RedirectResponse("/login")
- return FileResponse(os.path.join(FRONTEND_DIR, "hosts.html"))
-
-# Login
-@app.get("/login")
-def login_page():
- return FileResponse(os.path.join(FRONTEND_DIR, "login.html"))
+ return html_protected(request, "hosts.html")
-# Hosts management
+# Hosts page
@app.get("/hosts")
def hosts(request: Request):
- token = request.cookies.get("session")
- if not token:
- return RedirectResponse("/login")
- try:
- signer.unsign(token, max_age=86400)
- except:
- return RedirectResponse("/login")
- return FileResponse(os.path.join(FRONTEND_DIR, "hosts.html"))
+ return html_protected(request, "hosts.html")
# Serve hosts.css
@app.get("/css/hosts.css")
def css_hosts():
return FileResponse(os.path.join(FRONTEND_DIR, "css/hosts.css"))
-# Serve login.css
-@app.get("/css/login.css")
-def css_login():
- return FileResponse(os.path.join(FRONTEND_DIR, "css/login.css"))
-
# Serve app.js
@app.get("/app.js")
def js():
# API ENDPOINTS
# ---------------------------------------------------------
-@app.post("/api/login")
-def api_login(request: Request, data: dict, response: Response):
- ip = request.client.host
- check_rate_limit(ip)
-
- user = data.get("username")
- pwd = data.get("password")
- if (verify_login(user, pwd)):
- #if user == "admin" and pwd == "admin":
- # reset tentativi su IP
- login_attempts.pop(ip, None)
-
- token = signer.sign(user).decode()
- response.set_cookie(
- "session",
- token,
- httponly=True,
- max_age=86400,
- path="/",
- #secure=True, # solo via HTTPS
- samesite="Strict" # riduce CSRF
- )
- return {"status": "ok"}
- return {"error": "Invalid credentials"}
-
-@app.post("/api/logout")
-def api_logout(response: Response):
- response.delete_cookie("session")
- return {"status": "logged_out"}
-
@app.get("/api/hosts")
def api_get_hosts(request: Request):
require_login(request)
--- /dev/null
+# backend/routes/login.py
+
+# import standard modules
+from fastapi import APIRouter, Request, Response
+from fastapi.responses import FileResponse, RedirectResponse
+import os
+import time
+# Import local modules
+from backend.security import is_logged_in, signer
+from backend.db.users import verify_login
+# Import config variables
+from backend.config import FRONTEND_DIR, LOGIN_MAX_ATTEMPTS, LOGIN_WINDOW_SECONDS
+
+router = APIRouter()
+
+# IP -> lista timestamp tentativi
+login_attempts = {}
+
+def check_rate_limit(ip: str):
+ now = time.time()
+ attempts = login_attempts.get(ip, [])
+ # tieni solo tentativi negli ultimi LOGIN_WINDOW_SECONDS secondi
+ attempts = [t for t in attempts if now - t < LOGIN_WINDOW_SECONDS]
+
+ if len(attempts) >= LOGIN_MAX_ATTEMPTS:
+ raise HTTPException(status_code=429, detail="Too many login attempts")
+
+ # registra nuovo tentativo
+ attempts.append(now)
+ login_attempts[ip] = attempts
+
+# ---------------------------------------------------------
+# FRONTEND PATHS (absolute paths inside Docker)
+# ---------------------------------------------------------
+
+# Login page
+@router.get("/login")
+def login_page(request: Request):
+ if is_logged_in(request):
+ return RedirectResponse("/")
+ return FileResponse(os.path.join(FRONTEND_DIR, "login.html"))
+
+# Serve login.css
+@router.get("/css/login.css")
+def css_login():
+ return FileResponse(os.path.join(FRONTEND_DIR, "css/login.css"))
+
+# ---------------------------------------------------------
+# API ENDPOINTS
+# ---------------------------------------------------------
+
+@router.post("/api/login")
+def api_login(request: Request, data: dict, response: Response):
+ ip = request.client.host
+ check_rate_limit(ip)
+
+ user = data.get("username")
+ pwd = data.get("password")
+
+ if verify_login(user, pwd):
+ # reset tentativi su IP
+ login_attempts.pop(ip, None)
+
+ token = signer.sign(user).decode()
+ response.set_cookie(
+ "session",
+ token,
+ httponly=True,
+ max_age=86400,
+ path="/",
+ #secure=True, # solo via HTTPS
+ samesite="Strict"
+ )
+ return {"status": "ok"}
+
+ return {"error": "Invalid credentials"}
+
+@router.post("/api/logout")
+def api_logout(response: Response):
+ response.delete_cookie("session")
+ return {"status": "logged_out"}