# import standard modules
from contextlib import asynccontextmanager
-from fastapi import FastAPI, Request, HTTPException, status
+from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, JSONResponse, Response
import logging
allow_credentials=True,
)
+# ------------------------------------------------------------------------------
+# Security Headers middleware (basic hardening)
+# ------------------------------------------------------------------------------
+class SecurityHeadersMiddleware(BaseHTTPMiddleware):
+ async def dispatch(self, request: Request, call_next):
+ response: Response = await call_next(request)
+
+ # Hardening base
+ response.headers["X-Content-Type-Options"] = "nosniff"
+ response.headers["X-Frame-Options"] = "DENY"
+ response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
+ response.headers["Permissions-Policy"] = (
+ "geolocation=(), microphone=(), camera=(), payment=(), usb=(), "
+ "accelerometer=(), autoplay=(), clipboard-read=(), clipboard-write=()"
+ )
+
+ # HSTS (richiede HTTPS)
+ response.headers["Strict-Transport-Security"] = (
+ "max-age=31536000; includeSubDomains; preload"
+ )
+
+ # COOP / CORP isolano la pagina (protezione anti-XSS/XFO)
+ response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
+ response.headers["Cross-Origin-Resource-Policy"] = "same-origin"
+
+ # CSP rigida per produzione
+ response.headers["Content-Security-Policy"] = (
+ "default-src 'self'; "
+ "base-uri 'self'; "
+ "object-src 'none'; "
+ "frame-ancestors 'none'; "
+ "img-src 'self' data:; "
+ "font-src 'self' data:; "
+ "style-src 'self'; "
+ "script-src 'self'; "
+ "connect-src 'self'; "
+ "manifest-src 'self'; "
+ "worker-src 'self'"
+ )
+ return response
+
+# GRGR -> to be enabled in production
+#app.add_middleware(SecurityHeadersMiddleware)
+
+# ------------------------------------------------------------------------------
+# Public paths
+# ------------------------------------------------------------------------------
+PUBLIC_PATHS = (
+ "/login",
+ "/api/login",
+ "/logout",
+ "/api/logout",
+ "/about",
+ "/api/health",
+ "/docs",
+ "/openapi.json",
+)
+
+STATIC_PREFIXES = (
+ "/css",
+ "/js",
+ "/static",
+)
+
+STATIC_SUFFIXES = (".js", ".css", ".png", ".jpg", ".jpeg", ".ico", ".svg", ".map")
+
# ------------------------------------------------------------------------------
# Session / Auth middleware
# ------------------------------------------------------------------------------
@app.middleware("http")
async def session_middleware(request: Request, call_next):
path = request.url.path
- token = request.cookies.get("session")
+ method = request.method.upper()
- # Excludes the login/logout methods
- if path.startswith("/login") or path.startswith("/api/login") or \
- path.startswith("/logout") or path.startswith("/api/logout"):
+ # 1) Always let CORS preflight through (browsers send OPTIONS before real requests)
+ if method == "OPTIONS":
return await call_next(request)
- # Excludes the about & health methods
- if path.startswith("/about") or path.startswith("/api/health"):
+ # 2) Skip public endpoints (login/logout/about/health/docs/openapi)
+ if path.startswith(PUBLIC_PATHS):
return await call_next(request)
- # Excludes static files
- if (
- path.startswith("/css") or
- path.startswith("/js") or
- path.startswith("/static") or
- path.endswith((".js", ".css", ".png", ".jpg", ".jpeg", ".ico", ".svg", ".map"))
- ):
+ # 3) Skip static assets
+ if path.startswith(STATIC_PREFIXES) or path.endswith(STATIC_SUFFIXES):
return await call_next(request)
- # Protected APIs
+ # 4) Read session token from cookie (adjust name if different)
+ token = request.cookies.get("session")
+
+ # 5) Check authentication (your function should validate the cookie/session)
+ authenticated = is_logged_in(request)
+
+ # 6) Protect JSON APIs
if path.startswith("/api"):
- if not is_logged_in(request):
- logger.error("API access denied - not logged in")
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail={
- "error": "Unauthorized"
+ if not authenticated:
+ logger.warning("API access denied - not logged in: %s %s", method, path)
+ return JSONResponse(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ headers={"WWW-Authenticate": "Session"},
+ content={
+ "detail": {
+ "code": "UNAUTHORIZED",
+ "status": "failure",
+ "message": "Unauthorized",
+ "path": path,
+ }
},
)
+
+ # Optionally attach user info to request.state for downstream handlers
+ # request.state.user = <current_user>
+
+ # Call the downstream route/handler
response = await call_next(request)
- # Sliding expiration
- apply_session(response, username=None, token=token)
+
+ # Apply sliding expiration only on successful responses (2xx) and if a token exists
+ if token and 200 <= response.status_code < 300:
+ apply_session(response, username=None, token=token)
return response
- # Protected HTML pages
- if not is_logged_in(request):
- return RedirectResponse("/login")
+ # 7) Protect HTML pages (non-API): redirect unauthenticated users to login
+ if not authenticated:
+ # 303 See Other avoids reusing POST/other methods
+ return RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER)
+ # 8) Authenticated HTML request ? proceed
response = await call_next(request)
- # Sliding expiration
- apply_session(response, username=None, token=token)
+
+ # 9) Apply sliding expiration only on successful responses (2xx)
+ if token and 200 <= response.status_code < 300:
+ apply_session(response, username=None, token=token)
+
return response
# ------------------------------------------------------------------------------